nemo_flow.codecs#

Protocol definitions for request and response codecs used by nemo_flow.llm.

LlmCodec handles request-side translation: it lets intercepts operate on AnnotatedLLMRequest instead of provider-specific raw payloads.

LlmResponseCodec handles response-side translation: it lets NeMo Flow attach an AnnotatedLLMResponse to emitted LLMEnd events without changing the return value of llm.execute() or llm.stream_execute().

Example:

from nemo_flow import AnnotatedLLMRequest, LLMRequest, llm
from nemo_flow.codecs import LlmCodec, OpenAIChatCodec

class DemoCodec(LlmCodec):
    def decode(self, request: LLMRequest) -> AnnotatedLLMRequest:
        return AnnotatedLLMRequest(
            request.content.get("messages", []),
            model=request.content.get("model"),
        )

    def encode(self, annotated: AnnotatedLLMRequest, original: LLMRequest) -> LLMRequest:
        content = {**original.content, "messages": annotated.messages}
        if annotated.model is not None:
            content["model"] = annotated.model
        return LLMRequest(original.headers, content)

async def impl(request: LLMRequest):
    # Stub provider callback returning a Chat Completions-shaped payload.
    return {"id": "r1", "choices": [{"message": {"role": "assistant", "content": "hi"}}]}

# Request-side codec for intercepts, response-side codec for event annotation.
result = await llm.execute(
    "demo-model",
    LLMRequest({}, {"messages": [{"role": "user", "content": "hello"}]}),
    impl,
    codec=DemoCodec(),
    response_codec=OpenAIChatCodec(),
)

Classes#

AnnotatedLLMRequest

Structured view of an LLM request produced by a codec.

AnthropicMessagesCodec

Built-in codec for Anthropic Messages requests and responses.

OpenAIChatCodec

Built-in codec for OpenAI Chat Completions requests and responses.

OpenAIResponsesCodec

Built-in codec for OpenAI Responses requests and responses.

LlmCodec

Protocol for request codecs used by annotated LLM intercepts.

LlmResponseCodec

Protocol for codecs that normalize raw LLM responses.

Module Contents#

class nemo_flow.codecs.AnnotatedLLMRequest(
messages: collections.abc.Sequence[collections.abc.Mapping[str, _JsonValue]],
*,
model: str | None = None,
params: collections.abc.Mapping[str, _JsonValue] | None = None,
tools: collections.abc.Sequence[collections.abc.Mapping[str, _JsonValue]] | None = None,
tool_choice: str | collections.abc.Mapping[str, _JsonValue] | None = None,
extra: collections.abc.Mapping[str, _JsonValue] | None = None,
)#

Structured view of an LLM request produced by a codec.

Summary:

Provider-neutral request view for annotated LLM middleware.

Description:

Codecs decode provider-specific request bodies into this normalized shape so request intercepts can operate on messages, model name, parameters, tools, and extra provider fields.
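
A minimal construction sketch; the message payload and model name are illustrative, but the constructor arguments follow the signature above:

from nemo_flow import AnnotatedLLMRequest

annotated = AnnotatedLLMRequest(
    [
        {"role": "system", "content": "You are terse."},
        {"role": "user", "content": "hello"},
    ],
    model="demo-model",
    params={"temperature": 0.2},
)

# The properties documented below expose the normalized fields passed in
# here, e.g. annotated.model == "demo-model".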

property messages: list[_JsonObject]#

Return normalized message objects.

property model: str | None#

Return the normalized model name, if present.

property params: _JsonObject | None#

Return provider parameters, if present.

property tools: list[_JsonObject] | None#

Return normalized tool declarations, if present.

property tool_choice: str | _JsonObject | None#

Return the normalized tool-choice directive, if present.

property extra: _JsonObject#

Return provider-specific request fields.

system_prompt() → str | None#

Return the first normalized system prompt, if one is present.

last_user_message() → str | None#

Return the last normalized user message text, if one is present.

has_tool_calls() → bool#

Return whether the normalized request includes tool declarations.
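
The three helpers above read straight off the normalized messages. A short sketch, assuming role-tagged message objects with string content (the payload is illustrative):

from nemo_flow import AnnotatedLLMRequest

annotated = AnnotatedLLMRequest(
    [
        {"role": "system", "content": "You are terse."},
        {"role": "user", "content": "first"},
        {"role": "user", "content": "second"},
    ],
)

print(annotated.system_prompt())      # first system message: "You are terse."
print(annotated.last_user_message())  # most recent user turn: "second"
print(annotated.has_tool_calls())     # False: no tool declarations on this request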

class nemo_flow.codecs.AnthropicMessagesCodec#

Built-in codec for Anthropic Messages requests and responses.

Summary:

Native codec bridge for Anthropic Messages payloads.

decode(request: LLMRequest) → AnnotatedLLMRequest#

Decode an Anthropic Messages request into a normalized request view.

encode(
annotated: AnnotatedLLMRequest,
original: LLMRequest,
) → LLMRequest#

Encode a normalized request back into Anthropic Messages shape.

decode_response(response: _Json) → AnnotatedLLMResponse#

Decode an Anthropic response into a normalized response view.
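
A round-trip sketch, assuming an Anthropic Messages-shaped body (the model name and payload values are illustrative):

from nemo_flow import LLMRequest
from nemo_flow.codecs import AnthropicMessagesCodec

codec = AnthropicMessagesCodec()
request = LLMRequest(
    {},
    {
        "model": "claude-example",  # hypothetical model name
        "max_tokens": 256,
        "messages": [{"role": "user", "content": "hello"}],
    },
)

# decode() lifts the Anthropic body into the provider-neutral view...
annotated = codec.decode(request)
# ...and encode() merges edits back while preserving provider-specific
# fields such as max_tokens (see the Notes under LlmCodec below).
merged = codec.encode(annotated, request)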

class nemo_flow.codecs.OpenAIChatCodec#

Built-in codec for OpenAI Chat Completions requests and responses.

Summary:

Native codec bridge for Chat Completions payloads.

decode(request: LLMRequest) → AnnotatedLLMRequest#

Decode a Chat Completions request into a normalized request view.

encode(
annotated: AnnotatedLLMRequest,
original: LLMRequest,
) → LLMRequest#

Encode a normalized request back into Chat Completions shape.

decode_response(response: _Json) → AnnotatedLLMResponse#

Decode a Chat Completions response into a normalized response view.
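
A response-side sketch: decode_response() is the hook NeMo Flow uses to build the annotated_response attached to LLMEnd events, and it can also be exercised directly (the payload is an illustrative Chat Completions shape):

from nemo_flow.codecs import OpenAIChatCodec

codec = OpenAIChatCodec()
raw = {
    "id": "chatcmpl-1",
    "choices": [{"message": {"role": "assistant", "content": "hi"}}],
}

# Normalizes the raw payload; the value returned by llm.execute() itself
# is never changed by a response codec.
annotated_response = codec.decode_response(raw)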

class nemo_flow.codecs.OpenAIResponsesCodec#

Built-in codec for OpenAI Responses requests and responses.

Summary:

Native codec bridge for OpenAI Responses payloads.

decode(request: LLMRequest) → AnnotatedLLMRequest#

Decode a Responses request into a normalized request view.

encode(
annotated: AnnotatedLLMRequest,
original: LLMRequest,
) → LLMRequest#

Encode a normalized request back into Responses shape.

decode_response(response: _Json) → AnnotatedLLMResponse#

Decode a Responses response into a normalized response view.
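
A decode sketch for the Responses API, whose request body carries conversation items under "input" rather than "messages" (the payload is illustrative):

from nemo_flow import LLMRequest
from nemo_flow.codecs import OpenAIResponsesCodec

codec = OpenAIResponsesCodec()
request = LLMRequest(
    {},
    {
        "model": "gpt-example",  # hypothetical model name
        "input": [{"role": "user", "content": "hello"}],
    },
)

# decode() maps the Responses "input" items into the normalized
# AnnotatedLLMRequest messages view.
annotated = codec.decode(request)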

class nemo_flow.codecs.LlmCodec#

Bases: Protocol

Protocol for request codecs used by annotated LLM intercepts.

decode() converts a provider-specific LLMRequest into an AnnotatedLLMRequest so request intercepts can work with a normalized structure. encode() merges any annotated edits back into the original raw payload before the provider callback is invoked.

Notes

encode() should preserve unknown provider-specific fields whenever possible instead of rebuilding the payload from scratch. That keeps transport-specific settings intact even when an intercept edits the normalized representation.

Example:

from nemo_flow import AnnotatedLLMRequest, LLMRequest
from nemo_flow.codecs import LlmCodec

class DemoCodec(LlmCodec):
    def decode(self, request: LLMRequest) -> AnnotatedLLMRequest:
        return AnnotatedLLMRequest(
            request.content.get("messages", []),
            model=request.content.get("model"),
        )

    def encode(
        self,
        annotated: AnnotatedLLMRequest,
        original: LLMRequest,
    ) -> LLMRequest:
        content = {**original.content, "messages": annotated.messages}
        if annotated.model is not None:
            content["model"] = annotated.model
        return LLMRequest(original.headers, content)

decode(
request: nemo_flow._native.LLMRequest,
) → nemo_flow._native.AnnotatedLLMRequest#

Decode a raw provider request into AnnotatedLLMRequest.

Parameters:

request – The provider-specific request payload received by nemo_flow.llm.execute() or nemo_flow.llm.stream_execute().

Returns:

The normalized request consumed by annotated intercepts.

Return type:

AnnotatedLLMRequest

encode(
annotated: nemo_flow._native.AnnotatedLLMRequest,
original: nemo_flow._native.LLMRequest,
) → nemo_flow._native.LLMRequest#

Merge annotated edits back into the original raw request.

Parameters:
  • annotated – The normalized request after intercepts have applied any edits.

  • original – The original provider-specific request passed into the runtime before normalization.

Returns:

The provider-specific request that should be forwarded to the provider callback.

Return type:

LLMRequest

class nemo_flow.codecs.LlmResponseCodec#

Bases: Protocol

Protocol for codecs that normalize raw LLM responses.

A response codec is used only for observability. The value returned from llm.execute() or llm.stream_execute() stays unchanged; the decoded response is attached to the emitted LLMEnd event as annotated_response.

Example:

import nemo_flow

async def impl(request: nemo_flow.LLMRequest):
    # Stub provider callback returning a Chat Completions-shaped payload.
    return {"id": "r1", "choices": [{"message": {"role": "assistant", "content": "hi"}}]}

result = await nemo_flow.llm.execute(
    "demo-provider",
    nemo_flow.LLMRequest({}, {"messages": [{"role": "user", "content": "hi"}]}),
    impl,
    response_codec=nemo_flow.codecs.OpenAIChatCodec(),
)

decode_response(
response: nemo_flow.Json,
) → nemo_flow._native.AnnotatedLLMResponse#

Decode a raw provider response into AnnotatedLLMResponse.

Parameters:

response – The raw JSON-compatible value returned by the provider callback.

Returns:

The normalized response attached to the LLMEnd event for downstream subscribers.

Return type:

AnnotatedLLMResponse