Source code for tensorrt_llm.layers.normalization

# SPDX-FileCopyrightText: Copyright (c) 2022-2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Optional

from ..functional import (ACT2FN, Tensor, chunk, group_norm, layer_norm,
                          rms_norm, unsqueeze)
from ..mapping import Mapping
from ..module import Module
from ..parameter import Parameter
from .embedding import CombinedTimestepLabelEmbeddings, Embedding
from .linear import Linear


[docs] class LayerNorm(Module): def __init__(self, normalized_shape, eps=1e-05, elementwise_affine=True, bias=True, dtype=None, tp_size=1, tp_dim=-1): super().__init__() if isinstance(normalized_shape, int): normalized_shape = (normalized_shape, ) self.normalized_shape = tuple(normalized_shape) self.elementwise_affine = elementwise_affine if self.elementwise_affine: self.weight = Parameter(shape=self.normalized_shape, dtype=dtype) if bias: self.bias = Parameter(shape=self.normalized_shape, dtype=dtype) else: self.register_parameter('bias', None) else: self.register_parameter('weight', None) self.register_parameter('bias', None) self.eps = eps self.dtype = dtype self.tp_size = tp_size self.tp_dim = tp_dim
[docs] def forward(self, x, normalized_shape=None): weight = 1. if self.weight is None else self.weight.value bias = 0. if self.bias is None else self.bias.value if normalized_shape is None: normalized_shape = self.normalized_shape return layer_norm(x, normalized_shape, weight, bias, self.eps)
[docs] class RmsNorm(Module): def __init__(self, normalized_shape, num_groups=1, eps=1e-06, elementwise_affine=True, dtype=None): super().__init__() if isinstance(normalized_shape, int): normalized_shape = (normalized_shape, ) self.normalized_shape = tuple(normalized_shape) self.elementwise_affine = elementwise_affine self.num_groups = num_groups num_channels = normalized_shape[-1] if num_channels % num_groups != 0: raise ValueError('num_channels must be divisible by num_groups') if self.elementwise_affine: self.weight = Parameter(shape=self.normalized_shape, dtype=dtype) else: self.register_parameter('weight', None) self.eps = eps self.dtype = dtype
[docs] def forward(self, x, normalized_shape=None): weight = None if self.weight is None else self.weight.value if normalized_shape is None: normalized_shape = self.normalized_shape return rms_norm(x, normalized_shape, self.num_groups, weight, self.eps)
[docs] class GroupNorm(Module): def __init__(self, num_groups, num_channels, eps=1e-05, affine=True, dtype=None): super().__init__() if num_channels % num_groups != 0: raise ValueError('num_channels must be divisible by num_groups') self.num_groups = num_groups self.num_channels = num_channels self.affine = affine if self.affine: self.weight = Parameter(shape=(self.num_channels, ), dtype=dtype) self.bias = Parameter(shape=(self.num_channels, ), dtype=dtype) else: self.register_parameter('weight', None) self.register_parameter('bias', None) self.eps = eps
[docs] def forward(self, x): weight = None if self.weight is None else self.weight.value bias = None if self.bias is None else self.bias.value return group_norm(x, self.num_groups, weight, bias, self.eps)
[docs] class AdaLayerNorm(Module): def __init__(self, embedding_dim: int, num_embeddings: Optional[int] = None, output_dim: Optional[int] = None, norm_elementwise_affine: bool = False, norm_eps: float = 1e-5, chunk_dim: int = 0, mapping=Mapping(), dtype=None): super().__init__() self.chunk_dim = chunk_dim output_dim = output_dim or embedding_dim * 2 if num_embeddings is not None: self.emb = Embedding(num_embeddings, embedding_dim, dtype=dtype) else: self.emb = None self.silu = ACT2FN['silu'] self.linear = Linear(embedding_dim, output_dim, tp_group=mapping.tp_group, tp_size=mapping.tp_size, dtype=dtype) self.norm = LayerNorm(output_dim // 2, eps=norm_eps, elementwise_affine=norm_elementwise_affine, dtype=dtype)
[docs] def forward(self, x: Tensor, timestep: Optional[Tensor] = None, temb: Optional[Tensor] = None): assert timestep is not None or temb is not None if self.emb is not None and timestep is not None: temb = self.emb(timestep) temb = self.linear(self.silu(temb)) if self.chunk_dim == 1: shift, scale = chunk(temb, 2, dim=1) shift = unsqueeze(shift, 1) scale = unsqueeze(scale, 1) else: scale, shift = chunk(temb, 2, dim=0) x = self.norm(x) * (1 + scale) + shift return x
[docs] class AdaLayerNormZero(Module): def __init__(self, embedding_dim: int, num_embeddings: Optional[int] = None, norm_type: str = "layer_norm", bias: bool = True, mapping=Mapping(), dtype=None): super().__init__() if num_embeddings is not None: self.emb = CombinedTimestepLabelEmbeddings(num_embeddings, embedding_dim, dtype=dtype) else: self.emb = None self.silu = ACT2FN['silu'] self.linear = Linear(embedding_dim, 6 * embedding_dim, bias=bias, tp_group=mapping.tp_group, tp_size=mapping.tp_size, dtype=dtype) if norm_type == "layer_norm": self.norm = LayerNorm(embedding_dim, elementwise_affine=False, eps=1e-6, dtype=dtype) elif norm_type == "fp32_layer_norm": self.norm = LayerNorm(embedding_dim, elementwise_affine=False, bias=False, dtype=dtype) else: raise ValueError( f"Unsupported `norm_type` ({norm_type}) provided. Supported ones are: 'layer_norm', 'fp32_layer_norm'." )
[docs] def forward(self, x: Tensor, timestep: Optional[Tensor] = None, class_labels: Optional[Tensor] = None, hidden_dtype: str = None, emb: Optional[Tensor] = None): assert emb is not None or self.emb is not None if self.emb is not None: emb = self.emb(timestep, class_labels, hidden_dtype=hidden_dtype) emb = self.linear(self.silu(emb)) shift_msa, scale_msa, gate_msa, shift_mlp, scale_mlp, gate_mlp = chunk( emb, 6, dim=1) x = self.norm(x) * (1 + unsqueeze(scale_msa, 1)) + unsqueeze( shift_msa, 1) return x, gate_msa, shift_mlp, scale_mlp, gate_mlp
[docs] class AdaLayerNormZeroSingle(Module): def __init__(self, embedding_dim: int, norm_type: str = "layer_norm", bias: bool = True, mapping=Mapping(), dtype=None): super().__init__() self.silu = ACT2FN['silu'] self.linear = Linear(embedding_dim, 3 * embedding_dim, bias=bias, tp_group=mapping.tp_group, tp_size=mapping.tp_size, dtype=dtype) if norm_type == "layer_norm": self.norm = LayerNorm(embedding_dim, elementwise_affine=False, eps=1e-6) else: raise ValueError( f"Unsupported `norm_type` ({norm_type}) provided. Supported ones are: 'layer_norm', 'fp32_layer_norm'." )
[docs] def forward(self, x: Tensor, emb: Optional[Tensor] = None): emb = self.linear(self.silu(emb)) shift_msa, scale_msa, gate_msa = chunk(emb, 3, dim=1) x = self.norm(x) * (1 + unsqueeze(scale_msa, 1)) + unsqueeze( shift_msa, 1) return x, gate_msa
[docs] class AdaLayerNormContinuous(Module): def __init__(self, embedding_dim: int, conditioning_embedding_dim: int, elementwise_affine: bool = True, eps: float = 1e-5, bias: bool = True, norm_type: str = "layer_norm", mapping=Mapping(), dtype=None): super().__init__() self.silu = ACT2FN['silu'] self.linear = Linear(conditioning_embedding_dim, embedding_dim * 2, bias=bias, tp_group=mapping.tp_group, tp_size=mapping.tp_size, dtype=dtype) if norm_type == "layer_norm": self.norm = LayerNorm(embedding_dim, eps=eps, elementwise_affine=elementwise_affine, bias=bias, dtype=dtype) elif norm_type == "rms_norm": self.norm = RmsNorm(embedding_dim, eps=eps, elementwise_affine=elementwise_affine, dtype=dtype) else: raise ValueError(f"unknown norm_type {norm_type}")
[docs] def forward(self, x: Tensor, conditioning_embedding: Tensor): # convert back to the original dtype in case `conditioning_embedding`` is upcasted to float32 (needed for hunyuanDiT) emb = self.linear(self.silu(conditioning_embedding).cast(x.dtype)) scale, shift = chunk(emb, 2, dim=1) x = self.norm(x) * unsqueeze((1 + scale), 1) + unsqueeze(shift, 1) return x
[docs] class SD35AdaLayerNormZeroX(Module): def __init__(self, embedding_dim: int, norm_type: str = "layer_norm", bias: bool = True, mapping=Mapping(), dtype=None): super().__init__() self.silu = ACT2FN['silu'] self.linear = Linear(embedding_dim, 9 * embedding_dim, bias=bias, tp_group=mapping.tp_group, tp_size=mapping.tp_size, dtype=dtype) if norm_type == "layer_norm": self.norm = LayerNorm(embedding_dim, elementwise_affine=False, eps=1e-6, dtype=dtype) else: raise ValueError( f"Unsupported `norm_type` ({norm_type}) provided. Supported ones are: 'layer_norm'." )
[docs] def forward(self, hidden_states: Tensor, emb: Tensor): emb = self.linear(self.silu(emb).cast(hidden_states.dtype)) shift_msa, scale_msa, gate_msa, shift_mlp, scale_mlp, gate_mlp, shift_msa2, scale_msa2, gate_msa2 = chunk( emb, 9, dim=1) norm_hidden_states = self.norm(hidden_states) hidden_states = norm_hidden_states * ( 1 + unsqueeze(scale_msa, 1)) + unsqueeze(shift_msa, 1) norm_hidden_states2 = norm_hidden_states * ( 1 + unsqueeze(scale_msa2, 1)) + unsqueeze(shift_msa2, 1) return hidden_states, gate_msa, shift_mlp, scale_mlp, gate_mlp, norm_hidden_states2, gate_msa2