mod streaming

Streaming response codecs for the managed LLM execution pipeline.

LlmResponseCodec (in crate::codec::traits) decodes a complete provider response into a normalized AnnotatedLlmResponse. For streaming providers, the analogous job is to:

  1. consume per-chunk events as they arrive on a streaming HTTP response, and

  2. at end of stream, assemble a single JSON payload in the same shape as a non-streaming response.

Once assembled, the payload can be fed back through the matching LlmResponseCodec to produce an AnnotatedLlmResponse. As a result, streaming and non-streaming requests converge on the same observability output, and response-shape handling does not have to be duplicated per route.
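A minimal sketch of that convergence follows. It assumes a drastically simplified payload: the hypothetical decode_complete stands in for an LlmResponseCodec, the hypothetical AnnotatedResponse stands in for AnnotatedLlmResponse, and the "payload" is plain text rather than provider JSON. The point is only that both paths end in the same decode call.

```rust
/// Hypothetical stand-in for AnnotatedLlmResponse.
#[derive(Debug, PartialEq)]
pub struct AnnotatedResponse {
    pub text: String,
    pub output_chars: usize,
}

/// Hypothetical stand-in for an LlmResponseCodec: the single decoder for the
/// complete (non-streaming) payload shape, here reduced to plain text.
pub fn decode_complete(payload: &str) -> AnnotatedResponse {
    AnnotatedResponse {
        text: payload.to_string(),
        output_chars: payload.chars().count(),
    }
}

/// Steps 1 and 2: consume per-chunk deltas as they arrive, then return one
/// payload in the same shape the non-streaming path would receive.
pub fn assemble_from_chunks<'a, I>(chunks: I) -> String
where
    I: IntoIterator<Item = &'a str>,
{
    let mut payload = String::new();
    for delta in chunks {
        payload.push_str(delta); // incremental state carried between chunks
    }
    payload
}
```

Because the streaming path only has to rebuild the non-streaming shape, any field extraction added to the complete-response decoder applies to both request kinds without further work.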

StreamingCodec is the trait that bundles the two functions (LlmCollectorFn, LlmFinalizerFn) used by crate::api::llm::llm_stream_call_execute. Each provider supplies one impl whose internal state holds whatever incremental information is needed to materialize the final payload.

Traits

trait StreamingCodec

Per-provider streaming codec used with crate::api::llm::llm_stream_call_execute.

collector() and finalizer() return owned closures that share the codec’s internal accumulation state. Implementations typically wrap that state in Arc<Mutex<...>> so that each closure created from &self captures its own clone of the shared handle.

LlmFinalizerFn is FnOnce, so a StreamingCodec instance is single-use: callers construct a fresh instance per managed-lifecycle call and discard it after the stream completes.
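The Arc<Mutex<...>> pattern can be sketched as follows. LlmCollectorFn and LlmFinalizerFn are redefined here with assumed signatures (events, payload, and errors all as String), since their real definitions are not shown in these docs; TextDeltaCodec is a hypothetical provider codec that simply concatenates text deltas.

```rust
use std::sync::{Arc, Mutex};

// Assumed signatures: events and the assembled payload are plain Strings here.
pub type LlmCollectorFn = Box<dyn FnMut(String) -> Result<(), String> + Send>;
pub type LlmFinalizerFn = Box<dyn FnOnce() -> Result<String, String> + Send>;

pub trait StreamingCodec {
    fn collector(&self) -> LlmCollectorFn;
    fn finalizer(&self) -> LlmFinalizerFn;
}

/// Hypothetical codec that concatenates the text delta carried by each event.
#[derive(Default)]
pub struct TextDeltaCodec {
    // Shared accumulation state; every closure captures its own Arc clone.
    state: Arc<Mutex<String>>,
}

impl StreamingCodec for TextDeltaCodec {
    fn collector(&self) -> LlmCollectorFn {
        let state = Arc::clone(&self.state);
        Box::new(move |delta: String| -> Result<(), String> {
            state
                .lock()
                .map_err(|_| "codec state poisoned".to_string())?
                .push_str(&delta);
            Ok(())
        })
    }

    fn finalizer(&self) -> LlmFinalizerFn {
        let state = Arc::clone(&self.state);
        Box::new(move || -> Result<String, String> {
            let text = state
                .lock()
                .map_err(|_| "codec state poisoned".to_string())?
                .clone();
            // A real finalizer would wrap this in the provider's
            // non-streaming response JSON; the sketch returns the raw text.
            Ok(text)
        })
    }
}
```

Both closures read and write the same buffer, so calling finalizer() a second time on the same codec would hand out another view of already-consumed state; building a fresh codec per call sidesteps that.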

Functions

fn collector(&self) -> LlmCollectorFn

Returns a closure that consumes one decoded provider event per call.

fn finalizer(&self) -> LlmFinalizerFn

Returns a closure that, when called once at end of stream, produces the assembled response payload in the shape the matching crate::codec::traits::LlmResponseCodec can decode.
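The calling contract (the collector once per decoded event, the finalizer exactly once at the end) can be sketched with a hypothetical driver, drive_stream, standing in for llm_stream_call_execute. Collector and Finalizer are assumed aliases rather than the crate's types, and stopping on the first collector error is a choice of this sketch, not documented behavior.

```rust
/// Assumed aliases playing the roles of LlmCollectorFn and LlmFinalizerFn.
pub type Collector = Box<dyn FnMut(String) -> Result<(), String>>;
pub type Finalizer = Box<dyn FnOnce() -> Result<String, String>>;

/// Hypothetical driver in the spirit of llm_stream_call_execute.
pub fn drive_stream<I>(
    events: I,
    mut collector: Collector,
    finalizer: Finalizer,
) -> Result<String, String>
where
    I: IntoIterator<Item = String>,
{
    for event in events {
        // One call per decoded event; the first error aborts the stream.
        collector(event)?;
    }
    // Release the collector (and any state handles it captured) first.
    drop(collector);
    // FnOnce: calling it consumes the finalizer, so it runs at most once.
    finalizer()
}
```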

Structs and Unions

struct SseEvent

One decoded SSE frame, paired with the parsed data: payload.

event: Option<String>

Value of the event: line if present.

data: Json

Parsed JSON payload from the data: line(s).

struct SseEventDecoder

Incremental decoder for text/event-stream byte streams that yields one JSON object per complete data: payload.

SSE frames are separated by blank lines (\n\n); each frame may contain event: and data: lines. Anthropic Messages, OpenAI Responses, and OpenAI Chat Completions all emit one JSON object per data: line, so the decoder buffers received bytes, splits on frame boundaries, parses the JSON payload, and tags it with the frame’s event name when present.

The decoder is byte-stream-friendly: it accumulates partial frames across chunks and emits completed frames only when their terminating blank line arrives. Bytes after the last terminator are retained for the next call.
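A minimal sketch of the buffering logic follows. It assumes \n\n is the only frame separator (no \r\n\r\n), and it keeps data as a raw String instead of parsed Json so the example needs no external crates; for that reason its push_bytes returns a plain Vec rather than a Result. It decodes UTF-8 lossily per complete frame rather than per chunk, so a multi-byte character split across two chunks is not replaced; that is a choice of this sketch, not necessarily of the real decoder. Following the SSE specification, multiple data: lines in one frame are joined with \n, and a single leading space after the colon is stripped.

```rust
/// Sketch only: data holds the raw payload text, not parsed Json.
#[derive(Debug, PartialEq)]
pub struct SseEvent {
    pub event: Option<String>,
    pub data: String,
}

#[derive(Default)]
pub struct SseEventDecoder {
    // Raw bytes received but not yet closed by a blank line.
    buf: Vec<u8>,
}

impl SseEventDecoder {
    pub fn new() -> Self {
        Self::default()
    }

    /// Buffer `bytes` and return every frame whose "\n\n" has now arrived.
    pub fn push_bytes(&mut self, bytes: &[u8]) -> Vec<SseEvent> {
        self.buf.extend_from_slice(bytes);
        let mut events = Vec::new();
        while let Some(end) = find_frame_end(&self.buf) {
            // Remove the frame plus its two-byte terminator from the buffer.
            let frame: Vec<u8> = self.buf.drain(..end + 2).collect();
            // Lossy UTF-8 decoding happens per complete frame, not per chunk.
            if let Some(ev) = parse_frame(&String::from_utf8_lossy(&frame[..end])) {
                events.push(ev);
            }
        }
        events
    }

    /// Surface any unterminated trailing bytes as a final frame.
    pub fn finish(self) -> Option<SseEvent> {
        parse_frame(&String::from_utf8_lossy(&self.buf))
    }
}

fn find_frame_end(buf: &[u8]) -> Option<usize> {
    buf.windows(2).position(|w| w == b"\n\n")
}

fn parse_frame(frame: &str) -> Option<SseEvent> {
    let mut event = None;
    let mut data = Vec::new();
    for line in frame.lines() {
        // Per the SSE spec, strip at most one space after the colon.
        if let Some(v) = line.strip_prefix("event:") {
            event = Some(v.strip_prefix(' ').unwrap_or(v).to_string());
        } else if let Some(v) = line.strip_prefix("data:") {
            data.push(v.strip_prefix(' ').unwrap_or(v));
        }
    }
    // Frames without any data: line (e.g. heartbeats) are dropped.
    if data.is_empty() {
        None
    } else {
        Some(SseEvent { event, data: data.join("\n") })
    }
}
```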

Implementations

impl SseEventDecoder

Functions

fn finish(mut self) -> Result<Option<SseEvent>>

Drains any remaining buffered frame at end of stream.

Most well-formed SSE streams end with a terminating blank line, in which case this returns Ok(None). If the upstream stops without a final terminator, the buffered bytes are surfaced as a final partial frame, so observability captures the last bytes the upstream sent before disconnecting.

fn new() -> Self

Creates a new decoder with an empty buffer.

fn push_bytes(&mut self, bytes: &[u8]) -> Result<Vec<SseEvent>>

Appends bytes to the internal buffer and returns every now-complete SSE event.

Bytes are decoded as UTF-8, with invalid sequences replaced by the Unicode replacement character. Provider SSE streams are well-formed UTF-8 in practice, but lossy decoding lets the decoder keep going instead of failing on a single corrupt chunk.

Returns Ok(events) containing zero or more events whose data: payloads parsed as JSON. A frame whose data: line is non-empty but is not valid JSON is surfaced as FlowError::Internal, so the caller can decide whether to abort the stream or skip the frame. A frame with no data: line at all (e.g. an SSE heartbeat) is silently dropped.
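The three per-frame outcomes (drop, emit, or error) can be sketched independently of any JSON library by passing the parser in as a closure. FlowError here is a hypothetical stand-in with only an Internal variant, and treating an empty joined payload like a heartbeat is an assumption. In this sketch, a non-JSON sentinel such as the data: [DONE] line that OpenAI Chat Completions sends at end of stream takes the error branch; whether the real decoder special-cases it is not stated in these docs.

```rust
/// Hypothetical stand-in for the crate's FlowError.
#[derive(Debug, PartialEq)]
pub enum FlowError {
    Internal(String),
}

/// Decide what one frame produces, given the values of its data: lines.
/// `parse` stands in for the JSON parser used by the real decoder.
pub fn classify_frame<T, P>(data_lines: &[&str], parse: P) -> Result<Option<T>, FlowError>
where
    P: Fn(&str) -> Option<T>,
{
    if data_lines.is_empty() {
        // No data: line at all (e.g. a heartbeat): silently dropped.
        return Ok(None);
    }
    let payload = data_lines.join("\n");
    if payload.is_empty() {
        // Assumption: an empty payload is treated like a heartbeat.
        return Ok(None);
    }
    match parse(&payload) {
        Some(value) => Ok(Some(value)),
        // Non-empty but unparseable: surfaced so the caller can abort or skip.
        None => Err(FlowError::Internal(format!("invalid SSE data payload: {}", payload))),
    }
}
```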