Asynchronous FileSystemWriter Implementation
Storage writer for the PyT Distributed format that allows asynchronous saves.
- class nvidia_resiliency_ext.checkpointing.async_ckpt.filesystem_async.FileSystemWriterAsync(path, *args, separation_hint=None, **kwargs)[source]
Bases:
FileSystemWriter
Async-enabled implementation of FileSystemWriter using file I/O.
This class does not spawn the async process itself but relies on an external async mechanism.
Flow:
1. Call write_data.
2. Externally start an async process with get_save_function_and_args and its arguments.
3. The async function writer_proxy_func calls write_preloaded_data across multiple processes.
4. Once saving is finalized on all ranks, call super().finish with the results stored in self.writer_result.
Note: step (3) can also be executed synchronously.
Currently, it is assumed that a separate writer is created for each checkpoint save (intermediate state is stored as writer attributes).
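A minimal sketch of this flow, driven synchronously by the caller. This is a hedged illustration, assuming that prepare_write_data (documented below as the first stage of async saving) fills the role of step 1 and that the tuple unpacking order follows the Returns list of get_save_function_and_args; in practice this orchestration is normally handled by the library's async save utilities:

```python
from nvidia_resiliency_ext.checkpointing.async_ckpt.filesystem_async import (
    FileSystemWriterAsync,
)

# `plan` and `planner` come from a PyT Distributed-compatible save planner
# (setup elided).
writer = FileSystemWriterAsync("/tmp/ckpt")
writer.prepare_write_data(plan, planner)       # stage data to CPU (step 1)
fn_and_args = writer.get_save_function_and_args()
if fn_and_args is not None:                    # None: nothing to write on this rank
    save_fn, stage_fn, save_args = fn_and_args
    stage_fn()                                 # stage GPU tensors to host memory
    save_fn(*save_args)                        # step (3), run synchronously here
results = writer.retrieve_write_results()     # collect per-process WriteResults
# The coordinator rank then calls super().finish(metadata, results) (step 4).
```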
Initialize the writer pointing to path.
- Parameters:
path (str | PathLike) – directory to which the checkpoint will be written.
single_file_per_rank – Produce one file per rank instead of one file per tensor/blob. Defaults to True.
sync_files – Force files to be synced to permanent storage. Defaults to True.
thread_count – Number of IO threads to use for writing. Defaults to 1.
per_thread_copy_ahead – How many bytes to copy from the GPU ahead of saving them. Defaults to 10 MB.
cache_staged_state_dict – Whether to cache the staged state_dict. This option decreases staging latency at the cost of increased memory usage. Additionally, if this parameter is set to True, the stager is expected to be maintained and reused for multiple dcp.async_save calls. Defaults to False.
overwrite – Whether to allow overwriting existing checkpoints. Defaults to True.
_extensions – Extensions to apply to output streams (EXPERIMENTAL)
separation_hint (str | None)
If sync_files is disabled, there’s no guarantee that the checkpoint will be consistent in the case of a failure.
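For illustration, a writer could be constructed as follows (the path and values are arbitrary; keyword arguments other than separation_hint are presumably forwarded to the underlying FileSystemWriter):

```python
writer = FileSystemWriterAsync(
    "/checkpoints/iter_0001000",   # arbitrary example path
    single_file_per_rank=True,     # one file per rank, not per tensor/blob
    sync_files=True,               # fsync for crash consistency (see note above)
    thread_count=4,                # number of IO worker threads
)
```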
- property checkpoint_id: str | PathLike
Return the checkpoint_id that will be used to save the checkpoint.
- finish(metadata, results)[source]
Finish the checkpointing process.
- Parameters:
metadata (Metadata) – metadata to save
results (List[List[WriteResult]]) – results to save
- Return type:
None
- get_save_function_and_args()[source]
Get function that saves the data to storage along with its arguments. Allows the external caller to apply the save function synchronously or asynchronously.
- Returns: None (if there is nothing to write on this rank) or a tuple of:
1. the function that saves the data;
2. the function that stages the GPU tensors to a destination for async checkpointing (this function should be self-contained);
3. the arguments to the function in (1).
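For example, an external caller might apply the save function asynchronously on a separate thread. This is a hedged sketch: the unpacking order follows the Returns list above, and real deployments would typically use the library's async request machinery instead:

```python
from concurrent.futures import ThreadPoolExecutor

fn_and_args = writer.get_save_function_and_args()
if fn_and_args is not None:
    save_fn, stage_fn, save_args = fn_and_args
    stage_fn()                          # staging must complete before saving
    with ThreadPoolExecutor(max_workers=1) as pool:
        future = pool.submit(save_fn, *save_args)
        # ... training can continue here ...
        future.result()                 # propagates any write errors
```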
- static preload_tensors(write_buckets, non_blocking=True)[source]
Preloads tensors in state_dict to host (CPU) memory.
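A hypothetical helper illustrating the underlying staging step (not the library's code); with non_blocking=True, device-to-host copies into pinned buffers can overlap on the copy stream:

```python
import torch

def preload_to_host(tensors, non_blocking=True):
    out = []
    for t in tensors:
        # Pinned host buffers allow the D2H copy to be truly asynchronous.
        host = torch.empty(t.shape, dtype=t.dtype, device="cpu", pin_memory=True)
        host.copy_(t, non_blocking=non_blocking)
        out.append(host)
    if non_blocking and torch.cuda.is_available():
        torch.cuda.synchronize()  # ensure all copies finished before writing
    return out
```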
- prepare_decentralized_global_plan(local_plan)[source]
Instead of assigning indices by plan order, uses the PyT rank (which yields the same outcome).
- Parameters:
local_plan (SavePlan) – local plan to turn to a global plan (without interactions with other ranks)
- Returns:
SavePlan – locally transformed plan, equivalent to the plan that would be created by the coordinator
- Return type:
SavePlan
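A minimal sketch of the idea, assuming the rank-derived prefix matches what PyT's coordinator-based FileSystemWriter.prepare_global_plan would assign (uses PyT's internal _StoragePrefix for illustration only):

```python
import dataclasses
import torch.distributed as dist
from torch.distributed.checkpoint.filesystem import _StoragePrefix
from torch.distributed.checkpoint.planner import SavePlan

def decentralized_global_plan(local_plan: SavePlan) -> SavePlan:
    # The coordinator would assign prefix "__{i}_" to the i-th gathered plan;
    # since plans are gathered in rank order, rank == i and no gather is needed.
    return dataclasses.replace(
        local_plan, storage_data=_StoragePrefix(f"__{dist.get_rank()}_")
    )
```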
- prepare_write_data(plan, planner)[source]
First stage of async saving. Copy data to CPU and plan the local saving.
- Parameters:
plan (SavePlan) – save plan generated by the PyT Distributed compatible planner
planner (SavePlanner) – save planner used to resolve the bytes and tensor data
- Return type:
None
Returns: None, but stores the save plan in self.write_buckets
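A hypothetical illustration of what "plan the local saving" might involve (round-robin bucketing for brevity; the actual implementation may partition differently, e.g. by item size):

```python
def make_write_buckets(items, planner, num_workers):
    # Resolve each planned item to its data, move tensors to CPU, and group
    # the items into one bucket per IO worker.
    buckets = [[] for _ in range(num_workers)]
    for i, item in enumerate(items):
        data = planner.resolve_data(item)            # torch.Tensor or BytesIO
        if hasattr(data, "detach"):                  # tensor -> host copy
            data = data.detach().to("cpu")
        buckets[i % num_workers].append((item, data))
    return buckets
```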
- retrieve_write_results()[source]
Turns the latest dict of write results from self.results_queue into a single list of results. Includes an error check.
- Returns:
the list of write results from all local processes performing the save.
- Return type:
List[WriteResult]
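The flattening and error check could conceptually look like this (hypothetical helper; the real method reads from self.results_queue):

```python
from itertools import chain

def flatten_results(results_by_proc, expected_procs):
    # results_by_proc: dict mapping local process index -> list of
    # WriteResults, or an exception raised by that worker.
    missing = set(range(expected_procs)) - set(results_by_proc)
    if missing:
        raise RuntimeError(f"Missing write results from local processes {sorted(missing)}")
    for res in results_by_proc.values():
        if isinstance(res, BaseException):
            raise res  # surface a worker failure on the main process
    return list(chain.from_iterable(results_by_proc[i] for i in range(expected_procs)))
```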
- classmethod validate_checkpoint_id(checkpoint_id)[source]
Check if the given checkpoint_id is supported by the storage. This allows us to enable automatic storage selection.
- write_data(plan, planner)[source]
Write all items from plan.
- Parameters:
plan (SavePlan)
planner (SavePlanner)
- static write_preloaded_data(transform_list, local_proc_idx, write_bucket, results_queue, count_queue, use_fsync, **kwargs)[source]
Performs actual data saving to storage.
- Parameters:
transform_list (List[_StorageWriterTransforms])
local_proc_idx (int) – index of a local process that performs writing
write_bucket (WriteBucket) – data to write to storage
results_queue (mp.Queue) – queue to return the write results to the proxy checkpoint process
count_queue (mp.JoinableQueue) – queue used to mark the worker task as completed
use_fsync (bool) – if True, calls os.fsync at the end of saving
- Return type:
None
Returns: None; the write results are put into the results queue
- static write_preloaded_data_multiproc(transform_list, use_msc, rank, write_buckets, global_results_queue)[source]
Saves data to storage using multiple processes.
Starts a predefined number of processes and uses two queues to make sure the results are complete:
- local_results_queue – to send the actual results
- count_queue – a small queue to mark a worker as completed
Using just one queue would not allow proper exception handling.
This method is meant to be run in a forked subprocess. Triggering GC during execution leads to CUDA errors (cleaning up tensors owned by the parent process). To prevent this, GC is explicitly disabled for this function with _disable_gc.
- Return type:
None
Returns: None
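A self-contained sketch of the two-queue pattern described above (simplified and hypothetical: the real method also applies transforms, disables GC, and forwards write buckets; the fork start method matches the forked-subprocess assumption and is Unix-only):

```python
import multiprocessing as mp

def do_write(idx: int) -> str:
    # Placeholder for the actual storage write; returns a fake result.
    return f"result-{idx}"

def worker(idx, results_queue, count_queue):
    try:
        results_queue.put((idx, do_write(idx)))   # send the actual results
    except Exception as exc:
        results_queue.put((idx, exc))             # ship the error instead of hanging
    finally:
        count_queue.get()
        count_queue.task_done()                   # completion marker, success or not

if __name__ == "__main__":
    ctx = mp.get_context("fork")
    results_q, count_q = ctx.Queue(), ctx.JoinableQueue()
    n_workers = 2
    for _ in range(n_workers):
        count_q.put(None)                         # one token per worker
    procs = [ctx.Process(target=worker, args=(i, results_q, count_q))
             for i in range(n_workers)]
    for p in procs:
        p.start()
    count_q.join()            # returns once every worker called task_done()
    results = dict(results_q.get() for _ in range(n_workers))
    for p in procs:
        p.join()
```

Separating completion tracking (count_q) from result delivery (results_q) means the parent never blocks indefinitely on a results queue that a crashed worker will never fill, which is the exception-handling problem a single queue cannot solve.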