1use chrono::{DateTime, Utc};
2use parking_lot::Mutex;
3use rusqlite::{params, Connection};
4use serde::{Deserialize, Serialize};
5use std::path::Path;
6use std::sync::Arc;
7use thiserror::Error;
8
9#[derive(Debug, Error)]
10pub enum StoreError {
11 #[error(transparent)]
12 Sqlite(#[from] rusqlite::Error),
13 #[error(transparent)]
14 Json(#[from] serde_json::Error),
15 #[error("refused: {0}")]
20 Refused(String),
21}
22
23#[derive(Debug, Clone, Serialize, Deserialize)]
24pub struct ExecutionRecord {
25 pub execution_id: String,
26 pub status: String,
27 pub started_at: DateTime<Utc>,
28 pub ended_at: Option<DateTime<Utc>>,
29 pub succeeded: u32,
30 pub failed: u32,
31 pub skipped: u32,
32 #[serde(default)]
35 pub flow_name: String,
36 #[serde(default = "default_trigger")]
39 pub trigger: String,
40}
41
42fn default_trigger() -> String {
43 "manual".into()
44}
45
46#[derive(Debug, Clone, Serialize, Deserialize)]
47pub struct ExecutionStepRecord {
48 pub execution_id: String,
49 pub node_id: String,
50 pub status: String,
51 pub started_at: DateTime<Utc>,
52 pub ended_at: Option<DateTime<Utc>>,
53 pub output: Option<serde_json::Value>,
54 pub error: Option<String>,
55 pub reason: Option<String>,
56}
57
58#[derive(Debug, Clone, Serialize, Deserialize)]
59pub struct InterceptionRecord {
60 pub execution_id: String,
61 pub at: DateTime<Utc>,
62 pub failed_node: String,
64 pub issue: Option<String>,
66 pub summary: String,
68 #[serde(default)]
71 pub dsl_before: Option<String>,
72 #[serde(default)]
73 pub dsl_after: Option<String>,
74}
75
76#[derive(Debug, Clone, Serialize, Deserialize)]
82pub struct AiAuditRecord {
83 pub execution_id: String,
84 pub node_id: String,
85 pub at: DateTime<Utc>,
86 pub kind: String,
87 pub payload: serde_json::Value,
88}
89
90#[derive(Debug, Clone, Serialize, Deserialize)]
93pub struct RunSummary {
94 pub seq: i64,
95 pub execution_id: String,
96 pub flow_name: String,
97 pub status: String,
98 pub started_at: DateTime<Utc>,
99 pub ended_at: Option<DateTime<Utc>>,
100 pub succeeded: u32,
101 pub failed: u32,
102 pub skipped: u32,
103 pub failed_node: Option<String>,
104}
105
106#[derive(Debug, Clone, Serialize, Deserialize)]
107pub struct TemplateCollectionRecord {
108 pub slug: String,
109 pub name: String,
110 pub is_system: bool,
111 pub created_at: DateTime<Utc>,
112 pub updated_at: DateTime<Utc>,
113}
114
115#[derive(Debug, Clone, Serialize, Deserialize)]
116pub struct TemplateMembershipRow {
117 pub template_slug: String,
118 pub collection_slug: String,
119 pub hub_source_slug: Option<String>,
120 pub hub_version: Option<String>,
121 pub installed_at: Option<DateTime<Utc>>,
122}
123
124#[derive(Debug, Clone, Serialize, Deserialize)]
125pub struct NodeLibraryRow {
126 pub slug: String,
127 pub version: String,
128 pub installed_at: DateTime<Utc>,
129}
130
131#[derive(Debug, Clone, Serialize, Deserialize)]
136pub struct ScheduleRecord {
137 pub template_slug: String,
138 pub collection_slug: String,
139 pub flow_name: String,
140 pub enabled: bool,
141 pub frequency: String,
144 pub anchor_at: DateTime<Utc>,
147 #[serde(default = "default_timezone")]
148 pub timezone: String,
149 #[serde(default)]
150 pub cron: Option<String>,
151 #[serde(default)]
152 pub every_minutes: Option<u32>,
153 #[serde(default)]
154 pub until: Option<DateTime<Utc>>,
155 #[serde(default)]
156 pub max_runs: Option<u32>,
157 #[serde(default = "default_catchup")]
158 pub catchup: String,
159 pub next_run_at: Option<DateTime<Utc>>,
160 pub last_run_at: Option<DateTime<Utc>>,
161 #[serde(default)]
162 pub last_status: Option<String>,
163 #[serde(default)]
165 pub run_count: u32,
166 pub created_at: DateTime<Utc>,
167 pub updated_at: DateTime<Utc>,
168}
169
170fn default_timezone() -> String {
171 "UTC".into()
172}
173
174fn default_catchup() -> String {
175 "run-once".into()
176}
177
178#[derive(Clone)]
179pub struct Store {
180 conn: Arc<Mutex<Connection>>,
181}
182
183impl Store {
184 pub fn open(path: impl AsRef<Path>) -> Result<Self, StoreError> {
185 let conn = Connection::open(path.as_ref())?;
186 Self::init(conn)
187 }
188
189 pub fn open_in_memory() -> Result<Self, StoreError> {
190 let conn = Connection::open_in_memory()?;
191 Self::init(conn)
192 }
193
194 fn init(conn: Connection) -> Result<Self, StoreError> {
195 conn.execute_batch(
198 r#"
199 PRAGMA foreign_keys = ON;
200 CREATE TABLE IF NOT EXISTS executions (
201 execution_id TEXT PRIMARY KEY,
202 status TEXT NOT NULL,
203 started_at TEXT NOT NULL,
204 ended_at TEXT,
205 succeeded INTEGER NOT NULL DEFAULT 0,
206 failed INTEGER NOT NULL DEFAULT 0,
207 skipped INTEGER NOT NULL DEFAULT 0,
208 flow_name TEXT NOT NULL DEFAULT '',
209 trigger TEXT NOT NULL DEFAULT 'manual'
210 );
211 CREATE TABLE IF NOT EXISTS execution_steps (
212 execution_id TEXT NOT NULL,
213 node_id TEXT NOT NULL,
214 status TEXT NOT NULL,
215 started_at TEXT NOT NULL,
216 ended_at TEXT,
217 output TEXT,
218 error TEXT,
219 reason TEXT,
220 PRIMARY KEY (execution_id, node_id),
221 FOREIGN KEY (execution_id) REFERENCES executions(execution_id)
222 );
223 CREATE INDEX IF NOT EXISTS idx_executions_started_at
224 ON executions(started_at DESC);
225 CREATE TABLE IF NOT EXISTS template_collections (
226 slug TEXT PRIMARY KEY,
227 name TEXT NOT NULL,
228 is_system INTEGER NOT NULL DEFAULT 0,
229 created_at TEXT NOT NULL,
230 updated_at TEXT NOT NULL
231 );
232 CREATE TABLE IF NOT EXISTS template_membership (
233 template_slug TEXT PRIMARY KEY,
234 collection_slug TEXT NOT NULL,
235 hub_source_slug TEXT,
236 hub_version TEXT,
237 installed_at TEXT,
238 FOREIGN KEY (collection_slug) REFERENCES template_collections(slug)
239 );
240 CREATE INDEX IF NOT EXISTS idx_template_membership_collection
241 ON template_membership(collection_slug);
242 CREATE TABLE IF NOT EXISTS node_library (
243 slug TEXT PRIMARY KEY,
244 version TEXT NOT NULL,
245 installed_at TEXT NOT NULL
246 );
247 CREATE TABLE IF NOT EXISTS execution_interceptions (
248 id INTEGER PRIMARY KEY AUTOINCREMENT,
249 execution_id TEXT NOT NULL,
250 at TEXT NOT NULL,
251 failed_node TEXT NOT NULL,
252 issue TEXT,
253 summary TEXT NOT NULL,
254 dsl_before TEXT,
255 dsl_after TEXT
256 );
257 CREATE INDEX IF NOT EXISTS idx_interceptions_execution
258 ON execution_interceptions(execution_id);
259 CREATE TABLE IF NOT EXISTS execution_ai_audit (
260 id INTEGER PRIMARY KEY AUTOINCREMENT,
261 execution_id TEXT NOT NULL,
262 node_id TEXT NOT NULL,
263 at TEXT NOT NULL,
264 kind TEXT NOT NULL,
265 payload TEXT NOT NULL
266 );
267 CREATE INDEX IF NOT EXISTS idx_ai_audit_execution
268 ON execution_ai_audit(execution_id);
269 CREATE TABLE IF NOT EXISTS flow_schedules (
270 template_slug TEXT PRIMARY KEY,
271 collection_slug TEXT NOT NULL,
272 flow_name TEXT NOT NULL DEFAULT '',
273 enabled INTEGER NOT NULL DEFAULT 0,
274 frequency TEXT NOT NULL,
275 anchor_at TEXT NOT NULL,
276 timezone TEXT NOT NULL DEFAULT 'UTC',
277 cron TEXT,
278 every_minutes INTEGER,
279 until_at TEXT,
280 max_runs INTEGER,
281 catchup TEXT NOT NULL DEFAULT 'run-once',
282 next_run_at TEXT,
283 last_run_at TEXT,
284 last_status TEXT,
285 run_count INTEGER NOT NULL DEFAULT 0,
286 created_at TEXT NOT NULL,
287 updated_at TEXT NOT NULL
288 );
289 CREATE INDEX IF NOT EXISTS idx_flow_schedules_due
290 ON flow_schedules(enabled, next_run_at);
291 CREATE TABLE IF NOT EXISTS memory_kv (
292 key TEXT PRIMARY KEY,
293 value TEXT NOT NULL
294 );
295 CREATE TABLE IF NOT EXISTS flow_workspaces (
296 flow_id TEXT PRIMARY KEY,
297 path TEXT NOT NULL,
298 updated_at TEXT NOT NULL
299 );
300 "#,
301 )?;
302 Ok(Self {
303 conn: Arc::new(Mutex::new(conn)),
304 })
305 }
306
307 pub fn upsert_execution(&self, rec: &ExecutionRecord) -> Result<(), StoreError> {
308 let conn = self.conn.lock();
309 conn.execute(
310 r#"
311 INSERT INTO executions (execution_id, status, started_at, ended_at, succeeded, failed, skipped, flow_name, trigger)
312 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)
313 ON CONFLICT(execution_id) DO UPDATE SET
314 status = excluded.status,
315 ended_at = excluded.ended_at,
316 succeeded = excluded.succeeded,
317 failed = excluded.failed,
318 skipped = excluded.skipped
319 "#,
320 params![
321 rec.execution_id,
322 rec.status,
323 rec.started_at.to_rfc3339(),
324 rec.ended_at.map(|d| d.to_rfc3339()),
325 rec.succeeded,
326 rec.failed,
327 rec.skipped,
328 rec.flow_name,
329 rec.trigger,
330 ],
331 )?;
332 Ok(())
333 }
334
335 pub fn upsert_step(&self, rec: &ExecutionStepRecord) -> Result<(), StoreError> {
336 let conn = self.conn.lock();
337 let output_json = match &rec.output {
338 Some(v) => Some(serde_json::to_string(v)?),
339 None => None,
340 };
341 conn.execute(
342 r#"
343 INSERT INTO execution_steps (execution_id, node_id, status, started_at, ended_at, output, error, reason)
344 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)
345 ON CONFLICT(execution_id, node_id) DO UPDATE SET
346 status = excluded.status,
347 ended_at = excluded.ended_at,
348 output = excluded.output,
349 error = excluded.error,
350 reason = excluded.reason
351 "#,
352 params![
353 rec.execution_id,
354 rec.node_id,
355 rec.status,
356 rec.started_at.to_rfc3339(),
357 rec.ended_at.map(|d| d.to_rfc3339()),
358 output_json,
359 rec.error,
360 rec.reason,
361 ],
362 )?;
363 Ok(())
364 }
365
366 pub fn record_interception(&self, rec: &InterceptionRecord) -> Result<(), StoreError> {
367 let conn = self.conn.lock();
368 conn.execute(
369 r#"
370 INSERT INTO execution_interceptions (execution_id, at, failed_node, issue, summary, dsl_before, dsl_after)
371 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)
372 "#,
373 params![
374 rec.execution_id,
375 rec.at.to_rfc3339(),
376 rec.failed_node,
377 rec.issue,
378 rec.summary,
379 rec.dsl_before,
380 rec.dsl_after,
381 ],
382 )?;
383 Ok(())
384 }
385
386 pub fn record_ai_audit(&self, rec: &AiAuditRecord) -> Result<(), StoreError> {
389 let conn = self.conn.lock();
390 conn.execute(
391 r#"
392 INSERT INTO execution_ai_audit (execution_id, node_id, at, kind, payload)
393 VALUES (?1, ?2, ?3, ?4, ?5)
394 "#,
395 params![
396 rec.execution_id,
397 rec.node_id,
398 rec.at.to_rfc3339(),
399 rec.kind,
400 serde_json::to_string(&rec.payload)?,
401 ],
402 )?;
403 Ok(())
404 }
405
406 pub fn list_ai_audit(&self, execution_id: &str) -> Result<Vec<AiAuditRecord>, StoreError> {
408 let conn = self.conn.lock();
409 let mut stmt = conn.prepare(
410 "SELECT execution_id, node_id, at, kind, payload
411 FROM execution_ai_audit WHERE execution_id = ?1 ORDER BY id",
412 )?;
413 let rows = stmt.query_map(params![execution_id], |r| {
414 Ok((
415 r.get::<_, String>(0)?,
416 r.get::<_, String>(1)?,
417 r.get::<_, String>(2)?,
418 r.get::<_, String>(3)?,
419 r.get::<_, String>(4)?,
420 ))
421 })?;
422 let mut out = Vec::new();
423 for row in rows {
424 let (execution_id, node_id, at, kind, payload) = row?;
425 out.push(AiAuditRecord {
426 execution_id,
427 node_id,
428 at: at.parse().unwrap_or_else(|_| Utc::now()),
429 kind,
430 payload: serde_json::from_str(&payload).unwrap_or(serde_json::Value::Null),
431 });
432 }
433 Ok(out)
434 }
435
436 pub fn load_memory(
440 &self,
441 ) -> Result<std::collections::HashMap<String, serde_json::Value>, StoreError> {
442 let conn = self.conn.lock();
443 let mut stmt = conn.prepare("SELECT key, value FROM memory_kv")?;
444 let rows = stmt.query_map([], |r| {
445 Ok((r.get::<_, String>(0)?, r.get::<_, String>(1)?))
446 })?;
447 let mut map = std::collections::HashMap::new();
448 for row in rows {
449 let (k, v) = row?;
450 if let Ok(val) = serde_json::from_str::<serde_json::Value>(&v) {
451 map.insert(k, val);
452 }
453 }
454 Ok(map)
455 }
456
457 pub fn save_memory(
459 &self,
460 map: &std::collections::HashMap<String, serde_json::Value>,
461 ) -> Result<(), StoreError> {
462 let mut conn = self.conn.lock();
463 let tx = conn.transaction()?;
464 tx.execute("DELETE FROM memory_kv", [])?;
465 {
466 let mut stmt = tx.prepare("INSERT INTO memory_kv (key, value) VALUES (?1, ?2)")?;
467 for (k, v) in map {
468 stmt.execute(params![k, serde_json::to_string(v)?])?;
469 }
470 }
471 tx.commit()?;
472 Ok(())
473 }
474
475 pub fn clear_memory(&self) -> Result<(), StoreError> {
477 let conn = self.conn.lock();
478 conn.execute("DELETE FROM memory_kv", [])?;
479 Ok(())
480 }
481
482 pub fn list_executions(&self, limit: usize) -> Result<Vec<ExecutionRecord>, StoreError> {
483 let conn = self.conn.lock();
484 let mut stmt = conn.prepare(
485 r#"
486 SELECT execution_id, status, started_at, ended_at, succeeded, failed, skipped, flow_name, trigger
487 FROM executions
488 ORDER BY started_at DESC
489 LIMIT ?1
490 "#,
491 )?;
492 let rows = stmt
493 .query_map(params![limit as i64], row_to_execution)?
494 .collect::<Result<Vec<_>, _>>()?;
495 Ok(rows)
496 }
497
498 #[allow(clippy::type_complexity)]
499 pub fn get_execution(
500 &self,
501 execution_id: &str,
502 ) -> Result<
503 Option<(ExecutionRecord, Vec<ExecutionStepRecord>, Vec<InterceptionRecord>)>,
504 StoreError,
505 > {
506 let conn = self.conn.lock();
507 let mut stmt = conn.prepare(
508 "SELECT execution_id, status, started_at, ended_at, succeeded, failed, skipped, flow_name, trigger \
509 FROM executions WHERE execution_id = ?1",
510 )?;
511 let exec = stmt.query_row(params![execution_id], row_to_execution).ok();
512
513 let Some(exec) = exec else { return Ok(None) };
514
515 let mut step_stmt = conn.prepare(
516 "SELECT execution_id, node_id, status, started_at, ended_at, output, error, reason \
517 FROM execution_steps WHERE execution_id = ?1 ORDER BY started_at",
518 )?;
519 let steps = step_stmt
520 .query_map(params![execution_id], |row| {
521 let output_json: Option<String> = row.get(5)?;
522 Ok(ExecutionStepRecord {
523 execution_id: row.get(0)?,
524 node_id: row.get(1)?,
525 status: row.get(2)?,
526 started_at: parse_dt(row.get::<_, String>(3)?),
527 ended_at: row.get::<_, Option<String>>(4)?.map(parse_dt),
528 output: output_json
529 .as_deref()
530 .and_then(|s| serde_json::from_str(s).ok()),
531 error: row.get(6)?,
532 reason: row.get(7)?,
533 })
534 })?
535 .collect::<Result<Vec<_>, _>>()?;
536
537 let mut int_stmt = conn.prepare(
538 "SELECT execution_id, at, failed_node, issue, summary, dsl_before, dsl_after \
539 FROM execution_interceptions WHERE execution_id = ?1 ORDER BY at",
540 )?;
541 let interceptions = int_stmt
542 .query_map(params![execution_id], |row| {
543 Ok(InterceptionRecord {
544 execution_id: row.get(0)?,
545 at: parse_dt(row.get::<_, String>(1)?),
546 failed_node: row.get(2)?,
547 issue: row.get(3)?,
548 summary: row.get(4)?,
549 dsl_before: row.get(5)?,
550 dsl_after: row.get(6)?,
551 })
552 })?
553 .collect::<Result<Vec<_>, _>>()?;
554
555 Ok(Some((exec, steps, interceptions)))
556 }
557
558 pub fn list_runs(&self, limit: usize) -> Result<Vec<RunSummary>, StoreError> {
561 let conn = self.conn.lock();
562 let mut stmt = conn.prepare(
563 r#"
564 SELECT e.rowid, e.execution_id, e.flow_name, e.status, e.started_at, e.ended_at,
565 e.succeeded, e.failed, e.skipped,
566 (SELECT s.node_id FROM execution_steps s
567 WHERE s.execution_id = e.execution_id AND s.status = 'failed'
568 ORDER BY s.started_at LIMIT 1) AS failed_node
569 FROM executions e
570 ORDER BY e.started_at DESC
571 LIMIT ?1
572 "#,
573 )?;
574 let rows = stmt
575 .query_map(params![limit as i64], |row| {
576 Ok(RunSummary {
577 seq: row.get(0)?,
578 execution_id: row.get(1)?,
579 flow_name: row.get(2)?,
580 status: row.get(3)?,
581 started_at: parse_dt(row.get::<_, String>(4)?),
582 ended_at: row.get::<_, Option<String>>(5)?.map(parse_dt),
583 succeeded: row.get::<_, i64>(6)? as u32,
584 failed: row.get::<_, i64>(7)? as u32,
585 skipped: row.get::<_, i64>(8)? as u32,
586 failed_node: row.get(9)?,
587 })
588 })?
589 .collect::<Result<Vec<_>, _>>()?;
590 Ok(rows)
591 }
592
593 pub fn list_template_collections(&self) -> Result<Vec<TemplateCollectionRecord>, StoreError> {
594 let conn = self.conn.lock();
595 let mut stmt = conn.prepare(
596 "SELECT slug, name, is_system, created_at, updated_at \
597 FROM template_collections ORDER BY is_system DESC, name COLLATE NOCASE ASC",
598 )?;
599 let rows = stmt
600 .query_map([], |row| {
601 Ok(TemplateCollectionRecord {
602 slug: row.get(0)?,
603 name: row.get(1)?,
604 is_system: row.get::<_, i64>(2)? != 0,
605 created_at: parse_dt(row.get::<_, String>(3)?),
606 updated_at: parse_dt(row.get::<_, String>(4)?),
607 })
608 })?
609 .collect::<Result<Vec<_>, _>>()?;
610 Ok(rows)
611 }
612
613 pub fn get_template_collection(
614 &self,
615 slug: &str,
616 ) -> Result<Option<TemplateCollectionRecord>, StoreError> {
617 let conn = self.conn.lock();
618 let row = conn
619 .query_row(
620 "SELECT slug, name, is_system, created_at, updated_at \
621 FROM template_collections WHERE slug = ?1",
622 params![slug],
623 |row| {
624 Ok(TemplateCollectionRecord {
625 slug: row.get(0)?,
626 name: row.get(1)?,
627 is_system: row.get::<_, i64>(2)? != 0,
628 created_at: parse_dt(row.get::<_, String>(3)?),
629 updated_at: parse_dt(row.get::<_, String>(4)?),
630 })
631 },
632 )
633 .ok();
634 Ok(row)
635 }
636
637 pub fn upsert_template_collection(
642 &self,
643 slug: &str,
644 name: &str,
645 is_system: bool,
646 ) -> Result<TemplateCollectionRecord, StoreError> {
647 let now = Utc::now().to_rfc3339();
648 let conn = self.conn.lock();
649 conn.execute(
650 r#"
651 INSERT INTO template_collections (slug, name, is_system, created_at, updated_at)
652 VALUES (?1, ?2, ?3, ?4, ?4)
653 ON CONFLICT(slug) DO UPDATE SET
654 name = excluded.name,
655 updated_at = excluded.updated_at
656 "#,
657 params![slug, name, is_system as i64, now],
658 )?;
659 let rec = conn
660 .query_row(
661 "SELECT slug, name, is_system, created_at, updated_at \
662 FROM template_collections WHERE slug = ?1",
663 params![slug],
664 |row| {
665 Ok(TemplateCollectionRecord {
666 slug: row.get(0)?,
667 name: row.get(1)?,
668 is_system: row.get::<_, i64>(2)? != 0,
669 created_at: parse_dt(row.get::<_, String>(3)?),
670 updated_at: parse_dt(row.get::<_, String>(4)?),
671 })
672 },
673 )
674 .map_err(StoreError::from)?;
675 Ok(rec)
676 }
677
678 pub fn rename_template_collection(
679 &self,
680 slug: &str,
681 new_name: &str,
682 ) -> Result<TemplateCollectionRecord, StoreError> {
683 let now = Utc::now().to_rfc3339();
684 let conn = self.conn.lock();
685 let updated = conn.execute(
686 "UPDATE template_collections SET name = ?1, updated_at = ?2 WHERE slug = ?3",
687 params![new_name, now, slug],
688 )?;
689 if updated == 0 {
690 return Err(StoreError::Refused(format!("collection `{slug}` not found")));
691 }
692 let rec = conn
693 .query_row(
694 "SELECT slug, name, is_system, created_at, updated_at \
695 FROM template_collections WHERE slug = ?1",
696 params![slug],
697 |row| {
698 Ok(TemplateCollectionRecord {
699 slug: row.get(0)?,
700 name: row.get(1)?,
701 is_system: row.get::<_, i64>(2)? != 0,
702 created_at: parse_dt(row.get::<_, String>(3)?),
703 updated_at: parse_dt(row.get::<_, String>(4)?),
704 })
705 },
706 )
707 .map_err(StoreError::from)?;
708 Ok(rec)
709 }
710
711 pub fn delete_template_collection(&self, slug: &str) -> Result<(), StoreError> {
715 let conn = self.conn.lock();
716 let is_system: Option<i64> = conn
717 .query_row(
718 "SELECT is_system FROM template_collections WHERE slug = ?1",
719 params![slug],
720 |row| row.get(0),
721 )
722 .ok();
723 match is_system {
724 None => {
725 return Err(StoreError::Refused(format!(
726 "collection `{slug}` not found"
727 )))
728 }
729 Some(1) => {
730 return Err(StoreError::Refused(format!(
731 "collection `{slug}` is system-managed and cannot be deleted"
732 )))
733 }
734 _ => {}
735 }
736 let members: i64 = conn.query_row(
737 "SELECT COUNT(*) FROM template_membership WHERE collection_slug = ?1",
738 params![slug],
739 |row| row.get(0),
740 )?;
741 if members > 0 {
742 return Err(StoreError::Refused(format!(
743 "collection `{slug}` has {members} member(s); move them out before deleting"
744 )));
745 }
746 conn.execute(
747 "DELETE FROM template_collections WHERE slug = ?1",
748 params![slug],
749 )?;
750 Ok(())
751 }
752
753 pub fn list_template_memberships(&self) -> Result<Vec<TemplateMembershipRow>, StoreError> {
754 let conn = self.conn.lock();
755 let mut stmt = conn.prepare(
756 "SELECT template_slug, collection_slug, hub_source_slug, hub_version, installed_at \
757 FROM template_membership",
758 )?;
759 let rows = stmt
760 .query_map([], |row| {
761 Ok(TemplateMembershipRow {
762 template_slug: row.get(0)?,
763 collection_slug: row.get(1)?,
764 hub_source_slug: row.get(2)?,
765 hub_version: row.get(3)?,
766 installed_at: row.get::<_, Option<String>>(4)?.map(parse_dt),
767 })
768 })?
769 .collect::<Result<Vec<_>, _>>()?;
770 Ok(rows)
771 }
772
773 pub fn get_template_membership(
774 &self,
775 template_slug: &str,
776 ) -> Result<Option<TemplateMembershipRow>, StoreError> {
777 let conn = self.conn.lock();
778 let row = conn
779 .query_row(
780 "SELECT template_slug, collection_slug, hub_source_slug, hub_version, installed_at \
781 FROM template_membership WHERE template_slug = ?1",
782 params![template_slug],
783 |row| {
784 Ok(TemplateMembershipRow {
785 template_slug: row.get(0)?,
786 collection_slug: row.get(1)?,
787 hub_source_slug: row.get(2)?,
788 hub_version: row.get(3)?,
789 installed_at: row.get::<_, Option<String>>(4)?.map(parse_dt),
790 })
791 },
792 )
793 .ok();
794 Ok(row)
795 }
796
797 pub fn upsert_template_membership(
798 &self,
799 row: &TemplateMembershipRow,
800 ) -> Result<(), StoreError> {
801 let conn = self.conn.lock();
802 conn.execute(
803 r#"
804 INSERT INTO template_membership
805 (template_slug, collection_slug, hub_source_slug, hub_version, installed_at)
806 VALUES (?1, ?2, ?3, ?4, ?5)
807 ON CONFLICT(template_slug) DO UPDATE SET
808 collection_slug = excluded.collection_slug,
809 hub_source_slug = excluded.hub_source_slug,
810 hub_version = excluded.hub_version,
811 installed_at = excluded.installed_at
812 "#,
813 params![
814 row.template_slug,
815 row.collection_slug,
816 row.hub_source_slug,
817 row.hub_version,
818 row.installed_at.map(|d| d.to_rfc3339()),
819 ],
820 )?;
821 Ok(())
822 }
823
824 pub fn delete_template_membership(&self, template_slug: &str) -> Result<(), StoreError> {
825 let conn = self.conn.lock();
826 conn.execute(
827 "DELETE FROM template_membership WHERE template_slug = ?1",
828 params![template_slug],
829 )?;
830 Ok(())
831 }
832
833 pub fn get_flow_workspace(&self, flow_id: &str) -> Result<Option<String>, StoreError> {
836 let conn = self.conn.lock();
837 let path = conn
838 .query_row(
839 "SELECT path FROM flow_workspaces WHERE flow_id = ?1",
840 params![flow_id],
841 |row| row.get::<_, String>(0),
842 )
843 .ok();
844 Ok(path)
845 }
846
847 pub fn set_flow_workspace(&self, flow_id: &str, path: &str) -> Result<(), StoreError> {
848 let conn = self.conn.lock();
849 conn.execute(
850 r#"
851 INSERT INTO flow_workspaces (flow_id, path, updated_at)
852 VALUES (?1, ?2, ?3)
853 ON CONFLICT(flow_id) DO UPDATE SET
854 path = excluded.path,
855 updated_at = excluded.updated_at
856 "#,
857 params![flow_id, path, chrono::Utc::now().to_rfc3339()],
858 )?;
859 Ok(())
860 }
861
862 pub fn clear_flow_workspace(&self, flow_id: &str) -> Result<(), StoreError> {
863 let conn = self.conn.lock();
864 conn.execute(
865 "DELETE FROM flow_workspaces WHERE flow_id = ?1",
866 params![flow_id],
867 )?;
868 Ok(())
869 }
870
871 pub fn list_node_library_rows(&self) -> Result<Vec<NodeLibraryRow>, StoreError> {
872 let conn = self.conn.lock();
873 let mut stmt = conn.prepare(
874 "SELECT slug, version, installed_at \
875 FROM node_library ORDER BY slug ASC",
876 )?;
877 let rows = stmt
878 .query_map([], |row| {
879 Ok(NodeLibraryRow {
880 slug: row.get(0)?,
881 version: row.get(1)?,
882 installed_at: parse_dt(row.get::<_, String>(2)?),
883 })
884 })?
885 .collect::<Result<Vec<_>, _>>()?;
886 Ok(rows)
887 }
888
889 pub fn get_node_library_row(&self, slug: &str) -> Result<Option<NodeLibraryRow>, StoreError> {
890 let conn = self.conn.lock();
891 let row = conn
892 .query_row(
893 "SELECT slug, version, installed_at \
894 FROM node_library WHERE slug = ?1",
895 params![slug],
896 |row| {
897 Ok(NodeLibraryRow {
898 slug: row.get(0)?,
899 version: row.get(1)?,
900 installed_at: parse_dt(row.get::<_, String>(2)?),
901 })
902 },
903 )
904 .ok();
905 Ok(row)
906 }
907
908 pub fn upsert_node_library_row(
910 &self,
911 row: &NodeLibraryRow,
912 ) -> Result<NodeLibraryRow, StoreError> {
913 let conn = self.conn.lock();
914 conn.execute(
915 r#"
916 INSERT INTO node_library (slug, version, installed_at)
917 VALUES (?1, ?2, ?3)
918 ON CONFLICT(slug) DO UPDATE SET
919 version = excluded.version,
920 installed_at = excluded.installed_at
921 "#,
922 params![
923 row.slug,
924 row.version,
925 row.installed_at.to_rfc3339(),
926 ],
927 )?;
928 let stored = conn
929 .query_row(
930 "SELECT slug, version, installed_at \
931 FROM node_library WHERE slug = ?1",
932 params![row.slug],
933 |r| {
934 Ok(NodeLibraryRow {
935 slug: r.get(0)?,
936 version: r.get(1)?,
937 installed_at: parse_dt(r.get::<_, String>(2)?),
938 })
939 },
940 )
941 .map_err(StoreError::from)?;
942 Ok(stored)
943 }
944
945 pub fn delete_node_library_row(&self, slug: &str) -> Result<(), StoreError> {
946 let conn = self.conn.lock();
947 let affected = conn.execute("DELETE FROM node_library WHERE slug = ?1", params![slug])?;
948 if affected == 0 {
949 return Err(StoreError::Refused(format!("node `{slug}` not found")));
950 }
951 Ok(())
952 }
953
954 pub fn upsert_schedule(&self, rec: &ScheduleRecord) -> Result<(), StoreError> {
957 let conn = self.conn.lock();
958 conn.execute(
959 r#"
960 INSERT INTO flow_schedules
961 (template_slug, collection_slug, flow_name, enabled, frequency,
962 anchor_at, timezone, cron, every_minutes, until_at, max_runs, catchup,
963 next_run_at, last_run_at, last_status, run_count, created_at, updated_at)
964 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15, ?16, ?17, ?18)
965 ON CONFLICT(template_slug) DO UPDATE SET
966 collection_slug = excluded.collection_slug,
967 flow_name = excluded.flow_name,
968 enabled = excluded.enabled,
969 frequency = excluded.frequency,
970 anchor_at = excluded.anchor_at,
971 timezone = excluded.timezone,
972 cron = excluded.cron,
973 every_minutes = excluded.every_minutes,
974 until_at = excluded.until_at,
975 max_runs = excluded.max_runs,
976 catchup = excluded.catchup,
977 next_run_at = excluded.next_run_at,
978 last_run_at = excluded.last_run_at,
979 last_status = excluded.last_status,
980 run_count = excluded.run_count,
981 updated_at = excluded.updated_at
982 "#,
983 params![
984 rec.template_slug,
985 rec.collection_slug,
986 rec.flow_name,
987 rec.enabled as i64,
988 rec.frequency,
989 rec.anchor_at.to_rfc3339(),
990 rec.timezone,
991 rec.cron,
992 rec.every_minutes.map(|v| v as i64),
993 rec.until.map(|d| d.to_rfc3339()),
994 rec.max_runs.map(|v| v as i64),
995 rec.catchup,
996 rec.next_run_at.map(|d| d.to_rfc3339()),
997 rec.last_run_at.map(|d| d.to_rfc3339()),
998 rec.last_status,
999 rec.run_count as i64,
1000 rec.created_at.to_rfc3339(),
1001 rec.updated_at.to_rfc3339(),
1002 ],
1003 )?;
1004 Ok(())
1005 }
1006
1007 pub fn get_schedule(&self, template_slug: &str) -> Result<Option<ScheduleRecord>, StoreError> {
1008 let conn = self.conn.lock();
1009 let row = conn
1010 .query_row(
1011 "SELECT template_slug, collection_slug, flow_name, enabled, frequency, \
1012 anchor_at, timezone, cron, every_minutes, until_at, max_runs, catchup, \
1013 next_run_at, last_run_at, last_status, run_count, created_at, updated_at \
1014 FROM flow_schedules WHERE template_slug = ?1",
1015 params![template_slug],
1016 row_to_schedule,
1017 )
1018 .ok();
1019 Ok(row)
1020 }
1021
1022 pub fn list_schedules(&self) -> Result<Vec<ScheduleRecord>, StoreError> {
1023 let conn = self.conn.lock();
1024 let mut stmt = conn.prepare(
1025 "SELECT template_slug, collection_slug, flow_name, enabled, frequency, \
1026 anchor_at, timezone, cron, every_minutes, until_at, max_runs, catchup, \
1027 next_run_at, last_run_at, last_status, run_count, created_at, updated_at \
1028 FROM flow_schedules ORDER BY updated_at DESC",
1029 )?;
1030 let rows = stmt
1031 .query_map([], row_to_schedule)?
1032 .collect::<Result<Vec<_>, _>>()?;
1033 Ok(rows)
1034 }
1035
1036 pub fn list_due_schedules(
1039 &self,
1040 now: DateTime<Utc>,
1041 ) -> Result<Vec<ScheduleRecord>, StoreError> {
1042 let conn = self.conn.lock();
1043 let mut stmt = conn.prepare(
1044 "SELECT template_slug, collection_slug, flow_name, enabled, frequency, \
1045 anchor_at, timezone, cron, every_minutes, until_at, max_runs, catchup, \
1046 next_run_at, last_run_at, last_status, run_count, created_at, updated_at \
1047 FROM flow_schedules \
1048 WHERE enabled = 1 AND next_run_at IS NOT NULL AND next_run_at <= ?1 \
1049 ORDER BY next_run_at ASC",
1050 )?;
1051 let rows = stmt
1052 .query_map(params![now.to_rfc3339()], row_to_schedule)?
1053 .collect::<Result<Vec<_>, _>>()?;
1054 Ok(rows)
1055 }
1056
1057 pub fn mark_schedule_run(
1060 &self,
1061 template_slug: &str,
1062 last_run_at: DateTime<Utc>,
1063 next_run_at: Option<DateTime<Utc>>,
1064 last_status: &str,
1065 ) -> Result<(), StoreError> {
1066 let conn = self.conn.lock();
1067 conn.execute(
1068 "UPDATE flow_schedules \
1069 SET last_run_at = ?2, next_run_at = ?3, last_status = ?4, \
1070 run_count = run_count + 1, enabled = CASE WHEN ?3 IS NULL THEN 0 ELSE enabled END, updated_at = ?2 \
1071 WHERE template_slug = ?1",
1072 params![
1073 template_slug,
1074 last_run_at.to_rfc3339(),
1075 next_run_at.map(|d| d.to_rfc3339()),
1076 last_status,
1077 ],
1078 )?;
1079 Ok(())
1080 }
1081
1082 pub fn advance_schedule_without_run(
1083 &self,
1084 template_slug: &str,
1085 at: DateTime<Utc>,
1086 next_run_at: Option<DateTime<Utc>>,
1087 last_status: &str,
1088 ) -> Result<(), StoreError> {
1089 let conn = self.conn.lock();
1090 conn.execute(
1091 "UPDATE flow_schedules \
1092 SET next_run_at = ?2, last_status = ?3, \
1093 enabled = CASE WHEN ?2 IS NULL THEN 0 ELSE enabled END, updated_at = ?4 \
1094 WHERE template_slug = ?1",
1095 params![
1096 template_slug,
1097 next_run_at.map(|d| d.to_rfc3339()),
1098 last_status,
1099 at.to_rfc3339(),
1100 ],
1101 )?;
1102 Ok(())
1103 }
1104
1105 pub fn set_schedule_enabled(
1106 &self,
1107 template_slug: &str,
1108 enabled: bool,
1109 next_run_at: Option<DateTime<Utc>>,
1110 ) -> Result<(), StoreError> {
1111 let conn = self.conn.lock();
1112 let now = Utc::now().to_rfc3339();
1113 conn.execute(
1114 "UPDATE flow_schedules SET enabled = ?2, next_run_at = ?3, updated_at = ?4 \
1115 WHERE template_slug = ?1",
1116 params![
1117 template_slug,
1118 enabled as i64,
1119 next_run_at.map(|d| d.to_rfc3339()),
1120 now,
1121 ],
1122 )?;
1123 Ok(())
1124 }
1125
1126 pub fn migrate_schedule(
1127 &self,
1128 old_slug: &str,
1129 new_slug: &str,
1130 collection_slug: &str,
1131 flow_name: &str,
1132 ) -> Result<(), StoreError> {
1133 let conn = self.conn.lock();
1134 let now = Utc::now().to_rfc3339();
1135 conn.execute(
1136 "UPDATE flow_schedules \
1137 SET template_slug = ?2, collection_slug = ?3, flow_name = ?4, updated_at = ?5 \
1138 WHERE template_slug = ?1",
1139 params![old_slug, new_slug, collection_slug, flow_name, now],
1140 )?;
1141 Ok(())
1142 }
1143
1144 pub fn delete_schedule(&self, template_slug: &str) -> Result<(), StoreError> {
1145 let conn = self.conn.lock();
1146 conn.execute(
1147 "DELETE FROM flow_schedules WHERE template_slug = ?1",
1148 params![template_slug],
1149 )?;
1150 Ok(())
1151 }
1152}
1153
1154fn row_to_execution(row: &rusqlite::Row<'_>) -> rusqlite::Result<ExecutionRecord> {
1155 Ok(ExecutionRecord {
1156 execution_id: row.get(0)?,
1157 status: row.get(1)?,
1158 started_at: parse_dt(row.get::<_, String>(2)?),
1159 ended_at: row.get::<_, Option<String>>(3)?.map(parse_dt),
1160 succeeded: row.get::<_, i64>(4)? as u32,
1161 failed: row.get::<_, i64>(5)? as u32,
1162 skipped: row.get::<_, i64>(6)? as u32,
1163 flow_name: row.get(7)?,
1164 trigger: row.get(8)?,
1165 })
1166}
1167
1168fn row_to_schedule(row: &rusqlite::Row<'_>) -> rusqlite::Result<ScheduleRecord> {
1169 Ok(ScheduleRecord {
1170 template_slug: row.get(0)?,
1171 collection_slug: row.get(1)?,
1172 flow_name: row.get(2)?,
1173 enabled: row.get::<_, i64>(3)? != 0,
1174 frequency: row.get(4)?,
1175 anchor_at: parse_dt(row.get::<_, String>(5)?),
1176 timezone: row.get(6)?,
1177 cron: row.get(7)?,
1178 every_minutes: row.get::<_, Option<i64>>(8)?.map(|v| v as u32),
1179 until: row.get::<_, Option<String>>(9)?.map(parse_dt),
1180 max_runs: row.get::<_, Option<i64>>(10)?.map(|v| v as u32),
1181 catchup: row.get(11)?,
1182 next_run_at: row.get::<_, Option<String>>(12)?.map(parse_dt),
1183 last_run_at: row.get::<_, Option<String>>(13)?.map(parse_dt),
1184 last_status: row.get(14)?,
1185 run_count: row.get::<_, i64>(15)? as u32,
1186 created_at: parse_dt(row.get::<_, String>(16)?),
1187 updated_at: parse_dt(row.get::<_, String>(17)?),
1188 })
1189}
1190
1191fn parse_dt(raw: String) -> DateTime<Utc> {
1192 DateTime::parse_from_rfc3339(&raw)
1193 .map(|d| d.with_timezone(&Utc))
1194 .unwrap_or_else(|_| Utc::now())
1195}
1196
1197#[cfg(test)]
1198mod tests {
1199 use super::*;
1200
1201 fn rec(id: &str, status: &str) -> ExecutionRecord {
1202 ExecutionRecord {
1203 execution_id: id.into(),
1204 status: status.into(),
1205 started_at: Utc::now(),
1206 ended_at: Some(Utc::now()),
1207 succeeded: 1,
1208 failed: 0,
1209 skipped: 0,
1210 flow_name: "demo".into(),
1211 trigger: "manual".into(),
1212 }
1213 }
1214
1215 fn step(exec: &str, node: &str, status: &str) -> ExecutionStepRecord {
1216 ExecutionStepRecord {
1217 execution_id: exec.into(),
1218 node_id: node.into(),
1219 status: status.into(),
1220 started_at: Utc::now(),
1221 ended_at: Some(Utc::now()),
1222 output: Some(serde_json::json!({ "ok": true })),
1223 error: None,
1224 reason: None,
1225 }
1226 }
1227
1228 #[test]
1229 fn round_trip_execution_and_steps() {
1230 let store = Store::open_in_memory().unwrap();
1231 store.upsert_execution(&rec("e1", "succeeded")).unwrap();
1232 store.upsert_step(&step("e1", "n1", "succeeded")).unwrap();
1233 store.upsert_step(&step("e1", "n2", "skipped")).unwrap();
1234
1235 let listed = store.list_executions(10).unwrap();
1236 assert_eq!(listed.len(), 1);
1237 assert_eq!(listed[0].execution_id, "e1");
1238
1239 let detail = store.get_execution("e1").unwrap().unwrap();
1240 assert_eq!(detail.0.status, "succeeded");
1241 assert_eq!(detail.1.len(), 2);
1242 }
1243
1244 #[test]
1245 fn memory_round_trips_and_replaces() {
1246 let store = Store::open_in_memory().unwrap();
1247 assert!(store.load_memory().unwrap().is_empty());
1248
1249 let mut m = std::collections::HashMap::new();
1250 m.insert("count".to_string(), serde_json::json!(3));
1251 m.insert("note".to_string(), serde_json::json!({ "k": "v" }));
1252 store.save_memory(&m).unwrap();
1253 let loaded = store.load_memory().unwrap();
1254 assert_eq!(loaded.get("count"), Some(&serde_json::json!(3)));
1255 assert_eq!(loaded.get("note"), Some(&serde_json::json!({ "k": "v" })));
1256
1257 let mut m2 = std::collections::HashMap::new();
1259 m2.insert("only".to_string(), serde_json::json!(true));
1260 store.save_memory(&m2).unwrap();
1261 let reloaded = store.load_memory().unwrap();
1262 assert_eq!(reloaded.len(), 1);
1263 assert_eq!(reloaded.get("only"), Some(&serde_json::json!(true)));
1264
1265 store.clear_memory().unwrap();
1266 assert!(store.load_memory().unwrap().is_empty());
1267 }
1268
1269 #[test]
1270 fn upsert_replaces_existing_step() {
1271 let store = Store::open_in_memory().unwrap();
1272 store.upsert_execution(&rec("e1", "running")).unwrap();
1273 let mut s = step("e1", "n1", "running");
1274 s.ended_at = None;
1275 store.upsert_step(&s).unwrap();
1276
1277 let mut s2 = step("e1", "n1", "succeeded");
1278 s2.error = Some("ignored".into());
1279 store.upsert_step(&s2).unwrap();
1280
1281 let detail = store.get_execution("e1").unwrap().unwrap();
1282 assert_eq!(detail.1.len(), 1);
1283 assert_eq!(detail.1[0].status, "succeeded");
1284 }
1285
1286 #[test]
1287 fn list_executions_orders_newest_first() {
1288 let store = Store::open_in_memory().unwrap();
1289 let mut a = rec("a", "succeeded");
1290 a.started_at = Utc::now() - chrono::Duration::hours(2);
1291 let mut b = rec("b", "failed");
1292 b.started_at = Utc::now() - chrono::Duration::hours(1);
1293 store.upsert_execution(&a).unwrap();
1294 store.upsert_execution(&b).unwrap();
1295 let listed = store.list_executions(10).unwrap();
1296 assert_eq!(listed[0].execution_id, "b");
1297 assert_eq!(listed[1].execution_id, "a");
1298 }
1299
1300 fn membership(template: &str, collection: &str) -> TemplateMembershipRow {
1301 TemplateMembershipRow {
1302 template_slug: template.into(),
1303 collection_slug: collection.into(),
1304 hub_source_slug: None,
1305 hub_version: None,
1306 installed_at: None,
1307 }
1308 }
1309
1310 #[test]
1311 fn upsert_and_list_template_collections_pins_system_first() {
1312 let store = Store::open_in_memory().unwrap();
1313 store
1314 .upsert_template_collection("default", "Default", true)
1315 .unwrap();
1316 store
1317 .upsert_template_collection("alpha", "Alpha collection", false)
1318 .unwrap();
1319 store
1320 .upsert_template_collection("beta", "Beta collection", false)
1321 .unwrap();
1322 let listed = store.list_template_collections().unwrap();
1323 assert_eq!(listed[0].slug, "default", "system collection pins first");
1324 assert!(listed[0].is_system);
1325 assert_eq!(listed[1].slug, "alpha");
1326 assert_eq!(listed[2].slug, "beta");
1327 }
1328
1329 #[test]
1330 fn upsert_does_not_downgrade_system_flag() {
1331 let store = Store::open_in_memory().unwrap();
1332 store
1333 .upsert_template_collection("default", "Default", true)
1334 .unwrap();
1335 store
1338 .upsert_template_collection("default", "Default", false)
1339 .unwrap();
1340 let rec = store
1341 .get_template_collection("default")
1342 .unwrap()
1343 .expect("row exists");
1344 assert!(rec.is_system, "is_system must not be downgraded on upsert");
1345 }
1346
1347 #[test]
1348 fn rename_template_collection_changes_name_only() {
1349 let store = Store::open_in_memory().unwrap();
1350 let original = store
1351 .upsert_template_collection("alpha", "Alpha", false)
1352 .unwrap();
1353 let renamed = store
1354 .rename_template_collection("alpha", "Renamed Alpha")
1355 .unwrap();
1356 assert_eq!(renamed.slug, original.slug, "slug must not change on rename");
1357 assert_eq!(renamed.name, "Renamed Alpha");
1358 }
1359
1360 #[test]
1361 fn rename_missing_collection_returns_refused() {
1362 let store = Store::open_in_memory().unwrap();
1363 let err = store
1364 .rename_template_collection("ghost", "x")
1365 .expect_err("missing slug refused");
1366 assert!(matches!(err, StoreError::Refused(_)), "got {err:?}");
1367 }
1368
1369 #[test]
1370 fn delete_refuses_system_and_non_empty_collections() {
1371 let store = Store::open_in_memory().unwrap();
1372 store
1373 .upsert_template_collection("default", "Default", true)
1374 .unwrap();
1375 store
1376 .upsert_template_collection("alpha", "Alpha", false)
1377 .unwrap();
1378 store
1379 .upsert_template_membership(&membership("foo", "alpha"))
1380 .unwrap();
1381
1382 let sys_err = store
1383 .delete_template_collection("default")
1384 .expect_err("system refused");
1385 assert!(matches!(sys_err, StoreError::Refused(_)), "got {sys_err:?}");
1386
1387 let non_empty_err = store
1388 .delete_template_collection("alpha")
1389 .expect_err("non-empty refused");
1390 assert!(
1391 matches!(non_empty_err, StoreError::Refused(_)),
1392 "got {non_empty_err:?}"
1393 );
1394
1395 store.delete_template_membership("foo").unwrap();
1396 store.delete_template_collection("alpha").unwrap();
1397 let listed = store.list_template_collections().unwrap();
1398 assert!(listed.iter().all(|c| c.slug != "alpha"));
1399 }
1400
1401 #[test]
1402 fn membership_upsert_replaces_and_get_returns_row() {
1403 let store = Store::open_in_memory().unwrap();
1404 store
1405 .upsert_template_collection("alpha", "Alpha", false)
1406 .unwrap();
1407 store
1408 .upsert_template_collection("beta", "Beta", false)
1409 .unwrap();
1410 store
1411 .upsert_template_membership(&membership("foo", "alpha"))
1412 .unwrap();
1413 let got = store
1414 .get_template_membership("foo")
1415 .unwrap()
1416 .expect("row exists");
1417 assert_eq!(got.collection_slug, "alpha");
1418
1419 store
1420 .upsert_template_membership(&membership("foo", "beta"))
1421 .unwrap();
1422 let moved = store
1423 .get_template_membership("foo")
1424 .unwrap()
1425 .expect("row exists");
1426 assert_eq!(moved.collection_slug, "beta", "upsert replaces in place");
1427 }
1428
1429 #[test]
1430 fn membership_fk_violation_when_collection_unknown() {
1431 let store = Store::open_in_memory().unwrap();
1432 let err = store
1434 .upsert_template_membership(&membership("foo", "ghost"))
1435 .expect_err("FK violation");
1436 assert!(matches!(err, StoreError::Sqlite(_)), "got {err:?}");
1437 }
1438
1439 #[test]
1440 fn delete_membership_is_idempotent() {
1441 let store = Store::open_in_memory().unwrap();
1442 store
1443 .upsert_template_collection("alpha", "Alpha", false)
1444 .unwrap();
1445 store
1446 .upsert_template_membership(&membership("foo", "alpha"))
1447 .unwrap();
1448 store.delete_template_membership("foo").unwrap();
1449 store.delete_template_membership("foo").unwrap();
1451 assert!(store.get_template_membership("foo").unwrap().is_none());
1452 }
1453
1454 #[test]
1455 fn hub_source_fields_round_trip() {
1456 let store = Store::open_in_memory().unwrap();
1457 store
1458 .upsert_template_collection("alpha", "Alpha", false)
1459 .unwrap();
1460 let installed_at = Utc::now();
1461 store
1462 .upsert_template_membership(&TemplateMembershipRow {
1463 template_slug: "foo".into(),
1464 collection_slug: "alpha".into(),
1465 hub_source_slug: Some("hub-foo".into()),
1466 hub_version: Some("1.2.3".into()),
1467 installed_at: Some(installed_at),
1468 })
1469 .unwrap();
1470 let got = store
1471 .get_template_membership("foo")
1472 .unwrap()
1473 .expect("row exists");
1474 assert_eq!(got.hub_source_slug.as_deref(), Some("hub-foo"));
1475 assert_eq!(got.hub_version.as_deref(), Some("1.2.3"));
1476 let got_at = got.installed_at.expect("installed_at present");
1478 assert_eq!(
1479 got_at.timestamp(),
1480 installed_at.timestamp(),
1481 "installed_at round-trips to second precision"
1482 );
1483 }
1484
1485 fn node_row(slug: &str, version: &str) -> NodeLibraryRow {
1486 NodeLibraryRow {
1487 slug: slug.into(),
1488 version: version.into(),
1489 installed_at: Utc::now(),
1490 }
1491 }
1492
1493 #[test]
1494 fn node_library_upsert_list_get_roundtrip() {
1495 let store = Store::open_in_memory().unwrap();
1496 store
1497 .upsert_node_library_row(&node_row("alpha", "1.0.0"))
1498 .unwrap();
1499 store
1500 .upsert_node_library_row(&node_row("beta", "2.0.0"))
1501 .unwrap();
1502 let listed = store.list_node_library_rows().unwrap();
1503 assert_eq!(listed.len(), 2);
1504 assert_eq!(listed[0].slug, "alpha");
1505 assert_eq!(listed[1].slug, "beta");
1506 let got = store
1507 .get_node_library_row("alpha")
1508 .unwrap()
1509 .expect("row exists");
1510 assert_eq!(got.version, "1.0.0");
1511 }
1512
1513 #[test]
1514 fn node_library_upsert_updates_version() {
1515 let store = Store::open_in_memory().unwrap();
1516 store
1517 .upsert_node_library_row(&node_row("alpha", "1.0.0"))
1518 .unwrap();
1519 store
1520 .upsert_node_library_row(&node_row("alpha", "1.1.0"))
1521 .unwrap();
1522 let got = store
1523 .get_node_library_row("alpha")
1524 .unwrap()
1525 .expect("row exists");
1526 assert_eq!(got.version, "1.1.0");
1527 }
1528
1529 #[test]
1530 fn node_library_delete_removes_row() {
1531 let store = Store::open_in_memory().unwrap();
1532 store
1533 .upsert_node_library_row(&node_row("alpha", "1.0.0"))
1534 .unwrap();
1535 store.delete_node_library_row("alpha").unwrap();
1536 assert!(store.get_node_library_row("alpha").unwrap().is_none());
1537 }
1538
1539 #[test]
1540 fn node_library_delete_refuses_missing_row() {
1541 let store = Store::open_in_memory().unwrap();
1542 let err = store
1543 .delete_node_library_row("ghost")
1544 .expect_err("missing slug refused");
1545 assert!(matches!(err, StoreError::Refused(_)), "got {err:?}");
1546 }
1547
1548 fn schedule(slug: &str, enabled: bool, next: Option<DateTime<Utc>>) -> ScheduleRecord {
1549 let now = Utc::now();
1550 ScheduleRecord {
1551 template_slug: slug.into(),
1552 collection_slug: "default".into(),
1553 flow_name: "demo".into(),
1554 enabled,
1555 frequency: "daily".into(),
1556 anchor_at: now,
1557 timezone: "UTC".into(),
1558 cron: None,
1559 every_minutes: None,
1560 until: None,
1561 max_runs: None,
1562 catchup: "run-once".into(),
1563 next_run_at: next,
1564 last_run_at: None,
1565 last_status: None,
1566 run_count: 0,
1567 created_at: now,
1568 updated_at: now,
1569 }
1570 }
1571
1572 #[test]
1573 fn schedule_upsert_get_and_replace() {
1574 let store = Store::open_in_memory().unwrap();
1575 store.upsert_schedule(&schedule("foo", true, Some(Utc::now()))).unwrap();
1576 let got = store.get_schedule("foo").unwrap().expect("row exists");
1577 assert!(got.enabled);
1578 assert_eq!(got.frequency, "daily");
1579
1580 let mut updated = schedule("foo", false, None);
1581 updated.frequency = "weekly".into();
1582 store.upsert_schedule(&updated).unwrap();
1583 let got = store.get_schedule("foo").unwrap().expect("row exists");
1584 assert!(!got.enabled);
1585 assert_eq!(got.frequency, "weekly", "upsert replaces in place");
1586 }
1587
1588 #[test]
1589 fn list_due_excludes_disabled_and_future() {
1590 let store = Store::open_in_memory().unwrap();
1591 let past = Utc::now() - chrono::Duration::minutes(5);
1592 let future = Utc::now() + chrono::Duration::hours(1);
1593 store.upsert_schedule(&schedule("due", true, Some(past))).unwrap();
1594 store.upsert_schedule(&schedule("future", true, Some(future))).unwrap();
1595 store.upsert_schedule(&schedule("off", false, Some(past))).unwrap();
1596 let due = store.list_due_schedules(Utc::now()).unwrap();
1597 assert_eq!(due.len(), 1);
1598 assert_eq!(due[0].template_slug, "due");
1599 }
1600
1601 #[test]
1602 fn mark_run_advances_timer_and_delete_removes() {
1603 let store = Store::open_in_memory().unwrap();
1604 let past = Utc::now() - chrono::Duration::minutes(5);
1605 store.upsert_schedule(&schedule("foo", true, Some(past))).unwrap();
1606 let next = Utc::now() + chrono::Duration::days(1);
1607 store
1608 .mark_schedule_run("foo", Utc::now(), Some(next), "succeeded")
1609 .unwrap();
1610 let got = store.get_schedule("foo").unwrap().expect("row exists");
1611 assert!(got.last_run_at.is_some());
1612 assert_eq!(got.last_status.as_deref(), Some("succeeded"));
1613 assert_eq!(got.run_count, 1, "mark_schedule_run bumps the counter");
1614 assert!(got.next_run_at.unwrap() > Utc::now());
1615 assert!(store.list_due_schedules(Utc::now()).unwrap().is_empty());
1616
1617 store.delete_schedule("foo").unwrap();
1618 assert!(store.get_schedule("foo").unwrap().is_none());
1619 }
1620
1621 #[test]
1622 fn flow_workspace_set_get_clear_roundtrip() {
1623 let store = Store::open_in_memory().unwrap();
1624 assert!(store.get_flow_workspace("flow-1").unwrap().is_none());
1625
1626 store.set_flow_workspace("flow-1", "/tmp/project").unwrap();
1627 assert_eq!(
1628 store.get_flow_workspace("flow-1").unwrap().as_deref(),
1629 Some("/tmp/project")
1630 );
1631
1632 store.set_flow_workspace("flow-1", "/tmp/other").unwrap();
1634 assert_eq!(
1635 store.get_flow_workspace("flow-1").unwrap().as_deref(),
1636 Some("/tmp/other")
1637 );
1638
1639 store.clear_flow_workspace("flow-1").unwrap();
1640 assert!(store.get_flow_workspace("flow-1").unwrap().is_none());
1641 }
1642}