Asynchronous Checkpoint Core Utilities
This module provides asynchronous utilities for starting a checkpoint save process in the background.
- class nvidia_resiliency_ext.checkpointing.async_ckpt.core.AsyncCaller[source]
Bases:
ABC
Wrapper around mp.Process that ensures correct semantics of distributed finalization.
Starts process asynchronously and allows checking if all processes on all ranks are done.
- abstract close()[source]
Terminate the async caller when the application exits or when a termination condition is met.
- abstract is_current_async_call_done(blocking, no_dist)[source]
Check if async save is finished on all ranks.
For semantic correctness, requires rank synchronization in each check. This method must be called on all ranks.
- Parameters:
- Returns:
- True if all ranks are done (immediately or after an active wait
if blocking is True), False if at least one rank is still active.
- Return type:
- abstract schedule_async_call(async_req)[source]
- Schedule async_req, either by forking a new process or by reusing a
persistent worker.
This method must be called on all ranks.
- Parameters:
async_req (AsyncRequest) – AsyncRequest object describing the async process to start
- Return type:
None
- class nvidia_resiliency_ext.checkpointing.async_ckpt.core.AsyncCallsQueue(persistent=False)[source]
Bases:
object
Manages a queue of async calls.
Allows adding a new async call with schedule_async_request and finalizing active calls with maybe_finalize_async_calls.
- Parameters:
persistent (bool)
- maybe_finalize_async_calls(blocking=False, no_dist=False)[source]
Finalizes all available calls.
This method must be called on all ranks.
- Parameters:
blocking (bool, optional) – if True, will wait until all active requests are done. Otherwise, finalizes only the async requests that have already finished. Defaults to False.
- Returns:
- list of indices (as returned by schedule_async_request)
of async calls that have been successfully finalized.
- Return type:
List[int]
- schedule_async_request(async_request)[source]
Start a new async call and add it to a queue of active async calls.
This method must be called on all ranks.
- Parameters:
async_request (AsyncRequest) – async request to start.
- Returns:
- index of the async call that was started.
This can help the user keep track of the async calls.
- Return type:
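The schedule/finalize cycle described above can be sketched without the library. This is a minimal single-process stand-in, not the real AsyncCallsQueue: SketchCallsQueue, its thread pool, and its index numbering are all assumptions made for illustration, and the cross-rank synchronization the real class performs is omitted.

```python
import threading
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Callable, List, Sequence, Tuple


class SketchCallsQueue:
    """Hypothetical single-process stand-in; not the library class."""

    def __init__(self) -> None:
        # A thread pool replaces the background process used by the real callers.
        self._pool = ThreadPoolExecutor(max_workers=1)
        self._active: List[Tuple[int, Future, List[Callable]]] = []
        self._next_idx = 0

    def schedule_async_request(
        self, async_fn: Callable, args: Sequence, finalize_fns: Sequence[Callable]
    ) -> int:
        # Start the call in the background and remember it as active.
        self._next_idx += 1
        future = self._pool.submit(async_fn, *args)
        self._active.append((self._next_idx, future, list(finalize_fns)))
        return self._next_idx

    def maybe_finalize_async_calls(self, blocking: bool = False) -> List[int]:
        finalized: List[int] = []
        while self._active:
            idx, future, finalize_fns = self._active[0]
            if not blocking and not future.done():
                break  # oldest call still running; nothing more to finalize
            future.result()  # waits when blocking; re-raises worker errors
            for fn in finalize_fns:
                fn()  # finalizers run synchronously in the caller
            finalized.append(idx)
            self._active.pop(0)
        return finalized


gate = threading.Event()  # keeps the fake "save" running until released
events: List[str] = []
calls = SketchCallsQueue()
idx = calls.schedule_async_request(gate.wait, (), [lambda: events.append("finalized")])
early = calls.maybe_finalize_async_calls()  # save still running
gate.set()
late = calls.maybe_finalize_async_calls(blocking=True)  # waits, then finalizes
```

In a training loop, the non-blocking call would typically be made every iteration and the blocking call once before exit, so that no checkpoint is left unfinalized.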
- class nvidia_resiliency_ext.checkpointing.async_ckpt.core.AsyncRequest(async_fn, async_fn_args, finalize_fns, async_fn_kwargs={}, preload_fn=None, is_frozen=False, call_idx=0)[source]
Bases:
NamedTuple
Represents an async request that needs to be scheduled for execution.
- Parameters:
async_fn (Callable, optional) – async function to call. None represents noop.
async_fn_args (Tuple) – args to pass to async_fn.
finalize_fns (List[Callable]) – list of functions to call to finalize the request. These functions will be called synchronously after async_fn is done on all ranks.
async_fn_kwargs (Dict) – kwargs to pass to async_fn.
preload_fn (Callable) – preload function that stages tensors from GPU to host. It should be self-contained, with its arguments already bound (for example, with functools.partial).
is_frozen (bool) – a flag indicating whether this async request is frozen, that is, whether it can no longer be modified.
call_idx (int) – index variable used to order async requests for synchronization in preloading and writing tensors on the async caller
Create new instance of AsyncRequest(async_fn, async_fn_args, finalize_fns, async_fn_kwargs, preload_fn, is_frozen, call_idx)
- add_finalize_fn(fn)[source]
Adds a new finalize function to the request.
- Parameters:
fn (Callable) – function to add to the async request. This function will be called after existing finalization functions.
- Returns:
None
- Return type:
None
- execute_sync()[source]
Helper to synchronously execute the request.
This logic is equivalent to what should happen in case of the async call.
- Return type:
None
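To make the field layout and the synchronous path concrete, here is a minimal sketch of a NamedTuple with the same documented fields. SketchRequest is a hypothetical stand-in, not the library class. Two behaviors are assumptions: that a frozen request raises on modification, and that execute_sync simply runs async_fn followed by the finalize functions. Any rank synchronization is left out.

```python
from typing import Callable, Dict, List, NamedTuple, Optional, Tuple


class SketchRequest(NamedTuple):
    """Hypothetical stand-in mirroring the documented AsyncRequest fields."""

    async_fn: Optional[Callable]
    async_fn_args: Tuple
    finalize_fns: List[Callable]
    async_fn_kwargs: Dict = {}
    preload_fn: Optional[Callable] = None
    is_frozen: bool = False
    call_idx: int = 0

    def add_finalize_fn(self, fn: Callable) -> None:
        # Assumption: a frozen request rejects modification.
        if self.is_frozen:
            raise RuntimeError("cannot modify a frozen request")
        # The tuple is immutable, but the list it holds can still grow.
        self.finalize_fns.append(fn)

    def execute_sync(self) -> None:
        # Run the save function inline (None is a no-op), then the finalizers in order.
        if self.async_fn is not None:
            self.async_fn(*self.async_fn_args, **self.async_fn_kwargs)
        for fn in self.finalize_fns:
            fn()


log: List[str] = []
req = SketchRequest(
    async_fn=lambda path, tag="": log.append(f"save {path}{tag}"),
    async_fn_args=("ckpt.pt",),
    finalize_fns=[lambda: log.append("finalize 1")],
    async_fn_kwargs={"tag": "@step10"},
)
req.add_finalize_fn(lambda: log.append("finalize 2"))
req.execute_sync()
```

The added function runs last, matching the note that a new finalize function is called after the existing ones.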
- class nvidia_resiliency_ext.checkpointing.async_ckpt.core.PersistentAsyncCaller[source]
Bases:
AsyncCaller
Wrapper around mp.Process that ensures correct semantics of distributed finalization.
Starts process asynchronously and allows checking if all processes on all ranks are done.
- static async_loop(rank, queue, preload_q, comp_q, log_level=20)[source]
Main function for the persistent checkpoint worker
The persistent worker is created once and terminated at exit or when the application calls close() explicitly.
This routine receives an AsyncRequest, runs preload_fn first, and puts an integer value in preload_q to inform the trainer that it may proceed. When the async_fn from the request completes (background saving is done), it puts an integer value in comp_q to notify the trainer of the completion.
- Parameters:
rank (int) – the rank of the trainer where the persistent worker is created.
queue (mp.JoinableQueue) – the main queue used to receive AsyncRequest from the training rank
preload_q (mp.JoinableQueue) – a queue to inform trainer that preloading of tensors from GPU to Host or dedicated location is completed
comp_q (mp.Queue) – a queue to inform the training rank the completion of scheduled async checkpoint request
log_level (int, Optional) – an integer to set log-level in this spawned process to get aligned with the training rank’s logging level
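The two-signal handshake described for async_loop (preload done, then save done) can be illustrated with a small stand-in. This sketch uses a thread and queue.Queue in place of a spawned process and multiprocessing queues so that it stays short and runnable anywhere. The function name sketch_worker_loop, the tuple layout of a request, the None shutdown sentinel, and the use of the call index as the integer signal are all assumptions, not the library's behavior.

```python
import queue
import threading


def sketch_worker_loop(
    requests: queue.Queue, preload_q: queue.Queue, comp_q: queue.Queue
) -> None:
    """Hypothetical worker loop: preload, signal, save, signal."""
    while True:
        item = requests.get()
        if item is None:  # assumed shutdown sentinel
            requests.task_done()
            break
        call_idx, preload_fn, save_fn = item
        staged = preload_fn()    # stage data (GPU to host in the real worker)
        preload_q.put(call_idx)  # the trainer may resume training now
        save_fn(staged)          # slow background write
        comp_q.put(call_idx)     # report completion of this request
        requests.task_done()


requests: queue.Queue = queue.Queue()
preload_q: queue.Queue = queue.Queue()
comp_q: queue.Queue = queue.Queue()
worker = threading.Thread(
    target=sketch_worker_loop, args=(requests, preload_q, comp_q), daemon=True
)
worker.start()

saved: dict = {}
requests.put((7, lambda: {"w": [1, 2, 3]}, lambda staged: saved.update(staged)))
preloaded_idx = preload_q.get(timeout=5)  # blocks until preloading has finished
completed_idx = comp_q.get(timeout=5)     # blocks until the save has finished
requests.put(None)                        # ask the worker to exit
worker.join(timeout=5)
```

Separating the two signals is what lets the trainer continue as soon as its tensors are staged, rather than waiting for the full write.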
- close()[source]
Terminate the async caller when the application exits or when a termination condition is met.
- is_current_async_call_done(blocking=False, no_dist=False)[source]
Check if async save is finished on all ranks.
For semantic correctness, requires rank synchronization in each check. This method must be called on all ranks.
- Parameters:
- Returns:
- True if all ranks are done (immediately or after an active wait
if blocking is True), False if at least one rank is still active.
- Return type:
- schedule_async_call(async_req)[source]
Send an AsyncRequest to the persistent async caller.
This method must be called on all ranks. The async_req object is pickled and sent to the persistent async worker via a JoinableQueue. Therefore, all arguments within async_req must be picklable.
- Parameters:
async_fn (Callable, optional) – async function to call. If None, no process will be started.
async_req (AsyncRequest) – AsyncRequest object describing the checkpointing request to schedule
- Return type:
None
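Because the request is pickled before it reaches the worker, it can help to check picklability up front. The helper below is a hypothetical convenience, not part of the library; it only uses the standard pickle module. It shows why binding arguments with functools.partial over an importable function works where a lambda does not.

```python
import math
import pickle
from functools import partial


def is_picklable(obj) -> bool:
    """Return True if obj survives pickle.dumps (hypothetical helper)."""
    try:
        pickle.dumps(obj)
        return True
    except (pickle.PicklingError, AttributeError, TypeError):
        return False


# An importable function with bound arguments is pickled by reference.
ok_fn = partial(math.sqrt, 16.0)
# A lambda has no importable name, so pickling it fails.
bad_fn = lambda: math.sqrt(16.0)

ok_result = is_picklable(ok_fn)
bad_result = is_picklable(bad_fn)
```

Running such a check on async_fn, its arguments, and preload_fn before scheduling turns a failure inside the queue into an early, local error.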
- class nvidia_resiliency_ext.checkpointing.async_ckpt.core.TemporalAsyncCaller[source]
Bases:
AsyncCaller
Wrapper around mp.Process that ensures correct semantics of distributed finalization.
Starts process asynchronously and allows checking if all processes on all ranks are done.
- close()[source]
Terminate the async caller when the application exits or when a termination condition is met.
- is_current_async_call_done(blocking=False, no_dist=False)[source]
Check if async save is finished on all ranks.
For semantic correctness, requires rank synchronization in each check. This method must be called on all ranks.
- Parameters:
- Returns:
- True if all ranks are done (immediately or after an active wait
if blocking is True), False if at least one rank is still active.
- Return type:
- schedule_async_call(async_req)[source]
Spawn a process with async_fn as the target.
This method must be called on all ranks.
- Parameters:
async_fn (Callable, optional) – async function to call. If None, no process will be started.
async_req (AsyncRequest) – AsyncRequest object describing the async process to start
- Return type:
None