NV-Ingest-Client is a tool designed for efficient ingestion and processing of large datasets. It provides both a Python API and a command-line interface to cater to various ingestion needs.
To install NV-Ingest-Client, run the following command in your terminal:
pip install [REPO_ROOT]/client
This command installs both the API libraries and the nv-ingest-cli tool, which can subsequently be invoked from the command line.
Class: JobSpec
Specification for creating a job for submission to the nv-ingest microservice.
Parameters:
- payload (Dict): The payload data for the job.
- tasks (Optional[List], optional): A list of tasks to be added to the job. Defaults to None.
- source_id (Optional[str], optional): An identifier for the source of the job. Defaults to None.
- source_name (Optional[str], optional): A name for the source of the job. Defaults to None.
- document_type (Optional[str], optional): Type of the document. Defaults to 'txt'.
- job_id (Optional[Union[UUID, str]], optional): A unique identifier for the job. Defaults to a new UUID.
- extended_options (Optional[Dict], optional): Additional options for job processing. Defaults to None.

Attributes:
- _payload (Dict): Storage for the payload data.
- _tasks (List): Storage for the list of tasks.
- _source_id (str): Storage for the source identifier.
- _job_id (UUID): Storage for the job's unique identifier.
- _extended_options (Dict): Storage for the additional options.

Methods:
- to_dict() -> Dict: Returns a dictionary representation of the job specification.
- add_task(task): Adds a task to the job. The task is assumed to have a to_dict() method. Raises ValueError if the task does not have a to_dict() method or is not an instance of Task.

Properties:
- payload: Getter/Setter for the payload data.
- job_id: Getter/Setter for the job's unique identifier.
- source_id: Getter/Setter for the source identifier.
- source_name: Getter/Setter for the source name.

Example Usage:
job_spec = JobSpec(
payload={"data": "Example data"},
tasks=[extract_task, split_task],
source_id="12345",
job_id="abcd-efgh-ijkl-mnop",
extended_options={"tracing_options": {"trace": True}}
)
print(job_spec.to_dict())
Function: task_factory(task_type, **kwargs)
Parameters:
- task_type (TaskType or str): The type of the task to create. Can be an enum member of TaskType or a string representing a valid task type.
- **kwargs (dict): Additional keyword arguments to pass to the task's constructor.

Returns:
- Task: An instance of the task corresponding to the given task type.

Raises:
- ValueError: If an invalid task type is provided, or if any unexpected keyword arguments are passed that do not match the task constructor's parameters.

Example:
# Assuming TaskType has 'Extract' and 'Split' as valid members and corresponding classes are defined.
extract_task = task_factory('extract', document_type='PDF', extract_text=True)
split_task = task_factory('split', split_by='sentence', split_length=100)
Class: ExtractTask
Object for document extraction tasks, extending the Task class.

Method: __init__(document_type, extract_method='pdfium', extract_text=False, extract_images=False, extract_tables=False)
Parameters:
- document_type: Type of document.
- extract_method: Method used for extraction. Default is 'pdfium'.
- extract_text: Boolean indicating if text should be extracted. Default is False.
- extract_images: Boolean indicating if images should be extracted. Default is False.
- extract_tables: Boolean indicating if tables should be extracted. Default is False.

Method: to_dict()
Returns:
- Dict: Dictionary containing task type and properties.

Example:
extract_task = ExtractTask(
    document_type=file_type,
    extract_text=True,
    extract_images=True,
    extract_tables=True
)
Class: SplitTask
Object for document splitting tasks, extending the Task class.

Method: __init__(split_by=None, split_length=None, split_overlap=None, max_character_length=None, sentence_window_size=None)
Parameters:
- split_by: Criterion for splitting, e.g., 'word', 'sentence', 'passage'.
- split_length: The length of each split segment.
- split_overlap: Overlap length between segments.
- max_character_length: Maximum character length for a split.
- sentence_window_size: Window size for sentence-based splits.

Method: to_dict()
Returns:
- Dict: Dictionary containing task type and properties.

Example:
split_task = SplitTask(
split_by="word",
split_length=300,
split_overlap=10,
max_character_length=5000,
sentence_window_size=0,
)
The NvIngestClient class provides a comprehensive suite of methods to handle job submission and retrieval efficiently. Below are the public methods available:
__init__
Initializes the NvIngestClient with a customizable client allocator and Redis configuration.

Parameters:
- message_client_allocator: A callable that returns an instance of the client used for communication.
- message_client_hostname: Hostname of the message client server. Defaults to "localhost".
- message_client_port: Port number of the message client server. Defaults to 7670.
- message_client_kwargs: Additional keyword arguments for the message client.
- msg_counter_id: Redis key for tracking message counts. Defaults to "nv-ingest-message-id".
- worker_pool_size: Number of worker processes in the pool. Defaults to 1.

Example:
client = NvIngestClient(
    message_client_hostname="localhost",  # Host where nv-ingest-ms-runtime is running
    message_client_port=7670  # REST port, defaults to 7670
)
submit_job(job_id, job_queue_id)
Submits a job to a specified job queue. This method can optionally wait for a response if blocking is set to True.

Parameters:
- job_id: The unique identifier of the job to be submitted.
- job_queue_id: The ID of the job queue where the job will be submitted.

Example:
client.submit_job(job_id, "morpheus_task_queue")
submit_jobs(job_ids, job_queue_id)
Submits multiple jobs to a specified job queue. This method does not wait for any of the jobs to complete.

Parameters:
- job_ids: A list of job IDs to be submitted.
- job_queue_id: The ID of the job queue where the jobs will be submitted.

Example:
client.submit_jobs([job_id0, job_id1], "morpheus_task_queue")
submit_job_async(job_ids, job_queue_id)
Asynchronously submits one or more jobs to a specified job queue using a thread pool. This method handles both a single job ID and a list of job IDs.

Parameters:
- job_ids: A single job ID or a list of job IDs to be submitted.
- job_queue_id: The ID of the job queue where the jobs will be submitted.

Example:
client.submit_job_async(job_id, "morpheus_task_queue")
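Because submission happens on a thread pool, you may want to track when each job has actually been handed off. A minimal sketch of one way to do that, assuming submit_job_async returns a dictionary mapping concurrent.futures Future objects to their job IDs (an assumption; the return type is not documented above, so adapt this to your client version):

from concurrent.futures import as_completed

# Submit a batch without blocking; assumed return shape: {Future: job_id}.
futures_to_jobs = client.submit_job_async([job_id0, job_id1], "morpheus_task_queue")

# Handle each submission as it completes, surfacing any errors.
for future in as_completed(futures_to_jobs):
    done_job_id = futures_to_jobs[future]
    future.result()  # re-raises any exception from the worker thread
    print(f"Submitted job {done_job_id}")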
fetch_job_result(job_id, timeout=10, data_only=True)
Parameters:
- job_id (str): The identifier of the job.
- timeout (float, optional): Timeout for the fetch operation in seconds. Defaults to 10.
- data_only (bool, optional): If True, only returns the data part of the job result.

Raises:
- ValueError: If there is an error in decoding the job result.
- TimeoutError: If the fetch operation times out.
- Exception: For all other unexpected issues.

Example:
job_id = client.add_job(job_spec)
client.submit_job(job_id, TASK_QUEUE)
generated_metadata = client.fetch_job_result(
    job_id, timeout=DEFAULT_JOB_TIMEOUT
)
fetch_job_result_async(job_ids, timeout=10, data_only=True)
Parameters:
- job_ids (Union[str, List[str]]): A single job ID or a list of job IDs.
- timeout (float, optional): Timeout for fetching each job result, in seconds. Defaults to 10.
- data_only (bool, optional): Whether to return only the data part of the job result.

See also: fetch_job_result.

Example:
job_id = client.add_job(job_spec)
client.submit_job(job_id, TASK_QUEUE)
generated_metadata = client.fetch_job_result_async(
    job_id, timeout=DEFAULT_JOB_TIMEOUT
)
job_count()
Returns: Integer representing the total number of jobs.
Example:
client.job_count()
add_job(job_spec)
Parameters:
- job_spec (JobSpec, optional): The job specification to add. If not provided, a new job ID will be generated.

Raises:
- ValueError: If a job with the specified job ID already exists.

Example:
extract_task = ExtractTask(
    document_type=file_type,
    extract_text=True,
    extract_images=True,
    extract_tables=True,
    text_depth="document",
    extract_tables_method="yolox",
)
job_spec.add_task(extract_task)
job_id = client.add_job(job_spec)
create_job(payload, source_id, source_name, document_type, tasks, job_id, extended_options)
Parameters:
- payload (str): The payload associated with the job.
- source_id (str): The source identifier for the job.
- source_name (str): The unique name of the job's source data.
- document_type (str, optional): The type of document to be processed.
- tasks (list, optional): A list of tasks to be associated with the job.
- job_id (uuid.UUID | str, optional): The unique identifier for the job.
- extended_options (dict, optional): Additional options for job creation.

Raises:
- ValueError: If a job with the specified job ID already exists.
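Because create_job both constructs and registers the job, it can replace the manual JobSpec construction shown earlier. A minimal sketch, assuming create_job returns the new job's ID (the docs above do not state its return value) and that file_content holds the already-loaded document text or bytes:

# Build and register a job in one call; the returned job ID can then be
# passed to submit_job and fetch_job_result.
job_id = client.create_job(
    payload=file_content,
    source_id="./data/sample.pdf",  # placeholder source identifier
    source_name="sample.pdf",       # placeholder source name
    document_type="pdf",
    extended_options={"tracing_options": {"trace": True}},
)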
add_task(job_id, task)

Parameters:
- job_id (str): The job ID to which the task will be added.
- task (Task): The task to add.

Raises:
- ValueError: If the job does not exist or is not in the correct state.

Example:
job_spec = JobSpec(
    document_type=file_type,
    payload=file_content,
    source_id=SAMPLE_PDF,
    source_name=SAMPLE_PDF,
    extended_options={
        "tracing_options": {
            "trace": True,
            "ts_send": time.time_ns(),
        }
    },
)
extract_task = ExtractTask(
    document_type=file_type,
    extract_text=True,
    extract_images=True,
    extract_tables=True,
    text_depth="document",
    extract_tables_method="yolox",
)
job_spec.add_task(extract_task)
create_task(job_id, task_type, task_params)
Parameters:
- job_id (uuid.UUID | str): The unique identifier of the job.
- task_type (TaskType): The type of the task.
- task_params (dict, optional): Parameters for the task.

Raises:
- ValueError: If the job does not exist or if an attempt is made to modify a job after its submission.

Example:
job_id = client.add_job(job_spec)
client.create_task(job_id, DedupTask, {"content_type": "image", "filter": True})
After installation, you can use the nv-ingest-cli tool from the command line to manage and process datasets. Here are the options provided by the CLI:
- --batch_size: Specifies the number of documents to process in a single batch. Default is 10. Must be 1 or more.
- --doc: Adds a new document to be processed. Supports multiple entries. Files must exist.
- --dataset: Specifies the path to a dataset definition file.
- --client: Sets the client type. Choices include REST, Redis, and Kafka. Default is Redis.
- --client_host: Specifies the DNS name or URL for the endpoint.
- --client_port: Sets the port number for the client endpoint.
- --client_kwargs: Provides additional arguments to pass to the client. Default is {}.
- --concurrency_n: Defines the number of inflight jobs to maintain at one time. Default is 1.
- --dry_run: Enables a dry run without executing actions.
- --output_directory: Specifies the output directory for results.
- --log_level: Sets the log level. Choices are DEBUG, INFO, WARNING, ERROR, CRITICAL. Default is INFO.
- --shuffle_dataset: Shuffles the dataset before processing if enabled. Default is True.
- --task: Allows for specification of tasks in JSON format. Supports multiple tasks.
- --collect_profiling_traces: Collects the tracing profile for the run after processing.
- --zipkin_host: Host used to connect to Zipkin to gather tracing profiles.
- --zipkin_port: Port used to connect to Zipkin to gather tracing profiles.

You can find a notebook with examples using the client here.
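As an illustration, a typical invocation might combine several of these options as follows. The document path is a placeholder, and the task_name:{json} form passed to --task is an assumption about how the JSON task format is encoded, so verify the exact syntax with nv-ingest-cli --help:

nv-ingest-cli \
  --doc ./data/sample.pdf \
  --output_directory ./processed_docs \
  --task='extract:{"document_type": "pdf", "extract_text": true}' \
  --client_host=localhost \
  --client_port=7670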