Advanced Guide: Add Middleware#
Use this guide when instrumentation is working and you want NeMo Flow to enforce policy, transform requests, wrap execution, or sanitize observability payloads around tool and LLM calls.
What You Build#
You will add middleware to an instrumented application and verify that it runs in the expected part of the pipeline:
Request intercepts transform the real request before execution.
Sanitize guardrails transform only the payload recorded on events.
Conditional-execution guardrails can block execution.
Execution intercepts wrap the callback and can add timing, retries, routing, or fallback behavior.
Before You Start#
Complete Basic Guide: Instrument a Tool Call or Basic Guide: Instrument an LLM Call. Middleware only runs when the call goes through a NeMo Flow managed lifecycle API.
Choose the Middleware Type#
Use this table to match the behavior you need with the correct middleware family.
Need |
Middleware Type |
Changes Real Execution |
|---|---|---|
Redact event payloads |
Sanitize-request or sanitize-response guardrail |
No |
Normalize tool arguments or model requests |
Request intercept |
Yes |
Block unsafe or invalid work |
Conditional-execution guardrail |
Yes, by rejecting |
Add timing, retries, routing, or fallback |
Execution intercept |
Yes |
Wrap streaming model output |
LLM stream execution intercept |
Yes |
Use the narrowest middleware type that matches the behavior. For example, do not use a request intercept when you only need to hide a secret from exported events.
Add a Tool Policy#
This example adds three behaviors around a search tool:
Redact
api_keyfrom emitted request events.Reject empty queries before execution.
Measure execution duration.
import time
import nemo_flow
def redact_api_key(tool_name, args):
safe_args = dict(args)
if "api_key" in safe_args:
safe_args["api_key"] = "<redacted>"
return safe_args
def require_query(tool_name, args):
if not args.get("query"):
return "search.query is required"
return None
async def measure_tool(tool_name, args, next_call):
started = time.perf_counter()
try:
return await next_call(args)
finally:
elapsed_ms = round((time.perf_counter() - started) * 1000, 2)
print(f"{tool_name} completed in {elapsed_ms} ms")
nemo_flow.guardrails.register_tool_sanitize_request("search.redact_api_key", 10, redact_api_key)
nemo_flow.guardrails.register_tool_conditional_execution("search.require_query", 20, require_query)
nemo_flow.intercepts.register_tool_execution("search.measure", 30, measure_tool)
const {
registerToolConditionalExecutionGuardrail,
registerToolExecutionIntercept,
registerToolSanitizeRequestGuardrail,
} = require("nemo-flow-node");
registerToolSanitizeRequestGuardrail("search.redact_api_key", 10, (_toolName, args) => {
if (!args.api_key) {
return args;
}
return { ...args, api_key: "<redacted>" };
});
registerToolConditionalExecutionGuardrail("search.require_query", 20, (_toolName, args) => (
args.query ? null : "search.query is required"
));
registerToolExecutionIntercept("search.measure", 30, async (args, next) => {
const started = performance.now();
try {
return await next(args);
} finally {
console.log(`search completed in ${Math.round(performance.now() - started)} ms`);
}
});
use nemo_flow::api::registry::{
register_tool_conditional_execution_guardrail,
register_tool_execution_intercept,
register_tool_sanitize_request_guardrail,
};
use serde_json::json;
use std::sync::Arc;
use std::time::Instant;
register_tool_sanitize_request_guardrail(
"search.redact_api_key",
10,
Arc::new(|_tool_name, mut args| {
if let Some(object) = args.as_object_mut() {
if object.contains_key("api_key") {
object.insert("api_key".into(), json!("<redacted>"));
}
}
args
}),
)?;
register_tool_conditional_execution_guardrail(
"search.require_query",
20,
Arc::new(|_tool_name, args| {
Ok(match args.get("query").and_then(|value| value.as_str()) {
Some(query) if !query.is_empty() => None,
_ => Some("search.query is required".into()),
})
}),
)?;
register_tool_execution_intercept(
"search.measure",
30,
Arc::new(|name, args, next| {
Box::pin(async move {
let started = Instant::now();
let result = next(name.clone(), args).await;
println!("{name} completed in {:?}", started.elapsed());
result
})
}),
)?;
Scope Middleware to One Request#
Use scope-local middleware when a policy applies only to one request, tenant, experiment, or agent run.
Create or receive the active scope handle.
Register middleware with the scope-local helper for that handle.
Execute tools or LLM calls inside that scope.
Let the scope end remove the scope-local registrations automatically.
Use global middleware for process-wide behavior, such as organization-wide redaction. Use scope-local middleware for request-specific policy, such as tenant routing or an A/B test.
Middleware Registration Families#
NeMo Flow exposes the same core middleware families for tools and LLMs:
Family |
Tool Registration |
LLM Registration |
Changes Real Execution |
|---|---|---|---|
Sanitize request |
|
|
No |
Sanitize response |
|
|
No |
Conditional execution |
|
|
Yes, by rejecting |
Request intercept |
|
|
Yes |
Execution intercept |
|
|
Yes |
Stream execution intercept |
Not applicable |
|
Yes |
Sanitize guardrails affect only the payload recorded on emitted events. Request intercepts affect the real request that reaches the tool or provider. Execution intercepts wrap the callback itself and are only available when the invocation uses managed execution.
Scope-local variants are available through nemo_flow.scope_local.register_*, Node.js scopeRegister* helpers, and Rust scope_register_* functions.
Validate the Middleware#
Run one allowed request and one rejected request:
The allowed request should return the same business result as before.
The rejected request should fail before the tool callback executes.
Subscriber output should show redacted
api_keyvalues.The timing intercept should print once for each executed tool call.
Debug Middleware Order#
Middleware runs by ascending priority inside each middleware family. Families run in this order for managed tool calls:
Conditional-execution guardrails.
Request intercepts.
Sanitize-request guardrails for emitted start events.
Execution intercepts and the real callback.
Sanitize-response guardrails for emitted end events.
If a later middleware does not run, check whether an earlier conditional-execution guardrail rejected the call or a request intercept raised an error.
Common Issues#
Check these symptoms first when the workflow does not behave as expected.
Sanitized data reaches the real tool: Use a sanitize guardrail only for event payloads. Use a request intercept when the real request should change.
Middleware affects unrelated requests: Register it scope-locally instead of globally.
Duplicate names replace behavior: Middleware names are registry keys. Use stable, unique names for each behavior.
Execution intercept never prints: Confirm that the application uses the managed execute helper and that no guardrail rejected the request.
Next Steps#
Use these links to continue from this workflow into the next related task.
Use Middleware to review execution order.
Use Code Examples for direct registration and partial-execution examples.
Use Advanced Guide: Handle Non-Serializable Data if middleware needs to work with framework objects.