Wrapper
- class nvidia_resiliency_ext.inprocess.CallWrapper(wrapper)[source]
The CallWrapper encapsulates the state and execution flow of the restart capabilities of the Wrapper for a single invocation of the wrapped function. This design ensures that each call operates independently, with the restart state tied to the specific invocation rather than the function’s definition.
The CallWrapper is automatically created by the Wrapper with every invocation of the wrapped function. The active CallWrapper instance is then passed as the value for any function argument annotated with CallWrapper or typing.Optional[CallWrapper]. This allows the wrapped function to access and interact with the state of the Wrapper during its execution.
- Parameters:
wrapper (Wrapper)
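A minimal sketch of receiving the active CallWrapper through an annotated argument; the decorator form of Wrapper, the function name, and its body are illustrative:

```python
from typing import Optional

import nvidia_resiliency_ext.inprocess as inprocess


@inprocess.Wrapper()
def train(call_wrapper: Optional[inprocess.CallWrapper] = None):
    # The Wrapper injects the active CallWrapper into any argument annotated
    # with CallWrapper or typing.Optional[CallWrapper]; the wrapped function
    # can then use it to call atomic() and ping() during execution.
    ...
```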
- atomic()[source]
A context manager to wrap a section of the workload that must not be executed while the termination procedure is in progress.
atomic() is implemented with a reentrant lock, shared between the termination procedure and the atomic section in the wrapped function. The termination procedure won’t be launched while the main thread is executing an inprocess.CallWrapper.atomic() code block, and the main thread won’t enter an inprocess.CallWrapper.atomic() code block if the termination procedure is already in progress.
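A minimal sketch of guarding a critical section with atomic(); save_checkpoint and its arguments are placeholders for workload code that must not be interrupted:

```python
def train(call_wrapper: inprocess.CallWrapper = None):
    # The termination procedure and this block take turns on a shared
    # reentrant lock: termination waits for the block to finish, and the
    # block is not entered while termination is already in progress.
    with call_wrapper.atomic():
        save_checkpoint(model, checkpoint_path)  # placeholder workload code
```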
- ping()[source]
Sends a heartbeat to indicate that the workload is making meaningful forward progress.
The optional manual progress timeout is initiated with the first call to CallWrapper.ping() on each rank in a restart iteration. Once the timeout is activated, every distributed rank must periodically invoke CallWrapper.ping() to confirm ongoing progress. If any rank fails to report progress within the specified soft_timeout or hard_timeout intervals for the Wrapper, the rank will be considered unresponsive, and a restart of the wrapped function will be attempted.
- Return type:
None
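A minimal sketch of manual progress reporting with ping(); the loop bounds and training_step are placeholders:

```python
def train(call_wrapper: inprocess.CallWrapper = None):
    for step in range(num_steps):   # num_steps is a placeholder
        training_step()             # placeholder for one unit of real work
        # The first ping() activates the manual progress timeout on this rank;
        # afterwards every rank must keep pinging within soft_timeout and
        # hard_timeout, or a restart of the wrapped function is attempted.
        call_wrapper.ping()
```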
- class nvidia_resiliency_ext.inprocess.Wrapper(*, store_factory=<class 'nvidia_resiliency_ext.inprocess.store.TCPStore'>, store_kwargs=None, initialize=None, abort=<nvidia_resiliency_ext.inprocess.abort.AbortTorchDistributed object>, finalize=None, health_check=None, rank_assignment=<nvidia_resiliency_ext.inprocess.rank_assignment.ShiftRanks object>, rank_filter=None, monitor_thread_interval=datetime.timedelta(seconds=1), monitor_process_interval=datetime.timedelta(seconds=1), progress_watchdog_interval=datetime.timedelta(seconds=1), soft_timeout=datetime.timedelta(seconds=60), hard_timeout=datetime.timedelta(seconds=90), heartbeat_timeout=datetime.timedelta(seconds=30), barrier_timeout=datetime.timedelta(seconds=120), completion_timeout=datetime.timedelta(seconds=120), last_call_wait=datetime.timedelta(seconds=1), termination_grace_time=datetime.timedelta(seconds=5), monitor_process_logfile=None, enabled=True)[source]
Python function wrapper that adds restart capabilities to an existing Python function implementing a distributed PyTorch workload.
Upon a fault, the wrapped function is restarted across all distributed ranks, within the same operating system process. The restarted invocation excludes distributed ranks that were terminated, are missing, or are deemed unhealthy. When a failure occurs on any worker, the wrapper ensures the function restarts simultaneously on all healthy ranks. This process continues until all ranks complete execution successfully or a termination condition is met.
See the Usage Guide for detailed documentation.
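A minimal end-to-end sketch, assuming the decorator form of Wrapper; the timeout values, logfile path, and workload body are illustrative:

```python
import datetime

import nvidia_resiliency_ext.inprocess as inprocess


@inprocess.Wrapper(
    soft_timeout=datetime.timedelta(seconds=120),
    hard_timeout=datetime.timedelta(seconds=180),
    monitor_process_logfile='/tmp/inprocess_monitor_{rank}.log',
)
def main(call_wrapper: inprocess.CallWrapper = None):
    # Distributed PyTorch workload goes here; on a fault, the Wrapper
    # restarts this function in-process on all healthy ranks until every
    # rank completes successfully or a termination condition is met.
    ...


if __name__ == '__main__':
    main()
```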
- Parameters:
store_factory (type[StoreMixin]) – Factory to construct the internal distributed store for communication between ranks.
store_kwargs (dict[str, Any] | None) – Dictionary of keyword arguments to construct the internal store with store_factory(**store_kwargs).
initialize (Initialize | None) – Rank-local initialize.
abort (Abort | None) – Asynchronously aborts execution.
finalize (Finalize | None) – Rank-local finalize.
health_check (HealthCheck | None) – Rank-local health check.
rank_assignment (RankAssignment) – Reassigns ranks and computes the new world size for the next restart iteration.
rank_filter (RankFilter | None) – Specifies ranks actively calling the wrapped function.
monitor_thread_interval (timedelta) – Monitoring interval for the monitor thread.
monitor_process_interval (timedelta) – Monitoring interval for the monitor process.
progress_watchdog_interval (timedelta) – Interval for automatic progress watchdog timestamp updates.
soft_timeout (timedelta) – Soft progress timeout. Timed-out rank executes asynchronous abort, and participates in the restart if healthy.
hard_timeout (timedelta) – Hard progress timeout. Timed-out rank is terminated.
heartbeat_timeout (timedelta) – Timeout for the heartbeat used to detect missing ranks.
barrier_timeout (timedelta) – Barrier timeout.
completion_timeout (timedelta) – Completion barrier timeout.
last_call_wait (timedelta) – Time interval for other ranks to report concurrent terminal failures.
termination_grace_time (timedelta) – Interval between SIGTERM and SIGKILL signals issued by the hard timeout mechanism.
monitor_process_logfile (str | None) – Absolute filepath for the monitor process logfile. It may contain a “{rank}” placeholder, to be filled with the initial integer rank id.
enabled (bool) – Enables the wrapper.
- Returns:
Returns the value of the wrapped function if all ranks successfully completed execution. Inactive ranks return None.