Skip to main content

flow_execution/
ai_tools.rs

1//! AI-node execution support: exposing bound adapters to a model as callable
2//! tools, dispatching the model's tool calls back to those adapters, and
3//! resolving image references for vision calls.
4//!
5//! This is the in-node tool-use path - distinct from whole-flow agentic
6//! generation. The model emits tool calls, [`AdapterToolDispatcher`] runs each
7//! through the *existing* adapter (so the fs/shell sandboxes apply unchanged),
8//! and the result is fed back to the model.
9
10use std::sync::Arc;
11
12use chrono::Utc;
13use flow_adapter_ai::{ToolDispatcher, ToolSpec};
14use flow_domain::graph::{FlowNode, Position};
15use serde::{Deserialize, Serialize};
16use serde_json::{json, Map, Value};
17
18use crate::adapter::{AdapterRegistry, FieldType};
19use crate::events::{EventSink, ExecutionEvent};
20
21/// Separator between adapter name and action id in a tool name
22/// (`fs__read-file`). `__` avoids clashing with the `-` inside action ids.
23const SEP: &str = "__";
24
25/// Default cap on the in-node tool-use loop (how many times the model may call
26/// tools before the run gives up). Overridable per node via `maxToolIters`
27/// ([`ai_max_tool_iters`]).
28pub const DEFAULT_TOOL_ITERATIONS: usize = 8;
29
30/// Observes each model tool call so the dispatcher can emit an
31/// [`ExecutionEvent::ToolCall`] just before the tool runs - giving the UI live
32/// per-tool-call granularity during an AI node's tool loop (the dispatcher
33/// otherwise emits nothing per call). Cheap to clone (an `Arc` + two ids).
34#[derive(Clone)]
35pub struct ToolObserver {
36    sink: Arc<dyn EventSink>,
37    execution_id: String,
38    node_id: String,
39}
40
41impl ToolObserver {
42    pub fn new(sink: Arc<dyn EventSink>, execution_id: String, node_id: String) -> Self {
43        Self {
44            sink,
45            execution_id,
46            node_id,
47        }
48    }
49
50    /// Emit a `ToolCall` event for the call about to run.
51    fn observe(&self, name: &str, args: &Value) {
52        self.sink.emit(ExecutionEvent::ToolCall {
53            execution_id: self.execution_id.clone(),
54            node_id: self.node_id.clone(),
55            at: Utc::now(),
56            name: name.to_string(),
57            summary: summarize_tool_call(name, args),
58        });
59    }
60}
61
62/// One-line, human-readable summary of a model tool call for the `ToolCall`
63/// event: the action id plus the most salient argument (path / command /
64/// pattern / query / url), e.g. `edit-file src/foo.rs`. The salient value is
65/// truncated so a giant `content`/`command` can't bloat a log line.
66fn summarize_tool_call(name: &str, args: &Value) -> String {
67    let action = name.split_once(SEP).map(|(_, a)| a).unwrap_or(name);
68    let detail = ["path", "command", "args", "pattern", "query", "url"]
69        .iter()
70        .find_map(|k| args.get(*k).and_then(|v| v.as_str()))
71        .map(str::trim)
72        .filter(|s| !s.is_empty());
73    match detail {
74        Some(d) => {
75            let mut short: String = d.chars().take(80).collect();
76            if short.len() < d.len() {
77                short.push('…');
78            }
79            format!("{action} {short}")
80        }
81        None => action.to_string(),
82    }
83}
84
85/// Fields the dispatcher injects (routing + sandbox root), so they're omitted
86/// from the model-facing tool schema.
87const INJECTED: &[&str] = &["workspaceRoot", "cwd", "adapter", "actionId"];
88
89/// Build model tool specs from the descriptors of the bound adapters. Each
90/// adapter action becomes one tool `"{adapter}__{actionId}"`; its required +
91/// optional fields become JSON-schema properties.
92pub fn build_tool_specs(adapters: &AdapterRegistry, bound: &[String]) -> Vec<ToolSpec> {
93    let mut specs = Vec::new();
94    for adapter_name in bound {
95        let Some(adapter) = adapters.get(adapter_name) else {
96            continue;
97        };
98        let desc = adapter.descriptor();
99        for action in &desc.actions {
100            let mut props = Map::new();
101            let mut required = Vec::new();
102            for f in action
103                .required_fields
104                .iter()
105                .chain(action.optional_fields.iter())
106            {
107                if INJECTED.contains(&f.name.as_str()) {
108                    continue;
109                }
110                props.insert(
111                    f.name.clone(),
112                    json!({ "type": json_type(f.value_type), "description": f.description }),
113                );
114            }
115            for f in &action.required_fields {
116                if INJECTED.contains(&f.name.as_str()) {
117                    continue;
118                }
119                required.push(Value::String(f.name.clone()));
120            }
121            specs.push(ToolSpec {
122                name: format!("{}{SEP}{}", desc.name, action.id),
123                description: action.summary.clone(),
124                parameters: json!({
125                    "type": "object",
126                    "properties": props,
127                    "required": required,
128                }),
129            });
130        }
131    }
132    specs
133}
134
135fn json_type(t: FieldType) -> &'static str {
136    match t {
137        FieldType::Number => "number",
138        FieldType::Bool => "boolean",
139        FieldType::String | FieldType::Null => "string",
140    }
141}
142
143/// Runs a model tool call by mapping `"{adapter}__{actionId}"` back to an
144/// adapter action: it synthesizes a node from the model's args (plus the AI
145/// node's workspace as `workspaceRoot`/`cwd`) and executes it through the bound
146/// adapter, so all existing sandboxing/policy applies.
147pub struct AdapterToolDispatcher {
148    adapters: Arc<AdapterRegistry>,
149    workspace: Option<String>,
150    observer: Option<ToolObserver>,
151}
152
153impl AdapterToolDispatcher {
154    pub fn new(adapters: Arc<AdapterRegistry>, workspace: Option<String>) -> Self {
155        Self {
156            adapters,
157            workspace,
158            observer: None,
159        }
160    }
161
162    /// Attach a [`ToolObserver`] so each dispatched call emits a `ToolCall`
163    /// event for live UI granularity.
164    pub fn with_observer(mut self, observer: ToolObserver) -> Self {
165        self.observer = Some(observer);
166        self
167    }
168}
169
170#[async_trait::async_trait]
171impl ToolDispatcher for AdapterToolDispatcher {
172    async fn call(&self, name: &str, args: &Value) -> Result<Value, String> {
173        if let Some(o) = &self.observer {
174            o.observe(name, args);
175        }
176        let (adapter_name, action_id) = name
177            .split_once(SEP)
178            .ok_or_else(|| format!("malformed tool name '{name}'"))?;
179        let adapter = self
180            .adapters
181            .get(adapter_name)
182            .ok_or_else(|| format!("adapter '{adapter_name}' not registered"))?;
183
184        let mut data: Map<String, Value> = match args {
185            Value::Object(m) => m.clone(),
186            _ => Map::new(),
187        };
188        data.insert("adapter".into(), json!(adapter_name));
189        data.insert("actionId".into(), json!(action_id));
190        if let Some(ws) = &self.workspace {
191            data.entry("workspaceRoot".to_string())
192                .or_insert_with(|| json!(ws));
193            data.entry("cwd".to_string()).or_insert_with(|| json!(ws));
194        }
195
196        let node = FlowNode {
197            id: format!("tool-{adapter_name}-{action_id}"),
198            node_type: "action".into(),
199            position: Position { x: 0.0, y: 0.0 },
200            data: Value::Object(data),
201        };
202
203        match adapter.execute(&node).await {
204            Ok(out) => Ok(serde_json::to_value(out).unwrap_or(Value::Null)),
205            Err(e) => Err(e.to_string()),
206        }
207    }
208}
209
210// ---------- staged (review-before-write) tool dispatch ----------
211
212/// A single filesystem change the agent proposed during a turn, computed but
213/// **not** applied. The IDE renders each as a diff (`before` → `after`) the
214/// user accepts or rejects.
215#[derive(Debug, Clone, Serialize, Deserialize)]
216pub struct ProposedEdit {
217    /// Path relative to the workspace root.
218    pub path: String,
219    /// `"create"`, `"modify"`, or `"delete"`, derived from before/after.
220    pub kind: String,
221    /// File contents before the turn (`None` if the file did not exist).
222    pub before: Option<String>,
223    /// File contents after the proposed change (`None` if deleted).
224    pub after: Option<String>,
225}
226
227/// Net staged edits for one agent turn: the per-path result plus first-touch
228/// order, so the IDE shows one stable diff per file.
229#[derive(Default)]
230pub struct StagingState {
231    order: Vec<String>,
232    by_path: std::collections::HashMap<String, ProposedEdit>,
233}
234
235impl StagingState {
236    /// The proposed edits in first-touch order, one per path.
237    pub fn edits(&self) -> Vec<ProposedEdit> {
238        self.order
239            .iter()
240            .filter_map(|p| self.by_path.get(p).cloned())
241            .collect()
242    }
243
244    /// The staged result for `path` if the turn has touched it: `Some(Some(c))`
245    /// = staged content, `Some(None)` = staged-deleted, `None` = not staged.
246    fn staged(&self, path: &str) -> Option<Option<String>> {
247        self.by_path.get(path).map(|e| e.after.clone())
248    }
249
250    fn record(&mut self, path: &str, before: Option<String>, after: Option<String>) {
251        // Keep the *original* `before` across repeated edits to the same file.
252        let before = match self.by_path.get(path) {
253            Some(existing) => existing.before.clone(),
254            None => {
255                self.order.push(path.to_string());
256                before
257            }
258        };
259        let kind = if after.is_none() {
260            "delete"
261        } else if before.is_none() {
262            "create"
263        } else {
264            "modify"
265        };
266        self.by_path.insert(
267            path.to_string(),
268            ProposedEdit {
269                path: path.to_string(),
270                kind: kind.to_string(),
271                before,
272                after,
273            },
274        );
275    }
276}
277
278/// Shared handle to a turn's [`StagingState`]. `FlowApp::run_agent_turn`
279/// creates one, the executor hands it to the dispatcher, and the caller reads
280/// the proposed edits back out after the run.
281pub type StagedEdits = Arc<std::sync::Mutex<StagingState>>;
282
283/// Tool dispatcher for a coding-agent *turn*: filesystem **mutations**
284/// (`fs` `write-file`/`edit-file`/`delete-file`) are staged into a buffer
285/// instead of written to disk, so the turn can be reviewed before it lands.
286/// Everything else - `fs` reads, `glob`/`grep`/`list-dir`, and every `shell`
287/// tool - runs for real through [`AdapterToolDispatcher`]. A `read-file` of a
288/// staged path is served from the overlay, so the model sees its own pending
289/// edits within the same turn.
290pub struct StagingToolDispatcher {
291    inner: AdapterToolDispatcher,
292    state: StagedEdits,
293    observer: Option<ToolObserver>,
294}
295
296impl StagingToolDispatcher {
297    pub fn new(
298        adapters: Arc<AdapterRegistry>,
299        workspace: Option<String>,
300        state: StagedEdits,
301    ) -> Self {
302        // `inner` deliberately has no observer: this (outer) dispatcher emits
303        // one `ToolCall` per model call at the top of `call`, so delegated and
304        // internal-read passthroughs to `inner` must not emit again.
305        Self {
306            inner: AdapterToolDispatcher::new(adapters, workspace),
307            state,
308            observer: None,
309        }
310    }
311
312    /// Attach a [`ToolObserver`] so each staged-or-delegated call emits a
313    /// `ToolCall` event for live UI granularity.
314    pub fn with_observer(mut self, observer: ToolObserver) -> Self {
315        self.observer = Some(observer);
316        self
317    }
318
319    /// Current effective content of `path`: the staged overlay if the turn has
320    /// already touched it, otherwise what's on disk. `None` means absent
321    /// everywhere (including staged-deleted).
322    async fn effective_content(&self, path: &str) -> Option<String> {
323        if let Ok(st) = self.state.lock() {
324            if let Some(staged) = st.staged(path) {
325                return staged;
326            }
327        }
328        self.read_disk(path).await
329    }
330
331    /// Read `path` from disk through the real fs adapter (so the workspace jail
332    /// applies). `None` on any error (missing / unreadable).
333    async fn read_disk(&self, path: &str) -> Option<String> {
334        match self.inner.call("fs__read-file", &json!({ "path": path })).await {
335            Ok(v) => v
336                .get("payload")
337                .and_then(|p| p.get("content"))
338                .and_then(|c| c.as_str())
339                .map(str::to_string),
340            Err(_) => None,
341        }
342    }
343
344    fn record(&self, path: &str, before: Option<String>, after: Option<String>) {
345        if let Ok(mut st) = self.state.lock() {
346            st.record(path, before, after);
347        }
348    }
349
350    async fn stage_write(&self, args: &Value) -> Result<Value, String> {
351        let path = arg_str(args, "path")?;
352        let content = arg_str(args, "content")?;
353        let before = self.read_disk(&path).await;
354        self.record(&path, before, Some(content.clone()));
355        Ok(staged_ok(
356            "write-file",
357            &path,
358            json!({ "bytesWritten": content.len() }),
359        ))
360    }
361
362    async fn stage_edit(&self, args: &Value) -> Result<Value, String> {
363        let path = arg_str(args, "path")?;
364        let old = arg_str(args, "oldString")?;
365        let new = arg_str(args, "newString")?;
366        let replace_all = args
367            .get("replaceAll")
368            .and_then(|v| v.as_bool())
369            .unwrap_or(false);
370
371        // Edit the current effective content (overlay first, then disk), so a
372        // file created/edited earlier in the same turn edits correctly.
373        let Some(base) = self.effective_content(&path).await else {
374            return Err(format!("oldString not found in {path}"));
375        };
376        let count = base.matches(&old).count();
377        if count == 0 {
378            return Err(format!("oldString not found in {path}"));
379        }
380        if count > 1 && !replace_all {
381            return Err(format!(
382                "oldString occurs {count}× in {path}; pass replaceAll: true or add context to make it unique"
383            ));
384        }
385        let updated = if replace_all {
386            base.replace(&old, &new)
387        } else {
388            base.replacen(&old, &new, 1)
389        };
390        // `record` keeps the original `before` on repeat touches, so passing
391        // the on-disk state here is correct for a first-touch edit and ignored
392        // for a file already created/edited this turn.
393        let before = self.read_disk(&path).await;
394        self.record(&path, before, Some(updated));
395        Ok(staged_ok(
396            "edit-file",
397            &path,
398            json!({ "replacements": if replace_all { count } else { 1 } }),
399        ))
400    }
401
402    async fn stage_delete(&self, args: &Value) -> Result<Value, String> {
403        let path = arg_str(args, "path")?;
404        if self.effective_content(&path).await.is_none() {
405            return Err(format!("{path}: no such file"));
406        }
407        let before = self.read_disk(&path).await;
408        self.record(&path, before, None);
409        Ok(staged_ok("delete-file", &path, json!({})))
410    }
411
412    /// Serve a `read-file` from the overlay when the path is staged; `None`
413    /// falls through to disk.
414    fn overlay_read(&self, args: &Value) -> Option<Result<Value, String>> {
415        let path = args.get("path").and_then(|v| v.as_str())?.to_string();
416        let staged = self.state.lock().ok()?.staged(&path)?;
417        Some(match staged {
418            Some(content) => Ok(json!({
419                "status": "succeeded",
420                "payload": {
421                    "action": "read-file",
422                    "path": path,
423                    "bytes": content.len(),
424                    "content": content,
425                    "staged": true,
426                }
427            })),
428            None => Err(format!("read {path}: no such file (staged delete)")),
429        })
430    }
431}
432
433#[async_trait::async_trait]
434impl ToolDispatcher for StagingToolDispatcher {
435    async fn call(&self, name: &str, args: &Value) -> Result<Value, String> {
436        if let Some(o) = &self.observer {
437            o.observe(name, args);
438        }
439        if let Some((adapter, action)) = name.split_once(SEP) {
440            if adapter == "fs" {
441                match action {
442                    "write-file" => return self.stage_write(args).await,
443                    "edit-file" => return self.stage_edit(args).await,
444                    "delete-file" => return self.stage_delete(args).await,
445                    "read-file" => {
446                        if let Some(res) = self.overlay_read(args) {
447                            return res;
448                        }
449                    }
450                    // glob / grep / list-dir read disk directly (MVP: the
451                    // overlay only resolves read-file).
452                    _ => {}
453                }
454            }
455        }
456        self.inner.call(name, args).await
457    }
458}
459
460/// A success result shaped like the real fs adapter's, tagged `staged` so the
461/// model (and logging) can tell the write was deferred.
462fn staged_ok(action: &str, path: &str, extra: Value) -> Value {
463    let mut payload = json!({ "action": action, "path": path, "staged": true });
464    if let (Some(obj), Some(ex)) = (payload.as_object_mut(), extra.as_object()) {
465        for (k, v) in ex {
466            obj.insert(k.clone(), v.clone());
467        }
468    }
469    json!({ "status": "succeeded", "payload": payload })
470}
471
472fn arg_str(args: &Value, key: &str) -> Result<String, String> {
473    args.get(key)
474        .and_then(|v| v.as_str())
475        .filter(|s| !s.is_empty())
476        .map(str::to_string)
477        .ok_or_else(|| format!("missing required field '{key}'"))
478}
479
480/// Resolve an image reference into something a multimodal API accepts: an
481/// `http(s)`/`data:` URL passes through unchanged; a local path is read and
482/// encoded into a `data:<mime>;base64,…` URL (local inference servers can't
483/// fetch arbitrary URLs). Returns an error string the caller surfaces.
484pub fn resolve_image_ref(raw: &str) -> Result<String, String> {
485    let r = raw.trim();
486    if r.is_empty() {
487        return Err("empty image reference".into());
488    }
489    if r.starts_with("http://") || r.starts_with("https://") || r.starts_with("data:") {
490        return Ok(r.to_string());
491    }
492    let bytes = std::fs::read(r).map_err(|e| format!("read image {r}: {e}"))?;
493    Ok(format!("data:{};base64,{}", mime_for(r), b64_encode(&bytes)))
494}
495
496fn mime_for(path: &str) -> &'static str {
497    let ext = path.rsplit('.').next().unwrap_or("").to_ascii_lowercase();
498    match ext.as_str() {
499        "jpg" | "jpeg" => "image/jpeg",
500        "gif" => "image/gif",
501        "webp" => "image/webp",
502        _ => "image/png",
503    }
504}
505
506/// Minimal standard-alphabet base64 encoder (avoids pulling a crate for the
507/// single use of building image `data:` URLs).
508fn b64_encode(data: &[u8]) -> String {
509    const T: &[u8; 64] = b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";
510    let mut out = String::with_capacity(data.len().div_ceil(3) * 4);
511    for chunk in data.chunks(3) {
512        let b0 = chunk[0];
513        let b1 = *chunk.get(1).unwrap_or(&0);
514        let b2 = *chunk.get(2).unwrap_or(&0);
515        let n = ((b0 as u32) << 16) | ((b1 as u32) << 8) | (b2 as u32);
516        out.push(T[((n >> 18) & 63) as usize] as char);
517        out.push(T[((n >> 12) & 63) as usize] as char);
518        out.push(if chunk.len() > 1 {
519            T[((n >> 6) & 63) as usize] as char
520        } else {
521            '='
522        });
523        out.push(if chunk.len() > 2 {
524            T[(n & 63) as usize] as char
525        } else {
526            '='
527        });
528    }
529    out
530}
531
532// ---------- node field readers + label matching ----------
533
534/// The AI node's dispatch task: `"generate"` (default), `"embedding"`, or
535/// `"classify"`. The inspector stamps it from the bound model's capability /
536/// whether labels are set.
537pub fn ai_task(node: &FlowNode) -> String {
538    node.data
539        .get("task")
540        .and_then(|v| v.as_str())
541        .filter(|s| !s.is_empty())
542        .unwrap_or("generate")
543        .to_string()
544}
545
546/// Per-request reasoning toggle from the node's `thinking` flag.
547pub fn ai_reasoning(node: &FlowNode) -> Option<bool> {
548    node.data.get("thinking").and_then(|v| v.as_bool())
549}
550
551/// Classification labels (trimmed, non-empty) from the node's `labels` array.
552pub fn ai_labels(node: &FlowNode) -> Vec<String> {
553    node.data
554        .get("labels")
555        .and_then(|v| v.as_array())
556        .map(|a| {
557            a.iter()
558                .filter_map(|x| x.as_str().map(|s| s.trim().to_string()))
559                .filter(|s| !s.is_empty())
560                .collect()
561        })
562        .unwrap_or_default()
563}
564
565/// Adapter names the node exposes to the model as tools (its `tools` array).
566pub fn ai_tool_adapters(node: &FlowNode) -> Vec<String> {
567    node.data
568        .get("tools")
569        .and_then(|v| v.as_array())
570        .map(|a| {
571            a.iter()
572                .filter_map(|x| x.as_str().map(str::to_string))
573                .collect()
574        })
575        .unwrap_or_default()
576}
577
578/// Per-node cap on the in-node tool-use loop, read from the node's
579/// `maxToolIters` field; falls back to [`DEFAULT_TOOL_ITERATIONS`]. Clamped to
580/// at least 1 so a `0` can't silently disable tool use.
581pub fn ai_max_tool_iters(node: &FlowNode) -> usize {
582    node.data
583        .get("maxToolIters")
584        .and_then(|v| v.as_u64())
585        .map(|v| (v as usize).max(1))
586        .unwrap_or(DEFAULT_TOOL_ITERATIONS)
587}
588
589/// The structured-output JSON schema (raw text) from the node's `outputSchema`
590/// field, used by the `"structured"` task. Stored as a string so it round-trips
591/// through the DSL; the model is prompted to produce JSON matching it. `None`
592/// when unset or blank.
593pub fn ai_output_schema(node: &FlowNode) -> Option<String> {
594    let s = match node.data.get("outputSchema")? {
595        Value::String(s) => s.clone(),
596        // Tolerate an object/array set programmatically by stringifying it.
597        v @ (Value::Object(_) | Value::Array(_)) => v.to_string(),
598        _ => return None,
599    };
600    let s = s.trim().to_string();
601    if s.is_empty() {
602        None
603    } else {
604        Some(s)
605    }
606}
607
608/// The node's natural-language output expectation (`expect` field) for per-node
609/// AI evaluation (roadmap E1). When set, the generate path self-evaluates the
610/// output against it and fails the node - engaging the monitor/fix flow - if
611/// unmet. `None` when unset/blank.
612pub fn ai_expect(node: &FlowNode) -> Option<String> {
613    node.data
614        .get("expect")
615        .and_then(|v| v.as_str())
616        .map(str::trim)
617        .filter(|s| !s.is_empty())
618        .map(str::to_string)
619}
620
621/// Best-effort parse of a model's text into a JSON value: a direct parse first,
622/// else the substring between the first `{` and last `}` - tolerates markdown
623/// fences or surrounding prose a model adds around structured output.
624pub fn parse_json_lenient(text: &str) -> Option<Value> {
625    if let Ok(v) = serde_json::from_str::<Value>(text.trim()) {
626        return Some(v);
627    }
628    let start = text.find('{')?;
629    let end = text.rfind('}')?;
630    if end <= start {
631        return None;
632    }
633    serde_json::from_str(&text[start..=end]).ok()
634}
635
636/// Resolve the node's `imageUrl` (plus an optional `images` array) into
637/// ready-to-send references (URLs pass through; local paths become `data:`
638/// URLs). Empty when the node carries no images.
639pub fn resolve_node_images(node: &FlowNode) -> Result<Vec<String>, String> {
640    let mut refs: Vec<String> = Vec::new();
641    if let Some(s) = node
642        .data
643        .get("imageUrl")
644        .and_then(|v| v.as_str())
645        .filter(|s| !s.trim().is_empty())
646    {
647        refs.push(s.to_string());
648    }
649    if let Some(arr) = node.data.get("images").and_then(|v| v.as_array()) {
650        for x in arr {
651            if let Some(s) = x.as_str().filter(|s| !s.trim().is_empty()) {
652                refs.push(s.to_string());
653            }
654        }
655    }
656    refs.iter().map(|r| resolve_image_ref(r)).collect()
657}
658
659/// Pick the label the model's output best corresponds to: an exact
660/// (case-insensitive) match first, then a label contained in the output, else
661/// the first label as a safe default.
662pub fn match_label(output: &str, labels: &[String]) -> String {
663    let o = output.trim().to_lowercase();
664    if let Some(l) = labels.iter().find(|l| l.to_lowercase() == o) {
665        return l.clone();
666    }
667    if let Some(l) = labels.iter().find(|l| o.contains(&l.to_lowercase())) {
668        return l.clone();
669    }
670    labels.first().cloned().unwrap_or_default()
671}
672
673#[cfg(test)]
674mod tests {
675    use super::*;
676
677    #[test]
678    fn matches_labels_by_exact_then_contains() {
679        let labels = vec!["pass".to_string(), "fail".to_string()];
680        assert_eq!(match_label("PASS", &labels), "pass");
681        assert_eq!(match_label("the result is fail.", &labels), "fail");
682        assert_eq!(match_label("???", &labels), "pass"); // first as default
683    }
684
685    #[test]
686    fn max_tool_iters_defaults_and_overrides() {
687        let mk = |data: Value| FlowNode {
688            id: "n".into(),
689            node_type: "ai".into(),
690            position: Position { x: 0.0, y: 0.0 },
691            data,
692        };
693        assert_eq!(ai_max_tool_iters(&mk(json!({}))), DEFAULT_TOOL_ITERATIONS);
694        assert_eq!(ai_max_tool_iters(&mk(json!({ "maxToolIters": 3 }))), 3);
695        // 0 is clamped to 1 so tool use can't be silently disabled.
696        assert_eq!(ai_max_tool_iters(&mk(json!({ "maxToolIters": 0 }))), 1);
697    }
698
699    #[test]
700    fn tool_call_summary_picks_salient_arg() {
701        assert_eq!(
702            summarize_tool_call("fs__edit-file", &json!({ "path": "src/foo.rs" })),
703            "edit-file src/foo.rs"
704        );
705        assert_eq!(
706            summarize_tool_call("shell__run-command", &json!({ "command": "cargo test" })),
707            "run-command cargo test"
708        );
709        // No salient arg → just the action id.
710        assert_eq!(summarize_tool_call("fs__list-dir", &json!({})), "list-dir");
711    }
712
713    #[test]
714    fn output_schema_reads_string_and_trims() {
715        let mk = |data: Value| FlowNode {
716            id: "n".into(),
717            node_type: "ai".into(),
718            position: Position { x: 0.0, y: 0.0 },
719            data,
720        };
721        assert_eq!(
722            ai_output_schema(&mk(json!({ "outputSchema": "  {\"type\":\"object\"}  " })))
723                .as_deref(),
724            Some("{\"type\":\"object\"}")
725        );
726        assert!(ai_output_schema(&mk(json!({ "outputSchema": "   " }))).is_none());
727        assert!(ai_output_schema(&mk(json!({}))).is_none());
728    }
729
730    #[test]
731    fn expect_reads_and_trims() {
732        let mk = |data: Value| FlowNode {
733            id: "n".into(),
734            node_type: "ai".into(),
735            position: Position { x: 0.0, y: 0.0 },
736            data,
737        };
738        assert_eq!(
739            ai_expect(&mk(json!({ "expect": "  is valid JSON  " }))).as_deref(),
740            Some("is valid JSON")
741        );
742        assert!(ai_expect(&mk(json!({ "expect": "   " }))).is_none());
743        assert!(ai_expect(&mk(json!({}))).is_none());
744    }
745
746    #[test]
747    fn parse_json_lenient_handles_fences_and_prose() {
748        assert_eq!(parse_json_lenient(r#"{"a":1}"#), Some(json!({"a":1})));
749        assert_eq!(
750            parse_json_lenient("```json\n{\"a\":1}\n```"),
751            Some(json!({"a":1}))
752        );
753        assert_eq!(
754            parse_json_lenient("Sure! Here you go:\n{\"a\":1}\nHope that helps"),
755            Some(json!({"a":1}))
756        );
757        assert_eq!(parse_json_lenient("not json"), None);
758    }
759
760    #[tokio::test]
761    async fn dispatcher_emits_tool_call_before_running() {
762        use crate::events::CapturingSink;
763        let sink = CapturingSink::default();
764        // Empty registry: the call errors *after* the observer fires, so the
765        // ToolCall event is still captured (it's emitted before adapter lookup).
766        let d = AdapterToolDispatcher::new(Arc::new(AdapterRegistry::new()), None).with_observer(
767            ToolObserver::new(Arc::new(sink.clone()), "exec-1".into(), "node-1".into()),
768        );
769        let _ = d.call("fs__edit-file", &json!({ "path": "src/foo.rs" })).await;
770
771        let events = sink.events.lock().unwrap();
772        let found = events.iter().find_map(|e| match e {
773            ExecutionEvent::ToolCall {
774                name,
775                summary,
776                execution_id,
777                node_id,
778                ..
779            } => Some((
780                name.clone(),
781                summary.clone(),
782                execution_id.clone(),
783                node_id.clone(),
784            )),
785            _ => None,
786        });
787        assert_eq!(
788            found,
789            Some((
790                "fs__edit-file".into(),
791                "edit-file src/foo.rs".into(),
792                "exec-1".into(),
793                "node-1".into()
794            ))
795        );
796    }
797
798    #[test]
799    fn base64_matches_known_vectors() {
800        assert_eq!(b64_encode(b""), "");
801        assert_eq!(b64_encode(b"f"), "Zg==");
802        assert_eq!(b64_encode(b"fo"), "Zm8=");
803        assert_eq!(b64_encode(b"foo"), "Zm9v");
804        assert_eq!(b64_encode(b"foob"), "Zm9vYg==");
805        assert_eq!(b64_encode(b"Man"), "TWFu");
806    }
807
808    #[test]
809    fn urls_pass_through_paths_encode() {
810        assert_eq!(
811            resolve_image_ref("https://x/y.png").unwrap(),
812            "https://x/y.png"
813        );
814        assert_eq!(resolve_image_ref("data:image/png;base64,AA").unwrap(), "data:image/png;base64,AA");
815        assert!(resolve_image_ref("/no/such/file.png").is_err());
816    }
817
818    #[tokio::test]
819    async fn staging_defers_writes_and_overlays_reads() {
820        use crate::adapter::{Adapter, AdapterDescriptor, AdapterError, NodeOutput};
821
822        // Minimal "fs" adapter that only serves read-file from a temp dir, so
823        // the staging dispatcher can compute `before`/overlay without the real
824        // flow-adapter-fs crate (which would be a dependency cycle).
825        struct DiskReadFs;
826        #[async_trait::async_trait]
827        impl Adapter for DiskReadFs {
828            fn name(&self) -> &str {
829                "fs"
830            }
831            async fn execute(&self, node: &FlowNode) -> Result<NodeOutput, AdapterError> {
832                let action = node.data.get("actionId").and_then(|v| v.as_str()).unwrap_or("");
833                let root = node.data.get("workspaceRoot").and_then(|v| v.as_str()).unwrap_or("");
834                let path = node.data.get("path").and_then(|v| v.as_str()).unwrap_or("");
835                if action != "read-file" {
836                    return Err(AdapterError::Rejected {
837                        adapter: "fs".into(),
838                        node_id: node.id.clone(),
839                        reason: format!("test adapter only serves read-file, got {action}"),
840                    });
841                }
842                match std::fs::read_to_string(std::path::Path::new(root).join(path)) {
843                    Ok(content) => Ok(NodeOutput {
844                        status: "succeeded".into(),
845                        payload: json!({ "action": "read-file", "path": path, "content": content }),
846                    }),
847                    Err(e) => Err(AdapterError::Failed {
848                        adapter: "fs".into(),
849                        node_id: node.id.clone(),
850                        reason: e.to_string(),
851                    }),
852                }
853            }
854            fn descriptor(&self) -> AdapterDescriptor {
855                AdapterDescriptor { name: "fs".into(), summary: String::new(), actions: Vec::new() }
856            }
857        }
858
859        let root = std::env::temp_dir().join(format!("flow-staging-{}", std::process::id()));
860        let _ = std::fs::remove_dir_all(&root);
861        std::fs::create_dir_all(&root).unwrap();
862        std::fs::write(root.join("foo.txt"), "hello").unwrap();
863
864        let mut reg = AdapterRegistry::new();
865        reg.register(Arc::new(DiskReadFs));
866        let state: StagedEdits = Default::default();
867        let d = StagingToolDispatcher::new(
868            Arc::new(reg),
869            Some(root.to_string_lossy().to_string()),
870            state.clone(),
871        );
872
873        // write-file → staged, not applied to disk.
874        d.call("fs__write-file", &json!({"path":"bar.txt","content":"new"})).await.unwrap();
875        assert!(!root.join("bar.txt").exists(), "write must be staged, not applied");
876
877        // read-file of a staged new file is served from the overlay.
878        let r = d.call("fs__read-file", &json!({"path":"bar.txt"})).await.unwrap();
879        assert_eq!(r["payload"]["content"], "new");
880
881        // edit-file applies to the on-disk content, staged; disk stays put.
882        d.call("fs__edit-file", &json!({"path":"foo.txt","oldString":"hello","newString":"world"}))
883            .await
884            .unwrap();
885        let r2 = d.call("fs__read-file", &json!({"path":"foo.txt"})).await.unwrap();
886        assert_eq!(r2["payload"]["content"], "world", "overlay reflects the edit");
887        assert_eq!(std::fs::read_to_string(root.join("foo.txt")).unwrap(), "hello", "disk untouched");
888
889        // delete-file → staged; a later read errors but the file survives.
890        d.call("fs__delete-file", &json!({"path":"foo.txt"})).await.unwrap();
891        assert!(d.call("fs__read-file", &json!({"path":"foo.txt"})).await.is_err());
892        assert!(root.join("foo.txt").exists(), "delete must be staged, not applied");
893
894        // Net proposed edits, in first-touch order.
895        let edits = state.lock().unwrap().edits();
896        assert_eq!(edits.len(), 2);
897        assert_eq!(edits[0].path, "bar.txt");
898        assert_eq!(edits[0].kind, "create");
899        assert_eq!(edits[0].after.as_deref(), Some("new"));
900        assert_eq!(edits[0].before, None);
901        assert_eq!(edits[1].path, "foo.txt");
902        assert_eq!(edits[1].kind, "delete");
903        assert_eq!(edits[1].before.as_deref(), Some("hello"));
904        assert_eq!(edits[1].after, None);
905
906        let _ = std::fs::remove_dir_all(&root);
907    }
908}