Skip to main content

flow_execution/
adapter.rs

1use async_trait::async_trait;
2use chrono::Utc;
3use flow_domain::graph::FlowNode;
4use serde::{Deserialize, Serialize};
5use std::collections::HashMap;
6use std::path::PathBuf;
7use std::sync::Arc;
8use std::time::Duration;
9use thiserror::Error;
10
11use crate::events::{EventSink, ExecutionEvent, LogStream};
12
/// Errors surfaced by the adapter layer.
///
/// The user-facing `Display` text of each variant is generated by
/// `thiserror` from the `#[error(...)]` attribute above it.
#[derive(Debug, Error)]
pub enum AdapterError {
    /// An adapter name could not be resolved. The payload is the name that
    /// is interpolated into the message.
    #[error("adapter '{0}' not found")]
    NotFound(String),
    /// The adapter refused to handle the node.
    // NOTE(review): presumably raised during validation, before any work
    // starts; confirm against the concrete adapter implementations.
    #[error("adapter '{adapter}' rejected node {node_id}: {reason}")]
    Rejected {
        /// Name of the adapter that rejected the node.
        adapter: String,
        /// Identifier of the rejected node.
        node_id: String,
        /// Human-readable explanation, appended to the message.
        reason: String,
    },
    /// The adapter failed while handling the node.
    #[error("adapter '{adapter}' failed on node {node_id}: {reason}")]
    Failed {
        /// Name of the adapter that failed.
        adapter: String,
        /// Identifier of the node being handled.
        node_id: String,
        /// Human-readable explanation, appended to the message.
        reason: String,
    },
}
30
/// Successful result of running one node through an adapter.
///
/// This is the `Ok` value of [`Adapter::execute`] and
/// [`Adapter::execute_with_events`]. It derives serde's `Serialize` /
/// `Deserialize`, so field names are part of its serialized form.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NodeOutput {
    /// Outcome label as free-form text; [`MockAdapter`] reports
    /// `"succeeded"`.
    // NOTE(review): the full set of status values used by other adapters is
    // not visible in this file; confirm before matching on it.
    pub status: String,
    /// Adapter-defined JSON result data.
    pub payload: serde_json::Value,
}
36
37/// Per-invocation context passed to streaming-capable adapters. Carries the
38/// sink the adapter can use to emit `NodeLog` events as work happens, plus
39/// identifiers so those events can be routed back to the right node.
40pub struct AdapterCtx {
41    pub events: Arc<dyn EventSink>,
42    pub execution_id: String,
43    pub node_id: String,
44    /// Resolved per-run workspace root. Adapters use it as the default cwd
45    /// (`shell` / `cli`) or `workspaceRoot` (`fs`) when the node sets none.
46    pub workspace_root: PathBuf,
47}
48
49impl AdapterCtx {
50    /// Helper for adapters: emit a single line on the given stream.
51    pub fn emit_line(&self, stream: LogStream, line: impl Into<String>) {
52        self.events.emit(ExecutionEvent::NodeLog {
53            execution_id: self.execution_id.clone(),
54            node_id: self.node_id.clone(),
55            at: Utc::now(),
56            stream,
57            line: line.into(),
58        });
59    }
60}
61
62// async_trait is required to make Adapter dyn-compatible.
63#[async_trait]
64pub trait Adapter: Send + Sync {
65    fn name(&self) -> &str;
66    async fn execute(&self, node: &FlowNode) -> Result<NodeOutput, AdapterError>;
67
68    /// Streaming variant. Default delegates to `execute` (no incremental
69    /// output). Adapters that want to push log lines while running override
70    /// this and use `ctx.emit_line` for each line. The shell adapter is the
71    /// first such consumer; the existing zowe / mock adapters keep the
72    /// default and continue to work unchanged.
73    async fn execute_with_events(
74        &self,
75        node: &FlowNode,
76        _ctx: &AdapterCtx,
77    ) -> Result<NodeOutput, AdapterError> {
78        self.execute(node).await
79    }
80
81    /// Static catalog of what this adapter dispatches. Consumed by the
82    /// `flow-spec-dump` binary that feeds the flow-graph-generator spec compiler:
83    /// the markdown under `docs/dsl/adapters/<name>.md` is regenerated from
84    /// these descriptors, and the same catalog grounds the local fine-tune
85    /// and the cloud_ai system prompt.
86    ///
87    /// Default returns an empty catalog (no actions). Live adapters
88    /// (`ShellAdapter`, `CliAdapter`) override with their real surface;
89    /// placeholder adapters (`ssh`, `zosmf`) and `MockAdapter` keep the
90    /// default and are surfaced separately as "placeholder" by the
91    /// compiler.
92    fn descriptor(&self) -> AdapterDescriptor {
93        AdapterDescriptor {
94            name: self.name().to_string(),
95            summary: String::new(),
96            actions: Vec::new(),
97        }
98    }
99}
100
/// Catalog of an adapter's dispatch surface, exposed via [`Adapter::descriptor`].
///
/// Built at runtime (not const) because some adapter names are runtime values
/// (e.g. [`MockAdapter`] takes its name as a constructor arg). This type is
/// the on-the-wire JSON shape consumed by `flow-spec-dump` -> markdown
/// renderer; renaming a field here is a breaking change to the spec build.
///
/// The default [`Adapter::descriptor`] implementation fills in only `name`
/// and leaves `summary` and `actions` empty.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AdapterDescriptor {
    /// `data.adapter` value to use in nodes that target this adapter.
    pub name: String,
    /// One-paragraph human description (rendered as the section intro in
    /// `docs/dsl/adapters/<name>.md`).
    pub summary: String,
    /// Actions this adapter dispatches, one entry per action.
    ///
    /// Every [`ActionDescriptor::id`] here must correspond to a live arm of
    /// this adapter's `execute()` match. Drift between the two is the bug
    /// the per-adapter `descriptor_lists_every_action` test is meant to
    /// catch.
    pub actions: Vec<ActionDescriptor>,
}
120
/// Describes a single action listed in [`AdapterDescriptor::actions`].
///
/// Serialized with serde using the field names below as JSON keys.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ActionDescriptor {
    /// `data.actionId` value.
    pub id: String,
    /// One-line human description.
    pub summary: String,
    /// Fields the flow-graph-generator MUST emit for this action.
    pub required_fields: Vec<FieldDescriptor>,
    /// Fields the flow-graph-generator MAY emit. Defaults are documented in
    /// each field's `description`.
    pub optional_fields: Vec<FieldDescriptor>,
    /// Canonical DSL snippet that the generator should pattern-match. One
    /// example per action is enough; multi-step compositions live in
    /// `docs/dsl/examples.md` instead.
    pub example_dsl: String,
}
137
/// Describes one field of an action, as listed in
/// [`ActionDescriptor::required_fields`] or
/// [`ActionDescriptor::optional_fields`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FieldDescriptor {
    /// Field name as written in the node body (e.g. `command`, `timeoutMs`).
    pub name: String,
    /// Scalar type the field's value must have.
    pub value_type: FieldType,
    /// Human description of the field; for optional fields this is also
    /// where the default is documented.
    pub description: String,
}
144
/// Scalar value types the DSL grammar accepts in node bodies. Matches the
/// four scalar arms of `crates/flow-dsl/src/parser.rs` exactly. There is no
/// `object` or `array` variant on purpose: the grammar refuses nested
/// structures, so the descriptor surface refuses them too.
///
/// Variants serialize in snake_case (`rename_all = "snake_case"`), which is
/// the form the Python renderer matches on.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum FieldType {
    /// Serialized as `"string"`.
    String,
    /// Serialized as `"number"`.
    Number,
    /// Serialized as `"bool"`.
    Bool,
    /// Serialized as `"null"`.
    Null,
}
157
158#[derive(Default)]
159pub struct AdapterRegistry {
160    adapters: HashMap<String, Arc<dyn Adapter>>,
161}
162
163impl AdapterRegistry {
164    pub fn new() -> Self {
165        Self::default()
166    }
167
168    pub fn register(&mut self, adapter: Arc<dyn Adapter>) {
169        self.adapters.insert(adapter.name().to_string(), adapter);
170    }
171
172    pub fn get(&self, name: &str) -> Option<Arc<dyn Adapter>> {
173        self.adapters.get(name).cloned()
174    }
175
176    pub fn list(&self) -> Vec<String> {
177        let mut names: Vec<_> = self.adapters.keys().cloned().collect();
178        names.sort();
179        names
180    }
181}
182
/// Stand-in adapter with a caller-chosen name and an artificial delay.
///
/// Its [`Adapter`] implementation waits for `delay` and then reports
/// success without doing any real work.
pub struct MockAdapter {
    name: String,
    delay: Duration,
}

impl MockAdapter {
    /// Builds a mock called `name` with the standard 300 ms delay.
    pub fn new(name: impl Into<String>) -> Self {
        Self::with_delay(name, Duration::from_millis(300))
    }

    /// Builds a mock called `name` that waits for `delay` on each run.
    pub fn with_delay(name: impl Into<String>, delay: Duration) -> Self {
        let name = name.into();
        Self { name, delay }
    }
}
203
204#[async_trait]
205impl Adapter for MockAdapter {
206    fn name(&self) -> &str {
207        &self.name
208    }
209
210    async fn execute(&self, node: &FlowNode) -> Result<NodeOutput, AdapterError> {
211        tokio::time::sleep(self.delay).await;
212        Ok(NodeOutput {
213            status: "succeeded".into(),
214            payload: serde_json::json!({
215                "mock": true,
216                "node_id": node.id,
217                "node_type": node.node_type,
218                "label": node.data.get("label").cloned().unwrap_or(serde_json::Value::Null)
219            }),
220        })
221    }
222}
223
/// Tests for the descriptor types and the default [`Adapter::descriptor`].
#[cfg(test)]
mod descriptor_tests {
    use super::*;

    #[test]
    fn default_descriptor_is_empty_for_mock_adapter() {
        // Adapters that do not override `descriptor()` should be surfaced as
        // placeholders by the spec compiler. The contract is "name + empty
        // actions"; the Python renderer treats an empty action vec as the
        // signal to skip writing `adapters/<name>.md`.
        let mock = MockAdapter::new("mock-example");
        let d = mock.descriptor();
        assert_eq!(d.name, "mock-example");
        assert!(d.actions.is_empty());
        assert!(d.summary.is_empty());
    }

    #[test]
    fn descriptor_round_trips_through_json() {
        // The spec compiler dumps descriptors as JSON; this asserts the
        // serde shape stays stable. If you rename a field, the Python
        // renderer will break - that is intentional, but make sure to bump
        // it in the same PR.
        let example_dsl =
            r#"step[action: "echo"] { adapter: "shell", actionId: "run-command", command: "echo hi" }"#;
        let descriptor = AdapterDescriptor {
            name: "shell".into(),
            summary: "Run local commands.".into(),
            actions: vec![ActionDescriptor {
                id: "run-command".into(),
                summary: "Run an arbitrary shell command via /bin/sh -c.".into(),
                required_fields: vec![FieldDescriptor {
                    name: "command".into(),
                    value_type: FieldType::String,
                    description: "Command line as a single string.".into(),
                }],
                optional_fields: vec![FieldDescriptor {
                    name: "timeoutMs".into(),
                    value_type: FieldType::Number,
                    description: "Hard timeout in milliseconds (default 60000).".into(),
                }],
                example_dsl: example_dsl.into(),
            }],
        };
        let json = serde_json::to_string(&descriptor).expect("serialize");

        // Pin the wire keys explicitly. A plain serialize -> deserialize
        // round trip is symmetric, so it keeps passing when a field is
        // renamed; indexing by the literal key does not (a missing key
        // yields `Value::Null`, which fails the comparison).
        let wire: serde_json::Value = serde_json::from_str(&json).expect("parse wire json");
        assert_eq!(wire["name"], "shell");
        assert_eq!(wire["summary"], "Run local commands.");
        let action = &wire["actions"][0];
        assert_eq!(action["id"], "run-command");
        assert_eq!(
            action["summary"],
            "Run an arbitrary shell command via /bin/sh -c."
        );
        assert_eq!(action["example_dsl"], example_dsl);
        let required = &action["required_fields"][0];
        assert_eq!(required["name"], "command");
        assert_eq!(required["value_type"], "string");
        assert_eq!(required["description"], "Command line as a single string.");
        let optional = &action["optional_fields"][0];
        assert_eq!(optional["name"], "timeoutMs");
        assert_eq!(optional["value_type"], "number");
        assert_eq!(
            optional["description"],
            "Hard timeout in milliseconds (default 60000)."
        );

        let round: AdapterDescriptor = serde_json::from_str(&json).expect("deserialize");
        assert_eq!(round.name, descriptor.name);
        assert_eq!(round.actions.len(), 1);
        assert_eq!(round.actions[0].id, "run-command");
        assert_eq!(
            round.actions[0].required_fields[0].value_type,
            FieldType::String
        );

        // Compare the whole re-serialized value so that no field can be
        // silently dropped on the way back in.
        assert_eq!(
            serde_json::to_value(&round).expect("re-serialize"),
            wire
        );
    }

    #[test]
    fn field_type_serializes_snake_case() {
        // The Python renderer matches on the snake_case form
        // ("string", "number", "bool", "null"); make sure serde stays
        // aligned.
        assert_eq!(
            serde_json::to_string(&FieldType::String).unwrap(),
            "\"string\""
        );
        assert_eq!(
            serde_json::to_string(&FieldType::Number).unwrap(),
            "\"number\""
        );
        assert_eq!(serde_json::to_string(&FieldType::Bool).unwrap(), "\"bool\"");
        assert_eq!(serde_json::to_string(&FieldType::Null).unwrap(), "\"null\"");
    }
}