Skip to main content

flow_application/
node_hub.rs

1//! Node Hub - browse + install catalog node types.
2//!
3//! Mirrors [`crate::template_hub`] function-for-function. The catalog
4//! itself lives in [`crate::nodes`] (parse + on-disk scheme IO); this
5//! module wraps install / update / uninstall and the "install status"
6//! annotation the Hub UI surfaces.
7
8use chrono::Utc;
9use flow_storage::{NodeLibraryRow, Store};
10use serde::Serialize;
11
12use crate::nodes::{self, NodeCatalogEntry, NodeError};
13
14/// Where a hub node currently lives, plus the installed version. `None`
15/// when nothing matches the catalog slug.
16#[derive(Debug, Clone, Serialize)]
17#[serde(rename_all = "camelCase")]
18pub struct NodeInstalledStatus {
19    pub slug: String,
20    pub installed_version: String,
21}
22
23/// One hub entry + install status. `update_available` is pre-computed
24/// server-side so the frontend doesn't need to parse versions.
25#[derive(Debug, Clone, Serialize)]
26#[serde(rename_all = "camelCase")]
27pub struct NodeCatalogEntryWithStatus {
28    #[serde(flatten)]
29    pub entry: NodeCatalogEntry,
30    #[serde(skip_serializing_if = "Option::is_none")]
31    pub installed_as: Option<NodeInstalledStatus>,
32    pub update_available: bool,
33}
34
35/// Browse list. Sorted by `sortKey` then slug, same as [`nodes::catalog`].
36pub fn catalog() -> Vec<NodeCatalogEntry> {
37    nodes::catalog()
38}
39
40/// Annotate the catalog with per-entry install status by walking
41/// `node_library` once into a slug → row map.
42pub fn catalog_with_status(store: &Store) -> Result<Vec<NodeCatalogEntryWithStatus>, NodeError> {
43    let rows = store.list_node_library_rows()?;
44    let by_slug: std::collections::HashMap<String, &NodeLibraryRow> =
45        rows.iter().map(|r| (r.slug.clone(), r)).collect();
46    let mut out = Vec::with_capacity(nodes::catalog().len());
47    for entry in nodes::catalog() {
48        let installed = by_slug.get(&entry.slug).map(|row| NodeInstalledStatus {
49            slug: row.slug.clone(),
50            installed_version: row.version.clone(),
51        });
52        let update_available = installed
53            .as_ref()
54            .map(|status| {
55                compare_versions(&entry.version, &status.installed_version)
56                    == std::cmp::Ordering::Greater
57            })
58            .unwrap_or(false);
59        out.push(NodeCatalogEntryWithStatus {
60            entry,
61            installed_as: installed,
62            update_available,
63        });
64    }
65    Ok(out)
66}
67
68/// Install a catalog entry into the user's local library. Refuses if a
69/// row already exists for the slug - re-installs go through
70/// [`update_installed`].
71pub fn add_to_library(
72    nodes_dir: &std::path::Path,
73    store: &Store,
74    slug: &str,
75) -> Result<NodeLibraryRow, NodeError> {
76    let entry = nodes::entry_by_slug(slug).ok_or_else(|| NodeError::NotFound(slug.to_string()))?;
77    if store.get_node_library_row(slug)?.is_some() {
78        return Err(NodeError::Store(flow_storage::StoreError::Refused(format!(
79            "node `{slug}` is already installed; use Update to overwrite"
80        ))));
81    }
82    nodes::write_scheme(nodes_dir, &entry)?;
83    let row = NodeLibraryRow {
84        slug: entry.slug.clone(),
85        version: entry.version.clone(),
86        installed_at: Utc::now(),
87    };
88    let stored = store.upsert_node_library_row(&row)?;
89    Ok(stored)
90}
91
92/// Overwrite an installed scheme with the catalog's current version. When
93/// `force` is false and the user has edited the file (mtime > installed_at),
94/// returns Refused so the frontend can confirm before clobbering local edits.
95pub fn update_installed(
96    nodes_dir: &std::path::Path,
97    store: &Store,
98    slug: &str,
99    force: bool,
100) -> Result<NodeLibraryRow, NodeError> {
101    let entry = nodes::entry_by_slug(slug).ok_or_else(|| NodeError::NotFound(slug.to_string()))?;
102    let existing = store
103        .get_node_library_row(slug)?
104        .ok_or_else(|| NodeError::NotFound(format!("node `{slug}` is not installed")))?;
105
106    if !force {
107        let path = nodes_dir.join(format!("{slug}.json"));
108        if let Ok(meta) = std::fs::metadata(&path) {
109            if let Ok(modified) = meta.modified() {
110                let modified_dt: chrono::DateTime<Utc> = modified.into();
111                if modified_dt > existing.installed_at {
112                    return Err(NodeError::Store(flow_storage::StoreError::Refused(format!(
113                        "node `{slug}` has local edits since install; pass force=true to overwrite"
114                    ))));
115                }
116            }
117        }
118    }
119
120    nodes::write_scheme(nodes_dir, &entry)?;
121    let row = NodeLibraryRow {
122        slug: entry.slug.clone(),
123        version: entry.version.clone(),
124        installed_at: Utc::now(),
125    };
126    let stored = store.upsert_node_library_row(&row)?;
127    Ok(stored)
128}
129
130/// Remove an installed scheme from disk and drop the library row.
131pub fn uninstall(
132    nodes_dir: &std::path::Path,
133    store: &Store,
134    slug: &str,
135) -> Result<(), NodeError> {
136    store.delete_node_library_row(slug)?;
137    nodes::delete_scheme(nodes_dir, slug)?;
138    Ok(())
139}
140
141/// Tuple compare on the parsed dotted-int triple. Mirrors
142/// `template_hub::compare_versions` so the two hubs stay in sync.
143fn compare_versions(a: &str, b: &str) -> std::cmp::Ordering {
144    let parse = |s: &str| -> (u32, u32, u32) {
145        let mut it = s.split('.').map(|p| p.parse::<u32>().unwrap_or(0));
146        (it.next().unwrap_or(0), it.next().unwrap_or(0), it.next().unwrap_or(0))
147    };
148    parse(a).cmp(&parse(b))
149}
150
151#[cfg(test)]
152mod tests {
153    use super::*;
154
155    fn tmp_dir() -> std::path::PathBuf {
156        let p = std::env::temp_dir().join(format!("flow-node-hub-{}", uuid::Uuid::new_v4()));
157        std::fs::create_dir_all(&p).unwrap();
158        p
159    }
160
161    fn first_slug() -> Option<String> {
162        nodes::catalog().into_iter().next().map(|e| e.slug)
163    }
164
165    #[test]
166    fn add_to_library_then_status_marks_installed() {
167        let Some(slug) = first_slug() else { return };
168        let dir = tmp_dir();
169        let store = Store::open_in_memory().unwrap();
170        add_to_library(&dir, &store, &slug).unwrap();
171        let annotated = catalog_with_status(&store).unwrap();
172        let row = annotated
173            .iter()
174            .find(|e| e.entry.slug == slug)
175            .expect("present");
176        assert!(row.installed_as.is_some());
177        assert!(!row.update_available);
178        let _ = std::fs::remove_dir_all(&dir);
179    }
180
181    #[test]
182    fn add_to_library_refuses_unknown_slug() {
183        let dir = tmp_dir();
184        let store = Store::open_in_memory().unwrap();
185        let err = add_to_library(&dir, &store, "does-not-exist").expect_err("unknown refused");
186        assert!(matches!(err, NodeError::NotFound(_)));
187        let _ = std::fs::remove_dir_all(&dir);
188    }
189
190    #[test]
191    fn add_to_library_refuses_double_install() {
192        let Some(slug) = first_slug() else { return };
193        let dir = tmp_dir();
194        let store = Store::open_in_memory().unwrap();
195        add_to_library(&dir, &store, &slug).unwrap();
196        let err = add_to_library(&dir, &store, &slug).expect_err("double install refused");
197        assert!(err.to_string().contains("already installed"), "got {err}");
198        let _ = std::fs::remove_dir_all(&dir);
199    }
200
201    #[test]
202    fn uninstall_removes_scheme_and_row() {
203        let Some(slug) = first_slug() else { return };
204        let dir = tmp_dir();
205        let store = Store::open_in_memory().unwrap();
206        add_to_library(&dir, &store, &slug).unwrap();
207        uninstall(&dir, &store, &slug).unwrap();
208        assert!(store.get_node_library_row(&slug).unwrap().is_none());
209        let _ = std::fs::remove_dir_all(&dir);
210    }
211
212    #[test]
213    fn compare_versions_orders_dotted_triples() {
214        use std::cmp::Ordering::*;
215        assert_eq!(compare_versions("1.0.0", "1.0.0"), Equal);
216        assert_eq!(compare_versions("1.0.1", "1.0.0"), Greater);
217        assert_eq!(compare_versions("0.9.9", "1.0.0"), Less);
218    }
219
220    #[test]
221    fn compare_versions_treats_zero_zero_zero_as_oldest() {
222        use std::cmp::Ordering::*;
223        assert_eq!(compare_versions("1.0.0", "0.0.0"), Greater);
224    }
225}