Coverage for cuda / pathfinder / _utils / spawned_process_runner.py: 58.46%
65 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-08 01:07 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-08 01:07 +0000
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
import contextlib
import multiprocessing
import queue  # for Empty
import sys
import time
import traceback
from collections.abc import Callable, Sequence
from dataclasses import dataclass
from io import StringIO
from typing import Any
# Sentinel return codes reported by run_in_spawned_child_process() when the
# child never delivered a return code of its own.

# The child was still alive when the timeout expired and was terminated.
# NOTE(review): presumably chosen to resemble subprocess's negative-signal
# convention for SIGKILL -- confirm.
PROCESS_KILLED = -9

# The child exited (or crashed) without putting a result on the queue.
PROCESS_NO_RESULT = -999
# Similar to https://docs.python.org/3/library/subprocess.html#subprocess.CompletedProcess
# (args, check_returncode() are intentionally not supported here.)
@dataclass
class CompletedProcess:
    """Outcome of a function call executed in a spawned child process."""

    # 0 on success; otherwise the child's exit code, or one of the
    # PROCESS_KILLED / PROCESS_NO_RESULT sentinels.
    returncode: int
    # Text the child wrote to sys.stdout while the target was running.
    stdout: str
    # Text the child wrote to sys.stderr (including any traceback), or a
    # diagnostic message when the child was killed or returned no result.
    stderr: str
27class ChildProcessWrapper:
28 def __init__(
29 self,
30 result_queue: Any,
31 target: Callable[..., None],
32 args: Sequence[Any] | None,
33 kwargs: dict[str, Any] | None,
34 ) -> None:
35 self.target = target 1abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRST
36 self.args = () if args is None else args 1abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRST
37 self.kwargs = {} if kwargs is None else kwargs 1abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRST
38 self.result_queue = result_queue 1abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRST
40 def __call__(self) -> None:
41 # Capture stdout/stderr
42 old_stdout = sys.stdout
43 old_stderr = sys.stderr
44 sys.stdout = StringIO()
45 sys.stderr = StringIO()
47 try:
48 self.target(*self.args, **self.kwargs)
49 returncode = 0
50 except SystemExit as e: # Handle sys.exit()
51 returncode = e.code if isinstance(e.code, int) else 0
52 except BaseException:
53 traceback.print_exc()
54 returncode = 1
55 finally:
56 # Collect outputs and restore streams
57 stdout = sys.stdout.getvalue()
58 stderr = sys.stderr.getvalue()
59 sys.stdout = old_stdout
60 sys.stderr = old_stderr
61 with contextlib.suppress(Exception):
62 self.result_queue.put((returncode, stdout, stderr))
# How often the parent re-checks whether the child is still alive while it
# waits for the result tuple.
_RESULT_POLL_INTERVAL_SECONDS = 0.1
# Extra time granted to read the result once the child is known to have exited.
_RESULT_GRACE_SECONDS = 1.0


def _wait_for_child_result(
    process: Any,
    result_queue: Any,
    deadline: float | None,
) -> tuple[int, str, str] | None:
    """Return the child's result tuple, or None if the child died or the deadline passed."""
    while True:
        wait = _RESULT_POLL_INTERVAL_SECONDS
        if deadline is not None:
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                return None
            wait = min(wait, remaining)
        try:
            return result_queue.get(timeout=wait)
        except EOFError:
            return None
        except queue.Empty:
            # Nothing yet: keep waiting only while the child can still deliver.
            if not process.is_alive():
                return None


def run_in_spawned_child_process(
    target: Callable[..., None],
    *,
    args: Sequence[Any] | None = None,
    kwargs: dict[str, Any] | None = None,
    timeout: float | None = None,
    rethrow: bool = False,
) -> CompletedProcess:
    """Run `target` in a spawned child process, capturing stdout/stderr.

    The provided `target` must be defined at the top level of a module, and must
    be importable in the spawned child process. Lambdas, closures, or interactively
    defined functions (e.g., in Jupyter notebooks) will not work.

    If the child is still running after `timeout` seconds it is terminated and
    the result has `returncode == PROCESS_KILLED`. If the child exits without
    delivering a result (e.g. a hard crash), the result has
    `returncode == PROCESS_NO_RESULT`.

    If `rethrow=True` and the child process exits with a nonzero code,
    raises ChildProcessError with the captured stderr.
    """
    ctx = multiprocessing.get_context("spawn")
    result_queue = ctx.Queue()
    process = ctx.Process(target=ChildProcessWrapper(result_queue, target, args, kwargs))
    process.start()

    try:
        deadline = None if timeout is None else time.monotonic() + timeout

        # Receive the result BEFORE joining: a child that has queued more data
        # than the pipe buffer holds cannot exit until the parent reads it, so
        # joining first can deadlock (or look like a timeout).
        payload = _wait_for_child_result(process, result_queue, deadline)

        remaining = None if deadline is None else max(0.0, deadline - time.monotonic())
        process.join(remaining)

        if process.is_alive():
            process.terminate()
            process.join()
            result = CompletedProcess(
                returncode=PROCESS_KILLED,
                stdout="",
                stderr=f"Process timed out after {timeout} seconds and was terminated.",
            )
        else:
            if payload is None:
                # The child has exited; give a result that is already in the
                # pipe one last chance to be read.
                try:
                    payload = result_queue.get(timeout=_RESULT_GRACE_SECONDS)
                except (queue.Empty, EOFError):
                    payload = None

            if payload is None:
                result = CompletedProcess(
                    returncode=PROCESS_NO_RESULT,
                    stdout="",
                    stderr="Process exited or crashed before returning results.",
                )
            else:
                returncode, stdout, stderr = payload
                result = CompletedProcess(
                    returncode=returncode,
                    stdout=stdout,
                    stderr=stderr,
                )

        if rethrow and result.returncode != 0:
            raise ChildProcessError(
                f"Child process exited with code {result.returncode}.\n"
                "--- stderr-from-child-process ---\n"
                f"{result.stderr}"
                "<end-of-stderr-from-child-process>\n"
            )

        return result

    finally:
        # Cleanup is best effort; a failure here must not mask the result or
        # the exception that is already propagating.
        with contextlib.suppress(Exception):
            result_queue.close()
            result_queue.join_thread()
        if process.is_alive():
            process.kill()
            process.join()