Asynchronous FileSystemWriter Implementation
Storage writer for the PyT Distributed format that allows asynchronous saves.
- class nvidia_resiliency_ext.checkpointing.async_ckpt.filesystem_async.FileSystemWriterAsync(*args, separation_hint=None, **kwargs)[source]
Bases: FileSystemWriter
Async-enabled implementation of FileSystemWriter using file I/O.
This class does not spawn the async process itself but relies on an external async mechanism.
Flow:
1. Call write_data.
2. Externally start an async process with get_save_function_and_args and its arguments.
3. The async function writer_proxy_func calls write_preloaded_data across multiple processes.
4. Once saving is finalized on all ranks, call super().finish with the results stored in self.writer_result.
Note: Step (3) can also be executed synchronously; see the sketch below.
Currently, it is assumed that a separate writer is created for each checkpoint save (intermediate state is stored as writer attributes).
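A minimal sketch of this flow, assuming that plan and planner come from the usual PyT Distributed planning steps (not shown) and that the returned tuple unpacks as (save function, staging function, arguments); the exact orchestration in the library may differ:

```python
import torch.multiprocessing as mp

from nvidia_resiliency_ext.checkpointing.async_ckpt.filesystem_async import (
    FileSystemWriterAsync,
)

writer = FileSystemWriterAsync("/tmp/ckpt")  # a fresh writer per checkpoint save

writer.write_data(plan, planner)  # step (1): plan and stage the local save
fn_and_args = writer.get_save_function_and_args()
if fn_and_args is not None:  # None means this rank has nothing to write
    save_fn, stage_fn, save_args = fn_and_args
    stage_fn()  # stage GPU tensors to host memory before forking
    proc = mp.Process(target=save_fn, args=save_args)  # steps (2)-(3)
    proc.start()
    proc.join()  # an external async mechanism would poll instead of blocking
write_results = writer.retrieve_write_results()  # feeds super().finish, step (4)
```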
Initialize the writer pointing to path.
- Parameters:
path – directory where the checkpoint will be written.
single_file_per_rank – Produce one file per rank instead of one file per tensor/blob. Defaults to True.
sync_files – Force files to be synced to permanent storage. Defaults to True.
thread_count – Number of I/O threads used for writing. Defaults to 1.
per_thread_copy_ahead – How many bytes to copy from the GPU ahead of saving them. Defaults to 10 MB.
cache_staged_state_dict – Whether to cache the staged state_dict. This option decreases staging latency at the cost of increased memory usage. Additionally, if this parameter is set to True, it is expected that the stager is maintained and re-used for multiple dcp.async_save calls. Defaults to False.
overwrite – Whether to allow overwriting existing checkpoints. Defaults to True.
separation_hint (str | None)
If sync_files is disabled, there’s no guarantee that the checkpoint will be consistent in the case of a failure.
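For illustration, a writer with a few of these options set explicitly (the path and values are placeholders, not recommendations):

```python
from nvidia_resiliency_ext.checkpointing.async_ckpt.filesystem_async import (
    FileSystemWriterAsync,
)

writer = FileSystemWriterAsync(
    "/tmp/ckpt",                # checkpoint directory
    single_file_per_rank=True,  # one file per rank
    sync_files=True,            # fsync for durability at the cost of latency
    thread_count=2,             # two I/O threads per rank
)
```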
- get_save_function_and_args()[source]
Get function that saves the data to storage along with its arguments. Allows the external caller to apply the save function synchronously or asynchronously.
- Returns: None (if there is nothing to write on this rank) or a tuple of:
1. the function that saves the data,
2. the function that stages the GPU tensors to a destination for async checkpointing (this function should be self-contained),
3. the arguments to the function in 1).
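Continuing the sketch above, the same function can also be applied synchronously (unpacking order per the Returns list; this usage is an illustration, not prescribed by the API):

```python
fn_and_args = writer.get_save_function_and_args()
if fn_and_args is not None:
    save_fn, stage_fn, save_args = fn_and_args
    stage_fn()           # stage GPU tensors to host memory first
    save_fn(*save_args)  # synchronous equivalent of step (3) in the Flow
```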
- static preload_tensors(write_buckets, non_blocking=True)[source]
Preloads tensors in the state_dict to host (CPU) memory.
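Illustratively, preloading amounts to device-to-host copies of the tensors referenced by the write buckets; a minimal sketch (the flat tensor list is an assumption about the bucket layout, not the library's internal structure):

```python
import torch

def preload_to_host(tensors, non_blocking=True):
    # Copy CUDA tensors to CPU; non-blocking copies can overlap with compute
    # when pinned memory is involved.
    staged = [t.to("cpu", non_blocking=non_blocking) if t.is_cuda else t
              for t in tensors]
    if non_blocking and torch.cuda.is_available():
        torch.cuda.synchronize()  # make sure all D2H copies have completed
    return staged
```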
- prepare_decentralized_global_plan(local_plan)[source]
Instead of assigning storage indices based on plan order (as the coordinator would), uses the PyT Distributed rank of the current process, which yields the same outcome without inter-rank communication.
- Parameters:
local_plan (SavePlan) – local plan to turn to a global plan (without interactions with other ranks)
- Returns:
SavePlan – locally transformed plan, equivalent to the plan that would be created by the coordinator
- Return type:
SavePlan
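A minimal sketch of the idea, assuming the coordinator's default behavior of prefixing each rank's items with a plan-order index; the use of PyT's private _StoragePrefix here is an assumption about the underlying mechanism:

```python
import dataclasses

import torch.distributed as dist
from torch.distributed.checkpoint.filesystem import _StoragePrefix
from torch.distributed.checkpoint.planner import SavePlan

def decentralized_global_plan(local_plan: SavePlan) -> SavePlan:
    # The coordinator would assign "__<i>_" by plan order; since plans are
    # gathered in rank order, deriving the prefix from the rank is equivalent
    # and needs no communication. Requires an initialized process group.
    prefix = _StoragePrefix(f"__{dist.get_rank()}_")
    return dataclasses.replace(local_plan, storage_data=prefix)
```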
- prepare_write_data(plan, planner)[source]
First stage of async saving. Copies the data to CPU and plans the local save.
- Parameters:
plan (SavePlan) – save plan generated by the PyT Distributed compatible planner
planner (SavePlanner) – save planner used to resolve the bytes and tensor data
- Return type:
None
Returns: None, but stores the save plan in self.write_buckets
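Continuing the earlier sketch (same plan and planner placeholders), this first stage can be driven directly:

```python
writer.prepare_write_data(plan, planner)  # D2H copy + local write planning
# The planned buckets are kept as writer state (self.write_buckets) and are
# picked up by the save function returned below:
fn_and_args = writer.get_save_function_and_args()
```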
- retrieve_write_results()[source]
- Turns the latest dict of write results from self.results_queue into a single results list. Includes an error check.
- Returns (List[WriteResult]): the list of write results from all local processes performing the save.
- Return type:
List[WriteResult]
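A hedged sketch of that merge, assuming the queued dict maps a local process index to either its write results or a caught exception (the dict shape is an assumption):

```python
from typing import Dict, List, Union

def merge_write_results(results: Dict[int, Union[list, BaseException]]) -> list:
    merged: list = []
    for proc_idx, proc_results in sorted(results.items()):
        if isinstance(proc_results, BaseException):
            # Error check: surface worker failures in the main process
            raise RuntimeError(f"Worker #{proc_idx} failed during save") from proc_results
        merged.extend(proc_results)
    return merged
```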
- write_data(plan, planner)[source]
Write all items from plan.
- Parameters:
plan (SavePlan)
planner (SavePlanner)
- Return type:
Future[List[WriteResult]]
- static write_preloaded_data(transform_list, local_proc_idx, write_bucket, results_queue, count_queue, use_fsync)[source]
Performs actual data saving to storage.
- Parameters:
local_proc_idx (int) – index of a local process that performs writing
write_bucket (WriteBucket) – data to write to storage
results_queue (mp.Queue) – queue to return the write results to the proxy checkpoint process.
count_queue (mp.JoinableQueue) – queue used to mark the worker task as completed
use_fsync (bool) – if True, calls os.fsync at the end of saving
- Return type:
None
Returns: None; the write results are put into the queue
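A self-contained sketch of the two-queue worker protocol described here and in write_preloaded_data_multiproc below; the token-per-worker scheme is an assumption about how the JoinableQueue is used:

```python
import multiprocessing as mp

def worker(local_proc_idx: int, results_queue: mp.Queue, count_queue) -> None:
    try:
        # Placeholder payload standing in for the actual write results
        results_queue.put((local_proc_idx, [f"write-result-{local_proc_idx}"]))
    finally:
        count_queue.get()
        count_queue.task_done()  # mark this worker's task as completed

if __name__ == "__main__":
    results_queue: mp.Queue = mp.Queue()
    count_queue = mp.JoinableQueue()
    procs = [
        mp.Process(target=worker, args=(i, results_queue, count_queue))
        for i in range(2)
    ]
    for _ in procs:
        count_queue.put(None)  # one completion token per worker
    for p in procs:
        p.start()
    count_queue.join()  # unblocks once every worker acknowledged its token
    results = dict(results_queue.get() for _ in procs)
    for p in procs:
        p.join()
```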
- static write_preloaded_data_multiproc(transform_list, rank, write_buckets, global_results_queue)[source]
Performs saving data to storage with multiple processes.
Starts a predefined number of processes and uses two queues to make sure the results are complete:
local_results_queue – to send the actual results
count_queue – a small queue to mark a worker as completed
Using just one queue would preclude proper exception handling.
This method is meant to be run in a forked subprocess. Triggering GC during execution leads to CUDA errors (cleaning up tensors owned by the parent process). To prevent this, GC is explicitly disabled for this function with _disable_gc; a minimal sketch of such a guard follows this entry.
- Parameters:
transform_list
rank (int)
write_buckets (List[WriteBucket])
global_results_queue (mp.Queue) – queue used to return the aggregated write results to the main process
- Return type:
None
Returns: None
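A minimal sketch of such a GC guard, assuming _disable_gc is a decorator; the name comes from the description above, while this body is illustrative:

```python
import functools
import gc

def _disable_gc(fn):
    """Run fn with the garbage collector disabled, restoring it afterwards."""
    @functools.wraps(fn)
    def wrapper(*args, **kwargs):
        gc_was_enabled = gc.isenabled()
        gc.disable()
        try:
            return fn(*args, **kwargs)
        finally:
            if gc_was_enabled:
                gc.enable()
    return wrapper
```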