nemo_flow.llm#
LLM lifecycle helpers for non-streaming and streaming calls.
This module is the LLM analogue of nemo_flow.tools. It manages emitted
events, global middleware, optional request codecs for annotated intercepts,
and optional response codecs for structured end-event annotations.
Example:
import nemo_flow

request = nemo_flow.LLMRequest(
    {},
    {"messages": [{"role": "user", "content": "hello"}], "model": "demo-model"},
)

async def impl(req):
    return {"id": "r1", "choices": [{"message": {"role": "assistant", "content": "hi"}}]}

result = await nemo_flow.llm.execute(
    "demo-provider",
    request,
    impl,
    response_codec=nemo_flow.codecs.OpenAIChatCodec(),
)
Functions#
- call – Start a manual LLM span and return its LLMHandle.
- call_end – Finish a manual LLM span started by call().
- execute – Run an LLM call through the managed middleware pipeline.
- stream_execute – Run a streaming LLM call through the managed middleware pipeline.
- request_intercepts – Apply global LLM request intercepts to request.
- conditional_execution – Run LLM conditional-execution guardrails for request.
Module Contents#
- nemo_flow.llm.call(
- name: str,
- request: nemo_flow._native.LLMRequest,
- *,
- handle=None,
- attributes=None,
- data=None,
- metadata=None,
- model_name: str | None = None,
- timestamp: datetime.datetime | None = None,
- )#
Start a manual LLM span and return its LLMHandle.
- Parameters:
name – Provider or logical call name recorded on emitted events.
request – Raw LLMRequest to associate with the call.
handle – Optional parent scope handle. When omitted, the current scope becomes the parent.
attributes – Optional native LLM attributes attached to the start event.
data – Optional JSON application payload stored on the LLM handle.
metadata – Optional JSON metadata recorded on the emitted start event.
model_name – Optional normalized model name to record separately from the provider-specific request payload.
timestamp – Optional timezone-aware datetime recorded as the handle start time and on the emitted start event. When omitted, the current runtime time is used.
- Returns:
Handle used to finish the manual span with call_end().
- Return type:
LLMHandle
Notes
This starts only the manual LLM lifecycle span. It applies sanitize-request guardrails to the emitted start-event payload but does not run request or execution intercepts.
timestamp must be a timezone-aware datetime; strings and naive datetimes are rejected.

Example:
import nemo_flow

request = nemo_flow.LLMRequest({}, {"messages": [], "model": "demo-model"})

handle = nemo_flow.llm.call(
    "demo-provider",
    request,
    handle=None,
    attributes=None,
    data={"attempt": 1},
    metadata={"path": "manual"},
    model_name="demo-model",
)

nemo_flow.llm.call_end(
    handle,
    {"ok": True},
    data={"cached": False},
    metadata={"status": "success"},
)
- nemo_flow.llm.call_end(
- handle,
- response,
- *,
- data=None,
- metadata=None,
- timestamp: datetime.datetime | None = None,
- )#
Finish a manual LLM span started by call().
- Parameters:
handle – LLM handle returned by call().
response – Raw JSON-compatible response to record on the end event.
data – Optional JSON payload used when the sanitized response is JSON null.
metadata – Optional JSON metadata recorded on the emitted end event.
timestamp – Optional timezone-aware datetime recorded on the emitted end event. When omitted, the runtime default end timestamp is used.
- Returns:
This function returns after the end event has been recorded.
- Return type:
None
Notes
call_end() applies sanitize-response guardrails to the emitted end-event payload but does not normalize or decode the response automatically.

timestamp must be a timezone-aware datetime; strings and naive datetimes are rejected.
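Because both helpers reject naive datetimes, a minimal sketch of supplying explicit timestamps (the UTC values here are illustrative):

import nemo_flow
from datetime import datetime, timezone

request = nemo_flow.LLMRequest({}, {"messages": [], "model": "demo-model"})

# Timezone-aware start time; a naive datetime.now() would be rejected.
handle = nemo_flow.llm.call(
    "demo-provider",
    request,
    timestamp=datetime.now(timezone.utc),
)

# Timezone-aware end time recorded on the emitted end event.
nemo_flow.llm.call_end(
    handle,
    {"ok": True},
    timestamp=datetime.now(timezone.utc),
)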
- nemo_flow.llm.execute(
- name: str,
- request: nemo_flow._native.LLMRequest,
- func,
- *,
- handle=None,
- attributes=None,
- data=None,
- metadata=None,
- model_name: str | None = None,
- codec: nemo_flow.codecs.LlmCodec | None = None,
- response_codec: nemo_flow.codecs.LlmResponseCodec | None = None,
- )#
Run an LLM call through the managed middleware pipeline.
Pipeline order:

1. LLM conditional-execution guardrails
2. LLM request intercepts
3. LLM sanitize-request guardrails for emitted start events
4. LLM execution intercepts
5. func(request)
6. LLM sanitize-response guardrails for emitted end events

A by-hand approximation of this pipeline using the module's standalone helpers is sketched after the example below.
- Parameters:
name – Provider or logical call name recorded on emitted events.
request – Raw LLMRequest passed through guardrails, intercepts, and then into func.
func – Provider callback invoked as func(request) after middleware has finished processing the request.
handle – Optional parent scope handle. When omitted, the current scope becomes the parent.
attributes – Optional native LLM attributes attached to the start event.
data – Optional JSON application payload stored on the managed LLM handle.
metadata – Optional JSON metadata recorded on the emitted start event.
model_name – Optional normalized model name to record separately from the provider-specific request payload.
codec – Optional request codec used to provide AnnotatedLLMRequest values to request intercepts.
response_codec – Optional response codec used to attach a normalized response to the emitted LLMEnd event for observability.
- Returns:
The raw JSON-compatible value returned by func or by an execution intercept.
- Return type:
Json
Notes
codec enables annotated request intercepts.

response_codec decodes the raw response for observability only and does not change the value returned to the caller.

Example:
import nemo_flow

request = nemo_flow.LLMRequest(
    {},
    {"messages": [{"role": "user", "content": "hi"}], "model": "demo-model"},
)

async def impl(req):
    return {"id": "r1", "choices": [{"message": {"role": "assistant", "content": "hello"}}]}

result = await nemo_flow.llm.execute(
    "demo-provider",
    request,
    impl,
    handle=None,
    attributes=None,
    data={"path": "managed"},
    metadata={"request_id": "req-1"},
    model_name="demo-model",
    codec=None,
    response_codec=nemo_flow.codecs.OpenAIChatCodec(),
)
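For contrast with the managed pipeline above, a rough by-hand approximation built from this module's standalone helpers. It is a sketch, not an equivalent of execute(): execution intercepts (step 4) have no standalone helper, and the synchronous call form of conditional_execution() and request_intercepts() is an assumption here.

import nemo_flow

request = nemo_flow.LLMRequest({}, {"messages": [], "model": "demo-model"})

async def impl(req):
    return {"ok": True}

async def manual_pipeline():
    # Step 1: conditional-execution guardrails.
    rejection = nemo_flow.llm.conditional_execution(request)
    if rejection is not None:
        raise RuntimeError(rejection)
    # Step 2: request intercepts.
    processed = nemo_flow.llm.request_intercepts("demo-provider", request)
    # Steps 3 and 6 (sanitize guardrails) run inside call()/call_end();
    # step 4 (execution intercepts) is only available through execute().
    handle = nemo_flow.llm.call("demo-provider", processed)
    response = await impl(processed)  # step 5: func(request)
    nemo_flow.llm.call_end(handle, response)
    return response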
- nemo_flow.llm.stream_execute(
- name: str,
- request: nemo_flow._native.LLMRequest,
- func,
- collector,
- finalizer,
- *,
- handle=None,
- attributes=None,
- data=None,
- metadata=None,
- model_name: str | None = None,
- codec: nemo_flow.codecs.LlmCodec | None = None,
- response_codec: nemo_flow.codecs.LlmResponseCodec | None = None,
- )#
Run a streaming LLM call through the managed middleware pipeline.
- Parameters:
name – Provider or logical call name recorded on emitted events.
request – Raw LLMRequest passed through guardrails and intercepts.
func – Provider callback invoked as func(request) that yields raw JSON chunks.
collector – Callback invoked for each chunk after streaming intercepts run. It typically accumulates state for finalizer.
finalizer – Callback invoked after the stream completes to build the final JSON-compatible response recorded on the LLMEnd event.
handle – Optional parent scope handle. When omitted, the current scope becomes the parent.
attributes – Optional native LLM attributes attached to the start event.
data – Optional JSON application payload stored on the managed LLM handle.
metadata – Optional JSON metadata recorded on the emitted start event.
model_name – Optional normalized model name to record separately from the provider-specific request payload.
codec – Optional request codec used to provide AnnotatedLLMRequest values to request intercepts.
response_codec – Optional response codec used to attach a normalized final response to the emitted LLMEnd event for observability.
- Returns:
Async iterator that yields the streamed JSON chunks.
- Return type:
AsyncIterator[Json]
Notes
collector observes the post-intercept chunk values.

finalizer runs once at stream completion and should return a representation of the full response, not the final chunk.

Example:
import nemo_flow

request = nemo_flow.LLMRequest(
    {},
    {"messages": [{"role": "user", "content": "hi"}], "model": "demo-model"},
)

collected = []

async def impl(req):
    yield {"token": "hel"}
    yield {"token": "lo"}

def collect(chunk):
    collected.append(chunk)

def finalize():
    return {"text": "".join(chunk["token"] for chunk in collected)}

stream = await nemo_flow.llm.stream_execute(
    "demo-provider",
    request,
    impl,
    collect,
    finalize,
    handle=None,
    attributes=None,
    data={"path": "stream"},
    metadata={"request_id": "req-2"},
    model_name="demo-model",
    codec=None,
    response_codec=None,
)

async for chunk in stream:
    print(chunk)
- nemo_flow.llm.request_intercepts(name, request)#
Apply global LLM request intercepts to request.
- Parameters:
name – Provider or logical call name used when evaluating intercepts.
request – Raw LLMRequest to pass through the registered request intercept chain.
- Returns:
The request produced by the final request intercept.
- Return type:
LLMRequest
Notes
This runs only the request-intercept chain. It does not execute guardrails, codecs, provider callbacks, or stream handling.
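A minimal usage sketch; the synchronous call form is an assumption (adapt with await if your runtime exposes this helper as a coroutine):

import nemo_flow

request = nemo_flow.LLMRequest(
    {},
    {"messages": [{"role": "user", "content": "hi"}], "model": "demo-model"},
)

# Runs only the registered request-intercept chain; guardrails, codecs,
# and provider execution are not involved.
processed = nemo_flow.llm.request_intercepts("demo-provider", request)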
- nemo_flow.llm.conditional_execution(request)#
Run LLM conditional-execution guardrails for request.
- Parameters:
request – Raw LLMRequest to validate against registered conditional-execution guardrails.
- Returns:
A rejection message if execution should be blocked, otherwise None.
- Return type:
str | None
Notes
This helper evaluates only conditional-execution guardrails and does not invoke request intercepts, codecs, or provider execution.
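A minimal pre-flight sketch; the synchronous call form and the error handling are illustrative assumptions:

import nemo_flow

request = nemo_flow.LLMRequest(
    {},
    {"messages": [{"role": "user", "content": "hi"}], "model": "demo-model"},
)

# Evaluates only conditional-execution guardrails.
rejection = nemo_flow.llm.conditional_execution(request)
if rejection is not None:
    # Guardrails blocked the call; surface their message.
    raise RuntimeError(f"LLM call blocked: {rejection}")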