Advanced Guide: Provider Response Codecs#

Use this guide when subscribers, exporters, or diagnostics need a provider-neutral view of raw LLM responses.

What You Build#

You will attach a response codec to a managed LLM wrapper so NeMo Flow can decode provider responses into AnnotatedLLMResponse data for LLM end events.

Response codecs are observability-only:

  • They do not rewrite the value returned to the application.

  • They do not run response middleware.

  • They attach normalized response data to lifecycle events for subscribers and exporters.

  • Decode failures are non-fatal; the LLM call still returns the provider response and the end event is emitted without an annotation.

Before You Start#

You need:

  • A managed LLM boundary from Basic Guide: Wrap LLM Calls.

  • A raw provider response that is JSON-compatible.

  • A built-in response codec or a custom response codec for the provider response shape.

  • A subscriber or exporter that consumes annotated_response from LLM end events.

What Response Codecs Decode#

Response codecs normalize provider output into fields that subscribers can inspect consistently:

Field          Purpose
-------------  -------
id             Provider response identifier.
model          Model that served the request, when the provider returns it.
message        Primary assistant message content.
tool_calls     Tool calls requested by the model.
finish_reason  Normalized completion reason, such as complete, length, tool_use, or content_filter.
usage          Token accounting, including cache-read and cache-write counts when available.
api_specific   Provider-specific fields that do not fit the common model.
extra          Additional unmodeled response fields.

Use these annotations for observability, export, and debugging. Keep business logic that changes the caller-visible response in the framework or provider adapter, not in the response codec.
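
For orientation, the annotation decoded from the Chat Completions example later in this guide would carry roughly these values. This is a sketch shown as a plain dict; the concrete AnnotatedLLMResponse type and its attribute access are binding-specific:

annotated = {
    "id": "chatcmpl-demo",
    "model": "gpt-4o-mini",
    "message": "Hello from the provider.",
    "tool_calls": None,
    "finish_reason": "complete",  # normalized from the provider's "stop"
    "usage": {"prompt_tokens": 8, "completion_tokens": 5, "total_tokens": 13},
    "api_specific": None,
    "extra": {},
}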

Built-in Response Codecs#

The built-in provider codecs also implement response decoding:

  • OpenAIChatCodec

  • OpenAIResponsesCodec

  • AnthropicMessagesCodec

Choose the codec that matches the actual provider response shape. For example, do not use OpenAIChatCodec for an OpenAI Responses API payload merely because both come from an OpenAI-compatible provider.
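
As a rule of thumb for the OpenAI shapes: Chat Completions payloads carry a top-level choices list, while Responses API payloads identify themselves with "object": "response", the same field the custom routing codec later in this guide checks. A sketch of the two shapes; field names beyond "object" and "choices" are illustrative of the real OpenAI payloads:

# Chat Completions shape: decode with OpenAIChatCodec.
chat_payload = {
    "id": "chatcmpl-demo",
    "object": "chat.completion",
    "choices": [{"message": {"role": "assistant", "content": "hi"}}],
}

# Responses API shape: decode with OpenAIResponsesCodec.
responses_payload = {
    "id": "resp-demo",
    "object": "response",
    "output": [{"type": "message", "content": [{"type": "output_text", "text": "hi"}]}],
}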

Attach a Built-in Response Codec#

The examples below attach built-in response codecs for supported provider response shapes.

In Python:

import nemo_flow
from nemo_flow import LLMRequest
from nemo_flow.codecs import OpenAIChatCodec


async def invoke_provider(request: LLMRequest):
    return {
        "id": "chatcmpl-demo",
        "model": request.content["model"],
        "choices": [
            {
                "finish_reason": "stop",
                "message": {"role": "assistant", "content": "Hello from the provider."},
            }
        ],
        "usage": {"prompt_tokens": 8, "completion_tokens": 5, "total_tokens": 13},
    }


codec = OpenAIChatCodec()
response = await nemo_flow.llm.execute(
    "openai-chat",
    LLMRequest({}, {"model": "gpt-4o-mini", "messages": []}),
    invoke_provider,
    model_name="gpt-4o-mini",
    response_codec=codec,
)

In Node.js:

import { OpenAIChatCodec } from 'nemo-flow-node';
import { JsonPassthrough, typedLlmExecute } from 'nemo-flow-node/typed';

const codec = new OpenAIChatCodec();

const response = await typedLlmExecute(
  'openai-chat',
  { headers: {}, content: { model: 'gpt-4o-mini', messages: [] } },
  async (request) => ({
    id: 'chatcmpl-demo',
    model: request.content.model,
    choices: [
      {
        finish_reason: 'stop',
        message: { role: 'assistant', content: 'Hello from the provider.' },
      },
    ],
    usage: { prompt_tokens: 8, completion_tokens: 5, total_tokens: 13 },
  }),
  new JsonPassthrough(),
  {
    modelName: 'gpt-4o-mini',
    responseCodec: codec,
  },
);

In Rust:

use nemo_flow::api::llm::{llm_call_execute, LlmCallExecuteParams, LlmRequest};
use nemo_flow::codec::openai_chat::OpenAIChatCodec;
use nemo_flow::codec::traits::LlmResponseCodec;
use serde_json::json;
use std::sync::Arc;

let request = LlmRequest {
    headers: Default::default(),
    content: json!({"model": "gpt-4o-mini", "messages": []}),
};
let response_codec: Arc<dyn LlmResponseCodec> = Arc::new(OpenAIChatCodec);

let response = llm_call_execute(
    LlmCallExecuteParams::builder()
        .name("openai-chat")
        .request(request)
        .func(Arc::new(|provider_request| Box::pin(async move {
            Ok(json!({
                "id": "chatcmpl-demo",
                "model": provider_request.content["model"],
                "choices": [{
                    "finish_reason": "stop",
                    "message": {
                        "role": "assistant",
                        "content": "Hello from the provider."
                    }
                }],
                "usage": {
                    "prompt_tokens": 8,
                    "completion_tokens": 5,
                    "total_tokens": 13
                }
            }))
        })))
        .model_name("gpt-4o-mini")
        .response_codec(response_codec)
        .build(),
)
.await?;

Read Annotated Responses#

Subscribers can inspect annotated_response on LLM end events. The exact fields on the event object vary by language binding, so check defensively that the annotation exists before reading it.

In Python:

import nemo_flow


def on_event(event):
    annotated = getattr(event, "annotated_response", None)
    if annotated is None:
        return

    print("model", annotated.model)
    print("text", annotated.response_text())
    print("usage", annotated.usage)


nemo_flow.subscribers.register("response-debugger", on_event)

In Node.js:

import { registerSubscriber } from 'nemo-flow-node';

registerSubscriber('response-debugger', (event) => {
  const annotated = event.annotated_response;
  if (!annotated) {
    return;
  }

  console.log('model', annotated.model);
  console.log('message', annotated.message);
  console.log('usage', annotated.usage);
});

Custom Response Codecs#

Use a custom response codec when the provider or framework response does not match a built-in shape.

In Python, a custom response codec can route to built-in codecs and return their native AnnotatedLLMResponse values:

from nemo_flow.codecs import OpenAIChatCodec, OpenAIResponsesCodec


class OpenAIRoutingResponseCodec:
    def __init__(self):
        self.chat = OpenAIChatCodec()
        self.responses = OpenAIResponsesCodec()

    def decode_response(self, response):
        if response.get("object") == "response":
            return self.responses.decode_response(response)
        return self.chat.decode_response(response)
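
Attaching the routing codec is identical to attaching a built-in one. A sketch reusing invoke_provider and the execute call from the earlier Python example; the boundary name "openai-auto" is arbitrary:

codec = OpenAIRoutingResponseCodec()
response = await nemo_flow.llm.execute(
    "openai-auto",
    LLMRequest({}, {"model": "gpt-4o-mini", "messages": []}),
    invoke_provider,
    model_name="gpt-4o-mini",
    response_codec=codec,
)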

In Node.js, implement decodeResponse and return the normalized response JSON shape:

import type { JsonValue, LlmResponseCodec } from 'nemo-flow-node/typed';

const frameworkResponseCodec: LlmResponseCodec = {
  decodeResponse(response: JsonValue): JsonValue {
    const raw = response as {
      id?: string;
      model_name?: string;
      text?: string;
      stop_reason?: string;
      token_usage?: {
        input?: number;
        output?: number;
      };
    };

    return {
      id: raw.id ?? null,
      model: raw.model_name ?? null,
      message: raw.text ?? '',
      finish_reason: raw.stop_reason === 'max_tokens' ? 'length' : 'complete',
      usage: {
        prompt_tokens: raw.token_usage?.input ?? null,
        completion_tokens: raw.token_usage?.output ?? null,
        total_tokens:
          raw.token_usage?.input === undefined || raw.token_usage?.output === undefined
            ? null
            : raw.token_usage.input + raw.token_usage.output,
      },
      provider_stop_reason: raw.stop_reason ?? null,
    };
  },
};

In Rust, implement LlmResponseCodec directly:

use nemo_flow::codec::request::MessageContent;
use nemo_flow::codec::response::{AnnotatedLlmResponse, FinishReason, Usage};
use nemo_flow::codec::traits::LlmResponseCodec;
use nemo_flow::error::{FlowError, Result};
use serde::Deserialize;
use serde_json::{Map, Value as Json};

#[derive(Deserialize)]
struct FrameworkResponse {
    id: Option<String>,
    model_name: Option<String>,
    text: Option<String>,
    input_tokens: Option<u64>,
    output_tokens: Option<u64>,
}

struct FrameworkResponseCodec;

impl LlmResponseCodec for FrameworkResponseCodec {
    fn decode_response(&self, response: &Json) -> Result<AnnotatedLlmResponse> {
        let raw: FrameworkResponse = serde_json::from_value(response.clone())
            .map_err(|error| FlowError::Internal(error.to_string()))?;
        let total_tokens = match (raw.input_tokens, raw.output_tokens) {
            (Some(input), Some(output)) => Some(input + output),
            _ => None,
        };

        Ok(AnnotatedLlmResponse {
            id: raw.id,
            model: raw.model_name,
            message: raw.text.map(MessageContent::Text),
            tool_calls: None,
            finish_reason: Some(FinishReason::Complete),
            usage: Some(Usage {
                prompt_tokens: raw.input_tokens,
                completion_tokens: raw.output_tokens,
                total_tokens,
                cache_read_tokens: None,
                cache_write_tokens: None,
            }),
            api_specific: None,
            extra: Map::new(),
        })
    }
}

Streaming Responses#

Streaming LLM wrappers decode the aggregated response produced by the stream finalizer. The response codec does not see each token or chunk. Use stream collectors for chunk-level behavior, and use response codecs for the final normalized end-event annotation.
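
To make that contract concrete, here is a Python sketch of a finalizer that aggregates streamed chunks into the Chat Completions shape that OpenAIChatCodec expects. The chunk fields ("delta", "id") are assumptions for illustration, not a fixed NeMo Flow interface:

def finalize_chunks(chunks):
    # Concatenate streamed text deltas into one assistant message so the
    # aggregated payload matches what the response codec can decode.
    text = "".join(chunk.get("delta", "") for chunk in chunks)
    return {
        "id": chunks[-1].get("id") if chunks else None,
        "choices": [
            {
                "finish_reason": "stop",
                "message": {"role": "assistant", "content": text},
            }
        ],
    }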

Validation Checklist#

Use this checklist to confirm the implementation preserves the expected runtime contract.

  • The response codec matches the actual provider response shape.

  • decode_response returns a normalized response with safe, JSON-compatible fields.

  • The provider response returned to the application is unchanged.

  • Subscribers see annotated_response only on LLM end events where decode succeeds.

  • Decode errors are tested and do not break the LLM call (see the sketch after this list).

  • Streaming finalizers produce the same shape the response codec expects.
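
A sketch of a test for the decode-error item, reusing invoke_provider and LLMRequest from the attach example; pytest-asyncio is an assumption about the test setup:

import pytest

import nemo_flow
from nemo_flow import LLMRequest


class FailingCodec:
    def decode_response(self, response):
        raise ValueError("unexpected response shape")


@pytest.mark.asyncio
async def test_decode_failure_is_non_fatal():
    response = await nemo_flow.llm.execute(
        "openai-chat",
        LLMRequest({}, {"model": "gpt-4o-mini", "messages": []}),
        invoke_provider,
        model_name="gpt-4o-mini",
        response_codec=FailingCodec(),
    )
    # The caller-visible provider response is unchanged even though decoding
    # failed and no annotation was attached to the end event.
    assert response["id"] == "chatcmpl-demo"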

Common Issues#

Check these symptoms first when the workflow does not behave as expected.

  • No annotation appears: The response codec returned an error or the raw provider response did not match the codec.

  • Returned response changed unexpectedly: Response codecs are not the right place to mutate caller-visible output.

  • Tool calls are missing: The codec did not map the provider’s tool-call structure into tool_calls (see the sketch after this list).

  • Usage is inconsistent across providers: Normalize known token fields and preserve provider-specific usage details in api_specific or extra.
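
For the tool-call case, the mapping has to be explicit in the codec. A Python sketch for the OpenAI Chat Completions tool-call structure; the normalized output fields here are illustrative, not the exact tool_calls schema:

def extract_tool_calls(response):
    # OpenAI Chat Completions nests tool calls under the assistant message.
    message = response["choices"][0]["message"]
    return [
        {
            "id": call["id"],
            "name": call["function"]["name"],
            "arguments": call["function"]["arguments"],
        }
        for call in message.get("tool_calls") or []
    ]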

Next Steps#

Use these links to continue from this workflow into the next related task.