Skip to main content

flow_execution/
executor.rs

1use chrono::Utc;
2use flow_adapter_ai::{CloudAiRegistry, CloudAiRequest, LlmStreamSink};
3#[cfg(test)]
4use flow_adapter_ai::NullStreamSink;
5use flow_domain::contract::{AiNodeContract, AiOutputEnvelope, RouteDecision};
6use flow_domain::graph::{EdgeOutcome, FlowEdge, FlowGraph, FlowNode, Position, SubFlow};
7use flow_security::{CredentialError, CredentialResolver, PiiSanitizer};
8use std::collections::{HashMap, HashSet, VecDeque};
9use std::path::PathBuf;
10use std::sync::Arc;
11use thiserror::Error;
12use uuid::Uuid;
13
14use crate::adapter::AdapterRegistry;
15use crate::events::{EventSink, ExecutionEvent, LogStream};
16
17#[derive(Debug, Error)]
18pub enum ExecutorError {
19    #[error("graph contains a cycle involving node {0}")]
20    Cycle(String),
21    #[error("node {node_id} references unknown adapter")]
22    MissingAdapter { node_id: String },
23}
24
25#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
26pub struct ExecutionSummary {
27    pub execution_id: String,
28    pub status: String,
29    pub succeeded: usize,
30    pub failed: usize,
31    pub skipped: usize,
32}
33
34pub struct Executor {
35    pub adapters: Arc<AdapterRegistry>,
36    pub sanitizer: Arc<PiiSanitizer>,
37    pub events: Arc<dyn EventSink>,
38    pub cloud_providers: Arc<CloudAiRegistry>,
39    pub credentials: Arc<dyn CredentialResolver>,
40    pub allow_cloud_ai: bool,
41    /// Gate for the `local` provider (on-device OpenAI-compatible server).
42    /// Separate from `allow_cloud_ai` because a localhost call is not
43    /// network egress.
44    pub allow_local_ai: bool,
45    /// Endpoint for the `local` provider, sourced from settings. Passed
46    /// through as `CloudAiRequest.base_url` for local-provider nodes.
47    pub local_ai_base_url: Option<String>,
48    /// Fan-out target for streaming LLM tokens. Wired by `FlowApp` so
49    /// every AI (local/cloud) and agentic node invocation feeds the
50    /// header chip ticker via Tauri's `flow:llm_token` event. Headless
51    /// tests use the default `NullStreamSink`.
52    pub stream_sink: Arc<dyn LlmStreamSink>,
53    /// Session-scoped key→JSON working memory, carried across runs by `FlowApp`
54    /// so re-plan iterations see prior results. Succeeded `set-variable` utility
55    /// nodes write it; any field (and `when` conditions) read it via
56    /// `{{memory.<key>}}`. Headless tests pass a fresh empty store.
57    pub working_memory: WorkingMemory,
58    /// Out-of-band pause/resume/cancel signal, polled at node boundaries.
59    /// Shared with `FlowApp` so the Tauri pause/resume/stop commands can steer
60    /// an in-progress run. Headless tests use the default (always `Running`).
61    pub control: SharedRunControl,
62    /// Per-step destructive-action confirmation gate (roadmap E1). When `true`,
63    /// the executor pauses before running a node that performs a destructive
64    /// operation (file delete, `rm`, `git push`, …), emits
65    /// [`ExecutionEvent::AwaitingConfirmation`], and blocks until the user
66    /// confirms (resume) or cancels (stop). Headless runners leave it `false`
67    /// so a destructive node never blocks on a confirmation that can't arrive.
68    pub confirm_destructive: bool,
69    /// Whether a host capable of resolving an AI review gate is attached.
70    /// Headless runners (CLI / TUI) set this `false`: a contract-bound output
71    /// in the human review band then fails onto its fallback path instead of
72    /// holding the run on a gate no one can resolve - RAO's deterministic
73    /// fallback, never a silent auto-approve.
74    pub review_gate_available: bool,
75    /// When `Some`, AI-node tool calls that mutate the filesystem (`fs`
76    /// write/edit/delete) are *staged* into this buffer - computed but not
77    /// written - so a coding-agent turn can be reviewed before its edits land.
78    /// Read-only tools and every `shell` tool still run for real, and a staged
79    /// file is visible to later `read-file` calls in the same turn.
80    /// [`FlowApp::run_agent_turn`] installs it; every other run leaves it `None`.
81    pub edit_staging: Option<crate::ai_tools::StagedEdits>,
82    /// Resolved workspace root for this run, forwarded to every adapter via
83    /// [`AdapterCtx::workspace_root`]. `FlowApp` resolves it (per-run override,
84    /// stored per-flow path, or edition default) before the run starts.
85    pub workspace_root: PathBuf,
86}
87
88/// Session-scoped store backing `{{memory.<key>}}`. Shared (`Arc`) so it
89/// survives across `Executor::run` calls within a `FlowApp` session, and
90/// interior-mutable (`Mutex`) so a node can write it through `&self`.
91pub type WorkingMemory = Arc<std::sync::Mutex<HashMap<String, serde_json::Value>>>;
92
/// Phase reported by [`RunControl`]. The executor checkpoints between node
/// executions, so pause and cancel take effect at node boundaries: a node
/// that is already in flight completes before the request is honored.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RunPhase {
    /// Scheduling proceeds normally.
    Running,
    /// A pause was requested; the next checkpoint waits for resume or cancel.
    Paused,
    /// Cancellation was requested; the next checkpoint stops scheduling.
    Cancelling,
}
102
103/// Out-of-band pause/resume/cancel control for a run, shared (`Arc`) between
104/// `FlowApp` (which the host's pause/resume/stop commands signal) and the
105/// `Executor` (which polls it at node boundaries). One instance is created per
106/// run and registered under that run's execution id, so several flows (e.g.
107/// multiple canvas tabs) can run and be steered independently.
108pub struct RunControl {
109    phase: std::sync::atomic::AtomicU8,
110    /// Woken on `resume`/`cancel` so a paused checkpoint stops awaiting.
111    notify: tokio::sync::Notify,
112    /// Pending human decision for an AI review gate (RAO constraint 5). Set by
113    /// [`RunControl::resolve_review`] just before the resume that releases the
114    /// gate; taken by the executor when the gate wakes.
115    review: std::sync::Mutex<Option<ReviewDecision>>,
116}
117
/// The human's verdict on a contract-bound AI output that is being held at a
/// review gate.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ReviewDecision {
    /// The reviewer accepted the output.
    Approved,
    /// The reviewer refused the output.
    Rejected,
}
124
125impl Default for RunControl {
126    fn default() -> Self {
127        Self {
128            phase: std::sync::atomic::AtomicU8::new(Self::RUNNING),
129            notify: tokio::sync::Notify::new(),
130            review: std::sync::Mutex::new(None),
131        }
132    }
133}
134
135impl RunControl {
136    const RUNNING: u8 = 0;
137    const PAUSED: u8 = 1;
138    const CANCELLING: u8 = 2;
139
140    fn set(&self, v: u8) {
141        self.phase.store(v, std::sync::atomic::Ordering::SeqCst);
142    }
143
144    pub fn phase(&self) -> RunPhase {
145        match self.phase.load(std::sync::atomic::Ordering::SeqCst) {
146            Self::PAUSED => RunPhase::Paused,
147            Self::CANCELLING => RunPhase::Cancelling,
148            _ => RunPhase::Running,
149        }
150    }
151
152    /// Reset to `Running` at the start of a run so a stale cancel/pause from a
153    /// prior run can't leak into the next one.
154    pub fn reset(&self) {
155        self.set(Self::RUNNING);
156        // Drain any pending permit so a fresh run doesn't skip its first wait.
157        self.notify.notify_waiters();
158    }
159
160    /// Request a pause. No-op if already cancelling.
161    pub fn pause(&self) {
162        let _ = self.phase.compare_exchange(
163            Self::RUNNING,
164            Self::PAUSED,
165            std::sync::atomic::Ordering::SeqCst,
166            std::sync::atomic::Ordering::SeqCst,
167        );
168    }
169
170    /// Release a pause. No-op if cancelling.
171    pub fn resume(&self) {
172        let _ = self.phase.compare_exchange(
173            Self::PAUSED,
174            Self::RUNNING,
175            std::sync::atomic::Ordering::SeqCst,
176            std::sync::atomic::Ordering::SeqCst,
177        );
178        self.notify.notify_waiters();
179    }
180
181    /// Request cancellation; takes precedence over pause and wakes any waiter.
182    pub fn cancel(&self) {
183        self.set(Self::CANCELLING);
184        self.notify.notify_waiters();
185    }
186
187    pub fn is_cancelling(&self) -> bool {
188        self.phase() == RunPhase::Cancelling
189    }
190
191    /// Record a human verdict for the AI review gate and release the pause.
192    /// The executor picks the decision up via [`RunControl::take_review`].
193    pub fn resolve_review(&self, approved: bool) {
194        if let Ok(mut slot) = self.review.lock() {
195            *slot = Some(if approved {
196                ReviewDecision::Approved
197            } else {
198                ReviewDecision::Rejected
199            });
200        }
201        self.resume();
202    }
203
204    /// Take the pending review decision, if any, clearing the slot.
205    pub fn take_review(&self) -> Option<ReviewDecision> {
206        self.review.lock().ok().and_then(|mut slot| slot.take())
207    }
208
209    /// Block while the phase is `Paused`, returning once resumed or cancelling.
210    pub async fn wait_while_paused(&self) {
211        while self.phase() == RunPhase::Paused {
212            self.notify.notified().await;
213        }
214    }
215}
216
217/// Shared run control, mirroring [`WorkingMemory`]'s `Arc` sharing pattern.
218pub type SharedRunControl = Arc<RunControl>;
219
/// Upper bound on how often the iterative scheduler re-enters any single
/// node. A node that hits it is not run again: an `IterationCapped` event is
/// emitted for it and the run ends as `partial` instead of aborting, so
/// outputs produced so far are kept.
const PER_NODE_VISIT_CAP: u32 = 25;
/// Backstop on the total number of node executions in one iterative run.
/// Together with the per-node cap it guarantees that a graph with cycles
/// terminates. Reaching it stops scheduling and the run ends as `partial`;
/// the iterative scheduler emits no `IterationCapped` event for this cap.
const GLOBAL_STEP_CAP: u32 = 500;
227
228impl Executor {
229    pub async fn run(&self, graph: &FlowGraph) -> Result<ExecutionSummary, ExecutorError> {
230        self.run_with_id(Uuid::new_v4().to_string(), graph).await
231    }
232
233    /// Like [`Executor::run`] but uses a caller-supplied execution id. The host
234    /// provides the id so it can associate the run with a specific tab/flow and
235    /// steer it (pause / resume / cancel) immediately - before the `Started`
236    /// event arrives - which is what makes concurrent multi-tab runs routable.
237    pub async fn run_with_id(
238        &self,
239        execution_id: String,
240        graph: &FlowGraph,
241    ) -> Result<ExecutionSummary, ExecutorError> {
242        // Clear any stale pause/cancel from a prior run before scheduling.
243        self.control.reset();
244        self.events.emit(ExecutionEvent::Started {
245            execution_id: execution_id.clone(),
246            at: Utc::now(),
247        });
248
249        // Per-node terminal status (drives outgoing-edge fire decisions) and
250        // succeeded-node output payloads (consumed by `{{...}}` interpolation
251        // for inter-node data flow). On a loop re-run, both are overwritten so
252        // references resolve to the latest pass.
253        let mut terminal: HashMap<String, NodeStatus> = HashMap::new();
254        let mut outputs: HashMap<String, serde_json::Value> = HashMap::new();
255
256        // Acyclic graphs take the proven single-pass path (behaviour
257        // unchanged); a detected cycle switches to the bounded-iteration
258        // scheduler. `topo_sort` itself still reports cycles as an error so it
259        // stays a reusable pure helper.
260        let capped = if graph.subflows.is_empty() {
261            match topo_sort(graph) {
262                Ok(order) => {
263                    self.run_acyclic(graph, &order, &execution_id, &mut outputs, &mut terminal)
264                        .await;
265                    false
266                }
267                Err(ExecutorError::Cycle(_)) => {
268                    self.run_iterative(graph, &execution_id, &mut outputs, &mut terminal)
269                        .await
270                }
271                Err(e) => return Err(e),
272            }
273        } else {
274            // Sub-flows: schedule over a collapsed outer graph (each sub-flow is
275            // one virtual node) so each unit runs atomically and in dependency
276            // order, then runs its members as a retry-able unit.
277            let outer = collapse_subflows(graph);
278            match topo_sort(&outer) {
279                Ok(order) => {
280                    self.run_acyclic_units(
281                        graph,
282                        &order,
283                        &execution_id,
284                        &mut outputs,
285                        &mut terminal,
286                    )
287                    .await;
288                    false
289                }
290                Err(ExecutorError::Cycle(_)) => {
291                    // Cyclic outer graph: fall back to the flat iterative
292                    // scheduler (sub-flows run as ordinary nodes, no unit retry).
293                    self.run_iterative(graph, &execution_id, &mut outputs, &mut terminal)
294                        .await
295                }
296                Err(e) => return Err(e),
297            }
298        };
299
300        let mut succeeded = 0usize;
301        let mut failed = 0usize;
302        let mut skipped = 0usize;
303        for status in terminal.values() {
304            match status {
305                NodeStatus::Succeeded => succeeded += 1,
306                NodeStatus::Failed => failed += 1,
307                NodeStatus::Skipped => skipped += 1,
308            }
309        }
310
311        let status = if self.control.is_cancelling() {
312            "cancelled"
313        } else if capped {
314            "partial"
315        } else if failed > 0 {
316            if succeeded > 0 || skipped > 0 {
317                "partial"
318            } else {
319                "failed"
320            }
321        } else if skipped > 0 && succeeded == 0 {
322            "skipped"
323        } else {
324            "succeeded"
325        };
326
327        self.events.emit(ExecutionEvent::Done {
328            execution_id: execution_id.clone(),
329            at: Utc::now(),
330            status: status.into(),
331        });
332
333        Ok(ExecutionSummary {
334            execution_id,
335            status: status.into(),
336            succeeded,
337            failed,
338            skipped,
339        })
340    }
341
342    /// Honor a pending pause/cancel at a node boundary. While paused, blocks
343    /// until resumed or cancelled (emitting `Paused`/`Resumed` so the UI can
344    /// confirm the in-flight node finished). Returns `true` when the run should
345    /// stop scheduling because cancellation was requested.
346    async fn checkpoint(&self, execution_id: &str) -> bool {
347        match self.control.phase() {
348            RunPhase::Running => false,
349            RunPhase::Cancelling => true,
350            RunPhase::Paused => {
351                self.events.emit(ExecutionEvent::Paused {
352                    execution_id: execution_id.to_string(),
353                    at: Utc::now(),
354                });
355                self.control.wait_while_paused().await;
356                if self.control.is_cancelling() {
357                    true
358                } else {
359                    self.events.emit(ExecutionEvent::Resumed {
360                        execution_id: execution_id.to_string(),
361                        at: Utc::now(),
362                    });
363                    false
364                }
365            }
366        }
367    }
368
369    /// Snapshot the shared working memory for read-only interpolation. Cloned
370    /// per use so a node always sees writes made by earlier nodes this run.
371    fn snapshot_memory(&self) -> HashMap<String, serde_json::Value> {
372        self.working_memory
373            .lock()
374            .map(|m| m.clone())
375            .unwrap_or_default()
376    }
377
378    /// If a succeeded node was a `set-variable` utility action, persist its
379    /// `name`→`value` into working memory. The adapter wraps its result as
380    /// `{ status, payload }`, so the action fields live under `payload`.
381    fn store_set_variable(&self, output: &serde_json::Value) {
382        let inner = output.get("payload").unwrap_or(output);
383        if inner.get("actionId").and_then(|v| v.as_str()) != Some("set-variable") {
384            return;
385        }
386        let Some(name) = inner.get("name").and_then(|v| v.as_str()) else {
387            return;
388        };
389        let value = inner.get("value").cloned().unwrap_or(serde_json::Value::Null);
390        if let Ok(mut mem) = self.working_memory.lock() {
391            mem.insert(name.to_string(), value);
392        }
393    }
394
395    /// Single forward pass over a topologically sorted DAG. A node runs iff it
396    /// has no incoming edges or at least one incoming edge is active; otherwise
397    /// it is skipped. Identical in behaviour to the pre-iteration executor.
398    async fn run_acyclic(
399        &self,
400        graph: &FlowGraph,
401        order: &[String],
402        execution_id: &str,
403        outputs: &mut HashMap<String, serde_json::Value>,
404        terminal: &mut HashMap<String, NodeStatus>,
405    ) {
406        let nodes_by_id: HashMap<&str, &FlowNode> =
407            graph.nodes.iter().map(|n| (n.id.as_str(), n)).collect();
408
409        for node_id in order {
410            // Honor pause/cancel between nodes. On cancel, stop scheduling and
411            // fall through to the skip sweep below.
412            if self.checkpoint(execution_id).await {
413                break;
414            }
415            let Some(node) = nodes_by_id.get(node_id.as_str()).copied() else {
416                continue;
417            };
418            self.run_one_node(graph, node, execution_id, 0, outputs, terminal)
419                .await;
420        }
421
422        // Nodes never reached (e.g. a cancelled run broke out of the loop) are
423        // marked skipped, matching the iterative path's reachability sweep.
424        for node in &graph.nodes {
425            if !terminal.contains_key(&node.id) {
426                terminal.insert(node.id.clone(), NodeStatus::Skipped);
427                self.events.emit(ExecutionEvent::NodeSkipped {
428                    execution_id: execution_id.to_string(),
429                    node_id: node.id.clone(),
430                    at: Utc::now(),
431                    reason: "run cancelled before this node was scheduled".into(),
432                    iteration: 0,
433                });
434            }
435        }
436    }
437
438    /// Run a single node: evaluate reachability from its incoming edges and
439    /// either execute it or mark it skipped. Shared by the flat acyclic pass
440    /// and each sub-flow's inner pass.
441    async fn run_one_node(
442        &self,
443        graph: &FlowGraph,
444        node: &FlowNode,
445        execution_id: &str,
446        iteration: u32,
447        outputs: &mut HashMap<String, serde_json::Value>,
448        terminal: &mut HashMap<String, NodeStatus>,
449    ) {
450        let in_edges: Vec<_> = graph.edges.iter().filter(|e| e.target == node.id).collect();
451        let memory = self.snapshot_memory();
452        let reachable = in_edges.is_empty()
453            || in_edges.iter().any(|e| edge_active(e, terminal, outputs, &memory));
454
455        if !reachable {
456            terminal.insert(node.id.clone(), NodeStatus::Skipped);
457            self.events.emit(ExecutionEvent::NodeSkipped {
458                execution_id: execution_id.to_string(),
459                node_id: node.id.clone(),
460                at: Utc::now(),
461                reason: "no incoming edge fired (upstream did not pass/fail to here)".into(),
462                iteration,
463            });
464            return;
465        }
466
467        self.execute_node(graph, node, execution_id, iteration, outputs, terminal)
468            .await;
469    }
470
471    /// Forward pass over an outer topo order whose entries are free node ids and
472    /// collapsed sub-flow ids. Free nodes run individually; each sub-flow runs
473    /// as one unit (see [`Self::run_subflow_unit`]).
474    async fn run_acyclic_units(
475        &self,
476        graph: &FlowGraph,
477        outer_order: &[String],
478        execution_id: &str,
479        outputs: &mut HashMap<String, serde_json::Value>,
480        terminal: &mut HashMap<String, NodeStatus>,
481    ) {
482        let nodes_by_id: HashMap<&str, &FlowNode> =
483            graph.nodes.iter().map(|n| (n.id.as_str(), n)).collect();
484
485        for outer_id in outer_order {
486            if self.checkpoint(execution_id).await {
487                break;
488            }
489            if let Some(sf) = graph.subflows.iter().find(|s| &s.id == outer_id) {
490                self.run_subflow_unit(graph, sf, execution_id, outputs, terminal)
491                    .await;
492            } else if let Some(node) = nodes_by_id.get(outer_id.as_str()).copied() {
493                self.run_one_node(graph, node, execution_id, 0, outputs, terminal)
494                    .await;
495            }
496        }
497
498        // Anything never reached (cancelled run, or an unreachable branch).
499        for node in &graph.nodes {
500            if !terminal.contains_key(&node.id) {
501                terminal.insert(node.id.clone(), NodeStatus::Skipped);
502                self.events.emit(ExecutionEvent::NodeSkipped {
503                    execution_id: execution_id.to_string(),
504                    node_id: node.id.clone(),
505                    at: Utc::now(),
506                    reason: "no incoming edge fired (upstream did not pass/fail to here)".into(),
507                    iteration: 0,
508                });
509            }
510        }
511    }
512
513    /// Run one sub-flow as an execution unit: gate on its boundary-crossing
514    /// incoming edges, then run its members in inner-topo order. If any member
515    /// fails, re-run the whole unit up to `retry` times. Member terminal/outputs
516    /// land in the shared maps, so the surrounding flat edges (which reference
517    /// member ids directly) fire normally.
518    async fn run_subflow_unit(
519        &self,
520        graph: &FlowGraph,
521        sf: &SubFlow,
522        execution_id: &str,
523        outputs: &mut HashMap<String, serde_json::Value>,
524        terminal: &mut HashMap<String, NodeStatus>,
525    ) {
526        let members: HashSet<&str> = sf.node_ids.iter().map(|s| s.as_str()).collect();
527
528        // Reachability: a boundary-crossing edge into the unit must fire (or
529        // there are none, making it an entry unit).
530        let cross_in: Vec<&FlowEdge> = graph
531            .edges
532            .iter()
533            .filter(|e| members.contains(e.target.as_str()) && !members.contains(e.source.as_str()))
534            .collect();
535        let memory = self.snapshot_memory();
536        let reachable = cross_in.is_empty()
537            || cross_in
538                .iter()
539                .any(|e| edge_active(e, terminal, outputs, &memory));
540        if !reachable {
541            for id in &sf.node_ids {
542                if !terminal.contains_key(id) {
543                    terminal.insert(id.clone(), NodeStatus::Skipped);
544                    self.events.emit(ExecutionEvent::NodeSkipped {
545                        execution_id: execution_id.to_string(),
546                        node_id: id.clone(),
547                        at: Utc::now(),
548                        reason: format!("sub-flow \"{}\" not reached", sf.label),
549                        iteration: 0,
550                    });
551                }
552            }
553            return;
554        }
555
556        // Inner topo order over members + their internal edges.
557        let inner_graph = FlowGraph {
558            id: sf.id.clone(),
559            name: sf.label.clone(),
560            version: "1.0.0".into(),
561            description: None,
562            nodes: graph
563                .nodes
564                .iter()
565                .filter(|n| members.contains(n.id.as_str()))
566                .cloned()
567                .collect(),
568            edges: graph
569                .edges
570                .iter()
571                .filter(|e| {
572                    members.contains(e.source.as_str()) && members.contains(e.target.as_str())
573                })
574                .cloned()
575                .collect(),
576            subflows: Vec::new(),
577        };
578        let inner_order = topo_sort(&inner_graph).unwrap_or_else(|_| sf.node_ids.clone());
579
580        let nodes_by_id: HashMap<&str, &FlowNode> =
581            graph.nodes.iter().map(|n| (n.id.as_str(), n)).collect();
582        let max_attempts = 1 + sf.retry.unwrap_or(0);
583        for attempt in 0..max_attempts {
584            for nid in &inner_order {
585                if self.checkpoint(execution_id).await {
586                    return;
587                }
588                if let Some(node) = nodes_by_id.get(nid.as_str()).copied() {
589                    self.run_one_node(graph, node, execution_id, attempt, outputs, terminal)
590                        .await;
591                }
592            }
593            let failed = sf
594                .node_ids
595                .iter()
596                .any(|id| matches!(terminal.get(id), Some(NodeStatus::Failed)));
597            if !failed || attempt + 1 >= max_attempts {
598                break;
599            }
600            // Retry the whole unit: clear member state so the next pass re-runs
601            // from the unit's entry.
602            for id in &sf.node_ids {
603                terminal.remove(id);
604                outputs.remove(id);
605            }
606        }
607    }
608
609    /// Bounded worklist scheduler for graphs with cycles. A node re-runs each
610    /// time an active edge re-arrives; per-node and global caps guarantee
611    /// termination. Returns `true` if either cap was hit.
612    async fn run_iterative(
613        &self,
614        graph: &FlowGraph,
615        execution_id: &str,
616        outputs: &mut HashMap<String, serde_json::Value>,
617        terminal: &mut HashMap<String, NodeStatus>,
618    ) -> bool {
619        let nodes_by_id: HashMap<&str, &FlowNode> =
620            graph.nodes.iter().map(|n| (n.id.as_str(), n)).collect();
621
622        // Seed with entry nodes (no incoming edges), in stable graph order.
623        let mut queue: VecDeque<String> = graph
624            .nodes
625            .iter()
626            .filter(|n| !graph.edges.iter().any(|e| e.target == n.id))
627            .map(|n| n.id.clone())
628            .collect();
629
630        let mut visits: HashMap<String, u32> = HashMap::new();
631        let mut global_steps = 0u32;
632        let mut capped = false;
633
634        while let Some(node_id) = queue.pop_front() {
635            // Honor pause/cancel between nodes. On cancel, stop scheduling; the
636            // skip sweep below marks everything still pending.
637            if self.checkpoint(execution_id).await {
638                queue.clear();
639                break;
640            }
641            if global_steps >= GLOBAL_STEP_CAP {
642                capped = true;
643                break;
644            }
645            let Some(node) = nodes_by_id.get(node_id.as_str()).copied() else {
646                continue;
647            };
648
649            let visit = visits.entry(node_id.clone()).or_insert(0);
650            if *visit >= PER_NODE_VISIT_CAP {
651                self.events.emit(ExecutionEvent::IterationCapped {
652                    execution_id: execution_id.to_string(),
653                    node_id: node_id.clone(),
654                    at: Utc::now(),
655                    visits: *visit,
656                });
657                capped = true;
658                continue;
659            }
660            let iteration = *visit;
661            *visit += 1;
662            global_steps += 1;
663
664            self.execute_node(graph, node, execution_id, iteration, outputs, terminal)
665                .await;
666
667            // Enqueue every target whose edge from this node is now active.
668            let memory = self.snapshot_memory();
669            for edge in graph.edges.iter().filter(|e| e.source == node_id) {
670                if edge_active(edge, terminal, outputs, &memory) {
671                    queue.push_back(edge.target.clone());
672                }
673            }
674        }
675
676        // Nodes the worklist never reached are skipped, matching the acyclic
677        // path's reachability semantics.
678        for node in &graph.nodes {
679            if !terminal.contains_key(&node.id) {
680                terminal.insert(node.id.clone(), NodeStatus::Skipped);
681                self.events.emit(ExecutionEvent::NodeSkipped {
682                    execution_id: execution_id.to_string(),
683                    node_id: node.id.clone(),
684                    at: Utc::now(),
685                    reason: "no incoming edge fired (upstream did not pass/fail to here)".into(),
686                    iteration: 0,
687                });
688            }
689        }
690
691        capped
692    }
693
694    /// Interpolate this node's data against prior outputs, dispatch by node
695    /// type, and record the outcome - emitting lifecycle events tagged with
696    /// `iteration`. Shared by both schedulers.
697    async fn execute_node(
698        &self,
699        graph: &FlowGraph,
700        node: &FlowNode,
701        execution_id: &str,
702        iteration: u32,
703        outputs: &mut HashMap<String, serde_json::Value>,
704        terminal: &mut HashMap<String, NodeStatus>,
705    ) {
706        // Per-step destructive-action confirmation gate (roadmap E1). Inspected
707        // before the node runs (and before NodeStarted) so a destructive op
708        // never executes without an explicit confirm. A cancel here skips the
709        // node and the next checkpoint stops scheduling.
710        if self.confirm_destructive {
711            if let Some(action) = destructive_reason(node) {
712                if !self.control.is_cancelling() {
713                    self.events.emit(ExecutionEvent::AwaitingConfirmation {
714                        execution_id: execution_id.to_string(),
715                        node_id: node.id.clone(),
716                        action,
717                        at: Utc::now(),
718                    });
719                    self.control.pause();
720                    self.control.wait_while_paused().await;
721                }
722                if self.control.is_cancelling() {
723                    terminal.insert(node.id.clone(), NodeStatus::Skipped);
724                    self.events.emit(ExecutionEvent::NodeSkipped {
725                        execution_id: execution_id.to_string(),
726                        node_id: node.id.clone(),
727                        at: Utc::now(),
728                        reason: "destructive action not confirmed".into(),
729                        iteration,
730                    });
731                    return;
732                }
733            }
734        }
735
736        self.events.emit(ExecutionEvent::NodeStarted {
737            execution_id: execution_id.to_string(),
738            node_id: node.id.clone(),
739            at: Utc::now(),
740            iteration,
741        });
742
743        // Inter-node data flow: `{{input}}` resolves to the output(s) of the
744        // upstream node(s) whose edge fired into this node; `{{<id>}}` /
745        // `{{<id>.<field>}}` reference any already-run node by id;
746        // `{{memory.<key>}}` reads session working memory.
747        let memory = self.snapshot_memory();
748        let fired_sources: Vec<String> = graph
749            .edges
750            .iter()
751            .filter(|e| e.target == node.id && edge_active(e, terminal, outputs, &memory))
752            .map(|e| e.source.clone())
753            .collect();
754        let mut resolved = node.clone();
755        resolved.data = interpolate_value(&resolved.data, outputs, &fired_sources, &memory);
756        let node = &resolved;
757
758        let outcome = match node.node_type.as_str() {
759            // Unified AI node: `provider` selects local (on-device) vs cloud.
760            "ai" => self.run_ai_node(node, execution_id).await,
761            // Agentic nodes are design-time only - they generate a flow for review
762            // and never execute as a runtime node. Skip one if it reaches here.
763            "agentic" => NodeOutcome::Skipped("agentic nodes are design-time only".into()),
764            // action / utility / service all dispatch through their adapter.
765            _ => self.run_adapter_node(node, execution_id).await,
766        };
767
768        match outcome {
769            NodeOutcome::Succeeded(payload) => {
770                terminal.insert(node.id.clone(), NodeStatus::Succeeded);
771                self.store_set_variable(&payload);
772                outputs.insert(node.id.clone(), payload.clone());
773                self.events.emit(ExecutionEvent::NodeSucceeded {
774                    execution_id: execution_id.to_string(),
775                    node_id: node.id.clone(),
776                    at: Utc::now(),
777                    output: payload,
778                    iteration,
779                });
780            }
781            NodeOutcome::Skipped(reason) => {
782                terminal.insert(node.id.clone(), NodeStatus::Skipped);
783                self.events.emit(ExecutionEvent::NodeSkipped {
784                    execution_id: execution_id.to_string(),
785                    node_id: node.id.clone(),
786                    at: Utc::now(),
787                    reason,
788                    iteration,
789                });
790            }
791            NodeOutcome::Failed(error) => {
792                terminal.insert(node.id.clone(), NodeStatus::Failed);
793                self.events.emit(ExecutionEvent::NodeFailed {
794                    execution_id: execution_id.to_string(),
795                    node_id: node.id.clone(),
796                    at: Utc::now(),
797                    error,
798                    iteration,
799                });
800            }
801        }
802    }
803
804    /// Unified AI node. The `provider` field selects the backend: `local`
805    /// (default) runs an LLM on the managed `llama-server` (zero egress); any
806    /// other value is a cloud provider, gated by `allow_cloud_ai` + an API key.
807    async fn run_ai_node(&self, node: &FlowNode, execution_id: &str) -> NodeOutcome {
808        let outcome = self.run_ai_node_once(node, execution_id).await;
809        // Per-node failover (roadmap E1 model routing & fallback): on a hard
810        // failure, retry once with the node's configured fallback provider /
811        // model. Only `Failed` triggers it - a `Skipped` (policy gate) is not a
812        // transient error worth retrying.
813        if let NodeOutcome::Failed(err) = &outcome {
814            if let Some(fb) = fallback_node(node) {
815                let fb_provider = fb
816                    .data
817                    .get("provider")
818                    .and_then(|v| v.as_str())
819                    .unwrap_or("")
820                    .to_string();
821                self.events.emit(ExecutionEvent::NodeLog {
822                    execution_id: execution_id.to_string(),
823                    node_id: node.id.clone(),
824                    at: Utc::now(),
825                    stream: LogStream::Stderr,
826                    line: format!("AI provider failed ({err}); falling back to {fb_provider}"),
827                });
828                return self.run_ai_node_once(&fb, execution_id).await;
829            }
830        }
831        outcome
832    }
833
834    /// One AI-node attempt: resolve the provider (`local` vs cloud) and dispatch.
835    /// [`run_ai_node`] wraps this with per-node failover.
836    async fn run_ai_node_once(&self, node: &FlowNode, execution_id: &str) -> NodeOutcome {
837        let model_id = node
838            .data
839            .get("modelId")
840            .and_then(|v| v.as_str())
841            .unwrap_or_default()
842            .to_string();
843        if model_id.is_empty() {
844            return NodeOutcome::Failed(
845                "no model selected on ai node (set modelId to a loaded model)".into(),
846            );
847        }
848
849        let provider = node
850            .data
851            .get("provider")
852            .and_then(|v| v.as_str())
853            .unwrap_or("local");
854        if provider.is_empty() || provider == "local" {
855            self.run_local_llm_ai_node(node, &model_id, execution_id).await
856        } else {
857            self.run_cloud_ai_node(node, execution_id).await
858        }
859    }
860
861    /// AI-node path for LLM chat models hosted by the managed
862    /// `llama-server` subprocess. Mirrors the local branch of
863    /// `run_cloud_ai_node` but reads `input` (the AI-node convention)
864    /// instead of `prompt`, and opts in to advanced sampling params
865    /// (`top_p`/`top_k`/`stop`) sourced from the inspector.
866    ///
867    /// The AI node has no `provider` field - the implicit provider is
868    /// `local`, so we always gate on `allow_local_ai` and route through
869    /// `LocalOpenAiProvider`. Confidence thresholds are ignored
870    /// (LLM chat responses don't carry one); the outcome is `Succeeded`
871    /// with the assistant text wrapped in a small JSON envelope so the
872    /// History panel can display it without parsing.
873    async fn run_local_llm_ai_node(
874        &self,
875        node: &FlowNode,
876        model_id: &str,
877        execution_id: &str,
878    ) -> NodeOutcome {
879        if !self.allow_local_ai {
880            return NodeOutcome::Skipped(
881                "local AI disabled by policy (settings.allow_local_ai = false)".into(),
882            );
883        }
884        let provider = match self.cloud_providers.get("local") {
885            Some(p) => p,
886            None => {
887                return NodeOutcome::Skipped(
888                    "local AI provider not registered in CloudAiRegistry".into(),
889                );
890            }
891        };
892        if self.local_ai_base_url.is_none() {
893            return NodeOutcome::Failed(
894                "local AI base URL not set; load a model from the Model Hub first".into(),
895            );
896        }
897
898        let input = match node
899            .data
900            .get("input")
901            .and_then(|v| v.as_str())
902            .filter(|s| !s.trim().is_empty())
903        {
904            Some(s) => s.to_string(),
905            None => return NodeOutcome::Failed("missing 'input' on ai node (local LLM)".into()),
906        };
907        let system = node
908            .data
909            .get("system")
910            .and_then(|v| v.as_str())
911            .filter(|s| !s.is_empty())
912            .map(str::to_owned);
913        let max_tokens = node
914            .data
915            .get("maxTokens")
916            .and_then(|v| v.as_u64())
917            .map(|v| v as u32);
918        let temperature = node
919            .data
920            .get("temperature")
921            .and_then(|v| v.as_f64())
922            .map(|v| v as f32);
923        let top_p = node
924            .data
925            .get("topP")
926            .and_then(|v| v.as_f64())
927            .map(|v| v as f32);
928        let top_k = node
929            .data
930            .get("topK")
931            .and_then(|v| v.as_u64())
932            .map(|v| v as u32);
933        let stop = node.data.get("stop").and_then(|v| v.as_array()).map(|arr| {
934            arr.iter()
935                .filter_map(|x| x.as_str().map(str::to_owned))
936                .collect::<Vec<_>>()
937        });
938
939        // Optional credential lookup mirrors `run_cloud_ai_node`: local
940        // servers usually need no auth, but if the user stored a key
941        // (e.g. for a remote OpenAI-compat endpoint) we forward it.
942        let api_key = match self
943            .credentials
944            .resolve_cloud_ai_key("local", provider.env_var())
945        {
946            Ok(k) => k,
947            Err(CredentialError::NotFound { .. }) => String::new(),
948            Err(e) => return NodeOutcome::Failed(format!("credential lookup failed: {e}")),
949        };
950
951        // RAO node contract (docs/rao-spec): an unreadable contract is a node
952        // failure before any inference happens.
953        let contract = match AiNodeContract::from_node_data(&node.data) {
954            Ok(c) => c,
955            Err(e) => return NodeOutcome::Failed(e.to_string()),
956        };
957
958        let sanitized = self.sanitizer.sanitize(&input);
959        self.events.emit(ExecutionEvent::AiInvocation {
960            execution_id: execution_id.to_string(),
961            node_id: node.id.clone(),
962            at: Utc::now(),
963            provider: "local".into(),
964            model: model_id.to_string(),
965            input: sanitized.text.clone(),
966            contract_version: contract.as_ref().map(|c| c.contract_version.clone()),
967        });
968
969        let images = match crate::ai_tools::resolve_node_images(node) {
970            Ok(v) => v,
971            Err(e) => return NodeOutcome::Failed(e),
972        };
973        let workspace = node
974            .data
975            .get("workspace")
976            .and_then(|v| v.as_str())
977            .filter(|s| !s.trim().is_empty())
978            .map(str::to_owned);
979
980        let req = CloudAiRequest {
981            model: model_id.to_string(),
982            prompt: sanitized.text,
983            max_tokens,
984            temperature,
985            api_key,
986            system,
987            base_url: self.local_ai_base_url.clone(),
988            top_p,
989            top_k,
990            stop,
991            stream: true,
992            call_id: Some(format!("ai-node-{}", Uuid::new_v4())),
993            reasoning: crate::ai_tools::ai_reasoning(node),
994            images,
995            tools: Vec::new(),
996            response_schema: None,
997        };
998
999        self.dispatch_ai_node(
1000            &provider,
1001            req,
1002            &sanitized.map,
1003            &crate::ai_tools::ai_task(node),
1004            &crate::ai_tools::ai_labels(node),
1005            &crate::ai_tools::ai_tool_adapters(node),
1006            workspace,
1007            true,
1008            "",
1009            execution_id,
1010            &node.id,
1011            crate::ai_tools::ai_max_tool_iters(node),
1012            crate::ai_tools::ai_output_schema(node),
1013            crate::ai_tools::ai_expect(node),
1014            contract.as_ref(),
1015        )
1016        .await
1017    }
1018
1019    /// Capability-driven dispatch shared by the local + cloud AI nodes. `req`
1020    /// carries the sanitized prompt + params + reasoning/images. `task` selects
1021    /// the path: `"embedding"` → vector output; `"classify"` → a constrained
1022    /// label (branchable as `{{node.label}}`); `"structured"` → a JSON object
1023    /// matching the node's `outputSchema`, spread into the output so each field
1024    /// is branchable as `{{node.<field>}}`; otherwise `"generate"`, which runs a
1025    /// tool loop when adapters are bound, else a streamed completion.
1026    /// `rehydrate` restores PII placeholders in surfaced text.
1027    #[allow(clippy::too_many_arguments)]
1028    async fn dispatch_ai_node(
1029        &self,
1030        provider: &Arc<dyn flow_adapter_ai::CloudAiProvider>,
1031        mut req: CloudAiRequest,
1032        rehydrate: &flow_security::RehydrationMap,
1033        task: &str,
1034        labels: &[String],
1035        tool_adapters: &[String],
1036        workspace: Option<String>,
1037        audit: bool,
1038        original_prompt: &str,
1039        execution_id: &str,
1040        node_id: &str,
1041        max_tool_iters: usize,
1042        schema: Option<String>,
1043        expect: Option<String>,
1044        contract: Option<&AiNodeContract>,
1045    ) -> NodeOutcome {
1046        // Contract-bound output is defined for the generate path; the other
1047        // tasks have their own output shapes. Failing loudly beats silently
1048        // running an ungoverned node the flow author believed was governed.
1049        if let Some(contract) = contract {
1050            match task {
1051                "embedding" | "classify" | "structured" => {
1052                    return NodeOutcome::Failed(format!(
1053                        "contract-bound output is not supported for the `{task}` task \
1054                         (remove `contract: true` or use the generate task)"
1055                    ));
1056                }
1057                _ => {
1058                    return self
1059                        .run_contract_bound(
1060                            provider,
1061                            req,
1062                            rehydrate,
1063                            tool_adapters,
1064                            workspace,
1065                            execution_id,
1066                            node_id,
1067                            max_tool_iters,
1068                            contract,
1069                        )
1070                        .await;
1071                }
1072            }
1073        }
1074        match task {
1075            "embedding" => {
1076                req.stream = false;
1077                match provider.embed(&req).await {
1078                    Ok(e) => NodeOutcome::Succeeded(serde_json::json!({
1079                        "provider": e.provider,
1080                        "model": e.model,
1081                        "embedding": e.embedding,
1082                        "dims": e.dims,
1083                        "latency_ms": e.latency_ms,
1084                    })),
1085                    Err(err) => NodeOutcome::Failed(err.to_string()),
1086                }
1087            }
1088            "classify" => {
1089                if labels.is_empty() {
1090                    return NodeOutcome::Failed("classify task needs at least one label".into());
1091                }
1092                req.stream = false;
1093                let constraint = format!(
1094                    "You are a classifier. Read the input and reply with EXACTLY one of these labels and nothing else: {}.",
1095                    labels.join(", ")
1096                );
1097                req.system = Some(match req.system {
1098                    Some(s) if !s.is_empty() => format!("{constraint}\n\n{s}"),
1099                    _ => constraint,
1100                });
1101                match provider.invoke(&req).await {
1102                    Ok(resp) => {
1103                        let raw = rehydrate.rehydrate(&resp.text);
1104                        let label = crate::ai_tools::match_label(&raw, labels);
1105                        NodeOutcome::Succeeded(serde_json::json!({
1106                            "provider": resp.provider,
1107                            "model": resp.model,
1108                            "label": label,
1109                            "raw": raw,
1110                            "labels": labels,
1111                            "input_tokens": resp.input_tokens,
1112                            "output_tokens": resp.output_tokens,
1113                        }))
1114                    }
1115                    Err(err) => NodeOutcome::Failed(err.to_string()),
1116                }
1117            }
1118            "structured" => {
1119                let Some(schema) = schema.filter(|s| !s.trim().is_empty()) else {
1120                    return NodeOutcome::Failed(
1121                        "structured task needs a non-empty `outputSchema`".into(),
1122                    );
1123                };
1124                req.stream = false;
1125                // Hard enforcement where supported: a parseable schema rides as
1126                // `response_format` on the OpenAI-compatible providers (the
1127                // prompt below still constrains Claude/Gemini and is the fallback
1128                // for servers that ignore `response_format`).
1129                req.response_schema = serde_json::from_str(&schema).ok();
1130                let constraint = format!(
1131                    "Reply with ONLY a single JSON object that conforms to this JSON Schema. \
1132                     No prose, no explanation, no markdown code fences:\n{schema}"
1133                );
1134                req.system = Some(match req.system {
1135                    Some(s) if !s.is_empty() => format!("{constraint}\n\n{s}"),
1136                    _ => constraint,
1137                });
1138                match provider.invoke(&req).await {
1139                    Ok(resp) => {
1140                        let raw = rehydrate.rehydrate(&resp.text);
1141                        let mut out = serde_json::json!({
1142                            "provider": resp.provider,
1143                            "model": resp.model,
1144                            "raw": raw,
1145                            "input_tokens": resp.input_tokens,
1146                            "output_tokens": resp.output_tokens,
1147                        });
1148                        match crate::ai_tools::parse_json_lenient(&raw) {
1149                            Some(value) => {
1150                                // Spread the structured fields into the output so
1151                                // each is branchable as `{{node.<field>}}`, and
1152                                // keep the whole object under `output`. Never
1153                                // clobber the envelope's own keys.
1154                                if let (Some(obj), Some(fields)) =
1155                                    (out.as_object_mut(), value.as_object())
1156                                {
1157                                    for (k, v) in fields {
1158                                        obj.entry(k.clone()).or_insert_with(|| v.clone());
1159                                    }
1160                                }
1161                                out["output"] = value;
1162                            }
1163                            None => {
1164                                out["parseError"] =
1165                                    serde_json::json!("model did not return valid JSON");
1166                            }
1167                        }
1168                        NodeOutcome::Succeeded(out)
1169                    }
1170                    Err(err) => NodeOutcome::Failed(err.to_string()),
1171                }
1172            }
1173            _ => {
1174                if !tool_adapters.is_empty() {
1175                    let specs = crate::ai_tools::build_tool_specs(&self.adapters, tool_adapters);
1176                    if specs.is_empty() {
1177                        return NodeOutcome::Failed(
1178                            "tool use requested but the bound adapters expose no tools".into(),
1179                        );
1180                    }
1181                    req.stream = false;
1182                    req.tools = specs.clone();
1183                    // Emit a ToolCall event per dispatched call so the UI gets
1184                    // live tool-step granularity during the loop (the dispatcher
1185                    // otherwise produces no per-call events).
1186                    let observer = crate::ai_tools::ToolObserver::new(
1187                        self.events.clone(),
1188                        execution_id.to_string(),
1189                        node_id.to_string(),
1190                    );
1191                    // A coding-agent turn stages fs mutations for review; every
1192                    // other run dispatches tool calls straight to the adapters.
1193                    let result = if let Some(staging) = self.edit_staging.clone() {
1194                        let dispatcher = crate::ai_tools::StagingToolDispatcher::new(
1195                            self.adapters.clone(),
1196                            workspace,
1197                            staging,
1198                        )
1199                        .with_observer(observer);
1200                        provider
1201                            .invoke_tools(&req, &specs, &dispatcher, max_tool_iters)
1202                            .await
1203                    } else {
1204                        let dispatcher = crate::ai_tools::AdapterToolDispatcher::new(
1205                            self.adapters.clone(),
1206                            workspace,
1207                        )
1208                        .with_observer(observer);
1209                        provider
1210                            .invoke_tools(&req, &specs, &dispatcher, max_tool_iters)
1211                            .await
1212                    };
1213                    match result {
1214                        Ok(resp) => {
1215                            // Per-node AI evaluation: if the node carries an
1216                            // `expect` assertion, self-check the output and fail
1217                            // the node when unmet so the monitor/fix path engages
1218                            // (roadmap E1, beyond hard-failure-triggered).
1219                            if let Some(exp) = expect.as_deref() {
1220                                if matches!(
1221                                    evaluate_expectation(provider, &req, &resp.text, exp).await,
1222                                    Some(false)
1223                                ) {
1224                                    return NodeOutcome::Failed(format!(
1225                                        "node output did not meet expectation: {exp}"
1226                                    ));
1227                                }
1228                            }
1229                            NodeOutcome::Succeeded(ai_generate_envelope(
1230                                &resp,
1231                                rehydrate,
1232                                audit,
1233                                original_prompt,
1234                            ))
1235                        }
1236                        Err(err) => NodeOutcome::Failed(err.to_string()),
1237                    }
1238                } else {
1239                    req.stream = true;
1240                    match provider.invoke_stream(&req, self.stream_sink.as_ref()).await {
1241                        Ok(resp) => {
1242                            // Per-node AI evaluation: if the node carries an
1243                            // `expect` assertion, self-check the output and fail
1244                            // the node when unmet so the monitor/fix path engages
1245                            // (roadmap E1, beyond hard-failure-triggered).
1246                            if let Some(exp) = expect.as_deref() {
1247                                if matches!(
1248                                    evaluate_expectation(provider, &req, &resp.text, exp).await,
1249                                    Some(false)
1250                                ) {
1251                                    return NodeOutcome::Failed(format!(
1252                                        "node output did not meet expectation: {exp}"
1253                                    ));
1254                                }
1255                            }
1256                            NodeOutcome::Succeeded(ai_generate_envelope(
1257                                &resp,
1258                                rehydrate,
1259                                audit,
1260                                original_prompt,
1261                            ))
1262                        }
1263                        Err(err) => NodeOutcome::Failed(err.to_string()),
1264                    }
1265                }
1266            }
1267        }
1268    }
1269
1270    /// Contract-bound AI invocation (RAO Spec v1.0). The model must return an
1271    /// [`AiOutputEnvelope`]; anything else fails the node. The engine - never
1272    /// the model - routes the validated output by the contract's thresholds:
1273    /// auto-approve, human review gate, or suppression onto the `.fail`
1274    /// fallback path. The envelope schema rides in the system instruction (and
1275    /// as `response_schema` when no tools are bound); the thresholds do not -
1276    /// they are applied here, structurally (RAO constraint 5).
1277    #[allow(clippy::too_many_arguments)]
1278    async fn run_contract_bound(
1279        &self,
1280        provider: &Arc<dyn flow_adapter_ai::CloudAiProvider>,
1281        mut req: CloudAiRequest,
1282        rehydrate: &flow_security::RehydrationMap,
1283        tool_adapters: &[String],
1284        workspace: Option<String>,
1285        execution_id: &str,
1286        node_id: &str,
1287        max_tool_iters: usize,
1288        contract: &AiNodeContract,
1289    ) -> NodeOutcome {
1290        let schema = AiOutputEnvelope::json_schema();
1291        let constraint = format!(
1292            "Reply with ONLY a single JSON object conforming to this JSON Schema. \
1293             No prose, no markdown code fences:\n{schema}\n\
1294             `primary_output` is your full answer. `confidence` is your confidence in it, \
1295             0.0 to 1.0. `confidence_type` is \"{}\". Set `escalate` to true only if a \
1296             human should review this output regardless of confidence. Put any \
1297             explanation for a human reviewer in `reasoning`.",
1298            contract
1299                .confidence_type
1300                .unwrap_or(flow_domain::contract::ConfidenceType::Verbalized)
1301                .as_str()
1302        );
1303        req.system = Some(match req.system.take() {
1304            Some(s) if !s.is_empty() => format!("{constraint}\n\n{s}"),
1305            _ => constraint,
1306        });
1307        req.stream = false;
1308
1309        let result = if tool_adapters.is_empty() {
1310            req.response_schema = Some(schema);
1311            budgeted(contract.max_inference_ms, provider.invoke(&req)).await
1312        } else {
1313            let specs = crate::ai_tools::build_tool_specs(&self.adapters, tool_adapters);
1314            if specs.is_empty() {
1315                return NodeOutcome::Failed(
1316                    "tool use requested but the bound adapters expose no tools".into(),
1317                );
1318            }
1319            req.tools = specs.clone();
1320            let observer = crate::ai_tools::ToolObserver::new(
1321                self.events.clone(),
1322                execution_id.to_string(),
1323                node_id.to_string(),
1324            );
1325            if let Some(staging) = self.edit_staging.clone() {
1326                let dispatcher = crate::ai_tools::StagingToolDispatcher::new(
1327                    self.adapters.clone(),
1328                    workspace,
1329                    staging,
1330                )
1331                .with_observer(observer);
1332                budgeted(
1333                    contract.max_inference_ms,
1334                    provider.invoke_tools(&req, &specs, &dispatcher, max_tool_iters),
1335                )
1336                .await
1337            } else {
1338                let dispatcher =
1339                    crate::ai_tools::AdapterToolDispatcher::new(self.adapters.clone(), workspace)
1340                        .with_observer(observer);
1341                budgeted(
1342                    contract.max_inference_ms,
1343                    provider.invoke_tools(&req, &specs, &dispatcher, max_tool_iters),
1344                )
1345                .await
1346            }
1347        };
1348
1349        let resp = match result {
1350            Ok(r) => r,
1351            Err(e) => return NodeOutcome::Failed(e),
1352        };
1353
1354        let raw = rehydrate.rehydrate(&resp.text);
1355        let parsed = crate::ai_tools::parse_json_lenient(&raw)
1356            .ok_or_else(|| {
1357                flow_domain::contract::ContractError::SchemaViolation(
1358                    "model did not return a JSON object".into(),
1359                )
1360            })
1361            .and_then(|v| AiOutputEnvelope::from_value(&v))
1362            .and_then(|env| match contract.confidence_type {
1363                Some(expected) if env.confidence_type != expected => {
1364                    Err(flow_domain::contract::ContractError::SchemaViolation(format!(
1365                        "confidence_type `{}` does not match the contract's `{}`",
1366                        env.confidence_type.as_str(),
1367                        expected.as_str()
1368                    )))
1369                }
1370                _ => Ok(env),
1371            });
1372
1373        let envelope = match parsed {
1374            Ok(env) => env,
1375            Err(violation) => {
1376                self.events.emit(ExecutionEvent::AiRoutingDecision {
1377                    execution_id: execution_id.to_string(),
1378                    node_id: node_id.to_string(),
1379                    at: Utc::now(),
1380                    decision: "contract_violation".into(),
1381                    confidence: None,
1382                    threshold: "expected_output schema".into(),
1383                    contract_version: contract.contract_version.clone(),
1384                });
1385                return NodeOutcome::Failed(format!("contract violation: {violation}"));
1386            }
1387        };
1388
1389        let thresholds = &contract.thresholds;
1390        let decision = thresholds.route(envelope.confidence, envelope.escalate);
1391        let threshold_desc = match decision {
1392            RouteDecision::AutoApprove => format!(
1393                "confidence {} > autoApproveAbove {}",
1394                envelope.confidence, thresholds.auto_approve_above
1395            ),
1396            RouteDecision::Suppress => format!(
1397                "confidence {} < suppressBelow {}",
1398                envelope.confidence, thresholds.suppress_below
1399            ),
1400            RouteDecision::HumanReview if envelope.escalate => "model set escalate".into(),
1401            RouteDecision::HumanReview => format!(
1402                "confidence {} within review band [{}, {}]",
1403                envelope.confidence, thresholds.human_review_band.0, thresholds.human_review_band.1
1404            ),
1405        };
1406        self.events.emit(ExecutionEvent::AiRoutingDecision {
1407            execution_id: execution_id.to_string(),
1408            node_id: node_id.to_string(),
1409            at: Utc::now(),
1410            decision: decision.as_str().into(),
1411            confidence: Some(envelope.confidence),
1412            threshold: threshold_desc,
1413            contract_version: contract.contract_version.clone(),
1414        });
1415
1416        match decision {
1417            RouteDecision::AutoApprove => {
1418                NodeOutcome::Succeeded(contract_payload(&envelope, &resp, contract, decision))
1419            }
1420            RouteDecision::Suppress => NodeOutcome::Failed(format!(
1421                "output suppressed: confidence {:.2} below suppressBelow {:.2}",
1422                envelope.confidence, thresholds.suppress_below
1423            )),
1424            RouteDecision::HumanReview => {
1425                self.await_review(&envelope, &resp, contract, execution_id, node_id)
1426                    .await
1427            }
1428        }
1429    }
1430
1431    /// Hold a contract-bound output at the human review gate. The run
1432    /// self-pauses (like the destructive-step gate) until the verdict arrives
1433    /// via [`RunControl::resolve_review`] or the run is cancelled. A plain
1434    /// resume releases the gate as an approval, mirroring the destructive
1435    /// gate's resume-confirms semantics.
1436    async fn await_review(
1437        &self,
1438        envelope: &AiOutputEnvelope,
1439        resp: &flow_adapter_ai::CloudAiResponse,
1440        contract: &AiNodeContract,
1441        execution_id: &str,
1442        node_id: &str,
1443    ) -> NodeOutcome {
1444        if self.control.is_cancelling() {
1445            return NodeOutcome::Skipped("run cancelled before the review gate".into());
1446        }
1447        if !self.review_gate_available {
1448            return NodeOutcome::Failed(format!(
1449                "output requires human review (confidence {:.2}) but no review gate is \
1450                 attached to this run; routing to the fallback path",
1451                envelope.confidence
1452            ));
1453        }
1454        // Drop any stale verdict so only a fresh one can release this gate.
1455        let _ = self.control.take_review();
1456        self.events.emit(ExecutionEvent::AiReviewRequired {
1457            execution_id: execution_id.to_string(),
1458            node_id: node_id.to_string(),
1459            at: Utc::now(),
1460            primary_output: envelope.primary_output.clone(),
1461            confidence: envelope.confidence,
1462            reasoning: envelope.reasoning.clone(),
1463        });
1464        self.control.pause();
1465        self.control.wait_while_paused().await;
1466        if self.control.is_cancelling() {
1467            return NodeOutcome::Skipped("ai output not reviewed (run cancelled)".into());
1468        }
1469        let approved = !matches!(self.control.take_review(), Some(ReviewDecision::Rejected));
1470        self.events.emit(ExecutionEvent::AiReviewResolved {
1471            execution_id: execution_id.to_string(),
1472            node_id: node_id.to_string(),
1473            at: Utc::now(),
1474            approved,
1475        });
1476        if approved {
1477            NodeOutcome::Succeeded(contract_payload(
1478                envelope,
1479                resp,
1480                contract,
1481                RouteDecision::HumanReview,
1482            ))
1483        } else {
1484            NodeOutcome::Failed(format!(
1485                "output rejected at human review gate (confidence {:.2})",
1486                envelope.confidence
1487            ))
1488        }
1489    }
1490
1491    /// Cloud branch of the unified AI node: invokes a frontier provider's API.
1492    /// Reached from `run_ai_node` when `provider` is not `local`.
1493    async fn run_cloud_ai_node(&self, node: &FlowNode, execution_id: &str) -> NodeOutcome {
1494        let provider_name = match node.data.get("provider").and_then(|v| v.as_str()) {
1495            Some(p) => p.to_string(),
1496            None => return NodeOutcome::Failed("missing 'provider' on ai node".into()),
1497        };
1498
1499        // Policy gate. The `local` provider hits a localhost endpoint (no
1500        // egress) so it's governed by `allow_local_ai`; every other
1501        // provider is true cloud egress, gated by `allow_cloud_ai`.
1502        let is_local = provider_name == "local";
1503        if is_local {
1504            if !self.allow_local_ai {
1505                return NodeOutcome::Skipped(
1506                    "local AI disabled by policy (settings.allow_local_ai = false)".into(),
1507                );
1508            }
1509        } else if !self.allow_cloud_ai {
1510            return NodeOutcome::Skipped(
1511                "cloud AI disabled by policy (settings.allow_cloud_ai = false)".into(),
1512            );
1513        }
1514
1515        let model = match node.data.get("modelId").and_then(|v| v.as_str()) {
1516            Some(m) => m.to_string(),
1517            None => return NodeOutcome::Failed("missing 'modelId' on ai node".into()),
1518        };
1519        // The AI node's canonical field is `input`; `prompt` is accepted as a
1520        // deliberate leniency for model-generated DSL that emits it.
1521        let prompt = match node
1522            .data
1523            .get("prompt")
1524            .and_then(|v| v.as_str())
1525            .or_else(|| node.data.get("input").and_then(|v| v.as_str()))
1526        {
1527            Some(p) => p.to_string(),
1528            None => {
1529                return NodeOutcome::Failed("missing 'input' on ai node (cloud provider)".into())
1530            }
1531        };
1532        let max_tokens = node
1533            .data
1534            .get("maxTokens")
1535            .and_then(|v| v.as_u64())
1536            .map(|v| v as u32);
1537        let temperature = node
1538            .data
1539            .get("temperature")
1540            .and_then(|v| v.as_f64())
1541            .map(|v| v as f32);
1542        let audit_content = node
1543            .data
1544            .get("auditContent")
1545            .and_then(|v| v.as_bool())
1546            .unwrap_or(false);
1547
1548        let provider = match self.cloud_providers.get(&provider_name) {
1549            Some(p) => p,
1550            None => {
1551                return NodeOutcome::Skipped(format!(
1552                    "cloud provider '{provider_name}' not enabled"
1553                ));
1554            }
1555        };
1556
1557        let env_var = provider.env_var();
1558        let api_key = match self
1559            .credentials
1560            .resolve_cloud_ai_key(&provider_name, env_var)
1561        {
1562            Ok(k) => k,
1563            // Local servers (Ollama / LM Studio / llama.cpp) typically need
1564            // no auth - a missing key is not fatal; the provider simply omits
1565            // the bearer header. Cloud providers still require one.
1566            Err(CredentialError::NotFound { .. }) if is_local => String::new(),
1567            Err(CredentialError::NotFound { .. }) => {
1568                return NodeOutcome::Failed(format!(
1569                    "no API key for provider '{provider_name}'. Set it in Settings, or export {env_var} in your environment."
1570                ));
1571            }
1572            Err(e) => return NodeOutcome::Failed(format!("credential lookup failed: {e}")),
1573        };
1574
1575        let contract = match AiNodeContract::from_node_data(&node.data) {
1576            Ok(c) => c,
1577            Err(e) => return NodeOutcome::Failed(e.to_string()),
1578        };
1579
1580        let sanitized = self.sanitizer.sanitize(&prompt);
1581        // Without `auditContent`, record only a preview of the input - the
1582        // same privacy posture as the surfaced `text_preview` output.
1583        let recorded_input = if audit_content {
1584            sanitized.text.clone()
1585        } else {
1586            sanitized.text.chars().take(200).collect()
1587        };
1588        self.events.emit(ExecutionEvent::AiInvocation {
1589            execution_id: execution_id.to_string(),
1590            node_id: node.id.clone(),
1591            at: Utc::now(),
1592            provider: provider_name.clone(),
1593            model: model.clone(),
1594            input: recorded_input,
1595            contract_version: contract.as_ref().map(|c| c.contract_version.clone()),
1596        });
1597
1598        let system = node
1599            .data
1600            .get("system")
1601            .and_then(|v| v.as_str())
1602            .filter(|s| !s.is_empty())
1603            .map(str::to_owned);
1604        let images = match crate::ai_tools::resolve_node_images(node) {
1605            Ok(v) => v,
1606            Err(e) => return NodeOutcome::Failed(e),
1607        };
1608        let workspace = node
1609            .data
1610            .get("workspace")
1611            .and_then(|v| v.as_str())
1612            .filter(|s| !s.trim().is_empty())
1613            .map(str::to_owned);
1614
1615        let req = CloudAiRequest {
1616            model: model.clone(),
1617            prompt: sanitized.text,
1618            max_tokens,
1619            temperature,
1620            api_key,
1621            system,
1622            // Local provider reads its endpoint from settings; cloud
1623            // providers ignore this and use their fixed const.
1624            base_url: if is_local {
1625                self.local_ai_base_url.clone()
1626            } else {
1627                None
1628            },
1629            stream: true,
1630            call_id: Some(format!("cloud-ai-{}", Uuid::new_v4())),
1631            reasoning: crate::ai_tools::ai_reasoning(node),
1632            images,
1633            ..Default::default()
1634        };
1635
1636        // `auditContent` is the cloud privacy gate: when off, only a short
1637        // preview of the response is surfaced (no full text / prompt echo).
1638        self.dispatch_ai_node(
1639            &provider,
1640            req,
1641            &sanitized.map,
1642            &crate::ai_tools::ai_task(node),
1643            &crate::ai_tools::ai_labels(node),
1644            &crate::ai_tools::ai_tool_adapters(node),
1645            workspace,
1646            audit_content,
1647            &prompt,
1648            execution_id,
1649            &node.id,
1650            crate::ai_tools::ai_max_tool_iters(node),
1651            crate::ai_tools::ai_output_schema(node),
1652            crate::ai_tools::ai_expect(node),
1653            contract.as_ref(),
1654        )
1655        .await
1656    }
1657
1658    async fn run_adapter_node(&self, node: &FlowNode, execution_id: &str) -> NodeOutcome {
1659        let adapter_name = node
1660            .data
1661            .get("adapter")
1662            .and_then(|v| v.as_str())
1663            .unwrap_or("mock");
1664
1665        let Some(adapter) = self.adapters.get(adapter_name) else {
1666            return NodeOutcome::Failed(format!("adapter '{adapter_name}' not registered"));
1667        };
1668
1669        let ctx = crate::adapter::AdapterCtx {
1670            events: self.events.clone(),
1671            execution_id: execution_id.to_string(),
1672            node_id: node.id.clone(),
1673            workspace_root: self.workspace_root.clone(),
1674        };
1675
1676        match adapter.execute_with_events(node, &ctx).await {
1677            Ok(out) => {
1678                NodeOutcome::Succeeded(serde_json::to_value(out).unwrap_or(serde_json::Value::Null))
1679            }
1680            Err(e) => NodeOutcome::Failed(e.to_string()),
1681        }
1682    }
1683}
1684
/// A pre-apply advisory about a node in a proposed flow, surfaced in the
/// review and interception modals before the flow runs (roadmap E1 pre-apply
/// verification). Produced by [`verify_graph`]; serialized with camelCase
/// field names.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct FlowWarning {
    /// Id of the node the warning is about.
    pub node_id: String,
    /// Warning class: `"destructive"` or `"sandbox-escape"`.
    pub severity: String,
    /// Human-readable description of what was flagged.
    pub message: String,
}
1701
1702/// Static pre-apply scan of a proposed graph: flags nodes that perform a
1703/// destructive operation (reusing [`destructive_reason`]) or reference a path
1704/// that would escape the workspace jail (`..` parent traversal). Advisory only -
1705/// it does not block; the executor's runtime gates still apply.
1706pub fn verify_graph(graph: &FlowGraph) -> Vec<FlowWarning> {
1707    let mut out = Vec::new();
1708    for node in &graph.nodes {
1709        if let Some(message) = destructive_reason(node) {
1710            out.push(FlowWarning {
1711                node_id: node.id.clone(),
1712                severity: "destructive".into(),
1713                message,
1714            });
1715        }
1716        if let Some(message) = sandbox_escape_reason(node) {
1717            out.push(FlowWarning {
1718                node_id: node.id.clone(),
1719                severity: "sandbox-escape".into(),
1720                message,
1721            });
1722        }
1723    }
1724    out
1725}
1726
1727/// Flags a node whose path-like fields contain a `..` parent-traversal segment -
1728/// an attempt to leave the workspace jail (`flow_security::confine_path` rejects
1729/// these at runtime; surfacing it pre-apply catches an AI-generated escape before
1730/// the run starts). Whole-segment match, so `a..b` is not a false positive.
1731fn sandbox_escape_reason(node: &FlowNode) -> Option<String> {
1732    let data = &node.data;
1733    for key in ["path", "cwd", "workspaceRoot", "command", "args"] {
1734        let candidate = match data.get(key) {
1735            Some(serde_json::Value::String(s)) => s.clone(),
1736            Some(serde_json::Value::Array(arr)) => arr
1737                .iter()
1738                .filter_map(|v| v.as_str())
1739                .collect::<Vec<_>>()
1740                .join(" "),
1741            _ => continue,
1742        };
1743        if candidate
1744            .split(|c: char| c == '/' || c == '\\' || c == ' ')
1745            .any(|seg| seg == "..")
1746        {
1747            let preview: String = candidate.chars().take(120).collect();
1748            return Some(format!(
1749                "Path may escape the workspace (contains `..`): {preview}"
1750            ));
1751        }
1752    }
1753    None
1754}
1755
1756/// Build the failover variant of an `ai` node: a clone with `provider` (and
1757/// `modelId`, if a `fallbackModelId` is set) swapped to the node's configured
1758/// fallback, and the `fallbackProvider` / `fallbackModelId` fields stripped so
1759/// the retry can't fall over a second time. `None` when no fallback is set.
1760fn fallback_node(node: &FlowNode) -> Option<FlowNode> {
1761    let fb_provider = node
1762        .data
1763        .get("fallbackProvider")
1764        .and_then(|v| v.as_str())
1765        .map(str::trim)
1766        .filter(|s| !s.is_empty())?
1767        .to_string();
1768    let mut fb = node.clone();
1769    if let Some(obj) = fb.data.as_object_mut() {
1770        obj.insert("provider".into(), serde_json::Value::String(fb_provider));
1771        if let Some(m) = node
1772            .data
1773            .get("fallbackModelId")
1774            .and_then(|v| v.as_str())
1775            .map(str::trim)
1776            .filter(|s| !s.is_empty())
1777        {
1778            obj.insert("modelId".into(), serde_json::Value::String(m.to_string()));
1779        }
1780        obj.remove("fallbackProvider");
1781        obj.remove("fallbackModelId");
1782    }
1783    Some(fb)
1784}
1785
1786pub fn destructive_reason(node: &FlowNode) -> Option<String> {
1787    let data = &node.data;
1788    let adapter = data.get("adapter").and_then(|v| v.as_str()).unwrap_or("");
1789    let action = data.get("actionId").and_then(|v| v.as_str()).unwrap_or("");
1790
1791    // Filesystem adapter: deleting a file is unambiguously destructive.
1792    if adapter == "fs" && action == "delete-file" {
1793        let path = data
1794            .get("path")
1795            .and_then(|v| v.as_str())
1796            .filter(|s| !s.trim().is_empty())
1797            .unwrap_or("a file");
1798        return Some(format!("Delete file: {path}"));
1799    }
1800
1801    // Shell / CLI adapters: scan the program + arguments for destructive
1802    // commands. Tokens are matched whole-word to avoid substring false
1803    // positives (e.g. `confirm` containing `rm`).
1804    if adapter == "shell" || adapter == "cli" {
1805        let mut parts: Vec<String> = Vec::new();
1806        // The shell adapter's actionId names the program for curated tools
1807        // (`git`, `npm`, `kubectl`, …); only `run-command` carries a free-form
1808        // program in `command`. Include it so `git push` etc. are detected.
1809        if adapter == "shell" && action != "run-command" && !action.is_empty() {
1810            parts.push(action.to_string());
1811        }
1812        for key in ["bin", "command"] {
1813            if let Some(s) = data.get(key).and_then(|v| v.as_str()) {
1814                parts.push(s.to_string());
1815            }
1816        }
1817        match data.get("args") {
1818            Some(serde_json::Value::String(s)) => parts.push(s.clone()),
1819            Some(serde_json::Value::Array(arr)) => {
1820                for a in arr {
1821                    if let Some(s) = a.as_str() {
1822                        parts.push(s.to_string());
1823                    }
1824                }
1825            }
1826            _ => {}
1827        }
1828        let cmd = parts.join(" ");
1829        if let Some(label) = destructive_command(&cmd) {
1830            let preview: String = cmd.chars().take(120).collect();
1831            return Some(format!("Runs a destructive command ({label}): {preview}"));
1832        }
1833    }
1834
1835    None
1836}
1837
/// Scan a command line for known destructive operations and return a short
/// label for the first rule that matches, or `None`.
///
/// The command is lower-cased and split on whitespace; words are compared
/// whole, so `confirm` does not trigger the `rm` rule. Rules are tried in a
/// fixed order (`rm`/`rmdir`, `mkfs`, `shutdown`, `reboot`, `truncate`,
/// `dd if=`, `git push`, `git clean`, `git reset --hard`, `kubectl delete`,
/// `drop table/database`), so a command matching several reports the earliest.
fn destructive_command(cmd: &str) -> Option<&'static str> {
    let normalized = cmd.to_lowercase();
    let words: Vec<&str> = normalized.split_whitespace().collect();

    // Exact word present anywhere in the command.
    let contains = |word: &str| words.iter().any(|w| *w == word);
    // Any word beginning with `prefix`.
    let any_prefixed = |prefix: &str| words.iter().any(|w| w.starts_with(prefix));
    // `sub` appears somewhere after the first occurrence of `tool`.
    let subcommand = |tool: &str, sub: &str| {
        words
            .iter()
            .position(|w| *w == tool)
            .map_or(false, |at| words[at + 1..].iter().any(|w| *w == sub))
    };

    let label = if contains("rm") || contains("rmdir") {
        "rm"
    } else if contains("mkfs") || any_prefixed("mkfs.") {
        "mkfs"
    } else if contains("shutdown") {
        "shutdown"
    } else if contains("reboot") {
        "reboot"
    } else if contains("truncate") {
        "truncate"
    } else if contains("dd") && any_prefixed("if=") {
        "dd"
    } else if subcommand("git", "push") {
        "git push"
    } else if subcommand("git", "clean") {
        "git clean"
    } else if contains("git") && contains("reset") && contains("--hard") {
        "git reset --hard"
    } else if subcommand("kubectl", "delete") {
        "kubectl delete"
    } else if contains("drop") && (contains("table") || contains("database")) {
        "drop"
    } else {
        return None;
    };
    Some(label)
}
1888
1889/// Wrap an LLM text response in the AI-node output envelope. With `audit` the
1890/// full (rehydrated) text is surfaced - and the original prompt echoed when
1891/// non-empty; without it (the cloud privacy posture) only a short preview is
1892/// kept.
1893/// Self-evaluate an AI node's output against its `expect` assertion (roadmap E1
1894/// per-node evaluation): a second, constrained call to the same provider/model
1895/// returns PASS/FAIL. `Some(false)` = the output fails the assertion (the node
1896/// is then failed so the monitor/fix path engages); `None` = the eval itself
1897/// errored, treated as a pass so a flaky check never blocks a good run.
1898async fn evaluate_expectation(
1899    provider: &Arc<dyn flow_adapter_ai::CloudAiProvider>,
1900    base: &CloudAiRequest,
1901    output: &str,
1902    expect: &str,
1903) -> Option<bool> {
1904    let preview: String = output.chars().take(4000).collect();
1905    let eval_req = CloudAiRequest {
1906        model: base.model.clone(),
1907        prompt: format!("Output:\n{preview}\n\nExpectation: {expect}"),
1908        api_key: base.api_key.clone(),
1909        base_url: base.base_url.clone(),
1910        system: Some(
1911            "You are a strict evaluator. Decide whether the Output satisfies the \
1912             Expectation. Reply with EXACTLY one word: PASS or FAIL."
1913                .into(),
1914        ),
1915        max_tokens: Some(8),
1916        ..Default::default()
1917    };
1918    let resp = provider.invoke(&eval_req).await.ok()?;
1919    Some(!resp.text.trim().to_uppercase().contains("FAIL"))
1920}
1921
1922fn ai_generate_envelope(
1923    resp: &flow_adapter_ai::CloudAiResponse,
1924    rehydrate: &flow_security::RehydrationMap,
1925    audit: bool,
1926    original_prompt: &str,
1927) -> serde_json::Value {
1928    let text = rehydrate.rehydrate(&resp.text);
1929    let mut payload = serde_json::json!({
1930        "provider": resp.provider,
1931        "model": resp.model,
1932        "finish_reason": resp.finish_reason,
1933        "input_tokens": resp.input_tokens,
1934        "output_tokens": resp.output_tokens,
1935        "latency_ms": resp.latency_ms,
1936    });
1937    if audit {
1938        payload["text"] = serde_json::Value::String(text);
1939        if !original_prompt.is_empty() {
1940            payload["prompt"] = serde_json::Value::String(original_prompt.to_string());
1941        }
1942    } else {
1943        let preview: String = text.chars().take(200).collect();
1944        payload["text_preview"] = serde_json::Value::String(preview);
1945    }
1946    payload
1947}
1948
1949/// Run an inference future under the contract's `maxInferenceMs` budget.
1950/// Exceeding the budget is a node failure (RAO node-contract
1951/// `max_inference_ms`), which routes to the flow's fallback path.
1952async fn budgeted<F>(budget_ms: Option<u64>, fut: F) -> Result<flow_adapter_ai::CloudAiResponse, String>
1953where
1954    F: std::future::Future<
1955        Output = Result<flow_adapter_ai::CloudAiResponse, flow_adapter_ai::CloudAiError>,
1956    >,
1957{
1958    match budget_ms {
1959        Some(ms) => match tokio::time::timeout(std::time::Duration::from_millis(ms), fut).await {
1960            Ok(r) => r.map_err(|e| e.to_string()),
1961            Err(_) => Err(format!("inference exceeded the contract budget ({ms} ms)")),
1962        },
1963        None => fut.await.map_err(|e| e.to_string()),
1964    }
1965}
1966
1967/// Output payload of a contract-bound AI node. The envelope fields are
1968/// top-level so downstream nodes can branch on `{{node.primary_output}}`,
1969/// `{{node.confidence}}`, etc.; `route` records how the engine routed it.
1970fn contract_payload(
1971    envelope: &AiOutputEnvelope,
1972    resp: &flow_adapter_ai::CloudAiResponse,
1973    contract: &AiNodeContract,
1974    route: RouteDecision,
1975) -> serde_json::Value {
1976    let mut payload = serde_json::json!({
1977        "provider": resp.provider,
1978        "model": resp.model,
1979        "input_tokens": resp.input_tokens,
1980        "output_tokens": resp.output_tokens,
1981        "latency_ms": resp.latency_ms,
1982        "contract_version": contract.contract_version,
1983        "route": route.as_str(),
1984        "primary_output": envelope.primary_output,
1985        "confidence": envelope.confidence,
1986        "confidence_type": envelope.confidence_type.as_str(),
1987        "escalate": envelope.escalate,
1988    });
1989    if let Some(reasoning) = &envelope.reasoning {
1990        payload["reasoning"] = serde_json::Value::String(reasoning.clone());
1991    }
1992    payload
1993}
1994
/// Result of running a single node, as returned by the executor's
/// `run_*_node` methods.
enum NodeOutcome {
    /// The node ran to completion; carries its JSON output payload.
    Succeeded(serde_json::Value),
    /// The node was not run to completion (e.g. disabled by policy, provider
    /// not enabled, run cancelled); carries the human-readable reason.
    Skipped(String),
    /// The node failed; carries the error message.
    Failed(String),
}
2000
/// Payload-free terminal status of a node, consulted when deciding which
/// outgoing edges fire (see `edge_active` / `edge_fires`).
/// NOTE(review): presumably derived from the node's `NodeOutcome` - the
/// conversion is outside this view.
#[derive(Debug, Clone, Copy)]
enum NodeStatus {
    /// Fires `Pass` (and `Always`) edges.
    Succeeded,
    /// Fires `Fail` (and `Always`) edges.
    Failed,
    /// Treated like `Failed` for routing: fires `Fail` (and `Always`) edges.
    Skipped,
}
2007
2008/// Recursively substitute `{{...}}` references in every string leaf of a
2009/// node's `data` value. Objects and arrays are walked; non-string scalars are
2010/// returned unchanged. `outputs` holds prior succeeded-node payloads; `upstream`
2011/// is the list of node ids whose edge fired into the current node (the source
2012/// for `{{input}}`).
2013fn interpolate_value(
2014    value: &serde_json::Value,
2015    outputs: &HashMap<String, serde_json::Value>,
2016    upstream: &[String],
2017    memory: &HashMap<String, serde_json::Value>,
2018) -> serde_json::Value {
2019    match value {
2020        serde_json::Value::String(s) => {
2021            serde_json::Value::String(interpolate_str(s, outputs, upstream, memory))
2022        }
2023        serde_json::Value::Array(items) => serde_json::Value::Array(
2024            items
2025                .iter()
2026                .map(|v| interpolate_value(v, outputs, upstream, memory))
2027                .collect(),
2028        ),
2029        serde_json::Value::Object(map) => serde_json::Value::Object(
2030            map.iter()
2031                .map(|(k, v)| (k.clone(), interpolate_value(v, outputs, upstream, memory)))
2032                .collect(),
2033        ),
2034        other => other.clone(),
2035    }
2036}
2037
2038/// Replace each `{{token}}` occurrence in `s`. Unmatched braces and tokens
2039/// that resolve to nothing are left as the empty string. Strings with no `{{`
2040/// are returned as-is.
2041fn interpolate_str(
2042    s: &str,
2043    outputs: &HashMap<String, serde_json::Value>,
2044    upstream: &[String],
2045    memory: &HashMap<String, serde_json::Value>,
2046) -> String {
2047    if !s.contains("{{") {
2048        return s.to_string();
2049    }
2050    let mut out = String::with_capacity(s.len());
2051    let mut rest = s;
2052    while let Some(open) = rest.find("{{") {
2053        out.push_str(&rest[..open]);
2054        let after = &rest[open + 2..];
2055        match after.find("}}") {
2056            Some(close) => {
2057                let token = after[..close].trim();
2058                out.push_str(&resolve_token(token, outputs, upstream, memory));
2059                rest = &after[close + 2..];
2060            }
2061            // No closing braces - emit the literal `{{` and stop scanning.
2062            None => {
2063                out.push_str("{{");
2064                rest = after;
2065                break;
2066            }
2067        }
2068    }
2069    out.push_str(rest);
2070    out
2071}
2072
2073/// Resolve one `{{...}}` token to a string. `input` joins the upstream node
2074/// outputs; `memory.<key>[.<path>]` reads session working memory; `<id>` yields
2075/// that node's primary text; `<id>.<a>.<b>` walks into the node's output JSON.
2076fn resolve_token(
2077    token: &str,
2078    outputs: &HashMap<String, serde_json::Value>,
2079    upstream: &[String],
2080    memory: &HashMap<String, serde_json::Value>,
2081) -> String {
2082    if token == "input" {
2083        return upstream
2084            .iter()
2085            .filter_map(|id| outputs.get(id))
2086            .map(primary_text)
2087            .collect::<Vec<_>>()
2088            .join("\n\n");
2089    }
2090    if let Some(rest) = token.strip_prefix("memory.") {
2091        let (key, path) = match rest.split_once('.') {
2092            Some((key, path)) => (key, Some(path)),
2093            None => (rest, None),
2094        };
2095        let Some(value) = memory.get(key) else {
2096            return String::new();
2097        };
2098        return match path {
2099            Some(path) => value_at(value, path).map(primary_text).unwrap_or_default(),
2100            None => primary_text(value),
2101        };
2102    }
2103    let (id, path) = match token.split_once('.') {
2104        Some((id, path)) => (id, Some(path)),
2105        None => (token, None),
2106    };
2107    let Some(value) = outputs.get(id) else {
2108        return String::new();
2109    };
2110    match path {
2111        Some(path) => value_at(value, path).map(primary_text).unwrap_or_default(),
2112        None => primary_text(value),
2113    }
2114}
2115
2116/// Walk a dotted path (`a.b.c`) into a JSON object, returning the nested value.
2117fn value_at<'a>(value: &'a serde_json::Value, path: &str) -> Option<&'a serde_json::Value> {
2118    let mut cur = value;
2119    for segment in path.split('.') {
2120        cur = cur.get(segment)?;
2121    }
2122    Some(cur)
2123}
2124
2125/// Render a node-output value as the string a downstream field consumes. Plain
2126/// strings pass through; scalars use their natural string form. For objects:
2127/// adapter outputs nest their result under `payload`, so descend into it first;
2128/// then expose a common text-bearing key (`text`/`stdout`/`output`/`result`/
2129/// `value`) when present, else compact JSON.
2130fn primary_text(value: &serde_json::Value) -> String {
2131    match value {
2132        serde_json::Value::String(s) => s.clone(),
2133        serde_json::Value::Null => String::new(),
2134        serde_json::Value::Bool(b) => b.to_string(),
2135        serde_json::Value::Number(n) => n.to_string(),
2136        serde_json::Value::Object(map) => {
2137            // Adapter results are wrapped as `{ status, payload }`; the useful
2138            // content lives in `payload`.
2139            if let Some(payload) = map.get("payload") {
2140                if !payload.is_null() {
2141                    return primary_text(payload);
2142                }
2143            }
2144            for key in ["text", "stdout", "output", "result", "value"] {
2145                if let Some(serde_json::Value::String(s)) = map.get(key) {
2146                    return s.clone();
2147                }
2148            }
2149            serde_json::to_string(value).unwrap_or_default()
2150        }
2151        serde_json::Value::Array(_) => serde_json::to_string(value).unwrap_or_default(),
2152    }
2153}
2154
2155/// An outgoing edge is active when its routing `outcome` matches the source's
2156/// terminal status AND its optional `when` condition evaluates true. The
2157/// condition's `{{...}}` references are resolved against prior node outputs
2158/// (the edge source seeds `{{input}}`) before evaluation; a malformed or
2159/// unresolved condition fails closed - the edge does not fire.
2160fn edge_active(
2161    edge: &FlowEdge,
2162    terminal: &HashMap<String, NodeStatus>,
2163    outputs: &HashMap<String, serde_json::Value>,
2164    memory: &HashMap<String, serde_json::Value>,
2165) -> bool {
2166    let Some(status) = terminal.get(&edge.source) else {
2167        return false;
2168    };
2169    if !edge_fires(edge.outcome, *status) {
2170        return false;
2171    }
2172    match edge.condition.as_deref() {
2173        None => true,
2174        Some(expr) if expr.trim().is_empty() => true,
2175        Some(expr) => {
2176            let resolved =
2177                interpolate_str(expr, outputs, std::slice::from_ref(&edge.source), memory);
2178            crate::condition::eval(&resolved).unwrap_or(false)
2179        }
2180    }
2181}
2182
2183fn edge_fires(outcome: EdgeOutcome, status: NodeStatus) -> bool {
2184    matches!(
2185        (outcome, status),
2186        (EdgeOutcome::Always, _)
2187            | (EdgeOutcome::Pass, NodeStatus::Succeeded)
2188            | (EdgeOutcome::Fail, NodeStatus::Failed | NodeStatus::Skipped)
2189    )
2190}
2191
2192/// Build an outer graph where each sub-flow collapses to a single virtual node
2193/// (id = the sub-flow id). Edges are remapped to the virtual node; edges
2194/// internal to a sub-flow (or self-loops after remapping) are dropped. Lets the
2195/// scheduler topo-order free nodes and sub-flow units together.
2196fn collapse_subflows(graph: &FlowGraph) -> FlowGraph {
2197    let mut member_to_sf: HashMap<&str, &str> = HashMap::new();
2198    for sf in &graph.subflows {
2199        for id in &sf.node_ids {
2200            member_to_sf.insert(id.as_str(), sf.id.as_str());
2201        }
2202    }
2203    let map = |id: &str| -> String {
2204        member_to_sf
2205            .get(id)
2206            .map(|s| s.to_string())
2207            .unwrap_or_else(|| id.to_string())
2208    };
2209
2210    let mut nodes: Vec<FlowNode> = graph
2211        .nodes
2212        .iter()
2213        .filter(|n| !member_to_sf.contains_key(n.id.as_str()))
2214        .cloned()
2215        .collect();
2216    for sf in &graph.subflows {
2217        nodes.push(FlowNode {
2218            id: sf.id.clone(),
2219            node_type: "subflow".into(),
2220            position: Position { x: 0.0, y: 0.0 },
2221            data: serde_json::Value::Null,
2222        });
2223    }
2224
2225    let mut edges: Vec<FlowEdge> = Vec::new();
2226    for (i, e) in graph.edges.iter().enumerate() {
2227        let s = map(&e.source);
2228        let t = map(&e.target);
2229        if s == t {
2230            continue; // internal edge, or a self-loop after collapsing
2231        }
2232        edges.push(FlowEdge {
2233            id: format!("outer-{i}"),
2234            source: s,
2235            target: t,
2236            label: None,
2237            condition: None,
2238            outcome: EdgeOutcome::Always,
2239        });
2240    }
2241
2242    FlowGraph {
2243        id: graph.id.clone(),
2244        name: graph.name.clone(),
2245        version: graph.version.clone(),
2246        description: None,
2247        nodes,
2248        edges,
2249        subflows: Vec::new(),
2250    }
2251}
2252
2253fn topo_sort(graph: &FlowGraph) -> Result<Vec<String>, ExecutorError> {
2254    let mut indegree: HashMap<&str, usize> =
2255        graph.nodes.iter().map(|n| (n.id.as_str(), 0)).collect();
2256    let mut adj: HashMap<&str, Vec<&str>> = HashMap::new();
2257
2258    for edge in &graph.edges {
2259        adj.entry(edge.source.as_str())
2260            .or_default()
2261            .push(edge.target.as_str());
2262        *indegree.entry(edge.target.as_str()).or_insert(0) += 1;
2263    }
2264
2265    let mut queue: VecDeque<&str> = indegree
2266        .iter()
2267        .filter(|(_, &d)| d == 0)
2268        .map(|(id, _)| *id)
2269        .collect();
2270
2271    let mut order = Vec::with_capacity(graph.nodes.len());
2272    let mut visited = HashSet::new();
2273
2274    while let Some(id) = queue.pop_front() {
2275        if !visited.insert(id) {
2276            continue;
2277        }
2278        order.push(id.to_string());
2279
2280        if let Some(children) = adj.get(id) {
2281            for &c in children {
2282                let entry = indegree.entry(c).or_insert(0);
2283                if *entry > 0 {
2284                    *entry -= 1;
2285                    if *entry == 0 {
2286                        queue.push_back(c);
2287                    }
2288                }
2289            }
2290        }
2291    }
2292
2293    if order.len() < graph.nodes.len() {
2294        let unresolved = graph
2295            .nodes
2296            .iter()
2297            .find(|n| !visited.contains(n.id.as_str()))
2298            .map(|n| n.id.clone())
2299            .unwrap_or_default();
2300        return Err(ExecutorError::Cycle(unresolved));
2301    }
2302
2303    Ok(order)
2304}
2305
2306#[cfg(test)]
2307mod tests {
2308    use super::*;
2309    use crate::adapter::MockAdapter;
2310    use crate::events::CapturingSink;
2311    use flow_domain::graph::{FlowEdge, FlowNode, Position};
2312
2313    fn make_node(id: &str, kind: &str, data: serde_json::Value) -> FlowNode {
2314        FlowNode {
2315            id: id.into(),
2316            node_type: kind.into(),
2317            position: Position { x: 0.0, y: 0.0 },
2318            data,
2319        }
2320    }
2321
2322    fn make_edge(id: &str, src: &str, dst: &str) -> FlowEdge {
2323        FlowEdge {
2324            id: id.into(),
2325            source: src.into(),
2326            target: dst.into(),
2327            label: None,
2328            condition: None,
2329            outcome: EdgeOutcome::Always,
2330        }
2331    }
2332
2333    fn make_edge_with(id: &str, src: &str, dst: &str, outcome: EdgeOutcome) -> FlowEdge {
2334        FlowEdge {
2335            id: id.into(),
2336            source: src.into(),
2337            target: dst.into(),
2338            label: None,
2339            condition: None,
2340            outcome,
2341        }
2342    }
2343
2344    /// Build an executor with a single mock adapter (1ms delay) that always succeeds.
2345    async fn build_executor() -> (Executor, CapturingSink) {
2346        let mut adapters = AdapterRegistry::new();
2347        adapters.register(Arc::new(MockAdapter::with_delay(
2348            "mock",
2349            std::time::Duration::from_millis(1),
2350        )));
2351        let sink = CapturingSink::default();
2352        let credentials: Arc<dyn CredentialResolver> =
2353            Arc::new(flow_security::EnvFallbackResolver::new(Arc::new(
2354                flow_security::InMemoryCredentialStore::new(),
2355            )));
2356        let exec = Executor {
2357            adapters: Arc::new(adapters),
2358            sanitizer: Arc::new(PiiSanitizer::new()),
2359            events: Arc::new(sink.clone()),
2360            cloud_providers: Arc::new(CloudAiRegistry::new()),
2361            credentials,
2362            allow_cloud_ai: false,
2363            allow_local_ai: false,
2364            local_ai_base_url: None,
2365            stream_sink: Arc::new(NullStreamSink),
2366            working_memory: Default::default(),
2367            control: Default::default(),
2368            confirm_destructive: false,
2369            review_gate_available: true,
2370            edit_staging: None,
2371            workspace_root: PathBuf::from("."),
2372        };
2373        (exec, sink)
2374    }
2375
2376    /// A node with `actionId="forced-fail"` and `adapter="missing"` so the
2377    /// executor reports `Failed` (no such adapter registered).
2378    fn make_failing_action(id: &str) -> FlowNode {
2379        make_node(
2380            id,
2381            "action",
2382            serde_json::json!({ "adapter": "missing-adapter" }),
2383        )
2384    }
2385
2386    #[tokio::test]
2387    async fn topological_order_is_respected() {
2388        let graph = FlowGraph {
2389            subflows: Vec::new(),
2390            id: "g".into(),
2391            name: "g".into(),
2392            version: "1".into(),
2393            description: None,
2394            nodes: vec![
2395                make_node("c", "action", serde_json::json!({ "adapter": "mock" })),
2396                make_node("a", "action", serde_json::json!({ "adapter": "mock" })),
2397                make_node("b", "action", serde_json::json!({ "adapter": "mock" })),
2398            ],
2399            edges: vec![make_edge("e1", "a", "b"), make_edge("e2", "b", "c")],
2400        };
2401
2402        let order = topo_sort(&graph).unwrap();
2403        let pos = |id: &str| order.iter().position(|x| x == id).unwrap();
2404        assert!(pos("a") < pos("b"));
2405        assert!(pos("b") < pos("c"));
2406    }
2407
2408    #[tokio::test]
2409    async fn cycle_detected() {
2410        let graph = FlowGraph {
2411            subflows: Vec::new(),
2412            id: "g".into(),
2413            name: "g".into(),
2414            version: "1".into(),
2415            description: None,
2416            nodes: vec![
2417                make_node("a", "action", serde_json::json!({ "adapter": "mock" })),
2418                make_node("b", "action", serde_json::json!({ "adapter": "mock" })),
2419            ],
2420            edges: vec![make_edge("e1", "a", "b"), make_edge("e2", "b", "a")],
2421        };
2422        assert!(matches!(topo_sort(&graph), Err(ExecutorError::Cycle(_))));
2423    }
2424
2425    #[tokio::test]
2426    async fn happy_path_emits_events_and_returns_succeeded() {
2427        let (executor, sink) = build_executor().await;
2428
2429        let graph = FlowGraph {
2430            subflows: Vec::new(),
2431            id: "g".into(),
2432            name: "demo".into(),
2433            version: "1".into(),
2434            description: None,
2435            nodes: vec![
2436                make_node("a", "action", serde_json::json!({ "adapter": "mock" })),
2437                make_node("b", "action", serde_json::json!({ "adapter": "mock" })),
2438            ],
2439            edges: vec![make_edge("e1", "a", "b")],
2440        };
2441
2442        let summary = executor.run(&graph).await.unwrap();
2443        assert_eq!(summary.status, "succeeded");
2444        assert_eq!(summary.succeeded, 2);
2445
2446        let events = sink.events.lock().unwrap();
2447        assert!(matches!(
2448            events.first(),
2449            Some(ExecutionEvent::Started { .. })
2450        ));
2451        assert!(matches!(events.last(), Some(ExecutionEvent::Done { .. })));
2452    }
2453
2454    #[tokio::test]
2455    async fn subflow_runs_as_unit_and_downstream_fires() {
2456        let (executor, _sink) = build_executor().await;
2457        let graph = FlowGraph {
2458            subflows: vec![SubFlow {
2459                id: "setup".into(),
2460                label: "Setup".into(),
2461                node_ids: vec!["a".into(), "b".into()],
2462                retry: None,
2463            }],
2464            id: "g".into(),
2465            name: "g".into(),
2466            version: "1".into(),
2467            description: None,
2468            nodes: vec![
2469                make_node("start", "action", serde_json::json!({ "adapter": "mock" })),
2470                make_node("a", "action", serde_json::json!({ "adapter": "mock" })),
2471                make_node("b", "action", serde_json::json!({ "adapter": "mock" })),
2472                make_node("done", "action", serde_json::json!({ "adapter": "mock" })),
2473            ],
2474            edges: vec![
2475                make_edge("e1", "start", "a"), // boundary-crossing into the unit
2476                make_edge("e2", "a", "b"),     // internal to the unit
2477                make_edge("e3", "b", "done"),  // boundary-crossing out of the unit
2478            ],
2479        };
2480        let summary = executor.run(&graph).await.unwrap();
2481        assert_eq!(summary.status, "succeeded");
2482        assert_eq!(summary.succeeded, 4, "start + a + b + done all run");
2483        assert_eq!(summary.failed, 0);
2484        assert_eq!(summary.skipped, 0);
2485    }
2486
2487    #[tokio::test]
2488    async fn subflow_retries_as_a_unit_on_failure() {
2489        let (executor, sink) = build_executor().await;
2490        let graph = FlowGraph {
2491            subflows: vec![SubFlow {
2492                id: "unit".into(),
2493                label: "Unit".into(),
2494                node_ids: vec!["boom".into()],
2495                retry: Some(2),
2496            }],
2497            id: "g".into(),
2498            name: "g".into(),
2499            version: "1".into(),
2500            description: None,
2501            nodes: vec![make_failing_action("boom")],
2502            edges: vec![],
2503        };
2504        let summary = executor.run(&graph).await.unwrap();
2505        assert_eq!(summary.failed, 1, "the unit ultimately fails");
2506        // retry: Some(2) → 1 initial attempt + 2 retries = 3 runs of the member.
2507        let events = sink.events.lock().unwrap();
2508        let starts = events
2509            .iter()
2510            .filter(|e| matches!(e, ExecutionEvent::NodeStarted { node_id, .. } if node_id == "boom"))
2511            .count();
2512        assert_eq!(starts, 3, "the unit re-ran the failing member 1 + retry(2) times");
2513    }
2514
2515    #[tokio::test]
2516    async fn subflow_skipped_when_entry_not_reached() {
2517        let (executor, _sink) = build_executor().await;
2518        let graph = FlowGraph {
2519            subflows: vec![SubFlow {
2520                id: "unit".into(),
2521                label: "Unit".into(),
2522                node_ids: vec!["a".into(), "b".into()],
2523                retry: None,
2524            }],
2525            id: "g".into(),
2526            name: "g".into(),
2527            version: "1".into(),
2528            description: None,
2529            nodes: vec![
2530                make_failing_action("gate"),
2531                make_node("a", "action", serde_json::json!({ "adapter": "mock" })),
2532                make_node("b", "action", serde_json::json!({ "adapter": "mock" })),
2533            ],
2534            // gate fails; the `pass` edge into the unit does not fire → unit skipped.
2535            edges: vec![
2536                make_edge_with("e1", "gate", "a", EdgeOutcome::Pass),
2537                make_edge("e2", "a", "b"),
2538            ],
2539        };
2540        let summary = executor.run(&graph).await.unwrap();
2541        assert_eq!(summary.failed, 1, "gate fails");
2542        assert_eq!(summary.skipped, 2, "both unit members are skipped");
2543        assert_eq!(summary.succeeded, 0);
2544    }
2545
2546    /// A `Pass` edge from a failing source must NOT fire; downstream is skipped.
2547    #[tokio::test]
2548    async fn pass_edge_skips_node_on_failure() {
2549        let (executor, sink) = build_executor().await;
2550        let graph = FlowGraph {
2551            subflows: Vec::new(),
2552            id: "g".into(),
2553            name: "g".into(),
2554            version: "1".into(),
2555            description: None,
2556            nodes: vec![
2557                make_failing_action("a"),
2558                make_node("b", "action", serde_json::json!({ "adapter": "mock" })),
2559            ],
2560            edges: vec![make_edge_with("e1", "a", "b", EdgeOutcome::Pass)],
2561        };
2562        let summary = executor.run(&graph).await.unwrap();
2563        assert_eq!(summary.failed, 1, "a must fail");
2564        assert_eq!(
2565            summary.skipped, 1,
2566            "b must be skipped (no incoming edge fired)"
2567        );
2568        assert_eq!(summary.succeeded, 0);
2569        let events = sink.events.lock().unwrap();
2570        let b_skipped = events.iter().any(|e| {
2571            matches!(e,
2572            ExecutionEvent::NodeSkipped { node_id, .. } if node_id == "b")
2573        });
2574        assert!(b_skipped, "b should have a skipped event");
2575    }
2576
2577    /// A `Fail` edge fires when the source is skipped (skipped is treated as
2578    /// "did not pass", so it propagates Fail).
2579    #[tokio::test]
2580    async fn fail_edge_fires_on_skip() {
2581        let (executor, _) = build_executor().await;
2582        let graph = FlowGraph {
2583            subflows: Vec::new(),
2584            id: "g".into(),
2585            name: "g".into(),
2586            version: "1".into(),
2587            description: None,
2588            nodes: vec![
2589                make_failing_action("a"),
2590                make_node(
2591                    "recover",
2592                    "action",
2593                    serde_json::json!({ "adapter": "mock" }),
2594                ),
2595            ],
2596            edges: vec![make_edge_with("e1", "a", "recover", EdgeOutcome::Fail)],
2597        };
2598        let summary = executor.run(&graph).await.unwrap();
2599        assert_eq!(summary.failed, 1);
2600        assert_eq!(
2601            summary.succeeded, 1,
2602            "recover runs because a Fail edge fired"
2603        );
2604    }
2605
2606    /// Build an executor wired with the real `local` provider so we can
2607    /// exercise the local-gate + optional-key path. `allow_cloud_ai` is
2608    /// false throughout to prove the local path is governed independently.
2609    async fn build_executor_with_local(
2610        allow_local_ai: bool,
2611        local_ai_base_url: Option<String>,
2612    ) -> Executor {
2613        let adapters = AdapterRegistry::new();
2614        let mut cloud = CloudAiRegistry::new();
2615        cloud.register(Arc::new(flow_adapter_ai::LocalOpenAiProvider::new()));
2616        let credentials: Arc<dyn CredentialResolver> =
2617            Arc::new(flow_security::EnvFallbackResolver::new(Arc::new(
2618                flow_security::InMemoryCredentialStore::new(),
2619            )));
2620        Executor {
2621            adapters: Arc::new(adapters),
2622            sanitizer: Arc::new(PiiSanitizer::new()),
2623            events: Arc::new(CapturingSink::default()),
2624            cloud_providers: Arc::new(cloud),
2625            credentials,
2626            allow_cloud_ai: false,
2627            allow_local_ai,
2628            local_ai_base_url,
2629            stream_sink: Arc::new(NullStreamSink),
2630            working_memory: Default::default(),
2631            control: Default::default(),
2632            confirm_destructive: false,
2633            review_gate_available: true,
2634            edit_staging: None,
2635            workspace_root: PathBuf::from("."),
2636        }
2637    }
2638
2639    fn local_ai_graph() -> FlowGraph {
2640        FlowGraph {
2641            subflows: Vec::new(),
2642            id: "g".into(),
2643            name: "g".into(),
2644            version: "1".into(),
2645            description: None,
2646            nodes: vec![make_node(
2647                "llm",
2648                "ai",
2649                serde_json::json!({
2650                    "provider": "local",
2651                    "modelId": "local-llm",
2652                    "input": "hello",
2653                }),
2654            )],
2655            edges: vec![],
2656        }
2657    }
2658
2659    /// `allow_local_ai = false` skips the node even though `allow_cloud_ai`
2660    /// is irrelevant - the local provider has its own gate.
2661    #[tokio::test]
2662    async fn local_provider_skipped_when_local_gate_off() {
2663        let exec = build_executor_with_local(false, None).await;
2664        let summary = exec.run(&local_ai_graph()).await.unwrap();
2665        assert_eq!(summary.skipped, 1);
2666        assert_eq!(summary.succeeded, 0);
2667        assert_eq!(summary.failed, 0);
2668    }
2669
2670    /// With the local gate ON and no stored API key, the node must NOT fail
2671    /// for a missing key (local servers need none). With no base_url set it
2672    /// fails inside the provider on a Shape error - proving we got past both
2673    /// the gate and the credential check.
2674    #[tokio::test]
2675    async fn local_provider_no_key_required_reaches_provider() {
2676        let exec = build_executor_with_local(true, None).await;
2677        let summary = exec.run(&local_ai_graph()).await.unwrap();
2678        // Failed (no base_url), not Skipped (gate) and not a key error.
2679        assert_eq!(summary.failed, 1);
2680        assert_eq!(summary.skipped, 0);
2681    }
2682
2683    // --- RAO contract-bound AI nodes (docs/rao-spec) -------------------------
2684
2685    /// Stub provider replaying fixed responses, registered under the `local`
2686    /// name so the contract tests drive the real local-AI node path.
2687    struct ScriptedLocalProvider {
2688        responses: std::sync::Mutex<VecDeque<String>>,
2689    }
2690
2691    impl ScriptedLocalProvider {
2692        fn new(responses: Vec<&str>) -> Arc<Self> {
2693            Arc::new(Self {
2694                responses: std::sync::Mutex::new(
2695                    responses.into_iter().map(String::from).collect(),
2696                ),
2697            })
2698        }
2699    }
2700
2701    #[async_trait::async_trait]
2702    impl flow_adapter_ai::CloudAiProvider for ScriptedLocalProvider {
2703        fn name(&self) -> &str {
2704            "local"
2705        }
2706        fn env_var(&self) -> &str {
2707            "STUB_KEY"
2708        }
2709        fn default_models(&self) -> &[&str] {
2710            &["stub-model"]
2711        }
2712        async fn invoke(
2713            &self,
2714            req: &CloudAiRequest,
2715        ) -> Result<flow_adapter_ai::CloudAiResponse, flow_adapter_ai::CloudAiError> {
2716            let text = self
2717                .responses
2718                .lock()
2719                .unwrap()
2720                .pop_front()
2721                .expect("scripted provider ran out of responses");
2722            Ok(flow_adapter_ai::CloudAiResponse {
2723                provider: "local".into(),
2724                model: req.model.clone(),
2725                text,
2726                finish_reason: "stop".into(),
2727                input_tokens: 0,
2728                output_tokens: 0,
2729                latency_ms: 0,
2730            })
2731        }
2732    }
2733
2734    async fn build_contract_executor(responses: Vec<&str>) -> (Executor, CapturingSink) {
2735        let sink = CapturingSink::default();
2736        let mut cloud = CloudAiRegistry::new();
2737        cloud.register(ScriptedLocalProvider::new(responses));
2738        let mut adapters = AdapterRegistry::new();
2739        adapters.register(Arc::new(MockAdapter::with_delay(
2740            "mock",
2741            std::time::Duration::from_millis(1),
2742        )));
2743        let credentials: Arc<dyn CredentialResolver> =
2744            Arc::new(flow_security::EnvFallbackResolver::new(Arc::new(
2745                flow_security::InMemoryCredentialStore::new(),
2746            )));
2747        let executor = Executor {
2748            adapters: Arc::new(adapters),
2749            sanitizer: Arc::new(PiiSanitizer::new()),
2750            events: Arc::new(sink.clone()),
2751            cloud_providers: Arc::new(cloud),
2752            credentials,
2753            allow_cloud_ai: false,
2754            allow_local_ai: true,
2755            local_ai_base_url: Some("http://127.0.0.1:9".into()),
2756            stream_sink: Arc::new(NullStreamSink),
2757            working_memory: Default::default(),
2758            control: Default::default(),
2759            confirm_destructive: false,
2760            review_gate_available: true,
2761            edit_staging: None,
2762            workspace_root: PathBuf::from("."),
2763        };
2764        (executor, sink)
2765    }
2766
2767    /// One contract-bound AI node with a `.fail` fallback edge to a mock
2768    /// recovery action - the RAO reference shape.
2769    fn contract_graph() -> FlowGraph {
2770        FlowGraph {
2771            subflows: Vec::new(),
2772            id: "g".into(),
2773            name: "g".into(),
2774            version: "1".into(),
2775            description: None,
2776            nodes: vec![
2777                make_node(
2778                    "interpret",
2779                    "ai",
2780                    serde_json::json!({
2781                        "provider": "local",
2782                        "modelId": "stub-model",
2783                        "input": "interpret this log",
2784                        "contract": true,
2785                    }),
2786                ),
2787                make_node("fallback", "action", serde_json::json!({ "adapter": "mock" })),
2788            ],
2789            edges: vec![make_edge_with("e1", "interpret", "fallback", EdgeOutcome::Fail)],
2790        }
2791    }
2792
2793    fn envelope_json(confidence: f64, escalate: bool) -> String {
2794        serde_json::json!({
2795            "primary_output": "step 3 failed: missing dataset",
2796            "confidence": confidence,
2797            "confidence_type": "verbalized",
2798            "escalate": escalate,
2799            "reasoning": "matched the abend code"
2800        })
2801        .to_string()
2802    }
2803
2804    fn routing_decisions(sink: &CapturingSink) -> Vec<String> {
2805        sink.events
2806            .lock()
2807            .unwrap()
2808            .iter()
2809            .filter_map(|e| match e {
2810                ExecutionEvent::AiRoutingDecision { decision, .. } => Some(decision.clone()),
2811                _ => None,
2812            })
2813            .collect()
2814    }
2815
2816    /// Output outside the envelope schema is a failed node (RAO constraint 3),
2817    /// which routes onto the `.fail` fallback path (constraint 4).
2818    #[tokio::test]
2819    async fn contract_violation_fails_node_onto_fallback() {
2820        let (executor, sink) = build_contract_executor(vec!["sure, here's my analysis"]).await;
2821        let summary = executor.run(&contract_graph()).await.unwrap();
2822        assert_eq!(summary.failed, 1, "schema violation fails the ai node");
2823        assert_eq!(summary.succeeded, 1, "fallback runs via the .fail edge");
2824        assert_eq!(routing_decisions(&sink), vec!["contract_violation"]);
2825    }
2826
2827    /// Confidence above `autoApproveAbove` routes the output downstream with
2828    /// the envelope fields branchable on the payload.
2829    #[tokio::test]
2830    async fn contract_auto_approves_high_confidence() {
2831        let (executor, sink) =
2832            build_contract_executor(vec![&envelope_json(0.95, false)]).await;
2833        let summary = executor.run(&contract_graph()).await.unwrap();
2834        assert_eq!(summary.succeeded, 1);
2835        assert_eq!(summary.skipped, 1, "fallback not reached");
2836        assert_eq!(routing_decisions(&sink), vec!["auto_approve"]);
2837        let events = sink.events.lock().unwrap();
2838        let output = events
2839            .iter()
2840            .find_map(|e| match e {
2841                ExecutionEvent::NodeSucceeded { node_id, output, .. }
2842                    if node_id == "interpret" =>
2843                {
2844                    Some(output.clone())
2845                }
2846                _ => None,
2847            })
2848            .expect("ai node succeeded");
2849        assert_eq!(
2850            output.get("primary_output").and_then(|v| v.as_str()),
2851            Some("step 3 failed: missing dataset")
2852        );
2853        assert_eq!(output.get("route").and_then(|v| v.as_str()), Some("auto_approve"));
2854    }
2855
2856    /// Confidence below `suppressBelow` suppresses the output: the node fails
2857    /// and the fallback path runs.
2858    #[tokio::test]
2859    async fn contract_suppresses_low_confidence_onto_fallback() {
2860        let (executor, sink) =
2861            build_contract_executor(vec![&envelope_json(0.2, false)]).await;
2862        let summary = executor.run(&contract_graph()).await.unwrap();
2863        assert_eq!(summary.failed, 1);
2864        assert_eq!(summary.succeeded, 1, "fallback runs via the .fail edge");
2865        assert_eq!(routing_decisions(&sink), vec!["suppress"]);
2866    }
2867
2868    /// Review-band output with no attached review gate (headless run) fails
2869    /// deterministically onto the fallback path - never a silent auto-approve.
2870    #[tokio::test]
2871    async fn contract_review_band_falls_back_when_gate_unavailable() {
2872        let (mut executor, sink) =
2873            build_contract_executor(vec![&envelope_json(0.7, false)]).await;
2874        executor.review_gate_available = false;
2875        let summary = executor.run(&contract_graph()).await.unwrap();
2876        assert_eq!(summary.failed, 1);
2877        assert_eq!(summary.succeeded, 1, "fallback runs via the .fail edge");
2878        assert_eq!(routing_decisions(&sink), vec!["human_review"]);
2879    }
2880
2881    /// `escalate: true` forces the human gate even at high confidence - the
2882    /// model can request review but can never bypass it (RAO checklist 5.4).
2883    #[tokio::test]
2884    async fn contract_escalate_forces_review_at_high_confidence() {
2885        let (mut executor, sink) =
2886            build_contract_executor(vec![&envelope_json(0.99, true)]).await;
2887        executor.review_gate_available = false;
2888        let summary = executor.run(&contract_graph()).await.unwrap();
2889        assert_eq!(summary.failed, 1, "escalated output held back without a gate");
2890        assert_eq!(routing_decisions(&sink), vec!["human_review"]);
2891    }
2892
2893    /// Drive the review gate to a verdict: watch for `AiReviewRequired`, then
2894    /// resolve via the shared `RunControl` the way the host command does.
2895    async fn run_with_review_verdict(approved: bool) -> (ExecutionSummary, CapturingSink) {
2896        let (executor, sink) = build_contract_executor(vec![&envelope_json(0.7, false)]).await;
2897        let control = executor.control.clone();
2898        let watcher_sink = sink.clone();
2899        let watcher = tokio::spawn(async move {
2900            loop {
2901                let gated = watcher_sink
2902                    .events
2903                    .lock()
2904                    .unwrap()
2905                    .iter()
2906                    .any(|e| matches!(e, ExecutionEvent::AiReviewRequired { .. }));
2907                if gated {
2908                    control.resolve_review(approved);
2909                    return;
2910                }
2911                tokio::time::sleep(std::time::Duration::from_millis(5)).await;
2912            }
2913        });
2914        let summary = executor.run(&contract_graph()).await.unwrap();
2915        watcher.await.unwrap();
2916        (summary, sink)
2917    }
2918
2919    /// Approval at the gate passes the contract-bound output downstream, and
2920    /// the human decision lands in the event trail (RAO checklist 6.4).
2921    #[tokio::test]
2922    async fn contract_review_gate_approval_passes_output() {
2923        let (summary, sink) = run_with_review_verdict(true).await;
2924        assert_eq!(summary.succeeded, 1);
2925        assert_eq!(summary.failed, 0);
2926        let events = sink.events.lock().unwrap();
2927        assert!(events.iter().any(|e| matches!(
2928            e,
2929            ExecutionEvent::AiReviewResolved { approved: true, .. }
2930        )));
2931    }
2932
2933    /// Rejection at the gate fails the node onto its fallback path.
2934    #[tokio::test]
2935    async fn contract_review_gate_rejection_routes_fallback() {
2936        let (summary, sink) = run_with_review_verdict(false).await;
2937        assert_eq!(summary.failed, 1);
2938        assert_eq!(summary.succeeded, 1, "fallback runs via the .fail edge");
2939        let events = sink.events.lock().unwrap();
2940        assert!(events.iter().any(|e| matches!(
2941            e,
2942            ExecutionEvent::AiReviewResolved { approved: false, .. }
2943        )));
2944    }
2945
2946    /// Every AI invocation is recorded as a system event with its input
2947    /// (RAO checklist 6.1), contract-bound or not.
2948    #[tokio::test]
2949    async fn ai_invocation_is_recorded() {
2950        let (executor, sink) = build_contract_executor(vec!["plain text answer"]).await;
2951        let graph = FlowGraph {
2952            subflows: Vec::new(),
2953            id: "g".into(),
2954            name: "g".into(),
2955            version: "1".into(),
2956            description: None,
2957            nodes: vec![make_node(
2958                "llm",
2959                "ai",
2960                serde_json::json!({
2961                    "provider": "local",
2962                    "modelId": "stub-model",
2963                    "input": "hello",
2964                }),
2965            )],
2966            edges: vec![],
2967        };
2968        executor.run(&graph).await.unwrap();
2969        let events = sink.events.lock().unwrap();
2970        let recorded = events.iter().any(|e| matches!(
2971            e,
2972            ExecutionEvent::AiInvocation { provider, input, .. }
2973                if provider == "local" && input == "hello"
2974        ));
2975        assert!(recorded, "AiInvocation event with the sanitized input");
2976    }
2977
2978    /// `Always` edges fire regardless of the source's terminal status, so
2979    /// the target runs even when the source failed.
2980    #[tokio::test]
2981    async fn always_edges_run_target_unconditionally() {
2982        let (executor, _) = build_executor().await;
2983        let graph = FlowGraph {
2984            subflows: Vec::new(),
2985            id: "g".into(),
2986            name: "g".into(),
2987            version: "1".into(),
2988            description: None,
2989            nodes: vec![
2990                make_failing_action("a"),
2991                make_node("b", "action", serde_json::json!({ "adapter": "mock" })),
2992            ],
2993            edges: vec![make_edge_with("e1", "a", "b", EdgeOutcome::Always)],
2994        };
2995        let summary = executor.run(&graph).await.unwrap();
2996        assert_eq!(summary.failed, 1, "a still fails");
2997        assert_eq!(
2998            summary.succeeded, 1,
2999            "b runs despite a's failure (Always edge)"
3000        );
3001    }
3002
3003    // --- inter-node data flow (`{{...}}` interpolation) ---------------------
3004
3005    fn output_for(sink: &CapturingSink, node_id: &str) -> Option<serde_json::Value> {
3006        sink.events.lock().unwrap().iter().find_map(|e| match e {
3007            ExecutionEvent::NodeSucceeded {
3008                node_id: nid,
3009                output,
3010                ..
3011            } if nid == node_id => Some(output.clone()),
3012            _ => None,
3013        })
3014    }
3015
3016    #[test]
3017    fn primary_text_extracts_common_keys_and_scalars() {
3018        assert_eq!(primary_text(&serde_json::json!("hi")), "hi");
3019        assert_eq!(primary_text(&serde_json::json!({ "text": "abc" })), "abc");
3020        assert_eq!(primary_text(&serde_json::json!({ "stdout": "out" })), "out");
3021        assert_eq!(primary_text(&serde_json::json!(42)), "42");
3022        assert_eq!(primary_text(&serde_json::json!(true)), "true");
3023        assert_eq!(primary_text(&serde_json::Value::Null), "");
3024        // No text-bearing key → compact JSON.
3025        assert_eq!(
3026            primary_text(&serde_json::json!({ "a": 1 })),
3027            "{\"a\":1}".to_string()
3028        );
3029    }
3030
3031    #[test]
3032    fn resolve_token_handles_input_id_and_path() {
3033        let mut outputs = HashMap::new();
3034        outputs.insert("a".to_string(), serde_json::json!({ "text": "hello" }));
3035        outputs.insert(
3036            "b".to_string(),
3037            serde_json::json!({ "nested": { "k": "deep" } }),
3038        );
3039        let upstream = vec!["a".to_string()];
3040        let mut memory = HashMap::new();
3041        memory.insert("region".to_string(), serde_json::json!("us-east-1"));
3042        memory.insert("cfg".to_string(), serde_json::json!({ "retries": 3 }));
3043
3044        assert_eq!(resolve_token("input", &outputs, &upstream, &memory), "hello");
3045        assert_eq!(resolve_token("a", &outputs, &upstream, &memory), "hello");
3046        assert_eq!(resolve_token("a.text", &outputs, &upstream, &memory), "hello");
3047        assert_eq!(resolve_token("b.nested.k", &outputs, &upstream, &memory), "deep");
3048        // Working-memory reads.
3049        assert_eq!(resolve_token("memory.region", &outputs, &upstream, &memory), "us-east-1");
3050        assert_eq!(resolve_token("memory.cfg.retries", &outputs, &upstream, &memory), "3");
3051        // Unknown id / missing field / missing memory key → empty.
3052        assert_eq!(resolve_token("missing", &outputs, &upstream, &memory), "");
3053        assert_eq!(resolve_token("a.nope", &outputs, &upstream, &memory), "");
3054        assert_eq!(resolve_token("memory.nope", &outputs, &upstream, &memory), "");
3055    }
3056
3057    #[test]
3058    fn interpolate_str_replaces_tokens_and_preserves_literals() {
3059        let mut outputs = HashMap::new();
3060        outputs.insert("a".to_string(), serde_json::json!({ "text": "world" }));
3061        let upstream = vec!["a".to_string()];
3062        let memory = HashMap::new();
3063
3064        assert_eq!(
3065            interpolate_str("hi {{a.text}}!", &outputs, &upstream, &memory),
3066            "hi world!"
3067        );
3068        assert_eq!(
3069            interpolate_str("no tokens here", &outputs, &upstream, &memory),
3070            "no tokens here"
3071        );
3072        // Unterminated braces are left literal.
3073        assert_eq!(
3074            interpolate_str("oops {{a.text", &outputs, &upstream, &memory),
3075            "oops {{a.text"
3076        );
3077    }
3078
3079    #[test]
3080    fn interpolate_value_recurses_into_arrays_and_objects() {
3081        let mut outputs = HashMap::new();
3082        outputs.insert("a".to_string(), serde_json::json!({ "text": "X" }));
3083        let upstream = vec!["a".to_string()];
3084        let memory = HashMap::new();
3085        let data = serde_json::json!({
3086            "command": "echo {{a.text}}",
3087            "args": ["--msg", "{{a.text}}"],
3088            "count": 3,
3089        });
3090        let resolved = interpolate_value(&data, &outputs, &upstream, &memory);
3091        assert_eq!(resolved["command"], serde_json::json!("echo X"));
3092        assert_eq!(resolved["args"][1], serde_json::json!("X"));
3093        assert_eq!(resolved["count"], serde_json::json!(3));
3094    }
3095
3096    #[tokio::test]
3097    async fn upstream_output_flows_into_downstream_field() {
3098        // The mock adapter echoes `data.label` into its output payload (under
3099        // the `{status, payload}` wrapper). Node B references node A's label via
3100        // `{{a.payload.label}}`, so B's resolved label - and thus B's echoed
3101        // output - must carry A's value.
3102        let (executor, sink) = build_executor().await;
3103        let graph = FlowGraph {
3104            subflows: Vec::new(),
3105            id: "g".into(),
3106            name: "g".into(),
3107            version: "1".into(),
3108            description: None,
3109            nodes: vec![
3110                make_node("a", "action", serde_json::json!({ "adapter": "mock", "label": "hello" })),
3111                make_node(
3112                    "b",
3113                    "action",
3114                    serde_json::json!({ "adapter": "mock", "label": "got: {{a.payload.label}}" }),
3115                ),
3116            ],
3117            edges: vec![make_edge("e1", "a", "b")],
3118        };
3119        let summary = executor.run(&graph).await.unwrap();
3120        assert_eq!(summary.succeeded, 2);
3121        let b_out = output_for(&sink, "b").expect("b succeeded");
3122        assert_eq!(b_out["payload"]["label"], serde_json::json!("got: hello"));
3123    }
3124
3125    #[tokio::test]
3126    async fn input_token_resolves_to_upstream_output() {
3127        // `{{input}}` resolves to the upstream node's primary text. The mock
3128        // payload has no text-bearing key, so `primary_text` descends into the
3129        // payload and serializes it - assert the downstream label embeds the
3130        // upstream label.
3131        let (executor, sink) = build_executor().await;
3132        let graph = FlowGraph {
3133            subflows: Vec::new(),
3134            id: "g".into(),
3135            name: "g".into(),
3136            version: "1".into(),
3137            description: None,
3138            nodes: vec![
3139                make_node("a", "action", serde_json::json!({ "adapter": "mock", "label": "seed" })),
3140                make_node(
3141                    "b",
3142                    "action",
3143                    serde_json::json!({ "adapter": "mock", "label": "{{input}}" }),
3144                ),
3145            ],
3146            edges: vec![make_edge("e1", "a", "b")],
3147        };
3148        executor.run(&graph).await.unwrap();
3149        let b_out = output_for(&sink, "b").expect("b succeeded");
3150        let label = b_out["payload"]["label"].as_str().unwrap();
3151        assert!(
3152            label.contains("\"label\":\"seed\""),
3153            "b's label embeds a's serialized output: {label}"
3154        );
3155    }
3156
3157    // --- executor iteration: when-conditions + bounded loops ----------------
3158
3159    use crate::adapter::{Adapter, AdapterError, NodeOutput};
3160    use std::sync::atomic::{AtomicI64, Ordering};
3161
3162    /// Edge with a `when` condition, otherwise an `Always` Pass-equivalent.
3163    fn make_edge_when(id: &str, src: &str, dst: &str, outcome: EdgeOutcome, when: &str) -> FlowEdge {
3164        FlowEdge {
3165            id: id.into(),
3166            source: src.into(),
3167            target: dst.into(),
3168            label: None,
3169            condition: Some(when.into()),
3170            outcome,
3171        }
3172    }
3173
3174    /// Adapter that returns `payload.remaining` decreasing by 1 each call,
3175    /// so a back-edge `when {{<id>.payload.remaining}} > 0` exits after N runs.
3176    struct CountdownAdapter {
3177        name: String,
3178        remaining: AtomicI64,
3179    }
3180
3181    impl CountdownAdapter {
3182        fn new(name: &str, start: i64) -> Self {
3183            Self {
3184                name: name.into(),
3185                remaining: AtomicI64::new(start),
3186            }
3187        }
3188    }
3189
3190    #[async_trait::async_trait]
3191    impl Adapter for CountdownAdapter {
3192        fn name(&self) -> &str {
3193            &self.name
3194        }
3195        async fn execute(&self, _node: &FlowNode) -> Result<NodeOutput, AdapterError> {
3196            let after = self.remaining.fetch_sub(1, Ordering::SeqCst) - 1;
3197            Ok(NodeOutput {
3198                status: "succeeded".into(),
3199                payload: serde_json::json!({ "remaining": after }),
3200            })
3201        }
3202    }
3203
3204    async fn build_executor_with_adapters(extra: Vec<Arc<dyn Adapter>>) -> (Executor, CapturingSink) {
3205        build_executor_with(extra, WorkingMemory::default()).await
3206    }
3207
3208    async fn build_executor_with(
3209        extra: Vec<Arc<dyn Adapter>>,
3210        working_memory: WorkingMemory,
3211    ) -> (Executor, CapturingSink) {
3212        let mut adapters = AdapterRegistry::new();
3213        adapters.register(Arc::new(MockAdapter::with_delay(
3214            "mock",
3215            std::time::Duration::from_millis(1),
3216        )));
3217        for a in extra {
3218            adapters.register(a);
3219        }
3220        let sink = CapturingSink::default();
3221        let credentials: Arc<dyn CredentialResolver> =
3222            Arc::new(flow_security::EnvFallbackResolver::new(Arc::new(
3223                flow_security::InMemoryCredentialStore::new(),
3224            )));
3225        let exec = Executor {
3226            adapters: Arc::new(adapters),
3227            sanitizer: Arc::new(PiiSanitizer::new()),
3228            events: Arc::new(sink.clone()),
3229            cloud_providers: Arc::new(CloudAiRegistry::new()),
3230            credentials,
3231            allow_cloud_ai: false,
3232            allow_local_ai: false,
3233            local_ai_base_url: None,
3234            stream_sink: Arc::new(NullStreamSink),
3235            working_memory,
3236            control: Default::default(),
3237            confirm_destructive: false,
3238            review_gate_available: true,
3239            edit_staging: None,
3240            workspace_root: PathBuf::from("."),
3241        };
3242        (exec, sink)
3243    }
3244
3245    /// Test adapter mirroring the utility `set-variable` payload contract:
3246    /// emits `{ actionId: "set-variable", name, value }` so the executor's
3247    /// `store_set_variable` writes working memory. Avoids depending on the
3248    /// `flow-adapter-utility` crate (layering).
3249    struct SetVarAdapter;
3250
3251    #[async_trait::async_trait]
3252    impl Adapter for SetVarAdapter {
3253        fn name(&self) -> &str {
3254            "setvar"
3255        }
3256        async fn execute(&self, node: &FlowNode) -> Result<NodeOutput, AdapterError> {
3257            let name = node
3258                .data
3259                .get("name")
3260                .and_then(|v| v.as_str())
3261                .unwrap_or("")
3262                .to_string();
3263            let value = node.data.get("value").cloned().unwrap_or(serde_json::Value::Null);
3264            Ok(NodeOutput {
3265                status: "succeeded".into(),
3266                payload: serde_json::json!({
3267                    "actionId": "set-variable",
3268                    "name": name,
3269                    "value": value,
3270                }),
3271            })
3272        }
3273    }
3274
3275    fn count_succeeded(sink: &CapturingSink, node_id: &str) -> usize {
3276        sink.events
3277            .lock()
3278            .unwrap()
3279            .iter()
3280            .filter(|e| matches!(e, ExecutionEvent::NodeSucceeded { node_id: nid, .. } if nid == node_id))
3281            .count()
3282    }
3283
3284    fn was_skipped(sink: &CapturingSink, node_id: &str) -> bool {
3285        sink.events
3286            .lock()
3287            .unwrap()
3288            .iter()
3289            .any(|e| matches!(e, ExecutionEvent::NodeSkipped { node_id: nid, .. } if nid == node_id))
3290    }
3291
3292    fn count_capped(sink: &CapturingSink, node_id: &str) -> usize {
3293        sink.events
3294            .lock()
3295            .unwrap()
3296            .iter()
3297            .filter(|e| matches!(e, ExecutionEvent::IterationCapped { node_id: nid, .. } if nid == node_id))
3298            .count()
3299    }
3300
3301    #[tokio::test]
3302    async fn when_condition_gates_edge_on_dag() {
3303        let (executor, sink) = build_executor().await;
3304        // src emits label "go"; the edge whose condition matches fires, the
3305        // other is gated off and its target is skipped.
3306        let graph = FlowGraph {
3307            subflows: Vec::new(),
3308            id: "g".into(),
3309            name: "g".into(),
3310            version: "1".into(),
3311            description: None,
3312            nodes: vec![
3313                make_node("src", "action", serde_json::json!({ "adapter": "mock", "label": "go" })),
3314                make_node("a", "action", serde_json::json!({ "adapter": "mock" })),
3315                make_node("b", "action", serde_json::json!({ "adapter": "mock" })),
3316            ],
3317            edges: vec![
3318                make_edge_when("e1", "src", "a", EdgeOutcome::Pass, "{{src.payload.label}} == 'go'"),
3319                make_edge_when("e2", "src", "b", EdgeOutcome::Pass, "{{src.payload.label}} == 'stop'"),
3320            ],
3321        };
3322
3323        let summary = executor.run(&graph).await.unwrap();
3324        assert_eq!(count_succeeded(&sink, "a"), 1, "matching condition runs a");
3325        assert!(was_skipped(&sink, "b"), "non-matching condition skips b");
3326        // src+a succeeded, b skipped via routing - a skip alongside successes
3327        // is not a failure, so the run is still "succeeded".
3328        assert_eq!(summary.status, "succeeded");
3329    }
3330
3331    #[tokio::test]
3332    async fn bounded_loop_converges_on_when_exit() {
3333        let countdown = Arc::new(CountdownAdapter::new("countdown", 3));
3334        let (executor, sink) = build_executor_with_adapters(vec![countdown]).await;
3335        // start -> work -> check, with check looping back to work while
3336        // remaining > 0. Countdown starts at 3, so check exits after 3 runs.
3337        let graph = FlowGraph {
3338            subflows: Vec::new(),
3339            id: "g".into(),
3340            name: "g".into(),
3341            version: "1".into(),
3342            description: None,
3343            nodes: vec![
3344                make_node("start", "action", serde_json::json!({ "adapter": "mock" })),
3345                make_node("work", "action", serde_json::json!({ "adapter": "mock" })),
3346                make_node("check", "action", serde_json::json!({ "adapter": "countdown" })),
3347            ],
3348            edges: vec![
3349                make_edge("e1", "start", "work"),
3350                make_edge("e2", "work", "check"),
3351                make_edge_when(
3352                    "e3",
3353                    "check",
3354                    "work",
3355                    EdgeOutcome::Always,
3356                    "{{check.payload.remaining}} > 0",
3357                ),
3358            ],
3359        };
3360
3361        let summary = executor.run(&graph).await.unwrap();
3362        assert_eq!(count_succeeded(&sink, "check"), 3, "loop runs until remaining hits 0");
3363        assert_eq!(count_succeeded(&sink, "work"), 3);
3364        assert_eq!(count_capped(&sink, "check"), 0, "converged before the cap");
3365        assert_eq!(summary.status, "succeeded");
3366    }
3367
3368    #[tokio::test]
3369    async fn runaway_loop_hits_cap_and_finishes_partial() {
3370        let (executor, sink) = build_executor().await;
3371        // A self-loop with an always-active back-edge never converges; the
3372        // per-node cap stops it and the run ends `partial` rather than hanging.
3373        let graph = FlowGraph {
3374            subflows: Vec::new(),
3375            id: "g".into(),
3376            name: "g".into(),
3377            version: "1".into(),
3378            description: None,
3379            nodes: vec![
3380                make_node("start", "action", serde_json::json!({ "adapter": "mock" })),
3381                make_node("loop", "action", serde_json::json!({ "adapter": "mock" })),
3382            ],
3383            edges: vec![
3384                make_edge("e1", "start", "loop"),
3385                make_edge("e2", "loop", "loop"),
3386            ],
3387        };
3388
3389        let summary = executor.run(&graph).await.unwrap();
3390        assert_eq!(
3391            count_succeeded(&sink, "loop"),
3392            PER_NODE_VISIT_CAP as usize,
3393            "node runs exactly up to the cap"
3394        );
3395        assert_eq!(count_capped(&sink, "loop"), 1, "one cap diagnostic emitted");
3396        assert_eq!(summary.status, "partial");
3397    }
3398
3399    // --- working memory: set-variable writes, {{memory.x}} reads ------------
3400
3401    fn set_var_node(id: &str, name: &str, value: &str) -> FlowNode {
3402        make_node(
3403            id,
3404            "utility",
3405            serde_json::json!({ "adapter": "setvar", "name": name, "value": value }),
3406        )
3407    }
3408
3409    #[tokio::test]
3410    async fn set_variable_writes_memory_and_downstream_reads_it() {
3411        let (executor, sink) = build_executor_with_adapters(vec![Arc::new(SetVarAdapter)]).await;
3412        let graph = FlowGraph {
3413            subflows: Vec::new(),
3414            id: "g".into(),
3415            name: "g".into(),
3416            version: "1".into(),
3417            description: None,
3418            nodes: vec![
3419                set_var_node("set", "region", "us-east-1"),
3420                make_node(
3421                    "reader",
3422                    "action",
3423                    serde_json::json!({ "adapter": "mock", "label": "{{memory.region}}" }),
3424                ),
3425            ],
3426            edges: vec![make_edge("e1", "set", "reader")],
3427        };
3428
3429        executor.run(&graph).await.unwrap();
3430        let reader = output_for(&sink, "reader").expect("reader succeeded");
3431        assert_eq!(
3432            reader["payload"]["label"].as_str(),
3433            Some("us-east-1"),
3434            "downstream node reads the value written via set-variable"
3435        );
3436    }
3437
3438    #[tokio::test]
3439    async fn working_memory_persists_across_runs() {
3440        // A shared store mirrors FlowApp owning memory across execute_graph
3441        // calls (i.e. across re-plan iterations).
3442        let memory = WorkingMemory::default();
3443        let (executor, sink) =
3444            build_executor_with(vec![Arc::new(SetVarAdapter)], memory.clone()).await;
3445
3446        // Run 1: write the variable.
3447        let write_graph = FlowGraph {
3448            subflows: Vec::new(),
3449            id: "g1".into(),
3450            name: "g1".into(),
3451            version: "1".into(),
3452            description: None,
3453            nodes: vec![set_var_node("set", "token", "abc123")],
3454            edges: vec![],
3455        };
3456        executor.run(&write_graph).await.unwrap();
3457        assert_eq!(memory.lock().unwrap().get("token").and_then(|v| v.as_str()), Some("abc123"));
3458
3459        // Run 2: a separate graph reads it - the store survived the first run.
3460        let read_graph = FlowGraph {
3461            subflows: Vec::new(),
3462            id: "g2".into(),
3463            name: "g2".into(),
3464            version: "1".into(),
3465            description: None,
3466            nodes: vec![make_node(
3467                "reader",
3468                "action",
3469                serde_json::json!({ "adapter": "mock", "label": "{{memory.token}}" }),
3470            )],
3471            edges: vec![],
3472        };
3473        executor.run(&read_graph).await.unwrap();
3474        let reader = output_for(&sink, "reader").expect("reader succeeded");
3475        assert_eq!(reader["payload"]["label"].as_str(), Some("abc123"));
3476    }
3477
3478    #[tokio::test]
3479    async fn when_condition_reads_memory() {
3480        let (executor, sink) = build_executor_with_adapters(vec![Arc::new(SetVarAdapter)]).await;
3481        // set flag=go, then a Pass edge gated on memory routes to `go_node`
3482        // while the mismatched edge skips `stop_node`.
3483        let graph = FlowGraph {
3484            subflows: Vec::new(),
3485            id: "g".into(),
3486            name: "g".into(),
3487            version: "1".into(),
3488            description: None,
3489            nodes: vec![
3490                set_var_node("set", "flag", "go"),
3491                make_node("go_node", "action", serde_json::json!({ "adapter": "mock" })),
3492                make_node("stop_node", "action", serde_json::json!({ "adapter": "mock" })),
3493            ],
3494            edges: vec![
3495                make_edge_when("e1", "set", "go_node", EdgeOutcome::Pass, "{{memory.flag}} == 'go'"),
3496                make_edge_when(
3497                    "e2",
3498                    "set",
3499                    "stop_node",
3500                    EdgeOutcome::Pass,
3501                    "{{memory.flag}} == 'stop'",
3502                ),
3503            ],
3504        };
3505
3506        executor.run(&graph).await.unwrap();
3507        assert_eq!(count_succeeded(&sink, "go_node"), 1, "memory-matched edge fires");
3508        assert!(was_skipped(&sink, "stop_node"), "memory-mismatched edge skips");
3509    }
3510
3511    #[test]
3512    fn destructive_reason_flags_data_loss_only() {
3513        let del = make_node(
3514            "d",
3515            "action",
3516            serde_json::json!({"adapter":"fs","actionId":"delete-file","path":"build/"}),
3517        );
3518        assert!(destructive_reason(&del).is_some(), "fs delete-file");
3519
3520        let rm = make_node(
3521            "s",
3522            "action",
3523            serde_json::json!({"adapter":"shell","actionId":"run-command","command":"rm","args":["-rf","dist"]}),
3524        );
3525        assert!(destructive_reason(&rm).is_some(), "shell rm");
3526
3527        let push = make_node(
3528            "g",
3529            "action",
3530            serde_json::json!({"adapter":"shell","actionId":"git","args":["push","origin","main"]}),
3531        );
3532        assert!(destructive_reason(&push).is_some(), "git push");
3533
3534        let read = make_node(
3535            "r",
3536            "action",
3537            serde_json::json!({"adapter":"fs","actionId":"read-file","path":"x"}),
3538        );
3539        assert!(destructive_reason(&read).is_none(), "read is safe");
3540
3541        // No substring false positive: `confirm` must not match the `rm` token.
3542        let confirm = make_node(
3543            "c",
3544            "action",
3545            serde_json::json!({"adapter":"shell","actionId":"run-command","command":"./confirm","args":["build"]}),
3546        );
3547        assert!(destructive_reason(&confirm).is_none(), "no rm substring match");
3548    }
3549
3550    #[test]
3551    fn sandbox_escape_flags_parent_traversal() {
3552        let esc = make_node(
3553            "e",
3554            "action",
3555            serde_json::json!({"adapter":"fs","actionId":"read-file","path":"../../etc/passwd"}),
3556        );
3557        assert!(sandbox_escape_reason(&esc).is_some(), "`..` path flagged");
3558        let safe = make_node(
3559            "s",
3560            "action",
3561            serde_json::json!({"adapter":"fs","actionId":"read-file","path":"src/main.rs"}),
3562        );
3563        assert!(sandbox_escape_reason(&safe).is_none(), "in-jail path is fine");
3564        // No false positive on `..` as a substring (not a whole segment).
3565        let dots = make_node(
3566            "d",
3567            "action",
3568            serde_json::json!({"adapter":"fs","actionId":"read-file","path":"a..b.txt"}),
3569        );
3570        assert!(
3571            sandbox_escape_reason(&dots).is_none(),
3572            "`..` substring not flagged"
3573        );
3574    }
3575
3576    #[test]
3577    fn verify_graph_collects_destructive_and_escape() {
3578        let graph = FlowGraph {
3579            subflows: Vec::new(),
3580            id: "g".into(),
3581            name: "g".into(),
3582            version: "1".into(),
3583            description: None,
3584            nodes: vec![
3585                make_node(
3586                    "del",
3587                    "action",
3588                    serde_json::json!({"adapter":"fs","actionId":"delete-file","path":"x"}),
3589                ),
3590                make_node(
3591                    "esc",
3592                    "action",
3593                    serde_json::json!({"adapter":"fs","actionId":"read-file","path":"../x"}),
3594                ),
3595                make_node(
3596                    "ok",
3597                    "action",
3598                    serde_json::json!({"adapter":"fs","actionId":"read-file","path":"x"}),
3599                ),
3600            ],
3601            edges: vec![],
3602        };
3603        let w = verify_graph(&graph);
3604        assert!(w.iter().any(|x| x.node_id == "del" && x.severity == "destructive"));
3605        assert!(w
3606            .iter()
3607            .any(|x| x.node_id == "esc" && x.severity == "sandbox-escape"));
3608        assert!(!w.iter().any(|x| x.node_id == "ok"));
3609    }
3610
3611    #[test]
3612    fn fallback_node_swaps_provider_and_strips_fallback_fields() {
3613        let n = make_node(
3614            "ai",
3615            "ai",
3616            serde_json::json!({
3617                "provider": "claude",
3618                "modelId": "m1",
3619                "fallbackProvider": "local",
3620                "fallbackModelId": "m2",
3621                "input": "x"
3622            }),
3623        );
3624        let fb = fallback_node(&n).expect("has fallback");
3625        assert_eq!(fb.data.get("provider").unwrap(), "local");
3626        assert_eq!(fb.data.get("modelId").unwrap(), "m2");
3627        assert!(fb.data.get("fallbackProvider").is_none());
3628        assert!(fb.data.get("fallbackModelId").is_none());
3629        assert_eq!(fb.data.get("input").unwrap(), "x"); // other fields preserved
3630
3631        // No fallbackProvider → no failover variant.
3632        let no_fb = make_node(
3633            "ai",
3634            "ai",
3635            serde_json::json!({"provider":"claude","modelId":"m1"}),
3636        );
3637        assert!(fallback_node(&no_fb).is_none());
3638    }
3639
3640    #[test]
3641    fn run_control_phase_transitions() {
3642        let c = RunControl::default();
3643        assert_eq!(c.phase(), RunPhase::Running);
3644        c.pause();
3645        assert_eq!(c.phase(), RunPhase::Paused);
3646        c.resume();
3647        assert_eq!(c.phase(), RunPhase::Running);
3648        c.cancel();
3649        assert!(c.is_cancelling());
3650        // pause/resume are no-ops once cancelling; cancel wins.
3651        c.pause();
3652        c.resume();
3653        assert!(c.is_cancelling());
3654        // reset clears back to running for the next run.
3655        c.reset();
3656        assert_eq!(c.phase(), RunPhase::Running);
3657    }
3658
3659    #[tokio::test]
3660    async fn wait_while_paused_returns_immediately_when_not_paused() {
3661        let c = RunControl::default();
3662        // Running: returns at once.
3663        c.wait_while_paused().await;
3664        // Cancelling (not Paused): also returns at once rather than blocking.
3665        c.cancel();
3666        c.wait_while_paused().await;
3667    }
3668
3669    #[tokio::test]
3670    async fn wait_while_paused_unblocks_on_resume() {
3671        let c = Arc::new(RunControl::default());
3672        c.pause();
3673        let c2 = c.clone();
3674        let waiter = tokio::spawn(async move { c2.wait_while_paused().await });
3675        // Let the waiter park inside `notified()` before releasing it.
3676        tokio::time::sleep(std::time::Duration::from_millis(20)).await;
3677        c.resume();
3678        tokio::time::timeout(std::time::Duration::from_secs(1), waiter)
3679            .await
3680            .expect("waiter did not wake within timeout")
3681            .expect("waiter task panicked");
3682        assert_eq!(c.phase(), RunPhase::Running);
3683    }
3684
3685    /// Test adapter that cancels the shared run control the first time it runs,
3686    /// so the next node-boundary checkpoint observes the cancellation.
3687    struct CancelTriggerAdapter {
3688        control: SharedRunControl,
3689    }
3690
3691    #[async_trait::async_trait]
3692    impl Adapter for CancelTriggerAdapter {
3693        fn name(&self) -> &str {
3694            "cancel-trigger"
3695        }
3696        async fn execute(&self, _node: &FlowNode) -> Result<NodeOutput, AdapterError> {
3697            self.control.cancel();
3698            Ok(NodeOutput {
3699                status: "succeeded".into(),
3700                payload: serde_json::Value::Null,
3701            })
3702        }
3703    }
3704
3705    #[tokio::test]
3706    async fn cancel_skips_remaining_nodes_and_reports_cancelled() {
3707        let (mut exec, sink) = build_executor().await;
3708        let control = exec.control.clone();
3709        let trigger: Arc<dyn Adapter> = Arc::new(CancelTriggerAdapter { control });
3710        // Re-register the adapter set with the cancel trigger added.
3711        let mut adapters = AdapterRegistry::new();
3712        adapters.register(Arc::new(MockAdapter::with_delay(
3713            "mock",
3714            std::time::Duration::from_millis(1),
3715        )));
3716        adapters.register(trigger);
3717        exec.adapters = Arc::new(adapters);
3718
3719        // n1 cancels on execution; n2 and n3 should never run.
3720        let graph = FlowGraph {
3721            subflows: Vec::new(),
3722            id: "g".into(),
3723            name: "g".into(),
3724            version: "1".into(),
3725            description: None,
3726            nodes: vec![
3727                make_node("n1", "action", serde_json::json!({ "adapter": "cancel-trigger" })),
3728                make_node("n2", "action", serde_json::json!({ "adapter": "mock" })),
3729                make_node("n3", "action", serde_json::json!({ "adapter": "mock" })),
3730            ],
3731            edges: vec![
3732                make_edge("e1", "n1", "n2"),
3733                make_edge("e2", "n2", "n3"),
3734            ],
3735        };
3736
3737        let summary = exec.run(&graph).await.unwrap();
3738        assert_eq!(summary.status, "cancelled", "cancel yields cancelled status");
3739        assert_eq!(count_succeeded(&sink, "n1"), 1, "the triggering node still completes");
3740        assert!(was_skipped(&sink, "n2"), "downstream node is skipped after cancel");
3741        assert!(was_skipped(&sink, "n3"), "all remaining nodes are skipped after cancel");
3742    }
3743}