Skip to main content

flow_execution/
events.rs

1use chrono::{DateTime, Utc};
2use flow_storage::{ExecutionRecord, ExecutionStepRecord, Store};
3use serde::{Deserialize, Serialize};
4use std::sync::Arc;
5
6#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
7#[serde(rename_all = "snake_case")]
8pub enum LogStream {
9    Stdout,
10    Stderr,
11}
12
13#[derive(Debug, Clone, Serialize, Deserialize)]
14#[serde(tag = "kind", rename_all = "snake_case")]
15pub enum ExecutionEvent {
16    Started {
17        execution_id: String,
18        at: DateTime<Utc>,
19    },
20    NodeStarted {
21        execution_id: String,
22        node_id: String,
23        at: DateTime<Utc>,
24        /// 0-based execution count for this node. Always 0 on the acyclic
25        /// path; increments when a bounded loop re-enters the node. Defaults
26        /// so events serialized before iteration support deserialize cleanly.
27        #[serde(default)]
28        iteration: u32,
29    },
30    NodeSucceeded {
31        execution_id: String,
32        node_id: String,
33        at: DateTime<Utc>,
34        output: serde_json::Value,
35        #[serde(default)]
36        iteration: u32,
37    },
38    NodeFailed {
39        execution_id: String,
40        node_id: String,
41        at: DateTime<Utc>,
42        error: String,
43        #[serde(default)]
44        iteration: u32,
45    },
46    NodeSkipped {
47        execution_id: String,
48        node_id: String,
49        at: DateTime<Utc>,
50        reason: String,
51        #[serde(default)]
52        iteration: u32,
53    },
54    /// Incremental log line emitted by streaming adapters (the shell adapter
55    /// is the first; future adapters can stream too). The Tauri side forwards
56    /// these as `flow:execution` payloads which the Log Panel renders live.
57    NodeLog {
58        execution_id: String,
59        node_id: String,
60        at: DateTime<Utc>,
61        stream: LogStream,
62        line: String,
63    },
64    /// A model tool call dispatched inside an AI node's in-node tool-use loop
65    /// (an `ai` node bound to adapters). Emitted by the tool dispatcher just
66    /// **before** the tool runs, so the UI shows live "editing `X`… running
67    /// tests…" granularity mid-turn - individual tool calls otherwise produce
68    /// no event, only node-level Started/Succeeded/Failed. UX/diagnostic layer
69    /// like [`ExecutionEvent::NodeLog`]: not persisted to the run store.
70    ToolCall {
71        execution_id: String,
72        node_id: String,
73        at: DateTime<Utc>,
74        /// Full tool name, e.g. `fs__edit-file`.
75        name: String,
76        /// Human-readable one-line summary, e.g. `edit-file src/foo.rs`.
77        summary: String,
78    },
79    /// A bounded loop hit the per-node visit cap and the node will not be
80    /// scheduled again. Diagnostic only - the run finishes with `partial`
81    /// status rather than aborting, so prior node outputs are preserved.
82    IterationCapped {
83        execution_id: String,
84        node_id: String,
85        at: DateTime<Utc>,
86        visits: u32,
87    },
88    /// The run reached a node-boundary checkpoint while paused and is now
89    /// suspended until resumed or cancelled. Diagnostic - lets the UI confirm
90    /// that an in-flight node finished and the run is truly idle.
91    Paused {
92        execution_id: String,
93        at: DateTime<Utc>,
94    },
95    /// A paused run was released and scheduling continues.
96    Resumed {
97        execution_id: String,
98        at: DateTime<Utc>,
99    },
100    /// A node about to run performs a destructive operation and the per-step
101    /// confirmation gate is enabled. The run self-pauses here until the user
102    /// confirms (resume) or cancels (stop). `action` is a human-readable
103    /// summary of what the node would do (e.g. "Delete file: build/").
104    AwaitingConfirmation {
105        execution_id: String,
106        node_id: String,
107        action: String,
108        at: DateTime<Utc>,
109    },
110    /// An AI node is invoking its model. Records the model input as a system
111    /// event (RAO constraint 6.1). `input` is the post-sanitizer prompt; for
112    /// cloud nodes without `auditContent` it is a short preview, matching the
113    /// privacy posture of `NodeSucceeded.output`.
114    AiInvocation {
115        execution_id: String,
116        node_id: String,
117        at: DateTime<Utc>,
118        provider: String,
119        model: String,
120        input: String,
121        /// Set when the node is contract-bound.
122        contract_version: Option<String>,
123    },
124    /// The engine routed a contract-bound AI output (RAO constraint 5/6).
125    /// `decision` is `auto_approve` / `human_review` / `suppress` /
126    /// `contract_violation`; `threshold` names the rule that applied, with its
127    /// configured value, so the trail shows *why* the route was taken.
128    AiRoutingDecision {
129        execution_id: String,
130        node_id: String,
131        at: DateTime<Utc>,
132        decision: String,
133        confidence: Option<f64>,
134        threshold: String,
135        contract_version: String,
136    },
137    /// A contract-bound AI output landed in the human review band (or the
138    /// model set `escalate`). The run self-pauses until the gate is resolved
139    /// (`resolve_ai_review`) or the run is cancelled.
140    AiReviewRequired {
141        execution_id: String,
142        node_id: String,
143        at: DateTime<Utc>,
144        primary_output: String,
145        confidence: f64,
146        reasoning: Option<String>,
147    },
148    /// A human resolved an AI review gate. Recorded as a system event (RAO
149    /// checklist 6.4): human decisions are execution events.
150    AiReviewResolved {
151        execution_id: String,
152        node_id: String,
153        at: DateTime<Utc>,
154        approved: bool,
155    },
156    Done {
157        execution_id: String,
158        at: DateTime<Utc>,
159        status: String,
160    },
161}
162
163pub trait EventSink: Send + Sync {
164    fn emit(&self, event: ExecutionEvent);
165}
166
167#[derive(Default)]
168pub struct NoopSink;
169
170impl EventSink for NoopSink {
171    fn emit(&self, _event: ExecutionEvent) {}
172}
173
174#[derive(Default, Clone)]
175pub struct CapturingSink {
176    pub events: Arc<std::sync::Mutex<Vec<ExecutionEvent>>>,
177}
178
179impl EventSink for CapturingSink {
180    fn emit(&self, event: ExecutionEvent) {
181        if let Ok(mut g) = self.events.lock() {
182            g.push(event);
183        }
184    }
185}
186
187pub struct MultiSink {
188    sinks: Vec<Arc<dyn EventSink>>,
189}
190
191impl MultiSink {
192    pub fn new(sinks: Vec<Arc<dyn EventSink>>) -> Self {
193        Self { sinks }
194    }
195}
196
197impl EventSink for MultiSink {
198    fn emit(&self, event: ExecutionEvent) {
199        for sink in &self.sinks {
200            sink.emit(event.clone());
201        }
202    }
203}
204
205/// EventSink that emits a `tracing` line for each execution event before
206/// forwarding to an inner sink. Wrapping a per-run sink with this makes the node
207/// lifecycle (start / succeed / fail / skip, pause / resume, done) visible in
208/// the dev terminal and the rotating log file under the `flow_execution`
209/// target.
210pub struct TracingSink {
211    inner: Arc<dyn EventSink>,
212}
213
214impl TracingSink {
215    pub fn new(inner: Arc<dyn EventSink>) -> Self {
216        Self { inner }
217    }
218}
219
220impl EventSink for TracingSink {
221    fn emit(&self, event: ExecutionEvent) {
222        match &event {
223            ExecutionEvent::Started { execution_id, .. } => {
224                tracing::info!(target: "flow_execution", %execution_id, "run started");
225            }
226            ExecutionEvent::NodeStarted {
227                execution_id,
228                node_id,
229                iteration,
230                ..
231            } => {
232                tracing::info!(target: "flow_execution", %execution_id, %node_id, iteration, "node started");
233            }
234            ExecutionEvent::NodeSucceeded {
235                execution_id,
236                node_id,
237                ..
238            } => {
239                tracing::info!(target: "flow_execution", %execution_id, %node_id, "node succeeded");
240            }
241            ExecutionEvent::NodeFailed {
242                execution_id,
243                node_id,
244                error,
245                ..
246            } => {
247                tracing::warn!(target: "flow_execution", %execution_id, %node_id, %error, "node failed");
248            }
249            ExecutionEvent::NodeSkipped {
250                execution_id,
251                node_id,
252                reason,
253                ..
254            } => {
255                tracing::info!(target: "flow_execution", %execution_id, %node_id, %reason, "node skipped");
256            }
257            // Per-line stdout/stderr is already streamed to the Log Panel; don't
258            // duplicate every line into tracing.
259            ExecutionEvent::NodeLog { .. } => {}
260            ExecutionEvent::ToolCall {
261                execution_id,
262                node_id,
263                summary,
264                ..
265            } => {
266                tracing::info!(target: "flow_execution", %execution_id, %node_id, %summary, "tool call");
267            }
268            ExecutionEvent::IterationCapped {
269                execution_id,
270                node_id,
271                visits,
272                ..
273            } => {
274                tracing::warn!(target: "flow_execution", %execution_id, %node_id, visits, "iteration cap reached");
275            }
276            ExecutionEvent::Paused { execution_id, .. } => {
277                tracing::info!(target: "flow_execution", %execution_id, "run paused");
278            }
279            ExecutionEvent::Resumed { execution_id, .. } => {
280                tracing::info!(target: "flow_execution", %execution_id, "run resumed");
281            }
282            ExecutionEvent::AwaitingConfirmation {
283                execution_id,
284                node_id,
285                action,
286                ..
287            } => {
288                tracing::warn!(target: "flow_execution", %execution_id, %node_id, %action, "awaiting destructive-step confirmation");
289            }
290            ExecutionEvent::AiInvocation {
291                execution_id,
292                node_id,
293                provider,
294                model,
295                ..
296            } => {
297                tracing::info!(target: "flow_execution", %execution_id, %node_id, %provider, %model, "ai invocation");
298            }
299            ExecutionEvent::AiRoutingDecision {
300                execution_id,
301                node_id,
302                decision,
303                confidence,
304                threshold,
305                ..
306            } => {
307                tracing::info!(target: "flow_execution", %execution_id, %node_id, %decision, ?confidence, %threshold, "ai routing decision");
308            }
309            ExecutionEvent::AiReviewRequired {
310                execution_id,
311                node_id,
312                confidence,
313                ..
314            } => {
315                tracing::warn!(target: "flow_execution", %execution_id, %node_id, %confidence, "awaiting ai review");
316            }
317            ExecutionEvent::AiReviewResolved {
318                execution_id,
319                node_id,
320                approved,
321                ..
322            } => {
323                tracing::info!(target: "flow_execution", %execution_id, %node_id, %approved, "ai review resolved");
324            }
325            ExecutionEvent::Done {
326                execution_id,
327                status,
328                ..
329            } => {
330                tracing::info!(target: "flow_execution", %execution_id, %status, "run done");
331            }
332        }
333        self.inner.emit(event);
334    }
335}
336
337/// EventSink that persists each event into the SQLite store as
338/// execution + execution_step rows. Designed for fan-out via MultiSink
339/// alongside the Tauri-emitting sink.
340pub struct StorageSink {
341    pub store: Store,
342    flow_name: String,
343    /// How the run was triggered (`manual` / `scheduled`); persisted on the
344    /// execution row so history can distinguish scheduled runs (roadmap E11).
345    trigger: String,
346    started_at: std::sync::Mutex<Option<(String, DateTime<Utc>)>>,
347}
348
349impl StorageSink {
350    pub fn new(store: Store) -> Self {
351        Self::with_flow_name(store, String::new())
352    }
353
354    /// Like [`StorageSink::new`], but tags every persisted execution with the
355    /// flow's display name so list views (history, per-template last-run) can
356    /// show which flow ran.
357    pub fn with_flow_name(store: Store, flow_name: String) -> Self {
358        Self::with_flow_name_and_trigger(store, flow_name, "manual".into())
359    }
360
361    /// Like [`StorageSink::with_flow_name`], but also records how the run was
362    /// triggered (`manual` / `scheduled`).
363    pub fn with_flow_name_and_trigger(store: Store, flow_name: String, trigger: String) -> Self {
364        Self {
365            store,
366            flow_name,
367            trigger,
368            started_at: std::sync::Mutex::new(None),
369        }
370    }
371}
372
373impl EventSink for StorageSink {
374    fn emit(&self, event: ExecutionEvent) {
375        match event {
376            ExecutionEvent::Started { execution_id, at } => {
377                if let Ok(mut g) = self.started_at.lock() {
378                    *g = Some((execution_id.clone(), at));
379                }
380                let _ = self.store.upsert_execution(&ExecutionRecord {
381                    execution_id,
382                    status: "running".into(),
383                    started_at: at,
384                    ended_at: None,
385                    succeeded: 0,
386                    failed: 0,
387                    skipped: 0,
388                    flow_name: self.flow_name.clone(),
389                    trigger: self.trigger.clone(),
390                });
391            }
392            ExecutionEvent::NodeStarted {
393                execution_id,
394                node_id,
395                at,
396                ..
397            } => {
398                let _ = self.store.upsert_step(&ExecutionStepRecord {
399                    execution_id,
400                    node_id,
401                    status: "running".into(),
402                    started_at: at,
403                    ended_at: None,
404                    output: None,
405                    error: None,
406                    reason: None,
407                });
408            }
409            ExecutionEvent::NodeSucceeded {
410                execution_id,
411                node_id,
412                at,
413                output,
414                ..
415            } => {
416                let _ = self.store.upsert_step(&ExecutionStepRecord {
417                    execution_id,
418                    node_id,
419                    status: "succeeded".into(),
420                    started_at: at,
421                    ended_at: Some(at),
422                    output: Some(output),
423                    error: None,
424                    reason: None,
425                });
426            }
427            ExecutionEvent::NodeFailed {
428                execution_id,
429                node_id,
430                at,
431                error,
432                ..
433            } => {
434                let _ = self.store.upsert_step(&ExecutionStepRecord {
435                    execution_id,
436                    node_id,
437                    status: "failed".into(),
438                    started_at: at,
439                    ended_at: Some(at),
440                    output: None,
441                    error: Some(error),
442                    reason: None,
443                });
444            }
445            ExecutionEvent::NodeSkipped {
446                execution_id,
447                node_id,
448                at,
449                reason,
450                ..
451            } => {
452                let _ = self.store.upsert_step(&ExecutionStepRecord {
453                    execution_id,
454                    node_id,
455                    status: "skipped".into(),
456                    started_at: at,
457                    ended_at: Some(at),
458                    output: None,
459                    error: None,
460                    reason: Some(reason),
461                });
462            }
463            ExecutionEvent::IterationCapped { .. } => {
464                // Diagnostic only; the node's last terminal state is already
465                // persisted from its prior NodeSucceeded/Failed event.
466            }
467            ExecutionEvent::Paused { .. }
468            | ExecutionEvent::Resumed { .. }
469            | ExecutionEvent::AwaitingConfirmation { .. } => {
470                // UX-layer transitions; the cancelled run's final status is
471                // recorded by the Done event. Nothing to persist here.
472            }
473            // RAO audit trail (constraint 6): AI inputs, routing decisions, and
474            // human review actions persist as part of the execution cycle.
475            ExecutionEvent::AiInvocation {
476                execution_id,
477                node_id,
478                at,
479                provider,
480                model,
481                input,
482                contract_version,
483            } => {
484                let _ = self.store.record_ai_audit(&flow_storage::AiAuditRecord {
485                    execution_id,
486                    node_id,
487                    at,
488                    kind: "ai_invocation".into(),
489                    payload: serde_json::json!({
490                        "provider": provider,
491                        "model": model,
492                        "input": input,
493                        "contract_version": contract_version,
494                    }),
495                });
496            }
497            ExecutionEvent::AiRoutingDecision {
498                execution_id,
499                node_id,
500                at,
501                decision,
502                confidence,
503                threshold,
504                contract_version,
505            } => {
506                let _ = self.store.record_ai_audit(&flow_storage::AiAuditRecord {
507                    execution_id,
508                    node_id,
509                    at,
510                    kind: "ai_routing_decision".into(),
511                    payload: serde_json::json!({
512                        "decision": decision,
513                        "confidence": confidence,
514                        "threshold": threshold,
515                        "contract_version": contract_version,
516                    }),
517                });
518            }
519            ExecutionEvent::AiReviewRequired {
520                execution_id,
521                node_id,
522                at,
523                primary_output,
524                confidence,
525                reasoning,
526            } => {
527                let _ = self.store.record_ai_audit(&flow_storage::AiAuditRecord {
528                    execution_id,
529                    node_id,
530                    at,
531                    kind: "ai_review_required".into(),
532                    payload: serde_json::json!({
533                        "primary_output": primary_output,
534                        "confidence": confidence,
535                        "reasoning": reasoning,
536                    }),
537                });
538            }
539            ExecutionEvent::AiReviewResolved {
540                execution_id,
541                node_id,
542                at,
543                approved,
544            } => {
545                let _ = self.store.record_ai_audit(&flow_storage::AiAuditRecord {
546                    execution_id,
547                    node_id,
548                    at,
549                    kind: "ai_review_resolved".into(),
550                    payload: serde_json::json!({ "approved": approved }),
551                });
552            }
553            ExecutionEvent::NodeLog { .. } | ExecutionEvent::ToolCall { .. } => {
554                // Log lines and per-tool-call markers stream live to the Log
555                // Panel via the Tauri-side sink. We deliberately do not persist
556                // these per-line entries to SQLite: the History panel surfaces
557                // start / succeed / fail / skip events plus the final
558                // stdout/stderr captured in NodeSucceeded.output. Streaming is a
559                // UX layer, not an audit layer; the audit log on disk records
560                // the full capture per invocation.
561            }
562            ExecutionEvent::Done {
563                execution_id,
564                at,
565                status,
566            } => {
567                let started = self
568                    .started_at
569                    .lock()
570                    .ok()
571                    .and_then(|g| g.clone())
572                    .map(|(_, t)| t)
573                    .unwrap_or(at);
574                let (succeeded, failed, skipped) = self
575                    .store
576                    .get_execution(&execution_id)
577                    .ok()
578                    .flatten()
579                    .map(|(_, steps, _)| {
580                        let mut s = 0;
581                        let mut f = 0;
582                        let mut k = 0;
583                        for st in steps {
584                            match st.status.as_str() {
585                                "succeeded" => s += 1,
586                                "failed" => f += 1,
587                                "skipped" => k += 1,
588                                _ => {}
589                            }
590                        }
591                        (s, f, k)
592                    })
593                    .unwrap_or((0, 0, 0));
594                let _ = self.store.upsert_execution(&ExecutionRecord {
595                    execution_id,
596                    status,
597                    started_at: started,
598                    ended_at: Some(at),
599                    succeeded,
600                    failed,
601                    skipped,
602                    flow_name: self.flow_name.clone(),
603                    trigger: self.trigger.clone(),
604                });
605            }
606        }
607    }
608}