Retries and Timeouts
To handle transient failures and bound execution time in your Flyte tasks, configure retries and timeouts. You can set them on the task definition, on tasks declared in a task environment, or override them for a specific use of a task without changing its original definition.
Configure Retries
You can specify how many times a task should be retried if it fails due to a transient error.
Simple Retry Count
The simplest way to set retries is to pass an integer to the @task decorator.
```python
from flyte import task

@task(retries=3)
def flaky_task(x: int) -> int:
    # This task will be attempted up to 4 times (1 original + 3 retries)
    return 10 // x  # integer division matches the declared int return type
```
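The attempt arithmetic in the comment above can be made concrete. Here is a plain-Python sketch of the semantics: `retries=3` allows one original attempt plus three retries. `run_with_retries` is a hypothetical helper for illustration only. It is not part of the Flyte SDK, and it retries on every exception, which simplifies how a real scheduler decides what to retry.

```python
from __future__ import annotations

from typing import Callable, TypeVar

T = TypeVar("T")


def run_with_retries(fn: Callable[[], T], retries: int) -> tuple[T, int]:
    """Call fn, allowing up to `retries` extra attempts after the first.

    Returns (result, attempts_used). If all 1 + retries attempts fail,
    the last exception is re-raised.
    """
    last_error: Exception | None = None
    for attempt in range(1, retries + 2):  # 1 original attempt + `retries` retries
        try:
            return fn(), attempt
        except Exception as exc:  # simplification: every exception is retried
            last_error = exc
    assert last_error is not None
    raise last_error


# Usage: a call that fails twice before succeeding.
calls = {"n": 0}


def flaky() -> int:
    calls["n"] += 1
    if calls["n"] < 3:
        raise RuntimeError("transient failure")
    return 42


result, attempts = run_with_retries(flaky, retries=3)
print(result, attempts)  # 42 3
```

With `retries=3`, a function that never succeeds is called four times before its error propagates.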
Explicit Retry Strategy
For explicit configuration, use the RetryStrategy class. Currently, the class supports only the retry count.
```python
from flyte import task, RetryStrategy

@task(retries=RetryStrategy(count=5))
def robust_task():
    pass
```
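Because RetryStrategy currently carries only a count, `retries=3` and `retries=RetryStrategy(count=3)` appear to express the same policy. The sketch below shows that equivalence. It uses a local stand-in `RetryStrategy` dataclass and a hypothetical `retry_count` helper. Neither is the SDK's own code, and rejecting negative counts is an assumption of the sketch.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass(frozen=True)
class RetryStrategy:
    """Local stand-in for the SDK class: it holds only a retry count."""
    count: int


def retry_count(retries: int | RetryStrategy) -> int:
    """Reduce either accepted form of `retries` to a plain count (hypothetical)."""
    count = retries.count if isinstance(retries, RetryStrategy) else retries
    if count < 0:
        raise ValueError("retry count must be non-negative")  # assumption of this sketch
    return count


print(retry_count(3), retry_count(RetryStrategy(count=5)))  # 3 5
```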
Configure Timeouts
Timeouts prevent tasks from running indefinitely or waiting in a queue for too long.
Simple Runtime Timeout
You can set a maximum runtime for a single attempt using an integer (seconds) or a timedelta.
```python
from datetime import timedelta
from flyte import task

# Using seconds
@task(timeout=60)
def quick_task():
    pass

# Using timedelta
@task(timeout=timedelta(minutes=5))
def long_running_task():
    pass
```
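Both forms above describe the same kind of limit once normalized. An integer is read as seconds, so `timeout=300` and `timeout=timedelta(minutes=5)` should be equivalent. Here is a hypothetical `to_timedelta` helper that performs that conversion. It illustrates the rule only and is not taken from the SDK.

```python
from __future__ import annotations

from datetime import timedelta


def to_timedelta(value: int | timedelta) -> timedelta:
    """Interpret an int as seconds; pass a timedelta through unchanged."""
    if isinstance(value, timedelta):
        return value
    return timedelta(seconds=value)


print(to_timedelta(60))                    # 0:01:00
print(to_timedelta(timedelta(minutes=5)))  # 0:05:00
```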
Advanced Timeout Configuration
To control both the execution time and the time a task is allowed to stay in the queue before starting, use the Timeout class.
```python
from datetime import timedelta
from flyte import task, Timeout

# Define limits for both execution and queue wait time
custom_timeout = Timeout(
    max_runtime=timedelta(minutes=10),
    max_queued_time=timedelta(hours=1),
)

@task(timeout=custom_timeout)
def resource_intensive_task():
    pass
```
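The two limits measure different phases. max_queued_time bounds the wait before the task starts, and max_runtime bounds a single attempt once it is running. The model below is illustrative only. It uses a local stand-in `Timeout` dataclass and a hypothetical `exceeded_limit` function. The use of strict `>` comparisons is an assumption, not documented SDK behavior.

```python
from __future__ import annotations

from dataclasses import dataclass
from datetime import timedelta


@dataclass(frozen=True)
class Timeout:
    """Local stand-in with the same two fields as the Timeout shown above."""
    max_runtime: timedelta
    max_queued_time: timedelta | None = None


def exceeded_limit(t: Timeout, queued: timedelta, running: timedelta) -> str | None:
    """Name the limit that has been exceeded, or return None if neither has."""
    # Queue time is measured before the task starts running.
    if t.max_queued_time is not None and queued > t.max_queued_time:
        return "max_queued_time"
    # Runtime is measured for a single attempt once it is running.
    if running > t.max_runtime:
        return "max_runtime"
    return None


limits = Timeout(max_runtime=timedelta(minutes=10), max_queued_time=timedelta(hours=1))
print(exceeded_limit(limits, timedelta(minutes=30), timedelta(minutes=5)))  # None
print(exceeded_limit(limits, timedelta(hours=2), timedelta(0)))             # max_queued_time
print(exceeded_limit(limits, timedelta(minutes=1), timedelta(minutes=11)))  # max_runtime
```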
Dynamic Overrides
You can change the retry and timeout settings for a specific use of a task without modifying the original task definition. This is useful when one particular execution needs more time or more retry attempts than the task's defaults allow.
```python
from flyte import task, RetryStrategy

@task(retries=1, timeout=30)
def my_task(val: int):
    print(val)

# Create a variant with more retries and a longer timeout;
# my_task itself keeps retries=1 and timeout=30
reliable_task = my_task.override(
    retries=RetryStrategy(count=5),
    timeout=600,  # 10 minutes
)

# reliable_task can now be called wherever my_task would be
```
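The key property of override() described above is that it leaves the original definition unchanged. A minimal model of that copy-on-override behavior applies `dataclasses.replace` to a hypothetical `TaskSettings` record. This record is an illustration, not Flyte's internal representation of a task.

```python
from __future__ import annotations

from dataclasses import dataclass, replace


@dataclass(frozen=True)
class TaskSettings:
    """Hypothetical stand-in for the retry/timeout settings a task carries."""
    retries: int
    timeout_seconds: int


def override(settings: TaskSettings, **changes: int) -> TaskSettings:
    """Return a new settings object with `changes` applied; the input is untouched."""
    return replace(settings, **changes)


base = TaskSettings(retries=1, timeout_seconds=30)
reliable = override(base, retries=5, timeout_seconds=600)
print(base)      # TaskSettings(retries=1, timeout_seconds=30)
print(reliable)  # TaskSettings(retries=5, timeout_seconds=600)
```

Making the record frozen means the only way to "change" it is to build a new one, which mirrors the guarantee that the original task keeps its settings.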
Task Environment Defaults
When using a TaskEnvironment, you define retry and timeout behavior for tasks registered in that environment by passing retries and timeout to the environment's task decorator, @env.task.
```python
import flyte
from datetime import timedelta

env = flyte.TaskEnvironment(name="production")

# retries and timeout are passed to the environment's task decorator,
# so they apply to env_task
@env.task(retries=2, timeout=timedelta(minutes=2))
async def env_task():
    pass
```
Troubleshooting
Retry Strategy Limitations
The RetryStrategy class in this SDK currently only supports the count parameter. Advanced features like exponential backoff factors or jitter are not yet exposed through this interface.
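Backoff and jitter cannot be configured through RetryStrategy. One workaround is to retry a flaky call inside the task body with ordinary Python and leave `retries` to handle failures that escape it. The helper below is a hypothetical sketch that applies exponential backoff with full jitter. Its name, parameters, and defaults are assumptions, and it does not use the Flyte API.

```python
from __future__ import annotations

import random
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def call_with_backoff(
    fn: Callable[[], T],
    attempts: int = 4,
    base_delay: float = 0.5,
    factor: float = 2.0,
    max_delay: float = 8.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call fn up to `attempts` times, sleeping with jittered exponential backoff."""
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    for attempt in range(attempts):
        try:
            return fn()
        except Exception:
            if attempt == attempts - 1:
                raise  # out of in-process attempts: let the task fail
            # Cap grows as base_delay * factor**attempt, bounded by max_delay;
            # full jitter sleeps a random time between 0 and that cap.
            cap = min(max_delay, base_delay * factor**attempt)
            sleep(random.uniform(0, cap))
    raise AssertionError("unreachable")


# Usage: a call that fails twice, with sleep replaced by a recorder.
delays: list[float] = []
state = {"n": 0}


def flaky() -> str:
    state["n"] += 1
    if state["n"] < 3:
        raise ConnectionError("transient")
    return "ok"


result = call_with_backoff(flaky, sleep=delays.append)
print(result, len(delays))  # ok 2
```

If the task also sets `retries`, the two layers multiply. With `retries=2` and `attempts=4`, the wrapped call may run up to 12 times.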
No Timeout
Setting timeout to 0 (the default) means the SDK enforces no runtime limit for that task.
```python
from flyte import task

@task(timeout=0)
def infinite_task():
    # This task has no runtime limit
    pass
```
Queued Time vs. Runtime
Passing an integer or timedelta directly to the timeout parameter sets only max_runtime. To also set max_queued_time, use a Timeout object:
```python
from flyte import Timeout

# Correct way to set queued time; integers are interpreted as seconds
timeout = Timeout(max_runtime=300, max_queued_time=600)
```
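You can model this asymmetry as a normalization step. A bare integer or timedelta is wrapped as a Timeout whose max_queued_time is left unset. The following is a hypothetical sketch of the rule stated above, using a local stand-in `Timeout` dataclass and an illustrative `as_timeout` function. It is not the SDK's source.

```python
from __future__ import annotations

from dataclasses import dataclass
from datetime import timedelta


@dataclass(frozen=True)
class Timeout:
    """Local stand-in for the Timeout class; ints are read as seconds."""
    max_runtime: timedelta | int
    max_queued_time: timedelta | int | None = None


def as_timeout(value: Timeout | int | timedelta) -> Timeout:
    """Wrap a bare int or timedelta as a runtime-only Timeout."""
    if isinstance(value, Timeout):
        return value
    # A bare value sets only max_runtime; the queue limit stays unset.
    return Timeout(max_runtime=value)


print(as_timeout(300))
# Timeout(max_runtime=300, max_queued_time=None)
print(as_timeout(Timeout(max_runtime=300, max_queued_time=600)))
# Timeout(max_runtime=300, max_queued_time=600)
```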