Coverage for cuda / pathfinder / _dynamic_libs / dynamic_lib_subprocess.py: 37.66%
77 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-25 01:07 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-25 01:07 +0000
1#!/usr/bin/env python
2# SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
3# SPDX-License-Identifier: Apache-2.0
5from __future__ import annotations
7import os
8import sys
9from collections.abc import Sequence
11from cuda.pathfinder._dynamic_libs.lib_descriptor import LIB_DESCRIPTORS
12from cuda.pathfinder._dynamic_libs.load_dl_common import DynamicLibNotFoundError, LoadedDL
13from cuda.pathfinder._dynamic_libs.platform_loader import LOADER
14from cuda.pathfinder._dynamic_libs.subprocess_protocol import (
15 MODE_CANARY,
16 MODE_LOAD,
17 STATUS_NOT_FOUND,
18 STATUS_OK,
19 VALID_MODES,
20 format_dynamic_lib_subprocess_payload,
21)
23# NOTE: The main entrypoint (below) serves both production (canary probe)
24# and tests (full loader). Keeping them together ensures a single subprocess
25# protocol and CLI surface, so the test subprocess stays aligned with the
26# production flow while avoiding a separate test-only module.
27# Any production-code impact is negligible since the extra logic only runs
28# in the subprocess entrypoint and only in test mode.
def _probe_canary_abs_path(libname: str) -> str | None:
    """Resolve *libname* via the loader's system search and return its path.

    Returns ``None`` when the loader raises ``DynamicLibNotFoundError`` or
    hands back no result; otherwise returns the ``abs_path`` attribute of the
    loaded library.

    Raises:
        ValueError: if *libname* has no entry in ``LIB_DESCRIPTORS``.
    """
    descriptor = LIB_DESCRIPTORS.get(libname)
    if descriptor is None:
        raise ValueError(f"Unsupported canary library name: {libname!r}")

    try:
        handle: LoadedDL | None = LOADER.load_with_system_search(descriptor)
    except DynamicLibNotFoundError:
        # A missing library is an expected outcome of a probe, not an error.
        return None

    resolved: str | None = handle.abs_path if handle is not None else None
    return resolved
def _validate_abs_path(abs_path: str) -> None:
    """Check that *abs_path* is a non-empty absolute path to an existing file.

    The checks are written as explicit ``raise`` statements rather than
    ``assert`` so that they still run when the interpreter is started with
    ``-O`` (which strips ``assert`` statements). The exception type and the
    messages are the same as the ``assert``-based form produced.

    Raises:
        AssertionError: if *abs_path* is empty, is not absolute, or does not
            refer to a regular file.
    """
    if not abs_path:
        raise AssertionError(f"empty path: {abs_path=!r}")
    if not os.path.isabs(abs_path):
        raise AssertionError(f"not absolute: {abs_path=!r}")
    if not os.path.isfile(abs_path):
        raise AssertionError(f"not a file: {abs_path=!r}")
def _load_nvidia_dynamic_lib_for_test(libname: str) -> str:
    """Test-only loader used by the subprocess entrypoint.

    Loads *libname* through ``load_nvidia_dynamic_lib`` twice and once more
    through ``_load_lib_no_cache``, cross-checking the three results:

    * the first load must not report the library as already loaded from
      elsewhere, must carry a valid absolute path, and must have ``found_via``
      set;
    * the second ``load_nvidia_dynamic_lib`` call must return the very same
      object as the first;
    * the ``_load_lib_no_cache`` result must refer to the same file and, for
      libraries listed in the platform's supported set, must report that the
      library was already loaded.

    Returns:
        The absolute path reported by the first load.

    Raises:
        RuntimeError: if one of the consistency checks fails.
        AssertionError: if a path fails ``_validate_abs_path`` or the first
            load has no ``found_via``.

    Exceptions raised by the loader calls themselves are not caught here.
    """
    # Keep imports inside the subprocess body so startup stays focused on the
    # code under test rather than the parent test module.
    from cuda.pathfinder import load_nvidia_dynamic_lib
    from cuda.pathfinder._dynamic_libs.load_nvidia_dynamic_lib import _load_lib_no_cache
    from cuda.pathfinder._dynamic_libs.supported_nvidia_libs import (
        SUPPORTED_LINUX_SONAMES,
        SUPPORTED_WINDOWS_DLLS,
    )
    from cuda.pathfinder._utils.platform_aware import IS_WINDOWS

    # First load.
    loaded_dl_fresh = load_nvidia_dynamic_lib(libname)
    if loaded_dl_fresh.was_already_loaded_from_elsewhere:
        raise RuntimeError("loaded_dl_fresh.was_already_loaded_from_elsewhere")

    abs_path = loaded_dl_fresh.abs_path
    if not isinstance(abs_path, str):
        raise RuntimeError(f"loaded_dl_fresh.abs_path is not a string: {abs_path!r}")
    _validate_abs_path(abs_path)
    # Explicit raise instead of ``assert`` so the check survives ``python -O``.
    if loaded_dl_fresh.found_via is None:
        raise AssertionError("loaded_dl_fresh.found_via is None")

    # Second load through the public API: must hand back the identical object.
    loaded_dl_from_cache = load_nvidia_dynamic_lib(libname)
    if loaded_dl_from_cache is not loaded_dl_fresh:
        raise RuntimeError("loaded_dl_from_cache is not loaded_dl_fresh")

    # Third load through ``_load_lib_no_cache``.
    loaded_dl_no_cache = _load_lib_no_cache(libname)
    supported_libs = SUPPORTED_WINDOWS_DLLS if IS_WINDOWS else SUPPORTED_LINUX_SONAMES
    if not loaded_dl_no_cache.was_already_loaded_from_elsewhere and libname in supported_libs:
        raise RuntimeError("not loaded_dl_no_cache.was_already_loaded_from_elsewhere")
    abs_path_no_cache = loaded_dl_no_cache.abs_path
    if not isinstance(abs_path_no_cache, str):
        raise RuntimeError(f"loaded_dl_no_cache.abs_path is not a string: {abs_path_no_cache!r}")
    # Compare by file identity rather than by string.
    if not os.path.samefile(abs_path_no_cache, abs_path):
        raise RuntimeError(f"not os.path.samefile({abs_path_no_cache=!r}, {abs_path=!r})")
    _validate_abs_path(abs_path_no_cache)
    return abs_path
def probe_dynamic_lib_and_print_json(libname: str, mode: str) -> None:
    """Probe or load *libname* according to *mode* and print the result payload.

    * ``MODE_CANARY``: run the canary probe and print ``STATUS_OK`` with the
      resolved path, or ``STATUS_NOT_FOUND`` when no path was obtained.
    * ``MODE_LOAD``: run the test-only full loader; a
      ``DynamicLibNotFoundError`` is reported as ``STATUS_NOT_FOUND`` together
      with the exception's class name and message.

    The payload text is produced by ``format_dynamic_lib_subprocess_payload``
    and written to stdout.

    Raises:
        ValueError: if *mode* is neither ``MODE_CANARY`` nor ``MODE_LOAD``.
    """
    if mode == MODE_CANARY:
        canary_path = _probe_canary_abs_path(libname)
        canary_status = STATUS_NOT_FOUND if canary_path is None else STATUS_OK
        print(format_dynamic_lib_subprocess_payload(canary_status, canary_path))
    elif mode == MODE_LOAD:
        # Test-only path: exercises full loader behavior in isolation.
        try:
            loaded_path = _load_nvidia_dynamic_lib_for_test(libname)
        except DynamicLibNotFoundError as exc:
            failure = {"type": type(exc).__name__, "message": str(exc)}
            payload = format_dynamic_lib_subprocess_payload(STATUS_NOT_FOUND, None, error=failure)
        else:
            payload = format_dynamic_lib_subprocess_payload(STATUS_OK, loaded_path)
        print(payload)
    else:
        raise ValueError(f"Unsupported subprocess probe mode: {mode!r}")
def main(argv: Sequence[str] | None = None) -> int:
    """Command-line entry point: ``<mode> <libname>``.

    Args:
        argv: Argument list to use instead of ``sys.argv[1:]``; ``None`` means
            read the process arguments.

    Returns:
        ``0`` once the probe has printed its payload.

    Raises:
        SystemExit: with a usage message when the argument count is not two
            or the first argument is not one of ``VALID_MODES``.
    """
    cli_args = list(argv) if argv is not None else list(sys.argv[1:])
    well_formed = len(cli_args) == 2 and cli_args[0] in VALID_MODES
    if not well_formed:
        raise SystemExit(
            "Usage: python -m cuda.pathfinder._dynamic_libs.dynamic_lib_subprocess <mode> <libname>\n"
            f"Modes: {', '.join(VALID_MODES)}"
        )
    selected_mode, target_lib = cli_args
    probe_dynamic_lib_and_print_json(target_lib, selected_mode)
    return 0
if __name__ == "__main__":
    # Run as a script: exit the interpreter with main()'s return value.
    sys.exit(main())