Scheduled Automation & Triggers

Triggers let the Flyte SDK run tasks automatically, either at fixed intervals or at specific clock times. In this tutorial, you will build a scheduled pipeline that uses several trigger types: convenience schedules, custom cron expressions, and fixed-rate intervals.

Prerequisites

To follow this tutorial, you need the flyte package installed and a basic understanding of defining tasks using TaskEnvironment.

```python
import flyte
from datetime import datetime

# Create the task environment that the scheduled tasks below belong to
env = flyte.TaskEnvironment(name="automation-tutorial")
```

Step 1: Create a Basic Hourly Trigger

The simplest way to automate a task is to use the predefined convenience methods on the Trigger class. You will create a task that runs every hour and receives the scheduled execution time as an input.

```python
@env.task(triggers=flyte.Trigger.hourly(trigger_time_input_key="execution_time"))
def hourly_maintenance(execution_time: datetime):
    # execution_time receives the scheduled time of this run
    print(f"Running maintenance for window starting at {execution_time}")
```

Passing "execution_time" as trigger_time_input_key tells Flyte to bind each run's scheduled time to the execution_time argument. Besides hourly(), the Trigger class provides daily(), weekly(), monthly(), and minutely() constructors.
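To make the binding concrete, here is a plain-Python sketch of what an hourly schedule does conceptually: it computes the next few top-of-the-hour times and passes each one to the task under the configured input key. The helpers next_hourly_times and invoke_with_trigger_time are hypothetical illustrations, not part of the Flyte SDK, and the sketch does not reflect how Flyte implements scheduling internally.

```python
from datetime import datetime, timedelta, timezone
from typing import Any, Callable


def next_hourly_times(after: datetime, count: int) -> list[datetime]:
    """Hypothetical helper: the next `count` top-of-the-hour times strictly after `after`."""
    # Truncate to the start of the current hour, then step forward one hour at a time.
    base = after.replace(minute=0, second=0, microsecond=0)
    return [base + timedelta(hours=i) for i in range(1, count + 1)]


def invoke_with_trigger_time(fn: Callable[..., Any], input_key: str, scheduled: datetime) -> Any:
    """Hypothetical helper: pass the scheduled time as the keyword argument named `input_key`."""
    return fn(**{input_key: scheduled})


def hourly_maintenance(execution_time: datetime) -> str:
    # Plain function standing in for the Flyte task above
    return f"Running maintenance for window starting at {execution_time}"


now = datetime(2024, 5, 1, 10, 37, tzinfo=timezone.utc)
for scheduled in next_hourly_times(now, 3):
    print(invoke_with_trigger_time(hourly_maintenance, "execution_time", scheduled))
```

The key name is the only link between the schedule and the function signature, which is why it must match a parameter of the task exactly.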

Step 2: Define a Custom Cron Schedule

When the predefined intervals aren't enough, use a standard cron expression. This is useful for precise schedules, such as running a task at a particular time of day in a particular timezone.

```python
# Define a trigger that runs every weekday at 9:00 AM New York time
morning_report_trigger = flyte.Trigger(
    name="morning-report",
    automation=flyte.Cron(
        expression="0 9 * * 1-5",
        timezone="America/New_York"
    ),
    # Bind the schedule's activation time to the report_date parameter
    inputs={"report_date": flyte.TriggerTime}
)

@env.task(triggers=morning_report_trigger)
def generate_report(report_date: datetime):
    print(f"Generating report for {report_date.strftime('%Y-%m-%d')}")
```

The Cron class takes a five-field expression (minute hour day-of-month month day-of-week). Using flyte.TriggerTime in the inputs dictionary allows you to map the schedule's activation time to any task parameter.
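The following standard-library sketch shows how the five fields of an expression such as "0 9 * * 1-5" are read. The helpers parse_field and cron_matches are hypothetical and deliberately small: they handle only `*`, single numbers, ranges (a-b), and comma-separated lists. Step values like `*/15` are not supported, and timezone handling (which flyte.Cron does through its timezone argument) is left out.

```python
from datetime import datetime


def parse_field(field: str, lo: int, hi: int) -> set[int]:
    """Expand one cron field ('*', 'n', 'a-b', or a comma-separated mix) into allowed values."""
    if field == "*":
        return set(range(lo, hi + 1))
    values: set[int] = set()
    for part in field.split(","):
        if "-" in part:
            start, end = (int(x) for x in part.split("-"))
            values.update(range(start, end + 1))
        else:
            values.add(int(part))
    return values


def cron_matches(expression: str, when: datetime) -> bool:
    """Hypothetical matcher: does `when` satisfy the five-field cron expression?"""
    minute, hour, dom, month, dow = expression.split()
    cron_dow = when.isoweekday() % 7  # cron numbering: 0 = Sunday ... 6 = Saturday
    dom_ok = when.day in parse_field(dom, 1, 31)
    dow_ok = cron_dow in {v % 7 for v in parse_field(dow, 0, 7)}  # 7 is also Sunday
    # Standard cron ORs the two day fields when both are restricted.
    day_ok = (dom_ok or dow_ok) if (dom != "*" and dow != "*") else (dom_ok and dow_ok)
    return (
        when.minute in parse_field(minute, 0, 59)
        and when.hour in parse_field(hour, 0, 23)
        and when.month in parse_field(month, 1, 12)
        and day_ok
    )


print(cron_matches("0 9 * * 1-5", datetime(2024, 5, 1, 9, 0)))  # True: a Wednesday at 09:00
print(cron_matches("0 9 * * 1-5", datetime(2024, 5, 4, 9, 0)))  # False: a Saturday
```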

Step 3: Use Fixed-Rate Intervals

Unlike Cron, which targets specific clock times, FixedRate ensures a consistent interval between runs regardless of the clock. This is ideal for high-frequency tasks like polling.

```python
# Run every 15 minutes
polling_trigger = flyte.Trigger(
    name="fast-poll",
    automation=flyte.FixedRate(interval_minutes=15),
    description="Polls the external API every 15 minutes"
)

@env.task(triggers=polling_trigger)
def poll_external_service():
    # Logic to check for new data goes here
    pass
```

The FixedRate interval is given in minutes through interval_minutes. If you need the schedule to begin at a specific moment and then follow the interval, pass an optional start_time to the FixedRate constructor.
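As an illustration, assume the first run happens exactly at start_time and every later run follows one interval after the previous one (an assumption for this sketch; the Flyte documentation defines the exact semantics). The run times then form a simple arithmetic sequence. The fixed_rate_times helper is hypothetical.

```python
from datetime import datetime, timedelta


def fixed_rate_times(start: datetime, interval_minutes: int, count: int) -> list[datetime]:
    """Hypothetical model: run at `start`, then every `interval_minutes` after it."""
    step = timedelta(minutes=interval_minutes)
    return [start + i * step for i in range(count)]


runs = fixed_rate_times(datetime(2024, 5, 1, 10, 7), 15, 4)
print([t.strftime("%H:%M") for t in runs])  # ['10:07', '10:22', '10:37', '10:52']
```

The runs land at 10:07, 10:22, and so on, keeping a constant gap from the start time rather than snapping to the quarter-hour marks that a cron expression such as `*/15 * * * *` would select.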

Step 4: Configure Notifications and Overrides

Triggers can also override execution settings and alert you when things go wrong. For example, you can set environment variables or configure Slack or email notifications that apply only to runs started by the trigger.

```python
from flyte.models import ActionPhase
import flyte.notify

alerting_trigger = flyte.Trigger(
    name="critical-sync",
    automation=flyte.Cron("0 * * * *"),  # top of every hour
    env_vars={"SYNC_MODE": "STRICT"},
    notifications=flyte.notify.Slack(
        on_phase=ActionPhase.FAILED,
        webhook_url="https://hooks.slack.com/services/...",
        message="Scheduled sync failed for {task.name}!"
    )
)

@env.task(triggers=alerting_trigger)
def sync_data():
    # Runs started by this trigger have SYNC_MODE=STRICT in their environment
    pass
```

The notifications parameter accepts a single notification or a list of them. The on_phase argument selects the run phase (e.g., FAILED, SUCCEEDED) that sends the notification.
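The filtering and override behavior described above can be modeled in a few lines of plain Python. Everything here is hypothetical: Phase stands in for ActionPhase, Notification stands in for a notification such as flyte.notify.Slack, and effective_env assumes that a trigger's env_vars take precedence over same-named variables from the base environment. This is a sketch of the idea, not Flyte's implementation.

```python
from dataclasses import dataclass
from enum import Enum
from typing import Sequence, Union


class Phase(Enum):
    """Hypothetical stand-in for flyte.models.ActionPhase."""
    SUCCEEDED = "SUCCEEDED"
    FAILED = "FAILED"


@dataclass(frozen=True)
class Notification:
    """Hypothetical stand-in for a notification such as flyte.notify.Slack."""
    on_phase: Phase
    message: str


def messages_for_phase(
    phase: Phase, notifications: Union[Notification, Sequence[Notification]]
) -> list[str]:
    """Return the message of every notification whose on_phase matches `phase`."""
    # Accept either one notification or a sequence of them.
    if isinstance(notifications, Notification):
        notifications = [notifications]
    return [n.message for n in notifications if n.on_phase == phase]


def effective_env(base: dict[str, str], trigger_env_vars: dict[str, str]) -> dict[str, str]:
    """Layer a trigger's env_vars over the base environment (assumed precedence)."""
    return {**base, **trigger_env_vars}


alert = Notification(on_phase=Phase.FAILED, message="Scheduled sync failed")
print(messages_for_phase(Phase.FAILED, alert))     # ['Scheduled sync failed']
print(messages_for_phase(Phase.SUCCEEDED, alert))  # []
print(effective_env({"SYNC_MODE": "LENIENT"}, {"SYNC_MODE": "STRICT"}))  # {'SYNC_MODE': 'STRICT'}
```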

Step 5: Combine Multiple Triggers

A single task can have multiple schedules: pass a tuple of triggers to the @env.task decorator.

```python
# A task that runs both on a custom cron and every minute
@env.task(triggers=(morning_report_trigger, flyte.Trigger.minutely("report_date")))
def multi_schedule_task(report_date: datetime):
    print(f"Triggered at {report_date}")
```

When you deploy this environment with flyte.deploy(env), all defined triggers are registered. Triggers default to auto_activate=True, so they start running on their schedules as soon as the deployment succeeds; set auto_activate=False on a trigger to register it without activating it.
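To picture how several triggers on one task interact after deployment, the sketch below merges the fire times of every auto-activated trigger into a single timeline. It assumes each trigger fires independently (so two triggers due at the same minute start two runs) and that a trigger with auto_activate=False never fires; TriggerSpec and deployed_timeline are hypothetical names, not Flyte APIs.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta


@dataclass(frozen=True)
class TriggerSpec:
    """Hypothetical stand-in for a trigger: a name, its fire times, and auto_activate."""
    name: str
    fire_times: tuple[datetime, ...]
    auto_activate: bool = True


def deployed_timeline(triggers: tuple[TriggerSpec, ...]) -> list[tuple[datetime, str]]:
    """Merge the fire times of every auto-activated trigger into one chronological list."""
    events = [
        (when, trig.name)
        for trig in triggers
        if trig.auto_activate  # registered but inactive triggers never fire
        for when in trig.fire_times
    ]
    return sorted(events)


start = datetime(2024, 5, 1, 8, 58)
minutely = TriggerSpec("minutely", tuple(start + timedelta(minutes=i) for i in range(4)))
morning = TriggerSpec("morning-report", (datetime(2024, 5, 1, 9, 0),))
paused = TriggerSpec("paused-report", (datetime(2024, 5, 1, 9, 0),), auto_activate=False)

for when, name in deployed_timeline((minutely, morning, paused)):
    print(when.strftime("%H:%M"), name)
```

Under these assumptions the task runs twice at 09:00, once for each active trigger, while the paused trigger contributes nothing.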

Complete Result

Putting these pieces together, a complete automated pipeline looks like this:

```python
import flyte
from flyte.models import ActionPhase
import flyte.notify

env = flyte.TaskEnvironment(name="production-pipeline")

# 1. Custom Cron with Timezone and Notifications
daily_trigger = flyte.Trigger(
    name="daily-cleanup",
    automation=flyte.Cron("0 0 * * *", timezone="UTC"),  # midnight UTC
    notifications=flyte.notify.Slack(
        on_phase=ActionPhase.FAILED,
        webhook_url="https://hooks.slack.com/...",
        message="Daily cleanup failed"
    )
)

# 2. Fixed Rate for high frequency
fast_trigger = flyte.Trigger(
    name="heartbeat",
    automation=flyte.FixedRate(5)  # every 5 minutes
)

@env.task(triggers=(daily_trigger, fast_trigger))
def maintenance_task():
    print("Performing system maintenance...")

if __name__ == "__main__":
    flyte.init_from_config()
    flyte.deploy(env)
```

This setup runs maintenance_task at midnight UTC every day and every 5 minutes, and sends a Slack alert if a run started by the daily trigger fails.
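A quick back-of-the-envelope count shows the load this configuration creates per day. The sketch assumes each trigger fires independently and ignores retries; runs_per_day is a hypothetical helper that only works when the interval divides a day evenly.

```python
MINUTES_PER_DAY = 24 * 60


def runs_per_day(interval_minutes: int) -> int:
    """Fixed-rate runs in a 24-hour window (assumes the interval divides the day evenly)."""
    return MINUTES_PER_DAY // interval_minutes


heartbeat_runs = runs_per_day(5)  # heartbeat trigger: FixedRate(5)
daily_runs = 1                    # daily-cleanup trigger: Cron("0 0 * * *")
print(heartbeat_runs, daily_runs, heartbeat_runs + daily_runs)  # 288 1 289
```

Almost all of the daily executions come from the heartbeat, so its interval is the setting to revisit if the task becomes expensive.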