Source code for nv_ingest_client.primitives.tasks.store

# SPDX-FileCopyrightText: Copyright (c) 2024, NVIDIA CORPORATION & AFFILIATES.
# All rights reserved.
# SPDX-License-Identifier: Apache-2.0


# pylint: disable=too-few-public-methods
# pylint: disable=too-many-arguments

import logging
from typing import Dict
from typing import Literal

from nv_ingest_api.internal.schemas.meta.ingest_job_schema import IngestTaskStoreSchema
from nv_ingest_api.internal.schemas.meta.ingest_job_schema import IngestTaskStoreEmbedSchema

from .task_base import Task

logger = logging.getLogger(__name__)

_DEFAULT_STORE_METHOD = "minio"


class StoreTask(Task):
    """
    Client-side description of a "store" task.

    The constructor arguments are passed through ``IngestTaskStoreSchema``
    and the validated values are kept on the instance. ``to_dict`` turns
    them into the ``{"type": "store", "task_properties": {...}}`` payload.
    """

    _Type_Content_Type = Literal["image",]
    _Type_Store_Method = Literal["minio",]

    def __init__(
        self,
        structured: bool = True,
        images: bool = False,
        store_method: _Type_Store_Method = None,
        params: dict = None,
        **extra_params,
    ) -> None:
        """
        Build and validate the store task configuration.

        Parameters
        ----------
        structured : bool
            Forwarded to the schema as ``structured``.
        images : bool
            Forwarded to the schema as ``images``.
        store_method : str, optional
            Storage backend name; a falsy value selects
            ``_DEFAULT_STORE_METHOD``.
        params : dict, optional
            Backend parameters; ``None`` is treated as an empty dict.
        **extra_params
            Additional keyword arguments. They are merged into ``params``
            (winning on key collisions) before validation and are also kept
            separately so ``to_dict`` can emit them at the top level.
        """
        super().__init__()

        # None is accepted for backward compatibility and means "no params".
        base_params = {} if params is None else params

        # The API schema expects everything under ``params``, so the loose
        # keyword arguments are folded in (later keys override earlier ones).
        combined_params = {**base_params, **extra_params}

        chosen_method = store_method if store_method else _DEFAULT_STORE_METHOD

        # Validation is delegated to the shared API schema.
        validated = IngestTaskStoreSchema(
            structured=structured,
            images=images,
            method=chosen_method,
            params=combined_params,
        )

        self._structured = validated.structured
        self._images = validated.images
        self._store_method = validated.method
        self._params = validated.params
        self._extra_params = extra_params

    def __str__(self) -> str:
        """
        Return a multi-line, human-readable summary of the task settings.

        Extra keyword arguments are listed first, followed by every entry of
        the validated ``params``.
        """
        parts = [
            "Store Task:\n",
            f" store structured types: {self._structured}\n",
            f" store image types: {self._images}\n",
            f" store method: {self._store_method}\n",
        ]
        parts.extend(f" {name}: {val}\n" for name, val in self._extra_params.items())
        parts.extend(f" {name}: {val}\n" for name, val in self._params.items())
        return "".join(parts)

    def to_dict(self) -> Dict:
        """
        Serialize the task into the dict form used for job submission.

        Extra keyword arguments are written at the top level of
        ``task_properties`` after the fixed keys, so an extra sharing a name
        with a fixed key replaces that key's value.
        """
        properties = {
            "method": self._store_method,
            "structured": self._structured,
            "images": self._images,
            "params": self._params,
        }
        properties.update(self._extra_params)

        return {"type": "store", "task_properties": properties}
class StoreEmbedTask(Task):
    """
    Client-side description of a "store_embedding" task.

    The supplied parameters are passed through ``IngestTaskStoreEmbedSchema``
    and the validated ``params`` are kept on the instance. ``to_dict`` turns
    them into the ``{"type": "store_embedding", "task_properties": {...}}``
    payload.
    """

    _Type_Content_Type = Literal["embedding",]
    _Type_Store_Method = Literal["minio",]

    def __init__(self, params: dict = None, **extra_params) -> None:
        """
        Build and validate the embedding store task configuration.

        Parameters
        ----------
        params : dict, optional
            Storage parameters; ``None`` is treated as an empty dict.
        **extra_params
            Additional keyword arguments. They are merged into ``params``
            (winning on key collisions) before validation and are also kept
            separately so ``to_dict`` can emit them at the top level.
        """
        super().__init__()

        # None is accepted for backward compatibility and means "no params".
        base_params = {} if params is None else params

        # The API schema expects everything under ``params``, so the loose
        # keyword arguments are folded in (later keys override earlier ones).
        combined_params = {**base_params, **extra_params}

        # Validation is delegated to the shared API schema.
        validated = IngestTaskStoreEmbedSchema(params=combined_params)

        self._params = validated.params
        self._extra_params = extra_params

    def __str__(self) -> str:
        """
        Return a multi-line, human-readable summary of the task settings.

        Extra keyword arguments are listed first, followed by every entry of
        the validated ``params``.
        """
        parts = ["Store Embed Task:\n"]
        parts.extend(f" {name}: {val}\n" for name, val in self._extra_params.items())
        parts.extend(f" {name}: {val}\n" for name, val in self._params.items())
        return "".join(parts)

    def to_dict(self) -> Dict:
        """
        Serialize the task into the dict form used for job submission.

        Extra keyword arguments are written at the top level of
        ``task_properties`` after ``"params"``, so an extra named ``params``
        replaces that key's value.
        """
        properties = {"params": self._params}
        properties.update(self._extra_params)

        return {"type": "store_embedding", "task_properties": properties}