flow_adapter_ai/stream.rs
1//! LLM token streaming.
2//!
3//! Providers that support real streaming override `CloudAiProvider::invoke_stream`
4//! to push deltas through a [`LlmStreamSink`] as they arrive; providers without
5//! streaming support inherit the default impl, which calls `invoke` and emits a
6//! single final delta + `done = true`. That keeps the call signature uniform so
7//! the executor + agentic path can opt into streaming without per-provider
8//! branching, and the frontend chip ticks something useful in both cases.
9
10use async_trait::async_trait;
11
12use crate::CloudAiResponse;
13
/// A single chunk of generated text emitted as the model produces tokens.
///
/// `delta` carries only the **new** text since the previous event; a consumer
/// that needs the full response must accumulate the deltas itself. `done` is
/// set exactly once per call - either together with the final partial delta
/// or on its own with an empty `delta` - and signals that the stream is
/// closed.
///
/// `error` is `Some` when the call failed mid-stream, in which case `done` is
/// also true. Successful calls leave `error` as `None`.
///
/// Equality is field-wise, so two events can be compared directly with
/// `assert_eq!`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct LlmStreamEvent {
    /// Caller-supplied id correlating this event to a specific call. Lets the
    /// frontend group deltas when multiple LLM calls overlap.
    pub call_id: String,
    /// Provider name (e.g. `"local"`, `"claude"`).
    pub provider: String,
    /// Model id the provider reported (may differ from the requested id when
    /// the server aliases). Empty until the first chunk arrives.
    pub model: String,
    /// New text since the previous event.
    pub delta: String,
    /// True on the final event (either with the last delta or empty).
    pub done: bool,
    /// Set when the call failed mid-stream; the consumer should surface this
    /// to the user (e.g. an inline error tag on the chip).
    pub error: Option<String>,
}
41
42impl LlmStreamEvent {
43 /// Construct the single `done` event the default `invoke_stream` impl
44 /// emits when wrapping a non-streaming `invoke` call.
45 pub fn final_delta(call_id: &str, resp: &CloudAiResponse) -> Self {
46 Self {
47 call_id: call_id.to_string(),
48 provider: resp.provider.clone(),
49 model: resp.model.clone(),
50 delta: resp.text.clone(),
51 done: true,
52 error: None,
53 }
54 }
55
56 /// Construct a terminal error event so consumers know the stream is over.
57 pub fn error(call_id: &str, provider: &str, error: String) -> Self {
58 Self {
59 call_id: call_id.to_string(),
60 provider: provider.to_string(),
61 model: String::new(),
62 delta: String::new(),
63 done: true,
64 error: Some(error),
65 }
66 }
67}
68
69/// Async fan-out target for `LlmStreamEvent`s. Providers receive a `&dyn
70/// LlmStreamSink`, call `emit` once per chunk, and the caller decides what
71/// happens with the events (Tauri broadcasts to the frontend; tests collect
72/// into a Vec).
73#[async_trait]
74pub trait LlmStreamSink: Send + Sync {
75 async fn emit(&self, event: LlmStreamEvent);
76}
77
/// No-op sink that discards every event.
///
/// Useful in headless tests and as the default `Executor` field value, so
/// `Executor::run` doesn't require a real sink when nobody cares about
/// streaming.
///
/// This is a field-less unit struct, so it derives the cheap common traits:
/// `Default` lets it be used wherever a default sink is needed, and `Debug`
/// keeps containing types debuggable.
#[derive(Debug, Clone, Copy, Default)]
pub struct NullStreamSink;
82
83#[async_trait]
84impl LlmStreamSink for NullStreamSink {
85 async fn emit(&self, _event: LlmStreamEvent) {}
86}
87
88/// Buffering sink used by tests. Captures every event so assertions can
89/// inspect deltas, done-flags, and errors after the fact.
90#[derive(Default)]
91pub struct CapturingStreamSink {
92 pub events: std::sync::Mutex<Vec<LlmStreamEvent>>,
93}
94
95#[async_trait]
96impl LlmStreamSink for CapturingStreamSink {
97 async fn emit(&self, event: LlmStreamEvent) {
98 self.events.lock().unwrap().push(event);
99 }
100}