# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import base64
import inspect
from typing import Dict, Sequence, Tuple, Union
import mlir_tensorrt.runtime.api as runtime
from nvtripy import config, export
from nvtripy.backend.api.stream import default_stream
from nvtripy.backend.api.input_info import InputInfo
from nvtripy.backend.mlir.utils import MLIRRuntimeClient
from nvtripy.common.exception import raise_error
from nvtripy.frontend import Tensor
from nvtripy.trace.ops.constant import Constant
from nvtripy.utils import json as json_utils
from nvtripy.utils.types import str_from_type_annotation
# Executable.__call__ is in the hot path for benchmarks, so we would not want additional overhead
@export.public_api(document_under="compiling_code", bypass_dispatch=["__call__"], document_init_sig=False)
class Executable:
    """
    Represents a compiled executable generated by the compiler.

    .. seealso:: :func:`compile`
    """

    # The constructor is intentionally undocumented because it is not meant to be called by users.
    # `return_single_tensor_as_sequence` indicates whether the return type should be a sequence even if
    # there is only one output.
    def __init__(
        self, executable, arg_names, return_single_tensor_as_sequence: bool, input_infos: Dict[str, InputInfo]
    ):
        self._executable = executable

        self._runtime_client = MLIRRuntimeClient()
        # TODO (#577): Support multiple devices:
        self._session = runtime.RuntimeSession(runtime.RuntimeSessionOptions(num_devices=1, device_id=0), executable)
        self.stream = default_stream()

        self._arg_names = arg_names
        self._num_expected_args = len(arg_names)
        self._executable_signature = self._executable.get_signature("main")
        self._return_single_tensor_as_sequence = return_single_tensor_as_sequence

        # Build a signature so the executable works with `inspect.signature`.
        # Every argument is exposed as a positional-or-keyword `Tensor` parameter.
        params = [
            inspect.Parameter(name, inspect.Parameter.POSITIONAL_OR_KEYWORD, annotation=Tensor)
            for name in self._arg_names
        ]
        num_outputs = self._executable_signature.get_num_results()
        # A single output is annotated as a bare `Tensor` unless the caller asked for a sequence.
        return_annotation = (
            Tuple[(Tensor,) * num_outputs] if num_outputs > 1 or self._return_single_tensor_as_sequence else Tensor
        )
        self.__signature__ = inspect.Signature(params, return_annotation=return_annotation)

        self.input_infos: Dict[str, InputInfo] = input_infos
        """
        Stores metadata, like shapes and data types, for each input to the executable.
        """

    # Renders the executable like a function signature, e.g. `Executable(a: ..., b: ...) -> ...`,
    # using the `inspect.Signature` assembled in the constructor.
    def __str__(self) -> str:
        params = [
            f"{name}: {str_from_type_annotation(param.annotation)}"
            for name, param in self.__signature__.parameters.items()
        ]
        return f"Executable({', '.join(params)}) -> {str_from_type_annotation(self.__signature__.return_annotation)}"

    @staticmethod
    def load(path: str) -> "nvtripy.Executable":
        """
        Loads an executable from the provided path.

        Args:
            path: The path from which to load the executable.

        Returns:
            The executable object loaded from the file.

        .. code-block:: python
            :linenos:
            :caption: Save and load executable

            import os
            import tempfile # doc: omit

            def add(a, b):
                return a + b

            # doc: no-print-locals compiled_add executable_file
            compiled_add = tp.compile(
                add,
                args=[
                    tp.InputInfo(shape=((1, 2, 3),), dtype=tp.float32),
                    tp.InputInfo(shape=((1, 2, 3),), dtype=tp.float32),
                ],
            )

            out_dir = tempfile.TemporaryDirectory().name # doc: omit
            # Assuming `out_dir` is the directory containing the executable:
            executable_file = os.path.join(out_dir, "executable.json")
            compiled_add.save(executable_file) # doc: omit
            assert os.path.exists(executable_file)
            loaded_executable = tp.Executable.load(executable_file)
        """
        return json_utils.load(path)

    def __call__(self, *args: Tensor, **kwargs: Tensor) -> Union[Tensor, Sequence[Tensor]]:
        """
        Invokes the executable with the specified tensor arguments.

        .. note:: Inputs must be evaluated tensors in GPU memory.
            You can use :func:`nvtripy.copy` or :func:`nvtripy.Tensor.eval` to ensure this.

        Args:
            *args: Positional arguments. Must be of type :class:`Tensor` .
            **kwargs: Keyword arguments. Must be of type :class:`Tensor` .

        Returns:
            The output :class:`Tensor` s of the compiled function.

        .. code-block:: python
            :linenos:

            def add(a, b):
                return a + b

            # doc: no-print-locals compiled_add
            compiled_add = tp.compile(
                add,
                args=[
                    tp.InputInfo((1,), dtype=tp.float32),
                    tp.InputInfo((1,), dtype=tp.float32),
                ],
            )

            a = tp.ones((1,), dtype=tp.float32).eval()
            b = tp.ones((1,), dtype=tp.float32).eval()

            out = compiled_add(a, b)
        """
        num_positional = len(args)
        num_args = num_positional + len(kwargs)

        input_tensors = list(args)
        # Need to get arguments in the order of self._arg_names, which may be different from kwargs ordering.
        expected_kwargs = self._arg_names[num_positional:]
        for name in expected_kwargs:
            if name not in kwargs:
                raise_error(f"Missing argument: {name}", [f"Expected the following arguments: {self._arg_names}"])

            # Consume the keyword so that anything left over afterwards is known to be unexpected.
            input_tensors.append(kwargs.pop(name))

        if kwargs:
            raise_error(
                f"Extra keyword arguments: {list(kwargs.keys())}",
                [
                    f"Expected the following arguments: {self._arg_names}.\n"
                    f"Note: The following arguments were already provided as positional arguments: {self._arg_names[:num_positional]}"
                ],
            )

        # We do this after kwarg checks since those will be more informative (we can explain which arguments are missing/extra).
        if num_args != self._num_expected_args:
            raise_error(
                "Incorrect number of arguments.",
                [
                    f"Expected {self._num_expected_args} arguments but got {num_args}.\n"
                    f"Note: Expected arguments were: {self._arg_names}",
                ],
            )

        # Each input must be produced by a `Constant` and live on a "gpu" device.
        for tensor in input_tensors:
            producer = tensor.trace_tensor.producer
            if not isinstance(producer, Constant) or tensor.device.kind != "gpu":
                raise_error(
                    "Inputs to compiled executables must be evaluated tensors on the GPU.",
                    [
                        "Got input" + (f" on device '{tensor.device}':" if tensor.device.kind != "gpu" else ":"),
                        tensor,
                        "Hint: Try calling `.eval()` on the tensor to ensure it is a GPU constant.",
                    ],
                )

        input_memrefs = [inp.trace_tensor.producer.data for inp in input_tensors]

        try:
            output_memrefs = self._session.execute_function(
                "main", in_args=input_memrefs, stream=self.stream._active_cuda_stream, client=self._runtime_client
            )
        except runtime.MTRTException as err:
            # Only reached on failure, so the extra diagnostics below do not cost anything on the hot path.
            err_msg = str(err)

            # TODO: Evaluate whether this should be moved into the executor
            if "function expects a memref type with element type" in err_msg:
                # If the problem is a mismatched data type, we can provide a better error message than the executor can.
                expected_input_dtypes = [info.dtype for info in self.input_infos.values()]
                for tensor, dtype, arg_name in zip(input_tensors, expected_input_dtypes, self._arg_names):
                    if tensor.dtype != dtype:
                        raise_error(
                            "Unexpected tensor data type.",
                            (
                                [
                                    f"For parameter {arg_name}, expected data type: {dtype} but got: {tensor.dtype}. ",
                                ]
                                + (["Note: Argument was: ", tensor] if "all" in config.extra_error_information else [])
                            ),
                        )
            elif "InternalError: failed to set input shape" in err_msg or "Runtime shape mismatch" in err_msg:
                # Compare every input against its compile-time shape bounds to pinpoint the offending argument.
                expected_input_shapes = [info.shape_bounds for info in self.input_infos.values()]
                for tensor, expected_bounds, arg_name in zip(input_tensors, expected_input_shapes, self._arg_names):
                    shape = tensor.shape

                    if len(shape) != len(expected_bounds.min):
                        raise_error(
                            "Unexpected tensor rank.",
                            [
                                f"For tensor: `{arg_name}`, expected a rank of: {len(expected_bounds.min)} but got: {len(shape)}.\n"
                                "Note: The provided argument was: ",
                                tensor,
                            ],
                        )

                    for i, (dim, dim_min, dim_max) in enumerate(zip(shape, expected_bounds.min, expected_bounds.max)):
                        if dim < dim_min or dim > dim_max:
                            raise_error(
                                "Unexpected tensor shape.",
                                [
                                    f"For tensor: `{arg_name}`, expected a shape within the bounds: min={expected_bounds.min}, max={expected_bounds.max}, but got: {shape}.\n"
                                    f"Dimension {i} has a shape of {dim}, which is not within the expected bounds of [{dim_min}, {dim_max}].\n"
                                    "Note: The provided argument was: ",
                                    tensor,
                                ],
                            )

            # Fall back to the runtime's own message when no more specific diagnosis applies.
            raise_error(err_msg)

        output_tensors = tuple(Tensor.fast_init(output_memref) for output_memref in output_memrefs)
        # Unwrap the tuple when the signature built in the constructor promises a single `Tensor`.
        if self.__signature__.return_annotation == Tensor:
            output_tensors = output_tensors[0]
        return output_tensors

    def save(self, path: str) -> None:
        """
        Saves this executable to the provided path.

        Args:
            path: The path at which to save the executable.

        .. code-block:: python
            :linenos:
            :caption: Save executable

            import os
            import tempfile # doc: omit

            def add(a, b):
                return a + b

            # doc: no-print-locals compiled_add executable_file
            compiled_add = tp.compile(
                add,
                args=[
                    tp.InputInfo(shape=((1, 2, 3),), dtype=tp.float32),
                    tp.InputInfo(shape=((1, 2, 3),), dtype=tp.float32),
                ],
            )

            out_dir = tempfile.TemporaryDirectory().name # doc: omit
            # Assuming `out_dir` is the desired output directory:
            executable_file = os.path.join(out_dir, "executable.json")
            compiled_add.save(executable_file)
            assert os.path.exists(executable_file)
        """
        json_utils.save(self, path)

    @property
    def serialized_tensorrt_engine(self) -> bytes:
        """
        The serialized TensorRT engine, as ``bytes``, from the executable.

        .. seealso:: Refer to the `TensorRT developer guide <https://docs.nvidia.com/deeplearning/tensorrt/latest/inference-library/python-api-docs.html#deserializing-a-plan>`_
            for details on how to work with serialized TensorRT engines.

        .. code-block:: python
            :linenos:
            :caption: TensorRT engine

            def add(a, b):
                return a + b

            # doc: no-print-locals compiled_add trt_engine
            compiled_add = tp.compile(
                add,
                args=[
                    tp.InputInfo(shape=((1, 2, 3),), dtype=tp.float32),
                    tp.InputInfo(shape=((1, 2, 3),), dtype=tp.float32),
                ],
            )

            trt_engine = compiled_add.serialized_tensorrt_engine
            assert isinstance(trt_engine, bytes)
        """
        data_segments = self._executable.get_data_segments()
        # The engine can only be returned unambiguously when there is exactly one data segment.
        if len(data_segments) != 1:
            raise_error(
                "Cannot get tensorrt engine from multiple clusters.",
                [f"Found {len(data_segments)} clusters in the executable."],
            )

        trt_cluster = data_segments[0]  # tuple of (name, data)
        return trt_cluster[1]
@json_utils.Encoder.register(Executable)
def encode_executable(executable):
    # Convert an `Executable` into a dictionary of JSON-friendly fields.
    # The serialized executable is base64-encoded and decoded to `str` so it can be stored as text.
    serialized = executable._executable.serialize()
    encoded_executable = base64.b64encode(serialized).decode()

    fields = {
        "arg_names": executable._arg_names,
        "executable": encoded_executable,
        "_return_single_tensor_as_sequence": executable._return_single_tensor_as_sequence,
        "input_infos": executable.input_infos,
    }
    return fields
@json_utils.Decoder.register(Executable)
def decode_executable(executable_dict):
    # Rebuild an `Executable` from the dictionary produced by the matching encoder:
    # undo the base64 text encoding, then wrap the raw bytes in a runtime executable.
    raw_bytes = base64.b64decode(executable_dict["executable"])
    runtime_executable = runtime.Executable(raw_bytes)

    return Executable(
        runtime_executable,
        executable_dict["arg_names"],
        return_single_tensor_as_sequence=executable_dict["_return_single_tensor_as_sequence"],
        input_infos=executable_dict["input_infos"],
    )