nv_dfm_core.session.Session#

class nv_dfm_core.session.Session(federation_name, homesite, target='flare', logger=None, callback_dispatcher=None, **kwargs)[source]#

Main entry point for executing DFM pipelines in a federation.

A Session manages the lifecycle of pipeline execution, including preparation, execution, and communication with the federation sites.

Parameters:
  • federation_name (str)

  • homesite (str)

  • target (Literal['flare', 'local'])

  • logger (Logger | None)

  • callback_dispatcher (CallbackDispatcher | None)

  • kwargs (Any)

property callback_dispatcher: CallbackDispatcher#

The callback dispatcher used for all jobs created by this session.

debug_show_code(pipeline)[source]#

Generate and return the Python code for each site in the prepared pipeline for debugging purposes.

Parameters:

pipeline (PreparedPipeline)

Return type:

dict[str, str]

execute(pipeline, input_params={}, autostop=True, default_callback=None, place_callbacks=None, debug=False, options=None, force_modgen=False)[source]#

Execute a pipeline.

Parameters:
  • pipeline (PreparedPipeline) – The pipeline to execute.

  • input_params (dict[str, Any] | list[dict[str, Any]] | None) – Default: empty dict. The input parameters for the pipeline. Can be None, a single dict, or a list of dicts. If input_params is None, the pipeline is started but no start signal is sent, the pipeline is waiting for a call to job.send_input_params(). In that case, autostop should probably be False because otherwise the pipeline will be stopped immediately. If input_params is a single dict, it’s treated as a list of one dict. The pipeline gets started once for each parameter dict in the input_params list. Each paramset must contain values for all param places in the pipeline. The dict or dicts can be empty, if the pipeline has no PlaceParams.

  • autostop (bool) – Default True. Whether to automatically send the stop frame to the pipeline after the given input parameters. If False, the pipeline is kept running and additional params can be sent through the Job object. In this case, the stop frame must be sent manually, also through the Job object.

  • default_callback (Callable[[str, int | str | None, Frame, str, Any], None] | Callable[[str, int | str | None, Frame, str, Any], Coroutine[Any, Any, None]] | None) – The default callback for the pipeline.

  • place_callbacks (dict[str, Callable[[str, int | str | None, Frame, str, Any], None] | Callable[[str, int | str | None, Frame, str, Any], Coroutine[Any, Any, None]]] | None) – The place callbacks for the pipeline.

  • force_modgen (bool) – If True, modgen will ignore the cache and generate the code again.

  • debug (bool)

  • options (Any)

Returns:

The job object.

Return type:

Job

prepare(pipeline, restrict_to_sites=None, debug=False)[source]#

Prepare a pipeline for execution. This method may optimize the pipeline for execution in the whole federation, which can take a bit. Prepared pipelines can be reused for multiple executions.

Parameters:
  • pipeline (Pipeline)

  • restrict_to_sites (Literal['homesite'] | list[str] | None)

  • debug (bool)

Return type:

PreparedPipeline

reattach(job_id, default_callback=None, place_callbacks=None)[source]#

Reattach to an existing job using its job_id.

This allows reconnecting to long-running jobs, retrieving status, and optionally receiving results that haven’t been consumed yet.

Parameters:
  • job_id (str) – The ID of the job to reattach to.

  • default_callback (Callable[[str, int | str | None, Frame, str, Any], None] | Callable[[str, int | str | None, Frame, str, Any], Coroutine[Any, Any, None]] | None) – Optional callback for receiving results from any place. Required if place_callbacks is provided.

  • place_callbacks (dict[str, Callable[[str, int | str | None, Frame, str, Any], None] | Callable[[str, int | str | None, Frame, str, Any], Coroutine[Any, Any, None]]] | None) – Optional dict mapping place names to specific callbacks. Can only be provided if default_callback is also provided.

Returns:

Job object connected to the existing job. If the job_id is not found, returns a job with UNKNOWN status.

Raises:

ValueError – If place_callbacks is provided without default_callback.

Return type:

Job

Note

  • If both callbacks are None, returns “informational” job (status/abort only).

  • Multiple clients can reattach to same job (undefined which receives results).

  • For finished jobs, may retrieve buffered results not yet consumed.

  • FLARE: Can reattach to any job (even old/finished ones).

  • Local: Can only reattach to jobs still running in same process.

property runtime_module: ModuleType#

Loading the runtime module can fail, so we do it lazily when we need it instead of raising an error in the constructor.