# LLM Streaming — Design

Streaming output on top of `LLMInferenceSpecDecodeRuntime`: per-slot chunked delivery that works for vanilla and Eagle spec-decode, for text and multimodal, with cancellation and per-slot throttling.
## Goals

- Per-batch-item streaming — one channel per slot, attached to `LLMGenerationRequest`.
- Typed deltas — `StreamChunk { tokenIds, text, finished, reason }`, with always-well-formed UTF-8 on the wire.
- Burst-atomic emits — one chunk per iteration per slot, regardless of whether vanilla decode appended 1 token or spec-decode accepted N.
- Batch-compaction safety — per-slot state compacts in lockstep with `tokenIds` in `performBatchEvict`.
- Cancellation — fire-and-forget from any thread; KV released on the next eviction boundary.
- Terminal-chunk guarantee — RAII finalizer ensures every attached channel unblocks on every exit path.
- Zero impact on non-streaming callers — an empty `streamChannels` means the response path is byte-identical to before.
## Key decisions

| Concern | Decision |
|---|---|
| Sync primitive | Per-channel `std::mutex` + `std::condition_variable` over a `std::deque`, with atomics for cross-thread status reads |
| Detokenization | Strategy A: per-token piece lookup + UTF-8 sanitizer; works because incomplete trailing codepoint bytes are held back in `pendingBytes` (see the algorithm below) |
| UTF-8 safety | Invalid bytes are replaced with U+FFFD; incomplete trailing sequences are buffered until completed or flushed |
| Ownership | `std::shared_ptr<StreamChannel>`, shared between the caller and the runtime's per-slot `SlotStreamState` |
| Producer API | Private on `StreamChannel`; the friend functions listed in the header are the only callers |
| Cancellation | `cancel()` is fire-and-forget from any thread; `applyCancellationToFinishStates` latches `kCancelled` at the next iteration top |
| Overflow policy | Phase-1 unbounded queue |
| Submission model | Synchronous (`handleRequest` runs on the caller's thread); worker-thread split deferred to Phase 2 |
## Data types (`cpp/runtime/streaming.h`)

```cpp
enum class FinishReason : uint8_t {
    kNotFinished = 0,
    kEndId = 1,
    kLength = 2,
    kCancelled = 3,
    kError = 4,
    kStopWords = 5
};

struct StreamChunk {
    std::vector<int32_t> tokenIds;  // delta tokens since last chunk
    std::string text;               // delta text, always well-formed UTF-8
    bool finished{false};           // true for the terminal chunk
    FinishReason reason{FinishReason::kNotFinished};
};
```
```cpp
class StreamChannel {
public:
    static std::shared_ptr<StreamChannel> create();

    // Consumer API
    template <typename Handler>
    void consume(Handler&& handler, std::chrono::milliseconds poll = 100ms);
    std::optional<StreamChunk> tryPop();
    std::optional<StreamChunk> waitPop(std::chrono::milliseconds timeout);
    void cancel() noexcept;
    void setStreamInterval(int32_t n) noexcept;

    // Status queries (atomic reads, any thread)
    bool isFinished() const noexcept;
    FinishReason getReason() const noexcept;
    int32_t getOriginalBatchIdx() const noexcept;
    bool isCancelled() const noexcept;
    int32_t getStreamInterval() const noexcept;

private:
    // Producer API is private; friends below are the only callers.
    void push(StreamChunk);
    void finish(FinishReason);
    void setOriginalBatchIdx(int32_t) noexcept;

    friend class StreamChannelFinalizer;
    friend void attachStreamChannel(std::shared_ptr<StreamChannel> const&, int32_t);
    friend bool validateStreamingSubmission(LLMGenerationRequest const&);
    friend void applyCancellationToFinishStates(SpecDecodeInferenceContext&);
    friend void emitChunks(SpecDecodeInferenceContext&, tokenizer::Tokenizer const&);

    // ... private state: mutex, cv, deque, atomics
};
```
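The private state behind the final comment could be assembled as in this self-contained sketch, following the sync-primitive decision above (class and member names here are illustrative, not the real implementation; `StreamChunk`/`FinishReason` are the types just declared):

```cpp
// Illustrative core only: mutex + cv + unbounded deque, with finish() doubling
// as the wake-up for blocked consumers.
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <deque>
#include <mutex>
#include <optional>

class ChannelCore {
public:
    void push(StreamChunk chunk) {
        {
            std::lock_guard<std::mutex> lock(mMutex);
            mQueue.push_back(std::move(chunk));  // Phase-1 unbounded: producer never blocks
        }
        mCv.notify_one();
    }

    void finish(FinishReason reason) noexcept {
        bool expected = false;
        if (mFinished.compare_exchange_strong(expected, true)) {
            mReason.store(reason, std::memory_order_release);  // first caller latches the reason
        }
        mCv.notify_all();  // unblock every waiting consumer, even with an empty queue
    }

    std::optional<StreamChunk> waitPop(std::chrono::milliseconds timeout) {
        std::unique_lock<std::mutex> lock(mMutex);
        mCv.wait_for(lock, timeout, [this] { return !mQueue.empty() || mFinished.load(); });
        if (mQueue.empty()) {
            return std::nullopt;  // timed out, or finished with nothing left to drain
        }
        StreamChunk front = std::move(mQueue.front());
        mQueue.pop_front();
        return front;
    }

private:
    std::mutex mMutex;
    std::condition_variable mCv;
    std::deque<StreamChunk> mQueue;
    std::atomic<bool> mFinished{false};
    std::atomic<FinishReason> mReason{FinishReason::kNotFinished};
};
```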
```cpp
// Per-slot detokenization state, compacted in lockstep with context.tokenIds.
struct SlotStreamState {
    std::shared_ptr<StreamChannel> channel;
    size_t sentTokenCount{0};         // tokens already consumed by the detokenizer
    size_t lastEmittedTokenCount{0};  // tokens already delivered in chunks
    std::string pendingBytes;         // held-back incomplete UTF-8 tail
    FinishReason terminalReason{FinishReason::kNotFinished};
};
```
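Consumer-side usage might look like the following sketch. `Runtime` and `makeRequest` are hypothetical stand-ins, and `consume` is assumed to poll until the terminal chunk arrives; the channel and request fields are the ones declared in this document:

```cpp
#include <cstdio>
#include <string>
#include <thread>

// Hypothetical driver for one streamed request.
void streamOneRequest(Runtime& runtime, std::string const& prompt) {
    auto channel = StreamChannel::create();
    channel->setStreamInterval(4);  // throttle: emit only every >= 4 new tokens

    LLMGenerationRequest request = makeRequest(prompt);  // hypothetical builder
    request.streamChannels = {channel};                  // one entry per batch item

    // Phase-1 submission is synchronous, so run the producer on another thread.
    std::thread producer([&] { runtime.handleRequest(request); });

    channel->consume([](StreamChunk const& chunk) {
        std::fwrite(chunk.text.data(), 1, chunk.text.size(), stdout);  // well-formed UTF-8
        if (chunk.finished) {
            std::printf("\n[reason=%d]\n", static_cast<int>(chunk.reason));
        }
    });  // assumed to return once the terminal chunk has been handled

    producer.join();
}
```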
## Runtime integration — `handleRequest` flow
Five insertion points plus one RAII guard (a skeleton follows the list):

1. Submission validation — `validateStreamingSubmission(request)` rejects mismatched sizes, already-finished channels, and channels concurrently attached to another request.
2. Setup (after `setUpForPrefillExecution`) — loop over slots, call `attachStreamChannel(ch, originalIdx)`, record the channel in `context.slotStreams[i]`, and seed `sentTokenCount = context.tokenIds[i].size()` so streaming emits only generated tokens. Construct the `StreamChannelFinalizer`.
3. Post-prefill — `applyCancellationToFinishStates(context)` (cancel wins over natural finish) → `updateFinishStates` (latches `kEndId`/`kLength` atomically with the `finishedStates[i]` flip) → `emitChunks(context, *mTokenizer)`.
4. Per iteration — same ordering at the top of every main-loop iteration, before `performBatchEvict`.
5. In `performBatchEvict` — `rt::compactVector(batchMapping, context.slotStreams)` runs alongside the existing `compactVector` calls.
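Put together, hook placement could look like this skeleton, written as if inside the runtime class. This is a sketch against the list above: `runPrefill`, `runDecodeStep`, `allSlotsFinished`, and `buildResponse` are illustrative names for elided runtime logic.

```cpp
// Skeleton only: shows where the five hooks and the RAII guard sit.
LLMGenerationResponse handleRequest(LLMGenerationRequest const& request) {
    validateStreamingSubmission(request);              // (1) fail fast on bad channel sets

    auto context = setUpForPrefillExecution(request);  // signature illustrative
    for (size_t i = 0; i < request.streamChannels.size(); ++i) {
        attachStreamChannel(request.streamChannels[i], static_cast<int32_t>(i));  // (2) setup
        context.slotStreams[i].channel = request.streamChannels[i];
        context.slotStreams[i].sentTokenCount = context.tokenIds[i].size();  // skip prompt tokens
    }
    StreamChannelFinalizer finalizer{context};         // RAII: terminal chunk on every exit path

    runPrefill(context);
    applyCancellationToFinishStates(context);          // (3) cancel wins over natural finish
    updateFinishStates(context);
    emitChunks(context, *mTokenizer);

    while (!allSlotsFinished(context)) {
        runDecodeStep(context);
        applyCancellationToFinishStates(context);      // (4) same ordering every iteration
        updateFinishStates(context);
        emitChunks(context, *mTokenizer);
        performBatchEvict(context);                    // (5) compacts slotStreams in lockstep
    }
    return buildResponse(context);
}
```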
The `StreamChannelFinalizer` destructor runs on every exit path from `handleRequest` (normal return, error return, exception). For every channel not already finalized, it tries to push a terminal chunk with any un-emitted tokens/text (sanitized and flushed) and `reason = kError`, then calls `finish(kError)`. `finish` is idempotent and no-throw; any failure during chunk assembly (e.g. OOM) is swallowed.
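A sketch of that guard, assuming the context fields used in this section (constructor signature illustrative; the real teardown would also sanitize any remaining raw text):

```cpp
// Sketch of the RAII guard's teardown path.
class StreamChannelFinalizer {
public:
    explicit StreamChannelFinalizer(SpecDecodeInferenceContext& context) : mContext(context) {}

    ~StreamChannelFinalizer() {
        for (size_t i = 0; i < mContext.slotStreams.size(); ++i) {
            auto& slot = mContext.slotStreams[i];
            if (!slot.channel || slot.channel->isFinished()) continue;  // finalized normally
            try {
                StreamChunk last;
                last.tokenIds.assign(
                    mContext.tokenIds[i].begin() + static_cast<std::ptrdiff_t>(slot.lastEmittedTokenCount),
                    mContext.tokenIds[i].end());                        // un-emitted tokens
                last.text = sanitizeUtf8Flush(slot.pendingBytes);       // drain held-back bytes
                last.finished = true;
                last.reason = FinishReason::kError;
                slot.channel->push(std::move(last));
            } catch (...) {
                // Chunk assembly may fail (e.g. OOM); swallow so finish() still runs.
            }
            slot.channel->finish(FinishReason::kError);                 // idempotent, no-throw
        }
    }

private:
    SpecDecodeInferenceContext& mContext;
};
```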
## Terminal-reason latching (race-free by construction)
`terminalReason` is written exactly once per slot, at the instruction that flips `finishedStates[i]` from 0 to 1. Two writers, both guarded by `!finishedStates[i]`:

- `applyCancellationToFinishStates` — runs at iteration top. Writes `kCancelled`.
- `updateFinishStates` — runs after the engine step. Writes `kEndId` or `kLength`.

First writer wins; `emitChunks` reads the latched reason on the same (runtime) thread that wrote it, so no synchronization is needed. Consumer-side reads use `StreamChannel::getReason()`, which pairs acquire/release with `finish()`.
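In code, the two writers could look like this sketch (`finishedStates` uses 0/1 as in the prose above; the engine-step details of `updateFinishStates` are elided):

```cpp
// Both writers run on the runtime thread; !finishedStates[i] makes the
// terminalReason write once-only, so the first writer wins.
void applyCancellationToFinishStates(SpecDecodeInferenceContext& context) {
    for (size_t i = 0; i < context.slotStreams.size(); ++i) {
        auto& slot = context.slotStreams[i];
        if (slot.channel && slot.channel->isCancelled() && !context.finishedStates[i]) {
            slot.terminalReason = FinishReason::kCancelled;  // latch ...
            context.finishedStates[i] = 1;                   // ... atomically with the flip
        }
    }
}

// Inside updateFinishStates, the natural-finish writer follows the same shape:
//
//     if (!context.finishedStates[i] && sampledToken == endId) {
//         context.slotStreams[i].terminalReason = FinishReason::kEndId;
//         context.finishedStates[i] = 1;
//     }
//     // an analogous branch writes kLength when the token budget is exhausted
```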
## Detokenization algorithm (Strategy A)

Per iteration, per slot with a channel (sketched after this list):

1. `newlyAvailable = context.tokenIds[i].size() - sentTokenCount`.
2. If not final and `newlyAvailable < streamInterval`, skip.
3. Concatenate piece bytes: `raw = concat(idToPiece(t) for t in tokenIds[i][sentTokenCount..end])`. Advance `sentTokenCount`.
4. `delta = sanitizeUtf8Streaming(raw, pendingBytes)` — replaces invalid bytes with U+FFFD; trailing incomplete sequences go to `pendingBytes`.
5. On the final chunk: `delta += sanitizeUtf8Flush(pendingBytes)`.
6. If `delta` is non-empty OR this is the final chunk, push `StreamChunk { tokenIds[lastEmitted..sent), delta, finished, reason }` and update `lastEmittedTokenCount`.
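A sketch of that loop for a single slot. The helper name `emitSlot` is illustrative (in the real code this body lives in `emitChunks`, which is a friend of `StreamChannel`, so calling the private `push` is legal), and `finished` stands for `finishedStates[i]`:

```cpp
// Sketch of the per-slot body of emitChunks; names follow this section.
void emitSlot(SlotStreamState& slot, std::vector<int32_t> const& tokenIds,
              bool finished, tokenizer::Tokenizer const& tok) {
    if (!slot.channel) return;  // non-streaming slot: short-circuit

    size_t const newlyAvailable = tokenIds.size() - slot.sentTokenCount;
    if (!finished && newlyAvailable < static_cast<size_t>(slot.channel->getStreamInterval())) {
        return;  // throttled: wait until enough new tokens accumulate
    }

    std::string raw;
    for (size_t t = slot.sentTokenCount; t < tokenIds.size(); ++t) {
        raw += tok.idToPiece(tokenIds[t]);  // piece bytes may split a codepoint
    }
    slot.sentTokenCount = tokenIds.size();

    std::string delta = sanitizeUtf8Streaming(raw, slot.pendingBytes);
    if (finished) {
        delta += sanitizeUtf8Flush(slot.pendingBytes);  // terminal chunk drains the tail
    }

    if (!delta.empty() || finished) {
        StreamChunk chunk;
        chunk.tokenIds.assign(tokenIds.begin() + static_cast<std::ptrdiff_t>(slot.lastEmittedTokenCount),
                              tokenIds.begin() + static_cast<std::ptrdiff_t>(slot.sentTokenCount));
        chunk.text = std::move(delta);
        chunk.finished = finished;
        chunk.reason = finished ? slot.terminalReason : FinishReason::kNotFinished;
        slot.channel->push(std::move(chunk));
        slot.lastEmittedTokenCount = slot.sentTokenCount;
    }
}
```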
Key property: output is always well-formed UTF-8, whether the input contained valid multi-token codepoints, adversarial isolated continuation bytes, or both.
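That property comes from the sanitizer pair, which could be implemented along these lines. This is a simplified sketch: it validates lead/continuation structure only, while a production version would also reject overlong encodings, surrogates, and codepoints above U+10FFFF.

```cpp
#include <cstddef>
#include <string>

namespace {
// Total byte length for a lead byte, or -1 if the byte cannot start a sequence.
int seqLen(unsigned char b) {
    if (b < 0x80) return 1;
    if ((b & 0xE0) == 0xC0) return 2;
    if ((b & 0xF0) == 0xE0) return 3;
    if ((b & 0xF8) == 0xF0) return 4;
    return -1;  // continuation byte, or invalid lead (0xF8..0xFF)
}
bool isCont(unsigned char b) { return (b & 0xC0) == 0x80; }
}  // namespace

// Emits only complete, valid sequences; holds a valid-so-far trailing prefix
// in `pending`; maps everything else to U+FFFD.
std::string sanitizeUtf8Streaming(std::string const& raw, std::string& pending) {
    static constexpr char kReplacement[] = "\xEF\xBF\xBD";  // U+FFFD
    std::string buf = pending + raw;  // resume any held-back prefix
    pending.clear();
    std::string out;
    size_t i = 0;
    while (i < buf.size()) {
        int const len = seqLen(static_cast<unsigned char>(buf[i]));
        if (len < 0) { out += kReplacement; ++i; continue; }  // stray continuation / bad lead
        if (i + len > buf.size()) {                           // sequence runs past the chunk
            bool validSoFar = true;
            for (size_t j = i + 1; j < buf.size(); ++j)
                validSoFar = validSoFar && isCont(static_cast<unsigned char>(buf[j]));
            if (validSoFar) { pending = buf.substr(i); break; }  // hold back for next chunk
            out += kReplacement; ++i; continue;                  // broken mid-sequence: rescan
        }
        bool valid = true;
        for (int j = 1; j < len; ++j)
            valid = valid && isCont(static_cast<unsigned char>(buf[i + j]));
        if (valid) { out.append(buf, i, static_cast<size_t>(len)); i += static_cast<size_t>(len); }
        else { out += kReplacement; ++i; }  // replace the lead byte, rescan the rest
    }
    return out;
}

// Terminal chunk: whatever is still pending can never complete. `pending`
// holds at most one partial sequence, so one replacement character suffices.
std::string sanitizeUtf8Flush(std::string& pending) {
    std::string out = pending.empty() ? std::string() : std::string("\xEF\xBF\xBD");
    pending.clear();
    return out;
}
```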
## Non-streaming back-compat

When `request.streamChannels.empty()`, `slotStreams[i].channel` is null and all five insertion points short-circuit: zero overhead on the non-streaming path. There is one intentional content-level change: `Tokenizer::decode` now routes output through `sanitizeUtf8Streaming` + `sanitizeUtf8Flush`, so invalid-byte adversarial outputs surface as U+FFFD in `response.outputTexts`. This is a latent-bug fix; bytes are identical for all valid outputs.
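The `decode` change is then just the same pipeline applied in one shot (a sketch; the real method signature may differ):

```cpp
// Sketch: non-streaming decode reuses the streaming sanitizer with an
// immediate flush, so invalid bytes surface as U+FFFD in outputTexts.
std::string Tokenizer::decode(std::vector<int32_t> const& ids) const {
    std::string raw;
    for (int32_t id : ids) {
        raw += idToPiece(id);
    }
    std::string pending;
    std::string text = sanitizeUtf8Streaming(raw, pending);
    text += sanitizeUtf8Flush(pending);  // nothing to hold back in one-shot decode
    return text;
}
```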
## Non-goals (deferred)

- HTTP/SSE/gRPC server surface.
- Worker-thread submit/run split (Phase 2 — `submitRequest(...) → future<StreamChannel>`).
- Logprobs, `n > 1` parallel samples, tool-calling deltas.
- Stop-string prefix hold-back (reserved field in `SlotStreamState`).
## Code layout

| File | Role |
|---|---|
| `cpp/runtime/streaming.h` | `FinishReason`, `StreamChunk`, `StreamChannel`, `SlotStreamState` data types |
| | Wires streaming hooks into `handleRequest` |
| | Interactive demo (JSON input, live chunk printing, hotkey control) |
| | 51 CPU-only invariant tests |
| | 4 engine-backed scenario tests |
## User guide

See `docs/source/user_guide/features/streaming.md` for consumer-side usage and examples.