Ask AI

You are viewing an unreleased or outdated version of the documentation

Automating ops using schedules and jobs#

In this guide, we'll walk you through running ops on a schedule. To do this for asset definitions, refer to the Automating assets using schedules guide.

By the end of this guide, you'll be able to:

  • Create a job that executes ops
  • Create a schedule
  • Add the new job and schedule to your project's Definitions object
  • Turn the schedule on

Prerequisites#

To follow this guide, you'll need:


Step 1: Create a job#

The first step in creating a schedule is to build a job that executes some ops.

Let's assume we already have a few ops in our project. To create a job that executes the ops, we'll use the @job decorator to define the job:

@op
def count_orders():
    return 5


@op
def count_users(arg):
    return arg + 1


@job
def ecommerce_job():
    count_users(count_orders())

To create the job, we:

  1. Imported job
  2. Constructed the job using the @job decorator and name it ecommerce_job
  3. Within the @job function's body, we used function calls to define dependencies between the count_orders and count_users ops

Refer to the Op jobs documentation for more info and examples.


Step 2: Define the schedule#

Next, we'll construct the schedule using ScheduleDefinition and attach it to the job we created in Step 1.

ecommerce_schedule = ScheduleDefinition(
    job=ecommerce_job,
    cron_schedule="15 5 * * 1-5",
    default_status=DefaultScheduleStatus.RUNNING,
)

To build the schedule, we:

  1. Imported DefaultScheduleStatus and ScheduleDefinition from dagster

  2. Created a schedule using ScheduleDefinition that:

    • Is attached to the ecommerce_job job
    • Has a cron expression of 15 5 * * 1-5, which translates to Every Monday through Friday of every month at 5:15AM
    • Is turned on by default (default_status). We'll discuss this more in Step 4.

Step 3: Update the Definitions object#

Next, we'll update our project's Definitions object to include the new job and schedule. This ensures the job and schedule are available to Dagster processes, such as the Dagster UI.

defs = Definitions(
    jobs=[ecommerce_job],
    schedules=[ecommerce_schedule],
)

At this point, your code should look like the following:

from dagster import job, op, DefaultScheduleStatus, Definitions, ScheduleDefinition


@op
def count_orders():
    return 5


@op
def count_users(arg):
    return arg + 1


@job
def ecommerce_job():
    count_users(count_orders())


ecommerce_schedule = ScheduleDefinition(
    job=ecommerce_job,
    cron_schedule="15 5 * * 1-5",
    default_status=DefaultScheduleStatus.RUNNING,
)


defs = Definitions(
    jobs=[ecommerce_job],
    schedules=[ecommerce_schedule],
)

Step 4: Turn the schedule on#

turned the schedule on by using the default_status parameter in its ScheduleDefinition, but there are a few other ways to do this:

Heads up! Starting or stopping a schedule in the UI will override any default status set in code.

To turn on a schedule in the Dagster UI, navigate to Overview > Schedules:

Schedules tab in the Dagster UI

After the schedule is started, it will begin executing immediately if the dagster-daemon process is running. This process starts automatically when dagster dev is run.


APIs in this guide#

NameDescription
@opA decorator used to define ops. Returns an OpDefinition. The decorated function is called the "compute function".
@jobThe decorator used to define a job.
ScheduleDefinitionA class that defines a schedule and attaches it to a job.
DefinitionsThe object that contains all the definitions defined within a code location. Definitions include assets, jobs, resources, schedules, and sensors.