Source code for nv_ingest_client.primitives.tasks.extract

# SPDX-FileCopyrightText: Copyright (c) 2024, NVIDIA CORPORATION & AFFILIATES.
# All rights reserved.
# SPDX-License-Identifier: Apache-2.0


# pylint: disable=too-few-public-methods
# pylint: disable=too-many-arguments

import logging
import os
from typing import Any
from typing import Dict
from typing import Literal
from typing import Optional

from nv_ingest_api.internal.schemas.meta.ingest_job_schema import IngestTaskExtractSchema

from .task_base import Task

logger = logging.getLogger(__name__)

# Settings for the Unstructured.io extraction path, captured from the process
# environment once, at import time.
UNSTRUCTURED_API_KEY = os.getenv("UNSTRUCTURED_API_KEY")
UNSTRUCTURED_URL = os.getenv("UNSTRUCTURED_URL", "https://api.unstructured.io/general/v0/general")
UNSTRUCTURED_STRATEGY = os.getenv("UNSTRUCTURED_STRATEGY", "auto")
# NOTE: this is a str when the variable is set and the int 10 when it is not.
UNSTRUCTURED_CONCURRENCY_LEVEL = os.getenv("UNSTRUCTURED_CONCURRENCY_LEVEL", 10)

# Credentials for the Adobe extraction path; None when the variable is unset.
ADOBE_CLIENT_ID = os.getenv("ADOBE_CLIENT_ID")
ADOBE_CLIENT_SECRET = os.getenv("ADOBE_CLIENT_SECRET")

# Default extraction method for each supported document type, keyed by the
# lower-cased document type name. ExtractTask consults this map only when the
# caller does not pass an explicit ``extract_method``; a type missing from the
# map is rejected with a ValueError whose message lists these keys in
# insertion order.
_DEFAULT_EXTRACTOR_MAP = {
    "bmp": "image",
    "csv": "pandas",
    "docx": "python_docx",
    "excel": "openpyxl",
    "html": "markitdown",
    "jpeg": "image",
    "jpg": "image",
    "parquet": "pandas",
    "pdf": "pdfium",
    "png": "image",
    "pptx": "python_pptx",
    "text": "txt",
    "tiff": "image",
    "txt": "txt",
    "xml": "lxml",
    "mp3": "audio",
    "wav": "audio",
    "json": "txt",
    "md": "txt",
    "sh": "txt",
}

# Static-typing alias for the ``extract_method`` argument of ExtractTask.
# NOTE(review): the defaults in _DEFAULT_EXTRACTOR_MAP (e.g. "image",
# "pandas", "txt") are not members of this Literal, so it only describes the
# PDF-oriented methods; it is a type hint and is not enforced at runtime here.
_Type_Extract_Method_PDF = Literal[
    "adobe",
    "nemoretriever_parse",
    "haystack",
    "llama_parse",
    "pdfium",
    "tika",
    "unstructured_io",
]

# Static-typing alias for the ``extract_images_method`` argument.
_Type_Extract_Images_Method = Literal["group", "yolox"]

# Static-typing alias for the ``extract_tables_method`` argument.
_Type_Extract_Tables_Method_PDF = Literal["yolox", "paddle"]


class ExtractTask(Task):
    """
    Object for document extraction task

    Collects the extraction options for one document type, validates them
    through ``IngestTaskExtractSchema`` and serializes them with
    :meth:`to_dict`.
    """

    def __init__(
        self,
        document_type,
        extract_method: Optional[_Type_Extract_Method_PDF] = None,
        extract_text: bool = False,
        extract_images: bool = False,
        extract_tables: bool = False,
        extract_charts: Optional[bool] = None,
        extract_audio_params: Optional[Dict[str, Any]] = None,
        extract_images_method: _Type_Extract_Images_Method = "group",
        extract_images_params: Optional[Dict[str, Any]] = None,
        extract_tables_method: _Type_Extract_Tables_Method_PDF = "yolox",
        extract_infographics: bool = False,
        extract_page_as_image: bool = False,
        text_depth: str = "document",
        paddle_output_format: str = "pseudo_markdown",
        table_output_format: str = "pseudo_markdown",
    ) -> None:
        """
        Setup Extract Task Config

        Parameters
        ----------
        document_type
            Document type as a string or as an object exposing ``.value``.
        extract_method
            Extraction method. When ``None`` the default for ``document_type``
            is looked up in ``_DEFAULT_EXTRACTOR_MAP``.
        extract_text, extract_images, extract_tables, extract_infographics,
        extract_page_as_image
            Flags forwarded unchanged in the task params.
        extract_charts
            Flag forwarded in the task params. When ``None`` it takes the
            value of ``extract_tables``.
        extract_audio_params, extract_images_params
            Optional dicts; each is added to the task params only when truthy.
        extract_images_method, extract_tables_method, text_depth,
        table_output_format
            Values forwarded unchanged in the task params.
        paddle_output_format
            Stored on the instance; it is not part of the params built by
            this class.

        Raises
        ------
        ValueError
            If ``extract_method`` is ``None`` and ``document_type`` has no
            entry in ``_DEFAULT_EXTRACTOR_MAP``.
        """
        super().__init__()

        if extract_method is None:
            extract_method = self._default_extract_method(document_type)

        # Chart extraction follows table extraction unless set explicitly.
        if extract_charts is None:
            extract_charts = extract_tables

        # Store the raw options first so the same helper can build the params
        # for both schema validation (below) and to_dict().
        self._extract_audio_params = extract_audio_params
        self._extract_images = extract_images
        self._extract_tables = extract_tables
        self._extract_images_method = extract_images_method
        self._extract_images_params = extract_images_params
        self._extract_tables_method = extract_tables_method
        self._extract_charts = extract_charts
        self._extract_infographics = extract_infographics
        self._extract_page_as_image = extract_page_as_image
        self._extract_text = extract_text
        self._text_depth = text_depth
        self._paddle_output_format = paddle_output_format
        self._table_output_format = table_output_format

        # Use the API schema for validation
        validated_data = IngestTaskExtractSchema(
            document_type=document_type,
            method=extract_method,
            params=self._build_extract_params(),
        )

        # Keep the schema's view of the document type and method.
        self._document_type = validated_data.document_type
        self._extract_method = validated_data.method

    @staticmethod
    def _default_extract_method(document_type) -> str:
        """Return the default extraction method for ``document_type``."""
        # Handle both string and enum inputs
        document_type_str = document_type.value if hasattr(document_type, "value") else document_type

        # str() lets a non-string input (e.g. None) fall through to the
        # ValueError below instead of failing with AttributeError on .lower().
        document_type_lower = str(document_type_str).lower()
        if document_type_lower not in _DEFAULT_EXTRACTOR_MAP:
            raise ValueError(
                f"Unsupported document type: {document_type}."
                f" Supported types are: {list(_DEFAULT_EXTRACTOR_MAP.keys())}"
            )
        return _DEFAULT_EXTRACTOR_MAP[document_type_lower]

    def _build_extract_params(self) -> Dict[str, Any]:
        """Build the ``params`` dict shared by schema validation and to_dict()."""
        extract_params = {
            "extract_text": self._extract_text,
            "extract_images": self._extract_images,
            "extract_images_method": self._extract_images_method,
            "extract_tables": self._extract_tables,
            "extract_tables_method": self._extract_tables_method,
            "extract_charts": self._extract_charts,
            "extract_infographics": self._extract_infographics,
            "extract_page_as_image": self._extract_page_as_image,
            "text_depth": self._text_depth,
            "table_output_format": self._table_output_format,
        }

        # Optional parameter dicts are only included when non-empty.
        if self._extract_images_params:
            extract_params["extract_images_params"] = self._extract_images_params
        if self._extract_audio_params:
            extract_params["extract_audio_params"] = self._extract_audio_params

        return extract_params

    def __str__(self) -> str:
        """
        Returns a string with the object's config and run time state
        """
        return "".join(
            (
                "Extract Task:\n",
                f" document_type: {self._document_type.value}\n",
                f" extract_method: {self._extract_method}\n",
                f" extract_text: {self._extract_text}\n",
                f" extract_images: {self._extract_images}\n",
                f" extract_tables: {self._extract_tables}\n",
                f" extract_charts: {self._extract_charts}\n",
                f" extract_infographics: {self._extract_infographics}\n",
                f" extract_page_as_image: {self._extract_page_as_image}\n",
                f" text_depth: {self._text_depth}\n",
                f" table_output_format: {self._table_output_format}\n",
            )
        )

    def to_dict(self) -> Dict:
        """
        Convert to a dict for submission to redis

        Returns
        -------
        Dict
            ``{"type": "extract", "task_properties": {...}}`` where the task
            properties hold the method, the document type value and the
            params. Method-specific settings are merged into the params for
            the ``unstructured_local``, ``unstructured_io`` and ``adobe``
            methods; their values are read from the environment at call time,
            falling back to the module-level constants.
        """
        task_properties = {
            "method": self._extract_method,
            "document_type": self._document_type.value,
            "params": self._build_extract_params(),
        }

        # TODO(Devin): I like the idea of Derived classes augmenting the to_dict method, but its not logically
        #  consistent with how we define tasks, we don't have multiple extract tasks, we have extraction paths based on
        #  the method and the document type.
        if self._extract_method == "unstructured_local":
            unstructured_properties = {
                "api_key": "",  # TODO(Devin): Should be an environment variable or configurable parameter
                "unstructured_url": "",  # TODO(Devin): Should be an environment variable
            }
            task_properties["params"].update(unstructured_properties)
        elif self._extract_method == "unstructured_io":
            unstructured_properties = {
                "unstructured_api_key": os.environ.get("UNSTRUCTURED_API_KEY", UNSTRUCTURED_API_KEY),
                "unstructured_url": os.environ.get("UNSTRUCTURED_URL", UNSTRUCTURED_URL),
                "unstructured_strategy": os.environ.get("UNSTRUCTURED_STRATEGY", UNSTRUCTURED_STRATEGY),
                "unstructured_concurrency_level": os.environ.get(
                    "UNSTRUCTURED_CONCURRENCY_LEVEL", UNSTRUCTURED_CONCURRENCY_LEVEL
                ),
            }
            task_properties["params"].update(unstructured_properties)
        elif self._extract_method == "adobe":
            adobe_client_secret = os.environ.get("ADOBE_CLIENT_SECRET", ADOBE_CLIENT_SECRET)
            adobe_properties = {
                "adobe_client_id": os.environ.get("ADOBE_CLIENT_ID", ADOBE_CLIENT_ID),
                # Legacy misspelled key, kept so existing consumers of it keep working.
                "adobe_client_secrect": adobe_client_secret,
                # Correctly spelled key carrying the same value.
                "adobe_client_secret": adobe_client_secret,
            }
            task_properties["params"].update(adobe_properties)

        return {"type": "extract", "task_properties": task_properties}

    @property
    def document_type(self):
        """The ``.value`` of the validated document type."""
        return self._document_type.value