1use chrono::{DateTime, Utc};
2use flow_adapter_cli::CliAdapter;
3use flow_adapter_fs::FsAdapter;
4use flow_adapter_shell::ShellAdapter;
5use flow_adapter_utility::UtilityAdapter;
6use flow_adapter_ai::{
7 ClaudeProvider, CloudAiProvider, CloudAiRegistry, DeepSeekProvider, GeminiProvider,
8 LocalOpenAiProvider, NvidiaProvider, OpenAiProvider,
9};
10use flow_domain::execution::{ExecutionRequest, ExecutionResult, ExecutionStatus};
11use flow_domain::graph::{FlowEdge, FlowGraph, FlowNode, Position};
12use flow_execution::{
13 AdapterRegistry, CapturingSink, EventSink, ExecutionEvent,
14 ExecutionSummary, Executor, MockAdapter, MultiSink, ProposedEdit,
15};
16use flow_security::{
17 CredentialKind, CredentialResolver, CredentialStore, EnvFallbackResolver,
18 KeyringCredentialStore, PiiSanitizer,
19};
20use flow_storage::{ExecutionRecord, ExecutionStepRecord, InterceptionRecord, ScheduleRecord, Store};
21use std::path::PathBuf;
22use std::sync::Arc;
23use thiserror::Error;
24use uuid::Uuid;
25
26pub mod ansible_import;
27pub mod cli_tools;
28pub mod connections;
29pub mod node_hub;
30pub mod nodes;
31pub mod dsl_semantics;
32pub mod hub;
33pub mod install;
34pub use flow_models_server as llm_server;
38pub mod scheduler;
39pub mod settings;
40pub mod template_hub;
41pub mod templates;
42pub use connections::{ConnectionStore, StoreError as ConnectionStoreError};
43pub use flow_execution::{ConnectionError, ConnectionLookup, FlowWarning, ResolvedZoweConnection};
44pub use install::{InstallProgress, ProgressCallback};
45pub use settings::{Settings, SettingsPatch, SettingsStore};
46pub use templates::TemplateRecord;
47
/// Errors surfaced by the application layer.
///
/// Every variant except [`AppError::Dsl`] wraps a single pre-rendered message
/// string that is interpolated into the `Display` text declared by its
/// `#[error]` attribute.
#[derive(Debug, Error)]
pub enum AppError {
    /// A graph lookup failed; the payload is shown after "graph not found:".
    #[error("graph not found: {0}")]
    GraphNotFound(String),
    /// Validation was rejected; the payload describes why.
    #[error("validation failed: {0}")]
    Validation(String),
    /// An AI provider call failed (also used for truncated generations, see
    /// `truncation_error`).
    #[error("ai invocation failed: {0}")]
    AiInvocation(String),
    /// Execution failed; the payload describes why.
    #[error("execution failed: {0}")]
    Execution(String),
    /// The backing store reported an error (stringified by the caller).
    #[error("storage error: {0}")]
    Storage(String),
    /// A settings operation failed.
    #[error("settings error: {0}")]
    Settings(String),
    /// A template operation failed.
    #[error("template error: {0}")]
    Template(String),
    /// A credential or connection operation failed, or an unknown
    /// provider/service was named.
    #[error("credential error: {0}")]
    Credential(String),
    /// A DSL parse failure with its source position.
    #[error("dsl parse error at line {line}:{col}: {message}")]
    Dsl {
        /// Line reported for the error.
        line: u32,
        /// Column reported for the error.
        col: u32,
        /// Parser message.
        message: String,
    },
}
73
/// Serializable pairing of an execution summary with a list of proposed edits.
///
/// NOTE(review): presumably the result of one agent turn; the producer is not
/// visible in this part of the file.
#[derive(Debug, Clone, serde::Serialize)]
pub struct AgentTurn {
    /// Summary of the execution associated with this turn.
    pub summary: ExecutionSummary,
    /// Edits proposed alongside that execution.
    pub edits: Vec<ProposedEdit>,
}
82
/// Application facade that owns the stores, registries and runtime state.
///
/// Built by `FlowApp::new` / `FlowApp::with_store_and_settings`.
pub struct FlowApp {
    /// Shared PII sanitizer.
    sanitizer: Arc<PiiSanitizer>,
    /// Registry of execution adapters, populated at construction.
    adapters: Arc<AdapterRegistry>,
    /// Cloud AI providers; matched by `name()` when validating a provider id.
    cloud_providers: Vec<Arc<dyn CloudAiProvider>>,
    /// Credential store used for cloud AI keys and service secrets.
    credentials: Arc<dyn CredentialStore>,
    /// Credential resolver layered over `credentials`.
    resolver: Arc<dyn CredentialResolver>,
    /// Store of connection profiles and their secrets.
    connections: ConnectionStore,
    /// Settings store.
    settings: SettingsStore,
    /// Persistent (or in-memory fallback) storage.
    store: Store,
    /// Handle to the LLM server.
    llm_server: crate::llm_server::LlmServerHandle,
    /// Currently installed LLM stream sink; swapped by `set_stream_sink`.
    stream_sink: std::sync::RwLock<Arc<dyn flow_adapter_ai::LlmStreamSink>>,
    /// Working memory, seeded from `store.load_memory()` at construction.
    working_memory: flow_execution::WorkingMemory,
    /// Run controls keyed by a string id (presumably the run id - TODO confirm).
    runs: std::sync::Mutex<std::collections::HashMap<String, flow_execution::SharedRunControl>>,
    /// Shared flag, initially `false`; presumably requests cancellation of a
    /// download - TODO confirm against its users.
    download_cancel: Arc<std::sync::atomic::AtomicBool>,
    /// Flag, initially `false`; presumably disables a confirmation gate -
    /// TODO confirm against its users.
    confirm_gate_disabled: std::sync::atomic::AtomicBool,
    /// Policy for the default workspace directory of a flow.
    workspace_base: WorkspaceBase,
}
132
/// Policy that decides where a flow's default workspace directory lives.
///
/// The default is [`WorkspaceBase::Scratch`]; it is derived via `#[default]`
/// rather than a hand-written `impl Default`.
#[derive(Debug, Clone, Default)]
pub enum WorkspaceBase {
    /// Use a per-flow scratch directory (see `WorkspaceBase::resolve`).
    #[default]
    Scratch,
    /// Use the given directory for every flow.
    Fixed(PathBuf),
}
151
152impl WorkspaceBase {
153 fn resolve(&self, flow_id: &str) -> PathBuf {
154 match self {
155 WorkspaceBase::Scratch => FlowApp::home_dir()
156 .join(".flow-studio")
157 .join("work")
158 .join(flow_id),
159 WorkspaceBase::Fixed(path) => path.clone(),
160 }
161 }
162}
163
/// Expands a leading `~` in `raw` to the directory named by the `HOME`
/// environment variable.
///
/// Both a bare `~` and the `~/rest` form are expanded. Any other input
/// (`~user/...`, a `~` later in the path, absolute or relative paths) is
/// returned unchanged. When `HOME` is unset or empty, the input is also
/// returned unchanged rather than producing a path relative to the current
/// directory.
fn expand_home(raw: &str) -> PathBuf {
    // `Some("")` means "home itself"; `Some(rest)` means "home joined with rest".
    let rest = if raw == "~" {
        Some("")
    } else {
        raw.strip_prefix("~/")
    };
    // An empty HOME would silently turn `~/x` into the relative path `x`.
    let home = std::env::var_os("HOME").filter(|h| !h.is_empty());
    match (rest, home) {
        (Some(""), Some(home)) => PathBuf::from(home),
        (Some(rest), Some(home)) => PathBuf::from(home).join(rest),
        _ => PathBuf::from(raw),
    }
}
173
/// `FlowApp::default()` is the same as `FlowApp::new()`.
impl Default for FlowApp {
    fn default() -> Self {
        Self::new()
    }
}
179
/// System prompt for DSL generation, embedded at compile time from
/// `docs/dsl/spec_compiled.md` (path is relative to this source file).
const DSL_GENERATION_SYSTEM_PROMPT: &str = include_str!("../../../docs/dsl/spec_compiled.md");
195
/// Serializable result of a DSL generation request.
#[derive(Debug, Clone, serde::Serialize)]
pub struct DslGenerationOutcome {
    /// The generated DSL text.
    pub dsl: String,
    /// Which backend (local or cloud) produced `dsl`.
    pub source: DslSource,
    /// Presumably explains why a fallback backend was used, when one was -
    /// TODO confirm against the producer.
    pub fallback_reason: Option<String>,
    /// Optional plan text; omitted from the serialized output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub plan: Option<String>,
}
216
/// Iteration cap for the agentic loop (5).
///
/// NOTE(review): the loop that enforces it is not visible in this part of the
/// file.
pub const MAX_AGENTIC_ITERATIONS: u32 = 5;
222
/// Serializable summary of an agentic loop; field names are emitted in
/// camelCase.
#[derive(Debug, Clone, serde::Serialize)]
#[serde(rename_all = "camelCase")]
pub struct AgenticLoopSummary {
    /// Number of iterations (presumably the count actually performed).
    pub iterations: u32,
    /// Status label; the set of values is defined by the producer.
    pub status: String,
    /// DSL text at the end of the loop.
    pub final_dsl: String,
    /// Plan text at the end of the loop, if any.
    pub final_plan: Option<String>,
    /// One entry per recorded iteration.
    pub steps: Vec<AgenticLoopStep>,
}
241
/// Serializable record of a single agentic-loop iteration; field names are
/// emitted in camelCase.
#[derive(Debug, Clone, serde::Serialize)]
#[serde(rename_all = "camelCase")]
pub struct AgenticLoopStep {
    /// Iteration number (whether 0- or 1-based is set by the producer).
    pub iteration: u32,
    /// Status label of the run performed in this iteration.
    pub run_status: String,
    /// Count of succeeded nodes.
    pub succeeded: usize,
    /// Count of failed nodes.
    pub failed: usize,
    /// Count of skipped nodes.
    pub skipped: usize,
    /// Optional observation text (presumably what `build_loop_observations`
    /// renders - TODO confirm).
    pub observations: Option<String>,
}
254
/// Serializable snapshot of host information; field names are emitted in
/// camelCase.
///
/// NOTE(review): the `_gb` suffixes suggest gigabytes; the exact unit
/// (GB vs GiB) depends on the code that fills this struct.
#[derive(Debug, Clone, serde::Serialize)]
#[serde(rename_all = "camelCase")]
pub struct SystemInfo {
    /// Operating system label.
    pub os: String,
    /// CPU architecture label.
    pub arch: String,
    /// Total RAM.
    pub total_ram_gb: f64,
    /// RAM currently available.
    pub available_ram_gb: f64,
    /// Free disk space.
    pub free_disk_gb: f64,
    /// Number of CPUs.
    pub cpu_count: u32,
}
271
/// Backend that produced a generated DSL.
///
/// Serialized as an internally tagged object: the `kind` field holds the
/// lowercase variant name (`"local"` / `"cloud"`), and `model_id` is emitted
/// as `modelId`.
#[derive(Debug, Clone, serde::Serialize)]
#[serde(tag = "kind", rename_all = "lowercase")]
pub enum DslSource {
    /// Generated by a local model.
    Local {
        /// Identifier of the local model.
        #[serde(rename = "modelId")]
        model_id: String,
    },
    /// Generated by a cloud provider.
    Cloud {
        /// Provider name.
        provider: String,
        /// Identifier of the provider's model.
        #[serde(rename = "modelId")]
        model_id: String,
    },
}
288
289fn extract_dsl_text(raw: &str) -> String {
296 let trimmed = raw.trim();
297 let with_fence_stripped = if trimmed.starts_with("```") {
298 let after_first = trimmed.split_once('\n').map(|x| x.1).unwrap_or("");
299 match after_first.rfind("```") {
300 Some(idx) => after_first[..idx].trim_end().to_string(),
301 None => after_first.to_string(),
302 }
303 } else {
304 trimmed.to_string()
305 };
306 escape_newlines_in_strings(with_fence_stripped.trim())
307}
308
/// Rewrites raw line breaks that occur inside double-quoted strings.
///
/// The input is scanned character by character:
///
/// * Outside strings, `"` opens a string and `#` opens a comment that runs to
///   the next newline; everything is copied through unchanged.
/// * Inside a string, a raw `\n` becomes the two characters `\` `n`, a raw
///   `\r` is dropped, and a backslash makes the next character pass through
///   verbatim (so `\"` does not close the string).
fn escape_newlines_in_strings(src: &str) -> String {
    #[derive(Clone, Copy)]
    enum Mode {
        /// Plain source text.
        Code,
        /// Inside a `#` comment, until end of line.
        Comment,
        /// Inside a double-quoted string.
        Str,
        /// Inside a string, directly after a backslash.
        StrEscape,
    }

    let mut result = String::with_capacity(src.len());
    let mut mode = Mode::Code;
    for c in src.chars() {
        mode = match (mode, c) {
            (Mode::Comment, '\n') => {
                result.push(c);
                Mode::Code
            }
            (Mode::Comment, _) => {
                result.push(c);
                Mode::Comment
            }
            // The escaped character is copied as-is, whatever it is.
            (Mode::StrEscape, _) => {
                result.push(c);
                Mode::Str
            }
            (Mode::Str, '\\') => {
                result.push(c);
                Mode::StrEscape
            }
            (Mode::Str, '"') => {
                result.push(c);
                Mode::Code
            }
            (Mode::Str, '\n') => {
                result.push_str("\\n");
                Mode::Str
            }
            // Carriage returns inside strings are discarded.
            (Mode::Str, '\r') => Mode::Str,
            (Mode::Str, _) => {
                result.push(c);
                Mode::Str
            }
            (Mode::Code, '"') => {
                result.push(c);
                Mode::Str
            }
            (Mode::Code, '#') => {
                result.push(c);
                Mode::Comment
            }
            (Mode::Code, _) => {
                result.push(c);
                Mode::Code
            }
        };
    }
    result
}
366
367async fn cloud_call_once(
372 provider: &dyn CloudAiProvider,
373 model: &str,
374 system: &str,
375 user: &str,
376 max_tokens: Option<u32>,
377 temperature: Option<f32>,
378 api_key: &str,
379 base_url: Option<String>,
380 sink: &dyn flow_adapter_ai::LlmStreamSink,
381) -> Result<(String, String), AppError> {
382 tracing::info!(
385 provider = provider.name(),
386 model,
387 "AI provider call for Flow DSL generation"
388 );
389 let req = flow_adapter_ai::CloudAiRequest {
390 model: model.to_string(),
391 prompt: user.to_string(),
392 max_tokens,
393 temperature,
394 api_key: api_key.to_string(),
395 system: Some(system.to_string()),
396 base_url,
399 stream: true,
403 call_id: Some(format!("dsl-{}", uuid::Uuid::new_v4())),
404 ..Default::default()
405 };
406 let resp = provider
407 .invoke_stream(&req, sink)
408 .await
409 .map_err(|e| AppError::AiInvocation(e.to_string()))?;
410 Ok((resp.text, resp.finish_reason))
414}
415
/// Returns `true` when `finish_reason` is one of the recognised
/// "stopped at the token limit" markers. The comparison is exact and
/// case-sensitive.
fn is_truncated(finish_reason: &str) -> bool {
    const TRUNCATION_REASONS: [&str; 4] = ["length", "max_tokens", "MAX_TOKENS", "model_length"];
    TRUNCATION_REASONS.contains(&finish_reason)
}
427
428fn truncation_error(
431 provider: &str,
432 model: &str,
433 max_tokens: Option<u32>,
434 last_reason: &str,
435) -> AppError {
436 let budget = max_tokens
437 .map(|t| format!(" ({t}-token budget)"))
438 .unwrap_or_default();
439 tracing::warn!(
440 provider,
441 model,
442 %last_reason,
443 "Flow DSL generation hit the token limit (response truncated)"
444 );
445 AppError::AiInvocation(format!(
446 "{provider} model's response was cut off at the token limit{budget} before the \
447 flow was complete - the generated flow is too large (a file `content` value was \
448 truncated mid-string). Try a more focused request, fewer or smaller inline files, \
449 or a higher generation token budget."
450 ))
451}
452
/// Shortens `s` for logging.
///
/// Strings of at most 1200 bytes are returned unchanged. Longer strings are
/// cut at the last UTF-8 character boundary at or before byte 1200 and
/// suffixed with `... (+N more bytes)`, where `N` is the number of bytes
/// removed.
fn truncate_for_log(s: &str) -> String {
    const MAX: usize = 1200;
    if s.len() <= MAX {
        return s.to_owned();
    }
    // Offset 0 is always a boundary, so the search cannot come up empty.
    let cut = (0..=MAX)
        .rev()
        .find(|&offset| s.is_char_boundary(offset))
        .unwrap_or(0);
    let (kept, dropped) = s.split_at(cut);
    format!("{kept}... (+{} more bytes)", dropped.len())
}
467
/// Renders the source line at `line` followed by a second line holding a `^`
/// under column `col`.
///
/// `line` and `col` are 1-based; 0 is treated the same as 1. The caret offset
/// is counted in characters and clamped to the line's length. Returns an empty
/// string when `src` has no such line.
fn offending_line_caret(src: &str, line: u32, col: u32) -> String {
    let row = line.saturating_sub(1) as usize;
    match src.lines().nth(row) {
        None => String::new(),
        Some(text) => {
            let width = text.chars().count();
            let offset = usize::min(col.saturating_sub(1) as usize, width);
            let mut rendered = String::with_capacity(text.len() + offset + 2);
            rendered.push_str(text);
            rendered.push('\n');
            rendered.extend(std::iter::repeat(' ').take(offset));
            rendered.push('^');
            rendered
        }
    }
}
480
481fn build_retry_prompt(
493 original: &str,
494 previous_dsl: &str,
495 parse_err: &flow_dsl::DslError,
496) -> String {
497 let hint = parse_error_hint(&parse_err.message)
498 .map(|h| format!("\n{h}\n"))
499 .unwrap_or_default();
500 let pointer = offending_line_caret(previous_dsl, parse_err.line, parse_err.col);
501 let pointer_block = if pointer.is_empty() {
502 String::new()
503 } else {
504 format!("\nThe error is here:\n{pointer}\n")
505 };
506 format!(
507 "Your previous attempt failed to parse:\n\
508 \n\
509 <<<BEGIN_PREVIOUS_DSL>>>\n\
510 {previous_dsl}\n\
511 <<<END_PREVIOUS_DSL>>>\n\
512 \n\
513 Parser error: line {line}:{col} - {message}\n\
514 {pointer_block}\
515 {hint}\
516 \n\
517 Original request:\n\
518 {original}\n\
519 \n\
520 Return ONLY corrected Flow DSL. No commentary, no fences.",
521 line = parse_err.line,
522 col = parse_err.col,
523 message = parse_err.message,
524 )
525}
526
/// Builds the corrective prompt sent after a generated DSL parsed but failed
/// semantic validation.
///
/// `issues` are rendered as a `- ` bullet list, followed by the previous DSL
/// between BEGIN/END markers, the original request, and fixed closing
/// instructions.
fn build_semantic_retry_prompt(original: &str, previous_dsl: &str, issues: &[String]) -> String {
    let mut bullets = String::new();
    for (position, issue) in issues.iter().enumerate() {
        if position > 0 {
            bullets.push('\n');
        }
        bullets.push_str("- ");
        bullets.push_str(issue);
    }
    format!(
        "Your previous DSL parsed but violates Flow semantics:\n\
         \n\
         {bullets}\n\
         \n\
         <<<BEGIN_PREVIOUS_DSL>>>\n\
         {previous_dsl}\n\
         <<<END_PREVIOUS_DSL>>>\n\
         \n\
         Original request:\n\
         {original}\n\
         \n\
         Return ONLY corrected Flow DSL. No commentary, no fences. \
         Use `flow \"<name>\" v1` as the header (not v1.0.0). \
         Do not invent git commits - read the real history from the repo with a shell action.",
    )
}
550
551fn sum_ai_tokens(events: &[ExecutionEvent]) -> u64 {
562 let mut total = 0u64;
563 for ev in events {
564 if let ExecutionEvent::NodeSucceeded { output, .. } = ev {
565 let i = output
566 .get("input_tokens")
567 .and_then(|v| v.as_u64())
568 .unwrap_or(0);
569 let o = output
570 .get("output_tokens")
571 .and_then(|v| v.as_u64())
572 .unwrap_or(0);
573 total = total.saturating_add(i).saturating_add(o);
574 }
575 }
576 total
577}
578
579fn build_loop_observations(events: &[ExecutionEvent], summary: &ExecutionSummary) -> String {
580 use std::collections::BTreeMap;
581 let mut lines = vec![format!(
582 "The workflow ran: {} succeeded, {} failed, {} skipped (status: {}).",
583 summary.succeeded, summary.failed, summary.skipped, summary.status
584 )];
585 let mut failures: BTreeMap<&str, &str> = BTreeMap::new();
586 let mut skips: BTreeMap<&str, &str> = BTreeMap::new();
587 for e in events {
588 match e {
589 ExecutionEvent::NodeFailed { node_id, error, .. } => {
590 failures.insert(node_id, error);
591 }
592 ExecutionEvent::NodeSkipped { node_id, reason, .. } => {
593 skips.insert(node_id, reason);
594 }
595 _ => {}
596 }
597 }
598 if !failures.is_empty() {
599 lines.push("Failures:".into());
600 lines.extend(failures.iter().map(|(n, e)| format!("- node `{n}` failed: {e}")));
601 }
602 if !skips.is_empty() {
603 lines.push("Skipped (never ran):".into());
604 lines.extend(skips.iter().map(|(n, r)| format!("- node `{n}` skipped: {r}")));
605 }
606 lines.join("\n")
607}
608
609fn validate_generated_dsl(dsl: &str) -> Result<String, String> {
610 let mut graph = flow_dsl::parse(dsl).map_err(|e| {
611 format!(
612 "DSL parse error at line {}:{} - {}",
613 e.line, e.col, e.message
614 )
615 })?;
616 dsl_semantics::repair_generated_graph(&mut graph);
617 let issues = dsl_semantics::validate_generated_graph(&graph);
618 if !issues.is_empty() {
619 return Err(format!(
620 "DSL semantic validation failed:\n{}",
621 issues
622 .iter()
623 .map(|i| format!("- {i}"))
624 .collect::<Vec<_>>()
625 .join("\n")
626 ));
627 }
628 Ok(flow_dsl::serialize(&graph))
629}
630
/// Maps a parser error message to a targeted correction hint, if one applies.
///
/// Checked in order:
/// 1. messages containing "unterminated string" get the string-literal rules;
/// 2. messages about a missing edge target get the edge syntax;
/// 3. messages of the form ``unknown node type `X` `` (non-empty `X`) get an
///    explanation that `X` is an adapter, plus an example node.
///
/// Returns `None` for every other message.
fn parse_error_hint(message: &str) -> Option<String> {
    const STRING_HINT: &str = "IMPORTANT: a string literal is malformed. Flow DSL strings must be \
         single-line and double-quoted. Escape every inner double quote as \\\" \
         and write newlines as \\n - never put a real line break inside quotes. \
         For multi-line file content, keep it on one physical line using \\n, \
         e.g. content: \"line one\\nline two\\n\".";
    const EDGE_HINT: &str = "IMPORTANT: an edge is malformed. Connect nodes by id on a single line: \
         `sourceId --> targetId` (chain as `a --> b --> c`). Put the target node id \
         immediately after `-->`; do not quote it, leave it empty, or split the edge \
         across lines.";

    if message.contains("unterminated string") {
        return Some(STRING_HINT.to_string());
    }

    let edge_markers = ["expected target identifier", "after `-->`"];
    if edge_markers.iter().any(|marker| message.contains(marker)) {
        return Some(EDGE_HINT.to_string());
    }

    // Pull the name between the backticks of "unknown node type `X`".
    let (_, tail) = message.split_once("unknown node type `")?;
    let kind = tail.split('`').next().filter(|name| !name.is_empty())?;
    let example = match kind {
        // CLI-style tools get the `cli` adapter example.
        "cli" | "github" | "zowe" => {
            "list[action: \"Run tool\"] {\n adapter: \"cli\"\n actionId: \"cli-tool\"\n bin: \"<tool>\"\n command: \"<subcommand>\"\n}"
                .to_string()
        }
        _ => format!(
            "step[action: \"Run\"] {{\n adapter: \"{kind}\"\n actionId: \"<see spec>\"\n}}"
        ),
    };
    Some(format!(
        "IMPORTANT: `{kind}` is not a node type. The only node types are \
         `action`, `ai`, `agentic`, `utility`, `service`. `{kind}` is an adapter - \
         use an `action` node with `adapter: \"{kind}\"`, e.g.:\n\
         {example}\n\
         Do NOT write `{kind}[...]` as the header type."
    ))
}
685
/// Unit tests for `parse_error_hint`: one per hint category plus the
/// "no hint" path.
#[cfg(test)]
mod parse_error_hint_tests {
    use super::parse_error_hint;

    /// An unknown node type yields a hint that names the type and shows it
    /// used as an adapter.
    #[test]
    fn hint_fires_for_unknown_node_type() {
        let msg = "unknown node type `fs` - supported: action, ai, agentic, utility, service";
        let hint = parse_error_hint(msg).expect("should produce a hint");
        assert!(hint.contains("`fs` is not a node type"));
        assert!(hint.contains("adapter: \"fs\""));
    }

    /// Any message containing "unterminated string" yields the string-rule
    /// hint, including the longer parser wording.
    #[test]
    fn hint_fires_for_unterminated_string() {
        let hint = parse_error_hint("unterminated string").expect("should produce a hint");
        assert!(hint.contains("single-line"), "hint: {hint}");
        assert!(hint.contains("\\n"), "hint: {hint}");
        assert!(
            parse_error_hint("unterminated string (newline before closing quote)").is_some()
        );
    }

    /// A missing edge target yields the edge-syntax hint.
    #[test]
    fn hint_fires_for_malformed_edge() {
        let hint = parse_error_hint("expected target identifier after `-->`")
            .expect("should produce a hint");
        assert!(hint.contains("edge is malformed"), "hint: {hint}");
        assert!(hint.contains("-->"), "hint: {hint}");
    }

    /// Unrelated errors, and an unknown-type message with an empty name,
    /// produce no hint.
    #[test]
    fn no_hint_for_unrelated_parse_errors() {
        assert!(parse_error_hint("expected `}` but found end of input").is_none());
        assert!(parse_error_hint("unknown node type ``").is_none());
    }
}
723
/// End-to-end tests for cloud DSL generation, driven by a scripted provider
/// that returns canned responses instead of calling a real service.
#[cfg(test)]
mod generation_e2e_tests {
    use super::*;
    use flow_adapter_ai::{CloudAiError, CloudAiProvider, CloudAiRequest, CloudAiResponse};
    use std::collections::VecDeque;
    use std::sync::Mutex;

    /// Test double for a cloud provider.
    struct ScriptedProvider {
        /// Responses handed out in order, one per `invoke` call.
        responses: Mutex<VecDeque<String>>,
        /// Every prompt received, in call order.
        prompts: Mutex<Vec<String>>,
        /// Finish reason attached to every response.
        finish_reason: String,
    }

    impl ScriptedProvider {
        /// Scripted provider whose responses all finish with "stop".
        fn new(responses: Vec<&str>) -> Arc<Self> {
            Self::new_with_finish(responses, "stop")
        }
        /// Scripted provider with a caller-chosen finish reason.
        fn new_with_finish(responses: Vec<&str>, finish_reason: &str) -> Arc<Self> {
            Arc::new(Self {
                responses: Mutex::new(responses.into_iter().map(String::from).collect()),
                prompts: Mutex::new(Vec::new()),
                finish_reason: finish_reason.to_string(),
            })
        }
    }

    #[async_trait::async_trait]
    impl CloudAiProvider for ScriptedProvider {
        fn name(&self) -> &str {
            "stub"
        }
        fn env_var(&self) -> &str {
            "STUB_KEY"
        }
        fn default_models(&self) -> &[&str] {
            &["stub-model"]
        }
        /// Records the prompt and pops the next scripted response; panics if
        /// the script is exhausted.
        async fn invoke(&self, req: &CloudAiRequest) -> Result<CloudAiResponse, CloudAiError> {
            self.prompts.lock().unwrap().push(req.prompt.clone());
            let text = self
                .responses
                .lock()
                .unwrap()
                .pop_front()
                .expect("scripted provider ran out of responses");
            Ok(CloudAiResponse {
                provider: "stub".into(),
                model: req.model.clone(),
                text,
                finish_reason: self.finish_reason.clone(),
                input_tokens: 0,
                output_tokens: 0,
                latency_ms: 0,
            })
        }
    }

    /// Builds a `FlowApp` on in-memory storage whose only cloud provider is
    /// `stub`, with cloud AI enabled and a key seeded for "stub".
    fn app_with(stub: Arc<ScriptedProvider>) -> FlowApp {
        let app = FlowApp::with_store_and_settings(
            Store::open_in_memory().expect("in-memory store"),
            SettingsStore::in_memory(),
        )
        .with_replaced_cloud_providers(vec![stub])
        .with_in_memory_credentials();
        app.settings
            .update(SettingsPatch {
                allow_cloud_ai: Some(true),
                ..Default::default()
            })
            .expect("enable cloud ai");
        app.set_cloud_ai_key("stub", "k").expect("seed key");
        app
    }

    // Valid three-node flow joined by a chained edge.
    const CHAIN_DSL: &str = r#"flow "chain" v1

a[utility: "A"] {
 adapter: "utility"
 actionId: "log"
 message: "a"
 level: "info"
}

b[utility: "B"] {
 adapter: "utility"
 actionId: "log"
 message: "b"
 level: "info"
}

c[utility: "C"] {
 adapter: "utility"
 actionId: "log"
 message: "c"
 level: "info"
}

a --> b --> c
"#;

    /// A chained edge parses on the first attempt: two edges, one provider call.
    #[tokio::test]
    async fn chained_arrows_from_model_parse_without_a_retry() {
        let stub = ScriptedProvider::new(vec![CHAIN_DSL]);
        let app = app_with(stub.clone());
        let dsl = app
            .generate_dsl_via_cloud("stub", "stub-model", "make a 3-step log chain", Some(512), None)
            .await
            .expect("chained-arrow DSL should generate without error");
        let graph = flow_dsl::parse(&dsl).expect("returned DSL parses");
        assert_eq!(graph.edges.len(), 2, "a --> b --> c desugars to two edges");
        assert_eq!(
            stub.prompts.lock().unwrap().len(),
            1,
            "valid-on-first-try DSL needs no corrective retry"
        );
    }

    /// A response with real line breaks inside a string literal is repaired
    /// locally, so no second provider call is made.
    #[tokio::test]
    async fn multiline_string_from_model_is_auto_sanitized_without_a_retry() {
        let multiline = "flow \"x\" v1\n\nwrite[action: \"Write\"] {\n adapter: \"fs\"\n actionId: \"write-file\"\n workspaceRoot: \"/tmp/x\"\n path: \"notes.txt\"\n content: \"line one\nline two\n\"\n}\n";
        let stub = ScriptedProvider::new(vec![multiline]);
        let app = app_with(stub.clone());
        let dsl = app
            .generate_dsl_via_cloud("stub", "stub-model", "write a two-line file", Some(512), None)
            .await
            .expect("multi-line string should be sanitized and generate cleanly");
        let graph = flow_dsl::parse(&dsl).expect("sanitized DSL parses");
        let content = graph.nodes[0].data["content"].as_str().unwrap();
        assert!(
            content.contains('\n'),
            "the escaped \\n is unescaped back to a real newline in the value"
        );
        assert_eq!(
            stub.prompts.lock().unwrap().len(),
            1,
            "auto-sanitized DSL needs no corrective retry"
        );
    }

    // Invalid flow: the `level` value is missing its closing quote.
    const BAD_UNTERMINATED: &str = r#"flow "x" v1

n[utility: "N"] {
 adapter: "utility"
 actionId: "log"
 message: "hello"
 level: "info
}
"#;

    /// A parse failure triggers exactly one corrective retry, and the retry
    /// prompt carries the caret pointer and the string-rule hint.
    #[tokio::test]
    async fn unterminated_string_triggers_caret_retry_and_recovers() {
        let stub = ScriptedProvider::new(vec![BAD_UNTERMINATED, CHAIN_DSL]);
        let app = app_with(stub.clone());
        let dsl = app
            .generate_dsl_via_cloud("stub", "stub-model", "make a chain", Some(512), None)
            .await
            .expect("recovers on the corrective retry");
        flow_dsl::parse(&dsl).expect("recovered DSL parses");
        let prompts = stub.prompts.lock().unwrap();
        assert_eq!(prompts.len(), 2, "exactly one corrective retry");
        let retry = &prompts[1];
        assert!(
            retry.contains("The error is here:"),
            "retry shows the offending source line: {retry}"
        );
        assert!(retry.contains('^'), "retry carries a caret pointer: {retry}");
        assert!(
            retry.contains("single-line"),
            "retry restates the string rule: {retry}"
        );
    }

    /// A response finishing with "length" is reported as a token-limit error
    /// after a single provider call.
    #[tokio::test]
    async fn truncated_response_reports_token_limit_without_burning_retries() {
        let stub = ScriptedProvider::new_with_finish(vec![BAD_UNTERMINATED], "length");
        let app = app_with(stub.clone());
        let err = app
            .generate_dsl_via_cloud("stub", "stub-model", "scaffold an app", Some(2048), None)
            .await
            .expect_err("a truncated response is an error");
        let msg = err.to_string();
        assert!(
            msg.contains("cut off at the token limit"),
            "clear truncation error, not 'unterminated string': {msg}"
        );
        assert_eq!(
            stub.prompts.lock().unwrap().len(),
            1,
            "truncation short-circuits - no wasted corrective retries"
        );
    }

    /// When every attempt is invalid, the error names the provider and the
    /// retry budget.
    #[tokio::test]
    async fn exhausted_retries_name_the_provider_in_the_error() {
        let stub = ScriptedProvider::new(vec![
            BAD_UNTERMINATED,
            BAD_UNTERMINATED,
            BAD_UNTERMINATED,
            BAD_UNTERMINATED,
        ]);
        let app = app_with(stub.clone());
        let err = app
            .generate_dsl_via_cloud("stub", "stub-model", "make a chain", Some(512), None)
            .await
            .expect_err("every attempt fails to parse");
        let msg = err.to_string();
        assert!(
            msg.contains("stub model returned invalid DSL"),
            "error names the provider, not a hardcoded 'cloud': {msg}"
        );
        assert!(
            msg.contains("3 corrective retries"),
            "error names the retry budget: {msg}"
        );
    }
}
960
/// Unit tests for `escape_newlines_in_strings`.
#[cfg(test)]
mod escape_newlines_tests {
    use super::escape_newlines_in_strings;

    /// A raw newline inside quotes becomes the two-character `\n` escape.
    #[test]
    fn escapes_newline_inside_a_string() {
        assert_eq!(
            escape_newlines_in_strings("content: \"a\nb\""),
            "content: \"a\\nb\""
        );
    }

    /// Newlines between statements are left untouched.
    #[test]
    fn leaves_structural_newlines_outside_strings() {
        let src = "a: \"x\"\nb: \"y\"\n";
        assert_eq!(escape_newlines_in_strings(src), src);
    }

    /// An escaped quote does not close the string, so the newline after it is
    /// still rewritten.
    #[test]
    fn honors_escaped_quote_so_string_stays_open() {
        assert_eq!(
            escape_newlines_in_strings("\"a\\\"b\nc\""),
            "\"a\\\"b\\nc\""
        );
    }

    /// A quote inside a `#` comment does not open a string; the comment ends
    /// at its newline and the next line's string is handled normally.
    #[test]
    fn quotes_inside_comments_are_literal() {
        let src = "# a \"quote\n\"real\nx\"";
        assert_eq!(escape_newlines_in_strings(src), "# a \"quote\n\"real\\nx\"");
    }
}
997
/// Unit tests for `FlowApp::existing_binary` (defined outside this part of
/// the file).
#[cfg(test)]
mod existing_binary_tests {
    use super::FlowApp;

    /// `None`, a whitespace-only path and a path that does not exist all map
    /// to `None`.
    #[test]
    fn drops_missing_and_empty_paths() {
        assert_eq!(FlowApp::existing_binary(None), None);
        assert_eq!(FlowApp::existing_binary(Some("  ".into())), None);
        assert_eq!(
            FlowApp::existing_binary(Some(
                "/nonexistent/target/debug/resources/llama/llama-server".into()
            )),
            None
        );
    }

    /// A path to a real file is returned unchanged; the running test binary
    /// is used as a file that is guaranteed to exist.
    #[test]
    fn keeps_an_existing_file() {
        let me = std::env::current_exe().unwrap();
        let path = me.to_string_lossy().to_string();
        assert_eq!(FlowApp::existing_binary(Some(path.clone())), Some(path));
    }
}
1024
1025impl FlowApp {
1026 pub fn new() -> Self {
1027 let store = Self::open_default_store().unwrap_or_else(|e| {
1028 tracing::warn!(error = %e, "failed to open persistent store; falling back to in-memory");
1029 Store::open_in_memory().expect("in-memory store init")
1030 });
1031 let settings = SettingsStore::open(Self::settings_path());
1032 Self::with_store_and_settings(store, settings)
1033 }
1034
    /// Test-only builder step: replaces the whole cloud provider list with
    /// `providers`.
    #[cfg(test)]
    pub(crate) fn with_replaced_cloud_providers(
        mut self,
        providers: Vec<Arc<dyn CloudAiProvider>>,
    ) -> Self {
        self.cloud_providers = providers;
        self
    }
1048
    /// Test-only builder step: swaps the credential store for a fresh
    /// in-memory one and rebuilds the resolver on top of it.
    ///
    /// NOTE(review): `connections` and the service adapter were constructed
    /// with the previous store and are not rebuilt here.
    #[cfg(test)]
    pub(crate) fn with_in_memory_credentials(mut self) -> Self {
        use flow_security::InMemoryCredentialStore;

        let store: Arc<dyn CredentialStore> = Arc::new(InMemoryCredentialStore::new());
        self.resolver = Arc::new(EnvFallbackResolver::new(store.clone()));
        self.credentials = store;
        self
    }
1066
    /// Builds the application around an already-opened `store` and `settings`.
    ///
    /// Wires up the keyring-backed credential store and its resolver, the
    /// connection store, the adapter registry, and the list of cloud AI
    /// providers. It then ensures the default template collection exists
    /// (a failure is only logged) and seeds working memory from the store.
    ///
    /// # Panics
    ///
    /// Panics if the connection store cannot be opened in the flow directory
    /// nor in the system temp directory.
    pub fn with_store_and_settings(store: Store, settings: SettingsStore) -> Self {
        // Credentials live in the keyring; the index file sits in the flow dir.
        let index_path = Self::flow_dir().join("credential_index.json");
        let credentials: Arc<dyn CredentialStore> =
            Arc::new(KeyringCredentialStore::with_index(index_path));
        let resolver: Arc<dyn CredentialResolver> =
            Arc::new(EnvFallbackResolver::new(credentials.clone()));

        // NOTE(review): the fallback reopens the store under the system temp
        // directory, although the log message and the expect text both say
        // "in-memory".
        let connections = ConnectionStore::open(&Self::flow_dir(), credentials.clone())
            .unwrap_or_else(|e| {
                tracing::warn!(error = %e, "failed to open connections store; using in-memory");
                ConnectionStore::open(&std::env::temp_dir(), credentials.clone())
                    .expect("in-memory connections init")
            });

        let sanitizer = Arc::new(PiiSanitizer::new());
        let mut adapters = AdapterRegistry::new();
        // "mock", "zosmf" and "ssh" are registered as mock adapters; the rest
        // are concrete adapter implementations.
        adapters.register(Arc::new(MockAdapter::new("mock")));
        adapters.register(Arc::new(CliAdapter::new()));
        adapters.register(Arc::new(MockAdapter::new("zosmf")));
        adapters.register(Arc::new(MockAdapter::new("ssh")));
        adapters.register(Arc::new(ShellAdapter::new()));
        adapters.register(Arc::new(FsAdapter::new()));
        adapters.register(Arc::new(UtilityAdapter::new(Self::downloads_dir())));
        adapters.register(Arc::new(flow_adapter_service::ServiceAdapter::new(
            Arc::new(nodes::service_integrations()),
            credentials.clone(),
        )));

        let cloud_providers: Vec<Arc<dyn CloudAiProvider>> = vec![
            Arc::new(ClaudeProvider::new()),
            Arc::new(OpenAiProvider::new()),
            Arc::new(GeminiProvider::new()),
            Arc::new(NvidiaProvider::new()),
            Arc::new(DeepSeekProvider::new()),
            Arc::new(LocalOpenAiProvider::new()),
        ];

        // Best effort: a missing default collection must not block startup.
        if let Err(e) = templates::ensure_default(&store) {
            tracing::warn!(error = ?e, "failed to ensure default template collection");
        }

        // A load failure falls back to the default (empty) memory value.
        let initial_memory = store.load_memory().unwrap_or_default();

        Self {
            sanitizer,
            adapters: Arc::new(adapters),
            cloud_providers,
            credentials,
            resolver,
            connections,
            settings,
            store,
            llm_server: crate::llm_server::LlmServerHandle::new(),
            stream_sink: std::sync::RwLock::new(Arc::new(flow_adapter_ai::NullStreamSink)),
            working_memory: Arc::new(std::sync::Mutex::new(initial_memory)),
            runs: Default::default(),
            download_cancel: Arc::new(std::sync::atomic::AtomicBool::new(false)),
            confirm_gate_disabled: std::sync::atomic::AtomicBool::new(false),
            workspace_base: WorkspaceBase::default(),
        }
    }
1143
    /// Builder step: sets the policy used to pick a flow's default workspace
    /// directory.
    pub fn with_workspace_base(mut self, base: WorkspaceBase) -> Self {
        self.workspace_base = base;
        self
    }
1150
1151 fn resolve_workspace(&self, flow_id: &str) -> PathBuf {
1155 self.store
1156 .get_flow_workspace(flow_id)
1157 .ok()
1158 .flatten()
1159 .filter(|s| !s.trim().is_empty())
1160 .map(|s| expand_home(&s))
1161 .unwrap_or_else(|| self.workspace_base.resolve(flow_id))
1162 }
1163
    /// Returns the workspace stored for `flow_id`, if any.
    ///
    /// A store error is reported as `None`, the same as "not set".
    pub fn flow_workspace(&self, flow_id: &str) -> Option<String> {
        self.store.get_flow_workspace(flow_id).ok().flatten()
    }
1168
    /// Returns the default workspace directory for `flow_id` as a string,
    /// ignoring any stored per-flow workspace. Non-UTF-8 path parts are
    /// converted lossily.
    pub fn default_workspace(&self, flow_id: &str) -> String {
        self.workspace_base.resolve(flow_id).to_string_lossy().into_owned()
    }
1174
    /// Stores `path` as the workspace for `flow_id`.
    ///
    /// # Errors
    ///
    /// Returns [`AppError::Storage`] with the store's error message.
    pub fn set_flow_workspace(&self, flow_id: &str, path: &str) -> Result<(), AppError> {
        self.store
            .set_flow_workspace(flow_id, path)
            .map_err(|e| AppError::Storage(e.to_string()))
    }
1180
    /// Removes the workspace stored for `flow_id`.
    ///
    /// # Errors
    ///
    /// Returns [`AppError::Storage`] with the store's error message.
    pub fn clear_flow_workspace(&self, flow_id: &str) -> Result<(), AppError> {
        self.store
            .clear_flow_workspace(flow_id)
            .map_err(|e| AppError::Storage(e.to_string()))
    }
1186
    /// Replaces the installed LLM stream sink.
    ///
    /// # Panics
    ///
    /// Panics if the sink lock is poisoned.
    pub fn set_stream_sink(&self, sink: Arc<dyn flow_adapter_ai::LlmStreamSink>) {
        *self.stream_sink.write().expect("stream_sink write") = sink;
    }
1194
    /// Returns a shared handle to the currently installed LLM stream sink.
    ///
    /// # Panics
    ///
    /// Panics if the sink lock is poisoned.
    pub fn stream_sink(&self) -> Arc<dyn flow_adapter_ai::LlmStreamSink> {
        self.stream_sink.read().expect("stream_sink read").clone()
    }
1200
    /// Returns the connection profiles held by the connection store.
    pub fn list_connections(&self) -> Vec<flow_domain::connection::ConnectionProfile> {
        self.connections.list()
    }
1204
    /// Inserts or updates a connection profile, optionally with its secret.
    ///
    /// NOTE(review): what a `None` secret does to a previously stored secret
    /// is decided by `ConnectionStore::upsert` - confirm there.
    ///
    /// # Errors
    ///
    /// Returns [`AppError::Credential`] with the store's error message.
    pub fn save_connection(
        &self,
        profile: flow_domain::connection::ConnectionProfile,
        secret: Option<&str>,
    ) -> Result<(), AppError> {
        self.connections
            .upsert(profile, secret)
            .map_err(|e| AppError::Credential(e.to_string()))
    }
1214
    /// Deletes the connection profile with the given `id`.
    ///
    /// # Errors
    ///
    /// Returns [`AppError::Credential`] with the store's error message.
    pub fn delete_connection(&self, id: &str) -> Result<(), AppError> {
        self.connections
            .delete(id)
            .map_err(|e| AppError::Credential(e.to_string()))
    }
1220
    /// Reports whether the connection store has a secret for connection `id`.
    pub fn connection_has_secret(&self, id: &str) -> bool {
        self.connections.has_secret(id)
    }
1224
    /// Resolves connection `id` into a [`ResolvedZoweConnection`].
    ///
    /// # Errors
    ///
    /// Returns [`AppError::Credential`] with the store's error message.
    pub fn resolve_zowe_connection(&self, id: &str) -> Result<ResolvedZoweConnection, AppError> {
        self.connections
            .resolve_zowe(id)
            .map_err(|e| AppError::Credential(e.to_string()))
    }
1230
    /// Returns `true` when a registered cloud provider's `name()` equals
    /// `provider` exactly.
    fn is_known_cloud_provider(&self, provider: &str) -> bool {
        self.cloud_providers.iter().any(|p| p.name() == provider)
    }
1235
1236 pub fn set_cloud_ai_key(&self, provider: &str, secret: &str) -> Result<(), AppError> {
1237 if !self.is_known_cloud_provider(provider) {
1238 return Err(AppError::Credential(format!(
1239 "unknown cloud provider '{provider}'"
1240 )));
1241 }
1242 let account = CredentialKind::CloudAiKey.account_for(provider);
1243 self.credentials
1244 .set(&account, secret)
1245 .map_err(|e| AppError::Credential(e.to_string()))
1246 }
1247
1248 pub fn delete_cloud_ai_key(&self, provider: &str) -> Result<(), AppError> {
1249 if !self.is_known_cloud_provider(provider) {
1250 return Err(AppError::Credential(format!(
1251 "unknown cloud provider '{provider}'"
1252 )));
1253 }
1254 let account = CredentialKind::CloudAiKey.account_for(provider);
1255 self.credentials
1256 .delete(&account)
1257 .map_err(|e| AppError::Credential(e.to_string()))
1258 }
1259
1260 pub fn cloud_ai_key_exists(&self, provider: &str) -> bool {
1261 if !self.is_known_cloud_provider(provider) {
1262 return false;
1263 }
1264 let account = CredentialKind::CloudAiKey.account_for(provider);
1265 self.credentials.exists(&account)
1266 }
1267
    /// Returns the credential account name for an auxiliary value of a
    /// service, derived from the `slug:suffix` key.
    fn service_kv_account(slug: &str, suffix: &str) -> String {
        CredentialKind::Service.account_for(&format!("{slug}:{suffix}"))
    }
1279
    /// Looks up the service integration for the node with the given `slug`.
    ///
    /// # Errors
    ///
    /// Returns [`AppError::Credential`] when no node has that slug or the
    /// node has no service integration.
    fn service_integration_for(slug: &str) -> Result<crate::nodes::ServiceIntegration, AppError> {
        crate::nodes::entry_by_slug(slug)
            .and_then(|e| e.service_integration)
            .ok_or_else(|| AppError::Credential(format!("`{slug}` is not a service node")))
    }
1285
    /// Stores `secret` under the service account for `slug`.
    ///
    /// NOTE(review): unlike the cloud AI key setters, `slug` is not checked
    /// against the known service nodes here.
    ///
    /// # Errors
    ///
    /// Returns [`AppError::Credential`] with the store's error message.
    pub fn set_service_secret(&self, slug: &str, secret: &str) -> Result<(), AppError> {
        let account = CredentialKind::Service.account_for(slug);
        self.credentials
            .set(&account, secret)
            .map_err(|e| AppError::Credential(e.to_string()))
    }
1294
1295 pub fn disconnect_service(&self, slug: &str) -> Result<(), AppError> {
1297 let _ = self.credentials.delete(&CredentialKind::Service.account_for(slug));
1298 let _ = self.credentials.delete(&Self::service_kv_account(slug, "client_id"));
1299 let _ = self.credentials.delete(&Self::service_kv_account(slug, "client_secret"));
1300 Ok(())
1301 }
1302
1303 pub fn service_connection_exists(&self, slug: &str) -> bool {
1305 self.credentials
1306 .exists(&CredentialKind::Service.account_for(slug))
1307 }
1308
1309 pub fn set_service_oauth_client(
1311 &self,
1312 slug: &str,
1313 client_id: &str,
1314 client_secret: &str,
1315 ) -> Result<(), AppError> {
1316 self.credentials
1317 .set(&Self::service_kv_account(slug, "client_id"), client_id)
1318 .map_err(|e| AppError::Credential(e.to_string()))?;
1319 self.credentials
1320 .set(&Self::service_kv_account(slug, "client_secret"), client_secret)
1321 .map_err(|e| AppError::Credential(e.to_string()))
1322 }
1323
1324 pub fn service_oauth_authorize_url(
1327 &self,
1328 slug: &str,
1329 redirect_uri: &str,
1330 state: &str,
1331 ) -> Result<String, AppError> {
1332 let integ = Self::service_integration_for(slug)?;
1333 if integ.auth.scheme != "oauth2" {
1334 return Err(AppError::Credential(format!("`{slug}` does not use OAuth2")));
1335 }
1336 let auth_url = integ
1337 .auth
1338 .auth_url
1339 .as_deref()
1340 .ok_or_else(|| AppError::Credential(format!("`{slug}` integration has no authUrl")))?;
1341 let client_id = self
1342 .credentials
1343 .get(&Self::service_kv_account(slug, "client_id"))
1344 .map_err(|_| {
1345 AppError::Credential(format!("set the OAuth client id for `{slug}` first"))
1346 })?;
1347 Ok(flow_security::oauth::build_authorize_url(
1348 auth_url,
1349 &client_id,
1350 redirect_uri,
1351 &integ.auth.scopes,
1352 state,
1353 ))
1354 }
1355
1356 pub async fn service_oauth_complete(
1358 &self,
1359 slug: &str,
1360 code: &str,
1361 redirect_uri: &str,
1362 ) -> Result<(), AppError> {
1363 let integ = Self::service_integration_for(slug)?;
1364 let token_url = integ
1365 .auth
1366 .token_url
1367 .as_deref()
1368 .ok_or_else(|| AppError::Credential(format!("`{slug}` integration has no tokenUrl")))?;
1369 let client_id = self
1370 .credentials
1371 .get(&Self::service_kv_account(slug, "client_id"))
1372 .map_err(|_| AppError::Credential(format!("missing OAuth client id for `{slug}`")))?;
1373 let client_secret = self
1374 .credentials
1375 .get(&Self::service_kv_account(slug, "client_secret"))
1376 .map_err(|_| AppError::Credential(format!("missing OAuth client secret for `{slug}`")))?;
1377 let token = flow_security::oauth::exchange_code(
1378 token_url,
1379 &client_id,
1380 &client_secret,
1381 code,
1382 redirect_uri,
1383 )
1384 .await
1385 .map_err(|e| AppError::Credential(e.to_string()))?;
1386 flow_security::oauth::store_token(self.credentials.as_ref(), slug, &token)
1387 .map_err(|e| AppError::Credential(e.to_string()))
1388 }
1389
/// Resolves the user's home directory from the environment.
///
/// Checks `HOME`, then `USERPROFILE`, and falls back to `"."` when neither
/// holds a usable value. Values are read with `var_os`, so a home path that
/// is not valid Unicode is still honoured; empty values are treated as unset.
fn home_dir() -> PathBuf {
    let non_empty = |key: &str| std::env::var_os(key).filter(|value| !value.is_empty());
    non_empty("HOME")
        .or_else(|| non_empty("USERPROFILE"))
        .map(PathBuf::from)
        .unwrap_or_else(|| PathBuf::from("."))
}
1397
1398 fn flow_dir() -> PathBuf {
1404 if let Some(dir) = std::env::var_os("FLOW_STUDIO_DIR") {
1405 if !dir.is_empty() {
1406 return PathBuf::from(dir);
1407 }
1408 }
1409 Self::home_dir().join(".flow-studio")
1410 }
1411
1412 fn open_default_store() -> Result<Store, flow_storage::StoreError> {
1413 let mut path = Self::flow_dir();
1414 path.push("db");
1415 let _ = std::fs::create_dir_all(&path);
1416 path.push("flow.sqlite");
1417 Store::open(path)
1418 }
1419
1420 fn templates_dir() -> PathBuf {
1421 let mut path = Self::flow_dir();
1422 path.push("templates");
1423 let _ = std::fs::create_dir_all(&path);
1424 path
1425 }
1426
1427 pub fn llms_dir() -> PathBuf {
1430 let mut path = Self::flow_dir();
1431 path.push("llms");
1432 let _ = std::fs::create_dir_all(&path);
1433 path
1434 }
1435
1436 pub fn engines_dir() -> PathBuf {
1439 let mut path = Self::flow_dir();
1440 path.push("engines");
1441 let _ = std::fs::create_dir_all(&path);
1442 path
1443 }
1444
1445 pub fn downloads_dir() -> PathBuf {
1448 let mut path = Self::flow_dir();
1449 path.push("downloads");
1450 let _ = std::fs::create_dir_all(&path);
1451 path
1452 }
1453
1454 pub fn nodes_dir() -> PathBuf {
1457 let mut path = Self::flow_dir();
1458 path.push("nodes");
1459 let _ = std::fs::create_dir_all(&path);
1460 path
1461 }
1462
1463 pub fn hub_catalog(&self) -> Vec<crate::hub::HubModel> {
1465 crate::hub::catalog()
1466 }
1467
1468 pub fn system_info(&self) -> SystemInfo {
1471 use sysinfo::{Disks, System};
1472 const GIB: f64 = 1024.0 * 1024.0 * 1024.0;
1473 let round1 = |v: f64| (v * 10.0).round() / 10.0;
1474
1475 let mut sys = System::new();
1476 sys.refresh_memory();
1477 let total = sys.total_memory() as f64 / GIB;
1478 let avail = sys.available_memory() as f64 / GIB;
1479
1480 let llms = Self::llms_dir();
1483 let disks = Disks::new_with_refreshed_list();
1484 let mut free_disk: u64 = 0;
1485 let mut best_len = 0usize;
1486 for d in disks.list() {
1487 let mp = d.mount_point();
1488 if llms.starts_with(mp) && mp.as_os_str().len() >= best_len {
1489 best_len = mp.as_os_str().len();
1490 free_disk = d.available_space();
1491 }
1492 }
1493
1494 let cpu_count = std::thread::available_parallelism()
1495 .map(|n| n.get() as u32)
1496 .unwrap_or(0);
1497
1498 SystemInfo {
1499 os: std::env::consts::OS.to_string(),
1500 arch: std::env::consts::ARCH.to_string(),
1501 total_ram_gb: round1(total),
1502 available_ram_gb: round1(avail),
1503 free_disk_gb: round1(free_disk as f64 / GIB),
1504 cpu_count,
1505 }
1506 }
1507
1508 pub fn list_local_llms(&self) -> Vec<crate::hub::LocalLlm> {
1510 crate::hub::list_local_llms(&Self::llms_dir())
1511 }
1512
1513 pub async fn hub_installed(&self) -> Vec<crate::hub::InstalledModel> {
1516 use crate::hub::{HubFormat, InstalledModel};
1517 let catalog = crate::hub::catalog();
1518 let mut out: Vec<InstalledModel> = Vec::new();
1519
1520 for llm in self.list_local_llms() {
1523 if let Some(m) = catalog.iter().find(|m| {
1524 m.download_options
1525 .iter()
1526 .any(|o| o.url.rsplit('/').next() == Some(llm.file_name.as_str()))
1527 }) {
1528 if out.iter().any(|i| i.id == m.id) {
1529 continue;
1530 }
1531 out.push(InstalledModel {
1532 id: m.id.clone(),
1533 version: m.latest_version.clone(),
1534 format: HubFormat::Gguf,
1535 update_available: false,
1536 });
1537 }
1538 }
1539 out
1540 }
1541
1542 pub async fn download_hub_model(
1545 &self,
1546 id: &str,
1547 variant: Option<&str>,
1548 cb: Option<crate::install::ProgressCallback>,
1549 ) -> Result<PathBuf, AppError> {
1550 let model = crate::hub::find(id)
1551 .ok_or_else(|| AppError::AiInvocation(format!("unknown hub model '{id}'")))?;
1552 self.download_cancel
1554 .store(false, std::sync::atomic::Ordering::Relaxed);
1555 crate::hub::download(
1556 &model,
1557 variant,
1558 &Self::llms_dir(),
1559 cb,
1560 Some(self.download_cancel.clone()),
1561 )
1562 .await
1563 .map_err(AppError::AiInvocation)
1564 }
1565
1566 pub fn cancel_download(&self) {
1570 self.download_cancel
1571 .store(true, std::sync::atomic::Ordering::Relaxed);
1572 }
1573
1574 pub fn delete_local_llm(&self, file_name: &str) -> Result<(), AppError> {
1576 crate::hub::delete_local_llm(&Self::llms_dir(), file_name)
1577 .map_err(AppError::AiInvocation)
1578 }
1579
1580 pub fn list_templates(&self) -> Result<Vec<TemplateRecord>, AppError> {
1581 templates::list(&Self::templates_dir(), &self.store)
1582 .map_err(|e| AppError::Template(e.to_string()))
1583 }
1584
1585 pub fn list_templates_in(
1586 &self,
1587 collection_slug: &str,
1588 ) -> Result<Vec<TemplateRecord>, AppError> {
1589 templates::list_in(&Self::templates_dir(), &self.store, collection_slug)
1590 .map_err(|e| AppError::Template(e.to_string()))
1591 }
1592
1593 pub fn save_template(&self, name: &str, graph: FlowGraph) -> Result<TemplateRecord, AppError> {
1594 templates::save(
1595 &Self::templates_dir(),
1596 &self.store,
1597 name,
1598 &graph,
1599 templates::DEFAULT_COLLECTION_SLUG,
1600 None,
1601 )
1602 .map_err(|e| AppError::Template(e.to_string()))
1603 }
1604
1605 pub fn save_template_in(
1606 &self,
1607 name: &str,
1608 graph: FlowGraph,
1609 collection_slug: &str,
1610 source: Option<templates::TemplateSource>,
1611 ) -> Result<TemplateRecord, AppError> {
1612 templates::save(
1613 &Self::templates_dir(),
1614 &self.store,
1615 name,
1616 &graph,
1617 collection_slug,
1618 source,
1619 )
1620 .map_err(|e| AppError::Template(e.to_string()))
1621 }
1622
1623 pub fn load_template(&self, slug: &str) -> Result<FlowGraph, AppError> {
1624 templates::load(&Self::templates_dir(), slug).map_err(|e| AppError::Template(e.to_string()))
1625 }
1626
1627 pub fn delete_template(&self, slug: &str) -> Result<(), AppError> {
1628 templates::delete(&Self::templates_dir(), &self.store, slug)
1629 .map_err(|e| AppError::Template(e.to_string()))?;
1630 let _ = self.store.delete_schedule(slug);
1633 Ok(())
1634 }
1635
1636 pub fn list_schedules(&self) -> Result<Vec<ScheduleRecord>, AppError> {
1638 self.store
1639 .list_schedules()
1640 .map_err(|e| AppError::Storage(e.to_string()))
1641 }
1642
1643 pub fn get_schedule(&self, template_slug: &str) -> Result<Option<ScheduleRecord>, AppError> {
1645 self.store
1646 .get_schedule(template_slug)
1647 .map_err(|e| AppError::Storage(e.to_string()))
1648 }
1649
1650 pub fn upsert_schedule(
1654 &self,
1655 patch: scheduler::SchedulePatch,
1656 ) -> Result<ScheduleRecord, AppError> {
1657 let freq = scheduler::ScheduleFrequency::parse(&patch.frequency).ok_or_else(|| {
1658 AppError::Validation(format!("unknown schedule frequency `{}`", patch.frequency))
1659 })?;
1660 let catchup = patch
1661 .catchup
1662 .as_deref()
1663 .and_then(scheduler::CatchUpPolicy::parse)
1664 .unwrap_or_default();
1665 let now = Utc::now();
1666 let existing = self
1667 .store
1668 .get_schedule(&patch.template_slug)
1669 .map_err(|e| AppError::Storage(e.to_string()))?;
1670 if patch.enabled {
1671 let graph = self.load_template(&patch.template_slug)?;
1672 if graph.nodes.is_empty() {
1673 return Err(AppError::Validation(
1674 "cannot schedule an empty flow".into(),
1675 ));
1676 }
1677 }
1678 let timezone = patch
1679 .timezone
1680 .clone()
1681 .or_else(|| existing.as_ref().map(|e| e.timezone.clone()))
1682 .unwrap_or_else(|| "UTC".into());
1683 let options = scheduler::ScheduleOptions {
1684 timezone: Some(timezone.clone()),
1685 cron: patch.cron.clone(),
1686 every_minutes: patch.every_minutes,
1687 until: patch.until,
1688 max_runs: patch.max_runs,
1689 run_count: existing.as_ref().map(|e| e.run_count).unwrap_or(0),
1690 };
1691 let next_run_at = if patch.enabled {
1692 scheduler::next_run_after(freq, patch.anchor_at, now, &options)
1693 .map_err(AppError::Validation)?
1694 } else {
1695 None
1696 };
1697 let rec = ScheduleRecord {
1698 template_slug: patch.template_slug,
1699 collection_slug: patch.collection_slug,
1700 flow_name: patch.flow_name,
1701 enabled: patch.enabled,
1702 frequency: freq.as_str().to_string(),
1703 anchor_at: patch.anchor_at,
1704 timezone,
1705 cron: patch.cron,
1706 every_minutes: patch.every_minutes,
1707 until: patch.until,
1708 max_runs: patch.max_runs,
1709 catchup: catchup.as_str().to_string(),
1710 next_run_at,
1711 last_run_at: existing.as_ref().and_then(|e| e.last_run_at),
1712 last_status: existing.as_ref().and_then(|e| e.last_status.clone()),
1713 run_count: existing.as_ref().map(|e| e.run_count).unwrap_or(0),
1714 created_at: existing.as_ref().map(|e| e.created_at).unwrap_or(now),
1715 updated_at: now,
1716 };
1717 self.store
1718 .upsert_schedule(&rec)
1719 .map_err(|e| AppError::Storage(e.to_string()))?;
1720 Ok(rec)
1721 }
1722
1723 pub fn preview_schedule(
1724 &self,
1725 req: scheduler::SchedulePreviewRequest,
1726 ) -> Result<Vec<DateTime<Utc>>, AppError> {
1727 scheduler::preview(req).map_err(AppError::Validation)
1728 }
1729
1730 pub fn delete_schedule(&self, template_slug: &str) -> Result<(), AppError> {
1731 self.store
1732 .delete_schedule(template_slug)
1733 .map_err(|e| AppError::Storage(e.to_string()))
1734 }
1735
1736 pub fn set_schedule_enabled(
1737 &self,
1738 template_slug: &str,
1739 enabled: bool,
1740 ) -> Result<Option<ScheduleRecord>, AppError> {
1741 let Some(schedule) = self.get_schedule(template_slug)? else {
1742 return Ok(None);
1743 };
1744 let next_run_at = if enabled {
1745 self.compute_next_schedule_run(&schedule, Utc::now(), schedule.run_count)?
1746 } else {
1747 None
1748 };
1749 self.store
1750 .set_schedule_enabled(template_slug, enabled, next_run_at)
1751 .map_err(|e| AppError::Storage(e.to_string()))?;
1752 self.get_schedule(template_slug)
1753 }
1754
1755 pub fn migrate_schedule(
1756 &self,
1757 old_slug: &str,
1758 new_slug: &str,
1759 collection_slug: &str,
1760 flow_name: &str,
1761 ) -> Result<(), AppError> {
1762 if old_slug == new_slug {
1763 return Ok(());
1764 }
1765 self.store
1766 .migrate_schedule(old_slug, new_slug, collection_slug, flow_name)
1767 .map_err(|e| AppError::Storage(e.to_string()))
1768 }
1769
1770 pub fn due_schedules(&self) -> Result<Vec<ScheduleRecord>, AppError> {
1773 if !self.settings.scheduler_enabled() {
1774 return Ok(Vec::new());
1775 }
1776 self.store
1777 .list_due_schedules(Utc::now())
1778 .map_err(|e| AppError::Storage(e.to_string()))
1779 }
1780
1781 pub async fn run_scheduled(
1786 &self,
1787 schedule: &ScheduleRecord,
1788 ) -> Result<Option<ExecutionSummary>, AppError> {
1789 let now = Utc::now();
1790 let scheduled_at = schedule.next_run_at.unwrap_or(now);
1791 let poll_grace = chrono::Duration::seconds((self.settings.scheduler_poll_secs() * 2) as i64);
1792 let missed = now - scheduled_at > poll_grace;
1793 let catchup = scheduler::CatchUpPolicy::parse(&schedule.catchup).unwrap_or_default();
1794 if missed && catchup == scheduler::CatchUpPolicy::Skip {
1795 let next = self.compute_next_schedule_run(schedule, now, schedule.run_count)?;
1796 let _ = self
1797 .store
1798 .advance_schedule_without_run(&schedule.template_slug, now, next, "skipped");
1799 return Ok(None);
1800 }
1801
1802 let after = if missed && catchup == scheduler::CatchUpPolicy::RunAll {
1803 scheduled_at
1804 } else {
1805 now
1806 };
1807 let next = self.compute_next_schedule_run(
1808 schedule,
1809 after,
1810 schedule.run_count.saturating_add(1),
1811 )?;
1812 let _ = self
1813 .store
1814 .mark_schedule_run(&schedule.template_slug, now, next, "running");
1815 let graph = self.load_template(&schedule.template_slug)?;
1816 let events: Arc<dyn EventSink> =
1817 Arc::new(flow_execution::StorageSink::with_flow_name_and_trigger(
1818 self.store.clone(),
1819 schedule.flow_name.clone(),
1820 "scheduled".into(),
1821 ));
1822 match self
1823 .run_graph_internal(Uuid::new_v4().to_string(), graph, events, Some(false), None, None)
1824 .await
1825 {
1826 Ok(summary) => {
1827 let _ = self.store.advance_schedule_without_run(
1828 &schedule.template_slug,
1829 Utc::now(),
1830 next,
1831 &summary.status,
1832 );
1833 Ok(Some(summary))
1834 }
1835 Err(e) => {
1836 let _ = self.store.advance_schedule_without_run(
1837 &schedule.template_slug,
1838 Utc::now(),
1839 next,
1840 "failed",
1841 );
1842 Err(e)
1843 }
1844 }
1845 }
1846
1847 pub async fn run_due_schedule(
1851 &self,
1852 schedule: &ScheduleRecord,
1853 ) -> Result<Vec<ExecutionSummary>, AppError> {
1854 let mut out = Vec::new();
1855 let mut current = schedule.clone();
1856 let run_all =
1857 scheduler::CatchUpPolicy::parse(¤t.catchup) == Some(scheduler::CatchUpPolicy::RunAll);
1858 for _ in 0..100 {
1859 if let Some(summary) = self.run_scheduled(¤t).await? {
1860 out.push(summary);
1861 }
1862 if !run_all {
1863 break;
1864 }
1865 let Some(next) = self.get_schedule(¤t.template_slug)? else {
1866 break;
1867 };
1868 if !next.enabled || next.next_run_at.is_none_or(|n| n > Utc::now()) {
1869 break;
1870 }
1871 current = next;
1872 }
1873 Ok(out)
1874 }
1875
1876 fn compute_next_schedule_run(
1877 &self,
1878 schedule: &ScheduleRecord,
1879 after: DateTime<Utc>,
1880 run_count: u32,
1881 ) -> Result<Option<DateTime<Utc>>, AppError> {
1882 let freq = scheduler::ScheduleFrequency::parse(&schedule.frequency).ok_or_else(|| {
1883 AppError::Validation(format!("unknown schedule frequency `{}`", schedule.frequency))
1884 })?;
1885 scheduler::next_run_after(
1886 freq,
1887 schedule.anchor_at,
1888 after,
1889 &scheduler::ScheduleOptions {
1890 timezone: Some(schedule.timezone.clone()),
1891 cron: schedule.cron.clone(),
1892 every_minutes: schedule.every_minutes,
1893 until: schedule.until,
1894 max_runs: schedule.max_runs,
1895 run_count,
1896 },
1897 )
1898 .map_err(AppError::Validation)
1899 }
1900
1901 pub fn list_template_collections(
1902 &self,
1903 ) -> Result<Vec<flow_storage::TemplateCollectionRecord>, AppError> {
1904 self.store
1905 .list_template_collections()
1906 .map_err(|e| AppError::Template(e.to_string()))
1907 }
1908
1909 pub fn create_template_collection(
1914 &self,
1915 name: &str,
1916 ) -> Result<flow_storage::TemplateCollectionRecord, AppError> {
1917 let slug = templates::slugify(name);
1918 if slug.is_empty() {
1919 return Err(AppError::Template(
1920 "collection name is empty after slugification".into(),
1921 ));
1922 }
1923 if self
1924 .store
1925 .get_template_collection(&slug)
1926 .map_err(|e| AppError::Template(e.to_string()))?
1927 .is_some()
1928 {
1929 return Err(AppError::Template(format!(
1930 "collection `{slug}` already exists"
1931 )));
1932 }
1933 self.store
1934 .upsert_template_collection(&slug, name, false)
1935 .map_err(|e| AppError::Template(e.to_string()))
1936 }
1937
1938 pub fn ensure_template_collection(
1944 &self,
1945 name: &str,
1946 ) -> Result<flow_storage::TemplateCollectionRecord, AppError> {
1947 let slug = templates::slugify(name);
1948 if slug.is_empty() {
1949 return Err(AppError::Template(
1950 "collection name is empty after slugification".into(),
1951 ));
1952 }
1953 if let Some(existing) = self
1954 .store
1955 .get_template_collection(&slug)
1956 .map_err(|e| AppError::Template(e.to_string()))?
1957 {
1958 return Ok(existing);
1959 }
1960 self.store
1961 .upsert_template_collection(&slug, name, false)
1962 .map_err(|e| AppError::Template(e.to_string()))
1963 }
1964
1965 pub fn rename_template_collection(
1966 &self,
1967 slug: &str,
1968 new_name: &str,
1969 ) -> Result<flow_storage::TemplateCollectionRecord, AppError> {
1970 templates::validate_slug(slug).map_err(|e| AppError::Template(e.to_string()))?;
1971 if new_name.trim().is_empty() {
1972 return Err(AppError::Template(
1973 "collection display name cannot be empty".into(),
1974 ));
1975 }
1976 self.store
1977 .rename_template_collection(slug, new_name)
1978 .map_err(|e| AppError::Template(e.to_string()))
1979 }
1980
1981 pub fn delete_template_collection(&self, slug: &str) -> Result<(), AppError> {
1982 templates::validate_slug(slug).map_err(|e| AppError::Template(e.to_string()))?;
1983 self.store
1984 .delete_template_collection(slug)
1985 .map_err(|e| AppError::Template(e.to_string()))
1986 }
1987
1988 pub fn list_template_hub(&self) -> Vec<template_hub::TemplateHubEntry> {
1990 template_hub::catalog()
1991 }
1992
1993 pub fn list_template_hub_with_status(
1999 &self,
2000 ) -> Result<Vec<template_hub::TemplateHubEntryWithStatus>, AppError> {
2001 template_hub::catalog_with_status(&self.store, &Self::templates_dir())
2002 .map_err(|e| AppError::Template(e.to_string()))
2003 }
2004
2005 pub fn add_hub_template(
2009 &self,
2010 hub_slug: &str,
2011 collection_slug: &str,
2012 ) -> Result<TemplateRecord, AppError> {
2013 template_hub::add_to_collection(
2014 &Self::templates_dir(),
2015 &self.store,
2016 hub_slug,
2017 collection_slug,
2018 )
2019 .map_err(|e| AppError::Template(e.to_string()))
2020 }
2021
2022 pub fn update_hub_template(
2027 &self,
2028 hub_slug: &str,
2029 force: bool,
2030 ) -> Result<TemplateRecord, AppError> {
2031 template_hub::update_installed(&Self::templates_dir(), &self.store, hub_slug, force)
2032 .map_err(|e| AppError::Template(e.to_string()))
2033 }
2034
2035 pub fn list_node_catalog(&self) -> Vec<nodes::NodeCatalogEntry> {
2037 node_hub::catalog()
2038 }
2039
2040 pub fn list_node_catalog_with_status(
2044 &self,
2045 ) -> Result<Vec<node_hub::NodeCatalogEntryWithStatus>, AppError> {
2046 node_hub::catalog_with_status(&self.store)
2047 .map_err(|e| AppError::Template(e.to_string()))
2048 }
2049
2050 pub fn list_installed_nodes(&self) -> Result<Vec<nodes::NodeCatalogEntry>, AppError> {
2054 nodes::list_installed_schemes(&self.store, &Self::nodes_dir())
2055 .map_err(|e| AppError::Template(e.to_string()))
2056 }
2057
2058 pub fn add_node_to_library(
2061 &self,
2062 slug: &str,
2063 ) -> Result<flow_storage::NodeLibraryRow, AppError> {
2064 node_hub::add_to_library(&Self::nodes_dir(), &self.store, slug)
2065 .map_err(|e| AppError::Template(e.to_string()))
2066 }
2067
2068 pub fn update_installed_node(
2071 &self,
2072 slug: &str,
2073 force: bool,
2074 ) -> Result<flow_storage::NodeLibraryRow, AppError> {
2075 node_hub::update_installed(&Self::nodes_dir(), &self.store, slug, force)
2076 .map_err(|e| AppError::Template(e.to_string()))
2077 }
2078
2079 pub fn uninstall_node(&self, slug: &str) -> Result<(), AppError> {
2081 node_hub::uninstall(&Self::nodes_dir(), &self.store, slug)
2082 .map_err(|e| AppError::Template(e.to_string()))
2083 }
2084
2085 pub fn get_node_catalog_entry(
2089 &self,
2090 slug: &str,
2091 ) -> Result<nodes::NodeCatalogEntry, AppError> {
2092 if let Ok(entry) = nodes::read_scheme(&Self::nodes_dir(), slug) {
2093 return Ok(entry);
2094 }
2095 nodes::entry_by_slug(slug)
2096 .ok_or_else(|| AppError::Template(format!("node `{slug}` not found")))
2097 }
2098
2099 pub fn import_dsl(&self, source: &str) -> Result<FlowGraph, AppError> {
2103 flow_dsl::parse(source).map_err(|e| AppError::Dsl {
2104 line: e.line,
2105 col: e.col,
2106 message: e.message,
2107 })
2108 }
2109
2110 pub fn serialize_dsl(&self, graph: &FlowGraph) -> String {
2112 flow_dsl::serialize(graph)
2113 }
2114
2115 fn settings_path() -> PathBuf {
2116 let mut path = Self::flow_dir();
2117 path.push("settings.json");
2118 path
2119 }
2120
2121 pub fn settings(&self) -> Settings {
2122 self.settings.snapshot()
2123 }
2124
2125 pub fn update_settings(&self, patch: SettingsPatch) -> Result<Settings, AppError> {
2126 self.settings
2127 .update(patch)
2128 .map_err(|e| AppError::Settings(e.to_string()))
2129 }
2130
2131 pub fn list_cloud_providers(&self) -> Vec<flow_adapter_ai::registry::ProviderInfo> {
2132 let discovered_local = self.settings.local_ai_models();
2133 self.cloud_providers
2134 .iter()
2135 .map(|p| {
2136 let default_models = if p.category()
2141 == flow_adapter_ai::ProviderCategory::Local
2142 && !discovered_local.is_empty()
2143 {
2144 discovered_local.clone()
2145 } else {
2146 p.default_models().iter().map(|s| s.to_string()).collect()
2147 };
2148 flow_adapter_ai::registry::ProviderInfo {
2149 name: p.name().to_string(),
2150 env_var: p.env_var().to_string(),
2151 default_models,
2152 model_capabilities: p.model_capabilities(),
2153 category: p.category(),
2154 }
2155 })
2156 .collect()
2157 }
2158
2159 fn build_cloud_registry_for_run(&self) -> Arc<CloudAiRegistry> {
2160 let mut reg = CloudAiRegistry::new();
2161 for p in &self.cloud_providers {
2162 if self.settings.provider_enabled(p.name()) {
2163 reg.register(p.clone());
2164 }
2165 }
2166 Arc::new(reg)
2167 }
2168
/// Searches for a llama.cpp server binary and returns the first match.
///
/// Every directory on `PATH` is checked first, followed by
/// `/opt/homebrew/bin`, `/usr/local/bin` and `/usr/bin`. Within a directory
/// `llama-server` is tried before `llama-cpp-server` (with `.exe` on
/// Windows). `PATH` is read with `var_os`, so a `PATH` containing non-Unicode
/// entries is still searched instead of being skipped altogether.
///
/// Returns `None` when no candidate is an existing file.
pub fn detect_llama_server() -> Option<String> {
    let names: &[&str] = if cfg!(windows) {
        &["llama-server.exe", "llama-cpp-server.exe"]
    } else {
        &["llama-server", "llama-cpp-server"]
    };
    let mut dirs: Vec<PathBuf> = Vec::new();
    if let Some(path) = std::env::var_os("PATH") {
        dirs.extend(std::env::split_paths(&path));
    }
    // Common install locations that may be absent from the process PATH.
    for d in ["/opt/homebrew/bin", "/usr/local/bin", "/usr/bin"] {
        dirs.push(PathBuf::from(d));
    }
    for dir in dirs {
        for name in names {
            let candidate = dir.join(name);
            if candidate.is_file() {
                return Some(candidate.to_string_lossy().to_string());
            }
        }
    }
    None
}
2195
2196 fn managed_engine_path() -> Option<String> {
2199 let name = if cfg!(windows) {
2200 "llama-server.exe"
2201 } else {
2202 "llama-server"
2203 };
2204 let path = Self::engines_dir().join(name);
2205 path.is_file().then(|| path.to_string_lossy().to_string())
2206 }
2207
2208 pub fn llama_server_path(&self) -> Option<String> {
2216 Self::managed_engine_path()
2217 .or_else(|| Self::existing_binary(self.settings.llama_server_binary()))
2218 .or_else(Self::detect_llama_server)
2219 }
2220
/// Keeps `path` only when it is non-blank and names an existing file.
fn existing_binary(path: Option<String>) -> Option<String> {
    let candidate = path?;
    if candidate.trim().is_empty() {
        return None;
    }
    if std::path::Path::new(&candidate).is_file() {
        Some(candidate)
    } else {
        None
    }
}
2228
2229 pub async fn ensure_llama_engine(
2234 &self,
2235 progress: Option<ProgressCallback>,
2236 ) -> Result<PathBuf, AppError> {
2237 let dir = Self::engines_dir();
2238 crate::llm_server::fetch::ensure_engine(&dir, move |p| {
2239 if let Some(cb) = &progress {
2240 cb(InstallProgress {
2241 stage: "engine",
2242 current: p.current,
2243 total: p.total.unwrap_or(0),
2244 message: p.message,
2245 });
2246 }
2247 })
2248 .await
2249 .map_err(AppError::AiInvocation)
2250 }
2251
2252 pub async fn start_local_llm(
2258 &self,
2259 binary: Option<String>,
2260 model: Option<String>,
2261 params: crate::llm_server::LlamaParams,
2262 ) -> Result<crate::llm_server::LlmServerStatus, AppError> {
2263 let binary = match Self::existing_binary(binary)
2269 .or_else(Self::managed_engine_path)
2270 .or_else(|| Self::existing_binary(self.settings.llama_server_binary()))
2271 .or_else(Self::detect_llama_server)
2272 {
2273 Some(b) => b,
2274 None => self
2275 .ensure_llama_engine(None)
2276 .await?
2277 .to_string_lossy()
2278 .to_string(),
2279 };
2280 let model = model
2281 .filter(|s| !s.trim().is_empty())
2282 .or_else(|| self.settings.llama_server_model())
2283 .ok_or_else(|| AppError::AiInvocation("no LLM selected".into()))?;
2284
2285 let endpoint = self
2288 .llm_server
2289 .start(PathBuf::from(&binary), PathBuf::from(&model), params)
2290 .await
2291 .map_err(AppError::AiInvocation)?;
2292
2293 let discovered = flow_adapter_ai::LocalOpenAiProvider::new()
2299 .list_models(&endpoint)
2300 .await
2301 .unwrap_or_default();
2302
2303 let _ = self.update_settings(SettingsPatch {
2304 llama_server_binary: Some(binary),
2305 llama_server_model: Some(model),
2306 local_ai_base_url: Some(endpoint),
2307 local_ai_models: Some(discovered),
2308 allow_local_ai: Some(true),
2309 ..Default::default()
2310 });
2311 Ok(self.llm_server.status())
2312 }
2313
2314 pub async fn stop_local_llm(&self) -> crate::llm_server::LlmServerStatus {
2318 self.llm_server.stop().await;
2319 let _ = self.update_settings(SettingsPatch {
2320 local_ai_base_url: Some(String::new()),
2321 local_ai_models: Some(Vec::new()),
2322 ..Default::default()
2323 });
2324 self.llm_server.status()
2325 }
2326
2327 pub fn local_llm_status(&self) -> crate::llm_server::LlmServerStatus {
2329 self.llm_server.status()
2330 }
2331
2332 pub fn store(&self) -> &Store {
2333 &self.store
2334 }
2335
2336 pub fn list_executions(&self, limit: usize) -> Result<Vec<ExecutionRecord>, AppError> {
2337 self.store
2338 .list_executions(limit)
2339 .map_err(|e| AppError::Storage(e.to_string()))
2340 }
2341
2342 pub fn get_execution(
2343 &self,
2344 execution_id: &str,
2345 ) -> Result<
2346 Option<(ExecutionRecord, Vec<ExecutionStepRecord>, Vec<InterceptionRecord>)>,
2347 AppError,
2348 > {
2349 self.store
2350 .get_execution(execution_id)
2351 .map_err(|e| AppError::Storage(e.to_string()))
2352 }
2353
2354 pub fn record_interception(
2357 &self,
2358 execution_id: String,
2359 failed_node: String,
2360 issue: Option<String>,
2361 summary: String,
2362 dsl_before: Option<String>,
2363 dsl_after: Option<String>,
2364 ) -> Result<(), AppError> {
2365 self.store
2366 .record_interception(&InterceptionRecord {
2367 execution_id,
2368 at: chrono::Utc::now(),
2369 failed_node,
2370 issue,
2371 summary,
2372 dsl_before,
2373 dsl_after,
2374 })
2375 .map_err(|e| AppError::Storage(e.to_string()))
2376 }
2377
2378 pub async fn execute_graph(
2379 &self,
2380 graph: FlowGraph,
2381 events: Arc<dyn EventSink>,
2382 ) -> Result<ExecutionSummary, AppError> {
2383 self.execute_graph_with_id(Uuid::new_v4().to_string(), graph, events)
2384 .await
2385 }
2386
2387 pub async fn execute_graph_with_id(
2394 &self,
2395 execution_id: String,
2396 graph: FlowGraph,
2397 events: Arc<dyn EventSink>,
2398 ) -> Result<ExecutionSummary, AppError> {
2399 self.run_graph_internal(execution_id, graph, events, None, None, None)
2400 .await
2401 }
2402
2403 async fn run_graph_internal(
2412 &self,
2413 execution_id: String,
2414 graph: FlowGraph,
2415 events: Arc<dyn EventSink>,
2416 confirm_destructive_override: Option<bool>,
2417 edit_staging: Option<flow_execution::StagedEdits>,
2418 workspace_override: Option<PathBuf>,
2419 ) -> Result<ExecutionSummary, AppError> {
2420 let allow_cloud_ai = self.settings.allow_cloud_ai();
2421 let allow_local_ai = self.settings.allow_local_ai();
2422 let local_ai_base_url = self.settings.local_ai_base_url();
2423 let cloud_providers = self.build_cloud_registry_for_run();
2424 let confirm_destructive = confirm_destructive_override.unwrap_or_else(|| {
2427 self.settings.confirm_destructive()
2428 && !self
2429 .confirm_gate_disabled
2430 .load(std::sync::atomic::Ordering::SeqCst)
2431 });
2432 let control: flow_execution::SharedRunControl = Default::default();
2435 if let Ok(mut runs) = self.runs.lock() {
2436 runs.insert(execution_id.clone(), control.clone());
2437 }
2438 let events: Arc<dyn EventSink> = Arc::new(flow_execution::TracingSink::new(events));
2442 let workspace_root =
2446 workspace_override.unwrap_or_else(|| self.resolve_workspace(&graph.id));
2447 if let Err(e) = std::fs::create_dir_all(&workspace_root) {
2448 tracing::warn!(error = %e, path = %workspace_root.display(), "failed to create workspace dir");
2449 }
2450 let executor = Executor {
2451 adapters: self.adapters.clone(),
2452 sanitizer: self.sanitizer.clone(),
2453 events,
2454 cloud_providers,
2455 credentials: self.resolver.clone(),
2456 allow_cloud_ai,
2457 allow_local_ai,
2458 local_ai_base_url,
2459 stream_sink: self.stream_sink(),
2460 working_memory: self.working_memory.clone(),
2461 control,
2462 confirm_destructive,
2463 review_gate_available: !self
2466 .confirm_gate_disabled
2467 .load(std::sync::atomic::Ordering::SeqCst),
2468 edit_staging,
2469 workspace_root,
2470 };
2471 let result = executor.run_with_id(execution_id.clone(), &graph).await;
2472 if let Ok(mut runs) = self.runs.lock() {
2473 runs.remove(&execution_id);
2474 }
2475 self.persist_memory();
2477 result.map_err(|e| AppError::Execution(e.to_string()))
2478 }
2479
2480 pub async fn run_agent_turn(
2486 &self,
2487 execution_id: String,
2488 graph: FlowGraph,
2489 events: Arc<dyn EventSink>,
2490 ) -> Result<AgentTurn, AppError> {
2491 let staging: flow_execution::StagedEdits = Default::default();
2492 let summary = self
2493 .run_graph_internal(execution_id, graph, events, None, Some(staging.clone()), None)
2494 .await?;
2495 let edits = staging.lock().map(|s| s.edits()).unwrap_or_default();
2496 Ok(AgentTurn { summary, edits })
2497 }
2498
2499 pub fn pause_run(&self, execution_id: &str) {
2502 if let Ok(runs) = self.runs.lock() {
2503 if let Some(c) = runs.get(execution_id) {
2504 c.pause();
2505 }
2506 }
2507 }
2508
2509 pub fn resume_run(&self, execution_id: &str) {
2511 if let Ok(runs) = self.runs.lock() {
2512 if let Some(c) = runs.get(execution_id) {
2513 c.resume();
2514 }
2515 }
2516 }
2517
2518 pub fn resolve_ai_review(&self, execution_id: &str, approved: bool) {
2523 if let Ok(runs) = self.runs.lock() {
2524 if let Some(c) = runs.get(execution_id) {
2525 c.resolve_review(approved);
2526 }
2527 }
2528 }
2529
2530 pub fn pause_all_runs(&self) {
2533 if let Ok(runs) = self.runs.lock() {
2534 for c in runs.values() {
2535 c.pause();
2536 }
2537 }
2538 }
2539
2540 pub fn resume_all_runs(&self) {
2542 if let Ok(runs) = self.runs.lock() {
2543 for c in runs.values() {
2544 c.resume();
2545 }
2546 }
2547 }
2548
2549 pub fn disable_confirmation_gate(&self) {
2554 self.confirm_gate_disabled
2555 .store(true, std::sync::atomic::Ordering::SeqCst);
2556 }
2557
2558 pub fn cancel_run(&self, execution_id: &str) {
2562 if let Ok(runs) = self.runs.lock() {
2563 if let Some(c) = runs.get(execution_id) {
2564 c.cancel();
2565 }
2566 }
2567 }
2568
2569 pub fn cancel_all_runs(&self) {
2572 if let Ok(runs) = self.runs.lock() {
2573 for c in runs.values() {
2574 c.cancel();
2575 }
2576 }
2577 }
2578
2579 pub fn clear_working_memory(&self) {
2582 if let Ok(mut mem) = self.working_memory.lock() {
2583 mem.clear();
2584 }
2585 let _ = self.store.clear_memory();
2586 }
2587
2588 fn persist_memory(&self) {
2591 if let Ok(mem) = self.working_memory.lock() {
2592 let _ = self.store.save_memory(&mem);
2593 }
2594 }
2595
2596 pub fn verify_flow(&self, dsl: &str) -> Vec<FlowWarning> {
2601 match flow_dsl::parse(dsl) {
2602 Ok(graph) => flow_execution::verify_graph(&graph),
2603 Err(_) => Vec::new(),
2604 }
2605 }
2606
2607 pub async fn generate_dsl_via_cloud(
2628 &self,
2629 provider: &str,
2630 model: &str,
2631 prompt: &str,
2632 max_tokens: Option<u32>,
2633 temperature: Option<f32>,
2634 ) -> Result<String, AppError> {
2635 let is_local = provider == "local";
2639 if is_local {
2640 if !self.settings.allow_local_ai() {
2641 return Err(AppError::AiInvocation(
2642 "local AI is disabled (Settings -> LLM -> Local -> enable)".into(),
2643 ));
2644 }
2645 } else if !self.settings.allow_cloud_ai() {
2646 return Err(AppError::AiInvocation(
2647 "cloud AI is disabled (Settings -> LLM -> Cloud -> enable to allow outbound calls)"
2648 .into(),
2649 ));
2650 }
2651 let registry = self.build_cloud_registry_for_run();
2652 let p = registry.get(provider).ok_or_else(|| {
2653 AppError::AiInvocation(format!("provider '{provider}' is not enabled in Settings"))
2654 })?;
2655 let env_var = p.env_var();
2656 let api_key = match self.resolver.resolve_cloud_ai_key(provider, env_var) {
2657 Ok(k) => k,
2658 Err(_) if is_local => String::new(),
2660 Err(e) => {
2661 return Err(AppError::AiInvocation(format!(
2662 "no API key for '{provider}': {e}. Set it in Settings or export {env_var}."
2663 )))
2664 }
2665 };
2666 let base_url = if is_local {
2667 self.settings.local_ai_base_url()
2668 } else {
2669 None
2670 };
2671
2672 let effective_max_tokens = Some(max_tokens.unwrap_or(0).max(8192));
2684 let max_tokens = effective_max_tokens;
2685
2686 let system_prompt = DSL_GENERATION_SYSTEM_PROMPT;
2687
2688 tracing::info!(
2689 provider,
2690 model,
2691 is_local,
2692 prompt_chars = prompt.len(),
2693 "generating Flow DSL"
2694 );
2695
2696 let sink = self.stream_sink();
2698 let (raw1, finish1) = cloud_call_once(
2699 p.as_ref(),
2700 model,
2701 system_prompt,
2702 prompt,
2703 max_tokens,
2704 temperature,
2705 &api_key,
2706 base_url.clone(),
2707 sink.as_ref(),
2708 )
2709 .await?;
2710 let extracted1 = extract_dsl_text(&raw1);
2711
2712 const MAX_CORRECTIVE_RETRIES: usize = 3;
2719 let mut current = extracted1;
2720 let mut attempt = 0usize;
2721 let mut last_reason = match validate_generated_dsl(¤t) {
2722 Ok(dsl) => {
2723 tracing::info!(provider, model, attempt, "Flow DSL generated on first try");
2724 return Ok(dsl);
2725 }
2726 Err(reason) => reason,
2727 };
2728 if is_truncated(&finish1) {
2732 return Err(truncation_error(provider, model, max_tokens, &last_reason));
2733 }
2734 for _ in 0..MAX_CORRECTIVE_RETRIES {
2735 attempt += 1;
2736 let retry_user = if last_reason.starts_with("DSL parse error") {
2737 let parse_err = flow_dsl::parse(¤t).expect_err("parse error expected");
2738 tracing::warn!(
2739 provider,
2740 attempt,
2741 line = parse_err.line,
2742 col = parse_err.col,
2743 message = %parse_err.message,
2744 "generated DSL failed to parse; retrying with corrective prompt"
2745 );
2746 build_retry_prompt(prompt, ¤t, &parse_err)
2747 } else {
2748 let graph = flow_dsl::parse(¤t).expect("already parsed");
2749 let issues = dsl_semantics::validate_generated_graph(&graph);
2750 tracing::warn!(
2751 provider,
2752 attempt,
2753 %last_reason,
2754 "generated DSL failed semantic checks; retrying with corrective prompt"
2755 );
2756 build_semantic_retry_prompt(prompt, ¤t, &issues)
2757 };
2758 let (raw, finish) = cloud_call_once(
2759 p.as_ref(),
2760 model,
2761 system_prompt,
2762 &retry_user,
2763 max_tokens,
2764 temperature,
2765 &api_key,
2766 base_url.clone(),
2767 sink.as_ref(),
2768 )
2769 .await?;
2770 current = extract_dsl_text(&raw);
2771 match validate_generated_dsl(¤t) {
2772 Ok(dsl) => {
2773 tracing::info!(
2774 provider,
2775 model,
2776 attempt,
2777 "Flow DSL generated after corrective retry"
2778 );
2779 return Ok(dsl);
2780 }
2781 Err(reason) => last_reason = reason,
2782 }
2783 if is_truncated(&finish) {
2786 return Err(truncation_error(provider, model, max_tokens, &last_reason));
2787 }
2788 }
2789
2790 let hint = last_reason
2794 .strip_prefix("DSL parse error at ")
2795 .and_then(|rest| rest.split_once(" - "))
2796 .and_then(|(_, msg)| parse_error_hint(msg))
2797 .map(|h| format!("\n\n{h}"))
2798 .unwrap_or_default();
2799 tracing::warn!(
2800 provider,
2801 model,
2802 reason = %last_reason,
2803 dsl = %truncate_for_log(¤t),
2804 "Flow DSL generation failed after corrective retries"
2805 );
2806 Err(AppError::AiInvocation(format!(
2807 "{provider} model returned invalid DSL after {MAX_CORRECTIVE_RETRIES} corrective retries: {last_reason}{hint}"
2808 )))
2809 }
2810
2811 pub async fn generate_dsl_auto(
2815 &self,
2816 _local_model_id: Option<&str>,
2817 cloud: Option<(&str, &str)>,
2818 prompt: &str,
2819 ) -> Result<DslGenerationOutcome, AppError> {
2820 if let Some((provider, model)) = cloud {
2821 let dsl = self
2822 .generate_dsl_via_cloud(provider, model, prompt, Some(2048), None)
2823 .await
2824 .map_err(|e| AppError::AiInvocation(format!("cloud: {e}")))?;
2825 return Ok(DslGenerationOutcome {
2826 dsl,
2827 source: DslSource::Cloud {
2828 provider: provider.to_string(),
2829 model_id: model.to_string(),
2830 },
2831 fallback_reason: None,
2832 plan: None,
2833 });
2834 }
2835 Err(AppError::AiInvocation(
2836 "no flow-graph generator configured (select a model or provider)".into(),
2837 ))
2838 }
2839
2840 pub async fn agentic_generate(
2854 &self,
2855 local_model_id: Option<&str>,
2856 cloud: Option<(&str, &str)>,
2857 prompt: &str,
2858 ) -> Result<DslGenerationOutcome, AppError> {
2859 let mut outcome = self.generate_dsl_auto(local_model_id, cloud, prompt).await?;
2860 outcome.plan = self.synthesize_plan(&outcome.dsl, cloud).await;
2861 Ok(outcome)
2862 }
2863
2864 pub async fn agentic_replan(
2875 &self,
2876 local_model_id: Option<&str>,
2877 cloud: Option<(&str, &str)>,
2878 prompt: &str,
2879 prior_dsl: &str,
2880 observations: &str,
2881 ) -> Result<DslGenerationOutcome, AppError> {
2882 let augmented = format!(
2883 "{prompt}\n\n\
2884 You previously generated this Flow DSL:\n\n{prior_dsl}\n\n\
2885 When it ran, these problems occurred:\n\n{observations}\n\n\
2886 Produce a corrected Flow DSL that fixes the failing steps. Keep the \
2887 parts that already succeeded. Emit DSL only."
2888 );
2889 self.agentic_generate(local_model_id, cloud, &augmented)
2890 .await
2891 }
2892
2893 pub async fn agentic_run_loop(
2905 &self,
2906 local_model_id: Option<&str>,
2907 cloud: Option<(&str, &str)>,
2908 prompt: &str,
2909 workspace: Option<&str>,
2910 events: Arc<dyn EventSink>,
2911 ) -> Result<AgenticLoopSummary, AppError> {
2912 let ws_override = workspace
2915 .filter(|w| !w.trim().is_empty())
2916 .map(expand_home);
2917 let mut steps = Vec::new();
2918 let mut prev_dsl: Option<String> = None;
2919 let mut observations: Option<String> = None;
2920 let mut last_outcome: Option<DslGenerationOutcome> = None;
2921 let cap = self.settings.max_agentic_iterations();
2922 let budget_secs = self.settings.max_agentic_seconds();
2923 let token_budget = self.settings.max_agentic_tokens();
2924 let started = std::time::Instant::now();
2925 let mut tokens_used: u64 = 0;
2926
2927 for iter in 0..cap {
2928 let over_time =
2932 budget_secs > 0 && started.elapsed().as_secs() >= budget_secs as u64;
2933 let over_tokens = token_budget > 0 && tokens_used >= token_budget as u64;
2934 if iter > 0 && (over_time || over_tokens) {
2935 let last = last_outcome.expect("loop body runs at least once");
2936 return Ok(AgenticLoopSummary {
2937 iterations: iter,
2938 status: "budget_exhausted".into(),
2939 final_dsl: last.dsl,
2940 final_plan: last.plan,
2941 steps,
2942 });
2943 }
2944 let outcome = match (&prev_dsl, &observations) {
2945 (Some(dsl), Some(obs)) => {
2946 self.agentic_replan(local_model_id, cloud, prompt, dsl, obs)
2947 .await?
2948 }
2949 _ => self.agentic_generate(local_model_id, cloud, prompt).await?,
2950 };
2951
2952 let graph = flow_dsl::parse(&outcome.dsl).map_err(|e| {
2953 AppError::AiInvocation(format!(
2954 "generated DSL failed to parse at {}:{} - {}",
2955 e.line, e.col, e.message
2956 ))
2957 })?;
2958
2959 let capture = CapturingSink::default();
2962 let sink: Arc<dyn EventSink> =
2963 Arc::new(MultiSink::new(vec![events.clone(), Arc::new(capture.clone())]));
2964 let summary = self
2965 .run_graph_internal(
2966 Uuid::new_v4().to_string(),
2967 graph,
2968 sink,
2969 None,
2970 None,
2971 ws_override.clone(),
2972 )
2973 .await?;
2974 let captured = capture.events.lock().map(|g| g.clone()).unwrap_or_default();
2975 tokens_used = tokens_used.saturating_add(sum_ai_tokens(&captured));
2976
2977 let converged = summary.failed == 0;
2978 let obs = if converged {
2979 None
2980 } else {
2981 Some(build_loop_observations(&captured, &summary))
2982 };
2983 steps.push(AgenticLoopStep {
2984 iteration: iter,
2985 run_status: summary.status.clone(),
2986 succeeded: summary.succeeded,
2987 failed: summary.failed,
2988 skipped: summary.skipped,
2989 observations: obs.clone(),
2990 });
2991
2992 if converged {
2993 return Ok(AgenticLoopSummary {
2994 iterations: iter + 1,
2995 status: "succeeded".into(),
2996 final_dsl: outcome.dsl,
2997 final_plan: outcome.plan,
2998 steps,
2999 });
3000 }
3001
3002 observations = obs;
3003 prev_dsl = Some(outcome.dsl.clone());
3004 last_outcome = Some(outcome);
3005 }
3006
3007 let last = last_outcome.expect("loop body runs at least once");
3010 Ok(AgenticLoopSummary {
3011 iterations: cap,
3012 status: "exhausted".into(),
3013 final_dsl: last.dsl,
3014 final_plan: last.plan,
3015 steps,
3016 })
3017 }
3018
3019 async fn synthesize_plan(
3024 &self,
3025 dsl: &str,
3026 cloud: Option<(&str, &str)>,
3027 ) -> Option<String> {
3028 let (provider, model) = cloud?;
3031 let registry = self.build_cloud_registry_for_run();
3032 let p = registry.get(provider)?;
3033 let is_local = provider == "local";
3034 let allowed = if is_local {
3035 self.settings.allow_local_ai()
3036 } else {
3037 self.settings.allow_cloud_ai()
3038 };
3039 if !allowed {
3040 return None;
3041 }
3042 let env_var = p.env_var();
3043 let api_key = match self.resolver.resolve_cloud_ai_key(provider, env_var) {
3044 Ok(k) => k,
3045 Err(_) if is_local => String::new(),
3046 Err(_) => return None,
3047 };
3048 let base_url = if is_local {
3049 self.settings.local_ai_base_url()
3050 } else {
3051 None
3052 };
3053 let system = "You explain a Flow DSL workflow for a \
3054 reviewer about to apply it. Write one short paragraph \
3055 (under 60 words) summarising what the workflow does, \
3056 then a numbered list with one line per step describing \
3057 that step's purpose. No code fences, no DSL syntax.";
3058 let user = format!(
3059 "Flow DSL to summarise:\n\n{dsl}\n\nReturn the paragraph + numbered list only."
3060 );
3061 let req = flow_adapter_ai::CloudAiRequest {
3062 model: model.to_string(),
3063 prompt: user,
3064 max_tokens: Some(if is_local { 1024 } else { 600 }),
3067 temperature: Some(0.2),
3068 api_key,
3069 system: Some(system.to_string()),
3070 base_url,
3071 stream: true,
3072 call_id: Some(format!("plan-{}", uuid::Uuid::new_v4())),
3073 ..Default::default()
3074 };
3075 let sink = self.stream_sink();
3076 match p.invoke_stream(&req, sink.as_ref()).await {
3077 Ok(resp) => {
3078 let trimmed = resp.text.trim().to_string();
3079 if trimmed.is_empty() {
3080 None
3081 } else {
3082 Some(trimmed)
3083 }
3084 }
3085 Err(e) => {
3086 tracing::debug!(provider, error = %e, "plan synthesis call failed; returning None");
3087 None
3088 }
3089 }
3090 }
3091
3092 pub async fn load_graph(&self, graph_id: &str) -> Result<FlowGraph, AppError> {
3093 if graph_id.is_empty() {
3094 return Err(AppError::GraphNotFound("empty graph id".into()));
3095 }
3096
3097 Ok(FlowGraph {
3098 subflows: Vec::new(),
3099 id: graph_id.to_string(),
3100 name: "JCL Pipeline".into(),
3101 version: "0.1.0".into(),
3102 description: None,
3103 nodes: vec![
3104 FlowNode {
3105 id: "validate-jcl".into(),
3106 node_type: "ai".into(),
3107 position: Position { x: 80.0, y: 80.0 },
3108 data: serde_json::json!({
3109 "label": "Review JCL",
3110 "modelId": "local-llm",
3111 "input": "{{input}}"
3112 }),
3113 },
3114 FlowNode {
3115 id: "submit-jcl".into(),
3116 node_type: "action".into(),
3117 position: Position { x: 380.0, y: 80.0 },
3118 data: serde_json::json!({
3119 "label": "Submit JCL",
3120 "actionId": "cli-tool",
3121 "adapter": "cli",
3122 "bin": "zowe",
3123 "command": "jobs submit local-file"
3124 }),
3125 },
3126 ],
3127 edges: vec![FlowEdge {
3128 id: "e1".into(),
3129 source: "validate-jcl".into(),
3130 target: "submit-jcl".into(),
3131 label: None,
3132 condition: None,
3133 outcome: flow_domain::graph::EdgeOutcome::Always,
3134 }],
3135 })
3136 }
3137
3138 pub async fn validate_graph(&self, graph: FlowGraph) -> Result<(), AppError> {
3139 if graph.nodes.is_empty() {
3140 return Err(AppError::Validation(
3141 "graph must contain at least one node".into(),
3142 ));
3143 }
3144 Ok(())
3145 }
3146
3147 pub async fn run_flow(&self, _req: ExecutionRequest) -> Result<ExecutionResult, AppError> {
3148 Ok(ExecutionResult {
3149 execution_id: Uuid::new_v4().to_string(),
3150 status: ExecutionStatus::Succeeded,
3151 started_at: Utc::now(),
3152 ended_at: Some(Utc::now()),
3153 })
3154 }
3155}