# Source code for nv_ingest_api.internal.extract.pdf.pdf_extractor

# SPDX-FileCopyrightText: Copyright (c) 2024, NVIDIA CORPORATION & AFFILIATES.
# All rights reserved.
# SPDX-License-Identifier: Apache-2.0
# Copyright (c) 2024, NVIDIA CORPORATION.

import pandas as pd
from typing import Any, Dict, List, Optional, Tuple
import logging

from nv_ingest_api.internal.extract.pdf.engines.pdf_helpers import _orchestrate_row_extraction

logger = logging.getLogger(__name__)


def extract_primitives_from_pdf_internal(
    df_extraction_ledger: pd.DataFrame,
    task_config: Dict[str, Any],
    extractor_config: Any,
    execution_trace_log: Optional[List[Any]] = None,
) -> Tuple[pd.DataFrame, Dict]:
    """
    Run row-level PDF extraction over a ledger and collect the results.

    Every row of ``df_extraction_ledger`` is handed to
    ``_orchestrate_row_extraction`` together with the two configuration
    objects and the trace log. The per-row results are flattened (one output
    row per extracted item), missing entries are dropped, and the remainder is
    assembled into a three-column DataFrame.

    Parameters
    ----------
    df_extraction_ledger : pd.DataFrame
        One row per document to process. The row contents are consumed by
        ``_orchestrate_row_extraction``; upstream documentation states that a
        'content' column with base64-encoded PDF data is expected (not checked
        here).
    task_config : dict
        Task-level configuration, forwarded unchanged to the row orchestrator.
        Upstream documentation mentions 'task_properties' and
        'validated_config' keys (not checked here).
    extractor_config : Any
        Extractor configuration, forwarded unchanged to the row orchestrator.
    execution_trace_log : list, optional
        Object forwarded to the row orchestrator and returned to the caller
        under the 'execution_trace_log' key. Defaults to None.

    Returns
    -------
    tuple of (pd.DataFrame, dict)
        The DataFrame has the columns ``document_type``, ``metadata`` and
        ``uuid`` (and no rows when nothing was extracted or the ledger was
        empty). The dict is ``{"execution_trace_log": execution_trace_log}``.

    Raises
    ------
    Exception
        Any error raised while processing is logged and re-raised. When the
        exception class can be constructed from a single message string, a new
        instance of the same class carrying a contextual message is raised
        (chained to the original); otherwise the original exception is
        re-raised as-is.
    """
    result_columns = ["document_type", "metadata", "uuid"]

    try:
        # A zero-row ledger has nothing to extract. Returning early also avoids
        # DataFrame.apply probing the callable with a dummy row and handing
        # back a DataFrame, which the Series-only explode() call below cannot
        # process.
        if df_extraction_ledger.empty:
            empty_df = pd.DataFrame({column: [] for column in result_columns})
            return empty_df, {"execution_trace_log": execution_trace_log}

        # Apply the orchestration function to each row.
        extraction_series = df_extraction_ledger.apply(
            lambda row: _orchestrate_row_extraction(row, task_config, extractor_config, execution_trace_log),
            axis=1,
        )

        # Flatten list results to one item per row; empty lists and None
        # results become missing values and are discarded.
        extraction_series = extraction_series.explode().dropna()

        # Convert the extracted results into a DataFrame.
        if not extraction_series.empty:
            extracted_df = pd.DataFrame(extraction_series.to_list(), columns=result_columns)
        else:
            extracted_df = pd.DataFrame({column: [] for column in result_columns})

        return extracted_df, {"execution_trace_log": execution_trace_log}

    except Exception as e:
        err_msg = f"extract_primitives_from_pdf: Error processing PDF bytes: {e}"
        logger.error(err_msg, exc_info=True)

        # Not every exception class accepts a single message argument
        # (UnicodeDecodeError, for instance, requires five). Rebuilding such an
        # exception would raise an unrelated TypeError and hide the real
        # failure, so fall back to re-raising the original in that case.
        try:
            wrapped = type(e)(err_msg)
        except Exception:
            wrapped = None

        if wrapped is None:
            raise
        raise wrapped from e