Skip to content

Subprocess utils

run_command_in_subprocess(command, path, timeout=3600)

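Run a command in a subprocess and return its standard output. If the command exits with a non-zero status, the captured stdout and stderr are written to `sys.stderr` and an `AssertionError` is raised.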

Source code in bionemo/testing/subprocess_utils.py
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
def run_command_in_subprocess(command: str, path: str, timeout: int = 3600) -> str:
    """Run a command in a subprocess and return the output.

    The command runs with a copy of the current environment in which ``MASTER_PORT`` is set to
    the port returned by ``find_free_network_port()``.

    Args:
        command: Command line to execute.
        path: Working directory in which the command is run.
        timeout: Maximum run time in seconds, forwarded to ``run_command_with_timeout``.

    Returns:
        The captured standard output of the command.

    Raises:
        AssertionError: If the command exits with a non-zero return code. The captured stdout and
            stderr are written to ``sys.stderr`` first to help with debugging.
    """
    # Work on a copy so the environment of the current process is left untouched.
    env = os.environ.copy()
    env["MASTER_PORT"] = str(find_free_network_port())

    result = run_command_with_timeout(
        command=command,
        path=path,
        env=env,
        timeout=timeout,
    )

    if result.returncode != 0:
        # For debugging purposes, print the output if the command fails.
        sys.stderr.write("STDOUT:\n" + result.stdout + "\n")
        sys.stderr.write("STDERR:\n" + result.stderr + "\n")
        # Raised explicitly rather than via `assert` so the check is not stripped under `python -O`.
        raise AssertionError(f"Command failed: {command}")

    return result.stdout

run_command_with_timeout(command, path, env, timeout=3600)

Run command with timeout and incremental output processing to prevent hanging.

Source code in bionemo/testing/subprocess_utils.py
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
def _stop_process(process, grace_period=0.5):
    """Terminate *process*, kill it if it is still alive after *grace_period* seconds, and reap it."""
    if process.poll() is not None:
        return
    process.terminate()
    try:
        process.wait(timeout=grace_period)
    except subprocess.TimeoutExpired:
        process.kill()
        # Reap the child so it does not linger as a zombie.
        process.wait()


def run_command_with_timeout(command, path, env, timeout=3600):
    """Run command with timeout, draining stdout and stderr concurrently to prevent hanging.

    Args:
        command: Command line to execute; it is passed to the shell (``shell=True``).
        path: Working directory for the command.
        env: Environment mapping for the child process, or ``None`` to inherit the current one.
        timeout: Maximum run time in seconds. A falsy value (``0`` or ``None``) disables the limit.

    Returns:
        A ``subprocess.CompletedProcess`` holding the command, its return code, and the captured
        stdout and stderr as text.

    Raises:
        subprocess.TimeoutExpired: If the command has not finished after ``timeout`` seconds. The
            child is terminated (and killed if it does not exit promptly) before the exception
            propagates.
    """
    # NOTE: `command` is interpreted by the shell; callers must not build it from untrusted input.
    # The context manager closes both pipes and waits for the child on every exit path.
    with subprocess.Popen(
        command,
        shell=True,
        cwd=path,
        env=env,
        text=True,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        bufsize=1,  # Line buffered
    ) as process:
        try:
            # communicate() reads both pipes as data arrives, so neither a full pipe buffer nor a
            # partial line without a trailing newline can stall the caller, and the timeout also
            # covers the case where the pipes stay open after the output stops.
            stdout_text, stderr_text = process.communicate(timeout=timeout or None)
        except BaseException:
            # Covers timeouts as well as KeyboardInterrupt: never leave the child running.
            _stop_process(process)
            raise

    # Create result object similar to subprocess.run
    return subprocess.CompletedProcess(
        args=command, returncode=process.returncode, stdout=stdout_text, stderr=stderr_text
    )