Streaming

class StreamChannel

Per-slot streaming channel.

Encapsulated MPSC pipe between the runtime (producer) and a single consumer. State is private; the consumer interacts only through consume(), waitPop(), tryPop(), and cancel(). Runtime-side operations (push(), finish(), setOriginalBatchIdx()) are reachable only through friendship, so consumers can neither hold the lock nor skip chunks.
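The access split described above can be sketched with a friend declaration. This is a minimal illustration, not the real class: Chunk, GuardedChannel, and ToyRuntime are hypothetical names, and only push() is shown on the producer side.

```cpp
#include <deque>
#include <mutex>
#include <optional>
#include <string>
#include <utility>

// Hypothetical stand-ins (Chunk, GuardedChannel, ToyRuntime); not the trt_edgellm types.
struct Chunk {
    std::string text;
};

class ToyRuntime;  // the only class allowed to produce into the channel

class GuardedChannel {
public:
    // Consumer side: the lock is taken and released inside the call,
    // so callers can never hold it across their own code.
    std::optional<Chunk> tryPop() {
        std::lock_guard<std::mutex> lock(mutex_);
        if (pending_.empty()) return std::nullopt;
        Chunk front = std::move(pending_.front());
        pending_.pop_front();
        return front;
    }

private:
    friend class ToyRuntime;  // producer operations are reachable only via friendship

    void push(Chunk chunk) {
        std::lock_guard<std::mutex> lock(mutex_);
        pending_.push_back(std::move(chunk));
    }

    std::mutex mutex_;
    std::deque<Chunk> pending_;
};

class ToyRuntime {
public:
    static void emit(GuardedChannel& channel, std::string text) {
        channel.push(Chunk{std::move(text)});
    }
};
```

A consumer holding a GuardedChannel can only pop; a call to push() outside ToyRuntime fails to compile.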

Public Functions

StreamChannel(StreamChannel const&) = delete
StreamChannel &operator=(StreamChannel const&) = delete
StreamChannel(StreamChannel&&) = delete
StreamChannel &operator=(StreamChannel&&) = delete
~StreamChannel() noexcept = default
template<typename Handler>
void consume(Handler &&handler, std::chrono::milliseconds poll = std::chrono::milliseconds{100})

Block until finished or cancelled, delivering every chunk.

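Returns once the pending queue is empty AND either the finished or the cancelled flag is set, so chunks already queued are still delivered after finish() or cancel(). The lock is held only for each pop; handler runs with the mutex released.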

Template Parameters:

Handler – Callable with signature void(StreamChunk&&).

Parameters:
  • handler – Invoked once per chunk delivered.

  • poll – Maximum interval between wakeups (default 100 ms).
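A minimal sketch of the consume() contract, assuming a mutex, a condition variable, and a deque as the internal state (the real StreamChannel members are private and may differ). MiniChannel and Chunk are hypothetical stand-ins. The loop pops one chunk under the lock, releases the lock before calling the handler, and returns only after the queue is drained and a terminal flag is set.

```cpp
#include <chrono>
#include <condition_variable>
#include <deque>
#include <mutex>
#include <optional>
#include <string>
#include <utility>

// Hypothetical minimal channel illustrating the consume() contract.
struct Chunk {
    std::string text;
};

class MiniChannel {
public:
    void push(Chunk chunk) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            pending_.push_back(std::move(chunk));
        }
        cv_.notify_one();
    }

    void finish() { setFlag(finished_); }
    void cancel() { setFlag(cancelled_); }

    template <typename Handler>
    void consume(Handler&& handler,
                 std::chrono::milliseconds poll = std::chrono::milliseconds{100}) {
        for (;;) {
            std::optional<Chunk> next;
            {
                std::unique_lock<std::mutex> lock(mutex_);
                // Sleep until data or a terminal flag arrives, but never longer than poll.
                cv_.wait_for(lock, poll, [&] {
                    return !pending_.empty() || finished_ || cancelled_;
                });
                if (!pending_.empty()) {
                    next = std::move(pending_.front());
                    pending_.pop_front();
                } else if (finished_ || cancelled_) {
                    return;  // queue drained and a terminal flag is set
                }
            }
            // The handler runs with the mutex released.
            if (next) handler(std::move(*next));
        }
    }

private:
    void setFlag(bool& flag) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            flag = true;
        }
        cv_.notify_all();
    }

    std::mutex mutex_;
    std::condition_variable cv_;
    std::deque<Chunk> pending_;
    bool finished_ = false;
    bool cancelled_ = false;
};
```

Because each wait is bounded by poll, the sketch re-checks its state at least once per interval even if no notification arrives.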

std::optional<StreamChunk> tryPop()

Non-blocking single-chunk pop.

std::optional<StreamChunk> waitPop(std::chrono::milliseconds timeout)

Blocking single-chunk pop with timeout.

Returns std::nullopt on timeout, or when woken by finish() or cancel() while the deque is empty. Callers that need the terminal signal should combine waitPop() with isFinished()/isCancelled().
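One way to follow that advice is sketched below. drainWithWaitPop is a hypothetical helper written against any type that exposes waitPop(), tryPop(), isFinished(), and isCancelled() with the shapes documented here; it is not part of the API. After a terminal flag is observed, it drains with tryPop() in case a chunk was pushed between the empty waitPop() and the flag check.

```cpp
#include <chrono>
#include <cstddef>
#include <optional>
#include <utility>

// Hypothetical helper: Channel is any type with StreamChannel-like
// waitPop()/tryPop()/isFinished()/isCancelled() members.
// Returns the number of chunks handed to handler.
template <typename Channel, typename Handler>
std::size_t drainWithWaitPop(Channel& ch, Handler&& handler,
                             std::chrono::milliseconds timeout) {
    std::size_t handled = 0;
    for (;;) {
        if (auto chunk = ch.waitPop(timeout)) {
            handler(std::move(*chunk));
            ++handled;
            continue;
        }
        // nullopt: either a plain timeout or a finish()/cancel() wakeup with an empty deque.
        if (ch.isFinished() || ch.isCancelled()) {
            // A chunk may have been queued just before the terminal flag was set.
            while (auto late = ch.tryPop()) {
                handler(std::move(*late));
                ++handled;
            }
            return handled;
        }
        // Plain timeout: periodic work (progress UI, deadlines) could go here.
    }
}
```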

bool isFinished() const noexcept
FinishReason getReason() const noexcept
int32_t getOriginalBatchIdx() const noexcept
bool isCancelled() const noexcept
void cancel() noexcept

Fire-and-forget cancellation. Safe to call from any thread; wakes any consumer blocked in consume() or waitPop().

void setStreamInterval(int32_t n) noexcept

Per-request emit throttle. Values below 1 are clamped to 1; the default is 1.
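The throttle is gated by the sentTokenCount and lastEmittedTokenCount fields of SlotStreamState. The exact rule is not spelled out here; the sketch below assumes a chunk is pushed once at least n new tokens have been fed since the last push, and that the terminal chunk is never throttled. clampInterval and shouldEmit are hypothetical helpers.

```cpp
#include <algorithm>
#include <cstddef>
#include <cstdint>

// Hypothetical: mirrors the documented clamp (values < 1 become 1).
inline int32_t clampInterval(int32_t n) noexcept { return std::max<int32_t>(n, 1); }

// Hypothetical gating rule (an assumption, not the documented algorithm):
// emit when `interval` or more tokens arrived since the last push,
// or unconditionally for the terminal chunk.
inline bool shouldEmit(std::size_t sentTokenCount, std::size_t lastEmittedTokenCount,
                       int32_t interval, bool finished) noexcept {
    if (finished) return true;
    return sentTokenCount - lastEmittedTokenCount >=
           static_cast<std::size_t>(clampInterval(interval));
}
```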

int32_t getStreamInterval() const noexcept
void setSkipSpecialTokens(bool skip) noexcept

When true (the default), special tokens (EOS, <|im_end|>, <think>, vision placeholders, etc.) are filtered out of chunk.text; chunk.tokenIds is unaffected either way. Call this before the channel is attached.
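A toy illustration of these filtering semantics, assuming a vocabulary that maps token ids to byte pieces and flags some ids as special. ToyVocab, ToyChunk, and buildChunk are hypothetical; the real filtering is done by the runtime with its own tokenizer.

```cpp
#include <cstdint>
#include <string>
#include <unordered_map>
#include <unordered_set>
#include <vector>

// Toy stand-in for a tokenizer: id -> piece bytes, plus the set of special ids.
struct ToyVocab {
    std::unordered_map<int32_t, std::string> pieces;
    std::unordered_set<int32_t> special;
};

struct ToyChunk {
    std::vector<int32_t> tokenIds;
    std::string text;
};

// tokenIds always keeps every delta id; only text is filtered when skipSpecialTokens is set.
inline ToyChunk buildChunk(const ToyVocab& vocab, const std::vector<int32_t>& delta,
                           bool skipSpecialTokens) {
    ToyChunk chunk;
    chunk.tokenIds = delta;
    for (int32_t id : delta) {
        if (skipSpecialTokens && vocab.special.count(id) != 0) continue;
        auto it = vocab.pieces.find(id);
        if (it != vocab.pieces.end()) chunk.text += it->second;
    }
    return chunk;
}
```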

bool getSkipSpecialTokens() const noexcept

Public Static Functions

static std::shared_ptr<StreamChannel> create()

Factory and the only way to construct a StreamChannel; enforces shared_ptr ownership.
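A common C++ way to make a factory the only construction path while still allowing std::make_shared is the passkey idiom, sketched below with a hypothetical Pipe class. Whether StreamChannel uses this idiom or another technique (for example a private constructor) is not stated here.

```cpp
#include <memory>

// Hypothetical class showing create()-only construction via a private passkey.
class Pipe {
    // Only members of Pipe can name Key, and `explicit` blocks `Pipe p({})`.
    struct Key {
        explicit Key() = default;
    };

public:
    explicit Pipe(Key) {}  // public so make_shared can call it, but unusable without a Key
    Pipe(Pipe const&) = delete;
    Pipe& operator=(Pipe const&) = delete;
    Pipe(Pipe&&) = delete;
    Pipe& operator=(Pipe&&) = delete;

    static std::shared_ptr<Pipe> create() { return std::make_shared<Pipe>(Key{}); }
};
```

Deleting the copy and move operations, as StreamChannel does, means every holder shares the same object through the returned shared_ptr.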

class StreamChannelFinalizer

RAII guard that guarantees every attached StreamChannel terminates.

On destruction, the guard does the following for every slot whose channel has not yet been finalized:

  1. Tries to build a terminal chunk with reason=kError containing any un-emitted tokens and text (sanitized, with trailing incomplete bytes flushed as U+FFFD) and pushes it via push().

  2. Calls finish(kError), which is idempotent and no-throw, so the consumer unblocks even if step 1 failed under OOM.
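The two-step teardown can be sketched with simplified stand-ins. ToyChannel, ToySlot, ToyFinalizer, and Reason (including its kStop value) are hypothetical; the real guard reads slots from SpecDecodeInferenceContext and builds a StreamChunk with the tokenizer, which this sketch replaces with a plain string.

```cpp
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Hypothetical reasons; kStop stands in for any non-error terminal reason.
enum class Reason { kNotFinished, kStop, kError };

struct ToyChannel {
    std::vector<std::string> pushed;
    bool finished = false;
    Reason reason = Reason::kNotFinished;
    void push(std::string text) { pushed.push_back(std::move(text)); }  // may throw bad_alloc
    void finish(Reason r) noexcept {
        if (finished) return;  // idempotent
        finished = true;
        reason = r;
    }
};

struct ToySlot {
    std::shared_ptr<ToyChannel> channel;  // null: streaming disabled
    std::string unsentText;               // text not yet emitted
};

class ToyFinalizer {
public:
    explicit ToyFinalizer(std::vector<ToySlot>& slots) noexcept : slots_(slots) {}
    ToyFinalizer(ToyFinalizer const&) = delete;
    ToyFinalizer& operator=(ToyFinalizer const&) = delete;

    ~ToyFinalizer() noexcept {
        for (ToySlot& slot : slots_) {
            if (!slot.channel || slot.channel->finished) continue;
            try {
                slot.channel->push(slot.unsentText);  // step 1: best-effort terminal chunk
            } catch (...) {
                // e.g. std::bad_alloc: fall through so step 2 still runs
            }
            slot.channel->finish(Reason::kError);  // step 2: always unblock the consumer
        }
    }

private:
    std::vector<ToySlot>& slots_;
};
```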

Public Functions

StreamChannelFinalizer(SpecDecodeInferenceContext &ctx, tokenizer::Tokenizer const &tok) noexcept
~StreamChannelFinalizer() noexcept
StreamChannelFinalizer(StreamChannelFinalizer const&) = delete
StreamChannelFinalizer &operator=(StreamChannelFinalizer const&) = delete
StreamChannelFinalizer(StreamChannelFinalizer&&) = delete
StreamChannelFinalizer &operator=(StreamChannelFinalizer&&) = delete

struct StreamChunk

Single delta chunk on a streaming channel.

text is always well-formed UTF-8 when pushed by the runtime: invalid byte sequences are replaced with U+FFFD before emission. Empty text chunks are legal (e.g., a terminal chunk sent after all bytes were already emitted).
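A sketch of the replacement step, assuming a simple per-byte policy: any byte that does not begin a well-formed UTF-8 sequence (including overlong forms, surrogates, and code points above U+10FFFF) becomes one U+FFFD. sanitizeUtf8 is a hypothetical helper; the runtime may instead replace each maximal ill-formed subpart, as the Unicode Standard recommends, which can yield fewer replacement characters.

```cpp
#include <cstddef>
#include <string>

// Hypothetical helper: per-byte U+FFFD substitution for ill-formed UTF-8.
inline std::string sanitizeUtf8(const std::string& in) {
    static const std::string kReplacement = "\xEF\xBF\xBD";  // U+FFFD encoded as UTF-8
    std::string out;
    std::size_t i = 0;
    while (i < in.size()) {
        unsigned char b0 = static_cast<unsigned char>(in[i]);
        std::size_t len = 0;                 // 0 means "not a valid lead byte"
        unsigned char lo = 0x80, hi = 0xBF;  // allowed range for the second byte
        if (b0 < 0x80) {
            len = 1;
        } else if (b0 >= 0xC2 && b0 <= 0xDF) {
            len = 2;
        } else if (b0 >= 0xE0 && b0 <= 0xEF) {
            len = 3;
            if (b0 == 0xE0) lo = 0xA0;       // reject overlong 3-byte forms
            else if (b0 == 0xED) hi = 0x9F;  // reject UTF-16 surrogates
        } else if (b0 >= 0xF0 && b0 <= 0xF4) {
            len = 4;
            if (b0 == 0xF0) lo = 0x90;       // reject overlong 4-byte forms
            else if (b0 == 0xF4) hi = 0x8F;  // reject code points above U+10FFFF
        }
        bool ok = len != 0 && i + len <= in.size();
        for (std::size_t k = 1; ok && k < len; ++k) {
            unsigned char b = static_cast<unsigned char>(in[i + k]);
            unsigned char kLo = (k == 1) ? lo : 0x80;
            unsigned char kHi = (k == 1) ? hi : 0xBF;
            ok = b >= kLo && b <= kHi;
        }
        if (ok) {
            out.append(in, i, len);  // copy the well-formed sequence unchanged
            i += len;
        } else {
            out += kReplacement;     // replace one offending byte and resync
            ++i;
        }
    }
    return out;
}
```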

Public Members

std::vector<int32_t> tokenIds

Delta tokens since the last chunk (may be more than one under speculative decoding).

std::string text

Delta text; always well-formed UTF-8.

bool finished = {false}

True for the final chunk on this channel.

FinishReason reason = {FinishReason::kNotFinished}

Terminal reason (only meaningful when finished==true).

struct SlotStreamState

Per-slot detokenization and streaming state.

Lives in SpecDecodeInferenceContext and is compacted in lockstep with tokenIds.
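Compacting in lockstep means that when finished slots are removed, every per-slot array is shifted with the same index mapping, so slot i still describes the same request everywhere. A generic sketch, assuming a keep mask of the same length as both arrays; compactInLockstep is hypothetical and does not reflect the runtime's actual compaction code.

```cpp
#include <cstddef>
#include <utility>
#include <vector>

// Hypothetical: stable in-place compaction of two parallel arrays.
// Precondition (assumed): keep.size() == a.size() == b.size().
template <typename A, typename B>
void compactInLockstep(std::vector<A>& a, std::vector<B>& b, const std::vector<bool>& keep) {
    std::size_t dst = 0;
    for (std::size_t src = 0; src < keep.size(); ++src) {
        if (!keep[src]) continue;
        if (dst != src) {
            a[dst] = std::move(a[src]);  // same move applied to both arrays
            b[dst] = std::move(b[src]);
        }
        ++dst;
    }
    a.erase(a.begin() + static_cast<std::ptrdiff_t>(dst), a.end());
    b.erase(b.begin() + static_cast<std::ptrdiff_t>(dst), b.end());
}
```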

Public Members

std::shared_ptr<StreamChannel> channel

Channel (null ⇒ streaming disabled for this slot).

size_t sentTokenCount = {0}

Count of tokens whose piece bytes have been fed through emitDelta.

size_t lastEmittedTokenCount = {0}

Value of sentTokenCount at the last push (used for streamInterval gating).

std::string pendingBytes

Trailing incomplete UTF-8 bytes carried across iterations.
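The carry-over can be sketched as follows, assuming each new token's piece bytes are appended, everything up to the last complete UTF-8 sequence is emitted, and a truncated trailing sequence is kept for the next iteration. takeCompletePrefix and utf8SeqLen are hypothetical helpers; invalid bytes pass through for a later sanitizing step.

```cpp
#include <cstddef>
#include <string>

// Expected sequence length for a UTF-8 lead byte, or 0 for a continuation/invalid byte.
inline std::size_t utf8SeqLen(unsigned char b) {
    if (b < 0x80) return 1;
    if (b >= 0xC2 && b <= 0xDF) return 2;
    if (b >= 0xE0 && b <= 0xEF) return 3;
    if (b >= 0xF0 && b <= 0xF4) return 4;
    return 0;
}

// Hypothetical: appends newBytes to pendingBytes, returns every byte up to the last
// complete sequence, and leaves an incomplete trailing sequence in pendingBytes.
inline std::string takeCompletePrefix(std::string& pendingBytes, const std::string& newBytes) {
    pendingBytes += newBytes;
    const std::size_t n = pendingBytes.size();
    std::size_t keep = 0;  // number of trailing bytes to hold back
    // A truncated sequence has its lead byte at most 3 bytes from the end.
    for (std::size_t back = 1; back <= 3 && back <= n; ++back) {
        unsigned char b = static_cast<unsigned char>(pendingBytes[n - back]);
        if ((b & 0xC0) == 0x80) continue;         // continuation byte: keep looking
        if (utf8SeqLen(b) > back) keep = back;    // lead byte still missing bytes
        break;
    }
    std::string ready = pendingBytes.substr(0, n - keep);
    pendingBytes.erase(0, n - keep);
    return ready;
}
```

At termination, any bytes still held in pendingBytes would be flushed as U+FFFD, matching the finalizer behavior described for StreamChannelFinalizer.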

FinishReason terminalReason = {FinishReason::kNotFinished}

Terminal reason, latched at the moment finishedStates[i] flips to 1. It has exactly one writer (the code that flips the state) and one reader (the emit hook).

void trt_edgellm::rt::attachStreamChannel(std::shared_ptr<StreamChannel> const &channel, int32_t originalIdx)
bool trt_edgellm::rt::validateStreamingSubmission(LLMGenerationRequest const &request)
void trt_edgellm::rt::applyCancellationToFinishStates(SpecDecodeInferenceContext &context)
void trt_edgellm::rt::emitChunks(SpecDecodeInferenceContext &context, tokenizer::Tokenizer const &tok)