nemo_flow#
Python bindings for the NeMo Flow runtime.
This package exposes the runtime’s scope stack, lifecycle events, middleware registries, typed wrappers, and adaptive helpers from Python.
The main entry points are:
- nemo_flow.scope for creating and nesting scopes
- nemo_flow.tools for tool lifecycle management
- nemo_flow.llm for non-streaming and streaming LLM lifecycle management
- nemo_flow.guardrails and nemo_flow.intercepts for global middleware
- nemo_flow.scope_local for middleware scoped to a specific ScopeHandle
- nemo_flow.typed for codec-based typed wrappers
- nemo_flow.plugin for global plugin configuration and custom plugin registration
- nemo_flow.adaptive for adaptive component configuration helpers
Top-level exports also include:
- scope stack helpers such as get_scope_stack(), create_scope_stack(), set_thread_scope_stack(), and scope_stack_active()
- native runtime types such as ScopeHandle, ToolHandle, LLMHandle, LLMRequest, ScopeType, and the lifecycle event classes
- observability helpers such as AtifExporter, OpenTelemetrySubscriber, and OpenInferenceSubscriber
- JSON and callback type aliases used by middleware, typed wrappers, and plugin-facing configuration helpers
Example:
import asyncio

import nemo_flow


def redact_args(tool_name, args):
    # Tool sanitize guardrail: mask the API key before the tool runs.
    return {**args, "api_key": "***"}


def add_header(name, request, annotated):
    # LLM request intercept: attach an auth header to the outgoing request.
    request.headers["Authorization"] = "Bearer test-token"
    return request, annotated


async def tool_impl(args):
    return {"echo": args["query"]}


async def llm_impl(request):
    return {"messages": request.content["messages"], "ok": True}


async def main():
    nemo_flow.guardrails.register_tool_sanitize_request("redact", 10, redact_args)
    nemo_flow.intercepts.register_llm_request("auth", 10, False, add_header)

    with nemo_flow.scope.scope("demo-agent", nemo_flow.ScopeType.Agent):
        tool_result = await nemo_flow.tools.execute("search", {"query": "hello"}, tool_impl)
        llm_result = await nemo_flow.llm.execute(
            "demo-model",
            nemo_flow.LLMRequest({}, {"messages": [{"role": "user", "content": "hi"}]}),
            llm_impl,
        )

    print(tool_result, llm_result)


asyncio.run(main())
Submodules#
Attributes#
Classes#
AnnotatedLLMRequest | Structured view of an LLM request produced by a codec.
AnnotatedLLMResponse | Structured view of an LLM response produced by a response codec.
AtifExporter | ATIF trajectory exporter that collects events and exports trajectories.
LLMAttributes | Bitflags describing LLM call properties.
LLMHandle | An active LLM call.
LLMRequest | An LLM request carrying headers and a content payload.
MarkEvent | ATOF point-in-time mark event emitted to subscribers.
OpenInferenceConfig | Mutable configuration for OpenInferenceSubscriber.
OpenInferenceSubscriber | OpenInference-backed NeMo Flow event subscriber.
OpenTelemetryConfig | Mutable configuration for OpenTelemetrySubscriber.
OpenTelemetrySubscriber | OpenTelemetry-backed NeMo Flow event subscriber.
ScopeAttributes | Bitflags describing scope properties.
ScopeEvent | ATOF scope lifecycle event emitted to subscribers.
ScopeHandle | An active execution scope in the scope stack.
ScopeStack | An isolated scope stack for per-request or per-task isolation.
ScopeType | Enum identifying the kind of execution scope.
ToolAttributes | Bitflags describing tool call properties.
ToolHandle | An active tool call.
Functions#
get_scope_stack() | Return the current task's active scope stack.
scope_stack_active() | Report whether the current context already owns a scope stack.
propagate_scope_to_thread() | Capture the active scope stack for use in another thread.
create_scope_stack() | Create a new isolated scope stack.
set_thread_scope_stack(stack) | Install a scope stack into the current thread's native runtime context.
Package Contents#
- class nemo_flow.AnnotatedLLMRequest(
  messages: collections.abc.Sequence[collections.abc.Mapping[str, _JsonValue]],
  *,
  model: str | None = None,
  params: collections.abc.Mapping[str, _JsonValue] | None = None,
  tools: collections.abc.Sequence[collections.abc.Mapping[str, _JsonValue]] | None = None,
  tool_choice: str | collections.abc.Mapping[str, _JsonValue] | None = None,
  extra: collections.abc.Mapping[str, _JsonValue] | None = None,
  )#
Structured view of an LLM request produced by a codec.
- Summary:
Provider-neutral request view for annotated LLM middleware.
- Description:
Codecs decode provider-specific request bodies into this normalized shape so request intercepts can operate on messages, model name, parameters, tools, and extra provider fields.
- property messages: list[_JsonObject]#
Return normalized message objects.
- property model: str | None#
Return the normalized model name, if present.
- property params: _JsonObject | None#
Return provider parameters, if present.
- property tools: list[_JsonObject] | None#
Return normalized tool declarations, if present.
- property tool_choice: str | _JsonObject | None#
Return the normalized tool-choice directive, if present.
- property extra: _JsonObject#
Return provider-specific request fields.
- system_prompt() str | None#
Return the first normalized system prompt, if one is present.
- last_user_message() str | None#
Return the last normalized user message text, if one is present.
- has_tool_calls() bool#
Return whether the normalized request includes tool declarations.
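The convenience accessors above can be pictured with a small stand-in. This is only a sketch and not the nemo_flow implementation: it assumes, hypothetically, that each normalized message is a mapping with string "role" and "content" keys, and it uses free functions with the same names instead of the real class.

```python
from __future__ import annotations

from collections.abc import Mapping, Sequence
from typing import Any

# Hypothetical message shape: {"role": str, "content": str}.
# The real normalized schema is defined by the codec and may differ.
Message = Mapping[str, Any]


def system_prompt(messages: Sequence[Message]) -> str | None:
    """Return the text of the first system message, or None if there is none."""
    for message in messages:
        if message.get("role") == "system" and isinstance(message.get("content"), str):
            return message["content"]
    return None


def last_user_message(messages: Sequence[Message]) -> str | None:
    """Return the text of the most recent user message, or None if there is none."""
    for message in reversed(messages):
        if message.get("role") == "user" and isinstance(message.get("content"), str):
            return message["content"]
    return None


example_messages = [
    {"role": "system", "content": "Be brief."},
    {"role": "user", "content": "hi"},
    {"role": "assistant", "content": "hello"},
    {"role": "user", "content": "bye"},
]
first_system = system_prompt(example_messages)
latest_user = last_user_message(example_messages)
```

Both helpers return None rather than raising when no matching message exists, mirroring the "if one is present" wording of the documented methods.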
- class nemo_flow.AnnotatedLLMResponse#
Structured view of an LLM response produced by a response codec.
- Summary:
Provider-neutral response view for LLM end-event annotations.
- Description:
Response codecs decode provider responses into this normalized shape so subscribers can inspect model, text, tool-call, usage, and provider-specific fields consistently.
- property id: str | None#
Return the provider response identifier, if present.
- property model: str | None#
Return the provider model name, if present.
- property message: _Json | None#
Return the normalized primary message payload, if present.
- property tool_calls: list[_JsonObject] | None#
Return normalized tool-call payloads, if present.
- property finish_reason: str | None#
Return the provider finish reason, if present.
- property usage: _JsonObject | None#
Return normalized usage accounting, if present.
- property api_specific: _JsonObject | None#
Return provider-specific response fields, if present.
- property extra: _JsonObject#
Return additional normalized response fields.
- response_text() str | None#
Return extracted response text, if present.
- has_tool_calls() bool#
Return whether the response contains tool-call payloads.
- class nemo_flow.AtifExporter(
  session_id: str,
  agent_name: str,
  agent_version: str,
  *,
  model_name: str | None = None,
  tool_definitions: list[_JsonObject] | None = None,
  extra: _Json | None = None,
  )#
ATIF trajectory exporter that collects events and exports trajectories.
- Summary:
Subscriber implementation that accumulates lifecycle events.
- Description:
Register the exporter under a subscriber name, run application code, and export the collected event set as an ATIF trajectory dictionary or JSON string.
- register(name: str) None#
Register this exporter as a global event subscriber.
- deregister(name: str) bool#
Deregister this exporter and return whether a subscriber was removed.
- export() _JsonObject#
Return collected events as an ATIF trajectory object.
- export_json() str#
Return collected events as an ATIF trajectory JSON string.
- clear() None#
Clear collected events without changing subscriber registration.
- class nemo_flow.LLMAttributes(value: int = 0)#
Bitflags describing LLM call properties.
- Summary:
Compact flag container attached to LLM handles and LLM events.
- Description:
LLMAttributes records semantic properties of an LLM call. Values can be created from raw integer bitmasks and combined with bit operators.
- Flag constants:
STATEFUL indicates that the LLM call uses stateful context.
STREAMING indicates that the LLM call returns a stream.
- STATEFUL: int#
- STREAMING: int#
- property is_stateful: bool#
Return whether the STATEFUL flag is set.
- property is_streaming: bool#
Return whether the STREAMING flag is set.
- property value: int#
Return the raw integer bitmask.
- class nemo_flow.LLMHandle#
An active LLM call.
- Summary:
Native handle returned by manual LLM-call start APIs.
- Description:
Pass this handle to llm_call_end or wrapper APIs to close the LLM lifecycle span and emit the corresponding end event.
- property uuid: str#
Return the globally unique LLM-call identifier.
- property name: str#
Return the LLM provider or logical call name.
- property attributes: LLMAttributes#
Return the LLM attribute bitmask.
- property parent_uuid: str | None#
Return the parent scope UUID, if any.
- property data: _Json | None#
Return application data captured on the LLM call, if any.
- property metadata: _Json | None#
Return metadata captured on the LLM call, if any.
- class nemo_flow.LLMRequest(
  headers: collections.abc.Mapping[str, _JsonValue],
  content: _JsonObject,
  )#
An LLM request carrying headers and a content payload.
- Summary:
Provider request object passed through LLM middleware.
- Description:
headers stores provider or transport metadata. content stores the JSON request body. Request intercepts and codecs consume this object to normalize, rewrite, and execute LLM calls.
- property headers: _JsonObject#
Return the request headers as a JSON object.
- property content: _JsonObject#
Return the request content body as a JSON object.
- class nemo_flow.MarkEvent#
ATOF point-in-time mark event emitted to subscribers.
- Summary:
Event emitted for standalone marks and guardrail rejections.
- Description:
Mark events record a named point in time under the current scope hierarchy. They do not have a start/end lifecycle pair.
- property kind: Literal['mark']#
Return the discriminant value "mark".
- property atof_version: str#
Return the ATOF schema version used for this event.
- property parent_uuid: str | None#
Return the parent event UUID, if present.
- property uuid: str#
Return the event UUID.
- property timestamp: str#
Return the event timestamp as an RFC 3339 string.
- property name: str#
Return the mark event name.
- property data: _Json | None#
Return application data attached to the event, if any.
- property metadata: _Json | None#
Return metadata attached to the event, if any.
- property category: str | None#
Return the semantic mark category, if present.
- property category_profile: _JsonObject | None#
Return category-specific profile data, if any.
- property data_schema: _JsonObject | None#
Return a schema descriptor for data, if one is present.
- class nemo_flow.OpenInferenceConfig#
Mutable configuration for OpenInferenceSubscriber.
- Summary:
Native OpenInference exporter configuration object.
- Description:
Configure transport, endpoint, service identity, exporter timeout, headers, and resource attributes before constructing a subscriber.
- transport: str#
- endpoint: str | None#
- service_name: str#
- service_namespace: str | None#
- service_version: str | None#
- instrumentation_scope: str#
- timeout_millis: int#
- property headers: dict[str, str]#
Return additional exporter headers.
- property resource_attributes: dict[str, str]#
Return additional OpenInference resource attributes.
- set_header(key: str, value: str) None#
Set one exporter header key/value pair.
- set_resource_attribute(key: str, value: str) None#
Set one OpenInference resource attribute key/value pair.
- class nemo_flow.OpenInferenceSubscriber(config: OpenInferenceConfig)#
OpenInference-backed NeMo Flow event subscriber.
- Summary:
Native subscriber that exports lifecycle events as OpenInference spans.
- Description:
Register the subscriber under a name to receive runtime events. Flush or shut it down before process exit when deterministic export is required.
- register(name: str) None#
Register the subscriber under name.
- deregister(name: str) bool#
Deregister name and return whether it existed.
- force_flush() None#
Flush pending telemetry through the configured exporter.
- shutdown() None#
Shut down native OpenInference resources.
- class nemo_flow.OpenTelemetryConfig#
Mutable configuration for OpenTelemetrySubscriber.
- Summary:
Native OpenTelemetry exporter configuration object.
- Description:
Configure transport, endpoint, service identity, exporter timeout, headers, and resource attributes before constructing a subscriber.
- transport: str#
- endpoint: str | None#
- service_name: str#
- service_namespace: str | None#
- service_version: str | None#
- instrumentation_scope: str#
- timeout_millis: int#
- property headers: dict[str, str]#
Return additional exporter headers.
- property resource_attributes: dict[str, str]#
Return additional OpenTelemetry resource attributes.
- set_header(key: str, value: str) None#
Set one exporter header key/value pair.
- set_resource_attribute(key: str, value: str) None#
Set one OpenTelemetry resource attribute key/value pair.
- class nemo_flow.OpenTelemetrySubscriber(config: OpenTelemetryConfig)#
OpenTelemetry-backed NeMo Flow event subscriber.
- Summary:
Native subscriber that exports lifecycle events as OpenTelemetry spans.
- Description:
Register the subscriber under a name to receive runtime events. Flush or shut it down before process exit when deterministic export is required.
- register(name: str) None#
Register the subscriber under name.
- deregister(name: str) bool#
Deregister name and return whether it existed.
- force_flush() None#
Flush pending telemetry through the configured exporter.
- shutdown() None#
Shut down native OpenTelemetry resources.
- class nemo_flow.ScopeAttributes(value: int = 0)#
Bitflags describing scope properties.
- Summary:
Compact flag container attached to scope handles and scope events.
- Description:
ScopeAttributes values can be created from a raw integer bitmask and combined with | or intersected with &. The native runtime uses these flags to record semantic execution properties on scopes.
- Flag constants:
PARALLEL indicates child work may execute in parallel.
RELOCATABLE indicates work may move to another execution context.
- Exceptional flow:
Construction or bit operations may raise native type errors when values cannot be converted to the expected bitmask representation.
- PARALLEL: int#
- RELOCATABLE: int#
- property is_parallel: bool#
Return whether the PARALLEL flag is set.
- property is_relocatable: bool#
Return whether the RELOCATABLE flag is set.
- property value: int#
Return the raw integer bitmask.
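The bitmask behaviour described for the attribute classes (build from a raw integer, combine with |, intersect with &) can be illustrated with the standard library. The sketch below uses `enum.IntFlag` as a stand-in; `DemoScopeFlags` and its bit positions are hypothetical and are not the values used by the native ScopeAttributes type.

```python
from enum import IntFlag


class DemoScopeFlags(IntFlag):
    """Stand-in flag set; the bit positions are illustrative only."""

    PARALLEL = 1 << 0
    RELOCATABLE = 1 << 1


def is_parallel(flags: DemoScopeFlags) -> bool:
    # A flag is "set" when intersecting with it leaves a non-zero value.
    return bool(flags & DemoScopeFlags.PARALLEL)


def is_relocatable(flags: DemoScopeFlags) -> bool:
    return bool(flags & DemoScopeFlags.RELOCATABLE)


# Combine two flags with |.
combined = DemoScopeFlags.PARALLEL | DemoScopeFlags.RELOCATABLE

# Intersect with & to keep only one of them.
only_parallel = combined & DemoScopeFlags.PARALLEL

# Rebuild the same flag set from a raw integer bitmask.
from_raw = DemoScopeFlags(combined.value)
```

The same pattern applies to the LLM and tool attribute classes, which expose their own flag constants and boolean accessors.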
- class nemo_flow.ScopeEvent#
ATOF scope lifecycle event emitted to subscribers.
- Summary:
Event emitted when a scope, tool call, or LLM call starts or ends.
- Description:
Scope events contain hierarchy identifiers, semantic category data, user payloads, metadata, and optional normalized LLM request/response annotations.
- property kind: Literal['scope']#
Return the discriminant value "scope".
- property scope_category: Literal['start', 'end']#
Return whether this is a start or end lifecycle event.
- property atof_version: str#
Return the ATOF schema version used for this event.
- property parent_uuid: str | None#
Return the parent event UUID, if present.
- property uuid: str#
Return the event UUID.
- property timestamp: str#
Return the event timestamp as an RFC 3339 string.
- property name: str#
Return the event or scope name.
- property data: _Json | None#
Return application data attached to the event, if any.
- property metadata: _Json | None#
Return metadata attached to the event, if any.
- property attributes: list[str]#
Return stringified semantic attributes attached to the event.
- property category: str#
Return the semantic event category.
- property category_profile: _JsonObject | None#
Return category-specific profile data, if any.
- property data_schema: _JsonObject | None#
Return a schema descriptor for data, if one is present.
- property annotated_request: AnnotatedLLMRequest | None#
Return the normalized LLM request annotation, if present.
- property annotated_response: AnnotatedLLMResponse | None#
Return the normalized LLM response annotation, if present.
- class nemo_flow.ScopeHandle#
An active execution scope in the scope stack.
- Summary:
Immutable native handle returned by scope creation APIs.
- Description:
A ScopeHandle identifies one pushed scope. Pass it back to pop_scope or wrapper APIs to close the scope, attach child work, or register scope-local middleware.
- Exceptional flow:
Accessing properties may propagate native errors if the handle has been invalidated by the runtime.
- property uuid: str#
Return the globally unique scope identifier.
- property name: str#
Return the human-readable scope name.
- property attributes: ScopeAttributes#
Return the scope attribute bitmask.
- property parent_uuid: str | None#
Return the parent scope UUID, or None for a root scope.
- property data: _Json | None#
Return application data captured on the scope, if any.
- property metadata: _Json | None#
Return metadata captured on the scope, if any.
- class nemo_flow.ScopeStack#
An isolated scope stack for per-request or per-task isolation.
- Summary:
Native stack object containing active scope hierarchy state.
- Description:
Scope stacks are installed into Python context variables or native thread-local storage so nested scope, tool, and LLM events share the correct hierarchy.
- class nemo_flow.ScopeType#
Enum identifying the kind of execution scope.
- Summary:
Native scope category used when creating scopes.
- Description:
The selected ScopeType is recorded on handles and emitted events so subscribers can distinguish agents, tools, LLM calls, guardrails, and other semantic units of work.
- class nemo_flow.ToolAttributes(value: int = 0)#
Bitflags describing tool call properties.
- Summary:
Compact flag container attached to tool handles and tool events.
- Description:
ToolAttributes records semantic properties of a tool call. Values can be created from raw integer bitmasks and combined with bit operators.
- Flag constants:
REMOTE indicates that the tool call is remote.
- REMOTE: int#
- property is_remote: bool#
Return whether the REMOTE flag is set.
- property value: int#
Return the raw integer bitmask.
- class nemo_flow.ToolHandle#
An active tool call.
- Summary:
Native handle returned by manual tool-call start APIs.
- Description:
Pass this handle to tool_call_end or wrapper APIs to close the tool lifecycle span and emit the corresponding end event.
- property uuid: str#
Return the globally unique tool-call identifier.
- property name: str#
Return the tool name.
- property attributes: ToolAttributes#
Return the tool attribute bitmask.
- property parent_uuid: str | None#
Return the parent scope UUID, if any.
- property data: _Json | None#
Return application data captured on the tool call, if any.
- property metadata: _Json | None#
Return metadata captured on the tool call, if any.
- type nemo_flow.JsonPrimitive = str | int | float | bool | None#
- type nemo_flow.JsonValue = JsonPrimitive | list['JsonValue'] | dict[str, 'JsonValue']#
- type nemo_flow.JsonObject = dict[str, JsonValue]#
- type nemo_flow.Json = JsonValue#
- type nemo_flow.UnsupportedBehavior = Literal['ignore', 'warn', 'error']#
- type nemo_flow.ToolSanitizeGuardrail = Callable[[str, Json], Json]#
- type nemo_flow.ToolConditionalExecutionGuardrail = Callable[[str, Json], str | None]#
- type nemo_flow.LlmSanitizeRequestGuardrail = Callable[[LLMRequest], LLMRequest]#
- type nemo_flow.LlmSanitizeResponseGuardrail = Callable[[JsonObject], JsonObject]#
- type nemo_flow.LlmConditionalExecutionGuardrail = Callable[[LLMRequest], str | None]#
- type nemo_flow.ToolRequestIntercept = AbcCallable[[str, Json], Json]#
- type nemo_flow.ToolExecutionIntercept = Callable[[str, Json, Callable[[Json], Awaitable[Json]]], Json | Awaitable[Json]]#
- type nemo_flow.LlmRequestIntercept = Callable[[str, LLMRequest, AnnotatedLLMRequest | None], tuple[LLMRequest, AnnotatedLLMRequest | None]]#
- type nemo_flow.LlmExecutionIntercept = Callable[[str, LLMRequest, Callable[[LLMRequest], Awaitable[Json]]], Json | Awaitable[Json]]#
- type nemo_flow.LlmStreamExecutionIntercept = Callable[[LLMRequest, Callable[[LLMRequest], Awaitable[AsyncIterator[Json]]]], AsyncIterator[Json] | Awaitable[AsyncIterator[Json]]]#
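To make the callback aliases above concrete, here is a sketch of plain functions whose signatures match three of them. It does not import nemo_flow or register anything; the functions are called directly. Two points are assumptions rather than documented behaviour: that a string returned from a conditional-execution guardrail is a rejection reason (with None meaning "allow"), and that the third argument of an execution intercept is the downstream callable to invoke. `Json` is simplified to `Any` here.

```python
from __future__ import annotations

import asyncio
from collections.abc import Awaitable, Callable
from typing import Any

Json = Any  # simplified stand-in for the recursive nemo_flow.Json alias


def redact_api_key(tool_name: str, args: Json) -> Json:
    """Shape of ToolSanitizeGuardrail: (tool name, args) -> sanitized args."""
    if isinstance(args, dict) and "api_key" in args:
        return {**args, "api_key": "***"}
    return args


def block_shell(tool_name: str, args: Json) -> str | None:
    """Shape of ToolConditionalExecutionGuardrail.

    Assumption: a returned string rejects the call, None lets it proceed.
    """
    return "shell access is disabled" if tool_name == "shell" else None


async def tag_result(
    tool_name: str,
    args: Json,
    next_call: Callable[[Json], Awaitable[Json]],
) -> Json:
    """Shape of ToolExecutionIntercept: wraps the downstream call."""
    result = await next_call(args)
    return {"tool": tool_name, "result": result}


async def echo_tool(args: Json) -> Json:
    # Hypothetical tool implementation used only for this demo.
    return {"echo": args["query"]}


async def demo() -> Json:
    args = redact_api_key("search", {"query": "hello", "api_key": "secret"})
    rejection = block_shell("search", args)
    if rejection is not None:
        raise RuntimeError(rejection)
    return await tag_result("search", args, echo_tool)


demo_result = asyncio.run(demo())
```

Writing middleware as ordinary functions like these keeps them unit-testable without a running runtime; registration would then go through the guardrails and intercepts modules.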
- nemo_flow.get_scope_stack() _native.ScopeStack#
Return the current task’s active scope stack.
If the current async context does not yet own a scope stack, this function creates one and synchronizes it into the Rust thread-local storage used by the native runtime. Most callers do not need to invoke this directly because higher-level helpers such as nemo_flow.scope.push() do it automatically.
- Returns:
The scope stack associated with the current Python context.
- Return type:
ScopeStack
- Raises:
Exception – Propagates any exception raised by native scope-stack creation or synchronization.
- Behavior:
The function first checks the Python ContextVar. If no stack is present, it creates one with the native runtime, stores it in the current context, and synchronizes that stack into native thread-local storage before returning.
Notes
Calling this function synchronizes the Python ContextVar state into the native thread-local slot so subsequent native runtime calls observe the same scope hierarchy.
Example:
import nemo_flow

stack = nemo_flow.get_scope_stack()
assert stack is not None
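The lazy-creation behaviour described for get_scope_stack() (check the ContextVar, create a stack if missing, then mirror it into thread-local storage) can be sketched with the standard library alone. This is a model of the described flow, not the library's code: `DemoStack`, `get_demo_stack`, and `demo_stack_active` are hypothetical names, and a `threading.local` object stands in for the native thread-local slot.

```python
import contextvars
import threading


class DemoStack:
    """Stand-in for a scope stack; it only holds a list of scope names."""

    def __init__(self):
        self.scopes = []


_context_stack = contextvars.ContextVar("demo_stack", default=None)
_native_slot = threading.local()  # stand-in for native thread-local storage


def get_demo_stack():
    """Return the context's stack, creating and syncing it on first use."""
    stack = _context_stack.get()
    if stack is None:
        stack = DemoStack()
        _context_stack.set(stack)
    # Mirror the context-owned stack into the thread-local slot on every call.
    _native_slot.stack = stack
    return stack


def demo_stack_active():
    """Status check only: never creates a stack."""
    if _context_stack.get() is not None:
        return True
    return getattr(_native_slot, "stack", None) is not None


def _demo():
    before = demo_stack_active()
    first = get_demo_stack()
    second = get_demo_stack()
    return before, first is second, demo_stack_active()


# Run inside a fresh, empty context so the result does not depend on prior state.
demo_outcome = contextvars.Context().run(_demo)
```

In this model the second call returns the same object as the first, which is why repeated calls within one context are safe.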
- nemo_flow.scope_stack_active() bool#
Report whether the current context already owns a scope stack.
- Returns:
True when the current Python context already has an active stack, either because it was created in this context or because a stack was explicitly installed for the current thread.
- Return type:
bool
- Raises:
Exception – Propagates any exception raised by the native active-stack status check.
- Behavior:
The Python ContextVar is checked first. If it has no stack, the native runtime is asked whether the current thread has an explicitly active stack.
Notes
This function does not create a scope stack. It is a pure status check used to decide whether scope propagation work is required.
Example:
import nemo_flow

assert nemo_flow.scope_stack_active() is False
nemo_flow.get_scope_stack()
assert nemo_flow.scope_stack_active() is True
- nemo_flow.propagate_scope_to_thread() _native.ScopeStack#
Capture the active scope stack for use in another thread.
The returned stack can be passed to set_thread_scope_stack() inside a worker thread so that the worker emits events into the same scope hierarchy as the parent context.
- Returns:
The active stack from the current context.
- Return type:
ScopeStack
- Raises:
RuntimeError – If the current context does not yet have an active scope stack to propagate.
Exception – Propagates any exception raised while synchronizing an already-active native stack into the Python context.
- Behavior:
This function does not clone the scope hierarchy. It shares the current stack reference with the target thread, which is appropriate when the worker should contribute events to the same logical trace.
Example:
from concurrent.futures import ThreadPoolExecutor

import nemo_flow

with nemo_flow.scope.scope("parent", nemo_flow.ScopeType.Agent) as handle:
    stack = nemo_flow.propagate_scope_to_thread()

    def worker() -> None:
        nemo_flow.set_thread_scope_stack(stack)
        nemo_flow.scope.event(
            "worker-ran",
            handle=handle,
            data={"source": "thread"},
            metadata={"thread": "pool-1"},
        )

    with ThreadPoolExecutor() as pool:
        pool.submit(worker).result()
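The "share, do not clone" behaviour can be seen in a library-free model. In the sketch below, a `threading.local` object stands in for the native per-thread slot, and `DemoStack`, `set_demo_thread_stack`, and `emit` are hypothetical helpers. The worker thread starts without a stack, installs the parent's stack reference, and its event lands in the same object the parent wrote to.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


class DemoStack:
    """Stand-in for a scope stack; records (thread name, event name) pairs."""

    def __init__(self):
        self.events = []


_native_slot = threading.local()  # one independent slot per OS thread


def set_demo_thread_stack(stack):
    # Install the given reference for the calling thread; nothing is copied.
    _native_slot.stack = stack


def emit(name):
    stack = getattr(_native_slot, "stack", None)
    if stack is None:
        raise RuntimeError("no active stack on this thread")
    stack.events.append((threading.current_thread().name, name))


parent_stack = DemoStack()
set_demo_thread_stack(parent_stack)
emit("parent-started")


def worker():
    # A fresh worker thread has an empty slot until a stack is installed.
    had_stack = hasattr(_native_slot, "stack")
    set_demo_thread_stack(parent_stack)
    emit("worker-ran")
    return had_stack


with ThreadPoolExecutor(max_workers=1) as pool:
    worker_had_stack = pool.submit(worker).result()

event_names = [name for _, name in parent_stack.events]
```

Because both threads hold the same reference, events from the worker appear alongside the parent's, which matches the single-logical-trace intent described above. A cloned stack would instead isolate the worker's events.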
- nemo_flow.create_scope_stack() _native.ScopeStack#
Create a new isolated scope stack.
- Returns:
A fresh scope stack that is not yet attached to the current Python context or thread.
- Return type:
ScopeStack
- Raises:
Exception – Propagates any native error raised while allocating the stack.
- Behavior:
This is a direct top-level wrapper around the native stack factory. It does not mutate the Python ContextVar and does not install the stack into native thread-local storage.
Notes
Use this helper when you need explicit scope isolation, such as test fixtures, manual context propagation, or framework-managed request boundaries. Most application code should prefer get_scope_stack() so the current context is initialized lazily.
Example:
import nemo_flow

stack = nemo_flow.create_scope_stack()
nemo_flow.set_thread_scope_stack(stack)
- nemo_flow.set_thread_scope_stack(stack: _native.ScopeStack) None#
Install a scope stack into the current thread’s native runtime context.
- Parameters:
stack – Scope stack that should become active for subsequent NeMo Flow API calls on the current thread.
- Returns:
This function returns after the native thread-local slot has been updated.
- Return type:
None
- Raises:
Exception – Propagates native errors raised when installing stack.
- Behavior:
The supplied stack is installed into the native thread-local slot for the current OS thread. The function does not create, clone, or validate a Python ContextVar entry.
Notes
This helper is primarily used when propagating an existing logical trace into worker threads. It does not create or clone a scope stack; it installs the supplied stack reference for the current thread.
Example:
from concurrent.futures import ThreadPoolExecutor

import nemo_flow

with nemo_flow.scope.scope("parent", nemo_flow.ScopeType.Agent):
    stack = nemo_flow.propagate_scope_to_thread()

    def worker() -> None:
        nemo_flow.set_thread_scope_stack(stack)

    with ThreadPoolExecutor() as pool:
        pool.submit(worker).result()
- nemo_flow.Event#