tasks
Task
from dara.core.internal.tasks import Task
class Task(BaseTask)
The Task class represents a task to be executed in a subprocess and provides an API for executing it either asynchronously or synchronously.
Methods for reading from the subprocess and writing to it are also provided.
Methods
__init__
def __init__(func: Callable,
args: Union[List[Any], None] = None,
kwargs: Union[Dict[str, Any], None] = None,
reg_entry: Optional[CachedRegistryEntry] = None,
notify_channels: Optional[List[str]] = None,
cache_key: Optional[str] = None,
task_id: Optional[str] = None,
on_progress: Optional[Callable[[TaskProgressUpdate],
Union[None,
Awaitable[None]]]] = None)
Arguments:
func
: The function to execute within the process
reg_entry
: The associated registry entry for this task
args
: The arguments to pass to that function
kwargs
: The keyword arguments to pass to that function
notify_channels
: If this task is run in a TaskManager instance these channels will also be notified on completion
cache_key
: Optional cache key if there is a PendingTask in the store associated with this task
task_id
: Optional task_id to set for the task - otherwise the task generates its id automatically
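The signature above types on_progress as a callable that returns either None or an awaitable, so both plain functions and coroutine functions are accepted. How Task invokes the callback internally is not documented here; the following is a minimal sketch of one way to dispatch to both kinds. ProgressUpdate, notify_progress and the two handlers are hypothetical names used only for illustration, and the real TaskProgressUpdate fields may differ.

```python
import asyncio
import inspect
from dataclasses import dataclass
from typing import Awaitable, Callable, List, Optional, Union


@dataclass
class ProgressUpdate:
    """Hypothetical stand-in for TaskProgressUpdate (real fields may differ)."""
    progress: float
    message: str


ProgressCallback = Callable[[ProgressUpdate], Union[None, Awaitable[None]]]


async def notify_progress(callback: Optional[ProgressCallback], update: ProgressUpdate) -> None:
    """Call the callback and await its return value only if it is awaitable."""
    if callback is None:
        return
    result = callback(update)
    if inspect.isawaitable(result):
        await result


received: List[str] = []


def sync_handler(update: ProgressUpdate) -> None:
    # Plain function: returns None, nothing to await
    received.append(f"sync:{update.progress}")


async def async_handler(update: ProgressUpdate) -> None:
    # Coroutine function: calling it returns an awaitable
    await asyncio.sleep(0)
    received.append(f"async:{update.progress}")


async def demo() -> None:
    update = ProgressUpdate(progress=50.0, message="halfway")
    await notify_progress(sync_handler, update)
    await notify_progress(async_handler, update)
    await notify_progress(None, update)  # no callback supplied


asyncio.run(demo())
```

Checking the return value with inspect.isawaitable lets the caller pass either kind of handler without declaring which one it is.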
run
async def run(
send_stream: Optional[MemoryObjectSendStream[TaskMessage]] = None
) -> Any
Run the task asynchronously and await its completion.
Arguments:
send_stream
: The stream to send messages to the task manager on
cancel
async def cancel()
Cancel the task.
MetaTask
from dara.core.internal.tasks import MetaTask
class MetaTask(BaseTask)
A MetaTask represents a task that is dependent on the results of a number of other tasks. It exposes an async wrapper around the other tasks, waits for them to complete and then applies the process_result function to the results.
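The wait-then-process pattern described above can be sketched with plain asyncio. This is not the MetaTask implementation: SketchTask and run_meta are hypothetical stand-ins, and the sketch assumes dependent tasks appear as positional arguments only.

```python
import asyncio
from typing import Any, Awaitable, Callable, List


class SketchTask:
    """Hypothetical stand-in for a task: an async function plus its arguments."""

    def __init__(self, func: Callable[..., Awaitable[Any]], args: List[Any]):
        self.func = func
        self.args = args

    async def run(self) -> Any:
        return await self.func(*self.args)


async def run_meta(process_result: Callable[..., Any], args: List[Any]) -> Any:
    """Run every task found in args, then call process_result on the resolved values."""
    # Remember the position of each task so its result can replace it
    pending = {i: arg for i, arg in enumerate(args) if isinstance(arg, SketchTask)}
    results = await asyncio.gather(*(task.run() for task in pending.values()))

    resolved = list(args)
    for index, result in zip(pending.keys(), results):
        resolved[index] = result
    return process_result(*resolved)


async def double(x: int) -> int:
    await asyncio.sleep(0)
    return x * 2


def add(a: int, b: int, c: int) -> int:
    return a + b + c


# Two dependent tasks and one plain value are combined by add()
total = asyncio.run(run_meta(add, [SketchTask(double, [1]), SketchTask(double, [2]), 10]))
```

Non-task arguments pass through unchanged, so process_result sees the same argument order it was given.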
Methods
__init__
def __init__(process_result: Callable[..., Any],
args: Optional[List[Any]] = None,
kwargs: Optional[Dict[str, Any]] = None,
reg_entry: Optional[CachedRegistryEntry] = None,
notify_channels: Optional[List[str]] = None,
process_as_task: bool = False,
cache_key: Optional[str] = None,
task_id: Optional[str] = None)
Arguments:
process_result
: A function to process the result of the other tasks
reg_entry
: The associated registry entry for this task
args
: The arguments to pass to that function
kwargs
: The keyword arguments to pass to that function
notify_channels
: If this task is run in a TaskManager instance these channels will also be notified on completion
process_as_task
: Whether to run the process_result function as a task or not, defaults to False
cache_key
: Optional cache key if there is a PendingTask in the store associated with this task
task_id
: Optional task_id to set for the task - otherwise the task generates its id automatically
run
async def run(
send_stream: Optional[MemoryObjectSendStream[TaskMessage]] = None)
Run any tasks found in the arguments to completion, collect the results, and then call the process_result function as a further task with the resulting arguments.
Arguments:
send_stream
: The stream to send messages to the task manager on
cancel
async def cancel()
Cancel the underlying tasks.
TaskManager
from dara.core.internal.tasks import TaskManager
class TaskManager()
TaskManager is responsible for running tasks and managing their pending state. It is also responsible for communicating the state of tasks to the client via the WebsocketManager.
When a task is run, a PendingTask is stored in the tasks dict. It is also stored in the store under the task's cache_key, which allows the task to be retrieved from the store by that key.
When a task is completed, it is removed from the tasks dict and the store entry is updated with the result.
When a task is cancelled, it is removed from the tasks dict and the store entry is updated with None.
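The lifecycle described above (pending while running, result on completion, None on cancellation) can be sketched as follows. SketchManager is a hypothetical stand-in, not the TaskManager API: it uses two plain dicts in place of the real tasks dict and store, a PENDING sentinel in place of PendingTask, and it omits websocket notifications and error handling.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict

PENDING = object()  # hypothetical marker standing in for a PendingTask


class SketchManager:
    def __init__(self) -> None:
        self.tasks: Dict[str, "asyncio.Future[Any]"] = {}  # running tasks by task id
        self.store: Dict[str, Any] = {}                    # cache_key -> PENDING or result

    async def run_task(self, task_id: str, cache_key: str,
                       func: Callable[[], Awaitable[Any]]) -> Any:
        job = asyncio.ensure_future(func())
        self.tasks[task_id] = job
        self.store[cache_key] = PENDING
        try:
            result = await job
        except asyncio.CancelledError:
            result = None  # a cancelled task leaves None in the store
        finally:
            self.tasks.pop(task_id, None)
        self.store[cache_key] = result
        return result

    async def cancel_task(self, task_id: str) -> None:
        job = self.tasks.get(task_id)
        if job is not None:
            job.cancel()


async def demo():
    manager = SketchManager()

    async def quick() -> int:
        return 42

    async def slow() -> None:
        await asyncio.sleep(60)

    await manager.run_task("t1", "key-1", quick)

    runner = asyncio.ensure_future(manager.run_task("t2", "key-2", slow))
    await asyncio.sleep(0)  # give the runner a chance to register the task
    pending_seen = manager.store["key-2"] is PENDING
    await manager.cancel_task("t2")
    await runner
    return manager, pending_seen


manager, pending_seen = asyncio.run(demo())
```

Keying the store by cache_key rather than task id is what lets a later lookup find an in-flight computation without knowing which task produced it.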
Methods
run_task
async def run_task(task: BaseTask, ws_channel: Optional[str] = None)
Run a task and store it in the tasks dict
Arguments:
task
: Task to run
ws_channel
: Websocket channel to send task updates to
cancel_task
async def cancel_task(task_id: str, notify: bool = True)
Cancel a running task by its id
Arguments:
task_id
: the id of the task
notify
: whether to notify, true by default
cancel_all_tasks
async def cancel_all_tasks()
Cancel all the currently running tasks, useful for cleaning up on app shutdown
get_result
async def get_result(task_id: str)
Fetch the result of a task by its id
Arguments:
task_id
: the id of the task to fetch
set_result
async def set_result(task_id: str, value: Any)
Set the result of a task by its id
TaskResultEntry
Global registry entry for task results. This is global because task ids are unique and accessed one time only so it's effectively a one-time use random key.
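The "one-time use random key" idea can be illustrated with a small read-once mapping. This sketch assumes a result is removed when it is first read, which the description above implies but does not state; OneTimeResults and take_result are hypothetical names and do not reflect how TaskResultEntry is actually implemented.

```python
import uuid
from typing import Any, Dict


class OneTimeResults:
    """Hypothetical read-once store keyed by unique task ids."""

    def __init__(self) -> None:
        self._results: Dict[str, Any] = {}

    def set_result(self, task_id: str, value: Any) -> None:
        self._results[task_id] = value

    def take_result(self, task_id: str) -> Any:
        # pop() returns the value and removes it, so a second read finds nothing
        return self._results.pop(task_id, None)


results = OneTimeResults()
task_id = str(uuid.uuid4())  # unique id, so a single global mapping is safe to share
results.set_result(task_id, {"rows": 3})

first = results.take_result(task_id)
second = results.take_result(task_id)
```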