Wrapper

class nvidia_resiliency_ext.inprocess.CallWrapper(wrapper)[source]

The CallWrapper encapsulates the state and execution flow of the Wrapper's restart capabilities for a single invocation of the wrapped function. This design ensures that each call operates independently, with the restart state tied to the specific invocation rather than to the function's definition.

The CallWrapper is automatically created by the Wrapper with every invocation of the wrapped function. The active CallWrapper instance is then passed as the value for any function argument annotated with CallWrapper or typing.Optional[CallWrapper]. This allows the wrapped function to access and interact with the state of the Wrapper during its execution.
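
For illustration, a minimal sketch assuming Wrapper is applied as a decorator to the workload entry point; the train function and its body are hypothetical, not part of the API:

    from typing import Optional

    from nvidia_resiliency_ext import inprocess

    @inprocess.Wrapper()
    def train(call_wrapper: Optional[inprocess.CallWrapper] = None):
        # The active CallWrapper is passed here because the parameter is
        # annotated with Optional[CallWrapper].
        print(f'restart iteration {call_wrapper.iteration}')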

Parameters:

wrapper (Wrapper)

atomic()[source]

A context manager to wrap a section of the workload that must not be executed while the termination procedure is in progress.

atomic() is implemented with a reentrant lock shared between the termination procedure and the atomic section in the wrapped function. The termination procedure won't be launched while the main thread is executing an inprocess.CallWrapper.atomic() code block, and the main thread won't enter an inprocess.CallWrapper.atomic() code block if the termination procedure is already in progress.
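
A hypothetical use is guarding a critical section, such as a checkpoint write, so it never interleaves with the termination procedure; save_checkpoint and the loop body are illustrative placeholders, not part of the API:

    from typing import Optional

    from nvidia_resiliency_ext import inprocess

    def save_checkpoint(step: int) -> None:
        ...  # placeholder for the real checkpointing logic

    @inprocess.Wrapper()
    def train(call_wrapper: Optional[inprocess.CallWrapper] = None):
        for step in range(1000):
            ...  # one training step of the real workload
            with call_wrapper.atomic():
                # Termination is not launched while this block runs, and the
                # block is not entered if termination is already in progress.
                save_checkpoint(step)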

property iteration: int

Returns the integer index of the current restart iteration.
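
A hypothetical pattern uses the iteration index to decide whether to resume from a checkpoint, assuming the index starts at 0 for the initial invocation; load_checkpoint is an illustrative placeholder:

    from typing import Optional

    from nvidia_resiliency_ext import inprocess

    def load_checkpoint():
        ...  # placeholder for the real checkpoint-loading logic

    @inprocess.Wrapper()
    def train(call_wrapper: Optional[inprocess.CallWrapper] = None):
        if call_wrapper.iteration > 0:
            # A non-zero index means the wrapped function was restarted
            # at least once, so resume from the last saved state.
            state = load_checkpoint()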

ping()[source]

Sends a heartbeat to indicate that the workload is making meaningful forward progress.

The optional manual progress timeout is initiated with the first call to CallWrapper.ping() on each rank in a restart iteration. Once the timeout is activated, every distributed rank must periodically invoke CallWrapper.ping() to confirm ongoing progress. If any rank fails to report progress within the soft_timeout or hard_timeout intervals specified for the Wrapper, the rank is considered unresponsive and a restart of the wrapped function is attempted.

Return type:

None
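
A hypothetical training loop that reports progress once per step; the loop body is illustrative:

    from typing import Optional

    from nvidia_resiliency_ext import inprocess

    @inprocess.Wrapper()
    def train(call_wrapper: Optional[inprocess.CallWrapper] = None):
        for step in range(1000):
            ...  # one training step of the real workload
            # After the first ping() in a restart iteration, every rank must
            # keep calling ping() within soft_timeout/hard_timeout, otherwise
            # a restart of the wrapped function is attempted.
            call_wrapper.ping()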

class nvidia_resiliency_ext.inprocess.Wrapper(*, store_factory=<class 'nvidia_resiliency_ext.inprocess.store.TCPStore'>, store_kwargs=None, initialize=None, abort=<nvidia_resiliency_ext.inprocess.abort.AbortTorchDistributed object>, finalize=None, health_check=None, rank_assignment=<nvidia_resiliency_ext.inprocess.rank_assignment.ShiftRanks object>, rank_filter=None, monitor_thread_interval=datetime.timedelta(seconds=1), monitor_process_interval=datetime.timedelta(seconds=1), progress_watchdog_interval=datetime.timedelta(seconds=1), soft_timeout=datetime.timedelta(seconds=60), hard_timeout=datetime.timedelta(seconds=90), heartbeat_timeout=datetime.timedelta(seconds=30), barrier_timeout=datetime.timedelta(seconds=120), completion_timeout=datetime.timedelta(seconds=120), last_call_wait=datetime.timedelta(seconds=1), termination_grace_time=datetime.timedelta(seconds=5), monitor_process_logfile=None, enabled=True)[source]

Python function wrapper that adds restart capabilities to an existing Python function implementing a distributed PyTorch workload.

Upon a fault, the wrapped function is restarted across all distributed ranks, within the same operating system process. The restart invocation of the wrapped function excludes distributed ranks that are terminated, missing, or deemed unhealthy. When a failure occurs on any rank, the wrapper ensures the function restarts simultaneously on all healthy ranks. This process continues until all ranks complete execution successfully or a termination condition is met.

See the Usage Guide for detailed documentation.

Parameters:
  • store_factory (type[StoreMixin]) – Factory to construct the internal distributed store for communication between ranks.

  • store_kwargs (dict[str, Any] | None) – Dictionary of keyword arguments to construct the internal store with store_factory(**store_kwargs).

  • initialize (Initialize | None) – Rank-local initialize.

  • abort (Abort | None) – Asynchronously aborts execution.

  • finalize (Finalize | None) – Rank-local finalize.

  • health_check (HealthCheck | None) – Rank-local health check.

  • rank_assignment (RankAssignment) – Reassigns ranks and computes the new world size for the next restart iteration.

  • rank_filter (RankFilter | None) – Specifies ranks actively calling the wrapped function.

  • monitor_thread_interval (timedelta) – Monitoring interval for the monitor thread.

  • monitor_process_interval (timedelta) – Monitoring interval for the monitor process.

  • progress_watchdog_interval (timedelta) – Interval for automatic progress watchdog timestamp updates.

  • soft_timeout (timedelta) – Soft progress timeout. A timed-out rank executes an asynchronous abort and participates in the restart if healthy.

  • hard_timeout (timedelta) – Hard progress timeout. A timed-out rank is terminated.

  • heartbeat_timeout (timedelta) – Timeout for the heartbeat used to detect missing ranks.

  • barrier_timeout (timedelta) – Barrier timeout.

  • completion_timeout (timedelta) – Completion barrier timeout.

  • last_call_wait (timedelta) – Time interval for other ranks to report concurrent terminal failures.

  • termination_grace_time (timedelta) – Interval between SIGTERM and SIGKILL signals issued by the hard timeout mechanism.

  • monitor_process_logfile (str | None) – Absolute filepath for the monitor process logfile. It may contain a “{rank}” placeholder, which is filled with the initial integer rank ID.

  • enabled (bool) – Enables the wrapper.

Returns:

Returns the value of the wrapped function if all ranks successfully completed execution. Inactive ranks return None.
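
Putting it together, a hypothetical configuration that overrides a few of the timeouts and enables the per-rank monitor logfile; every keyword argument and value shown is illustrative, and the sketch assumes a torch.distributed workload launched with the usual rendezvous environment variables (e.g. via torchrun):

    import datetime
    from typing import Optional

    import torch

    from nvidia_resiliency_ext import inprocess

    @inprocess.Wrapper(
        soft_timeout=datetime.timedelta(seconds=120),
        hard_timeout=datetime.timedelta(seconds=180),
        monitor_process_logfile='/tmp/inprocess_monitor_{rank}.log',
    )
    def train(call_wrapper: Optional[inprocess.CallWrapper] = None):
        # The process group is re-created on every (re)start of the
        # wrapped function.
        torch.distributed.init_process_group(backend='gloo')
        for step in range(1000):
            ...  # one training step of the real workload
            call_wrapper.ping()

    if __name__ == '__main__':
        train()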