Streaming

class StreamChannel
Per-slot streaming channel.
Encapsulated MPSC pipe between the runtime (producer) and a single consumer. State is private; the consumer interacts only through consume(), waitPop(), tryPop(), and cancel(). Runtime-side operations (push(), finish(), setOriginalBatchIdx()) are reachable only through friendship, so consumers can neither hold the lock nor skip chunks.
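The producer/consumer split through friendship can be illustrated with a minimal sketch. `ToyChannel` and `ToyRuntime` are hypothetical stand-ins, not the library's classes, and chunks are plain strings here. Consumers can only call the public pop/query functions; `push()` and `finish()` are private and reachable only from the befriended producer, so consumer code never touches the lock directly.

```cpp
#include <cassert>
#include <deque>
#include <mutex>
#include <optional>
#include <string>
#include <utility>

class ToyRuntime;  // hypothetical stand-in for the runtime (producer side)

// Hypothetical channel: consumers see tryPop()/isFinished(); producing is private.
class ToyChannel {
public:
    std::optional<std::string> tryPop() {
        std::lock_guard<std::mutex> lock(mMutex);  // lock never escapes this call
        if (mPending.empty()) {
            return std::nullopt;
        }
        std::string chunk = std::move(mPending.front());
        mPending.pop_front();
        return chunk;
    }

    bool isFinished() const {
        std::lock_guard<std::mutex> lock(mMutex);
        return mFinished;
    }

private:
    friend class ToyRuntime;  // only the runtime may produce

    void push(std::string chunk) {
        std::lock_guard<std::mutex> lock(mMutex);
        mPending.push_back(std::move(chunk));
    }

    void finish() {
        std::lock_guard<std::mutex> lock(mMutex);
        mFinished = true;
    }

    mutable std::mutex mMutex;
    std::deque<std::string> mPending;
    bool mFinished = false;
};

// Hypothetical producer: the only code allowed to call push()/finish().
class ToyRuntime {
public:
    static void emit(ToyChannel &ch, std::string text) { ch.push(std::move(text)); }
    static void finish(ToyChannel &ch) { ch.finish(); }
};
```

A call such as `ch.push("x")` from consumer code fails to compile, which is the guarantee the friendship arrangement is meant to provide.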
Public Functions

StreamChannel(StreamChannel const&) = delete
StreamChannel &operator=(StreamChannel const&) = delete
StreamChannel(StreamChannel&&) = delete
StreamChannel &operator=(StreamChannel&&) = delete
~StreamChannel() noexcept = default
-
template<typename Handler>
void consume( - Handler &&handler,
- std::chrono::milliseconds poll = std::chrono::milliseconds{100}
Block until
finishedorcancelled, delivering every chunk.Exits when
pendingis empty AND eitherfinishedorcancelledis set. Lock is held only for pop;handlerruns with the mutex released.- Template Parameters:
Handler – Callable with signature
void(StreamChunk&&).- Parameters:
handler – Invoked once per chunk delivered.
poll – Max interval between wakeups (defaults to 100ms).
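The consume() contract can be sketched with a mutex and a condition variable. This is an assumption about the implementation, which this page keeps private. `ToyChunk` and `ToyChannel` are hypothetical, and `push()`/`finish()` are public here only so the sketch can run without a runtime. Each chunk is popped under the lock, the lock is released, and then the handler runs. The loop returns only once the deque is empty and a terminal flag is set, so pending chunks are still delivered after cancel().

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <cstdint>
#include <deque>
#include <mutex>
#include <string>
#include <utility>
#include <vector>

// Hypothetical chunk type mirroring the documented fields.
struct ToyChunk {
    std::vector<int32_t> tokenIds;
    std::string text;
};

class ToyChannel {
public:
    void push(ToyChunk c) {
        { std::lock_guard<std::mutex> lock(mMutex); mPending.push_back(std::move(c)); }
        mCv.notify_one();
    }
    void finish() {
        { std::lock_guard<std::mutex> lock(mMutex); mFinished = true; }
        mCv.notify_all();
    }
    void cancel() {
        { std::lock_guard<std::mutex> lock(mMutex); mCancelled = true; }
        mCv.notify_all();
    }

    template <typename Handler>
    void consume(Handler &&handler, std::chrono::milliseconds poll = std::chrono::milliseconds{100}) {
        for (;;) {
            ToyChunk chunk;
            {
                std::unique_lock<std::mutex> lock(mMutex);
                // Sleep until work or a terminal flag arrives; each sleep is capped at `poll`.
                while (mPending.empty() && !mFinished && !mCancelled) {
                    mCv.wait_for(lock, poll);
                }
                if (mPending.empty()) {
                    return;  // drained, and finished or cancelled
                }
                chunk = std::move(mPending.front());
                mPending.pop_front();
            }  // mutex released here
            handler(std::move(chunk));  // handler runs without the lock
        }
    }

private:
    std::mutex mMutex;
    std::condition_variable mCv;
    std::deque<ToyChunk> mPending;
    bool mFinished = false;
    bool mCancelled = false;
};
```

Running the handler outside the lock means a slow consumer never stalls the producer's push(). The `poll` cap makes the loop re-check its exit condition periodically even when no notification arrives.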

std::optional<StreamChunk> tryPop()
Non-blocking single-chunk pop.

std::optional<StreamChunk> waitPop(std::chrono::milliseconds timeout)
Blocking single-chunk pop with timeout.
Returns std::nullopt on timeout, or when woken by finish() or cancel() while the deque is empty. Callers that need the terminal signal should combine waitPop() with isFinished() or isCancelled().
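Here is one way to combine waitPop() with the terminal queries, shown on a hypothetical `ToyChannel` that follows the documented tryPop()/waitPop() semantics. `drain()` is a hypothetical helper. Under these toy semantics, the final `tryPop()` pass is needed because a chunk may be pushed between a timed-out `waitPop()` and the terminal-state check.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <deque>
#include <mutex>
#include <optional>
#include <string>
#include <utility>

struct ToyChunk {
    std::string text;
};

// Hypothetical channel with the documented tryPop()/waitPop() semantics.
class ToyChannel {
public:
    void push(ToyChunk c) {
        {
            std::lock_guard<std::mutex> lock(mMutex);
            mPending.push_back(std::move(c));
        }
        mCv.notify_one();
    }
    void finish() { setFlag(mFinished); }
    void cancel() { setFlag(mCancelled); }

    std::optional<ToyChunk> tryPop() {
        std::lock_guard<std::mutex> lock(mMutex);
        return popLocked();
    }

    // nullopt on timeout, or when a terminal flag is set and nothing is pending.
    std::optional<ToyChunk> waitPop(std::chrono::milliseconds timeout) {
        std::unique_lock<std::mutex> lock(mMutex);
        mCv.wait_for(lock, timeout, [this] { return !mPending.empty() || mFinished || mCancelled; });
        return popLocked();
    }

    bool isFinished() const { std::lock_guard<std::mutex> lock(mMutex); return mFinished; }
    bool isCancelled() const { std::lock_guard<std::mutex> lock(mMutex); return mCancelled; }

private:
    void setFlag(bool &flag) {
        {
            std::lock_guard<std::mutex> lock(mMutex);
            flag = true;
        }
        mCv.notify_all();
    }
    std::optional<ToyChunk> popLocked() {  // caller holds mMutex
        if (mPending.empty()) return std::nullopt;
        ToyChunk c = std::move(mPending.front());
        mPending.pop_front();
        return c;
    }

    mutable std::mutex mMutex;
    std::condition_variable mCv;
    std::deque<ToyChunk> mPending;
    bool mFinished = false;
    bool mCancelled = false;
};

// Hypothetical consumer loop: waitPop() for data, isFinished()/isCancelled() for termination.
std::string drain(ToyChannel &ch, std::chrono::milliseconds timeout) {
    std::string out;
    for (;;) {
        if (auto chunk = ch.waitPop(timeout)) {
            out += chunk->text;
            continue;
        }
        if (ch.isFinished() || ch.isCancelled()) {
            // A chunk may have been pushed between the timeout and the check above.
            while (auto last = ch.tryPop()) {
                out += last->text;
            }
            return out;
        }
        // Plain timeout: keep waiting.
    }
}
```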

bool isFinished() const noexcept

FinishReason getReason() const noexcept

int32_t getOriginalBatchIdx() const noexcept

bool isCancelled() const noexcept

void cancel() noexcept
Fire-and-forget cancellation. Safe to call from any thread. Wakes any blocked consume() or waitPop().

void setStreamInterval(int32_t n) noexcept
Per-request emit throttle. Values below 1 are clamped to 1; the default is 1.
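A sketch of how the throttle could gate pushes. It assumes the interval counts tokens, as the sentTokenCount and lastEmittedTokenCount fields of SlotStreamState suggest. `clampStreamInterval()` and `shouldPush()` are hypothetical helpers, and forcing a push once the slot has finished is also an assumption.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>

// Documented clamp: values < 1 become 1.
inline int32_t clampStreamInterval(int32_t n) noexcept {
    return std::max<int32_t>(n, 1);
}

// Hypothetical gate: push when at least `interval` tokens were fed since the last
// push, or unconditionally once the slot has finished (assumption).
inline bool shouldPush(std::size_t sentTokenCount, std::size_t lastEmittedTokenCount,
                       int32_t interval, bool finished) noexcept {
    if (finished) {
        return true;
    }
    // Assumes sentTokenCount >= lastEmittedTokenCount.
    std::size_t const delta = sentTokenCount - lastEmittedTokenCount;
    return delta >= static_cast<std::size_t>(clampStreamInterval(interval));
}
```

Under spec-decode, several tokens can be accepted in one step, so `delta` may jump past the interval. Comparing with `>=` rather than `==` avoids skipping a push in that case.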

int32_t getStreamInterval() const noexcept

void setSkipSpecialTokens(bool skip) noexcept
Filter special tokens (EOS, <|im_end|>, <think>, vision placeholders, etc.) out of chunk.text. Default true. chunk.tokenIds is unaffected either way. Set before the channel is attached.
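The filtering rule can be sketched with a simple id-to-piece table. `buildChunk()`, `pieces`, and `specialIds` are hypothetical, and a real detokenizer works on byte pieces with UTF-8 carry-over state. The point is the asymmetry: special tokens are dropped from `text` only, while `tokenIds` always carries the full delta.

```cpp
#include <cassert>
#include <cstdint>
#include <string>
#include <unordered_map>
#include <unordered_set>
#include <vector>

struct ToyChunk {
    std::vector<int32_t> tokenIds;  // full delta, never filtered
    std::string text;               // filtered when skipSpecialTokens is true
};

// Hypothetical: `pieces` maps token id -> piece text, `specialIds` marks special tokens.
inline ToyChunk buildChunk(std::vector<int32_t> const &delta,
                           std::unordered_map<int32_t, std::string> const &pieces,
                           std::unordered_set<int32_t> const &specialIds,
                           bool skipSpecialTokens) {
    ToyChunk chunk;
    chunk.tokenIds = delta;  // unaffected by the flag
    for (int32_t id : delta) {
        if (skipSpecialTokens && specialIds.count(id) != 0) {
            continue;  // drop from text only
        }
        auto it = pieces.find(id);
        if (it != pieces.end()) {
            chunk.text += it->second;
        }
    }
    return chunk;
}
```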

bool getSkipSpecialTokens() const noexcept

Public Static Functions

static std::shared_ptr<StreamChannel> create()
Factory and the only way to construct a StreamChannel. Enforces shared_ptr ownership.
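One common C++ idiom that makes a factory the only entry point, while still allowing `std::make_shared`, is the passkey pattern. It is sketched below on a hypothetical `ToyChannel`. This page does not say whether StreamChannel uses this idiom or a private constructor.

```cpp
#include <cassert>
#include <memory>
#include <type_traits>

// Hypothetical stand-in: create() is the only way to obtain an instance.
class ToyChannel {
    struct Key {  // private passkey: outside code cannot name or brace-construct it
        explicit Key() = default;
    };

public:
    explicit ToyChannel(Key) {}  // public for make_shared, unusable without a Key
    ToyChannel(ToyChannel const &) = delete;
    ToyChannel &operator=(ToyChannel const &) = delete;
    ToyChannel(ToyChannel &&) = delete;
    ToyChannel &operator=(ToyChannel &&) = delete;
    ~ToyChannel() noexcept = default;

    static std::shared_ptr<ToyChannel> create() { return std::make_shared<ToyChannel>(Key{}); }
};
```

Deleting copy and move as well keeps every instance pinned inside its shared_ptr allocation. This matches the deleted special members listed for StreamChannel.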

class StreamChannelFinalizer
RAII guard that guarantees every attached StreamChannel terminates.
On destruction, for every slot whose channel has not yet been finalized, the guard:
1. Tries to build a terminal chunk containing any un-emitted tokens and text (sanitized, with trailing incomplete bytes flushed to U+FFFD) with reason=kError, and pushes it with push().
2. Calls finish(kError), which is idempotent and no-throw, so the consumer unblocks even if step 1 failed under OOM.
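The two-step teardown can be sketched as a guard over toy types. `ToyChannel`, `ToySlot`, `ToyFinalizer`, and `ToyReason` are hypothetical illustrations; the real guard takes a SpecDecodeInferenceContext and a tokenizer. Step 1 is best-effort and may throw (for example `std::bad_alloc`). Step 2 is noexcept and idempotent, so it runs for every unfinished slot regardless.

```cpp
#include <cassert>
#include <memory>
#include <string>
#include <utility>
#include <vector>

enum class ToyReason { kNotFinished, kError };  // hypothetical subset

// Hypothetical channel exposing only what the sketch needs.
struct ToyChannel {
    std::vector<std::string> pushed;
    bool finished = false;
    ToyReason reason = ToyReason::kNotFinished;

    void push(std::string text) { pushed.push_back(std::move(text)); }  // may throw
    void finish(ToyReason r) noexcept {
        if (!finished) {  // idempotent: first call wins
            finished = true;
            reason = r;
        }
    }
};

// Hypothetical per-slot state: a channel plus text not yet emitted.
struct ToySlot {
    std::shared_ptr<ToyChannel> channel;
    std::string unemitted;
};

class ToyFinalizer {
public:
    explicit ToyFinalizer(std::vector<ToySlot> &slots) : mSlots(slots) {}
    ToyFinalizer(ToyFinalizer const &) = delete;
    ToyFinalizer &operator=(ToyFinalizer const &) = delete;

    ~ToyFinalizer() noexcept {
        for (ToySlot &slot : mSlots) {
            if (!slot.channel || slot.channel->finished) {
                continue;  // streaming disabled, or already finalized
            }
            try {
                slot.channel->push(slot.unemitted);  // step 1: best-effort terminal chunk
            } catch (...) {
                // swallowed: step 2 still unblocks the consumer
            }
            slot.channel->finish(ToyReason::kError);  // step 2: always runs, never throws
        }
    }

private:
    std::vector<ToySlot> &mSlots;
};
```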
Public Functions

StreamChannelFinalizer(SpecDecodeInferenceContext &ctx, tokenizer::Tokenizer const &tok)

~StreamChannelFinalizer() noexcept

StreamChannelFinalizer(StreamChannelFinalizer const&) = delete
StreamChannelFinalizer &operator=(StreamChannelFinalizer const&) = delete
StreamChannelFinalizer(StreamChannelFinalizer&&) = delete
StreamChannelFinalizer &operator=(StreamChannelFinalizer&&) = delete

struct StreamChunk
Single delta chunk on a streaming channel.
text is always well-formed UTF-8 when pushed by the runtime: invalid byte sequences are replaced with U+FFFD before emission. Empty text chunks are legal (e.g., terminal chunks after all bytes were emitted).
Public Members

std::vector<int32_t> tokenIds
Delta tokens since the last chunk (may be more than one under spec-decode).

std::string text
Delta text; always well-formed UTF-8.

bool finished = {false}
True for the final chunk on this channel.

FinishReason reason = {FinishReason::kNotFinished}
Terminal reason (only meaningful when finished == true).
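Each chunk carries a delta, so a consumer can rebuild the full response by concatenation. The sketch below uses a hypothetical mirror of the struct (`ToyChunk`, `Transcript`, `assemble()`), and `ToyReason` lists only the two reasons named on this page. Appending text deltas is safe because each one is already well-formed UTF-8, and the concatenation of well-formed UTF-8 strings is itself well-formed.

```cpp
#include <cassert>
#include <cstdint>
#include <string>
#include <vector>

enum class ToyReason { kNotFinished, kError };  // hypothetical subset

struct ToyChunk {
    std::vector<int32_t> tokenIds;
    std::string text;
    bool finished = false;
    ToyReason reason = ToyReason::kNotFinished;
};

struct Transcript {
    std::vector<int32_t> tokenIds;
    std::string text;
    bool complete = false;
    ToyReason reason = ToyReason::kNotFinished;
};

// Hypothetical helper: fold delta chunks into the full response.
Transcript assemble(std::vector<ToyChunk> const &chunks) {
    Transcript t;
    for (ToyChunk const &c : chunks) {
        t.tokenIds.insert(t.tokenIds.end(), c.tokenIds.begin(), c.tokenIds.end());
        t.text += c.text;
        if (c.finished) {
            t.complete = true;
            t.reason = c.reason;  // only meaningful on the final chunk
            break;
        }
    }
    return t;
}
```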

struct SlotStreamState
Per-slot detokenization and streaming state.
Lives in SpecDecodeInferenceContext and is compacted in lockstep with tokenIds.
Public Members

std::shared_ptr<StreamChannel> channel
Channel (null ⇒ streaming disabled for this slot).

size_t sentTokenCount = {0}
Count of tokens whose piece bytes have been fed through emitDelta.

size_t lastEmittedTokenCount = {0}
Value of sentTokenCount at the last push (used for streamInterval gating).

std::string pendingBytes
Trailing incomplete UTF-8 bytes carried across iterations.

FinishReason terminalReason = {FinishReason::kNotFinished}
Terminal reason latched at the moment finishedStates[i] flips to 1. It has one writer (the code that flips the state) and one reader (the emit hook).
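The pendingBytes field implies a delta decoder that emits only complete UTF-8 sequences and carries an unfinished tail into the next iteration. The following sketches that technique; it is not the library's emitDelta. `emitDelta()` and `flushPending()` here are hypothetical free functions. Each maximal invalid subpart is replaced with one U+FFFD, following the Unicode recommendation, and second-byte ranges are checked per RFC 3629.

```cpp
#include <cassert>
#include <cstddef>
#include <string>

// UTF-8 encoding of U+FFFD REPLACEMENT CHARACTER.
constexpr char kReplacement[] = "\xEF\xBF\xBD";

// Sequence length implied by a lead byte, or 0 if the byte cannot start a sequence.
inline std::size_t seqLen(unsigned char b) {
    if (b < 0x80) return 1;
    if (b >= 0xC2 && b <= 0xDF) return 2;
    if (b >= 0xE0 && b <= 0xEF) return 3;
    if (b >= 0xF0 && b <= 0xF4) return 4;
    return 0;  // continuation byte, overlong lead (C0/C1), or beyond U+10FFFF
}

// Second-byte ranges from RFC 3629 (rules out overlongs, surrogates, > U+10FFFF).
inline bool secondByteOk(unsigned char lead, unsigned char b) {
    switch (lead) {
    case 0xE0: return b >= 0xA0 && b <= 0xBF;
    case 0xED: return b >= 0x80 && b <= 0x9F;
    case 0xF0: return b >= 0x90 && b <= 0xBF;
    case 0xF4: return b >= 0x80 && b <= 0x8F;
    default: return b >= 0x80 && b <= 0xBF;
    }
}

// Hypothetical: returns well-formed text that can be emitted now; a trailing
// sequence that is valid so far but incomplete is left in `pending`.
inline std::string emitDelta(std::string &pending, std::string const &incoming) {
    std::string const buf = pending + incoming;
    pending.clear();
    std::string out;
    std::size_t i = 0;
    while (i < buf.size()) {
        unsigned char const lead = static_cast<unsigned char>(buf[i]);
        std::size_t const len = seqLen(lead);
        if (len == 0) { out += kReplacement; ++i; continue; }
        std::size_t j = 1;  // bytes of this sequence validated so far
        bool bad = false;
        while (j < len && i + j < buf.size()) {
            unsigned char const b = static_cast<unsigned char>(buf[i + j]);
            bool const ok = (j == 1) ? secondByteOk(lead, b) : (b & 0xC0) == 0x80;
            if (!ok) { bad = true; break; }
            ++j;
        }
        if (bad) { out += kReplacement; i += j; continue; }  // maximal subpart -> one U+FFFD
        if (j < len) { pending.assign(buf, i, std::string::npos); break; }  // incomplete tail
        out.append(buf, i, len);
        i += len;
    }
    return out;
}

// Hypothetical terminal flush: an unfinished tail becomes a single U+FFFD.
inline std::string flushPending(std::string &pending) {
    std::string out = pending.empty() ? std::string() : std::string(kReplacement);
    pending.clear();
    return out;
}
```

A split "€" (E2 82 AC) fed as "a\xE2\x82" then "\xAC" yields "a" first, with the two lead bytes held in pending, and then the complete character.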

std::shared_ptr<StreamChannel> const &channel,
int32_t originalIdx

bool trt_edgellm::rt::validateStreamingSubmission(LLMGenerationRequest const &request)

void trt_edgellm::rt::applyCancellationToFinishStates(SpecDecodeInferenceContext &context)

void trt_edgellm::rt::emitChunks(SpecDecodeInferenceContext &context, tokenizer::Tokenizer const &tok)