Skip to main content

flow_application/
lib.rs

1use chrono::{DateTime, Utc};
2use flow_adapter_cli::CliAdapter;
3use flow_adapter_fs::FsAdapter;
4use flow_adapter_shell::ShellAdapter;
5use flow_adapter_utility::UtilityAdapter;
6use flow_adapter_ai::{
7    ClaudeProvider, CloudAiProvider, CloudAiRegistry, DeepSeekProvider, GeminiProvider,
8    LocalOpenAiProvider, NvidiaProvider, OpenAiProvider,
9};
10use flow_domain::execution::{ExecutionRequest, ExecutionResult, ExecutionStatus};
11use flow_domain::graph::{FlowEdge, FlowGraph, FlowNode, Position};
12use flow_execution::{
13    AdapterRegistry, CapturingSink, EventSink, ExecutionEvent,
14    ExecutionSummary, Executor, MockAdapter, MultiSink, ProposedEdit,
15};
16use flow_security::{
17    CredentialKind, CredentialResolver, CredentialStore, EnvFallbackResolver,
18    KeyringCredentialStore, PiiSanitizer,
19};
20use flow_storage::{ExecutionRecord, ExecutionStepRecord, InterceptionRecord, ScheduleRecord, Store};
21use std::path::PathBuf;
22use std::sync::Arc;
23use thiserror::Error;
24use uuid::Uuid;
25
26pub mod ansible_import;
27pub mod cli_tools;
28pub mod connections;
29pub mod node_hub;
30pub mod nodes;
31pub mod dsl_semantics;
32pub mod hub;
33pub mod install;
34// The local model-server lifecycle lives in its own crate so the CLI can reuse
35// it; re-exported under the original `llm_server` path to keep every call site
36// (here and in the desktop) unchanged.
37pub use flow_models_server as llm_server;
38pub mod scheduler;
39pub mod settings;
40pub mod template_hub;
41pub mod templates;
42pub use connections::{ConnectionStore, StoreError as ConnectionStoreError};
43pub use flow_execution::{ConnectionError, ConnectionLookup, FlowWarning, ResolvedZoweConnection};
44pub use install::{InstallProgress, ProgressCallback};
45pub use settings::{Settings, SettingsPatch, SettingsStore};
46pub use templates::TemplateRecord;
47
/// Errors surfaced by the application layer.
///
/// Every variant except [`AppError::Dsl`] wraps a single `String` that is
/// interpolated into the `Display` message after the fixed prefix declared in
/// the variant's `#[error(...)]` attribute.
#[derive(Debug, Error)]
pub enum AppError {
    /// A graph lookup found nothing. The payload is echoed after
    /// `graph not found:` (presumably the graph id; confirm at call sites).
    #[error("graph not found: {0}")]
    GraphNotFound(String),
    /// Validation rejected the input; the payload is the reason text.
    #[error("validation failed: {0}")]
    Validation(String),
    /// A call to an AI provider failed, or its response was unusable (for
    /// example a generation cut off at the token limit).
    #[error("ai invocation failed: {0}")]
    AiInvocation(String),
    /// Running a flow failed; the payload is the reason text.
    #[error("execution failed: {0}")]
    Execution(String),
    /// The storage layer reported an error; the payload is its message.
    #[error("storage error: {0}")]
    Storage(String),
    /// Reading or updating settings failed; the payload is the reason text.
    #[error("settings error: {0}")]
    Settings(String),
    /// A template operation failed; the payload is the reason text.
    #[error("template error: {0}")]
    Template(String),
    /// A credential operation failed; the payload is the reason text.
    #[error("credential error: {0}")]
    Credential(String),
    /// A DSL parse failure with its source position, rendered as
    /// `line:col: message`.
    #[error("dsl parse error at line {line}:{col}: {message}")]
    Dsl {
        /// Line number reported by the parser.
        line: u32,
        /// Column number reported by the parser.
        col: u32,
        /// Parser's description of what went wrong.
        message: String,
    },
}
73
/// Outcome of a coding-agent turn run via [`FlowApp::run_agent_turn`]: the run
/// summary plus the filesystem edits the agent proposed (staged, not yet
/// applied) for the IDE to review and apply.
///
/// Only `Serialize` is derived, so this type is produced here and consumed
/// elsewhere; it is never parsed back from serialised form by this derive.
#[derive(Debug, Clone, serde::Serialize)]
pub struct AgentTurn {
    /// Summary of the run that made up this turn.
    pub summary: ExecutionSummary,
    /// Edits the agent proposed during the turn, not yet applied.
    pub edits: Vec<ProposedEdit>,
}
82
/// Application core: bundles the adapters, AI providers, credential and
/// settings stores, persistence, and per-run control state that the hosts
/// named in the field docs below (desktop/Tauri, server, CLI/TUI) drive.
pub struct FlowApp {
    /// Shared PII sanitizer. NOTE(review): where it is applied is not visible
    /// in this part of the file.
    sanitizer: Arc<PiiSanitizer>,
    /// Shared registry of execution adapters.
    adapters: Arc<AdapterRegistry>,
    /// Cloud AI providers available to this app, held as trait objects.
    cloud_providers: Vec<Arc<dyn CloudAiProvider>>,
    /// Credential store backend, held as a trait object.
    credentials: Arc<dyn CredentialStore>,
    /// Credential resolver, held as a trait object. NOTE(review): presumably
    /// layered over `credentials`; confirm in the constructor.
    resolver: Arc<dyn CredentialResolver>,
    /// Store of connection definitions.
    connections: ConnectionStore,
    /// Settings store (the persisted settings referred to by
    /// `confirm_gate_disabled` below).
    settings: SettingsStore,
    /// Storage handle from `flow_storage`.
    store: Store,
    /// Handle to the local model-server lifecycle (the `llm_server` module is
    /// a re-export of the `flow_models_server` crate).
    llm_server: crate::llm_server::LlmServerHandle,
    /// Fan-out target for streaming LLM tokens. Installed by the host
    /// (Tauri main wires a `TauriStreamSink` that broadcasts
    /// `flow:llm_token`); defaults to a no-op for headless tests.
    ///
    /// Wrapped in `RwLock<Arc<...>>` because Tauri manages `FlowApp` as
    /// shared state (`&FlowApp` only), but the sink needs to be
    /// installed from the `setup` closure after the `AppHandle` exists.
    /// Callers fetch a clone of the current Arc via `stream_sink()` so
    /// long-running futures aren't holding the lock.
    stream_sink: std::sync::RwLock<Arc<dyn flow_adapter_ai::LlmStreamSink>>,
    /// Session working memory backing `{{memory.<key>}}`. Owned here (not on
    /// the per-run `Executor`) so it survives across `execute_graph` calls,
    /// i.e. across re-plan iterations within an agent session. Cleared on app
    /// restart; reset mid-session via [`FlowApp::clear_working_memory`].
    working_memory: flow_execution::WorkingMemory,
    /// Active runs keyed by execution id, each with its own pause/resume/cancel
    /// control so multiple flows (e.g. several canvas tabs) can run and be
    /// steered independently. A fresh control is inserted by
    /// [`FlowApp::execute_graph_with_id`] for the duration of a run and removed
    /// on completion. Steered per-run via [`FlowApp::pause_run`] /
    /// [`FlowApp::resume_run`] / [`FlowApp::cancel_run`], or across all runs
    /// via the `*_all_runs` helpers (used by headless runners that track a
    /// single run).
    runs: std::sync::Mutex<std::collections::HashMap<String, flow_execution::SharedRunControl>>,
    /// Cancellation flag for an in-flight Hub model download. Set by
    /// [`FlowApp::cancel_download`], polled by the streaming loop in
    /// `hub::download`. One flag suffices because the frontend allows a
    /// single download at a time.
    download_cancel: Arc<std::sync::atomic::AtomicBool>,
    /// Forces the per-step destructive-action confirmation gate off for this
    /// `FlowApp`, regardless of the persisted setting. Set by headless runners
    /// (the CLI / TUI) via [`FlowApp::disable_confirmation_gate`] since they
    /// have no way to deliver a confirmation, so a destructive node would
    /// block forever. Desktop / server leave it `false` so the setting applies.
    confirm_gate_disabled: std::sync::atomic::AtomicBool,
    /// How this host derives a flow's default execution workspace when no
    /// per-flow override is stored. Desktop/server use `Scratch`; flow-cli sets
    /// `Fixed` to the launch dir via [`FlowApp::with_workspace_base`].
    workspace_base: WorkspaceBase,
}
132
/// Where a flow's `shell` / `cli` / `fs` nodes run, before any explicit
/// per-node `cwd` / `workspaceRoot`. The default is edition-specific so the
/// shared core serves desktop, server, and CLI from one resolution path.
///
/// `Default` is derived and yields [`WorkspaceBase::Scratch`] (the variant
/// marked `#[default]`), replacing the previous hand-written impl that
/// returned the same value.
#[derive(Debug, Clone, Default)]
pub enum WorkspaceBase {
    /// Per-flow scratch at `<home>/.flow-studio/work/<flow_id>/`. HOME-based
    /// (not redirected by `FLOW_STUDIO_DIR`), matching the shell scratch.
    #[default]
    Scratch,
    /// One directory shared by every flow: the flow-cli launch dir (or its
    /// `--workspace`).
    Fixed(PathBuf),
}
151
152impl WorkspaceBase {
153    fn resolve(&self, flow_id: &str) -> PathBuf {
154        match self {
155            WorkspaceBase::Scratch => FlowApp::home_dir()
156                .join(".flow-studio")
157                .join("work")
158                .join(flow_id),
159            WorkspaceBase::Fixed(path) => path.clone(),
160        }
161    }
162}
163
/// Resolve a leading `~/` against the `HOME` environment variable.
///
/// Only the exact `~/` prefix is recognised. A bare `~`, a `~user/...` form,
/// or any other path is returned unchanged, as is a `~/...` path when `HOME`
/// is not set.
fn expand_home(raw: &str) -> PathBuf {
    let under_home = raw
        .strip_prefix("~/")
        .and_then(|rest| std::env::var_os("HOME").map(|home| PathBuf::from(home).join(rest)));
    under_home.unwrap_or_else(|| PathBuf::from(raw))
}
173
174impl Default for FlowApp {
175    fn default() -> Self {
176        Self::new()
177    }
178}
179
/// System prompt used when routing the Generate tab through a cloud provider.
///
/// The text is embedded at compile time from the shared
/// `docs/dsl/spec_compiled.md` artifact (which flow-ml also syncs into the
/// flow-graph-generator's training prompt). That file is regenerated by
/// `scripts/build-dsl-spec.sh`, which this crate's `build.rs` runs whenever
/// any source under `docs/dsl/` or any adapter `descriptor()` changes. The
/// embedded prompt therefore cannot go stale relative to the spec sources:
/// the build-time script call refreshes `spec_compiled.md` before it is
/// included here.
///
/// The artifact begins with a one-paragraph "emit DSL only - no markdown
/// fences, no preamble" header. `extract_dsl_text` below still strips an
/// outer fence as a belt-and-braces measure for general-purpose cloud
/// models that occasionally ignore the instruction.
const DSL_GENERATION_SYSTEM_PROMPT: &str = include_str!("../../../docs/dsl/spec_compiled.md");
195
/// Outcome of a hybrid flow-graph generation. Carries the DSL text plus a
/// machine-readable record of which path produced it so the UI can
/// render a "local" / "cloud (fallback)" tag without re-deriving the
/// information.
///
/// Serialised to camelCase (via the Tauri command layer) so the
/// frontend's `generateDslAuto` wrapper consumes
/// `{ dsl, source, fallbackReason }`.
///
/// NOTE(review): this derive carries no `rename_all`, so serialising the
/// struct directly emits `fallback_reason`; the camelCase form depends on
/// the command layer mentioned above. Confirm that layer exists before
/// serialising this type anywhere else.
#[derive(Debug, Clone, serde::Serialize)]
pub struct DslGenerationOutcome {
    /// The generated DSL text.
    pub dsl: String,
    /// Which side (local model or cloud provider) produced `dsl`.
    pub source: DslSource,
    /// Reason the fallback path was taken, when there is one to report.
    pub fallback_reason: Option<String>,
    /// Optional natural-language summary of the generated DSL. Produced by
    /// the agentic path via a second chat call; non-agentic generation
    /// leaves this `None`. The frontend falls back to a deterministic
    /// client-side synthesis when this is missing.
    ///
    /// Omitted from the serialised output entirely when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub plan: Option<String>,
}
216
/// Safety gate for the autonomous agentic loop: the hard ceiling on
/// generate→run→observe→re-plan cycles. The loop stops the moment a run has
/// zero failures; this cap bounds the unhappy path so the agent cannot spin
/// indefinitely (and cannot rack up unbounded model/compute cost).
pub const MAX_AGENTIC_ITERATIONS: u32 = 5;
222
/// Result of an autonomous agentic loop ([`FlowApp::agentic_run_loop`]).
/// Serialised camelCase for the frontend, which loads `finalDsl` onto the
/// canvas and renders the per-iteration `steps` as a run log.
#[derive(Debug, Clone, serde::Serialize)]
#[serde(rename_all = "camelCase")]
pub struct AgenticLoopSummary {
    /// Number of generate→run cycles actually performed (1..=MAX).
    pub iterations: u32,
    /// `"succeeded"` if a run reached zero failures; `"exhausted"` if the
    /// iteration cap was hit without converging; `"budget_exhausted"` if the
    /// wall-clock ceiling (`settings.max_agentic_seconds`) elapsed first.
    pub status: String,
    /// DSL from the last iteration (the converged graph on success, or the
    /// final attempt on exhaustion).
    pub final_dsl: String,
    /// Natural-language plan accompanying `final_dsl`, when one is available.
    /// Serialised as `finalPlan` (`null` when absent).
    pub final_plan: Option<String>,
    /// Per-iteration records, serialised as `steps`.
    pub steps: Vec<AgenticLoopStep>,
}
241
/// One iteration of the autonomous loop: the run outcome plus the feedback
/// that was fed into the next attempt (`None` on the converged iteration).
/// Serialised camelCase (`runStatus`, ...).
#[derive(Debug, Clone, serde::Serialize)]
#[serde(rename_all = "camelCase")]
pub struct AgenticLoopStep {
    /// Which iteration this record describes.
    pub iteration: u32,
    /// Status string of this iteration's run.
    pub run_status: String,
    /// Count of nodes that succeeded in this run.
    pub succeeded: usize,
    /// Count of nodes that failed in this run.
    pub failed: usize,
    /// Count of nodes that were skipped in this run.
    pub skipped: usize,
    /// Feedback text passed to the next attempt; `None` when the loop
    /// converged on this iteration.
    pub observations: Option<String>,
}
254
/// Host machine snapshot for the Model Hub device-compatibility check. RAM/disk
/// are in binary GB (GiB) so they compare directly against the catalog's
/// `minRamGb` / `minDiskGb`. Serialised camelCase (`totalRamGb`, ...).
#[derive(Debug, Clone, serde::Serialize)]
#[serde(rename_all = "camelCase")]
pub struct SystemInfo {
    /// `macos` | `linux` | `windows` (from `std::env::consts::OS`).
    pub os: String,
    /// `aarch64` | `x86_64` | … (from `std::env::consts::ARCH`).
    pub arch: String,
    /// Total installed RAM, in GiB.
    pub total_ram_gb: f64,
    /// RAM currently available, in GiB.
    pub available_ram_gb: f64,
    /// Free disk space, in GiB. NOTE(review): which volume is measured is not
    /// visible here.
    pub free_disk_gb: f64,
    /// Logical CPU count (drives the threads slider max).
    pub cpu_count: u32,
}
271
/// Tagged enum recording which side of the hybrid Auto path produced the
/// DSL. The internally-tagged JSON form (`{ "kind": "local"|"cloud", ... }`)
/// is what the frontend's `ModelChoice` discriminator expects to receive.
///
/// `rename_all = "lowercase"` produces the lowercase tag values. The
/// `model_id` fields are renamed individually to `modelId`, because the
/// container-level `rename_all` applies to variant names, not variant fields.
#[derive(Debug, Clone, serde::Serialize)]
#[serde(tag = "kind", rename_all = "lowercase")]
pub enum DslSource {
    /// Produced by a local model.
    Local {
        /// Identifier of the local model, serialised as `modelId`.
        #[serde(rename = "modelId")]
        model_id: String,
    },
    /// Produced by a cloud provider.
    Cloud {
        /// Name of the cloud provider.
        provider: String,
        /// Identifier of the cloud model, serialised as `modelId`.
        #[serde(rename = "modelId")]
        model_id: String,
    },
}
288
289/// Robustly extract the DSL body from a cloud model's response.
290///
291/// The system prompt instructs the model to emit DSL only, but Claude / GPT-4
292/// occasionally still wrap the response in a code fence or prefix a one-line
293/// preamble. We trim leading whitespace, strip a leading fence (` ``` ` with
294/// optional language tag), and strip a trailing fence if matched.
295fn extract_dsl_text(raw: &str) -> String {
296    let trimmed = raw.trim();
297    let with_fence_stripped = if trimmed.starts_with("```") {
298        let after_first = trimmed.split_once('\n').map(|x| x.1).unwrap_or("");
299        match after_first.rfind("```") {
300            Some(idx) => after_first[..idx].trim_end().to_string(),
301            None => after_first.to_string(),
302        }
303    } else {
304        trimmed.to_string()
305    };
306    escape_newlines_in_strings(with_fence_stripped.trim())
307}
308
/// Rewrite raw line breaks that fall *inside* double-quoted string literals as
/// the two-character escape `\n`, leaving all other text untouched.
///
/// Cloud models often emit multi-line file `content` with real line breaks
/// between the quotes, which the lexer rejects as `unterminated string`. This
/// pass normalises such input so it parses.
///
/// The scan is a small state machine over the characters of `src`:
///
/// * outside a string, `"` opens a string and `#` opens a line comment;
/// * inside a comment everything is copied verbatim up to and including the
///   next line break, so quotes in comments are literal;
/// * inside a string, a backslash makes the *next* character literal (it is
///   copied verbatim, whatever it is), an unescaped `"` closes the string, a
///   line feed becomes `\n`, and a carriage return is dropped.
///
/// A string with no closing quote simply runs to the end of input, so the
/// parser still reports it as an error.
fn escape_newlines_in_strings(src: &str) -> String {
    enum Mode {
        Code,
        Comment,
        Str,
        StrEscape,
    }

    let mut result = String::with_capacity(src.len());
    let mut mode = Mode::Code;
    for c in src.chars() {
        mode = match (mode, c) {
            (Mode::Comment, '\n') => {
                result.push(c);
                Mode::Code
            }
            (Mode::Comment, _) => {
                result.push(c);
                Mode::Comment
            }
            // Whatever follows a backslash is taken literally.
            (Mode::StrEscape, _) => {
                result.push(c);
                Mode::Str
            }
            (Mode::Str, '\\') => {
                result.push(c);
                Mode::StrEscape
            }
            (Mode::Str, '"') => {
                result.push(c);
                Mode::Code
            }
            (Mode::Str, '\n') => {
                result.push_str("\\n");
                Mode::Str
            }
            // A CR inside a string is dropped; a following LF becomes `\n`.
            (Mode::Str, '\r') => Mode::Str,
            (Mode::Str, _) => {
                result.push(c);
                Mode::Str
            }
            (Mode::Code, '"') => {
                result.push(c);
                Mode::Str
            }
            (Mode::Code, '#') => {
                result.push(c);
                Mode::Comment
            }
            (Mode::Code, _) => {
                result.push(c);
                Mode::Code
            }
        };
    }
    result
}
366
367/// One round-trip through a cloud provider with the flow-graph-generator system
368/// prompt. Pulled out of `generate_dsl_via_cloud` so the parse-then-retry
369/// path can reuse it without duplicating the `CloudAiRequest`
370/// construction or error mapping.
371async fn cloud_call_once(
372    provider: &dyn CloudAiProvider,
373    model: &str,
374    system: &str,
375    user: &str,
376    max_tokens: Option<u32>,
377    temperature: Option<f32>,
378    api_key: &str,
379    base_url: Option<String>,
380    sink: &dyn flow_adapter_ai::LlmStreamSink,
381) -> Result<(String, String), AppError> {
382    // Visible at the default `info` level so the server terminal shows an AI
383    // provider was actually called for generation (round 1 and each retry).
384    tracing::info!(
385        provider = provider.name(),
386        model,
387        "AI provider call for Flow DSL generation"
388    );
389    let req = flow_adapter_ai::CloudAiRequest {
390        model: model.to_string(),
391        prompt: user.to_string(),
392        max_tokens,
393        temperature,
394        api_key: api_key.to_string(),
395        system: Some(system.to_string()),
396        // Cloud providers ignore this (fixed endpoint); the `local`
397        // provider reads it as its on-device server address.
398        base_url,
399        // Flow-graph generation doesn't expose advanced sampling knobs and the
400        // response is structured DSL, so leave temperature and the rest at
401        // server defaults.
402        stream: true,
403        call_id: Some(format!("dsl-{}", uuid::Uuid::new_v4())),
404        ..Default::default()
405    };
406    let resp = provider
407        .invoke_stream(&req, sink)
408        .await
409        .map_err(|e| AppError::AiInvocation(e.to_string()))?;
410    // Hand back the finish/stop reason too: a response cut off at the token
411    // limit needs different handling (a clear error) than one that finished
412    // but wrote bad DSL (a corrective retry).
413    Ok((resp.text, resp.finish_reason))
414}
415
/// Whether `finish_reason` says the model stopped because it ran out of
/// tokens rather than because it finished its answer.
///
/// Recognised values (exact, case-sensitive match): `"length"`
/// (OpenAI-compatible servers), `"max_tokens"` (Anthropic), `"MAX_TOKENS"`
/// (Gemini) and `"model_length"`. A truncated response almost always ends
/// mid-string, so the DSL will not parse, and re-prompting at the same budget
/// just reproduces the cut-off.
fn is_truncated(finish_reason: &str) -> bool {
    const TOKEN_LIMIT_REASONS: [&str; 4] = ["length", "max_tokens", "MAX_TOKENS", "model_length"];
    TOKEN_LIMIT_REASONS.contains(&finish_reason)
}
427
428/// Clear, actionable error for a generation cut off at the token limit, in
429/// place of the misleading `unterminated string` the truncation produces.
430fn truncation_error(
431    provider: &str,
432    model: &str,
433    max_tokens: Option<u32>,
434    last_reason: &str,
435) -> AppError {
436    let budget = max_tokens
437        .map(|t| format!(" ({t}-token budget)"))
438        .unwrap_or_default();
439    tracing::warn!(
440        provider,
441        model,
442        %last_reason,
443        "Flow DSL generation hit the token limit (response truncated)"
444    );
445    AppError::AiInvocation(format!(
446        "{provider} model's response was cut off at the token limit{budget} before the \
447         flow was complete - the generated flow is too large (a file `content` value was \
448         truncated mid-string). Try a more focused request, fewer or smaller inline files, \
449         or a higher generation token budget."
450    ))
451}
452
/// Shorten a (possibly large) generated DSL blob for use in a log line, so a
/// failed generation does not dump kilobytes into the server terminal.
///
/// Inputs of at most 1200 bytes are returned unchanged. Longer inputs are cut
/// at the nearest char boundary at or below 1200 bytes and suffixed with
/// `... (+N more bytes)`, where `N` is the number of bytes dropped.
fn truncate_for_log(s: &str) -> String {
    const MAX: usize = 1200;
    if s.len() > MAX {
        // Offset 0 is always a char boundary, so the search cannot fail.
        let cut = (0..=MAX)
            .rev()
            .find(|&offset| s.is_char_boundary(offset))
            .unwrap_or(0);
        let (kept, dropped) = s.split_at(cut);
        format!("{kept}... (+{} more bytes)", dropped.len())
    } else {
        s.to_string()
    }
}
467
/// Render the offending source line plus a caret under `col` (both 1-based),
/// so the corrective-retry prompt shows the model exactly where parsing broke
/// instead of an abstract `line:col`.
///
/// Returns an empty string when `line` is out of range. That includes
/// `line == 0`, which is not a valid 1-based line and previously aliased the
/// first line. `col` is clamped to the line: `0` is treated like `1`, and a
/// column past the end puts the caret just after the last character. The
/// caret offset is counted in characters, not bytes.
fn offending_line_caret(src: &str, line: u32, col: u32) -> String {
    // `checked_sub` rejects line 0 instead of silently mapping it to line 1.
    let Some(line_index) = line.checked_sub(1) else {
        return String::new();
    };
    let Some(text) = src.lines().nth(line_index as usize) else {
        return String::new();
    };
    let nchars = text.chars().count();
    let caret_at = (col.saturating_sub(1) as usize).min(nchars);
    let pad = " ".repeat(caret_at);
    format!("{text}\n{pad}^")
}
480
481/// Build the corrective user prompt sent on the retry leg. Quotes the
482/// previous (broken) DSL between sentinel markers, reports the parser's
483/// `line:col` + message, then restates the original request and asks
484/// for corrected DSL only - same "no commentary" instruction the
485/// system prompt already gives, repeated here so the model can't
486/// fall back to apologetic chatter.
487///
488/// Sentinels (rather than a ```` ```flow ```` fence) avoid an ambiguous
489/// prompt when the previous attempt itself contained a triple-backtick
490/// run: the closing fence would land in the wrong place and confuse
491/// the model about where its prior output ended.
492fn build_retry_prompt(
493    original: &str,
494    previous_dsl: &str,
495    parse_err: &flow_dsl::DslError,
496) -> String {
497    let hint = parse_error_hint(&parse_err.message)
498        .map(|h| format!("\n{h}\n"))
499        .unwrap_or_default();
500    let pointer = offending_line_caret(previous_dsl, parse_err.line, parse_err.col);
501    let pointer_block = if pointer.is_empty() {
502        String::new()
503    } else {
504        format!("\nThe error is here:\n{pointer}\n")
505    };
506    format!(
507        "Your previous attempt failed to parse:\n\
508         \n\
509         <<<BEGIN_PREVIOUS_DSL>>>\n\
510         {previous_dsl}\n\
511         <<<END_PREVIOUS_DSL>>>\n\
512         \n\
513         Parser error: line {line}:{col} - {message}\n\
514         {pointer_block}\
515         {hint}\
516         \n\
517         Original request:\n\
518         {original}\n\
519         \n\
520         Return ONLY corrected Flow DSL. No commentary, no fences.",
521        line = parse_err.line,
522        col = parse_err.col,
523        message = parse_err.message,
524    )
525}
526
/// Compose the user prompt for the corrective retry when the previous DSL
/// parsed but failed semantic validation.
///
/// Lists each entry of `issues` as a `- ` bullet, quotes the previous DSL
/// between the same sentinel markers the parse-retry prompt uses, restates
/// the original request, and closes with fixed reminders (DSL only, `v1`
/// header, no invented git commits). An empty `issues` slice produces an
/// empty bullet section.
fn build_semantic_retry_prompt(original: &str, previous_dsl: &str, issues: &[String]) -> String {
    let mut bullets = String::new();
    for (position, issue) in issues.iter().enumerate() {
        if position > 0 {
            bullets.push('\n');
        }
        bullets.push_str("- ");
        bullets.push_str(issue);
    }

    format!(
        "Your previous DSL parsed but violates Flow semantics:\n\
         \n\
         {bullets}\n\
         \n\
         <<<BEGIN_PREVIOUS_DSL>>>\n\
         {previous_dsl}\n\
         <<<END_PREVIOUS_DSL>>>\n\
         \n\
         Original request:\n\
         {original}\n\
         \n\
         Return ONLY corrected Flow DSL. No commentary, no fences. \
         Use `flow \"<name>\" v1` as the header (not v1.0.0). \
         Do not invent git commits - read the real history from the repo with a shell action.",
    )
}
550
551/// Parse, repair common model mistakes, validate semantics, return canonical DSL.
552/// Turn a finished run's captured events + summary into a compact,
553/// model-facing feedback string for the next re-plan iteration. Lists the
554/// failed and skipped nodes with their reasons (deduped to the latest state
555/// per node, since a bounded loop can touch a node more than once).
556/// Sum the token usage of every AI node that succeeded in a run. The executor's
557/// `ai_generate_envelope` stamps `input_tokens`/`output_tokens` onto each AI
558/// node's output, so we read them back from the captured `NodeSucceeded` events.
559/// Used by `agentic_run_loop`'s token budget; non-AI nodes carry no token fields
560/// and contribute zero.
561fn sum_ai_tokens(events: &[ExecutionEvent]) -> u64 {
562    let mut total = 0u64;
563    for ev in events {
564        if let ExecutionEvent::NodeSucceeded { output, .. } = ev {
565            let i = output
566                .get("input_tokens")
567                .and_then(|v| v.as_u64())
568                .unwrap_or(0);
569            let o = output
570                .get("output_tokens")
571                .and_then(|v| v.as_u64())
572                .unwrap_or(0);
573            total = total.saturating_add(i).saturating_add(o);
574        }
575    }
576    total
577}
578
579fn build_loop_observations(events: &[ExecutionEvent], summary: &ExecutionSummary) -> String {
580    use std::collections::BTreeMap;
581    let mut lines = vec![format!(
582        "The workflow ran: {} succeeded, {} failed, {} skipped (status: {}).",
583        summary.succeeded, summary.failed, summary.skipped, summary.status
584    )];
585    let mut failures: BTreeMap<&str, &str> = BTreeMap::new();
586    let mut skips: BTreeMap<&str, &str> = BTreeMap::new();
587    for e in events {
588        match e {
589            ExecutionEvent::NodeFailed { node_id, error, .. } => {
590                failures.insert(node_id, error);
591            }
592            ExecutionEvent::NodeSkipped { node_id, reason, .. } => {
593                skips.insert(node_id, reason);
594            }
595            _ => {}
596        }
597    }
598    if !failures.is_empty() {
599        lines.push("Failures:".into());
600        lines.extend(failures.iter().map(|(n, e)| format!("- node `{n}` failed: {e}")));
601    }
602    if !skips.is_empty() {
603        lines.push("Skipped (never ran):".into());
604        lines.extend(skips.iter().map(|(n, r)| format!("- node `{n}` skipped: {r}")));
605    }
606    lines.join("\n")
607}
608
609fn validate_generated_dsl(dsl: &str) -> Result<String, String> {
610    let mut graph = flow_dsl::parse(dsl).map_err(|e| {
611        format!(
612            "DSL parse error at line {}:{} - {}",
613            e.line, e.col, e.message
614        )
615    })?;
616    dsl_semantics::repair_generated_graph(&mut graph);
617    let issues = dsl_semantics::validate_generated_graph(&graph);
618    if !issues.is_empty() {
619        return Err(format!(
620            "DSL semantic validation failed:\n{}",
621            issues
622                .iter()
623                .map(|i| format!("- {i}"))
624                .collect::<Vec<_>>()
625                .join("\n")
626        ));
627    }
628    Ok(flow_dsl::serialize(&graph))
629}
630
/// Map a parser error message to a targeted correction for the retry prompt,
/// so the next attempt gets concrete guidance instead of a bare `line:col`.
///
/// Three families of message are recognised, checked in this order:
///
/// 1. anything containing `unterminated string` - restates the string rules;
/// 2. anything containing `expected target identifier` or ``after `-->` `` -
///    restates the edge syntax;
/// 3. ``unknown node type `X` `` with a non-empty `X` - explains that `X` is
///    an adapter rather than a node type and shows the `action` + `adapter`
///    form. `cli`, `github` and `zowe` get the CLI-tool example; any other
///    name gets a generic example with the name substituted in.
///
/// Returns `None` for every other message.
fn parse_error_hint(message: &str) -> Option<String> {
    const UNKNOWN_TYPE_MARKER: &str = "unknown node type `";

    // Malformed string literal: usually a multi-line value (e.g. file
    // `content`) or an unescaped inner double quote.
    if message.contains("unterminated string") {
        let string_rules = "IMPORTANT: a string literal is malformed. Flow DSL strings must be \
             single-line and double-quoted. Escape every inner double quote as \\\" \
             and write newlines as \\n - never put a real line break inside quotes. \
             For multi-line file content, keep it on one physical line using \\n, \
             e.g. content: \"line one\\nline two\\n\".";
        return Some(string_rules.to_string());
    }

    // Malformed edge: the target id must follow `-->` directly, on one line.
    let edge_problem =
        message.contains("expected target identifier") || message.contains("after `-->`");
    if edge_problem {
        let edge_rules = "IMPORTANT: an edge is malformed. Connect nodes by id on a single line: \
             `sourceId --> targetId` (chain as `a --> b --> c`). Put the target node id \
             immediately after `-->`; do not quote it, leave it empty, or split the edge \
             across lines.";
        return Some(edge_rules.to_string());
    }

    // Adapter name used as a node type: take the text between the marker's
    // backtick and the next backtick (or the end of the message).
    let name_start = message.find(UNKNOWN_TYPE_MARKER)? + UNKNOWN_TYPE_MARKER.len();
    let tail = &message[name_start..];
    let kind = match tail.find('`') {
        Some(name_end) => &tail[..name_end],
        None => tail,
    };
    if kind.is_empty() {
        return None;
    }

    let example = match kind {
        "cli" | "github" | "zowe" => {
            "list[action: \"Run tool\"] {\n  adapter: \"cli\"\n  actionId: \"cli-tool\"\n  bin: \"<tool>\"\n  command: \"<subcommand>\"\n}"
                .to_string()
        }
        _ => "step[action: \"Run\"] {\n  adapter: \"ADAPTER\"\n  actionId: \"<see spec>\"\n}"
            .replace("ADAPTER", kind),
    };

    Some(format!(
        "IMPORTANT: `{kind}` is not a node type. The only node types are \
         `action`, `ai`, `agentic`, `utility`, `service`. `{kind}` is an adapter - \
         use an `action` node with `adapter: \"{kind}\"`, e.g.:\n\
         {example}\n\
         Do NOT write `{kind}[...]` as the header type."
    ))
}
685
/// Unit tests for `parse_error_hint`: each recognised message family yields a
/// hint, and unrelated messages yield none.
#[cfg(test)]
mod parse_error_hint_tests {
    use super::parse_error_hint;

    /// The hint for `message`, panicking if the function returns `None`.
    fn hint_for(message: &str) -> String {
        parse_error_hint(message).expect("should produce a hint")
    }

    #[test]
    fn hint_fires_for_unknown_node_type() {
        let text =
            hint_for("unknown node type `fs` - supported: action, ai, agentic, utility, service");
        assert!(text.contains("`fs` is not a node type"));
        assert!(text.contains("adapter: \"fs\""));
    }

    #[test]
    fn hint_fires_for_unterminated_string() {
        let hint = hint_for("unterminated string");
        assert!(hint.contains("single-line"), "hint: {hint}");
        assert!(hint.contains("\\n"), "hint: {hint}");
        // The newline-before-quote variant carries the same substring.
        let variant = parse_error_hint("unterminated string (newline before closing quote)");
        assert!(variant.is_some());
    }

    #[test]
    fn hint_fires_for_malformed_edge() {
        let hint = hint_for("expected target identifier after `-->`");
        assert!(hint.contains("edge is malformed"), "hint: {hint}");
        assert!(hint.contains("-->"), "hint: {hint}");
    }

    #[test]
    fn no_hint_for_unrelated_parse_errors() {
        let unrelated = ["expected `}` but found end of input", "unknown node type ``"];
        for message in unrelated {
            assert!(parse_error_hint(message).is_none());
        }
    }
}
723
724/// End-to-end coverage of the cloud generation pipeline (parse → corrective
725/// retry → recover) driven by a scripted stub provider, so no network or API
726/// key is involved. Exercises the two robustness fixes: chained-arrow edges
727/// must flow through generation without a retry, and an `unterminated string`
728/// must trigger a corrective retry that carries the targeted string hint.
729#[cfg(test)]
730mod generation_e2e_tests {
731    use super::*;
732    use flow_adapter_ai::{CloudAiError, CloudAiProvider, CloudAiRequest, CloudAiResponse};
733    use std::collections::VecDeque;
734    use std::sync::Mutex;
735
    /// A `CloudAiProvider` that replays a fixed script of responses and records
    /// every user prompt it receives.
    struct ScriptedProvider {
        /// Responses still to be replayed, handed out front-first.
        responses: Mutex<VecDeque<String>>,
        /// Every `prompt` received by `invoke`, in call order.
        prompts: Mutex<Vec<String>>,
        /// Finish reason attached to every replayed response.
        finish_reason: String,
    }
743
744    impl ScriptedProvider {
745        fn new(responses: Vec<&str>) -> Arc<Self> {
746            Self::new_with_finish(responses, "stop")
747        }
748        /// Replays the same `finish_reason` on every response - lets a test
749        /// simulate a token-limit cut-off (`"length"` / `"max_tokens"`).
750        fn new_with_finish(responses: Vec<&str>, finish_reason: &str) -> Arc<Self> {
751            Arc::new(Self {
752                responses: Mutex::new(responses.into_iter().map(String::from).collect()),
753                prompts: Mutex::new(Vec::new()),
754                finish_reason: finish_reason.to_string(),
755            })
756        }
757    }
758
759    #[async_trait::async_trait]
760    impl CloudAiProvider for ScriptedProvider {
761        fn name(&self) -> &str {
762            "stub"
763        }
764        fn env_var(&self) -> &str {
765            "STUB_KEY"
766        }
767        fn default_models(&self) -> &[&str] {
768            &["stub-model"]
769        }
770        async fn invoke(&self, req: &CloudAiRequest) -> Result<CloudAiResponse, CloudAiError> {
771            self.prompts.lock().unwrap().push(req.prompt.clone());
772            let text = self
773                .responses
774                .lock()
775                .unwrap()
776                .pop_front()
777                .expect("scripted provider ran out of responses");
778            Ok(CloudAiResponse {
779                provider: "stub".into(),
780                model: req.model.clone(),
781                text,
782                finish_reason: self.finish_reason.clone(),
783                input_tokens: 0,
784                output_tokens: 0,
785                latency_ms: 0,
786            })
787        }
788    }
789
790    fn app_with(stub: Arc<ScriptedProvider>) -> FlowApp {
791        let app = FlowApp::with_store_and_settings(
792            Store::open_in_memory().expect("in-memory store"),
793            SettingsStore::in_memory(),
794        )
795        .with_replaced_cloud_providers(vec![stub])
796        .with_in_memory_credentials();
797        app.settings
798            .update(SettingsPatch {
799                allow_cloud_ai: Some(true),
800                ..Default::default()
801            })
802            .expect("enable cloud ai");
803        app.set_cloud_ai_key("stub", "k").expect("seed key");
804        app
805    }
806
    /// A valid three-node flow (`a`, `b`, `c`, each a utility `log` action)
    /// whose only edge statement is the chained form `a --> b --> c`. Used as
    /// the "good" model reply in the tests below.
    const CHAIN_DSL: &str = r#"flow "chain" v1

a[utility: "A"] {
  adapter: "utility"
  actionId: "log"
  message: "a"
  level: "info"
}

b[utility: "B"] {
  adapter: "utility"
  actionId: "log"
  message: "b"
  level: "info"
}

c[utility: "C"] {
  adapter: "utility"
  actionId: "log"
  message: "c"
  level: "info"
}

a --> b --> c
"#;
832
833    #[tokio::test]
834    async fn chained_arrows_from_model_parse_without_a_retry() {
835        let stub = ScriptedProvider::new(vec![CHAIN_DSL]);
836        let app = app_with(stub.clone());
837        let dsl = app
838            .generate_dsl_via_cloud("stub", "stub-model", "make a 3-step log chain", Some(512), None)
839            .await
840            .expect("chained-arrow DSL should generate without error");
841        let graph = flow_dsl::parse(&dsl).expect("returned DSL parses");
842        assert_eq!(graph.edges.len(), 2, "a --> b --> c desugars to two edges");
843        assert_eq!(
844            stub.prompts.lock().unwrap().len(),
845            1,
846            "valid-on-first-try DSL needs no corrective retry"
847        );
848    }
849
850    #[tokio::test]
851    async fn multiline_string_from_model_is_auto_sanitized_without_a_retry() {
852        // The model emits a real line break inside a string (a multi-line file
853        // `content` value) - a hard `unterminated string` parse error without
854        // sanitizing. The pre-parse sanitizer escapes the break to `\n`, so
855        // generation succeeds on the first try with no corrective retry.
856        let multiline = "flow \"x\" v1\n\nwrite[action: \"Write\"] {\n  adapter: \"fs\"\n  actionId: \"write-file\"\n  workspaceRoot: \"/tmp/x\"\n  path: \"notes.txt\"\n  content: \"line one\nline two\n\"\n}\n";
857        let stub = ScriptedProvider::new(vec![multiline]);
858        let app = app_with(stub.clone());
859        let dsl = app
860            .generate_dsl_via_cloud("stub", "stub-model", "write a two-line file", Some(512), None)
861            .await
862            .expect("multi-line string should be sanitized and generate cleanly");
863        let graph = flow_dsl::parse(&dsl).expect("sanitized DSL parses");
864        let content = graph.nodes[0].data["content"].as_str().unwrap();
865        assert!(
866            content.contains('\n'),
867            "the escaped \\n is unescaped back to a real newline in the value"
868        );
869        assert_eq!(
870            stub.prompts.lock().unwrap().len(),
871            1,
872            "auto-sanitized DSL needs no corrective retry"
873        );
874    }
875
    // A single-node flow whose last field (`level`) is missing its closing
    // quote. The pre-parse sanitizer can't rescue a genuinely unterminated
    // string, so the lexer reports `unterminated string` and a corrective
    // retry must fire. Used as the "bad" model reply in the tests below.
    const BAD_UNTERMINATED: &str = r#"flow "x" v1

n[utility: "N"] {
  adapter: "utility"
  actionId: "log"
  message: "hello"
  level: "info
}
"#;
888
889    #[tokio::test]
890    async fn unterminated_string_triggers_caret_retry_and_recovers() {
891        let stub = ScriptedProvider::new(vec![BAD_UNTERMINATED, CHAIN_DSL]);
892        let app = app_with(stub.clone());
893        let dsl = app
894            .generate_dsl_via_cloud("stub", "stub-model", "make a chain", Some(512), None)
895            .await
896            .expect("recovers on the corrective retry");
897        flow_dsl::parse(&dsl).expect("recovered DSL parses");
898        let prompts = stub.prompts.lock().unwrap();
899        assert_eq!(prompts.len(), 2, "exactly one corrective retry");
900        let retry = &prompts[1];
901        assert!(
902            retry.contains("The error is here:"),
903            "retry shows the offending source line: {retry}"
904        );
905        assert!(retry.contains('^'), "retry carries a caret pointer: {retry}");
906        assert!(
907            retry.contains("single-line"),
908            "retry restates the string rule: {retry}"
909        );
910    }
911
912    #[tokio::test]
913    async fn truncated_response_reports_token_limit_without_burning_retries() {
914        // finish_reason "length" = cut off at max_tokens mid-`content`. Retrying
915        // at the same budget only reproduces it, so generation must short-circuit
916        // with a clear token-limit error after the first call.
917        let stub = ScriptedProvider::new_with_finish(vec![BAD_UNTERMINATED], "length");
918        let app = app_with(stub.clone());
919        let err = app
920            .generate_dsl_via_cloud("stub", "stub-model", "scaffold an app", Some(2048), None)
921            .await
922            .expect_err("a truncated response is an error");
923        let msg = err.to_string();
924        assert!(
925            msg.contains("cut off at the token limit"),
926            "clear truncation error, not 'unterminated string': {msg}"
927        );
928        assert_eq!(
929            stub.prompts.lock().unwrap().len(),
930            1,
931            "truncation short-circuits - no wasted corrective retries"
932        );
933    }
934
935    #[tokio::test]
936    async fn exhausted_retries_name_the_provider_in_the_error() {
937        // initial attempt + MAX_CORRECTIVE_RETRIES, all unterminated.
938        let stub = ScriptedProvider::new(vec![
939            BAD_UNTERMINATED,
940            BAD_UNTERMINATED,
941            BAD_UNTERMINATED,
942            BAD_UNTERMINATED,
943        ]);
944        let app = app_with(stub.clone());
945        let err = app
946            .generate_dsl_via_cloud("stub", "stub-model", "make a chain", Some(512), None)
947            .await
948            .expect_err("every attempt fails to parse");
949        let msg = err.to_string();
950        assert!(
951            msg.contains("stub model returned invalid DSL"),
952            "error names the provider, not a hardcoded 'cloud': {msg}"
953        );
954        assert!(
955            msg.contains("3 corrective retries"),
956            "error names the retry budget: {msg}"
957        );
958    }
959}
960
/// Unit tests for `escape_newlines_in_strings`: a line break is rewritten to
/// the two-character `\n` escape only while inside a double-quoted string.
#[cfg(test)]
mod escape_newlines_tests {
    use super::escape_newlines_in_strings;

    #[test]
    fn escapes_newline_inside_a_string() {
        let input = "content: \"a\nb\"";
        let expected = "content: \"a\\nb\"";
        assert_eq!(escape_newlines_in_strings(input), expected);
    }

    #[test]
    fn leaves_structural_newlines_outside_strings() {
        // Every newline here sits between closed strings, so nothing changes.
        let input = "a: \"x\"\nb: \"y\"\n";
        let output = escape_newlines_in_strings(input);
        assert_eq!(output, input);
    }

    #[test]
    fn honors_escaped_quote_so_string_stays_open() {
        // The `\"` does not close the string, so the newline after it is still
        // inside the string and gets escaped.
        let input = "\"a\\\"b\nc\"";
        let expected = "\"a\\\"b\\nc\"";
        assert_eq!(escape_newlines_in_strings(input), expected);
    }

    #[test]
    fn quotes_inside_comments_are_literal() {
        // A `"` inside a `#` comment must not open a string: the newline that
        // ends the comment stays real, while the one inside the genuine string
        // on the next line is escaped.
        let input = "# a \"quote\n\"real\nx\"";
        let expected = "# a \"quote\n\"real\\nx\"";
        assert_eq!(escape_newlines_in_strings(input), expected);
    }
}
997
/// Unit tests for `FlowApp::existing_binary`: it keeps a path only when that
/// path names something present on disk.
#[cfg(test)]
mod existing_binary_tests {
    use super::FlowApp;

    #[test]
    fn drops_missing_and_empty_paths() {
        // Absent, whitespace-only, and stale build paths must all be dropped
        // so resolution can fall through to a managed engine / $PATH.
        let rejected: [Option<String>; 3] = [
            None,
            Some("   ".into()),
            Some("/nonexistent/target/debug/resources/llama/llama-server".into()),
        ];
        for candidate in rejected {
            assert_eq!(FlowApp::existing_binary(candidate), None);
        }
    }

    #[test]
    fn keeps_an_existing_file() {
        // The running test binary is a real, executable file on disk.
        let exe = std::env::current_exe().unwrap();
        let path = exe.to_string_lossy().to_string();
        let kept = FlowApp::existing_binary(Some(path.clone()));
        assert_eq!(kept, Some(path));
    }
}
1024
1025impl FlowApp {
1026    pub fn new() -> Self {
1027        let store = Self::open_default_store().unwrap_or_else(|e| {
1028            tracing::warn!(error = %e, "failed to open persistent store; falling back to in-memory");
1029            Store::open_in_memory().expect("in-memory store init")
1030        });
1031        let settings = SettingsStore::open(Self::settings_path());
1032        Self::with_store_and_settings(store, settings)
1033    }
1034
1035    /// Test-only: replace the cloud-provider list. Lets unit tests inject
1036    /// a stub `CloudAiProvider` so `generate_dsl_via_cloud` and
1037    /// `generate_dsl_auto` can be exercised without a real network call.
1038    /// Out of the public surface - gated to `#[cfg(test)]` so a future
1039    /// caller doesn't reach in and quietly blank out the live providers.
1040    #[cfg(test)]
1041    pub(crate) fn with_replaced_cloud_providers(
1042        mut self,
1043        providers: Vec<Arc<dyn CloudAiProvider>>,
1044    ) -> Self {
1045        self.cloud_providers = providers;
1046        self
1047    }
1048
1049    /// Test-only: swap the OS-keyring-backed credential store for an
1050    /// in-memory one. The default `KeyringCredentialStore` prompts the
1051    /// user via macOS Keychain Access on the **first** read of a
1052    /// previously-unseen service namespace - and a cancelled prompt
1053    /// surfaces as a `Keyring(...)` error, *not* `NotFound`, so the
1054    /// env-var fallback in `EnvFallbackResolver` never fires. Tests
1055    /// that need to exercise the cloud-AI path must therefore avoid
1056    /// the keyring entirely.
1057    #[cfg(test)]
1058    pub(crate) fn with_in_memory_credentials(mut self) -> Self {
1059        use flow_security::InMemoryCredentialStore;
1060
1061        let store: Arc<dyn CredentialStore> = Arc::new(InMemoryCredentialStore::new());
1062        self.resolver = Arc::new(EnvFallbackResolver::new(store.clone()));
1063        self.credentials = store;
1064        self
1065    }
1066
1067    pub fn with_store_and_settings(store: Store, settings: SettingsStore) -> Self {
1068        // Persist the sidecar credential index alongside the app's other
1069        // on-disk state. The index records *which* accounts have keychain
1070        // entries (names only, no secrets) so `exists()` answers without
1071        // a keychain prompt - see KeyringCredentialStore for the rationale.
1072        let index_path = Self::flow_dir().join("credential_index.json");
1073        let credentials: Arc<dyn CredentialStore> =
1074            Arc::new(KeyringCredentialStore::with_index(index_path));
1075        let resolver: Arc<dyn CredentialResolver> =
1076            Arc::new(EnvFallbackResolver::new(credentials.clone()));
1077
1078        let connections = ConnectionStore::open(&Self::flow_dir(), credentials.clone())
1079            .unwrap_or_else(|e| {
1080                tracing::warn!(error = %e, "failed to open connections store; using in-memory");
1081                ConnectionStore::open(&std::env::temp_dir(), credentials.clone())
1082                    .expect("in-memory connections init")
1083            });
1084
1085        let sanitizer = Arc::new(PiiSanitizer::new());
1086        let mut adapters = AdapterRegistry::new();
1087        adapters.register(Arc::new(MockAdapter::new("mock")));
1088        // One vendor-neutral CLI adapter drives every cli-tool node; the binary
1089        // to run comes from each node's `bin` field, not a per-vendor adapter.
1090        adapters.register(Arc::new(CliAdapter::new()));
1091        adapters.register(Arc::new(MockAdapter::new("zosmf")));
1092        adapters.register(Arc::new(MockAdapter::new("ssh")));
1093        adapters.register(Arc::new(ShellAdapter::new()));
1094        adapters.register(Arc::new(FsAdapter::new()));
1095        adapters.register(Arc::new(UtilityAdapter::new(Self::downloads_dir())));
1096        // Generic, data-driven `service` adapter: every external API integration
1097        // is catalog data (each entry's `serviceIntegration`), so this one adapter
1098        // backs all service nodes. Credentials/tokens come from the keyring.
1099        adapters.register(Arc::new(flow_adapter_service::ServiceAdapter::new(
1100            Arc::new(nodes::service_integrations()),
1101            credentials.clone(),
1102        )));
1103
1104        let cloud_providers: Vec<Arc<dyn CloudAiProvider>> = vec![
1105            Arc::new(ClaudeProvider::new()),
1106            Arc::new(OpenAiProvider::new()),
1107            Arc::new(GeminiProvider::new()),
1108            Arc::new(NvidiaProvider::new()),
1109            Arc::new(DeepSeekProvider::new()),
1110            Arc::new(LocalOpenAiProvider::new()),
1111        ];
1112
1113        // Seed the system-managed Default template collection row. Failure is
1114        // non-fatal: with no row, list/save still treats "no membership" as
1115        // Default, and the row will be re-seeded on the next successful boot.
1116        if let Err(e) = templates::ensure_default(&store) {
1117            tracing::warn!(error = ?e, "failed to ensure default template collection");
1118        }
1119
1120        // Restore durable working memory so `{{memory.<key>}}` survives restarts
1121        // (roadmap E1 durable agent memory). Best-effort: a load failure just
1122        // starts empty.
1123        let initial_memory = store.load_memory().unwrap_or_default();
1124
1125        Self {
1126            sanitizer,
1127            adapters: Arc::new(adapters),
1128            cloud_providers,
1129            credentials,
1130            resolver,
1131            connections,
1132            settings,
1133            store,
1134            llm_server: crate::llm_server::LlmServerHandle::new(),
1135            stream_sink: std::sync::RwLock::new(Arc::new(flow_adapter_ai::NullStreamSink)),
1136            working_memory: Arc::new(std::sync::Mutex::new(initial_memory)),
1137            runs: Default::default(),
1138            download_cancel: Arc::new(std::sync::atomic::AtomicBool::new(false)),
1139            confirm_gate_disabled: std::sync::atomic::AtomicBool::new(false),
1140            workspace_base: WorkspaceBase::default(),
1141        }
1142    }
1143
1144    /// Override the edition default workspace. flow-cli calls this with
1145    /// `Fixed(<launch dir or --workspace>)`; desktop/server keep `Scratch`.
1146    pub fn with_workspace_base(mut self, base: WorkspaceBase) -> Self {
1147        self.workspace_base = base;
1148        self
1149    }
1150
1151    /// Resolve the execution workspace for a flow: the stored per-flow override
1152    /// (if any) else this host's edition default. The path is created before a
1153    /// run so fs nodes (which require an existing root) work against it.
1154    fn resolve_workspace(&self, flow_id: &str) -> PathBuf {
1155        self.store
1156            .get_flow_workspace(flow_id)
1157            .ok()
1158            .flatten()
1159            .filter(|s| !s.trim().is_empty())
1160            .map(|s| expand_home(&s))
1161            .unwrap_or_else(|| self.workspace_base.resolve(flow_id))
1162    }
1163
1164    /// The per-flow workspace override, or `None` when the flow uses the default.
1165    pub fn flow_workspace(&self, flow_id: &str) -> Option<String> {
1166        self.store.get_flow_workspace(flow_id).ok().flatten()
1167    }
1168
1169    /// The default workspace this host would use for `flow_id` absent an
1170    /// override - surfaced to the UI so it can show the resolved path.
1171    pub fn default_workspace(&self, flow_id: &str) -> String {
1172        self.workspace_base.resolve(flow_id).to_string_lossy().into_owned()
1173    }
1174
1175    pub fn set_flow_workspace(&self, flow_id: &str, path: &str) -> Result<(), AppError> {
1176        self.store
1177            .set_flow_workspace(flow_id, path)
1178            .map_err(|e| AppError::Storage(e.to_string()))
1179    }
1180
1181    pub fn clear_flow_workspace(&self, flow_id: &str) -> Result<(), AppError> {
1182        self.store
1183            .clear_flow_workspace(flow_id)
1184            .map_err(|e| AppError::Storage(e.to_string()))
1185    }
1186
1187    /// Install the LLM token sink for streaming events. The host (Tauri) wires
1188    /// a sink that re-emits each delta as a `flow:llm_token` Tauri event so
1189    /// the header chip can render the rolling tail; headless callers can keep
1190    /// the default `NullStreamSink`. Safe to call at any time from any thread.
1191    pub fn set_stream_sink(&self, sink: Arc<dyn flow_adapter_ai::LlmStreamSink>) {
1192        *self.stream_sink.write().expect("stream_sink write") = sink;
1193    }
1194
1195    /// Snapshot the current sink so callers can hold a reference for the
1196    /// duration of an LLM call without blocking concurrent sink swaps.
1197    pub fn stream_sink(&self) -> Arc<dyn flow_adapter_ai::LlmStreamSink> {
1198        self.stream_sink.read().expect("stream_sink read").clone()
1199    }
1200
1201    pub fn list_connections(&self) -> Vec<flow_domain::connection::ConnectionProfile> {
1202        self.connections.list()
1203    }
1204
1205    pub fn save_connection(
1206        &self,
1207        profile: flow_domain::connection::ConnectionProfile,
1208        secret: Option<&str>,
1209    ) -> Result<(), AppError> {
1210        self.connections
1211            .upsert(profile, secret)
1212            .map_err(|e| AppError::Credential(e.to_string()))
1213    }
1214
1215    pub fn delete_connection(&self, id: &str) -> Result<(), AppError> {
1216        self.connections
1217            .delete(id)
1218            .map_err(|e| AppError::Credential(e.to_string()))
1219    }
1220
1221    pub fn connection_has_secret(&self, id: &str) -> bool {
1222        self.connections.has_secret(id)
1223    }
1224
1225    pub fn resolve_zowe_connection(&self, id: &str) -> Result<ResolvedZoweConnection, AppError> {
1226        self.connections
1227            .resolve_zowe(id)
1228            .map_err(|e| AppError::Credential(e.to_string()))
1229    }
1230
1231    /// True when `provider` matches a registered cloud-AI provider name.
1232    fn is_known_cloud_provider(&self, provider: &str) -> bool {
1233        self.cloud_providers.iter().any(|p| p.name() == provider)
1234    }
1235
1236    pub fn set_cloud_ai_key(&self, provider: &str, secret: &str) -> Result<(), AppError> {
1237        if !self.is_known_cloud_provider(provider) {
1238            return Err(AppError::Credential(format!(
1239                "unknown cloud provider '{provider}'"
1240            )));
1241        }
1242        let account = CredentialKind::CloudAiKey.account_for(provider);
1243        self.credentials
1244            .set(&account, secret)
1245            .map_err(|e| AppError::Credential(e.to_string()))
1246    }
1247
1248    pub fn delete_cloud_ai_key(&self, provider: &str) -> Result<(), AppError> {
1249        if !self.is_known_cloud_provider(provider) {
1250            return Err(AppError::Credential(format!(
1251                "unknown cloud provider '{provider}'"
1252            )));
1253        }
1254        let account = CredentialKind::CloudAiKey.account_for(provider);
1255        self.credentials
1256            .delete(&account)
1257            .map_err(|e| AppError::Credential(e.to_string()))
1258    }
1259
1260    pub fn cloud_ai_key_exists(&self, provider: &str) -> bool {
1261        if !self.is_known_cloud_provider(provider) {
1262            return false;
1263        }
1264        let account = CredentialKind::CloudAiKey.account_for(provider);
1265        self.credentials.exists(&account)
1266    }
1267
1268    // ---- Service-node connections ----
1269    //
1270    // A `service` node's connection secret lives in the keyring under
1271    // `service:<slug>`: a raw API key / bearer token / `user:pass` for basic
1272    // auth, or (for OAuth2) the JSON token bundle written by `service_oauth_*`.
1273    // The OAuth client id/secret (operator-supplied) live under
1274    // `service:<slug>:client_id` / `:client_secret`.
1275
1276    fn service_kv_account(slug: &str, suffix: &str) -> String {
1277        CredentialKind::Service.account_for(&format!("{slug}:{suffix}"))
1278    }
1279
1280    fn service_integration_for(slug: &str) -> Result<crate::nodes::ServiceIntegration, AppError> {
1281        crate::nodes::entry_by_slug(slug)
1282            .and_then(|e| e.service_integration)
1283            .ok_or_else(|| AppError::Credential(format!("`{slug}` is not a service node")))
1284    }
1285
1286    /// Store a raw connection secret (api-key / bearer token / `user:pass`) for a
1287    /// service node, keyed by its catalog slug.
1288    pub fn set_service_secret(&self, slug: &str, secret: &str) -> Result<(), AppError> {
1289        let account = CredentialKind::Service.account_for(slug);
1290        self.credentials
1291            .set(&account, secret)
1292            .map_err(|e| AppError::Credential(e.to_string()))
1293    }
1294
1295    /// Forget a service connection (token/secret + any OAuth client credentials).
1296    pub fn disconnect_service(&self, slug: &str) -> Result<(), AppError> {
1297        let _ = self.credentials.delete(&CredentialKind::Service.account_for(slug));
1298        let _ = self.credentials.delete(&Self::service_kv_account(slug, "client_id"));
1299        let _ = self.credentials.delete(&Self::service_kv_account(slug, "client_secret"));
1300        Ok(())
1301    }
1302
1303    /// True when a connection secret/token is stored for the service.
1304    pub fn service_connection_exists(&self, slug: &str) -> bool {
1305        self.credentials
1306            .exists(&CredentialKind::Service.account_for(slug))
1307    }
1308
1309    /// Persist the operator-supplied OAuth2 client id + secret for a service.
1310    pub fn set_service_oauth_client(
1311        &self,
1312        slug: &str,
1313        client_id: &str,
1314        client_secret: &str,
1315    ) -> Result<(), AppError> {
1316        self.credentials
1317            .set(&Self::service_kv_account(slug, "client_id"), client_id)
1318            .map_err(|e| AppError::Credential(e.to_string()))?;
1319        self.credentials
1320            .set(&Self::service_kv_account(slug, "client_secret"), client_secret)
1321            .map_err(|e| AppError::Credential(e.to_string()))
1322    }
1323
1324    /// Build the OAuth2 consent URL the user opens to authorize a service.
1325    /// `redirect_uri` is the loopback the desktop app captures the code on.
1326    pub fn service_oauth_authorize_url(
1327        &self,
1328        slug: &str,
1329        redirect_uri: &str,
1330        state: &str,
1331    ) -> Result<String, AppError> {
1332        let integ = Self::service_integration_for(slug)?;
1333        if integ.auth.scheme != "oauth2" {
1334            return Err(AppError::Credential(format!("`{slug}` does not use OAuth2")));
1335        }
1336        let auth_url = integ
1337            .auth
1338            .auth_url
1339            .as_deref()
1340            .ok_or_else(|| AppError::Credential(format!("`{slug}` integration has no authUrl")))?;
1341        let client_id = self
1342            .credentials
1343            .get(&Self::service_kv_account(slug, "client_id"))
1344            .map_err(|_| {
1345                AppError::Credential(format!("set the OAuth client id for `{slug}` first"))
1346            })?;
1347        Ok(flow_security::oauth::build_authorize_url(
1348            auth_url,
1349            &client_id,
1350            redirect_uri,
1351            &integ.auth.scopes,
1352            state,
1353        ))
1354    }
1355
1356    /// Exchange an OAuth2 authorization code and persist the token bundle.
1357    pub async fn service_oauth_complete(
1358        &self,
1359        slug: &str,
1360        code: &str,
1361        redirect_uri: &str,
1362    ) -> Result<(), AppError> {
1363        let integ = Self::service_integration_for(slug)?;
1364        let token_url = integ
1365            .auth
1366            .token_url
1367            .as_deref()
1368            .ok_or_else(|| AppError::Credential(format!("`{slug}` integration has no tokenUrl")))?;
1369        let client_id = self
1370            .credentials
1371            .get(&Self::service_kv_account(slug, "client_id"))
1372            .map_err(|_| AppError::Credential(format!("missing OAuth client id for `{slug}`")))?;
1373        let client_secret = self
1374            .credentials
1375            .get(&Self::service_kv_account(slug, "client_secret"))
1376            .map_err(|_| AppError::Credential(format!("missing OAuth client secret for `{slug}`")))?;
1377        let token = flow_security::oauth::exchange_code(
1378            token_url,
1379            &client_id,
1380            &client_secret,
1381            code,
1382            redirect_uri,
1383        )
1384        .await
1385        .map_err(|e| AppError::Credential(e.to_string()))?;
1386        flow_security::oauth::store_token(self.credentials.as_ref(), slug, &token)
1387            .map_err(|e| AppError::Credential(e.to_string()))
1388    }
1389
1390    fn home_dir() -> PathBuf {
1391        PathBuf::from(
1392            std::env::var("HOME")
1393                .or_else(|_| std::env::var("USERPROFILE"))
1394                .unwrap_or_else(|_| ".".into()),
1395        )
1396    }
1397
1398    /// The app's data directory (db, settings, installed nodes/templates,
1399    /// models, downloads, connections). Honors the `FLOW_STUDIO_DIR` env
1400    /// override - the `desktop:dev:fresh` dev script points it at a throwaway
1401    /// directory so every run starts from a fresh profile. Falls back to
1402    /// `<home>/.flow-studio` when unset.
1403    fn flow_dir() -> PathBuf {
1404        if let Some(dir) = std::env::var_os("FLOW_STUDIO_DIR") {
1405            if !dir.is_empty() {
1406                return PathBuf::from(dir);
1407            }
1408        }
1409        Self::home_dir().join(".flow-studio")
1410    }
1411
1412    fn open_default_store() -> Result<Store, flow_storage::StoreError> {
1413        let mut path = Self::flow_dir();
1414        path.push("db");
1415        let _ = std::fs::create_dir_all(&path);
1416        path.push("flow.sqlite");
1417        Store::open(path)
1418    }
1419
1420    fn templates_dir() -> PathBuf {
1421        let mut path = Self::flow_dir();
1422        path.push("templates");
1423        let _ = std::fs::create_dir_all(&path);
1424        path
1425    }
1426
1427    /// Where downloaded LLMs live: `<flow_dir()>/llms/`. The managed
1428    /// `llama-server` loads models from here.
1429    pub fn llms_dir() -> PathBuf {
1430        let mut path = Self::flow_dir();
1431        path.push("llms");
1432        let _ = std::fs::create_dir_all(&path);
1433        path
1434    }
1435
1436    /// Where the managed model-server engine is fetched on first use:
1437    /// `<flow_dir()>/engines/`. Shared by every edition on this host.
1438    pub fn engines_dir() -> PathBuf {
1439        let mut path = Self::flow_dir();
1440        path.push("engines");
1441        let _ = std::fs::create_dir_all(&path);
1442        path
1443    }
1444
1445    /// Where the `utility:download` action writes artifacts:
1446    /// `<flow_dir()>/downloads/`. Created lazily on first access.
1447    pub fn downloads_dir() -> PathBuf {
1448        let mut path = Self::flow_dir();
1449        path.push("downloads");
1450        let _ = std::fs::create_dir_all(&path);
1451        path
1452    }
1453
1454    /// Where third-party node schemes installed via the Node Hub live:
1455    /// `<flow_dir()>/nodes/<slug>.json`. Created lazily.
1456    pub fn nodes_dir() -> PathBuf {
1457        let mut path = Self::flow_dir();
1458        path.push("nodes");
1459        let _ = std::fs::create_dir_all(&path);
1460        path
1461    }
1462
1463    /// The Model Hub catalog (the static registry loaded from `hub_catalog.json`).
1464    pub fn hub_catalog(&self) -> Vec<crate::hub::HubModel> {
1465        crate::hub::catalog()
1466    }
1467
1468    /// Probe this machine for the device-compatibility check: OS/arch, total +
1469    /// available RAM, and free disk on the volume holding the LLMs directory.
1470    pub fn system_info(&self) -> SystemInfo {
1471        use sysinfo::{Disks, System};
1472        const GIB: f64 = 1024.0 * 1024.0 * 1024.0;
1473        let round1 = |v: f64| (v * 10.0).round() / 10.0;
1474
1475        let mut sys = System::new();
1476        sys.refresh_memory();
1477        let total = sys.total_memory() as f64 / GIB;
1478        let avail = sys.available_memory() as f64 / GIB;
1479
1480        // Free space on the disk whose mount point is the longest prefix of the
1481        // LLMs download dir.
1482        let llms = Self::llms_dir();
1483        let disks = Disks::new_with_refreshed_list();
1484        let mut free_disk: u64 = 0;
1485        let mut best_len = 0usize;
1486        for d in disks.list() {
1487            let mp = d.mount_point();
1488            if llms.starts_with(mp) && mp.as_os_str().len() >= best_len {
1489                best_len = mp.as_os_str().len();
1490                free_disk = d.available_space();
1491            }
1492        }
1493
1494        let cpu_count = std::thread::available_parallelism()
1495            .map(|n| n.get() as u32)
1496            .unwrap_or(0);
1497
1498        SystemInfo {
1499            os: std::env::consts::OS.to_string(),
1500            arch: std::env::consts::ARCH.to_string(),
1501            total_ram_gb: round1(total),
1502            available_ram_gb: round1(avail),
1503            free_disk_gb: round1(free_disk as f64 / GIB),
1504            cpu_count,
1505        }
1506    }
1507
1508    /// LLMs already downloaded into the local LLMs directory.
1509    pub fn list_local_llms(&self) -> Vec<crate::hub::LocalLlm> {
1510        crate::hub::list_local_llms(&Self::llms_dir())
1511    }
1512
1513    /// Installed models: downloaded LLMs matched to a catalog model whose
1514    /// download-option URL produces that file name.
1515    pub async fn hub_installed(&self) -> Vec<crate::hub::InstalledModel> {
1516        use crate::hub::{HubFormat, InstalledModel};
1517        let catalog = crate::hub::catalog();
1518        let mut out: Vec<InstalledModel> = Vec::new();
1519
1520        // LLMs: match a downloaded file name to a catalog model whose option
1521        // URL produces that file name.
1522        for llm in self.list_local_llms() {
1523            if let Some(m) = catalog.iter().find(|m| {
1524                m.download_options
1525                    .iter()
1526                    .any(|o| o.url.rsplit('/').next() == Some(llm.file_name.as_str()))
1527            }) {
1528                if out.iter().any(|i| i.id == m.id) {
1529                    continue;
1530                }
1531                out.push(InstalledModel {
1532                    id: m.id.clone(),
1533                    version: m.latest_version.clone(),
1534                    format: HubFormat::Gguf,
1535                    update_available: false,
1536                });
1537            }
1538        }
1539        out
1540    }
1541
1542    /// Download a catalog model's variant into the local LLMs directory,
1543    /// reporting progress through `cb`. Returns the on-disk path.
1544    pub async fn download_hub_model(
1545        &self,
1546        id: &str,
1547        variant: Option<&str>,
1548        cb: Option<crate::install::ProgressCallback>,
1549    ) -> Result<PathBuf, AppError> {
1550        let model = crate::hub::find(id)
1551            .ok_or_else(|| AppError::AiInvocation(format!("unknown hub model '{id}'")))?;
1552        // Fresh download: clear any leftover cancel request from a prior one.
1553        self.download_cancel
1554            .store(false, std::sync::atomic::Ordering::Relaxed);
1555        crate::hub::download(
1556            &model,
1557            variant,
1558            &Self::llms_dir(),
1559            cb,
1560            Some(self.download_cancel.clone()),
1561        )
1562        .await
1563        .map_err(AppError::AiInvocation)
1564    }
1565
1566    /// Request cancellation of the in-flight Hub download (if any). The
1567    /// streaming loop in `hub::download` notices on its next chunk, removes the
1568    /// partial file, and returns a "download cancelled" error.
1569    pub fn cancel_download(&self) {
1570        self.download_cancel
1571            .store(true, std::sync::atomic::Ordering::Relaxed);
1572    }
1573
1574    /// Delete a downloaded LLM from the local LLMs directory (confined to it).
1575    pub fn delete_local_llm(&self, file_name: &str) -> Result<(), AppError> {
1576        crate::hub::delete_local_llm(&Self::llms_dir(), file_name)
1577            .map_err(AppError::AiInvocation)
1578    }
1579
1580    pub fn list_templates(&self) -> Result<Vec<TemplateRecord>, AppError> {
1581        templates::list(&Self::templates_dir(), &self.store)
1582            .map_err(|e| AppError::Template(e.to_string()))
1583    }
1584
1585    pub fn list_templates_in(
1586        &self,
1587        collection_slug: &str,
1588    ) -> Result<Vec<TemplateRecord>, AppError> {
1589        templates::list_in(&Self::templates_dir(), &self.store, collection_slug)
1590            .map_err(|e| AppError::Template(e.to_string()))
1591    }
1592
1593    pub fn save_template(&self, name: &str, graph: FlowGraph) -> Result<TemplateRecord, AppError> {
1594        templates::save(
1595            &Self::templates_dir(),
1596            &self.store,
1597            name,
1598            &graph,
1599            templates::DEFAULT_COLLECTION_SLUG,
1600            None,
1601        )
1602        .map_err(|e| AppError::Template(e.to_string()))
1603    }
1604
1605    pub fn save_template_in(
1606        &self,
1607        name: &str,
1608        graph: FlowGraph,
1609        collection_slug: &str,
1610        source: Option<templates::TemplateSource>,
1611    ) -> Result<TemplateRecord, AppError> {
1612        templates::save(
1613            &Self::templates_dir(),
1614            &self.store,
1615            name,
1616            &graph,
1617            collection_slug,
1618            source,
1619        )
1620        .map_err(|e| AppError::Template(e.to_string()))
1621    }
1622
1623    pub fn load_template(&self, slug: &str) -> Result<FlowGraph, AppError> {
1624        templates::load(&Self::templates_dir(), slug).map_err(|e| AppError::Template(e.to_string()))
1625    }
1626
1627    pub fn delete_template(&self, slug: &str) -> Result<(), AppError> {
1628        templates::delete(&Self::templates_dir(), &self.store, slug)
1629            .map_err(|e| AppError::Template(e.to_string()))?;
1630        // Drop any recurring schedule so the scheduler doesn't keep trying to
1631        // load a template that no longer exists (E11).
1632        let _ = self.store.delete_schedule(slug);
1633        Ok(())
1634    }
1635
1636    /// Every persisted flow schedule (roadmap E11), newest-updated first.
1637    pub fn list_schedules(&self) -> Result<Vec<ScheduleRecord>, AppError> {
1638        self.store
1639            .list_schedules()
1640            .map_err(|e| AppError::Storage(e.to_string()))
1641    }
1642
1643    /// The schedule for a saved flow, if one exists.
1644    pub fn get_schedule(&self, template_slug: &str) -> Result<Option<ScheduleRecord>, AppError> {
1645        self.store
1646            .get_schedule(template_slug)
1647            .map_err(|e| AppError::Storage(e.to_string()))
1648    }
1649
1650    /// Create or update a flow's schedule. Validates the frequency, computes the
1651    /// next fire time from the anchor (when enabled), and preserves `created_at`
1652    /// / `last_run_at` across updates.
1653    pub fn upsert_schedule(
1654        &self,
1655        patch: scheduler::SchedulePatch,
1656    ) -> Result<ScheduleRecord, AppError> {
1657        let freq = scheduler::ScheduleFrequency::parse(&patch.frequency).ok_or_else(|| {
1658            AppError::Validation(format!("unknown schedule frequency `{}`", patch.frequency))
1659        })?;
1660        let catchup = patch
1661            .catchup
1662            .as_deref()
1663            .and_then(scheduler::CatchUpPolicy::parse)
1664            .unwrap_or_default();
1665        let now = Utc::now();
1666        let existing = self
1667            .store
1668            .get_schedule(&patch.template_slug)
1669            .map_err(|e| AppError::Storage(e.to_string()))?;
1670        if patch.enabled {
1671            let graph = self.load_template(&patch.template_slug)?;
1672            if graph.nodes.is_empty() {
1673                return Err(AppError::Validation(
1674                    "cannot schedule an empty flow".into(),
1675                ));
1676            }
1677        }
1678        let timezone = patch
1679            .timezone
1680            .clone()
1681            .or_else(|| existing.as_ref().map(|e| e.timezone.clone()))
1682            .unwrap_or_else(|| "UTC".into());
1683        let options = scheduler::ScheduleOptions {
1684            timezone: Some(timezone.clone()),
1685            cron: patch.cron.clone(),
1686            every_minutes: patch.every_minutes,
1687            until: patch.until,
1688            max_runs: patch.max_runs,
1689            run_count: existing.as_ref().map(|e| e.run_count).unwrap_or(0),
1690        };
1691        let next_run_at = if patch.enabled {
1692            scheduler::next_run_after(freq, patch.anchor_at, now, &options)
1693                .map_err(AppError::Validation)?
1694        } else {
1695            None
1696        };
1697        let rec = ScheduleRecord {
1698            template_slug: patch.template_slug,
1699            collection_slug: patch.collection_slug,
1700            flow_name: patch.flow_name,
1701            enabled: patch.enabled,
1702            frequency: freq.as_str().to_string(),
1703            anchor_at: patch.anchor_at,
1704            timezone,
1705            cron: patch.cron,
1706            every_minutes: patch.every_minutes,
1707            until: patch.until,
1708            max_runs: patch.max_runs,
1709            catchup: catchup.as_str().to_string(),
1710            next_run_at,
1711            last_run_at: existing.as_ref().and_then(|e| e.last_run_at),
1712            last_status: existing.as_ref().and_then(|e| e.last_status.clone()),
1713            run_count: existing.as_ref().map(|e| e.run_count).unwrap_or(0),
1714            created_at: existing.as_ref().map(|e| e.created_at).unwrap_or(now),
1715            updated_at: now,
1716        };
1717        self.store
1718            .upsert_schedule(&rec)
1719            .map_err(|e| AppError::Storage(e.to_string()))?;
1720        Ok(rec)
1721    }
1722
1723    pub fn preview_schedule(
1724        &self,
1725        req: scheduler::SchedulePreviewRequest,
1726    ) -> Result<Vec<DateTime<Utc>>, AppError> {
1727        scheduler::preview(req).map_err(AppError::Validation)
1728    }
1729
1730    pub fn delete_schedule(&self, template_slug: &str) -> Result<(), AppError> {
1731        self.store
1732            .delete_schedule(template_slug)
1733            .map_err(|e| AppError::Storage(e.to_string()))
1734    }
1735
1736    pub fn set_schedule_enabled(
1737        &self,
1738        template_slug: &str,
1739        enabled: bool,
1740    ) -> Result<Option<ScheduleRecord>, AppError> {
1741        let Some(schedule) = self.get_schedule(template_slug)? else {
1742            return Ok(None);
1743        };
1744        let next_run_at = if enabled {
1745            self.compute_next_schedule_run(&schedule, Utc::now(), schedule.run_count)?
1746        } else {
1747            None
1748        };
1749        self.store
1750            .set_schedule_enabled(template_slug, enabled, next_run_at)
1751            .map_err(|e| AppError::Storage(e.to_string()))?;
1752        self.get_schedule(template_slug)
1753    }
1754
1755    pub fn migrate_schedule(
1756        &self,
1757        old_slug: &str,
1758        new_slug: &str,
1759        collection_slug: &str,
1760        flow_name: &str,
1761    ) -> Result<(), AppError> {
1762        if old_slug == new_slug {
1763            return Ok(());
1764        }
1765        self.store
1766            .migrate_schedule(old_slug, new_slug, collection_slug, flow_name)
1767            .map_err(|e| AppError::Storage(e.to_string()))
1768    }
1769
1770    /// Enabled schedules due to fire now - the background scheduler's per-tick
1771    /// work list.
1772    pub fn due_schedules(&self) -> Result<Vec<ScheduleRecord>, AppError> {
1773        if !self.settings.scheduler_enabled() {
1774            return Ok(Vec::new());
1775        }
1776        self.store
1777            .list_due_schedules(Utc::now())
1778            .map_err(|e| AppError::Storage(e.to_string()))
1779    }
1780
1781    /// Fire a scheduled flow: advance its timer first (so an overlapping tick
1782    /// won't double-fire), load the saved template, and run it under a fresh
1783    /// execution id with the run persisted to history (metadata only - the
1784    /// zero-egress boundary is preserved). Returns the run summary.
1785    pub async fn run_scheduled(
1786        &self,
1787        schedule: &ScheduleRecord,
1788    ) -> Result<Option<ExecutionSummary>, AppError> {
1789        let now = Utc::now();
1790        let scheduled_at = schedule.next_run_at.unwrap_or(now);
1791        let poll_grace = chrono::Duration::seconds((self.settings.scheduler_poll_secs() * 2) as i64);
1792        let missed = now - scheduled_at > poll_grace;
1793        let catchup = scheduler::CatchUpPolicy::parse(&schedule.catchup).unwrap_or_default();
1794        if missed && catchup == scheduler::CatchUpPolicy::Skip {
1795            let next = self.compute_next_schedule_run(schedule, now, schedule.run_count)?;
1796            let _ = self
1797                .store
1798                .advance_schedule_without_run(&schedule.template_slug, now, next, "skipped");
1799            return Ok(None);
1800        }
1801
1802        let after = if missed && catchup == scheduler::CatchUpPolicy::RunAll {
1803            scheduled_at
1804        } else {
1805            now
1806        };
1807        let next = self.compute_next_schedule_run(
1808            schedule,
1809            after,
1810            schedule.run_count.saturating_add(1),
1811        )?;
1812        let _ = self
1813            .store
1814            .mark_schedule_run(&schedule.template_slug, now, next, "running");
1815        let graph = self.load_template(&schedule.template_slug)?;
1816        let events: Arc<dyn EventSink> =
1817            Arc::new(flow_execution::StorageSink::with_flow_name_and_trigger(
1818                self.store.clone(),
1819                schedule.flow_name.clone(),
1820                "scheduled".into(),
1821            ));
1822        match self
1823            .run_graph_internal(Uuid::new_v4().to_string(), graph, events, Some(false), None, None)
1824            .await
1825        {
1826            Ok(summary) => {
1827                let _ = self.store.advance_schedule_without_run(
1828                    &schedule.template_slug,
1829                    Utc::now(),
1830                    next,
1831                    &summary.status,
1832                );
1833                Ok(Some(summary))
1834            }
1835            Err(e) => {
1836                let _ = self.store.advance_schedule_without_run(
1837                    &schedule.template_slug,
1838                    Utc::now(),
1839                    next,
1840                    "failed",
1841                );
1842                Err(e)
1843            }
1844        }
1845    }
1846
1847    /// Fire one due schedule, honoring its catch-up policy. `run-all` may
1848    /// produce multiple summaries when several fire times were missed while the
1849    /// process was down; other policies produce zero or one.
1850    pub async fn run_due_schedule(
1851        &self,
1852        schedule: &ScheduleRecord,
1853    ) -> Result<Vec<ExecutionSummary>, AppError> {
1854        let mut out = Vec::new();
1855        let mut current = schedule.clone();
1856        let run_all =
1857            scheduler::CatchUpPolicy::parse(&current.catchup) == Some(scheduler::CatchUpPolicy::RunAll);
1858        for _ in 0..100 {
1859            if let Some(summary) = self.run_scheduled(&current).await? {
1860                out.push(summary);
1861            }
1862            if !run_all {
1863                break;
1864            }
1865            let Some(next) = self.get_schedule(&current.template_slug)? else {
1866                break;
1867            };
1868            if !next.enabled || next.next_run_at.is_none_or(|n| n > Utc::now()) {
1869                break;
1870            }
1871            current = next;
1872        }
1873        Ok(out)
1874    }
1875
1876    fn compute_next_schedule_run(
1877        &self,
1878        schedule: &ScheduleRecord,
1879        after: DateTime<Utc>,
1880        run_count: u32,
1881    ) -> Result<Option<DateTime<Utc>>, AppError> {
1882        let freq = scheduler::ScheduleFrequency::parse(&schedule.frequency).ok_or_else(|| {
1883            AppError::Validation(format!("unknown schedule frequency `{}`", schedule.frequency))
1884        })?;
1885        scheduler::next_run_after(
1886            freq,
1887            schedule.anchor_at,
1888            after,
1889            &scheduler::ScheduleOptions {
1890                timezone: Some(schedule.timezone.clone()),
1891                cron: schedule.cron.clone(),
1892                every_minutes: schedule.every_minutes,
1893                until: schedule.until,
1894                max_runs: schedule.max_runs,
1895                run_count,
1896            },
1897        )
1898        .map_err(AppError::Validation)
1899    }
1900
1901    pub fn list_template_collections(
1902        &self,
1903    ) -> Result<Vec<flow_storage::TemplateCollectionRecord>, AppError> {
1904        self.store
1905            .list_template_collections()
1906            .map_err(|e| AppError::Template(e.to_string()))
1907    }
1908
1909    /// Insert a new user collection from a display name. Slug is derived
1910    /// server-side via the same `slugify` rule as templates. Refuses on
1911    /// empty-after-slugify and on slug collision (idempotent re-creation
1912    /// callers should branch on the returned `AppError::Template` text).
1913    pub fn create_template_collection(
1914        &self,
1915        name: &str,
1916    ) -> Result<flow_storage::TemplateCollectionRecord, AppError> {
1917        let slug = templates::slugify(name);
1918        if slug.is_empty() {
1919            return Err(AppError::Template(
1920                "collection name is empty after slugification".into(),
1921            ));
1922        }
1923        if self
1924            .store
1925            .get_template_collection(&slug)
1926            .map_err(|e| AppError::Template(e.to_string()))?
1927            .is_some()
1928        {
1929            return Err(AppError::Template(format!(
1930                "collection `{slug}` already exists"
1931            )));
1932        }
1933        self.store
1934            .upsert_template_collection(&slug, name, false)
1935            .map_err(|e| AppError::Template(e.to_string()))
1936    }
1937
1938    /// Idempotent get-or-create: returns the existing collection if a row
1939    /// already exists for the slugified name, otherwise inserts a new one.
1940    /// Used by the Ansible-collection importer (step 6) so re-importing the
1941    /// same tarball reuses the same collection row instead of failing on
1942    /// the slug-collision check in [`Self::create_template_collection`].
1943    pub fn ensure_template_collection(
1944        &self,
1945        name: &str,
1946    ) -> Result<flow_storage::TemplateCollectionRecord, AppError> {
1947        let slug = templates::slugify(name);
1948        if slug.is_empty() {
1949            return Err(AppError::Template(
1950                "collection name is empty after slugification".into(),
1951            ));
1952        }
1953        if let Some(existing) = self
1954            .store
1955            .get_template_collection(&slug)
1956            .map_err(|e| AppError::Template(e.to_string()))?
1957        {
1958            return Ok(existing);
1959        }
1960        self.store
1961            .upsert_template_collection(&slug, name, false)
1962            .map_err(|e| AppError::Template(e.to_string()))
1963    }
1964
1965    pub fn rename_template_collection(
1966        &self,
1967        slug: &str,
1968        new_name: &str,
1969    ) -> Result<flow_storage::TemplateCollectionRecord, AppError> {
1970        templates::validate_slug(slug).map_err(|e| AppError::Template(e.to_string()))?;
1971        if new_name.trim().is_empty() {
1972            return Err(AppError::Template(
1973                "collection display name cannot be empty".into(),
1974            ));
1975        }
1976        self.store
1977            .rename_template_collection(slug, new_name)
1978            .map_err(|e| AppError::Template(e.to_string()))
1979    }
1980
1981    pub fn delete_template_collection(&self, slug: &str) -> Result<(), AppError> {
1982        templates::validate_slug(slug).map_err(|e| AppError::Template(e.to_string()))?;
1983        self.store
1984            .delete_template_collection(slug)
1985            .map_err(|e| AppError::Template(e.to_string()))
1986    }
1987
1988    /// Browse the Template Hub catalog (downloadable flow templates).
1989    pub fn list_template_hub(&self) -> Vec<template_hub::TemplateHubEntry> {
1990        template_hub::catalog()
1991    }
1992
1993    /// Same as [`Self::list_template_hub`] but annotated with where each
1994    /// entry is installed locally + whether the catalog ships a newer
1995    /// version than what the user has on disk. Used by the Hub UI to render
1996    /// the "Installed in <collection>" indicator and the "Update available"
1997    /// pill.
1998    pub fn list_template_hub_with_status(
1999        &self,
2000    ) -> Result<Vec<template_hub::TemplateHubEntryWithStatus>, AppError> {
2001        template_hub::catalog_with_status(&self.store, &Self::templates_dir())
2002            .map_err(|e| AppError::Template(e.to_string()))
2003    }
2004
2005    /// Install a Hub catalog entry into the user-chosen collection. Refuses
2006    /// if a template with the catalog slug already exists locally (re-add
2007    /// must go through [`Self::update_hub_template`]).
2008    pub fn add_hub_template(
2009        &self,
2010        hub_slug: &str,
2011        collection_slug: &str,
2012    ) -> Result<TemplateRecord, AppError> {
2013        template_hub::add_to_collection(
2014            &Self::templates_dir(),
2015            &self.store,
2016            hub_slug,
2017            collection_slug,
2018        )
2019        .map_err(|e| AppError::Template(e.to_string()))
2020    }
2021
2022    /// Overwrite an installed Hub template in place with the catalog's
2023    /// current graph + version. When `force` is false and the user has
2024    /// edited the file since install, the call refuses so the frontend can
2025    /// confirm before clobbering local edits.
2026    pub fn update_hub_template(
2027        &self,
2028        hub_slug: &str,
2029        force: bool,
2030    ) -> Result<TemplateRecord, AppError> {
2031        template_hub::update_installed(&Self::templates_dir(), &self.store, hub_slug, force)
2032            .map_err(|e| AppError::Template(e.to_string()))
2033    }
2034
2035    /// Browse the Node Hub catalog. Sorted by `sortKey` then slug.
2036    pub fn list_node_catalog(&self) -> Vec<nodes::NodeCatalogEntry> {
2037        node_hub::catalog()
2038    }
2039
2040    /// Same as [`Self::list_node_catalog`] but annotated with install
2041    /// status (which entries are present in `node_library` and whether
2042    /// the catalog ships a newer version).
2043    pub fn list_node_catalog_with_status(
2044        &self,
2045    ) -> Result<Vec<node_hub::NodeCatalogEntryWithStatus>, AppError> {
2046        node_hub::catalog_with_status(&self.store)
2047            .map_err(|e| AppError::Template(e.to_string()))
2048    }
2049
2050    /// Runtime view: every installed kind's full scheme, read from disk.
2051    /// This is what the canvas, palette, factory, inspector, and renderer
2052    /// consume - the embedded catalog only participates in installation.
2053    pub fn list_installed_nodes(&self) -> Result<Vec<nodes::NodeCatalogEntry>, AppError> {
2054        nodes::list_installed_schemes(&self.store, &Self::nodes_dir())
2055            .map_err(|e| AppError::Template(e.to_string()))
2056    }
2057
2058    /// Install a catalog entry into the user library. Refuses on slug
2059    /// collision (use [`Self::update_installed_node`] to overwrite).
2060    pub fn add_node_to_library(
2061        &self,
2062        slug: &str,
2063    ) -> Result<flow_storage::NodeLibraryRow, AppError> {
2064        node_hub::add_to_library(&Self::nodes_dir(), &self.store, slug)
2065            .map_err(|e| AppError::Template(e.to_string()))
2066    }
2067
2068    /// Overwrite an installed scheme with the catalog's current version
2069    /// and bump `node_library.version`.
2070    pub fn update_installed_node(
2071        &self,
2072        slug: &str,
2073        force: bool,
2074    ) -> Result<flow_storage::NodeLibraryRow, AppError> {
2075        node_hub::update_installed(&Self::nodes_dir(), &self.store, slug, force)
2076            .map_err(|e| AppError::Template(e.to_string()))
2077    }
2078
2079    /// Remove an installed scheme from disk and the `node_library` row.
2080    pub fn uninstall_node(&self, slug: &str) -> Result<(), AppError> {
2081        node_hub::uninstall(&Self::nodes_dir(), &self.store, slug)
2082            .map_err(|e| AppError::Template(e.to_string()))
2083    }
2084
2085    /// Resolve a catalog entry by slug. Prefers the on-disk installed
2086    /// scheme so user edits round-trip; falls back to the embedded
2087    /// catalog (the operator-supplied default seed data).
2088    pub fn get_node_catalog_entry(
2089        &self,
2090        slug: &str,
2091    ) -> Result<nodes::NodeCatalogEntry, AppError> {
2092        if let Ok(entry) = nodes::read_scheme(&Self::nodes_dir(), slug) {
2093            return Ok(entry);
2094        }
2095        nodes::entry_by_slug(slug)
2096            .ok_or_else(|| AppError::Template(format!("node `{slug}` not found")))
2097    }
2098
2099    /// Parse a DSL document into a `FlowGraph`. Positions in the result are
2100    /// always `(0.0, 0.0)` - callers (the desktop UI) run an auto-layout pass
2101    /// before mounting on the canvas.
2102    pub fn import_dsl(&self, source: &str) -> Result<FlowGraph, AppError> {
2103        flow_dsl::parse(source).map_err(|e| AppError::Dsl {
2104            line: e.line,
2105            col: e.col,
2106            message: e.message,
2107        })
2108    }
2109
2110    /// Serialize a `FlowGraph` to canonical DSL text.
2111    pub fn serialize_dsl(&self, graph: &FlowGraph) -> String {
2112        flow_dsl::serialize(graph)
2113    }
2114
2115    fn settings_path() -> PathBuf {
2116        let mut path = Self::flow_dir();
2117        path.push("settings.json");
2118        path
2119    }
2120
2121    pub fn settings(&self) -> Settings {
2122        self.settings.snapshot()
2123    }
2124
2125    pub fn update_settings(&self, patch: SettingsPatch) -> Result<Settings, AppError> {
2126        self.settings
2127            .update(patch)
2128            .map_err(|e| AppError::Settings(e.to_string()))
2129    }
2130
2131    pub fn list_cloud_providers(&self) -> Vec<flow_adapter_ai::registry::ProviderInfo> {
2132        let discovered_local = self.settings.local_ai_models();
2133        self.cloud_providers
2134            .iter()
2135            .map(|p| {
2136                // For the local provider, prefer the models discovered from
2137                // the running server (cached in settings) over the static
2138                // fallback, so the node inspector's dropdown reflects what's
2139                // actually loaded.
2140                let default_models = if p.category()
2141                    == flow_adapter_ai::ProviderCategory::Local
2142                    && !discovered_local.is_empty()
2143                {
2144                    discovered_local.clone()
2145                } else {
2146                    p.default_models().iter().map(|s| s.to_string()).collect()
2147                };
2148                flow_adapter_ai::registry::ProviderInfo {
2149                    name: p.name().to_string(),
2150                    env_var: p.env_var().to_string(),
2151                    default_models,
2152                    model_capabilities: p.model_capabilities(),
2153                    category: p.category(),
2154                }
2155            })
2156            .collect()
2157    }
2158
2159    fn build_cloud_registry_for_run(&self) -> Arc<CloudAiRegistry> {
2160        let mut reg = CloudAiRegistry::new();
2161        for p in &self.cloud_providers {
2162            if self.settings.provider_enabled(p.name()) {
2163                reg.register(p.clone());
2164            }
2165        }
2166        Arc::new(reg)
2167    }
2168
2169    /// Best-effort discovery of a `llama-server` (llama.cpp) executable so the
2170    /// user usually doesn't have to pick it manually. Checks `$PATH` then a few
2171    /// common install locations (Homebrew, /usr/local, /usr/bin).
2172    pub fn detect_llama_server() -> Option<String> {
2173        let names: &[&str] = if cfg!(windows) {
2174            &["llama-server.exe", "llama-cpp-server.exe"]
2175        } else {
2176            &["llama-server", "llama-cpp-server"]
2177        };
2178        let mut dirs: Vec<PathBuf> = Vec::new();
2179        if let Ok(path) = std::env::var("PATH") {
2180            dirs.extend(std::env::split_paths(&path));
2181        }
2182        for d in ["/opt/homebrew/bin", "/usr/local/bin", "/usr/bin"] {
2183            dirs.push(PathBuf::from(d));
2184        }
2185        for dir in dirs {
2186            for name in names {
2187                let candidate = dir.join(name);
2188                if candidate.is_file() {
2189                    return Some(candidate.to_string_lossy().to_string());
2190                }
2191            }
2192        }
2193        None
2194    }
2195
2196    /// `<engines_dir()>/llama-server[.exe]` if a managed engine was already
2197    /// fetched, else `None`. Read-only - never triggers a fetch.
2198    fn managed_engine_path() -> Option<String> {
2199        let name = if cfg!(windows) {
2200            "llama-server.exe"
2201        } else {
2202            "llama-server"
2203        };
2204        let path = Self::engines_dir().join(name);
2205        path.is_file().then(|| path.to_string_lossy().to_string())
2206    }
2207
2208    /// Resolve the `llama-server` binary the Hub would use: a previously-fetched
2209    /// managed engine, else the saved setting, else an auto-detected one.
2210    /// `None` means none is present yet (Load will fetch one).
2211    ///
2212    /// A saved setting that points at a now-missing file (e.g. an old build
2213    /// path that was cleaned away) is ignored so resolution falls through to a
2214    /// managed engine / `$PATH` instead of returning a dead path.
2215    pub fn llama_server_path(&self) -> Option<String> {
2216        Self::managed_engine_path()
2217            .or_else(|| Self::existing_binary(self.settings.llama_server_binary()))
2218            .or_else(Self::detect_llama_server)
2219    }
2220
2221    /// Keep a binary path only if it points at an existing file; otherwise
2222    /// `None`. Used to drop stale saved/explicit `llama-server` paths so a
2223    /// missing file never short-circuits auto-resolution.
2224    fn existing_binary(path: Option<String>) -> Option<String> {
2225        path.filter(|p| !p.trim().is_empty())
2226            .filter(|p| std::path::Path::new(p).is_file())
2227    }
2228
2229    /// Ensure a managed engine is present under `engines_dir()`, fetching it on
2230    /// first use. Fetch progress is mapped onto the shared `InstallProgress`
2231    /// pipeline (`stage: "engine"`) so callers surface it exactly like a hub
2232    /// model download.
2233    pub async fn ensure_llama_engine(
2234        &self,
2235        progress: Option<ProgressCallback>,
2236    ) -> Result<PathBuf, AppError> {
2237        let dir = Self::engines_dir();
2238        crate::llm_server::fetch::ensure_engine(&dir, move |p| {
2239            if let Some(cb) = &progress {
2240                cb(InstallProgress {
2241                    stage: "engine",
2242                    current: p.current,
2243                    total: p.total.unwrap_or(0),
2244                    message: p.message,
2245                });
2246            }
2247        })
2248        .await
2249        .map_err(AppError::AiInvocation)
2250    }
2251
2252    /// Start the managed local LLM server (`llama-server`) for the given
2253    /// binary + model, falling back to the saved setting then auto-detection
2254    /// when omitted. On success: persist the paths, point `local_ai_base_url`
2255    /// at the managed endpoint, and enable local AI - so the existing `local`
2256    /// provider, flow-graph generator, and agent immediately use it.
2257    pub async fn start_local_llm(
2258        &self,
2259        binary: Option<String>,
2260        model: Option<String>,
2261        params: crate::llm_server::LlamaParams,
2262    ) -> Result<crate::llm_server::LlmServerStatus, AppError> {
2263        // Resolution order (managed engine wins, auto-fetch is the last resort):
2264        // explicit arg → fetched engine → saved setting → $PATH → fetch on demand.
2265        // Explicit/saved paths are dropped when the file is missing so a stale
2266        // path (e.g. a cleaned-up build dir) self-heals to a managed engine
2267        // instead of failing with "binary not found".
2268        let binary = match Self::existing_binary(binary)
2269            .or_else(Self::managed_engine_path)
2270            .or_else(|| Self::existing_binary(self.settings.llama_server_binary()))
2271            .or_else(Self::detect_llama_server)
2272        {
2273            Some(b) => b,
2274            None => self
2275                .ensure_llama_engine(None)
2276                .await?
2277                .to_string_lossy()
2278                .to_string(),
2279        };
2280        let model = model
2281            .filter(|s| !s.trim().is_empty())
2282            .or_else(|| self.settings.llama_server_model())
2283            .ok_or_else(|| AppError::AiInvocation("no LLM selected".into()))?;
2284
2285        // Per-model load params are owned by the frontend (persisted in
2286        // `settings.llama_params` keyed by model id) and passed in here.
2287        let endpoint = self
2288            .llm_server
2289            .start(PathBuf::from(&binary), PathBuf::from(&model), params)
2290            .await
2291            .map_err(AppError::AiInvocation)?;
2292
2293        // Discover the served model id(s) from `/v1/models` so the node
2294        // inspector + flow-graph-generator dropdowns reflect what's actually loaded
2295        // (this replaces the role the manual "Test connection" step played).
2296        // Best-effort: the server just answered readiness, so this normally
2297        // succeeds; fall back to an empty list on error.
2298        let discovered = flow_adapter_ai::LocalOpenAiProvider::new()
2299            .list_models(&endpoint)
2300            .await
2301            .unwrap_or_default();
2302
2303        let _ = self.update_settings(SettingsPatch {
2304            llama_server_binary: Some(binary),
2305            llama_server_model: Some(model),
2306            local_ai_base_url: Some(endpoint),
2307            local_ai_models: Some(discovered),
2308            allow_local_ai: Some(true),
2309            ..Default::default()
2310        });
2311        Ok(self.llm_server.status())
2312    }
2313
2314    /// Stop the managed local LLM server (idempotent). Clears the managed
2315    /// wiring (`local_ai_base_url` + discovered models) so the local provider,
2316    /// flow-graph generator, and agent don't keep calling a now-dead port.
2317    pub async fn stop_local_llm(&self) -> crate::llm_server::LlmServerStatus {
2318        self.llm_server.stop().await;
2319        let _ = self.update_settings(SettingsPatch {
2320            local_ai_base_url: Some(String::new()),
2321            local_ai_models: Some(Vec::new()),
2322            ..Default::default()
2323        });
2324        self.llm_server.status()
2325    }
2326
2327    /// Status snapshot of the managed local LLM server.
2328    pub fn local_llm_status(&self) -> crate::llm_server::LlmServerStatus {
2329        self.llm_server.status()
2330    }
2331
2332    pub fn store(&self) -> &Store {
2333        &self.store
2334    }
2335
2336    pub fn list_executions(&self, limit: usize) -> Result<Vec<ExecutionRecord>, AppError> {
2337        self.store
2338            .list_executions(limit)
2339            .map_err(|e| AppError::Storage(e.to_string()))
2340    }
2341
2342    pub fn get_execution(
2343        &self,
2344        execution_id: &str,
2345    ) -> Result<
2346        Option<(ExecutionRecord, Vec<ExecutionStepRecord>, Vec<InterceptionRecord>)>,
2347        AppError,
2348    > {
2349        self.store
2350            .get_execution(execution_id)
2351            .map_err(|e| AppError::Storage(e.to_string()))
2352    }
2353
2354    /// Record a monitor interception (auto-fix) against a run, so the History
2355    /// view can show what was changed/added beyond the per-node steps.
2356    pub fn record_interception(
2357        &self,
2358        execution_id: String,
2359        failed_node: String,
2360        issue: Option<String>,
2361        summary: String,
2362        dsl_before: Option<String>,
2363        dsl_after: Option<String>,
2364    ) -> Result<(), AppError> {
2365        self.store
2366            .record_interception(&InterceptionRecord {
2367                execution_id,
2368                at: chrono::Utc::now(),
2369                failed_node,
2370                issue,
2371                summary,
2372                dsl_before,
2373                dsl_after,
2374            })
2375            .map_err(|e| AppError::Storage(e.to_string()))
2376    }
2377
2378    pub async fn execute_graph(
2379        &self,
2380        graph: FlowGraph,
2381        events: Arc<dyn EventSink>,
2382    ) -> Result<ExecutionSummary, AppError> {
2383        self.execute_graph_with_id(Uuid::new_v4().to_string(), graph, events)
2384            .await
2385    }
2386
2387    /// Run `graph` under a caller-supplied execution id. A fresh per-run
2388    /// [`flow_execution::RunControl`] is registered under that id for the
2389    /// lifetime of the run (so [`FlowApp::pause_run`] / [`resume_run`] /
2390    /// [`cancel_run`] can target this run while others keep running) and removed
2391    /// on completion. The host generates the id up front so it can map the run
2392    /// to the originating tab/flow before the `Started` event is observed.
2393    pub async fn execute_graph_with_id(
2394        &self,
2395        execution_id: String,
2396        graph: FlowGraph,
2397        events: Arc<dyn EventSink>,
2398    ) -> Result<ExecutionSummary, AppError> {
2399        self.run_graph_internal(execution_id, graph, events, None, None, None)
2400            .await
2401    }
2402
2403    /// Shared run path behind [`execute_graph_with_id`] and [`run_agent_turn`].
2404    /// `confirm_destructive_override` forces the per-step destructive-action
2405    /// confirmation gate on/off (`None` follows the persisted setting).
2406    /// `edit_staging` is `None` for a normal run; an agent turn passes a buffer
2407    /// so fs mutations are staged for review instead of written to disk.
2408    /// `workspace_override` pins the execution workspace for this run (the
2409    /// autonomous loop passes the user's chosen folder); `None` resolves it
2410    /// from the stored per-flow path or the edition default.
2411    async fn run_graph_internal(
2412        &self,
2413        execution_id: String,
2414        graph: FlowGraph,
2415        events: Arc<dyn EventSink>,
2416        confirm_destructive_override: Option<bool>,
2417        edit_staging: Option<flow_execution::StagedEdits>,
2418        workspace_override: Option<PathBuf>,
2419    ) -> Result<ExecutionSummary, AppError> {
2420        let allow_cloud_ai = self.settings.allow_cloud_ai();
2421        let allow_local_ai = self.settings.allow_local_ai();
2422        let local_ai_base_url = self.settings.local_ai_base_url();
2423        let cloud_providers = self.build_cloud_registry_for_run();
2424        // The per-step confirmation gate follows the persisted setting, unless a
2425        // headless runner has forced it off (no way to confirm).
2426        let confirm_destructive = confirm_destructive_override.unwrap_or_else(|| {
2427            self.settings.confirm_destructive()
2428                && !self
2429                    .confirm_gate_disabled
2430                    .load(std::sync::atomic::Ordering::SeqCst)
2431        });
2432        // Each run gets its own control, registered under its id so the host can
2433        // steer this run independently of any other in-flight runs.
2434        let control: flow_execution::SharedRunControl = Default::default();
2435        if let Ok(mut runs) = self.runs.lock() {
2436            runs.insert(execution_id.clone(), control.clone());
2437        }
2438        // Trace the node lifecycle to the terminal / log file (under the
2439        // `flow_execution` target) by wrapping the host's sink, so a dev run
2440        // shows what the engine is doing - not just llama-server output.
2441        let events: Arc<dyn EventSink> = Arc::new(flow_execution::TracingSink::new(events));
2442        // The shared execution workspace for this flow; every shell/cli/fs node
2443        // defaults its cwd / workspaceRoot to it. Created up front so fs nodes
2444        // (which require an existing root) work against the scratch default.
2445        let workspace_root =
2446            workspace_override.unwrap_or_else(|| self.resolve_workspace(&graph.id));
2447        if let Err(e) = std::fs::create_dir_all(&workspace_root) {
2448            tracing::warn!(error = %e, path = %workspace_root.display(), "failed to create workspace dir");
2449        }
2450        let executor = Executor {
2451            adapters: self.adapters.clone(),
2452            sanitizer: self.sanitizer.clone(),
2453            events,
2454            cloud_providers,
2455            credentials: self.resolver.clone(),
2456            allow_cloud_ai,
2457            allow_local_ai,
2458            local_ai_base_url,
2459            stream_sink: self.stream_sink(),
2460            working_memory: self.working_memory.clone(),
2461            control,
2462            confirm_destructive,
2463            // The review gate needs a host UI to resolve it; headless runners
2464            // disable the confirmation gate, which marks exactly that absence.
2465            review_gate_available: !self
2466                .confirm_gate_disabled
2467                .load(std::sync::atomic::Ordering::SeqCst),
2468            edit_staging,
2469            workspace_root,
2470        };
2471        let result = executor.run_with_id(execution_id.clone(), &graph).await;
2472        if let Ok(mut runs) = self.runs.lock() {
2473            runs.remove(&execution_id);
2474        }
2475        // Persist working memory so set-variable values survive across sessions.
2476        self.persist_memory();
2477        result.map_err(|e| AppError::Execution(e.to_string()))
2478    }
2479
2480    /// Run one coding-agent turn as a flow. Same executor path as
2481    /// [`execute_graph_with_id`] - events stream and the run is steerable
2482    /// (pause/resume/cancel) under `execution_id` - but filesystem mutations are
2483    /// *staged* (computed, not written). Returns the run summary plus the
2484    /// proposed edits for the IDE to review and apply.
2485    pub async fn run_agent_turn(
2486        &self,
2487        execution_id: String,
2488        graph: FlowGraph,
2489        events: Arc<dyn EventSink>,
2490    ) -> Result<AgentTurn, AppError> {
2491        let staging: flow_execution::StagedEdits = Default::default();
2492        let summary = self
2493            .run_graph_internal(execution_id, graph, events, None, Some(staging.clone()), None)
2494            .await?;
2495        let edits = staging.lock().map(|s| s.edits()).unwrap_or_default();
2496        Ok(AgentTurn { summary, edits })
2497    }
2498
2499    /// Request a pause of the run with `execution_id`. Honored at the next node
2500    /// boundary; an in-flight node finishes first. No-op if no such run.
2501    pub fn pause_run(&self, execution_id: &str) {
2502        if let Ok(runs) = self.runs.lock() {
2503            if let Some(c) = runs.get(execution_id) {
2504                c.pause();
2505            }
2506        }
2507    }
2508
2509    /// Release the paused run with `execution_id` so scheduling continues.
2510    pub fn resume_run(&self, execution_id: &str) {
2511        if let Ok(runs) = self.runs.lock() {
2512            if let Some(c) = runs.get(execution_id) {
2513                c.resume();
2514            }
2515        }
2516    }
2517
2518    /// Resolve the AI review gate of the run with `execution_id`: record the
2519    /// human verdict and release the pause (RAO human authority). Approve
2520    /// passes the contract-bound output downstream; reject fails the node onto
2521    /// its fallback path. No-op if no such run.
2522    pub fn resolve_ai_review(&self, execution_id: &str, approved: bool) {
2523        if let Ok(runs) = self.runs.lock() {
2524            if let Some(c) = runs.get(execution_id) {
2525                c.resolve_review(approved);
2526            }
2527        }
2528    }
2529
2530    /// Pause every active run. Convenience for headless runners (CLI / TUI)
2531    /// that only ever drive a single run and don't track its id.
2532    pub fn pause_all_runs(&self) {
2533        if let Ok(runs) = self.runs.lock() {
2534            for c in runs.values() {
2535                c.pause();
2536            }
2537        }
2538    }
2539
2540    /// Resume every paused run. Companion to [`FlowApp::pause_all_runs`].
2541    pub fn resume_all_runs(&self) {
2542        if let Ok(runs) = self.runs.lock() {
2543            for c in runs.values() {
2544                c.resume();
2545            }
2546        }
2547    }
2548
2549    /// Force the per-step destructive-action confirmation gate off for this
2550    /// instance. Headless runners (CLI / TUI) call this at startup: they have
2551    /// no confirmation UI, so a destructive node would otherwise block the run
2552    /// forever waiting on a confirm that can't arrive.
2553    pub fn disable_confirmation_gate(&self) {
2554        self.confirm_gate_disabled
2555            .store(true, std::sync::atomic::Ordering::SeqCst);
2556    }
2557
2558    /// Cancel the run with `execution_id`. The current node finishes, remaining
2559    /// nodes are skipped, and the run ends with `cancelled` status. No-op if no
2560    /// such run.
2561    pub fn cancel_run(&self, execution_id: &str) {
2562        if let Ok(runs) = self.runs.lock() {
2563            if let Some(c) = runs.get(execution_id) {
2564                c.cancel();
2565            }
2566        }
2567    }
2568
2569    /// Cancel every active run. Used by headless runners (CLI / TUI) which only
2570    /// ever have a single run and don't track its id, and as a stop-all signal.
2571    pub fn cancel_all_runs(&self) {
2572        if let Ok(runs) = self.runs.lock() {
2573            for c in runs.values() {
2574                c.cancel();
2575            }
2576        }
2577    }
2578
2579    /// Clear session working memory. Call when starting a fresh agent session
2580    /// so `{{memory.<key>}}` doesn't leak values from a prior, unrelated run.
2581    pub fn clear_working_memory(&self) {
2582        if let Ok(mut mem) = self.working_memory.lock() {
2583            mem.clear();
2584        }
2585        let _ = self.store.clear_memory();
2586    }
2587
2588    /// Persist the current working memory so `{{memory.<key>}}` survives across
2589    /// sessions (durable agent memory, roadmap E1). Best-effort.
2590    fn persist_memory(&self) {
2591        if let Ok(mem) = self.working_memory.lock() {
2592            let _ = self.store.save_memory(&mem);
2593        }
2594    }
2595
2596    /// Static pre-apply verification of a proposed flow's DSL: returns advisories
2597    /// (destructive operations, possible sandbox escapes) for the review +
2598    /// interception modals. Best-effort - DSL that fails to parse yields no
2599    /// warnings (the caller surfaces the parse error separately).
2600    pub fn verify_flow(&self, dsl: &str) -> Vec<FlowWarning> {
2601        match flow_dsl::parse(dsl) {
2602            Ok(graph) => flow_execution::verify_graph(&graph),
2603            Err(_) => Vec::new(),
2604        }
2605    }
2606
2607    /// Generate Flow DSL via a cloud AI provider (Claude / OpenAI / Gemini) or
2608    /// the local LLM server. Pick a model, hit Generate, and the frontend gets
2609    /// a parse-validated DSL string back regardless of source.
2610    ///
2611    /// The flow-graph-generator system prompt is shipped via each provider's
2612    /// native system channel (Anthropic top-level `system`, OpenAI
2613    /// system-role message, Gemini `system_instruction`) rather than
2614    /// concatenated into the user message. The PII sanitizer is
2615    /// intentionally bypassed: filenames and dataset names need to reach
2616    /// the model verbatim so the generated DSL refers to the right
2617    /// artifacts.
2618    ///
2619    /// **Parse validation with one retry.** The first response is parsed
2620    /// via `flow_dsl::parse`. On parse failure we re-issue the request
2621    /// with a corrective user message that quotes the bad DSL plus the
2622    /// parser's `line:col` and asks for a fix; the retry uses the same
2623    /// model, temperature, and max-tokens. Only parse-validated DSL is
2624    /// ever returned. If both attempts fail, the error message carries
2625    /// both `line:col` positions so the caller (and `generate_dsl_auto`)
2626    /// can surface them.
2627    pub async fn generate_dsl_via_cloud(
2628        &self,
2629        provider: &str,
2630        model: &str,
2631        prompt: &str,
2632        max_tokens: Option<u32>,
2633        temperature: Option<f32>,
2634    ) -> Result<String, AppError> {
2635        // The `local` provider is an on-device OpenAI-compatible server, not
2636        // network egress - it has its own gate (`allow_local_ai`), needs no
2637        // API key, and carries its endpoint via `base_url`.
2638        let is_local = provider == "local";
2639        if is_local {
2640            if !self.settings.allow_local_ai() {
2641                return Err(AppError::AiInvocation(
2642                    "local AI is disabled (Settings -> LLM -> Local -> enable)".into(),
2643                ));
2644            }
2645        } else if !self.settings.allow_cloud_ai() {
2646            return Err(AppError::AiInvocation(
2647                "cloud AI is disabled (Settings -> LLM -> Cloud -> enable to allow outbound calls)"
2648                    .into(),
2649            ));
2650        }
2651        let registry = self.build_cloud_registry_for_run();
2652        let p = registry.get(provider).ok_or_else(|| {
2653            AppError::AiInvocation(format!("provider '{provider}' is not enabled in Settings"))
2654        })?;
2655        let env_var = p.env_var();
2656        let api_key = match self.resolver.resolve_cloud_ai_key(provider, env_var) {
2657            Ok(k) => k,
2658            // Local servers usually need no auth; a missing key is fine.
2659            Err(_) if is_local => String::new(),
2660            Err(e) => {
2661                return Err(AppError::AiInvocation(format!(
2662                    "no API key for '{provider}': {e}. Set it in Settings or export {env_var}."
2663                )))
2664            }
2665        };
2666        let base_url = if is_local {
2667            self.settings.local_ai_base_url()
2668        } else {
2669            None
2670        };
2671
2672        // Local "thinking" models (Qwen3, R1-distills) can spend thousands of
2673        // tokens in `reasoning_content` before emitting the answer; with a low
2674        // cap they get cut off mid-thought and return empty content. Local
2675        // inference is free, so floor the budget high.
2676        //
2677        // Cloud generation inlines file `content`, so a multi-file scaffold
2678        // easily exceeds a small cap and gets cut off mid-string - which
2679        // surfaces as a misleading `unterminated string`. A ceiling costs
2680        // nothing unless used (billing is per actual output token), so floor
2681        // cloud generation to the same headroom rather than trusting a small
2682        // caller default. Truncation past this floor is detected below.
2683        let effective_max_tokens = Some(max_tokens.unwrap_or(0).max(8192));
2684        let max_tokens = effective_max_tokens;
2685
2686        let system_prompt = DSL_GENERATION_SYSTEM_PROMPT;
2687
2688        tracing::info!(
2689            provider,
2690            model,
2691            is_local,
2692            prompt_chars = prompt.len(),
2693            "generating Flow DSL"
2694        );
2695
2696        // Round 1: clean system + user split.
2697        let sink = self.stream_sink();
2698        let (raw1, finish1) = cloud_call_once(
2699            p.as_ref(),
2700            model,
2701            system_prompt,
2702            prompt,
2703            max_tokens,
2704            temperature,
2705            &api_key,
2706            base_url.clone(),
2707            sink.as_ref(),
2708        )
2709        .await?;
2710        let extracted1 = extract_dsl_text(&raw1);
2711
2712        // Corrective retry loop. Each pass inspects the failure kind and sends
2713        // the matching corrective prompt: parse errors quote the parser's
2714        // `line:col`; semantic errors quote the specific contract violations.
2715        // Handling both kinds in one loop is what lets a parse fix that then
2716        // trips semantics (e.g. a `utility` node carrying `run-command`) still
2717        // get a semantic-correction pass instead of failing outright.
2718        const MAX_CORRECTIVE_RETRIES: usize = 3;
2719        let mut current = extracted1;
2720        let mut attempt = 0usize;
2721        let mut last_reason = match validate_generated_dsl(&current) {
2722            Ok(dsl) => {
2723                tracing::info!(provider, model, attempt, "Flow DSL generated on first try");
2724                return Ok(dsl);
2725            }
2726            Err(reason) => reason,
2727        };
2728        // A response cut off at the token limit yields an unterminated string no
2729        // amount of re-prompting can fix at the same budget, so short-circuit
2730        // with a clear, actionable error instead of burning corrective retries.
2731        if is_truncated(&finish1) {
2732            return Err(truncation_error(provider, model, max_tokens, &last_reason));
2733        }
2734        for _ in 0..MAX_CORRECTIVE_RETRIES {
2735            attempt += 1;
2736            let retry_user = if last_reason.starts_with("DSL parse error") {
2737                let parse_err = flow_dsl::parse(&current).expect_err("parse error expected");
2738                tracing::warn!(
2739                    provider,
2740                    attempt,
2741                    line = parse_err.line,
2742                    col = parse_err.col,
2743                    message = %parse_err.message,
2744                    "generated DSL failed to parse; retrying with corrective prompt"
2745                );
2746                build_retry_prompt(prompt, &current, &parse_err)
2747            } else {
2748                let graph = flow_dsl::parse(&current).expect("already parsed");
2749                let issues = dsl_semantics::validate_generated_graph(&graph);
2750                tracing::warn!(
2751                    provider,
2752                    attempt,
2753                    %last_reason,
2754                    "generated DSL failed semantic checks; retrying with corrective prompt"
2755                );
2756                build_semantic_retry_prompt(prompt, &current, &issues)
2757            };
2758            let (raw, finish) = cloud_call_once(
2759                p.as_ref(),
2760                model,
2761                system_prompt,
2762                &retry_user,
2763                max_tokens,
2764                temperature,
2765                &api_key,
2766                base_url.clone(),
2767                sink.as_ref(),
2768            )
2769            .await?;
2770            current = extract_dsl_text(&raw);
2771            match validate_generated_dsl(&current) {
2772                Ok(dsl) => {
2773                    tracing::info!(
2774                        provider,
2775                        model,
2776                        attempt,
2777                        "Flow DSL generated after corrective retry"
2778                    );
2779                    return Ok(dsl);
2780                }
2781                Err(reason) => last_reason = reason,
2782            }
2783            // Same token-limit short-circuit as round 1: don't keep retrying a
2784            // response the model could not finish within the budget.
2785            if is_truncated(&finish) {
2786                return Err(truncation_error(provider, model, max_tokens, &last_reason));
2787            }
2788        }
2789
2790        // Retries exhausted. For a trailing parse error, append the targeted
2791        // hint (adapter-name-as-node-kind, malformed string, …) so the surfaced
2792        // error tells the user how to fix it, not just where it broke.
2793        let hint = last_reason
2794            .strip_prefix("DSL parse error at ")
2795            .and_then(|rest| rest.split_once(" - "))
2796            .and_then(|(_, msg)| parse_error_hint(msg))
2797            .map(|h| format!("\n\n{h}"))
2798            .unwrap_or_default();
2799        tracing::warn!(
2800            provider,
2801            model,
2802            reason = %last_reason,
2803            dsl = %truncate_for_log(&current),
2804            "Flow DSL generation failed after corrective retries"
2805        );
2806        Err(AppError::AiInvocation(format!(
2807            "{provider} model returned invalid DSL after {MAX_CORRECTIVE_RETRIES} corrective retries: {last_reason}{hint}"
2808        )))
2809    }
2810
2811    /// Generate Flow DSL via a cloud/local-provider model and parse-validate
2812    /// it. `cloud` is `(provider, model_id)` - the loaded LLM is the `local`
2813    /// provider. Returns parse-valid DSL or an error.
2814    pub async fn generate_dsl_auto(
2815        &self,
2816        _local_model_id: Option<&str>,
2817        cloud: Option<(&str, &str)>,
2818        prompt: &str,
2819    ) -> Result<DslGenerationOutcome, AppError> {
2820        if let Some((provider, model)) = cloud {
2821            let dsl = self
2822                .generate_dsl_via_cloud(provider, model, prompt, Some(2048), None)
2823                .await
2824                .map_err(|e| AppError::AiInvocation(format!("cloud: {e}")))?;
2825            return Ok(DslGenerationOutcome {
2826                dsl,
2827                source: DslSource::Cloud {
2828                    provider: provider.to_string(),
2829                    model_id: model.to_string(),
2830                },
2831                fallback_reason: None,
2832                plan: None,
2833            });
2834        }
2835        Err(AppError::AiInvocation(
2836            "no flow-graph generator configured (select a model or provider)".into(),
2837        ))
2838    }
2839
2840    /// Agentic generation: generate DSL from the prompt alone, then ask a
2841    /// chat-capable model for a one-paragraph plan describing the result.
2842    /// Generation never reads local/user data - the workspace is an execution
2843    /// concern resolved at run time, not a generation input (see ADR-0007).
2844    /// The plan call is best-effort: failures (no chat-capable provider
2845    /// reachable, network error, empty response) leave `plan = None` so
2846    /// the caller can fall back to a deterministic client-side synthesis.
2847    ///
2848    /// "Chat-capable provider" means anything we can call via
2849    /// `generate_dsl_via_cloud` with an arbitrary system prompt - the
2850    /// loaded LLM (when `cloud == Some(("local", _))`) or a real cloud
2851    /// provider. With neither available there is no chat path; the call
2852    /// returns `Ok(outcome)` with `plan = None`.
2853    pub async fn agentic_generate(
2854        &self,
2855        local_model_id: Option<&str>,
2856        cloud: Option<(&str, &str)>,
2857        prompt: &str,
2858    ) -> Result<DslGenerationOutcome, AppError> {
2859        let mut outcome = self.generate_dsl_auto(local_model_id, cloud, prompt).await?;
2860        outcome.plan = self.synthesize_plan(&outcome.dsl, cloud).await;
2861        Ok(outcome)
2862    }
2863
2864    /// Observe-and-re-plan: regenerate the workflow given the prior DSL and a
2865    /// feedback string describing what went wrong when it ran (from
2866    /// [`build_loop_observations`]). The original task `prompt` is preserved so
2867    /// the model keeps the goal in view while fixing the failures. Goes through
2868    /// the same prompt-only agentic pipeline as a first generation, just with an
2869    /// augmented prompt.
2870    ///
2871    /// This is the single re-plan step. The frontend's review-then-run path
2872    /// calls it per user request; [`Self::agentic_run_loop`] calls it
2873    /// repeatedly for the autonomous path.
2874    pub async fn agentic_replan(
2875        &self,
2876        local_model_id: Option<&str>,
2877        cloud: Option<(&str, &str)>,
2878        prompt: &str,
2879        prior_dsl: &str,
2880        observations: &str,
2881    ) -> Result<DslGenerationOutcome, AppError> {
2882        let augmented = format!(
2883            "{prompt}\n\n\
2884             You previously generated this Flow DSL:\n\n{prior_dsl}\n\n\
2885             When it ran, these problems occurred:\n\n{observations}\n\n\
2886             Produce a corrected Flow DSL that fixes the failing steps. Keep the \
2887             parts that already succeeded. Emit DSL only."
2888        );
2889        self.agentic_generate(local_model_id, cloud, &augmented)
2890            .await
2891    }
2892
2893    /// Autonomous agentic loop (the no-human-in-the-loop path): repeatedly
2894    /// generate → run → observe → re-plan until a run has zero failures or the
2895    /// [`MAX_AGENTIC_ITERATIONS`] safety cap is hit. Each iteration's run emits
2896    /// the normal `flow:execution` events through `events` (so the Log Panel
2897    /// streams live); the returned [`AgenticLoopSummary`] carries the final DSL
2898    /// for the caller to load onto the canvas plus a per-iteration record.
2899    ///
2900    /// Safety: bounded by the iteration cap, and every generated graph still
2901    /// runs through `execute_graph`, so the existing per-adapter sandbox and
2902    /// `allow_cloud_ai` / `allow_local_ai` gates apply unchanged - autonomy
2903    /// does not bypass them.
2904    pub async fn agentic_run_loop(
2905        &self,
2906        local_model_id: Option<&str>,
2907        cloud: Option<(&str, &str)>,
2908        prompt: &str,
2909        workspace: Option<&str>,
2910        events: Arc<dyn EventSink>,
2911    ) -> Result<AgenticLoopSummary, AppError> {
2912        // `workspace` pins where each iteration's run executes - it is never an
2913        // input to generation (ADR-0007).
2914        let ws_override = workspace
2915            .filter(|w| !w.trim().is_empty())
2916            .map(expand_home);
2917        let mut steps = Vec::new();
2918        let mut prev_dsl: Option<String> = None;
2919        let mut observations: Option<String> = None;
2920        let mut last_outcome: Option<DslGenerationOutcome> = None;
2921        let cap = self.settings.max_agentic_iterations();
2922        let budget_secs = self.settings.max_agentic_seconds();
2923        let token_budget = self.settings.max_agentic_tokens();
2924        let started = std::time::Instant::now();
2925        let mut tokens_used: u64 = 0;
2926
2927        for iter in 0..cap {
2928            // Run budgets: stop starting new iterations once the wall-clock or
2929            // cumulative-token ceiling is hit (the first iteration always runs).
2930            // `0` = unlimited for either.
2931            let over_time =
2932                budget_secs > 0 && started.elapsed().as_secs() >= budget_secs as u64;
2933            let over_tokens = token_budget > 0 && tokens_used >= token_budget as u64;
2934            if iter > 0 && (over_time || over_tokens) {
2935                let last = last_outcome.expect("loop body runs at least once");
2936                return Ok(AgenticLoopSummary {
2937                    iterations: iter,
2938                    status: "budget_exhausted".into(),
2939                    final_dsl: last.dsl,
2940                    final_plan: last.plan,
2941                    steps,
2942                });
2943            }
2944            let outcome = match (&prev_dsl, &observations) {
2945                (Some(dsl), Some(obs)) => {
2946                    self.agentic_replan(local_model_id, cloud, prompt, dsl, obs)
2947                        .await?
2948                }
2949                _ => self.agentic_generate(local_model_id, cloud, prompt).await?,
2950            };
2951
2952            let graph = flow_dsl::parse(&outcome.dsl).map_err(|e| {
2953                AppError::AiInvocation(format!(
2954                    "generated DSL failed to parse at {}:{} - {}",
2955                    e.line, e.col, e.message
2956                ))
2957            })?;
2958
2959            // Run with a capturing sink alongside the caller's so we can read
2960            // per-node outcomes back for the next re-plan.
2961            let capture = CapturingSink::default();
2962            let sink: Arc<dyn EventSink> =
2963                Arc::new(MultiSink::new(vec![events.clone(), Arc::new(capture.clone())]));
2964            let summary = self
2965                .run_graph_internal(
2966                    Uuid::new_v4().to_string(),
2967                    graph,
2968                    sink,
2969                    None,
2970                    None,
2971                    ws_override.clone(),
2972                )
2973                .await?;
2974            let captured = capture.events.lock().map(|g| g.clone()).unwrap_or_default();
2975            tokens_used = tokens_used.saturating_add(sum_ai_tokens(&captured));
2976
2977            let converged = summary.failed == 0;
2978            let obs = if converged {
2979                None
2980            } else {
2981                Some(build_loop_observations(&captured, &summary))
2982            };
2983            steps.push(AgenticLoopStep {
2984                iteration: iter,
2985                run_status: summary.status.clone(),
2986                succeeded: summary.succeeded,
2987                failed: summary.failed,
2988                skipped: summary.skipped,
2989                observations: obs.clone(),
2990            });
2991
2992            if converged {
2993                return Ok(AgenticLoopSummary {
2994                    iterations: iter + 1,
2995                    status: "succeeded".into(),
2996                    final_dsl: outcome.dsl,
2997                    final_plan: outcome.plan,
2998                    steps,
2999                });
3000            }
3001
3002            observations = obs;
3003            prev_dsl = Some(outcome.dsl.clone());
3004            last_outcome = Some(outcome);
3005        }
3006
3007        // Cap hit without a clean run. Return the last attempt so the user can
3008        // inspect/repair it manually.
3009        let last = last_outcome.expect("loop body runs at least once");
3010        Ok(AgenticLoopSummary {
3011            iterations: cap,
3012            status: "exhausted".into(),
3013            final_dsl: last.dsl,
3014            final_plan: last.plan,
3015            steps,
3016        })
3017    }
3018
3019    /// Best-effort second pass: ask a chat-capable provider to summarise
3020    /// the generated DSL. Returns `Some(text)` on
3021    /// success; `None` on any failure (gating off, no provider, parse
3022    /// error, empty body). Callers must not treat `None` as fatal.
3023    async fn synthesize_plan(
3024        &self,
3025        dsl: &str,
3026        cloud: Option<(&str, &str)>,
3027    ) -> Option<String> {
3028        // We can only synthesise a plan via a chat model - a cloud provider
3029        // or the loaded local LLM (the `local` provider).
3030        let (provider, model) = cloud?;
3031        let registry = self.build_cloud_registry_for_run();
3032        let p = registry.get(provider)?;
3033        let is_local = provider == "local";
3034        let allowed = if is_local {
3035            self.settings.allow_local_ai()
3036        } else {
3037            self.settings.allow_cloud_ai()
3038        };
3039        if !allowed {
3040            return None;
3041        }
3042        let env_var = p.env_var();
3043        let api_key = match self.resolver.resolve_cloud_ai_key(provider, env_var) {
3044            Ok(k) => k,
3045            Err(_) if is_local => String::new(),
3046            Err(_) => return None,
3047        };
3048        let base_url = if is_local {
3049            self.settings.local_ai_base_url()
3050        } else {
3051            None
3052        };
3053        let system = "You explain a Flow DSL workflow for a \
3054                      reviewer about to apply it. Write one short paragraph \
3055                      (under 60 words) summarising what the workflow does, \
3056                      then a numbered list with one line per step describing \
3057                      that step's purpose. No code fences, no DSL syntax.";
3058        let user = format!(
3059            "Flow DSL to summarise:\n\n{dsl}\n\nReturn the paragraph + numbered list only."
3060        );
3061        let req = flow_adapter_ai::CloudAiRequest {
3062            model: model.to_string(),
3063            prompt: user,
3064            // Plan calls don't need a giant budget; keep it tight so even
3065            // a tiny local model returns quickly.
3066            max_tokens: Some(if is_local { 1024 } else { 600 }),
3067            temperature: Some(0.2),
3068            api_key,
3069            system: Some(system.to_string()),
3070            base_url,
3071            stream: true,
3072            call_id: Some(format!("plan-{}", uuid::Uuid::new_v4())),
3073            ..Default::default()
3074        };
3075        let sink = self.stream_sink();
3076        match p.invoke_stream(&req, sink.as_ref()).await {
3077            Ok(resp) => {
3078                let trimmed = resp.text.trim().to_string();
3079                if trimmed.is_empty() {
3080                    None
3081                } else {
3082                    Some(trimmed)
3083                }
3084            }
3085            Err(e) => {
3086                tracing::debug!(provider, error = %e, "plan synthesis call failed; returning None");
3087                None
3088            }
3089        }
3090    }
3091
3092    pub async fn load_graph(&self, graph_id: &str) -> Result<FlowGraph, AppError> {
3093        if graph_id.is_empty() {
3094            return Err(AppError::GraphNotFound("empty graph id".into()));
3095        }
3096
3097        Ok(FlowGraph {
3098            subflows: Vec::new(),
3099            id: graph_id.to_string(),
3100            name: "JCL Pipeline".into(),
3101            version: "0.1.0".into(),
3102            description: None,
3103            nodes: vec![
3104                FlowNode {
3105                    id: "validate-jcl".into(),
3106                    node_type: "ai".into(),
3107                    position: Position { x: 80.0, y: 80.0 },
3108                    data: serde_json::json!({
3109                        "label": "Review JCL",
3110                        "modelId": "local-llm",
3111                        "input": "{{input}}"
3112                    }),
3113                },
3114                FlowNode {
3115                    id: "submit-jcl".into(),
3116                    node_type: "action".into(),
3117                    position: Position { x: 380.0, y: 80.0 },
3118                    data: serde_json::json!({
3119                        "label": "Submit JCL",
3120                        "actionId": "cli-tool",
3121                        "adapter": "cli",
3122                        "bin": "zowe",
3123                        "command": "jobs submit local-file"
3124                    }),
3125                },
3126            ],
3127            edges: vec![FlowEdge {
3128                id: "e1".into(),
3129                source: "validate-jcl".into(),
3130                target: "submit-jcl".into(),
3131                label: None,
3132                condition: None,
3133                outcome: flow_domain::graph::EdgeOutcome::Always,
3134            }],
3135        })
3136    }
3137
3138    pub async fn validate_graph(&self, graph: FlowGraph) -> Result<(), AppError> {
3139        if graph.nodes.is_empty() {
3140            return Err(AppError::Validation(
3141                "graph must contain at least one node".into(),
3142            ));
3143        }
3144        Ok(())
3145    }
3146
3147    pub async fn run_flow(&self, _req: ExecutionRequest) -> Result<ExecutionResult, AppError> {
3148        Ok(ExecutionResult {
3149            execution_id: Uuid::new_v4().to_string(),
3150            status: ExecutionStatus::Succeeded,
3151            started_at: Utc::now(),
3152            ended_at: Some(Utc::now()),
3153        })
3154    }
3155}