nemo_flow

Python bindings for the NeMo Flow runtime.

This package exposes the runtime’s scope stack, lifecycle events, middleware registries, typed wrappers, and adaptive helpers from Python.

The main entry points are:

  • nemo_flow.scope for creating and nesting scopes

  • nemo_flow.tools for tool lifecycle management

  • nemo_flow.llm for non-streaming and streaming LLM lifecycle management

  • nemo_flow.guardrails and nemo_flow.intercepts for global middleware

  • nemo_flow.scope_local for middleware scoped to a specific ScopeHandle

  • nemo_flow.typed for codec-based typed wrappers

  • nemo_flow.plugin for global plugin configuration and custom plugin registration

  • nemo_flow.adaptive for adaptive component configuration helpers

Top-level exports also include:

  • scope stack helpers such as get_scope_stack(), create_scope_stack(), set_thread_scope_stack(), and scope_stack_active()

  • native runtime types such as ScopeHandle, ToolHandle, LLMHandle, LLMRequest, ScopeType, and the lifecycle event classes

  • observability helpers such as AtifExporter, OpenTelemetrySubscriber, and OpenInferenceSubscriber

  • JSON and callback type aliases used by middleware, typed wrappers, and plugin-facing configuration helpers

Example:

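import asyncio

import nemo_flow

# Tool sanitize guardrail: return a copy of the arguments with the API key masked.
def redact_args(tool_name, args):
    return {**args, "api_key": "***"}

# LLM request intercept: receives (name, request, annotated) and returns the
# request/annotation pair to use downstream. A new LLMRequest is built instead
# of mutating request.headers, which may be a copy of the native headers.
def add_header(name, request, annotated):
    headers = {**request.headers, "Authorization": "Bearer test-token"}
    return nemo_flow.LLMRequest(headers, request.content), annotated

async def tool_impl(args):
    return {"echo": args["query"]}

async def llm_impl(request):
    return {"messages": request.content["messages"], "ok": True}

async def main():
    # Register global middleware before making any calls.
    nemo_flow.guardrails.register_tool_sanitize_request("redact", 10, redact_args)
    nemo_flow.intercepts.register_llm_request("auth", 10, False, add_header)

    # Tool and LLM calls made inside this scope are recorded under "demo-agent".
    with nemo_flow.scope.scope("demo-agent", nemo_flow.ScopeType.Agent):
        tool_result = await nemo_flow.tools.execute("search", {"query": "hello"}, tool_impl)
        llm_result = await nemo_flow.llm.execute(
            "demo-model",
            nemo_flow.LLMRequest({}, {"messages": [{"role": "user", "content": "hi"}]}),
            llm_impl,
        )

        print(tool_result, llm_result)

asyncio.run(main())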
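The example above uses a sanitize guardrail and a request intercept. The ToolExecutionIntercept alias listed under Package Contents describes a different shape: the intercept receives the tool name, the arguments, and a callable that continues the chain, and may return a JSON value or an awaitable. The standalone sketch below shows that wrapping pattern under those assumptions. The names timing_intercept and fake_next are hypothetical, nemo_flow is not imported, and registration is not shown because its signature is not documented on this page.

```python
import asyncio
import time
from typing import Any, Awaitable, Callable

# Local stand-in for the "next" argument of ToolExecutionIntercept.
NextFn = Callable[[Any], Awaitable[Any]]


async def timing_intercept(tool_name: str, args: Any, next_fn: NextFn) -> Any:
    """Hypothetical execution intercept: time the downstream call, tag the result."""
    started = time.perf_counter()
    result = await next_fn(args)  # run the rest of the chain (ultimately the tool)
    elapsed_ms = (time.perf_counter() - started) * 1000
    return {**result, "tool": tool_name, "elapsed_ms": elapsed_ms}


async def fake_next(args: Any) -> Any:
    """Stand-in for the downstream tool implementation."""
    return {"echo": args["query"]}


result = asyncio.run(timing_intercept("search", {"query": "hello"}, fake_next))
print(result["echo"], result["tool"])  # hello search
```

Because the intercept awaits next_fn itself, it can run code both before and after the wrapped call, which sanitize guardrails and request intercepts cannot do.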

Submodules

Attributes

Classes

AnnotatedLLMRequest

Structured view of an LLM request produced by a codec.

AnnotatedLLMResponse

Structured view of an LLM response produced by a response codec.

AtifExporter

ATIF trajectory exporter that collects events and exports trajectories.

LLMAttributes

Bitflags describing LLM call properties.

LLMHandle

An active LLM call.

LLMRequest

An LLM request carrying headers and a content payload.

MarkEvent

ATOF point-in-time mark event emitted to subscribers.

OpenInferenceConfig

Mutable configuration for OpenInferenceSubscriber.

OpenInferenceSubscriber

OpenInference-backed NeMo Flow event subscriber.

OpenTelemetryConfig

Mutable configuration for OpenTelemetrySubscriber.

OpenTelemetrySubscriber

OpenTelemetry-backed NeMo Flow event subscriber.

ScopeAttributes

Bitflags describing scope properties.

ScopeEvent

ATOF scope lifecycle event emitted to subscribers.

ScopeHandle

An active execution scope in the scope stack.

ScopeStack

An isolated scope stack for per-request or per-task isolation.

ScopeType

Enum identifying the kind of execution scope.

ToolAttributes

Bitflags describing tool call properties.

ToolHandle

An active tool call.

Functions

get_scope_stack()

Return the current task's active scope stack.

scope_stack_active()

Report whether the current context already owns a scope stack.

propagate_scope_to_thread()

Capture the active scope stack for use in another thread.

create_scope_stack()

Create a new isolated scope stack.

set_thread_scope_stack(stack)

Install a scope stack into the current thread's native runtime context.

Package Contents

class nemo_flow.AnnotatedLLMRequest(
    messages: collections.abc.Sequence[collections.abc.Mapping[str, _JsonValue]],
    *,
    model: str | None = None,
    params: collections.abc.Mapping[str, _JsonValue] | None = None,
    tools: collections.abc.Sequence[collections.abc.Mapping[str, _JsonValue]] | None = None,
    tool_choice: str | collections.abc.Mapping[str, _JsonValue] | None = None,
    extra: collections.abc.Mapping[str, _JsonValue] | None = None,
)

Structured view of an LLM request produced by a codec.

Summary:

Provider-neutral request view for annotated LLM middleware.

Description:

Codecs decode provider-specific request bodies into this normalized shape so request intercepts can operate on messages, model name, parameters, tools, and extra provider fields.

property messages: list[_JsonObject]

Return normalized message objects.

property model: str | None

Return the normalized model name, if present.

property params: _JsonObject | None

Return provider parameters, if present.

property tools: list[_JsonObject] | None

Return normalized tool declarations, if present.

property tool_choice: str | _JsonObject | None

Return the normalized tool-choice directive, if present.

property extra: _JsonObject

Return provider-specific request fields.

system_prompt() -> str | None

Return the first normalized system prompt, if one is present.

last_user_message() -> str | None

Return the last normalized user message text, if one is present.

has_tool_calls() -> bool

Return whether the normalized request includes tool declarations.
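To make the normalized shape concrete, the sketch below shows what a request codec could do with an OpenAI-style chat-completions body. The field mapping, the decode_chat_request function, and the helper functions are assumptions for illustration: they mirror the documented AnnotatedLLMRequest fields and methods but are not NeMo Flow's own codec, and a plain dict stands in for the native object.

```python
from __future__ import annotations

from typing import Any

# Keys this hypothetical codec maps onto dedicated fields; the choice is an assumption.
_FIELD_KEYS = {"messages", "model", "tools", "tool_choice"}
_PARAM_KEYS = {"temperature", "top_p", "max_tokens", "stop"}


def decode_chat_request(body: dict[str, Any]) -> dict[str, Any]:
    """Split an OpenAI-style request body into normalized fields plus `extra`."""
    params = {k: v for k, v in body.items() if k in _PARAM_KEYS}
    extra = {k: v for k, v in body.items() if k not in _FIELD_KEYS | _PARAM_KEYS}
    return {
        "messages": list(body.get("messages", [])),
        "model": body.get("model"),
        "params": params or None,
        "tools": body.get("tools"),
        "tool_choice": body.get("tool_choice"),
        "extra": extra,
    }


def system_prompt(req: dict[str, Any]) -> str | None:
    """First system message text, mirroring AnnotatedLLMRequest.system_prompt()."""
    return next((m["content"] for m in req["messages"] if m.get("role") == "system"), None)


def last_user_message(req: dict[str, Any]) -> str | None:
    """Last user message text, mirroring AnnotatedLLMRequest.last_user_message()."""
    texts = [m["content"] for m in req["messages"] if m.get("role") == "user"]
    return texts[-1] if texts else None


req = decode_chat_request({
    "model": "demo-model",
    "temperature": 0.2,
    "user": "abc",
    "messages": [
        {"role": "system", "content": "Be brief."},
        {"role": "user", "content": "hi"},
    ],
})
print(system_prompt(req), last_user_message(req), req["extra"])  # Be brief. hi {'user': 'abc'}
```

Keys the codec does not recognize land in extra, so provider-specific fields survive normalization instead of being dropped.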

class nemo_flow.AnnotatedLLMResponse

Structured view of an LLM response produced by a response codec.

Summary:

Provider-neutral response view for LLM end-event annotations.

Description:

Response codecs decode provider responses into this normalized shape so subscribers can inspect model, text, tool-call, usage, and provider-specific fields consistently.

property id: str | None

Return the provider response identifier, if present.

property model: str | None

Return the provider model name, if present.

property message: _Json | None

Return the normalized primary message payload, if present.

property tool_calls: list[_JsonObject] | None

Return normalized tool-call payloads, if present.

property finish_reason: str | None

Return the provider finish reason, if present.

property usage: _JsonObject | None

Return normalized usage accounting, if present.

property api_specific: _JsonObject | None

Return provider-specific response fields, if present.

property extra: _JsonObject

Return additional normalized response fields.

response_text() -> str | None

Return extracted response text, if present.

has_tool_calls() -> bool

Return whether the response contains tool-call payloads.
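The response side can be sketched the same way. Assuming an OpenAI-style chat completion body, a hypothetical decode_chat_response function below picks out the fields that AnnotatedLLMResponse documents, and response_text() mirrors the documented method by returning plain-text content only when it is a string. This is an illustrative stand-in using dicts, not the runtime's response codec.

```python
from __future__ import annotations

from typing import Any


def decode_chat_response(body: dict[str, Any]) -> dict[str, Any]:
    """Hypothetical response codec for an OpenAI-style chat completion."""
    choice = (body.get("choices") or [{}])[0]  # first choice, or an empty stand-in
    message = choice.get("message")
    return {
        "id": body.get("id"),
        "model": body.get("model"),
        "message": message,
        "tool_calls": (message or {}).get("tool_calls"),
        "finish_reason": choice.get("finish_reason"),
        "usage": body.get("usage"),
    }


def response_text(resp: dict[str, Any]) -> str | None:
    """Mirror of AnnotatedLLMResponse.response_text(): plain-text content, if any."""
    message = resp["message"]
    content = message.get("content") if isinstance(message, dict) else None
    return content if isinstance(content, str) else None


resp = decode_chat_response({
    "id": "resp-1",
    "model": "demo-model",
    "choices": [{"message": {"role": "assistant", "content": "hello"}, "finish_reason": "stop"}],
    "usage": {"prompt_tokens": 3, "completion_tokens": 1},
})
print(response_text(resp), resp["finish_reason"])  # hello stop
```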

class nemo_flow.AtifExporter(
    session_id: str,
    agent_name: str,
    agent_version: str,
    *,
    model_name: str | None = None,
    tool_definitions: list[_JsonObject] | None = None,
    extra: _Json | None = None,
)

ATIF trajectory exporter that collects events and exports trajectories.

Summary:

Subscriber implementation that accumulates lifecycle events.

Description:

Register the exporter under a subscriber name, run application code, and export the collected event set as an ATIF trajectory dictionary or JSON string.

register(name: str) -> None

Register this exporter as a global event subscriber.

deregister(name: str) -> bool

Deregister this exporter and return whether a subscriber was removed.

export() -> _JsonObject

Return collected events as an ATIF trajectory object.

export_json() -> str

Return collected events as an ATIF trajectory JSON string.

clear() -> None

Clear collected events without changing subscriber registration.

class nemo_flow.LLMAttributes(value: int = 0)

Bitflags describing LLM call properties.

Summary:

Compact flag container attached to LLM handles and LLM events.

Description:

LLMAttributes records semantic properties of an LLM call. Values can be created from raw integer bitmasks and combined with bit operators.

Flag constants:

STATEFUL indicates that the LLM call uses stateful context. STREAMING indicates that the LLM call returns a stream.

STATEFUL: int
STREAMING: int

property is_stateful: bool

Return whether the STATEFUL flag is set.

property is_streaming: bool

Return whether the STREAMING flag is set.

property value: int

Return the raw integer bitmask.

class nemo_flow.LLMHandle

An active LLM call.

Summary:

Native handle returned by manual LLM-call start APIs.

Description:

Pass this handle to llm_call_end or wrapper APIs to close the LLM lifecycle span and emit the corresponding end event.

property uuid: str

Return the globally unique LLM-call identifier.

property name: str

Return the LLM provider or logical call name.

property attributes: LLMAttributes

Return the LLM attribute bitmask.

property parent_uuid: str | None

Return the parent scope UUID, if any.

property data: _Json | None

Return application data captured on the LLM call, if any.

property metadata: _Json | None

Return metadata captured on the LLM call, if any.

class nemo_flow.LLMRequest(
    headers: collections.abc.Mapping[str, _JsonValue],
    content: _JsonObject,
)

An LLM request carrying headers and a content payload.

Summary:

Provider request object passed through LLM middleware.

Description:

headers stores provider or transport metadata. content stores the JSON request body. Request intercepts and codecs consume this object to normalize, rewrite, and execute LLM calls.

property headers: _JsonObject

Return the request headers as a JSON object.

property content: _JsonObject

Return the request content body as a JSON object.
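This page does not say whether the headers and content properties return live views or copies, so a defensive request intercept builds a new request instead of mutating the returned dictionaries. The sketch uses a hypothetical FakeLLMRequest dataclass in place of the native LLMRequest (whose documented constructor also takes headers and content), and the X-Call-Name header is purely illustrative.

```python
from __future__ import annotations

from dataclasses import dataclass, field
from typing import Any


@dataclass(frozen=True)
class FakeLLMRequest:
    """Hypothetical stand-in for nemo_flow.LLMRequest(headers, content)."""
    headers: dict[str, Any] = field(default_factory=dict)
    content: dict[str, Any] = field(default_factory=dict)


def tag_request(name: str, request: FakeLLMRequest, annotated: Any):
    """Request-intercept shape: (name, request, annotated) -> (request, annotated)."""
    headers = {**request.headers, "X-Call-Name": name}  # copy, then add one header
    return FakeLLMRequest(headers, request.content), annotated


original = FakeLLMRequest({}, {"messages": [{"role": "user", "content": "hi"}]})
rewritten, annotated = tag_request("demo-model", original, None)
print(rewritten.headers)  # {'X-Call-Name': 'demo-model'}
print(original.headers)   # {} (the input request is left untouched)
```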

class nemo_flow.MarkEvent

ATOF point-in-time mark event emitted to subscribers.

Summary:

Event emitted for standalone marks and guardrail rejections.

Description:

Mark events record a named point in time under the current scope hierarchy. They do not have a start/end lifecycle pair.

property kind: Literal['mark']

Return the discriminant value "mark".

property atof_version: str

Return the ATOF schema version used for this event.

property parent_uuid: str | None

Return the parent event UUID, if present.

property uuid: str

Return the event UUID.

property timestamp: str

Return the event timestamp as an RFC 3339 string.

property name: str

Return the mark event name.

property data: _Json | None

Return application data attached to the event, if any.

property metadata: _Json | None

Return metadata attached to the event, if any.

property category: str | None

Return the semantic mark category, if present.

property category_profile: _JsonObject | None

Return category-specific profile data, if any.

property data_schema: _JsonObject | None

Return a schema descriptor for data, if one is present.

class nemo_flow.OpenInferenceConfig

Mutable configuration for OpenInferenceSubscriber.

Summary:

Native OpenInference exporter configuration object.

Description:

Configure transport, endpoint, service identity, exporter timeout, headers, and resource attributes before constructing a subscriber.

transport: str
endpoint: str | None
service_name: str
service_namespace: str | None
service_version: str | None
instrumentation_scope: str
timeout_millis: int

property headers: dict[str, str]

Return additional exporter headers.

property resource_attributes: dict[str, str]

Return additional OpenInference resource attributes.

set_header(key: str, value: str) -> None

Set one exporter header key/value pair.

set_resource_attribute(key: str, value: str) -> None

Set one OpenInference resource attribute key/value pair.

class nemo_flow.OpenInferenceSubscriber(config: OpenInferenceConfig)

OpenInference-backed NeMo Flow event subscriber.

Summary:

Native subscriber that exports lifecycle events as OpenInference spans.

Description:

Register the subscriber under a name to receive runtime events. Flush or shut it down before process exit when deterministic export is required.

register(name: str) -> None

Register the subscriber under name.

deregister(name: str) -> bool

Deregister name and return whether it existed.

force_flush() -> None

Flush pending telemetry through the configured exporter.

shutdown() -> None

Shut down native OpenInference resources.

class nemo_flow.OpenTelemetryConfig

Mutable configuration for OpenTelemetrySubscriber.

Summary:

Native OpenTelemetry exporter configuration object.

Description:

Configure transport, endpoint, service identity, exporter timeout, headers, and resource attributes before constructing a subscriber.

transport: str
endpoint: str | None
service_name: str
service_namespace: str | None
service_version: str | None
instrumentation_scope: str
timeout_millis: int

property headers: dict[str, str]

Return additional exporter headers.

property resource_attributes: dict[str, str]

Return additional OpenTelemetry resource attributes.

set_header(key: str, value: str) -> None

Set one exporter header key/value pair.

set_resource_attribute(key: str, value: str) -> None

Set one OpenTelemetry resource attribute key/value pair.

class nemo_flow.OpenTelemetrySubscriber(config: OpenTelemetryConfig)

OpenTelemetry-backed NeMo Flow event subscriber.

Summary:

Native subscriber that exports lifecycle events as OpenTelemetry spans.

Description:

Register the subscriber under a name to receive runtime events. Flush or shut it down before process exit when deterministic export is required.

register(name: str) -> None

Register the subscriber under name.

deregister(name: str) -> bool

Deregister name and return whether it existed.

force_flush() -> None

Flush pending telemetry through the configured exporter.

shutdown() -> None

Shut down native OpenTelemetry resources.

class nemo_flow.ScopeAttributes(value: int = 0)

Bitflags describing scope properties.

Summary:

Compact flag container attached to scope handles and scope events.

Description:

ScopeAttributes values can be created from a raw integer bitmask and combined with | or intersected with &. The native runtime uses these flags to record semantic execution properties on scopes.

Flag constants:

PARALLEL indicates child work may execute in parallel. RELOCATABLE indicates work may move to another execution context.

Exceptional flow:

Construction or bit operations may raise native type errors when values cannot be converted to the expected bitmask representation.

PARALLEL: int
RELOCATABLE: int

property is_parallel: bool

Return whether the PARALLEL flag is set.

property is_relocatable: bool

Return whether the RELOCATABLE flag is set.

property value: int

Return the raw integer bitmask.
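Python's enum.IntFlag follows the same bitmask rules described for ScopeAttributes. The sketch uses a hypothetical ScopeFlags class whose bit values are assumptions (the native PARALLEL and RELOCATABLE constants may use different integers) to show composing flags with |, testing them with &, and round-tripping the raw integer that the value property exposes.

```python
from enum import IntFlag


class ScopeFlags(IntFlag):
    """Hypothetical stand-in for ScopeAttributes; the bit values are assumed."""
    PARALLEL = 1 << 0
    RELOCATABLE = 1 << 1


combined = ScopeFlags.PARALLEL | ScopeFlags.RELOCATABLE  # union of both flags
only_parallel = combined & ScopeFlags.PARALLEL            # intersection keeps one bit

print(int(combined))                                 # 3
print(bool(combined & ScopeFlags.RELOCATABLE))       # True
print(bool(only_parallel & ScopeFlags.RELOCATABLE))  # False
print(ScopeFlags(int(combined)) == combined)         # True: the raw int round-trips
```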

class nemo_flow.ScopeEvent

ATOF scope lifecycle event emitted to subscribers.

Summary:

Event emitted when a scope, tool call, or LLM call starts or ends.

Description:

Scope events contain hierarchy identifiers, semantic category data, user payloads, metadata, and optional normalized LLM request/response annotations.

property kind: Literal['scope']

Return the discriminant value "scope".

property scope_category: Literal['start', 'end']

Return whether this is a start or end lifecycle event.

property atof_version: str

Return the ATOF schema version used for this event.

property parent_uuid: str | None

Return the parent event UUID, if present.

property uuid: str

Return the event UUID.

property timestamp: str

Return the event timestamp as an RFC 3339 string.

property name: str

Return the event or scope name.

property data: _Json | None

Return application data attached to the event, if any.

property metadata: _Json | None

Return metadata attached to the event, if any.

property attributes: list[str]

Return stringified semantic attributes attached to the event.

property category: str

Return the semantic event category.

property category_profile: _JsonObject | None

Return category-specific profile data, if any.

property data_schema: _JsonObject | None

Return a schema descriptor for data, if one is present.

property annotated_request: AnnotatedLLMRequest | None

Return the normalized LLM request annotation, if present.

property annotated_response: AnnotatedLLMResponse | None

Return the normalized LLM response annotation, if present.
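ScopeEvent and MarkEvent both expose timestamp as an RFC 3339 string, so subscriber-side code usually parses it before ordering events or measuring durations. This sketch uses plain dicts as hypothetical stand-ins for event objects, carrying a few of the documented fields. It rewrites a trailing Z because datetime.fromisoformat() rejects it before Python 3.11, and it assumes six-digit fractional seconds, since the runtime's exact precision is not stated here.

```python
from __future__ import annotations

from datetime import datetime


def parse_rfc3339(ts: str) -> datetime:
    """Parse an RFC 3339 timestamp; 'Z' is rewritten for older fromisoformat()."""
    return datetime.fromisoformat(ts.replace("Z", "+00:00"))


# Hypothetical dict stand-ins carrying a subset of the documented event fields.
events = [
    {"kind": "scope", "scope_category": "end", "name": "demo-agent",
     "timestamp": "2024-01-01T00:00:01.500000Z"},
    {"kind": "mark", "name": "checkpoint",
     "timestamp": "2024-01-01T00:00:00.750000Z"},
    {"kind": "scope", "scope_category": "start", "name": "demo-agent",
     "timestamp": "2024-01-01T00:00:00Z"},
]

# Order chronologically, then measure the span from first to last event.
ordered = sorted(events, key=lambda e: parse_rfc3339(e["timestamp"]))
start = parse_rfc3339(ordered[0]["timestamp"])
end = parse_rfc3339(ordered[-1]["timestamp"])
print([e["name"] for e in ordered if e["kind"] == "mark"], (end - start).total_seconds())
```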

class nemo_flow.ScopeHandle

An active execution scope in the scope stack.

Summary:

Immutable native handle returned by scope creation APIs.

Description:

A ScopeHandle identifies one pushed scope. Pass it back to pop_scope or wrapper APIs to close the scope, attach child work, or register scope-local middleware.

Exceptional flow:

Accessing properties may propagate native errors if the handle has been invalidated by the runtime.

property uuid: str

Return the globally unique scope identifier.

property name: str

Return the human-readable scope name.

property scope_type: ScopeType

Return the semantic scope type.

property attributes: ScopeAttributes

Return the scope attribute bitmask.

property parent_uuid: str | None

Return the parent scope UUID, or None for a root scope.

property data: _Json | None

Return application data captured on the scope, if any.

property metadata: _Json | None

Return metadata captured on the scope, if any.

class nemo_flow.ScopeStack

An isolated scope stack for per-request or per-task isolation.

Summary:

Native stack object containing active scope hierarchy state.

Description:

Scope stacks are installed into Python context variables or native thread-local storage so nested scope, tool, and LLM events share the correct hierarchy.
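Python's ContextVar gives each asyncio task its own view of a variable, which is the property that keeps per-request stacks isolated. The sketch below demonstrates that standard-library behavior with a hypothetical current_stack variable holding a plain list as a stand-in for a ScopeStack. It does not model NeMo Flow's synchronization into native thread-local storage.

```python
from __future__ import annotations

import asyncio
from contextvars import ContextVar

# Hypothetical stand-in for a per-context scope stack slot.
current_stack: ContextVar[list[str] | None] = ContextVar("current_stack", default=None)


async def handle_request(request_id: str) -> list[str] | None:
    # gather() wraps each coroutine in a Task that runs in a copy of the
    # caller's context, so this set() is invisible to siblings and the parent.
    current_stack.set([f"agent:{request_id}"])
    await asyncio.sleep(0)  # yield so the two tasks interleave
    current_stack.get().append(f"tool:{request_id}")
    return current_stack.get()


async def main() -> tuple[list, list[str] | None]:
    results = await asyncio.gather(handle_request("a"), handle_request("b"))
    return results, current_stack.get()


results, parent_value = asyncio.run(main())
print(results)       # [['agent:a', 'tool:a'], ['agent:b', 'tool:b']]
print(parent_value)  # None
```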

class nemo_flow.ScopeType

Enum identifying the kind of execution scope.

Summary:

Native scope category used when creating scopes.

Description:

The selected ScopeType is recorded on handles and emitted events so subscribers can distinguish agents, tools, LLM calls, guardrails, and other semantic units of work.

Agent: ScopeType

Autonomous agent scope.

Function: ScopeType

Generic function-call scope.

Tool: ScopeType

Tool invocation scope.

Llm: ScopeType

LLM call scope.

Retriever: ScopeType

Retriever or RAG lookup scope.

Embedder: ScopeType

Embedding model scope.

Reranker: ScopeType

Reranking model scope.

Guardrail: ScopeType

Guardrail evaluation scope.

Evaluator: ScopeType

Evaluator or judge scope.

Custom: ScopeType

User-defined scope type.

Unknown: ScopeType

Unknown or unspecified scope type.

class nemo_flow.ToolAttributes(value: int = 0)

Bitflags describing tool call properties.

Summary:

Compact flag container attached to tool handles and tool events.

Description:

ToolAttributes records semantic properties of a tool call. Values can be created from raw integer bitmasks and combined with bit operators.

Flag constants:

REMOTE indicates that the tool call is remote.

REMOTE: int

property is_remote: bool

Return whether the REMOTE flag is set.

property value: int

Return the raw integer bitmask.

class nemo_flow.ToolHandle

An active tool call.

Summary:

Native handle returned by manual tool-call start APIs.

Description:

Pass this handle to tool_call_end or wrapper APIs to close the tool lifecycle span and emit the corresponding end event.

property uuid: str

Return the globally unique tool-call identifier.

property name: str

Return the tool name.

property attributes: ToolAttributes

Return the tool attribute bitmask.

property parent_uuid: str | None

Return the parent scope UUID, if any.

property data: _Json | None

Return application data captured on the tool call, if any.

property metadata: _Json | None

Return metadata captured on the tool call, if any.

type nemo_flow.JsonPrimitive = str | int | float | bool | None
type nemo_flow.JsonValue = JsonPrimitive | list['JsonValue'] | dict[str, 'JsonValue']
type nemo_flow.JsonObject = dict[str, JsonValue]
type nemo_flow.Json = JsonValue
type nemo_flow.UnsupportedBehavior = Literal['ignore', 'warn', 'error']
type nemo_flow.ToolSanitizeGuardrail = Callable[[str, Json], Json]
type nemo_flow.ToolConditionalExecutionGuardrail = Callable[[str, Json], str | None]
type nemo_flow.LlmSanitizeRequestGuardrail = Callable[[LLMRequest], LLMRequest]
type nemo_flow.LlmSanitizeResponseGuardrail = Callable[[JsonObject], JsonObject]
type nemo_flow.LlmConditionalExecutionGuardrail = Callable[[LLMRequest], str | None]
type nemo_flow.ToolRequestIntercept = Callable[[str, Json], Json]
type nemo_flow.ToolExecutionIntercept = Callable[[str, Json, Callable[[Json], Awaitable[Json]]], Json | Awaitable[Json]]
type nemo_flow.LlmRequestIntercept = Callable[[str, LLMRequest, AnnotatedLLMRequest | None], tuple[LLMRequest, AnnotatedLLMRequest | None]]
type nemo_flow.LlmExecutionIntercept = Callable[[str, LLMRequest, Callable[[LLMRequest], Awaitable[Json]]], Json | Awaitable[Json]]
type nemo_flow.LlmStreamExecutionIntercept = Callable[[LLMRequest, Callable[[LLMRequest], Awaitable[AsyncIterator[Json]]]], AsyncIterator[Json] | Awaitable[AsyncIterator[Json]]]
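As an example of these callable aliases, the sketch below writes a function with the ToolConditionalExecutionGuardrail shape. This page only gives the return type as str | None; treating a returned string as a rejection reason and None as permission to run is an assumption, as is the hypothetical BLOCKED_TOOLS policy. The alias is mirrored locally with typing so the snippet runs without nemo_flow.

```python
from __future__ import annotations

from typing import Any, Callable, Optional

# Local mirror of the documented alias shape: Callable[[str, Json], str | None].
ToolConditionalExecutionGuardrail = Callable[[str, Any], Optional[str]]

BLOCKED_TOOLS = {"shell", "delete_file"}  # hypothetical policy


def block_dangerous_tools(tool_name: str, args: Any) -> str | None:
    """Return a reason string to block (assumed semantics) or None to allow."""
    if tool_name in BLOCKED_TOOLS:
        return f"tool '{tool_name}' is not allowed"
    return None


guard: ToolConditionalExecutionGuardrail = block_dangerous_tools
print(guard("search", {"query": "hello"}))  # None
print(guard("shell", {"cmd": "ls"}))        # tool 'shell' is not allowed
```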

nemo_flow.get_scope_stack() -> _native.ScopeStack

Return the current task’s active scope stack.

If the current async context does not yet own a scope stack, this function creates one and synchronizes it into the Rust thread-local storage used by the native runtime. Most callers do not need to invoke this directly because higher-level helpers such as nemo_flow.scope.push() do it automatically.

Returns:

The scope stack associated with the current Python context.

Return type:

ScopeStack

Raises:

Exception – Propagates any exception raised by native scope-stack creation or synchronization.

Behavior:

The function first checks the Python ContextVar. If no stack is present, it creates one with the native runtime, stores it in the current context, and synchronizes that stack into native thread-local storage before returning.

Notes

Calling this function synchronizes the Python ContextVar state into the native thread-local slot so subsequent native runtime calls observe the same scope hierarchy.

Example:

import nemo_flow

stack = nemo_flow.get_scope_stack()
assert stack is not None

nemo_flow.scope_stack_active() -> bool

Report whether the current context already owns a scope stack.

Returns:

True when the current Python context already has an active stack, either because it was created in this context or because a stack was explicitly installed for the current thread.

Return type:

bool

Raises:

Exception – Propagates any exception raised by the native active-stack status check.

Behavior:

The Python ContextVar is checked first. If it has no stack, the native runtime is asked whether the current thread has an explicitly active stack.

Notes

This function does not create a scope stack. It is a pure status check used to decide whether scope propagation work is required.

Example:

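import nemo_flow

# Assumes a fresh context in which no stack has been created or installed yet.
assert nemo_flow.scope_stack_active() is False
nemo_flow.get_scope_stack()  # lazily creates the stack for this context
assert nemo_flow.scope_stack_active() is True

nemo_flow.propagate_scope_to_thread() -> _native.ScopeStack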

Capture the active scope stack for use in another thread.

The returned stack can be passed to set_thread_scope_stack() inside a worker thread so that the worker emits events into the same scope hierarchy as the parent context.

Returns:

The active stack from the current context.

Return type:

ScopeStack

Raises:
  • RuntimeError – If the current context does not yet have an active scope stack to propagate.

  • Exception – Propagates any exception raised while synchronizing an already-active native stack into the Python context.

Behavior:

This function does not clone the scope hierarchy. It shares the current stack reference with the target thread, which is appropriate when the worker should contribute events to the same logical trace.

Example:

from concurrent.futures import ThreadPoolExecutor

import nemo_flow

with nemo_flow.scope.scope("parent", nemo_flow.ScopeType.Agent) as handle:
    # Capture the stack while the "parent" scope is active.
    stack = nemo_flow.propagate_scope_to_thread()

    def worker() -> None:
        # Install the shared stack so this thread's events join the same trace.
        nemo_flow.set_thread_scope_stack(stack)
        nemo_flow.scope.event(
            "worker-ran",
            handle=handle,
            data={"source": "thread"},
            metadata={"thread": "pool-1"},
        )

    with ThreadPoolExecutor() as pool:
        pool.submit(worker).result()

nemo_flow.create_scope_stack() -> _native.ScopeStack

Create a new isolated scope stack.

Returns:

A fresh scope stack that is not yet attached to the current Python context or thread.

Return type:

ScopeStack

Raises:

Exception – Propagates any native error raised while allocating the stack.

Behavior:

This is a direct top-level wrapper around the native stack factory. It does not mutate the Python ContextVar and does not install the stack into native thread-local storage.

Notes

Use this helper when you need explicit scope isolation, such as test fixtures, manual context propagation, or framework-managed request boundaries. Most application code should prefer get_scope_stack() so the current context is initialized lazily.

Example:

import nemo_flow

# Create a fresh stack and make it active for NeMo Flow calls on this thread.
stack = nemo_flow.create_scope_stack()
nemo_flow.set_thread_scope_stack(stack)

nemo_flow.set_thread_scope_stack(stack: _native.ScopeStack) -> None

Install a scope stack into the current thread’s native runtime context.

Parameters:

stack – Scope stack that should become active for subsequent NeMo Flow API calls on the current thread.

Returns:

This function returns after the native thread-local slot has been updated.

Return type:

None

Raises:

Exception – Propagates native errors raised when installing stack.

Behavior:

The supplied stack is installed into the native thread-local slot for the current OS thread. The function does not create, clone, or validate a Python ContextVar entry.

Notes

This helper is primarily used when propagating an existing logical trace into worker threads, typically with a stack obtained from propagate_scope_to_thread().

Example:

from concurrent.futures import ThreadPoolExecutor

import nemo_flow

with nemo_flow.scope.scope("parent", nemo_flow.ScopeType.Agent):
    stack = nemo_flow.propagate_scope_to_thread()

    def worker() -> None:
        nemo_flow.set_thread_scope_stack(stack)
        # NeMo Flow calls made here now share the "parent" scope hierarchy.

    with ThreadPoolExecutor() as pool:
        pool.submit(worker).result()

nemo_flow.Event