# Code Examples

Use these examples when you need the direct runtime surfaces behind the application instrumentation guides.
## Invocation API Selection

The following table shows which API to use for each integration need:

| Need | Preferred API | Use When |
|---|---|---|
| Run a tool with full instrumentation | Managed tool execution helpers | Application code owns the callback. |
| Run an LLM call with full instrumentation | Managed LLM execution (`llm.execute`, `llmCallExecute`, `llm_call_execute`) | Application code owns the provider call. |
| Run a streaming LLM call | Streaming execution (`llm_stream_execute`, `typedLlmStreamExecute`, `llm_stream_call_execute`) | You need chunk collection and one final aggregate end event. |
| Emit start/end manually | Manual lifecycle calls (`tools.call` / `tools.call_end` and per-language equivalents) | A framework owns the real invocation boundary. |
| Emit a checkpoint | Mark helpers (`scope.event` and per-language equivalents) | You need milestone visibility inside an active scope. |
| Attach work to one request | Scope-local registration helpers | Middleware or subscribers should disappear when that scope closes. |
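For example, the checkpoint row maps to the mark/event helper demonstrated under Scope and Context Helpers below; a minimal Python call inside an active scope looks like this:

```python
import nemo_flow

# Emit a milestone event that attaches to the currently active scope.
nemo_flow.scope.event("validation-passed", data={"records": 120})
```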
## Manual Tool Lifecycle

Use manual lifecycle calls only when the surrounding code owns the real tool invocation and only exposes reliable start and finish hooks.

If you are replaying events or bridging a framework clock, pass an explicit timestamp to the manual start, end, or mark helpers. Python accepts timezone-aware `datetime` values, Node.js and WebAssembly accept Unix microseconds since the epoch, Rust accepts `DateTime<Utc>`, and Go accepts `time.Time`.
Python:

```python
import nemo_flow

# Start the tool span manually; the handle ties the end event to this start.
handle = nemo_flow.tools.call("search", {"query": "weather"}, data={"attempt": 1})
result = None
try:
    result = {"hits": 2}  # the framework-owned invocation happens here
finally:
    # Always close the span, even if the invocation raises.
    nemo_flow.tools.call_end(handle, result)
```
Node.js:

```typescript
import { toolCall, toolCallEnd } from 'nemo-flow-node';

// Nulls fill the unused positional parameters between args and data.
const handle = toolCall('search', { query: 'weather' }, null, null, { attempt: 1 }, null, null);
const result = { hits: 2 };
toolCallEnd(handle, result, null, null);
```
Rust:

```rust
use nemo_flow::api::tool::{tool_call, tool_call_end, ToolCallEndParams, ToolCallParams};
use serde_json::json;

let handle = tool_call(
    ToolCallParams::builder()
        .name("search")
        .args(json!({"query": "weather"}))
        .data(json!({"attempt": 1}))
        .build(),
)?;
tool_call_end(
    ToolCallEndParams::builder()
        .handle(&handle)
        .result(json!({"hits": 2}))
        .build(),
)?;
```
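For replay or clock-bridging scenarios, a minimal Python sketch might look like the following. The `timestamp` keyword name is an assumption for illustration, not a confirmed signature; check the API Reference for the actual parameter:

```python
from datetime import datetime, timezone

import nemo_flow

# Hypothetical sketch: replaying a recorded invocation with explicit,
# timezone-aware timestamps. The `timestamp` keyword is assumed, not confirmed.
started_at = datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
handle = nemo_flow.tools.call("search", {"query": "weather"}, timestamp=started_at)
nemo_flow.tools.call_end(handle, {"hits": 2}, timestamp=datetime.now(timezone.utc))
```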
## Managed LLM Execution

Use managed execution when NeMo Flow should run the full middleware pipeline around the provider call.
Python:

```python
import nemo_flow
from nemo_flow import LLMRequest

request = LLMRequest({}, {"messages": [{"role": "user", "content": "hello"}]})

async def invoke(req: LLMRequest):
    # Application-owned provider call; NeMo Flow wraps it with middleware.
    return {"text": "hi", "request": req.content}

response = await nemo_flow.llm.execute(
    "demo-provider",
    request,
    invoke,
    model_name="demo-model",
)
```
Node.js:

```typescript
import { LlmRequest, llmCallExecute } from 'nemo-flow-node';

const request = new LlmRequest({}, { messages: [{ role: 'user', content: 'hello' }] });
const response = await llmCallExecute(
  'demo-provider',
  request,
  async (req: LlmRequest) => ({ text: 'hi', request: req.content }),
  null,
  null,
  null,
  null,
  'demo-model',
);
```
Rust:

```rust
use nemo_flow::api::llm::{llm_call_execute, LlmCallExecuteParams, LlmRequest};
use serde_json::json;
use std::sync::Arc;

let request = LlmRequest {
    headers: Default::default(),
    content: json!({"messages": [{"role": "user", "content": "hello"}]}),
};
let response = llm_call_execute(
    LlmCallExecuteParams::builder()
        .name("demo-provider")
        .request(request)
        .func(Arc::new(|req| Box::pin(async move {
            Ok(json!({"text": "hi", "request": req.content}))
        })))
        .model_name("demo-model")
        .build(),
).await?;
```
## Streaming LLM Execution

Use the streaming helper when subscribers need chunk collection plus one final response payload.
Python:

```python
from dataclasses import dataclass

from nemo_flow import LLMRequest
from nemo_flow.typed import DataclassCodec, llm_stream_execute

@dataclass
class Chunk:
    delta: str

@dataclass
class FinalResponse:
    text: str

request = LLMRequest({}, {"messages": [{"role": "user", "content": "hello"}]})
collected: list[Chunk] = []

async def stream_impl(_request: LLMRequest):
    yield Chunk(delta="hi")

stream = await llm_stream_execute(
    "demo-provider",
    request,
    stream_impl,
    collector=collected.append,
    finalizer=lambda: FinalResponse(text="".join(chunk.delta for chunk in collected)),
    chunk_json_codec=DataclassCodec(Chunk),
    response_json_codec=DataclassCodec(FinalResponse),
)
```
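A hedged consumption sketch, assuming the returned `stream` is an async iterator of `Chunk` values (the return type is not confirmed here):

```python
# Assumption: `stream` yields Chunk values as the provider produces them;
# the collector and finalizer above still run as each chunk passes through.
async for chunk in stream:
    print(chunk.delta)
```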
Node.js:

```typescript
import { LlmRequest } from 'nemo-flow-node';
import { typedLlmStreamExecute, type Codec } from 'nemo-flow-node/typed';

type Chunk = { delta: string };
type FinalResponse = { text: string };

const chunkCodec: Codec<Chunk> = {
  toJson: (value) => value,
  fromJson: (json) => json as Chunk,
};
const responseCodec: Codec<FinalResponse> = {
  toJson: (value) => value,
  fromJson: (json) => json as FinalResponse,
};

const request = new LlmRequest({}, { messages: [{ role: 'user', content: 'hello' }] });
const collected: Chunk[] = [];
const stream = await typedLlmStreamExecute(
  'demo-provider',
  request,
  async function* () {
    yield { delta: 'hi' };
  },
  (chunk) => collected.push(chunk),
  () => ({ text: collected.map((chunk) => chunk.delta).join('') }),
  chunkCodec,
  responseCodec,
);
```
Rust:

```rust
use nemo_flow::api::llm::{
    llm_stream_call_execute, LlmAttributes, LlmRequest, LlmStreamCallExecuteParams,
};
use serde_json::json;

let request = LlmRequest {
    headers: Default::default(),
    content: json!({"messages": [{"role": "user", "content": "hello"}]}),
};
let stream = llm_stream_call_execute(
    LlmStreamCallExecuteParams::builder()
        .name("demo-provider")
        .request(request)
        .func(std::sync::Arc::new(|_req| Box::pin(async move {
            Ok(Box::pin(tokio_stream::iter(vec![Ok(json!({"delta": "hi"}))])))
        })))
        .collector(Box::new(|_chunk| Ok(())))
        .finalizer(Box::new(|| json!({"text": "hi"})))
        .attributes(LlmAttributes::STREAMING)
        .model_name("demo-model")
        .build(),
).await?;
```
## Partial Middleware Calls

These helpers are useful when framework code cannot use managed execution but still needs a request rewrite or a block decision.
Python:

```python
import nemo_flow
from nemo_flow import LLMRequest

# Rewrite the tool arguments, then apply the allow-or-block decision.
tool_args = nemo_flow.tools.request_intercepts("search", {"query": "weather"})
nemo_flow.tools.conditional_execution("search", tool_args)

# The same pair exists for LLM requests.
llm_request = LLMRequest({}, {"messages": [{"role": "user", "content": "hello"}]})
llm_request = nemo_flow.llm.request_intercepts("demo-provider", llm_request)
nemo_flow.llm.conditional_execution(llm_request)
```
Node.js:

```typescript
import {
  LlmRequest,
  llmConditionalExecution,
  llmRequestIntercepts,
  toolConditionalExecution,
  toolRequestIntercepts,
} from 'nemo-flow-node';

const toolArgs = await toolRequestIntercepts('search', { query: 'weather' });
await toolConditionalExecution('search', toolArgs);

const request = new LlmRequest({}, { messages: [{ role: 'user', content: 'hello' }] });
const rewritten = await llmRequestIntercepts('demo-provider', request);
await llmConditionalExecution(rewritten);
```
Rust:

```rust
use nemo_flow::api::llm::{llm_conditional_execution, llm_request_intercepts, LlmRequest};
use nemo_flow::api::tool::{tool_conditional_execution, tool_request_intercepts};
use serde_json::json;

let tool_args = tool_request_intercepts("search", json!({"query": "weather"}))?;
tool_conditional_execution("search", &tool_args)?;

let request = LlmRequest {
    headers: Default::default(),
    content: json!({"messages": [{"role": "user", "content": "hello"}]}),
};
let rewritten = llm_request_intercepts("demo-provider", request)?;
llm_conditional_execution(&rewritten)?;
```
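A minimal Python sketch combining these partial helpers with the manual lifecycle from above; `framework_invoke` is a hypothetical stand-in for the framework-owned call:

```python
import nemo_flow

# Rewrite the arguments, apply the block decision, then wrap the
# framework-owned invocation with the manual start/end pair.
args = nemo_flow.tools.request_intercepts("search", {"query": "weather"})
nemo_flow.tools.conditional_execution("search", args)

handle = nemo_flow.tools.call("search", args)
result = None
try:
    result = framework_invoke("search", args)  # hypothetical framework hook
finally:
    nemo_flow.tools.call_end(handle, result)
```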
## Scope and Context Helpers

Use normal scope helpers first. Reach for explicit stack helpers only when work crosses thread, task, worker, or request boundaries.
Python:

```python
from concurrent.futures import ThreadPoolExecutor

import nemo_flow

with nemo_flow.scope.scope("request", nemo_flow.ScopeType.Agent):
    nemo_flow.scope.event("started", data={"ok": True})
    # Capture the current scope stack so another thread can attach to it.
    shared = nemo_flow.propagate_scope_to_thread()

    def worker() -> None:
        nemo_flow.set_thread_scope_stack(shared)
        nemo_flow.scope.event("worker-ran")

    with ThreadPoolExecutor() as pool:
        pool.submit(worker).result()
```
Node.js:

```typescript
import { ScopeType, createScopeStack, event, setThreadScopeStack, withScope } from 'nemo-flow-node';

// Attach an explicit stack for this worker before opening the scope.
const workerStack = createScopeStack();
setThreadScopeStack(workerStack);

await withScope('request', ScopeType.Agent, async (handle) => {
  event('started', handle, { ok: true }, null);
});
```
Rust:

```rust
use nemo_flow::api::runtime::{create_scope_stack, set_thread_scope_stack, TASK_SCOPE_STACK};
use nemo_flow::api::scope::{event, EmitMarkEventParams};
use serde_json::json;

let stack = create_scope_stack();
TASK_SCOPE_STACK
    .scope(stack.clone(), async {
        event(EmitMarkEventParams::builder().name("started").data(json!({"ok": true})).build())
    })
    .await?;

std::thread::spawn(move || {
    set_thread_scope_stack(stack);
    // NeMo Flow calls in this thread attach to the same explicit stack.
})
.join()
.unwrap();
```
## Middleware Registration Families

The runtime exposes the same registration families for tool and LLM calls:

- Sanitize-request guardrails change emitted start-event payloads only.
- Sanitize-response guardrails change emitted end-event payloads only.
- Conditional-execution guardrails return an allow-or-block decision.
- Request intercepts change the real request before execution.
- Execution intercepts wrap the callback and may post-process or short-circuit.
- LLM stream execution intercepts wrap streaming provider callbacks.
Every family also has a scope-local surface:

- Python: `nemo_flow.scope_local.register_*`
- Node.js: `scopeRegister*`
- Rust: middleware `scope_register_*` functions under `nemo_flow::api::registry`; subscriber scope registration under `nemo_flow::api::subscriber`
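A hedged Python sketch of scope-local registration; the helper name and guardrail signature below are assumptions extrapolated from the `register_*` pattern above, not confirmed API:

```python
import nemo_flow

with nemo_flow.scope.scope("request", nemo_flow.ScopeType.Agent):
    # Hypothetical name and signature following the scope_local.register_* pattern.
    # The guardrail rewrites the emitted start-event payload for this scope only.
    def redact_query(name, args):
        return {**args, "query": "[redacted]"} if name == "search" else args

    nemo_flow.scope_local.register_tool_sanitize_request(redact_query)
    # The registration disappears automatically when this scope closes.
```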
See Advanced Guide: Add Middleware for an end-to-end policy example and the API Reference for symbol-level details.