Skip to main content

channel

_PoolAPI

from dara.core.internal.pool.channel import _PoolAPI

class _PoolAPI()

API for the TaskPool to communicate with the workers

Methods

dispatch

def dispatch(task: TaskDefinition)

Dispatch a task description message for any worker to pick up

Arguments:

  • task: task definition to dispatch

get_worker_message

def get_worker_message() -> Optional[WorkerMessage]

Retrieve a worker message if there is one available

Does not block; returns None if no message is available

_WorkerAPI

from dara.core.internal.pool.channel import _WorkerAPI

class _WorkerAPI()

API for workers to communicate with the TaskPool

Methods

initialize_worker

def initialize_worker()

Confirm a worker has been initialized

acknowledge

def acknowledge(task_uid: str)

Acknowledge a task has been received and accepted

Arguments:

  • task_uid: uid of the task to acknowledge

send_result

def send_result(task_uid: str, result: SharedMemoryPointer)

Send a result of a given task

Arguments:

  • task_uid: uid of the task to send result for
  • result: pointer to shared memory storing the result

send_error

def send_error(task_uid: Optional[str], error: BaseException)

Send an error back to the pool

Wraps the error in a SubprocessException to serialize the traceback

Arguments:

  • task_uid: optional uid of the task to send error for. If not specified, the error is a generic one not related to a specific task
  • error: exception to send back to the pool

log

def log(task_uid: str, log: str)

Pass a log message to the pool to put into the logger in the main process

Arguments:

  • task_uid: uid of the task to send logs for
  • log: message to log

send_progress

def send_progress(task_uid: str, progress: float, message: str)

Send a progress notification

Arguments:

  • task_uid: uid of the task to send progress update for
  • progress: progress from 0-100 to send
  • message: progress message to send

get_task

def get_task() -> Optional[WorkerTask]

Retrieve a task definition from the worker queue if there is one available

Does not block; returns None if no task is available

Channel

from dara.core.internal.pool.channel import Channel

class Channel()

A communication channel allowing bidirectional communication between TaskPool and worker processes via two queues