Source code for nvtripy.backend.api.executable

# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import base64
import inspect
from typing import Dict, Sequence, Tuple, Union

import mlir_tensorrt.runtime.api as runtime
from nvtripy import config, export
from nvtripy.backend.api.stream import default_stream
from nvtripy.backend.api.input_info import InputInfo
from nvtripy.backend.mlir.utils import MLIRRuntimeClient
from nvtripy.common.exception import raise_error
from nvtripy.frontend import Tensor
from nvtripy.trace.ops.constant import Constant
from nvtripy.utils import json as json_utils
from nvtripy.utils.types import str_from_type_annotation


# Executable.__call__ is in the hot path for benchmarks, so we bypass dispatch to avoid any additional overhead.
@export.public_api(document_under="compiling_code", bypass_dispatch=["__call__"], document_init_sig=False)
class Executable:
    """
    Represents a compiled executable generated by the compiler.

    .. seealso:: :func:`compile`
    """

    # The constructor is intentionally undocumented because it is not meant to be called by users.
    # `return_single_tensor_as_sequence` indicates whether the return type should be a sequence even if
    # there is only one output.
    def __init__(
        self, executable, arg_names, return_single_tensor_as_sequence: bool, input_infos: Dict[str, InputInfo]
    ):
        self._executable = executable
        self._runtime_client = MLIRRuntimeClient()
        # TODO (#577): Support multiple devices:
        self._session = runtime.RuntimeSession(runtime.RuntimeSessionOptions(num_devices=1, device_id=0), executable)
        self.stream = default_stream()
        self._arg_names = arg_names
        self._num_expected_args = len(arg_names)
        self._executable_signature = self._executable.get_signature("main")
        self._return_single_tensor_as_sequence = return_single_tensor_as_sequence

        # Build a signature so the executable works with `inspect.signature`:
        params = [
            inspect.Parameter(name, inspect.Parameter.POSITIONAL_OR_KEYWORD, annotation=Tensor)
            for name in self._arg_names
        ]

        num_outputs = self._executable_signature.get_num_results()
        return_annotation = (
            Tuple[(Tensor,) * num_outputs] if num_outputs > 1 or self._return_single_tensor_as_sequence else Tensor
        )

        self.__signature__ = inspect.Signature(params, return_annotation=return_annotation)

        self.input_infos: Dict[str, InputInfo] = input_infos
        """
        Stores metadata, like shapes and data types, for each input to the executable.
        """

    def __str__(self) -> str:
        params = [
            f"{name}: {str_from_type_annotation(param.annotation)}"
            for name, param in self.__signature__.parameters.items()
        ]
        return f"Executable({', '.join(params)}) -> {str_from_type_annotation(self.__signature__.return_annotation)}"
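    # `inspect.signature` picks up the `__signature__` built in `__init__`. A minimal sketch,
    # assuming `compiled_add` is a hypothetical compiled two-argument function:
    #
    #     sig = inspect.signature(compiled_add)
    #     list(sig.parameters)   # -> ["a", "b"]
    #     sig.return_annotation  # -> Tensor (or Tuple[Tensor, ...] for multiple outputs)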
    @staticmethod
    def load(path: str) -> "nvtripy.Executable":
        """
        Loads an executable from the provided path.

        Args:
            path: The path from which to load the executable.

        Returns:
            The executable object loaded from the file.

        .. code-block:: python
            :linenos:
            :caption: Save and load executable

            import os
            import tempfile # doc: omit

            def add(a, b):
                return a + b

            # doc: no-print-locals compiled_add executable_file
            compiled_add = tp.compile(
                add,
                args=[
                    tp.InputInfo(shape=((1, 2, 3),), dtype=tp.float32),
                    tp.InputInfo(shape=((1, 2, 3),), dtype=tp.float32),
                ],
            )

            out_dir = tempfile.TemporaryDirectory().name # doc: omit
            # Assuming `out_dir` is the directory containing the executable:
            executable_file = os.path.join(out_dir, "executable.json")
            compiled_add.save(executable_file) # doc: omit
            assert os.path.exists(executable_file)

            loaded_executable = tp.Executable.load(executable_file)
        """
        return json_utils.load(path)
    def __call__(self, *args: Tensor, **kwargs: Tensor) -> Union[Tensor, Sequence[Tensor]]:
        """
        Invokes the executable with the specified tensor arguments.

        .. note:: Inputs must be evaluated tensors in GPU memory. You can use
            :func:`nvtripy.copy` or :func:`nvtripy.Tensor.eval` to ensure this.

        Args:
            *args: Positional arguments. Must be of type :class:`Tensor` .
            **kwargs: Keyword arguments. Must be of type :class:`Tensor` .

        Returns:
            The output :class:`Tensor` s of the compiled function.

        .. code-block:: python
            :linenos:

            def add(a, b):
                return a + b

            # doc: no-print-locals compiled_add
            compiled_add = tp.compile(
                add,
                args=[
                    tp.InputInfo((1,), dtype=tp.float32),
                    tp.InputInfo((1,), dtype=tp.float32),
                ],
            )

            a = tp.ones((1,), dtype=tp.float32).eval()
            b = tp.ones((1,), dtype=tp.float32).eval()

            out = compiled_add(a, b)
        """
        num_positional = len(args)
        num_args = num_positional + len(kwargs)

        input_tensors = list(args)
        # Arguments must be ordered according to `self._arg_names`, which may differ from the kwargs ordering.
        expected_kwargs = self._arg_names[num_positional:]
        for name in expected_kwargs:
            if name not in kwargs:
                raise_error(f"Missing argument: {name}", [f"Expected the following arguments: {self._arg_names}"])

            input_tensors.append(kwargs[name])
            del kwargs[name]

        if kwargs:
            raise_error(
                f"Extra keyword arguments: {list(kwargs.keys())}",
                [
                    f"Expected the following arguments: {self._arg_names}.\n"
                    f"Note: The following arguments were already provided as positional arguments: "
                    f"{self._arg_names[:num_positional]}"
                ],
            )

        # We check the argument count after the kwarg checks since the latter are more informative
        # (they can explain which arguments are missing or extra).
        if num_args != self._num_expected_args:
            raise_error(
                "Incorrect number of arguments.",
                [
                    f"Expected {self._num_expected_args} arguments but got {num_args}.\n"
                    f"Note: Expected arguments were: {self._arg_names}",
                ],
            )

        for tensor in input_tensors:
            producer = tensor.trace_tensor.producer
            if not isinstance(producer, Constant) or tensor.device.kind != "gpu":
                raise_error(
                    "Inputs to compiled executables must be evaluated tensors on the GPU.",
                    [
                        "Got input" + (f" on device '{tensor.device}':" if tensor.device.kind != "gpu" else ":"),
                        tensor,
                        "Hint: Try calling `.eval()` on the tensor to ensure it is a GPU constant.",
                    ],
                )

        input_memrefs = [inp.trace_tensor.producer.data for inp in input_tensors]
        try:
            output_memrefs = self._session.execute_function(
                "main", in_args=input_memrefs, stream=self.stream._active_cuda_stream, client=self._runtime_client
            )
        except runtime.MTRTException as err:
            # TODO: Evaluate whether this should be moved into the executor.
            if "function expects a memref type with element type" in str(err):
                # If the problem is a mismatched data type, we can provide a better error message than the executor can.
                expected_input_dtypes = [info.dtype for info in self.input_infos.values()]
                for tensor, dtype, arg_name in zip(input_tensors, expected_input_dtypes, self._arg_names):
                    if tensor.dtype != dtype:
                        raise_error(
                            "Unexpected tensor data type.",
                            [f"For parameter {arg_name}, expected data type: {dtype} but got: {tensor.dtype}.\n"]
                            + (["Note: Argument was: ", tensor] if "all" in config.extra_error_information else []),
                        )
            elif "InternalError: failed to set input shape" in str(err) or "Runtime shape mismatch" in str(err):
                expected_input_shapes = [info.shape_bounds for info in self.input_infos.values()]
                for tensor, expected_bounds, arg_name in zip(input_tensors, expected_input_shapes, self._arg_names):
                    shape = tensor.shape

                    if len(shape) != len(expected_bounds.min):
                        raise_error(
                            "Unexpected tensor rank.",
                            [
                                f"For tensor: `{arg_name}`, expected a rank of: {len(expected_bounds.min)} "
                                f"but got: {len(shape)}.\n"
                                f"Note: The provided argument was: ",
                                tensor,
                            ],
                        )

                    for i in range(len(shape)):
                        if shape[i] < expected_bounds.min[i] or shape[i] > expected_bounds.max[i]:
                            raise_error(
                                "Unexpected tensor shape.",
                                [
                                    f"For tensor: `{arg_name}`, expected a shape within the bounds: "
                                    f"min={expected_bounds.min}, max={expected_bounds.max}, but got: {shape}.\n"
                                    f"Dimension {i} has a shape of {shape[i]}, which is not within the expected "
                                    f"bounds of [{expected_bounds.min[i]}, {expected_bounds.max[i]}].\n"
                                    f"Note: The provided argument was: ",
                                    tensor,
                                ],
                            )
            raise_error(str(err))

        output_tensors = tuple(Tensor.fast_init(output_memref) for output_memref in output_memrefs)
        if self.__signature__.return_annotation == Tensor:
            output_tensors = output_tensors[0]
        return output_tensors
    def save(self, path: str) -> None:
        """
        Saves this executable to the provided path.

        Args:
            path: The path at which to save the executable.

        .. code-block:: python
            :linenos:
            :caption: Save executable

            import os
            import tempfile # doc: omit

            def add(a, b):
                return a + b

            # doc: no-print-locals compiled_add executable_file
            compiled_add = tp.compile(
                add,
                args=[
                    tp.InputInfo(shape=((1, 2, 3),), dtype=tp.float32),
                    tp.InputInfo(shape=((1, 2, 3),), dtype=tp.float32),
                ],
            )

            out_dir = tempfile.TemporaryDirectory().name # doc: omit
            # Assuming `out_dir` is the desired output directory:
            executable_file = os.path.join(out_dir, "executable.json")
            compiled_add.save(executable_file)

            assert os.path.exists(executable_file)
        """
        json_utils.save(self, path)
    @property
    def serialized_tensorrt_engine(self) -> bytes:
        """
        The serialized TensorRT engine, as ``bytes``, from the executable.

        .. seealso:: Refer to the `TensorRT developer guide
            <https://docs.nvidia.com/deeplearning/tensorrt/latest/inference-library/python-api-docs.html#deserializing-a-plan>`_
            for details on how to work with serialized TensorRT engines.

        .. code-block:: python
            :linenos:
            :caption: TensorRT engine

            def add(a, b):
                return a + b

            # doc: no-print-locals compiled_add trt_engine
            compiled_add = tp.compile(
                add,
                args=[
                    tp.InputInfo(shape=((1, 2, 3),), dtype=tp.float32),
                    tp.InputInfo(shape=((1, 2, 3),), dtype=tp.float32),
                ],
            )

            trt_engine = compiled_add.serialized_tensorrt_engine
            assert isinstance(trt_engine, bytes)
        """
        data_segments = self._executable.get_data_segments()
        if len(data_segments) != 1:
            raise_error(
                "Cannot get a TensorRT engine from an executable with multiple clusters.",
                [f"Found {len(data_segments)} clusters in the executable."],
            )

        trt_cluster = data_segments[0]  # tuple of (name, data)
        return trt_cluster[1]
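    # A minimal sketch of deserializing the engine with the TensorRT Python API (assumes the
    # `tensorrt` package is installed; it is not imported or used by this module):
    #
    #     import tensorrt as trt
    #
    #     trt_runtime = trt.Runtime(trt.Logger(trt.Logger.WARNING))
    #     engine = trt_runtime.deserialize_cuda_engine(compiled_add.serialized_tensorrt_engine)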
@json_utils.Encoder.register(Executable)
def encode_executable(executable):
    return {
        "arg_names": executable._arg_names,
        "executable": base64.b64encode(executable._executable.serialize()).decode(),
        "_return_single_tensor_as_sequence": executable._return_single_tensor_as_sequence,
        "input_infos": executable.input_infos,
    }


@json_utils.Decoder.register(Executable)
def decode_executable(executable_dict):
    executable_bytes = base64.b64decode(executable_dict["executable"])
    return Executable(
        runtime.Executable(executable_bytes),
        executable_dict["arg_names"],
        return_single_tensor_as_sequence=executable_dict["_return_single_tensor_as_sequence"],
        input_infos=executable_dict["input_infos"],
    )
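# Sketch of the JSON layout produced by `encode_executable` above (values are illustrative;
# the serialized executable is base64-encoded so it can be stored as text):
#
#     {
#         "arg_names": ["a", "b"],
#         "executable": "<base64 of executable.serialize()>",
#         "_return_single_tensor_as_sequence": false,
#         "input_infos": {...}
#     }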