Task Reliability: Retries & Timeouts

To ensure robust task execution in distributed environments, you can configure automatic retry strategies for transient failures and execution timeouts for long-running tasks.

Configuring Retries

You can define how many times a task is retried after a failure by passing an integer or a RetryStrategy object to the retries parameter of the @env.task decorator.

import flyte
from flyte import RetryStrategy

env = flyte.TaskEnvironment(name="reliability-demo")

# Simple retry configuration using an integer
@env.task(retries=3)
async def simple_retry_task(x: int) -> int:
    return x + 1

# Explicit retry configuration using RetryStrategy
@env.task(retries=RetryStrategy(count=5))
async def explicit_retry_task(x: int) -> int:
    return x * 2

RetryStrategy Details

The RetryStrategy class (found in src/flyte/_retry.py) is a simple dataclass that currently manages the retry count:

  • count: The number of times the task will be retried upon failure.

When you pass an integer to the retries parameter, the SDK internally converts it into a RetryStrategy(count=N).
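
The conversion and the meaning of count can be modelled locally without the SDK. The sketch below is not Flyte's implementation: RetrySketch, normalize_retries, run_with_retries, and make_flaky are hypothetical stand-ins, and it assumes that count excludes the initial attempt, so count=N allows up to N + 1 attempts in total.

```python
import asyncio
from dataclasses import dataclass
from typing import Awaitable, Callable, Union


@dataclass(frozen=True)
class RetrySketch:
    """Hypothetical stand-in for flyte.RetryStrategy (not the SDK class)."""
    count: int


def normalize_retries(retries: Union[int, RetrySketch]) -> RetrySketch:
    """Mirror the documented rule: a bare int N becomes a strategy with count=N."""
    if isinstance(retries, RetrySketch):
        return retries
    return RetrySketch(count=retries)


async def run_with_retries(fn: Callable[[], Awaitable[int]],
                           retries: Union[int, RetrySketch]) -> int:
    """Run fn once, then retry up to `count` more times (assumed semantics)."""
    strategy = normalize_retries(retries)
    if strategy.count < 0:
        raise ValueError("count must be non-negative")
    for attempt in range(strategy.count + 1):
        try:
            return await fn()
        except Exception:
            if attempt == strategy.count:
                raise  # retries exhausted: surface the last failure
    raise AssertionError("unreachable")


def make_flaky(failures: int) -> Callable[[], Awaitable[int]]:
    """Build a coroutine function that fails `failures` times, then returns 42."""
    state = {"calls": 0}

    async def flaky() -> int:
        state["calls"] += 1
        if state["calls"] <= failures:
            raise RuntimeError("transient failure")
        return 42

    return flaky
```

Under these assumptions, asyncio.run(run_with_retries(make_flaky(2), 3)) succeeds on the third attempt, while the same flaky function with retries=1 re-raises the RuntimeError once both attempts have failed.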

Configuring Timeouts

Timeouts prevent tasks from running indefinitely or waiting too long in a queue. You can specify timeouts using integers (seconds), timedelta objects, or the Timeout class.

from datetime import timedelta
import flyte
from flyte import Timeout

env = flyte.TaskEnvironment(name="timeout-demo")

# Using an integer (seconds)
@env.task(timeout=60)
async def quick_task():
    pass

# Using a timedelta for better readability
@env.task(timeout=timedelta(minutes=5))
async def medium_task():
    pass

# Using the Timeout class for granular control
timeout_config = Timeout(
    max_runtime=timedelta(minutes=10),
    max_queued_time=timedelta(hours=1),
)

@env.task(timeout=timeout_config)
async def long_running_task():
    pass

Timeout Class Parameters

The Timeout class (found in src/flyte/_timeout.py) allows you to distinguish between execution time and wait time:

  • max_runtime: The maximum time a single attempt of the task is allowed to run.
  • max_queued_time: The maximum time the task can stay in the queue before it starts executing.
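
As a local analogy for max_runtime, the sketch below bounds a single attempt with asyncio.wait_for. It only illustrates the idea of a per-attempt limit: the Flyte platform enforces timeouts itself, and nothing here models max_queued_time. The names run_attempt, fast, slow, and demo are hypothetical.

```python
import asyncio
from datetime import timedelta


async def run_attempt(coro_fn, max_runtime: timedelta) -> str:
    """Run one attempt and report whether it finished within max_runtime."""
    try:
        await asyncio.wait_for(coro_fn(), timeout=max_runtime.total_seconds())
        return "succeeded"
    except asyncio.TimeoutError:
        return "timed out"


async def fast() -> None:
    await asyncio.sleep(0)    # yields once and finishes immediately


async def slow() -> None:
    await asyncio.sleep(0.5)  # far longer than the 50 ms limit used in demo()


async def demo() -> list:
    """Apply the same 50 ms limit to a fast and a slow attempt."""
    limit = timedelta(milliseconds=50)
    return [await run_attempt(fast, limit), await run_attempt(slow, limit)]
```

Running asyncio.run(demo()) applies the same limit to both coroutines; only the slow one is cancelled.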

Overriding Settings at Runtime

You can override retry and timeout settings for a single task invocation with the .override() method. This is useful when you know that one particular call needs more retry attempts or a longer time limit than the task's defaults.

import flyte
from flyte import RetryStrategy

env = flyte.TaskEnvironment(name="override-demo")

@env.task(retries=1, timeout=30)
async def standard_task(data: str):
    # ... processing ...
    pass

async def workflow():
    # Override defaults for a specific high-priority or heavy call
    result = await standard_task.override(
        retries=RetryStrategy(count=10),
        timeout=600,  # 10 minutes
    )(data="heavy_payload")

Troubleshooting

Internal Conversions

The SDK automatically normalizes inputs for the retries and timeout parameters:

  • An int N passed to retries becomes RetryStrategy(count=N).
  • An int or timedelta passed to timeout becomes a Timeout object where the value is assigned to max_runtime.
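
The timeout rule can be sketched with a stand-in class. TimeoutSketch and normalize_timeout are hypothetical names, not SDK APIs, and storing max_runtime as a timedelta is an assumption made for illustration; the real Timeout class may keep the value in a different form.

```python
from dataclasses import dataclass
from datetime import timedelta
from typing import Optional, Union


@dataclass(frozen=True)
class TimeoutSketch:
    """Hypothetical stand-in for flyte.Timeout (not the SDK class)."""
    max_runtime: Optional[timedelta] = None
    max_queued_time: Optional[timedelta] = None


def normalize_timeout(value: Union[int, timedelta, TimeoutSketch]) -> TimeoutSketch:
    """Assign a bare int (seconds) or timedelta to max_runtime."""
    if isinstance(value, TimeoutSketch):
        return value  # already explicit, keep both fields as given
    if isinstance(value, timedelta):
        return TimeoutSketch(max_runtime=value)
    return TimeoutSketch(max_runtime=timedelta(seconds=value))
```

In this model, normalize_timeout(60) and normalize_timeout(timedelta(minutes=1)) produce equal objects, and max_queued_time stays unset unless it is given explicitly.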

Serialization Limits

Although the Timeout class accepts max_queued_time, the underlying Flyte IDL (Interface Definition Language) primarily uses max_runtime, expressed in seconds, when a task is serialized and executed on the Flyte platform. Rely on max_runtime for any execution bound that your logic depends on.

Task Environment Defaults

If you define retries or timeouts in a TaskEnvironment, they serve as the default for all tasks associated with that environment unless specifically overridden in the @env.task decorator or via .override().

import flyte

# Default for all tasks in this environment is 2 retries
reliable_env = flyte.TaskEnvironment(name="prod", retries=2)

@reliable_env.task
async def inherits_retries():
    pass

@reliable_env.task(retries=5)  # Overrides environment default
async def custom_retries():
    pass
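
The precedence described in this section can be modelled as a lookup from the most specific setting to the least specific one. This is a sketch under stated assumptions, not the SDK's resolution code: resolve_retries is a hypothetical helper, retry counts are plain integers for brevity, and the fallback of 0 retries when nothing is set is assumed.

```python
from typing import Optional


def resolve_retries(env_default: Optional[int] = None,
                    decorator: Optional[int] = None,
                    override: Optional[int] = None,
                    fallback: int = 0) -> int:
    """Pick the most specific retry count that was set.

    Assumed order: .override() call, then @env.task argument,
    then TaskEnvironment default, then the fallback.
    """
    for candidate in (override, decorator, env_default):
        if candidate is not None:
            return candidate
    return fallback
```

With the values used above, resolve_retries(env_default=2) yields 2 for inherits_retries and resolve_retries(env_default=2, decorator=5) yields 5 for custom_retries.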