Advanced Guide: Add Middleware#

Use this guide when instrumentation is working and you want NeMo Flow to enforce policy, transform requests, wrap execution, or sanitize observability payloads around tool and LLM calls.

What You Build#

You will add middleware to an instrumented application and verify that it runs in the expected part of the pipeline:

  • Request intercepts transform the real request before execution.

  • Sanitize guardrails transform only the payload recorded on events.

  • Conditional-execution guardrails can block execution.

  • Execution intercepts wrap the callback and can add timing, retries, routing, or fallback behavior.

Before You Start#

Complete Basic Guide: Instrument a Tool Call or Basic Guide: Instrument an LLM Call. Middleware only runs when the call goes through a NeMo Flow managed lifecycle API.

Choose the Middleware Type#

Use this table to match the behavior you need with the correct middleware family.

Need

Middleware Type

Changes Real Execution

Redact event payloads

Sanitize-request or sanitize-response guardrail

No

Normalize tool arguments or model requests

Request intercept

Yes

Block unsafe or invalid work

Conditional-execution guardrail

Yes, by rejecting

Add timing, retries, routing, or fallback

Execution intercept

Yes

Wrap streaming model output

LLM stream execution intercept

Yes

Use the narrowest middleware type that matches the behavior. For example, do not use a request intercept when you only need to hide a secret from exported events.

Add a Tool Policy#

This example adds three behaviors around a search tool:

  • Redact api_key from emitted request events.

  • Reject empty queries before execution.

  • Measure execution duration.

import time

import nemo_flow


def redact_api_key(tool_name, args):
    safe_args = dict(args)
    if "api_key" in safe_args:
        safe_args["api_key"] = "<redacted>"
    return safe_args


def require_query(tool_name, args):
    if not args.get("query"):
        return "search.query is required"
    return None


async def measure_tool(tool_name, args, next_call):
    started = time.perf_counter()
    try:
        return await next_call(args)
    finally:
        elapsed_ms = round((time.perf_counter() - started) * 1000, 2)
        print(f"{tool_name} completed in {elapsed_ms} ms")


nemo_flow.guardrails.register_tool_sanitize_request("search.redact_api_key", 10, redact_api_key)
nemo_flow.guardrails.register_tool_conditional_execution("search.require_query", 20, require_query)
nemo_flow.intercepts.register_tool_execution("search.measure", 30, measure_tool)
const {
  registerToolConditionalExecutionGuardrail,
  registerToolExecutionIntercept,
  registerToolSanitizeRequestGuardrail,
} = require("nemo-flow-node");

registerToolSanitizeRequestGuardrail("search.redact_api_key", 10, (_toolName, args) => {
  if (!args.api_key) {
    return args;
  }
  return { ...args, api_key: "<redacted>" };
});

registerToolConditionalExecutionGuardrail("search.require_query", 20, (_toolName, args) => (
  args.query ? null : "search.query is required"
));

registerToolExecutionIntercept("search.measure", 30, async (args, next) => {
  const started = performance.now();
  try {
    return await next(args);
  } finally {
    console.log(`search completed in ${Math.round(performance.now() - started)} ms`);
  }
});
use nemo_flow::api::registry::{
    register_tool_conditional_execution_guardrail,
    register_tool_execution_intercept,
    register_tool_sanitize_request_guardrail,
};
use serde_json::json;
use std::sync::Arc;
use std::time::Instant;

register_tool_sanitize_request_guardrail(
    "search.redact_api_key",
    10,
    Arc::new(|_tool_name, mut args| {
        if let Some(object) = args.as_object_mut() {
            if object.contains_key("api_key") {
                object.insert("api_key".into(), json!("<redacted>"));
            }
        }
        args
    }),
)?;

register_tool_conditional_execution_guardrail(
    "search.require_query",
    20,
    Arc::new(|_tool_name, args| {
        Ok(match args.get("query").and_then(|value| value.as_str()) {
            Some(query) if !query.is_empty() => None,
            _ => Some("search.query is required".into()),
        })
    }),
)?;

register_tool_execution_intercept(
    "search.measure",
    30,
    Arc::new(|name, args, next| {
        Box::pin(async move {
            let started = Instant::now();
            let result = next(name.clone(), args).await;
            println!("{name} completed in {:?}", started.elapsed());
            result
        })
    }),
)?;

Scope Middleware to One Request#

Use scope-local middleware when a policy applies only to one request, tenant, experiment, or agent run.

  1. Create or receive the active scope handle.

  2. Register middleware with the scope-local helper for that handle.

  3. Execute tools or LLM calls inside that scope.

  4. Let the scope end remove the scope-local registrations automatically.

Use global middleware for process-wide behavior, such as organization-wide redaction. Use scope-local middleware for request-specific policy, such as tenant routing or an A/B test.

Middleware Registration Families#

NeMo Flow exposes the same core middleware families for tools and LLMs:

Family

Tool Registration

LLM Registration

Changes Real Execution

Sanitize request

register_tool_sanitize_request

register_llm_sanitize_request

No

Sanitize response

register_tool_sanitize_response

register_llm_sanitize_response

No

Conditional execution

register_tool_conditional_execution

register_llm_conditional_execution

Yes, by rejecting

Request intercept

register_tool_request

register_llm_request

Yes

Execution intercept

register_tool_execution

register_llm_execution

Yes

Stream execution intercept

Not applicable

register_llm_stream_execution

Yes

Sanitize guardrails affect only the payload recorded on emitted events. Request intercepts affect the real request that reaches the tool or provider. Execution intercepts wrap the callback itself and are only available when the invocation uses managed execution.

Scope-local variants are available through nemo_flow.scope_local.register_*, Node.js scopeRegister* helpers, and Rust scope_register_* functions.

Validate the Middleware#

Run one allowed request and one rejected request:

  • The allowed request should return the same business result as before.

  • The rejected request should fail before the tool callback executes.

  • Subscriber output should show redacted api_key values.

  • The timing intercept should print once for each executed tool call.

Debug Middleware Order#

Middleware runs by ascending priority inside each middleware family. Families run in this order for managed tool calls:

  1. Conditional-execution guardrails.

  2. Request intercepts.

  3. Sanitize-request guardrails for emitted start events.

  4. Execution intercepts and the real callback.

  5. Sanitize-response guardrails for emitted end events.

If a later middleware does not run, check whether an earlier conditional-execution guardrail rejected the call or a request intercept raised an error.

Common Issues#

Check these symptoms first when the workflow does not behave as expected.

  • Sanitized data reaches the real tool: Use a sanitize guardrail only for event payloads. Use a request intercept when the real request should change.

  • Middleware affects unrelated requests: Register it scope-locally instead of globally.

  • Duplicate names replace behavior: Middleware names are registry keys. Use stable, unique names for each behavior.

  • Execution intercept never prints: Confirm that the application uses the managed execute helper and that no guardrail rejected the request.

Next Steps#

Use these links to continue from this workflow into the next related task.