channel
_PoolAPI
from dara.core.internal.pool.channel import _PoolAPI
class _PoolAPI()
API for the TaskPool to communicate with the workers
Methods
dispatch
def dispatch(task: TaskDefinition)
Dispatch a task description message for any worker to pick up
Arguments:
task
: task definition to dispatch
get_worker_message
def get_worker_message() -> Optional[WorkerMessage]
Retrieve a worker message if there is one available
Does not block, returns None if no message available
_WorkerAPI
from dara.core.internal.pool.channel import _WorkerAPI
class _WorkerAPI()
API for workers to communicate with the TaskPool
Methods
initialize_worker
def initialize_worker()
Confirm a worker has been initialized
acknowledge
def acknowledge(task_uid: str)
Acknowledge a task has been received and accepted
Arguments:
task_uid
: uid of the task to acknowledge
send_result
def send_result(task_uid: str, result: SharedMemoryPointer)
Send a result of a given task
Arguments:
task_uid
: uid of the task to send result forresult
: pointer to shared memory storing the result
send_error
def send_error(task_uid: Optional[str], error: BaseException)
Send an error back to the pool
Wraps the error in a SubprocessException to serialize the traceback
Arguments:
task_uid
: optional uid of the task to send error for. If not specified, the error is a generic one not related to a specific taskerror
: exception to raise
log
def log(task_uid: str, log: str)
Pass a log message to the pool to put into the logger in the main process
Arguments:
task_uid
: uid of the task to send logs forlog
: message to log
send_progress
def send_progress(task_uid: str, progress: float, message: str)
Send a progress notification
Arguments:
task_uid
: uid of the task to send progress update forprogress
: progress from 0-100 to sendmessage
: progress messsage to send
get_task
def get_task() -> Optional[WorkerTask]
Retrieve a task definition from the worker queue if there is one available
Does not block, returns None if no message available
Channel
from dara.core.internal.pool.channel import Channel
class Channel()
A communication channel allowing bidirectional communication between TaskPool and worker processes via two queues