Source code for nvtripy.frontend.ops.split

#
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

from typing import Sequence, Tuple, Union

from nvtripy import export
from nvtripy.common.exception import raise_error
from nvtripy.frontend.ops import utils as op_utils
from nvtripy.utils import wrappers


[docs] @export.public_api(document_under="operations/functions") @wrappers.interface( dtype_constraints={"input": "T1", wrappers.RETURN_VALUE: "T1"}, dtype_variables={ "T1": ["float32", "float16", "bfloat16", "int4", "int8", "int32", "int64", "bool"], }, ) def split( input: "nvtripy.Tensor", num_split_or_sizes: Union[int, Sequence[int]], dim: int = 0 ) -> Tuple["nvtripy.Tensor"]: r""" Splits a tensor along the specified dimension. Args: input: The input tensor. num_split_or_sizes: If this is an ``int``, the input is split into this many equal sized chunks. If the dimension cannot be divided evenly, the last chunk will be smaller. If this is a ``Sequence[int]``, the input will be split into ``len(num_split_or_sizes)`` chunks where the :math:`i^{th}` chunk has a size of ``num_split_or_sizes[i]``. The size of the chunk will be clamped if the input is too small. dim: The dimension along which the slices are done. All other dimensions are included in full. Returns: A tuple of slices of the input tensor. .. code-block:: python :linenos: :caption: Splitting Into 2 Chunks # doc: print-locals input outputs input = tp.reshape(tp.arange(16, dtype=tp.float32), (4, 4)) outputs = tp.split(input, 2) assert np.array_equal(cp.from_dlpack(outputs[0]).get(), cp.from_dlpack(input[:2, :]).get()) assert np.array_equal(cp.from_dlpack(outputs[1]).get(), cp.from_dlpack(input[2:, :]).get()) .. code-block:: python :linenos: :caption: Splitting Along A Different Dimension # doc: print-locals input outputs input = tp.reshape(tp.arange(16, dtype=tp.float32), (4, 4)) outputs = tp.split(input, 2, dim=1) assert np.array_equal(cp.from_dlpack(outputs[0]).get(), cp.from_dlpack(input[:, :2]).get()) assert np.array_equal(cp.from_dlpack(outputs[1]).get(), cp.from_dlpack(input[:, 2:]).get()) .. code-block:: python :linenos: :caption: Splitting With Custom Chunk Sizes # doc: print-locals input outputs input = tp.reshape(tp.arange(16, dtype=tp.float32), (4, 4)) outputs = tp.split(input, [1, 1, 2]) assert np.array_equal(cp.from_dlpack(outputs[0]).get(), cp.from_dlpack(input[:1, :]).get()) assert np.array_equal(cp.from_dlpack(outputs[1]).get(), cp.from_dlpack(input[1:2, :]).get()) assert np.array_equal(cp.from_dlpack(outputs[2]).get(), cp.from_dlpack(input[2:, :]).get()) """ dim = op_utils.process_dim(dim, input.rank) if isinstance(num_split_or_sizes, int): if num_split_or_sizes <= 0: raise_error(f"`num_split_or_sizes` must be positive, but got: {num_split_or_sizes}") chunk_sizes = [op_utils.int_ceil_div(input.shape[dim], num_split_or_sizes)] * num_split_or_sizes else: if not num_split_or_sizes: raise_error("Split indices must not be empty") chunk_sizes = num_split_or_sizes def slice_on_dim(start, stop): slice_params = [] for index in range(input.rank): if index == dim: slice_params.append(slice(start, stop)) else: slice_params.append(slice(None)) return input.__getitem__(slice_params) splits = [] start = 0 for size in chunk_sizes: splits.append(slice_on_dim(start, start + size)) start += size return tuple(splits)