Source code for nv_dfm_core.api._pipeline

# SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from pathlib import Path
from typing import TYPE_CHECKING, Any, Literal

from pydantic import ConfigDict
from typing_extensions import override

from ._block import Block
from ._pipeline_build_helper import PipelineBuildHelper

if TYPE_CHECKING:
    from ._api_visitor import ApiVisitor
else:
    ApiVisitor = Any


[docs] class Pipeline(Block): """Represents a DFM pipeline containing a sequence of operations and control flow. A Pipeline is the top-level construct that defines the workflow to be executed across a federation. It contains statements, operations, and control structures. """ model_config: ConfigDict = ConfigDict(extra="forbid", frozen=True) api_version: str = "" # filled by Session mode: Literal["execute", "discovery"] = "execute" name: str | None = None # can be used to name the pipeline @override def __enter__(self): # pyright: ignore[reportMissingSuperCall] """Overrides Block/s enter/exit methods""" PipelineBuildHelper.enter_pipeline(self) return self @override def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> bool: # pyright: ignore[reportMissingSuperCall] PipelineBuildHelper.exit_pipeline(self) if exc_type is not None: return False return True @override def accept(self, visitor: ApiVisitor) -> None: visitor.visit_pipeline(self)
[docs] @classmethod def load_from_file(cls, file: str | Path, **fsspec_kwargs: Any) -> "Pipeline": """ Load a stored pipeline from a .json file. The file argument can be: - a local filename (str or Path) - a remote URI (e.g., s3://bucket/file.json, gs://bucket/file.json, etc.) - any fsspec-compatible URL If fsspec is installed, it will be used for all paths (including local). If fsspec is not installed, only local files are supported. Additional keyword arguments are passed to fsspec.filesystem(). """ try: import fsspec # pyright: ignore[reportMissingTypeStubs] except ImportError: fsspec = None file_url = str(file) if fsspec is not None: fs, path_in_fs = fsspec.core.url_to_fs(file_url, **fsspec_kwargs) # pyright: ignore[reportUnknownMemberType] if not fs.exists(path_in_fs): raise FileNotFoundError(f"Pipeline file not found: {file}") with fs.open(path_in_fs, "r") as f: pipeline = Pipeline.model_validate_json(f.read()) else: file = Path(file) if not file.exists(): raise FileNotFoundError(f"Pipeline file not found: {file}") with open(file, "r") as f: pipeline = Pipeline.model_validate_json(f.read()) return pipeline
[docs] @classmethod def save_to_file( cls, pipeline: "Pipeline", file: str | Path, **fsspec_kwargs: Any ) -> None: """ Store a pipeline to a .json file. The file argument can be: - a local filename (str or Path) - a remote URI (e.g., s3://bucket/file.json, gs://bucket/file.json, etc.) - any fsspec-compatible URL If fsspec is installed, it will be used for all paths (including local). If fsspec is not installed, only local files are supported. Additional keyword arguments are passed to fsspec.filesystem(). """ try: import fsspec # pyright: ignore[reportMissingTypeStubs] except ImportError: fsspec = None file_url = str(file) if fsspec is not None: fs, path_in_fs = fsspec.core.url_to_fs(file_url, **fsspec_kwargs) # pyright: ignore[reportUnknownMemberType,reportUnknownVariableType] # Try to ensure parent directory exists (if supported) try: parent = fs._parent(path_in_fs) if parent and not fs.exists(parent): fs.mkdirs(parent, exist_ok=True) except Exception: pass # Some filesystems may not support mkdirs or _parent with fs.open(path_in_fs, "w") as f: _ = f.write(pipeline.model_dump_json(indent=4)) else: # Fallback: only support local files file = Path(file) file.parent.mkdir(parents=True, exist_ok=True) with open(file, "w") as f: _ = f.write(pipeline.model_dump_json(indent=4))