Asynchronous FileSystemWriter Implementation

Storage writer for the PyT Distributed format that allows asynchronous saves.

class nvidia_resiliency_ext.checkpointing.async_ckpt.filesystem_async.FileSystemWriterAsync(path, *args, separation_hint=None, **kwargs)[source]

Bases: FileSystemWriter

Async-enabled implementation of FileSystemWriter using file I/O.

This class does not spawn the async process itself but relies on an external async mechanism.

Flow:

  1. Call write_data

  2. Externally start an async process with get_save_function_and_args and its arguments.

  3. The async function writer_proxy_func calls write_preloaded_data across multiple processes.

  4. Once saving is finalized on all ranks, call super().finish with the results stored in self.writer_result.

Note: Step (3) can also be executed synchronously.

Currently, it is assumed that a separate writer is created for each ckpt save (intermediate state is stored as writer attributes).
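
The four-step flow above can be illustrated with a minimal stand-in. `MiniAsyncWriter` below is a hypothetical, simplified mock (not the real class; the real `get_save_function_and_args` also returns a staging function, and `finish` takes metadata and results), sketching how an external caller drives the async save:

```python
import threading

# Hypothetical, simplified stand-in for FileSystemWriterAsync, illustrating
# the four-step flow; names and signatures are assumptions, not the real API.
class MiniAsyncWriter:
    def __init__(self):
        self.write_buckets = None   # intermediate state: one writer per save
        self.writer_result = None

    def write_data(self, state_dict):                    # step 1
        # Plan the save and stage data (the real class copies GPU -> CPU here).
        self.write_buckets = list(state_dict.items())

    def get_save_function_and_args(self):                # step 2
        if not self.write_buckets:
            return None                                  # nothing to write
        return (self._save, [self.write_buckets])

    @staticmethod
    def _save(buckets):                                  # step 3 (sync or async)
        return [f"wrote:{key}" for key, _ in buckets]

    def finish(self):                                    # step 4
        return self.writer_result

writer = MiniAsyncWriter()
writer.write_data({"layer.weight": b"\x00\x01"})
fn, args = writer.get_save_function_and_args()

# The caller decides whether step 3 runs synchronously or in the background.
t = threading.Thread(target=lambda: setattr(writer, "writer_result", fn(*args)))
t.start()
t.join()
print(writer.finish())  # -> ['wrote:layer.weight']
```

Because the intermediate state lives on the writer, reusing one writer across two concurrent saves would corrupt it; hence the one-writer-per-checkpoint assumption.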

Initialize the writer pointing to path.

Parameters:
  • path (str | PathLike) – directory where the checkpoint will be written to.

  • single_file_per_rank – Produce one file per rank instead of one file per tensor/blob. Defaults to True.

  • sync_files – Force files to be synced to permanent storage. Defaults to True.

  • thread_count – Number of IO threads to use for writing. Defaults to 1.

  • per_thread_copy_ahead – How many bytes to copy from the GPU ahead of saving them. Defaults to 10MB.

  • cache_staged_state_dict – Whether to cache the staged state_dict. This option decreases staging latency at the cost of increased memory usage. Additionally, if this parameter is set to True, it is expected that the stager is maintained and reused for multiple dcp.async_save calls. Defaults to False.

  • overwrite – Whether to allow overwriting existing checkpoints. Defaults to True.

  • _extensions – Extensions to apply to output streams (EXPERIMENTAL)

  • separation_hint (str | None)

Note: If sync_files is disabled, there’s no guarantee that the checkpoint will be consistent in the case of a failure.

property checkpoint_id: str | PathLike

Return the checkpoint_id that will be used to save the checkpoint.

finish(metadata, results)[source]

Finish the checkpointing process.

Parameters:
  • metadata (Metadata) – metadata to save

  • results (List[List[WriteResult]]) – results to save

Return type:

None

get_save_function_and_args()[source]

Get function that saves the data to storage along with its arguments. Allows the external caller to apply the save function synchronously or asynchronously.

Returns: None (if there is nothing to write on this rank) or a tuple of:
  1. the function that saves the data.

  2. the function that stages the GPU tensors to a destination for async checkpointing. This function should be self-contained.

  3. the arguments to the function in (1).

Return type:

Tuple[Callable | None, Callable | None, List]
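
A sketch of how an external caller might consume this return value, handling the None case and dispatching the save either synchronously or via an executor. `save_fn`, `stage_fn`, and the bucket names are hypothetical stand-ins for the real callables returned by the writer:

```python
from concurrent.futures import ThreadPoolExecutor

# Hypothetical stand-ins for the (save_fn, stage_fn, args) tuple described
# above; the real callables come from get_save_function_and_args().
def stage_fn():
    # Would copy GPU tensors to host memory; self-contained by design.
    pass

def save_fn(write_buckets):
    return [{"bucket": b, "ok": True} for b in write_buckets]

retval = (save_fn, stage_fn, [["bucket-0", "bucket-1"]])  # None on idle ranks

if retval is None:
    results = []                      # nothing to write on this rank
else:
    fn, stager, args = retval
    stager()                          # stage tensors before going async
    with ThreadPoolExecutor(max_workers=1) as pool:
        results = pool.submit(fn, *args).result()   # or simply fn(*args)
print(results)
```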

static preload_tensors(write_buckets, non_blocking=True)[source]

Preloads tensors from the write buckets to host (CPU) memory.

Parameters:
  • write_buckets (List) – List of WriteBucket objects that define what to save in a checkpoint.

  • non_blocking (bool, optional) – knob to enable pinned D2H memcpy. Default is True.

Return type:

List[Tuple[Path, str, Tuple[list, list]]]

prepare_decentralized_global_plan(local_plan)[source]

Instead of assigning indices by plan order, uses the PyT rank (which produces the same outcome).

Parameters:

local_plan (SavePlan) – local plan to turn to a global plan (without interactions with other ranks)

Returns:

SavePlan – locally transformed plan, equivalent to the plan that would be created by the coordinator

Return type:

SavePlan

prepare_local_plan(plan)[source]

Prepare the local plan for the checkpointing process.

Parameters:

plan (SavePlan)

Return type:

SavePlan

prepare_write_data(plan, planner)[source]

First stage of async saving. Copy data to CPU and plan the local saving.

Parameters:
  • plan (SavePlan) – save plan generated by the PyT Distributed compatible planner

  • planner (SavePlanner) – save planner used to resolve the bytes and tensor data

Return type:

None

Returns: None, but stores the save plan in self.write_buckets

retrieve_write_results()[source]
Turns the latest dict of write results from self.results_queue into a single results list. Includes an error check.

Returns: the list of write results from all local processes performing the save.

Return type:

List[WriteResult]
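
A sketch of the merge-with-error-check step, assuming the queue carries either a `Dict[proc_idx, List[WriteResult]]` or an exception from a failed worker (the exact payload shape is an assumption for illustration):

```python
import queue

# Hypothetical sketch: flatten per-process write results from a results
# queue into one list, re-raising any worker exception.
def retrieve_write_results(results_queue, expected_procs):
    results_dict = results_queue.get()       # latest payload from the queue
    if isinstance(results_dict, Exception):
        raise results_dict                   # a worker failed: re-raise here
    if len(results_dict) != expected_procs:
        raise RuntimeError(
            f"expected results from {expected_procs} processes, "
            f"got {sorted(results_dict)}")
    # Flatten Dict[proc_idx, List[WriteResult]] into a single list.
    return [res for proc_idx in sorted(results_dict)
            for res in results_dict[proc_idx]]

q = queue.Queue()
q.put({0: ["res-a"], 1: ["res-b", "res-c"]})
merged = retrieve_write_results(q, expected_procs=2)
print(merged)  # -> ['res-a', 'res-b', 'res-c']
```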

classmethod validate_checkpoint_id(checkpoint_id)[source]

Check if the given checkpoint_id is supported by the storage. This allows us to enable automatic storage selection.

Parameters:

checkpoint_id (str | PathLike)

Return type:

bool

write_data(plan, planner)[source]

Write all items from plan.

Parameters:
  • plan (SavePlan)

  • planner (SavePlanner)

Return type:

Future[List[WriteResult]]

static write_preloaded_data(transform_list, local_proc_idx, write_bucket, results_queue, count_queue, use_fsync, **kwargs)[source]

Performs actual data saving to storage.

Parameters:
  • local_proc_idx (int) – index of a local process that performs writing

  • write_bucket (WriteBucket) – data to write to storage

  • results_queue (mp.Queue) – queue to return the write results to the proxy checkpoint process.

  • count_queue (mp.JoinableQueue) – queue used to mark worker tasks as completed

  • use_fsync (bool) – if True, calls os.fsync at the end of saving

  • transform_list (List[_StorageWriterTransforms])

Return type:

None

Returns: None; the write results are put into the queue

static write_preloaded_data_multiproc(transform_list, use_msc, rank, write_buckets, global_results_queue)[source]

Performs saving data to storage with multiple processes.

Starts a predefined number of processes and uses two queues to make sure the results are complete:
  • local_results_queue – to send the actual results

  • count_queue – small queue to mark a worker as completed

Using just one queue would preclude proper exception handling.

This method is meant to be run in a forked subprocess. Triggering GC during execution leads to CUDA errors (cleaning up tensors owned by the parent process). To prevent this, we disable the GC explicitly for this function with _disable_gc.

Parameters:
  • write_buckets (List[WriteBucket]) – write plan

  • global_results_queue (mp.Queue) – mp.Queue to collect Dict[List[WriteResults]] (or an Exception) from parallel write processes to the main training process

  • transform_list (List[_StorageWriterTransforms])

  • use_msc (bool)

  • rank (int)

Return type:

None

Returns: None
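
The two-queue pattern described above can be sketched as follows. This sketch uses threads and `queue.Queue` for portability (the real implementation forks processes and uses `mp.Queue` / `mp.JoinableQueue`); the results queue carries either results or an exception, while the joinable count queue lets the collector know every worker completed rather than hanging on a silently dead one:

```python
import queue
import threading

# Sketch of the two-queue pattern (threads stand in for forked processes).
def worker(proc_idx, bucket, results_queue, count_queue):
    try:
        result = [f"wrote:{bucket}"]          # stand-in for the actual file write
        results_queue.put((proc_idx, result))
    except Exception as exc:
        results_queue.put((proc_idx, exc))    # propagate the error, don't hang
    finally:
        count_queue.task_done()               # mark this worker task as completed

buckets = ["bucket-0", "bucket-1"]
results_queue, count_queue = queue.Queue(), queue.Queue()
for idx, bucket in enumerate(buckets):
    count_queue.put(idx)                      # one outstanding task per worker
    threading.Thread(target=worker,
                     args=(idx, bucket, results_queue, count_queue)).start()

count_queue.join()                            # wait until every task is marked done
collected = dict(sorted(results_queue.get() for _ in buckets))
print(collected)  # -> {0: ['wrote:bucket-0'], 1: ['wrote:bucket-1']}
```

With a single queue, a worker that died before putting its result would be indistinguishable from one that is merely slow; the count queue makes completion explicit so errors surface instead of deadlocking.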