Task Reliability: Retries & Timeouts
To ensure robust task execution in distributed environments, you can configure automatic retry strategies for transient failures and execution timeouts for long-running tasks.
Configuring Retries
You can define how many times a task should be retried by passing either an integer or a RetryStrategy object to the retries parameter of the @env.task decorator.
```python
import flyte
from flyte import RetryStrategy

env = flyte.TaskEnvironment(name="reliability-demo")

# Simple retry configuration using an integer
@env.task(retries=3)
async def simple_retry_task(x: int) -> int:
    return x + 1

# Explicit retry configuration using RetryStrategy
@env.task(retries=RetryStrategy(count=5))
async def explicit_retry_task(x: int) -> int:
    return x * 2
```
RetryStrategy Details
The RetryStrategy class (found in src/flyte/_retry.py) is a simple dataclass that currently manages only the retry count:
- count: The number of times the task will be retried upon failure.
When you pass an integer to the retries parameter, the SDK internally converts it into a RetryStrategy(count=N).
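A minimal sketch of the integer-to-RetryStrategy conversion described above. RetryStrategy here is a local stand-in dataclass that only mirrors the documented count field, and normalize_retries is a hypothetical helper name; neither is the SDK's actual code in src/flyte/_retry.py.

```python
from dataclasses import dataclass
from typing import Union


# Hypothetical stand-in mirroring the documented RetryStrategy shape
# (a dataclass with a single `count` field); this is NOT flyte.RetryStrategy.
@dataclass
class RetryStrategy:
    count: int


def normalize_retries(retries: Union[int, RetryStrategy]) -> RetryStrategy:
    """Hypothetical helper: coerce an int N into RetryStrategy(count=N)."""
    if isinstance(retries, RetryStrategy):
        # Already normalized; return the same object unchanged.
        return retries
    if isinstance(retries, int):
        return RetryStrategy(count=retries)
    raise TypeError(f"retries must be int or RetryStrategy, got {type(retries).__name__}")


print(normalize_retries(3))                       # RetryStrategy(count=3)
print(normalize_retries(RetryStrategy(count=5)))  # RetryStrategy(count=5)
```

Passing an explicit RetryStrategy is a no-op in this sketch, so both decorator forms end up with the same internal shape.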
Configuring Timeouts
Timeouts prevent tasks from running indefinitely or waiting too long in a queue. You can specify timeouts using integers (seconds), timedelta objects, or the Timeout class.
```python
from datetime import timedelta

import flyte
from flyte import Timeout

env = flyte.TaskEnvironment(name="timeout-demo")

# Using an integer (seconds)
@env.task(timeout=60)
async def quick_task():
    pass

# Using a timedelta for better readability
@env.task(timeout=timedelta(minutes=5))
async def medium_task():
    pass

# Using the Timeout class for granular control
timeout_config = Timeout(
    max_runtime=timedelta(minutes=10),
    max_queued_time=timedelta(hours=1),
)

@env.task(timeout=timeout_config)
async def long_running_task():
    pass
```
Timeout Class Parameters
The Timeout class (found in src/flyte/_timeout.py) allows you to distinguish between execution time and wait time:
- max_runtime: The maximum time a single attempt of the task is allowed to run.
- max_queued_time: The maximum time the task can wait in the queue before it starts executing.
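To make the distinction between the two limits concrete, the sketch below reports which bound a single attempt would violate. Timeout is a local stand-in with the two documented fields, and exceeded_limit is a hypothetical helper; it illustrates the semantics only and is not how Flyte enforces these limits.

```python
from dataclasses import dataclass
from datetime import timedelta
from typing import Optional


# Hypothetical stand-in mirroring the two documented Timeout fields;
# this is NOT flyte.Timeout.
@dataclass
class Timeout:
    max_runtime: timedelta
    max_queued_time: Optional[timedelta] = None


def exceeded_limit(t: Timeout, queued: timedelta, running: timedelta) -> Optional[str]:
    """Return the name of the limit an attempt exceeded, or None if within bounds."""
    # Queue time is checked first: time spent waiting is not execution time.
    if t.max_queued_time is not None and queued > t.max_queued_time:
        return "max_queued_time"
    if running > t.max_runtime:
        return "max_runtime"
    return None


cfg = Timeout(max_runtime=timedelta(minutes=10), max_queued_time=timedelta(hours=1))
print(exceeded_limit(cfg, queued=timedelta(minutes=5), running=timedelta(minutes=3)))   # None
print(exceeded_limit(cfg, queued=timedelta(hours=2), running=timedelta(0)))             # max_queued_time
print(exceeded_limit(cfg, queued=timedelta(minutes=1), running=timedelta(minutes=11)))  # max_runtime
```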
Overriding Settings at Runtime
You can override retry and timeout settings for a single task invocation using the .override() method. This is useful when you know a specific call needs more attempts or more time than the defaults allow.
```python
import flyte
from flyte import RetryStrategy

env = flyte.TaskEnvironment(name="reliability-demo")

@env.task(retries=1, timeout=30)
async def standard_task(data: str) -> str:
    # ... processing ...
    return data

# A parent task that calls standard_task
@env.task
async def workflow() -> str:
    # Override the defaults for one specific high-priority or heavy call
    result = await standard_task.override(
        retries=RetryStrategy(count=10),
        timeout=600,  # 10 minutes
    )(data="heavy_payload")
    return result
```
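The sketch below models the assumption that an override applies only to the call it wraps and leaves the decorator's defaults untouched. TaskSettings and override are hypothetical stand-ins built on dataclasses.replace, not the SDK's .override() implementation.

```python
from dataclasses import dataclass, replace
from datetime import timedelta


# Hypothetical stand-in for a task's reliability settings; not a flyte type.
@dataclass(frozen=True)
class TaskSettings:
    retries: int
    timeout: timedelta


def override(settings: TaskSettings, **changes) -> TaskSettings:
    """Hypothetical: return a modified copy; the original settings are not mutated."""
    # replace() raises TypeError for unknown field names, catching typos early.
    return replace(settings, **changes)


defaults = TaskSettings(retries=1, timeout=timedelta(seconds=30))
heavy = override(defaults, retries=10, timeout=timedelta(minutes=10))

print(defaults)  # TaskSettings(retries=1, timeout=datetime.timedelta(seconds=30))
print(heavy)     # TaskSettings(retries=10, timeout=datetime.timedelta(seconds=600))
```

Because the copy is separate, other calls to the same task keep using the original defaults.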
Troubleshooting
Internal Conversions
The SDK automatically normalizes inputs for the retries and timeout parameters:
- An int passed to retries becomes RetryStrategy(count=int).
- An int or timedelta passed to timeout becomes a Timeout object whose max_runtime is set to that value.
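The timeout normalization rules above can be sketched the same way. Timeout is again a local stand-in, and normalize_timeout is a hypothetical helper. One assumption to note: this sketch converts an integer into a timedelta of that many seconds so both forms compare equal, whereas the SDK may store the integer as-is.

```python
from dataclasses import dataclass
from datetime import timedelta
from typing import Optional, Union


# Hypothetical stand-in mirroring the documented Timeout fields; NOT flyte.Timeout.
@dataclass
class Timeout:
    max_runtime: timedelta
    max_queued_time: Optional[timedelta] = None


def normalize_timeout(timeout: Union[int, timedelta, Timeout]) -> Timeout:
    """Hypothetical helper: int/timedelta values are assigned to max_runtime."""
    if isinstance(timeout, Timeout):
        return timeout
    if isinstance(timeout, timedelta):
        return Timeout(max_runtime=timeout)
    if isinstance(timeout, int):
        # Assumption of this sketch: integers are interpreted as seconds.
        return Timeout(max_runtime=timedelta(seconds=timeout))
    raise TypeError(f"unsupported timeout type: {type(timeout).__name__}")


print(normalize_timeout(60))
# Timeout(max_runtime=datetime.timedelta(seconds=60), max_queued_time=None)
```

Note that the shorthand forms never set max_queued_time; only an explicit Timeout object can.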
Serialization Limits
While the Timeout class supports max_queued_time, the underlying Flyte IDL (Interface Definition Language) primarily uses max_runtime (in seconds) when the task is serialized and executed on the Flyte platform. Rely on max_runtime, rather than max_queued_time, for any execution bound your logic depends on.
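As an illustration of the point above, the hypothetical helper below reduces a Timeout stand-in to the value the paragraph says the platform primarily uses: max_runtime in whole seconds. It deliberately ignores max_queued_time and truncates fractional seconds; both choices are assumptions of this sketch, not documented SDK behavior.

```python
from dataclasses import dataclass
from datetime import timedelta
from typing import Optional


# Hypothetical stand-in mirroring the documented Timeout fields; NOT flyte.Timeout.
@dataclass
class Timeout:
    max_runtime: timedelta
    max_queued_time: Optional[timedelta] = None


def runtime_seconds(timeout: Timeout) -> int:
    """Hypothetical: whole seconds of max_runtime; max_queued_time is ignored on purpose."""
    # int() truncates toward zero, so 90.9 seconds becomes 90 (a sketch assumption).
    return int(timeout.max_runtime.total_seconds())


cfg = Timeout(max_runtime=timedelta(minutes=10), max_queued_time=timedelta(hours=1))
print(runtime_seconds(cfg))  # 600
```

The one-hour queue limit contributes nothing to the result, which is why it should not be the only bound you depend on.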
Task Environment Defaults
If you define retries or timeouts in a TaskEnvironment, they serve as the default for all tasks associated with that environment unless specifically overridden in the @env.task decorator or via .override().
```python
import flyte

# Default for all tasks in this environment is 2 retries
reliable_env = flyte.TaskEnvironment(name="prod", retries=2)

@reliable_env.task
async def inherits_retries():
    pass

@reliable_env.task(retries=5)  # Overrides the environment default
async def custom_retries():
    pass
```
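The precedence order described above (.override() over @env.task over TaskEnvironment) can be sketched as a lookup that returns the most specific value that was actually set. resolve is a hypothetical helper, not part of the flyte package.

```python
from typing import Optional, TypeVar

T = TypeVar("T")


def resolve(env_default: Optional[T], decorator: Optional[T], call_override: Optional[T]) -> Optional[T]:
    """Hypothetical: most specific non-None value wins (override > decorator > environment)."""
    for value in (call_override, decorator, env_default):
        if value is not None:
            return value
    return None


print(resolve(2, None, None))  # 2   (like inherits_retries)
print(resolve(2, 5, None))     # 5   (like custom_retries)
print(resolve(2, 5, 10))       # 10  (a call wrapped in .override(retries=10))
```

Checking "is not None" rather than truthiness matters here: an explicit retries=0 on a task should still win over an environment default of 2.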