Skip to main content

flow_storage/
store.rs

1use chrono::{DateTime, Utc};
2use parking_lot::Mutex;
3use rusqlite::{params, Connection};
4use serde::{Deserialize, Serialize};
5use std::path::Path;
6use std::sync::Arc;
7use thiserror::Error;
8
9#[derive(Debug, Error)]
10pub enum StoreError {
11    #[error(transparent)]
12    Sqlite(#[from] rusqlite::Error),
13    #[error(transparent)]
14    Json(#[from] serde_json::Error),
15    /// Domain-level rejection (e.g. deleting a system collection, deleting a
16    /// non-empty collection, renaming a row that does not exist). Distinct
17    /// from `Sqlite` so callers can branch on user-visible refusals without
18    /// inspecting SQLite error codes.
19    #[error("refused: {0}")]
20    Refused(String),
21}
22
23#[derive(Debug, Clone, Serialize, Deserialize)]
24pub struct ExecutionRecord {
25    pub execution_id: String,
26    pub status: String,
27    pub started_at: DateTime<Utc>,
28    pub ended_at: Option<DateTime<Utc>>,
29    pub succeeded: u32,
30    pub failed: u32,
31    pub skipped: u32,
32    /// Display name of the flow that ran (`FlowGraph.name`). Empty for runs
33    /// recorded before this field existed, or when the caller didn't supply it.
34    #[serde(default)]
35    pub flow_name: String,
36    /// How the run was triggered: `manual` (default) or `scheduled` (fired by
37    /// the background scheduler, roadmap E11).
38    #[serde(default = "default_trigger")]
39    pub trigger: String,
40}
41
42fn default_trigger() -> String {
43    "manual".into()
44}
45
46#[derive(Debug, Clone, Serialize, Deserialize)]
47pub struct ExecutionStepRecord {
48    pub execution_id: String,
49    pub node_id: String,
50    pub status: String,
51    pub started_at: DateTime<Utc>,
52    pub ended_at: Option<DateTime<Utc>>,
53    pub output: Option<serde_json::Value>,
54    pub error: Option<String>,
55    pub reason: Option<String>,
56}
57
58#[derive(Debug, Clone, Serialize, Deserialize)]
59pub struct InterceptionRecord {
60    pub execution_id: String,
61    pub at: DateTime<Utc>,
62    /// Label/id of the node whose failure triggered the interception.
63    pub failed_node: String,
64    /// The detected issue (error / validation message), if any.
65    pub issue: Option<String>,
66    /// What the auto-fix changed/added (e.g. the corrective sub-flow's nodes).
67    pub summary: String,
68    /// Flow DSL before the fix (the flow that ran) and after it - the snapshot +
69    /// diff for the History view (roadmap E1 autonomous session history).
70    #[serde(default)]
71    pub dsl_before: Option<String>,
72    #[serde(default)]
73    pub dsl_after: Option<String>,
74}
75
76/// One AI audit event in an execution's trail (RAO constraint 6): a model
77/// invocation, a routing decision, or a human review action. `kind` is the
78/// event discriminant (`ai_invocation` / `ai_routing_decision` /
79/// `ai_review_required` / `ai_review_resolved`); `payload` holds the
80/// kind-specific fields as JSON.
81#[derive(Debug, Clone, Serialize, Deserialize)]
82pub struct AiAuditRecord {
83    pub execution_id: String,
84    pub node_id: String,
85    pub at: DateTime<Utc>,
86    pub kind: String,
87    pub payload: serde_json::Value,
88}
89
90/// Compact run row for list views: the execution plus its monotonic run number
91/// (`seq` = SQLite rowid) and the first failed node, if any.
92#[derive(Debug, Clone, Serialize, Deserialize)]
93pub struct RunSummary {
94    pub seq: i64,
95    pub execution_id: String,
96    pub flow_name: String,
97    pub status: String,
98    pub started_at: DateTime<Utc>,
99    pub ended_at: Option<DateTime<Utc>>,
100    pub succeeded: u32,
101    pub failed: u32,
102    pub skipped: u32,
103    pub failed_node: Option<String>,
104}
105
106#[derive(Debug, Clone, Serialize, Deserialize)]
107pub struct TemplateCollectionRecord {
108    pub slug: String,
109    pub name: String,
110    pub is_system: bool,
111    pub created_at: DateTime<Utc>,
112    pub updated_at: DateTime<Utc>,
113}
114
115#[derive(Debug, Clone, Serialize, Deserialize)]
116pub struct TemplateMembershipRow {
117    pub template_slug: String,
118    pub collection_slug: String,
119    pub hub_source_slug: Option<String>,
120    pub hub_version: Option<String>,
121    pub installed_at: Option<DateTime<Utc>>,
122}
123
124#[derive(Debug, Clone, Serialize, Deserialize)]
125pub struct NodeLibraryRow {
126    pub slug: String,
127    pub version: String,
128    pub installed_at: DateTime<Utc>,
129}
130
131/// A persisted recurring-run timer for a saved flow (roadmap E11). Keyed by the
132/// saved template slug so a flow has at most one schedule. `next_run_at` is the
133/// next instant the background scheduler should fire (recomputed after each run
134/// from `frequency` + `anchor_at`); `None` when disabled.
135#[derive(Debug, Clone, Serialize, Deserialize)]
136pub struct ScheduleRecord {
137    pub template_slug: String,
138    pub collection_slug: String,
139    pub flow_name: String,
140    pub enabled: bool,
141    /// One of `hourly` | `daily` | `weekly` | `monthly` | `yearly` |
142    /// `cron` | `every_n_minutes` | `once`.
143    pub frequency: String,
144    /// User-chosen first-run instant; also the recurrence anchor (its
145    /// minute/hour/weekday/day-of-month/month select the recurring slot).
146    pub anchor_at: DateTime<Utc>,
147    #[serde(default = "default_timezone")]
148    pub timezone: String,
149    #[serde(default)]
150    pub cron: Option<String>,
151    #[serde(default)]
152    pub every_minutes: Option<u32>,
153    #[serde(default)]
154    pub until: Option<DateTime<Utc>>,
155    #[serde(default)]
156    pub max_runs: Option<u32>,
157    #[serde(default = "default_catchup")]
158    pub catchup: String,
159    pub next_run_at: Option<DateTime<Utc>>,
160    pub last_run_at: Option<DateTime<Utc>>,
161    #[serde(default)]
162    pub last_status: Option<String>,
163    /// Number of times the background scheduler has fired this flow.
164    #[serde(default)]
165    pub run_count: u32,
166    pub created_at: DateTime<Utc>,
167    pub updated_at: DateTime<Utc>,
168}
169
170fn default_timezone() -> String {
171    "UTC".into()
172}
173
174fn default_catchup() -> String {
175    "run-once".into()
176}
177
178#[derive(Clone)]
179pub struct Store {
180    conn: Arc<Mutex<Connection>>,
181}
182
183impl Store {
184    pub fn open(path: impl AsRef<Path>) -> Result<Self, StoreError> {
185        let conn = Connection::open(path.as_ref())?;
186        Self::init(conn)
187    }
188
189    pub fn open_in_memory() -> Result<Self, StoreError> {
190        let conn = Connection::open_in_memory()?;
191        Self::init(conn)
192    }
193
194    fn init(conn: Connection) -> Result<Self, StoreError> {
195        // PRAGMA must run before any DDL so the FK on template_membership
196        // actually enforces. rusqlite leaves FKs off by default per connection.
197        conn.execute_batch(
198            r#"
199            PRAGMA foreign_keys = ON;
200            CREATE TABLE IF NOT EXISTS executions (
201                execution_id TEXT PRIMARY KEY,
202                status       TEXT NOT NULL,
203                started_at   TEXT NOT NULL,
204                ended_at     TEXT,
205                succeeded    INTEGER NOT NULL DEFAULT 0,
206                failed       INTEGER NOT NULL DEFAULT 0,
207                skipped      INTEGER NOT NULL DEFAULT 0,
208                flow_name    TEXT NOT NULL DEFAULT '',
209                trigger      TEXT NOT NULL DEFAULT 'manual'
210            );
211            CREATE TABLE IF NOT EXISTS execution_steps (
212                execution_id TEXT NOT NULL,
213                node_id      TEXT NOT NULL,
214                status       TEXT NOT NULL,
215                started_at   TEXT NOT NULL,
216                ended_at     TEXT,
217                output       TEXT,
218                error        TEXT,
219                reason       TEXT,
220                PRIMARY KEY (execution_id, node_id),
221                FOREIGN KEY (execution_id) REFERENCES executions(execution_id)
222            );
223            CREATE INDEX IF NOT EXISTS idx_executions_started_at
224                ON executions(started_at DESC);
225            CREATE TABLE IF NOT EXISTS template_collections (
226                slug        TEXT PRIMARY KEY,
227                name        TEXT NOT NULL,
228                is_system   INTEGER NOT NULL DEFAULT 0,
229                created_at  TEXT NOT NULL,
230                updated_at  TEXT NOT NULL
231            );
232            CREATE TABLE IF NOT EXISTS template_membership (
233                template_slug    TEXT PRIMARY KEY,
234                collection_slug  TEXT NOT NULL,
235                hub_source_slug  TEXT,
236                hub_version      TEXT,
237                installed_at     TEXT,
238                FOREIGN KEY (collection_slug) REFERENCES template_collections(slug)
239            );
240            CREATE INDEX IF NOT EXISTS idx_template_membership_collection
241                ON template_membership(collection_slug);
242            CREATE TABLE IF NOT EXISTS node_library (
243                slug         TEXT PRIMARY KEY,
244                version      TEXT NOT NULL,
245                installed_at TEXT NOT NULL
246            );
247            CREATE TABLE IF NOT EXISTS execution_interceptions (
248                id           INTEGER PRIMARY KEY AUTOINCREMENT,
249                execution_id TEXT NOT NULL,
250                at           TEXT NOT NULL,
251                failed_node  TEXT NOT NULL,
252                issue        TEXT,
253                summary      TEXT NOT NULL,
254                dsl_before   TEXT,
255                dsl_after    TEXT
256            );
257            CREATE INDEX IF NOT EXISTS idx_interceptions_execution
258                ON execution_interceptions(execution_id);
259            CREATE TABLE IF NOT EXISTS execution_ai_audit (
260                id           INTEGER PRIMARY KEY AUTOINCREMENT,
261                execution_id TEXT NOT NULL,
262                node_id      TEXT NOT NULL,
263                at           TEXT NOT NULL,
264                kind         TEXT NOT NULL,
265                payload      TEXT NOT NULL
266            );
267            CREATE INDEX IF NOT EXISTS idx_ai_audit_execution
268                ON execution_ai_audit(execution_id);
269            CREATE TABLE IF NOT EXISTS flow_schedules (
270                template_slug   TEXT PRIMARY KEY,
271                collection_slug TEXT NOT NULL,
272                flow_name       TEXT NOT NULL DEFAULT '',
273                enabled         INTEGER NOT NULL DEFAULT 0,
274                frequency       TEXT NOT NULL,
275                anchor_at       TEXT NOT NULL,
276                timezone        TEXT NOT NULL DEFAULT 'UTC',
277                cron            TEXT,
278                every_minutes   INTEGER,
279                until_at        TEXT,
280                max_runs        INTEGER,
281                catchup         TEXT NOT NULL DEFAULT 'run-once',
282                next_run_at     TEXT,
283                last_run_at     TEXT,
284                last_status     TEXT,
285                run_count       INTEGER NOT NULL DEFAULT 0,
286                created_at      TEXT NOT NULL,
287                updated_at      TEXT NOT NULL
288            );
289            CREATE INDEX IF NOT EXISTS idx_flow_schedules_due
290                ON flow_schedules(enabled, next_run_at);
291            CREATE TABLE IF NOT EXISTS memory_kv (
292                key   TEXT PRIMARY KEY,
293                value TEXT NOT NULL
294            );
295            CREATE TABLE IF NOT EXISTS flow_workspaces (
296                flow_id    TEXT PRIMARY KEY,
297                path       TEXT NOT NULL,
298                updated_at TEXT NOT NULL
299            );
300            "#,
301        )?;
302        Ok(Self {
303            conn: Arc::new(Mutex::new(conn)),
304        })
305    }
306
307    pub fn upsert_execution(&self, rec: &ExecutionRecord) -> Result<(), StoreError> {
308        let conn = self.conn.lock();
309        conn.execute(
310            r#"
311            INSERT INTO executions (execution_id, status, started_at, ended_at, succeeded, failed, skipped, flow_name, trigger)
312            VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)
313            ON CONFLICT(execution_id) DO UPDATE SET
314                status = excluded.status,
315                ended_at = excluded.ended_at,
316                succeeded = excluded.succeeded,
317                failed = excluded.failed,
318                skipped = excluded.skipped
319            "#,
320            params![
321                rec.execution_id,
322                rec.status,
323                rec.started_at.to_rfc3339(),
324                rec.ended_at.map(|d| d.to_rfc3339()),
325                rec.succeeded,
326                rec.failed,
327                rec.skipped,
328                rec.flow_name,
329                rec.trigger,
330            ],
331        )?;
332        Ok(())
333    }
334
335    pub fn upsert_step(&self, rec: &ExecutionStepRecord) -> Result<(), StoreError> {
336        let conn = self.conn.lock();
337        let output_json = match &rec.output {
338            Some(v) => Some(serde_json::to_string(v)?),
339            None => None,
340        };
341        conn.execute(
342            r#"
343            INSERT INTO execution_steps (execution_id, node_id, status, started_at, ended_at, output, error, reason)
344            VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)
345            ON CONFLICT(execution_id, node_id) DO UPDATE SET
346                status = excluded.status,
347                ended_at = excluded.ended_at,
348                output = excluded.output,
349                error = excluded.error,
350                reason = excluded.reason
351            "#,
352            params![
353                rec.execution_id,
354                rec.node_id,
355                rec.status,
356                rec.started_at.to_rfc3339(),
357                rec.ended_at.map(|d| d.to_rfc3339()),
358                output_json,
359                rec.error,
360                rec.reason,
361            ],
362        )?;
363        Ok(())
364    }
365
366    pub fn record_interception(&self, rec: &InterceptionRecord) -> Result<(), StoreError> {
367        let conn = self.conn.lock();
368        conn.execute(
369            r#"
370            INSERT INTO execution_interceptions (execution_id, at, failed_node, issue, summary, dsl_before, dsl_after)
371            VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)
372            "#,
373            params![
374                rec.execution_id,
375                rec.at.to_rfc3339(),
376                rec.failed_node,
377                rec.issue,
378                rec.summary,
379                rec.dsl_before,
380                rec.dsl_after,
381            ],
382        )?;
383        Ok(())
384    }
385
386    /// Append one AI audit event. Insert-only: the trail is written as part of
387    /// the execution cycle and never updated after the fact.
388    pub fn record_ai_audit(&self, rec: &AiAuditRecord) -> Result<(), StoreError> {
389        let conn = self.conn.lock();
390        conn.execute(
391            r#"
392            INSERT INTO execution_ai_audit (execution_id, node_id, at, kind, payload)
393            VALUES (?1, ?2, ?3, ?4, ?5)
394            "#,
395            params![
396                rec.execution_id,
397                rec.node_id,
398                rec.at.to_rfc3339(),
399                rec.kind,
400                serde_json::to_string(&rec.payload)?,
401            ],
402        )?;
403        Ok(())
404    }
405
406    /// The AI audit trail for one execution, in insertion order.
407    pub fn list_ai_audit(&self, execution_id: &str) -> Result<Vec<AiAuditRecord>, StoreError> {
408        let conn = self.conn.lock();
409        let mut stmt = conn.prepare(
410            "SELECT execution_id, node_id, at, kind, payload
411             FROM execution_ai_audit WHERE execution_id = ?1 ORDER BY id",
412        )?;
413        let rows = stmt.query_map(params![execution_id], |r| {
414            Ok((
415                r.get::<_, String>(0)?,
416                r.get::<_, String>(1)?,
417                r.get::<_, String>(2)?,
418                r.get::<_, String>(3)?,
419                r.get::<_, String>(4)?,
420            ))
421        })?;
422        let mut out = Vec::new();
423        for row in rows {
424            let (execution_id, node_id, at, kind, payload) = row?;
425            out.push(AiAuditRecord {
426                execution_id,
427                node_id,
428                at: at.parse().unwrap_or_else(|_| Utc::now()),
429                kind,
430                payload: serde_json::from_str(&payload).unwrap_or(serde_json::Value::Null),
431            });
432        }
433        Ok(out)
434    }
435
436    /// Load the persisted working-memory map (durable agent memory, roadmap E1).
437    /// One row per `{{memory.<key>}}` entry; the value is JSON. Unparseable rows
438    /// are skipped rather than failing the whole load.
439    pub fn load_memory(
440        &self,
441    ) -> Result<std::collections::HashMap<String, serde_json::Value>, StoreError> {
442        let conn = self.conn.lock();
443        let mut stmt = conn.prepare("SELECT key, value FROM memory_kv")?;
444        let rows = stmt.query_map([], |r| {
445            Ok((r.get::<_, String>(0)?, r.get::<_, String>(1)?))
446        })?;
447        let mut map = std::collections::HashMap::new();
448        for row in rows {
449            let (k, v) = row?;
450            if let Ok(val) = serde_json::from_str::<serde_json::Value>(&v) {
451                map.insert(k, val);
452            }
453        }
454        Ok(map)
455    }
456
457    /// Replace the persisted working memory with a full snapshot of `map`.
458    pub fn save_memory(
459        &self,
460        map: &std::collections::HashMap<String, serde_json::Value>,
461    ) -> Result<(), StoreError> {
462        let mut conn = self.conn.lock();
463        let tx = conn.transaction()?;
464        tx.execute("DELETE FROM memory_kv", [])?;
465        {
466            let mut stmt = tx.prepare("INSERT INTO memory_kv (key, value) VALUES (?1, ?2)")?;
467            for (k, v) in map {
468                stmt.execute(params![k, serde_json::to_string(v)?])?;
469            }
470        }
471        tx.commit()?;
472        Ok(())
473    }
474
475    /// Clear all persisted working memory.
476    pub fn clear_memory(&self) -> Result<(), StoreError> {
477        let conn = self.conn.lock();
478        conn.execute("DELETE FROM memory_kv", [])?;
479        Ok(())
480    }
481
482    pub fn list_executions(&self, limit: usize) -> Result<Vec<ExecutionRecord>, StoreError> {
483        let conn = self.conn.lock();
484        let mut stmt = conn.prepare(
485            r#"
486            SELECT execution_id, status, started_at, ended_at, succeeded, failed, skipped, flow_name, trigger
487            FROM executions
488            ORDER BY started_at DESC
489            LIMIT ?1
490            "#,
491        )?;
492        let rows = stmt
493            .query_map(params![limit as i64], row_to_execution)?
494            .collect::<Result<Vec<_>, _>>()?;
495        Ok(rows)
496    }
497
498    #[allow(clippy::type_complexity)]
499    pub fn get_execution(
500        &self,
501        execution_id: &str,
502    ) -> Result<
503        Option<(ExecutionRecord, Vec<ExecutionStepRecord>, Vec<InterceptionRecord>)>,
504        StoreError,
505    > {
506        let conn = self.conn.lock();
507        let mut stmt = conn.prepare(
508            "SELECT execution_id, status, started_at, ended_at, succeeded, failed, skipped, flow_name, trigger \
509             FROM executions WHERE execution_id = ?1",
510        )?;
511        let exec = stmt.query_row(params![execution_id], row_to_execution).ok();
512
513        let Some(exec) = exec else { return Ok(None) };
514
515        let mut step_stmt = conn.prepare(
516            "SELECT execution_id, node_id, status, started_at, ended_at, output, error, reason \
517             FROM execution_steps WHERE execution_id = ?1 ORDER BY started_at",
518        )?;
519        let steps = step_stmt
520            .query_map(params![execution_id], |row| {
521                let output_json: Option<String> = row.get(5)?;
522                Ok(ExecutionStepRecord {
523                    execution_id: row.get(0)?,
524                    node_id: row.get(1)?,
525                    status: row.get(2)?,
526                    started_at: parse_dt(row.get::<_, String>(3)?),
527                    ended_at: row.get::<_, Option<String>>(4)?.map(parse_dt),
528                    output: output_json
529                        .as_deref()
530                        .and_then(|s| serde_json::from_str(s).ok()),
531                    error: row.get(6)?,
532                    reason: row.get(7)?,
533                })
534            })?
535            .collect::<Result<Vec<_>, _>>()?;
536
537        let mut int_stmt = conn.prepare(
538            "SELECT execution_id, at, failed_node, issue, summary, dsl_before, dsl_after \
539             FROM execution_interceptions WHERE execution_id = ?1 ORDER BY at",
540        )?;
541        let interceptions = int_stmt
542            .query_map(params![execution_id], |row| {
543                Ok(InterceptionRecord {
544                    execution_id: row.get(0)?,
545                    at: parse_dt(row.get::<_, String>(1)?),
546                    failed_node: row.get(2)?,
547                    issue: row.get(3)?,
548                    summary: row.get(4)?,
549                    dsl_before: row.get(5)?,
550                    dsl_after: row.get(6)?,
551                })
552            })?
553            .collect::<Result<Vec<_>, _>>()?;
554
555        Ok(Some((exec, steps, interceptions)))
556    }
557
558    /// Run summaries for list views, newest first - each with its monotonic run
559    /// number (rowid) and the first failed node (for a `failed @ <node>` label).
560    pub fn list_runs(&self, limit: usize) -> Result<Vec<RunSummary>, StoreError> {
561        let conn = self.conn.lock();
562        let mut stmt = conn.prepare(
563            r#"
564            SELECT e.rowid, e.execution_id, e.flow_name, e.status, e.started_at, e.ended_at,
565                   e.succeeded, e.failed, e.skipped,
566                   (SELECT s.node_id FROM execution_steps s
567                      WHERE s.execution_id = e.execution_id AND s.status = 'failed'
568                      ORDER BY s.started_at LIMIT 1) AS failed_node
569            FROM executions e
570            ORDER BY e.started_at DESC
571            LIMIT ?1
572            "#,
573        )?;
574        let rows = stmt
575            .query_map(params![limit as i64], |row| {
576                Ok(RunSummary {
577                    seq: row.get(0)?,
578                    execution_id: row.get(1)?,
579                    flow_name: row.get(2)?,
580                    status: row.get(3)?,
581                    started_at: parse_dt(row.get::<_, String>(4)?),
582                    ended_at: row.get::<_, Option<String>>(5)?.map(parse_dt),
583                    succeeded: row.get::<_, i64>(6)? as u32,
584                    failed: row.get::<_, i64>(7)? as u32,
585                    skipped: row.get::<_, i64>(8)? as u32,
586                    failed_node: row.get(9)?,
587                })
588            })?
589            .collect::<Result<Vec<_>, _>>()?;
590        Ok(rows)
591    }
592
593    pub fn list_template_collections(&self) -> Result<Vec<TemplateCollectionRecord>, StoreError> {
594        let conn = self.conn.lock();
595        let mut stmt = conn.prepare(
596            "SELECT slug, name, is_system, created_at, updated_at \
597             FROM template_collections ORDER BY is_system DESC, name COLLATE NOCASE ASC",
598        )?;
599        let rows = stmt
600            .query_map([], |row| {
601                Ok(TemplateCollectionRecord {
602                    slug: row.get(0)?,
603                    name: row.get(1)?,
604                    is_system: row.get::<_, i64>(2)? != 0,
605                    created_at: parse_dt(row.get::<_, String>(3)?),
606                    updated_at: parse_dt(row.get::<_, String>(4)?),
607                })
608            })?
609            .collect::<Result<Vec<_>, _>>()?;
610        Ok(rows)
611    }
612
613    pub fn get_template_collection(
614        &self,
615        slug: &str,
616    ) -> Result<Option<TemplateCollectionRecord>, StoreError> {
617        let conn = self.conn.lock();
618        let row = conn
619            .query_row(
620                "SELECT slug, name, is_system, created_at, updated_at \
621                 FROM template_collections WHERE slug = ?1",
622                params![slug],
623                |row| {
624                    Ok(TemplateCollectionRecord {
625                        slug: row.get(0)?,
626                        name: row.get(1)?,
627                        is_system: row.get::<_, i64>(2)? != 0,
628                        created_at: parse_dt(row.get::<_, String>(3)?),
629                        updated_at: parse_dt(row.get::<_, String>(4)?),
630                    })
631                },
632            )
633            .ok();
634        Ok(row)
635    }
636
637    /// Insert if absent, otherwise update `name` + `updated_at`. `is_system`
638    /// is set only on first insert - a subsequent upsert with `is_system =
639    /// false` does NOT downgrade an existing system row. This makes the
640    /// boot-time `ensure_default` idempotent without racing user renames.
641    pub fn upsert_template_collection(
642        &self,
643        slug: &str,
644        name: &str,
645        is_system: bool,
646    ) -> Result<TemplateCollectionRecord, StoreError> {
647        let now = Utc::now().to_rfc3339();
648        let conn = self.conn.lock();
649        conn.execute(
650            r#"
651            INSERT INTO template_collections (slug, name, is_system, created_at, updated_at)
652            VALUES (?1, ?2, ?3, ?4, ?4)
653            ON CONFLICT(slug) DO UPDATE SET
654                name = excluded.name,
655                updated_at = excluded.updated_at
656            "#,
657            params![slug, name, is_system as i64, now],
658        )?;
659        let rec = conn
660            .query_row(
661                "SELECT slug, name, is_system, created_at, updated_at \
662                 FROM template_collections WHERE slug = ?1",
663                params![slug],
664                |row| {
665                    Ok(TemplateCollectionRecord {
666                        slug: row.get(0)?,
667                        name: row.get(1)?,
668                        is_system: row.get::<_, i64>(2)? != 0,
669                        created_at: parse_dt(row.get::<_, String>(3)?),
670                        updated_at: parse_dt(row.get::<_, String>(4)?),
671                    })
672                },
673            )
674            .map_err(StoreError::from)?;
675        Ok(rec)
676    }
677
678    pub fn rename_template_collection(
679        &self,
680        slug: &str,
681        new_name: &str,
682    ) -> Result<TemplateCollectionRecord, StoreError> {
683        let now = Utc::now().to_rfc3339();
684        let conn = self.conn.lock();
685        let updated = conn.execute(
686            "UPDATE template_collections SET name = ?1, updated_at = ?2 WHERE slug = ?3",
687            params![new_name, now, slug],
688        )?;
689        if updated == 0 {
690            return Err(StoreError::Refused(format!("collection `{slug}` not found")));
691        }
692        let rec = conn
693            .query_row(
694                "SELECT slug, name, is_system, created_at, updated_at \
695                 FROM template_collections WHERE slug = ?1",
696                params![slug],
697                |row| {
698                    Ok(TemplateCollectionRecord {
699                        slug: row.get(0)?,
700                        name: row.get(1)?,
701                        is_system: row.get::<_, i64>(2)? != 0,
702                        created_at: parse_dt(row.get::<_, String>(3)?),
703                        updated_at: parse_dt(row.get::<_, String>(4)?),
704                    })
705                },
706            )
707            .map_err(StoreError::from)?;
708        Ok(rec)
709    }
710
711    /// Refuses on `is_system = true` and on any extant membership row pointing
712    /// at the collection. Caller is expected to move templates out (set their
713    /// membership to another collection, or delete them) first.
714    pub fn delete_template_collection(&self, slug: &str) -> Result<(), StoreError> {
715        let conn = self.conn.lock();
716        let is_system: Option<i64> = conn
717            .query_row(
718                "SELECT is_system FROM template_collections WHERE slug = ?1",
719                params![slug],
720                |row| row.get(0),
721            )
722            .ok();
723        match is_system {
724            None => {
725                return Err(StoreError::Refused(format!(
726                    "collection `{slug}` not found"
727                )))
728            }
729            Some(1) => {
730                return Err(StoreError::Refused(format!(
731                    "collection `{slug}` is system-managed and cannot be deleted"
732                )))
733            }
734            _ => {}
735        }
736        let members: i64 = conn.query_row(
737            "SELECT COUNT(*) FROM template_membership WHERE collection_slug = ?1",
738            params![slug],
739            |row| row.get(0),
740        )?;
741        if members > 0 {
742            return Err(StoreError::Refused(format!(
743                "collection `{slug}` has {members} member(s); move them out before deleting"
744            )));
745        }
746        conn.execute(
747            "DELETE FROM template_collections WHERE slug = ?1",
748            params![slug],
749        )?;
750        Ok(())
751    }
752
753    pub fn list_template_memberships(&self) -> Result<Vec<TemplateMembershipRow>, StoreError> {
754        let conn = self.conn.lock();
755        let mut stmt = conn.prepare(
756            "SELECT template_slug, collection_slug, hub_source_slug, hub_version, installed_at \
757             FROM template_membership",
758        )?;
759        let rows = stmt
760            .query_map([], |row| {
761                Ok(TemplateMembershipRow {
762                    template_slug: row.get(0)?,
763                    collection_slug: row.get(1)?,
764                    hub_source_slug: row.get(2)?,
765                    hub_version: row.get(3)?,
766                    installed_at: row.get::<_, Option<String>>(4)?.map(parse_dt),
767                })
768            })?
769            .collect::<Result<Vec<_>, _>>()?;
770        Ok(rows)
771    }
772
773    pub fn get_template_membership(
774        &self,
775        template_slug: &str,
776    ) -> Result<Option<TemplateMembershipRow>, StoreError> {
777        let conn = self.conn.lock();
778        let row = conn
779            .query_row(
780                "SELECT template_slug, collection_slug, hub_source_slug, hub_version, installed_at \
781                 FROM template_membership WHERE template_slug = ?1",
782                params![template_slug],
783                |row| {
784                    Ok(TemplateMembershipRow {
785                        template_slug: row.get(0)?,
786                        collection_slug: row.get(1)?,
787                        hub_source_slug: row.get(2)?,
788                        hub_version: row.get(3)?,
789                        installed_at: row.get::<_, Option<String>>(4)?.map(parse_dt),
790                    })
791                },
792            )
793            .ok();
794        Ok(row)
795    }
796
797    pub fn upsert_template_membership(
798        &self,
799        row: &TemplateMembershipRow,
800    ) -> Result<(), StoreError> {
801        let conn = self.conn.lock();
802        conn.execute(
803            r#"
804            INSERT INTO template_membership
805                (template_slug, collection_slug, hub_source_slug, hub_version, installed_at)
806            VALUES (?1, ?2, ?3, ?4, ?5)
807            ON CONFLICT(template_slug) DO UPDATE SET
808                collection_slug = excluded.collection_slug,
809                hub_source_slug = excluded.hub_source_slug,
810                hub_version = excluded.hub_version,
811                installed_at = excluded.installed_at
812            "#,
813            params![
814                row.template_slug,
815                row.collection_slug,
816                row.hub_source_slug,
817                row.hub_version,
818                row.installed_at.map(|d| d.to_rfc3339()),
819            ],
820        )?;
821        Ok(())
822    }
823
824    pub fn delete_template_membership(&self, template_slug: &str) -> Result<(), StoreError> {
825        let conn = self.conn.lock();
826        conn.execute(
827            "DELETE FROM template_membership WHERE template_slug = ?1",
828            params![template_slug],
829        )?;
830        Ok(())
831    }
832
833    /// The per-flow workspace override, keyed by `FlowGraph.id`. `None` when the
834    /// flow uses the edition default. Local machine state - never in the flow file.
835    pub fn get_flow_workspace(&self, flow_id: &str) -> Result<Option<String>, StoreError> {
836        let conn = self.conn.lock();
837        let path = conn
838            .query_row(
839                "SELECT path FROM flow_workspaces WHERE flow_id = ?1",
840                params![flow_id],
841                |row| row.get::<_, String>(0),
842            )
843            .ok();
844        Ok(path)
845    }
846
847    pub fn set_flow_workspace(&self, flow_id: &str, path: &str) -> Result<(), StoreError> {
848        let conn = self.conn.lock();
849        conn.execute(
850            r#"
851            INSERT INTO flow_workspaces (flow_id, path, updated_at)
852            VALUES (?1, ?2, ?3)
853            ON CONFLICT(flow_id) DO UPDATE SET
854                path = excluded.path,
855                updated_at = excluded.updated_at
856            "#,
857            params![flow_id, path, chrono::Utc::now().to_rfc3339()],
858        )?;
859        Ok(())
860    }
861
862    pub fn clear_flow_workspace(&self, flow_id: &str) -> Result<(), StoreError> {
863        let conn = self.conn.lock();
864        conn.execute(
865            "DELETE FROM flow_workspaces WHERE flow_id = ?1",
866            params![flow_id],
867        )?;
868        Ok(())
869    }
870
871    pub fn list_node_library_rows(&self) -> Result<Vec<NodeLibraryRow>, StoreError> {
872        let conn = self.conn.lock();
873        let mut stmt = conn.prepare(
874            "SELECT slug, version, installed_at \
875             FROM node_library ORDER BY slug ASC",
876        )?;
877        let rows = stmt
878            .query_map([], |row| {
879                Ok(NodeLibraryRow {
880                    slug: row.get(0)?,
881                    version: row.get(1)?,
882                    installed_at: parse_dt(row.get::<_, String>(2)?),
883                })
884            })?
885            .collect::<Result<Vec<_>, _>>()?;
886        Ok(rows)
887    }
888
889    pub fn get_node_library_row(&self, slug: &str) -> Result<Option<NodeLibraryRow>, StoreError> {
890        let conn = self.conn.lock();
891        let row = conn
892            .query_row(
893                "SELECT slug, version, installed_at \
894                 FROM node_library WHERE slug = ?1",
895                params![slug],
896                |row| {
897                    Ok(NodeLibraryRow {
898                        slug: row.get(0)?,
899                        version: row.get(1)?,
900                        installed_at: parse_dt(row.get::<_, String>(2)?),
901                    })
902                },
903            )
904            .ok();
905        Ok(row)
906    }
907
908    /// Insert if absent, else update `version` + `installed_at`.
909    pub fn upsert_node_library_row(
910        &self,
911        row: &NodeLibraryRow,
912    ) -> Result<NodeLibraryRow, StoreError> {
913        let conn = self.conn.lock();
914        conn.execute(
915            r#"
916            INSERT INTO node_library (slug, version, installed_at)
917            VALUES (?1, ?2, ?3)
918            ON CONFLICT(slug) DO UPDATE SET
919                version = excluded.version,
920                installed_at = excluded.installed_at
921            "#,
922            params![
923                row.slug,
924                row.version,
925                row.installed_at.to_rfc3339(),
926            ],
927        )?;
928        let stored = conn
929            .query_row(
930                "SELECT slug, version, installed_at \
931                 FROM node_library WHERE slug = ?1",
932                params![row.slug],
933                |r| {
934                    Ok(NodeLibraryRow {
935                        slug: r.get(0)?,
936                        version: r.get(1)?,
937                        installed_at: parse_dt(r.get::<_, String>(2)?),
938                    })
939                },
940            )
941            .map_err(StoreError::from)?;
942        Ok(stored)
943    }
944
945    pub fn delete_node_library_row(&self, slug: &str) -> Result<(), StoreError> {
946        let conn = self.conn.lock();
947        let affected = conn.execute("DELETE FROM node_library WHERE slug = ?1", params![slug])?;
948        if affected == 0 {
949            return Err(StoreError::Refused(format!("node `{slug}` not found")));
950        }
951        Ok(())
952    }
953
954    /// Insert or replace a flow's schedule (keyed by `template_slug`). `created_at`
955    /// is preserved on update.
956    pub fn upsert_schedule(&self, rec: &ScheduleRecord) -> Result<(), StoreError> {
957        let conn = self.conn.lock();
958        conn.execute(
959            r#"
960            INSERT INTO flow_schedules
961                (template_slug, collection_slug, flow_name, enabled, frequency,
962                 anchor_at, timezone, cron, every_minutes, until_at, max_runs, catchup,
963                 next_run_at, last_run_at, last_status, run_count, created_at, updated_at)
964            VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15, ?16, ?17, ?18)
965            ON CONFLICT(template_slug) DO UPDATE SET
966                collection_slug = excluded.collection_slug,
967                flow_name = excluded.flow_name,
968                enabled = excluded.enabled,
969                frequency = excluded.frequency,
970                anchor_at = excluded.anchor_at,
971                timezone = excluded.timezone,
972                cron = excluded.cron,
973                every_minutes = excluded.every_minutes,
974                until_at = excluded.until_at,
975                max_runs = excluded.max_runs,
976                catchup = excluded.catchup,
977                next_run_at = excluded.next_run_at,
978                last_run_at = excluded.last_run_at,
979                last_status = excluded.last_status,
980                run_count = excluded.run_count,
981                updated_at = excluded.updated_at
982            "#,
983            params![
984                rec.template_slug,
985                rec.collection_slug,
986                rec.flow_name,
987                rec.enabled as i64,
988                rec.frequency,
989                rec.anchor_at.to_rfc3339(),
990                rec.timezone,
991                rec.cron,
992                rec.every_minutes.map(|v| v as i64),
993                rec.until.map(|d| d.to_rfc3339()),
994                rec.max_runs.map(|v| v as i64),
995                rec.catchup,
996                rec.next_run_at.map(|d| d.to_rfc3339()),
997                rec.last_run_at.map(|d| d.to_rfc3339()),
998                rec.last_status,
999                rec.run_count as i64,
1000                rec.created_at.to_rfc3339(),
1001                rec.updated_at.to_rfc3339(),
1002            ],
1003        )?;
1004        Ok(())
1005    }
1006
1007    pub fn get_schedule(&self, template_slug: &str) -> Result<Option<ScheduleRecord>, StoreError> {
1008        let conn = self.conn.lock();
1009        let row = conn
1010            .query_row(
1011                "SELECT template_slug, collection_slug, flow_name, enabled, frequency, \
1012                 anchor_at, timezone, cron, every_minutes, until_at, max_runs, catchup, \
1013                 next_run_at, last_run_at, last_status, run_count, created_at, updated_at \
1014                 FROM flow_schedules WHERE template_slug = ?1",
1015                params![template_slug],
1016                row_to_schedule,
1017            )
1018            .ok();
1019        Ok(row)
1020    }
1021
1022    pub fn list_schedules(&self) -> Result<Vec<ScheduleRecord>, StoreError> {
1023        let conn = self.conn.lock();
1024        let mut stmt = conn.prepare(
1025            "SELECT template_slug, collection_slug, flow_name, enabled, frequency, \
1026             anchor_at, timezone, cron, every_minutes, until_at, max_runs, catchup, \
1027             next_run_at, last_run_at, last_status, run_count, created_at, updated_at \
1028             FROM flow_schedules ORDER BY updated_at DESC",
1029        )?;
1030        let rows = stmt
1031            .query_map([], row_to_schedule)?
1032            .collect::<Result<Vec<_>, _>>()?;
1033        Ok(rows)
1034    }
1035
1036    /// Enabled schedules whose `next_run_at` is at or before `now` - the rows the
1037    /// background scheduler should fire on this tick.
1038    pub fn list_due_schedules(
1039        &self,
1040        now: DateTime<Utc>,
1041    ) -> Result<Vec<ScheduleRecord>, StoreError> {
1042        let conn = self.conn.lock();
1043        let mut stmt = conn.prepare(
1044            "SELECT template_slug, collection_slug, flow_name, enabled, frequency, \
1045             anchor_at, timezone, cron, every_minutes, until_at, max_runs, catchup, \
1046             next_run_at, last_run_at, last_status, run_count, created_at, updated_at \
1047             FROM flow_schedules \
1048             WHERE enabled = 1 AND next_run_at IS NOT NULL AND next_run_at <= ?1 \
1049             ORDER BY next_run_at ASC",
1050        )?;
1051        let rows = stmt
1052            .query_map(params![now.to_rfc3339()], row_to_schedule)?
1053            .collect::<Result<Vec<_>, _>>()?;
1054        Ok(rows)
1055    }
1056
1057    /// Advance a schedule's timer on a fire: set `last_run_at`, the recomputed
1058    /// `next_run_at`, and bump the run counter.
1059    pub fn mark_schedule_run(
1060        &self,
1061        template_slug: &str,
1062        last_run_at: DateTime<Utc>,
1063        next_run_at: Option<DateTime<Utc>>,
1064        last_status: &str,
1065    ) -> Result<(), StoreError> {
1066        let conn = self.conn.lock();
1067        conn.execute(
1068            "UPDATE flow_schedules \
1069             SET last_run_at = ?2, next_run_at = ?3, last_status = ?4, \
1070                 run_count = run_count + 1, enabled = CASE WHEN ?3 IS NULL THEN 0 ELSE enabled END, updated_at = ?2 \
1071             WHERE template_slug = ?1",
1072            params![
1073                template_slug,
1074                last_run_at.to_rfc3339(),
1075                next_run_at.map(|d| d.to_rfc3339()),
1076                last_status,
1077            ],
1078        )?;
1079        Ok(())
1080    }
1081
1082    pub fn advance_schedule_without_run(
1083        &self,
1084        template_slug: &str,
1085        at: DateTime<Utc>,
1086        next_run_at: Option<DateTime<Utc>>,
1087        last_status: &str,
1088    ) -> Result<(), StoreError> {
1089        let conn = self.conn.lock();
1090        conn.execute(
1091            "UPDATE flow_schedules \
1092             SET next_run_at = ?2, last_status = ?3, \
1093                 enabled = CASE WHEN ?2 IS NULL THEN 0 ELSE enabled END, updated_at = ?4 \
1094             WHERE template_slug = ?1",
1095            params![
1096                template_slug,
1097                next_run_at.map(|d| d.to_rfc3339()),
1098                last_status,
1099                at.to_rfc3339(),
1100            ],
1101        )?;
1102        Ok(())
1103    }
1104
1105    pub fn set_schedule_enabled(
1106        &self,
1107        template_slug: &str,
1108        enabled: bool,
1109        next_run_at: Option<DateTime<Utc>>,
1110    ) -> Result<(), StoreError> {
1111        let conn = self.conn.lock();
1112        let now = Utc::now().to_rfc3339();
1113        conn.execute(
1114            "UPDATE flow_schedules SET enabled = ?2, next_run_at = ?3, updated_at = ?4 \
1115             WHERE template_slug = ?1",
1116            params![
1117                template_slug,
1118                enabled as i64,
1119                next_run_at.map(|d| d.to_rfc3339()),
1120                now,
1121            ],
1122        )?;
1123        Ok(())
1124    }
1125
1126    pub fn migrate_schedule(
1127        &self,
1128        old_slug: &str,
1129        new_slug: &str,
1130        collection_slug: &str,
1131        flow_name: &str,
1132    ) -> Result<(), StoreError> {
1133        let conn = self.conn.lock();
1134        let now = Utc::now().to_rfc3339();
1135        conn.execute(
1136            "UPDATE flow_schedules \
1137             SET template_slug = ?2, collection_slug = ?3, flow_name = ?4, updated_at = ?5 \
1138             WHERE template_slug = ?1",
1139            params![old_slug, new_slug, collection_slug, flow_name, now],
1140        )?;
1141        Ok(())
1142    }
1143
1144    pub fn delete_schedule(&self, template_slug: &str) -> Result<(), StoreError> {
1145        let conn = self.conn.lock();
1146        conn.execute(
1147            "DELETE FROM flow_schedules WHERE template_slug = ?1",
1148            params![template_slug],
1149        )?;
1150        Ok(())
1151    }
1152}
1153
1154fn row_to_execution(row: &rusqlite::Row<'_>) -> rusqlite::Result<ExecutionRecord> {
1155    Ok(ExecutionRecord {
1156        execution_id: row.get(0)?,
1157        status: row.get(1)?,
1158        started_at: parse_dt(row.get::<_, String>(2)?),
1159        ended_at: row.get::<_, Option<String>>(3)?.map(parse_dt),
1160        succeeded: row.get::<_, i64>(4)? as u32,
1161        failed: row.get::<_, i64>(5)? as u32,
1162        skipped: row.get::<_, i64>(6)? as u32,
1163        flow_name: row.get(7)?,
1164        trigger: row.get(8)?,
1165    })
1166}
1167
1168fn row_to_schedule(row: &rusqlite::Row<'_>) -> rusqlite::Result<ScheduleRecord> {
1169    Ok(ScheduleRecord {
1170        template_slug: row.get(0)?,
1171        collection_slug: row.get(1)?,
1172        flow_name: row.get(2)?,
1173        enabled: row.get::<_, i64>(3)? != 0,
1174        frequency: row.get(4)?,
1175        anchor_at: parse_dt(row.get::<_, String>(5)?),
1176        timezone: row.get(6)?,
1177        cron: row.get(7)?,
1178        every_minutes: row.get::<_, Option<i64>>(8)?.map(|v| v as u32),
1179        until: row.get::<_, Option<String>>(9)?.map(parse_dt),
1180        max_runs: row.get::<_, Option<i64>>(10)?.map(|v| v as u32),
1181        catchup: row.get(11)?,
1182        next_run_at: row.get::<_, Option<String>>(12)?.map(parse_dt),
1183        last_run_at: row.get::<_, Option<String>>(13)?.map(parse_dt),
1184        last_status: row.get(14)?,
1185        run_count: row.get::<_, i64>(15)? as u32,
1186        created_at: parse_dt(row.get::<_, String>(16)?),
1187        updated_at: parse_dt(row.get::<_, String>(17)?),
1188    })
1189}
1190
1191fn parse_dt(raw: String) -> DateTime<Utc> {
1192    DateTime::parse_from_rfc3339(&raw)
1193        .map(|d| d.with_timezone(&Utc))
1194        .unwrap_or_else(|_| Utc::now())
1195}
1196
1197#[cfg(test)]
1198mod tests {
1199    use super::*;
1200
1201    fn rec(id: &str, status: &str) -> ExecutionRecord {
1202        ExecutionRecord {
1203            execution_id: id.into(),
1204            status: status.into(),
1205            started_at: Utc::now(),
1206            ended_at: Some(Utc::now()),
1207            succeeded: 1,
1208            failed: 0,
1209            skipped: 0,
1210            flow_name: "demo".into(),
1211            trigger: "manual".into(),
1212        }
1213    }
1214
1215    fn step(exec: &str, node: &str, status: &str) -> ExecutionStepRecord {
1216        ExecutionStepRecord {
1217            execution_id: exec.into(),
1218            node_id: node.into(),
1219            status: status.into(),
1220            started_at: Utc::now(),
1221            ended_at: Some(Utc::now()),
1222            output: Some(serde_json::json!({ "ok": true })),
1223            error: None,
1224            reason: None,
1225        }
1226    }
1227
1228    #[test]
1229    fn round_trip_execution_and_steps() {
1230        let store = Store::open_in_memory().unwrap();
1231        store.upsert_execution(&rec("e1", "succeeded")).unwrap();
1232        store.upsert_step(&step("e1", "n1", "succeeded")).unwrap();
1233        store.upsert_step(&step("e1", "n2", "skipped")).unwrap();
1234
1235        let listed = store.list_executions(10).unwrap();
1236        assert_eq!(listed.len(), 1);
1237        assert_eq!(listed[0].execution_id, "e1");
1238
1239        let detail = store.get_execution("e1").unwrap().unwrap();
1240        assert_eq!(detail.0.status, "succeeded");
1241        assert_eq!(detail.1.len(), 2);
1242    }
1243
1244    #[test]
1245    fn memory_round_trips_and_replaces() {
1246        let store = Store::open_in_memory().unwrap();
1247        assert!(store.load_memory().unwrap().is_empty());
1248
1249        let mut m = std::collections::HashMap::new();
1250        m.insert("count".to_string(), serde_json::json!(3));
1251        m.insert("note".to_string(), serde_json::json!({ "k": "v" }));
1252        store.save_memory(&m).unwrap();
1253        let loaded = store.load_memory().unwrap();
1254        assert_eq!(loaded.get("count"), Some(&serde_json::json!(3)));
1255        assert_eq!(loaded.get("note"), Some(&serde_json::json!({ "k": "v" })));
1256
1257        // save_memory replaces the whole snapshot.
1258        let mut m2 = std::collections::HashMap::new();
1259        m2.insert("only".to_string(), serde_json::json!(true));
1260        store.save_memory(&m2).unwrap();
1261        let reloaded = store.load_memory().unwrap();
1262        assert_eq!(reloaded.len(), 1);
1263        assert_eq!(reloaded.get("only"), Some(&serde_json::json!(true)));
1264
1265        store.clear_memory().unwrap();
1266        assert!(store.load_memory().unwrap().is_empty());
1267    }
1268
1269    #[test]
1270    fn upsert_replaces_existing_step() {
1271        let store = Store::open_in_memory().unwrap();
1272        store.upsert_execution(&rec("e1", "running")).unwrap();
1273        let mut s = step("e1", "n1", "running");
1274        s.ended_at = None;
1275        store.upsert_step(&s).unwrap();
1276
1277        let mut s2 = step("e1", "n1", "succeeded");
1278        s2.error = Some("ignored".into());
1279        store.upsert_step(&s2).unwrap();
1280
1281        let detail = store.get_execution("e1").unwrap().unwrap();
1282        assert_eq!(detail.1.len(), 1);
1283        assert_eq!(detail.1[0].status, "succeeded");
1284    }
1285
1286    #[test]
1287    fn list_executions_orders_newest_first() {
1288        let store = Store::open_in_memory().unwrap();
1289        let mut a = rec("a", "succeeded");
1290        a.started_at = Utc::now() - chrono::Duration::hours(2);
1291        let mut b = rec("b", "failed");
1292        b.started_at = Utc::now() - chrono::Duration::hours(1);
1293        store.upsert_execution(&a).unwrap();
1294        store.upsert_execution(&b).unwrap();
1295        let listed = store.list_executions(10).unwrap();
1296        assert_eq!(listed[0].execution_id, "b");
1297        assert_eq!(listed[1].execution_id, "a");
1298    }
1299
1300    fn membership(template: &str, collection: &str) -> TemplateMembershipRow {
1301        TemplateMembershipRow {
1302            template_slug: template.into(),
1303            collection_slug: collection.into(),
1304            hub_source_slug: None,
1305            hub_version: None,
1306            installed_at: None,
1307        }
1308    }
1309
1310    #[test]
1311    fn upsert_and_list_template_collections_pins_system_first() {
1312        let store = Store::open_in_memory().unwrap();
1313        store
1314            .upsert_template_collection("default", "Default", true)
1315            .unwrap();
1316        store
1317            .upsert_template_collection("alpha", "Alpha collection", false)
1318            .unwrap();
1319        store
1320            .upsert_template_collection("beta", "Beta collection", false)
1321            .unwrap();
1322        let listed = store.list_template_collections().unwrap();
1323        assert_eq!(listed[0].slug, "default", "system collection pins first");
1324        assert!(listed[0].is_system);
1325        assert_eq!(listed[1].slug, "alpha");
1326        assert_eq!(listed[2].slug, "beta");
1327    }
1328
1329    #[test]
1330    fn upsert_does_not_downgrade_system_flag() {
1331        let store = Store::open_in_memory().unwrap();
1332        store
1333            .upsert_template_collection("default", "Default", true)
1334            .unwrap();
1335        // A subsequent upsert with is_system=false (which is what
1336        // ensure_default does on every boot) must leave the flag set.
1337        store
1338            .upsert_template_collection("default", "Default", false)
1339            .unwrap();
1340        let rec = store
1341            .get_template_collection("default")
1342            .unwrap()
1343            .expect("row exists");
1344        assert!(rec.is_system, "is_system must not be downgraded on upsert");
1345    }
1346
1347    #[test]
1348    fn rename_template_collection_changes_name_only() {
1349        let store = Store::open_in_memory().unwrap();
1350        let original = store
1351            .upsert_template_collection("alpha", "Alpha", false)
1352            .unwrap();
1353        let renamed = store
1354            .rename_template_collection("alpha", "Renamed Alpha")
1355            .unwrap();
1356        assert_eq!(renamed.slug, original.slug, "slug must not change on rename");
1357        assert_eq!(renamed.name, "Renamed Alpha");
1358    }
1359
1360    #[test]
1361    fn rename_missing_collection_returns_refused() {
1362        let store = Store::open_in_memory().unwrap();
1363        let err = store
1364            .rename_template_collection("ghost", "x")
1365            .expect_err("missing slug refused");
1366        assert!(matches!(err, StoreError::Refused(_)), "got {err:?}");
1367    }
1368
1369    #[test]
1370    fn delete_refuses_system_and_non_empty_collections() {
1371        let store = Store::open_in_memory().unwrap();
1372        store
1373            .upsert_template_collection("default", "Default", true)
1374            .unwrap();
1375        store
1376            .upsert_template_collection("alpha", "Alpha", false)
1377            .unwrap();
1378        store
1379            .upsert_template_membership(&membership("foo", "alpha"))
1380            .unwrap();
1381
1382        let sys_err = store
1383            .delete_template_collection("default")
1384            .expect_err("system refused");
1385        assert!(matches!(sys_err, StoreError::Refused(_)), "got {sys_err:?}");
1386
1387        let non_empty_err = store
1388            .delete_template_collection("alpha")
1389            .expect_err("non-empty refused");
1390        assert!(
1391            matches!(non_empty_err, StoreError::Refused(_)),
1392            "got {non_empty_err:?}"
1393        );
1394
1395        store.delete_template_membership("foo").unwrap();
1396        store.delete_template_collection("alpha").unwrap();
1397        let listed = store.list_template_collections().unwrap();
1398        assert!(listed.iter().all(|c| c.slug != "alpha"));
1399    }
1400
1401    #[test]
1402    fn membership_upsert_replaces_and_get_returns_row() {
1403        let store = Store::open_in_memory().unwrap();
1404        store
1405            .upsert_template_collection("alpha", "Alpha", false)
1406            .unwrap();
1407        store
1408            .upsert_template_collection("beta", "Beta", false)
1409            .unwrap();
1410        store
1411            .upsert_template_membership(&membership("foo", "alpha"))
1412            .unwrap();
1413        let got = store
1414            .get_template_membership("foo")
1415            .unwrap()
1416            .expect("row exists");
1417        assert_eq!(got.collection_slug, "alpha");
1418
1419        store
1420            .upsert_template_membership(&membership("foo", "beta"))
1421            .unwrap();
1422        let moved = store
1423            .get_template_membership("foo")
1424            .unwrap()
1425            .expect("row exists");
1426        assert_eq!(moved.collection_slug, "beta", "upsert replaces in place");
1427    }
1428
1429    #[test]
1430    fn membership_fk_violation_when_collection_unknown() {
1431        let store = Store::open_in_memory().unwrap();
1432        // No collection inserted; FK must reject.
1433        let err = store
1434            .upsert_template_membership(&membership("foo", "ghost"))
1435            .expect_err("FK violation");
1436        assert!(matches!(err, StoreError::Sqlite(_)), "got {err:?}");
1437    }
1438
1439    #[test]
1440    fn delete_membership_is_idempotent() {
1441        let store = Store::open_in_memory().unwrap();
1442        store
1443            .upsert_template_collection("alpha", "Alpha", false)
1444            .unwrap();
1445        store
1446            .upsert_template_membership(&membership("foo", "alpha"))
1447            .unwrap();
1448        store.delete_template_membership("foo").unwrap();
1449        // Deleting again is a no-op, not an error - callers can be sloppy.
1450        store.delete_template_membership("foo").unwrap();
1451        assert!(store.get_template_membership("foo").unwrap().is_none());
1452    }
1453
1454    #[test]
1455    fn hub_source_fields_round_trip() {
1456        let store = Store::open_in_memory().unwrap();
1457        store
1458            .upsert_template_collection("alpha", "Alpha", false)
1459            .unwrap();
1460        let installed_at = Utc::now();
1461        store
1462            .upsert_template_membership(&TemplateMembershipRow {
1463                template_slug: "foo".into(),
1464                collection_slug: "alpha".into(),
1465                hub_source_slug: Some("hub-foo".into()),
1466                hub_version: Some("1.2.3".into()),
1467                installed_at: Some(installed_at),
1468            })
1469            .unwrap();
1470        let got = store
1471            .get_template_membership("foo")
1472            .unwrap()
1473            .expect("row exists");
1474        assert_eq!(got.hub_source_slug.as_deref(), Some("hub-foo"));
1475        assert_eq!(got.hub_version.as_deref(), Some("1.2.3"));
1476        // RFC3339 round-trip can lose sub-second precision; compare to-second.
1477        let got_at = got.installed_at.expect("installed_at present");
1478        assert_eq!(
1479            got_at.timestamp(),
1480            installed_at.timestamp(),
1481            "installed_at round-trips to second precision"
1482        );
1483    }
1484
1485    fn node_row(slug: &str, version: &str) -> NodeLibraryRow {
1486        NodeLibraryRow {
1487            slug: slug.into(),
1488            version: version.into(),
1489            installed_at: Utc::now(),
1490        }
1491    }
1492
1493    #[test]
1494    fn node_library_upsert_list_get_roundtrip() {
1495        let store = Store::open_in_memory().unwrap();
1496        store
1497            .upsert_node_library_row(&node_row("alpha", "1.0.0"))
1498            .unwrap();
1499        store
1500            .upsert_node_library_row(&node_row("beta", "2.0.0"))
1501            .unwrap();
1502        let listed = store.list_node_library_rows().unwrap();
1503        assert_eq!(listed.len(), 2);
1504        assert_eq!(listed[0].slug, "alpha");
1505        assert_eq!(listed[1].slug, "beta");
1506        let got = store
1507            .get_node_library_row("alpha")
1508            .unwrap()
1509            .expect("row exists");
1510        assert_eq!(got.version, "1.0.0");
1511    }
1512
1513    #[test]
1514    fn node_library_upsert_updates_version() {
1515        let store = Store::open_in_memory().unwrap();
1516        store
1517            .upsert_node_library_row(&node_row("alpha", "1.0.0"))
1518            .unwrap();
1519        store
1520            .upsert_node_library_row(&node_row("alpha", "1.1.0"))
1521            .unwrap();
1522        let got = store
1523            .get_node_library_row("alpha")
1524            .unwrap()
1525            .expect("row exists");
1526        assert_eq!(got.version, "1.1.0");
1527    }
1528
1529    #[test]
1530    fn node_library_delete_removes_row() {
1531        let store = Store::open_in_memory().unwrap();
1532        store
1533            .upsert_node_library_row(&node_row("alpha", "1.0.0"))
1534            .unwrap();
1535        store.delete_node_library_row("alpha").unwrap();
1536        assert!(store.get_node_library_row("alpha").unwrap().is_none());
1537    }
1538
1539    #[test]
1540    fn node_library_delete_refuses_missing_row() {
1541        let store = Store::open_in_memory().unwrap();
1542        let err = store
1543            .delete_node_library_row("ghost")
1544            .expect_err("missing slug refused");
1545        assert!(matches!(err, StoreError::Refused(_)), "got {err:?}");
1546    }
1547
1548    fn schedule(slug: &str, enabled: bool, next: Option<DateTime<Utc>>) -> ScheduleRecord {
1549        let now = Utc::now();
1550        ScheduleRecord {
1551            template_slug: slug.into(),
1552            collection_slug: "default".into(),
1553            flow_name: "demo".into(),
1554            enabled,
1555            frequency: "daily".into(),
1556            anchor_at: now,
1557            timezone: "UTC".into(),
1558            cron: None,
1559            every_minutes: None,
1560            until: None,
1561            max_runs: None,
1562            catchup: "run-once".into(),
1563            next_run_at: next,
1564            last_run_at: None,
1565            last_status: None,
1566            run_count: 0,
1567            created_at: now,
1568            updated_at: now,
1569        }
1570    }
1571
1572    #[test]
1573    fn schedule_upsert_get_and_replace() {
1574        let store = Store::open_in_memory().unwrap();
1575        store.upsert_schedule(&schedule("foo", true, Some(Utc::now()))).unwrap();
1576        let got = store.get_schedule("foo").unwrap().expect("row exists");
1577        assert!(got.enabled);
1578        assert_eq!(got.frequency, "daily");
1579
1580        let mut updated = schedule("foo", false, None);
1581        updated.frequency = "weekly".into();
1582        store.upsert_schedule(&updated).unwrap();
1583        let got = store.get_schedule("foo").unwrap().expect("row exists");
1584        assert!(!got.enabled);
1585        assert_eq!(got.frequency, "weekly", "upsert replaces in place");
1586    }
1587
1588    #[test]
1589    fn list_due_excludes_disabled_and_future() {
1590        let store = Store::open_in_memory().unwrap();
1591        let past = Utc::now() - chrono::Duration::minutes(5);
1592        let future = Utc::now() + chrono::Duration::hours(1);
1593        store.upsert_schedule(&schedule("due", true, Some(past))).unwrap();
1594        store.upsert_schedule(&schedule("future", true, Some(future))).unwrap();
1595        store.upsert_schedule(&schedule("off", false, Some(past))).unwrap();
1596        let due = store.list_due_schedules(Utc::now()).unwrap();
1597        assert_eq!(due.len(), 1);
1598        assert_eq!(due[0].template_slug, "due");
1599    }
1600
1601    #[test]
1602    fn mark_run_advances_timer_and_delete_removes() {
1603        let store = Store::open_in_memory().unwrap();
1604        let past = Utc::now() - chrono::Duration::minutes(5);
1605        store.upsert_schedule(&schedule("foo", true, Some(past))).unwrap();
1606        let next = Utc::now() + chrono::Duration::days(1);
1607        store
1608            .mark_schedule_run("foo", Utc::now(), Some(next), "succeeded")
1609            .unwrap();
1610        let got = store.get_schedule("foo").unwrap().expect("row exists");
1611        assert!(got.last_run_at.is_some());
1612        assert_eq!(got.last_status.as_deref(), Some("succeeded"));
1613        assert_eq!(got.run_count, 1, "mark_schedule_run bumps the counter");
1614        assert!(got.next_run_at.unwrap() > Utc::now());
1615        assert!(store.list_due_schedules(Utc::now()).unwrap().is_empty());
1616
1617        store.delete_schedule("foo").unwrap();
1618        assert!(store.get_schedule("foo").unwrap().is_none());
1619    }
1620
1621    #[test]
1622    fn flow_workspace_set_get_clear_roundtrip() {
1623        let store = Store::open_in_memory().unwrap();
1624        assert!(store.get_flow_workspace("flow-1").unwrap().is_none());
1625
1626        store.set_flow_workspace("flow-1", "/tmp/project").unwrap();
1627        assert_eq!(
1628            store.get_flow_workspace("flow-1").unwrap().as_deref(),
1629            Some("/tmp/project")
1630        );
1631
1632        // Upsert replaces the path.
1633        store.set_flow_workspace("flow-1", "/tmp/other").unwrap();
1634        assert_eq!(
1635            store.get_flow_workspace("flow-1").unwrap().as_deref(),
1636            Some("/tmp/other")
1637        );
1638
1639        store.clear_flow_workspace("flow-1").unwrap();
1640        assert!(store.get_flow_workspace("flow-1").unwrap().is_none());
1641    }
1642}