1use async_trait::async_trait;
2use chrono::Utc;
3use flow_domain::graph::FlowNode;
4use serde::{Deserialize, Serialize};
5use std::collections::HashMap;
6use std::path::PathBuf;
7use std::sync::Arc;
8use std::time::Duration;
9use thiserror::Error;
10
11use crate::events::{EventSink, ExecutionEvent, LogStream};
12
13#[derive(Debug, Error)]
14pub enum AdapterError {
15 #[error("adapter '{0}' not found")]
16 NotFound(String),
17 #[error("adapter '{adapter}' rejected node {node_id}: {reason}")]
18 Rejected {
19 adapter: String,
20 node_id: String,
21 reason: String,
22 },
23 #[error("adapter '{adapter}' failed on node {node_id}: {reason}")]
24 Failed {
25 adapter: String,
26 node_id: String,
27 reason: String,
28 },
29}
30
31#[derive(Debug, Clone, Serialize, Deserialize)]
32pub struct NodeOutput {
33 pub status: String,
34 pub payload: serde_json::Value,
35}
36
37pub struct AdapterCtx {
41 pub events: Arc<dyn EventSink>,
42 pub execution_id: String,
43 pub node_id: String,
44 pub workspace_root: PathBuf,
47}
48
49impl AdapterCtx {
50 pub fn emit_line(&self, stream: LogStream, line: impl Into<String>) {
52 self.events.emit(ExecutionEvent::NodeLog {
53 execution_id: self.execution_id.clone(),
54 node_id: self.node_id.clone(),
55 at: Utc::now(),
56 stream,
57 line: line.into(),
58 });
59 }
60}
61
62#[async_trait]
64pub trait Adapter: Send + Sync {
65 fn name(&self) -> &str;
66 async fn execute(&self, node: &FlowNode) -> Result<NodeOutput, AdapterError>;
67
68 async fn execute_with_events(
74 &self,
75 node: &FlowNode,
76 _ctx: &AdapterCtx,
77 ) -> Result<NodeOutput, AdapterError> {
78 self.execute(node).await
79 }
80
81 fn descriptor(&self) -> AdapterDescriptor {
93 AdapterDescriptor {
94 name: self.name().to_string(),
95 summary: String::new(),
96 actions: Vec::new(),
97 }
98 }
99}
100
101#[derive(Debug, Clone, Serialize, Deserialize)]
108pub struct AdapterDescriptor {
109 pub name: String,
111 pub summary: String,
114 pub actions: Vec<ActionDescriptor>,
119}
120
121#[derive(Debug, Clone, Serialize, Deserialize)]
122pub struct ActionDescriptor {
123 pub id: String,
125 pub summary: String,
127 pub required_fields: Vec<FieldDescriptor>,
129 pub optional_fields: Vec<FieldDescriptor>,
132 pub example_dsl: String,
136}
137
138#[derive(Debug, Clone, Serialize, Deserialize)]
139pub struct FieldDescriptor {
140 pub name: String,
141 pub value_type: FieldType,
142 pub description: String,
143}
144
145#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
150#[serde(rename_all = "snake_case")]
151pub enum FieldType {
152 String,
153 Number,
154 Bool,
155 Null,
156}
157
158#[derive(Default)]
159pub struct AdapterRegistry {
160 adapters: HashMap<String, Arc<dyn Adapter>>,
161}
162
163impl AdapterRegistry {
164 pub fn new() -> Self {
165 Self::default()
166 }
167
168 pub fn register(&mut self, adapter: Arc<dyn Adapter>) {
169 self.adapters.insert(adapter.name().to_string(), adapter);
170 }
171
172 pub fn get(&self, name: &str) -> Option<Arc<dyn Adapter>> {
173 self.adapters.get(name).cloned()
174 }
175
176 pub fn list(&self) -> Vec<String> {
177 let mut names: Vec<_> = self.adapters.keys().cloned().collect();
178 names.sort();
179 names
180 }
181}
182
/// Adapter stand-in that waits for a fixed delay and then reports success.
pub struct MockAdapter {
    /// Name reported by the adapter.
    name: String,
    /// How long the adapter sleeps before producing its output.
    delay: Duration,
}

impl MockAdapter {
    /// Delay used when the caller does not specify one.
    const DEFAULT_DELAY: Duration = Duration::from_millis(300);

    /// Creates a mock adapter called `name` with the default 300 ms delay.
    pub fn new(name: impl Into<String>) -> Self {
        Self::with_delay(name, Self::DEFAULT_DELAY)
    }

    /// Creates a mock adapter called `name` that sleeps for `delay`.
    pub fn with_delay(name: impl Into<String>, delay: Duration) -> Self {
        let name = name.into();
        Self { name, delay }
    }
}
203
204#[async_trait]
205impl Adapter for MockAdapter {
206 fn name(&self) -> &str {
207 &self.name
208 }
209
210 async fn execute(&self, node: &FlowNode) -> Result<NodeOutput, AdapterError> {
211 tokio::time::sleep(self.delay).await;
212 Ok(NodeOutput {
213 status: "succeeded".into(),
214 payload: serde_json::json!({
215 "mock": true,
216 "node_id": node.id,
217 "node_type": node.node_type,
218 "label": node.data.get("label").cloned().unwrap_or(serde_json::Value::Null)
219 }),
220 })
221 }
222}
223
#[cfg(test)]
mod descriptor_tests {
    use super::*;

    /// An adapter that does not override `descriptor` reports only its name.
    #[test]
    fn default_descriptor_is_empty_for_mock_adapter() {
        let descriptor = MockAdapter::new("mock-example").descriptor();
        assert_eq!(descriptor.name, "mock-example");
        assert!(descriptor.actions.is_empty());
        assert!(descriptor.summary.is_empty());
    }

    /// A fully populated descriptor survives a JSON serialize/deserialize cycle.
    #[test]
    fn descriptor_round_trips_through_json() {
        let command_field = FieldDescriptor {
            name: "command".into(),
            value_type: FieldType::String,
            description: "Command line as a single string.".into(),
        };
        let timeout_field = FieldDescriptor {
            name: "timeoutMs".into(),
            value_type: FieldType::Number,
            description: "Hard timeout in milliseconds (default 60000).".into(),
        };
        let run_command = ActionDescriptor {
            id: "run-command".into(),
            summary: "Run an arbitrary shell command via /bin/sh -c.".into(),
            required_fields: vec![command_field],
            optional_fields: vec![timeout_field],
            example_dsl: r#"step[action: "echo"] { adapter: "shell", actionId: "run-command", command: "echo hi" }"#
                .into(),
        };
        let descriptor = AdapterDescriptor {
            name: "shell".into(),
            summary: "Run local commands.".into(),
            actions: vec![run_command],
        };

        let json = serde_json::to_string(&descriptor).expect("serialize");
        let round: AdapterDescriptor = serde_json::from_str(&json).expect("deserialize");

        assert_eq!(round.name, descriptor.name);
        assert_eq!(round.actions.len(), 1);
        let action = &round.actions[0];
        assert_eq!(action.id, "run-command");
        assert_eq!(action.required_fields[0].value_type, FieldType::String);
    }

    /// Each `FieldType` variant is encoded as its snake-case name.
    #[test]
    fn field_type_serializes_snake_case() {
        let expected = [
            (FieldType::String, "\"string\""),
            (FieldType::Number, "\"number\""),
            (FieldType::Bool, "\"bool\""),
            (FieldType::Null, "\"null\""),
        ];
        for (variant, json) in &expected {
            assert_eq!(
                serde_json::to_string(variant).unwrap(),
                *json,
                "unexpected encoding for {:?}",
                variant
            );
        }
    }
}