nv_ingest_api.internal.primitives.tracing package#
Submodules#
nv_ingest_api.internal.primitives.tracing.latency module#
- class nv_ingest_api.internal.primitives.tracing.latency.ColorCodes[source]#
Bases:
object
- BLUE = '\x1b[94m'#
- GREEN = '\x1b[92m'#
- RED = '\x1b[91m'#
- RESET = '\x1b[0m'#
- YELLOW = '\x1b[93m'#
- nv_ingest_api.internal.primitives.tracing.latency.latency_logger(name=None)[source]#
A decorator to log the elapsed time of function execution. If available, it also logs the latency based on ‘latency::ts_send’ metadata in a IngestControlMessage object.
- Parameters:
name (str, optional) – Custom name to use in the log message. Defaults to the function’s name.
nv_ingest_api.internal.primitives.tracing.logging module#
- class nv_ingest_api.internal.primitives.tracing.logging.TaskResultStatus(*values)[source]#
Bases:
Enum
- FAILURE = 'FAILURE'#
- SUCCESS = 'SUCCESS'#
- nv_ingest_api.internal.primitives.tracing.logging.annotate_cm(
- control_message: IngestControlMessage,
- source_id=None,
- **kwargs,
Annotate a IngestControlMessage object with arbitrary metadata, a source ID, and a timestamp. Each annotation will be uniquely identified by a UUID.
Parameters: - control_message: The IngestControlMessage object to be annotated. - source_id: A unique identifier for the source of the annotation. If None, uses the caller’s __name__. - **kwargs: Arbitrary key-value pairs to be included in the annotation.
- nv_ingest_api.internal.primitives.tracing.logging.annotate_task_result(
- control_message,
- result,
- task_id,
- source_id=None,
- **kwargs,
Annotate a IngestControlMessage object with the result of a task, identified by a task_id, and an arbitrary number of additional key-value pairs. The result can be a TaskResultStatus enum or a string that will be converted to the corresponding enum.
Parameters: - control_message: The IngestControlMessage object to be annotated. - result: The result of the task, either SUCCESS or FAILURE, as an enum or string. - task_id: A unique identifier for the task. - **kwargs: Arbitrary additional key-value pairs to be included in the annotation.
nv_ingest_api.internal.primitives.tracing.tagging module#
- nv_ingest_api.internal.primitives.tracing.tagging.set_trace_timestamps_with_parent_context(
- control_message,
- execution_trace_log: dict,
- parent_name: str,
- logger=None,
Set trace timestamps on a control message with proper parent-child context.
This utility function processes trace timestamps from an execution_trace_log and ensures that child traces are properly namespaced under their parent context. This resolves OpenTelemetry span hierarchy issues where child spans cannot find their expected parent contexts.
- Parameters:
control_message (IngestControlMessage) – The control message to set timestamps on
execution_trace_log (dict) – Dictionary of trace keys to timestamp values from internal operations
parent_name (str) – The parent stage name to use as context for child traces
logger (logging.Logger, optional) – Logger for debug output of key transformations
Examples
Basic usage in a stage:
>>> execution_trace_log = {"trace::entry::yolox_inference": ts1, "trace::exit::yolox_inference": ts2} >>> set_trace_timestamps_with_parent_context( ... control_message, execution_trace_log, "pdf_extractor", logger ... )
This transforms: - trace::entry::yolox_inference -> trace::entry::pdf_extractor::yolox_inference - trace::exit::yolox_inference -> trace::exit::pdf_extractor::yolox_inference
- nv_ingest_api.internal.primitives.tracing.tagging.traceable(trace_name: str | None = None)[source]#
A decorator that adds entry and exit trace timestamps to a IngestControlMessage’s metadata based on the presence of a ‘config::add_trace_tagging’ flag.
This decorator checks if the ‘config::add_trace_tagging’ flag is set to True in the message’s metadata. If so, it records the entry and exit timestamps of the function execution, using either a provided custom trace name, auto-detected stage name from self.stage_name, or the function’s name as fallback.
- Parameters:
trace_name (str, optional) – A custom name for the trace entries in the message metadata. If not provided, attempts to use self.stage_name from the decorated method’s instance, falling back to the function’s name if neither is available.
- Returns:
decorator_trace_tagging – A wrapper function that decorates the target function to implement trace tagging.
- Return type:
Callable
Notes
The decorated function must accept a IngestControlMessage object as one of its arguments. For a regular function, this is expected to be the first argument; for a class method, this is expected to be the second argument (after ‘self’). The IngestControlMessage object must implement has_metadata, get_metadata, and set_metadata methods used by the decorator to check for the trace tagging flag and to add trace metadata.
The trace metadata added by the decorator includes two entries: - ‘trace::entry::<trace_name>’: The timestamp marking the function’s entry. - ‘trace::exit::<trace_name>’: The timestamp marking the function’s exit.
Examples
Automatic stage name detection (recommended):
>>> @traceable() # Uses self.stage_name automatically ... def process_message(self, message): ... pass
Explicit trace name (override):
>>> @traceable("custom_trace") ... def process_message(self, message): ... pass
Function without instance (uses function name):
>>> @traceable() ... def process_message(message): ... pass
- nv_ingest_api.internal.primitives.tracing.tagging.traceable_func(trace_name=None, dedupe=True)[source]#
A decorator that injects trace information for tracking the execution of a function. It logs the entry and exit timestamps of the function in a trace_info dictionary, which can be used for performance monitoring or debugging purposes.
- Parameters:
trace_name (str, optional) – An optional string used as the prefix for the trace log entries. If not provided, the decorated function’s name is used. The string can include placeholders (e.g., “pdf_extractor::{model_name}”) that will be dynamically replaced with matching function argument values.
dedupe (bool, optional) – If True, ensures that the trace entry and exit keys are unique by appending an index (e.g., _0, _1) to the keys if duplicate entries are detected. Default is True.
- Returns:
A wrapped function that injects trace information before and after the function’s execution.
- Return type:
function
Notes
If trace_info is not provided in the keyword arguments, a new dictionary is created and used for storing trace entries.
If trace_name contains format placeholders, the decorator attempts to populate them with matching argument values from the decorated function.
- The trace information is logged in the format:
trace::entry::{trace_name} for the entry timestamp.
trace::exit::{trace_name} for the exit timestamp.
If dedupe is True, the trace keys will be appended with an index to avoid overwriting existing entries.
Example
>>> @traceable_func(trace_name="pdf_extractor::{model_name}") >>> def extract_pdf(model_name): ... pass >>> trace_info = {} >>> extract_pdf("my_model", trace_info=trace_info)
In this example, model_name is dynamically replaced in the trace_name, and the trace information is logged with unique keys if deduplication is enabled.