Skip to main content

tasks

Task

from dara.core.internal.tasks import Task

class Task(BaseTask)

The task class represents a task to be executed in a subprocess, and provides an API for executing the task in an async or synchronous fashion.

Methods for reading from the subprocess and writing to it are also provided.

Methods

__init__

def __init__(func: Callable,
args: Union[List[Any], None] = None,
kwargs: Union[Dict[str, Any], None] = None,
reg_entry: Optional[CachedRegistryEntry] = None,
notify_channels: Optional[List[str]] = None,
cache_key: Optional[str] = None,
task_id: Optional[str] = None,
on_progress: Optional[Callable[[TaskProgressUpdate],
Union[None,
Awaitable[None]]]] = None)

Arguments:

  • func: The function to execute within the process
  • reg_entry: The associated registry entry for this task
  • args: The arguments to pass to that function
  • kwargs: The keyword arguments to pass to that function
  • notify_channels: If this task is run in a TaskManager instance these channels will also be notified on completion
  • cache_key: Optional cache key if there is a PendingTask in the store associated with this task
  • task_id: Optional task_id to set for the task - otherwise the task generates its id automatically

run

async def run(
send_stream: Optional[MemoryObjectSendStream[TaskMessage]] = None
) -> Any

Run the task asynchronously, and await its' end.

Arguments:

  • send_stream: The stream to send messages to the task manager on

cancel

async def cancel()

Cancel the task.

MetaTask

from dara.core.internal.tasks import MetaTask

class MetaTask(BaseTask)

A MetaTask represents a task that is dependant on the results of a number of other tasks. It exposes an async wrapper around the other tasks, waits for them to complete and then applies the process_result function to the results.

Methods

__init__

def __init__(process_result: Callable[..., Any],
args: Optional[List[Any]] = None,
kwargs: Optional[Dict[str, Any]] = None,
reg_entry: Optional[CachedRegistryEntry] = None,
notify_channels: Optional[List[str]] = None,
process_as_task: bool = False,
cache_key: Optional[str] = None,
task_id: Optional[str] = None)

Arguments:

  • result (process): A function to process the result of the other tasks
  • reg_entry: The associated registry entry for this task
  • args: The arguments to pass to that function
  • kwargs: The keyword arguments to pass to that function
  • notify_channels: If this task is run in a TaskManager instance these channels will also be notified on completion
  • process_as_task: Whether to run the process_result function as a task or not, defaults to False
  • cache_key: Optional cache key if there is a PendingTask in the store associated with this task
  • task_id: Optional task_id to set for the task - otherwise the task generates its id automatically

run

async def run(
send_stream: Optional[MemoryObjectSendStream[TaskMessage]] = None)

Run any tasks found in the arguments to completion, collect the results and then call the process result

function as a further task with a resultant arguments

Arguments:

  • send_stream: The stream to send messages to the task manager on

cancel

async def cancel()

Cancel the tasks underneath

TaskManager

from dara.core.internal.tasks import TaskManager

class TaskManager()

TaskManager is responsible for running tasks and managing their pending state. It is also responsible for communicating the state of tasks to the client via the WebsocketManager.

When a task is run, a PendingTask it is stored in the tasks dict. It is also stored in the store with the key of the task's cache_key. This allows the task to be retrieved by the cache_key from the store.

When a task is completed, it is removed from the tasks dict and the store entry is updated with the result.

When a task is cancelled, it is removed from the tasks dict and the store entry is updated with None.

Methods

run_task

async def run_task(task: BaseTask, ws_channel: Optional[str] = None)

Run a task and store it in the tasks dict

Arguments:

  • task: Task to run
  • ws_channel: Websocket channel to send task updates to

cancel_task

async def cancel_task(task_id: str, notify: bool = True)

Cancel a running task by its id

Arguments:

  • task_id: the id of the task
  • notify: whether to notify, true by default

cancel_all_tasks

async def cancel_all_tasks()

Cancel all the currently running tasks, useful for cleaning up on app shutdown

get_result

async def get_result(task_id: str)

Fetch the result of a task by its id

Arguments:

  • task_id: the id of the task to fetch

set_result

async def set_result(task_id: str, value: Any)

Set the result of a task by its id

TaskResultEntry

Global registry entry for task results. This is global because task ids are unique and accessed one time only so it's effectively a one-time use random key.