
scheduler

ScheduledJob

from dara.core.internal.scheduler import ScheduledJob

class ScheduledJob(BaseModel)

Attributes

  • interval: Union[int, List[int]]
  • continue_running: bool
  • first_execution: bool
  • run_once: bool

Methods

__init__

def __init__(interval: Union[int, list], run_once=False, **kwargs)

Creates a ScheduledJob object

Arguments:

  • interval: The interval between job executions, in seconds
  • run_once: Whether to run the job only once

do

def do(func, args=None) -> BaseProcess

Starts the scheduled job process and returns it to the caller. The process is daemonized, which ensures that the parent process terminates it when the parent exits.

Arguments:

  • func: The function to be called within the process; it must be picklable
  • args: List of arguments to pass to the function
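The picklability requirement on func can be checked before scheduling. The following is a minimal sketch using only the standard library; the helper name is_picklable is hypothetical and is not part of dara.

```python
import math
import pickle


def is_picklable(func) -> bool:
    """Return True if `func` can be pickled (hypothetical helper, not a dara API)."""
    try:
        pickle.dumps(func)
        return True
    except (pickle.PicklingError, AttributeError, TypeError):
        return False


# An importable function is pickled by reference to its module and name.
importable_ok = is_picklable(math.sqrt)

# A lambda has no importable name, so pickling it fails.
lambda_ok = is_picklable(lambda x: x * 2)
```

In practice this means passing a function defined at module level rather than a lambda or a function nested inside another function.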

CronScheduledJob

from dara.core.internal.scheduler import CronScheduledJob

class CronScheduledJob(ScheduledJob)

Attributes

  • crondef: str

Methods

__init__

def __init__(crondef: str)

Creates a CronScheduledJob object

Arguments:

  • crondef: The cron schedule expression to run the job according to.

do

def do(func, args=None) -> BaseProcess

Starts the scheduled job process and returns it to the caller. The process is daemonized, which ensures that the parent process terminates it when the parent exits.

Arguments:

  • func: The function to be called within the process; it must be picklable
  • args: List of arguments to pass to the function
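To illustrate what daemonization means here, the sketch below builds a daemonized process with the standard multiprocessing module. It is an assumption that dara does something comparable internally; make_daemon_process is a hypothetical helper, and the process is deliberately not started.

```python
import math
import multiprocessing


def make_daemon_process(func, args=None) -> multiprocessing.Process:
    """Build, but do not start, a daemonized process that calls func(*args)."""
    return multiprocessing.Process(target=func, args=tuple(args or ()), daemon=True)


# math.sqrt stands in for a real job function; it is importable and so picklable.
process = make_daemon_process(math.sqrt, [16])
```

Calling process.start() would launch the job. Because the daemon flag is set, the child is terminated when the parent process exits.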

TimeScheduledJob

from dara.core.internal.scheduler import TimeScheduledJob

class TimeScheduledJob(ScheduledJob)

Attributes

  • job_time: datetime

Methods

__init__

def __init__(interval: Union[int, list], job_time: str, run_once=False)

Creates a TimeScheduledJob object

Arguments:

  • interval: The interval between job executions, in seconds
  • job_time: A string containing the time at which to execute this job. Must be formatted %H:%M
  • run_once: Whether to run the job only once
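The %H:%M format for job_time corresponds to a 24-hour "HH:MM" string. A minimal sketch of validating such a string with the standard library follows; parse_job_time is a hypothetical helper, and how dara itself parses the value is not shown in this reference.

```python
from datetime import datetime, time


def parse_job_time(job_time: str) -> time:
    """Parse an 'HH:MM' string, raising ValueError for any other format."""
    return datetime.strptime(job_time, "%H:%M").time()


parsed = parse_job_time("10:10")
```

Strings such as "25:00" or "10:10:30" raise ValueError, so a malformed time is caught before the job is scheduled.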

do

def do(func, args=None) -> BaseProcess

Starts the scheduled job process and returns it to the caller. The process is daemonized, which ensures that the parent process terminates it when the parent exits.

Arguments:

  • func: The function to be called within the process; it must be picklable
  • args: List of arguments to pass to the function

ScheduledJobFactory

from dara.core.internal.scheduler import ScheduledJobFactory

class ScheduledJobFactory(BaseModel)

Factory class for creating different ScheduledJob objects depending on the methods called.

Arguments:

  • interval: The interval of time to wait between job executions
  • run_once: Whether the job should be run only once

Attributes

  • interval: Union[int, list]
  • continue_running: bool
  • weekday: Optional[datetime]
  • run_once: bool

Methods

at

def at(job_time: str) -> TimeScheduledJob

If the job must execute at a specific time of day, this function returns a TimeScheduledJob, which ensures that the job is executed at the given time.

Arguments:

  • job_time: A string containing the time at which to execute this job. Must be formatted %H:%M. Note that this is UTC time.
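Because job_time is interpreted as UTC, a local wall-clock time has to be converted before it is passed to at. The sketch below shows one way to compute how long it is until the next UTC occurrence of an "HH:MM" time; seconds_until is a hypothetical helper and is not necessarily how dara computes its delay.

```python
from datetime import datetime, timedelta, timezone


def seconds_until(job_time: str, now: datetime) -> float:
    """Seconds from timezone-aware `now` to the next UTC occurrence of 'HH:MM'."""
    target = datetime.strptime(job_time, "%H:%M")
    now_utc = now.astimezone(timezone.utc)
    candidate = now_utc.replace(
        hour=target.hour, minute=target.minute, second=0, microsecond=0
    )
    # If the time has already passed today, use the same time tomorrow.
    if candidate <= now_utc:
        candidate += timedelta(days=1)
    return (candidate - now_utc).total_seconds()


morning = datetime(2024, 1, 1, 10, 0, tzinfo=timezone.utc)
wait_same_day = seconds_until("10:10", morning)  # 10 minutes ahead
wait_next_day = seconds_until("09:00", morning)  # already passed, so 23 hours ahead
```

Passing now explicitly keeps the calculation deterministic; a caller would normally supply datetime.now(timezone.utc).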

do

def do(func, args=None) -> BaseProcess

If the job is scheduled to execute after a specific interval, this function returns a ScheduledJob process, which ensures that the execution takes place.

Arguments:

  • func: The function to be pickled and passed to the subprocess to execute
  • args: Any arguments to be passed to the pickled function

Scheduler

from dara.core.internal.scheduler import Scheduler

class Scheduler()

Scheduler class, provides a set of methods for scheduling when a job should occur. For example:

Scheduler(interval=1, run_once=True).minute().do(job)

Arguments:

  • interval: The interval of time to wait between job executions
  • run_once: Whether the job should only be run once

Methods

second

def second() -> ScheduledJob

Schedule a job to execute every second

seconds

def seconds() -> ScheduledJob

Schedule a job to execute every x seconds

minute

def minute() -> ScheduledJob

Schedule a job to execute every minute

minutes

def minutes() -> ScheduledJob

Schedule a job to execute every x minutes

hour

def hour() -> ScheduledJob

Schedule a job to execute every hour

hours

def hours() -> ScheduledJob

Schedule a job to execute every x hours

day

def day() -> ScheduledJobFactory

Schedule a job to execute every day

days

def days() -> ScheduledJobFactory

Schedule a job to execute every x days
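The unit methods from second through days pair with the interval given to the Scheduler, where x is that interval. Since ScheduledJob documents its interval in seconds, one plausible reading is that the unit acts as a multiplier. The sketch below illustrates that arithmetic only; the table and the interval_in_seconds helper are assumptions, not dara internals.

```python
# Seconds in each unit (assumed multipliers, for illustration only).
SECONDS_PER_UNIT = {
    "second": 1,
    "minute": 60,
    "hour": 60 * 60,
    "day": 24 * 60 * 60,
}


def interval_in_seconds(count: int, unit: str) -> int:
    """Convert e.g. (5, 'minutes') into a number of seconds."""
    # Accept both singular and plural unit names: 'minutes' -> 'minute'.
    singular = unit[:-1] if unit.endswith("s") else unit
    return count * SECONDS_PER_UNIT[singular]


five_minutes = interval_in_seconds(5, "minutes")
one_day = interval_in_seconds(1, "day")
```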

monday

def monday() -> ScheduledJobFactory

Schedule a job to execute every Monday

tuesday

def tuesday() -> ScheduledJobFactory

Schedule a job to execute every Tuesday

wednesday

def wednesday() -> ScheduledJobFactory

Schedule a job to execute every Wednesday

thursday

def thursday() -> ScheduledJobFactory

Schedule a job to execute every Thursday

friday

def friday() -> ScheduledJobFactory

Schedule a job to execute every Friday

saturday

def saturday() -> ScheduledJobFactory

Schedule a job to execute every Saturday

sunday

def sunday() -> ScheduledJobFactory

Schedule a job to execute every Sunday

every

def every(interval: int = 1) -> Scheduler

Schedule a job to occur every x units of time. For example, scheduler.every().day().at("10:10").do(job).

Arguments:

  • interval: An integer denoting the amount of time to wait between repeat executions of the job.

on

def on(interval: int = 1) -> Scheduler

Schedule a job to occur only once, after x units of time. For example, scheduler.on(1).day().at("10:10").do(job).

Arguments:

  • interval: An integer denoting the amount of time to wait before the job is executed.

cron

def cron(crondef: str) -> CronScheduledJob

Schedule a job to occur based on a cron schedule expression. For example, scheduler.cron("0 0 * * *").do(job).

Arguments:

  • crondef: The cron schedule expression.
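For readers unfamiliar with cron expressions, the example "0 0 * * *" follows the conventional five-field layout and means midnight every day. The sketch below names the fields; split_cron is a hypothetical helper, and whether dara accepts extended syntax beyond five fields is not stated in this reference.

```python
CRON_FIELDS = ("minute", "hour", "day_of_month", "month", "day_of_week")


def split_cron(crondef: str) -> dict:
    """Split a five-field cron expression into named fields."""
    parts = crondef.split()
    if len(parts) != len(CRON_FIELDS):
        raise ValueError(f"expected 5 fields, got {len(parts)}: {crondef!r}")
    return dict(zip(CRON_FIELDS, parts))


# Minute 0 of hour 0, on every day of every month and every weekday.
daily_midnight = split_cron("0 0 * * *")
```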