Rank Assignment


Base class

Tree

class nvidia_resiliency_ext.inprocess.rank_assignment.Layer(min_ranks=1, max_ranks=None, key_or_fn='', flag=None)[source]

Represents a configuration for a layer of branches at a certain depth in a topology tree constructed by Tree.

Parameters:
  • min_ranks (int) – the minimum number of healthy ranks in a subtree

  • max_ranks (int | None) – the maximum number of ranks to activate in a subtree, no limit if None

  • key_or_fn (str | Callable[[State], str]) – a string key, or a Callable evaluated with inprocess.State as input to produce a grouping string key

  • flag (LayerFlag | None) – an optional flag that modifies rank assignment policy in a given branch

class nvidia_resiliency_ext.inprocess.rank_assignment.LayerFlag(value)[source]

A flag to modify rank assignment or rank filtering policy in a given Layer of a Tree rank assignment.

RESERVE

indicates that branches at this layer of the topology tree may be traversed while searching for a replacement inactive rank

BACKFILL

indicates that branches at this layer of the topology tree may be traversed while searching for a replacement active rank

class nvidia_resiliency_ext.inprocess.rank_assignment.Tree(layers, world_size_filter=None)[source]

Implements an integrated rank assignment and activation algorithm that builds a multi-layer topology tree for distributed ranks. Each layer in this tree specifies constraints and policies for assigning and activating ranks. Grouping keys in each layer can align with hardware properties (e.g., to confine ranks within a compute node) or application-driven requirements (e.g., ensuring a particular divisibility).

Tree constructs a rooted topology tree whose depth equals the number of layers. Each layer corresponds to a Layer, determining the rank assignment policy within its subtree. The distributed ranks are represented as leaves.

Algorithm

Initialization

The algorithm traverses all ranks in depth-first order. A visited rank is activated only if every ancestor layer still has room for another active rank, i.e., if the number of already-active ranks in each ancestor branch is below that layer’s Layer.max_ranks.
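The initialization pass can be sketched in plain Python. This is a simplified illustration, not the library's implementation: the function name initial_activation, the representation of each rank as a tuple of grouping keys (one key per layer), and the assumption that ranks are already listed in depth-first order are all hypothetical.

```python
from collections import defaultdict


def initial_activation(rank_paths, max_ranks):
    """Return the ranks activated by a depth-first pass.

    rank_paths: one tuple of grouping keys per rank (one key per layer),
                assumed to be listed in depth-first order (hypothetical input).
    max_ranks:  one limit per layer, or None for "no limit".
    """
    active_counts = defaultdict(int)  # active ranks per branch
    active = []
    for rank, path in enumerate(rank_paths):
        # A branch is identified by the key prefix leading to it.
        branches = [path[: depth + 1] for depth in range(len(path))]
        has_room = all(
            limit is None or active_counts[branch] < limit
            for branch, limit in zip(branches, max_ranks)
        )
        if has_room:
            for branch in branches:
                active_counts[branch] += 1
            active.append(rank)
    return active


# Two hosts with three ranks each; at most 2 active ranks per host
# and at most 3 active ranks overall.
paths = [("root", "a")] * 3 + [("root", "b")] * 3
activated = initial_activation(paths, max_ranks=[3, 2])
```

In this sketch, rank 2 stays inactive because host "a" already holds two active ranks, and ranks 4 and 5 stay inactive because the root limit of three is reached after rank 3 is activated.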

Rank reassignment

When some ranks terminate or become unhealthy, the algorithm proceeds in several steps:

  1. Propagate termination

Using a reverse depth-first search (children before parents), if the number of healthy ranks in a branch falls below Layer.min_ranks, that entire branch (and its subtree) is terminated.

  2. Replace ranks from a reserve domain

The algorithm attempts to replace terminated or unhealthy active ranks with inactive ranks from the nearest ancestor subtree that has the LayerFlag.RESERVE flag. This search for an inactive rank continues recursively upward until a branch without the LayerFlag.RESERVE flag is reached.

  3. Backfill ranks

Within any ancestor subtree flagged as LayerFlag.BACKFILL, an active rank with the largest rank index swaps places with a terminated rank, effectively filling local gaps (similar to FillGaps).

  4. Shift ranks

After local backfills, remaining gaps from unhandled terminations are closed by shifting healthy ranks left to fill any vacated indices. This global step reassigns all rank indices past the first unhealthy rank (similar to ShiftRanks).

  5. Optional filter

If a world_size_filter callable is provided, it can reduce the active ranks to a smaller world_size necessary for the workload. world_size_filter is invoked with the current number of active ranks as an argument, and returns the adjusted number of requested active ranks, no greater than the input. Healthy ranks with indices greater than or equal to the returned value are deactivated and become part of the reserve pool.
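As an illustration of the optional filter step, a world_size_filter could round the active world size down to a multiple of some group size. The sketch below is hypothetical: the function name round_down_to_multiple_of_8 and the group size of 8 are assumptions chosen for the example and are not part of the library.

```python
def round_down_to_multiple_of_8(active_world_size: int) -> int:
    """Hypothetical world_size_filter: keep a multiple of 8 active ranks.

    The result is never greater than the input, as the filter contract
    described above requires.
    """
    group_size = 8  # assumed workload granularity, for illustration only
    return (active_world_size // group_size) * group_size
```

A callable of this shape would be supplied through the world_size_filter argument of Tree, for example Tree(layers, world_size_filter=round_down_to_multiple_of_8).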

Note

Tree cannot be composed with any other instance of RankAssignment or RankFilter.

Example

inprocess.rank_assignment.Tree(
    [
        inprocess.rank_assignment.Layer(
            min_ranks=128,
            max_ranks=256,
            key_or_fn='root',
            flag=inprocess.rank_assignment.LayerFlag.RESERVE,
        ),
        inprocess.rank_assignment.Layer(
            min_ranks=8,
            max_ranks=8,
            key_or_fn=lambda _: socket.gethostname(),
            flag=inprocess.rank_assignment.LayerFlag.RESERVE,
        ),
    ]
)

In this two-level topology tree:

  • The hostname-based layer (the second Layer in the list) allows up to 8 active ranks per host (Layer.max_ranks=8). If the number of healthy ranks on any host drops below 8 (Layer.min_ranks=8), that entire host’s subtree is terminated. Because of the LayerFlag.RESERVE flag, the algorithm can look for inactive reserve ranks on the same host.

  • All hosts are grouped under the 'root' layer, which permits up to 256 active ranks (Layer.max_ranks=256). If the global healthy rank count drops below 128 (Layer.min_ranks=128), all ranks are terminated. The LayerFlag.RESERVE flag at the 'root' level lets the algorithm traverse upward from one host to another host through the 'root' to search for reserve ranks.
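The min_ranks behaviour described in the two bullets above can be sketched for a two-level tree as follows. This is a simplified model, not the library's code: the function name propagate_termination, its arguments, and the flat list-of-hostnames input are assumptions made for illustration.

```python
from collections import defaultdict


def propagate_termination(rank_hosts, healthy, host_min, root_min):
    """Return the ranks that survive bottom-up min_ranks checks.

    rank_hosts: hostname for each rank index (hypothetical input format).
    healthy:    set of rank indices that are currently healthy.
    host_min:   min_ranks of the host layer.
    root_min:   min_ranks of the root layer.
    """
    healthy_per_host = defaultdict(set)
    for rank in healthy:
        healthy_per_host[rank_hosts[rank]].add(rank)

    # Children first: drop every host with too few healthy ranks.
    survivors = set()
    for ranks in healthy_per_host.values():
        if len(ranks) >= host_min:
            survivors |= ranks

    # Then the parent: drop everything if the root falls below its minimum.
    if len(survivors) < root_min:
        return set()
    return survivors


# Three hosts with two ranks each; rank 3 (on host "b") is unhealthy.
hosts = ["a", "a", "b", "b", "c", "c"]
remaining = propagate_termination(hosts, {0, 1, 2, 4, 5}, host_min=2, root_min=3)
```

Here host "b" has only one healthy rank, so its subtree is terminated, while the four ranks on hosts "a" and "c" still satisfy the root minimum.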

Parameters:
  • layers (list[Layer]) – a list of Layer instances, each layer specifies properties corresponding to one grouping level in a topology tree

  • world_size_filter (Callable[[int], int] | None) – an optional Callable that takes the final application-visible world size, and returns the new world size, no greater than the input

Composable Rank Assignments

class nvidia_resiliency_ext.inprocess.rank_assignment.FillGaps[source]

A class for reassigning distributed ranks, filling in gaps caused by terminated or unhealthy ranks.

The FillGaps class is a specialized rank assignment strategy that reorders ranks to fill gaps created by terminated or unhealthy ranks. It preserves the previous rank assignment for the first world_size - len(terminated_ranks) healthy ranks; the remaining healthy ranks are reassigned to fill in gaps left by unhealthy ranks.

Example:

|<--- preserved --->|<- moved ->|     |<--new world size->|

+---+---+---+---+---+---+---+---+     +---+---+---+---+---+
| 0 | X | 2 | 3 | X | X | 6 | 7 | --> | 0 | 6 | 2 | 3 | 7 |
+---+---+---+---+---+---+---+---+     +---+---+---+---+---+
      ^           ^        |  |
      |           |        |  |
      ---------------------   |
                  |           |
                  -------------
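
The reassignment in the diagram can be reproduced with a few lines of plain Python. This is a minimal sketch under stated assumptions, not the FillGaps implementation: the function name fill_gaps is hypothetical, and pairing the moved ranks with the gaps in ascending order is an assumption that happens to match the diagram.

```python
def fill_gaps(world_size, terminated):
    """Map old rank index -> new rank index for all healthy ranks."""
    terminated = set(terminated)
    new_world_size = world_size - len(terminated)

    # Healthy ranks below the new world size keep their index.
    mapping = {
        rank: rank for rank in range(new_world_size) if rank not in terminated
    }
    # Vacated indices below the new world size are the gaps to fill.
    gaps = [rank for rank in range(new_world_size) if rank in terminated]
    # Healthy ranks at or above the new world size must move.
    movers = [
        rank for rank in range(new_world_size, world_size)
        if rank not in terminated
    ]
    mapping.update(zip(movers, gaps))
    return mapping


# Same scenario as the diagram: ranks 1, 4 and 5 are unhealthy.
new_ranks = fill_gaps(8, [1, 4, 5])
```

In this sketch ranks 0, 2 and 3 keep their indices, while ranks 6 and 7 move into the vacated indices 1 and 4.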

class nvidia_resiliency_ext.inprocess.rank_assignment.FilterCountGroupedByKey(key_or_fn, condition, timeout=datetime.timedelta(seconds=60))[source]

A class for filtering distributed ranks by grouping by a key.

FilterCountGroupedByKey organizes ranks into groups based on a specified string key. For each group, it increments a group counter by 1 for every healthy rank. A given boolean condition is then evaluated for each rank, with the corresponding group counter passed as input.

  • If condition(group_counter) evaluates to True, the rank is preserved.

  • If it evaluates to False, the rank is considered unhealthy and marked for termination.

FilterCountGroupedByKey needs to be followed by another RankAssignment that performs the actual rank termination by raising a RankDiscarded exception.

condition = lambda count: count == 2

+---+---+---+---+---+---+---+---+     +---+---+---+---+---+---+---+---+
| 0 | X | 2 | 3 | X | X | 6 | 7 | --> | X | X | 2 | 3 | X | X | 6 | 7 |
+---+---+---+---+---+---+---+---+     +---+---+---+---+---+---+---+---+
| key=0 | key=1 | key=2 | key=3 |     | key=0 | key=1 | key=2 | key=3 |
|       |       |       |       |     |       |       |       |       |
|count=1|count=2|count=0|count=2|     | False | True  | False | True  |
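
The counting logic shown in the diagram can be modelled without the library. The sketch below is illustrative only: the function name filter_by_group_count and the list-of-keys input are hypothetical, and the real class additionally synchronizes the counts across ranks (hence its timeout parameter), which is not modelled here.

```python
from collections import Counter


def filter_by_group_count(keys, healthy, condition):
    """Return the healthy ranks whose group passes the condition.

    keys:      grouping key for each rank index (hypothetical input format).
    healthy:   set of rank indices that are currently healthy.
    condition: callable taking the group's healthy-rank count.
    """
    # One increment per healthy rank in each group.
    counts = Counter(keys[rank] for rank in healthy)
    return {rank for rank in healthy if condition(counts[keys[rank]])}


# Same scenario as the diagram: four groups of two ranks,
# ranks 1, 4 and 5 unhealthy, condition requires exactly 2 healthy ranks.
keys = ["0", "0", "1", "1", "2", "2", "3", "3"]
preserved = filter_by_group_count(keys, {0, 2, 3, 6, 7}, lambda count: count == 2)
```

Rank 0 is healthy but belongs to a group with a count of 1, so it is filtered out along with the already unhealthy ranks.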

Example:

import socket

import nvidia_resiliency_ext.inprocess as inprocess

# The hostname is the group key, and the condition checks whether exactly
# 8 ranks on a given host are healthy. If the count differs from 8, all
# ranks on that host are considered unhealthy and are terminated; the
# remaining healthy ranks are then shifted left to fill the gaps.
rank_assignment = inprocess.Compose(
    inprocess.rank_assignment.ShiftRanks(),
    inprocess.rank_assignment.FilterCountGroupedByKey(
        key_or_fn=lambda _: socket.gethostname(),
        condition=lambda count: count == 8,
    ),
)

Parameters:
  • key_or_fn (str | Callable[[State], str]) – a string key, or a Callable evaluated with inprocess.state.State as the input to produce a string key

  • condition (Callable[[int], bool]) – condition to be evaluated with group counter as the input, if False the rank is terminated

  • timeout (timedelta) – timeout for distributed barrier

class nvidia_resiliency_ext.inprocess.rank_assignment.ShiftRanks[source]

A class for reassigning distributed ranks, filling in gaps caused by terminated or unhealthy ranks.

The ShiftRanks class is a specialized rank assignment strategy that shifts all healthy ranks to the left to fill gaps created by terminated or unhealthy ranks. ShiftRanks preserves the relative order of all healthy ranks, but all ranks past the first unhealthy rank are reassigned (shifted).

Example:

 <-   ->|<------- moved ------->|     |<--new world size->|

          ----
          v   |
+---+---+---+---+---+---+---+---+     +---+---+---+---+---+
| 0 | X | 2 | 3 | X | X | 6 | 7 | --> | 0 | 2 | 3 | 6 | 7 |
+---+---+---+---+---+---+---+---+     +---+---+---+---+---+
      ^   |   ^   ^       |   |
      |   |   |   |       |   |
      ----     ------------   |
                  |           |
                  ------------
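
The shift in the diagram amounts to renumbering the healthy ranks in their original order. A minimal sketch, assuming a hypothetical helper named shift_ranks that is not part of the library:

```python
def shift_ranks(world_size, terminated):
    """Map old rank index -> new rank index, preserving relative order."""
    terminated = set(terminated)
    healthy = [rank for rank in range(world_size) if rank not in terminated]
    # Healthy ranks are renumbered consecutively from 0.
    return {old: new for new, old in enumerate(healthy)}


# Same scenario as the diagram: ranks 1, 4 and 5 are unhealthy.
shifted = shift_ranks(8, [1, 4, 5])
```

Only rank 0, which precedes the first unhealthy rank, keeps its index; every later healthy rank receives a new one.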

Rank Filtering

Base class

Rank Filters

class nvidia_resiliency_ext.inprocess.rank_assignment.ActivateAllRanks[source]

Activates all distributed ranks.

All healthy distributed ranks will call the provided wrapped function in the next iteration of inprocess.Wrapper.

ActivateAllRanks unconditionally activates all ranks, and cannot be combined with any other RankAssignment performing rank activation.

class nvidia_resiliency_ext.inprocess.rank_assignment.ActiveWorldSizeDivisibleBy(divisor=1)[source]

ActiveWorldSizeDivisibleBy ensures that the active world size is divisible by a given number. Ranks within the adjusted world size are marked as active and call the wrapped function, while ranks outside this range are marked as inactive.

Parameters:

divisor (int) – the divisor to adjust the active world size by
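
The adjustment can be illustrated with integer arithmetic. This is a sketch of the described behaviour, not the class itself; the function name active_ranks_divisible_by is hypothetical, and rounding down to the nearest multiple is an assumption about how the active world size is adjusted.

```python
def active_ranks_divisible_by(world_size, divisor=1):
    """Return the rank indices that stay active for a given divisor."""
    # Largest multiple of `divisor` that does not exceed `world_size`.
    active_world_size = (world_size // divisor) * divisor
    return list(range(active_world_size))


# With 10 healthy ranks and a divisor of 4, ranks 8 and 9 become inactive.
active = active_ranks_divisible_by(10, divisor=4)
```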

class nvidia_resiliency_ext.inprocess.rank_assignment.MaxActiveWorldSize(max_active_world_size=None)[source]

MaxActiveWorldSize ensures that the active world size is no greater than the specified max_active_world_size. Ranks with indices less than the active world size are active and call the wrapped function, while ranks outside this range are inactive.

Parameters:

max_active_world_size (int | None) – maximum active world size, no limit if None
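
The cap can be sketched in the same style. The function name active_ranks_with_cap is hypothetical and the code only models the behaviour described above, not the class implementation.

```python
def active_ranks_with_cap(world_size, max_active_world_size=None):
    """Return the rank indices that stay active under an optional cap."""
    if max_active_world_size is None:
        # No limit: every rank is active.
        active_world_size = world_size
    else:
        active_world_size = min(world_size, max_active_world_size)
    return list(range(active_world_size))


# With 6 healthy ranks and a cap of 4, ranks 4 and 5 become inactive.
capped = active_ranks_with_cap(6, max_active_world_size=4)
```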