Rank Assignment
Base class
Tree
- class nvidia_resiliency_ext.inprocess.rank_assignment.Layer(min_ranks=1, max_ranks=None, key_or_fn='', flag=None)
Represents a configuration for a layer of branches at a certain depth in a topology tree constructed by Tree.
- Parameters:
min_ranks (int) – the minimum number of healthy ranks in a subtree
max_ranks (int | None) – the maximum number of ranks to activate in a subtree; no limit if None
key_or_fn (str | Callable[[State], str]) – a string key, or a Callable evaluated with inprocess.State as input to produce a grouping string key
flag (LayerFlag | None) – an optional flag that modifies the rank assignment policy in a given branch
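To illustrate how key_or_fn partitions ranks at one layer, here is a minimal sketch that does not use the library. FakeState, resolve_key and group_ranks are hypothetical names; the real inprocess.State has its own fields, and the sketch only assumes what the Tree example below suggests: a plain string key (such as 'root') is shared by every rank, while a callable (such as one returning socket.gethostname()) yields a per-rank key.

```python
from collections import defaultdict
from dataclasses import dataclass
from typing import Callable, Dict, List, Union


@dataclass(frozen=True)
class FakeState:
    """Hypothetical stand-in for inprocess.State (only the fields used here)."""

    rank: int
    hostname: str


KeyOrFn = Union[str, Callable[[FakeState], str]]


def resolve_key(key_or_fn: KeyOrFn, state: FakeState) -> str:
    # A plain string is used as-is, so every rank lands in the same group;
    # a callable derives a per-rank key from the state.
    return key_or_fn if isinstance(key_or_fn, str) else key_or_fn(state)


def group_ranks(key_or_fn: KeyOrFn, states: List[FakeState]) -> Dict[str, List[int]]:
    # Collect rank indices under their grouping key, in rank order.
    groups: Dict[str, List[int]] = defaultdict(list)
    for state in states:
        groups[resolve_key(key_or_fn, state)].append(state.rank)
    return dict(groups)


# Eight ranks spread over two hypothetical hosts, four ranks each.
states = [FakeState(rank=r, hostname=f"node{r // 4}") for r in range(8)]
print(group_ranks("root", states))
# {'root': [0, 1, 2, 3, 4, 5, 6, 7]}
print(group_ranks(lambda s: s.hostname, states))
# {'node0': [0, 1, 2, 3], 'node1': [4, 5, 6, 7]}
```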
- class nvidia_resiliency_ext.inprocess.rank_assignment.LayerFlag(value)
A flag to modify the rank assignment or rank filtering policy in a given Layer of a Tree rank assignment.
- RESERVE
indicates that branches at this layer of the topology tree may be traversed while searching for a replacement inactive rank
- BACKFILL
indicates that branches at this layer of the topology tree may be traversed while searching for a replacement active rank
- class nvidia_resiliency_ext.inprocess.rank_assignment.Tree(layers, world_size_filter=None)
Implements an integrated rank assignment and activation algorithm that builds a multi-layer topology tree for distributed ranks. Each layer in this tree specifies constraints and policies for assigning and activating ranks. Grouping keys in each layer can align with hardware properties (e.g., to confine ranks within a compute node) or application-driven requirements (e.g., ensuring a particular divisibility).
Tree constructs a rooted topology tree whose depth equals the number of layers. Each layer corresponds to a Layer, which determines the rank assignment policy within its subtree. The distributed ranks are represented as leaves.
Algorithm
Initialization
The algorithm traverses all ranks in depth-first order. A visited rank is activated if every ancestor layer still permits more active ranks, i.e., if, for each ancestor, the number of already-active ranks in that ancestor's subtree is below its Layer.max_ranks.
Rank reassignment
When some ranks terminate or become unhealthy, the algorithm proceeds in several steps:
Propagate termination
Using a reverse depth-first search (children before parents), if the number of healthy ranks in a branch falls below Layer.min_ranks, that entire branch (and its subtree) is terminated.
Replace ranks from a reserve domain
The algorithm attempts to replace terminated or unhealthy active ranks with inactive ranks from the nearest ancestor subtree that has the LayerFlag.RESERVE flag. The search for an inactive rank continues recursively upward until a branch without the LayerFlag.RESERVE flag is reached.
Backfill ranks
Within any ancestor subtree flagged as LayerFlag.BACKFILL, the active rank with the largest rank index swaps places with a terminated rank, effectively filling local gaps (similar to FillGaps).
Shift ranks
After local backfills, remaining gaps from unhandled terminations are closed by shifting healthy ranks left to fill any vacated indices. This global step reassigns all rank indices past the first unhealthy rank (similar to ShiftRanks).
Optional filter
If a world_size_filter callable is provided, it can reduce the active ranks to the smaller world_size required by the workload. world_size_filter is invoked with the current number of active ranks as its argument and returns the adjusted number of requested active ranks, which must be no greater than the input. Healthy ranks whose indices fall outside the adjusted world size are deactivated and become part of the reserve pool.
Note
Tree cannot be composed with any other instance of RankAssignment or RankFilter.
Example

    import socket

    import nvidia_resiliency_ext.inprocess as inprocess

    rank_assignment = inprocess.rank_assignment.Tree(
        [
            inprocess.rank_assignment.Layer(
                min_ranks=128,
                max_ranks=256,
                key_or_fn='root',
                flag=inprocess.rank_assignment.LayerFlag.RESERVE,
            ),
            inprocess.rank_assignment.Layer(
                min_ranks=8,
                max_ranks=8,
                key_or_fn=lambda _: socket.gethostname(),
                flag=inprocess.rank_assignment.LayerFlag.RESERVE,
            ),
        ]
    )
In this two-level topology tree:
The hostname-based layer (the second entry in layers, closest to the ranks) allows up to 8 active ranks per host (Layer.max_ranks=8). If the number of healthy ranks in any host drops below 8 (Layer.min_ranks=8), that entire host's subtree is terminated. The algorithm can look for inactive reserve ranks within the same host because of the LayerFlag.RESERVE flag.
All hosts are grouped under the 'root' layer, which permits up to 256 active ranks (Layer.max_ranks=256). If the global healthy rank count drops below 128 (Layer.min_ranks=128), all ranks are terminated. The LayerFlag.RESERVE flag at the 'root' level lets the algorithm traverse upward from one host to another host through the 'root' to search for reserve ranks.
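The initialization and termination rules can be sketched on a scaled-down copy of this two-level example: 3 hosts with 3 ranks each, a host layer with min_ranks=max_ranks=2 and a root layer with min_ranks=4, max_ranks=6. This is a hypothetical single-process model, not the library's implementation: Node, leaves, activate, propagate_termination and make_tree are illustrative names, and the sketch deliberately omits the reserve replacement, backfill, shift and world_size_filter steps.

```python
from dataclasses import dataclass, field
from typing import Iterator, List, Optional


@dataclass
class Node:
    """Hypothetical simplified tree node.

    Inner nodes carry one layer's min_ranks/max_ranks; leaves (no children)
    stand for individual ranks.
    """

    min_ranks: int = 1
    max_ranks: Optional[int] = None
    children: List["Node"] = field(default_factory=list)
    rank: Optional[int] = None
    healthy: bool = True
    active: bool = False


def leaves(node: Node) -> Iterator[Node]:
    # Yield the rank leaves below `node` in depth-first order.
    if not node.children:
        yield node
    for child in node.children:
        yield from leaves(child)


def activate(root: Node) -> None:
    # Depth-first walk: a healthy rank is activated only if every ancestor
    # subtree still has fewer active ranks than its max_ranks.
    def visit(node: Node, ancestors: List[Node]) -> None:
        if not node.children:
            node.active = node.healthy and all(
                a.max_ranks is None
                or sum(leaf.active for leaf in leaves(a)) < a.max_ranks
                for a in ancestors
            )
            return
        for child in node.children:
            visit(child, ancestors + [node])

    visit(root, [])


def propagate_termination(node: Node) -> None:
    # Children before parents: a subtree whose healthy rank count drops
    # below min_ranks is terminated as a whole (modelled here as every
    # leaf becoming unhealthy and inactive).
    for child in node.children:
        propagate_termination(child)
    if node.children:
        subtree = list(leaves(node))
        if sum(leaf.healthy for leaf in subtree) < node.min_ranks:
            for leaf in subtree:
                leaf.healthy = False
                leaf.active = False


def make_tree() -> Node:
    # 3 hosts x 3 ranks; host layer min=max=2, root layer min=4, max=6.
    ranks = iter(range(9))
    hosts = [
        Node(min_ranks=2, max_ranks=2, children=[Node(rank=next(ranks)) for _ in range(3)])
        for _ in range(3)
    ]
    return Node(min_ranks=4, max_ranks=6, children=hosts)


root = make_tree()
activate(root)
print([leaf.rank for leaf in leaves(root) if leaf.active])  # [0, 1, 3, 4, 6, 7]

# Ranks 3 and 4 fail; host 1 is left with one healthy rank (< 2).
for leaf in leaves(root):
    if leaf.rank in (3, 4):
        leaf.healthy = False
propagate_termination(root)
print([leaf.rank for leaf in leaves(root) if not leaf.healthy])  # [3, 4, 5]
```

Each host keeps one inactive spare rank (2, 5 and 8), and rank 8 also stays inactive because the root already has 6 active ranks. When ranks 3 and 4 fail, host 1 drops below its min_ranks and its whole subtree, including spare rank 5, is terminated; the root still has 6 healthy ranks, so it survives.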
- Parameters:
layers (list[Layer]) – a list of Layer instances; each layer specifies properties corresponding to one grouping level in a topology tree
world_size_filter (Callable[[int], int] | None) – an optional Callable that takes the final application-visible world size and returns the new world size, no greater than the input
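A world_size_filter is any callable mapping an int to an int no greater than its input. The sketch below assumes a workload that wants the active world size to be a multiple of 8; round_down_to_multiple_of_8 and split_active_reserve are hypothetical helpers, and the split only models the documented outcome (ranks past the adjusted world size join the reserve pool), not how Tree implements it.

```python
from typing import Callable, List, Tuple


def round_down_to_multiple_of_8(world_size: int) -> int:
    # Hypothetical world_size_filter: largest multiple of 8 not exceeding the input.
    return world_size // 8 * 8


def split_active_reserve(
    active_ranks: List[int], world_size_filter: Callable[[int], int]
) -> Tuple[List[int], List[int]]:
    # Simplified model of the optional filter step: ranks beyond the
    # filtered world size leave the active set and join the reserve pool.
    new_size = world_size_filter(len(active_ranks))
    if new_size > len(active_ranks):
        raise ValueError("world_size_filter must not increase the world size")
    return active_ranks[:new_size], active_ranks[new_size:]


active, reserve = split_active_reserve(list(range(21)), round_down_to_multiple_of_8)
print(len(active), reserve)  # 16 [16, 17, 18, 19, 20]
```

Under these assumptions, the filter would be passed as Tree(layers, world_size_filter=round_down_to_multiple_of_8).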
Composable Rank Assignments
- class nvidia_resiliency_ext.inprocess.rank_assignment.FillGaps
A class for reassigning distributed ranks, filling in gaps caused by terminated or unhealthy ranks.
The FillGaps class is a specialized rank assignment strategy that reorders ranks to fill gaps created by terminated or unhealthy ranks. It preserves the previous rank assignment of healthy ranks within the first world_size - len(terminated_ranks) positions (the new world size); healthy ranks beyond that range are reassigned to fill in gaps left by unhealthy ranks.
Example:

    |<--- preserved --->|<- moved ->|       |<--new world size->|

    +---+---+---+---+---+---+---+---+       +---+---+---+---+---+
    | 0 | X | 2 | 3 | X | X | 6 | 7 |  -->  | 0 | 6 | 2 | 3 | 7 |
    +---+---+---+---+---+---+---+---+       +---+---+---+---+---+
          ^           ^       |   |
          |           |       |   |
          ------------|--------   |
                      |           |
                      -------------
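The reassignment in this diagram can be reproduced with a short single-process sketch, assuming ranks are numbered 0..world_size-1 and the set of terminated ranks is already known. fill_gaps and new_layout are hypothetical helpers, not part of the library; gaps are filled in increasing order to match the diagram.

```python
from typing import Dict, List, Set


def fill_gaps(world_size: int, terminated: Set[int]) -> Dict[int, int]:
    """Map each surviving old rank to its new rank, FillGaps-style (sketch)."""
    new_world_size = world_size - len(terminated)
    mapping: Dict[int, int] = {}
    gaps: List[int] = []
    # Healthy ranks inside the new world size keep their index;
    # unhealthy slots there become gaps to be filled.
    for rank in range(new_world_size):
        if rank in terminated:
            gaps.append(rank)
        else:
            mapping[rank] = rank
    # Healthy ranks past the new world size move into the gaps, in order.
    movers = [r for r in range(new_world_size, world_size) if r not in terminated]
    for gap, rank in zip(gaps, movers):
        mapping[rank] = gap
    return mapping


def new_layout(mapping: Dict[int, int]) -> List[int]:
    # Old rank held by each new rank index, in new-rank order.
    return [old for old, _ in sorted(mapping.items(), key=lambda item: item[1])]


mapping = fill_gaps(8, {1, 4, 5})
print(new_layout(mapping))  # [0, 6, 2, 3, 7]
```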
- class nvidia_resiliency_ext.inprocess.rank_assignment.FilterCountGroupedByKey(key_or_fn, condition, timeout=datetime.timedelta(seconds=60))
A class for filtering distributed ranks by grouping them by a key.
FilterCountGroupedByKey organizes ranks into groups based on a specified string key. For each group, it increments a group counter by 1 for every healthy rank. A given boolean condition is then evaluated for each rank, with the corresponding group counter passed as input.
If condition(group_counter) evaluates to True, the rank is preserved.
If it evaluates to False, the rank is considered unhealthy and marked for termination.
FilterCountGroupedByKey needs to be followed by another RankAssignment that performs the actual rank termination by raising a RankDiscarded exception.

    condition = lambda count: count == 2

    +---+---+---+---+---+---+---+---+       +---+---+---+---+---+---+---+---+
    | 0 | X | 2 | 3 | X | X | 6 | 7 |  -->  | X | X | 2 | 3 | X | X | 6 | 7 |
    +---+---+---+---+---+---+---+---+       +---+---+---+---+---+---+---+---+
    | key=0 | key=1 | key=2 | key=3 |       | key=0 | key=1 | key=2 | key=3 |
    |       |       |       |       |       |       |       |       |       |
    |count=1|count=2|count=0|count=2|       | False | True  | False | True  |
Example:

    import socket

    import nvidia_resiliency_ext.inprocess as inprocess

    # hostname is the group key, and condition checks if exactly 8 ranks
    # corresponding to a given hostname are in a healthy state; if the
    # count is different from 8, all ranks from the corresponding hostname
    # are considered unhealthy and terminated; the remaining healthy ranks
    # are shifted to the left to fill all gaps created by unhealthy ranks.
    rank_assignment = inprocess.Compose(
        inprocess.rank_assignment.ShiftRanks(),
        inprocess.rank_assignment.FilterCountGroupedByKey(
            key_or_fn=lambda _: socket.gethostname(),
            condition=lambda count: count == 8,
        ),
    )
- Parameters:
key_or_fn (str | Callable[[State], str]) – a string key, or a Callable evaluated with inprocess.state.State as the input to produce a string key
condition (Callable[[int], bool]) – a condition evaluated with the group counter as the input; if it returns False, the rank is terminated
timeout (timedelta) – timeout for the distributed barrier
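The counting rule and the diagram above can be reproduced without any distributed communication. The sketch assumes every rank's key and health are already known in one process, whereas the real class coordinates across ranks with a distributed barrier governed by timeout; filter_count_grouped_by_key is a hypothetical helper.

```python
from collections import Counter
from typing import Callable, List, Set


def filter_count_grouped_by_key(
    keys: List[int], healthy: List[bool], condition: Callable[[int], bool]
) -> Set[int]:
    """Return the ranks that survive the filter (single-process sketch).

    keys[r] is the group key of rank r, healthy[r] its health before filtering.
    """
    # One counter per group, incremented once for every healthy rank.
    counts = Counter(key for key, ok in zip(keys, healthy) if ok)
    # Unhealthy ranks stay discarded; a healthy rank survives only if its
    # group's counter satisfies the condition.
    return {
        rank
        for rank, (key, ok) in enumerate(zip(keys, healthy))
        if ok and condition(counts[key])
    }


# Same layout as the diagram: ranks 1, 4 and 5 are unhealthy, two ranks per key.
keys = [rank // 2 for rank in range(8)]
healthy = [rank not in {1, 4, 5} for rank in range(8)]
print(sorted(filter_count_grouped_by_key(keys, healthy, lambda count: count == 2)))
# [2, 3, 6, 7]
```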
- class nvidia_resiliency_ext.inprocess.rank_assignment.ShiftRanks
A class for reassigning distributed ranks, filling in gaps caused by terminated or unhealthy ranks.
The ShiftRanks class is a specialized rank assignment strategy that shifts all healthy ranks to the left to fill gaps created by terminated or unhealthy ranks. ShiftRanks preserves the relative order of all healthy ranks, but all ranks past the first unhealthy rank are reassigned (shifted).
Example:

    |<->|<--------- moved --------->|       |<--new world size->|

              -----
              v   |
    +---+---+---+---+---+---+---+---+       +---+---+---+---+---+
    | 0 | X | 2 | 3 | X | X | 6 | 7 |  -->  | 0 | 2 | 3 | 6 | 7 |
    +---+---+---+---+---+---+---+---+       +---+---+---+---+---+
          ^   |   ^   ^       |   |
          |   |   |   |       |   |
          -----   ----|--------   |
                      |           |
                      -------------
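A single-process sketch of the same shift, assuming ranks numbered 0..world_size-1 and a known set of terminated ranks; shift_ranks and reassigned are hypothetical helpers, not library functions.

```python
from typing import List, Set


def shift_ranks(world_size: int, terminated: Set[int]) -> List[int]:
    # New rank i is taken by the i-th healthy old rank, keeping relative order.
    return [rank for rank in range(world_size) if rank not in terminated]


def reassigned(layout: List[int]) -> List[int]:
    # Old ranks whose index changed; all of them lie past the first gap.
    return [old for new, old in enumerate(layout) if old != new]


layout = shift_ranks(8, {1, 4, 5})
print(layout)  # [0, 2, 3, 6, 7]
print(reassigned(layout))  # [2, 3, 6, 7]
```

Unlike FillGaps, which moves only the healthy ranks beyond the new world size, this keeps the relative order of all healthy ranks at the cost of renumbering every rank after the first gap.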
Rank Filtering
Base class
Rank Filters
- class nvidia_resiliency_ext.inprocess.rank_assignment.ActivateAllRanks
Activates all distributed ranks.
All healthy distributed ranks will call the provided wrapped function in the next iteration of inprocess.Wrapper. ActivateAllRanks unconditionally activates all ranks and cannot be combined with any other RankAssignment that performs rank activation.
- class nvidia_resiliency_ext.inprocess.rank_assignment.ActiveWorldSizeDivisibleBy(divisor=1)
ActiveWorldSizeDivisibleBy ensures that the active world size is divisible by a given number. Ranks within the adjusted world size are marked as active and call the wrapped function, while ranks outside this range are marked as inactive.
- Parameters:
divisor (int) – the divisor to adjust the active world size by
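Assuming the adjustment rounds the world size down to the nearest multiple of divisor (it cannot round up, since only existing ranks can be activated), the arithmetic is a one-liner. active_world_size_divisible_by is a hypothetical helper; with the library one would instead construct ActiveWorldSizeDivisibleBy(divisor=8).

```python
def active_world_size_divisible_by(world_size: int, divisor: int = 1) -> int:
    # Largest multiple of divisor that does not exceed world_size.
    return world_size - world_size % divisor


size = active_world_size_divisible_by(30, divisor=8)
print(size)  # 24
print(list(range(size, 30)))  # [24, 25, 26, 27, 28, 29] would be inactive
```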
- class nvidia_resiliency_ext.inprocess.rank_assignment.MaxActiveWorldSize(max_active_world_size=None)
MaxActiveWorldSize ensures that the active world size is no greater than the specified max_active_world_size. Ranks with indices less than the active world size are active and call the wrapped function, while ranks outside this range are inactive.
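The rule amounts to activating the first min(world_size, max_active_world_size) rank indices. In this sketch active_ranks is a hypothetical helper, and treating the None default as "no cap" is an assumption based on the signature.

```python
from typing import Optional


def active_ranks(world_size: int, max_active_world_size: Optional[int] = None) -> range:
    # None is read as "no cap" here; this is an assumption based on the default.
    if max_active_world_size is None:
        return range(world_size)
    return range(min(world_size, max_active_world_size))


print(list(active_ranks(10, max_active_world_size=8)))  # [0, 1, 2, 3, 4, 5, 6, 7]
print(list(active_ranks(5, max_active_world_size=8)))  # [0, 1, 2, 3, 4]
```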