
Async worker queue

AsyncWorkQueue

Implements an asynchronous work queue backed by a thread or process pool.

Source code in bionemo/scdl/util/async_worker_queue.py
class AsyncWorkQueue:
    """Implements an asynchronous queue."""

    def __init__(self, max_workers: int = 5, use_processes: bool = False) -> None:
        """Initialize the AsyncWorkQueue.

        Args:
            max_workers: The maximum number of worker threads or processes.
            use_processes: If True, use ProcessPoolExecutor; otherwise, use ThreadPoolExecutor.
        """
        self.use_processes = use_processes
        if use_processes:
            self.executor: Union[concurrent.futures.ThreadPoolExecutor, concurrent.futures.ProcessPoolExecutor] = (
                concurrent.futures.ProcessPoolExecutor(max_workers=max_workers)
            )
        else:
            self.executor = concurrent.futures.ThreadPoolExecutor(max_workers=max_workers)
        self.lock = threading.Lock()
        self.tasks: List[concurrent.futures.Future] = []

    def submit_task(self, func: Callable[..., Any], *args: Any, **kwargs: Any) -> concurrent.futures.Future:
        """Submit a task to the work queue.

        Args:
            func: The function to be executed asynchronously.
            args: Positional arguments to pass to the function.
            kwargs: Keyword arguments to pass to the function.

        Returns:
            Future: A Future object representing the asynchronous execution of the function.
        """
        with self.lock:
            future = self.executor.submit(func, *args, **kwargs)
            self.tasks.append(future)
            return future

    def shutdown(self, wait: bool = True) -> None:
        """Shutdown the executor and wait for the tasks to complete.

        Args:
            wait: If True, wait for all tasks to complete before shutting down.
        """
        self.executor.shutdown(wait=wait)

    def get_completed_tasks(self) -> List[concurrent.futures.Future]:
        """Get the list of completed tasks.

        Returns:
            A list of Future objects that are completed.
        """
        with self.lock:
            completed_tasks = [task for task in self.tasks if task.done()]
            return completed_tasks

    def get_pending_tasks(self) -> List[concurrent.futures.Future]:
        """Get the list of pending tasks.

        Returns:
            A list of Future objects that are not yet completed.
        """
        with self.lock:
            pending_tasks = [task for task in self.tasks if not task.done()]
            return pending_tasks

    def get_task_results(self) -> List[Any]:
        """Get the results of all completed tasks.

        Returns:
            A list of results from the completed tasks.

        Raises:
            Exception: This would be expected if the task fails to complete or
                if it is cancelled.
        """
        completed_tasks = self.get_completed_tasks()
        results = []
        for task in completed_tasks:
            try:
                results.append(task.result())
            except Exception as e:
                results.append(e)
        return results

    def wait(self) -> List[Any]:
        """Wait for all submitted tasks to complete and return their results.

        Returns:
            A list of results from all completed tasks.
        """
        # Wait for all tasks to complete
        concurrent.futures.wait(self.tasks)

        # Collect results from all tasks
        results = []
        for task in self.tasks:
            try:
                results.append(task.result())
            except Exception as e:
                results.append(e)

        return results
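
A minimal usage sketch of the class documented above. The import path is an assumption based on the source location shown (bionemo/scdl/util/async_worker_queue.py), and slow_square is an illustrative placeholder task, not part of the library:

import time

from bionemo.scdl.util.async_worker_queue import AsyncWorkQueue


def slow_square(x: int) -> int:
    # Toy task used only for illustration.
    time.sleep(0.1)
    return x * x


queue = AsyncWorkQueue(max_workers=4)       # thread pool by default
for i in range(8):
    queue.submit_task(slow_square, i)       # each call returns a Future

results = queue.wait()                      # block until every task finishes
print(results)                              # [0, 1, 4, 9, 16, 25, 36, 49]
queue.shutdown()                            # release the worker threads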

__init__(max_workers=5, use_processes=False)

Initialize the AsyncWorkQueue.

Parameters:

    max_workers (int, default 5): The maximum number of worker threads or processes.
    use_processes (bool, default False): If True, use ProcessPoolExecutor; otherwise, use ThreadPoolExecutor.

Source code in bionemo/scdl/util/async_worker_queue.py
def __init__(self, max_workers: int = 5, use_processes: bool = False) -> None:
    """Initialize the AsyncWorkQueue.

    Args:
        max_workers: The maximum number of worker threads or processes.
        use_processes: If True, use ProcessPoolExecutor; otherwise, use ThreadPoolExecutor.
    """
    self.use_processes = use_processes
    if use_processes:
        self.executor: Union[concurrent.futures.ThreadPoolExecutor, concurrent.futures.ProcessPoolExecutor] = (
            concurrent.futures.ProcessPoolExecutor(max_workers=max_workers)
        )
    else:
        self.executor = concurrent.futures.ThreadPoolExecutor(max_workers=max_workers)
    self.lock = threading.Lock()
    self.tasks: List[concurrent.futures.Future] = []
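
The constructor only selects the executor type; everything else behaves the same. One caveat worth noting (standard Python multiprocessing behaviour, not specific to this class): with use_processes=True the submitted callables and their arguments must be picklable, and submissions should happen under a __main__ guard. A sketch, again assuming the import path implied by the source listing:

from bionemo.scdl.util.async_worker_queue import AsyncWorkQueue


def cpu_bound(n: int) -> int:
    # Module-level function so the process pool can pickle it.
    return sum(i * i for i in range(n))


if __name__ == "__main__":
    # I/O-bound work: the default thread pool is usually enough.
    io_queue = AsyncWorkQueue(max_workers=8, use_processes=False)

    # CPU-bound work: a process pool sidesteps the GIL.
    cpu_queue = AsyncWorkQueue(max_workers=4, use_processes=True)
    cpu_queue.submit_task(cpu_bound, 1_000_000)
    print(cpu_queue.wait())

    io_queue.shutdown()
    cpu_queue.shutdown()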

get_completed_tasks()

Get the list of completed tasks.

Returns:

    List[Future]: A list of Future objects that are completed.

Source code in bionemo/scdl/util/async_worker_queue.py
def get_completed_tasks(self) -> List[concurrent.futures.Future]:
    """Get the list of completed tasks.

    Returns:
        A list of Future objects that are completed.
    """
    with self.lock:
        completed_tasks = [task for task in self.tasks if task.done()]
        return completed_tasks
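
Because get_completed_tasks returns a snapshot of the futures that have finished at call time, it is handy for lightweight progress reporting. A sketch under the same import assumption, with an illustrative work function:

import time

from bionemo.scdl.util.async_worker_queue import AsyncWorkQueue


def work(seconds: float) -> float:
    time.sleep(seconds)
    return seconds


queue = AsyncWorkQueue(max_workers=2)
for s in (0.1, 0.2, 0.3, 0.4):
    queue.submit_task(work, s)

time.sleep(0.25)                       # let some, but not all, tasks finish
done = queue.get_completed_tasks()
print(f"{len(done)} of 4 tasks finished so far")

queue.wait()
queue.shutdown()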

get_pending_tasks()

Get the list of pending tasks.

Returns:

    List[Future]: A list of Future objects that are not yet completed.

Source code in bionemo/scdl/util/async_worker_queue.py
def get_pending_tasks(self) -> List[concurrent.futures.Future]:
    """Get the list of pending tasks.

    Returns:
        A list of Future objects that are not yet completed.
    """
    with self.lock:
        pending_tasks = [task for task in self.tasks if not task.done()]
        return pending_tasks
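
get_pending_tasks is the complement of get_completed_tasks, which makes it convenient for a simple polling loop; wait() below is the blocking alternative. A sketch under the same assumptions as the earlier examples:

import time

from bionemo.scdl.util.async_worker_queue import AsyncWorkQueue


def work(seconds: float) -> float:
    time.sleep(seconds)
    return seconds


queue = AsyncWorkQueue(max_workers=2)
for s in (0.1, 0.2, 0.3):
    queue.submit_task(work, s)

# Poll until nothing is pending instead of blocking on wait().
while queue.get_pending_tasks():
    time.sleep(0.05)

print(queue.get_task_results())        # [0.1, 0.2, 0.3]
queue.shutdown()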

get_task_results()

Get the results of all completed tasks.

Returns:

    List[Any]: A list of results from the completed tasks.

Raises:

    Exception: This would be expected if the task fails to complete or if it is cancelled.

Source code in bionemo/scdl/util/async_worker_queue.py
def get_task_results(self) -> List[Any]:
    """Get the results of all completed tasks.

    Returns:
        A list of results from the completed tasks.

    Raises:
        Exception: This would be expected if the task fails to complete or
            if it is cancelled.
    """
    completed_tasks = self.get_completed_tasks()
    results = []
    for task in completed_tasks:
        try:
            results.append(task.result())
        except Exception as e:
            results.append(e)
    return results
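
Note that, as the source above shows, a failed task does not propagate its exception here: the exception object is appended to the results list in place of a value, so callers should check each entry. A sketch, same assumptions as the earlier examples:

from bionemo.scdl.util.async_worker_queue import AsyncWorkQueue


def maybe_fail(x: int) -> int:
    if x == 2:
        raise ValueError("bad input")
    return x * 10


queue = AsyncWorkQueue(max_workers=2)
for i in range(4):
    queue.submit_task(maybe_fail, i)
queue.wait()

for outcome in queue.get_task_results():
    if isinstance(outcome, Exception):
        print("task failed:", outcome)     # ValueError('bad input')
    else:
        print("task returned:", outcome)   # 0, 10, 30
queue.shutdown()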

shutdown(wait=True)

Shut down the executor and wait for the tasks to complete.

Parameters:

    wait (bool, default True): If True, wait for all tasks to complete before shutting down.

Source code in bionemo/scdl/util/async_worker_queue.py
def shutdown(self, wait: bool = True) -> None:
    """Shutdown the executor and wait for the tasks to complete.

    Args:
        wait: If True, wait for all tasks to complete before shutting down.
    """
    self.executor.shutdown(wait=wait)
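
shutdown simply delegates to the underlying executor, so the usual concurrent.futures semantics apply: with wait=True the call blocks until running tasks finish, with wait=False it returns immediately while already-submitted tasks keep running, and any later submission raises RuntimeError. A sketch under the same import assumption:

from bionemo.scdl.util.async_worker_queue import AsyncWorkQueue

queue = AsyncWorkQueue(max_workers=2)
queue.submit_task(sum, [1, 2, 3])

# Block until submitted tasks have finished, then free the workers.
queue.shutdown(wait=True)

# Submitting after shutdown raises RuntimeError from the executor.
try:
    queue.submit_task(sum, [4, 5, 6])
except RuntimeError as err:
    print("queue is closed:", err)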

submit_task(func, *args, **kwargs)

Submit a task to the work queue.

Parameters:

    func (Callable[..., Any], required): The function to be executed asynchronously.
    args (Any, default ()): Positional arguments to pass to the function.
    kwargs (Any, default {}): Keyword arguments to pass to the function.

Returns:

    Future: A Future object representing the asynchronous execution of the function.

Source code in bionemo/scdl/util/async_worker_queue.py
def submit_task(self, func: Callable[..., Any], *args: Any, **kwargs: Any) -> concurrent.futures.Future:
    """Submit a task to the work queue.

    Args:
        func: The function to be executed asynchronously.
        args: Positional arguments to pass to the function.
        kwargs: Keyword arguments to pass to the function.

    Returns:
        Future: A Future object representing the asynchronous execution of the function.
    """
    with self.lock:
        future = self.executor.submit(func, *args, **kwargs)
        self.tasks.append(future)
        return future
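
Positional and keyword arguments are forwarded to the callable unchanged, and the returned Future can be used directly with the standard concurrent.futures API in addition to the queue's own helpers. A sketch, same assumptions as the earlier examples (scale is an illustrative placeholder):

from bionemo.scdl.util.async_worker_queue import AsyncWorkQueue


def scale(value: float, factor: float = 1.0) -> float:
    return value * factor


queue = AsyncWorkQueue(max_workers=2)
future = queue.submit_task(scale, 3.0, factor=2.5)   # args and kwargs are forwarded

print(future.result(timeout=5))   # 7.5 -- blocks on this one task only
queue.shutdown()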

wait()

Wait for all submitted tasks to complete and return their results.

Returns:

    List[Any]: A list of results from all completed tasks.

Source code in bionemo/scdl/util/async_worker_queue.py
def wait(self) -> List[Any]:
    """Wait for all submitted tasks to complete and return their results.

    Returns:
        A list of results from all completed tasks.
    """
    # Wait for all tasks to complete
    concurrent.futures.wait(self.tasks)

    # Collect results from all tasks
    results = []
    for task in self.tasks:
        try:
            results.append(task.result())
        except Exception as e:
            results.append(e)

    return results
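
wait blocks on every future submitted so far and, like get_task_results, folds exceptions into the returned list rather than raising them; results come back in submission order. A final sketch under the same import assumption:

from bionemo.scdl.util.async_worker_queue import AsyncWorkQueue

queue = AsyncWorkQueue(max_workers=3)
for text in ("a", "bb", "ccc"):
    queue.submit_task(len, text)

print(queue.wait())   # [1, 2, 3], in submission order
queue.shutdown()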