1use std::path::Path;
18use std::sync::OnceLock;
19
20use flow_storage::{Store, StoreError as FsStoreError};
21use serde::{Deserialize, Serialize};
22use thiserror::Error;
23
24const CATALOG_JSON: &str = include_str!("../../../mocks/node_catalog.json");
25const SUFFIX: &str = ".json";
26
27#[derive(Debug, Error)]
28pub enum NodeError {
29 #[error("invalid node slug")]
30 InvalidSlug,
31 #[error("node `{0}` not found")]
32 NotFound(String),
33 #[error(transparent)]
34 Io(#[from] std::io::Error),
35 #[error(transparent)]
36 Json(#[from] serde_json::Error),
37 #[error(transparent)]
38 Store(#[from] FsStoreError),
39}
40
41#[derive(Debug, Clone, Serialize, Deserialize)]
45#[serde(rename_all = "camelCase")]
46pub struct NodeCatalogEntry {
47 pub slug: String,
48 pub version: String,
49 pub label: String,
50 #[serde(default)]
51 pub description: String,
52 #[serde(default)]
53 pub tags: Vec<String>,
54 #[serde(default)]
55 pub icon: String,
56 #[serde(default = "default_sort_key")]
59 pub sort_key: i32,
60
61 pub node_type: String,
63 pub canvas_type: String,
67 pub renderer: String,
71 pub inspector: String,
73
74 #[serde(default)]
77 pub adapter: Option<String>,
78 #[serde(default)]
79 pub action_id: Option<String>,
80
81 #[serde(default)]
85 pub defaults: serde_json::Value,
86
87 #[serde(default)]
93 pub schema: serde_json::Value,
94
95 #[serde(default, skip_serializing_if = "Option::is_none")]
99 pub command_tree: Option<serde_json::Value>,
100
101 #[serde(default)]
104 pub settings: Vec<SettingsRequirement>,
105
106 #[serde(default)]
108 pub runtime_bindings: RuntimeBindings,
109
110 #[serde(default)]
114 pub lowering: LoweringHint,
115
116 #[serde(default, skip_serializing_if = "Option::is_none")]
121 pub cli_tool: Option<CliTool>,
122
123 #[serde(default, skip_serializing_if = "Option::is_none")]
129 pub service_integration: Option<ServiceIntegration>,
130}
131
132#[derive(Debug, Clone, Serialize, Deserialize)]
133#[serde(rename_all = "camelCase")]
134pub struct SettingsRequirement {
135 pub key: String,
136 pub scope: String,
138 #[serde(default)]
139 pub per_provider: bool,
140 #[serde(default)]
141 pub required: bool,
142 #[serde(default)]
143 pub missing_message: String,
144}
145
146#[derive(Debug, Clone, Default, Serialize, Deserialize)]
147#[serde(rename_all = "camelCase")]
148pub struct RuntimeBindings {
149 #[serde(default)]
150 pub shows_execution_output: bool,
151 #[serde(default)]
152 pub shows_execution_status: bool,
153 #[serde(default, skip_serializing_if = "Option::is_none")]
156 pub model_inventory: Option<String>,
157 #[serde(default, skip_serializing_if = "Option::is_none")]
160 pub cli_preview: Option<CliPreview>,
161}
162
163#[derive(Debug, Clone, Serialize, Deserialize)]
164#[serde(rename_all = "camelCase")]
165pub struct CliPreview {
166 pub prefix: String,
167 pub positionals_key: String,
168 pub values_key: String,
169 #[serde(default, skip_serializing_if = "Option::is_none")]
170 pub output_filter_key: Option<String>,
171}
172
173#[derive(Debug, Clone, Default, Serialize, Deserialize)]
174#[serde(rename_all = "camelCase")]
175pub struct LoweringHint {
176 #[serde(default, skip_serializing_if = "Option::is_none")]
177 pub adapter: Option<String>,
178 #[serde(default, skip_serializing_if = "Option::is_none")]
179 pub action_id: Option<String>,
180}
181
182#[derive(Debug, Clone, Serialize, Deserialize)]
189#[serde(rename_all = "camelCase")]
190pub struct CliTool {
191 pub name: String,
192 pub version: String,
193 #[serde(default, skip_serializing_if = "Option::is_none")]
194 pub version_command: Option<String>,
195}
196
197pub use flow_execution::service::{ServiceAuth, ServiceIntegration, ServiceOperation};
201
202pub fn service_integrations() -> std::collections::HashMap<String, ServiceIntegration> {
206 entries()
207 .iter()
208 .filter_map(|e| {
209 e.service_integration
210 .clone()
211 .map(|si| (e.slug.clone(), si))
212 })
213 .collect()
214}
215
216fn default_sort_key() -> i32 {
217 1_000
218}
219
220#[derive(Deserialize)]
221struct CatalogEnvelope {
222 #[serde(default, rename = "apiVersion")]
223 _api_version: String,
224 nodes: Vec<NodeCatalogEntry>,
225}
226
227pub(crate) fn entries() -> &'static [NodeCatalogEntry] {
231 static CATALOG: OnceLock<Vec<NodeCatalogEntry>> = OnceLock::new();
232 CATALOG
233 .get_or_init(|| {
234 let env: CatalogEnvelope =
235 serde_json::from_str(CATALOG_JSON).expect("node_catalog.json is malformed");
236 env.nodes
237 })
238 .as_slice()
239}
240
241pub fn catalog() -> Vec<NodeCatalogEntry> {
244 let mut out: Vec<NodeCatalogEntry> = entries().to_vec();
245 out.sort_by(|a, b| a.sort_key.cmp(&b.sort_key).then_with(|| a.slug.cmp(&b.slug)));
246 out
247}
248
249pub fn entry_by_slug(slug: &str) -> Option<NodeCatalogEntry> {
251 entries().iter().find(|e| e.slug == slug).cloned()
252}
253
254pub fn list_installed_schemes(store: &Store, dir: &Path) -> Result<Vec<NodeCatalogEntry>, NodeError> {
267 let rows = store.list_node_library_rows()?;
268 let dir_present = dir.is_dir();
271 let mut out = Vec::with_capacity(rows.len());
272 for row in rows {
273 match read_scheme(dir, &row.slug) {
274 Ok(entry) => out.push(entry),
275 Err(NodeError::NotFound(_)) if dir_present => match store.delete_node_library_row(&row.slug) {
276 Ok(()) => tracing::info!(
277 slug = %row.slug,
278 "pruned orphaned node_library row (scheme file missing)"
279 ),
280 Err(e) => tracing::warn!(
281 slug = %row.slug, error = ?e,
282 "failed to prune orphaned node_library row"
283 ),
284 },
285 Err(e) => {
286 tracing::warn!(slug = %row.slug, error = ?e, "installed node scheme missing or unreadable");
287 }
288 }
289 }
290 out.sort_by(|a, b| a.sort_key.cmp(&b.sort_key).then_with(|| a.slug.cmp(&b.slug)));
291 Ok(out)
292}
293
294pub fn read_scheme(dir: &Path, slug: &str) -> Result<NodeCatalogEntry, NodeError> {
296 validate_slug(slug)?;
297 let path = dir.join(format!("{slug}{SUFFIX}"));
298 if !path.exists() {
299 return Err(NodeError::NotFound(slug.to_string()));
300 }
301 let body = std::fs::read_to_string(&path)?;
302 Ok(serde_json::from_str(&body)?)
303}
304
305pub fn write_scheme(dir: &Path, entry: &NodeCatalogEntry) -> Result<(), NodeError> {
308 validate_slug(&entry.slug)?;
309 std::fs::create_dir_all(dir)?;
310 let path = dir.join(format!("{}{SUFFIX}", entry.slug));
311 let body = serde_json::to_string_pretty(entry)?;
312 std::fs::write(&path, body)?;
313 Ok(())
314}
315
316pub fn delete_scheme(dir: &Path, slug: &str) -> Result<(), NodeError> {
319 validate_slug(slug)?;
320 let path = dir.join(format!("{slug}{SUFFIX}"));
321 if path.exists() {
322 std::fs::remove_file(&path)?;
323 }
324 Ok(())
325}
326
327pub(crate) fn validate_slug(slug: &str) -> Result<(), NodeError> {
332 if slug.is_empty() {
333 return Err(NodeError::InvalidSlug);
334 }
335 if !slug.chars().all(|c| {
336 c.is_ascii_lowercase() || c.is_ascii_digit() || c == '-' || c == '.' || c == '_'
337 }) {
338 return Err(NodeError::InvalidSlug);
339 }
340 if slug.starts_with('-')
341 || slug.starts_with('.')
342 || slug.starts_with('_')
343 || slug.ends_with('-')
344 || slug.ends_with('.')
345 || slug.ends_with('_')
346 {
347 return Err(NodeError::InvalidSlug);
348 }
349 if slug.contains("..") {
350 return Err(NodeError::InvalidSlug);
351 }
352 Ok(())
353}
354
355#[cfg(test)]
356mod tests {
357 use super::*;
358
359 fn tmp_dir() -> std::path::PathBuf {
360 let p = std::env::temp_dir().join(format!("flow-nodes-{}", uuid::Uuid::new_v4()));
361 std::fs::create_dir_all(&p).unwrap();
362 p
363 }
364
365 #[test]
366 fn catalog_parses_and_is_well_formed() {
367 for e in &catalog() {
368 assert!(!e.slug.is_empty());
369 assert!(!e.label.is_empty());
370 assert!(!e.version.is_empty());
371 assert!(!e.node_type.is_empty(), "{} missing nodeType", e.slug);
372 assert!(!e.canvas_type.is_empty(), "{} missing canvasType", e.slug);
373 }
374 }
375
376 #[test]
377 fn cli_tool_entries_carry_cli_tool_descriptor() {
378 for e in &catalog() {
381 if e.action_id.as_deref() == Some("cli-tool") {
382 let tool = e
383 .cli_tool
384 .as_ref()
385 .unwrap_or_else(|| panic!("{} missing cliTool", e.slug));
386 assert!(!tool.name.is_empty(), "{} cliTool.name empty", e.slug);
387 assert!(!tool.version.is_empty(), "{} cliTool.version empty", e.slug);
388 }
389 }
390 }
391
392 #[test]
393 fn catalog_sorted_by_sort_key() {
394 let listed = catalog();
395 let keys: Vec<i32> = listed.iter().map(|e| e.sort_key).collect();
396 let mut sorted = keys.clone();
397 sorted.sort();
398 assert_eq!(keys, sorted, "catalog must be sorted by sortKey");
399 }
400
401 #[test]
402 fn entry_by_slug_finds_known_entry() {
403 if let Some(first) = catalog().into_iter().next() {
404 let got = entry_by_slug(&first.slug).expect("present");
405 assert_eq!(got.slug, first.slug);
406 }
407 assert!(entry_by_slug("does-not-exist").is_none());
408 }
409
410 #[test]
411 fn validate_slug_accepts_namespaced_and_rejects_traversal() {
412 assert!(validate_slug("acme.foo").is_ok());
413 assert!(validate_slug("zowe.cli-tool").is_ok());
414 assert!(validate_slug("snake_case").is_ok());
415 assert!(matches!(
416 validate_slug("../escape"),
417 Err(NodeError::InvalidSlug)
418 ));
419 assert!(matches!(
420 validate_slug(".hidden"),
421 Err(NodeError::InvalidSlug)
422 ));
423 assert!(matches!(
424 validate_slug("_leading"),
425 Err(NodeError::InvalidSlug)
426 ));
427 assert!(matches!(
428 validate_slug("trailing-"),
429 Err(NodeError::InvalidSlug)
430 ));
431 }
432
433 #[test]
434 fn write_and_read_scheme_roundtrip() {
435 let dir = tmp_dir();
436 let entry = NodeCatalogEntry {
437 slug: "acme.foo".into(),
438 version: "1.0.0".into(),
439 label: "Acme Foo".into(),
440 description: "".into(),
441 tags: vec![],
442 icon: "".into(),
443 sort_key: 1000,
444 node_type: "action".into(),
445 canvas_type: "catalog".into(),
446 renderer: "Generic".into(),
447 inspector: "Generic".into(),
448 adapter: Some("shell".into()),
449 action_id: Some("run-command".into()),
450 defaults: serde_json::json!({}),
451 schema: serde_json::json!({}),
452 command_tree: None,
453 settings: vec![],
454 runtime_bindings: RuntimeBindings::default(),
455 lowering: LoweringHint::default(),
456 cli_tool: None,
457 service_integration: None,
458 };
459 write_scheme(&dir, &entry).unwrap();
460 let loaded = read_scheme(&dir, "acme.foo").unwrap();
461 assert_eq!(loaded.slug, entry.slug);
462 assert_eq!(loaded.canvas_type, "catalog");
463 let _ = std::fs::remove_dir_all(&dir);
464 }
465
466 #[test]
467 fn read_missing_returns_not_found() {
468 let dir = tmp_dir();
469 assert!(matches!(read_scheme(&dir, "nope"), Err(NodeError::NotFound(_))));
470 }
471
472 #[test]
473 fn delete_missing_is_ok() {
474 let dir = tmp_dir();
475 delete_scheme(&dir, "nope").unwrap();
476 }
477
478 #[test]
479 fn list_installed_schemes_returns_only_what_is_installed() {
480 use flow_storage::NodeLibraryRow;
481 let dir = tmp_dir();
482 let store = Store::open_in_memory().unwrap();
483 assert!(list_installed_schemes(&store, &dir).unwrap().is_empty());
484
485 let Some(slug) = catalog().into_iter().next().map(|e| e.slug) else {
486 return;
487 };
488 let entry = entry_by_slug(&slug).expect("present");
489 write_scheme(&dir, &entry).unwrap();
490 store
491 .upsert_node_library_row(&NodeLibraryRow {
492 slug: entry.slug.clone(),
493 version: entry.version.clone(),
494 installed_at: chrono::Utc::now(),
495 })
496 .unwrap();
497
498 let listed = list_installed_schemes(&store, &dir).unwrap();
499 assert_eq!(listed.len(), 1);
500 assert_eq!(listed[0].slug, slug);
501 let _ = std::fs::remove_dir_all(&dir);
502 }
503
504 #[test]
505 fn list_installed_schemes_prunes_orphan_rows_when_dir_present() {
506 use flow_storage::NodeLibraryRow;
507 let dir = tmp_dir(); let store = Store::open_in_memory().unwrap();
509 store
511 .upsert_node_library_row(&NodeLibraryRow {
512 slug: "ghost".into(),
513 version: "1.0.0".into(),
514 installed_at: chrono::Utc::now(),
515 })
516 .unwrap();
517 let listed = list_installed_schemes(&store, &dir).unwrap();
518 assert!(listed.is_empty(), "orphan row contributes no scheme");
519 assert!(
521 store.get_node_library_row("ghost").unwrap().is_none(),
522 "row whose scheme file is missing should be pruned"
523 );
524 let _ = std::fs::remove_dir_all(&dir);
525 }
526
527 #[test]
528 fn list_installed_schemes_keeps_rows_when_dir_is_missing() {
529 use flow_storage::NodeLibraryRow;
530 let dir = std::env::temp_dir().join(format!("flow-nodes-absent-{}", uuid::Uuid::new_v4()));
533 assert!(!dir.exists());
534 let store = Store::open_in_memory().unwrap();
535 store
536 .upsert_node_library_row(&NodeLibraryRow {
537 slug: "ghost".into(),
538 version: "1.0.0".into(),
539 installed_at: chrono::Utc::now(),
540 })
541 .unwrap();
542 let listed = list_installed_schemes(&store, &dir).unwrap();
543 assert!(listed.is_empty(), "no readable schemes, so nothing listed");
544 assert!(
545 store.get_node_library_row("ghost").unwrap().is_some(),
546 "rows must survive a missing nodes dir - do not wipe the registry"
547 );
548 }
549}