Source code for nvtripy.frontend.trace.ops.where

#
# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import numbers
from dataclasses import dataclass

import nvtripy.frontend.trace.ops.utils as op_utils
from nvtripy import export
from nvtripy.frontend.trace.ops.base import BaseTraceOp
from nvtripy.utils import wrappers


@dataclass(repr=False)
class Where(BaseTraceOp):
    """
    Trace operation that selects elements from one of two tensors based on a condition tensor.

    The op takes exactly three inputs, in this order: the condition, the 'input' tensor and
    the 'other' tensor. When lowered, all three operands are broadcast to a common shape and
    handed to a FlatIR ``SelectOp``.
    """

    # Output rank is inferred with the "max of inputs" policy.
    infer_rank = op_utils.InferRankPolicies.max_of_inputs()

    def infer_dtypes(self):
        """Gives the output the data type of the second input (the 'input' tensor)."""
        assert len(self.inputs) == 3, "Select operation should have exactly 3 inputs!"
        selected_dtype = self.inputs[1].dtype
        self.outputs[0].dtype = selected_dtype

    def to_flat_ir(self, inputs, outputs):
        """
        Lowers this op to FlatIR: every operand is broadcast to one shared shape, then a ``SelectOp`` is built.

        Args:
            inputs: The three FlatIR operands, ordered as (condition, input, other).
            outputs: The FlatIR outputs. The rank of the first output is the rank used when
                inserting the broadcasts.
        """
        from nvtripy.flat_ir.ops import SelectOp
        from nvtripy.flat_ir.tensor import FlatIRTensor

        # A broadcast is inserted for each operand regardless of whether its shape already matches.
        assert len(inputs) == 3, f"Where op expects 3 inputs but got {len(inputs)}."
        cond_rank, a_rank, b_rank = [operand.rank for operand in inputs]
        target_rank = max(a_rank, b_rank, cond_rank)

        # Step 1: bring all operands up to the same rank.
        with FlatIRTensor.context(["make rank of cond, a and b the same."]):
            cond, on_true, on_false = [
                op_utils.expand_rank_of_tensor(operand, target_rank - rank)
                for operand, rank in zip(inputs, (cond_rank, a_rank, b_rank))
            ]

        with FlatIRTensor.context(["compute element-wise max of input shapes to get the desired output shape."]):
            # Step 2: combine the shapes pairwise, then combine the two partial results,
            # so that each pairing carries its own descriptive operand names.
            cond_shape = op_utils.get_shape_of_tensor(cond)
            on_true_shape = op_utils.get_shape_of_tensor(on_true)
            cond_with_input = op_utils.compute_shape_of_broadcast(
                cond_shape,
                on_true_shape,
                target_rank,
                shape1_name="the 'condition' tensor",
                shape2_name="the 'input' tensor",
            )

            # A separate shape query is issued for 'input' here rather than reusing the one above.
            on_true_shape_again = op_utils.get_shape_of_tensor(on_true)
            on_false_shape = op_utils.get_shape_of_tensor(on_false)
            input_with_other = op_utils.compute_shape_of_broadcast(
                on_true_shape_again,
                on_false_shape,
                target_rank,
                shape1_name="the 'input' tensor",
                shape2_name="the 'other' tensor",
            )

            final_shape = op_utils.compute_shape_of_broadcast(
                cond_with_input,
                input_with_other,
                target_rank,
                shape1_name="the previously computed broadcast of the 'condition' and 'input' tensor",
                shape2_name="the previously computed broadcast of the 'input' and 'other' tensors",
            )

            # Step 3: broadcast each operand to the final shape, in operand order.
            operand_details = (
                "first input of 'where' ('condition')",
                "second input of 'where' ('input')",
                "third input of 'where' ('other')",
            )
            select_operands = [
                op_utils.insert_broadcast(
                    operand,
                    outputs[0].rank,
                    shape_of_target_tensor=final_shape,
                    tensor_details=details,
                )
                for operand, details in zip((cond, on_true, on_false), operand_details)
            ]

        SelectOp.build(select_operands, outputs)


@export.public_api(document_under="operations/functions")
@wrappers.interface(
    dtype_constraints={"condition": "T2", "input": "T1", "other": "T1", wrappers.RETURN_VALUE: "T1"},
    dtype_variables={
        "T1": ["float32", "float16", "bfloat16", "float8", "int4", "int8", "int32", "int64", "bool"],
        "T2": ["bool"],
    },
)
def where(condition: "nvtripy.Tensor", input: "nvtripy.Tensor", other: "nvtripy.Tensor") -> "nvtripy.Tensor":
    r"""
    Builds a new tensor by choosing, element by element, between ``input`` and ``other``
    according to ``condition``.

    Args:
        condition: The condition tensor.
            Elements are taken from ``input`` where this is ``True``
            and from ``other`` where it is ``False``.
        input: Tensor providing the values used where ``condition`` is ``True``.
        other: Tensor providing the values used where ``condition`` is ``False``.

    Returns:
        A new tensor with the broadcasted shape.

    Constraints:
        All three parameters must be broadcast-compatible with each other.

    .. code-block:: python
        :linenos:

        condition = tp.Tensor([[True, False], [True, True]])
        input = tp.ones([2, 2], dtype=tp.float32)
        other = tp.zeros([2, 2], dtype=tp.float32)
        output = tp.where(condition, input, other)

        assert np.array_equal(cp.from_dlpack(output).get(), np.array([[1, 0], [1, 1]], dtype=np.float32))
    """
    # Operand order matters: the trace op expects (condition, input, other).
    operands = [condition, input, other]
    return Where.build(operands)
@export.public_api(document_under="operations/functions")
@wrappers.interface(
    dtype_constraints={"input": "T1", "mask": "T2", wrappers.RETURN_VALUE: "T1"},
    dtype_variables={
        "T1": ["float32", "float16", "bfloat16", "float8", "int4", "int8", "int32", "int64", "bool"],
        "T2": ["bool"],
    },
)
def masked_fill(input: "nvtripy.Tensor", mask: "nvtripy.Tensor", value: numbers.Number) -> "nvtripy.Tensor":
    r"""
    Produces a new tensor that holds ``value`` wherever ``mask`` is ``True`` and the
    corresponding element of ``input`` everywhere else.

    Args:
        input: The input tensor.
        mask: The mask tensor.
        value: The value to fill with. This will be cast to match the data type of the input tensor.

    Returns:
        A new tensor of the same shape as the input tensor
        (provided ``mask`` can be broadcast to the shape of ``input``).

    .. code-block:: python
        :linenos:

        mask = tp.Tensor([[True, False], [True, True]])
        input = tp.zeros([2, 2])
        output = tp.masked_fill(input, mask, -1.0)

        assert np.array_equal(cp.from_dlpack(output).get(), np.array([[-1, 0], [-1, -1]], dtype=np.float32))
    """
    from nvtripy.frontend.trace.ops.fill import full_like

    # Select from a tensor filled with `value` (shaped like `input`) where the mask is set,
    # and from `input` itself otherwise.
    return where(mask, full_like(input, value), input)