1use chrono::{DateTime, Utc};
2use flow_storage::{ExecutionRecord, ExecutionStepRecord, Store};
3use serde::{Deserialize, Serialize};
4use std::sync::Arc;
5
6#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
7#[serde(rename_all = "snake_case")]
8pub enum LogStream {
9 Stdout,
10 Stderr,
11}
12
13#[derive(Debug, Clone, Serialize, Deserialize)]
14#[serde(tag = "kind", rename_all = "snake_case")]
15pub enum ExecutionEvent {
16 Started {
17 execution_id: String,
18 at: DateTime<Utc>,
19 },
20 NodeStarted {
21 execution_id: String,
22 node_id: String,
23 at: DateTime<Utc>,
24 #[serde(default)]
28 iteration: u32,
29 },
30 NodeSucceeded {
31 execution_id: String,
32 node_id: String,
33 at: DateTime<Utc>,
34 output: serde_json::Value,
35 #[serde(default)]
36 iteration: u32,
37 },
38 NodeFailed {
39 execution_id: String,
40 node_id: String,
41 at: DateTime<Utc>,
42 error: String,
43 #[serde(default)]
44 iteration: u32,
45 },
46 NodeSkipped {
47 execution_id: String,
48 node_id: String,
49 at: DateTime<Utc>,
50 reason: String,
51 #[serde(default)]
52 iteration: u32,
53 },
54 NodeLog {
58 execution_id: String,
59 node_id: String,
60 at: DateTime<Utc>,
61 stream: LogStream,
62 line: String,
63 },
64 ToolCall {
71 execution_id: String,
72 node_id: String,
73 at: DateTime<Utc>,
74 name: String,
76 summary: String,
78 },
79 IterationCapped {
83 execution_id: String,
84 node_id: String,
85 at: DateTime<Utc>,
86 visits: u32,
87 },
88 Paused {
92 execution_id: String,
93 at: DateTime<Utc>,
94 },
95 Resumed {
97 execution_id: String,
98 at: DateTime<Utc>,
99 },
100 AwaitingConfirmation {
105 execution_id: String,
106 node_id: String,
107 action: String,
108 at: DateTime<Utc>,
109 },
110 AiInvocation {
115 execution_id: String,
116 node_id: String,
117 at: DateTime<Utc>,
118 provider: String,
119 model: String,
120 input: String,
121 contract_version: Option<String>,
123 },
124 AiRoutingDecision {
129 execution_id: String,
130 node_id: String,
131 at: DateTime<Utc>,
132 decision: String,
133 confidence: Option<f64>,
134 threshold: String,
135 contract_version: String,
136 },
137 AiReviewRequired {
141 execution_id: String,
142 node_id: String,
143 at: DateTime<Utc>,
144 primary_output: String,
145 confidence: f64,
146 reasoning: Option<String>,
147 },
148 AiReviewResolved {
151 execution_id: String,
152 node_id: String,
153 at: DateTime<Utc>,
154 approved: bool,
155 },
156 Done {
157 execution_id: String,
158 at: DateTime<Utc>,
159 status: String,
160 },
161}
162
163pub trait EventSink: Send + Sync {
164 fn emit(&self, event: ExecutionEvent);
165}
166
167#[derive(Default)]
168pub struct NoopSink;
169
170impl EventSink for NoopSink {
171 fn emit(&self, _event: ExecutionEvent) {}
172}
173
174#[derive(Default, Clone)]
175pub struct CapturingSink {
176 pub events: Arc<std::sync::Mutex<Vec<ExecutionEvent>>>,
177}
178
179impl EventSink for CapturingSink {
180 fn emit(&self, event: ExecutionEvent) {
181 if let Ok(mut g) = self.events.lock() {
182 g.push(event);
183 }
184 }
185}
186
187pub struct MultiSink {
188 sinks: Vec<Arc<dyn EventSink>>,
189}
190
191impl MultiSink {
192 pub fn new(sinks: Vec<Arc<dyn EventSink>>) -> Self {
193 Self { sinks }
194 }
195}
196
197impl EventSink for MultiSink {
198 fn emit(&self, event: ExecutionEvent) {
199 for sink in &self.sinks {
200 sink.emit(event.clone());
201 }
202 }
203}
204
205pub struct TracingSink {
211 inner: Arc<dyn EventSink>,
212}
213
214impl TracingSink {
215 pub fn new(inner: Arc<dyn EventSink>) -> Self {
216 Self { inner }
217 }
218}
219
220impl EventSink for TracingSink {
221 fn emit(&self, event: ExecutionEvent) {
222 match &event {
223 ExecutionEvent::Started { execution_id, .. } => {
224 tracing::info!(target: "flow_execution", %execution_id, "run started");
225 }
226 ExecutionEvent::NodeStarted {
227 execution_id,
228 node_id,
229 iteration,
230 ..
231 } => {
232 tracing::info!(target: "flow_execution", %execution_id, %node_id, iteration, "node started");
233 }
234 ExecutionEvent::NodeSucceeded {
235 execution_id,
236 node_id,
237 ..
238 } => {
239 tracing::info!(target: "flow_execution", %execution_id, %node_id, "node succeeded");
240 }
241 ExecutionEvent::NodeFailed {
242 execution_id,
243 node_id,
244 error,
245 ..
246 } => {
247 tracing::warn!(target: "flow_execution", %execution_id, %node_id, %error, "node failed");
248 }
249 ExecutionEvent::NodeSkipped {
250 execution_id,
251 node_id,
252 reason,
253 ..
254 } => {
255 tracing::info!(target: "flow_execution", %execution_id, %node_id, %reason, "node skipped");
256 }
257 ExecutionEvent::NodeLog { .. } => {}
260 ExecutionEvent::ToolCall {
261 execution_id,
262 node_id,
263 summary,
264 ..
265 } => {
266 tracing::info!(target: "flow_execution", %execution_id, %node_id, %summary, "tool call");
267 }
268 ExecutionEvent::IterationCapped {
269 execution_id,
270 node_id,
271 visits,
272 ..
273 } => {
274 tracing::warn!(target: "flow_execution", %execution_id, %node_id, visits, "iteration cap reached");
275 }
276 ExecutionEvent::Paused { execution_id, .. } => {
277 tracing::info!(target: "flow_execution", %execution_id, "run paused");
278 }
279 ExecutionEvent::Resumed { execution_id, .. } => {
280 tracing::info!(target: "flow_execution", %execution_id, "run resumed");
281 }
282 ExecutionEvent::AwaitingConfirmation {
283 execution_id,
284 node_id,
285 action,
286 ..
287 } => {
288 tracing::warn!(target: "flow_execution", %execution_id, %node_id, %action, "awaiting destructive-step confirmation");
289 }
290 ExecutionEvent::AiInvocation {
291 execution_id,
292 node_id,
293 provider,
294 model,
295 ..
296 } => {
297 tracing::info!(target: "flow_execution", %execution_id, %node_id, %provider, %model, "ai invocation");
298 }
299 ExecutionEvent::AiRoutingDecision {
300 execution_id,
301 node_id,
302 decision,
303 confidence,
304 threshold,
305 ..
306 } => {
307 tracing::info!(target: "flow_execution", %execution_id, %node_id, %decision, ?confidence, %threshold, "ai routing decision");
308 }
309 ExecutionEvent::AiReviewRequired {
310 execution_id,
311 node_id,
312 confidence,
313 ..
314 } => {
315 tracing::warn!(target: "flow_execution", %execution_id, %node_id, %confidence, "awaiting ai review");
316 }
317 ExecutionEvent::AiReviewResolved {
318 execution_id,
319 node_id,
320 approved,
321 ..
322 } => {
323 tracing::info!(target: "flow_execution", %execution_id, %node_id, %approved, "ai review resolved");
324 }
325 ExecutionEvent::Done {
326 execution_id,
327 status,
328 ..
329 } => {
330 tracing::info!(target: "flow_execution", %execution_id, %status, "run done");
331 }
332 }
333 self.inner.emit(event);
334 }
335}
336
337pub struct StorageSink {
341 pub store: Store,
342 flow_name: String,
343 trigger: String,
346 started_at: std::sync::Mutex<Option<(String, DateTime<Utc>)>>,
347}
348
349impl StorageSink {
350 pub fn new(store: Store) -> Self {
351 Self::with_flow_name(store, String::new())
352 }
353
354 pub fn with_flow_name(store: Store, flow_name: String) -> Self {
358 Self::with_flow_name_and_trigger(store, flow_name, "manual".into())
359 }
360
361 pub fn with_flow_name_and_trigger(store: Store, flow_name: String, trigger: String) -> Self {
364 Self {
365 store,
366 flow_name,
367 trigger,
368 started_at: std::sync::Mutex::new(None),
369 }
370 }
371}
372
373impl EventSink for StorageSink {
374 fn emit(&self, event: ExecutionEvent) {
375 match event {
376 ExecutionEvent::Started { execution_id, at } => {
377 if let Ok(mut g) = self.started_at.lock() {
378 *g = Some((execution_id.clone(), at));
379 }
380 let _ = self.store.upsert_execution(&ExecutionRecord {
381 execution_id,
382 status: "running".into(),
383 started_at: at,
384 ended_at: None,
385 succeeded: 0,
386 failed: 0,
387 skipped: 0,
388 flow_name: self.flow_name.clone(),
389 trigger: self.trigger.clone(),
390 });
391 }
392 ExecutionEvent::NodeStarted {
393 execution_id,
394 node_id,
395 at,
396 ..
397 } => {
398 let _ = self.store.upsert_step(&ExecutionStepRecord {
399 execution_id,
400 node_id,
401 status: "running".into(),
402 started_at: at,
403 ended_at: None,
404 output: None,
405 error: None,
406 reason: None,
407 });
408 }
409 ExecutionEvent::NodeSucceeded {
410 execution_id,
411 node_id,
412 at,
413 output,
414 ..
415 } => {
416 let _ = self.store.upsert_step(&ExecutionStepRecord {
417 execution_id,
418 node_id,
419 status: "succeeded".into(),
420 started_at: at,
421 ended_at: Some(at),
422 output: Some(output),
423 error: None,
424 reason: None,
425 });
426 }
427 ExecutionEvent::NodeFailed {
428 execution_id,
429 node_id,
430 at,
431 error,
432 ..
433 } => {
434 let _ = self.store.upsert_step(&ExecutionStepRecord {
435 execution_id,
436 node_id,
437 status: "failed".into(),
438 started_at: at,
439 ended_at: Some(at),
440 output: None,
441 error: Some(error),
442 reason: None,
443 });
444 }
445 ExecutionEvent::NodeSkipped {
446 execution_id,
447 node_id,
448 at,
449 reason,
450 ..
451 } => {
452 let _ = self.store.upsert_step(&ExecutionStepRecord {
453 execution_id,
454 node_id,
455 status: "skipped".into(),
456 started_at: at,
457 ended_at: Some(at),
458 output: None,
459 error: None,
460 reason: Some(reason),
461 });
462 }
463 ExecutionEvent::IterationCapped { .. } => {
464 }
467 ExecutionEvent::Paused { .. }
468 | ExecutionEvent::Resumed { .. }
469 | ExecutionEvent::AwaitingConfirmation { .. } => {
470 }
473 ExecutionEvent::AiInvocation {
476 execution_id,
477 node_id,
478 at,
479 provider,
480 model,
481 input,
482 contract_version,
483 } => {
484 let _ = self.store.record_ai_audit(&flow_storage::AiAuditRecord {
485 execution_id,
486 node_id,
487 at,
488 kind: "ai_invocation".into(),
489 payload: serde_json::json!({
490 "provider": provider,
491 "model": model,
492 "input": input,
493 "contract_version": contract_version,
494 }),
495 });
496 }
497 ExecutionEvent::AiRoutingDecision {
498 execution_id,
499 node_id,
500 at,
501 decision,
502 confidence,
503 threshold,
504 contract_version,
505 } => {
506 let _ = self.store.record_ai_audit(&flow_storage::AiAuditRecord {
507 execution_id,
508 node_id,
509 at,
510 kind: "ai_routing_decision".into(),
511 payload: serde_json::json!({
512 "decision": decision,
513 "confidence": confidence,
514 "threshold": threshold,
515 "contract_version": contract_version,
516 }),
517 });
518 }
519 ExecutionEvent::AiReviewRequired {
520 execution_id,
521 node_id,
522 at,
523 primary_output,
524 confidence,
525 reasoning,
526 } => {
527 let _ = self.store.record_ai_audit(&flow_storage::AiAuditRecord {
528 execution_id,
529 node_id,
530 at,
531 kind: "ai_review_required".into(),
532 payload: serde_json::json!({
533 "primary_output": primary_output,
534 "confidence": confidence,
535 "reasoning": reasoning,
536 }),
537 });
538 }
539 ExecutionEvent::AiReviewResolved {
540 execution_id,
541 node_id,
542 at,
543 approved,
544 } => {
545 let _ = self.store.record_ai_audit(&flow_storage::AiAuditRecord {
546 execution_id,
547 node_id,
548 at,
549 kind: "ai_review_resolved".into(),
550 payload: serde_json::json!({ "approved": approved }),
551 });
552 }
553 ExecutionEvent::NodeLog { .. } | ExecutionEvent::ToolCall { .. } => {
554 }
562 ExecutionEvent::Done {
563 execution_id,
564 at,
565 status,
566 } => {
567 let started = self
568 .started_at
569 .lock()
570 .ok()
571 .and_then(|g| g.clone())
572 .map(|(_, t)| t)
573 .unwrap_or(at);
574 let (succeeded, failed, skipped) = self
575 .store
576 .get_execution(&execution_id)
577 .ok()
578 .flatten()
579 .map(|(_, steps, _)| {
580 let mut s = 0;
581 let mut f = 0;
582 let mut k = 0;
583 for st in steps {
584 match st.status.as_str() {
585 "succeeded" => s += 1,
586 "failed" => f += 1,
587 "skipped" => k += 1,
588 _ => {}
589 }
590 }
591 (s, f, k)
592 })
593 .unwrap_or((0, 0, 0));
594 let _ = self.store.upsert_execution(&ExecutionRecord {
595 execution_id,
596 status,
597 started_at: started,
598 ended_at: Some(at),
599 succeeded,
600 failed,
601 skipped,
602 flow_name: self.flow_name.clone(),
603 trigger: self.trigger.clone(),
604 });
605 }
606 }
607 }
608}