nv_ingest_api.util.message_brokers.simple_message_broker package#
Submodules#
nv_ingest_api.util.message_brokers.simple_message_broker.broker module#
- class nv_ingest_api.util.message_brokers.simple_message_broker.broker.SimpleMessageBroker(host: str, port: int, max_queue_size: int)[source]#
Bases:
ThreadingMixIn
,TCPServer
A thread-safe message broker server that manages multiple message queues and supports commands such as PUSH, POP, SIZE, and PING.
- allow_reuse_address = True#
- class nv_ingest_api.util.message_brokers.simple_message_broker.broker.SimpleMessageBrokerHandler(request, client_address, server)[source]#
Bases:
BaseRequestHandler
Handles incoming client requests for the SimpleMessageBroker server, processes commands such as PUSH, POP, SIZE, and PING, and manages message queues with thread-safe operations.
nv_ingest_api.util.message_brokers.simple_message_broker.ordered_message_queue module#
nv_ingest_api.util.message_brokers.simple_message_broker.simple_client module#
- class nv_ingest_api.util.message_brokers.simple_message_broker.simple_client.SimpleClient(
- host: str,
- port: int,
- db: int = 0,
- max_retries: int = 3,
- max_backoff: int = 32,
- connection_timeout: int = 300,
- max_pool_size: int = 128,
- use_ssl: bool = False,
Bases:
MessageBrokerClientBase
A client for interfacing with SimpleMessageBroker, creating a new socket connection per request to ensure thread safety and robustness. Respects timeouts for all operations.
- fetch_message(
- queue_name: str,
- timeout: Tuple[int, float | None] | None = (1200, None),
Fetch a message from a specified queue.
- Parameters:
queue_name (str) – The name of the queue.
timeout (tuple, optional) – A tuple containing the timeout value and an unused second element.
- Returns:
The response from the broker.
- Return type:
- get_client()[source]#
Retrieve the current client instance.
- Returns:
The current client instance.
- Return type:
- ping() ResponseSchema [source]#
Ping the broker to check connectivity.
- Returns:
The response indicating the success of the ping operation.
- Return type:
- size(
- queue_name: str,
Fetch the size of the specified queue.
- Parameters:
queue_name (str) – The name of the queue.
- Returns:
The response containing the queue size.
- Return type:
- submit_message(
- queue_name: str,
- message: str,
- timeout: Tuple[int, float] | None = (100, None),
- for_nv_ingest: bool = False,
Submit a message to the specified queue.
- Parameters:
queue_name (str) – The name of the queue.
message (str) – The message to be submitted.
timeout (float, optional) – Timeout in seconds for the operation.
for_nv_ingest (bool, optional) – Indicates whether the message is for NV ingest operations.
- Returns:
The response from the broker.
- Return type:
Module contents#
- pydantic model nv_ingest_api.util.message_brokers.simple_message_broker.ResponseSchema[source]#
Bases:
BaseModel
Show JSON schema
{ "title": "ResponseSchema", "type": "object", "properties": { "response_code": { "title": "Response Code", "type": "integer" }, "response_reason": { "anyOf": [ { "type": "string" }, { "type": "null" } ], "default": "OK", "title": "Response Reason" }, "response": { "anyOf": [ { "type": "string" }, { "additionalProperties": true, "type": "object" }, { "type": "null" } ], "default": null, "title": "Response" }, "trace_id": { "anyOf": [ { "type": "string" }, { "type": "null" } ], "default": null, "title": "Trace Id" }, "transaction_id": { "anyOf": [ { "type": "string" }, { "type": "null" } ], "default": null, "title": "Transaction Id" } }, "required": [ "response_code" ] }
- Fields:
- field response: str | dict | None = None#
- field response_code: int [Required]#
- field response_reason: str | None = 'OK'#
- field trace_id: str | None = None#
- field transaction_id: str | None = None#
- class nv_ingest_api.util.message_brokers.simple_message_broker.SimpleClient(
- host: str,
- port: int,
- db: int = 0,
- max_retries: int = 3,
- max_backoff: int = 32,
- connection_timeout: int = 300,
- max_pool_size: int = 128,
- use_ssl: bool = False,
Bases:
MessageBrokerClientBase
A client for interfacing with SimpleMessageBroker, creating a new socket connection per request to ensure thread safety and robustness. Respects timeouts for all operations.
- fetch_message(
- queue_name: str,
- timeout: Tuple[int, float | None] | None = (1200, None),
Fetch a message from a specified queue.
- Parameters:
queue_name (str) – The name of the queue.
timeout (tuple, optional) – A tuple containing the timeout value and an unused second element.
- Returns:
The response from the broker.
- Return type:
- get_client()[source]#
Retrieve the current client instance.
- Returns:
The current client instance.
- Return type:
- ping() ResponseSchema [source]#
Ping the broker to check connectivity.
- Returns:
The response indicating the success of the ping operation.
- Return type:
- size(
- queue_name: str,
Fetch the size of the specified queue.
- Parameters:
queue_name (str) – The name of the queue.
- Returns:
The response containing the queue size.
- Return type:
- submit_message(
- queue_name: str,
- message: str,
- timeout: Tuple[int, float] | None = (100, None),
- for_nv_ingest: bool = False,
Submit a message to the specified queue.
- Parameters:
queue_name (str) – The name of the queue.
message (str) – The message to be submitted.
timeout (float, optional) – Timeout in seconds for the operation.
for_nv_ingest (bool, optional) – Indicates whether the message is for NV ingest operations.
- Returns:
The response from the broker.
- Return type:
- class nv_ingest_api.util.message_brokers.simple_message_broker.SimpleMessageBroker(host: str, port: int, max_queue_size: int)[source]#
Bases:
ThreadingMixIn
,TCPServer
A thread-safe message broker server that manages multiple message queues and supports commands such as PUSH, POP, SIZE, and PING.
- allow_reuse_address = True#