nemo_flow.llm#

LLM lifecycle helpers for non-streaming and streaming calls.

This module is the LLM analogue of nemo_flow.tools. It manages emitted events, global middleware, optional request codecs for annotated intercepts, and optional response codecs for structured end-event annotations.

Example:

import nemo_flow

request = nemo_flow.LLMRequest(
    {},
    {"messages": [{"role": "user", "content": "hello"}], "model": "demo-model"},
)

async def impl(req):
    return {"id": "r1", "choices": [{"message": {"role": "assistant", "content": "hi"}}]}

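# awaited from an async context (e.g. inside an async function)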
result = await nemo_flow.llm.execute(
    "demo-provider",
    request,
    impl,
    response_codec=nemo_flow.codecs.OpenAIChatCodec(),
)

Functions#

call(name, request, *[, handle, attributes, data, ...])

Start a manual LLM span and return its LLMHandle.

call_end(handle, response, *[, data, metadata, timestamp])

Finish a manual LLM span started by call().

execute(name, request, func, *[, handle, attributes, ...])

Run an LLM call through the managed middleware pipeline.

stream_execute(name, request, func, collector, finalizer, *[, ...])

Run a streaming LLM call through the managed middleware pipeline.

request_intercepts(name, request)

Apply global LLM request intercepts to request.

conditional_execution(request)

Run LLM conditional-execution guardrails for request.

Module Contents#

nemo_flow.llm.call(
    name: str,
    request: nemo_flow._native.LLMRequest,
    *,
    handle=None,
    attributes=None,
    data=None,
    metadata=None,
    model_name: str | None = None,
    timestamp: datetime.datetime | None = None,
)#

Start a manual LLM span and return its LLMHandle.

Parameters:
  • name – Provider or logical call name recorded on emitted events.

  • request – Raw LLMRequest to associate with the call.

  • handle – Optional parent scope handle. When omitted, the current scope becomes the parent.

  • attributes – Optional native LLM attributes attached to the start event.

  • data – Optional JSON application payload stored on the LLM handle.

  • metadata – Optional JSON metadata recorded on the emitted start event.

  • model_name – Optional normalized model name to record separately from the provider-specific request payload.

  • timestamp – Optional timezone-aware datetime recorded as the handle start time and on the emitted start event. When omitted, the current runtime time is used.

Returns:

Handle used to finish the manual span with call_end().

Return type:

LLMHandle

Notes

This starts only the manual LLM lifecycle span. It applies sanitize-request guardrails to the emitted start-event payload but does not run request or execution intercepts. timestamp must be a timezone-aware datetime; strings and naive datetimes are rejected.

Example:

import nemo_flow

request = nemo_flow.LLMRequest({}, {"messages": [], "model": "demo-model"})
handle = nemo_flow.llm.call(
    "demo-provider",
    request,
    handle=None,
    attributes=None,
    data={"attempt": 1},
    metadata={"path": "manual"},
    model_name="demo-model",
)
nemo_flow.llm.call_end(
    handle,
    {"ok": True},
    data={"cached": False},
    metadata={"status": "success"},
)

nemo_flow.llm.call_end(
    handle,
    response,
    *,
    data=None,
    metadata=None,
    timestamp: datetime.datetime | None = None,
) → None#

Finish a manual LLM span started by call().

Parameters:
  • handle – LLM handle returned by call().

  • response – Raw JSON-compatible response to record on the end event.

  • data – Optional JSON payload used when the sanitized response is JSON null.

  • metadata – Optional JSON metadata recorded on the emitted end event.

  • timestamp – Optional timezone-aware datetime recorded on the emitted end event. When omitted, the runtime default end timestamp is used.

Returns:

This function returns after the end event has been recorded.

Return type:

None

Notes

call_end() applies sanitize-response guardrails to the emitted end-event payload but does not normalize or decode the response automatically. timestamp must be a timezone-aware datetime; strings and naive datetimes are rejected.
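
Example:

A minimal sketch pairing call() with call_end(); the response payload is a placeholder, and the explicit timestamp illustrates the timezone-aware requirement.

import datetime

import nemo_flow

request = nemo_flow.LLMRequest({}, {"messages": [], "model": "demo-model"})
handle = nemo_flow.llm.call("demo-provider", request)
nemo_flow.llm.call_end(
    handle,
    {"id": "r1", "choices": []},
    metadata={"status": "success"},
    # must be timezone-aware; naive datetimes are rejected
    timestamp=datetime.datetime.now(datetime.timezone.utc),
)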

nemo_flow.llm.execute(
    name: str,
    request: nemo_flow._native.LLMRequest,
    func,
    *,
    handle=None,
    attributes=None,
    data=None,
    metadata=None,
    model_name: str | None = None,
    codec: nemo_flow.codecs.LlmCodec | None = None,
    response_codec: nemo_flow.codecs.LlmResponseCodec | None = None,
)#

Run an LLM call through the managed middleware pipeline.

Pipeline order:

  1. LLM conditional-execution guardrails

  2. LLM request intercepts

  3. LLM sanitize-request guardrails for emitted start events

  4. LLM execution intercepts

  5. func(request)

  6. LLM sanitize-response guardrails for emitted end events

Parameters:
  • name – Provider or logical call name recorded on emitted events.

  • request – Raw LLMRequest passed through guardrails, intercepts, and then into func.

  • func – Provider callback invoked as func(request) after middleware has finished processing the request.

  • handle – Optional parent scope handle. When omitted, the current scope becomes the parent.

  • attributes – Optional native LLM attributes attached to the start event.

  • data – Optional JSON application payload stored on the managed LLM handle.

  • metadata – Optional JSON metadata recorded on the emitted start event.

  • model_name – Optional normalized model name to record separately from the provider-specific request payload.

  • codec – Optional request codec used to provide AnnotatedLLMRequest values to request intercepts.

  • response_codec – Optional response codec used to attach a normalized response to the emitted LLMEnd event for observability.

Returns:

The raw JSON-compatible value returned by func or by an execution intercept.

Return type:

Json

Notes

codec enables annotated request intercepts. response_codec decodes the raw response for observability only and does not change the value returned to the caller.

Example:

import nemo_flow

request = nemo_flow.LLMRequest(
    {},
    {"messages": [{"role": "user", "content": "hi"}], "model": "demo-model"},
)

async def impl(req):
    return {"id": "r1", "choices": [{"message": {"role": "assistant", "content": "hello"}}]}

result = await nemo_flow.llm.execute(
    "demo-provider",
    request,
    impl,
    handle=None,
    attributes=None,
    data={"path": "managed"},
    metadata={"request_id": "req-1"},
    model_name="demo-model",
    codec=None,
    response_codec=nemo_flow.codecs.OpenAIChatCodec(),
)

nemo_flow.llm.stream_execute(
    name: str,
    request: nemo_flow._native.LLMRequest,
    func,
    collector,
    finalizer,
    *,
    handle=None,
    attributes=None,
    data=None,
    metadata=None,
    model_name: str | None = None,
    codec: nemo_flow.codecs.LlmCodec | None = None,
    response_codec: nemo_flow.codecs.LlmResponseCodec | None = None,
) → nemo_flow._native.LlmStream#

Run a streaming LLM call through the managed middleware pipeline.

Parameters:
  • name – Provider or logical call name recorded on emitted events.

  • request – Raw LLMRequest passed through guardrails and intercepts.

  • func – Provider callback invoked as func(request) that yields raw JSON chunks.

  • collector – Callback invoked for each chunk after streaming intercepts run. It typically accumulates state for finalizer.

  • finalizer – Callback invoked after the stream completes to build the final JSON-compatible response recorded on the LLMEnd event.

  • handle – Optional parent scope handle. When omitted, the current scope becomes the parent.

  • attributes – Optional native LLM attributes attached to the start event.

  • data – Optional JSON application payload stored on the managed LLM handle.

  • metadata – Optional JSON metadata recorded on the emitted start event.

  • model_name – Optional normalized model name to record separately from the provider-specific request payload.

  • codec – Optional request codec used to provide AnnotatedLLMRequest values to request intercepts.

  • response_codec – Optional response codec used to attach a normalized final response to the emitted LLMEnd event for observability.

Returns:

Async iterator that yields the streamed JSON chunks.

Return type:

LlmStream

Notes

collector observes the post-intercept chunk values. finalizer runs once at stream completion and should return a representation of the full response, not the final chunk.

Example:

import nemo_flow

request = nemo_flow.LLMRequest(
    {},
    {"messages": [{"role": "user", "content": "hi"}], "model": "demo-model"},
)
collected = []

async def impl(req):
    yield {"token": "hel"}
    yield {"token": "lo"}

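# collector: called once per post-intercept chunk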
def collect(chunk):
    collected.append(chunk)

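# finalizer: builds the full-response record for the LLMEnd event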
def finalize():
    return {"text": "".join(chunk["token"] for chunk in collected)}

stream = await nemo_flow.llm.stream_execute(
    "demo-provider",
    request,
    impl,
    collect,
    finalize,
    handle=None,
    attributes=None,
    data={"path": "stream"},
    metadata={"request_id": "req-2"},
    model_name="demo-model",
    codec=None,
    response_codec=None,
)
async for chunk in stream:
    print(chunk)

nemo_flow.llm.request_intercepts(name, request)#

Apply global LLM request intercepts to request.

Parameters:
  • name – Provider or logical call name used when evaluating intercepts.

  • request – Raw LLMRequest to pass through the registered request intercept chain.

Returns:

The request produced by the final request intercept.

Return type:

LLMRequest

Notes

This runs only the request-intercept chain. It does not execute guardrails, codecs, provider callbacks, or stream handling.
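
Example:

A minimal sketch, assuming the intercept chain here is synchronous; with no intercepts registered, the request passes through unchanged.

import nemo_flow

request = nemo_flow.LLMRequest(
    {},
    {"messages": [{"role": "user", "content": "hi"}], "model": "demo-model"},
)
# returns the request produced by the final registered intercept
intercepted = nemo_flow.llm.request_intercepts("demo-provider", request)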

nemo_flow.llm.conditional_execution(request)#

Run LLM conditional-execution guardrails for request.

Parameters:

request – Raw LLMRequest to validate against registered conditional-execution guardrails.

Returns:

A rejection message if execution should be blocked, otherwise None.

Return type:

str | None

Notes

This helper evaluates only conditional-execution guardrails and does not invoke request intercepts, codecs, or provider execution.
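
Example:

A minimal sketch, assuming the guardrail check here is synchronous; a non-None result means the call should be blocked.

import nemo_flow

request = nemo_flow.LLMRequest(
    {},
    {"messages": [{"role": "user", "content": "hi"}], "model": "demo-model"},
)
# None means execution may proceed; a string is the rejection message
rejection = nemo_flow.llm.conditional_execution(request)
if rejection is not None:
    print(f"blocked: {rejection}")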