task_pool
TaskPool
from dara.core.internal.pool.task_pool import TaskPool
class TaskPool()
Custom Pool implementation exposing asynchronous APIs for submitting jobs to worker processes
Attributes
- task_group: TaskGroup
- status: PoolStatus
- max_workers: int
- worker_timeout: float
- worker_parameters: WorkerParameters
- workers: Dict[int, WorkerProcess]
- tasks: Dict[str, TaskDefinition]
Methods
worker_timeout
Number of seconds worker is allowed to be idle before it is killed, if there are too many workers alive
running_tasks
@property
def running_tasks()
Get tasks which are currently running.
desired_workers
@property
def desired_workers()
Get the desired number of workers based on the current workload
start
async def start(timeout: float = 5)
Starts the pool and its workers
submit
def submit(task_uid: str,
function_name: str,
args: tuple = (),
kwargs: dict = {}) -> TaskDefinition
Submit a new task to the pool
Arguments:
task_uid
: unique identifier of the taskfunction_name
: name of the function within configured task module to runargs
: list of arguments to pass to the functionkwargs
: dict of kwargs to pass to the function
cancel
async def cancel(task_uid: str)
Cancel a task
Arguments:
task_uid
: uid of the task to cancel
on_progress
@contextmanager
def on_progress(task_uid: str, handler: Callable[[float, str], Coroutine])
Subscribe to progress updates for a given task
Arguments:
task_uid
: uid of the task to subscribe to updates forhandler
: handler to call whenever there is a progress update
close
def close()
Prevents any more tasks from being submitted to the pool.
Does not terminate the workers.
stop
async def stop()
Immediately stops the pool from handling the workers and terminates the workers.
Waits for the workers to finish.
join
async def join(timeout: Optional[float] = None)
Join the pool and wait for workers to complete
If pool is not closed, closes the pool first. If pool is not stopped, stops the pool first.
Arguments:
timeout
: optional time to wait for existing tasks to complete
__aenter__
async def __aenter__()
Enable TaskPool to be used with an async with
block
Implicitly starts the pool and waits for it to be running
__aexit__
async def __aexit__(exc_t, exc_v, exc_tb)
Enable TaskPool to be used with an async with
block
Implicitly stops the pool and waits for it to be completed
shutdown
def shutdown()
Shut down leftover child processes