Run command with timeout and incremental output processing to prevent hanging.
Source code in bionemo/testing/subprocess_utils.py
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
def run_command_with_timeout(command, path, env, timeout=3600):
    """Run a shell command, capturing its output, and stop it if it exceeds a timeout.

    Both pipes are drained concurrently by ``Popen.communicate``, so a child
    that fills one pipe while the other is idle cannot deadlock, and the
    timeout is enforced even when the child emits a partial line or closes its
    output streams while it keeps running.

    Args:
        command: Command string; it is interpreted by the shell (``shell=True``),
            so it must come from a trusted source.
        path: Working directory for the child process (``None`` keeps the
            current directory).
        env: Environment mapping for the child process (``None`` inherits the
            current environment).
        timeout: Maximum run time in seconds. A falsy value (``None`` or ``0``)
            disables the timeout.

    Returns:
        A ``subprocess.CompletedProcess`` whose ``args`` is ``command`` and
        whose ``stdout`` / ``stderr`` hold the decoded text output.

    Raises:
        subprocess.TimeoutExpired: If the command is still running after
            ``timeout`` seconds. The child is terminated (then killed if it
            does not exit) and reaped before the exception propagates.
    """
    process = subprocess.Popen(
        command,
        shell=True,
        cwd=path,
        env=env,
        text=True,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        bufsize=1,  # Line buffered
    )
    try:
        # A falsy timeout means "wait forever", as in the original contract.
        stdout, stderr = process.communicate(timeout=timeout or None)
    except BaseException:
        # Covers TimeoutExpired as well as KeyboardInterrupt/SystemExit: never
        # leave a running or zombie child behind, then re-raise unchanged.
        _stop_process(process)
        raise
    return subprocess.CompletedProcess(
        args=command,
        returncode=process.returncode,
        stdout=stdout,
        stderr=stderr,
    )


def _stop_process(process, grace_period=0.5):
    """Terminate *process*, escalate to kill after *grace_period*, reap it and close its pipes."""
    if process.poll() is None:
        process.terminate()
        try:
            process.wait(timeout=grace_period)
        except subprocess.TimeoutExpired:
            process.kill()
    # Reap the child so it does not linger as a zombie.
    process.wait()
    # Close our pipe ends without reading them: a grandchild started by the
    # shell may still hold the write ends open, and reading would block on it.
    for stream in (process.stdout, process.stderr):
        if stream is not None:
            stream.close()
|