Automated Triggers and Schedules
Automated triggers allow you to execute Flyte tasks on a recurring schedule or in response to external events. In this SDK, triggers are defined as part of the task environment and can be managed programmatically through the remote interface.
Defining Triggers
Triggers are defined using the flyte.Trigger class (found in src/flyte/_trigger.py). A trigger requires a name and an automation schedule, which can be either clock-based (Cron) or interval-based (FixedRate).
Cron Schedules
The Cron class supports standard five-field cron expressions and allows specifying a timezone.
import flyte

# Run every day at midnight London time
daily_trigger = flyte.Trigger(
    name="daily_cleanup",
    automation=flyte.Cron("0 0 * * *", timezone="Europe/London"),
    description="Runs daily cleanup at midnight"
)
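To make the five-field layout concrete, the following standalone sketch labels each field of a cron expression. It does not use the Flyte SDK: `describe_cron` and `CRON_FIELDS` are hypothetical names introduced here for illustration, and the helper only splits the expression into fields rather than validating their values.

```python
# Standalone illustration; describe_cron is a hypothetical helper, not part of flyte.
CRON_FIELDS = ("minute", "hour", "day_of_month", "month", "day_of_week")


def describe_cron(expression: str) -> dict[str, str]:
    """Map each field of a five-field cron expression to its conventional name."""
    parts = expression.split()
    if len(parts) != len(CRON_FIELDS):
        raise ValueError(f"expected 5 fields, got {len(parts)}: {expression!r}")
    return dict(zip(CRON_FIELDS, parts))


fields = describe_cron("0 0 * * *")
print(fields)
```

Read this way, "0 0 * * *" means minute 0 of hour 0 on every day of every month, which is midnight in whichever timezone the schedule is evaluated in.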
Fixed Rate Schedules
The FixedRate class defines an interval in minutes. Unlike Cron, it does not depend on specific clock times.
# Run every 45 minutes
interval_trigger = flyte.Trigger(
    name="frequent_check",
    automation=flyte.FixedRate(interval_minutes=45)
)
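The contrast with clock-based schedules is easier to see when the fire times are computed by hand. The sketch below is plain Python and does not call the SDK; `fixed_rate_times` is a hypothetical helper, and the choice to count the interval from an explicit start time is an assumption, since how the platform anchors the first run is not stated here.

```python
from datetime import datetime, timedelta, timezone


def fixed_rate_times(start: datetime, interval_minutes: int, count: int) -> list[datetime]:
    """Return the first `count` fire times spaced `interval_minutes` apart after `start`."""
    step = timedelta(minutes=interval_minutes)
    return [start + step * i for i in range(1, count + 1)]


# Assumed starting point for the illustration.
start = datetime(2024, 1, 1, 0, 0, tzinfo=timezone.utc)
for fire_time in fixed_rate_times(start, interval_minutes=45, count=4):
    print(fire_time.strftime("%H:%M"))
```

With a 45-minute interval the runs fall on a different minute of each successive hour, which is why this cadence is expressed as an interval rather than as clock times.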
Convenience Methods
The Trigger class provides several static methods for common schedules, which automatically handle the underlying Cron or FixedRate configuration:
Trigger.minutely()
Trigger.hourly()
Trigger.daily()
Trigger.weekly()
Trigger.monthly()
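For orientation, the table below lists the conventional cron expressions for these cadences; the hourly, daily, weekly, and monthly entries are what the standard crontab nicknames `@hourly`, `@daily`, `@weekly`, and `@monthly` expand to. Whether the SDK's convenience methods use exactly these expressions is an assumption, and `CONVENTIONAL_SCHEDULES` is a name made up for this sketch.

```python
# Conventional cron expressions for common cadences.
# Assumption: the SDK's convenience methods may be configured differently.
CONVENTIONAL_SCHEDULES = {
    "minutely": "* * * * *",
    "hourly": "0 * * * *",
    "daily": "0 0 * * *",
    "weekly": "0 0 * * 0",   # midnight on Sunday
    "monthly": "0 0 1 * *",  # midnight on the first day of the month
}

for name, expression in CONVENTIONAL_SCHEDULES.items():
    print(f"{name:<9} {expression}")
```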
Binding Triggers to Tasks
Triggers are associated with tasks via the triggers parameter in the @env.task decorator. You can provide a single trigger or a tuple of triggers.
from datetime import datetime

import flyte

env = flyte.TaskEnvironment(name="production")

@env.task(triggers=flyte.Trigger.hourly())
def my_scheduled_task(trigger_time: datetime):
    print(f"Executing scheduled run for {trigger_time}")
Passing the Execution Time
To access the scheduled execution time within your task, use the flyte.TriggerTime sentinel in the trigger's inputs dictionary. This binds the trigger's timestamp to a specific task input parameter.
custom_trigger = flyte.Trigger(
    name="parameterized_trigger",
    automation=flyte.Cron("0 12 * * *"),
    inputs={
        "scheduled_at": flyte.TriggerTime,
        "mode": "production"
    }
)

@env.task(triggers=custom_trigger)
def process_data(scheduled_at: datetime, mode: str):
    # scheduled_at will contain the time the trigger was supposed to fire
    ...
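The sentinel pattern can be pictured as a substitution step that runs when the trigger fires. The following is a minimal standalone sketch of that idea, not the SDK's implementation: `TRIGGER_TIME` stands in for a sentinel such as flyte.TriggerTime, and `resolve_inputs` is a hypothetical helper.

```python
from datetime import datetime, timezone

# Hypothetical stand-in for a sentinel such as flyte.TriggerTime.
TRIGGER_TIME = object()


def resolve_inputs(inputs: dict, fire_time: datetime) -> dict:
    """Replace every sentinel value with the scheduled fire time; keep other values."""
    return {
        key: fire_time if value is TRIGGER_TIME else value
        for key, value in inputs.items()
    }


declared = {"scheduled_at": TRIGGER_TIME, "mode": "production"}
resolved = resolve_inputs(declared, datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc))
print(resolved)
```

Fixed values such as "mode" pass through unchanged, while the sentinel is replaced per run, so the same trigger definition yields a different "scheduled_at" each time it fires.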
Advanced Configuration
Custom Context and Metadata
Triggers can propagate metadata to the task execution using custom_context. This is useful for passing operational metadata like team names or alert channels that the task can read at runtime via flyte.ctx().custom_context.
# Defined in examples/triggers/custom_context.py
minutely_monitor = flyte.Trigger.minutely(
    trigger_time_input_key="trigger_time",
    name="minutely_monitor",
    custom_context={"team": "infra", "alert_channel": "#ops"},
)

@env.task(triggers=minutely_monitor)
async def monitor_task(trigger_time: datetime) -> str:
    ctx = flyte.ctx().custom_context
    return f"Monitor ran for team={ctx.get('team')}"
Notifications
You can configure automated notifications (Slack, Email, or Webhooks) that are sent when the scheduled run reaches a given execution phase.
# Defined in examples/triggers/notifications.py
trig = flyte.Trigger(
    name="alerting_trigger",
    automation=flyte.Cron("0 * * * *"),
    notifications=flyte.notify.Slack(
        on_phase=flyte.models.ActionPhase.FAILED,
        webhook_url="https://hooks.slack.com/...",
        message="Scheduled task {task.name} failed!"
    ),
)
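The two moving parts of a notification, the phase filter and the message template, can be sketched in plain Python. This is an illustration only: `render_message` and `should_notify` are hypothetical helpers, phases are modeled as plain strings, and the use of `str.format` to fill the `{task.name}` placeholder is an assumption about how such a template could be rendered, not a description of the platform's behavior.

```python
from types import SimpleNamespace


def render_message(template: str, task_name: str) -> str:
    """Fill a `{task.name}` placeholder using str.format attribute lookup."""
    return template.format(task=SimpleNamespace(name=task_name))


def should_notify(current_phase: str, on_phase: str) -> bool:
    """Notify only when the run reaches the configured phase."""
    return current_phase == on_phase


# Phases are plain strings here; the SDK uses its own phase type.
if should_notify("FAILED", on_phase="FAILED"):
    print(render_message("Scheduled task {task.name} failed!", "my_scheduled_task"))
```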
Remote Management
While triggers are often defined in code, they are managed on the Flyte cluster using the flyte.remote.Trigger class (found in src/flyte/remote/_trigger.py). This class provides methods to interact with the TriggerService.
Creating and Deploying
The Trigger.create method deploys a trigger specification to the cluster and associates it with an existing task.
from flyte.remote import Trigger
import flyte
# Define the spec
spec = flyte.Trigger(name="remote_trigger", automation=flyte.Cron("0 0 * * *"))
# Deploy to the cluster for a specific task
remote_trigger = Trigger.create(spec, task_name="my_existing_task")
Listing and Retrieving
You can list all triggers for a project or filter them by task name.
# List all triggers for a task
# (called synchronously here, like the other remote calls in this section)
triggers = Trigger.listall(task_name="my_task")
for t in triggers:
    print(f"Trigger: {t.name}, Active: {t.is_active}")

# Get details for a specific trigger
details = Trigger.get(name="daily_cleanup", task_name="my_task")
print(f"Status: {details.status}")
Pausing and Deleting
Triggers can be paused (deactivated) or deleted entirely using the remote interface.
# Pause a trigger
Trigger.update(name="daily_cleanup", task_name="my_task", active=False)
# Delete a trigger
Trigger.delete(name="daily_cleanup", task_name="my_task")
The TriggerDetails class provides additional runtime information, including the current status and metadata of the trigger as stored on the platform.