OpenTelemetry
Use the opentelemetry section when you want NeMo Flow lifecycle events
exported as generic OpenTelemetry Protocol (OTLP) trace spans.
OpenTelemetry export is a good fit when your tracing backend already expects OTLP spans and you want NeMo Flow scopes, tool calls, LLM calls, and marks to appear in the same tracing pipeline as the rest of the application.
plugins.toml Example
version = 1
[[components]]
kind = "observability"
enabled = true
[components.config]
version = 1
[components.config.opentelemetry]
enabled = true
transport = "http_binary"
endpoint = "http://localhost:4318/v1/traces"
service_name = "agent-service"
service_namespace = "nemo"
service_version = "1.0.0"
instrumentation_scope = "nemo-flow-otel"
timeout_millis = 3000
[components.config.opentelemetry.headers]
authorization = "Bearer <token>"
[components.config.opentelemetry.resource_attributes]
"deployment.environment" = "dev"
This configuration registers a plugin-owned OpenTelemetry subscriber and sends NeMo Flow trace spans to the configured OTLP endpoint.
Fields
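| Field | Default | Notes |
|---|---|---|
| `transport` | | Must be `http_binary` or `grpc`. |
| `enabled` | | |
| `endpoint` | Exporter default | OTLP endpoint. |
| `headers` | | String-to-string exporter headers. |
| `resource_attributes` | | String-to-string OTLP resource attributes. |
| `service_name` | | |
| `service_namespace` | Omitted | Optional |
| `service_version` | Omitted | Optional |
| `instrumentation_scope` | Omitted | Optional instrumentation scope name. |
| `timeout_millis` | | Export timeout. |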
Expected Output
The collector should receive OTLP trace export requests. The tracing backend should show spans for NeMo Flow scopes, tools, LLM calls, and marks grouped by root scope.
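When no collector is available, one way to confirm that export requests are actually leaving the process is to point the endpoint at a throwaway local HTTP server. The sketch below is an assumption-laden stand-in, not a real collector: `start_stub_collector` is a hypothetical helper that only records the path, content type, and body size of each POST, and the request at the end merely imitates what an exporter would send. It relies on the OTLP/HTTP convention that binary protobuf payloads are posted to `/v1/traces` with the `application/x-protobuf` content type.

```python
import threading
import urllib.request
from http.server import BaseHTTPRequestHandler, HTTPServer


def start_stub_collector(port: int = 0):
    """Hypothetical helper: start a local server that records POSTs instead of storing spans."""
    received = []

    class Handler(BaseHTTPRequestHandler):
        def do_POST(self):
            length = int(self.headers.get("Content-Length", 0))
            body = self.rfile.read(length)
            received.append((self.path, self.headers.get("Content-Type"), len(body)))
            self.send_response(200)
            self.send_header("Content-Length", "0")
            self.end_headers()

        def log_message(self, format, *args):
            pass  # keep the console quiet

    server = HTTPServer(("127.0.0.1", port), Handler)
    threading.Thread(target=server.serve_forever, daemon=True).start()
    return server, received


server, received = start_stub_collector()  # port 0 lets the OS pick a free port
url = f"http://127.0.0.1:{server.server_port}/v1/traces"

# Stand-in for an exporter request; a real exporter sends a serialized OTLP payload.
request = urllib.request.Request(
    url,
    data=b"placeholder",
    headers={"Content-Type": "application/x-protobuf"},
    method="POST",
)
with urllib.request.urlopen(request, timeout=3) as response:
    status = response.status

server.shutdown()
server.server_close()
print(status, received)
```

In practice you would keep the server running, set the `endpoint` field to the printed URL, and run the instrumented application instead of sending the placeholder request.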
Register the plugin before the first instrumented request, use stable service identity fields, keep credentials outside source code, and flush during graceful shutdown.
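As one way to keep credentials outside source code, the headers map can be built at startup from an environment variable. This is a minimal sketch: the variable name `OTLP_AUTH_TOKEN` and the helper `build_otlp_headers` are hypothetical, and the resulting dictionary would be passed wherever the examples on this page use a literal `headers` value.

```python
import os


def build_otlp_headers(env_var: str = "OTLP_AUTH_TOKEN") -> dict[str, str]:
    """Build exporter headers from an environment variable (hypothetical name)."""
    token = os.environ.get(env_var)
    if not token:
        # Fail early instead of exporting with a missing or empty credential.
        raise RuntimeError(f"{env_var} is not set")
    return {"authorization": f"Bearer {token}"}


# Demo only: provide a value so the sketch runs without external setup.
os.environ.setdefault("OTLP_AUTH_TOKEN", "example-token")
headers = build_otlp_headers()
print(sorted(headers))
```

Failing fast on a missing variable surfaces the problem at startup rather than as rejected export requests later.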
Plugin Configuration
Use plugin configuration when the application should let NeMo Flow own the OpenTelemetry subscriber lifecycle.
Python

from nemo_flow import plugin
from nemo_flow.observability import ComponentSpec, ObservabilityConfig, OtlpConfig

config = plugin.PluginConfig(
    components=[
        ComponentSpec(
            ObservabilityConfig(
                opentelemetry=OtlpConfig(
                    enabled=True,
                    transport="http_binary",
                    endpoint="http://localhost:4318/v1/traces",
                    service_name="agent-service",
                    service_namespace="nemo",
                    service_version="1.0.0",
                    instrumentation_scope="nemo-flow-otel",
                    resource_attributes={"deployment.environment": "dev"},
                    headers={"authorization": "Bearer <token>"},
                )
            )
        )
    ]
)

# Validate first and stop on any error-level diagnostic.
report = plugin.validate(config)
if any(diagnostic["level"] == "error" for diagnostic in report["diagnostics"]):
    raise RuntimeError(report["diagnostics"])

# initialize() is awaited, so run this part inside an async function.
await plugin.initialize(config)
try:
    # Run instrumented application work here.
    pass
finally:
    plugin.clear()

Node.js

const plugin = require("nemo-flow-node/plugin");
const observability = require("nemo-flow-node/observability");

// initialize() is awaited, so run this inside an async function.
await plugin.initialize({
  version: 1,
  components: [
    observability.ComponentSpec({
      version: 1,
      opentelemetry: observability.otlpConfig({
        enabled: true,
        transport: "http_binary",
        endpoint: "http://localhost:4318/v1/traces",
        service_name: "agent-service",
        service_namespace: "nemo",
        service_version: "1.0.0",
        instrumentation_scope: "nemo-flow-otel",
        resource_attributes: {
          "deployment.environment": "dev",
        },
        headers: {
          authorization: "Bearer <token>",
        },
      }),
    }),
  ],
});

try {
  // Run instrumented application work here.
} finally {
  plugin.clear();
}

Rust

use nemo_flow::observability::plugin_component::{
    ComponentSpec, ObservabilityConfig, OtlpSectionConfig,
};
use nemo_flow::plugin::{initialize_plugins, validate_plugin_config, PluginConfig};

let component = ComponentSpec::new(ObservabilityConfig {
    opentelemetry: Some(OtlpSectionConfig {
        enabled: true,
        transport: "http_binary".into(),
        endpoint: Some("http://localhost:4318/v1/traces".into()),
        service_name: "agent-service".into(),
        service_namespace: Some("nemo".into()),
        service_version: Some("1.0.0".into()),
        instrumentation_scope: Some("nemo-flow-otel".into()),
        resource_attributes: [("deployment.environment".into(), "dev".into())].into(),
        headers: [("authorization".into(), "Bearer <token>".into())].into(),
        ..OtlpSectionConfig::default()
    }),
    ..ObservabilityConfig::default()
});

let config = PluginConfig {
    version: 1,
    components: vec![component.into()],
    policy: Default::default(),
};

// Validate before initializing.
let report = validate_plugin_config(&config);
assert!(!report.has_errors());

// Uses .await and ?, so run this inside an async function that returns a Result.
let active = initialize_plugins(config).await?;

Manual API
Use the manual subscriber API when you need an explicit subscriber name or
direct force_flush control.
Python

from nemo_flow import OpenTelemetryConfig, OpenTelemetrySubscriber

config = OpenTelemetryConfig()
config.transport = "http_binary"
config.endpoint = "http://localhost:4318/v1/traces"
config.service_name = "agent-service"
config.set_resource_attribute("deployment.environment", "dev")

subscriber = OpenTelemetrySubscriber(config)
subscriber.register("otel-exporter")

# Run instrumented application work here.

subscriber.force_flush()
subscriber.deregister("otel-exporter")
subscriber.shutdown()

Node.js

const { OpenTelemetrySubscriber } = require("nemo-flow-node");

const subscriber = new OpenTelemetrySubscriber({
  transport: "http_binary",
  endpoint: "http://localhost:4318/v1/traces",
  serviceName: "agent-service",
  resourceAttributes: {
    "deployment.environment": "dev",
  },
});

subscriber.register("otel-exporter");
try {
  // Run instrumented application work here.
  subscriber.forceFlush();
} finally {
  subscriber.deregister("otel-exporter");
  subscriber.shutdown();
}

Rust

use nemo_flow::observability::otel::{OpenTelemetryConfig, OpenTelemetrySubscriber};

let config = OpenTelemetryConfig::http_binary("agent-service")
    .with_endpoint("http://localhost:4318/v1/traces")
    .with_resource_attribute("deployment.environment", "dev");

let subscriber = OpenTelemetrySubscriber::new(config)?;
subscriber.register("otel-exporter")?;

// Run instrumented application work here.

subscriber.force_flush()?;
let _ = subscriber.deregister("otel-exporter")?;
subscriber.shutdown()?;

Common Validation Failures
`transport` is not `http_binary` or `grpc`.
Headers or resource attributes are not string-to-string maps.
The exporter feature is unavailable in the current build or target.
The endpoint is unreachable at runtime.
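Some of these failures can be caught before the configuration is handed to NeMo Flow. The following sketch is a hypothetical pre-check written against a plain dictionary; `precheck` and `endpoint_reachable` are illustrative names, not part of the NeMo Flow API, and they do not replace the library's own validation. It covers the transport value, the string-to-string map requirement, and a basic TCP reachability probe for the endpoint.

```python
import socket
from urllib.parse import urlparse

ALLOWED_TRANSPORTS = ("http_binary", "grpc")


def is_string_map(value) -> bool:
    """Return True when value is a dict whose keys and values are all strings."""
    return isinstance(value, dict) and all(
        isinstance(key, str) and isinstance(item, str) for key, item in value.items()
    )


def precheck(section: dict) -> list[str]:
    """Hypothetical pre-check for an opentelemetry section given as a plain dict."""
    problems = []
    transport = section.get("transport")
    if transport is not None and transport not in ALLOWED_TRANSPORTS:
        problems.append(f"transport must be one of {ALLOWED_TRANSPORTS}, got {transport!r}")
    for name in ("headers", "resource_attributes"):
        if name in section and not is_string_map(section[name]):
            problems.append(f"{name} must be a string-to-string map")
    return problems


def endpoint_reachable(endpoint: str, timeout: float = 1.0) -> bool:
    """Best-effort TCP probe; True only means something accepted the connection."""
    parsed = urlparse(endpoint)
    port = parsed.port or (443 if parsed.scheme == "https" else 80)
    try:
        with socket.create_connection((parsed.hostname, port), timeout=timeout):
            return True
    except OSError:
        return False


section = {
    "transport": "udp",
    "headers": {"authorization": "Bearer <token>"},
    "resource_attributes": {"replicas": 3},
}
for problem in precheck(section):
    print(problem)
```

A successful probe does not prove the listener speaks OTLP, so treat `endpoint_reachable` as a quick connectivity hint rather than a health check.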