Asynchronous FileSystemWriter Implementation

Storage writer for the PyT Distributed format that allows asynchronous saves.

class nvidia_resiliency_ext.checkpointing.async_ckpt.filesystem_async.ConsistentDataIdentifier(key)[source]

Bases: object

Identifier for consistent data structure stored in worker cache.

This allows passing a lightweight identifier instead of pickling the entire data structure (which includes IPC handles) across process boundaries.

Parameters:

key (str)
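The pattern can be sketched with a minimal stand-in (the class, cache, and helper names below are hypothetical; only the idea of passing a small key instead of the heavy structure mirrors the API above):

```python
from dataclasses import dataclass

# Hypothetical stand-in illustrating the pattern: instead of pickling a heavy
# data structure (with IPC handles) across process boundaries, only a small
# key is sent, and the worker resolves it from its local cache.
@dataclass(frozen=True)
class DataIdentifier:
    key: str

worker_cache = {}  # worker-side cache, keyed by identifier

def store(ident, heavy_data):
    worker_cache[ident] = heavy_data

def resolve(ident):
    return worker_cache[ident]

store(DataIdentifier("ckpt-step-100"), {"tensors": [1, 2, 3]})
# Equal keys compare (and hash) equal, so a freshly built identifier
# resolves to the data cached under it.
print(resolve(DataIdentifier("ckpt-step-100")))
```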

class nvidia_resiliency_ext.checkpointing.async_ckpt.filesystem_async.FileSystemWriterAsync(path, *args, separation_hint=None, use_msc=False, is_multiproc_io=False, use_cached_data_structure=False, **kwargs)[source]

Bases: FileSystemWriter

Async-enabled implementation of FileSystemWriter using file I/O.

This class does not spawn the async process itself but relies on an external async mechanism.

Flow:

  1. Call write_data

  2. Externally start an async process with get_save_function_and_args and its arguments.

  3. The async function calls write_preloaded_data_multithread (threads) or write_preloaded_data_multiproc (processes) across multiple workers.

  4. Once saving is finalized on all ranks, call super().finish with the results stored in self.writer_result.

Note: Step (3) can also be executed synchronously.

Currently, it is assumed that a separate writer is created for each checkpoint save (intermediate state is stored as writer attributes).
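The flow can be sketched with a stand-in writer and a thread pool playing the role of the external async mechanism (the stub class, its attributes, and the bucket payloads are hypothetical; only the method names mirror the API documented here):

```python
from concurrent.futures import ThreadPoolExecutor

class StubWriter:
    """Hypothetical stand-in; only the control flow is illustrated."""

    def write_data(self):
        # Step 1: resolve the plan data (stand-in payload).
        self.staged = ["bucket-0", "bucket-1"]

    def get_save_function_and_args(self):
        if not self.staged:
            return None  # nothing to write on this rank

        def stage_fn():
            # Stand-in for staging GPU tensors to a destination (D2H copy).
            self.staged[:] = [b.upper() for b in self.staged]

        def save_fn(buckets):
            # Stand-in for the actual write to storage.
            return [f"wrote {b}" for b in buckets]

        return save_fn, stage_fn, [self.staged]

    def finish(self, results):
        self.writer_result = results  # step 4: finalize

writer = StubWriter()
writer.write_data()
ret = writer.get_save_function_and_args()
if ret is not None:
    save_fn, stage_fn, args = ret
    stage_fn()  # stage before handing off
    with ThreadPoolExecutor(max_workers=1) as pool:  # step 2: external async mechanism
        future = pool.submit(save_fn, *args)  # step 3 (may also run synchronously)
        writer.finish(future.result())
print(writer.writer_result)  # ['wrote BUCKET-0', 'wrote BUCKET-1']
```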

Initialize the writer pointing to path.

Parameters:
  • path (str | PathLike) – directory where the checkpoint will be written to.

  • single_file_per_rank – Produce one file per rank instead of one file per tensor/blob. Defaults to True.

  • sync_files – force files to be synced to permanent storage. Defaults to True.

  • thread_count – Number of I/O threads used for writing. Defaults to 1.

  • per_thread_copy_ahead – How many bytes to copy from the GPU ahead of saving them. Defaults to 10 MB.

  • cache_staged_state_dict – Whether to cache the staged state_dict. This option decreases staging latency at the cost of increased memory usage. Additionally, if this parameter is set to True, the stager is expected to be maintained and reused for multiple dcp.async_save calls. Defaults to False.

  • overwrite – Whether to allow overwriting existing checkpoints. Defaults to True.

  • _extensions – Extensions to apply to output streams (EXPERIMENTAL)

  • separation_hint (str | None)

  • use_msc (bool)

  • is_multiproc_io (bool)

  • use_cached_data_structure (bool)

Note: If sync_files is disabled, there’s no guarantee that the checkpoint will be consistent in the case of a failure.

property checkpoint_id: str | PathLike

Return the checkpoint_id that will be used to save the checkpoint.

finish(metadata, results)[source]

Finish the checkpointing process.

Parameters:
  • metadata (Metadata) – metadata to save

  • results (List[List[WriteResult]]) – results to save

Return type:

None

get_save_function_and_args()[source]

Get function that saves the data to storage along with its arguments. Allows the external caller to apply the save function synchronously or asynchronously.

Returns: None (if there is nothing to write on this rank) or a tuple of:
  1. the function that saves the data.

  2. the function that stages the GPU tensors to a destination for async checkpointing. This function should be self-contained.

  3. arguments to the function in (1).

Return type:

Tuple[Callable | None, Callable | None, List]

static preload_tensors(resolved_plan_data, non_blocking=True)[source]

Creates write_buckets and preloads tensors to host memory.

This runs in the persistent worker process. Bucket creation is done here (not in prepare_write_data) so that cached GPU tensor data stored in the worker process can be retrieved and reused without re-pickling.

Parameters:
  • resolved_plan_data (Tuple) –

    Tuple containing (checkpoint_dir, (identifier, data_structure)) where:
      - identifier: ConsistentDataIdentifier (caching) or None
      - data_structure: (separation_hint, cached_tensor_data, uncached_tensor_data, byte_io_data, thread_count, storage_plan)

  • non_blocking (bool, optional) – Enable pinned D2H memcpy. Default is True.

Returns:

List of write buckets with tensors moved to CPU

Return type:

List[WriteBucket]
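Bucket creation itself can be sketched as a size-balanced split across workers (a hypothetical greedy scheme with made-up item names; the real implementation may partition differently):

```python
import heapq

def make_buckets(sized_items, thread_count):
    # Greedily assign each item (largest first) to the currently smallest
    # bucket, balancing the total bytes handled per worker.
    heap = [(0, i, []) for i in range(thread_count)]  # (total_size, idx, bucket)
    heapq.heapify(heap)
    for name, size in sorted(sized_items, key=lambda x: -x[1]):
        total, idx, bucket = heapq.heappop(heap)
        bucket.append(name)
        heapq.heappush(heap, (total + size, idx, bucket))
    return [bucket for _, _, bucket in sorted(heap, key=lambda e: e[1])]

buckets = make_buckets([("a", 40), ("b", 30), ("c", 20), ("d", 10)], 2)
print(buckets)  # [['a', 'd'], ['b', 'c']]
```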

prepare_decentralized_global_plan(local_plan)[source]

Instead of assigning indices by plan order, uses PyT rank (same outcome).

Parameters:

local_plan (SavePlan) – local plan to turn to a global plan (without interactions with other ranks)

Returns:

SavePlan – locally transformed plan equivalent to the plan that would be created by the coordinator

Return type:

SavePlan

prepare_local_plan(plan)[source]

Prepare the local plan for the checkpointing process.

Parameters:

plan (SavePlan)

Return type:

SavePlan

prepare_write_data(plan, planner)[source]

First stage of async saving. Resolve data and store in compact format.

Separates data into GPU tensors (potentially cacheable), CPU tensors (always fresh), and ByteIO (always fresh). Bucket creation is deferred to preload_tensors so that it can run in the persistent worker process and take advantage of the data cache.

Parameters:
  • plan (SavePlan) – save plan generated by the PyT Distributed compatible planner

  • planner (SavePlanner) – save planner used to resolve the bytes and tensor data

Return type:

None

Returns: None, but stores the resolved plan data in instance attributes
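The three-way separation can be illustrated with stand-in items (the item shapes below are hypothetical; the real planner items carry tensors and byte streams):

```python
import io

def separate(items):
    # Split resolved items into the three groups described above.
    gpu_tensors, cpu_tensors, byte_io = [], [], []
    for name, payload in items:
        if isinstance(payload, io.BytesIO):
            byte_io.append(name)          # always written fresh
        elif payload.get("device") == "cuda":
            gpu_tensors.append(name)      # potentially cacheable
        else:
            cpu_tensors.append(name)      # always written fresh
    return gpu_tensors, cpu_tensors, byte_io

items = [("w", {"device": "cuda"}), ("b", {"device": "cpu"}), ("meta", io.BytesIO(b"\x00"))]
print(separate(items))  # (['w'], ['b'], ['meta'])
```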

retrieve_write_results()[source]
Turns the latest dict of write results from self.results_queue into a single results list. Includes an error check.

Returns: the list of write results from all local workers performing the save, or a WRAPPED_EXCEPTION if an exception was raised during the writing process.

Return type:

List[WriteResult] | tuple[BaseException, StackSummary]
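The collection-and-error-check pattern can be sketched generically (hypothetical helper and payload shapes, using a plain queue.Queue):

```python
import queue

def collect(results_queue, num_workers):
    # Each worker puts (worker_idx, payload); payload is either a list of
    # write results or a wrapped-exception tuple (exception, traceback summary).
    per_worker = {}
    for _ in range(num_workers):
        idx, payload = results_queue.get()
        per_worker[idx] = payload
    for payload in per_worker.values():
        if isinstance(payload, tuple):
            return payload  # propagate the wrapped exception to the caller
    # No errors: flatten the per-worker lists into a single results list.
    return [r for idx in sorted(per_worker) for r in per_worker[idx]]

q = queue.Queue()
q.put((1, ["res-1a"]))
q.put((0, ["res-0a", "res-0b"]))
print(collect(q, 2))  # ['res-0a', 'res-0b', 'res-1a']
```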

classmethod validate_checkpoint_id(checkpoint_id)[source]

Validate the checkpoint_id that will be used to save the checkpoint.

This method is available in PyTorch 2.3 and above.

Parameters:

checkpoint_id (str | PathLike)

Return type:

bool

write_data(plan, planner)[source]

Write all items from plan.

Parameters:
  • plan (SavePlan) – save plan generated by the PyT Distributed compatible planner

  • planner (SavePlanner) – save planner used to resolve the bytes and tensor data

Return type:

Future[List[WriteResult]]

static write_preloaded_data(transform_list, open_file, local_thread_idx, write_bucket, results_queue, count_queue, use_fsync, **kwargs)[source]

Performs actual data saving to storage (used by worker threads in multithread mode).

Parameters:
  • transform_list (List[_StorageWriterTransforms]) – streaming transforms list

  • open_file (Callable) – file open callable

  • local_thread_idx (int) – index of the worker thread that performs writing

  • write_bucket (WriteBucket) – data to write to storage

  • results_queue (queue.Queue) – queue to return the write results. If None (main-thread worker), result is returned directly.

  • count_queue (queue.Queue) – queue to signal worker task completion. If None (main-thread worker), skipped.

  • use_fsync (bool) – if True, calls os.fsync at the end of saving

Return type:

Tuple[int, List[WriteResult] | Exception] | None

Returns: None when running in a thread (results put in queue);

result tuple when running as main-thread worker (results_queue is None)

static write_preloaded_data_multiproc(transform_list, use_msc, open_file, rank, write_buckets, global_results_queue)[source]

Performs saving data to storage with multiple processes.

Starts a predefined number of processes and uses two queues to make sure the results are complete:
  - local_results_queue - to send the actual results
  - count_queue - small queue to mark a worker as completed

Using a single queue would not allow proper exception handling.

This method is meant to be run in a forked subprocess. Triggering GC during execution leads to CUDA errors (cleaning up tensors owned by the parent process). To prevent this, we disable the GC explicitly for this function with _disable_gc.

Note: requires is_daemon=False on the PersistentAsyncCaller, because daemon processes cannot spawn child processes.

Parameters:
  • transform_list (List[_StorageWriterTransforms]) – streaming transforms list

  • use_msc (bool) – flag to indicate use of multi storage client

  • open_file (Callable) – file open callable

  • rank (int) – training rank

  • write_buckets (List[WriteBucket]) – write plan

  • global_results_queue (mp.Queue) – mp.Queue to collect Dict[List[WriteResults]] (or an Exception) from parallel write processes to the main training process

Return type:

None

Returns: None

static write_preloaded_data_multithread(transform_list, use_msc, open_file, rank, write_buckets, global_results_queue)[source]

Performs saving data to storage with multiple threads.

Uses threads (not processes) so that this can run safely inside a daemon process without spawning child processes. The last bucket runs on the calling thread to avoid thread creation overhead. Uses two queues for worker coordination:
  - local_results_queue - to collect write results from worker threads
  - count_queue - to signal worker completion (get + task_done / join).

Triggering GC during execution can lead to CUDA errors when tensors are shared. To prevent this, we disable the GC explicitly for this function with _disable_gc.

Parameters:
  • transform_list (List[_StorageWriterTransforms]) – streaming transforms list

  • use_msc (bool) – flag to indicate use of multi storage client for storage access

  • open_file (Callable) – file open callable

  • rank (int) – training rank

  • write_buckets (List[WriteBucket]) – write plan

  • global_results_queue (mp.Queue) – queue to send Dict[List[WriteResults]] (or an Exception) back to the main training process

Return type:

None

Returns: None
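The two-queue thread coordination described above can be sketched as follows (a simplified stand-in with hypothetical names; the real method also handles transforms, fsync, and exceptions):

```python
import queue
import threading

def write_multithread(write_buckets, write_fn):
    # Results go into one queue, completion signals into the other;
    # the last bucket runs on the calling thread, mirroring the scheme above.
    local_results_queue = queue.Queue()
    count_queue = queue.Queue()
    for idx, bucket in enumerate(write_buckets[:-1]):
        def worker(i=idx, b=bucket):
            local_results_queue.put((i, write_fn(b)))
            count_queue.put(i)  # mark this worker as completed
        threading.Thread(target=worker).start()
    # Last bucket on the calling thread to avoid thread-creation overhead.
    results = {len(write_buckets) - 1: write_fn(write_buckets[-1])}
    for _ in range(len(write_buckets) - 1):
        count_queue.get()  # wait until every worker has signalled completion
    while not local_results_queue.empty():
        i, r = local_results_queue.get()
        results[i] = r
    return [results[i] for i in sorted(results)]

print(write_multithread([["a"], ["b"], ["c"]], lambda b: [x.upper() for x in b]))
```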

static write_preloaded_data_proc(transform_list, open_file, local_proc_idx, write_bucket, results_queue, count_queue, use_fsync, **kwargs)[source]

Performs actual data saving to storage (used by worker processes in multiproc mode).

Parameters:
  • local_proc_idx (int) – index of a local process that performs writing

  • write_bucket (WriteBucket) – data to write to storage

  • results_queue (mp.Queue) – queue to return the write results to the proxy checkpoint process.

  • count_queue (mp.JoinableQueue) – queue to mark worker tasks as completed

  • use_fsync (bool) – if True, calls os.fsync at the end of saving

  • transform_list (List[_StorageWriterTransforms])

  • open_file (Callable)

Return type:

None

Returns: None, the write results are put into the results_queue

nvidia_resiliency_ext.checkpointing.async_ckpt.filesystem_async.get_write_results_queue(mp_mode='spawn')[source]

Get or create a multiprocessing queue for write results.

Parameters:

mp_mode (str) – Multiprocessing context mode. Defaults to ‘spawn’.

Returns:

Queue for collecting write results.

Return type:

mp.Queue
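A lazily created, cached queue of this kind can be sketched as follows (the helper and module-level variable names are hypothetical; only the get-or-create behavior mirrors the function above):

```python
import multiprocessing as mp

_results_queue = None  # module-level cache (hypothetical)

def get_results_queue(mp_mode="spawn"):
    # Create the queue on first use with the requested start method,
    # then hand back the same instance on every later call.
    global _results_queue
    if _results_queue is None:
        ctx = mp.get_context(mp_mode)
        _results_queue = ctx.Queue()
    return _results_queue

q1 = get_results_queue()
q2 = get_results_queue()
print(q1 is q2)  # True: the same queue is reused across calls
```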