Coverage for cuda / pathfinder / _utils / spawned_process_runner.py: 58.46%

65 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-08 01:07 +0000

1# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. 

2# SPDX-License-Identifier: Apache-2.0 

3 

4import contextlib 

5import multiprocessing 

6import queue # for Empty 

7import sys 

8import traceback 

9from collections.abc import Callable, Sequence 

10from dataclasses import dataclass 

11from io import StringIO 

12from typing import Any 

13 

14PROCESS_KILLED = -9 

15PROCESS_NO_RESULT = -999 

16 

17 

# Similar to https://docs.python.org/3/library/subprocess.html#subprocess.CompletedProcess
# (args, check_returncode() are intentionally not supported here.)
@dataclass
class CompletedProcess:
    """Outcome of one child-process run: exit status plus captured text output."""

    # Exit status reported for the child process.
    returncode: int
    # Text captured from the child's stdout.
    stdout: str
    # Text captured from the child's stderr.
    stderr: str

25 

26 

27class ChildProcessWrapper: 

28 def __init__( 

29 self, 

30 result_queue: Any, 

31 target: Callable[..., None], 

32 args: Sequence[Any] | None, 

33 kwargs: dict[str, Any] | None, 

34 ) -> None: 

35 self.target = target 1abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRST

36 self.args = () if args is None else args 1abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRST

37 self.kwargs = {} if kwargs is None else kwargs 1abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRST

38 self.result_queue = result_queue 1abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRST

39 

40 def __call__(self) -> None: 

41 # Capture stdout/stderr 

42 old_stdout = sys.stdout 

43 old_stderr = sys.stderr 

44 sys.stdout = StringIO() 

45 sys.stderr = StringIO() 

46 

47 try: 

48 self.target(*self.args, **self.kwargs) 

49 returncode = 0 

50 except SystemExit as e: # Handle sys.exit() 

51 returncode = e.code if isinstance(e.code, int) else 0 

52 except BaseException: 

53 traceback.print_exc() 

54 returncode = 1 

55 finally: 

56 # Collect outputs and restore streams 

57 stdout = sys.stdout.getvalue() 

58 stderr = sys.stderr.getvalue() 

59 sys.stdout = old_stdout 

60 sys.stderr = old_stderr 

61 with contextlib.suppress(Exception): 

62 self.result_queue.put((returncode, stdout, stderr)) 

63 

64 

def run_in_spawned_child_process(
    target: Callable[..., None],
    *,
    args: Sequence[Any] | None = None,
    kwargs: dict[str, Any] | None = None,
    timeout: float | None = None,
    rethrow: bool = False,
) -> CompletedProcess:
    """Run `target` in a spawned child process, capturing stdout/stderr.

    The provided `target` must be defined at the top level of a module, and must
    be importable in the spawned child process. Lambdas, closures, or interactively
    defined functions (e.g., in Jupyter notebooks) will not work.

    If `rethrow=True` and the child process exits with a nonzero code,
    raises ChildProcessError with the captured stderr.

    Args:
        target: Callable to run in the child process.
        args: Positional arguments for `target` (None means no arguments).
        kwargs: Keyword arguments for `target` (None means no arguments).
        timeout: Maximum number of seconds to wait for the child; None waits
            indefinitely.
        rethrow: Raise ChildProcessError instead of returning a result whose
            return code is nonzero.

    Returns:
        A CompletedProcess. Its returncode is PROCESS_KILLED if the child was
        terminated after `timeout`, and PROCESS_NO_RESULT if the child ended
        without reporting a result.

    Raises:
        ChildProcessError: If `rethrow` is true and the return code is nonzero.
    """
    import time  # Local import: only needed for the deadline bookkeeping below.

    ctx = multiprocessing.get_context("spawn")
    result_queue = ctx.Queue()
    process = ctx.Process(target=ChildProcessWrapper(result_queue, target, args, kwargs))
    process.start()

    try:
        deadline = None if timeout is None else time.monotonic() + timeout
        poll_interval = 0.1
        payload = None
        timed_out = False

        # Drain the queue BEFORE joining the child: a process that has put data
        # on a multiprocessing queue does not exit until that data has been
        # flushed to the pipe, so joining first can deadlock (or be misreported
        # as a timeout) when the captured output is large.
        while True:
            alive = process.is_alive()
            remaining = None if deadline is None else deadline - time.monotonic()
            if alive and remaining is not None and remaining <= 0:
                timed_out = True
                break
            if not alive:
                # The child is gone; allow a short grace period for a result
                # that is still in flight.
                wait = 1.0
            elif remaining is None:
                wait = poll_interval
            else:
                wait = min(poll_interval, remaining)
            try:
                payload = result_queue.get(timeout=wait)
            except (queue.Empty, EOFError):
                if not alive:
                    break
            else:
                break

        if payload is not None:
            # The result has been received, so the child can finish exiting.
            # It still has to do so within the overall time budget.
            process.join(None if deadline is None else max(0.0, deadline - time.monotonic()))
            timed_out = process.is_alive()

        if timed_out:
            process.terminate()
            process.join()
            result = CompletedProcess(
                returncode=PROCESS_KILLED,
                stdout="",
                stderr=f"Process timed out after {timeout} seconds and was terminated.",
            )
        elif payload is None:
            result = CompletedProcess(
                returncode=PROCESS_NO_RESULT,
                stdout="",
                stderr="Process exited or crashed before returning results.",
            )
        else:
            returncode, stdout, stderr = payload
            result = CompletedProcess(
                returncode=returncode,
                stdout=stdout,
                stderr=stderr,
            )

        if rethrow and result.returncode != 0:
            raise ChildProcessError(
                f"Child process exited with code {result.returncode}.\n"
                "--- stderr-from-child-process ---\n"
                f"{result.stderr}"
                "<end-of-stderr-from-child-process>\n"
            )

        return result

    finally:
        # Best-effort cleanup; never mask the result or an in-flight exception.
        with contextlib.suppress(Exception):
            result_queue.close()
            result_queue.join_thread()
        if process.is_alive():
            process.kill()
            process.join()