Skip to main content

task_pool

TaskPool

from dara.core.internal.pool.task_pool import TaskPool

class TaskPool()

Custom Pool implementation exposing asynchronous APIs for submitting jobs to worker processes

Attributes

  • task_group: TaskGroup
  • status: PoolStatus
  • max_workers: int
  • worker_timeout: float
  • worker_parameters: WorkerParameters
  • workers: Dict[int, WorkerProcess]
  • tasks: Dict[str, TaskDefinition]

Methods

worker_timeout

Number of seconds worker is allowed to be idle before it is killed, if there are too many workers alive

running_tasks

@property
def running_tasks()

Get tasks which are currently running.

desired_workers

@property
def desired_workers()

Get the desired number of workers based on the current workload

start

async def start(timeout: float = 5)

Starts the pool and its workers

submit

def submit(task_uid: str,
function_name: str,
args: tuple = (),
kwargs: dict = {}) -> TaskDefinition

Submit a new task to the pool

Arguments:

  • task_uid: unique identifier of the task
  • function_name: name of the function within configured task module to run
  • args: list of arguments to pass to the function
  • kwargs: dict of kwargs to pass to the function

cancel

async def cancel(task_uid: str)

Cancel a task

Arguments:

  • task_uid: uid of the task to cancel

on_progress

@contextmanager
def on_progress(task_uid: str, handler: Callable[[float, str], Coroutine])

Subscribe to progress updates for a given task

Arguments:

  • task_uid: uid of the task to subscribe to updates for
  • handler: handler to call whenever there is a progress update

close

def close()

Prevents any more tasks from being submitted to the pool.

Does not terminate the workers.

stop

async def stop()

Immediately stops the pool from handling the workers and terminates the workers.

Waits for the workers to finish.

join

async def join(timeout: Optional[float] = None)

Join the pool and wait for workers to complete

If pool is not closed, closes the pool first. If pool is not stopped, stops the pool first.

Arguments:

  • timeout: optional time to wait for existing tasks to complete

__aenter__

async def __aenter__()

Enable TaskPool to be used with an async with block

Implicitly starts the pool and waits for it to be running

__aexit__

async def __aexit__(exc_t, exc_v, exc_tb)

Enable TaskPool to be used with an async with block

Implicitly stops the pool and waits for it to be completed

shutdown

def shutdown()

Shut down leftover child processes