Replication

class nvidia_resiliency_ext.checkpointing.local.replication.strategies.CliqueReplicationStrategy(local_group, target_device='cpu')[source]

Bases: ReplicationStrategy

Implements a replication strategy where all participants are in a single group.

This strategy replicates local checkpoints among all ranks in the local process group, enabling efficient retrieval and communication of tensor data.

Parameters:

local_group (GroupWrapper | ProcessGroup)

classmethod from_replication_params(replication_jump=0, replication_factor=2)[source]

Instantiates process groups necessary for checkpoint replication.

Training ranks are divided into W // F distinct groups of size F, where W is the world size and F is the replication_factor. Each group consists of ranks:

n, n + J, n + 2J, …, n + (F - 1)J,

where J is the replication_jump and n = aJF + b, with:
  • a = 0, 1, …, (W / (JF)) - 1

  • b = 0, 1, …, J - 1.

Checkpoint shards are exchanged and fully replicated within each group.

Important: The world size (W) must be divisible by J * F.

This grouping enables replication across different failure domains by specifying J equal to the failure blast radius.

Example: For a world size of 32, replication_jump = 8, and replication_factor = 2, the replication groups (cliques) are:

0-8, 1-9, 2-10, 3-11, 4-12, 5-13, 6-14, 7-15, 16-24, 17-25, 18-26, 19-27, 20-28, 21-29, 22-30, 23-31

Parameters:
  • replication_jump (int, optional) – J in the formula above. Represents the gap between successive ranks storing replicas of a given rank’s data.

  • replication_factor (int, optional) – F in the formula above. Denotes the number of ranks storing replicas of a given rank’s data.

Return type:

CliqueReplicationStrategy

replicate(local_ckpt, id_)[source]

Replicates the local checkpoint and returns the replicated checkpoints with IDs.

This method splits the local checkpoint into a hollow state dictionary and its tensor data, gathers replicated copies from other ranks, and reconstructs the state dictionaries.

Parameters:
Returns:

  • List[TensorAwareStateDict]: A list of replicated checkpoints.

  • List[str]: A list of identifiers for the replicated checkpoints.

Return type:

Tuple[List[TensorAwareStateDict], List[str]]

retrieve_execute(*args, **kwargs)[source]

Executes the retrieval plan using the local group.

Returns:

The result of executing the retrieval plan.

retrieve_plan(globally_available_ids, wanted)[source]

Creates a plan for retrieving the specified IDs from globally available replicas.

Parameters:
  • globally_available_ids (Mapping[int, List[str]]) – Mapping of ranks to available IDs.

  • wanted (Sequence[str]) – List of IDs to retrieve.

Returns:

A plan detailing how to retrieve the requested IDs.

Return type:

ExchangePlan

Raises:

NoReplicasAvailableError – If no replicas are found for a requested ID.

class nvidia_resiliency_ext.checkpointing.local.replication.strategies.LazyCliqueReplicationStrategy(replication_jump=0, replication_factor=2)[source]

Bases: LazyReplicationStrategyBuilder[CliqueReplicationStrategy]

Lazy version of CliqueReplicationStrategy allowing to delay process group formation.

Training ranks are divided into W // F distinct groups of size F, where W is the world size and F is the replication_factor. Each group consists of ranks:

n, n + J, n + 2J, …, n + (F - 1)J,

where J is the replication_jump and n = aJF + b, with:
  • a = 0, 1, …, (W / (JF)) - 1

  • b = 0, 1, …, J - 1.

Checkpoint shards are exchanged and fully replicated within each group.

Important: The world size (W) must be divisible by J * F.

This grouping enables replication across different failure domains by specifying J equal to the failure blast radius.

Example: For a world size of 32, replication_jump = 8, and replication_factor = 2, the replication groups (cliques) are:

0-8, 1-9, 2-10, 3-11, 4-12, 5-13, 6-14, 7-15, 16-24, 17-25, 18-26, 19-27, 20-28, 21-29, 22-30, 23-31

Parameters:
  • replication_jump (int, optional) – J in the formula above. Represents the gap between successive ranks storing replicas of a given rank’s data.

  • replication_factor (int, optional) – F in the formula above. Denotes the number of ranks storing replicas of a given rank’s data.

class nvidia_resiliency_ext.checkpointing.local.replication.strategies.LazyReplicationStrategyBuilder[source]

Bases: ReplicationStrategy, ABC, Generic[EagerT]

Represents an uninitialized replication strategy.

Replication strategy needs process groups which can be impossible to initialize and the time of instantiation of the ReplicationStrategy class.

This class allows for a lazy initialization of an instance of EagerT type: >>> lazy_repl_strategy = LazyReplicationStrategyBuilder() >>> … >>> lazy_repl_strategy.replicate(…) # performs lazy init transparently >>> lazy_repl_strategy.retrieve_execute(…) # reuses previously initialized instance transparently

replicate(local_ckpt, id_)[source]

Delegate to the underlying replication strategy.

Parameters:
Return type:

Tuple[List[TensorAwareStateDict], List[str]]

property replication_strategy: EagerT

Lazy build on demand.

retrieve_execute(*args, **kwargs)[source]

Delegate to the underlying replication strategy.

retrieve_plan(globally_available_ids, wanted)[source]

Delegate to the underlying replication strategy.

Parameters:
Return type:

ExchangePlan

exception nvidia_resiliency_ext.checkpointing.local.replication.strategies.NoReplicasAvailableError[source]

Bases: Exception

Exception raised when no replicas are available for a requested ID.

class nvidia_resiliency_ext.checkpointing.local.replication.strategies.ReplicationStrategy[source]

Bases: ABC

Abstract base class defining the interface for replication strategies.

abstract replicate(local_ckpt, id_)[source]

Replicates the local checkpoint.

Parameters:
  • local_ckpt (TensorAwareStateDict) – The local checkpoint to be replicated.

  • id (str) – Identifier for the checkpoint.

  • id_ (str)

Returns:

A list of replicated checkpoints together with correspinding IDs

Return type:

Tuple[List[TensorAwareStateDict], List[str]]

abstract retrieve_execute(*args, **kwargs)[source]

Executes the retrieval plan.

abstract retrieve_plan(globally_available_ids, wanted)[source]

Generates a retrieval plan based on globally available IDs.

Parameters:
  • globally_available_ids (Mapping[int, List[str]]) – Mapping of ranks to available IDs.

  • wanted (Sequence[str]) – List of IDs to retrieve.

Returns:

A plan detailing how to retrieve the requested IDs.

Return type:

ExchangePlan