nv_dfm_core.targets.local.JobExecution#

class nv_dfm_core.targets.local.JobExecution(id, federation, sites, inter_job_queue, logger)[source]#

A JobExecution object owns a JobRunner process for every site, which together execute a pipeline. Executions are reused from a pool for two reasons: (a) all background processes can then be started up front in the pool, and (b) ordinary multiprocessing queues cannot be sent to a process after it has been created. Each JobExecution is therefore pre-created as, in effect, a full federation with a process for every existing site, so that all communication queues for those sites can be set up at startup.

Parameters:
  • id (int)

  • federation (FederationRunner)

  • sites (list[str])

  • inter_job_queue (Queue)

  • logger (Logger)
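The pre-creation constraint described above can be sketched as follows. This is an illustrative pattern, not the real nv_dfm_core implementation: the names `PooledExecution` and `_worker_loop` are hypothetical, and the key point is that each site's queue must exist before its process is spawned, because a `multiprocessing.Queue` cannot be handed to an already-running process.

```python
import multiprocessing as mp


def _worker_loop(site, command_queue):
    # Hypothetical per-site worker: block on the command queue until a
    # shutdown sentinel (None) arrives.
    while True:
        cmd = command_queue.get()
        if cmd is None:
            break


class PooledExecution:
    """Sketch of an execution that pre-creates a queue and a process per site."""

    def __init__(self, sites):
        # Queues are created first, then passed to the processes at spawn
        # time; they could not be sent to the processes afterwards.
        self.command_queues = {s: mp.Queue() for s in sites}
        self.processes = {
            s: mp.Process(target=_worker_loop, args=(s, q))
            for s, q in self.command_queues.items()
        }
        for p in self.processes.values():
            p.start()

    def is_alive(self):
        return all(p.is_alive() for p in self.processes.values())

    def shutdown(self):
        for q in self.command_queues.values():
            q.put(None)  # stop sentinel
        for p in self.processes.values():
            p.join(timeout=5)
```

Because the processes and queues exist for the lifetime of the pool, assigning a new job to a pooled execution is cheap: no process start-up or queue plumbing happens on the job path.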

get_dead_sites()[source]#

Returns a list of sites whose worker processes have died.

Return type:

list[str]

handle()[source]#

Returns the job handle if this execution is assigned. Otherwise, returns None.

handle_was_assigned(handle)[source]#

Assign a job handle to this execution.

Parameters:

handle (object)

handle_was_released()[source]#

Called when the job execution completes.

is_alive()[source]#

Returns True if the execution is alive.

Return type:

bool

job_id()[source]#

Returns the job ID if this execution is assigned to a job. Otherwise, returns None.

Return type:

str | None

receive_inter_job_token(token)[source]#

Called by the FederationRunner background thread that polls the inter-job queue. Receives an inter-job token and forwards it to the correct queue.

Parameters:

token (TokenPackage)
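The forwarding step can be sketched as a simple routing table from site name to queue. This is a minimal illustration, not the library's code: the `site` and `payload` fields on the token are assumptions, as is the `Router` class.

```python
from dataclasses import dataclass
from queue import Queue


@dataclass
class TokenPackage:
    # Hypothetical token shape: destination site plus an opaque payload.
    site: str
    payload: object


class Router:
    """Sketch of forwarding an inter-job token to the right site queue."""

    def __init__(self, sites):
        self.queues = {s: Queue() for s in sites}

    def receive_inter_job_token(self, token):
        # Look up the destination site's queue and hand the token over;
        # the site's worker consumes it on its own thread/process.
        self.queues[token.site].put(token)
```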

shutdown()[source]#

Shuts down the execution. Workers normally block waiting for new data on the command queue with a 0.5-second timeout, so they are given 5 seconds to finish.
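A common shape for this kind of shutdown is a stop sentinel followed by a bounded join, the grace period being a multiple of the workers' poll timeout. The sketch below assumes that pattern; `worker_loop`, `POLL_TIMEOUT`, and `GRACE_PERIOD` are illustrative names, not part of the documented API.

```python
import multiprocessing as mp
import queue

POLL_TIMEOUT = 0.5   # how long a worker waits on its command queue
GRACE_PERIOD = 5.0   # time allowed for workers to notice the sentinel


def worker_loop(command_queue):
    # Hypothetical worker: poll the command queue, re-checking every
    # POLL_TIMEOUT seconds, until a None sentinel arrives.
    while True:
        try:
            cmd = command_queue.get(timeout=POLL_TIMEOUT)
        except queue.Empty:
            continue
        if cmd is None:
            break


def shutdown(processes, command_queues):
    for q in command_queues:
        q.put(None)  # stop sentinel
    for p in processes:
        p.join(timeout=GRACE_PERIOD)
        if p.is_alive():
            p.terminate()  # last resort if a worker did not exit in time
```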

submit(site, js)[source]#

Called by the job object when the job is started.

Parameters:
  • site (str)

  • js (object)