Skip to main content

Scheduling and Triggers

To automate task execution on a schedule, define a Trigger and associate it with a task using the @env.task decorator's triggers parameter.

Basic Scheduling with Convenience Methods

The simplest way to schedule a task is using the built-in convenience constructors in the Trigger class, such as hourly(), daily(), or minutely().

import flyte
from datetime import datetime

env = flyte.TaskEnvironment(name="scheduling-demo")

# Runs every hour on the hour
@env.task(triggers=flyte.Trigger.hourly())
def hourly_cleanup(trigger_time: datetime):
print(f"Cleaning up at {trigger_time}")

The convenience methods automatically map the scheduled execution time to a task input if you provide a key:

# Runs every minute and passes the schedule time to 'execution_ts'
@env.task(triggers=flyte.Trigger.minutely(trigger_time_input_key="execution_ts"))
def monitor_task(execution_ts: datetime):
...

Custom Cron Schedules

For complex clock-based schedules, use the Cron class. This supports standard five-field cron expressions and timezone-aware scheduling.

# Define a trigger for 12:01 PM ET every day
nyc_trigger = flyte.Trigger(
name="nyc_noon_sync",
automation=flyte.Cron("1 12 * * *", timezone="America/New_York"),
inputs={"start_time": flyte.TriggerTime, "batch_size": 100},
)

@env.task(triggers=nyc_trigger)
def sync_data(start_time: datetime, batch_size: int):
...

Common Cron patterns supported:

  • "0 * * * *": Every hour (at minute 0)
  • "0 0 * * *": Daily at midnight
  • "*/5 * * * *": Every 5 minutes

Interval-Based Scheduling

Use FixedRate when you need a task to run at a consistent interval (e.g., every 90 minutes) regardless of the clock time.

# Runs every 90 minutes
interval_trigger = flyte.Trigger(
name="interval_check",
automation=flyte.FixedRate(interval_minutes=90),
description="Check system health every 90 minutes"
)

@env.task(triggers=interval_trigger)
def health_check():
...

Passing Schedule Metadata to Tasks

To access the scheduled execution time inside your task, use the flyte.TriggerTime sentinel in the inputs dictionary of your Trigger. This maps the trigger's timestamp to a specific task parameter.

custom_trigger = flyte.Trigger(
name="metadata_trigger",
automation=flyte.Cron("0 0 * * *"),
inputs={
"scheduled_at": flyte.TriggerTime, # Maps trigger time to 'scheduled_at'
"region": "us-east-1" # Static input for triggered runs
}
)

@env.task(triggers=custom_trigger)
def process_region(scheduled_at: datetime, region: str):
...

Advanced Configuration

You can override task-level settings specifically for executions initiated by a trigger. This is useful for running scheduled tasks on specific queues or with different environment variables.

Overriding Execution Parameters

heavy_trigger = flyte.Trigger(
name="heavy_scheduled_run",
automation=flyte.Cron("0 2 * * *"),
queue="high-memory-queue",
interruptible=False, # Ensure scheduled runs aren't preempted
env_vars={"LOG_LEVEL": "DEBUG"},
overwrite_cache=True
)

Multiple Triggers

A single task can be associated with multiple triggers by passing a list or tuple to the decorator.

@env.task(triggers=[
flyte.Trigger.daily(name="daily_sync"),
flyte.Trigger.hourly(name="hourly_patch")
])
def multi_scheduled_task():
...

Trigger Notifications

Configure notifications (Slack, Email, or Webhook) that only fire when the task is executed via a specific trigger.

from flyte.models import ActionPhase

alert_trigger = flyte.Trigger(
name="monitored_trigger",
automation=flyte.Cron("0 * * * *"),
notifications=flyte.notify.Slack(
on_phase=ActionPhase.FAILED,
webhook_url="https://hooks.slack.com/services/...",
message="Scheduled run of {task.name} failed!"
)
)

Troubleshooting

  • Unique Names: Every Trigger must have a unique name. If you reuse a trigger name across different tasks, deployment may fail or lead to unexpected behavior.
  • Description Length: The description field is limited to 255 characters. Longer strings will be automatically truncated.
  • DST Transitions: When using Cron with a specific timezone, be aware that Daylight Savings Time transitions can cause runs to be skipped or duplicated depending on the local clock shift.
  • Activation: By default, triggers are set to auto_activate=True. If you set this to False, the trigger will be created but will not start executing until manually activated via the Flyte platform.