flow_application/
node_hub.rs1use chrono::Utc;
9use flow_storage::{NodeLibraryRow, Store};
10use serde::Serialize;
11
12use crate::nodes::{self, NodeCatalogEntry, NodeError};
13
14#[derive(Debug, Clone, Serialize)]
17#[serde(rename_all = "camelCase")]
18pub struct NodeInstalledStatus {
19 pub slug: String,
20 pub installed_version: String,
21}
22
23#[derive(Debug, Clone, Serialize)]
26#[serde(rename_all = "camelCase")]
27pub struct NodeCatalogEntryWithStatus {
28 #[serde(flatten)]
29 pub entry: NodeCatalogEntry,
30 #[serde(skip_serializing_if = "Option::is_none")]
31 pub installed_as: Option<NodeInstalledStatus>,
32 pub update_available: bool,
33}
34
35pub fn catalog() -> Vec<NodeCatalogEntry> {
37 nodes::catalog()
38}
39
40pub fn catalog_with_status(store: &Store) -> Result<Vec<NodeCatalogEntryWithStatus>, NodeError> {
43 let rows = store.list_node_library_rows()?;
44 let by_slug: std::collections::HashMap<String, &NodeLibraryRow> =
45 rows.iter().map(|r| (r.slug.clone(), r)).collect();
46 let mut out = Vec::with_capacity(nodes::catalog().len());
47 for entry in nodes::catalog() {
48 let installed = by_slug.get(&entry.slug).map(|row| NodeInstalledStatus {
49 slug: row.slug.clone(),
50 installed_version: row.version.clone(),
51 });
52 let update_available = installed
53 .as_ref()
54 .map(|status| {
55 compare_versions(&entry.version, &status.installed_version)
56 == std::cmp::Ordering::Greater
57 })
58 .unwrap_or(false);
59 out.push(NodeCatalogEntryWithStatus {
60 entry,
61 installed_as: installed,
62 update_available,
63 });
64 }
65 Ok(out)
66}
67
68pub fn add_to_library(
72 nodes_dir: &std::path::Path,
73 store: &Store,
74 slug: &str,
75) -> Result<NodeLibraryRow, NodeError> {
76 let entry = nodes::entry_by_slug(slug).ok_or_else(|| NodeError::NotFound(slug.to_string()))?;
77 if store.get_node_library_row(slug)?.is_some() {
78 return Err(NodeError::Store(flow_storage::StoreError::Refused(format!(
79 "node `{slug}` is already installed; use Update to overwrite"
80 ))));
81 }
82 nodes::write_scheme(nodes_dir, &entry)?;
83 let row = NodeLibraryRow {
84 slug: entry.slug.clone(),
85 version: entry.version.clone(),
86 installed_at: Utc::now(),
87 };
88 let stored = store.upsert_node_library_row(&row)?;
89 Ok(stored)
90}
91
92pub fn update_installed(
96 nodes_dir: &std::path::Path,
97 store: &Store,
98 slug: &str,
99 force: bool,
100) -> Result<NodeLibraryRow, NodeError> {
101 let entry = nodes::entry_by_slug(slug).ok_or_else(|| NodeError::NotFound(slug.to_string()))?;
102 let existing = store
103 .get_node_library_row(slug)?
104 .ok_or_else(|| NodeError::NotFound(format!("node `{slug}` is not installed")))?;
105
106 if !force {
107 let path = nodes_dir.join(format!("{slug}.json"));
108 if let Ok(meta) = std::fs::metadata(&path) {
109 if let Ok(modified) = meta.modified() {
110 let modified_dt: chrono::DateTime<Utc> = modified.into();
111 if modified_dt > existing.installed_at {
112 return Err(NodeError::Store(flow_storage::StoreError::Refused(format!(
113 "node `{slug}` has local edits since install; pass force=true to overwrite"
114 ))));
115 }
116 }
117 }
118 }
119
120 nodes::write_scheme(nodes_dir, &entry)?;
121 let row = NodeLibraryRow {
122 slug: entry.slug.clone(),
123 version: entry.version.clone(),
124 installed_at: Utc::now(),
125 };
126 let stored = store.upsert_node_library_row(&row)?;
127 Ok(stored)
128}
129
130pub fn uninstall(
132 nodes_dir: &std::path::Path,
133 store: &Store,
134 slug: &str,
135) -> Result<(), NodeError> {
136 store.delete_node_library_row(slug)?;
137 nodes::delete_scheme(nodes_dir, slug)?;
138 Ok(())
139}
140
/// Orders two version strings by their numeric `major.minor.patch` triple.
///
/// Before parsing, each input is trimmed of surrounding whitespace, one
/// leading `v`/`V` is dropped, and everything from the first `-` or `+`
/// onwards (pre-release or build suffix) is ignored. This keeps a suffix from
/// zeroing the component it is attached to: `1.2.10-rc1` is read as
/// `(1, 2, 10)`, not `(1, 2, 0)`.
///
/// Missing components and components that are not a valid `u32` count as `0`;
/// components beyond the third are ignored. Suffixes themselves are not
/// ranked, so `1.0.0-rc1` and `1.0.0` compare as equal.
fn compare_versions(a: &str, b: &str) -> std::cmp::Ordering {
    fn triple(version: &str) -> (u32, u32, u32) {
        let trimmed = version.trim();
        let unprefixed = trimmed
            .strip_prefix(|c: char| c == 'v' || c == 'V')
            .unwrap_or(trimmed);
        // Keep only the dotted core; `split` always yields at least one item.
        let core = unprefixed
            .split(|c: char| c == '-' || c == '+')
            .next()
            .unwrap_or(unprefixed);
        let mut parts = core.split('.').map(|part| part.parse::<u32>().unwrap_or(0));
        let major = parts.next().unwrap_or(0);
        let minor = parts.next().unwrap_or(0);
        let patch = parts.next().unwrap_or(0);
        (major, minor, patch)
    }

    triple(a).cmp(&triple(b))
}
150
#[cfg(test)]
mod tests {
    use super::*;

    /// Scratch directory under the system temp dir.
    ///
    /// Removed in `Drop`, so the directory is cleaned up even when a test
    /// panics on a failed assertion instead of reaching its last line.
    struct TmpDir(std::path::PathBuf);

    impl TmpDir {
        fn path(&self) -> &std::path::Path {
            &self.0
        }
    }

    impl Drop for TmpDir {
        fn drop(&mut self) {
            // Best effort: a leftover temp dir must not fail the test run.
            let _ = std::fs::remove_dir_all(&self.0);
        }
    }

    /// Creates a uniquely named scratch directory for one test.
    fn tmp_dir() -> TmpDir {
        let p = std::env::temp_dir().join(format!("flow-node-hub-{}", uuid::Uuid::new_v4()));
        std::fs::create_dir_all(&p).unwrap();
        TmpDir(p)
    }

    /// Slug of the first catalog entry, or `None` when the catalog is empty.
    /// Tests that need a real entry return early (and pass) in that case.
    fn first_slug() -> Option<String> {
        nodes::catalog().into_iter().next().map(|e| e.slug)
    }

    #[test]
    fn add_to_library_then_status_marks_installed() {
        let Some(slug) = first_slug() else { return };
        let dir = tmp_dir();
        let store = Store::open_in_memory().unwrap();
        add_to_library(dir.path(), &store, &slug).unwrap();
        let annotated = catalog_with_status(&store).unwrap();
        let row = annotated
            .iter()
            .find(|e| e.entry.slug == slug)
            .expect("present");
        assert!(row.installed_as.is_some());
        assert!(!row.update_available);
    }

    #[test]
    fn add_to_library_refuses_unknown_slug() {
        let dir = tmp_dir();
        let store = Store::open_in_memory().unwrap();
        let err =
            add_to_library(dir.path(), &store, "does-not-exist").expect_err("unknown refused");
        assert!(matches!(err, NodeError::NotFound(_)));
    }

    #[test]
    fn add_to_library_refuses_double_install() {
        let Some(slug) = first_slug() else { return };
        let dir = tmp_dir();
        let store = Store::open_in_memory().unwrap();
        add_to_library(dir.path(), &store, &slug).unwrap();
        let err = add_to_library(dir.path(), &store, &slug).expect_err("double install refused");
        assert!(err.to_string().contains("already installed"), "got {err}");
    }

    #[test]
    fn update_installed_refuses_when_not_installed() {
        let Some(slug) = first_slug() else { return };
        let dir = tmp_dir();
        let store = Store::open_in_memory().unwrap();
        let err = update_installed(dir.path(), &store, &slug, false)
            .expect_err("not-installed refused");
        assert!(matches!(err, NodeError::NotFound(_)));
    }

    #[test]
    fn uninstall_removes_scheme_and_row() {
        let Some(slug) = first_slug() else { return };
        let dir = tmp_dir();
        let store = Store::open_in_memory().unwrap();
        add_to_library(dir.path(), &store, &slug).unwrap();
        uninstall(dir.path(), &store, &slug).unwrap();
        // NOTE(review): only the library row is asserted here; removal of the
        // scheme file itself is not checked despite the test name.
        assert!(store.get_node_library_row(&slug).unwrap().is_none());
    }

    #[test]
    fn compare_versions_orders_dotted_triples() {
        use std::cmp::Ordering::*;
        assert_eq!(compare_versions("1.0.0", "1.0.0"), Equal);
        assert_eq!(compare_versions("1.0.1", "1.0.0"), Greater);
        assert_eq!(compare_versions("0.9.9", "1.0.0"), Less);
    }

    #[test]
    fn compare_versions_treats_zero_zero_zero_as_oldest() {
        use std::cmp::Ordering::*;
        assert_eq!(compare_versions("1.0.0", "0.0.0"), Greater);
    }
}