Skip to content

NeMo Retriever API Reference

PDF pre-splitting for parallel ingest

Server-side PDF splitting supports configurable page chunking: use `.pdf_split_config(pages_per_chunk=...)` in the Python client, or the equivalent PDF split page-count option in the CLI. See the API guide and the CLI reference for parameter tables and examples.

System-facing ingestion interface.

This module defines the public API surface for building and executing ingestion. Concrete implementations are provided by runmodes:

  • inprocess: local Python process, no framework assumptions
  • batch: large-scale batch execution
  • fused: low-latency single-actor GPU model fusion

Ingestor = ingestor module-attribute

ingestor

Interface base class. All methods intentionally raise NotImplementedError.

Each runmode should subclass this and eventually provide working behavior.

Source code in nemo_retriever/ingestor.py
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
class ingestor:
    """
    Interface base class. All methods intentionally raise NotImplementedError.

    Each runmode should subclass this and eventually provide working behavior.

    Builder-style methods are annotated to return the ingestor (presumably so
    runmode implementations can support fluent chaining). On this base class
    every public method funnels into ``_not_implemented``, which raises
    ``NotImplementedError`` naming the concrete class, method and ``RUN_MODE``.
    """

    # Identifies the runmode in error messages; subclasses override it.
    RUN_MODE: str = "interface"

    def __init__(self, documents: Optional[Union[str, List[str]]] = None) -> None:
        """Store the initial document list and an empty in-memory buffer list.

        Args:
            documents: A single path/URI, a list of paths/URIs, or ``None``.
        """
        # A bare string is a single path/URI. Without this guard ``list("a.pdf")``
        # would turn it into one "document" per character. An empty string still
        # yields no documents, as before.
        if isinstance(documents, str):
            documents = [documents] if documents else []
        self._documents: List[str] = list(documents or [])
        self._buffers: List[Tuple[str, BytesIO]] = []

    def _not_implemented(self, method_name: str) -> "None":
        """Always raise ``NotImplementedError`` for ``method_name``; never returns."""
        raise NotImplementedError(
            f"{self.__class__.__name__}.{method_name}() is not implemented yet " f"(run_mode={self.RUN_MODE})."
        )

    def files(self, documents: Union[str, List[str]]) -> "ingestor":
        """Add document paths/URIs for processing.

        Raises:
            NotImplementedError: always, on the interface class.
        """
        self._not_implemented("files")

    def buffers(self, buffers: Union[Tuple[str, BytesIO], List[Tuple[str, BytesIO]]]) -> "ingestor":
        """Add in-memory buffers (``(name, BytesIO)`` pairs) for processing.

        Raises:
            NotImplementedError: always, on the interface class.
        """
        self._not_implemented("buffers")

    def load(self) -> "ingestor":
        """
        Placeholder for remote fetch/localization.

        The client-side Ingestor supports downloading remote URIs locally.
        In this system, each runmode may handle remote inputs differently.

        Raises:
            NotImplementedError: always, on the interface class.
        """
        self._not_implemented("load")

    def ingest(
        self,
        params: IngestExecuteParams | None = None,
        **kwargs: Any,
    ) -> Union[List[Any], Tuple[Any, ...]]:
        """
        Execute the configured ingestion pipeline (placeholder).

        ``params`` and ``kwargs`` are combined with ``_merge_params`` before the
        stub raises (presumably so argument errors surface first; confirm).

        Raises:
            NotImplementedError: always, on the interface class.
        """
        _ = _merge_params(params, kwargs)
        self._not_implemented("ingest")

    def ingest_async(self, *, return_failures: bool = False, return_traces: bool = False) -> Any:
        """Asynchronously execute ingestion (placeholder).

        Raises:
            NotImplementedError: always, on the interface class.
        """
        self._not_implemented("ingest_async")

    def all_tasks(self) -> "ingestor":
        """Record the default task chain (placeholder).

        Raises:
            NotImplementedError: always, on the interface class.
        """
        self._not_implemented("all_tasks")

    def dedup(self, params: DedupParams | None = None, **kwargs: Any) -> "ingestor":
        """Record a dedup task configuration.

        Raises:
            NotImplementedError: always, on the interface class (after ``_merge_params``).
        """
        _ = _merge_params(params, kwargs)
        self._not_implemented("dedup")

    def embed(self, params: EmbedParams | None = None, **kwargs: Any) -> "ingestor":
        """Record an embedding task configuration.

        Raises:
            NotImplementedError: always, on the interface class (after ``_merge_params``).
        """
        _ = _merge_params(params, kwargs)
        self._not_implemented("embed")

    def extract(self, params: ExtractParams | None = None, **kwargs: Any) -> "ingestor":
        """Record an extract task configuration.

        Raises:
            NotImplementedError: always, on the interface class (after ``_merge_params``).
        """
        _ = _merge_params(params, kwargs)
        self._not_implemented("extract")

    def extract_image_files(self, params: ExtractParams | None = None, **kwargs: Any) -> "ingestor":
        """Record an extract-image-files task configuration.

        Raises:
            NotImplementedError: always, on the interface class (after ``_merge_params``).
        """
        _ = _merge_params(params, kwargs)
        self._not_implemented("extract_image_files")

    def filter(self) -> "ingestor":
        """Record a filter task configuration.

        Raises:
            NotImplementedError: always, on the interface class.
        """
        self._not_implemented("filter")

    def store(self, params: StoreParams | None = None, **kwargs: Any) -> "ingestor":
        """Record a store task configuration for extracted image assets.

        Raises:
            NotImplementedError: always, on the interface class (after ``_merge_params``).
        """
        _ = _merge_params(params, kwargs)
        self._not_implemented("store")

    def store_embed(self) -> "ingestor":
        """Record a store-embed task configuration.

        Raises:
            NotImplementedError: always, on the interface class.
        """
        self._not_implemented("store_embed")

    def udf(
        self,
        udf_function: str,
        udf_function_name: Optional[str] = None,
        phase: Optional[Union[int, str]] = None,
        target_stage: Optional[str] = None,
        run_before: bool = False,
        run_after: bool = False,
    ) -> "ingestor":
        """Record a UDF task configuration.

        The parameters are accepted for interface compatibility only; their
        semantics are defined by runmode implementations.

        Raises:
            NotImplementedError: always, on the interface class.
        """
        self._not_implemented("udf")

    def vdb_upload(self, params: VdbUploadParams | None = None, **kwargs: Any) -> "ingestor":
        """Record a vector DB upload configuration (execution TBD).

        Raises:
            NotImplementedError: always, on the interface class (after ``_merge_params``).
        """
        _ = _merge_params(params, kwargs)
        self._not_implemented("vdb_upload")

    def save_intermediate_results(self, output_dir: str) -> "ingestor":
        """Record intermediate results persistence configuration.

        Raises:
            NotImplementedError: always, on the interface class.
        """
        self._not_implemented("save_intermediate_results")

    def save_to_disk(
        self,
        output_directory: Optional[str] = None,
        cleanup: bool = True,
        compression: Optional[str] = "gzip",
    ) -> "ingestor":
        """Record result persistence configuration (execution TBD).

        Raises:
            NotImplementedError: always, on the interface class.
        """
        self._not_implemented("save_to_disk")

    def caption(self, params: "CaptionParams | None" = None, **kwargs: Any) -> "ingestor":
        """Record a caption task configuration.

        Raises:
            NotImplementedError: always, on the interface class (after ``_merge_params``).
        """
        _ = _merge_params(params, kwargs)
        self._not_implemented("caption")

    def webhook(self, params: "WebhookParams | None" = None, **kwargs: Any) -> "ingestor":
        """Record a webhook notification configuration.

        Raises:
            NotImplementedError: always, on the interface class (after ``_merge_params``).
        """
        _ = _merge_params(params, kwargs)
        self._not_implemented("webhook")

    def pdf_split_config(self, pages_per_chunk: int = 32) -> "ingestor":
        """Record PDF split configuration (execution TBD).

        Raises:
            NotImplementedError: always, on the interface class.
        """
        self._not_implemented("pdf_split_config")

    def completed_jobs(self) -> int:
        """Return completed job count (placeholder until backend populates job state).

        Raises:
            NotImplementedError: always, on the interface class.
        """
        self._not_implemented("completed_jobs")

    def failed_jobs(self) -> int:
        """Return failed job count (placeholder until backend populates job state).

        Raises:
            NotImplementedError: always, on the interface class.
        """
        self._not_implemented("failed_jobs")

    def cancelled_jobs(self) -> int:
        """Return cancelled job count (placeholder until backend populates job state).

        Raises:
            NotImplementedError: always, on the interface class.
        """
        self._not_implemented("cancelled_jobs")

    def remaining_jobs(self) -> int:
        """Return remaining job count (placeholder until backend populates job state).

        Raises:
            NotImplementedError: always, on the interface class.
        """
        self._not_implemented("remaining_jobs")

    def get_status(self) -> Dict[str, str]:
        """
        Return per-document status mapping (placeholder).

        Once Ray execution is wired, this should reflect actual job/task state.

        Raises:
            NotImplementedError: always, on the interface class.
        """
        self._not_implemented("get_status")

RUN_MODE = 'interface' class-attribute instance-attribute

__init__(documents=None)

Source code in nemo_retriever/ingestor.py
95
96
97
def __init__(self, documents: Optional[Union[str, List[str]]] = None) -> None:
    """Store the initial document list and an empty in-memory buffer list.

    Args:
        documents: A single path/URI, a list of paths/URIs, or ``None``.
    """
    # A bare string is a single path/URI. Without this guard ``list("a.pdf")``
    # would turn it into one "document" per character. An empty string still
    # yields no documents, as before.
    if isinstance(documents, str):
        documents = [documents] if documents else []
    self._documents: List[str] = list(documents or [])
    self._buffers: List[Tuple[str, BytesIO]] = []

all_tasks()

Record the default task chain (placeholder).

Source code in nemo_retriever/ingestor.py
136
137
138
def all_tasks(self) -> "ingestor":
    """Record the default task chain (placeholder).

    On the interface class this always raises ``NotImplementedError`` via
    ``_not_implemented``; runmode subclasses are expected to override it.
    """
    self._not_implemented("all_tasks")

buffers(buffers)

Add in-memory buffers for processing.

Source code in nemo_retriever/ingestor.py
108
109
110
def buffers(self, buffers: Union[Tuple[str, BytesIO], List[Tuple[str, BytesIO]]]) -> "ingestor":
    """Add in-memory buffers for processing.

    Accepts one ``(name, BytesIO)`` pair or a list of them. On the interface
    class this always raises ``NotImplementedError`` via ``_not_implemented``.
    """
    self._not_implemented("buffers")

cancelled_jobs()

Return cancelled job count (placeholder until backend populates job state).

Source code in nemo_retriever/ingestor.py
225
226
227
def cancelled_jobs(self) -> int:
    """Return cancelled job count (placeholder until backend populates job state).

    On the interface class this always raises ``NotImplementedError``.
    """
    self._not_implemented("cancelled_jobs")

caption(params=None, **kwargs)

Record a caption task configuration.

Source code in nemo_retriever/ingestor.py
203
204
205
206
def caption(self, params: "CaptionParams | None" = None, **kwargs: Any) -> "ingestor":
    """Record a caption task configuration.

    ``params`` and ``kwargs`` are combined with ``_merge_params`` first; the
    interface class then always raises ``NotImplementedError``.
    """
    # Merge before raising (presumably so invalid arguments surface first; confirm).
    _ = _merge_params(params, kwargs)
    self._not_implemented("caption")

completed_jobs()

Return completed job count (placeholder until backend populates job state).

Source code in nemo_retriever/ingestor.py
217
218
219
def completed_jobs(self) -> int:
    """Return completed job count (placeholder until backend populates job state).

    On the interface class this always raises ``NotImplementedError``.
    """
    self._not_implemented("completed_jobs")

dedup(params=None, **kwargs)

Record a dedup task configuration.

Source code in nemo_retriever/ingestor.py
140
141
142
143
def dedup(self, params: DedupParams | None = None, **kwargs: Any) -> "ingestor":
    """Record a dedup task configuration.

    ``params`` and ``kwargs`` are combined with ``_merge_params`` first; the
    interface class then always raises ``NotImplementedError``.
    """
    # Merge before raising (presumably so invalid arguments surface first; confirm).
    _ = _merge_params(params, kwargs)
    self._not_implemented("dedup")

embed(params=None, **kwargs)

Record an embedding task configuration.

Source code in nemo_retriever/ingestor.py
145
146
147
148
def embed(self, params: EmbedParams | None = None, **kwargs: Any) -> "ingestor":
    """Record an embedding task configuration.

    ``params`` and ``kwargs`` are combined with ``_merge_params`` first; the
    interface class then always raises ``NotImplementedError``.
    """
    # Merge before raising (presumably so invalid arguments surface first; confirm).
    _ = _merge_params(params, kwargs)
    self._not_implemented("embed")

extract(params=None, **kwargs)

Record an extract task configuration.

Source code in nemo_retriever/ingestor.py
150
151
152
153
def extract(self, params: ExtractParams | None = None, **kwargs: Any) -> "ingestor":
    """Record an extract task configuration.

    ``params`` and ``kwargs`` are combined with ``_merge_params`` first; the
    interface class then always raises ``NotImplementedError``.
    """
    # Merge before raising (presumably so invalid arguments surface first; confirm).
    _ = _merge_params(params, kwargs)
    self._not_implemented("extract")

extract_image_files(params=None, **kwargs)

Record an extract-image-files task configuration.

Source code in nemo_retriever/ingestor.py
155
156
157
158
def extract_image_files(self, params: ExtractParams | None = None, **kwargs: Any) -> "ingestor":
    """Record an extract-image-files task configuration.

    Uses the same ``ExtractParams`` type as ``extract``. ``params`` and ``kwargs``
    are combined with ``_merge_params`` first; the interface class then always
    raises ``NotImplementedError``.
    """
    # Merge before raising (presumably so invalid arguments surface first; confirm).
    _ = _merge_params(params, kwargs)
    self._not_implemented("extract_image_files")

failed_jobs()

Return failed job count (placeholder until backend populates job state).

Source code in nemo_retriever/ingestor.py
221
222
223
def failed_jobs(self) -> int:
    """Return failed job count (placeholder until backend populates job state).

    On the interface class this always raises ``NotImplementedError``.
    """
    self._not_implemented("failed_jobs")

files(documents)

Add document paths/URIs for processing.

Source code in nemo_retriever/ingestor.py
104
105
106
def files(self, documents: Union[str, List[str]]) -> "ingestor":
    """Add document paths/URIs for processing.

    Accepts a single path/URI or a list of them. On the interface class this
    always raises ``NotImplementedError`` via ``_not_implemented``.
    """
    self._not_implemented("files")

filter()

Record a filter task configuration.

Source code in nemo_retriever/ingestor.py
160
161
162
def filter(self) -> "ingestor":
    """Record a filter task configuration.

    On the interface class this always raises ``NotImplementedError``.
    """
    self._not_implemented("filter")

get_status()

Return per-document status mapping (placeholder).

Once Ray execution is wired, this should reflect actual job/task state.

Source code in nemo_retriever/ingestor.py
233
234
235
236
237
238
239
def get_status(self) -> Dict[str, str]:
    """
    Return per-document status mapping (placeholder).

    Once Ray execution is wired, this should reflect actual job/task state.
    On the interface class this always raises ``NotImplementedError``.
    """
    self._not_implemented("get_status")

ingest(params=None, **kwargs)

Execute the configured ingestion pipeline (placeholder).

Source code in nemo_retriever/ingestor.py
121
122
123
124
125
126
127
128
129
130
def ingest(
    self,
    params: IngestExecuteParams | None = None,
    **kwargs: Any,
) -> Union[List[Any], Tuple[Any, ...]]:
    """
    Execute the configured ingestion pipeline (placeholder).

    ``params`` and ``kwargs`` are combined with ``_merge_params`` first; the
    interface class then always raises ``NotImplementedError``.
    """
    # Merge before raising (presumably so invalid arguments surface first; confirm).
    _ = _merge_params(params, kwargs)
    self._not_implemented("ingest")

ingest_async(*, return_failures=False, return_traces=False)

Asynchronously execute ingestion (placeholder).

Source code in nemo_retriever/ingestor.py
132
133
134
def ingest_async(self, *, return_failures: bool = False, return_traces: bool = False) -> Any:
    """Asynchronously execute ingestion (placeholder).

    The keyword-only flags are accepted but unused here. On the interface
    class this always raises ``NotImplementedError``.
    """
    self._not_implemented("ingest_async")

load()

Placeholder for remote fetch/localization.

The client-side Ingestor supports downloading remote URIs locally. In this system, each runmode may handle remote inputs differently.

Source code in nemo_retriever/ingestor.py
112
113
114
115
116
117
118
119
def load(self) -> "ingestor":
    """
    Placeholder for remote fetch/localization.

    The client-side Ingestor supports downloading remote URIs locally.
    In this system, each runmode may handle remote inputs differently.
    On the interface class this always raises ``NotImplementedError``.
    """
    self._not_implemented("load")

pdf_split_config(pages_per_chunk=32)

Record PDF split configuration (execution TBD).

Source code in nemo_retriever/ingestor.py
213
214
215
def pdf_split_config(self, pages_per_chunk: int = 32) -> "ingestor":
    """Record PDF split configuration (execution TBD).

    ``pages_per_chunk`` defaults to 32. On the interface class this always
    raises ``NotImplementedError``.
    """
    self._not_implemented("pdf_split_config")

remaining_jobs()

Return remaining job count (placeholder until backend populates job state).

Source code in nemo_retriever/ingestor.py
229
230
231
def remaining_jobs(self) -> int:
    """Return remaining job count (placeholder until backend populates job state).

    On the interface class this always raises ``NotImplementedError``.
    """
    self._not_implemented("remaining_jobs")

save_intermediate_results(output_dir)

Record intermediate results persistence configuration.

Source code in nemo_retriever/ingestor.py
190
191
192
def save_intermediate_results(self, output_dir: str) -> "ingestor":
    """Record intermediate results persistence configuration.

    On the interface class this always raises ``NotImplementedError``.
    """
    self._not_implemented("save_intermediate_results")

save_to_disk(output_directory=None, cleanup=True, compression='gzip')

Record result persistence configuration (execution TBD).

Source code in nemo_retriever/ingestor.py
194
195
196
197
198
199
200
201
def save_to_disk(
    self,
    output_directory: Optional[str] = None,
    cleanup: bool = True,
    compression: Optional[str] = "gzip",
) -> "ingestor":
    """Record result persistence configuration (execution TBD).

    Defaults: ``output_directory=None``, ``cleanup=True``, ``compression="gzip"``.
    On the interface class this always raises ``NotImplementedError``.
    """
    self._not_implemented("save_to_disk")

store(params=None, **kwargs)

Record a store task configuration for extracted image assets.

Source code in nemo_retriever/ingestor.py
164
165
166
167
def store(self, params: StoreParams | None = None, **kwargs: Any) -> "ingestor":
    """Record a store task configuration for extracted image assets.

    ``params`` and ``kwargs`` are combined with ``_merge_params`` first; the
    interface class then always raises ``NotImplementedError``.
    """
    # Merge before raising (presumably so invalid arguments surface first; confirm).
    _ = _merge_params(params, kwargs)
    self._not_implemented("store")

store_embed()

Record a store-embed task configuration.

Source code in nemo_retriever/ingestor.py
169
170
171
def store_embed(self) -> "ingestor":
    """Record a store-embed task configuration.

    On the interface class this always raises ``NotImplementedError``.
    """
    self._not_implemented("store_embed")

udf(udf_function, udf_function_name=None, phase=None, target_stage=None, run_before=False, run_after=False)

Record a UDF task configuration.

Source code in nemo_retriever/ingestor.py
173
174
175
176
177
178
179
180
181
182
183
def udf(
    self,
    udf_function: str,
    udf_function_name: Optional[str] = None,
    phase: Optional[Union[int, str]] = None,
    target_stage: Optional[str] = None,
    run_before: bool = False,
    run_after: bool = False,
) -> "ingestor":
    """Record a UDF task configuration.

    The parameters are accepted for interface compatibility only; their
    semantics are defined by runmode implementations. On the interface class
    this always raises ``NotImplementedError``.
    """
    self._not_implemented("udf")

vdb_upload(params=None, **kwargs)

Record a vector DB upload configuration (execution TBD).

Source code in nemo_retriever/ingestor.py
185
186
187
188
def vdb_upload(self, params: VdbUploadParams | None = None, **kwargs: Any) -> "ingestor":
    """Record a vector DB upload configuration (execution TBD).

    ``params`` and ``kwargs`` are combined with ``_merge_params`` first; the
    interface class then always raises ``NotImplementedError``.
    """
    # Merge before raising (presumably so invalid arguments surface first; confirm).
    _ = _merge_params(params, kwargs)
    self._not_implemented("vdb_upload")

webhook(params=None, **kwargs)

Record a webhook notification configuration.

Source code in nemo_retriever/ingestor.py
208
209
210
211
def webhook(self, params: "WebhookParams | None" = None, **kwargs: Any) -> "ingestor":
    """Record a webhook notification configuration.

    ``params`` and ``kwargs`` are combined with ``_merge_params`` first; the
    interface class then always raises ``NotImplementedError``.
    """
    # Merge before raising (presumably so invalid arguments surface first; confirm).
    _ = _merge_params(params, kwargs)
    self._not_implemented("webhook")

create_ingestor(*, run_mode='inprocess', params=None, **kwargs)

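Ingestion factory: returns a graph-based ingestor for the `inprocess` and `batch` run modes, or a `ServiceIngestor` for the `service` run mode; any other run mode raises `ValueError`.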

Source code in nemo_retriever/ingestor.py
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
def create_ingestor(
    *,
    run_mode: RunMode = "inprocess",
    params: IngestorCreateParams | None = None,
    **kwargs: Any,
) -> "Ingestor":
    """
    Ingestion factory for the graph (``inprocess``/``batch``) and ``service`` run modes.

    ``params`` and ``kwargs`` are merged and parsed into an ``IngestorCreateParams``
    first, then dispatched on ``run_mode``:

    * ``"service"``: a ``ServiceIngestor``. ``api_key`` is forwarded as ``api_token``;
      ``max_concurrency`` is forwarded only when it is set.
    * ``"inprocess"`` / ``"batch"``: a ``GraphIngestor``.

    Raises:
        ValueError: if ``run_mode`` is none of the supported values.
    """
    merged = _merge_params(params, kwargs)
    parsed = merged if isinstance(merged, IngestorCreateParams) else IngestorCreateParams(**merged)

    if run_mode == "service":
        # Imported lazily, only when this mode is actually requested.
        from nemo_retriever.service_ingestor import ServiceIngestor

        optional_kwargs: dict[str, Any] = {}
        if parsed.max_concurrency is not None:
            optional_kwargs["max_concurrency"] = parsed.max_concurrency
        return ServiceIngestor(
            base_url=parsed.base_url,
            documents=parsed.documents,
            api_token=parsed.api_key,
            **optional_kwargs,
        )

    graph_modes = {"batch", "inprocess"}
    if run_mode in graph_modes:
        from nemo_retriever.graph_ingestor import GraphIngestor

        return GraphIngestor(
            run_mode=run_mode,
            documents=parsed.documents,
            ray_address=parsed.ray_address,
            ray_log_to_driver=parsed.ray_log_to_driver,
            debug=parsed.debug,
            allow_no_gpu=parsed.allow_no_gpu,
            error_policy=parsed.error_policy,
        )

    raise ValueError(f"create_ingestor supports run modes 'inprocess', 'batch', and 'service'; got {run_mode!r}.")

logger = logging.getLogger(__name__) module-attribute

retriever = Retriever module-attribute

Retriever dataclass

Graph-based query helper: batch embed → VDB retrieve [→ Nemotron rerank].

Configuration is passed through `embed_kwargs` (`nemo_retriever.params.EmbedParams`), `vdb_kwargs` (constructor kwargs for `nemo_retriever.vdb.operators.RetrieveVdbOperator`), and the optional `rerank_kwargs` for `nemo_retriever.rerank.rerank.NemotronRerankActor`.

See retriever.md for examples.

Source code in nemo_retriever/retriever.py
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
@dataclass
class Retriever:
    """Graph-based query helper: batch embed → VDB retrieve [→ Nemotron rerank].

    Configuration is passed through ``embed_kwargs`` (:class:`~nemo_retriever.params.EmbedParams`),
    ``vdb_kwargs`` (constructor kwargs for :class:`~nemo_retriever.vdb.operators.RetrieveVdbOperator`),
    and optional ``rerank_kwargs`` for :class:`~nemo_retriever.rerank.rerank.NemotronRerankActor`.

    See ``retriever.md`` for examples.
    """

    run_mode: Literal["local", "service"] = "local"
    """``local`` uses archetype batch embed resolution; ``service`` forces CPU HTTP embed."""

    top_k: int = 10
    rerank: bool = False
    """When ``True``, append :class:`~nemo_retriever.rerank.rerank.NemotronRerankActor` after retrieval."""

    graph: Any = None
    """Custom :class:`~nemo_retriever.graph.pipeline_graph.Graph`. When set, ``embed_kwargs`` /
    ``vdb_kwargs`` default-graph fields are ignored for construction (you still pass execute kwargs)."""

    embed_kwargs: dict[str, Any] = field(default_factory=dict)
    vdb_kwargs: dict[str, Any] = field(default_factory=dict)
    rerank_kwargs: dict[str, Any] = field(default_factory=dict)

    # Single-entry cache for the default graph, keyed on the config that built it.
    _cached_graph: Any = field(default=None, init=False, repr=False, compare=False)
    _cache_key: Any = field(default=None, init=False, repr=False, compare=False)

    def __post_init__(self) -> None:
        """Reject unsupported ``run_mode`` values at construction time."""
        if self.run_mode not in ("local", "service"):
            raise ValueError("run_mode must be 'local' or 'service'")

    def _merge_embed_params(self, extra: Optional[dict[str, Any]] = None) -> Any:
        """Build validated query-side ``EmbedParams``.

        Precedence, lowest to highest: built-in query defaults, ``self.embed_kwargs``,
        then the per-call ``extra`` mapping.

        Raises:
            ValueError: if ``run_mode == "service"`` and neither ``embedding_endpoint``
                nor ``embed_invoke_url`` is a non-blank string.
        """
        from nemo_retriever.model import _LOCAL_INGEST_EMBED_BACKENDS, normalize_backend
        from nemo_retriever.params import EmbedParams

        base: dict[str, Any] = {
            "model_name": VL_EMBED_MODEL,
            "embed_model_name": VL_EMBED_MODEL,
            "input_type": "query",
            "text_column": "text",
            "inference_batch_size": 32,
            "embed_inference_batch_size": 32,
        }
        merged = {**base, **dict(self.embed_kwargs or {}), **dict(extra or {})}
        backend = merged.get("local_ingest_embed_backend")
        if backend is not None:
            merged["local_ingest_embed_backend"] = normalize_backend(
                str(backend),
                _LOCAL_INGEST_EMBED_BACKENDS,
                field_name="local_ingest_embed_backend",
                default="vllm",
            )
        params = EmbedParams.model_validate(merged)
        if self.run_mode == "service":
            url = (params.embedding_endpoint or params.embed_invoke_url or "").strip()
            if not url:
                raise ValueError(
                    "run_mode='service' requires a non-empty HTTP embedding URL. "
                    "Set ``embedding_endpoint`` or ``embed_invoke_url`` inside ``embed_kwargs``."
                )
        return params

    def _merge_rerank_actor_kwargs(self) -> dict[str, Any]:
        """Return the default rerank-actor kwargs overridden by ``self.rerank_kwargs``."""
        return {**_default_rerank_actor_kwargs(), **dict(self.rerank_kwargs or {})}

    def _refine_factor(self) -> int:
        """Return the candidate over-fetch multiplier applied before reranking.

        Without reranking no over-fetch is needed, so this is ``1``. With reranking
        the value comes from the merged rerank kwargs (default ``4``). It is clamped
        to at least ``1`` so the VDB is never asked for fewer candidates than the
        caller's ``top_k``.
        """
        if not self.rerank:
            return 1
        factor = self._merge_rerank_actor_kwargs().get("refine_factor")
        # An explicit ``None`` means "use the default" instead of failing in ``int(None)``.
        if factor is None:
            factor = 4
        return max(1, int(factor))

    def _build_default_graph(self, *, embed_extra: Optional[dict[str, Any]] = None) -> Any:
        """Build ``embed >> retrieve [>> rerank]`` from the configured kwargs.

        ``service`` mode uses the CPU (HTTP) batch-embed actor; ``local`` uses the
        default batch-embed actor.
        """
        from nemo_retriever.rerank.rerank import NemotronRerankActor
        from nemo_retriever.text_embed.cpu_operator import _BatchEmbedCPUActor
        from nemo_retriever.text_embed.operators import _BatchEmbedActor

        embed_params = self._merge_embed_params(embed_extra)
        if self.run_mode == "service":
            embed_op = _BatchEmbedCPUActor(params=embed_params)
        else:
            embed_op = _BatchEmbedActor(params=embed_params)

        vdb_init = _coerce_vdb_init(self.vdb_kwargs)
        retrieve = RetrieveVdbOperator(
            explode_for_rerank=self.rerank,
            **vdb_init,
        )

        chain = embed_op >> retrieve
        if self.rerank:
            rk = self._merge_rerank_actor_kwargs()
            # ``refine_factor`` only controls the retrieval over-fetch; it is not an actor kwarg.
            rk.pop("refine_factor", None)
            chain = chain >> NemotronRerankActor(**rk)

        return chain

    def _get_graph(self, *, embed_extra: Optional[dict[str, Any]] = None) -> Any:
        """Return the user-supplied ``graph`` or a cached default graph.

        The default graph is rebuilt whenever any input that shaped it changes
        (run mode, rerank flag, the three kwargs dicts, or ``embed_extra``).
        """
        if self.graph is not None:
            return self.graph

        key = (
            self.run_mode,
            self.rerank,
            json.dumps(self.vdb_kwargs, sort_keys=True, default=str),
            json.dumps(self.embed_kwargs, sort_keys=True, default=str),
            json.dumps(self.rerank_kwargs, sort_keys=True, default=str),
            json.dumps(embed_extra or {}, sort_keys=True, default=str),
        )
        if self._cached_graph is not None and self._cache_key == key:
            return self._cached_graph
        g = self._build_default_graph(embed_extra=embed_extra)
        self._cached_graph = g
        self._cache_key = key
        return g

    def _execute_queries_graph(
        self,
        query_texts: list[str],
        *,
        effective_top_k: int,
        retrieval_top_k: int,
        vdb_call_kwargs: Optional[dict[str, Any]],
        embed_extra: Optional[dict[str, Any]],
    ) -> list[list[dict[str, Any]]]:
        """Run the query graph locally and return hits per query.

        Without reranking the graph's single leaf must already be ``list[list[dict]]``.
        With reranking a DataFrame leaf is converted to per-query hits, truncated to
        ``effective_top_k`` and scored by the rerank actor's ``score_column``.

        Raises:
            TypeError: if the graph cannot be resolved locally or yields an unexpected output.
            RuntimeError: if the graph does not produce exactly one leaf output.
        """
        embed_params = self._merge_embed_params(embed_extra)
        text_col = str(embed_params.text_column)
        df = pd.DataFrame({text_col: query_texts})

        graph = self._get_graph(embed_extra=embed_extra)
        if not callable(getattr(graph, "resolve_for_local_execution", None)):
            raise TypeError("graph must provide resolve_for_local_execution() (e.g. pipeline_graph.Graph)")

        exec_kwargs: dict[str, Any] = {
            **filter_retrieval_kwargs(dict(vdb_call_kwargs or {})),
            "top_k": int(retrieval_top_k),
            "query_texts": query_texts,
        }
        resolved = graph.resolve_for_local_execution()
        leaves = resolved.execute(df, **exec_kwargs)
        if len(leaves) != 1:
            raise RuntimeError(
                f"Retriever query graph must yield exactly one leaf output; got {len(leaves)}. "
                "Use a linear graph or adjust your custom ``graph``."
            )
        out = leaves[0]

        if isinstance(out, pd.DataFrame):
            if not self.rerank:
                raise TypeError(
                    "Graph returned a DataFrame but ``rerank`` is False; expected list[list[dict]] from retrieval."
                )
            rk = self._merge_rerank_actor_kwargs()
            score_col = str(rk.get("score_column", "rerank_score"))
            return rerank_long_dataframe_to_hits(
                out, query_texts=query_texts, top_k=int(effective_top_k), score_column=score_col
            )
        if not isinstance(out, list):
            raise TypeError(f"Unexpected query graph output type: {type(out).__name__}")
        return out

    def query(
        self,
        query: str,
        *,
        top_k: Optional[int] = None,
        vdb_kwargs: Optional[dict[str, Any]] = None,
        embed_kwargs: Optional[dict[str, Any]] = None,
    ) -> list[RetrievalHit]:
        """Return the hits for a single query (see :meth:`queries`)."""
        return self.queries([query], top_k=top_k, vdb_kwargs=vdb_kwargs, embed_kwargs=embed_kwargs)[0]

    def queries(
        self,
        queries: Sequence[str],
        *,
        top_k: Optional[int] = None,
        vdb_kwargs: Optional[dict[str, Any]] = None,
        embed_kwargs: Optional[dict[str, Any]] = None,
    ) -> list[list[RetrievalHit]]:
        """Return one hit list per query; an empty input returns ``[]`` without running the graph.

        ``top_k`` overrides ``self.top_k``. With reranking, ``top_k * refine_factor``
        candidates are retrieved before the reranker truncates them to ``top_k``.
        """
        query_texts = [str(q) for q in queries]
        if not query_texts:
            return []

        effective_top_k = int(top_k) if top_k is not None else int(self.top_k)
        refine = self._refine_factor()
        retrieval_top_k = effective_top_k * refine if self.rerank else effective_top_k

        return self._execute_queries_graph(
            query_texts,
            effective_top_k=effective_top_k,
            retrieval_top_k=retrieval_top_k,
            vdb_call_kwargs=vdb_kwargs,
            embed_extra=embed_kwargs,
        )

    @staticmethod
    def _hits_to_result(hits: Sequence[RetrievalHit]) -> "RetrievalResult":
        """Split hits into ``chunks`` (each hit's ``"text"``, default ``""``) and ``metadata`` (other keys)."""
        from nemo_retriever.llm.types import RetrievalResult

        chunks = [str(hit.get("text", "")) for hit in hits]
        metadata = [{k: v for k, v in hit.items() if k != "text"} for hit in hits]
        return RetrievalResult(chunks=chunks, metadata=metadata)

    def retrieve(
        self,
        query: str,
        top_k: Optional[int] = None,
        *,
        vdb_kwargs: Optional[dict[str, Any]] = None,
        embed_kwargs: Optional[dict[str, Any]] = None,
    ) -> "RetrievalResult":
        """Retrieve hits for ``query`` and package them as a ``RetrievalResult``."""
        hits = self.query(query, top_k=top_k, vdb_kwargs=vdb_kwargs, embed_kwargs=embed_kwargs)
        return self._hits_to_result(hits)

    def retrieve_batch(
        self,
        queries: Sequence[str],
        *,
        top_k: Optional[int] = None,
        vdb_kwargs: Optional[dict[str, Any]] = None,
        embed_kwargs: Optional[dict[str, Any]] = None,
    ) -> list["RetrievalResult"]:
        """Batched :meth:`retrieve`: one ``RetrievalResult`` per query, in input order."""
        query_texts = [str(q) for q in queries]
        if not query_texts:
            return []

        hits_per_query = self.queries(query_texts, top_k=top_k, vdb_kwargs=vdb_kwargs, embed_kwargs=embed_kwargs)
        return [self._hits_to_result(hits) for hits in hits_per_query]

    def answer(
        self,
        query: str,
        *,
        llm: "LLMClient",
        judge: Optional["AnswerJudge"] = None,
        reference: Optional[str] = None,
        top_k: Optional[int] = None,
        vdb_kwargs: Optional[dict[str, Any]] = None,
        embed_kwargs: Optional[dict[str, Any]] = None,
    ) -> "AnswerResult":
        """Retrieve context for ``query``, generate an answer with ``llm``, and optionally score it.

        Scoring runs only when generation succeeded and a ``reference`` was given.

        Raises:
            ValueError: if ``judge`` is given without ``reference``.
        """
        from nemo_retriever.llm.types import AnswerResult

        if judge is not None and reference is None:
            raise ValueError("judge requires reference")

        retrieved = self.retrieve(query, top_k=top_k, vdb_kwargs=vdb_kwargs, embed_kwargs=embed_kwargs)

        gen = llm.generate(query, retrieved.chunks)

        result = AnswerResult(
            query=query,
            answer=gen.answer,
            chunks=retrieved.chunks,
            metadata=retrieved.metadata,
            model=gen.model,
            latency_s=gen.latency_s,
            error=gen.error,
        )

        if gen.error is not None:
            return result

        if reference is None and judge is None:
            return result

        # Generation errors short-circuit above, so ``gen.error`` is ``None`` here.
        self._populate_scores(
            result,
            query=query,
            reference=reference,
            judge=judge,
            gen_error=gen.error,
        )
        return result

    def _populate_scores(
        self,
        result: "AnswerResult",
        *,
        query: str,
        reference: Optional[str],
        judge: Optional["AnswerJudge"],
        gen_error: Optional[str],
    ) -> None:
        """Fill the scoring/judging fields of ``result`` in place.

        Lexical scoring (answer-in-context, token F1, exact match) and the optional
        LLM judge run concurrently in a two-worker thread pool. ``failure_mode`` is
        set only when a reference was provided.
        """
        from concurrent.futures import ThreadPoolExecutor

        from nemo_retriever.evaluation.scoring import (
            answer_in_context,
            classify_failure,
            token_f1,
        )

        def _scoring() -> tuple[Optional[bool], Optional[float], Optional[bool]]:
            if reference is None:
                return None, None, None
            aic = answer_in_context(reference, result.chunks)
            f1 = token_f1(reference, result.answer)
            return aic, float(f1.get("f1", 0.0)), bool(f1.get("exact_match", False))

        def _judging() -> tuple[Optional[int], Optional[str], Optional[str]]:
            if judge is None or reference is None:
                return None, None, None
            jr = judge.judge(query, reference, result.answer)
            return jr.score, jr.reasoning, jr.error

        with ThreadPoolExecutor(max_workers=2) as pool:
            scoring_future = pool.submit(_scoring)
            judge_future = pool.submit(_judging)
            aic, f1, em = scoring_future.result()
            judge_score, judge_reasoning, judge_error = judge_future.result()

        result.answer_in_context = aic
        result.token_f1 = f1
        result.exact_match = em
        result.judge_score = judge_score
        result.judge_reasoning = judge_reasoning
        result.judge_error = judge_error

        if reference is not None and aic is not None:
            result.failure_mode = classify_failure(
                ref_in_chunks=aic,
                judge_score=judge_score,
                gen_error=gen_error,
                candidate=result.answer,
            )

    def pipeline(self, *, top_k: Optional[int] = None) -> "RetrieverPipelineBuilder":
        """Return a ``RetrieverPipelineBuilder`` bound to this retriever (``top_k`` defaults to ``self.top_k``)."""
        effective_top_k = int(top_k) if top_k is not None else int(self.top_k)
        return RetrieverPipelineBuilder(self, top_k=effective_top_k)

    def generate_sql(self, query: str) -> str:
        """Delegate ``query`` to the tabular-data ``generate_sql`` helper."""
        from nemo_retriever.tabular_data.retrieval import generate_sql

        return generate_sql(query)

embed_kwargs = field(default_factory=dict) class-attribute instance-attribute

graph = None class-attribute instance-attribute

Custom `nemo_retriever.graph.pipeline_graph.Graph`. When set, the `embed_kwargs` / `vdb_kwargs` default-graph fields are ignored for graph construction (you still pass execute-time kwargs).

rerank = False class-attribute instance-attribute

When `True`, append `nemo_retriever.rerank.rerank.NemotronRerankActor` after retrieval.

rerank_kwargs = field(default_factory=dict) class-attribute instance-attribute

run_mode = 'local' class-attribute instance-attribute

`local` uses archetype batch-embed resolution; `service` forces CPU HTTP embedding.

top_k = 10 class-attribute instance-attribute

vdb_kwargs = field(default_factory=dict) class-attribute instance-attribute

__init__(run_mode='local', top_k=10, rerank=False, graph=None, embed_kwargs=dict(), vdb_kwargs=dict(), rerank_kwargs=dict())

__post_init__()

Source code in nemo_retriever/retriever.py
83
84
85
def __post_init__(self) -> None:
    if self.run_mode not in ("local", "service"):
        raise ValueError("run_mode must be 'local' or 'service'")

answer(query, *, llm, judge=None, reference=None, top_k=None, vdb_kwargs=None, embed_kwargs=None)

Source code in nemo_retriever/retriever.py
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
def answer(
    self,
    query: str,
    *,
    llm: "LLMClient",
    judge: Optional["AnswerJudge"] = None,
    reference: Optional[str] = None,
    top_k: Optional[int] = None,
    vdb_kwargs: Optional[dict[str, Any]] = None,
    embed_kwargs: Optional[dict[str, Any]] = None,
) -> "AnswerResult":
    """Retrieve context for ``query``, generate an answer, and optionally score it.

    Args:
        query: User question.
        llm: Client whose ``generate(query, chunks)`` produces the answer.
        judge: Optional judge; requires ``reference``.
        reference: Optional ground-truth answer used for scoring.
        top_k, vdb_kwargs, embed_kwargs: Forwarded to :meth:`retrieve`.

    Returns:
        An ``AnswerResult``. Scores are populated via ``_populate_scores``
        only when generation succeeded and ``reference`` or ``judge`` is set.

    Raises:
        ValueError: If ``judge`` is given without ``reference``.
    """
    from nemo_retriever.llm.types import AnswerResult

    if reference is None and judge is not None:
        raise ValueError("judge requires reference")

    context = self.retrieve(query, top_k=top_k, vdb_kwargs=vdb_kwargs, embed_kwargs=embed_kwargs)
    generation = llm.generate(query, context.chunks)

    outcome = AnswerResult(
        query=query,
        answer=generation.answer,
        chunks=context.chunks,
        metadata=context.metadata,
        model=generation.model,
        latency_s=generation.latency_s,
        error=generation.error,
    )

    # Skip scoring when generation failed or when there is nothing to score
    # against (a judge without a reference was already rejected above).
    wants_scores = reference is not None or judge is not None
    if generation.error is None and wants_scores:
        self._populate_scores(
            outcome,
            query=query,
            reference=reference,
            judge=judge,
            gen_error=generation.error,
        )
    return outcome

generate_sql(query)

Source code in nemo_retriever/retriever.py
390
391
392
393
def generate_sql(self, query: str) -> str:
    """Return the SQL string produced for ``query``.

    Thin wrapper around
    :func:`nemo_retriever.tabular_data.retrieval.generate_sql`, imported
    lazily at call time.
    """
    from nemo_retriever.tabular_data.retrieval import generate_sql as _sql_for_query

    sql = _sql_for_query(query)
    return sql

pipeline(*, top_k=None)

Source code in nemo_retriever/retriever.py
386
387
388
def pipeline(self, *, top_k: Optional[int] = None) -> "RetrieverPipelineBuilder":
    """Create a :class:`RetrieverPipelineBuilder` bound to this retriever.

    Args:
        top_k: Hits per query for the builder's live retrieval step.
            Falls back to ``self.top_k`` when omitted; coerced with ``int``.
    """
    if top_k is None:
        top_k = self.top_k
    return RetrieverPipelineBuilder(self, top_k=int(top_k))

queries(queries, *, top_k=None, vdb_kwargs=None, embed_kwargs=None)

Source code in nemo_retriever/retriever.py
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
def queries(
    self,
    queries: Sequence[str],
    *,
    top_k: Optional[int] = None,
    vdb_kwargs: Optional[dict[str, Any]] = None,
    embed_kwargs: Optional[dict[str, Any]] = None,
) -> list[list[RetrievalHit]]:
    """Run retrieval for a batch of queries.

    Every query is coerced with ``str``. ``top_k`` defaults to ``self.top_k``.
    When ``self.rerank`` is set, the first-stage retrieval asks for
    ``top_k * self._refine_factor()`` hits (presumably to give the reranker a
    larger candidate pool) while ``effective_top_k`` stays at ``top_k``.

    Returns:
        ``[]`` for no queries; otherwise whatever ``_execute_queries_graph``
        returns (annotated as one hit list per query).
    """
    texts = list(map(str, queries))
    if len(texts) == 0:
        return []

    final_k = int(self.top_k if top_k is None else top_k)
    refine_factor = self._refine_factor()
    first_stage_k = final_k * refine_factor if self.rerank else final_k

    return self._execute_queries_graph(
        texts,
        effective_top_k=final_k,
        retrieval_top_k=first_stage_k,
        vdb_call_kwargs=vdb_kwargs,
        embed_extra=embed_kwargs,
    )

query(query, *, top_k=None, vdb_kwargs=None, embed_kwargs=None)

Source code in nemo_retriever/retriever.py
214
215
216
217
218
219
220
221
222
def query(
    self,
    query: str,
    *,
    top_k: Optional[int] = None,
    vdb_kwargs: Optional[dict[str, Any]] = None,
    embed_kwargs: Optional[dict[str, Any]] = None,
) -> list[RetrievalHit]:
    """Retrieve hits for a single query.

    Delegates to :meth:`queries` with a one-element batch and returns the
    first (and only) entry of its result.
    """
    batched = self.queries(
        [query],
        top_k=top_k,
        vdb_kwargs=vdb_kwargs,
        embed_kwargs=embed_kwargs,
    )
    return batched[0]

retrieve(query, top_k=None, *, vdb_kwargs=None, embed_kwargs=None)

Source code in nemo_retriever/retriever.py
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
def retrieve(
    self,
    query: str,
    top_k: Optional[int] = None,
    *,
    vdb_kwargs: Optional[dict[str, Any]] = None,
    embed_kwargs: Optional[dict[str, Any]] = None,
) -> "RetrievalResult":
    """Retrieve hits for ``query`` and split them into texts and metadata.

    Args:
        query: Query text.
        top_k, vdb_kwargs, embed_kwargs: Forwarded to :meth:`query`.

    Returns:
        A ``RetrievalResult`` whose ``chunks[i]`` is the ``"text"`` of hit
        ``i`` (``""`` when that key is missing or ``None``) and whose
        ``metadata[i]`` holds every other key of that hit.
    """
    from nemo_retriever.llm.types import RetrievalResult

    hits = self.query(query, top_k=top_k, vdb_kwargs=vdb_kwargs, embed_kwargs=embed_kwargs)

    chunks: list[str] = []
    metadata: list[dict[str, Any]] = []
    for hit in hits:
        # ``hit.get("text", "")`` returns None for a present-but-null field, and
        # ``str(None)`` would hand the literal "None" to the LLM as context.
        text = hit.get("text")
        chunks.append("" if text is None else str(text))
        metadata.append({k: v for k, v in hit.items() if k != "text"})
    return RetrievalResult(chunks=chunks, metadata=metadata)

retrieve_batch(queries, *, top_k=None, vdb_kwargs=None, embed_kwargs=None)

Source code in nemo_retriever/retriever.py
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
def retrieve_batch(
    self,
    queries: Sequence[str],
    *,
    top_k: Optional[int] = None,
    vdb_kwargs: Optional[dict[str, Any]] = None,
    embed_kwargs: Optional[dict[str, Any]] = None,
) -> list["RetrievalResult"]:
    """Retrieve context for several queries with a single :meth:`queries` call.

    Args:
        queries: Query texts; each is coerced with ``str``.
        top_k, vdb_kwargs, embed_kwargs: Forwarded to :meth:`queries`.

    Returns:
        ``[]`` for an empty input; otherwise one ``RetrievalResult`` per entry
        returned by :meth:`queries`. ``chunks`` hold each hit's ``"text"``
        (``""`` when that key is missing or ``None``) and ``metadata`` holds
        the remaining keys.
    """
    from nemo_retriever.llm.types import RetrievalResult

    query_texts = [str(q) for q in queries]
    if not query_texts:
        return []

    hits_per_query = self.queries(query_texts, top_k=top_k, vdb_kwargs=vdb_kwargs, embed_kwargs=embed_kwargs)

    def _text_of(hit: Any) -> str:
        # A present-but-None "text" must not become the string "None".
        text = hit.get("text")
        return "" if text is None else str(text)

    results: list[RetrievalResult] = []
    for hits in hits_per_query:
        chunks = [_text_of(hit) for hit in hits]
        metadata = [{k: v for k, v in hit.items() if k != "text"} for hit in hits]
        results.append(RetrievalResult(chunks=chunks, metadata=metadata))
    return results

RetrieverPipelineBuilder

Fluent builder for live-RAG batch operator graphs.

Returned from `Retriever.pipeline`. Each builder method appends an `nemo_retriever.evaluation.eval_operator.EvalOperator` to an internal list; `run` composes them into a graph via the existing `>>` chaining and executes it on a DataFrame built from the provided queries.

Example

>>> builder = retriever.pipeline()  # doctest: +SKIP
>>> df = builder.generate(llm).score().judge(judge).run(  # doctest: +SKIP
...     queries=["q1", "q2"],
...     reference=["r1", "r2"],
... )

Source code in nemo_retriever/retriever.py
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
class RetrieverPipelineBuilder:
    """Fluent builder for live-RAG batch operator graphs.

    Returned from :meth:`Retriever.pipeline`.  Each builder method appends
    an :class:`~nemo_retriever.evaluation.eval_operator.EvalOperator` to an
    internal list and returns ``self``; :meth:`run` composes them into a
    graph via the existing ``>>`` chaining (with a live retrieval operator
    as the source) and executes it on a DataFrame built from the provided
    queries.

    Example:
        >>> builder = retriever.pipeline()  # doctest: +SKIP
        >>> df = builder.generate(llm).score().judge(judge).run(  # doctest: +SKIP
        ...     queries=["q1", "q2"],
        ...     reference=["r1", "r2"],
        ... )
    """

    def __init__(self, retriever: "Retriever", *, top_k: int = 5) -> None:
        """Bind the builder to ``retriever``.

        Args:
            retriever: Retriever handed to the live retrieval operator in :meth:`run`.
            top_k: Hits retrieved per query; coerced with ``int``.
        """
        self._retriever = retriever
        self._top_k = int(top_k)
        # Operators appended by generate()/score()/judge(), in call order.
        self._steps: list[Any] = []

    def with_retrieval(self, *, top_k: int) -> "RetrieverPipelineBuilder":
        """Override the ``top_k`` used for the live retrieval source."""
        self._top_k = int(top_k)
        return self

    def generate(
        self,
        llm: Optional[Any] = None,
        /,
        *,
        model: Optional[str] = None,
        **kwargs: Any,
    ) -> "RetrieverPipelineBuilder":
        """Append a :class:`QAGenerationOperator` step.

        Accepts either a pre-built
        :class:`~nemo_retriever.llm.clients.LiteLLMClient` (whose transport
        and sampling params are unpacked onto the operator) or the flat
        ``model=..., api_base=..., ...`` kwargs forwarded to the operator
        constructor directly.  When ``llm`` is given, ``model`` and
        ``**kwargs`` are ignored.

        Returns:
            ``self``, for chaining.

        Raises:
            ValueError: If neither ``llm`` nor ``model`` is provided.
        """
        # Validate before the lazy import so misuse surfaces as ValueError
        # even when the evaluation extras are not importable.
        if llm is None and model is None:
            raise ValueError("generate() requires either llm= or model=")

        from nemo_retriever.evaluation.generation import QAGenerationOperator

        if llm is not None:
            transport = llm.transport
            sampling = llm.sampling
            operator = QAGenerationOperator(
                model=transport.model,
                api_base=transport.api_base,
                api_key=transport.api_key,
                temperature=sampling.temperature,
                top_p=sampling.top_p,
                max_tokens=sampling.max_tokens,
                extra_params=dict(transport.extra_params) if transport.extra_params else None,
                num_retries=transport.num_retries,
                timeout=transport.timeout,
            )
        else:
            operator = QAGenerationOperator(model=model, **kwargs)

        self._steps.append(operator)
        return self

    def score(self) -> "RetrieverPipelineBuilder":
        """Append a :class:`ScoringOperator` step (Tier 1 + Tier 2)."""
        from nemo_retriever.evaluation.scoring_operator import ScoringOperator

        self._steps.append(ScoringOperator())
        return self

    def judge(
        self,
        judge: Optional[Any] = None,
        /,
        *,
        model: Optional[str] = None,
        **kwargs: Any,
    ) -> "RetrieverPipelineBuilder":
        """Append a :class:`JudgingOperator` step (Tier 3).

        Accepts either a pre-built
        :class:`~nemo_retriever.llm.clients.judge.LLMJudge` (whose transport params
        are unpacked onto the operator) or the flat ``model=...`` kwargs
        forwarded to the operator constructor.  When ``judge`` is given,
        ``model`` and ``**kwargs`` are ignored and only its transport settings
        are copied.

        Returns:
            ``self``, for chaining.

        Raises:
            ValueError: If neither ``judge`` nor ``model`` is provided.
        """
        # Validate before the lazy import (see generate()).
        if judge is None and model is None:
            raise ValueError("judge() requires either judge= or model=")

        from nemo_retriever.evaluation.judging import JudgingOperator

        if judge is not None:
            # Reaches through the judge's private ``_client`` for its transport.
            transport = judge._client.transport
            operator = JudgingOperator(
                model=transport.model,
                api_base=transport.api_base,
                api_key=transport.api_key,
                extra_params=dict(transport.extra_params) if transport.extra_params else None,
                num_retries=transport.num_retries,
                timeout=transport.timeout,
            )
        else:
            operator = JudgingOperator(model=model, **kwargs)

        self._steps.append(operator)
        return self

    @staticmethod
    def _build_query_frame(queries: Any, reference: Any) -> "pd.DataFrame":
        """Normalize :meth:`run` inputs into a DataFrame with a ``query`` column.

        A DataFrame is copied and ``reference`` is ignored.  A str, list or
        tuple becomes a ``query`` column; a non-None ``reference`` adds a
        ``reference_answer`` column.  A list/tuple/``Series``/``ndarray``
        reference is aligned row-by-row; anything else is broadcast.
        """
        import numpy as np
        import pandas as pd

        if isinstance(queries, pd.DataFrame):
            return queries.copy()
        if isinstance(queries, str):
            query_list = [queries]
        elif isinstance(queries, (list, tuple)):
            query_list = list(queries)
        else:
            raise TypeError("queries must be a str, list[str], or pandas.DataFrame; " f"got {type(queries).__name__}")

        df = pd.DataFrame({"query": query_list})
        if reference is not None:
            if isinstance(reference, (list, tuple, pd.Series, np.ndarray)):
                # Converting to a list also sidesteps pandas index alignment
                # when a Series is assigned as a column.
                refs = list(reference)
            else:
                # A single value (typically a string) applies to every query.
                refs = [reference] * len(query_list)
            if len(refs) != len(query_list):
                raise ValueError("reference length must match queries length")
            df["reference_answer"] = refs
        return df

    def run(
        self,
        queries: Any,
        *,
        reference: Any = None,
    ) -> "pd.DataFrame":
        """Execute the composed graph on ``queries``.

        Args:
            queries: A single query string, a list/tuple of query strings, or
                a pre-built ``pandas.DataFrame`` (which must contain a
                ``query`` column and, when judging/scoring, a
                ``reference_answer`` column).
            reference: Optional ground-truth answer(s).  Accepts a single
                string (applied to all queries), a list/tuple/``Series``/
                ``ndarray`` aligned with ``queries``, or ``None``.  Ignored
                when ``queries`` is already a DataFrame.

        Returns:
            A ``pandas.DataFrame`` with the columns contributed by each
            appended step (always ``query``, ``context``, and
            ``context_metadata``; plus ``answer``/``latency_s``/... when
            ``.generate()`` ran, and so on).  When a ``gen_error`` column is
            present and non-empty, ``attrs["generation_failure_rate"]`` is set.

        Raises:
            TypeError: If ``queries`` is not a str, list/tuple, or DataFrame.
            ValueError: If an aligned ``reference`` sequence's length does not
                match ``queries``.
            RuntimeError: If the composed graph yields more than one leaf.
        """
        # Build and validate the frame before importing the evaluation stack so
        # malformed arguments fail fast.
        df = self._build_query_frame(queries, reference)

        from nemo_retriever.evaluation.live_retrieval import LiveRetrievalOperator

        retrieval_op = LiveRetrievalOperator(self._retriever, top_k=self._top_k)
        if not self._steps:
            out = retrieval_op.run(df)
        else:
            graph = retrieval_op
            for step in self._steps:
                graph = graph >> step
            # Linear live-RAG pipelines have exactly one leaf.
            leaves = graph.execute(df)
            if len(leaves) != 1:
                raise RuntimeError(f"Unexpected pipeline fan-out: got {len(leaves)} leaf outputs")
            out = leaves[0]

        # Expose the generation failure rate on ``out.attrs`` for downstream aggregators.
        if "gen_error" in out.columns and len(out) > 0:
            out.attrs["generation_failure_rate"] = float(out["gen_error"].notna().mean())

        return out

__init__(retriever, *, top_k=5)

Source code in nemo_retriever/retriever.py
413
414
415
416
def __init__(self, retriever: "Retriever", *, top_k: int = 5) -> None:
    self._retriever = retriever
    self._top_k = int(top_k)
    self._steps: list[Any] = []

generate(llm=None, /, *, model=None, **kwargs)

Append a `QAGenerationOperator` step.

Accepts either a pre-built `nemo_retriever.llm.clients.LiteLLMClient` (whose transport and sampling params are unpacked onto the operator) or the flat `model=..., api_base=..., ...` kwargs, which are forwarded directly to the operator constructor.

Raises:

Type Description
ValueError

If neither llm nor model is provided.

Source code in nemo_retriever/retriever.py
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
def generate(
    self,
    llm: Optional[Any] = None,
    /,
    *,
    model: Optional[str] = None,
    **kwargs: Any,
) -> "RetrieverPipelineBuilder":
    """Append a :class:`QAGenerationOperator` step.

    Accepts either a pre-built
    :class:`~nemo_retriever.llm.clients.LiteLLMClient` (whose transport
    and sampling params are unpacked onto the operator) or the flat
    ``model=..., api_base=..., ...`` kwargs forwarded to the operator
    constructor directly.  When ``llm`` is given, ``model`` and ``**kwargs``
    are ignored.

    Returns:
        ``self``, for chaining.

    Raises:
        ValueError: If neither ``llm`` nor ``model`` is provided.
    """
    # Validate before the lazy import so misuse surfaces as ValueError even
    # when the evaluation extras are not importable.
    if llm is None and model is None:
        raise ValueError("generate() requires either llm= or model=")

    from nemo_retriever.evaluation.generation import QAGenerationOperator

    if llm is not None:
        transport = llm.transport
        sampling = llm.sampling
        operator = QAGenerationOperator(
            model=transport.model,
            api_base=transport.api_base,
            api_key=transport.api_key,
            temperature=sampling.temperature,
            top_p=sampling.top_p,
            max_tokens=sampling.max_tokens,
            extra_params=dict(transport.extra_params) if transport.extra_params else None,
            num_retries=transport.num_retries,
            timeout=transport.timeout,
        )
    else:
        operator = QAGenerationOperator(model=model, **kwargs)

    self._steps.append(operator)
    return self

judge(judge=None, /, *, model=None, **kwargs)

Append a `JudgingOperator` step (Tier 3).

Accepts either a pre-built `nemo_retriever.llm.clients.judge.LLMJudge` (whose transport params are unpacked onto the operator) or the flat `model=...` kwargs, which are forwarded to the operator constructor.

Raises:

Type Description
ValueError

If neither judge nor model is provided.

Source code in nemo_retriever/retriever.py
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
def judge(
    self,
    judge: Optional[Any] = None,
    /,
    *,
    model: Optional[str] = None,
    **kwargs: Any,
) -> "RetrieverPipelineBuilder":
    """Append a :class:`JudgingOperator` step (Tier 3).

    Accepts either a pre-built
    :class:`~nemo_retriever.llm.clients.judge.LLMJudge` (whose transport params
    are unpacked onto the operator) or the flat ``model=...`` kwargs
    forwarded to the operator constructor.  When ``judge`` is given,
    ``model`` and ``**kwargs`` are ignored and only its transport settings
    (model, api_base, api_key, extra_params, num_retries, timeout) are copied.

    Returns:
        ``self``, for chaining.

    Raises:
        ValueError: If neither ``judge`` nor ``model`` is provided.
    """
    # Validate before the lazy import so misuse surfaces as ValueError even
    # when the evaluation extras are not importable.
    if judge is None and model is None:
        raise ValueError("judge() requires either judge= or model=")

    from nemo_retriever.evaluation.judging import JudgingOperator

    if judge is not None:
        # Reaches through the judge's private ``_client`` for its transport.
        transport = judge._client.transport
        operator = JudgingOperator(
            model=transport.model,
            api_base=transport.api_base,
            api_key=transport.api_key,
            extra_params=dict(transport.extra_params) if transport.extra_params else None,
            num_retries=transport.num_retries,
            timeout=transport.timeout,
        )
    else:
        operator = JudgingOperator(model=model, **kwargs)

    self._steps.append(operator)
    return self

run(queries, *, reference=None)

Execute the composed graph on queries.

Parameters:

Name Type Description Default
queries Any

A single query string, a list of query strings, or a pre-built pandas.DataFrame (which must contain a query column and, when judging/scoring, a reference_answer column).

required
reference Any

Optional ground-truth answer(s). Accepts a single string (applied to all queries), a list aligned with queries, or None. Ignored when queries is already a DataFrame.

None

Returns:

Type Description
'pd.DataFrame'

A `pandas.DataFrame` with the columns contributed by each appended step (always `query`, `context`, and `context_metadata`; plus `answer`/`latency_s`/... when `.generate()` ran, and so on).

Raises:

Type Description
ValueError

If reference is a list whose length does not match queries.

Source code in nemo_retriever/retriever.py
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
def run(
    self,
    queries: Any,
    *,
    reference: Any = None,
) -> "pd.DataFrame":
    """Execute the composed graph on ``queries``.

    Args:
        queries: A single query string, a list/tuple of query strings, or a
            pre-built ``pandas.DataFrame`` (which must contain a
            ``query`` column and, when judging/scoring, a
            ``reference_answer`` column).
        reference: Optional ground-truth answer(s).  Accepts a single
            string (applied to all queries), a list/tuple/``Series``/
            ``ndarray`` aligned with ``queries``, or ``None``.  Ignored when
            ``queries`` is already a DataFrame.

    Returns:
        A ``pandas.DataFrame`` with the columns contributed by each
        appended step (always ``query``, ``context``, and
        ``context_metadata``; plus ``answer``/``latency_s``/... when
        ``.generate()`` ran, and so on).  When a ``gen_error`` column is
        present and non-empty, ``attrs["generation_failure_rate"]`` is set.

    Raises:
        TypeError: If ``queries`` is not a str, list/tuple, or DataFrame.
        ValueError: If an aligned ``reference`` sequence's length does not
            match ``queries``.
        RuntimeError: If the composed graph yields more than one leaf.
    """
    import numpy as np
    import pandas as pd

    # Build and validate the input frame before importing the evaluation
    # stack, so malformed arguments fail fast.
    if isinstance(queries, pd.DataFrame):
        df = queries.copy()
    else:
        if isinstance(queries, str):
            query_list = [queries]
        elif isinstance(queries, (list, tuple)):
            query_list = list(queries)
        else:
            raise TypeError("queries must be a str, list[str], or pandas.DataFrame; " f"got {type(queries).__name__}")
        df = pd.DataFrame({"query": query_list})
        if reference is not None:
            if isinstance(reference, (list, tuple, pd.Series, np.ndarray)):
                # Align per query; list() also sidesteps pandas index alignment.
                refs = list(reference)
            else:
                # A single value (typically a string) applies to every query.
                refs = [reference] * len(query_list)
            if len(refs) != len(query_list):
                raise ValueError("reference length must match queries length")
            df["reference_answer"] = refs

    from nemo_retriever.evaluation.live_retrieval import LiveRetrievalOperator

    retrieval_op = LiveRetrievalOperator(self._retriever, top_k=self._top_k)
    if not self._steps:
        out = retrieval_op.run(df)
    else:
        graph = retrieval_op
        for step in self._steps:
            graph = graph >> step
        # Linear live-RAG pipelines have exactly one leaf.
        leaves = graph.execute(df)
        if len(leaves) != 1:
            raise RuntimeError(f"Unexpected pipeline fan-out: got {len(leaves)} leaf outputs")
        out = leaves[0]

    # Expose the generation failure rate on ``out.attrs`` for downstream aggregators.
    if "gen_error" in out.columns and len(out) > 0:
        out.attrs["generation_failure_rate"] = float(out["gen_error"].notna().mean())

    return out

score()

Append a `ScoringOperator` step (Tier 1 + Tier 2).

Source code in nemo_retriever/retriever.py
467
468
469
470
471
472
def score(self) -> "RetrieverPipelineBuilder":
    """Queue a :class:`ScoringOperator` step (Tier 1 + Tier 2); returns ``self`` for chaining."""
    from nemo_retriever.evaluation.scoring_operator import ScoringOperator as _Scoring

    scoring_step = _Scoring()
    self._steps.append(scoring_step)
    return self

with_retrieval(*, top_k)

Override the top_k used for the live retrieval source.

Source code in nemo_retriever/retriever.py
418
419
420
421
def with_retrieval(self, *, top_k: int) -> "RetrieverPipelineBuilder":
    """Set how many hits the live retrieval source fetches per query.

    ``top_k`` is coerced with ``int`` and replaces any earlier value.
    Returns ``self`` so calls can be chained.
    """
    new_k = int(top_k)
    self._top_k = new_k
    return self

MetaJoinKey = Literal['auto', 'source_id', 'source_name'] module-attribute

RunMode = Literal['inprocess', 'batch', 'fused', 'service'] module-attribute

SPLIT_CONFIG_VALID_KEYS = frozenset({'text', 'html', 'pdf', 'audio', 'image', 'video'}) module-attribute

__all__ = ['ASRParams', 'AudioChunkParams', 'AudioVisualFuseParams', 'BatchTuningParams', 'CaptionParams', 'ChartParams', 'DedupParams', 'EmbedParams', 'ExtractParams', 'FusedTuningParams', 'GpuAllocationParams', 'HtmlChunkParams', 'InfographicParams', 'IngestExecuteParams', 'IngestorCreateParams', 'LanceDbParams', 'LLMInferenceParams', 'LLMRemoteClientParams', 'ModelRuntimeParams', 'OcrParams', 'PageElementsParams', 'PdfSplitParams', 'RemoteInvokeParams', 'RemoteRetryParams', 'RunMode', 'SPLIT_CONFIG_VALID_KEYS', 'StoreParams', 'TabularExtractParams', 'TableParams', 'TextChunkParams', 'MetaJoinKey', 'VdbUploadParams', 'VideoFrameParams', 'VideoFrameTextDedupParams', 'WebhookParams', 'resolve_split_params'] module-attribute

ASRParams

Bases: _ParamsModel

Params for ASR (Parakeet/Riva gRPC or local transformers backend).

Source code in nemo_retriever/params/models.py
162
163
164
165
166
167
168
169
class ASRParams(_ParamsModel):
    """Params for ASR (Parakeet/Riva gRPC or local transformers backend).

    Every field has a default; ``Optional`` fields default to ``None``.
    """

    # Two optional endpoint slots, both unset by default.
    # NOTE(review): which endpoint/protocol each slot represents is not visible here — confirm.
    audio_endpoints: Tuple[Optional[str], Optional[str]] = (None, None)
    # Inference protocol name, "grpc" by default; presumably selects how the ASR service is called.
    audio_infer_protocol: str = "grpc"
    # Optional function identifier (presumably for a hosted function endpoint — TODO confirm).
    function_id: Optional[str] = None
    # Optional auth token; unset by default.
    auth_token: Optional[str] = None
    # Audio segmentation toggle; off by default.
    segment_audio: bool = False

audio_endpoints = (None, None) class-attribute instance-attribute

audio_infer_protocol = 'grpc' class-attribute instance-attribute

auth_token = None class-attribute instance-attribute

function_id = None class-attribute instance-attribute

segment_audio = False class-attribute instance-attribute

AudioChunkParams

Bases: _ParamsModel

Params for media chunking (audio/video split). Aligned with the `nemo_retriever.api` dataloader.

Set `enabled=False` (when wired through `VideoSplitActor`) to skip audio chunking and ASR on a video pipeline — useful for visual-only recall benchmarks. `MediaChunkActor` ignores this flag in the audio-only pipeline, since chunking is the whole point there.

Source code in nemo_retriever/params/models.py
146
147
148
149
150
151
152
153
154
155
156
157
158
159
class AudioChunkParams(_ParamsModel):
    """Params for media chunking (audio/video split). Aligned with `nemo_retriever.api` dataloader.

    Set ``enabled=False`` (when wired through ``VideoSplitActor``) to skip
    audio chunking and ASR on a video pipeline — useful for visual-only
    recall benchmarks. ``MediaChunkActor`` ignores this flag for the
    audio-only pipeline since chunking is the whole point there.
    """

    # Chunking toggle; see the class docstring for how each actor treats it.
    enabled: bool = True
    # Split strategy: by "size", "time", or "frame".
    split_type: Literal["size", "time", "frame"] = "size"
    # Split interval. NOTE(review): units presumably depend on split_type — confirm.
    split_interval: int = 450
    # Off by default. NOTE(review): exact effect is defined by the consumer — confirm.
    audio_only: bool = False
    # Off by default. NOTE(review): presumably keeps video and audio streams separate — confirm.
    video_audio_separate: bool = False

audio_only = False class-attribute instance-attribute

enabled = True class-attribute instance-attribute

split_interval = 450 class-attribute instance-attribute

split_type = 'size' class-attribute instance-attribute

video_audio_separate = False class-attribute instance-attribute

AudioVisualFuseParams

Bases: _ParamsModel

Toggle for `nemo_retriever.video.AudioVisualFuser`.

Source code in nemo_retriever/params/models.py
218
219
220
221
class AudioVisualFuseParams(_ParamsModel):
    """Toggle for :class:`~nemo_retriever.video.AudioVisualFuser`."""

    # On by default. NOTE(review): presumably False skips the audio/visual fuse step — confirm.
    enabled: bool = True

enabled = True class-attribute instance-attribute

BatchTuningParams

Bases: _ParamsModel

Source code in nemo_retriever/params/models.py
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
class BatchTuningParams(_ParamsModel):
    """Tuning knobs grouped by stage: batch sizes, worker counts, CPU and GPU amounts.

    Fields typed ``Optional[...]`` default to ``None``.
    NOTE(review): ``None`` presumably means "let the runtime decide" — confirm
    against the code that consumes these params.
    """

    # Run identifier, presumably used to tag debug output; defaults to "unknown".
    debug_run_id: str = "unknown"
    # PDF split / extract stage.
    pdf_split_batch_size: int = 1
    pdf_extract_batch_size: int = 4
    pdf_extract_num_cpus: float = 2
    pdf_extract_workers: Optional[int] = None
    # Page-element and detection batch sizes.
    page_elements_batch_size: int = 24
    detect_batch_size: int = 24
    ocr_inference_batch_size: Optional[int] = None
    # Worker counts for page-element, OCR and detection stages.
    page_elements_workers: Optional[int] = None
    ocr_workers: Optional[int] = None
    detect_workers: Optional[int] = None
    # CPU count per actor for page-element and OCR stages.
    page_elements_cpus_per_actor: float = 1
    ocr_cpus_per_actor: float = 1
    # Embedding stage.
    embed_workers: Optional[int] = None
    embed_batch_size: int = 32
    embed_cpus_per_actor: float = 1
    # GPU amount per stage; float-typed, so fractional values are allowed (presumably per actor — confirm).
    gpu_page_elements: Optional[float] = None
    gpu_ocr: Optional[float] = None
    gpu_embed: Optional[float] = None
    # Nemotron Parse stage.
    nemotron_parse_workers: Optional[int] = None
    gpu_nemotron_parse: Optional[float] = None
    nemotron_parse_batch_size: Optional[int] = None
    # Store stage worker count.
    store_workers: Optional[int] = None
    # General inference batch size.
    inference_batch_size: int = 8

debug_run_id = 'unknown' class-attribute instance-attribute

detect_batch_size = 24 class-attribute instance-attribute

detect_workers = None class-attribute instance-attribute

embed_batch_size = 32 class-attribute instance-attribute

embed_cpus_per_actor = 1 class-attribute instance-attribute

embed_workers = None class-attribute instance-attribute

gpu_embed = None class-attribute instance-attribute

gpu_nemotron_parse = None class-attribute instance-attribute

gpu_ocr = None class-attribute instance-attribute

gpu_page_elements = None class-attribute instance-attribute

inference_batch_size = 8 class-attribute instance-attribute

nemotron_parse_batch_size = None class-attribute instance-attribute

nemotron_parse_workers = None class-attribute instance-attribute

ocr_cpus_per_actor = 1 class-attribute instance-attribute

ocr_inference_batch_size = None class-attribute instance-attribute

ocr_workers = None class-attribute instance-attribute

page_elements_batch_size = 24 class-attribute instance-attribute

page_elements_cpus_per_actor = 1 class-attribute instance-attribute

page_elements_workers = None class-attribute instance-attribute

pdf_extract_batch_size = 4 class-attribute instance-attribute

pdf_extract_num_cpus = 2 class-attribute instance-attribute

pdf_extract_workers = None class-attribute instance-attribute

pdf_split_batch_size = 1 class-attribute instance-attribute

store_workers = None class-attribute instance-attribute

CaptionParams

Bases: LLMInferenceParams

Source code in nemo_retriever/params/models.py
601
602
603
604
605
606
607
608
609
610
611
612
613
614
class CaptionParams(LLMInferenceParams):
    """Captioning settings declared on top of :class:`LLMInferenceParams`.

    NOTE(review): the split between remote (``endpoint_url``) and local
    execution fields is inferred from field names — confirm against the
    caption stage.
    """

    # Remote endpoint URL; unset by default (presumably meaning local inference — confirm).
    endpoint_url: Optional[str] = None
    model_name: str = "nvidia/NVIDIA-Nemotron-Nano-12B-v2-VL-BF16"
    api_key: Optional[str] = None
    # Prompt text; the default asks for a caption of the image.
    prompt: str = "Caption the content of this image:"
    # Default system prompt "/no_think" (presumably disables the model's reasoning mode — confirm).
    system_prompt: Optional[str] = "/no_think"
    batch_size: int = 8
    # Presumably local-runtime settings (device, HF cache directory) — confirm.
    device: Optional[str] = None
    hf_cache_dir: Optional[str] = None
    # Max characters of surrounding text used as context; 0 presumably disables it — confirm.
    context_text_max_chars: int = 0
    tensor_parallel_size: int = 1
    gpu_memory_utilization: float = 0.5
    # Whether infographics are captioned as well; off by default.
    caption_infographics: bool = False
    # Extra request-body fields; default_factory gives each instance its own dict.
    extra_body: dict[str, Any] = Field(default_factory=dict)

api_key = None class-attribute instance-attribute

batch_size = 8 class-attribute instance-attribute

caption_infographics = False class-attribute instance-attribute

context_text_max_chars = 0 class-attribute instance-attribute

device = None class-attribute instance-attribute

endpoint_url = None class-attribute instance-attribute

extra_body = Field(default_factory=dict) class-attribute instance-attribute

gpu_memory_utilization = 0.5 class-attribute instance-attribute

hf_cache_dir = None class-attribute instance-attribute

model_name = 'nvidia/NVIDIA-Nemotron-Nano-12B-v2-VL-BF16' class-attribute instance-attribute

prompt = 'Caption the content of this image:' class-attribute instance-attribute

system_prompt = '/no_think' class-attribute instance-attribute

tensor_parallel_size = 1 class-attribute instance-attribute

ChartParams

Bases: _ParamsModel

Source code in nemo_retriever/params/models.py
520
521
522
523
class ChartParams(_ParamsModel):
    """Chart settings: remote invocation, remote retry policy, and inference batch size."""

    # default_factory gives each instance its own RemoteInvokeParams / RemoteRetryParams.
    remote: RemoteInvokeParams = Field(default_factory=RemoteInvokeParams)
    remote_retry: RemoteRetryParams = Field(default_factory=RemoteRetryParams)
    inference_batch_size: int = 8

inference_batch_size = 8 class-attribute instance-attribute

remote = Field(default_factory=RemoteInvokeParams) class-attribute instance-attribute

remote_retry = Field(default_factory=RemoteRetryParams) class-attribute instance-attribute

DedupParams

Bases: _ParamsModel

Source code in nemo_retriever/params/models.py
632
633
634
635
class DedupParams(_ParamsModel):
    """Deduplication toggles plus the IoU threshold for the bounding-box check.

    NOTE(review): how the two checks are combined is defined by the consumer — confirm.
    """

    # Content-hash based deduplication; on by default.
    content_hash: bool = True
    # Bounding-box IoU based deduplication; on by default.
    bbox_iou: bool = True
    # IoU threshold, validated to lie in [0.0, 1.0].
    iou_threshold: float = Field(default=0.45, ge=0.0, le=1.0)

bbox_iou = True class-attribute instance-attribute

content_hash = True class-attribute instance-attribute

iou_threshold = Field(default=0.45, ge=0.0, le=1.0) class-attribute instance-attribute

EmbedParams

Bases: _ParamsModel

Source code in nemo_retriever/params/models.py
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
class EmbedParams(_ParamsModel):
    """Embedding-stage configuration.

    Groups model / endpoint selection, modality settings (with optional
    per-element-type overrides), output column names, batch sizes, the local
    ingest backend, remote-HTTP concurrency and timeout, and nested
    runtime / tuning models.

    Validation performed by this model:

    * ``local_ingest_embed_backend`` is normalized with
      ``nemo_retriever.model.normalize_backend`` against
      ``_LOCAL_INGEST_EMBED_BACKENDS`` (called with ``default="vllm"``).
    * ``embed_modality``, ``text_elements_modality`` and
      ``structured_elements_modality`` are whitespace-stripped and must be a
      member of ``VALID_EMBED_MODALITIES``; the spelling ``"image_text"`` gets
      a dedicated error pointing to ``"text_image"``.
    * A ``UserWarning`` is emitted when ``embed_granularity == "page"`` and a
      per-type modality override is set, because page granularity only uses
      ``embed_modality``.
    """

    model_name: Optional[str] = None
    embedding_endpoint: Optional[str] = None
    embed_invoke_url: Optional[str] = None
    embed_model_name: Optional[str] = None
    api_key: Optional[str] = None
    input_type: str = "passage"
    embed_modality: str = "text"  # "text", "image", or "text_image" — default for all element types
    embed_granularity: Literal["element", "page"] = "element"  # "element" = per-element rows, "page" = one row per page
    text_elements_modality: Optional[str] = None  # per-type override for page-text rows
    structured_elements_modality: Optional[str] = None  # per-type override for table/chart/infographic rows
    text_column: str = "text"
    # NOTE(review): ``inference_batch_size``/``embed_inference_batch_size`` and
    # ``output_column``/``embed_output_column`` are overlapping field pairs;
    # which member of each pair a given stage reads is not visible here.
    inference_batch_size: int = 32
    output_column: str = "text_embeddings_1b_v2"
    embedding_dim_column: str = "text_embeddings_1b_v2_dim"
    has_embedding_column: str = "text_embeddings_1b_v2_has_embedding"
    embed_output_column: str = "text_embeddings_1b_v2"
    embed_inference_batch_size: int = 16

    local_ingest_embed_backend: str = (
        "vllm"  # "vllm" or "hf" — selects ingest-time embedder backend for both text and VL models
    )
    query_max_length: int = 128
    dimensions: Optional[int] = None

    # Concurrent HTTP embedding requests per Ray batch (OpenAI-compatible NIM).
    nim_http_max_concurrent: int = 32
    request_timeout_s: float = 600.0

    runtime: ModelRuntimeParams = Field(default_factory=ModelRuntimeParams)
    batch_tuning: BatchTuningParams = Field(default_factory=BatchTuningParams)
    fused_tuning: FusedTuningParams = Field(default_factory=FusedTuningParams)

    @field_validator("local_ingest_embed_backend", mode="before")
    @classmethod
    def _validate_local_ingest_embed_backend(cls, v: Optional[str]) -> str:
        """Normalize the backend name via ``normalize_backend``.

        ``None`` is forwarded as-is; any other value is converted with ``str``
        first.  ``normalize_backend`` receives ``default="vllm"``.
        """
        # Function-local import. NOTE(review): presumably to avoid an import
        # cycle with ``nemo_retriever.model`` — confirm.
        from nemo_retriever.model import _LOCAL_INGEST_EMBED_BACKENDS, normalize_backend

        return normalize_backend(
            str(v) if v is not None else None,
            _LOCAL_INGEST_EMBED_BACKENDS,
            field_name="local_ingest_embed_backend",
            default="vllm",
        )

    @field_validator("embed_modality", "text_elements_modality", "structured_elements_modality", mode="before")
    @classmethod
    def _validate_modality(cls, v: str | None) -> str | None:
        """Strip and check a modality string.

        Returns ``None`` unchanged.  Raises ``ValueError`` for ``"image_text"``
        (with a hint to use ``"text_image"``) and for any value not in
        ``VALID_EMBED_MODALITIES``.
        """
        if v is None:
            return None
        modality = str(v).strip()
        if modality == "image_text":
            raise ValueError("Use 'text_image' instead of 'image_text'.")
        if modality not in VALID_EMBED_MODALITIES:
            raise ValueError(f"Modality must be one of {sorted(VALID_EMBED_MODALITIES)}")
        return modality

    @model_validator(mode="after")
    def _warn_page_granularity_overrides(self) -> "EmbedParams":
        """Warn (without failing) when per-type overrides would be ignored.

        With ``embed_granularity == "page"`` only ``embed_modality`` is used, so
        setting either per-type override emits a ``UserWarning``.
        """
        if self.embed_granularity == "page" and (
            self.text_elements_modality is not None or self.structured_elements_modality is not None
        ):
            warnings.warn(
                "text_elements_modality and structured_elements_modality are ignored when "
                "embed_granularity='page' (only embed_modality is used).",
                UserWarning,
                stacklevel=2,
            )
        return self

api_key = None class-attribute instance-attribute

batch_tuning = Field(default_factory=BatchTuningParams) class-attribute instance-attribute

dimensions = None class-attribute instance-attribute

embed_granularity = 'element' class-attribute instance-attribute

embed_inference_batch_size = 16 class-attribute instance-attribute

embed_invoke_url = None class-attribute instance-attribute

embed_modality = 'text' class-attribute instance-attribute

embed_model_name = None class-attribute instance-attribute

embed_output_column = 'text_embeddings_1b_v2' class-attribute instance-attribute

embedding_dim_column = 'text_embeddings_1b_v2_dim' class-attribute instance-attribute

embedding_endpoint = None class-attribute instance-attribute

fused_tuning = Field(default_factory=FusedTuningParams) class-attribute instance-attribute

has_embedding_column = 'text_embeddings_1b_v2_has_embedding' class-attribute instance-attribute

inference_batch_size = 32 class-attribute instance-attribute

input_type = 'passage' class-attribute instance-attribute

local_ingest_embed_backend = 'vllm' class-attribute instance-attribute

model_name = None class-attribute instance-attribute

nim_http_max_concurrent = 32 class-attribute instance-attribute

output_column = 'text_embeddings_1b_v2' class-attribute instance-attribute

query_max_length = 128 class-attribute instance-attribute

request_timeout_s = 600.0 class-attribute instance-attribute

runtime = Field(default_factory=ModelRuntimeParams) class-attribute instance-attribute

structured_elements_modality = None class-attribute instance-attribute

text_column = 'text' class-attribute instance-attribute

text_elements_modality = None class-attribute instance-attribute

ExtractParams

Bases: _ParamsModel

Source code in nemo_retriever/params/models.py
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
class ExtractParams(_ParamsModel):
    """Page-extraction configuration.

    Covers which content types to extract, the extraction method and model
    options (page-element detection, table structure, graphic elements, OCR),
    rendering options, remote service endpoints, output column names, and
    retry / batch tuning.  ``_auto_enable_features`` derives some flags from
    the configured endpoints and rejects incompatible combinations.
    """

    # Extraction flags
    extract_text: bool = True
    extract_images: bool = True
    extract_tables: bool = True
    extract_charts: bool = True
    extract_infographics: bool = True
    # ``None`` is a distinct "unset" value; what it means is decided by the consumer (not visible here).
    extract_page_as_image: Optional[bool] = None

    # Extraction options
    method: str = "pdfium"
    # Run PageElementDetection (layout/yolox). Required by TableStructure,
    # GraphicElements, and OCR. Safe to disable for text-only ingests.
    # NOTE(review): ``_auto_enable_features`` only rejects the TableStructure
    # and GraphicElements combinations; the OCR dependency is not checked there.
    use_page_elements: bool = True
    use_table_structure: bool = False
    # ``None`` is replaced during validation: "markdown" when table structure is
    # enabled, otherwise "pseudo_markdown".
    table_output_format: Optional[Literal["pseudo_markdown", "markdown"]] = None
    use_graphic_elements: bool = False
    dpi: int = 200
    image_format: str = "jpeg"
    jpeg_quality: int = 100
    render_mode: Literal["full_dpi", "fit_to_model"] = "fit_to_model"
    inference_batch_size: int = 8
    ocr_model_dir: Optional[str] = None
    ocr_version: Literal["v1", "v2"] = "v2"
    # Only allowed together with ocr_version="v2" (enforced in ``_auto_enable_features``).
    ocr_lang: Optional[Literal["multi", "english"]] = None

    # Service endpoints
    # NOTE(review): the per-service ``*_request_timeout_s`` fields default to
    # ``None``; presumably they fall back to ``request_timeout_s`` — confirm.
    invoke_url: Optional[str] = None
    api_key: Optional[str] = None
    request_timeout_s: float = 60.0
    page_elements_invoke_url: Optional[str] = None
    page_elements_api_key: Optional[str] = None
    page_elements_request_timeout_s: Optional[float] = None
    ocr_invoke_url: Optional[str] = None
    ocr_api_key: Optional[str] = None
    ocr_request_timeout_s: Optional[float] = None
    graphic_elements_invoke_url: Optional[str] = None
    table_structure_invoke_url: Optional[str] = None
    nemotron_parse_invoke_url: Optional[str] = None
    nemotron_parse_model: Optional[str] = None

    # Output columns
    output_column: str = "page_elements_v3"
    num_detections_column: str = "page_elements_v3_num_detections"
    counts_by_label_column: str = "page_elements_v3_counts_by_label"

    remote_retry: RemoteRetryParams = Field(default_factory=RemoteRetryParams)
    batch_tuning: BatchTuningParams = Field(default_factory=BatchTuningParams)

    @model_validator(mode="after")
    def _auto_enable_features(self) -> "ExtractParams":
        """Derive feature flags from remote endpoints and reject invalid combinations.

        * Enable ``use_graphic_elements`` when ``graphic_elements_invoke_url``
          is provided.
        * Enable ``use_table_structure`` when ``table_structure_invoke_url``
          is provided.
        * When ``table_output_format`` was left as ``None``, set it to
          ``"markdown"`` if table structure is enabled (after the step above),
          otherwise to ``"pseudo_markdown"``.
        * Reject ``ocr_lang`` when ``ocr_version == "v1"``.
        * Reject ``use_page_elements=False`` combined with table structure
          (while ``extract_tables``) or graphic elements (while
          ``extract_charts``).

        Raises:
            ValueError: for either of the two rejected combinations above.
        """
        if self.graphic_elements_invoke_url and not self.use_graphic_elements:
            self.use_graphic_elements = True
        if self.table_structure_invoke_url and not self.use_table_structure:
            self.use_table_structure = True
        # Runs after the auto-enable steps so an endpoint-enabled table stage gets "markdown".
        if self.table_output_format is None:
            self.table_output_format = "markdown" if self.use_table_structure else "pseudo_markdown"
        if self.ocr_version == "v1" and self.ocr_lang is not None:
            raise ValueError("ocr_lang is only supported when ocr_version='v2'.")
        if not self.use_page_elements:
            # A consumer only counts as "on" if its stage flag and the matching extract flag are both set.
            consumers = [
                ("use_table_structure", self.use_table_structure and self.extract_tables),
                ("use_graphic_elements", self.use_graphic_elements and self.extract_charts),
            ]
            enabled = [name for name, on in consumers if on]
            if enabled:
                raise ValueError(f"use_page_elements=False is incompatible with: {', '.join(enabled)}")
        return self

api_key = None class-attribute instance-attribute

batch_tuning = Field(default_factory=BatchTuningParams) class-attribute instance-attribute

counts_by_label_column = 'page_elements_v3_counts_by_label' class-attribute instance-attribute

dpi = 200 class-attribute instance-attribute

extract_charts = True class-attribute instance-attribute

extract_images = True class-attribute instance-attribute

extract_infographics = True class-attribute instance-attribute

extract_page_as_image = None class-attribute instance-attribute

extract_tables = True class-attribute instance-attribute

extract_text = True class-attribute instance-attribute

graphic_elements_invoke_url = None class-attribute instance-attribute

image_format = 'jpeg' class-attribute instance-attribute

inference_batch_size = 8 class-attribute instance-attribute

invoke_url = None class-attribute instance-attribute

jpeg_quality = 100 class-attribute instance-attribute

method = 'pdfium' class-attribute instance-attribute

nemotron_parse_invoke_url = None class-attribute instance-attribute

nemotron_parse_model = None class-attribute instance-attribute

num_detections_column = 'page_elements_v3_num_detections' class-attribute instance-attribute

ocr_api_key = None class-attribute instance-attribute

ocr_invoke_url = None class-attribute instance-attribute

ocr_lang = None class-attribute instance-attribute

ocr_model_dir = None class-attribute instance-attribute

ocr_request_timeout_s = None class-attribute instance-attribute

ocr_version = 'v2' class-attribute instance-attribute

output_column = 'page_elements_v3' class-attribute instance-attribute

page_elements_api_key = None class-attribute instance-attribute

page_elements_invoke_url = None class-attribute instance-attribute

page_elements_request_timeout_s = None class-attribute instance-attribute

remote_retry = Field(default_factory=RemoteRetryParams) class-attribute instance-attribute

render_mode = 'fit_to_model' class-attribute instance-attribute

request_timeout_s = 60.0 class-attribute instance-attribute

table_output_format = None class-attribute instance-attribute

table_structure_invoke_url = None class-attribute instance-attribute

use_graphic_elements = False class-attribute instance-attribute

use_page_elements = True class-attribute instance-attribute

use_table_structure = False class-attribute instance-attribute

FusedTuningParams

Bases: _ParamsModel

Source code in nemo_retriever/params/models.py
268
269
270
271
272
class FusedTuningParams(_ParamsModel):
    """Worker, batch and per-actor resource settings.

    NOTE(review): presumably consumed by the ``fused`` runmode (single-actor
    GPU model fusion) — confirm.
    """

    fused_workers: int = 1
    fused_batch_size: int = 64
    # NOTE(review): declared ``float`` but defaulted to the int ``1`` (cf. ``1.0``
    # below). Unless defaults are validated, the stored default stays an int;
    # consider ``1.0`` for consistency.
    fused_cpus_per_actor: float = 1
    fused_gpus_per_actor: float = 1.0

fused_batch_size = 64 class-attribute instance-attribute

fused_cpus_per_actor = 1 class-attribute instance-attribute

fused_gpus_per_actor = 1.0 class-attribute instance-attribute

fused_workers = 1 class-attribute instance-attribute

GpuAllocationParams

Bases: _ParamsModel

Source code in nemo_retriever/params/models.py
275
276
277
class GpuAllocationParams(_ParamsModel):
    """GPU device selection plus a startup timeout."""

    # Device identifiers as strings; a fresh empty list per instance by default.
    gpu_devices: list[str] = Field(default_factory=list)
    # NOTE(review): units presumably seconds — confirm in the consumer.
    startup_timeout: float = 600.0

gpu_devices = Field(default_factory=list) class-attribute instance-attribute

startup_timeout = 600.0 class-attribute instance-attribute

HtmlChunkParams

Bases: TextChunkParams

Source code in nemo_retriever/params/models.py
142
143
class HtmlChunkParams(TextChunkParams):
    """Text-chunking parameters for HTML sources.

    Inherits every field from ``TextChunkParams`` unchanged.  NOTE(review):
    presumably a distinct type so HTML chunking can be configured separately.
    """

    pass

InfographicParams

Bases: _ParamsModel

Source code in nemo_retriever/params/models.py
638
639
640
641
642
643
644
645
class InfographicParams(_ParamsModel):
    """Parameters for infographic processing.

    Holds remote endpoint / retry settings, a batch size, the page-element
    labels to accept, and the names of the output columns.
    """

    remote: RemoteInvokeParams = Field(default_factory=RemoteInvokeParams)
    remote_retry: RemoteRetryParams = Field(default_factory=RemoteRetryParams)
    inference_batch_size: int = 8
    # Immutable tuple default, so sharing it across instances is safe.
    # NOTE(review): presumably used to select which page-element detections are processed.
    allowed_page_element_labels: Sequence[str] = ("infographic", "title")
    output_column: str = "infographic_elements_v1"
    num_detections_column: str = "infographic_elements_v1_num_detections"
    counts_by_label_column: str = "infographic_elements_v1_counts_by_label"

allowed_page_element_labels = ('infographic', 'title') class-attribute instance-attribute

counts_by_label_column = 'infographic_elements_v1_counts_by_label' class-attribute instance-attribute

inference_batch_size = 8 class-attribute instance-attribute

num_detections_column = 'infographic_elements_v1_num_detections' class-attribute instance-attribute

output_column = 'infographic_elements_v1' class-attribute instance-attribute

remote = Field(default_factory=RemoteInvokeParams) class-attribute instance-attribute

remote_retry = Field(default_factory=RemoteRetryParams) class-attribute instance-attribute

IngestExecuteParams

Bases: _ParamsModel

Source code in nemo_retriever/params/models.py
116
117
118
119
120
121
122
123
124
125
126
class IngestExecuteParams(_ParamsModel):
    """Options controlling one execution of an ingestion run.

    All switches default to off; meanings below are inferred from field names
    and should be confirmed against the runmode that consumes them.
    """

    show_progress: bool = False
    return_failures: bool = False
    save_to_disk: bool = False
    return_traces: bool = False
    parallel: bool = False
    # ``None`` leaves the worker count to the consumer.
    max_workers: Optional[int] = None
    gpu_devices: list[str] = Field(default_factory=list)
    # NOTE(review): presumably pages per work unit — confirm.
    page_chunk_size: int = 32
    runtime_metrics_dir: Optional[str] = None
    runtime_metrics_prefix: Optional[str] = None

gpu_devices = Field(default_factory=list) class-attribute instance-attribute

max_workers = None class-attribute instance-attribute

page_chunk_size = 32 class-attribute instance-attribute

parallel = False class-attribute instance-attribute

return_failures = False class-attribute instance-attribute

return_traces = False class-attribute instance-attribute

runtime_metrics_dir = None class-attribute instance-attribute

runtime_metrics_prefix = None class-attribute instance-attribute

save_to_disk = False class-attribute instance-attribute

show_progress = False class-attribute instance-attribute

IngestorCreateParams

Bases: _ParamsModel

Source code in nemo_retriever/params/models.py
101
102
103
104
105
106
107
108
109
110
111
112
113
class IngestorCreateParams(_ParamsModel):
    """Construction-time options for an ingestor.

    Includes the initial document list, Ray connection settings, the service
    ``base_url`` / ``api_key``, GPU and error-handling policy, and an upload
    concurrency cap for the service run mode.
    """

    documents: list[str] = Field(default_factory=list)
    ray_address: Optional[str] = None
    ray_log_to_driver: bool = True
    debug: bool = False
    base_url: str = "http://localhost:7670"
    allow_no_gpu: bool = False
    api_key: Optional[str] = None
    # NOTE(review): "raise" presumably fails on the first error, "collect"
    # presumably gathers failures instead — confirm in the runmodes.
    error_policy: Literal["raise", "collect"] = "raise"
    # service run mode: maximum number of concurrent page uploads.  Lower
    # values (e.g. 2-4) reduce burst pressure on Kubernetes NodePort /
    # kube-proxy paths that otherwise reset connections under heavy load.
    max_concurrency: Optional[int] = None

allow_no_gpu = False class-attribute instance-attribute

api_key = None class-attribute instance-attribute

base_url = 'http://localhost:7670' class-attribute instance-attribute

debug = False class-attribute instance-attribute

documents = Field(default_factory=list) class-attribute instance-attribute

error_policy = 'raise' class-attribute instance-attribute

max_concurrency = None class-attribute instance-attribute

ray_address = None class-attribute instance-attribute

ray_log_to_driver = True class-attribute instance-attribute

LLMInferenceParams

Bases: _ParamsModel

Reusable LLM sampling / generation parameters.

Inherit from this model to add temperature, top_p, and max_tokens to any task that invokes an LLM (captioning, summarization, etc.).

Source code in nemo_retriever/params/models.py
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
class LLMInferenceParams(_ParamsModel):
    """Shared sampling / generation knobs for LLM-backed tasks.

    Subclass this model to give any task that calls an LLM (captioning,
    summarization, etc.) ``temperature``, ``top_p`` and ``max_tokens`` fields,
    together with range validation for each of them.
    """

    temperature: float = 1.0
    top_p: Optional[float] = None
    max_tokens: int = 1024

    @field_validator("temperature")
    @classmethod
    def _check_temperature(cls, v: float) -> float:
        # The chained comparison is False for NaN, so NaN is rejected as well.
        if 0.0 <= v <= 2.0:
            return v
        raise ValueError("temperature must be between 0.0 and 2.0")

    @field_validator("top_p")
    @classmethod
    def _check_top_p(cls, v: Optional[float]) -> Optional[float]:
        # ``None`` means "not set" and is always accepted.
        if v is None or 0.0 <= v <= 1.0:
            return v
        raise ValueError("top_p must be between 0.0 and 1.0")

    @field_validator("max_tokens")
    @classmethod
    def _check_max_tokens(cls, v: int) -> int:
        if v > 0:
            return v
        raise ValueError("max_tokens must be > 0")

    def to_sampling_kwargs(self) -> dict[str, Any]:
        """Return the sampling parameters as keyword arguments for an LLM call.

        ``temperature`` and ``max_tokens`` are always present.  ``top_p`` is
        added only when it is not ``None``, because many backends (vLLM,
        OpenAI, NIM) behave differently depending on whether the key exists.
        """
        sampling: dict[str, Any] = dict(temperature=self.temperature, max_tokens=self.max_tokens)
        top_p = self.top_p
        if top_p is not None:
            sampling["top_p"] = top_p
        return sampling

max_tokens = 1024 class-attribute instance-attribute

temperature = 1.0 class-attribute instance-attribute

top_p = None class-attribute instance-attribute

to_sampling_kwargs()

Build a dict of sampling parameters suitable for LLM inference calls.

top_p is only included when explicitly set (not None), because many backends (vLLM, OpenAI, NIM) change behaviour when the key is present vs. absent.

Source code in nemo_retriever/params/models.py
558
559
560
561
562
563
564
565
566
567
568
def to_sampling_kwargs(self) -> dict[str, Any]:
    """Return the sampling parameters as keyword arguments for an LLM call.

    ``temperature`` and ``max_tokens`` are always present.  ``top_p`` is
    added only when it is not ``None``, because many backends (vLLM,
    OpenAI, NIM) behave differently depending on whether the key exists.
    """
    sampling: dict[str, Any] = dict(temperature=self.temperature, max_tokens=self.max_tokens)
    top_p = self.top_p
    if top_p is not None:
        sampling["top_p"] = top_p
    return sampling

LLMRemoteClientParams

Bases: _ParamsModel

Transport / connection parameters for any remote LLM client.

Pairs with LLMInferenceParams (sampling) to fully specify a call. api_key is auto-resolved from the environment by _ParamsModel when left as None.

Source code in nemo_retriever/params/models.py
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
class LLMRemoteClientParams(_ParamsModel):
    """Transport / connection parameters for any remote LLM client.

    Pairs with :class:`LLMInferenceParams` (sampling) to fully specify a
    call.  ``api_key`` is auto-resolved from the environment by
    :class:`_ParamsModel` when left as ``None``.
    """

    model: str
    api_base: Optional[str] = None
    api_key: Optional[str] = None
    num_retries: int = 3
    timeout: float = 120.0
    extra_params: dict[str, Any] = Field(default_factory=dict)

    @field_validator("num_retries")
    @classmethod
    def _check_retries(cls, v: int) -> int:
        """Reject negative retry counts (0 means "no retries")."""
        if v < 0:
            raise ValueError("num_retries must be >= 0")
        return v

    @field_validator("timeout")
    @classmethod
    def _check_timeout(cls, v: float) -> float:
        """Require a strictly positive timeout.

        Written as ``not (v > 0)`` rather than ``v <= 0`` on purpose: pydantic
        accepts NaN for ``float`` fields by default, and NaN compares False
        against everything, so ``v <= 0`` would let ``timeout=nan`` through.
        This matches the NaN-safe range checks in ``LLMInferenceParams``.

        Raises:
            ValueError: if ``v`` is zero, negative, or NaN.
        """
        if not (v > 0):
            raise ValueError("timeout must be > 0")
        return v

api_base = None class-attribute instance-attribute

api_key = None class-attribute instance-attribute

extra_params = Field(default_factory=dict) class-attribute instance-attribute

model instance-attribute

num_retries = 3 class-attribute instance-attribute

timeout = 120.0 class-attribute instance-attribute

LanceDbParams

Bases: _ParamsModel

Source code in nemo_retriever/params/models.py
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
class LanceDbParams(_ParamsModel):
    """LanceDB table, index and search settings.

    Field meanings below beyond the names themselves are hedged; confirm them
    against the LanceDB writer that consumes this model.
    """

    lancedb_uri: str = "lancedb"
    table_name: str = "nv-ingest"
    # NOTE(review): presumably replaces an existing table of the same name — confirm.
    overwrite: bool = True
    create_index: bool = True
    index_type: str = "IVF_HNSW_SQ"
    # Distance metric name passed through as a string (default "l2").
    metric: str = "l2"
    num_partitions: int = 16
    num_sub_vectors: int = 256
    # Default matches EmbedParams' output column name.
    embedding_column: str = "text_embeddings_1b_v2"
    embedding_key: str = "embedding"
    include_text: bool = True
    text_column: str = "text"
    # NOTE(review): presumably enables combined vector + full-text search;
    # ``fts_language`` would then configure the full-text index — confirm.
    hybrid: bool = False
    fts_language: str = "English"

create_index = True class-attribute instance-attribute

embedding_column = 'text_embeddings_1b_v2' class-attribute instance-attribute

embedding_key = 'embedding' class-attribute instance-attribute

fts_language = 'English' class-attribute instance-attribute

hybrid = False class-attribute instance-attribute

include_text = True class-attribute instance-attribute

index_type = 'IVF_HNSW_SQ' class-attribute instance-attribute

lancedb_uri = 'lancedb' class-attribute instance-attribute

metric = 'l2' class-attribute instance-attribute

num_partitions = 16 class-attribute instance-attribute

num_sub_vectors = 256 class-attribute instance-attribute

overwrite = True class-attribute instance-attribute

table_name = 'nv-ingest' class-attribute instance-attribute

text_column = 'text' class-attribute instance-attribute

ModelRuntimeParams

Bases: _ParamsModel

Source code in nemo_retriever/params/models.py
91
92
93
94
95
96
97
98
class ModelRuntimeParams(_ParamsModel):
    """Local model runtime settings (device, cache, length, memory, eager mode)."""

    # ``None`` leaves device selection to the consumer.
    device: Optional[str] = None
    hf_cache_dir: Optional[str] = None
    # NOTE(review): presumably toggles output-embedding normalization — confirm.
    normalize: bool = True
    max_length: int = 8192
    model_name: Optional[str] = None
    # NOTE(review): presumably the fraction of GPU memory the engine may reserve — confirm.
    gpu_memory_utilization: float = 0.45
    enforce_eager: bool = False

device = None class-attribute instance-attribute

enforce_eager = False class-attribute instance-attribute

gpu_memory_utilization = 0.45 class-attribute instance-attribute

hf_cache_dir = None class-attribute instance-attribute

max_length = 8192 class-attribute instance-attribute

model_name = None class-attribute instance-attribute

normalize = True class-attribute instance-attribute

OcrParams

Bases: _ParamsModel

Source code in nemo_retriever/params/models.py
502
503
504
505
506
507
508
class OcrParams(_ParamsModel):
    """OCR parameters: remote endpoint / retry settings, batch size and targets.

    Unlike ``ExtractParams``, the ``extract_*`` flags here default to ``False``.
    """

    remote: RemoteInvokeParams = Field(default_factory=RemoteInvokeParams)
    remote_retry: RemoteRetryParams = Field(default_factory=RemoteRetryParams)
    inference_batch_size: int = 8
    extract_tables: bool = False
    extract_charts: bool = False
    extract_infographics: bool = False

extract_charts = False class-attribute instance-attribute

extract_infographics = False class-attribute instance-attribute

extract_tables = False class-attribute instance-attribute

inference_batch_size = 8 class-attribute instance-attribute

remote = Field(default_factory=RemoteInvokeParams) class-attribute instance-attribute

remote_retry = Field(default_factory=RemoteRetryParams) class-attribute instance-attribute

PageElementsParams

Bases: _ParamsModel

Source code in nemo_retriever/params/models.py
493
494
495
496
497
498
499
class PageElementsParams(_ParamsModel):
    """Page-element detection parameters.

    The output column defaults are the same names used by ``ExtractParams``.
    """

    remote: RemoteInvokeParams = Field(default_factory=RemoteInvokeParams)
    remote_retry: RemoteRetryParams = Field(default_factory=RemoteRetryParams)
    inference_batch_size: int = 8
    output_column: str = "page_elements_v3"
    num_detections_column: str = "page_elements_v3_num_detections"
    counts_by_label_column: str = "page_elements_v3_counts_by_label"

counts_by_label_column = 'page_elements_v3_counts_by_label' class-attribute instance-attribute

inference_batch_size = 8 class-attribute instance-attribute

num_detections_column = 'page_elements_v3_num_detections' class-attribute instance-attribute

output_column = 'page_elements_v3' class-attribute instance-attribute

remote = Field(default_factory=RemoteInvokeParams) class-attribute instance-attribute

remote_retry = Field(default_factory=RemoteRetryParams) class-attribute instance-attribute

PdfSplitParams

Bases: _ParamsModel

Source code in nemo_retriever/params/models.py
129
130
131
class PdfSplitParams(_ParamsModel):
    """Optional page range for PDF splitting.

    NOTE(review): whether pages are 0- or 1-based and whether ``end_page`` is
    inclusive is not visible here; no validator checks ``start_page <= end_page``.
    This model has no ``pages_per_chunk`` field, so the client-side
    ``pdf_split_config(pages_per_chunk=...)`` option presumably lives elsewhere.
    """

    start_page: Optional[int] = None
    end_page: Optional[int] = None

end_page = None class-attribute instance-attribute

start_page = None class-attribute instance-attribute

RemoteInvokeParams

Bases: _ParamsModel

Source code in nemo_retriever/params/models.py
85
86
87
88
class RemoteInvokeParams(_ParamsModel):
    """Connection settings for a single remote inference endpoint."""

    # ``None`` means no remote endpoint is configured.
    invoke_url: Optional[str] = None
    api_key: Optional[str] = None
    # Timeout per request; the ``_s`` suffix indicates seconds.
    request_timeout_s: float = 60.0

api_key = None class-attribute instance-attribute

invoke_url = None class-attribute instance-attribute

request_timeout_s = 60.0 class-attribute instance-attribute

RemoteRetryParams

Bases: _ParamsModel

Source code in nemo_retriever/params/models.py
79
80
81
82
class RemoteRetryParams(_ParamsModel):
    """Concurrency and retry limits for remote inference calls."""

    # NOTE(review): presumably the worker-pool size for concurrent remote calls.
    remote_max_pool_workers: int = 32
    remote_max_retries: int = 5
    # NOTE(review): presumably a separate retry budget for HTTP 429 responses — confirm.
    remote_max_429_retries: int = 3

remote_max_429_retries = 3 class-attribute instance-attribute

remote_max_pool_workers = 32 class-attribute instance-attribute

remote_max_retries = 5 class-attribute instance-attribute

StoreParams

Bases: _ParamsModel

Source code in nemo_retriever/params/models.py
478
479
480
481
482
483
484
485
486
487
488
489
490
class StoreParams(_ParamsModel):
    """Settings for storing image payloads (default location ``stored_images``).

    A ``storage_uri`` without a URL scheme is treated as a local path and made
    absolute during validation.
    """

    storage_uri: str = "stored_images"
    storage_options: dict[str, Any] = Field(default_factory=dict)
    image_format: str = "png"
    strip_base64: bool = True
    batch_tuning: BatchTuningParams = Field(default_factory=BatchTuningParams)

    @model_validator(mode="after")
    def _resolve_local_storage_uri(self) -> "StoreParams":
        """Make scheme-less (local) storage paths absolute.

        Relative local paths are resolved here so they survive Ray
        serialization; URIs that carry a scheme are returned unchanged.
        """
        uri_scheme = urlparse(self.storage_uri).scheme
        if uri_scheme:
            return self
        self.storage_uri = str(UPath(self.storage_uri).resolve())
        return self

batch_tuning = Field(default_factory=BatchTuningParams) class-attribute instance-attribute

image_format = 'png' class-attribute instance-attribute

storage_options = Field(default_factory=dict) class-attribute instance-attribute

storage_uri = 'stored_images' class-attribute instance-attribute

strip_base64 = True class-attribute instance-attribute

TableParams

Bases: _ParamsModel

Source code in nemo_retriever/params/models.py
511
512
513
514
515
516
517
class TableParams(_ParamsModel):
    """Table-structure parameters: remote settings, batch size, output columns."""

    remote: RemoteInvokeParams = Field(default_factory=RemoteInvokeParams)
    remote_retry: RemoteRetryParams = Field(default_factory=RemoteRetryParams)
    inference_batch_size: int = 8
    output_column: str = "table_structure_v1"
    num_detections_column: str = "table_structure_v1_num_detections"
    counts_by_label_column: str = "table_structure_v1_counts_by_label"

counts_by_label_column = 'table_structure_v1_counts_by_label' class-attribute instance-attribute

inference_batch_size = 8 class-attribute instance-attribute

num_detections_column = 'table_structure_v1_num_detections' class-attribute instance-attribute

output_column = 'table_structure_v1' class-attribute instance-attribute

remote = Field(default_factory=RemoteInvokeParams) class-attribute instance-attribute

remote_retry = Field(default_factory=RemoteRetryParams) class-attribute instance-attribute

TabularExtractParams

Bases: _ParamsModel

Params for step 1: extract schema metadata and write to Neo4j.

Covers SQLAlchemy reflection of a live database and/or parsing of pre-existing SQL DDL/query files. Produces Database, Schema, Table, Column, View and Query nodes together with their relationships. The Neo4j connection is provided by get_neo4j_conn() (see tabular_data.neo4j) and is not configured here.

Source code in nemo_retriever/params/models.py
653
654
655
656
657
658
659
660
661
662
663
664
665
class TabularExtractParams(_ParamsModel):
    """Params for step 1: extract schema metadata and write to Neo4j.

    Covers SQLAlchemy reflection of a live database and/or parsing of
    pre-existing SQL DDL/query files.  Produces Database, Schema, Table,
    Column, View and Query nodes together with their relationships.
    The Neo4j connection is provided by get_neo4j_conn() (see
    tabular_data.neo4j) and is not configured here.
    """

    # extra="forbid": unknown keyword arguments are rejected instead of ignored.
    # arbitrary_types_allowed: presumably needed so the non-pydantic
    # ``SQLDatabase`` type can be used as a field type.
    model_config = ConfigDict(extra="forbid", arbitrary_types_allowed=True)

    # NOTE(review): presumably ``None`` means "no live-database reflection";
    # no field for the SQL DDL/query files mentioned above is declared here.
    connector: Optional[SQLDatabase] = None

connector = None class-attribute instance-attribute

model_config = ConfigDict(extra='forbid', arbitrary_types_allowed=True) class-attribute instance-attribute

TextChunkParams

Bases: _ParamsModel

Source code in nemo_retriever/params/models.py
134
135
136
137
138
139
class TextChunkParams(_ParamsModel):
    """Token-based text-chunking parameters.

    NOTE(review): no validator ensures ``overlap_tokens < max_tokens``.
    """

    max_tokens: int = 1024
    overlap_tokens: int = 0
    # ``None`` leaves tokenizer choice to the consumer.
    tokenizer_model_id: Optional[str] = None
    encoding: str = "utf-8"
    tokenizer_cache_dir: Optional[str] = None

encoding = 'utf-8' class-attribute instance-attribute

max_tokens = 1024 class-attribute instance-attribute

overlap_tokens = 0 class-attribute instance-attribute

tokenizer_cache_dir = None class-attribute instance-attribute

tokenizer_model_id = None class-attribute instance-attribute

VdbUploadParams

Bases: _ParamsModel

Post-graph vector DB upload configuration.

Sidecar metadata (meta_*) matches nv_ingest_client / metadata_and_filtered_search.ipynb: all three fields (meta_dataframe, meta_source_field, and meta_fields) must be set together to merge columns into each chunk's content_metadata.

Source code in nemo_retriever/params/models.py
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
class VdbUploadParams(_ParamsModel):
    """Vector-DB upload settings applied after the ingestion graph.

    Sidecar metadata (``meta_*``) matches ``nv_ingest_client`` / ``metadata_and_filtered_search.ipynb``:
    ``meta_dataframe``, ``meta_source_field`` and ``meta_fields`` must be given together (or not at
    all) for their columns to be merged into each chunk's ``content_metadata``.
    """

    vdb_op: str = "lancedb"
    vdb_kwargs: dict[str, Any] = Field(default_factory=dict)
    meta_dataframe: Optional[Any] = None
    """Path to csv/json/parquet or an in-memory :class:`pandas.DataFrame`."""
    meta_source_field: Optional[str] = None
    meta_fields: Optional[list[str]] = None
    meta_join_key: MetaJoinKey = "auto"
    """How to match rows to documents: ``source_id`` (full path), ``source_name`` (basename), or ``auto`` (try both)."""

    @model_validator(mode="after")
    def _validate_sidecar_triplet(self) -> "VdbUploadParams":
        """Enforce all-or-nothing sidecar configuration.

        Raises:
            ValueError: if only some of the three ``meta_*`` fields are set, or
                if ``meta_fields`` is an empty list.
        """
        unset = [
            part is None
            for part in (self.meta_dataframe, self.meta_source_field, self.meta_fields)
        ]
        if all(unset):
            # Sidecar metadata not requested at all.
            return self
        if any(unset):
            raise ValueError(
                "meta_dataframe, meta_source_field, and meta_fields must all be set together "
                "when attaching sidecar metadata."
            )
        if not self.meta_fields:
            raise ValueError("meta_fields must be a non-empty list when sidecar metadata is enabled.")
        return self

    def to_ingest_operator_kwargs(self) -> dict[str, Any]:
        """Build the keyword arguments for :class:`~nemo_retriever.vdb.IngestVdbOperator`.

        Starts from a shallow copy of ``vdb_kwargs`` (``None`` counts as empty)
        and, only when ``meta_dataframe`` is set, adds the four sidecar entries.
        ``meta_fields`` is copied into a fresh list.
        """
        kwargs: dict[str, Any] = dict(self.vdb_kwargs or {})
        if self.meta_dataframe is None:
            return kwargs
        kwargs.update(
            meta_dataframe=self.meta_dataframe,
            meta_source_field=self.meta_source_field,
            meta_fields=list(self.meta_fields or []),
            meta_join_key=self.meta_join_key,
        )
        return kwargs

meta_dataframe = None class-attribute instance-attribute

Path to a CSV/JSON/Parquet file, or an in-memory pandas.DataFrame.

meta_fields = None class-attribute instance-attribute

meta_join_key = 'auto' class-attribute instance-attribute

How to match rows to documents: source_id (full path), source_name (basename), or auto (try both).

meta_source_field = None class-attribute instance-attribute

vdb_kwargs = Field(default_factory=dict) class-attribute instance-attribute

vdb_op = 'lancedb' class-attribute instance-attribute

to_ingest_operator_kwargs()

Flatten into kwargs for nemo_retriever.vdb.IngestVdbOperator.

Source code in nemo_retriever/params/models.py
467
468
469
470
471
472
473
474
475
def to_ingest_operator_kwargs(self) -> dict[str, Any]:
    """Flatten into kwargs for :class:`~nemo_retriever.vdb.IngestVdbOperator`.

    The result starts as a shallow copy of ``vdb_kwargs`` (``None`` is treated
    as empty). When ``meta_dataframe`` is set, the four sidecar keys are layered
    on top: ``meta_dataframe``, ``meta_source_field``, ``meta_fields`` (copied
    into a new list) and ``meta_join_key``. These override any same-named
    entries in ``vdb_kwargs``.
    """
    operator_kwargs: dict[str, Any] = {}
    operator_kwargs.update(self.vdb_kwargs or {})
    if self.meta_dataframe is None:
        return operator_kwargs
    operator_kwargs.update(
        meta_dataframe=self.meta_dataframe,
        meta_source_field=self.meta_source_field,
        meta_fields=list(self.meta_fields or []),
        meta_join_key=self.meta_join_key,
    )
    return operator_kwargs

VideoFrameParams

Bases: _ParamsModel

Params for video frame extraction (ffmpeg fps + perceptual-hash dedup).

Set enabled=False to skip frame extraction entirely; the video pipeline then produces only audio (ASR) rows — no frame OCR, no audio+visual fusion. Useful for ablating the visual modality or for audio-only recall benchmarks against video corpora.

dedup activates perceptual-hash (dhash) dedup before OCR. dhash catches visually-identical adjacent frames that byte-level hashing misses (encoder noise, brightness drift, etc.). On a 60s slide-heavy sample we measured ~91% of frames collapsed as duplicates at Hamming distance 5, vs ~11% with MD5 — a near-10x cut in OCR cost on slide content. Tune dedup_max_hamming_distance upward for more aggressive merging, or down to 0 to require exact perceptual-hash matches.

Source code in nemo_retriever/params/models.py
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
class VideoFrameParams(_ParamsModel):
    """Params for video frame extraction (ffmpeg fps + perceptual-hash dedup).

    Set ``enabled=False`` to skip frame extraction entirely; the video
    pipeline then produces only audio (ASR) rows — no frame OCR, no
    audio+visual fusion. Useful for ablating the visual modality or for
    audio-only recall benchmarks against video corpora.

    ``dedup`` activates perceptual-hash (dhash) dedup before OCR. dhash
    catches visually-identical adjacent frames that byte-level hashing
    misses (encoder noise, brightness drift, etc.). On a 60s slide-heavy
    sample we measured ~91% of frames collapsed as duplicates at Hamming
    distance 5, vs ~11% with MD5 — a near-10x cut in OCR cost on slide
    content. Tune ``dedup_max_hamming_distance`` upward for more aggressive
    merging, or down to 0 to require exact perceptual-hash matches.

    Both dedup thresholds are validated as non-negative. 0 is already the
    strictest setting, so a negative value can only be a misconfiguration.
    """

    # Master switch for frame extraction; False yields audio-only (ASR) rows.
    enabled: bool = True
    # Frame sampling rate passed to ffmpeg; must be strictly positive.
    fps: float = Field(default=1.0, gt=0.0)
    # Optional cap on the number of extracted frames (None presumably means
    # uncapped — confirm against the extraction stage).
    max_frames: Optional[int] = None
    # Enable perceptual-hash (dhash) dedup before OCR.
    dedup: bool = True
    # Max dhash Hamming distance for two frames to count as duplicates; 0 = exact match.
    dedup_max_hamming_distance: int = Field(default=5, ge=0)
    # Dropped-frame tolerance used by dedup (presumably mirrors
    # VideoFrameTextDedupParams.max_dropped_frames — confirm); never negative.
    dedup_max_dropped_frames: int = Field(default=2, ge=0)

dedup = True class-attribute instance-attribute

dedup_max_dropped_frames = 2 class-attribute instance-attribute

dedup_max_hamming_distance = 5 class-attribute instance-attribute

enabled = True class-attribute instance-attribute

fps = Field(default=1.0, gt=0.0) class-attribute instance-attribute

max_frames = None class-attribute instance-attribute

VideoFrameTextDedupParams

Bases: _ParamsModel

Params for merging consecutive video_frame rows with identical OCR text.

After full-frame OCR, slides that are visible for many seconds produce a flood of frames with the same text (image-hash dedup misses them when encoder noise differs frame-to-frame). This stage groups by (source_path, text) and merges adjacent runs into a single row whose segment_start_seconds / segment_end_seconds cover the union of the run.

Tolerance is expressed in dropped frames, not seconds, so it scales with video_frame_fps: at runtime the dedup reads each group's metadata.fps and converts to max_gap_seconds = max_dropped_frames / fps. Default 2 means we bridge gaps of up to 2 missing frames in a run — a typical safety margin for image-hash dedup leaving small holes.

Source code in nemo_retriever/params/models.py
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
class VideoFrameTextDedupParams(_ParamsModel):
    """Params for merging consecutive video_frame rows with identical OCR text.

    After full-frame OCR, slides that are visible for many seconds produce a
    flood of frames with the same text (image-hash dedup misses them when
    encoder noise differs frame-to-frame). This stage groups by
    ``(source_path, text)`` and merges adjacent runs into a single row whose
    ``segment_start_seconds`` / ``segment_end_seconds`` cover the union of
    the run.

    Tolerance is expressed in **dropped frames**, not seconds, so it scales
    with ``video_frame_fps``: at runtime the dedup reads each group's
    ``metadata.fps`` and converts to ``max_gap_seconds = max_dropped_frames / fps``.
    Default 2 means we bridge gaps of up to 2 missing frames in a run —
    a typical safety margin for image-hash dedup leaving small holes.
    ``max_dropped_frames`` must be non-negative, because a negative gap
    tolerance has no meaning.
    """

    # Toggle for this OCR-text merge stage.
    enabled: bool = True
    # Gap tolerance in frames; converted to seconds via each group's fps.
    max_dropped_frames: int = Field(default=2, ge=0)

enabled = True class-attribute instance-attribute

max_dropped_frames = 2 class-attribute instance-attribute

WebhookParams

Bases: _ParamsModel

Configuration for the webhook notification stage.

When endpoint_url is set, selected columns from the processed batch are serialised to JSON and HTTP-POSTed to that URL. If endpoint_url is None the stage is a no-op.

Source code in nemo_retriever/params/models.py
617
618
619
620
621
622
623
624
625
626
627
628
629
class WebhookParams(_ParamsModel):
    """Configuration for the webhook notification stage.

    When ``endpoint_url`` is set, selected columns from the processed batch
    are serialised to JSON and HTTP-POSTed to that URL.  If ``endpoint_url``
    is ``None`` the stage is a no-op.

    ``timeout_s`` must be strictly positive and ``max_retries`` must be
    non-negative. Both are validated at construction, so misconfigurations
    fail early instead of at POST time.
    """

    # Target URL for the POST; None disables the stage.
    endpoint_url: Optional[str] = None
    # Batch columns to serialise into the JSON payload (behaviour of an empty
    # list is decided by the stage — confirm).
    columns: list[str] = Field(default_factory=list)
    # Extra HTTP headers to send with the POST (e.g. auth).
    headers: dict[str, str] = Field(default_factory=dict)
    # Per-request timeout in seconds; must be > 0.
    timeout_s: float = Field(default=30.0, gt=0.0)
    # Retry budget for failed POSTs; exact retry semantics live in the stage.
    max_retries: int = Field(default=3, ge=0)

columns = Field(default_factory=list) class-attribute instance-attribute

endpoint_url = None class-attribute instance-attribute

headers = Field(default_factory=dict) class-attribute instance-attribute

max_retries = 3 class-attribute instance-attribute

timeout_s = 30.0 class-attribute instance-attribute

resolve_split_params(split_config)

Resolve a user-supplied split_config dict into per-key effective params.

Returns a dict keyed by every entry in SPLIT_CONFIG_VALID_KEYS. Each value is one of: a TextChunkParams / HtmlChunkParams instance (chunking enabled for that key), None (key absent — chunking off via the default), or False (explicit opt-out sentinel).

Per-key values supplied by the caller may be a plain dict of chunk-params fields, a pre-built TextChunkParams / HtmlChunkParams instance (passed through verbatim), None, or False.

Source code in nemo_retriever/params/utils.py
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
def resolve_split_params(
    split_config: dict[str, Any] | None,
) -> dict[str, Any]:
    """Map a user-supplied ``split_config`` onto effective per-key chunk params.

    The returned dict has one entry for every key in ``SPLIT_CONFIG_VALID_KEYS``.
    Each entry holds one of:

    * a ``TextChunkParams`` / ``HtmlChunkParams`` instance: chunking is on for
      that key;
    * ``None``: the key was absent, so chunking stays off by default;
    * ``False``: the caller explicitly opted out.

    Per-key input values may be a plain dict of chunk-params fields, a
    ready-made ``TextChunkParams`` / ``HtmlChunkParams`` (returned untouched),
    ``None``, or ``False``.

    Raises:
        ValueError: if ``split_config`` contains keys outside
            ``SPLIT_CONFIG_VALID_KEYS``.
        TypeError: if a per-key value is of an unsupported type.
    """
    from nemo_retriever.params.models import HtmlChunkParams, TextChunkParams

    config = split_config if split_config else {}
    unexpected = set(config) - SPLIT_CONFIG_VALID_KEYS
    if unexpected:
        raise ValueError(
            f"Unknown split_config key(s): {sorted(unexpected)}; expected one of {sorted(SPLIT_CONFIG_VALID_KEYS)}"
        )

    def _resolve_entry(key: str, value: Any) -> Any:
        # None (absent) and False (explicit opt-out) are kept as distinct sentinels.
        if value is None or value is False:
            return value
        # HtmlChunkParams subclasses TextChunkParams, so this covers both.
        if isinstance(value, TextChunkParams):
            return value
        if isinstance(value, dict):
            params_cls = HtmlChunkParams if key == "html" else TextChunkParams
            return params_cls(**value)
        raise TypeError(
            f"split_config['{key}'] must be a TextChunkParams, dict, None, or False; got {type(value).__name__}"
        )

    return {key: _resolve_entry(key, config.get(key)) for key in SPLIT_CONFIG_VALID_KEYS}