
flow-execution

flow-execution schedules a FlowGraph, dispatches nodes through adapters and AI providers, and narrates everything as a typed event stream. Hosts rarely construct the Executor directly, since FlowApp does that for them, but they consume its types constantly.

pub trait EventSink: Send + Sync {
    fn emit(&self, event: ExecutionEvent);
}

ExecutionEvent is a tagged union covering the full run lifecycle:

| Variant | Meaning |
| --- | --- |
| Started / Done | Run boundaries (Done carries the terminal status). |
| NodeStarted / NodeSucceeded / NodeFailed / NodeSkipped | Node lifecycle, with per-loop iteration. |
| NodeLog | Streamed stdout/stderr line from a streaming adapter. |
| ToolCall | A model tool call inside an AI node’s tool loop, emitted before it runs. |
| Paused / Resumed / AwaitingConfirmation | Run control and the destructive-action gate. |
| AiInvocation / AiRoutingDecision / AiReviewRequired / AiReviewResolved | The AI audit trail: inputs, contract routing, human verdicts. |
| IterationCapped | A bounded loop hit its visit cap. |

The crate ships several composable sinks: MultiSink, StorageSink (which persists runs, steps, and AI audit records), TracingSink, CapturingSink (for tests), and NoopSink.
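
To make the sink model concrete, here is a minimal sketch of a recording sink and a fan-out sink built on the EventSink trait. The ExecutionEvent enum below is a reduced stand-in with assumed payload fields, and RecordingSink and FanOutSink are hypothetical names, not the crate's CapturingSink or MultiSink.

```rust
use std::sync::{Arc, Mutex};

// Stand-in for the crate's ExecutionEvent. The real enum has more variants
// and richer payloads; the fields here are assumptions for illustration.
#[derive(Debug, Clone, PartialEq)]
pub enum ExecutionEvent {
    Started,
    NodeStarted { node_id: String },
    NodeSucceeded { node_id: String },
    Done,
}

// Same shape as the trait shown in this section.
pub trait EventSink: Send + Sync {
    fn emit(&self, event: ExecutionEvent);
}

// Hypothetical test sink: records every event it receives, in order.
#[derive(Default)]
pub struct RecordingSink {
    events: Mutex<Vec<ExecutionEvent>>,
}

impl RecordingSink {
    // Returns a copy of everything recorded so far.
    pub fn snapshot(&self) -> Vec<ExecutionEvent> {
        self.events.lock().unwrap().clone()
    }
}

impl EventSink for RecordingSink {
    fn emit(&self, event: ExecutionEvent) {
        self.events.lock().unwrap().push(event);
    }
}

// Hypothetical fan-out sink: forwards a clone of each event to every child.
pub struct FanOutSink {
    children: Vec<Arc<dyn EventSink>>,
}

impl FanOutSink {
    pub fn new(children: Vec<Arc<dyn EventSink>>) -> Self {
        Self { children }
    }
}

impl EventSink for FanOutSink {
    fn emit(&self, event: ExecutionEvent) {
        for child in &self.children {
            child.emit(event.clone());
        }
    }
}
```

Because emit takes &self and the trait requires Send + Sync, a sink in this style needs interior mutability (a Mutex here) to accumulate state.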

SharedRunControl gives every run pause, resume, and cancel, plus the review gate. The executor honors that control at node boundaries. The destructive gate and the AI review gate pause the run themselves and then resolve through resume-as-approve or resolve_review.
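
One way to picture control that is honored only at node boundaries is a shared state that the executor checks between nodes. The sketch below is an assumption about the general technique, not the SharedRunControl API: RunControl, checkpoint, and run_nodes are hypothetical names, and the review gate is left out.

```rust
use std::sync::{Arc, Condvar, Mutex};

#[derive(Debug, Clone, Copy, PartialEq)]
enum ControlState {
    Running,
    Paused,
    Cancelled,
}

// Hypothetical stand-in for a shared run control; the real type may differ.
#[derive(Clone)]
pub struct RunControl {
    inner: Arc<(Mutex<ControlState>, Condvar)>,
}

impl RunControl {
    pub fn new() -> Self {
        Self { inner: Arc::new((Mutex::new(ControlState::Running), Condvar::new())) }
    }

    fn set(&self, next: ControlState) {
        let (lock, cv) = &*self.inner;
        let mut state = lock.lock().unwrap();
        // In this sketch cancellation is terminal: later requests are ignored.
        if *state != ControlState::Cancelled {
            *state = next;
        }
        cv.notify_all();
    }

    pub fn pause(&self) { self.set(ControlState::Paused) }
    pub fn resume(&self) { self.set(ControlState::Running) }
    pub fn cancel(&self) { self.set(ControlState::Cancelled) }

    // Called between nodes: blocks while paused and returns false
    // once the run has been cancelled.
    pub fn checkpoint(&self) -> bool {
        let (lock, cv) = &*self.inner;
        let mut state = lock.lock().unwrap();
        while *state == ControlState::Paused {
            state = cv.wait(state).unwrap();
        }
        *state == ControlState::Running
    }
}

// Runs nodes in order, consulting the control only at node boundaries.
pub fn run_nodes(control: &RunControl, nodes: &[&str]) -> Vec<String> {
    let mut finished = Vec::new();
    for node in nodes {
        if !control.checkpoint() {
            break;
        }
        finished.push(node.to_string());
    }
    finished
}
```

A node that is already running is never interrupted in this design; a pause or cancel takes effect before the next node starts.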

pub struct AdapterRegistry { /* name -> Arc<dyn Adapter> */ }

Action, utility, and service nodes resolve their adapter by name at dispatch time. See adapters for the trait.
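
As a rough illustration of name-based resolution at dispatch time, the sketch below stores adapters in a map and looks them up only when a node runs. The Adapter trait shown is a deliberately tiny, hypothetical one (the real trait is documented under adapters), and register, dispatch, and EchoAdapter are assumed names.

```rust
use std::collections::HashMap;
use std::sync::Arc;

// Hypothetical minimal adapter trait, for illustration only.
pub trait Adapter: Send + Sync {
    fn run(&self, action: &str, input: &str) -> Result<String, String>;
}

// Sketch of a name -> Arc<dyn Adapter> registry.
pub struct AdapterRegistry {
    by_name: HashMap<String, Arc<dyn Adapter>>,
}

impl AdapterRegistry {
    pub fn new() -> Self {
        Self { by_name: HashMap::new() }
    }

    pub fn register(&mut self, name: &str, adapter: Arc<dyn Adapter>) {
        self.by_name.insert(name.to_string(), adapter);
    }

    // The lookup happens here, at dispatch time. In this sketch an
    // unknown adapter name is reported as an Err instead of a panic.
    pub fn dispatch(&self, adapter: &str, action: &str, input: &str) -> Result<String, String> {
        let found = self
            .by_name
            .get(adapter)
            .ok_or_else(|| format!("unknown adapter: {adapter}"))?;
        found.run(action, input)
    }
}

// Toy adapter that echoes the action and input back.
pub struct EchoAdapter;

impl Adapter for EchoAdapter {
    fn run(&self, action: &str, input: &str) -> Result<String, String> {
        Ok(format!("{action}:{input}"))
    }
}
```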

An ai node with a non-empty tools array runs the in-node tool loop:

  • build_tool_specs(adapters, bound) turns adapter action descriptors into model-facing tool schemas (fs__read-file, shell__run-command, …).
  • AdapterToolDispatcher maps each model tool call back to a synthesized action node and executes it through the registered adapter, so every sandbox applies unchanged.
  • ToolObserver emits one ExecutionEvent::ToolCall per dispatched call for live UI granularity.
  • StagingToolDispatcher is the coding-agent variant. Filesystem mutations are computed into a ProposedEdit { path, kind, before, after } buffer instead of being written, and an in-turn overlay lets the model read its own pending edits. Reads and shell calls pass through for real.
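
The staging behavior described in the last bullet can be sketched as a buffer of proposed edits plus an overlay that reads consult first. This is a simplified model under stated assumptions: the "disk" is an in-memory map, the EditKind values are guesses (the source names the kind field but not its values), and Staging is a hypothetical type, not StagingToolDispatcher itself.

```rust
use std::collections::HashMap;

// Assumed set of edit kinds.
#[derive(Debug, Clone, PartialEq)]
pub enum EditKind {
    Create,
    Modify,
    Delete,
}

// Field names follow the ProposedEdit shape given above; the field types are assumptions.
#[derive(Debug, Clone, PartialEq)]
pub struct ProposedEdit {
    pub path: String,
    pub kind: EditKind,
    pub before: Option<String>,
    pub after: Option<String>,
}

// Hypothetical staging layer over an in-memory "disk".
pub struct Staging {
    disk: HashMap<String, String>,
    overlay: HashMap<String, Option<String>>, // None marks a staged deletion
    pub edits: Vec<ProposedEdit>,
}

impl Staging {
    pub fn new(disk: HashMap<String, String>) -> Self {
        Self { disk, overlay: HashMap::new(), edits: Vec::new() }
    }

    // Reads check the overlay first, so pending edits are visible in-turn.
    pub fn read(&self, path: &str) -> Option<String> {
        match self.overlay.get(path) {
            Some(staged) => staged.clone(),
            None => self.disk.get(path).cloned(),
        }
    }

    // Writes are recorded as proposals; the disk map is never touched.
    pub fn write(&mut self, path: &str, content: &str) {
        let before = self.read(path);
        let kind = if before.is_some() { EditKind::Modify } else { EditKind::Create };
        let after = Some(content.to_string());
        self.edits.push(ProposedEdit { path: path.to_string(), kind, before, after: after.clone() });
        self.overlay.insert(path.to_string(), after);
    }

    pub fn delete(&mut self, path: &str) {
        let before = self.read(path);
        self.edits.push(ProposedEdit { path: path.to_string(), kind: EditKind::Delete, before, after: None });
        self.overlay.insert(path.to_string(), None);
    }

    // What is actually on "disk", ignoring staged edits.
    pub fn disk_content(&self, path: &str) -> Option<&String> {
        self.disk.get(path)
    }
}
```

Keeping before and after on each proposal lets a host render a diff and apply or discard the whole buffer once the turn ends.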

run_ai_node is the executor’s single entry point for ai nodes, local and cloud:

Executor::run_ai_node(node)
├─ provider == "local" → run_local_llm_ai_node → "local" provider
│   └─ POST <endpoint>/v1/chat/completions (streamed token deltas)
│       → { text, model, finish_reason, input_tokens, output_tokens, latency_ms }
└─ otherwise (cloud):
    ├─ if cloud AI disabled by policy → Skipped
    ├─ resolve provider from registry → if absent: Skipped
    ├─ resolve key: OS keyring, then env-var fallback
    ├─ sanitize(prompt) → (sanitized, rehydration map)
    ├─ dispatch on task: generate | embedding | classify
    ├─ rehydrate response text
    └─ build output payload (metadata + preview, or full content if audited)
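
The sanitize and rehydrate steps in the cloud branch form a round trip: sensitive values leave the prompt as placeholders, and any placeholder the provider echoes back is restored afterwards. The sketch below assumes a caller-supplied list of sensitive strings and a [[REDACTED_n]] placeholder format; both are illustrative choices, and how the real sanitizer detects sensitive content is not described here.

```rust
use std::collections::HashMap;

// Replaces each listed sensitive value with a placeholder and returns the
// sanitized prompt plus a placeholder -> original map for later rehydration.
// The explicit `secrets` list and the placeholder format are assumptions.
pub fn sanitize(prompt: &str, secrets: &[&str]) -> (String, HashMap<String, String>) {
    let mut sanitized = prompt.to_string();
    let mut map = HashMap::new();
    for (i, &secret) in secrets.iter().enumerate() {
        if secret.is_empty() || !sanitized.contains(secret) {
            continue;
        }
        let placeholder = format!("[[REDACTED_{i}]]");
        sanitized = sanitized.replace(secret, &placeholder);
        map.insert(placeholder, secret.to_string());
    }
    (sanitized, map)
}

// Restores original values wherever the response echoes a placeholder.
pub fn rehydrate(response: &str, map: &HashMap<String, String>) -> String {
    let mut out = response.to_string();
    for (placeholder, original) in map {
        out = out.replace(placeholder.as_str(), original.as_str());
    }
    out
}
```

The provider only ever sees the sanitized string; the map stays on the host side and is applied to the response text before the output payload is built.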

The user-facing behavior this implements is documented in cloud providers and the local model runtime.

destructive_reason(node) flags data-loss operations (fs delete-file, destructive shell verbs). With the confirmation gate enabled, the executor emits AwaitingConfirmation and blocks until the host resolves it.
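
A classifier of this kind might look roughly like the following. The ActionNode shape, the verb list, and the reason strings are all assumptions made for the sketch; only the fs delete-file case and the idea of destructive shell verbs come from the description above.

```rust
// Hypothetical node shape, for illustration only.
pub struct ActionNode {
    pub adapter: String,
    pub action: String,
    pub command: Option<String>,
}

// Assumed verb list; the real set of destructive verbs is not given here.
const DESTRUCTIVE_VERBS: [&str; 3] = ["rm", "rmdir", "dd"];

// Returns Some(reason) when the node looks like a data-loss operation.
pub fn destructive_reason(node: &ActionNode) -> Option<String> {
    match (node.adapter.as_str(), node.action.as_str()) {
        ("fs", "delete-file") => Some("deletes a file".to_string()),
        ("shell", "run-command") => {
            // Only the first word of the command line is inspected.
            let verb = node.command.as_deref()?.split_whitespace().next()?;
            if DESTRUCTIVE_VERBS.iter().any(|v| *v == verb) {
                Some(format!("runs destructive shell verb `{verb}`"))
            } else {
                None
            }
        }
        _ => None,
    }
}
```

Returning an Option<String> lets the caller both decide whether to gate the node and show the user why confirmation is being requested.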

verify_flow and graph checks run ahead of execution. A contract-bound ai node without a .fail edge fails pre-run validation.
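
The .fail-edge rule can be illustrated with a small pre-run check. FlowNode, FlowEdge, and the idea of modeling the .fail edge as a port named "fail" are assumptions for this sketch, not the crate's graph types or the verify_flow implementation.

```rust
// Hypothetical graph shapes, for illustration only.
pub struct FlowNode {
    pub id: String,
    pub kind: String,
    pub contract: Option<String>,
}

pub struct FlowEdge {
    pub from: String,
    pub port: String, // "fail" models the .fail edge in this sketch
    pub to: String,
}

// Returns the ids of contract-bound ai nodes that have no outgoing fail edge.
// A non-empty result would fail pre-run validation.
pub fn missing_fail_edges(nodes: &[FlowNode], edges: &[FlowEdge]) -> Vec<String> {
    nodes
        .iter()
        .filter(|n| n.kind == "ai" && n.contract.is_some())
        .filter(|n| !edges.iter().any(|e| e.from == n.id && e.port == "fail"))
        .map(|n| n.id.clone())
        .collect()
}
```

Rejecting the graph before the run starts means a contract violation always has somewhere to route, instead of surfacing as a dead end mid-run.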