Source code for nv_ingest_api.internal.extract.pdf.engines.nemoretriever

# SPDX-FileCopyrightText: Copyright (c) 2024, NVIDIA CORPORATION & AFFILIATES.
# All rights reserved.
# SPDX-License-Identifier: Apache-2.0
# Copyright (c) 2024, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import io
import logging
import math
import uuid
import concurrent.futures
from typing import Any
from typing import Dict
from typing import Tuple
from typing import Optional
from typing import List

import numpy as np
import pypdfium2 as pdfium

from nv_ingest_api.internal.extract.pdf.engines.pdfium import _extract_page_elements
from nv_ingest_api.internal.primitives.nim.model_interface import nemoretriever_parse as nemoretriever_parse_utils
from nv_ingest_api.internal.enums.common import AccessLevelEnum
from nv_ingest_api.internal.enums.common import ContentTypeEnum
from nv_ingest_api.internal.enums.common import ContentDescriptionEnum
from nv_ingest_api.internal.enums.common import TableFormatEnum
from nv_ingest_api.internal.enums.common import TextTypeEnum
from nv_ingest_api.internal.schemas.meta.metadata_schema import validate_metadata
from nv_ingest_api.internal.primitives.nim.model_interface.yolox import (
    YOLOX_PAGE_IMAGE_PREPROC_WIDTH,
    YOLOX_PAGE_IMAGE_PREPROC_HEIGHT,
    YOLOX_PAGE_IMAGE_FORMAT,
)
from nv_ingest_api.internal.schemas.extract.extract_pdf_schema import NemoRetrieverParseConfigSchema
from nv_ingest_api.util.metadata.aggregators import (
    extract_pdf_metadata,
    LatexTable,
    Base64Image,
    construct_image_metadata_from_pdf_image,
    construct_text_metadata,
)
from nv_ingest_api.util.pdf.pdfium import pdfium_pages_to_numpy
from nv_ingest_api.internal.primitives.nim.default_values import YOLOX_MAX_BATCH_SIZE
from nv_ingest_api.util.exception_handlers.pdf import pdfium_exception_handler
from nv_ingest_api.util.image_processing.transforms import numpy_to_base64, crop_image
from nv_ingest_api.util.nim import create_inference_client


logger = logging.getLogger(__name__)

NEMORETRIEVER_PARSE_RENDER_DPI = 300
NEMORETRIEVER_PARSE_MAX_WIDTH = 1024
NEMORETRIEVER_PARSE_MAX_HEIGHT = 1280
NEMORETRIEVER_PARSE_MAX_BATCH_SIZE = 8


# Helper function that uses nemoretriever_parse to extract content from a PDF supplied as a bytestream.
def nemoretriever_parse_extractor(
    pdf_stream: io.BytesIO,
    extract_text: bool,
    extract_images: bool,
    extract_infographics: bool,
    extract_tables: bool,
    extract_charts: bool,
    extractor_config: dict,
    execution_trace_log: Optional[List[Any]] = None,
) -> List[Any]:
    """
    Extract text, tables, images and page elements from a bytestream PDF using nemoretriever_parse.

    Pages are rendered to images and sent in batches to the nemoretriever_parse client (for text and
    bounding boxes) and, when the table extraction method is "yolox", to the page-element extractor.
    Batches are run on a thread pool; the parser output is then post-processed page by page, in
    ascending page order.

    Parameters
    ----------
    pdf_stream : io.BytesIO
        A bytestream PDF.
    extract_text : bool
        Specifies whether to extract text. The nemoretriever_parse client is only created, and
        parser batches are only submitted, when this is True.
    extract_images : bool
        Specifies whether to extract images ("Picture" regions reported by the parser).
    extract_infographics : bool
        Specifies whether to extract infographics.
    extract_tables : bool
        Specifies whether to extract tables.
    extract_charts : bool
        Specifies whether to extract charts.
    extractor_config : dict
        A dictionary containing additional extraction parameters. Keys read by this function:
            - row_data : dict-like, required; must contain "source_id"
            - text_depth : str, optional (default is "page")
            - extract_tables_method : str, optional (default is "yolox")
            - identify_nearby_objects : bool, optional (default is True)
            - table_output_format : str, optional (default is "pseudo_markdown")
            - nemoretriever_parse_config : dict or NemoRetrieverParseConfigSchema, optional
            - metadata_column : str, optional (default is "metadata")
    execution_trace_log : Optional[List], optional
        Trace information for debugging purposes (default is None).

    Returns
    -------
    List[Any]
        The accumulated extraction results: the items returned by the page-element extractor,
        followed by the table, image and text entries built for each parsed page (and one
        document-level text entry when ``text_depth`` is "document").

    Raises
    ------
    ValueError
        If 'row_data' is missing from extractor_config, or text_depth, table_output_format or
        nemoretriever_parse_config hold invalid values.
    KeyError
        If 'source_id' is missing from row_data.
    """
    logger = logging.getLogger(__name__)
    logger.debug("Extracting PDF with nemoretriever_parse backend.")

    # Retrieve row_data from extractor_config.
    row_data = extractor_config.get("row_data")
    if row_data is None:
        raise ValueError("Missing 'row_data' in extractor_config.")

    # Get source_id from row_data.
    try:
        source_id = row_data["source_id"]
    except KeyError as err:
        raise KeyError("row_data must contain 'source_id'.") from err

    # Get and validate text_depth.
    text_depth_str = extractor_config.get("text_depth", "page")
    try:
        text_depth = TextTypeEnum[text_depth_str.upper()]
    except KeyError as err:
        valid_options = [e.name.lower() for e in TextTypeEnum]
        raise ValueError(
            f"Invalid text_depth value: {text_depth_str}. Expected one of: {valid_options}"
        ) from err

    # Get extraction method for tables.
    extract_tables_method = extractor_config.get("extract_tables_method", "yolox")

    # Flag for identifying nearby objects.
    identify_nearby_objects = extractor_config.get("identify_nearby_objects", True)

    # Get and validate table_output_format.
    table_output_format_str = extractor_config.get("table_output_format", "pseudo_markdown")
    try:
        table_output_format = TableFormatEnum[table_output_format_str.upper()]
    except KeyError as err:
        valid_options = [e.name.lower() for e in TableFormatEnum]
        raise ValueError(
            f"Invalid table_output_format value: {table_output_format_str}. Expected one of: {valid_options}"
        ) from err

    # Process nemoretriever_parse configuration.
    nemoretriever_parse_config_raw = extractor_config.get("nemoretriever_parse_config", {})
    if isinstance(nemoretriever_parse_config_raw, dict):
        nemoretriever_parse_config = NemoRetrieverParseConfigSchema(**nemoretriever_parse_config_raw)
    elif isinstance(nemoretriever_parse_config_raw, NemoRetrieverParseConfigSchema):
        nemoretriever_parse_config = nemoretriever_parse_config_raw
    else:
        raise ValueError(
            "`nemoretriever_parse_config` must be a dictionary or a NemoRetrieverParseConfigSchema instance."
        )

    # Get base metadata (row_data may be a pandas-like row exposing `.index`, or a plain mapping).
    metadata_col = extractor_config.get("metadata_column", "metadata")
    if hasattr(row_data, "index") and metadata_col in row_data.index:
        base_unified_metadata = row_data[metadata_col]
    else:
        base_unified_metadata = row_data.get(metadata_col, {})

    # Values inherited from the incoming source_metadata, with defaults when absent.
    base_source_metadata = base_unified_metadata.get("source_metadata", {})
    source_location = base_source_metadata.get("source_location", "")
    collection_id = base_source_metadata.get("collection_id", "")
    partition_id = base_source_metadata.get("partition_id", -1)
    access_level = base_source_metadata.get("access_level", AccessLevelEnum.UNKNOWN)

    # Page-element (yolox) batches are only needed when yolox is the table method and at least
    # one structured content type was requested.
    use_yolox = (extract_tables_method == "yolox") and (extract_tables or extract_charts or extract_infographics)

    extracted_data = []
    nemoretriever_parse_client = None
    doc = pdfium.PdfDocument(pdf_stream)

    # Everything past this point runs under try/finally so that the document and the inference
    # client are released even when extraction fails part-way through.
    try:
        pdf_metadata = extract_pdf_metadata(doc, source_id)
        page_count = pdf_metadata.page_count

        source_metadata = {
            "source_name": pdf_metadata.filename,
            "source_id": source_id,
            "source_location": source_location,
            "source_type": pdf_metadata.source_type,
            "collection_id": collection_id,
            "date_created": pdf_metadata.date_created,
            "last_modified": pdf_metadata.last_modified,
            "summary": "",
            "partition_id": partition_id,
            "access_level": access_level,
        }

        accumulated_text = []
        accumulated_tables = []
        accumulated_images = []

        pages_for_ocr = []  # pending (page_idx, np_image) tuples for the parser
        pages_for_tables = []  # pending (page_idx, np_image, padding_offset) tuples for yolox
        futures = []  # all submitted batch jobs

        if extract_text:
            nemoretriever_parse_client = _create_clients(nemoretriever_parse_config)

        def _tagged_call(tag, func, *args, **kwargs):
            # Tag each result so the collector can tell which model produced it.
            return tag, func(*args, **kwargs)

        def _submit_parser_batch(executor):
            # Submit a copy of the pending parser pages, then reset the pending list.
            futures.append(
                executor.submit(
                    _tagged_call,
                    "parser",
                    _extract_text_and_bounding_boxes,
                    pages_for_ocr[:],
                    nemoretriever_parse_client,
                    execution_trace_log=execution_trace_log,
                )
            )
            pages_for_ocr.clear()

        def _submit_yolox_batch(executor):
            # Submit a copy of the pending yolox pages, then reset the pending list.
            futures.append(
                executor.submit(
                    _tagged_call,
                    "yolox",
                    _extract_page_elements,
                    pages_for_tables[:],
                    page_count,
                    source_metadata,
                    base_unified_metadata,
                    extract_tables,
                    extract_charts,
                    extract_infographics,
                    table_output_format,
                    nemoretriever_parse_config.yolox_endpoints,
                    nemoretriever_parse_config.yolox_infer_protocol,
                    nemoretriever_parse_config.auth_token,
                    execution_trace_log=execution_trace_log,
                )
            )
            pages_for_tables.clear()

        parser_results = []
        max_workers = nemoretriever_parse_config.workers_per_progress_engine
        with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
            for page_idx in range(page_count):
                page = doc.get_page(page_idx)
                try:
                    # Only render what will actually be submitted; rendering is the expensive part
                    # and unused batches would otherwise pile up for the whole document.
                    if extract_text:
                        page_image, _ = _convert_pdfium_page_to_numpy_for_parser(page)
                        pages_for_ocr.append((page_idx, page_image))
                    if use_yolox:
                        yolox_image, yolox_padding = _convert_pdfium_page_to_numpy_for_yolox(page)
                        pages_for_tables.append((page_idx, yolox_image, yolox_padding))
                finally:
                    page.close()

                # Submit a job whenever a pending list reaches its model's batch size.
                if len(pages_for_ocr) >= NEMORETRIEVER_PARSE_MAX_BATCH_SIZE:
                    _submit_parser_batch(executor)
                if len(pages_for_tables) >= YOLOX_MAX_BATCH_SIZE:
                    _submit_yolox_batch(executor)

            # After the page loop, flush any partially filled batches.
            if pages_for_ocr:
                _submit_parser_batch(executor)
            if pages_for_tables:
                _submit_yolox_batch(executor)

            # Wait for all jobs; fut.result() re-raises any exception from the worker.
            for fut in concurrent.futures.as_completed(futures):
                model_name, extracted_items = fut.result()
                if model_name == "yolox":
                    extracted_data.extend(extracted_items)
                elif model_name == "parser":
                    parser_results.extend(extracted_items)

        # as_completed yields batches in completion order; restore page order so that
        # document-level text is assembled deterministically.
        parser_results.sort(key=lambda item: item[0])

        for page_idx, parser_output in parser_results:
            page = None
            page_image = None
            page_text = []

            page_nearby_blocks = {
                "text": {"content": [], "bbox": [], "type": []},
                "images": {"content": [], "bbox": [], "type": []},
                "structured": {"content": [], "bbox": [], "type": []},
            }

            for bbox_dict in parser_output:
                cls = bbox_dict["type"]
                bbox = bbox_dict["bbox"]
                txt = bbox_dict["text"]

                # Scale the parser's bbox coordinates by the parser image dimensions,
                # rounding outwards so the box is never shrunk.
                transformed_bbox = [
                    math.floor(bbox["xmin"] * NEMORETRIEVER_PARSE_MAX_WIDTH),
                    math.floor(bbox["ymin"] * NEMORETRIEVER_PARSE_MAX_HEIGHT),
                    math.ceil(bbox["xmax"] * NEMORETRIEVER_PARSE_MAX_WIDTH),
                    math.ceil(bbox["ymax"] * NEMORETRIEVER_PARSE_MAX_HEIGHT),
                ]

                if cls not in nemoretriever_parse_utils.ACCEPTED_CLASSES:
                    continue

                if identify_nearby_objects:
                    _insert_page_nearby_blocks(page_nearby_blocks, cls, txt, transformed_bbox)

                if extract_text:
                    page_text.append(txt)

                if (extract_tables_method == "nemoretriever_parse") and extract_tables and (cls == "Table"):
                    accumulated_tables.append(
                        LatexTable(
                            latex=txt,
                            bbox=transformed_bbox,
                            max_width=NEMORETRIEVER_PARSE_MAX_WIDTH,
                            max_height=NEMORETRIEVER_PARSE_MAX_HEIGHT,
                        )
                    )

                if extract_images and (cls == "Picture"):
                    # Re-open and re-render the page lazily, at most once per page.
                    if page is None:
                        page = doc.get_page(page_idx)
                    if page_image is None:
                        page_image, _ = _convert_pdfium_page_to_numpy_for_parser(page)

                    img_numpy = crop_image(page_image, transformed_bbox)

                    if img_numpy is not None:
                        base64_img = numpy_to_base64(img_numpy, format=YOLOX_PAGE_IMAGE_FORMAT)
                        accumulated_images.append(
                            Base64Image(
                                image=base64_img,
                                bbox=transformed_bbox,
                                width=img_numpy.shape[1],
                                height=img_numpy.shape[0],
                                max_width=NEMORETRIEVER_PARSE_MAX_WIDTH,
                                max_height=NEMORETRIEVER_PARSE_MAX_HEIGHT,
                            )
                        )

            # If NemoRetrieverParse fails to extract anything, fall back to using pdfium.
            if not "".join(page_text).strip():
                if page is None:
                    page = doc.get_page(page_idx)
                page_text = [page.get_textpage().get_text_bounded()]

            accumulated_text.extend(page_text)

            # Construct tables
            if extract_tables:
                for table in accumulated_tables:
                    extracted_data.append(
                        _construct_table_metadata(
                            table,
                            page_idx,
                            page_count,
                            source_metadata,
                            base_unified_metadata,
                        )
                    )
                accumulated_tables = []

            # Construct images
            if extract_images:
                for image in accumulated_images:
                    extracted_data.append(
                        construct_image_metadata_from_pdf_image(
                            image,
                            page_idx,
                            page_count,
                            source_metadata,
                            base_unified_metadata,
                        )
                    )
                accumulated_images = []

            # Construct text - page
            if extract_text and (text_depth == TextTypeEnum.PAGE):
                extracted_data.append(
                    construct_text_metadata(
                        accumulated_text,
                        pdf_metadata.keywords,
                        page_idx,
                        -1,
                        -1,
                        -1,
                        page_count,
                        text_depth,
                        source_metadata,
                        base_unified_metadata,
                        delimiter="\n\n",
                        bbox_max_dimensions=(NEMORETRIEVER_PARSE_MAX_WIDTH, NEMORETRIEVER_PARSE_MAX_HEIGHT),
                        nearby_objects=page_nearby_blocks,
                    )
                )
                accumulated_text = []

        # Construct text - document
        if extract_text and (text_depth == TextTypeEnum.DOCUMENT):
            text_extraction = construct_text_metadata(
                accumulated_text,
                pdf_metadata.keywords,
                -1,
                -1,
                -1,
                -1,
                page_count,
                text_depth,
                source_metadata,
                base_unified_metadata,
                delimiter="\n\n",
            )
            if len(text_extraction) > 0:
                extracted_data.append(text_extraction)
    finally:
        # Release the client first, but make sure the document is closed even if that fails.
        try:
            if nemoretriever_parse_client:
                nemoretriever_parse_client.close()
        finally:
            doc.close()

    return extracted_data
def _extract_text_and_bounding_boxes(
    pages: list,
    nemoretriever_parse_client,
    execution_trace_log=None,
) -> list:
    """
    Run nemoretriever_parse inference on a batch of page images.

    Parameters
    ----------
    pages : list
        Sequence of tuples whose first element is the page index and whose second element is
        the page image.
    nemoretriever_parse_client
        Inference client exposing ``infer(...)``.
    execution_trace_log : optional
        Trace information forwarded to the client (default is None).

    Returns
    -------
    list
        ``(page_index, inference_result)`` pairs, pairing each input page index with the result
        at the same position in the client's output.
    """
    # Collect all page indices and images in order.
    image_page_indices = [page[0] for page in pages]
    original_images = [page[1] for page in pages]

    # Perform inference on the whole batch using the client.
    inference_results = nemoretriever_parse_client.infer(
        data={"images": original_images},
        model_name="nemoretriever_parse",
        stage_name="pdf_extraction",
        max_batch_size=NEMORETRIEVER_PARSE_MAX_BATCH_SIZE,
        execution_trace_log=execution_trace_log,
    )

    return list(zip(image_page_indices, inference_results))


def _create_clients(nemoretriever_parse_config):
    """
    Build the nemoretriever_parse inference client from the given configuration.

    Parameters
    ----------
    nemoretriever_parse_config
        Configuration object providing the model name, endpoints, auth token, inference
        protocol and timeout used to create the client.

    Returns
    -------
    The client returned by ``create_inference_client``.
    """
    model_interface = nemoretriever_parse_utils.NemoRetrieverParseModelInterface(
        model_name=nemoretriever_parse_config.nemoretriever_parse_model_name,
    )

    return create_inference_client(
        nemoretriever_parse_config.nemoretriever_parse_endpoints,
        model_interface,
        nemoretriever_parse_config.auth_token,
        nemoretriever_parse_config.nemoretriever_parse_infer_protocol,
        nemoretriever_parse_config.timeout,
    )


def _send_inference_request(
    nemoretriever_parse_client,
    image_array: np.ndarray,
) -> Dict[str, Any]:
    """
    Send a single page image to nemoretriever_parse and return the client's response.

    Parameters
    ----------
    nemoretriever_parse_client
        Inference client exposing ``infer(...)``.
    image_array : np.ndarray
        The page image to process.

    Returns
    -------
    Dict[str, Any]
        The response returned by the client.

    Raises
    ------
    Exception
        Any exception raised by the client is logged and re-raised unchanged.
    """
    try:
        # NIM only supports processing one page at a time (batch size = 1).
        return nemoretriever_parse_client.infer(
            data={"image": image_array},
            model_name="nemoretriever_parse",
        )
    except Exception as e:
        logger.exception("Unhandled error during NemoRetrieverParse inference: %s", e)
        # Bare raise keeps the original traceback intact.
        raise


def _convert_pdfium_page_to_numpy_for_parser(
    page: pdfium.PdfPage,
    render_dpi: int = NEMORETRIEVER_PARSE_RENDER_DPI,
    scale_tuple: Tuple[int, int] = (NEMORETRIEVER_PARSE_MAX_WIDTH, NEMORETRIEVER_PARSE_MAX_HEIGHT),
    padding_tuple: Tuple[int, int] = (NEMORETRIEVER_PARSE_MAX_WIDTH, NEMORETRIEVER_PARSE_MAX_HEIGHT),
) -> Tuple[np.ndarray, Any]:
    """
    Render one PDF page for nemoretriever_parse.

    Parameters
    ----------
    page : pdfium.PdfPage
        The page to render.
    render_dpi : int
        Rendering DPI passed to ``pdfium_pages_to_numpy``.
    scale_tuple : Tuple[int, int]
        Scale dimensions passed to ``pdfium_pages_to_numpy``.
    padding_tuple : Tuple[int, int]
        Padding dimensions passed to ``pdfium_pages_to_numpy``.

    Returns
    -------
    Tuple[np.ndarray, Any]
        The rendered page image and its padding offset, i.e. the first entry of each list
        returned by ``pdfium_pages_to_numpy``.
    """
    page_images, padding_offsets = pdfium_pages_to_numpy(
        [page], render_dpi=render_dpi, scale_tuple=scale_tuple, padding_tuple=padding_tuple
    )

    return page_images[0], padding_offsets[0]


def _convert_pdfium_page_to_numpy_for_yolox(
    page: pdfium.PdfPage,
    scale_tuple: Tuple[int, int] = (YOLOX_PAGE_IMAGE_PREPROC_WIDTH, YOLOX_PAGE_IMAGE_PREPROC_HEIGHT),
    padding_tuple: Tuple[int, int] = (YOLOX_PAGE_IMAGE_PREPROC_WIDTH, YOLOX_PAGE_IMAGE_PREPROC_HEIGHT),
) -> Tuple[np.ndarray, Any]:
    """
    Render one PDF page for the YOLOX page-element model.

    Parameters
    ----------
    page : pdfium.PdfPage
        The page to render.
    scale_tuple : Tuple[int, int]
        Scale dimensions passed to ``pdfium_pages_to_numpy``.
    padding_tuple : Tuple[int, int]
        Padding dimensions passed to ``pdfium_pages_to_numpy``.

    Returns
    -------
    Tuple[np.ndarray, Any]
        The rendered page image and its padding offset, i.e. the first entry of each list
        returned by ``pdfium_pages_to_numpy``.
    """
    page_images, padding_offsets = pdfium_pages_to_numpy([page], scale_tuple=scale_tuple, padding_tuple=padding_tuple)

    return page_images[0], padding_offsets[0]


def _insert_page_nearby_blocks(
    page_nearby_blocks: Dict[str, Any],
    cls: str,
    txt: str,
    bbox: List[int],
):
    """
    Record a detected block under the matching group of ``page_nearby_blocks``.

    Parameters
    ----------
    page_nearby_blocks : Dict[str, Any]
        Mapping with "text", "structured" and "images" entries, each holding parallel
        "content", "bbox" and "type" lists. Mutated in place.
    cls : str
        The class reported for the block.
    txt : str
        The block's text content.
    bbox : List[int]
        The block's bounding box, stored as given.

    Notes
    -----
    A class that belongs to none of the text, table or image groups is skipped with a warning
    instead of failing on an unbound local variable.
    """
    if cls in nemoretriever_parse_utils.ACCEPTED_TEXT_CLASSES:
        nearby_blocks_key = "text"
    elif cls in nemoretriever_parse_utils.ACCEPTED_TABLE_CLASSES:
        nearby_blocks_key = "structured"
    elif cls in nemoretriever_parse_utils.ACCEPTED_IMAGE_CLASSES:
        nearby_blocks_key = "images"
    else:
        logger.warning("Skipping nearby block with unrecognized class: %s", cls)
        return

    group = page_nearby_blocks[nearby_blocks_key]
    group["content"].append(txt)
    group["bbox"].append(bbox)
    group["type"].append(cls)


@pdfium_exception_handler(descriptor="nemoretriever_parse")
def _construct_table_metadata(
    table: LatexTable,
    page_idx: int,
    page_count: int,
    source_metadata: Dict,
    base_unified_metadata: Dict,
) -> List[Any]:
    """
    Build the validated metadata entry for a LaTeX table found on a page.

    Parameters
    ----------
    table : LatexTable
        The table; its ``latex``, ``bbox``, ``max_width`` and ``max_height`` attributes are read.
    page_idx : int
        Index of the page the table was found on.
    page_count : int
        Total number of pages in the document.
    source_metadata : Dict
        Source metadata to attach to the entry.
    base_unified_metadata : Dict
        Base metadata; a shallow copy is extended, the input itself is not modified.

    Returns
    -------
    List[Any]
        ``[ContentTypeEnum.STRUCTURED, <validated metadata as a dict>, <new UUID string>]``.
    """
    content_metadata = {
        "type": ContentTypeEnum.STRUCTURED,
        "description": ContentDescriptionEnum.PDF_TABLE,
        "page_number": page_idx,
        "hierarchy": {
            "page_count": page_count,
            "page": page_idx,
            "line": -1,
            "span": -1,
        },
        "subtype": ContentTypeEnum.TABLE,
    }

    table_metadata = {
        "caption": "",
        "table_content": table.latex,
        "table_format": TableFormatEnum.LATEX,
        "table_location": table.bbox,
        "table_location_max_dimensions": (table.max_width, table.max_height),
    }

    # Shallow copy so the caller's base metadata is not overwritten by the per-table keys.
    ext_unified_metadata = base_unified_metadata.copy()
    ext_unified_metadata.update(
        {
            "content": "",
            "source_metadata": source_metadata,
            "content_metadata": content_metadata,
            "table_metadata": table_metadata,
        }
    )

    validated_unified_metadata = validate_metadata(ext_unified_metadata)

    return [ContentTypeEnum.STRUCTURED, validated_unified_metadata.model_dump(), str(uuid.uuid4())]