Asynchronous FileSystemWriter Implementation
Storage writer for PyT Distributed format allowing asynchronous save.
- class nvidia_resiliency_ext.checkpointing.async_ckpt.filesystem_async.ConsistentDataIdentifier(key)[source]
Bases: object
Identifier for a consistent data structure stored in the worker cache.
This allows passing a lightweight identifier instead of pickling the entire data structure (which includes IPC handles) across process boundaries.
- Parameters:
key (str)
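A minimal sketch of the idea: only the string key, not the full cached structure, crosses the process boundary. The key value below is hypothetical.

```python
from nvidia_resiliency_ext.checkpointing.async_ckpt.filesystem_async import (
    ConsistentDataIdentifier,
)

# The worker cache holds the heavy data structure (including IPC handles);
# the main process only sends this lightweight key to refer to it.
ident = ConsistentDataIdentifier(key="rank0/model_state")  # illustrative key
```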
- class nvidia_resiliency_ext.checkpointing.async_ckpt.filesystem_async.FileSystemWriterAsync(path, *args, separation_hint=None, use_msc=False, is_multiproc_io=False, use_cached_data_structure=False, **kwargs)[source]
Bases: FileSystemWriter
Async-enabled implementation of FileSystemWriter using file I/O.
This class does not spawn the async process itself but relies on an external async mechanism.
Flow:
1. Call write_data.
2. Externally start an async process with get_save_function_and_args and its arguments.
3. The async function calls write_preloaded_data_multithread (threads) or write_preloaded_data_multiproc (processes) across multiple workers.
4. Once saving is finalized on all ranks, call super().finish with the results stored in self.writer_result.
Note: step (3) can also be executed synchronously. A usage sketch follows the notes below.
Currently, it is assumed that a separate writer is created for each ckpt save (intermediate state is stored as writer attributes).
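A minimal sketch of this flow, assuming the caller supplies the SavePlan/SavePlanner (plan, planner), the checkpoint Metadata (metadata), and some external async executor (run_async); these placeholders are not part of this module, and the three-element unpacking is taken from the get_save_function_and_args description below.

```python
from nvidia_resiliency_ext.checkpointing.async_ckpt.filesystem_async import (
    FileSystemWriterAsync,
)

writer = FileSystemWriterAsync("/tmp/ckpt")    # one writer per checkpoint save

writer.write_data(plan, planner)               # step 1: resolve and stage data

ret = writer.get_save_function_and_args()      # step 2: hand off to the async mechanism
if ret is not None:                            # None means nothing to write on this rank
    save_fn, stage_fn, save_args = ret         # order assumed from the Returns list below
    run_async(save_fn, *save_args)             # or call synchronously: save_fn(*save_args)

# step 4: once saving is finalized on all ranks, collect results and finish
results = writer.retrieve_write_results()      # List[WriteResult] or a wrapped exception
writer.finish(metadata, [results])             # finish expects List[List[WriteResult]]
```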
Initialize the writer pointing to path.
- Parameters:
path (str | PathLike) – directory where the checkpoint will be written to.
single_file_per_rank – Produce one file per rank instead of one file per tensor/blob. Defaults to True.
sync_files – Force files to be synced to permanent storage. Defaults to True.
thread_count – Number of IO threads used for writing. Defaults to 1.
per_thread_copy_ahead – How many bytes to copy from the GPU ahead of saving them. Defaults to 10 MB.
cache_staged_state_dict – Whether to cache the staged state_dict. This option decreases staging latency at the cost of increased memory usage. Additionally, if this parameter is set to True, it is expected that the stager is maintained and reused for multiple dcp.async_save calls. Defaults to False.
overwrite – Whether to allow overwriting existing checkpoints. Defaults to True.
_extensions – Extensions to apply to output streams (EXPERIMENTAL)
separation_hint (str | None)
use_msc (bool)
is_multiproc_io (bool)
use_cached_data_structure (bool)
Note: if sync_files is disabled, there is no guarantee that the checkpoint will be consistent in the case of a failure.
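A hypothetical construction showing how these parameters combine; all values are illustrative, only path is required, and the meaning of is_multiproc_io is inferred from the flow description above.

```python
# Illustrative construction; the values are examples, not recommendations.
writer = FileSystemWriterAsync(
    "/checkpoints/iter_0001000",     # directory the checkpoint is written to
    single_file_per_rank=True,       # one file per rank
    sync_files=True,                 # fsync files for durability
    thread_count=2,                  # number of IO threads
    separation_hint=None,
    use_msc=False,                   # multi storage client disabled
    is_multiproc_io=False,           # False presumably selects the thread-based IO path
    use_cached_data_structure=False,
)
```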
- property checkpoint_id: str | PathLike
Return the checkpoint_id that will be used to save the checkpoint.
- finish(metadata, results)[source]
Finish the checkpointing process.
- Parameters:
metadata (Metadata) – metadata to save
results (List[List[WriteResult]]) – results to save
- Return type:
None
- get_save_function_and_args()[source]
Get function that saves the data to storage along with its arguments. Allows the external caller to apply the save function synchronously or asynchronously.
- Returns: None (if there is nothing to write on this rank) or a tuple of:
1. the function that saves the data,
2. the function that stages the GPU tensors to a destination for async checkpointing (this function should be self-contained),
3. the arguments to the function in (1).
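A hedged sketch of the synchronous path; the unpacking order is taken from the Returns list above and should be verified against the installed version.

```python
# Sketch: applying the save function synchronously on the current rank.
ret = writer.get_save_function_and_args()
if ret is None:
    pass                                   # nothing to write on this rank
else:
    save_fn, stage_fn, save_args = ret     # order assumed from the Returns list
    save_fn(*save_args)                    # synchronous execution is also allowed
```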
- static preload_tensors(resolved_plan_data, non_blocking=True)[source]
Creates write_buckets and preloads tensors to host memory.
This runs in the persistent worker process. Bucket creation is done here (not in prepare_write_data) so that cached GPU tensor data stored in the worker process can be retrieved and reused without re-pickling.
- Parameters:
resolved_plan_data (Tuple) – Tuple containing (checkpoint_dir, (identifier, data_structure)) where:
- identifier: ConsistentDataIdentifier (caching) or None
- data_structure: (separation_hint, cached_tensor_data, uncached_tensor_data, byte_io_data, thread_count, storage_plan)
non_blocking (bool, optional) – Enable pinned D2H memcpy. Default is True.
- Returns:
List of write buckets with tensors moved to CPU
- Return type:
List[WriteBucket]
- prepare_decentralized_global_plan(local_plan)[source]
Instead of assigning indices based on the plan order, uses the PyT rank, which yields the same outcome.
- Parameters:
local_plan (SavePlan) – local plan to turn to a global plan (without interactions with other ranks)
- Returns:
SavePlan – locally transformed plan, equivalent to the plan that would be created by the coordinator
- Return type:
SavePlan
- prepare_write_data(plan, planner)[source]
First stage of async saving: resolves data and stores it in a compact format.
Separates data into GPU tensors (potentially cacheable), CPU tensors (always fresh), and ByteIO (always fresh). Bucket creation is deferred to preload_tensors so that it can run in the persistent worker process and take advantage of the data cache.
- Parameters:
plan (SavePlan) – save plan generated by the PyT Distributed compatible planner
planner (SavePlanner) – save planner used to resolve the bytes and tensor data
- Return type:
None
Returns: None, but stores the resolved plan data in instance attributes
- retrieve_write_results()[source]
Turns the latest dict of write results from self.results_queue into a single results list. Includes an error check.
- Returns:
the list of write results from all local workers performing the save, or a wrapped exception (the exception together with a stack summary) if an exception was raised during the writing process.
- Return type:
List[WriteResult] | tuple[BaseException, StackSummary]
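A small sketch of checking the outcome, assuming the wrapped exception arrives as the (BaseException, StackSummary) tuple named in the return type above.

```python
# Sketch: distinguish a successful result list from a wrapped exception.
out = writer.retrieve_write_results()
if isinstance(out, tuple):        # assumed failure shape: (exception, stack summary)
    exc, stack_summary = out
    raise exc
write_results = out               # List[WriteResult] on success
```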
- classmethod validate_checkpoint_id(checkpoint_id)[source]
Validate the checkpoint_id that will be used to save the checkpoint.
This method is available in PyTorch 2.3 and above.
- write_data(plan, planner)[source]
Write all items from plan.
- Parameters:
plan (SavePlan)
planner (SavePlanner)
- Return type:
- static write_preloaded_data(transform_list, open_file, local_thread_idx, write_bucket, results_queue, count_queue, use_fsync, **kwargs)[source]
Performs actual data saving to storage (used by worker threads in multithread mode).
- Parameters:
transform_list (List[_StorageWriterTransforms]) – streaming transforms list
open_file (Callable) – file open callable
local_thread_idx (int) – index of the worker thread that performs writing
write_bucket (WriteBucket) – data to write to storage
results_queue (queue.Queue) – queue to return the write results. If None (main-thread worker), result is returned directly.
count_queue (queue.Queue) – queue to signal worker task completion. If None (main-thread worker), skipped.
use_fsync (bool) – if True, calls os.fsync at the end of saving
- Return type:
- Returns: None when running in a thread (results put in queue);
result tuple when running as main-thread worker (results_queue is None)
- static write_preloaded_data_multiproc(transform_list, use_msc, open_file, rank, write_buckets, global_results_queue)[source]
Performs saving data to storage with multiple processes.
Starts a predefined number of processes and uses two queues to make sure the results are complete:
- local_results_queue – to send the actual results
- count_queue – small queue to mark a worker as completed
Using just one queue would not allow proper exception handling.
This method is meant to be run in a forked subprocess. Triggering GC during execution leads to CUDA errors (cleaning up tensors owned by the parent process). To prevent this, we disable the GC explicitly for this function with _disable_gc.
Note: requires is_daemon=False on the PersistentAsyncCaller, because daemon processes cannot spawn child processes.
- Parameters:
transform_list (List[_StorageWriterTransforms]) – streaming transforms list
use_msc (bool) – flag to indicate use of multi storage client
open_file (Callable) – file open callable
rank (int) – training rank
write_buckets (List[WriteBucket]) – write plan
global_results_queue (mp.Queue) – mp.Queue to collect Dict[List[WriteResults]] (or an Exception) from parallel write processes to the main training process
- Return type:
None
Returns: None
- static write_preloaded_data_multithread(transform_list, use_msc, open_file, rank, write_buckets, global_results_queue)[source]
Performs saving data to storage with multiple threads.
Uses threads (not processes) so that this can run safely inside a daemon process without spawning child processes. The last bucket runs on the calling thread to avoid thread-creation overhead. Uses two queues for worker coordination:
- local_results_queue – to collect write results from worker threads
- count_queue – to signal worker completion (get + task_done / join)
A generic sketch of this two-queue pattern is shown after this entry.
Triggering GC during execution can lead to CUDA errors when tensors are shared. To prevent this, we disable the GC explicitly for this function with _disable_gc.
- Parameters:
transform_list (List[_StorageWriterTransforms]) – streaming transforms list
use_msc (bool) – flag to indicate use of multi storage client for storage access
open_file (Callable) – file open callable
rank (int) – training rank
write_buckets (List[WriteBucket]) – write plan
global_results_queue (mp.Queue) – queue to send Dict[List[WriteResults]] (or an Exception) back to the main training process
- Return type:
None
Returns: None
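A generic, self-contained sketch of the two-queue coordination pattern described above; this is not the library's internal code, and all names are illustrative.

```python
import queue
import threading

def worker(idx: int, results_queue: queue.Queue, count_queue: queue.Queue) -> None:
    # Do the (simulated) write and report the result.
    results_queue.put((idx, f"write-result-{idx}"))
    # Signal completion: consume one token and mark the task done.
    count_queue.get()
    count_queue.task_done()

results_q: queue.Queue = queue.Queue()
count_q: queue.Queue = queue.Queue()
threads = []
for i in range(4):
    count_q.put(i)  # one token per worker
    t = threading.Thread(target=worker, args=(i, results_q, count_q))
    t.start()
    threads.append(t)

count_q.join()      # blocks until every worker has called task_done()
for t in threads:
    t.join()
all_results = [results_q.get() for _ in range(4)]
print(all_results)
```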
- static write_preloaded_data_proc(transform_list, open_file, local_proc_idx, write_bucket, results_queue, count_queue, use_fsync, **kwargs)[source]
Performs actual data saving to storage (used by worker processes in multiproc mode).
- Parameters:
local_proc_idx (int) – index of a local process that performs writing
write_bucket (WriteBucket) – data to write to storage
results_queue (mp.Queue) – queue to return the write results to the proxy checkpoint process.
count_queue (mp.JoinableQueue) – queue to mark the worker task as completed
use_fsync (bool) – if True, calls os.fsync at the end of saving
transform_list (List[_StorageWriterTransforms])
open_file (Callable)
- Return type:
None
Returns: None, the write results are put into the results_queue
- nvidia_resiliency_ext.checkpointing.async_ckpt.filesystem_async.get_write_results_queue(mp_mode='spawn')[source]
Get or create a multiprocessing queue for write results.
- Parameters:
mp_mode (str) – Multiprocessing context mode. Defaults to ‘spawn’.
- Returns:
Queue for collecting write results.
- Return type:
mp.Queue
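A minimal usage sketch; 'spawn' is the documented default mode.

```python
from nvidia_resiliency_ext.checkpointing.async_ckpt.filesystem_async import (
    get_write_results_queue,
)

# Obtain (or reuse) the multiprocessing queue used to collect write results.
results_queue = get_write_results_queue(mp_mode="spawn")
```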