Asynchronous FileSystemWriter Implementation

Storage writer for the PyT Distributed format that allows asynchronous saves.

class nvidia_resiliency_ext.checkpointing.async_ckpt.filesystem_async.FileSystemWriterAsync(*args, separation_hint=None, **kwargs)[source]

Bases: FileSystemWriter

Async-enabled implementation of FileSystemWriter using file I/O.

This class does not spawn the async process itself but relies on an external async mechanism.

Flow:

  1. Call write_data

  2. Externally, start an async process with the function returned by get_save_function_and_args and its arguments.

  3. The async function writer_proxy_func calls write_preloaded_data across multiple processes.

  4. Once saving is finalized on all ranks, call super().finish with the results stored in self.writer_result.

Note: Step (3) can also be executed synchronously.

Currently, it is assumed that a separate writer is created for each checkpoint save (intermediate state is stored as writer attributes).
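The flow can be sketched end to end as follows. This is a minimal, single-rank illustration that follows the generic PyT Distributed planning protocol and runs step (3) synchronously in-process; the state_dict, path, and process-group settings are placeholders, and the results handling is simplified compared to the production async integration:

    import torch
    import torch.distributed as dist
    from torch.distributed.checkpoint.default_planner import DefaultSavePlanner

    from nvidia_resiliency_ext.checkpointing.async_ckpt.filesystem_async import (
        FileSystemWriterAsync,
    )

    # Single-rank process group so rank queries inside the writer work (placeholder setup).
    if not dist.is_initialized():
        dist.init_process_group(
            "gloo", init_method="tcp://127.0.0.1:29500", rank=0, world_size=1
        )

    state_dict = {"weight": torch.randn(4, 4)}           # placeholder data
    writer = FileSystemWriterAsync("/tmp/ckpt", thread_count=2)
    planner = DefaultSavePlanner()

    # Standard PyT Distributed planning sequence (single rank shown).
    planner.set_up_planner(state_dict, is_coordinator=True)
    writer.set_up_storage_writer(is_coordinator=True)
    local_plan = writer.prepare_local_plan(planner.create_local_plan())
    global_plan, metadata = planner.create_global_plan([local_plan])
    final_plan = planner.finish_plan(writer.prepare_global_plan(global_plan)[0])

    # (1) Stage the data and build the local write buckets (no file I/O yet).
    writer.write_data(final_plan, planner)

    # (2)+(3) Normally handed to an external async mechanism; executed in-process here,
    # assuming the returned argument list is meant to be unpacked into the save function.
    fn_and_args = writer.get_save_function_and_args()
    if fn_and_args is not None:
        save_fn, stage_fn, save_args = fn_and_args
        save_fn(*save_args)

    # (4) Collect the write results and let the base class write the metadata file.
    writer.finish(metadata, [writer.retrieve_write_results()])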

Initialize the writer pointing to path.

Parameters:
  • path – directory to which the checkpoint will be written.

  • single_file_per_rank – Produce one file per rank instead of one file per tensor/blob. Defaults to True.

  • sync_files – Force files to be synced to permanent storage. Defaults to True.

  • thread_count – Number of IO threads used for writing. Defaults to 1.

  • per_thread_copy_ahead – How many bytes to copy from the GPU ahead of saving. Defaults to 10 MB.

  • cache_staged_state_dict – Whether to cache the staged state_dict. This option decreases staging latency at the cost of increased memory usage. Additionally, if this parameter is set to True, the stager is expected to be maintained and reused for multiple dcp.async_save calls. Defaults to False.

  • overwrite – Whether to allow overwriting existing checkpoints. Defaults to True.

  • separation_hint (str | None)

Note: If sync_files is disabled, there’s no guarantee that the checkpoint will be consistent in the case of a failure.
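A short construction sketch; the path and tuning values below are placeholders:

    from nvidia_resiliency_ext.checkpointing.async_ckpt.filesystem_async import (
        FileSystemWriterAsync,
    )

    writer = FileSystemWriterAsync(
        "/tmp/ckpt_dir",            # directory the checkpoint is written to
        single_file_per_rank=True,  # one file per rank
        sync_files=True,            # keep fsync enabled for crash consistency
        thread_count=2,             # two parallel writer workers on this rank
        separation_hint=None,
    )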

get_save_function_and_args()[source]

Get function that saves the data to storage along with its arguments. Allows the external caller to apply the save function synchronously or asynchronously.

Returns: None (if there is nothing to write on this rank) or a tuple of:
  1. the function that saves the data.

  2. the function that stages the GPU tensors to a destination for async checkpointing. This function should be self-contained.

  3. arguments to the function in (1).

Return type:

Tuple[Callable | None, Callable | None, List]
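As an alternative to the synchronous call in the flow sketch above, the returned tuple can be handed to an external async mechanism. A hypothetical variant using a forked process (names carried over from the earlier sketch; whether and when the staging function must be invoked depends on the surrounding mechanism, so it is only unpacked here):

    import multiprocessing as mp

    async_proc = None
    fn_and_args = writer.get_save_function_and_args()
    if fn_and_args is not None:
        save_fn, stage_fn, save_args = fn_and_args
        ctx = mp.get_context("fork")   # the save function is meant to run in a forked subprocess
        async_proc = ctx.Process(target=save_fn, args=tuple(save_args))
        async_proc.start()
        # ... training can continue while the checkpoint is being written ...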

static preload_tensors(write_buckets, non_blocking=True)[source]

Preloads tensors referenced by the write buckets into host (CPU) memory.

Parameters:
  • write_buckets (List) – List of WriteBucket objects that define what to save in a checkpoint.

  • non_blocking (bool, optional) – knob to enable pinned D2H memcpy. Default is True.

Return type:

List[Tuple[Path, str, Tuple[list, list]]]
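The non_blocking knob corresponds to a device-to-host copy into pinned CPU memory. Conceptually (this is an illustration, not the actual implementation):

    import torch

    if torch.cuda.is_available():
        gpu_t = torch.randn(1024, 1024, device="cuda")
        # A pinned (page-locked) CPU buffer allows a truly asynchronous D2H copy.
        cpu_t = torch.empty(gpu_t.shape, dtype=gpu_t.dtype, device="cpu", pin_memory=True)
        cpu_t.copy_(gpu_t, non_blocking=True)
        torch.cuda.synchronize()  # the copy must complete before the buffer is consumed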

prepare_decentralized_global_plan(local_plan)[source]

Instead of assigning indices by plan order, uses PyT rank (same outcome).

Parameters:

local_plan (SavePlan) – local plan to turn to a global plan (without interactions with other ranks)

Returns:

SavePlan – locally transformed plan, equivalent to the plan that would be created by the coordinator

Return type:

SavePlan
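A sketch of the idea, assuming the same file-name prefix convention as the coordinator-based FileSystemWriter and relying on the private torch.distributed.checkpoint.filesystem._StoragePrefix helper (illustrative only; the actual implementation may differ):

    import dataclasses

    import torch.distributed as dist
    from torch.distributed.checkpoint.filesystem import _StoragePrefix
    from torch.distributed.checkpoint.planner import SavePlan

    def decentralized_prefix(local_plan: SavePlan) -> SavePlan:
        # The coordinator enumerates plans and assigns "__{i}_" prefixes in plan order;
        # deriving the prefix from the rank yields the same result without communication.
        rank = dist.get_rank() if dist.is_initialized() else 0
        return dataclasses.replace(local_plan, storage_data=_StoragePrefix(f"__{rank}_"))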

prepare_write_data(plan, planner)[source]

First stage of async saving. Copy data to CPU and plan the local saving.

Parameters:
  • plan (SavePlan) – save plan generated by the PyT Distributed compatible planner

  • planner (SavePlanner) – save planner used to resolve the bytes and tensor data

Return type:

None

Returns: None, but stores the save plan in self.write_buckets

retrieve_write_results()[source]
Turns the latest dict of write results from self.results_queue into a single list of results. Includes an error check.

Returns: the list of write results from all local processes performing the save.

Return type:

List[WriteResult]
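Continuing the asynchronous variant sketched under get_save_function_and_args: once the forked save process has finished, the results can be gathered and passed to the base-class finish() together with the metadata produced during planning (names carried over from the earlier sketches):

    if async_proc is not None:
        async_proc.join()                              # wait for the forked save to complete
    write_results = writer.retrieve_write_results()    # the documented error check happens here
    writer.finish(metadata, [write_results])           # base class writes the metadata file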

write_data(plan, planner)[source]

Write all items from plan.

Parameters:
  • plan (SavePlan) – save plan generated by the PyT Distributed compatible planner

  • planner (SavePlanner) – save planner used to resolve the bytes and tensor data

Return type:

Future[List[WriteResult]]

static write_preloaded_data(transform_list, local_proc_idx, write_bucket, results_queue, count_queue, use_fsync)[source]

Performs actual data saving to storage.

Parameters:
  • local_proc_idx (int) – index of a local process that performs writing

  • write_bucket (WriteBucket) – data to write to storage

  • results_queue (mp.Queue) – queue to return the write results to the proxy checkpoint process.

  • count_queue (mp.JoinableQueue) – queue used to mark the worker task as completed

  • use_fsync (bool) – if True, calls os.fsync at the end of saving

  • transform_list (List[Any])

Return type:

None

Returns: None; the write results are put into the queue
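To illustrate the role of use_fsync: after the payload is written, fsync forces the data to durable storage before the worker reports success. A generic sketch (not the actual implementation):

    import os

    def _write_item(path: str, payload: bytes, use_fsync: bool = True) -> None:
        with open(path, "wb") as f:
            f.write(payload)
            if use_fsync:
                f.flush()
                os.fsync(f.fileno())  # ensure the bytes reach the storage device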

static write_preloaded_data_multiproc(transform_list, rank, write_buckets, global_results_queue)[source]

Performs saving data to storage with multiple processes.

Starts a predefined number of processes and uses two queues to make sure the results are complete:

  • local_results_queue – to send the actual results

  • count_queue – a small queue used to mark a worker as completed

Using a single queue would not allow proper exception handling.

This method is meant to be run in a forked subprocess. Triggering GC during execution leads to CUDA errors (cleaning up tensors owned by the parent process). To prevent this, we disable the GC explicitly for this function with _disable_gc.

Parameters:
  • write_buckets (List[WriteBucket]) – write plan

  • global_results_queue (mp.Queue) – mp.Queue to collect Dict[List[WriteResults]] (or an Exception) from parallel write processes to the main training process

  • transform_list (List[Any])

  • rank (int)

Return type:

None

Returns: None
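The two-queue pattern described above can be illustrated generically: workers push their payload to a results queue and acknowledge completion on a joinable count queue, so the parent can tell "all workers reported" apart from "a worker died before reporting". A minimal, self-contained sketch (not the actual implementation):

    import multiprocessing as mp

    def _worker(idx: int, results_q: mp.Queue, count_q: mp.JoinableQueue) -> None:
        try:
            results_q.put((idx, f"wrote bucket {idx}"))  # placeholder payload
        finally:
            count_q.get()
            count_q.task_done()                          # mark this worker as completed

    if __name__ == "__main__":
        ctx = mp.get_context("fork")
        num_workers = 2
        results_q, count_q = ctx.Queue(), ctx.JoinableQueue()
        for i in range(num_workers):
            count_q.put(i)                               # one token per worker
        procs = [ctx.Process(target=_worker, args=(i, results_q, count_q))
                 for i in range(num_workers)]
        for p in procs:
            p.start()
        count_q.join()                                   # blocks until every token is acknowledged
        results = [results_q.get() for _ in range(num_workers)]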