Streaming

class StreamChannel

Per-slot streaming channel.

Encapsulated MPSC pipe between the runtime (producer) and a single consumer. State is private; the consumer interacts through consume()/waitPop()/tryPop()/cancel(). Runtime-side operations (push, finish, setOriginalBatchIdx) are reachable only through friendship, so consumers cannot hold the lock or skip chunks.

Public Functions

StreamChannel(StreamChannel const&) = delete
StreamChannel &operator=(StreamChannel const&) = delete
StreamChannel(StreamChannel&&) = delete
StreamChannel &operator=(StreamChannel&&) = delete
~StreamChannel() noexcept = default
template<typename Handler>
void consume(
Handler &&handler,
std::chrono::milliseconds poll = std::chrono::milliseconds{100}
)

Block until finished or cancelled, delivering every chunk.

Exits when the pending queue is empty AND either the finished or the cancelled flag is set. The lock is held only for the pop; handler runs with the mutex released.

Template Parameters:

Handler – Callable with signature void(StreamChunk&&).

Parameters:
  • handler – Invoked once per chunk delivered.

  • poll – Max interval between wakeups (defaults to 100ms).
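The exit condition and locking discipline described above can be illustrated with a small stand-in. This is a sketch, not the library's implementation: MiniChannel and MiniChunk are hypothetical, and the sketch assumes a mutex, a std::condition_variable, and a std::deque as internal state. It pops under the lock, invokes the handler with the lock released, and returns only when the queue is empty and a terminal flag is set.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <deque>
#include <mutex>
#include <string>
#include <utility>

// Hypothetical stand-in for StreamChunk (the real struct also carries tokenIds and reason).
struct MiniChunk {
    std::string text;
    bool finished = false;
};

// Hypothetical channel that mimics the documented consume() contract.
class MiniChannel {
public:
    void push(MiniChunk chunk) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            pending_.push_back(std::move(chunk));
        }
        cv_.notify_one();
    }

    void finish() { setFlag(finished_); }
    void cancel() { setFlag(cancelled_); }

    template <typename Handler>
    void consume(Handler&& handler,
                 std::chrono::milliseconds poll = std::chrono::milliseconds{100}) {
        for (;;) {
            MiniChunk chunk;
            {
                std::unique_lock<std::mutex> lock(mutex_);
                // Wake on new data or a terminal flag, or after at most `poll`.
                cv_.wait_for(lock, poll, [this] {
                    return !pending_.empty() || finished_ || cancelled_;
                });
                if (pending_.empty()) {
                    if (finished_ || cancelled_) return;  // drained and terminal: exit
                    continue;                             // poll expired: wait again
                }
                chunk = std::move(pending_.front());
                pending_.pop_front();
            }
            handler(std::move(chunk));  // the mutex is not held here
        }
    }

private:
    void setFlag(bool& flag) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            flag = true;
        }
        cv_.notify_all();
    }

    std::mutex mutex_;
    std::condition_variable cv_;
    std::deque<MiniChunk> pending_;
    bool finished_ = false;
    bool cancelled_ = false;
};
```

Running the handler outside the lock means a slow handler never blocks the producer's push(), which needs the same mutex.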

std::optional<StreamChunk> tryPop()

Non-blocking single-chunk pop.

std::optional<StreamChunk> waitPop(
std::chrono::milliseconds timeout
)

Blocking single-chunk pop with timeout.

Returns std::nullopt on timeout, or when woken by finish()/cancel() with an empty deque. Callers that need the terminal signal should therefore combine waitPop() with isFinished()/isCancelled().
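Because an empty result from waitPop() does not say whether the wait timed out or the channel terminated, a polling consumer checks the flags after each empty result. In the sketch below, MockChannel only mimics the documented calls and pollUntilTerminal is an illustrative helper; neither is part of the API. As a precaution, the loop also drains with tryPop() after seeing a terminal flag, which covers a chunk pushed between a timed-out waitPop() and the flag check.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <deque>
#include <mutex>
#include <optional>
#include <string>
#include <utility>

// Hypothetical stand-in exposing only the StreamChannel calls used below.
class MockChannel {
public:
    void push(std::string chunk) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            queue_.push_back(std::move(chunk));
        }
        cv_.notify_one();
    }
    void finish() { setFlag(finished_); }
    void cancel() { setFlag(cancelled_); }

    std::optional<std::string> tryPop() {
        std::lock_guard<std::mutex> lock(mutex_);
        return popLocked();
    }
    std::optional<std::string> waitPop(std::chrono::milliseconds timeout) {
        std::unique_lock<std::mutex> lock(mutex_);
        cv_.wait_for(lock, timeout, [this] { return !queue_.empty() || finished_ || cancelled_; });
        return popLocked();  // nullopt on timeout, or on a terminal wake with an empty queue
    }
    bool isFinished() const { std::lock_guard<std::mutex> lock(mutex_); return finished_; }
    bool isCancelled() const { std::lock_guard<std::mutex> lock(mutex_); return cancelled_; }

private:
    void setFlag(bool& flag) {
        { std::lock_guard<std::mutex> lock(mutex_); flag = true; }
        cv_.notify_all();
    }
    std::optional<std::string> popLocked() {
        if (queue_.empty()) return std::nullopt;
        std::string front = std::move(queue_.front());
        queue_.pop_front();
        return front;
    }
    mutable std::mutex mutex_;
    std::condition_variable cv_;
    std::deque<std::string> queue_;
    bool finished_ = false;
    bool cancelled_ = false;
};

// Hypothetical consumer loop built on waitPop() plus the terminal-state queries.
template <typename Channel, typename Handler>
void pollUntilTerminal(Channel& ch, Handler&& handle, std::chrono::milliseconds timeout) {
    for (;;) {
        if (auto chunk = ch.waitPop(timeout)) {
            handle(std::move(*chunk));
            continue;
        }
        if (ch.isFinished() || ch.isCancelled()) {
            // A chunk may have been pushed after waitPop() timed out; drain before leaving.
            while (auto late = ch.tryPop()) handle(std::move(*late));
            return;
        }
        // Plain timeout: a real caller could do other work here before retrying.
    }
}
```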

bool isFinished() const noexcept
FinishReason getReason() const noexcept
int32_t getOriginalBatchIdx() const noexcept
bool isCancelled() const noexcept
void cancel() noexcept

Fire-and-forget cancellation. Safe to call from any thread. Wakes any thread blocked in consume() or waitPop().

void setStreamInterval(int32_t n) noexcept

Per-request emit throttle, counted in tokens. Values < 1 are clamped to 1; the default is 1.
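One plausible reading of the throttle, assuming gating compares SlotStreamState::sentTokenCount against lastEmittedTokenCount: push once at least n new tokens have been fed since the last push, and always on the terminal iteration. shouldEmit is a hypothetical helper; the runtime's exact rule is not spelled out in this reference.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>

// Hypothetical gating rule: push once at least `streamInterval` tokens have been fed
// since the last push, and always on the terminal iteration so no text is held back.
inline bool shouldEmit(std::size_t sentTokenCount, std::size_t lastEmittedTokenCount,
                       std::int32_t streamInterval, bool finished) {
    // Mirror the documented clamp: values < 1 behave like 1.
    const auto interval = static_cast<std::size_t>(std::max<std::int32_t>(streamInterval, 1));
    assert(sentTokenCount >= lastEmittedTokenCount);
    return finished || sentTokenCount - lastEmittedTokenCount >= interval;
}
```

With an interval of 4 and a 10-token response, this rule pushes after tokens 4 and 8 and then once more on the terminal iteration.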

int32_t getStreamInterval() const noexcept
void setSkipSpecialTokens(bool skip) noexcept

Filter special tokens (EOS, <|im_end|>, <think>, vision placeholders, etc.) out of chunk.text. Default true. chunk.tokenIds is unaffected either way. Set before the channel is attached.
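To make the split between chunk.text and chunk.tokenIds concrete, the sketch below builds a delta from parallel token-ID and piece vectors. buildDelta, DeltaSketch, the special-ID set, and the token IDs in the usage are all hypothetical; the real runtime gets pieces and special-token information from its tokenizer.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <string>
#include <unordered_set>
#include <vector>

// Hypothetical delta: tokenIds always lists every token; text may omit special pieces.
struct DeltaSketch {
    std::vector<std::int32_t> tokenIds;
    std::string text;
};

inline DeltaSketch buildDelta(std::vector<std::int32_t> const& ids,
                              std::vector<std::string> const& pieces,
                              std::unordered_set<std::int32_t> const& specialIds,
                              bool skipSpecialTokens) {
    assert(ids.size() == pieces.size());
    DeltaSketch delta;
    delta.tokenIds = ids;  // unaffected by skipSpecialTokens
    for (std::size_t i = 0; i < ids.size(); ++i) {
        if (skipSpecialTokens && specialIds.count(ids[i]) != 0) continue;  // drop from text only
        delta.text += pieces[i];
    }
    return delta;
}
```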

bool getSkipSpecialTokens() const noexcept

Public Static Functions

static std::shared_ptr<StreamChannel> create()

Factory function; the only way to construct a StreamChannel. Enforces shared_ptr ownership.
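A common way to make a factory the only construction path while still using std::make_shared is the passkey idiom: the constructor is public but takes a private tag type that only the class can create. ChannelSketch is hypothetical, and this reference does not say which technique StreamChannel::create() actually uses.

```cpp
#include <cassert>
#include <memory>

// Hypothetical class: create() is the only way to obtain an instance.
class ChannelSketch {
    // Private tag: code outside the class cannot name it, and its explicit default
    // constructor rejects `{}`, so outsiders cannot call the public constructor below.
    struct PrivateTag {
        explicit PrivateTag() = default;
    };

public:
    explicit ChannelSketch(PrivateTag) {}
    ChannelSketch(ChannelSketch const&) = delete;
    ChannelSketch& operator=(ChannelSketch const&) = delete;
    ChannelSketch(ChannelSketch&&) = delete;
    ChannelSketch& operator=(ChannelSketch&&) = delete;
    ~ChannelSketch() noexcept = default;

    static std::shared_ptr<ChannelSketch> create() {
        // make_shared can reach the public constructor; only this class can mint the tag.
        return std::make_shared<ChannelSketch>(PrivateTag{});
    }
};
```

Deleting copy and move as well means every holder shares the same instance through the shared_ptr, which matches the deleted special members listed for StreamChannel.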

class StreamChannelFinalizer

RAII guard that guarantees every attached StreamChannel terminates.

On destruction, for every slot whose channel has not yet been finalized the guard:

  1. Tries to build a terminal chunk with reason=kError that contains any un-emitted tokens and text (sanitized, with trailing incomplete bytes flushed as U+FFFD), and push()es it.

  2. Calls finish(kError), which is idempotent and no-throw, so the consumer is guaranteed to unblock even if step 1 failed under OOM.
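The two-step teardown above can be sketched with simplified stand-ins. FakeChannel, FakeSlot, Reason, and FinalizerSketch are hypothetical; the real guard works on DecodingInferenceContext and builds a full StreamChunk. The points the sketch shows are that step 1 is wrapped in a catch-all so an allocation failure cannot escape the noexcept destructor, and that step 2 runs unconditionally.

```cpp
#include <cassert>
#include <memory>
#include <string>
#include <utility>
#include <vector>

enum class Reason { kNotFinished, kStopWords, kError };  // subset of FinishReason

// Hypothetical channel that records what the guard does to it.
struct FakeChannel {
    std::vector<std::string> pushedText;  // text of each pushed terminal chunk
    bool finished = false;
    Reason reason = Reason::kNotFinished;

    void push(std::string text) { pushedText.push_back(std::move(text)); }  // may throw
    void finish(Reason r) noexcept {
        if (finished) return;  // idempotent: the first reason wins
        finished = true;
        reason = r;
    }
};

struct FakeSlot {
    std::shared_ptr<FakeChannel> channel;  // null => slot is not streaming
    std::string unemittedText;             // bytes not yet pushed
};

// Hypothetical guard mirroring the two documented teardown steps.
class FinalizerSketch {
public:
    explicit FinalizerSketch(std::vector<FakeSlot>& slots) noexcept : slots_(slots) {}
    FinalizerSketch(FinalizerSketch const&) = delete;
    FinalizerSketch& operator=(FinalizerSketch const&) = delete;

    ~FinalizerSketch() noexcept {
        for (FakeSlot& slot : slots_) {
            if (!slot.channel || slot.channel->finished) continue;  // already finalized
            try {
                slot.channel->push(slot.unemittedText);  // step 1: best effort (kError chunk)
            } catch (...) {
                // e.g. std::bad_alloc: swallow so the noexcept destructor cannot terminate
            }
            slot.channel->finish(Reason::kError);  // step 2: always unblock the consumer
        }
    }

private:
    std::vector<FakeSlot>& slots_;
};
```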

Public Functions

StreamChannelFinalizer(
DecodingInferenceContext &ctx,
tokenizer::Tokenizer const &tok
) noexcept
~StreamChannelFinalizer() noexcept
StreamChannelFinalizer(StreamChannelFinalizer const&) = delete
StreamChannelFinalizer &operator=(
StreamChannelFinalizer const&
) = delete
StreamChannelFinalizer(StreamChannelFinalizer&&) = delete
StreamChannelFinalizer &operator=(StreamChannelFinalizer&&) = delete

struct StreamChunk

Single delta chunk on a streaming channel.

text is always well-formed UTF-8 when pushed by the runtime; invalid byte sequences are replaced with U+FFFD before emission. Empty text chunks are legal (e.g., a terminal chunk pushed after all bytes have already been emitted).
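A sanitizer of this kind can be written as a single forward scan. The sketch below assumes the Unicode "maximal subpart" substitution practice (one U+FFFD per maximal ill-formed subsequence). This reference only says that invalid sequences become U+FFFD, so the runtime's exact replacement policy is an assumption here, and sanitizeUtf8 is a hypothetical name.

```cpp
#include <cassert>
#include <cstddef>
#include <string>

// Hypothetical sanitizer: copies well-formed UTF-8 through and replaces each maximal
// ill-formed subsequence with U+FFFD (EF BF BD).
inline std::string sanitizeUtf8(std::string const& in) {
    static const char kReplacement[] = "\xEF\xBF\xBD";
    std::string out;
    out.reserve(in.size());
    std::size_t i = 0;
    while (i < in.size()) {
        const unsigned char b0 = static_cast<unsigned char>(in[i]);
        std::size_t len = 0;                 // expected sequence length, 0 = invalid lead byte
        unsigned char lo = 0x80, hi = 0xBF;  // allowed range for the second byte
        if (b0 < 0x80) len = 1;
        else if (b0 >= 0xC2 && b0 <= 0xDF) len = 2;
        else if (b0 == 0xE0) { len = 3; lo = 0xA0; }  // reject overlong forms
        else if (b0 >= 0xE1 && b0 <= 0xEC) len = 3;
        else if (b0 == 0xED) { len = 3; hi = 0x9F; }  // reject UTF-16 surrogates
        else if (b0 >= 0xEE && b0 <= 0xEF) len = 3;
        else if (b0 == 0xF0) { len = 4; lo = 0x90; }  // reject overlong forms
        else if (b0 >= 0xF1 && b0 <= 0xF3) len = 4;
        else if (b0 == 0xF4) { len = 4; hi = 0x8F; }  // stay at or below U+10FFFF
        std::size_t good = (len == 0) ? 0 : 1;        // bytes validated so far
        while (good < len && i + good < in.size()) {
            const unsigned char b = static_cast<unsigned char>(in[i + good]);
            const unsigned char minB = (good == 1) ? lo : 0x80;
            const unsigned char maxB = (good == 1) ? hi : 0xBF;
            if (b < minB || b > maxB) break;
            ++good;
        }
        if (len != 0 && good == len) {
            out.append(in, i, len);  // well-formed sequence: copy as-is
            i += len;
        } else {
            out += kReplacement;     // one U+FFFD per maximal subpart
            i += (good == 0) ? 1 : good;
        }
    }
    return out;
}
```

Applied to a whole string, this also replaces a sequence that is merely truncated at the end; in a streaming path such trailing bytes would normally be carried over in pendingBytes and only replaced on the terminal flush.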

Public Members

std::vector<int32_t> tokenIds

Delta tokens since the last chunk (may contain more than one token under speculative decoding).

std::string text

Delta text; always well-formed UTF-8.

bool finished = {false}

True for the final chunk on this channel.

FinishReason reason = {FinishReason::kNotFinished}

Terminal reason (only meaningful when finished==true).

struct StopMatchOutcome

Result of applyStopStringMatch.

Public Members

std::string emitted

Bytes safe to push right now.

bool stopMatched = {false}

True iff a stop string was found and triggered truncation.

struct SlotStreamState

Per-slot detokenization and streaming state.

Lives in DecodingInferenceContext and is compacted in lockstep with tokenIds.
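"Compacted in lockstep" means that whenever finished slots are removed from the batch, the per-slot stream states move exactly as the token rows do, so index i keeps describing the same request. A minimal sketch with hypothetical types (BatchSketch, with a std::string standing in for SlotStreamState); the runtime's actual compaction code is not shown in this reference.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <string>
#include <utility>
#include <vector>

// Hypothetical batch with parallel per-slot arrays: index i of each vector is the same slot.
struct BatchSketch {
    std::vector<std::vector<std::int32_t>> tokenIds;
    std::vector<std::string> streamState;  // stand-in for SlotStreamState
};

// Drops the slots flagged in `drop`, preserving survivor order, and applies identical
// moves to both arrays so they stay aligned.
inline void compactLockstep(BatchSketch& batch, std::vector<bool> const& drop) {
    assert(batch.tokenIds.size() == batch.streamState.size());
    assert(drop.size() == batch.tokenIds.size());
    std::size_t write = 0;
    for (std::size_t read = 0; read < drop.size(); ++read) {
        if (drop[read]) continue;
        if (write != read) {
            batch.tokenIds[write] = std::move(batch.tokenIds[read]);
            batch.streamState[write] = std::move(batch.streamState[read]);
        }
        ++write;
    }
    batch.tokenIds.resize(write);
    batch.streamState.resize(write);
}
```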

Public Members

std::shared_ptr<StreamChannel> channel

Channel (null ⇒ streaming disabled for this slot).

size_t sentTokenCount = {0}

Count of tokens whose piece bytes have been fed through emitDelta.

size_t lastEmittedTokenCount = {0}

sentTokenCount at last push (for streamInterval gating).

std::string pendingBytes

Trailing incomplete UTF-8 bytes carried across iterations.
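Carrying trailing bytes only requires inspecting the last three bytes, because a truncated UTF-8 sequence is at most three bytes long. takeCompleteUtf8 below is a hypothetical helper showing how a buffer like pendingBytes could be maintained; it leaves ill-formed bytes in the returned prefix for a separate sanitizer to replace.

```cpp
#include <cassert>
#include <cstddef>
#include <string>

// Expected total length of a UTF-8 sequence from its lead byte; 0 if not a valid lead.
inline std::size_t utf8SequenceLength(unsigned char lead) {
    if (lead < 0x80) return 1;
    if ((lead & 0xE0) == 0xC0) return 2;
    if ((lead & 0xF0) == 0xE0) return 3;
    if ((lead & 0xF8) == 0xF0) return 4;
    return 0;
}

// Hypothetical helper: appends a decoded piece to `pending`, returns the prefix that does not
// end inside a multi-byte sequence, and keeps the incomplete tail in `pending` for next time.
inline std::string takeCompleteUtf8(std::string& pending, std::string const& piece) {
    pending += piece;
    std::size_t cut = pending.size();
    // A truncated sequence is at most 3 bytes, so only the last 3 bytes can start one.
    for (std::size_t back = 1; back <= 3 && back <= pending.size(); ++back) {
        const std::size_t pos = pending.size() - back;
        const unsigned char b = static_cast<unsigned char>(pending[pos]);
        if ((b & 0xC0) == 0x80) continue;             // continuation byte: look further back
        if (utf8SequenceLength(b) > back) cut = pos;  // lead byte still missing bytes
        break;
    }
    std::string ready = pending.substr(0, cut);
    pending.erase(0, cut);
    return ready;
}
```

For example, if one token decodes to the first two bytes of "€" (E2 82) and the next to its last byte (AC), the first call emits nothing for that character and the second emits all three bytes together.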

FinishReason terminalReason = {FinishReason::kNotFinished}

Terminal reason latched at the moment finishedStates[i] flips to 1. One writer (the code that flips the state), one reader (the emit hook).

std::string stopMatchBuffer

Hold-back buffer for cross-iteration stop-string matching; bounded by maxStopLen - 1 bytes.

size_t maxStopLen = {0}

Cached max(stopStrings[*].size()) for this slot; 0 if the slot has no non-empty stop strings. Pre-computed once at handleRequest entry so applyStopStringMatch can skip the per-call scan.

std::string pendingEmitText

Per-iteration emit text; populated by decodePerSlot, consumed (moved out) by emitChunks. Empty between iterations.

bool stopMatchedThisIter = {false}

True iff the stop-string match fired in decodePerSlot this iteration. Read by updateFinishStates to apply the kStopWords override; reset at the top of each decodePerSlot pass.

void trt_edgellm::rt::attachStreamChannel(
std::shared_ptr<StreamChannel> const &channel,
int32_t originalIdx
)
bool trt_edgellm::rt::validateStreamingSubmission(
LLMGenerationRequest const &request
)
char const *trt_edgellm::rt::finishReasonName(
FinishReason r
) noexcept

Human-readable name for a FinishReason. Returns a static, NUL-terminated C-string. Useful for logs, footer lines, and JSON output.

StopMatchOutcome trt_edgellm::rt::applyStopStringMatch(
std::string &buffer,
std::vector<std::string> const &stops,
size_t maxStopLen,
bool isFinal
)

Stop-string match against a per-slot rolling buffer.

The caller appends newly decoded bytes to buffer before invoking and passes maxStopLen = max(stops[*].size()) (cached at request entry). maxStopLen == 0 means there are no stops or all entries are empty, in which case the function degenerates to a pass-through. On return:

  • Match found (earliest position wins): emitted = bytes before the match, stopMatched = true, buffer cleared.

  • isFinal, no match: full flush — emitted = buffer, buffer cleared.

  • Otherwise: emitted = buffer minus its trailing maxStopLen - 1 bytes; those bytes stay in buffer for cross-iteration matching.

Matching is byte-level; the caller must ensure that buffer contains valid UTF-8.
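The documented outcomes translate fairly directly into code. applyStopStringMatchSketch and StopMatchOutcomeSketch are hypothetical names for an illustrative re-implementation of the contract, not the library's code. Holding back maxStopLen - 1 bytes is enough because a stop string that has started but not yet completed must begin within the last maxStopLen - 1 bytes of the buffer.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <string>
#include <utility>
#include <vector>

struct StopMatchOutcomeSketch {
    std::string emitted;       // bytes safe to push now
    bool stopMatched = false;  // a stop string was found and truncated the output
};

// Hypothetical re-implementation of the documented applyStopStringMatch contract.
inline StopMatchOutcomeSketch applyStopStringMatchSketch(std::string& buffer,
                                                         std::vector<std::string> const& stops,
                                                         std::size_t maxStopLen, bool isFinal) {
    StopMatchOutcomeSketch out;
    if (maxStopLen > 0) {
        // Earliest match position across all non-empty stop strings wins.
        std::size_t best = std::string::npos;
        for (std::string const& stop : stops) {
            if (stop.empty()) continue;
            best = std::min(best, buffer.find(stop));
        }
        if (best != std::string::npos) {
            out.emitted = buffer.substr(0, best);
            out.stopMatched = true;
            buffer.clear();
            return out;
        }
    }
    if (isFinal || maxStopLen == 0) {
        out.emitted = std::move(buffer);  // full flush, or pass-through when there are no stops
        buffer.clear();
        return out;
    }
    // Keep the tail where a partially received stop string could still complete.
    const std::size_t keep = std::min(buffer.size(), maxStopLen - 1);
    out.emitted = buffer.substr(0, buffer.size() - keep);
    buffer.erase(0, buffer.size() - keep);
    return out;
}
```

With stops {"</s>", "STOP"}, feeding "Hello </" emits "Hello" and holds " </"; appending "s> more" on the next iteration then emits " " and reports the match.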

void trt_edgellm::rt::applyCancellationToFinishStates(
DecodingInferenceContext &context
)
void trt_edgellm::rt::decodePerSlot(
DecodingInferenceContext &context,
tokenizer::Tokenizer const &tok
)

Stage 1 of the per-iteration pipeline: decodes new tokens into per-slot UTF-8 bytes and runs stop-string matching. Sets s.pendingEmitText (bytes safe to emit this iteration) and s.stopMatchedThisIter (the signal for updateFinishStates to apply the kStopWords override). Does not push chunks and does not modify finishedStates / terminalReason; termination decisions live in updateFinishStates.

void trt_edgellm::rt::emitChunks(DecodingInferenceContext &context)

Stage 2 of the per-iteration pipeline (runs after updateFinishStates): pushes chunks to channels using the pre-computed pendingEmitText and the finalized terminalReason. Non-streaming slots are skipped here; their output is assembled at handleRequest finalization.
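The ordering constraint (decode, then decide termination, then emit) can be shown with a toy single-iteration model. All types and functions here are hypothetical stand-ins: the toy does not decode real tokens, gate by streamInterval, or skip non-streaming slots as the real emitChunks does.

```cpp
#include <cassert>
#include <string>
#include <utility>
#include <vector>

enum class Reason { kNotFinished, kStopWords };  // subset of FinishReason named in this section

struct ToyChunk {
    std::string text;
    bool finished;
    Reason reason;
};

// Hypothetical per-slot state; field names echo SlotStreamState where they overlap.
struct ToySlot {
    std::string newBytes;          // toy input: bytes "decoded" this iteration
    bool stopFound = false;        // toy input: pretend stop-string match result
    std::string pendingEmitText;
    bool stopMatchedThisIter = false;
    bool finished = false;
    Reason terminalReason = Reason::kNotFinished;
    std::vector<ToyChunk> pushed;  // stand-in for the slot's channel
};

// Stage 1: compute emit text and the stop signal; never touches finished/terminalReason.
inline void toyDecodePerSlot(std::vector<ToySlot>& slots) {
    for (ToySlot& s : slots) {
        s.stopMatchedThisIter = s.stopFound;  // overwritten on every pass
        s.pendingEmitText = s.newBytes;
    }
}

// Between the stages: the only place that decides termination.
inline void toyUpdateFinishStates(std::vector<ToySlot>& slots) {
    for (ToySlot& s : slots) {
        if (!s.finished && s.stopMatchedThisIter) {
            s.finished = true;
            s.terminalReason = Reason::kStopWords;  // the kStopWords override
        }
    }
}

// Stage 2: push using the precomputed text and the now-final reason.
inline void toyEmitChunks(std::vector<ToySlot>& slots) {
    for (ToySlot& s : slots) {
        s.pushed.push_back(ToyChunk{std::move(s.pendingEmitText), s.finished, s.terminalReason});
        s.pendingEmitText.clear();  // empty between iterations
    }
}
```

In this toy, because emission runs after termination is decided, the chunk carrying a slot's last text can also carry finished = true and the final reason.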