mod streaming
Streaming response codecs for the managed LLM execution pipeline.
LlmResponseCodec (in crate::codec::traits) decodes a complete provider response into a normalized AnnotatedLlmResponse. For streaming providers, the analogous job is to:
- consume per-chunk events as they arrive on a streaming HTTP response, and
- assemble a single JSON payload with the non-streaming response shape at end of stream.
Once assembled, the payload can be fed back through the matching LlmResponseCodec to produce an AnnotatedLlmResponse, so streaming and non-streaming requests converge on the same observability output without per-route shape duplication.
StreamingCodec is the trait that bundles the two functions (LlmCollectorFn, LlmFinalizerFn) used by crate::api::llm::llm_stream_call_execute. Each provider supplies one impl whose internal state holds whatever incremental information is needed to materialize the final payload.
Traits
- trait StreamingCodec
Per-provider streaming codec used with crate::api::llm::llm_stream_call_execute.
collector() and finalizer() produce owned closures that share the codec’s internal accumulation state. Implementations typically wrap that state in Arc<Mutex<...>> so that each closure produced through &self captures its own clone of the handle. Because LlmFinalizerFn is FnOnce, a StreamingCodec instance is single-use: callers construct a fresh instance per managed-lifecycle call and discard it after the stream completes.
Functions
- fn collector(&self) -> LlmCollectorFn
Returns a closure that consumes one decoded provider event per call.
- fn finalizer(&self) -> LlmFinalizerFn
Returns a closure that, when called once at end of stream, produces the assembled response payload in the shape that the matching crate::codec::traits::LlmResponseCodec can decode.
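A minimal sketch of the collector/finalizer pattern described above, with shared state behind Arc<Mutex<...>>. The real LlmCollectorFn and LlmFinalizerFn aliases, the event type, and the payload type are not shown in these docs, so the sketch assumes hypothetical stand-ins: an event is a text delta (String), the collector is a boxed FnMut, the finalizer is a boxed FnOnce, and the assembled payload is a JSON-shaped String. TextDeltaCodec is an illustrative name, not a provider codec from this crate.

```rust
use std::sync::{Arc, Mutex};

// Hypothetical stand-ins: the real aliases, event type, and payload type are
// not shown in these docs. Here an event is a text delta (String) and the
// assembled payload is a JSON-shaped String.
type LlmCollectorFn = Box<dyn FnMut(String) + Send>;
type LlmFinalizerFn = Box<dyn FnOnce() -> String + Send>;

trait StreamingCodec {
    fn collector(&self) -> LlmCollectorFn;
    fn finalizer(&self) -> LlmFinalizerFn;
}

/// Hypothetical codec that concatenates text deltas into one message.
struct TextDeltaCodec {
    // Accumulation state shared by the collector and finalizer closures.
    text: Arc<Mutex<String>>,
}

impl TextDeltaCodec {
    fn new() -> Self {
        TextDeltaCodec { text: Arc::new(Mutex::new(String::new())) }
    }
}

impl StreamingCodec for TextDeltaCodec {
    fn collector(&self) -> LlmCollectorFn {
        // Each closure captures its own clone of the Arc handle.
        let text = Arc::clone(&self.text);
        Box::new(move |delta: String| text.lock().unwrap().push_str(&delta))
    }

    fn finalizer(&self) -> LlmFinalizerFn {
        let text = Arc::clone(&self.text);
        Box::new(move || {
            // Take the accumulated text and wrap it in a non-streaming shape.
            // No JSON escaping is done; a real codec would use a JSON library.
            let body = std::mem::take(&mut *text.lock().unwrap());
            format!("{{\"content\":\"{}\"}}", body)
        })
    }
}

fn main() {
    // A fresh codec per call, since the finalizer is FnOnce.
    let codec = TextDeltaCodec::new();
    let mut collect = codec.collector();
    let finalize = codec.finalizer();
    for delta in ["Hel", "lo", "!"] {
        collect(delta.to_string());
    }
    let payload = finalize();
    assert_eq!(payload, r#"{"content":"Hello!"}"#);
    println!("{}", payload);
}
```

Because both closures hold clones of the same Arc, the collector can be handed to the streaming loop while the finalizer waits for end of stream, and neither needs to borrow the codec itself.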
Structs and Unions
- struct SseEvent
One decoded SSE frame, paired with its parsed data: payload.
  - event: Option<String>
Value of the event: line, if present.
- struct SseEventDecoder
Incremental decoder for text/event-stream byte streams that yields one JSON object per complete data: payload.
SSE frames are separated by blank lines (\n\n); each frame may contain event: and data: lines. Anthropic Messages, OpenAI Responses, and OpenAI Chat Completions all emit one JSON object per data: line, so the decoder buffers received bytes, splits on frame boundaries, parses the JSON payload, and tags it with the frame’s event name when present.
The decoder is byte-stream-friendly: it accumulates partial frames across chunks and emits completed frames only when their terminating blank line arrives. Bytes after the last terminator are retained for the next call.
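The per-frame step described above (read the event: and data: lines of one complete frame, drop frames without data) can be sketched as follows. This is an illustration, not the crate's decoder: RawSseEvent, field_value, and parse_frame are hypothetical names, and data is kept as raw text instead of parsed JSON so the sketch needs only the standard library. It follows the SSE specification's rules that one leading space after the field colon is dropped and that multiple data: lines in a frame are joined with newlines; whether SseEventDecoder handles multi-line data the same way is not stated in these docs.

```rust
/// Hypothetical simplified event: `data` keeps the raw text of the data:
/// line(s) instead of a parsed JSON value.
#[derive(Debug, PartialEq)]
struct RawSseEvent {
    event: Option<String>,
    data: String,
}

/// Strips the single optional space that may follow the field colon.
fn field_value(rest: &str) -> &str {
    rest.strip_prefix(' ').unwrap_or(rest)
}

/// Parses one complete frame (the text between blank-line terminators).
/// Returns None for frames with no data: line, such as heartbeats.
fn parse_frame(frame: &str) -> Option<RawSseEvent> {
    let mut event = None;
    let mut data_lines: Vec<&str> = Vec::new();
    for line in frame.lines() {
        if let Some(rest) = line.strip_prefix("event:") {
            event = Some(field_value(rest).to_string());
        } else if let Some(rest) = line.strip_prefix("data:") {
            data_lines.push(field_value(rest));
        }
        // Comment lines (starting with ':') and other fields are ignored.
    }
    if data_lines.is_empty() {
        return None;
    }
    // Per the SSE spec, multiple data: lines in one frame join with '\n'.
    Some(RawSseEvent { event, data: data_lines.join("\n") })
}

fn main() {
    // Illustrative frame; the payload text is not taken from a real stream.
    let frame = "event: message_start\ndata: {\"type\":\"message_start\"}";
    let ev = parse_frame(frame).unwrap();
    assert_eq!(ev.event.as_deref(), Some("message_start"));
    assert_eq!(ev.data, "{\"type\":\"message_start\"}");
    // A comment-only heartbeat frame has no data: line and is dropped.
    assert_eq!(parse_frame(": ping"), None);
}
```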
Implementations
- impl SseEventDecoder
Functions
- fn finish(mut self) -> Result<Option<SseEvent>>
Drains any remaining buffered frame at end of stream.
Most well-formed SSE streams end with a terminating blank line, in which case this returns Ok(None). If the stream stops without a terminator, the trailing bytes are surfaced as a final partial frame so observability captures the last bytes the upstream sent before disconnecting.
- fn new() -> Self
Creates a new decoder with an empty buffer.
- fn push_bytes(&mut self, bytes: &[u8]) -> Result<Vec<SseEvent>>
Appends bytes to the internal buffer and returns every now-complete SSE event.
Bytes are interpreted as UTF-8, with invalid sequences replaced by replacement characters. Provider SSE streams are well-formed UTF-8 in practice, but lossy decoding lets the decoder keep going instead of failing on a single corrupt chunk.
Returns Ok(events) containing zero or more events whose data: payloads parsed successfully. A frame whose data: line is non-empty but does not parse as JSON is surfaced as FlowError::Internal, so the caller can decide whether to abort the stream or skip the frame; frames with no data: line at all (e.g. SSE heartbeats) are silently dropped.
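The buffering contract of push_bytes and finish (emit only frames whose blank-line terminator has arrived, keep the remaining bytes, drain a leftover partial frame at end of stream) can be sketched as below. FrameBuffer and find_terminator are hypothetical names. The sketch returns raw frame text instead of parsed SseEvent values and omits the FlowError path, so it uses plain Vec and Option rather than Result. It assumes "\n\n" terminators only (not "\r\n\r\n"), and it defers UTF-8 decoding until a frame is complete, which keeps a multi-byte character split across chunks intact; whether SseEventDecoder decodes at the same point is not stated here.

```rust
/// Hypothetical simplified decoder that yields the raw text of each completed
/// frame, to isolate the buffering logic from JSON parsing.
struct FrameBuffer {
    pending: Vec<u8>,
}

impl FrameBuffer {
    fn new() -> Self {
        FrameBuffer { pending: Vec::new() }
    }

    /// Appends `bytes` and returns every frame whose terminating blank line
    /// has now arrived; bytes after the last terminator stay buffered.
    fn push_bytes(&mut self, bytes: &[u8]) -> Vec<String> {
        self.pending.extend_from_slice(bytes);
        let mut frames = Vec::new();
        while let Some(end) = find_terminator(&self.pending) {
            // Remove the frame plus its "\n\n" terminator from the buffer.
            let frame: Vec<u8> = self.pending.drain(..end + 2).collect();
            // Lossy-decode only the complete frame, without its terminator.
            frames.push(String::from_utf8_lossy(&frame[..end]).into_owned());
        }
        frames
    }

    /// At end of stream, surfaces unterminated trailing bytes as a final
    /// partial frame; returns None when only whitespace (or nothing) remains.
    fn finish(self) -> Option<String> {
        if self.pending.iter().all(|b| b.is_ascii_whitespace()) {
            None
        } else {
            Some(String::from_utf8_lossy(&self.pending).into_owned())
        }
    }
}

/// Byte offset of the first "\n\n" frame terminator, if any.
fn find_terminator(buf: &[u8]) -> Option<usize> {
    buf.windows(2).position(|w| w[0] == b'\n' && w[1] == b'\n')
}

fn main() {
    let mut dec = FrameBuffer::new();
    // The first frame is split across two network chunks.
    assert!(dec.push_bytes(b"data: {\"a\"").is_empty());
    let frames = dec.push_bytes(b":1}\n\ndata: {\"b\":2}");
    assert_eq!(frames, vec!["data: {\"a\":1}".to_string()]);
    // The second frame never receives its blank line before disconnect.
    assert_eq!(dec.finish(), Some("data: {\"b\":2}".to_string()));
}
```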