Source code for nv_dfm_core.session._session_delegate

# SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
from abc import ABC, abstractmethod
from typing import Any

from nv_dfm_core.api import PreparedPipeline
from nv_dfm_core.exec import Frame
from nv_dfm_core.exec._helpers import load_fed_runtime_module
from nv_dfm_core.gen.irgen._fed_info import FedInfo
from nv_dfm_core.gen.irgen._helpers import load_fed_info_json

from ._callback_dispatcher import CallbackRunner
from ._job import Job


[docs] class SessionDelegate(ABC): """Abstract base class for session delegates that handle different execution targets. A delegate implements the specific logic for executing pipelines on different backends (e.g., FLARE, local execution). """ def __init__( self, session: Any, federation_name: str, homesite: str, logger: logging.Logger, ): self._session: Any = session self._federation_name: str = federation_name self._homesite: str = homesite self._logger: logging.Logger = logger @property def session(self) -> Any: return self._session @property def federation_name(self) -> str: return self._federation_name @property def homesite(self) -> str: return self._homesite @property def logger(self) -> logging.Logger: return self._logger
[docs] def load_fed_info(self) -> FedInfo: """Load federation information from the federation runtime module.""" # by default we load the fed_info.json from the fed runtime module fed_runtime_module = load_fed_runtime_module(self._federation_name) self._logger.info( f"Loading fed_info.json as resource from fed runtime module: {fed_runtime_module.__name__}" ) fed_info = load_fed_info_json(fed_runtime_module) return fed_info
@abstractmethod def connect(self, debug: bool) -> None: ... @abstractmethod def close(self, debug: bool) -> None: ...
[docs] @abstractmethod def reattach( self, job_id: str, callback_runner: CallbackRunner | None, ) -> Job: """Reattach to an existing job. Args: job_id: The ID of the job to reattach to. callback_runner: The callback runner for receiving results, or None for informational-only attachment (status/abort only). Returns: Job object connected to the existing job. """ ...
[docs] @abstractmethod def execute( self, pipeline: PreparedPipeline, next_frame: Frame, input_params: list[tuple[Frame, dict[str, Any]]], callback_runner: CallbackRunner, debug: bool, options: Any = None, force_modgen: bool = False, ) -> Job: """Execute a prepared pipeline. Args: pipeline: The prepared pipeline to execute. next_frame: The next frame for the pipeline. input_params: List of (frame, params) tuples for input. callback_runner: The callback runner for receiving results. debug: Whether to run in debug mode. options: Target-specific options. force_modgen: If True, force code regeneration. Returns: Job object for the execution. """ ...