mod stream#
Streaming LLM response wrapper.
This module provides `LlmStreamWrapper`, a `Stream` adapter that sits between the raw stream from an LLM API and the consumer. It feeds chunks to a user-supplied collector and automatically emits lifecycle events when the stream ends.

Pipeline

    raw chunk (Json) -> collector(chunk) -> Ok(())  -> yield chunk
                                            Err(e)  -> terminate stream with error
    upstream error -> terminate stream with error -> finalizer() -> Json -> SanitizeResponseGuardrails -> END event
    stream ends    -> finalizer() -> Json -> SanitizeResponseGuardrails -> END event

The collector receives each chunk (`Json`) and can accumulate state (e.g., concatenating tokens). If the collector returns `Err`, the stream terminates immediately with that error. Upstream stream errors also terminate the stream immediately. The finalizer is called once when the stream terminates and returns the aggregated response as `Json`. That aggregated response then flows through sanitize response guardrails before being included in the END event.

Structs and Unions
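The chunk flow can be sketched in simplified synchronous form, with a plain loop standing in for the async `Stream` and `String` standing in for `Json` (all names here are illustrative, not the crate's actual API):

```rust
// Simplified, synchronous sketch of the pipeline. Each Ok chunk is fed to the
// collector and then yielded; a collector error or an upstream error
// terminates the stream; on exhaustion the finalizer produces the aggregated
// response. `Json` is a stand-in alias for the real payload type.
type Json = String;

fn drive_stream(
    chunks: Vec<Result<Json, String>>,
    mut collector: impl FnMut(&Json) -> Result<(), String>,
    finalizer: impl FnOnce() -> Json,
) -> Result<(Vec<Json>, Json), String> {
    let mut yielded = Vec::new();
    for chunk in chunks {
        let chunk = chunk?;  // upstream Err terminates the stream
        collector(&chunk)?;  // collector Err terminates the stream
        yielded.push(chunk); // Ok(()) -> chunk is yielded downstream
    }
    // Stream exhausted: the finalizer synthesizes the aggregated response,
    // which the real wrapper then runs through guardrails and the END event.
    Ok((yielded, finalizer()))
}
```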
- struct LlmStreamWrapper#
Wraps an inner `Stream<Item = Result<Json>>` of raw chunks and:

- Passes each chunk to the user-supplied collector closure. If the collector returns `Err`, the stream terminates with that error.
- On stream exhaustion, calls the finalizer to produce an aggregated `Json` response, runs sanitize response guardrails on it, then emits the LLM END event.

This type is returned by `crate::api::llm::llm_stream_call_execute` and is usually consumed as an ordinary async stream. The wrapper preserves the originating scope stack so end-of-stream bookkeeping still uses the correct scope-local middleware and subscribers even when polling happens elsewhere.

Implementations
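Because the collector is an `FnMut` and the finalizer a separate `FnOnce`, aggregated state usually has to be shared between the two closures. A minimal sketch of one way to build such a pair (the `Arc<Mutex<_>>` sharing, the `String` stand-in for `Json`, and the `String` error type are assumptions for illustration, not part of the crate's API):

```rust
use std::sync::{Arc, Mutex};

// Stand-in alias for the real Json payload type.
type Json = String;

// Build a collector/finalizer pair that share an accumulator: the collector
// appends each chunk's text, the finalizer drains the buffer into the final
// aggregated response. Arc<Mutex<_>> keeps both boxed closures Send.
fn token_accumulator() -> (
    Box<dyn FnMut(Json) -> Result<(), String> + Send>,
    Box<dyn FnOnce() -> Json + Send>,
) {
    let buf = Arc::new(Mutex::new(String::new()));
    let buf2 = Arc::clone(&buf);
    let collector = Box::new(move |chunk: Json| {
        buf.lock().unwrap().push_str(&chunk); // accumulate token text
        Ok(())
    });
    let finalizer = Box::new(move || std::mem::take(&mut *buf2.lock().unwrap()));
    (collector, finalizer)
}
```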
- impl LlmStreamWrapper#
Functions
- fn new(inner: Pin<Box<dyn Stream<Item = Result<Json>> + Send>>, handle: LlmHandle, collector: Box<dyn FnMut(Json) -> Result<()> + Send>, finalizer: Box<dyn FnOnce() -> Json + Send>, _data: Option<Json>, metadata: Option<Json>, response_codec: Option<Arc<dyn LlmResponseCodec>>) -> Self#
Create a new `LlmStreamWrapper` around the given raw stream.

Captures the current `ScopeStackHandle` at creation time so the correct scope stack is used when the stream is later polled, even if polling happens on a different task or thread.

Parameters

- `inner`: Raw stream of JSON chunks from the provider callback.
- `handle`: `LlmHandle` identifying the managed LLM span.
- `collector`: Per-chunk callback used to accumulate stream state or forward chunks elsewhere. Returning `Err` terminates the stream.
- `finalizer`: One-shot callback invoked when the stream finishes to synthesize the aggregated response payload.
- `data`: Retained compatibility payload; ATOF end data is the finalized response.
- `metadata`: Optional event metadata merged into the emitted LLM-end event.
- `response_codec`: Optional codec used to derive annotated response metadata from the aggregated final payload.
Returns

A new `LlmStreamWrapper` ready to be polled.
- fn scope_stack(&self) -> &ScopeStackHandle#
Return the captured scope stack handle for this stream.
Callers can use this to bind the correct scope stack when spawning the stream on a different task via `TASK_SCOPE_STACK.scope(...)`.

Returns

A shared reference to the `ScopeStackHandle` captured when the stream wrapper was created.
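The capture-at-creation pattern this supports can be sketched with plain threads (`ScopeStack`, `Wrapper`, and all other names here are hypothetical stand-ins for the crate's types): a handle cloned at construction travels with the wrapper, so whichever thread or task eventually polls it sees the creating context.

```rust
use std::sync::Arc;
use std::thread;

// Hypothetical stand-in for the crate's scope stack: just a named context.
#[derive(Debug)]
struct ScopeStack {
    name: String,
}

struct Wrapper {
    scope: Arc<ScopeStack>, // captured at creation time
}

impl Wrapper {
    fn new(current: &Arc<ScopeStack>) -> Self {
        // Capture the creating context so later polling, possibly on another
        // thread, still sees the same scope stack.
        Wrapper { scope: Arc::clone(current) }
    }

    fn scope_stack(&self) -> &Arc<ScopeStack> {
        &self.scope
    }
}

fn poll_on_other_thread(w: Wrapper) -> String {
    // The wrapper is moved to a different thread, but the captured scope
    // travels with it, so bookkeeping uses the original scope.
    thread::spawn(move || w.scope_stack().name.clone())
        .join()
        .unwrap()
}
```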
Traits implemented
- impl Stream for LlmStreamWrapper#
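A rough synchronous analogue of the `Stream` implementation, written as an `Iterator` (with `String` standing in for `Json`; in this sketch the finalized response is yielded as a last item, whereas the real wrapper emits it via the END event — all names are illustrative, not the crate's API):

```rust
type Json = String;

// Synchronous analogue of the wrapper's Stream impl: forwards Ok chunks
// through the collector, ends on the first error, and runs the finalizer
// exactly once when the inner stream is exhausted.
struct SyncWrapper<I> {
    inner: I,
    collector: Box<dyn FnMut(&Json) -> Result<(), String>>,
    finalizer: Option<Box<dyn FnOnce() -> Json>>, // Option so it runs once
    done: bool,
}

impl<I: Iterator<Item = Result<Json, String>>> Iterator for SyncWrapper<I> {
    type Item = Result<Json, String>;

    fn next(&mut self) -> Option<Self::Item> {
        if self.done {
            return None;
        }
        match self.inner.next() {
            Some(Ok(chunk)) => match (self.collector)(&chunk) {
                Ok(()) => Some(Ok(chunk)), // collector accepted: yield chunk
                Err(e) => {
                    self.done = true; // collector error: terminate stream
                    Some(Err(e))
                }
            },
            Some(Err(e)) => {
                self.done = true; // upstream error: terminate stream
                Some(Err(e))
            }
            None => {
                self.done = true;
                // Exhaustion: run the one-shot finalizer. The real wrapper
                // sanitizes this payload and emits the LLM END event instead
                // of yielding it downstream.
                self.finalizer.take().map(|f| Ok(f()))
            }
        }
    }
}
```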