Skip to main content

flow_adapter_ai/
stream.rs

1//! LLM token streaming.
2//!
3//! Providers that support real streaming override `CloudAiProvider::invoke_stream`
4//! to push deltas through a [`LlmStreamSink`] as they arrive; providers without
5//! streaming support inherit the default impl, which calls `invoke` and emits a
6//! single final delta + `done = true`. That keeps the call signature uniform so
7//! the executor + agentic path can opt into streaming without per-provider
8//! branching, and the frontend chip ticks something useful in both cases.
9
10use async_trait::async_trait;
11
12use crate::CloudAiResponse;
13
/// A single chunk of generated text emitted as the model produces tokens.
///
/// `delta` carries only the **new** text since the previous event; a consumer
/// that needs the full response must accumulate the deltas itself. `done` is
/// set exactly once per call - either together with the final partial delta
/// or on its own with an empty `delta` - and signals that the stream is
/// closed.
///
/// `error` is `Some` when the call failed mid-stream; `done` is also `true`
/// in that case. Successful calls leave `error` as `None`.
///
/// Events compare field-by-field (`PartialEq`/`Eq`), so captured events can
/// be checked directly with `assert_eq!`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct LlmStreamEvent {
    /// Caller-supplied id correlating this event to a specific call. Lets the
    /// frontend group deltas when multiple LLM calls overlap.
    pub call_id: String,
    /// Provider name (e.g. `"local"`, `"claude"`).
    pub provider: String,
    /// Model id the provider reported (may differ from the requested id when
    /// the server aliases). Empty until the first chunk arrives.
    pub model: String,
    /// New text since the previous event.
    pub delta: String,
    /// True on the final event (either with the last delta or empty).
    pub done: bool,
    /// Set when the call failed mid-stream; the consumer should surface this
    /// to the user (e.g. an inline error tag on the chip).
    pub error: Option<String>,
}
41
42impl LlmStreamEvent {
43    /// Construct the single `done` event the default `invoke_stream` impl
44    /// emits when wrapping a non-streaming `invoke` call.
45    pub fn final_delta(call_id: &str, resp: &CloudAiResponse) -> Self {
46        Self {
47            call_id: call_id.to_string(),
48            provider: resp.provider.clone(),
49            model: resp.model.clone(),
50            delta: resp.text.clone(),
51            done: true,
52            error: None,
53        }
54    }
55
56    /// Construct a terminal error event so consumers know the stream is over.
57    pub fn error(call_id: &str, provider: &str, error: String) -> Self {
58        Self {
59            call_id: call_id.to_string(),
60            provider: provider.to_string(),
61            model: String::new(),
62            delta: String::new(),
63            done: true,
64            error: Some(error),
65        }
66    }
67}
68
69/// Async fan-out target for `LlmStreamEvent`s. Providers receive a `&dyn
70/// LlmStreamSink`, call `emit` once per chunk, and the caller decides what
71/// happens with the events (Tauri broadcasts to the frontend; tests collect
72/// into a Vec).
73#[async_trait]
74pub trait LlmStreamSink: Send + Sync {
75    async fn emit(&self, event: LlmStreamEvent);
76}
77
/// No-op sink. Useful in headless tests + as the default `Executor` field
/// value so `Executor::run` doesn't require a real sink when nobody cares
/// about streaming.
///
/// The type is a stateless unit struct, so it is `Copy`, `Default`, and
/// `Debug`: it can be created with `NullStreamSink::default()` and embedded
/// in structs that derive those traits.
#[derive(Debug, Clone, Copy, Default)]
pub struct NullStreamSink;
82
83#[async_trait]
84impl LlmStreamSink for NullStreamSink {
85    async fn emit(&self, _event: LlmStreamEvent) {}
86}
87
88/// Buffering sink used by tests. Captures every event so assertions can
89/// inspect deltas, done-flags, and errors after the fact.
90#[derive(Default)]
91pub struct CapturingStreamSink {
92    pub events: std::sync::Mutex<Vec<LlmStreamEvent>>,
93}
94
95#[async_trait]
96impl LlmStreamSink for CapturingStreamSink {
97    async fn emit(&self, event: LlmStreamEvent) {
98        self.events.lock().unwrap().push(event);
99    }
100}