1use chrono::Utc;
2use flow_adapter_ai::{CloudAiRegistry, CloudAiRequest, LlmStreamSink};
3#[cfg(test)]
4use flow_adapter_ai::NullStreamSink;
5use flow_domain::contract::{AiNodeContract, AiOutputEnvelope, RouteDecision};
6use flow_domain::graph::{EdgeOutcome, FlowEdge, FlowGraph, FlowNode, Position, SubFlow};
7use flow_security::{CredentialError, CredentialResolver, PiiSanitizer};
8use std::collections::{HashMap, HashSet, VecDeque};
9use std::path::PathBuf;
10use std::sync::Arc;
11use thiserror::Error;
12use uuid::Uuid;
13
14use crate::adapter::AdapterRegistry;
15use crate::events::{EventSink, ExecutionEvent, LogStream};
16
17#[derive(Debug, Error)]
18pub enum ExecutorError {
19 #[error("graph contains a cycle involving node {0}")]
20 Cycle(String),
21 #[error("node {node_id} references unknown adapter")]
22 MissingAdapter { node_id: String },
23}
24
25#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
26pub struct ExecutionSummary {
27 pub execution_id: String,
28 pub status: String,
29 pub succeeded: usize,
30 pub failed: usize,
31 pub skipped: usize,
32}
33
34pub struct Executor {
35 pub adapters: Arc<AdapterRegistry>,
36 pub sanitizer: Arc<PiiSanitizer>,
37 pub events: Arc<dyn EventSink>,
38 pub cloud_providers: Arc<CloudAiRegistry>,
39 pub credentials: Arc<dyn CredentialResolver>,
40 pub allow_cloud_ai: bool,
41 pub allow_local_ai: bool,
45 pub local_ai_base_url: Option<String>,
48 pub stream_sink: Arc<dyn LlmStreamSink>,
53 pub working_memory: WorkingMemory,
58 pub control: SharedRunControl,
62 pub confirm_destructive: bool,
69 pub review_gate_available: bool,
75 pub edit_staging: Option<crate::ai_tools::StagedEdits>,
82 pub workspace_root: PathBuf,
86}
87
88pub type WorkingMemory = Arc<std::sync::Mutex<HashMap<String, serde_json::Value>>>;
92
93#[derive(Debug, Clone, Copy, PartialEq, Eq)]
97pub enum RunPhase {
98 Running,
99 Paused,
100 Cancelling,
101}
102
103pub struct RunControl {
109 phase: std::sync::atomic::AtomicU8,
110 notify: tokio::sync::Notify,
112 review: std::sync::Mutex<Option<ReviewDecision>>,
116}
117
118#[derive(Debug, Clone, Copy, PartialEq, Eq)]
120pub enum ReviewDecision {
121 Approved,
122 Rejected,
123}
124
125impl Default for RunControl {
126 fn default() -> Self {
127 Self {
128 phase: std::sync::atomic::AtomicU8::new(Self::RUNNING),
129 notify: tokio::sync::Notify::new(),
130 review: std::sync::Mutex::new(None),
131 }
132 }
133}
134
135impl RunControl {
136 const RUNNING: u8 = 0;
137 const PAUSED: u8 = 1;
138 const CANCELLING: u8 = 2;
139
140 fn set(&self, v: u8) {
141 self.phase.store(v, std::sync::atomic::Ordering::SeqCst);
142 }
143
144 pub fn phase(&self) -> RunPhase {
145 match self.phase.load(std::sync::atomic::Ordering::SeqCst) {
146 Self::PAUSED => RunPhase::Paused,
147 Self::CANCELLING => RunPhase::Cancelling,
148 _ => RunPhase::Running,
149 }
150 }
151
152 pub fn reset(&self) {
155 self.set(Self::RUNNING);
156 self.notify.notify_waiters();
158 }
159
160 pub fn pause(&self) {
162 let _ = self.phase.compare_exchange(
163 Self::RUNNING,
164 Self::PAUSED,
165 std::sync::atomic::Ordering::SeqCst,
166 std::sync::atomic::Ordering::SeqCst,
167 );
168 }
169
170 pub fn resume(&self) {
172 let _ = self.phase.compare_exchange(
173 Self::PAUSED,
174 Self::RUNNING,
175 std::sync::atomic::Ordering::SeqCst,
176 std::sync::atomic::Ordering::SeqCst,
177 );
178 self.notify.notify_waiters();
179 }
180
181 pub fn cancel(&self) {
183 self.set(Self::CANCELLING);
184 self.notify.notify_waiters();
185 }
186
187 pub fn is_cancelling(&self) -> bool {
188 self.phase() == RunPhase::Cancelling
189 }
190
191 pub fn resolve_review(&self, approved: bool) {
194 if let Ok(mut slot) = self.review.lock() {
195 *slot = Some(if approved {
196 ReviewDecision::Approved
197 } else {
198 ReviewDecision::Rejected
199 });
200 }
201 self.resume();
202 }
203
204 pub fn take_review(&self) -> Option<ReviewDecision> {
206 self.review.lock().ok().and_then(|mut slot| slot.take())
207 }
208
209 pub async fn wait_while_paused(&self) {
211 while self.phase() == RunPhase::Paused {
212 self.notify.notified().await;
213 }
214 }
215}
216
217pub type SharedRunControl = Arc<RunControl>;
219
220const PER_NODE_VISIT_CAP: u32 = 25;
226const GLOBAL_STEP_CAP: u32 = 500;
227
228impl Executor {
229 pub async fn run(&self, graph: &FlowGraph) -> Result<ExecutionSummary, ExecutorError> {
230 self.run_with_id(Uuid::new_v4().to_string(), graph).await
231 }
232
233 pub async fn run_with_id(
238 &self,
239 execution_id: String,
240 graph: &FlowGraph,
241 ) -> Result<ExecutionSummary, ExecutorError> {
242 self.control.reset();
244 self.events.emit(ExecutionEvent::Started {
245 execution_id: execution_id.clone(),
246 at: Utc::now(),
247 });
248
249 let mut terminal: HashMap<String, NodeStatus> = HashMap::new();
254 let mut outputs: HashMap<String, serde_json::Value> = HashMap::new();
255
256 let capped = if graph.subflows.is_empty() {
261 match topo_sort(graph) {
262 Ok(order) => {
263 self.run_acyclic(graph, &order, &execution_id, &mut outputs, &mut terminal)
264 .await;
265 false
266 }
267 Err(ExecutorError::Cycle(_)) => {
268 self.run_iterative(graph, &execution_id, &mut outputs, &mut terminal)
269 .await
270 }
271 Err(e) => return Err(e),
272 }
273 } else {
274 let outer = collapse_subflows(graph);
278 match topo_sort(&outer) {
279 Ok(order) => {
280 self.run_acyclic_units(
281 graph,
282 &order,
283 &execution_id,
284 &mut outputs,
285 &mut terminal,
286 )
287 .await;
288 false
289 }
290 Err(ExecutorError::Cycle(_)) => {
291 self.run_iterative(graph, &execution_id, &mut outputs, &mut terminal)
294 .await
295 }
296 Err(e) => return Err(e),
297 }
298 };
299
300 let mut succeeded = 0usize;
301 let mut failed = 0usize;
302 let mut skipped = 0usize;
303 for status in terminal.values() {
304 match status {
305 NodeStatus::Succeeded => succeeded += 1,
306 NodeStatus::Failed => failed += 1,
307 NodeStatus::Skipped => skipped += 1,
308 }
309 }
310
311 let status = if self.control.is_cancelling() {
312 "cancelled"
313 } else if capped {
314 "partial"
315 } else if failed > 0 {
316 if succeeded > 0 || skipped > 0 {
317 "partial"
318 } else {
319 "failed"
320 }
321 } else if skipped > 0 && succeeded == 0 {
322 "skipped"
323 } else {
324 "succeeded"
325 };
326
327 self.events.emit(ExecutionEvent::Done {
328 execution_id: execution_id.clone(),
329 at: Utc::now(),
330 status: status.into(),
331 });
332
333 Ok(ExecutionSummary {
334 execution_id,
335 status: status.into(),
336 succeeded,
337 failed,
338 skipped,
339 })
340 }
341
342 async fn checkpoint(&self, execution_id: &str) -> bool {
347 match self.control.phase() {
348 RunPhase::Running => false,
349 RunPhase::Cancelling => true,
350 RunPhase::Paused => {
351 self.events.emit(ExecutionEvent::Paused {
352 execution_id: execution_id.to_string(),
353 at: Utc::now(),
354 });
355 self.control.wait_while_paused().await;
356 if self.control.is_cancelling() {
357 true
358 } else {
359 self.events.emit(ExecutionEvent::Resumed {
360 execution_id: execution_id.to_string(),
361 at: Utc::now(),
362 });
363 false
364 }
365 }
366 }
367 }
368
369 fn snapshot_memory(&self) -> HashMap<String, serde_json::Value> {
372 self.working_memory
373 .lock()
374 .map(|m| m.clone())
375 .unwrap_or_default()
376 }
377
378 fn store_set_variable(&self, output: &serde_json::Value) {
382 let inner = output.get("payload").unwrap_or(output);
383 if inner.get("actionId").and_then(|v| v.as_str()) != Some("set-variable") {
384 return;
385 }
386 let Some(name) = inner.get("name").and_then(|v| v.as_str()) else {
387 return;
388 };
389 let value = inner.get("value").cloned().unwrap_or(serde_json::Value::Null);
390 if let Ok(mut mem) = self.working_memory.lock() {
391 mem.insert(name.to_string(), value);
392 }
393 }
394
395 async fn run_acyclic(
399 &self,
400 graph: &FlowGraph,
401 order: &[String],
402 execution_id: &str,
403 outputs: &mut HashMap<String, serde_json::Value>,
404 terminal: &mut HashMap<String, NodeStatus>,
405 ) {
406 let nodes_by_id: HashMap<&str, &FlowNode> =
407 graph.nodes.iter().map(|n| (n.id.as_str(), n)).collect();
408
409 for node_id in order {
410 if self.checkpoint(execution_id).await {
413 break;
414 }
415 let Some(node) = nodes_by_id.get(node_id.as_str()).copied() else {
416 continue;
417 };
418 self.run_one_node(graph, node, execution_id, 0, outputs, terminal)
419 .await;
420 }
421
422 for node in &graph.nodes {
425 if !terminal.contains_key(&node.id) {
426 terminal.insert(node.id.clone(), NodeStatus::Skipped);
427 self.events.emit(ExecutionEvent::NodeSkipped {
428 execution_id: execution_id.to_string(),
429 node_id: node.id.clone(),
430 at: Utc::now(),
431 reason: "run cancelled before this node was scheduled".into(),
432 iteration: 0,
433 });
434 }
435 }
436 }
437
438 async fn run_one_node(
442 &self,
443 graph: &FlowGraph,
444 node: &FlowNode,
445 execution_id: &str,
446 iteration: u32,
447 outputs: &mut HashMap<String, serde_json::Value>,
448 terminal: &mut HashMap<String, NodeStatus>,
449 ) {
450 let in_edges: Vec<_> = graph.edges.iter().filter(|e| e.target == node.id).collect();
451 let memory = self.snapshot_memory();
452 let reachable = in_edges.is_empty()
453 || in_edges.iter().any(|e| edge_active(e, terminal, outputs, &memory));
454
455 if !reachable {
456 terminal.insert(node.id.clone(), NodeStatus::Skipped);
457 self.events.emit(ExecutionEvent::NodeSkipped {
458 execution_id: execution_id.to_string(),
459 node_id: node.id.clone(),
460 at: Utc::now(),
461 reason: "no incoming edge fired (upstream did not pass/fail to here)".into(),
462 iteration,
463 });
464 return;
465 }
466
467 self.execute_node(graph, node, execution_id, iteration, outputs, terminal)
468 .await;
469 }
470
471 async fn run_acyclic_units(
475 &self,
476 graph: &FlowGraph,
477 outer_order: &[String],
478 execution_id: &str,
479 outputs: &mut HashMap<String, serde_json::Value>,
480 terminal: &mut HashMap<String, NodeStatus>,
481 ) {
482 let nodes_by_id: HashMap<&str, &FlowNode> =
483 graph.nodes.iter().map(|n| (n.id.as_str(), n)).collect();
484
485 for outer_id in outer_order {
486 if self.checkpoint(execution_id).await {
487 break;
488 }
489 if let Some(sf) = graph.subflows.iter().find(|s| &s.id == outer_id) {
490 self.run_subflow_unit(graph, sf, execution_id, outputs, terminal)
491 .await;
492 } else if let Some(node) = nodes_by_id.get(outer_id.as_str()).copied() {
493 self.run_one_node(graph, node, execution_id, 0, outputs, terminal)
494 .await;
495 }
496 }
497
498 for node in &graph.nodes {
500 if !terminal.contains_key(&node.id) {
501 terminal.insert(node.id.clone(), NodeStatus::Skipped);
502 self.events.emit(ExecutionEvent::NodeSkipped {
503 execution_id: execution_id.to_string(),
504 node_id: node.id.clone(),
505 at: Utc::now(),
506 reason: "no incoming edge fired (upstream did not pass/fail to here)".into(),
507 iteration: 0,
508 });
509 }
510 }
511 }
512
513 async fn run_subflow_unit(
519 &self,
520 graph: &FlowGraph,
521 sf: &SubFlow,
522 execution_id: &str,
523 outputs: &mut HashMap<String, serde_json::Value>,
524 terminal: &mut HashMap<String, NodeStatus>,
525 ) {
526 let members: HashSet<&str> = sf.node_ids.iter().map(|s| s.as_str()).collect();
527
528 let cross_in: Vec<&FlowEdge> = graph
531 .edges
532 .iter()
533 .filter(|e| members.contains(e.target.as_str()) && !members.contains(e.source.as_str()))
534 .collect();
535 let memory = self.snapshot_memory();
536 let reachable = cross_in.is_empty()
537 || cross_in
538 .iter()
539 .any(|e| edge_active(e, terminal, outputs, &memory));
540 if !reachable {
541 for id in &sf.node_ids {
542 if !terminal.contains_key(id) {
543 terminal.insert(id.clone(), NodeStatus::Skipped);
544 self.events.emit(ExecutionEvent::NodeSkipped {
545 execution_id: execution_id.to_string(),
546 node_id: id.clone(),
547 at: Utc::now(),
548 reason: format!("sub-flow \"{}\" not reached", sf.label),
549 iteration: 0,
550 });
551 }
552 }
553 return;
554 }
555
556 let inner_graph = FlowGraph {
558 id: sf.id.clone(),
559 name: sf.label.clone(),
560 version: "1.0.0".into(),
561 description: None,
562 nodes: graph
563 .nodes
564 .iter()
565 .filter(|n| members.contains(n.id.as_str()))
566 .cloned()
567 .collect(),
568 edges: graph
569 .edges
570 .iter()
571 .filter(|e| {
572 members.contains(e.source.as_str()) && members.contains(e.target.as_str())
573 })
574 .cloned()
575 .collect(),
576 subflows: Vec::new(),
577 };
578 let inner_order = topo_sort(&inner_graph).unwrap_or_else(|_| sf.node_ids.clone());
579
580 let nodes_by_id: HashMap<&str, &FlowNode> =
581 graph.nodes.iter().map(|n| (n.id.as_str(), n)).collect();
582 let max_attempts = 1 + sf.retry.unwrap_or(0);
583 for attempt in 0..max_attempts {
584 for nid in &inner_order {
585 if self.checkpoint(execution_id).await {
586 return;
587 }
588 if let Some(node) = nodes_by_id.get(nid.as_str()).copied() {
589 self.run_one_node(graph, node, execution_id, attempt, outputs, terminal)
590 .await;
591 }
592 }
593 let failed = sf
594 .node_ids
595 .iter()
596 .any(|id| matches!(terminal.get(id), Some(NodeStatus::Failed)));
597 if !failed || attempt + 1 >= max_attempts {
598 break;
599 }
600 for id in &sf.node_ids {
603 terminal.remove(id);
604 outputs.remove(id);
605 }
606 }
607 }
608
609 async fn run_iterative(
613 &self,
614 graph: &FlowGraph,
615 execution_id: &str,
616 outputs: &mut HashMap<String, serde_json::Value>,
617 terminal: &mut HashMap<String, NodeStatus>,
618 ) -> bool {
619 let nodes_by_id: HashMap<&str, &FlowNode> =
620 graph.nodes.iter().map(|n| (n.id.as_str(), n)).collect();
621
622 let mut queue: VecDeque<String> = graph
624 .nodes
625 .iter()
626 .filter(|n| !graph.edges.iter().any(|e| e.target == n.id))
627 .map(|n| n.id.clone())
628 .collect();
629
630 let mut visits: HashMap<String, u32> = HashMap::new();
631 let mut global_steps = 0u32;
632 let mut capped = false;
633
634 while let Some(node_id) = queue.pop_front() {
635 if self.checkpoint(execution_id).await {
638 queue.clear();
639 break;
640 }
641 if global_steps >= GLOBAL_STEP_CAP {
642 capped = true;
643 break;
644 }
645 let Some(node) = nodes_by_id.get(node_id.as_str()).copied() else {
646 continue;
647 };
648
649 let visit = visits.entry(node_id.clone()).or_insert(0);
650 if *visit >= PER_NODE_VISIT_CAP {
651 self.events.emit(ExecutionEvent::IterationCapped {
652 execution_id: execution_id.to_string(),
653 node_id: node_id.clone(),
654 at: Utc::now(),
655 visits: *visit,
656 });
657 capped = true;
658 continue;
659 }
660 let iteration = *visit;
661 *visit += 1;
662 global_steps += 1;
663
664 self.execute_node(graph, node, execution_id, iteration, outputs, terminal)
665 .await;
666
667 let memory = self.snapshot_memory();
669 for edge in graph.edges.iter().filter(|e| e.source == node_id) {
670 if edge_active(edge, terminal, outputs, &memory) {
671 queue.push_back(edge.target.clone());
672 }
673 }
674 }
675
676 for node in &graph.nodes {
679 if !terminal.contains_key(&node.id) {
680 terminal.insert(node.id.clone(), NodeStatus::Skipped);
681 self.events.emit(ExecutionEvent::NodeSkipped {
682 execution_id: execution_id.to_string(),
683 node_id: node.id.clone(),
684 at: Utc::now(),
685 reason: "no incoming edge fired (upstream did not pass/fail to here)".into(),
686 iteration: 0,
687 });
688 }
689 }
690
691 capped
692 }
693
694 async fn execute_node(
698 &self,
699 graph: &FlowGraph,
700 node: &FlowNode,
701 execution_id: &str,
702 iteration: u32,
703 outputs: &mut HashMap<String, serde_json::Value>,
704 terminal: &mut HashMap<String, NodeStatus>,
705 ) {
706 if self.confirm_destructive {
711 if let Some(action) = destructive_reason(node) {
712 if !self.control.is_cancelling() {
713 self.events.emit(ExecutionEvent::AwaitingConfirmation {
714 execution_id: execution_id.to_string(),
715 node_id: node.id.clone(),
716 action,
717 at: Utc::now(),
718 });
719 self.control.pause();
720 self.control.wait_while_paused().await;
721 }
722 if self.control.is_cancelling() {
723 terminal.insert(node.id.clone(), NodeStatus::Skipped);
724 self.events.emit(ExecutionEvent::NodeSkipped {
725 execution_id: execution_id.to_string(),
726 node_id: node.id.clone(),
727 at: Utc::now(),
728 reason: "destructive action not confirmed".into(),
729 iteration,
730 });
731 return;
732 }
733 }
734 }
735
736 self.events.emit(ExecutionEvent::NodeStarted {
737 execution_id: execution_id.to_string(),
738 node_id: node.id.clone(),
739 at: Utc::now(),
740 iteration,
741 });
742
743 let memory = self.snapshot_memory();
748 let fired_sources: Vec<String> = graph
749 .edges
750 .iter()
751 .filter(|e| e.target == node.id && edge_active(e, terminal, outputs, &memory))
752 .map(|e| e.source.clone())
753 .collect();
754 let mut resolved = node.clone();
755 resolved.data = interpolate_value(&resolved.data, outputs, &fired_sources, &memory);
756 let node = &resolved;
757
758 let outcome = match node.node_type.as_str() {
759 "ai" => self.run_ai_node(node, execution_id).await,
761 "agentic" => NodeOutcome::Skipped("agentic nodes are design-time only".into()),
764 _ => self.run_adapter_node(node, execution_id).await,
766 };
767
768 match outcome {
769 NodeOutcome::Succeeded(payload) => {
770 terminal.insert(node.id.clone(), NodeStatus::Succeeded);
771 self.store_set_variable(&payload);
772 outputs.insert(node.id.clone(), payload.clone());
773 self.events.emit(ExecutionEvent::NodeSucceeded {
774 execution_id: execution_id.to_string(),
775 node_id: node.id.clone(),
776 at: Utc::now(),
777 output: payload,
778 iteration,
779 });
780 }
781 NodeOutcome::Skipped(reason) => {
782 terminal.insert(node.id.clone(), NodeStatus::Skipped);
783 self.events.emit(ExecutionEvent::NodeSkipped {
784 execution_id: execution_id.to_string(),
785 node_id: node.id.clone(),
786 at: Utc::now(),
787 reason,
788 iteration,
789 });
790 }
791 NodeOutcome::Failed(error) => {
792 terminal.insert(node.id.clone(), NodeStatus::Failed);
793 self.events.emit(ExecutionEvent::NodeFailed {
794 execution_id: execution_id.to_string(),
795 node_id: node.id.clone(),
796 at: Utc::now(),
797 error,
798 iteration,
799 });
800 }
801 }
802 }
803
804 async fn run_ai_node(&self, node: &FlowNode, execution_id: &str) -> NodeOutcome {
808 let outcome = self.run_ai_node_once(node, execution_id).await;
809 if let NodeOutcome::Failed(err) = &outcome {
814 if let Some(fb) = fallback_node(node) {
815 let fb_provider = fb
816 .data
817 .get("provider")
818 .and_then(|v| v.as_str())
819 .unwrap_or("")
820 .to_string();
821 self.events.emit(ExecutionEvent::NodeLog {
822 execution_id: execution_id.to_string(),
823 node_id: node.id.clone(),
824 at: Utc::now(),
825 stream: LogStream::Stderr,
826 line: format!("AI provider failed ({err}); falling back to {fb_provider}"),
827 });
828 return self.run_ai_node_once(&fb, execution_id).await;
829 }
830 }
831 outcome
832 }
833
834 async fn run_ai_node_once(&self, node: &FlowNode, execution_id: &str) -> NodeOutcome {
837 let model_id = node
838 .data
839 .get("modelId")
840 .and_then(|v| v.as_str())
841 .unwrap_or_default()
842 .to_string();
843 if model_id.is_empty() {
844 return NodeOutcome::Failed(
845 "no model selected on ai node (set modelId to a loaded model)".into(),
846 );
847 }
848
849 let provider = node
850 .data
851 .get("provider")
852 .and_then(|v| v.as_str())
853 .unwrap_or("local");
854 if provider.is_empty() || provider == "local" {
855 self.run_local_llm_ai_node(node, &model_id, execution_id).await
856 } else {
857 self.run_cloud_ai_node(node, execution_id).await
858 }
859 }
860
861 async fn run_local_llm_ai_node(
874 &self,
875 node: &FlowNode,
876 model_id: &str,
877 execution_id: &str,
878 ) -> NodeOutcome {
879 if !self.allow_local_ai {
880 return NodeOutcome::Skipped(
881 "local AI disabled by policy (settings.allow_local_ai = false)".into(),
882 );
883 }
884 let provider = match self.cloud_providers.get("local") {
885 Some(p) => p,
886 None => {
887 return NodeOutcome::Skipped(
888 "local AI provider not registered in CloudAiRegistry".into(),
889 );
890 }
891 };
892 if self.local_ai_base_url.is_none() {
893 return NodeOutcome::Failed(
894 "local AI base URL not set; load a model from the Model Hub first".into(),
895 );
896 }
897
898 let input = match node
899 .data
900 .get("input")
901 .and_then(|v| v.as_str())
902 .filter(|s| !s.trim().is_empty())
903 {
904 Some(s) => s.to_string(),
905 None => return NodeOutcome::Failed("missing 'input' on ai node (local LLM)".into()),
906 };
907 let system = node
908 .data
909 .get("system")
910 .and_then(|v| v.as_str())
911 .filter(|s| !s.is_empty())
912 .map(str::to_owned);
913 let max_tokens = node
914 .data
915 .get("maxTokens")
916 .and_then(|v| v.as_u64())
917 .map(|v| v as u32);
918 let temperature = node
919 .data
920 .get("temperature")
921 .and_then(|v| v.as_f64())
922 .map(|v| v as f32);
923 let top_p = node
924 .data
925 .get("topP")
926 .and_then(|v| v.as_f64())
927 .map(|v| v as f32);
928 let top_k = node
929 .data
930 .get("topK")
931 .and_then(|v| v.as_u64())
932 .map(|v| v as u32);
933 let stop = node.data.get("stop").and_then(|v| v.as_array()).map(|arr| {
934 arr.iter()
935 .filter_map(|x| x.as_str().map(str::to_owned))
936 .collect::<Vec<_>>()
937 });
938
939 let api_key = match self
943 .credentials
944 .resolve_cloud_ai_key("local", provider.env_var())
945 {
946 Ok(k) => k,
947 Err(CredentialError::NotFound { .. }) => String::new(),
948 Err(e) => return NodeOutcome::Failed(format!("credential lookup failed: {e}")),
949 };
950
951 let contract = match AiNodeContract::from_node_data(&node.data) {
954 Ok(c) => c,
955 Err(e) => return NodeOutcome::Failed(e.to_string()),
956 };
957
958 let sanitized = self.sanitizer.sanitize(&input);
959 self.events.emit(ExecutionEvent::AiInvocation {
960 execution_id: execution_id.to_string(),
961 node_id: node.id.clone(),
962 at: Utc::now(),
963 provider: "local".into(),
964 model: model_id.to_string(),
965 input: sanitized.text.clone(),
966 contract_version: contract.as_ref().map(|c| c.contract_version.clone()),
967 });
968
969 let images = match crate::ai_tools::resolve_node_images(node) {
970 Ok(v) => v,
971 Err(e) => return NodeOutcome::Failed(e),
972 };
973 let workspace = node
974 .data
975 .get("workspace")
976 .and_then(|v| v.as_str())
977 .filter(|s| !s.trim().is_empty())
978 .map(str::to_owned);
979
980 let req = CloudAiRequest {
981 model: model_id.to_string(),
982 prompt: sanitized.text,
983 max_tokens,
984 temperature,
985 api_key,
986 system,
987 base_url: self.local_ai_base_url.clone(),
988 top_p,
989 top_k,
990 stop,
991 stream: true,
992 call_id: Some(format!("ai-node-{}", Uuid::new_v4())),
993 reasoning: crate::ai_tools::ai_reasoning(node),
994 images,
995 tools: Vec::new(),
996 response_schema: None,
997 };
998
999 self.dispatch_ai_node(
1000 &provider,
1001 req,
1002 &sanitized.map,
1003 &crate::ai_tools::ai_task(node),
1004 &crate::ai_tools::ai_labels(node),
1005 &crate::ai_tools::ai_tool_adapters(node),
1006 workspace,
1007 true,
1008 "",
1009 execution_id,
1010 &node.id,
1011 crate::ai_tools::ai_max_tool_iters(node),
1012 crate::ai_tools::ai_output_schema(node),
1013 crate::ai_tools::ai_expect(node),
1014 contract.as_ref(),
1015 )
1016 .await
1017 }
1018
1019 #[allow(clippy::too_many_arguments)]
1028 async fn dispatch_ai_node(
1029 &self,
1030 provider: &Arc<dyn flow_adapter_ai::CloudAiProvider>,
1031 mut req: CloudAiRequest,
1032 rehydrate: &flow_security::RehydrationMap,
1033 task: &str,
1034 labels: &[String],
1035 tool_adapters: &[String],
1036 workspace: Option<String>,
1037 audit: bool,
1038 original_prompt: &str,
1039 execution_id: &str,
1040 node_id: &str,
1041 max_tool_iters: usize,
1042 schema: Option<String>,
1043 expect: Option<String>,
1044 contract: Option<&AiNodeContract>,
1045 ) -> NodeOutcome {
1046 if let Some(contract) = contract {
1050 match task {
1051 "embedding" | "classify" | "structured" => {
1052 return NodeOutcome::Failed(format!(
1053 "contract-bound output is not supported for the `{task}` task \
1054 (remove `contract: true` or use the generate task)"
1055 ));
1056 }
1057 _ => {
1058 return self
1059 .run_contract_bound(
1060 provider,
1061 req,
1062 rehydrate,
1063 tool_adapters,
1064 workspace,
1065 execution_id,
1066 node_id,
1067 max_tool_iters,
1068 contract,
1069 )
1070 .await;
1071 }
1072 }
1073 }
1074 match task {
1075 "embedding" => {
1076 req.stream = false;
1077 match provider.embed(&req).await {
1078 Ok(e) => NodeOutcome::Succeeded(serde_json::json!({
1079 "provider": e.provider,
1080 "model": e.model,
1081 "embedding": e.embedding,
1082 "dims": e.dims,
1083 "latency_ms": e.latency_ms,
1084 })),
1085 Err(err) => NodeOutcome::Failed(err.to_string()),
1086 }
1087 }
1088 "classify" => {
1089 if labels.is_empty() {
1090 return NodeOutcome::Failed("classify task needs at least one label".into());
1091 }
1092 req.stream = false;
1093 let constraint = format!(
1094 "You are a classifier. Read the input and reply with EXACTLY one of these labels and nothing else: {}.",
1095 labels.join(", ")
1096 );
1097 req.system = Some(match req.system {
1098 Some(s) if !s.is_empty() => format!("{constraint}\n\n{s}"),
1099 _ => constraint,
1100 });
1101 match provider.invoke(&req).await {
1102 Ok(resp) => {
1103 let raw = rehydrate.rehydrate(&resp.text);
1104 let label = crate::ai_tools::match_label(&raw, labels);
1105 NodeOutcome::Succeeded(serde_json::json!({
1106 "provider": resp.provider,
1107 "model": resp.model,
1108 "label": label,
1109 "raw": raw,
1110 "labels": labels,
1111 "input_tokens": resp.input_tokens,
1112 "output_tokens": resp.output_tokens,
1113 }))
1114 }
1115 Err(err) => NodeOutcome::Failed(err.to_string()),
1116 }
1117 }
1118 "structured" => {
1119 let Some(schema) = schema.filter(|s| !s.trim().is_empty()) else {
1120 return NodeOutcome::Failed(
1121 "structured task needs a non-empty `outputSchema`".into(),
1122 );
1123 };
1124 req.stream = false;
1125 req.response_schema = serde_json::from_str(&schema).ok();
1130 let constraint = format!(
1131 "Reply with ONLY a single JSON object that conforms to this JSON Schema. \
1132 No prose, no explanation, no markdown code fences:\n{schema}"
1133 );
1134 req.system = Some(match req.system {
1135 Some(s) if !s.is_empty() => format!("{constraint}\n\n{s}"),
1136 _ => constraint,
1137 });
1138 match provider.invoke(&req).await {
1139 Ok(resp) => {
1140 let raw = rehydrate.rehydrate(&resp.text);
1141 let mut out = serde_json::json!({
1142 "provider": resp.provider,
1143 "model": resp.model,
1144 "raw": raw,
1145 "input_tokens": resp.input_tokens,
1146 "output_tokens": resp.output_tokens,
1147 });
1148 match crate::ai_tools::parse_json_lenient(&raw) {
1149 Some(value) => {
1150 if let (Some(obj), Some(fields)) =
1155 (out.as_object_mut(), value.as_object())
1156 {
1157 for (k, v) in fields {
1158 obj.entry(k.clone()).or_insert_with(|| v.clone());
1159 }
1160 }
1161 out["output"] = value;
1162 }
1163 None => {
1164 out["parseError"] =
1165 serde_json::json!("model did not return valid JSON");
1166 }
1167 }
1168 NodeOutcome::Succeeded(out)
1169 }
1170 Err(err) => NodeOutcome::Failed(err.to_string()),
1171 }
1172 }
1173 _ => {
1174 if !tool_adapters.is_empty() {
1175 let specs = crate::ai_tools::build_tool_specs(&self.adapters, tool_adapters);
1176 if specs.is_empty() {
1177 return NodeOutcome::Failed(
1178 "tool use requested but the bound adapters expose no tools".into(),
1179 );
1180 }
1181 req.stream = false;
1182 req.tools = specs.clone();
1183 let observer = crate::ai_tools::ToolObserver::new(
1187 self.events.clone(),
1188 execution_id.to_string(),
1189 node_id.to_string(),
1190 );
1191 let result = if let Some(staging) = self.edit_staging.clone() {
1194 let dispatcher = crate::ai_tools::StagingToolDispatcher::new(
1195 self.adapters.clone(),
1196 workspace,
1197 staging,
1198 )
1199 .with_observer(observer);
1200 provider
1201 .invoke_tools(&req, &specs, &dispatcher, max_tool_iters)
1202 .await
1203 } else {
1204 let dispatcher = crate::ai_tools::AdapterToolDispatcher::new(
1205 self.adapters.clone(),
1206 workspace,
1207 )
1208 .with_observer(observer);
1209 provider
1210 .invoke_tools(&req, &specs, &dispatcher, max_tool_iters)
1211 .await
1212 };
1213 match result {
1214 Ok(resp) => {
1215 if let Some(exp) = expect.as_deref() {
1220 if matches!(
1221 evaluate_expectation(provider, &req, &resp.text, exp).await,
1222 Some(false)
1223 ) {
1224 return NodeOutcome::Failed(format!(
1225 "node output did not meet expectation: {exp}"
1226 ));
1227 }
1228 }
1229 NodeOutcome::Succeeded(ai_generate_envelope(
1230 &resp,
1231 rehydrate,
1232 audit,
1233 original_prompt,
1234 ))
1235 }
1236 Err(err) => NodeOutcome::Failed(err.to_string()),
1237 }
1238 } else {
1239 req.stream = true;
1240 match provider.invoke_stream(&req, self.stream_sink.as_ref()).await {
1241 Ok(resp) => {
1242 if let Some(exp) = expect.as_deref() {
1247 if matches!(
1248 evaluate_expectation(provider, &req, &resp.text, exp).await,
1249 Some(false)
1250 ) {
1251 return NodeOutcome::Failed(format!(
1252 "node output did not meet expectation: {exp}"
1253 ));
1254 }
1255 }
1256 NodeOutcome::Succeeded(ai_generate_envelope(
1257 &resp,
1258 rehydrate,
1259 audit,
1260 original_prompt,
1261 ))
1262 }
1263 Err(err) => NodeOutcome::Failed(err.to_string()),
1264 }
1265 }
1266 }
1267 }
1268 }
1269
1270 #[allow(clippy::too_many_arguments)]
1278 async fn run_contract_bound(
1279 &self,
1280 provider: &Arc<dyn flow_adapter_ai::CloudAiProvider>,
1281 mut req: CloudAiRequest,
1282 rehydrate: &flow_security::RehydrationMap,
1283 tool_adapters: &[String],
1284 workspace: Option<String>,
1285 execution_id: &str,
1286 node_id: &str,
1287 max_tool_iters: usize,
1288 contract: &AiNodeContract,
1289 ) -> NodeOutcome {
1290 let schema = AiOutputEnvelope::json_schema();
1291 let constraint = format!(
1292 "Reply with ONLY a single JSON object conforming to this JSON Schema. \
1293 No prose, no markdown code fences:\n{schema}\n\
1294 `primary_output` is your full answer. `confidence` is your confidence in it, \
1295 0.0 to 1.0. `confidence_type` is \"{}\". Set `escalate` to true only if a \
1296 human should review this output regardless of confidence. Put any \
1297 explanation for a human reviewer in `reasoning`.",
1298 contract
1299 .confidence_type
1300 .unwrap_or(flow_domain::contract::ConfidenceType::Verbalized)
1301 .as_str()
1302 );
1303 req.system = Some(match req.system.take() {
1304 Some(s) if !s.is_empty() => format!("{constraint}\n\n{s}"),
1305 _ => constraint,
1306 });
1307 req.stream = false;
1308
1309 let result = if tool_adapters.is_empty() {
1310 req.response_schema = Some(schema);
1311 budgeted(contract.max_inference_ms, provider.invoke(&req)).await
1312 } else {
1313 let specs = crate::ai_tools::build_tool_specs(&self.adapters, tool_adapters);
1314 if specs.is_empty() {
1315 return NodeOutcome::Failed(
1316 "tool use requested but the bound adapters expose no tools".into(),
1317 );
1318 }
1319 req.tools = specs.clone();
1320 let observer = crate::ai_tools::ToolObserver::new(
1321 self.events.clone(),
1322 execution_id.to_string(),
1323 node_id.to_string(),
1324 );
1325 if let Some(staging) = self.edit_staging.clone() {
1326 let dispatcher = crate::ai_tools::StagingToolDispatcher::new(
1327 self.adapters.clone(),
1328 workspace,
1329 staging,
1330 )
1331 .with_observer(observer);
1332 budgeted(
1333 contract.max_inference_ms,
1334 provider.invoke_tools(&req, &specs, &dispatcher, max_tool_iters),
1335 )
1336 .await
1337 } else {
1338 let dispatcher =
1339 crate::ai_tools::AdapterToolDispatcher::new(self.adapters.clone(), workspace)
1340 .with_observer(observer);
1341 budgeted(
1342 contract.max_inference_ms,
1343 provider.invoke_tools(&req, &specs, &dispatcher, max_tool_iters),
1344 )
1345 .await
1346 }
1347 };
1348
1349 let resp = match result {
1350 Ok(r) => r,
1351 Err(e) => return NodeOutcome::Failed(e),
1352 };
1353
1354 let raw = rehydrate.rehydrate(&resp.text);
1355 let parsed = crate::ai_tools::parse_json_lenient(&raw)
1356 .ok_or_else(|| {
1357 flow_domain::contract::ContractError::SchemaViolation(
1358 "model did not return a JSON object".into(),
1359 )
1360 })
1361 .and_then(|v| AiOutputEnvelope::from_value(&v))
1362 .and_then(|env| match contract.confidence_type {
1363 Some(expected) if env.confidence_type != expected => {
1364 Err(flow_domain::contract::ContractError::SchemaViolation(format!(
1365 "confidence_type `{}` does not match the contract's `{}`",
1366 env.confidence_type.as_str(),
1367 expected.as_str()
1368 )))
1369 }
1370 _ => Ok(env),
1371 });
1372
1373 let envelope = match parsed {
1374 Ok(env) => env,
1375 Err(violation) => {
1376 self.events.emit(ExecutionEvent::AiRoutingDecision {
1377 execution_id: execution_id.to_string(),
1378 node_id: node_id.to_string(),
1379 at: Utc::now(),
1380 decision: "contract_violation".into(),
1381 confidence: None,
1382 threshold: "expected_output schema".into(),
1383 contract_version: contract.contract_version.clone(),
1384 });
1385 return NodeOutcome::Failed(format!("contract violation: {violation}"));
1386 }
1387 };
1388
1389 let thresholds = &contract.thresholds;
1390 let decision = thresholds.route(envelope.confidence, envelope.escalate);
1391 let threshold_desc = match decision {
1392 RouteDecision::AutoApprove => format!(
1393 "confidence {} > autoApproveAbove {}",
1394 envelope.confidence, thresholds.auto_approve_above
1395 ),
1396 RouteDecision::Suppress => format!(
1397 "confidence {} < suppressBelow {}",
1398 envelope.confidence, thresholds.suppress_below
1399 ),
1400 RouteDecision::HumanReview if envelope.escalate => "model set escalate".into(),
1401 RouteDecision::HumanReview => format!(
1402 "confidence {} within review band [{}, {}]",
1403 envelope.confidence, thresholds.human_review_band.0, thresholds.human_review_band.1
1404 ),
1405 };
1406 self.events.emit(ExecutionEvent::AiRoutingDecision {
1407 execution_id: execution_id.to_string(),
1408 node_id: node_id.to_string(),
1409 at: Utc::now(),
1410 decision: decision.as_str().into(),
1411 confidence: Some(envelope.confidence),
1412 threshold: threshold_desc,
1413 contract_version: contract.contract_version.clone(),
1414 });
1415
1416 match decision {
1417 RouteDecision::AutoApprove => {
1418 NodeOutcome::Succeeded(contract_payload(&envelope, &resp, contract, decision))
1419 }
1420 RouteDecision::Suppress => NodeOutcome::Failed(format!(
1421 "output suppressed: confidence {:.2} below suppressBelow {:.2}",
1422 envelope.confidence, thresholds.suppress_below
1423 )),
1424 RouteDecision::HumanReview => {
1425 self.await_review(&envelope, &resp, contract, execution_id, node_id)
1426 .await
1427 }
1428 }
1429 }
1430
1431 async fn await_review(
1437 &self,
1438 envelope: &AiOutputEnvelope,
1439 resp: &flow_adapter_ai::CloudAiResponse,
1440 contract: &AiNodeContract,
1441 execution_id: &str,
1442 node_id: &str,
1443 ) -> NodeOutcome {
1444 if self.control.is_cancelling() {
1445 return NodeOutcome::Skipped("run cancelled before the review gate".into());
1446 }
1447 if !self.review_gate_available {
1448 return NodeOutcome::Failed(format!(
1449 "output requires human review (confidence {:.2}) but no review gate is \
1450 attached to this run; routing to the fallback path",
1451 envelope.confidence
1452 ));
1453 }
1454 let _ = self.control.take_review();
1456 self.events.emit(ExecutionEvent::AiReviewRequired {
1457 execution_id: execution_id.to_string(),
1458 node_id: node_id.to_string(),
1459 at: Utc::now(),
1460 primary_output: envelope.primary_output.clone(),
1461 confidence: envelope.confidence,
1462 reasoning: envelope.reasoning.clone(),
1463 });
1464 self.control.pause();
1465 self.control.wait_while_paused().await;
1466 if self.control.is_cancelling() {
1467 return NodeOutcome::Skipped("ai output not reviewed (run cancelled)".into());
1468 }
1469 let approved = !matches!(self.control.take_review(), Some(ReviewDecision::Rejected));
1470 self.events.emit(ExecutionEvent::AiReviewResolved {
1471 execution_id: execution_id.to_string(),
1472 node_id: node_id.to_string(),
1473 at: Utc::now(),
1474 approved,
1475 });
1476 if approved {
1477 NodeOutcome::Succeeded(contract_payload(
1478 envelope,
1479 resp,
1480 contract,
1481 RouteDecision::HumanReview,
1482 ))
1483 } else {
1484 NodeOutcome::Failed(format!(
1485 "output rejected at human review gate (confidence {:.2})",
1486 envelope.confidence
1487 ))
1488 }
1489 }
1490
1491 async fn run_cloud_ai_node(&self, node: &FlowNode, execution_id: &str) -> NodeOutcome {
1494 let provider_name = match node.data.get("provider").and_then(|v| v.as_str()) {
1495 Some(p) => p.to_string(),
1496 None => return NodeOutcome::Failed("missing 'provider' on ai node".into()),
1497 };
1498
1499 let is_local = provider_name == "local";
1503 if is_local {
1504 if !self.allow_local_ai {
1505 return NodeOutcome::Skipped(
1506 "local AI disabled by policy (settings.allow_local_ai = false)".into(),
1507 );
1508 }
1509 } else if !self.allow_cloud_ai {
1510 return NodeOutcome::Skipped(
1511 "cloud AI disabled by policy (settings.allow_cloud_ai = false)".into(),
1512 );
1513 }
1514
1515 let model = match node.data.get("modelId").and_then(|v| v.as_str()) {
1516 Some(m) => m.to_string(),
1517 None => return NodeOutcome::Failed("missing 'modelId' on ai node".into()),
1518 };
1519 let prompt = match node
1522 .data
1523 .get("prompt")
1524 .and_then(|v| v.as_str())
1525 .or_else(|| node.data.get("input").and_then(|v| v.as_str()))
1526 {
1527 Some(p) => p.to_string(),
1528 None => {
1529 return NodeOutcome::Failed("missing 'input' on ai node (cloud provider)".into())
1530 }
1531 };
1532 let max_tokens = node
1533 .data
1534 .get("maxTokens")
1535 .and_then(|v| v.as_u64())
1536 .map(|v| v as u32);
1537 let temperature = node
1538 .data
1539 .get("temperature")
1540 .and_then(|v| v.as_f64())
1541 .map(|v| v as f32);
1542 let audit_content = node
1543 .data
1544 .get("auditContent")
1545 .and_then(|v| v.as_bool())
1546 .unwrap_or(false);
1547
1548 let provider = match self.cloud_providers.get(&provider_name) {
1549 Some(p) => p,
1550 None => {
1551 return NodeOutcome::Skipped(format!(
1552 "cloud provider '{provider_name}' not enabled"
1553 ));
1554 }
1555 };
1556
1557 let env_var = provider.env_var();
1558 let api_key = match self
1559 .credentials
1560 .resolve_cloud_ai_key(&provider_name, env_var)
1561 {
1562 Ok(k) => k,
1563 Err(CredentialError::NotFound { .. }) if is_local => String::new(),
1567 Err(CredentialError::NotFound { .. }) => {
1568 return NodeOutcome::Failed(format!(
1569 "no API key for provider '{provider_name}'. Set it in Settings, or export {env_var} in your environment."
1570 ));
1571 }
1572 Err(e) => return NodeOutcome::Failed(format!("credential lookup failed: {e}")),
1573 };
1574
1575 let contract = match AiNodeContract::from_node_data(&node.data) {
1576 Ok(c) => c,
1577 Err(e) => return NodeOutcome::Failed(e.to_string()),
1578 };
1579
1580 let sanitized = self.sanitizer.sanitize(&prompt);
1581 let recorded_input = if audit_content {
1584 sanitized.text.clone()
1585 } else {
1586 sanitized.text.chars().take(200).collect()
1587 };
1588 self.events.emit(ExecutionEvent::AiInvocation {
1589 execution_id: execution_id.to_string(),
1590 node_id: node.id.clone(),
1591 at: Utc::now(),
1592 provider: provider_name.clone(),
1593 model: model.clone(),
1594 input: recorded_input,
1595 contract_version: contract.as_ref().map(|c| c.contract_version.clone()),
1596 });
1597
1598 let system = node
1599 .data
1600 .get("system")
1601 .and_then(|v| v.as_str())
1602 .filter(|s| !s.is_empty())
1603 .map(str::to_owned);
1604 let images = match crate::ai_tools::resolve_node_images(node) {
1605 Ok(v) => v,
1606 Err(e) => return NodeOutcome::Failed(e),
1607 };
1608 let workspace = node
1609 .data
1610 .get("workspace")
1611 .and_then(|v| v.as_str())
1612 .filter(|s| !s.trim().is_empty())
1613 .map(str::to_owned);
1614
1615 let req = CloudAiRequest {
1616 model: model.clone(),
1617 prompt: sanitized.text,
1618 max_tokens,
1619 temperature,
1620 api_key,
1621 system,
1622 base_url: if is_local {
1625 self.local_ai_base_url.clone()
1626 } else {
1627 None
1628 },
1629 stream: true,
1630 call_id: Some(format!("cloud-ai-{}", Uuid::new_v4())),
1631 reasoning: crate::ai_tools::ai_reasoning(node),
1632 images,
1633 ..Default::default()
1634 };
1635
1636 self.dispatch_ai_node(
1639 &provider,
1640 req,
1641 &sanitized.map,
1642 &crate::ai_tools::ai_task(node),
1643 &crate::ai_tools::ai_labels(node),
1644 &crate::ai_tools::ai_tool_adapters(node),
1645 workspace,
1646 audit_content,
1647 &prompt,
1648 execution_id,
1649 &node.id,
1650 crate::ai_tools::ai_max_tool_iters(node),
1651 crate::ai_tools::ai_output_schema(node),
1652 crate::ai_tools::ai_expect(node),
1653 contract.as_ref(),
1654 )
1655 .await
1656 }
1657
1658 async fn run_adapter_node(&self, node: &FlowNode, execution_id: &str) -> NodeOutcome {
1659 let adapter_name = node
1660 .data
1661 .get("adapter")
1662 .and_then(|v| v.as_str())
1663 .unwrap_or("mock");
1664
1665 let Some(adapter) = self.adapters.get(adapter_name) else {
1666 return NodeOutcome::Failed(format!("adapter '{adapter_name}' not registered"));
1667 };
1668
1669 let ctx = crate::adapter::AdapterCtx {
1670 events: self.events.clone(),
1671 execution_id: execution_id.to_string(),
1672 node_id: node.id.clone(),
1673 workspace_root: self.workspace_root.clone(),
1674 };
1675
1676 match adapter.execute_with_events(node, &ctx).await {
1677 Ok(out) => {
1678 NodeOutcome::Succeeded(serde_json::to_value(out).unwrap_or(serde_json::Value::Null))
1679 }
1680 Err(e) => NodeOutcome::Failed(e.to_string()),
1681 }
1682 }
1683}
1684
1685#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
1695#[serde(rename_all = "camelCase")]
1696pub struct FlowWarning {
1697 pub node_id: String,
1698 pub severity: String,
1699 pub message: String,
1700}
1701
1702pub fn verify_graph(graph: &FlowGraph) -> Vec<FlowWarning> {
1707 let mut out = Vec::new();
1708 for node in &graph.nodes {
1709 if let Some(message) = destructive_reason(node) {
1710 out.push(FlowWarning {
1711 node_id: node.id.clone(),
1712 severity: "destructive".into(),
1713 message,
1714 });
1715 }
1716 if let Some(message) = sandbox_escape_reason(node) {
1717 out.push(FlowWarning {
1718 node_id: node.id.clone(),
1719 severity: "sandbox-escape".into(),
1720 message,
1721 });
1722 }
1723 }
1724 out
1725}
1726
1727fn sandbox_escape_reason(node: &FlowNode) -> Option<String> {
1732 let data = &node.data;
1733 for key in ["path", "cwd", "workspaceRoot", "command", "args"] {
1734 let candidate = match data.get(key) {
1735 Some(serde_json::Value::String(s)) => s.clone(),
1736 Some(serde_json::Value::Array(arr)) => arr
1737 .iter()
1738 .filter_map(|v| v.as_str())
1739 .collect::<Vec<_>>()
1740 .join(" "),
1741 _ => continue,
1742 };
1743 if candidate
1744 .split(|c: char| c == '/' || c == '\\' || c == ' ')
1745 .any(|seg| seg == "..")
1746 {
1747 let preview: String = candidate.chars().take(120).collect();
1748 return Some(format!(
1749 "Path may escape the workspace (contains `..`): {preview}"
1750 ));
1751 }
1752 }
1753 None
1754}
1755
1756fn fallback_node(node: &FlowNode) -> Option<FlowNode> {
1761 let fb_provider = node
1762 .data
1763 .get("fallbackProvider")
1764 .and_then(|v| v.as_str())
1765 .map(str::trim)
1766 .filter(|s| !s.is_empty())?
1767 .to_string();
1768 let mut fb = node.clone();
1769 if let Some(obj) = fb.data.as_object_mut() {
1770 obj.insert("provider".into(), serde_json::Value::String(fb_provider));
1771 if let Some(m) = node
1772 .data
1773 .get("fallbackModelId")
1774 .and_then(|v| v.as_str())
1775 .map(str::trim)
1776 .filter(|s| !s.is_empty())
1777 {
1778 obj.insert("modelId".into(), serde_json::Value::String(m.to_string()));
1779 }
1780 obj.remove("fallbackProvider");
1781 obj.remove("fallbackModelId");
1782 }
1783 Some(fb)
1784}
1785
1786pub fn destructive_reason(node: &FlowNode) -> Option<String> {
1787 let data = &node.data;
1788 let adapter = data.get("adapter").and_then(|v| v.as_str()).unwrap_or("");
1789 let action = data.get("actionId").and_then(|v| v.as_str()).unwrap_or("");
1790
1791 if adapter == "fs" && action == "delete-file" {
1793 let path = data
1794 .get("path")
1795 .and_then(|v| v.as_str())
1796 .filter(|s| !s.trim().is_empty())
1797 .unwrap_or("a file");
1798 return Some(format!("Delete file: {path}"));
1799 }
1800
1801 if adapter == "shell" || adapter == "cli" {
1805 let mut parts: Vec<String> = Vec::new();
1806 if adapter == "shell" && action != "run-command" && !action.is_empty() {
1810 parts.push(action.to_string());
1811 }
1812 for key in ["bin", "command"] {
1813 if let Some(s) = data.get(key).and_then(|v| v.as_str()) {
1814 parts.push(s.to_string());
1815 }
1816 }
1817 match data.get("args") {
1818 Some(serde_json::Value::String(s)) => parts.push(s.clone()),
1819 Some(serde_json::Value::Array(arr)) => {
1820 for a in arr {
1821 if let Some(s) = a.as_str() {
1822 parts.push(s.to_string());
1823 }
1824 }
1825 }
1826 _ => {}
1827 }
1828 let cmd = parts.join(" ");
1829 if let Some(label) = destructive_command(&cmd) {
1830 let preview: String = cmd.chars().take(120).collect();
1831 return Some(format!("Runs a destructive command ({label}): {preview}"));
1832 }
1833 }
1834
1835 None
1836}
1837
1838fn destructive_command(cmd: &str) -> Option<&'static str> {
1841 let lower = cmd.to_lowercase();
1842 let tokens: Vec<&str> = lower.split_whitespace().collect();
1843 let has = |w: &str| tokens.iter().any(|t| *t == w);
1844
1845 if has("rm") || has("rmdir") {
1846 return Some("rm");
1847 }
1848 if has("mkfs") || tokens.iter().any(|t| t.starts_with("mkfs.")) {
1849 return Some("mkfs");
1850 }
1851 if has("shutdown") {
1852 return Some("shutdown");
1853 }
1854 if has("reboot") {
1855 return Some("reboot");
1856 }
1857 if has("truncate") {
1858 return Some("truncate");
1859 }
1860 if has("dd") && tokens.iter().any(|t| t.starts_with("if=")) {
1861 return Some("dd");
1862 }
1863 let after = |tool: &str, sub: &str| -> bool {
1865 tokens
1866 .iter()
1867 .position(|t| *t == tool)
1868 .map(|i| tokens[i + 1..].iter().any(|t| *t == sub))
1869 .unwrap_or(false)
1870 };
1871 if after("git", "push") {
1872 return Some("git push");
1873 }
1874 if after("git", "clean") {
1875 return Some("git clean");
1876 }
1877 if has("git") && has("reset") && has("--hard") {
1878 return Some("git reset --hard");
1879 }
1880 if after("kubectl", "delete") {
1881 return Some("kubectl delete");
1882 }
1883 if has("drop") && (has("table") || has("database")) {
1884 return Some("drop");
1885 }
1886 None
1887}
1888
1889async fn evaluate_expectation(
1899 provider: &Arc<dyn flow_adapter_ai::CloudAiProvider>,
1900 base: &CloudAiRequest,
1901 output: &str,
1902 expect: &str,
1903) -> Option<bool> {
1904 let preview: String = output.chars().take(4000).collect();
1905 let eval_req = CloudAiRequest {
1906 model: base.model.clone(),
1907 prompt: format!("Output:\n{preview}\n\nExpectation: {expect}"),
1908 api_key: base.api_key.clone(),
1909 base_url: base.base_url.clone(),
1910 system: Some(
1911 "You are a strict evaluator. Decide whether the Output satisfies the \
1912 Expectation. Reply with EXACTLY one word: PASS or FAIL."
1913 .into(),
1914 ),
1915 max_tokens: Some(8),
1916 ..Default::default()
1917 };
1918 let resp = provider.invoke(&eval_req).await.ok()?;
1919 Some(!resp.text.trim().to_uppercase().contains("FAIL"))
1920}
1921
1922fn ai_generate_envelope(
1923 resp: &flow_adapter_ai::CloudAiResponse,
1924 rehydrate: &flow_security::RehydrationMap,
1925 audit: bool,
1926 original_prompt: &str,
1927) -> serde_json::Value {
1928 let text = rehydrate.rehydrate(&resp.text);
1929 let mut payload = serde_json::json!({
1930 "provider": resp.provider,
1931 "model": resp.model,
1932 "finish_reason": resp.finish_reason,
1933 "input_tokens": resp.input_tokens,
1934 "output_tokens": resp.output_tokens,
1935 "latency_ms": resp.latency_ms,
1936 });
1937 if audit {
1938 payload["text"] = serde_json::Value::String(text);
1939 if !original_prompt.is_empty() {
1940 payload["prompt"] = serde_json::Value::String(original_prompt.to_string());
1941 }
1942 } else {
1943 let preview: String = text.chars().take(200).collect();
1944 payload["text_preview"] = serde_json::Value::String(preview);
1945 }
1946 payload
1947}
1948
1949async fn budgeted<F>(budget_ms: Option<u64>, fut: F) -> Result<flow_adapter_ai::CloudAiResponse, String>
1953where
1954 F: std::future::Future<
1955 Output = Result<flow_adapter_ai::CloudAiResponse, flow_adapter_ai::CloudAiError>,
1956 >,
1957{
1958 match budget_ms {
1959 Some(ms) => match tokio::time::timeout(std::time::Duration::from_millis(ms), fut).await {
1960 Ok(r) => r.map_err(|e| e.to_string()),
1961 Err(_) => Err(format!("inference exceeded the contract budget ({ms} ms)")),
1962 },
1963 None => fut.await.map_err(|e| e.to_string()),
1964 }
1965}
1966
1967fn contract_payload(
1971 envelope: &AiOutputEnvelope,
1972 resp: &flow_adapter_ai::CloudAiResponse,
1973 contract: &AiNodeContract,
1974 route: RouteDecision,
1975) -> serde_json::Value {
1976 let mut payload = serde_json::json!({
1977 "provider": resp.provider,
1978 "model": resp.model,
1979 "input_tokens": resp.input_tokens,
1980 "output_tokens": resp.output_tokens,
1981 "latency_ms": resp.latency_ms,
1982 "contract_version": contract.contract_version,
1983 "route": route.as_str(),
1984 "primary_output": envelope.primary_output,
1985 "confidence": envelope.confidence,
1986 "confidence_type": envelope.confidence_type.as_str(),
1987 "escalate": envelope.escalate,
1988 });
1989 if let Some(reasoning) = &envelope.reasoning {
1990 payload["reasoning"] = serde_json::Value::String(reasoning.clone());
1991 }
1992 payload
1993}
1994
1995enum NodeOutcome {
1996 Succeeded(serde_json::Value),
1997 Skipped(String),
1998 Failed(String),
1999}
2000
2001#[derive(Debug, Clone, Copy)]
2002enum NodeStatus {
2003 Succeeded,
2004 Failed,
2005 Skipped,
2006}
2007
2008fn interpolate_value(
2014 value: &serde_json::Value,
2015 outputs: &HashMap<String, serde_json::Value>,
2016 upstream: &[String],
2017 memory: &HashMap<String, serde_json::Value>,
2018) -> serde_json::Value {
2019 match value {
2020 serde_json::Value::String(s) => {
2021 serde_json::Value::String(interpolate_str(s, outputs, upstream, memory))
2022 }
2023 serde_json::Value::Array(items) => serde_json::Value::Array(
2024 items
2025 .iter()
2026 .map(|v| interpolate_value(v, outputs, upstream, memory))
2027 .collect(),
2028 ),
2029 serde_json::Value::Object(map) => serde_json::Value::Object(
2030 map.iter()
2031 .map(|(k, v)| (k.clone(), interpolate_value(v, outputs, upstream, memory)))
2032 .collect(),
2033 ),
2034 other => other.clone(),
2035 }
2036}
2037
2038fn interpolate_str(
2042 s: &str,
2043 outputs: &HashMap<String, serde_json::Value>,
2044 upstream: &[String],
2045 memory: &HashMap<String, serde_json::Value>,
2046) -> String {
2047 if !s.contains("{{") {
2048 return s.to_string();
2049 }
2050 let mut out = String::with_capacity(s.len());
2051 let mut rest = s;
2052 while let Some(open) = rest.find("{{") {
2053 out.push_str(&rest[..open]);
2054 let after = &rest[open + 2..];
2055 match after.find("}}") {
2056 Some(close) => {
2057 let token = after[..close].trim();
2058 out.push_str(&resolve_token(token, outputs, upstream, memory));
2059 rest = &after[close + 2..];
2060 }
2061 None => {
2063 out.push_str("{{");
2064 rest = after;
2065 break;
2066 }
2067 }
2068 }
2069 out.push_str(rest);
2070 out
2071}
2072
2073fn resolve_token(
2077 token: &str,
2078 outputs: &HashMap<String, serde_json::Value>,
2079 upstream: &[String],
2080 memory: &HashMap<String, serde_json::Value>,
2081) -> String {
2082 if token == "input" {
2083 return upstream
2084 .iter()
2085 .filter_map(|id| outputs.get(id))
2086 .map(primary_text)
2087 .collect::<Vec<_>>()
2088 .join("\n\n");
2089 }
2090 if let Some(rest) = token.strip_prefix("memory.") {
2091 let (key, path) = match rest.split_once('.') {
2092 Some((key, path)) => (key, Some(path)),
2093 None => (rest, None),
2094 };
2095 let Some(value) = memory.get(key) else {
2096 return String::new();
2097 };
2098 return match path {
2099 Some(path) => value_at(value, path).map(primary_text).unwrap_or_default(),
2100 None => primary_text(value),
2101 };
2102 }
2103 let (id, path) = match token.split_once('.') {
2104 Some((id, path)) => (id, Some(path)),
2105 None => (token, None),
2106 };
2107 let Some(value) = outputs.get(id) else {
2108 return String::new();
2109 };
2110 match path {
2111 Some(path) => value_at(value, path).map(primary_text).unwrap_or_default(),
2112 None => primary_text(value),
2113 }
2114}
2115
2116fn value_at<'a>(value: &'a serde_json::Value, path: &str) -> Option<&'a serde_json::Value> {
2118 let mut cur = value;
2119 for segment in path.split('.') {
2120 cur = cur.get(segment)?;
2121 }
2122 Some(cur)
2123}
2124
2125fn primary_text(value: &serde_json::Value) -> String {
2131 match value {
2132 serde_json::Value::String(s) => s.clone(),
2133 serde_json::Value::Null => String::new(),
2134 serde_json::Value::Bool(b) => b.to_string(),
2135 serde_json::Value::Number(n) => n.to_string(),
2136 serde_json::Value::Object(map) => {
2137 if let Some(payload) = map.get("payload") {
2140 if !payload.is_null() {
2141 return primary_text(payload);
2142 }
2143 }
2144 for key in ["text", "stdout", "output", "result", "value"] {
2145 if let Some(serde_json::Value::String(s)) = map.get(key) {
2146 return s.clone();
2147 }
2148 }
2149 serde_json::to_string(value).unwrap_or_default()
2150 }
2151 serde_json::Value::Array(_) => serde_json::to_string(value).unwrap_or_default(),
2152 }
2153}
2154
2155fn edge_active(
2161 edge: &FlowEdge,
2162 terminal: &HashMap<String, NodeStatus>,
2163 outputs: &HashMap<String, serde_json::Value>,
2164 memory: &HashMap<String, serde_json::Value>,
2165) -> bool {
2166 let Some(status) = terminal.get(&edge.source) else {
2167 return false;
2168 };
2169 if !edge_fires(edge.outcome, *status) {
2170 return false;
2171 }
2172 match edge.condition.as_deref() {
2173 None => true,
2174 Some(expr) if expr.trim().is_empty() => true,
2175 Some(expr) => {
2176 let resolved =
2177 interpolate_str(expr, outputs, std::slice::from_ref(&edge.source), memory);
2178 crate::condition::eval(&resolved).unwrap_or(false)
2179 }
2180 }
2181}
2182
2183fn edge_fires(outcome: EdgeOutcome, status: NodeStatus) -> bool {
2184 matches!(
2185 (outcome, status),
2186 (EdgeOutcome::Always, _)
2187 | (EdgeOutcome::Pass, NodeStatus::Succeeded)
2188 | (EdgeOutcome::Fail, NodeStatus::Failed | NodeStatus::Skipped)
2189 )
2190}
2191
2192fn collapse_subflows(graph: &FlowGraph) -> FlowGraph {
2197 let mut member_to_sf: HashMap<&str, &str> = HashMap::new();
2198 for sf in &graph.subflows {
2199 for id in &sf.node_ids {
2200 member_to_sf.insert(id.as_str(), sf.id.as_str());
2201 }
2202 }
2203 let map = |id: &str| -> String {
2204 member_to_sf
2205 .get(id)
2206 .map(|s| s.to_string())
2207 .unwrap_or_else(|| id.to_string())
2208 };
2209
2210 let mut nodes: Vec<FlowNode> = graph
2211 .nodes
2212 .iter()
2213 .filter(|n| !member_to_sf.contains_key(n.id.as_str()))
2214 .cloned()
2215 .collect();
2216 for sf in &graph.subflows {
2217 nodes.push(FlowNode {
2218 id: sf.id.clone(),
2219 node_type: "subflow".into(),
2220 position: Position { x: 0.0, y: 0.0 },
2221 data: serde_json::Value::Null,
2222 });
2223 }
2224
2225 let mut edges: Vec<FlowEdge> = Vec::new();
2226 for (i, e) in graph.edges.iter().enumerate() {
2227 let s = map(&e.source);
2228 let t = map(&e.target);
2229 if s == t {
2230 continue; }
2232 edges.push(FlowEdge {
2233 id: format!("outer-{i}"),
2234 source: s,
2235 target: t,
2236 label: None,
2237 condition: None,
2238 outcome: EdgeOutcome::Always,
2239 });
2240 }
2241
2242 FlowGraph {
2243 id: graph.id.clone(),
2244 name: graph.name.clone(),
2245 version: graph.version.clone(),
2246 description: None,
2247 nodes,
2248 edges,
2249 subflows: Vec::new(),
2250 }
2251}
2252
2253fn topo_sort(graph: &FlowGraph) -> Result<Vec<String>, ExecutorError> {
2254 let mut indegree: HashMap<&str, usize> =
2255 graph.nodes.iter().map(|n| (n.id.as_str(), 0)).collect();
2256 let mut adj: HashMap<&str, Vec<&str>> = HashMap::new();
2257
2258 for edge in &graph.edges {
2259 adj.entry(edge.source.as_str())
2260 .or_default()
2261 .push(edge.target.as_str());
2262 *indegree.entry(edge.target.as_str()).or_insert(0) += 1;
2263 }
2264
2265 let mut queue: VecDeque<&str> = indegree
2266 .iter()
2267 .filter(|(_, &d)| d == 0)
2268 .map(|(id, _)| *id)
2269 .collect();
2270
2271 let mut order = Vec::with_capacity(graph.nodes.len());
2272 let mut visited = HashSet::new();
2273
2274 while let Some(id) = queue.pop_front() {
2275 if !visited.insert(id) {
2276 continue;
2277 }
2278 order.push(id.to_string());
2279
2280 if let Some(children) = adj.get(id) {
2281 for &c in children {
2282 let entry = indegree.entry(c).or_insert(0);
2283 if *entry > 0 {
2284 *entry -= 1;
2285 if *entry == 0 {
2286 queue.push_back(c);
2287 }
2288 }
2289 }
2290 }
2291 }
2292
2293 if order.len() < graph.nodes.len() {
2294 let unresolved = graph
2295 .nodes
2296 .iter()
2297 .find(|n| !visited.contains(n.id.as_str()))
2298 .map(|n| n.id.clone())
2299 .unwrap_or_default();
2300 return Err(ExecutorError::Cycle(unresolved));
2301 }
2302
2303 Ok(order)
2304}
2305
2306#[cfg(test)]
2307mod tests {
2308 use super::*;
2309 use crate::adapter::MockAdapter;
2310 use crate::events::CapturingSink;
2311 use flow_domain::graph::{FlowEdge, FlowNode, Position};
2312
2313 fn make_node(id: &str, kind: &str, data: serde_json::Value) -> FlowNode {
2314 FlowNode {
2315 id: id.into(),
2316 node_type: kind.into(),
2317 position: Position { x: 0.0, y: 0.0 },
2318 data,
2319 }
2320 }
2321
2322 fn make_edge(id: &str, src: &str, dst: &str) -> FlowEdge {
2323 FlowEdge {
2324 id: id.into(),
2325 source: src.into(),
2326 target: dst.into(),
2327 label: None,
2328 condition: None,
2329 outcome: EdgeOutcome::Always,
2330 }
2331 }
2332
2333 fn make_edge_with(id: &str, src: &str, dst: &str, outcome: EdgeOutcome) -> FlowEdge {
2334 FlowEdge {
2335 id: id.into(),
2336 source: src.into(),
2337 target: dst.into(),
2338 label: None,
2339 condition: None,
2340 outcome,
2341 }
2342 }
2343
2344 async fn build_executor() -> (Executor, CapturingSink) {
2346 let mut adapters = AdapterRegistry::new();
2347 adapters.register(Arc::new(MockAdapter::with_delay(
2348 "mock",
2349 std::time::Duration::from_millis(1),
2350 )));
2351 let sink = CapturingSink::default();
2352 let credentials: Arc<dyn CredentialResolver> =
2353 Arc::new(flow_security::EnvFallbackResolver::new(Arc::new(
2354 flow_security::InMemoryCredentialStore::new(),
2355 )));
2356 let exec = Executor {
2357 adapters: Arc::new(adapters),
2358 sanitizer: Arc::new(PiiSanitizer::new()),
2359 events: Arc::new(sink.clone()),
2360 cloud_providers: Arc::new(CloudAiRegistry::new()),
2361 credentials,
2362 allow_cloud_ai: false,
2363 allow_local_ai: false,
2364 local_ai_base_url: None,
2365 stream_sink: Arc::new(NullStreamSink),
2366 working_memory: Default::default(),
2367 control: Default::default(),
2368 confirm_destructive: false,
2369 review_gate_available: true,
2370 edit_staging: None,
2371 workspace_root: PathBuf::from("."),
2372 };
2373 (exec, sink)
2374 }
2375
2376 fn make_failing_action(id: &str) -> FlowNode {
2379 make_node(
2380 id,
2381 "action",
2382 serde_json::json!({ "adapter": "missing-adapter" }),
2383 )
2384 }
2385
2386 #[tokio::test]
2387 async fn topological_order_is_respected() {
2388 let graph = FlowGraph {
2389 subflows: Vec::new(),
2390 id: "g".into(),
2391 name: "g".into(),
2392 version: "1".into(),
2393 description: None,
2394 nodes: vec![
2395 make_node("c", "action", serde_json::json!({ "adapter": "mock" })),
2396 make_node("a", "action", serde_json::json!({ "adapter": "mock" })),
2397 make_node("b", "action", serde_json::json!({ "adapter": "mock" })),
2398 ],
2399 edges: vec![make_edge("e1", "a", "b"), make_edge("e2", "b", "c")],
2400 };
2401
2402 let order = topo_sort(&graph).unwrap();
2403 let pos = |id: &str| order.iter().position(|x| x == id).unwrap();
2404 assert!(pos("a") < pos("b"));
2405 assert!(pos("b") < pos("c"));
2406 }
2407
2408 #[tokio::test]
2409 async fn cycle_detected() {
2410 let graph = FlowGraph {
2411 subflows: Vec::new(),
2412 id: "g".into(),
2413 name: "g".into(),
2414 version: "1".into(),
2415 description: None,
2416 nodes: vec![
2417 make_node("a", "action", serde_json::json!({ "adapter": "mock" })),
2418 make_node("b", "action", serde_json::json!({ "adapter": "mock" })),
2419 ],
2420 edges: vec![make_edge("e1", "a", "b"), make_edge("e2", "b", "a")],
2421 };
2422 assert!(matches!(topo_sort(&graph), Err(ExecutorError::Cycle(_))));
2423 }
2424
2425 #[tokio::test]
2426 async fn happy_path_emits_events_and_returns_succeeded() {
2427 let (executor, sink) = build_executor().await;
2428
2429 let graph = FlowGraph {
2430 subflows: Vec::new(),
2431 id: "g".into(),
2432 name: "demo".into(),
2433 version: "1".into(),
2434 description: None,
2435 nodes: vec![
2436 make_node("a", "action", serde_json::json!({ "adapter": "mock" })),
2437 make_node("b", "action", serde_json::json!({ "adapter": "mock" })),
2438 ],
2439 edges: vec![make_edge("e1", "a", "b")],
2440 };
2441
2442 let summary = executor.run(&graph).await.unwrap();
2443 assert_eq!(summary.status, "succeeded");
2444 assert_eq!(summary.succeeded, 2);
2445
2446 let events = sink.events.lock().unwrap();
2447 assert!(matches!(
2448 events.first(),
2449 Some(ExecutionEvent::Started { .. })
2450 ));
2451 assert!(matches!(events.last(), Some(ExecutionEvent::Done { .. })));
2452 }
2453
2454 #[tokio::test]
2455 async fn subflow_runs_as_unit_and_downstream_fires() {
2456 let (executor, _sink) = build_executor().await;
2457 let graph = FlowGraph {
2458 subflows: vec![SubFlow {
2459 id: "setup".into(),
2460 label: "Setup".into(),
2461 node_ids: vec!["a".into(), "b".into()],
2462 retry: None,
2463 }],
2464 id: "g".into(),
2465 name: "g".into(),
2466 version: "1".into(),
2467 description: None,
2468 nodes: vec![
2469 make_node("start", "action", serde_json::json!({ "adapter": "mock" })),
2470 make_node("a", "action", serde_json::json!({ "adapter": "mock" })),
2471 make_node("b", "action", serde_json::json!({ "adapter": "mock" })),
2472 make_node("done", "action", serde_json::json!({ "adapter": "mock" })),
2473 ],
2474 edges: vec![
2475 make_edge("e1", "start", "a"), make_edge("e2", "a", "b"), make_edge("e3", "b", "done"), ],
2479 };
2480 let summary = executor.run(&graph).await.unwrap();
2481 assert_eq!(summary.status, "succeeded");
2482 assert_eq!(summary.succeeded, 4, "start + a + b + done all run");
2483 assert_eq!(summary.failed, 0);
2484 assert_eq!(summary.skipped, 0);
2485 }
2486
2487 #[tokio::test]
2488 async fn subflow_retries_as_a_unit_on_failure() {
2489 let (executor, sink) = build_executor().await;
2490 let graph = FlowGraph {
2491 subflows: vec![SubFlow {
2492 id: "unit".into(),
2493 label: "Unit".into(),
2494 node_ids: vec!["boom".into()],
2495 retry: Some(2),
2496 }],
2497 id: "g".into(),
2498 name: "g".into(),
2499 version: "1".into(),
2500 description: None,
2501 nodes: vec![make_failing_action("boom")],
2502 edges: vec![],
2503 };
2504 let summary = executor.run(&graph).await.unwrap();
2505 assert_eq!(summary.failed, 1, "the unit ultimately fails");
2506 let events = sink.events.lock().unwrap();
2508 let starts = events
2509 .iter()
2510 .filter(|e| matches!(e, ExecutionEvent::NodeStarted { node_id, .. } if node_id == "boom"))
2511 .count();
2512 assert_eq!(starts, 3, "the unit re-ran the failing member 1 + retry(2) times");
2513 }
2514
2515 #[tokio::test]
2516 async fn subflow_skipped_when_entry_not_reached() {
2517 let (executor, _sink) = build_executor().await;
2518 let graph = FlowGraph {
2519 subflows: vec![SubFlow {
2520 id: "unit".into(),
2521 label: "Unit".into(),
2522 node_ids: vec!["a".into(), "b".into()],
2523 retry: None,
2524 }],
2525 id: "g".into(),
2526 name: "g".into(),
2527 version: "1".into(),
2528 description: None,
2529 nodes: vec![
2530 make_failing_action("gate"),
2531 make_node("a", "action", serde_json::json!({ "adapter": "mock" })),
2532 make_node("b", "action", serde_json::json!({ "adapter": "mock" })),
2533 ],
2534 edges: vec![
2536 make_edge_with("e1", "gate", "a", EdgeOutcome::Pass),
2537 make_edge("e2", "a", "b"),
2538 ],
2539 };
2540 let summary = executor.run(&graph).await.unwrap();
2541 assert_eq!(summary.failed, 1, "gate fails");
2542 assert_eq!(summary.skipped, 2, "both unit members are skipped");
2543 assert_eq!(summary.succeeded, 0);
2544 }
2545
2546 #[tokio::test]
2548 async fn pass_edge_skips_node_on_failure() {
2549 let (executor, sink) = build_executor().await;
2550 let graph = FlowGraph {
2551 subflows: Vec::new(),
2552 id: "g".into(),
2553 name: "g".into(),
2554 version: "1".into(),
2555 description: None,
2556 nodes: vec![
2557 make_failing_action("a"),
2558 make_node("b", "action", serde_json::json!({ "adapter": "mock" })),
2559 ],
2560 edges: vec![make_edge_with("e1", "a", "b", EdgeOutcome::Pass)],
2561 };
2562 let summary = executor.run(&graph).await.unwrap();
2563 assert_eq!(summary.failed, 1, "a must fail");
2564 assert_eq!(
2565 summary.skipped, 1,
2566 "b must be skipped (no incoming edge fired)"
2567 );
2568 assert_eq!(summary.succeeded, 0);
2569 let events = sink.events.lock().unwrap();
2570 let b_skipped = events.iter().any(|e| {
2571 matches!(e,
2572 ExecutionEvent::NodeSkipped { node_id, .. } if node_id == "b")
2573 });
2574 assert!(b_skipped, "b should have a skipped event");
2575 }
2576
2577 #[tokio::test]
2580 async fn fail_edge_fires_on_skip() {
2581 let (executor, _) = build_executor().await;
2582 let graph = FlowGraph {
2583 subflows: Vec::new(),
2584 id: "g".into(),
2585 name: "g".into(),
2586 version: "1".into(),
2587 description: None,
2588 nodes: vec![
2589 make_failing_action("a"),
2590 make_node(
2591 "recover",
2592 "action",
2593 serde_json::json!({ "adapter": "mock" }),
2594 ),
2595 ],
2596 edges: vec![make_edge_with("e1", "a", "recover", EdgeOutcome::Fail)],
2597 };
2598 let summary = executor.run(&graph).await.unwrap();
2599 assert_eq!(summary.failed, 1);
2600 assert_eq!(
2601 summary.succeeded, 1,
2602 "recover runs because a Fail edge fired"
2603 );
2604 }
2605
2606 async fn build_executor_with_local(
2610 allow_local_ai: bool,
2611 local_ai_base_url: Option<String>,
2612 ) -> Executor {
2613 let adapters = AdapterRegistry::new();
2614 let mut cloud = CloudAiRegistry::new();
2615 cloud.register(Arc::new(flow_adapter_ai::LocalOpenAiProvider::new()));
2616 let credentials: Arc<dyn CredentialResolver> =
2617 Arc::new(flow_security::EnvFallbackResolver::new(Arc::new(
2618 flow_security::InMemoryCredentialStore::new(),
2619 )));
2620 Executor {
2621 adapters: Arc::new(adapters),
2622 sanitizer: Arc::new(PiiSanitizer::new()),
2623 events: Arc::new(CapturingSink::default()),
2624 cloud_providers: Arc::new(cloud),
2625 credentials,
2626 allow_cloud_ai: false,
2627 allow_local_ai,
2628 local_ai_base_url,
2629 stream_sink: Arc::new(NullStreamSink),
2630 working_memory: Default::default(),
2631 control: Default::default(),
2632 confirm_destructive: false,
2633 review_gate_available: true,
2634 edit_staging: None,
2635 workspace_root: PathBuf::from("."),
2636 }
2637 }
2638
2639 fn local_ai_graph() -> FlowGraph {
2640 FlowGraph {
2641 subflows: Vec::new(),
2642 id: "g".into(),
2643 name: "g".into(),
2644 version: "1".into(),
2645 description: None,
2646 nodes: vec![make_node(
2647 "llm",
2648 "ai",
2649 serde_json::json!({
2650 "provider": "local",
2651 "modelId": "local-llm",
2652 "input": "hello",
2653 }),
2654 )],
2655 edges: vec![],
2656 }
2657 }
2658
2659 #[tokio::test]
2662 async fn local_provider_skipped_when_local_gate_off() {
2663 let exec = build_executor_with_local(false, None).await;
2664 let summary = exec.run(&local_ai_graph()).await.unwrap();
2665 assert_eq!(summary.skipped, 1);
2666 assert_eq!(summary.succeeded, 0);
2667 assert_eq!(summary.failed, 0);
2668 }
2669
2670 #[tokio::test]
2675 async fn local_provider_no_key_required_reaches_provider() {
2676 let exec = build_executor_with_local(true, None).await;
2677 let summary = exec.run(&local_ai_graph()).await.unwrap();
2678 assert_eq!(summary.failed, 1);
2680 assert_eq!(summary.skipped, 0);
2681 }
2682
2683 struct ScriptedLocalProvider {
2688 responses: std::sync::Mutex<VecDeque<String>>,
2689 }
2690
2691 impl ScriptedLocalProvider {
2692 fn new(responses: Vec<&str>) -> Arc<Self> {
2693 Arc::new(Self {
2694 responses: std::sync::Mutex::new(
2695 responses.into_iter().map(String::from).collect(),
2696 ),
2697 })
2698 }
2699 }
2700
2701 #[async_trait::async_trait]
2702 impl flow_adapter_ai::CloudAiProvider for ScriptedLocalProvider {
2703 fn name(&self) -> &str {
2704 "local"
2705 }
2706 fn env_var(&self) -> &str {
2707 "STUB_KEY"
2708 }
2709 fn default_models(&self) -> &[&str] {
2710 &["stub-model"]
2711 }
2712 async fn invoke(
2713 &self,
2714 req: &CloudAiRequest,
2715 ) -> Result<flow_adapter_ai::CloudAiResponse, flow_adapter_ai::CloudAiError> {
2716 let text = self
2717 .responses
2718 .lock()
2719 .unwrap()
2720 .pop_front()
2721 .expect("scripted provider ran out of responses");
2722 Ok(flow_adapter_ai::CloudAiResponse {
2723 provider: "local".into(),
2724 model: req.model.clone(),
2725 text,
2726 finish_reason: "stop".into(),
2727 input_tokens: 0,
2728 output_tokens: 0,
2729 latency_ms: 0,
2730 })
2731 }
2732 }
2733
2734 async fn build_contract_executor(responses: Vec<&str>) -> (Executor, CapturingSink) {
2735 let sink = CapturingSink::default();
2736 let mut cloud = CloudAiRegistry::new();
2737 cloud.register(ScriptedLocalProvider::new(responses));
2738 let mut adapters = AdapterRegistry::new();
2739 adapters.register(Arc::new(MockAdapter::with_delay(
2740 "mock",
2741 std::time::Duration::from_millis(1),
2742 )));
2743 let credentials: Arc<dyn CredentialResolver> =
2744 Arc::new(flow_security::EnvFallbackResolver::new(Arc::new(
2745 flow_security::InMemoryCredentialStore::new(),
2746 )));
2747 let executor = Executor {
2748 adapters: Arc::new(adapters),
2749 sanitizer: Arc::new(PiiSanitizer::new()),
2750 events: Arc::new(sink.clone()),
2751 cloud_providers: Arc::new(cloud),
2752 credentials,
2753 allow_cloud_ai: false,
2754 allow_local_ai: true,
2755 local_ai_base_url: Some("http://127.0.0.1:9".into()),
2756 stream_sink: Arc::new(NullStreamSink),
2757 working_memory: Default::default(),
2758 control: Default::default(),
2759 confirm_destructive: false,
2760 review_gate_available: true,
2761 edit_staging: None,
2762 workspace_root: PathBuf::from("."),
2763 };
2764 (executor, sink)
2765 }
2766
2767 fn contract_graph() -> FlowGraph {
2770 FlowGraph {
2771 subflows: Vec::new(),
2772 id: "g".into(),
2773 name: "g".into(),
2774 version: "1".into(),
2775 description: None,
2776 nodes: vec![
2777 make_node(
2778 "interpret",
2779 "ai",
2780 serde_json::json!({
2781 "provider": "local",
2782 "modelId": "stub-model",
2783 "input": "interpret this log",
2784 "contract": true,
2785 }),
2786 ),
2787 make_node("fallback", "action", serde_json::json!({ "adapter": "mock" })),
2788 ],
2789 edges: vec![make_edge_with("e1", "interpret", "fallback", EdgeOutcome::Fail)],
2790 }
2791 }
2792
2793 fn envelope_json(confidence: f64, escalate: bool) -> String {
2794 serde_json::json!({
2795 "primary_output": "step 3 failed: missing dataset",
2796 "confidence": confidence,
2797 "confidence_type": "verbalized",
2798 "escalate": escalate,
2799 "reasoning": "matched the abend code"
2800 })
2801 .to_string()
2802 }
2803
2804 fn routing_decisions(sink: &CapturingSink) -> Vec<String> {
2805 sink.events
2806 .lock()
2807 .unwrap()
2808 .iter()
2809 .filter_map(|e| match e {
2810 ExecutionEvent::AiRoutingDecision { decision, .. } => Some(decision.clone()),
2811 _ => None,
2812 })
2813 .collect()
2814 }
2815
2816 #[tokio::test]
2819 async fn contract_violation_fails_node_onto_fallback() {
2820 let (executor, sink) = build_contract_executor(vec!["sure, here's my analysis"]).await;
2821 let summary = executor.run(&contract_graph()).await.unwrap();
2822 assert_eq!(summary.failed, 1, "schema violation fails the ai node");
2823 assert_eq!(summary.succeeded, 1, "fallback runs via the .fail edge");
2824 assert_eq!(routing_decisions(&sink), vec!["contract_violation"]);
2825 }
2826
2827 #[tokio::test]
2830 async fn contract_auto_approves_high_confidence() {
2831 let (executor, sink) =
2832 build_contract_executor(vec![&envelope_json(0.95, false)]).await;
2833 let summary = executor.run(&contract_graph()).await.unwrap();
2834 assert_eq!(summary.succeeded, 1);
2835 assert_eq!(summary.skipped, 1, "fallback not reached");
2836 assert_eq!(routing_decisions(&sink), vec!["auto_approve"]);
2837 let events = sink.events.lock().unwrap();
2838 let output = events
2839 .iter()
2840 .find_map(|e| match e {
2841 ExecutionEvent::NodeSucceeded { node_id, output, .. }
2842 if node_id == "interpret" =>
2843 {
2844 Some(output.clone())
2845 }
2846 _ => None,
2847 })
2848 .expect("ai node succeeded");
2849 assert_eq!(
2850 output.get("primary_output").and_then(|v| v.as_str()),
2851 Some("step 3 failed: missing dataset")
2852 );
2853 assert_eq!(output.get("route").and_then(|v| v.as_str()), Some("auto_approve"));
2854 }
2855
2856 #[tokio::test]
2859 async fn contract_suppresses_low_confidence_onto_fallback() {
2860 let (executor, sink) =
2861 build_contract_executor(vec![&envelope_json(0.2, false)]).await;
2862 let summary = executor.run(&contract_graph()).await.unwrap();
2863 assert_eq!(summary.failed, 1);
2864 assert_eq!(summary.succeeded, 1, "fallback runs via the .fail edge");
2865 assert_eq!(routing_decisions(&sink), vec!["suppress"]);
2866 }
2867
2868 #[tokio::test]
2871 async fn contract_review_band_falls_back_when_gate_unavailable() {
2872 let (mut executor, sink) =
2873 build_contract_executor(vec![&envelope_json(0.7, false)]).await;
2874 executor.review_gate_available = false;
2875 let summary = executor.run(&contract_graph()).await.unwrap();
2876 assert_eq!(summary.failed, 1);
2877 assert_eq!(summary.succeeded, 1, "fallback runs via the .fail edge");
2878 assert_eq!(routing_decisions(&sink), vec!["human_review"]);
2879 }
2880
2881 #[tokio::test]
2884 async fn contract_escalate_forces_review_at_high_confidence() {
2885 let (mut executor, sink) =
2886 build_contract_executor(vec![&envelope_json(0.99, true)]).await;
2887 executor.review_gate_available = false;
2888 let summary = executor.run(&contract_graph()).await.unwrap();
2889 assert_eq!(summary.failed, 1, "escalated output held back without a gate");
2890 assert_eq!(routing_decisions(&sink), vec!["human_review"]);
2891 }
2892
2893 async fn run_with_review_verdict(approved: bool) -> (ExecutionSummary, CapturingSink) {
2896 let (executor, sink) = build_contract_executor(vec![&envelope_json(0.7, false)]).await;
2897 let control = executor.control.clone();
2898 let watcher_sink = sink.clone();
2899 let watcher = tokio::spawn(async move {
2900 loop {
2901 let gated = watcher_sink
2902 .events
2903 .lock()
2904 .unwrap()
2905 .iter()
2906 .any(|e| matches!(e, ExecutionEvent::AiReviewRequired { .. }));
2907 if gated {
2908 control.resolve_review(approved);
2909 return;
2910 }
2911 tokio::time::sleep(std::time::Duration::from_millis(5)).await;
2912 }
2913 });
2914 let summary = executor.run(&contract_graph()).await.unwrap();
2915 watcher.await.unwrap();
2916 (summary, sink)
2917 }
2918
2919 #[tokio::test]
2922 async fn contract_review_gate_approval_passes_output() {
2923 let (summary, sink) = run_with_review_verdict(true).await;
2924 assert_eq!(summary.succeeded, 1);
2925 assert_eq!(summary.failed, 0);
2926 let events = sink.events.lock().unwrap();
2927 assert!(events.iter().any(|e| matches!(
2928 e,
2929 ExecutionEvent::AiReviewResolved { approved: true, .. }
2930 )));
2931 }
2932
2933 #[tokio::test]
2935 async fn contract_review_gate_rejection_routes_fallback() {
2936 let (summary, sink) = run_with_review_verdict(false).await;
2937 assert_eq!(summary.failed, 1);
2938 assert_eq!(summary.succeeded, 1, "fallback runs via the .fail edge");
2939 let events = sink.events.lock().unwrap();
2940 assert!(events.iter().any(|e| matches!(
2941 e,
2942 ExecutionEvent::AiReviewResolved { approved: false, .. }
2943 )));
2944 }
2945
2946 #[tokio::test]
2949 async fn ai_invocation_is_recorded() {
2950 let (executor, sink) = build_contract_executor(vec!["plain text answer"]).await;
2951 let graph = FlowGraph {
2952 subflows: Vec::new(),
2953 id: "g".into(),
2954 name: "g".into(),
2955 version: "1".into(),
2956 description: None,
2957 nodes: vec![make_node(
2958 "llm",
2959 "ai",
2960 serde_json::json!({
2961 "provider": "local",
2962 "modelId": "stub-model",
2963 "input": "hello",
2964 }),
2965 )],
2966 edges: vec![],
2967 };
2968 executor.run(&graph).await.unwrap();
2969 let events = sink.events.lock().unwrap();
2970 let recorded = events.iter().any(|e| matches!(
2971 e,
2972 ExecutionEvent::AiInvocation { provider, input, .. }
2973 if provider == "local" && input == "hello"
2974 ));
2975 assert!(recorded, "AiInvocation event with the sanitized input");
2976 }
2977
2978 #[tokio::test]
2981 async fn always_edges_run_target_unconditionally() {
2982 let (executor, _) = build_executor().await;
2983 let graph = FlowGraph {
2984 subflows: Vec::new(),
2985 id: "g".into(),
2986 name: "g".into(),
2987 version: "1".into(),
2988 description: None,
2989 nodes: vec![
2990 make_failing_action("a"),
2991 make_node("b", "action", serde_json::json!({ "adapter": "mock" })),
2992 ],
2993 edges: vec![make_edge_with("e1", "a", "b", EdgeOutcome::Always)],
2994 };
2995 let summary = executor.run(&graph).await.unwrap();
2996 assert_eq!(summary.failed, 1, "a still fails");
2997 assert_eq!(
2998 summary.succeeded, 1,
2999 "b runs despite a's failure (Always edge)"
3000 );
3001 }
3002
3003 fn output_for(sink: &CapturingSink, node_id: &str) -> Option<serde_json::Value> {
3006 sink.events.lock().unwrap().iter().find_map(|e| match e {
3007 ExecutionEvent::NodeSucceeded {
3008 node_id: nid,
3009 output,
3010 ..
3011 } if nid == node_id => Some(output.clone()),
3012 _ => None,
3013 })
3014 }
3015
3016 #[test]
3017 fn primary_text_extracts_common_keys_and_scalars() {
3018 assert_eq!(primary_text(&serde_json::json!("hi")), "hi");
3019 assert_eq!(primary_text(&serde_json::json!({ "text": "abc" })), "abc");
3020 assert_eq!(primary_text(&serde_json::json!({ "stdout": "out" })), "out");
3021 assert_eq!(primary_text(&serde_json::json!(42)), "42");
3022 assert_eq!(primary_text(&serde_json::json!(true)), "true");
3023 assert_eq!(primary_text(&serde_json::Value::Null), "");
3024 assert_eq!(
3026 primary_text(&serde_json::json!({ "a": 1 })),
3027 "{\"a\":1}".to_string()
3028 );
3029 }
3030
3031 #[test]
3032 fn resolve_token_handles_input_id_and_path() {
3033 let mut outputs = HashMap::new();
3034 outputs.insert("a".to_string(), serde_json::json!({ "text": "hello" }));
3035 outputs.insert(
3036 "b".to_string(),
3037 serde_json::json!({ "nested": { "k": "deep" } }),
3038 );
3039 let upstream = vec!["a".to_string()];
3040 let mut memory = HashMap::new();
3041 memory.insert("region".to_string(), serde_json::json!("us-east-1"));
3042 memory.insert("cfg".to_string(), serde_json::json!({ "retries": 3 }));
3043
3044 assert_eq!(resolve_token("input", &outputs, &upstream, &memory), "hello");
3045 assert_eq!(resolve_token("a", &outputs, &upstream, &memory), "hello");
3046 assert_eq!(resolve_token("a.text", &outputs, &upstream, &memory), "hello");
3047 assert_eq!(resolve_token("b.nested.k", &outputs, &upstream, &memory), "deep");
3048 assert_eq!(resolve_token("memory.region", &outputs, &upstream, &memory), "us-east-1");
3050 assert_eq!(resolve_token("memory.cfg.retries", &outputs, &upstream, &memory), "3");
3051 assert_eq!(resolve_token("missing", &outputs, &upstream, &memory), "");
3053 assert_eq!(resolve_token("a.nope", &outputs, &upstream, &memory), "");
3054 assert_eq!(resolve_token("memory.nope", &outputs, &upstream, &memory), "");
3055 }
3056
3057 #[test]
3058 fn interpolate_str_replaces_tokens_and_preserves_literals() {
3059 let mut outputs = HashMap::new();
3060 outputs.insert("a".to_string(), serde_json::json!({ "text": "world" }));
3061 let upstream = vec!["a".to_string()];
3062 let memory = HashMap::new();
3063
3064 assert_eq!(
3065 interpolate_str("hi {{a.text}}!", &outputs, &upstream, &memory),
3066 "hi world!"
3067 );
3068 assert_eq!(
3069 interpolate_str("no tokens here", &outputs, &upstream, &memory),
3070 "no tokens here"
3071 );
3072 assert_eq!(
3074 interpolate_str("oops {{a.text", &outputs, &upstream, &memory),
3075 "oops {{a.text"
3076 );
3077 }
3078
3079 #[test]
3080 fn interpolate_value_recurses_into_arrays_and_objects() {
3081 let mut outputs = HashMap::new();
3082 outputs.insert("a".to_string(), serde_json::json!({ "text": "X" }));
3083 let upstream = vec!["a".to_string()];
3084 let memory = HashMap::new();
3085 let data = serde_json::json!({
3086 "command": "echo {{a.text}}",
3087 "args": ["--msg", "{{a.text}}"],
3088 "count": 3,
3089 });
3090 let resolved = interpolate_value(&data, &outputs, &upstream, &memory);
3091 assert_eq!(resolved["command"], serde_json::json!("echo X"));
3092 assert_eq!(resolved["args"][1], serde_json::json!("X"));
3093 assert_eq!(resolved["count"], serde_json::json!(3));
3094 }
3095
3096 #[tokio::test]
3097 async fn upstream_output_flows_into_downstream_field() {
3098 let (executor, sink) = build_executor().await;
3103 let graph = FlowGraph {
3104 subflows: Vec::new(),
3105 id: "g".into(),
3106 name: "g".into(),
3107 version: "1".into(),
3108 description: None,
3109 nodes: vec![
3110 make_node("a", "action", serde_json::json!({ "adapter": "mock", "label": "hello" })),
3111 make_node(
3112 "b",
3113 "action",
3114 serde_json::json!({ "adapter": "mock", "label": "got: {{a.payload.label}}" }),
3115 ),
3116 ],
3117 edges: vec![make_edge("e1", "a", "b")],
3118 };
3119 let summary = executor.run(&graph).await.unwrap();
3120 assert_eq!(summary.succeeded, 2);
3121 let b_out = output_for(&sink, "b").expect("b succeeded");
3122 assert_eq!(b_out["payload"]["label"], serde_json::json!("got: hello"));
3123 }
3124
3125 #[tokio::test]
3126 async fn input_token_resolves_to_upstream_output() {
3127 let (executor, sink) = build_executor().await;
3132 let graph = FlowGraph {
3133 subflows: Vec::new(),
3134 id: "g".into(),
3135 name: "g".into(),
3136 version: "1".into(),
3137 description: None,
3138 nodes: vec![
3139 make_node("a", "action", serde_json::json!({ "adapter": "mock", "label": "seed" })),
3140 make_node(
3141 "b",
3142 "action",
3143 serde_json::json!({ "adapter": "mock", "label": "{{input}}" }),
3144 ),
3145 ],
3146 edges: vec![make_edge("e1", "a", "b")],
3147 };
3148 executor.run(&graph).await.unwrap();
3149 let b_out = output_for(&sink, "b").expect("b succeeded");
3150 let label = b_out["payload"]["label"].as_str().unwrap();
3151 assert!(
3152 label.contains("\"label\":\"seed\""),
3153 "b's label embeds a's serialized output: {label}"
3154 );
3155 }
3156
3157 use crate::adapter::{Adapter, AdapterError, NodeOutput};
3160 use std::sync::atomic::{AtomicI64, Ordering};
3161
3162 fn make_edge_when(id: &str, src: &str, dst: &str, outcome: EdgeOutcome, when: &str) -> FlowEdge {
3164 FlowEdge {
3165 id: id.into(),
3166 source: src.into(),
3167 target: dst.into(),
3168 label: None,
3169 condition: Some(when.into()),
3170 outcome,
3171 }
3172 }
3173
3174 struct CountdownAdapter {
3177 name: String,
3178 remaining: AtomicI64,
3179 }
3180
3181 impl CountdownAdapter {
3182 fn new(name: &str, start: i64) -> Self {
3183 Self {
3184 name: name.into(),
3185 remaining: AtomicI64::new(start),
3186 }
3187 }
3188 }
3189
3190 #[async_trait::async_trait]
3191 impl Adapter for CountdownAdapter {
3192 fn name(&self) -> &str {
3193 &self.name
3194 }
3195 async fn execute(&self, _node: &FlowNode) -> Result<NodeOutput, AdapterError> {
3196 let after = self.remaining.fetch_sub(1, Ordering::SeqCst) - 1;
3197 Ok(NodeOutput {
3198 status: "succeeded".into(),
3199 payload: serde_json::json!({ "remaining": after }),
3200 })
3201 }
3202 }
3203
3204 async fn build_executor_with_adapters(extra: Vec<Arc<dyn Adapter>>) -> (Executor, CapturingSink) {
3205 build_executor_with(extra, WorkingMemory::default()).await
3206 }
3207
3208 async fn build_executor_with(
3209 extra: Vec<Arc<dyn Adapter>>,
3210 working_memory: WorkingMemory,
3211 ) -> (Executor, CapturingSink) {
3212 let mut adapters = AdapterRegistry::new();
3213 adapters.register(Arc::new(MockAdapter::with_delay(
3214 "mock",
3215 std::time::Duration::from_millis(1),
3216 )));
3217 for a in extra {
3218 adapters.register(a);
3219 }
3220 let sink = CapturingSink::default();
3221 let credentials: Arc<dyn CredentialResolver> =
3222 Arc::new(flow_security::EnvFallbackResolver::new(Arc::new(
3223 flow_security::InMemoryCredentialStore::new(),
3224 )));
3225 let exec = Executor {
3226 adapters: Arc::new(adapters),
3227 sanitizer: Arc::new(PiiSanitizer::new()),
3228 events: Arc::new(sink.clone()),
3229 cloud_providers: Arc::new(CloudAiRegistry::new()),
3230 credentials,
3231 allow_cloud_ai: false,
3232 allow_local_ai: false,
3233 local_ai_base_url: None,
3234 stream_sink: Arc::new(NullStreamSink),
3235 working_memory,
3236 control: Default::default(),
3237 confirm_destructive: false,
3238 review_gate_available: true,
3239 edit_staging: None,
3240 workspace_root: PathBuf::from("."),
3241 };
3242 (exec, sink)
3243 }
3244
3245 struct SetVarAdapter;
3250
3251 #[async_trait::async_trait]
3252 impl Adapter for SetVarAdapter {
3253 fn name(&self) -> &str {
3254 "setvar"
3255 }
3256 async fn execute(&self, node: &FlowNode) -> Result<NodeOutput, AdapterError> {
3257 let name = node
3258 .data
3259 .get("name")
3260 .and_then(|v| v.as_str())
3261 .unwrap_or("")
3262 .to_string();
3263 let value = node.data.get("value").cloned().unwrap_or(serde_json::Value::Null);
3264 Ok(NodeOutput {
3265 status: "succeeded".into(),
3266 payload: serde_json::json!({
3267 "actionId": "set-variable",
3268 "name": name,
3269 "value": value,
3270 }),
3271 })
3272 }
3273 }
3274
3275 fn count_succeeded(sink: &CapturingSink, node_id: &str) -> usize {
3276 sink.events
3277 .lock()
3278 .unwrap()
3279 .iter()
3280 .filter(|e| matches!(e, ExecutionEvent::NodeSucceeded { node_id: nid, .. } if nid == node_id))
3281 .count()
3282 }
3283
3284 fn was_skipped(sink: &CapturingSink, node_id: &str) -> bool {
3285 sink.events
3286 .lock()
3287 .unwrap()
3288 .iter()
3289 .any(|e| matches!(e, ExecutionEvent::NodeSkipped { node_id: nid, .. } if nid == node_id))
3290 }
3291
3292 fn count_capped(sink: &CapturingSink, node_id: &str) -> usize {
3293 sink.events
3294 .lock()
3295 .unwrap()
3296 .iter()
3297 .filter(|e| matches!(e, ExecutionEvent::IterationCapped { node_id: nid, .. } if nid == node_id))
3298 .count()
3299 }
3300
3301 #[tokio::test]
3302 async fn when_condition_gates_edge_on_dag() {
3303 let (executor, sink) = build_executor().await;
3304 let graph = FlowGraph {
3307 subflows: Vec::new(),
3308 id: "g".into(),
3309 name: "g".into(),
3310 version: "1".into(),
3311 description: None,
3312 nodes: vec![
3313 make_node("src", "action", serde_json::json!({ "adapter": "mock", "label": "go" })),
3314 make_node("a", "action", serde_json::json!({ "adapter": "mock" })),
3315 make_node("b", "action", serde_json::json!({ "adapter": "mock" })),
3316 ],
3317 edges: vec![
3318 make_edge_when("e1", "src", "a", EdgeOutcome::Pass, "{{src.payload.label}} == 'go'"),
3319 make_edge_when("e2", "src", "b", EdgeOutcome::Pass, "{{src.payload.label}} == 'stop'"),
3320 ],
3321 };
3322
3323 let summary = executor.run(&graph).await.unwrap();
3324 assert_eq!(count_succeeded(&sink, "a"), 1, "matching condition runs a");
3325 assert!(was_skipped(&sink, "b"), "non-matching condition skips b");
3326 assert_eq!(summary.status, "succeeded");
3329 }
3330
3331 #[tokio::test]
3332 async fn bounded_loop_converges_on_when_exit() {
3333 let countdown = Arc::new(CountdownAdapter::new("countdown", 3));
3334 let (executor, sink) = build_executor_with_adapters(vec![countdown]).await;
3335 let graph = FlowGraph {
3338 subflows: Vec::new(),
3339 id: "g".into(),
3340 name: "g".into(),
3341 version: "1".into(),
3342 description: None,
3343 nodes: vec![
3344 make_node("start", "action", serde_json::json!({ "adapter": "mock" })),
3345 make_node("work", "action", serde_json::json!({ "adapter": "mock" })),
3346 make_node("check", "action", serde_json::json!({ "adapter": "countdown" })),
3347 ],
3348 edges: vec![
3349 make_edge("e1", "start", "work"),
3350 make_edge("e2", "work", "check"),
3351 make_edge_when(
3352 "e3",
3353 "check",
3354 "work",
3355 EdgeOutcome::Always,
3356 "{{check.payload.remaining}} > 0",
3357 ),
3358 ],
3359 };
3360
3361 let summary = executor.run(&graph).await.unwrap();
3362 assert_eq!(count_succeeded(&sink, "check"), 3, "loop runs until remaining hits 0");
3363 assert_eq!(count_succeeded(&sink, "work"), 3);
3364 assert_eq!(count_capped(&sink, "check"), 0, "converged before the cap");
3365 assert_eq!(summary.status, "succeeded");
3366 }
3367
3368 #[tokio::test]
3369 async fn runaway_loop_hits_cap_and_finishes_partial() {
3370 let (executor, sink) = build_executor().await;
3371 let graph = FlowGraph {
3374 subflows: Vec::new(),
3375 id: "g".into(),
3376 name: "g".into(),
3377 version: "1".into(),
3378 description: None,
3379 nodes: vec![
3380 make_node("start", "action", serde_json::json!({ "adapter": "mock" })),
3381 make_node("loop", "action", serde_json::json!({ "adapter": "mock" })),
3382 ],
3383 edges: vec![
3384 make_edge("e1", "start", "loop"),
3385 make_edge("e2", "loop", "loop"),
3386 ],
3387 };
3388
3389 let summary = executor.run(&graph).await.unwrap();
3390 assert_eq!(
3391 count_succeeded(&sink, "loop"),
3392 PER_NODE_VISIT_CAP as usize,
3393 "node runs exactly up to the cap"
3394 );
3395 assert_eq!(count_capped(&sink, "loop"), 1, "one cap diagnostic emitted");
3396 assert_eq!(summary.status, "partial");
3397 }
3398
3399 fn set_var_node(id: &str, name: &str, value: &str) -> FlowNode {
3402 make_node(
3403 id,
3404 "utility",
3405 serde_json::json!({ "adapter": "setvar", "name": name, "value": value }),
3406 )
3407 }
3408
3409 #[tokio::test]
3410 async fn set_variable_writes_memory_and_downstream_reads_it() {
3411 let (executor, sink) = build_executor_with_adapters(vec![Arc::new(SetVarAdapter)]).await;
3412 let graph = FlowGraph {
3413 subflows: Vec::new(),
3414 id: "g".into(),
3415 name: "g".into(),
3416 version: "1".into(),
3417 description: None,
3418 nodes: vec![
3419 set_var_node("set", "region", "us-east-1"),
3420 make_node(
3421 "reader",
3422 "action",
3423 serde_json::json!({ "adapter": "mock", "label": "{{memory.region}}" }),
3424 ),
3425 ],
3426 edges: vec![make_edge("e1", "set", "reader")],
3427 };
3428
3429 executor.run(&graph).await.unwrap();
3430 let reader = output_for(&sink, "reader").expect("reader succeeded");
3431 assert_eq!(
3432 reader["payload"]["label"].as_str(),
3433 Some("us-east-1"),
3434 "downstream node reads the value written via set-variable"
3435 );
3436 }
3437
3438 #[tokio::test]
3439 async fn working_memory_persists_across_runs() {
3440 let memory = WorkingMemory::default();
3443 let (executor, sink) =
3444 build_executor_with(vec![Arc::new(SetVarAdapter)], memory.clone()).await;
3445
3446 let write_graph = FlowGraph {
3448 subflows: Vec::new(),
3449 id: "g1".into(),
3450 name: "g1".into(),
3451 version: "1".into(),
3452 description: None,
3453 nodes: vec![set_var_node("set", "token", "abc123")],
3454 edges: vec![],
3455 };
3456 executor.run(&write_graph).await.unwrap();
3457 assert_eq!(memory.lock().unwrap().get("token").and_then(|v| v.as_str()), Some("abc123"));
3458
3459 let read_graph = FlowGraph {
3461 subflows: Vec::new(),
3462 id: "g2".into(),
3463 name: "g2".into(),
3464 version: "1".into(),
3465 description: None,
3466 nodes: vec![make_node(
3467 "reader",
3468 "action",
3469 serde_json::json!({ "adapter": "mock", "label": "{{memory.token}}" }),
3470 )],
3471 edges: vec![],
3472 };
3473 executor.run(&read_graph).await.unwrap();
3474 let reader = output_for(&sink, "reader").expect("reader succeeded");
3475 assert_eq!(reader["payload"]["label"].as_str(), Some("abc123"));
3476 }
3477
3478 #[tokio::test]
3479 async fn when_condition_reads_memory() {
3480 let (executor, sink) = build_executor_with_adapters(vec![Arc::new(SetVarAdapter)]).await;
3481 let graph = FlowGraph {
3484 subflows: Vec::new(),
3485 id: "g".into(),
3486 name: "g".into(),
3487 version: "1".into(),
3488 description: None,
3489 nodes: vec![
3490 set_var_node("set", "flag", "go"),
3491 make_node("go_node", "action", serde_json::json!({ "adapter": "mock" })),
3492 make_node("stop_node", "action", serde_json::json!({ "adapter": "mock" })),
3493 ],
3494 edges: vec![
3495 make_edge_when("e1", "set", "go_node", EdgeOutcome::Pass, "{{memory.flag}} == 'go'"),
3496 make_edge_when(
3497 "e2",
3498 "set",
3499 "stop_node",
3500 EdgeOutcome::Pass,
3501 "{{memory.flag}} == 'stop'",
3502 ),
3503 ],
3504 };
3505
3506 executor.run(&graph).await.unwrap();
3507 assert_eq!(count_succeeded(&sink, "go_node"), 1, "memory-matched edge fires");
3508 assert!(was_skipped(&sink, "stop_node"), "memory-mismatched edge skips");
3509 }
3510
3511 #[test]
3512 fn destructive_reason_flags_data_loss_only() {
3513 let del = make_node(
3514 "d",
3515 "action",
3516 serde_json::json!({"adapter":"fs","actionId":"delete-file","path":"build/"}),
3517 );
3518 assert!(destructive_reason(&del).is_some(), "fs delete-file");
3519
3520 let rm = make_node(
3521 "s",
3522 "action",
3523 serde_json::json!({"adapter":"shell","actionId":"run-command","command":"rm","args":["-rf","dist"]}),
3524 );
3525 assert!(destructive_reason(&rm).is_some(), "shell rm");
3526
3527 let push = make_node(
3528 "g",
3529 "action",
3530 serde_json::json!({"adapter":"shell","actionId":"git","args":["push","origin","main"]}),
3531 );
3532 assert!(destructive_reason(&push).is_some(), "git push");
3533
3534 let read = make_node(
3535 "r",
3536 "action",
3537 serde_json::json!({"adapter":"fs","actionId":"read-file","path":"x"}),
3538 );
3539 assert!(destructive_reason(&read).is_none(), "read is safe");
3540
3541 let confirm = make_node(
3543 "c",
3544 "action",
3545 serde_json::json!({"adapter":"shell","actionId":"run-command","command":"./confirm","args":["build"]}),
3546 );
3547 assert!(destructive_reason(&confirm).is_none(), "no rm substring match");
3548 }
3549
3550 #[test]
3551 fn sandbox_escape_flags_parent_traversal() {
3552 let esc = make_node(
3553 "e",
3554 "action",
3555 serde_json::json!({"adapter":"fs","actionId":"read-file","path":"../../etc/passwd"}),
3556 );
3557 assert!(sandbox_escape_reason(&esc).is_some(), "`..` path flagged");
3558 let safe = make_node(
3559 "s",
3560 "action",
3561 serde_json::json!({"adapter":"fs","actionId":"read-file","path":"src/main.rs"}),
3562 );
3563 assert!(sandbox_escape_reason(&safe).is_none(), "in-jail path is fine");
3564 let dots = make_node(
3566 "d",
3567 "action",
3568 serde_json::json!({"adapter":"fs","actionId":"read-file","path":"a..b.txt"}),
3569 );
3570 assert!(
3571 sandbox_escape_reason(&dots).is_none(),
3572 "`..` substring not flagged"
3573 );
3574 }
3575
3576 #[test]
3577 fn verify_graph_collects_destructive_and_escape() {
3578 let graph = FlowGraph {
3579 subflows: Vec::new(),
3580 id: "g".into(),
3581 name: "g".into(),
3582 version: "1".into(),
3583 description: None,
3584 nodes: vec![
3585 make_node(
3586 "del",
3587 "action",
3588 serde_json::json!({"adapter":"fs","actionId":"delete-file","path":"x"}),
3589 ),
3590 make_node(
3591 "esc",
3592 "action",
3593 serde_json::json!({"adapter":"fs","actionId":"read-file","path":"../x"}),
3594 ),
3595 make_node(
3596 "ok",
3597 "action",
3598 serde_json::json!({"adapter":"fs","actionId":"read-file","path":"x"}),
3599 ),
3600 ],
3601 edges: vec![],
3602 };
3603 let w = verify_graph(&graph);
3604 assert!(w.iter().any(|x| x.node_id == "del" && x.severity == "destructive"));
3605 assert!(w
3606 .iter()
3607 .any(|x| x.node_id == "esc" && x.severity == "sandbox-escape"));
3608 assert!(!w.iter().any(|x| x.node_id == "ok"));
3609 }
3610
3611 #[test]
3612 fn fallback_node_swaps_provider_and_strips_fallback_fields() {
3613 let n = make_node(
3614 "ai",
3615 "ai",
3616 serde_json::json!({
3617 "provider": "claude",
3618 "modelId": "m1",
3619 "fallbackProvider": "local",
3620 "fallbackModelId": "m2",
3621 "input": "x"
3622 }),
3623 );
3624 let fb = fallback_node(&n).expect("has fallback");
3625 assert_eq!(fb.data.get("provider").unwrap(), "local");
3626 assert_eq!(fb.data.get("modelId").unwrap(), "m2");
3627 assert!(fb.data.get("fallbackProvider").is_none());
3628 assert!(fb.data.get("fallbackModelId").is_none());
3629 assert_eq!(fb.data.get("input").unwrap(), "x"); let no_fb = make_node(
3633 "ai",
3634 "ai",
3635 serde_json::json!({"provider":"claude","modelId":"m1"}),
3636 );
3637 assert!(fallback_node(&no_fb).is_none());
3638 }
3639
3640 #[test]
3641 fn run_control_phase_transitions() {
3642 let c = RunControl::default();
3643 assert_eq!(c.phase(), RunPhase::Running);
3644 c.pause();
3645 assert_eq!(c.phase(), RunPhase::Paused);
3646 c.resume();
3647 assert_eq!(c.phase(), RunPhase::Running);
3648 c.cancel();
3649 assert!(c.is_cancelling());
3650 c.pause();
3652 c.resume();
3653 assert!(c.is_cancelling());
3654 c.reset();
3656 assert_eq!(c.phase(), RunPhase::Running);
3657 }
3658
3659 #[tokio::test]
3660 async fn wait_while_paused_returns_immediately_when_not_paused() {
3661 let c = RunControl::default();
3662 c.wait_while_paused().await;
3664 c.cancel();
3666 c.wait_while_paused().await;
3667 }
3668
3669 #[tokio::test]
3670 async fn wait_while_paused_unblocks_on_resume() {
3671 let c = Arc::new(RunControl::default());
3672 c.pause();
3673 let c2 = c.clone();
3674 let waiter = tokio::spawn(async move { c2.wait_while_paused().await });
3675 tokio::time::sleep(std::time::Duration::from_millis(20)).await;
3677 c.resume();
3678 tokio::time::timeout(std::time::Duration::from_secs(1), waiter)
3679 .await
3680 .expect("waiter did not wake within timeout")
3681 .expect("waiter task panicked");
3682 assert_eq!(c.phase(), RunPhase::Running);
3683 }
3684
3685 struct CancelTriggerAdapter {
3688 control: SharedRunControl,
3689 }
3690
3691 #[async_trait::async_trait]
3692 impl Adapter for CancelTriggerAdapter {
3693 fn name(&self) -> &str {
3694 "cancel-trigger"
3695 }
3696 async fn execute(&self, _node: &FlowNode) -> Result<NodeOutput, AdapterError> {
3697 self.control.cancel();
3698 Ok(NodeOutput {
3699 status: "succeeded".into(),
3700 payload: serde_json::Value::Null,
3701 })
3702 }
3703 }
3704
3705 #[tokio::test]
3706 async fn cancel_skips_remaining_nodes_and_reports_cancelled() {
3707 let (mut exec, sink) = build_executor().await;
3708 let control = exec.control.clone();
3709 let trigger: Arc<dyn Adapter> = Arc::new(CancelTriggerAdapter { control });
3710 let mut adapters = AdapterRegistry::new();
3712 adapters.register(Arc::new(MockAdapter::with_delay(
3713 "mock",
3714 std::time::Duration::from_millis(1),
3715 )));
3716 adapters.register(trigger);
3717 exec.adapters = Arc::new(adapters);
3718
3719 let graph = FlowGraph {
3721 subflows: Vec::new(),
3722 id: "g".into(),
3723 name: "g".into(),
3724 version: "1".into(),
3725 description: None,
3726 nodes: vec![
3727 make_node("n1", "action", serde_json::json!({ "adapter": "cancel-trigger" })),
3728 make_node("n2", "action", serde_json::json!({ "adapter": "mock" })),
3729 make_node("n3", "action", serde_json::json!({ "adapter": "mock" })),
3730 ],
3731 edges: vec![
3732 make_edge("e1", "n1", "n2"),
3733 make_edge("e2", "n2", "n3"),
3734 ],
3735 };
3736
3737 let summary = exec.run(&graph).await.unwrap();
3738 assert_eq!(summary.status, "cancelled", "cancel yields cancelled status");
3739 assert_eq!(count_succeeded(&sink, "n1"), 1, "the triggering node still completes");
3740 assert!(was_skipped(&sink, "n2"), "downstream node is skipped after cancel");
3741 assert!(was_skipped(&sink, "n3"), "all remaining nodes are skipped after cancel");
3742 }
3743}