Skip to content

Loss

BERTMLMLossWithReduction

Bases: _Nemo2CompatibleLossReduceMixin, MegatronLossReduction

Source code in bionemo/llm/model/loss.py
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
class BERTMLMLossWithReduction(_Nemo2CompatibleLossReduceMixin, MegatronLossReduction):
    """Masked-language-model loss with Megatron-style reduction for BERT models.

    Computes a masked cross-entropy loss over the batch's ``labels`` vs the model's
    ``token_logits`` and performs the data-parallel / context-parallel reductions
    expected by the NeMo2/Megatron training loop.
    """

    def __init__(
        self,
        validation_step: bool = False,
        val_drop_last: bool = True,
    ) -> None:
        """Initializes the Model class.

        Args:
            validation_step (bool, optional): Whether this object is being applied to the validation step. Defaults to False.
            val_drop_last (bool, optional): Whether the last batch is configured to be dropped during validation. Defaults to True.
        """
        # TODO(@jomitchell): Track down how we handle test. This is a common pattern in NeMo2, but these parameters seem likely
        #  to change in the future.
        super().__init__()
        self.validation_step = validation_step
        self.val_drop_last = val_drop_last

    def forward(
        self, batch: Dict[str, Tensor], forward_out: Dict[str, Tensor]
    ) -> Tuple[Tensor, PerTokenLossDict | SameSizeLossDict | DataParallelGroupLossAndIO]:
        """Computes loss of `labels` in the batch vs `token_logits` in the forward output currently. In the future this will be extended
            to handle other loss types like sequence loss if it is present in the forward_out and batch.

        Args:
            batch (Dict[str, Tensor]): The batch of data. Each tensor should be of shape [batch_size, *, *],
                and match the corresponding dimension for that particular key in the batch output.
                For example, the "labels" and "token_logits" key should have a tensor of shape [batch_size, sequence_length].
            forward_out (Dict[str, Tensor]): The forward output from the model. Each tensor should be of shape [batch_size, *, *]

        Returns:
            A tuple of the (unnormalized) microbatch loss scaled by the context-parallel size, and a
            loss dict: either the cross-rank loss sum plus valid-token count (validation without
            drop-last) or the data-parallel averaged per-token loss.

        Raises:
            ValueError: If "labels" is missing from the batch, or if the loss is NaN while the loss
                mask is non-empty.

        Taken from:
        https://github.com/NVIDIA/NeMo/blob/main/nemo/collections/nlp/models/language_modeling/megatron_gpt_model.py#L951-L976 .
        """  # noqa: D205
        if "labels" not in batch:
            raise ValueError("Labels not provided in the batch. These are required for this loss computation.")

        # NOTE: token_logits is [sequence, batch] but labels and other fields, including the loss, are [batch, sequence]
        unreduced_token_loss = unreduced_token_loss_fn(forward_out["token_logits"], batch["labels"])  # [b s]

        # TODO(@jstjohn) also handle different output keys, like the sequence loss.

        # Compute loss over "valid" tokens in the microbatch, i.e. the non-masked tokens.
        # The loss is not normalized, so you need to divide by the number of non-masked
        # tokens (loss_mask.sum()) to compute the mean loss per token.
        loss_for_microbatch, num_valid_tokens_in_microbatch = masked_token_loss(
            unreduced_token_loss, batch["loss_mask"]
        )

        # Get the context parallel size for some normalizations and reductions.
        cp_size = parallel_state.get_context_parallel_world_size()

        # If we do not drop the last partial batch of validation, we need to do fancy reduction handling to support
        #  reducing the loss across the data parallel group.
        if self.validation_step and not self.val_drop_last:
            if loss_for_microbatch.isnan():
                # TODO(@jomitchell): Add a unit test for this. This is the case where there are no valid tokens in the microbatch for the loss
                #  to be computed over, so we expect a NaN loss (divide by zero for a mean) but we make this an expected and non-breaking case,
                #  re-defining it as a 0 loss. This is standard in NeMo/NeMo2.
                if batch["loss_mask"].count_nonzero() != 0:
                    # A NaN loss with valid tokens present indicates a genuine numerical problem.
                    raise ValueError("Got NaN loss with non-empty input")
                loss_sum_for_microbatch = torch.zeros_like(num_valid_tokens_in_microbatch)
            else:
                # The loss is already the sum of all losses from masked_token_loss().
                loss_sum_for_microbatch = loss_for_microbatch

            # In this case we need to store the loss sum as well as the number of valid tokens in the microbatch.
            # Packed as a length-2 tensor [loss_sum, num_valid_tokens] so a single all_reduce covers both.
            loss_sum_and_microbatch_size_all_gpu = torch.cat(
                [
                    loss_sum_for_microbatch.clone().detach().view(1),
                    Tensor([num_valid_tokens_in_microbatch]).cuda().clone().detach(),
                ]
            )

            # Reduce the loss sum across the data parallel group to get the total loss
            # for all data parallel / distributed microbatches.
            torch.distributed.all_reduce(
                loss_sum_and_microbatch_size_all_gpu,
                group=parallel_state.get_data_parallel_group(with_context_parallel=True),
                op=torch.distributed.ReduceOp.SUM,
            )

            # Return the loss tensor multiplied by the context parallel size,
            # and the data & context parallel reduced loss sum.
            return loss_for_microbatch * cp_size, {
                "loss_sum_and_microbatch_size": loss_sum_and_microbatch_size_all_gpu
            }

        # Return the loss tensor multiplied by the context parallel size, as well as
        # the data-parallel averaged loss, i.e. the loss divided by the DP size.
        # Normalize the loss by the number of "valid" tokens, because masked_token_loss
        # no longer does this normalization, and BioNeMo losses expect this normalization.
        reduced_loss = (
            average_losses_across_data_parallel_group([loss_for_microbatch], with_context_parallel=True)
            / num_valid_tokens_in_microbatch
        )
        return loss_for_microbatch * cp_size, {"avg": reduced_loss}

__init__(validation_step=False, val_drop_last=True)

Initializes the Model class.

Parameters:

Name Type Description Default
validation_step bool

Whether this object is being applied to the validation step. Defaults to False.

False
val_drop_last bool

Whether the last batch is configured to be dropped during validation. Defaults to True.

True
Source code in bionemo/llm/model/loss.py
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
def __init__(
    self,
    validation_step: bool = False,
    val_drop_last: bool = True,
) -> None:
    """Set up the loss-reduction object.

    Args:
        validation_step (bool, optional): Whether this object is being applied to the validation step. Defaults to False.
        val_drop_last (bool, optional): Whether the last batch is configured to be dropped during validation. Defaults to True.
    """
    # TODO(@jomitchell): Track down how we handle test. This is a common pattern in NeMo2, but these parameters seem likely
    #  to change in the future.
    super().__init__()
    # Record the validation-loop configuration; forward() branches on both flags.
    self.val_drop_last = val_drop_last
    self.validation_step = validation_step

forward(batch, forward_out)

Computes loss of labels in the batch vs token_logits in the forward output currently. In the future this will be extended to handle other loss types like sequence loss if it is present in the forward_out and batch.

Parameters:

Name Type Description Default
batch Dict[str, Tensor]

The batch of data. Each tensor should be of shape [batch_size, *, *], and match the corresponding dimension for that particular key in the batch output. For example, the "labels" and "token_logits" keys should have tensors of shape [batch_size, sequence_length].

required
forward_out Dict[str, Tensor]

The forward output from the model. Each tensor should be of shape [batch_size, *, *].

required

Taken from: https://github.com/NVIDIA/NeMo/blob/main/nemo/collections/nlp/models/language_modeling/megatron_gpt_model.py#L951-L976 .

Source code in bionemo/llm/model/loss.py
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
def forward(
    self, batch: Dict[str, Tensor], forward_out: Dict[str, Tensor]
) -> Tuple[Tensor, PerTokenLossDict | SameSizeLossDict | DataParallelGroupLossAndIO]:
    """Computes loss of `labels` in the batch vs `token_logits` in the forward output currently. In the future this will be extended
        to handle other loss types like sequence loss if it is present in the forward_out and batch.

    Args:
        batch (Dict[str, Tensor]): The batch of data. Each tensor should be of shape [batch_size, *, *],
            and match the corresponding dimension for that particular key in the batch output.
            For example, the "labels" and "token_logits" key should have a tensor of shape [batch_size, sequence_length].
        forward_out (Dict[str, Tensor]): The forward output from the model. Each tensor should be of shape [batch_size, *, *]

    Returns:
        A tuple of the (unnormalized) microbatch loss scaled by the context-parallel size, and a
        loss dict: either the cross-rank loss sum plus valid-token count (validation without
        drop-last) or the data-parallel averaged per-token loss.

    Raises:
        ValueError: If "labels" is missing from the batch, or if the loss is NaN while the loss
            mask is non-empty.

    Taken from:
    https://github.com/NVIDIA/NeMo/blob/main/nemo/collections/nlp/models/language_modeling/megatron_gpt_model.py#L951-L976 .
    """  # noqa: D205
    if "labels" not in batch:
        raise ValueError("Labels not provided in the batch. These are required for this loss computation.")

    # NOTE: token_logits is [sequence, batch] but labels and other fields, including the loss, are [batch, sequence]
    unreduced_token_loss = unreduced_token_loss_fn(forward_out["token_logits"], batch["labels"])  # [b s]

    # TODO(@jstjohn) also handle different output keys, like the sequence loss.

    # Compute loss over "valid" tokens in the microbatch, i.e. the non-masked tokens.
    # The loss is not normalized, so you need to divide by the number of non-masked
    # tokens (loss_mask.sum()) to compute the mean loss per token.
    loss_for_microbatch, num_valid_tokens_in_microbatch = masked_token_loss(
        unreduced_token_loss, batch["loss_mask"]
    )

    # Get the context parallel size for some normalizations and reductions.
    cp_size = parallel_state.get_context_parallel_world_size()

    # If we do not drop the last partial batch of validation, we need to do fancy reduction handling to support
    #  reducing the loss across the data parallel group.
    if self.validation_step and not self.val_drop_last:
        if loss_for_microbatch.isnan():
            # TODO(@jomitchell): Add a unit test for this. This is the case where there are no valid tokens in the microbatch for the loss
            #  to be computed over, so we expect a NaN loss (divide by zero for a mean) but we make this an expected and non-breaking case,
            #  re-defining it as a 0 loss. This is standard in NeMo/NeMo2.
            if batch["loss_mask"].count_nonzero() != 0:
                # A NaN loss with valid tokens present indicates a genuine numerical problem.
                raise ValueError("Got NaN loss with non-empty input")
            loss_sum_for_microbatch = torch.zeros_like(num_valid_tokens_in_microbatch)
        else:
            # The loss is already the sum of all losses from masked_token_loss().
            loss_sum_for_microbatch = loss_for_microbatch

        # In this case we need to store the loss sum as well as the number of valid tokens in the microbatch.
        # Packed as a length-2 tensor [loss_sum, num_valid_tokens] so a single all_reduce covers both.
        loss_sum_and_microbatch_size_all_gpu = torch.cat(
            [
                loss_sum_for_microbatch.clone().detach().view(1),
                Tensor([num_valid_tokens_in_microbatch]).cuda().clone().detach(),
            ]
        )

        # Reduce the loss sum across the data parallel group to get the total loss
        # for all data parallel / distributed microbatches.
        torch.distributed.all_reduce(
            loss_sum_and_microbatch_size_all_gpu,
            group=parallel_state.get_data_parallel_group(with_context_parallel=True),
            op=torch.distributed.ReduceOp.SUM,
        )

        # Return the loss tensor multiplied by the context parallel size,
        # and the data & context parallel reduced loss sum.
        return loss_for_microbatch * cp_size, {
            "loss_sum_and_microbatch_size": loss_sum_and_microbatch_size_all_gpu
        }

    # Return the loss tensor multiplied by the context parallel size, as well as
    # the data-parallel averaged loss, i.e. the loss divided by the DP size.
    # Normalize the loss by the number of "valid" tokens, because masked_token_loss
    # no longer does this normalization, and BioNeMo losses expect this normalization.
    reduced_loss = (
        average_losses_across_data_parallel_group([loss_for_microbatch], with_context_parallel=True)
        / num_valid_tokens_in_microbatch
    )
    return loss_for_microbatch * cp_size, {"avg": reduced_loss}

DataParallelGroupLossAndIO

Bases: TypedDict

Average losses across the data parallel group + the original batch and inference output.

Source code in bionemo/llm/model/loss.py
57
58
59
60
61
62
class DataParallelGroupLossAndIO(TypedDict):
    """Average losses across the data parallel group + the original batch and inference output."""

    # Loss averaged across the data parallel group.
    avg: Tensor
    # The input batch the loss was computed from.
    batch: dict[str, Tensor]
    # The model's forward output for that batch.
    forward_out: dict[str, Tensor]

PerTokenLossDict

Bases: TypedDict

Tensor dictionary for loss.

This is the return type for a loss that is computed per token in the batch, supporting microbatches of varying sizes.

Source code in bionemo/llm/model/loss.py
39
40
41
42
43
44
45
class PerTokenLossDict(TypedDict):
    """Tensor dictionary for loss.

    This is the return type for a loss that is computed per token in the batch, supporting microbatches of varying sizes.
    """

    # Tensor packing the microbatch loss sum together with the microbatch size
    # (number of valid tokens), so both can be reduced across ranks in one op.
    loss_sum_and_microbatch_size: Tensor

SameSizeLossDict

Bases: TypedDict

Tensor dictionary for loss.

This is the return type for a loss that is computed for the entire batch, where all microbatches are the same size.

Source code in bionemo/llm/model/loss.py
48
49
50
51
52
53
54
class SameSizeLossDict(TypedDict):
    """Tensor dictionary for loss.

    This is the return type for a loss that is computed for the entire batch, where all microbatches are the same size.
    """

    # Average loss over the (equally sized) microbatches.
    avg: Tensor

unreduced_token_loss_fn(logits, labels, cross_entropy_loss_fusion=False)

Computes the unreduced token loss given the logits and labels without regard to the loss mask.

WARNING: This function does not apply a loss mask. Also, it performs in-place operations on the inputs.

Parameters:

Name Type Description Default
logits Tensor

The predicted logits of shape [sequence_length, batch_size, num_classes].

required
labels Tensor

The true labels of shape [batch_size, sequence_length].

required
cross_entropy_loss_fusion bool

If True, use the fused kernel version of vocab parallel cross entropy. This should generally be preferred for speed as it packs more operations into a single kernel on the GPU. However some users have observed reduced training stability when using this method.

False

Returns:

Name Type Description
Tensor Tensor

The unreduced token loss of shape [batch_size, sequence_length].

Source code in bionemo/llm/model/loss.py
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
def unreduced_token_loss_fn(logits: Tensor, labels: Tensor, cross_entropy_loss_fusion: bool = False) -> Tensor:
    """Computes the per-token cross-entropy loss from logits and labels, ignoring any loss mask.

    WARNING: This function does not apply a loss mask. Also, it performs in-place operations on the inputs.

    Args:
        logits (Tensor): The predicted logits of shape [sequence_length, batch_size, num_classes].
        labels (Tensor): The true labels of shape [batch_size, sequence_length].
        cross_entropy_loss_fusion (bool): If True, use the fused kernel version of vocab parallel cross entropy. This
            should generally be preferred for speed as it packs more operations into a single kernel on the GPU. However
            some users have observed reduced training stability when using this method.

    Returns:
        Tensor: The unreduced token loss of shape [batch_size, sequence_length].
    """
    # Logits arrive sequence-first ([s, b, v]); bring the labels into the same [s, b] layout.
    labels_seq_first = labels.transpose(0, 1).contiguous()
    # Select the fused or unfused vocab-parallel cross-entropy implementation.
    if cross_entropy_loss_fusion:
        per_token_loss = fused_vocab_parallel_cross_entropy(logits, labels_seq_first)
    else:
        per_token_loss = tensor_parallel.vocab_parallel_cross_entropy(logits, labels_seq_first)
    # Transpose back to the batch-first [b, s] layout the rest of the loss pipeline expects.
    return per_token_loss.transpose(0, 1).contiguous()