nv_ingest_api.internal.primitives package#

Subpackages#

Submodules#

nv_ingest_api.internal.primitives.control_message_task module#

pydantic model nv_ingest_api.internal.primitives.control_message_task.ControlMessageTask[source]#

Bases: BaseModel

Show JSON schema
{
   "title": "ControlMessageTask",
   "type": "object",
   "properties": {
      "type": {
         "title": "Type",
         "type": "string"
      },
      "id": {
         "anyOf": [
            {
               "type": "string"
            },
            {
               "format": "uuid",
               "type": "string"
            }
         ],
         "title": "Id"
      },
      "properties": {
         "type": "object",
         "additionalProperties": true,
         "title": "Properties"
      }
   },
   "additionalProperties": false,
   "required": [
      "type",
      "id"
   ]
}

Config:
  • extra: str = forbid

Fields:
field id: str | UUID [Required]#
field properties: Dict[str, Any] [Optional]#
field type: str [Required]#

nv_ingest_api.internal.primitives.ingest_control_message module#

class nv_ingest_api.internal.primitives.ingest_control_message.IngestControlMessage[source]#

Bases: object

A control message class for ingesting tasks and managing associated metadata, timestamps, configuration, and payload.

add_task(
task: ControlMessageTask,
)[source]#

Add a task to the control message. Multiple tasks with the same ID are supported.

config(
config: Dict[str, Any] = None,
) Dict[str, Any][source]#

Get or update the control message configuration.

If ‘config’ is provided, it must be a dictionary. The configuration is updated with the provided values. If no argument is provided, returns a copy of the current configuration.

Raises:

ValueError – If the provided configuration is not a dictionary.

copy() IngestControlMessage[source]#

Create a deep copy of this control message.

filter_timestamp(
regex_filter: str,
) Dict[str, datetime][source]#

Retrieve timestamps whose keys match the regex filter.

get_metadata(
key: str | Pattern = None,
default_value: Any = None,
) Any[source]#

Retrieve metadata. If ‘key’ is None, returns a copy of all metadata.

Parameters:
  • key (str or re.Pattern, optional) – If a string is provided, returns the value for that exact key. If a regex pattern is provided, returns a dictionary of all metadata key-value pairs where the key matches the regex. If no matches are found, returns default_value.

  • default_value (Any, optional) – The value to return if the key is not found or no regex matches.

Returns:

The metadata value for an exact string key, or a dict of matching metadata if a regex is provided.

Return type:

Any

get_tasks() Generator[ControlMessageTask, None, None][source]#

Return all tasks as a generator.

get_timestamp(
key: str,
fail_if_nonexist: bool = False,
) datetime[source]#

Retrieve a timestamp for a given key.

Raises:

KeyError – If the key is not found and ‘fail_if_nonexist’ is True.

get_timestamps() Dict[str, datetime][source]#

Retrieve all timestamps.

has_metadata(key: str | Pattern) bool[source]#

Check if a metadata key exists.

Parameters:

key (str or re.Pattern) – If a string is provided, checks for the exact key. If a regex pattern is provided, returns True if any metadata key matches the regex.

Returns:

True if the key (or any matching key, in case of a regex) exists, False otherwise.

Return type:

bool

has_task(task_id: str) bool[source]#

Check if any tasks with the given ID exist.

list_metadata() list[source]#

List all metadata keys.

payload(
payload: DataFrame = None,
) DataFrame[source]#

Get or set the payload DataFrame.

Raises:

ValueError – If the provided payload is not a pandas DataFrame.

remove_task(
task_id: str,
) ControlMessageTask[source]#

Remove the first task with the given ID. Warns if no task exists.

set_metadata(
key: str,
value: Any,
) None[source]#

Set a metadata key-value pair.

set_timestamp(
key: str,
timestamp: Any,
) None[source]#

Set a timestamp for a given key. Accepts either a datetime object or an ISO format string.

Raises:

ValueError – If the provided timestamp is neither a datetime object nor a valid ISO format string.

nv_ingest_api.internal.primitives.ingest_control_message.remove_all_tasks_by_type(ctrl_msg, task: str)[source]#

Remove all tasks from the control message by matching their type.

This function iterates over the tasks in the control message, finds all tasks whose type matches the provided task string, removes them, and returns their properties as a list.

Parameters:
  • ctrl_msg (IngestControlMessage) – The control message from which to remove the tasks.

  • task (str) – The task type to remove.

Returns:

A list of dictionaries of properties for all removed tasks.

Return type:

list[dict]

Raises:

ValueError – If no tasks with the given type are found.

nv_ingest_api.internal.primitives.ingest_control_message.remove_task_by_type(ctrl_msg, task: str)[source]#

Remove a task from the control message by matching its type.

This function iterates over the tasks in the control message, and if it finds a task whose type matches the provided task string, it removes that task (using its unique id) and returns the task’s properties.

Parameters:
  • ctrl_msg (IngestControlMessage) – The control message from which to remove the task.

  • task (str) – The task type to remove.

Returns:

The properties of the removed task.

Return type:

dict

Raises:

ValueError – If no task with the given type is found.

Module contents#