Skip to main content

Reliability & Validation

This codebase provides a robust framework for ensuring workflow reliability through multi-layered data validation, granular error handling, and rich execution reporting. These features are designed to handle the distributed and often unpredictable nature of cloud-native task execution.

Data Validation with Pandera

The SDK integrates with Pandera to provide schema enforcement for dataframes. This ensures that data flowing between tasks adheres to expected formats and types, catching data quality issues early in the pipeline.

When a validation fails, the system raises a RuntimeDataValidationError (found in src/flyte/errors.py), which provides detailed information about which variable failed serialization or deserialization and why.

import pandas as pd
import pandera as pa
from flyte import env

# Define a schema for validation
schema = pa.DataFrameSchema({
"column1": pa.Column(int),
"column2": pa.Column(str),
})

@env.task
async def validate_data(df: pd.DataFrame) -> Annotated[pd.DataFrame, schema]:
# The return value is automatically validated against the schema
return df

Workflow Reliability & Error Recovery

Reliability is managed through a combination of retry strategies and a comprehensive exception hierarchy that allows for sophisticated recovery logic.

Exception Hierarchy

The codebase defines a structured exception hierarchy in src/flyte/errors.py rooted at BaseRuntimeError. This allows tasks to distinguish between different failure modes:

  • RuntimeUserError: Errors caused by user code (e.g., ValueError, KeyError).
  • OOMError: Specifically raised when a task execution fails due to out-of-memory conditions.
  • TaskTimeoutError: Raised when a task exceeds its allocated execution time.
  • RetriesExhaustedError: Raised when all retry attempts for a task have failed.

Error Recovery and Resource Overrides

Tasks can catch specific Flyte errors to implement custom recovery logic. A powerful pattern in this codebase is catching an OOMError and retrying the task with increased resources using the .override() method.

from flyte import env, errors, Resources

@env.task
async def memory_intensive_task(data: int):
# Logic that might exceed memory limits
...

@env.task
async def workflow_manager():
try:
await memory_intensive_task(100)
except errors.OOMError:
# Retry with more memory if the first attempt failed due to OOM
await memory_intensive_task.override(
resources=Resources(cpu=2, memory="4Gi")
)(100)

Retry Strategies

The RetryStrategy class in src/flyte/_retry.py allows for configuring how many times a task should be retried. This can be applied directly to the @task decorator.

from flyte import env, RetryStrategy

@env.task(retries=RetryStrategy(count=3))
async def flaky_task():
# This task will be retried up to 3 times on failure
...

Rich Execution Reporting

For visibility into complex executions, the Report and Tab classes in src/flyte/report/_report.py allow tasks to generate rich HTML reports. These reports are flushed to remote storage and can be viewed in the Flyte UI.

Tasks must be decorated with report=True to enable this feature. You can log HTML snippets to specific tabs using flyte.report.get_tab(name).

import flyte.report

@env.task(report=True)
async def analysis_task():
# Log to the default 'main' tab
await flyte.report.log("<p>Starting analysis...</p>")

# Create and log to a custom tab
stats_tab = flyte.report.get_tab("Statistics")
stats_tab.log("<table>...</table>")

# Manually flush the report to remote storage
await flyte.report.flush.aio()

Robust Data Handling

The File and Dir classes provide deterministic and safe ways to handle data, especially during task retries.

Deterministic Paths with named_remote

When a task is retried, generating random paths for output files can lead to orphaned data or inconsistencies. File.named_remote(name) in src/flyte/io/_file.py ensures that the remote path is derived deterministically from the provided name and the task context. This makes the write operation idempotent across retries.

from flyte.io import File

@env.task
async def idempotent_write():
# This path is stable across retries of this specific task attempt
file = File.named_remote("results.csv")
async with file.open("wb") as f:
await f.write(b"data")
return file

Handling Optional Directories

Using Optional[Dir] can sometimes cause issues with serialization. The Dir class in src/flyte/io/_dir.py provides a sentinel pattern using Dir.empty() and the is_empty property. This allows tasks to signal that no directory was produced in a way that round-trips cleanly through the Flyte type engine.

from flyte.io import Dir

@env.task
async def conditional_dir_producer(condition: bool) -> Dir:
if condition:
return await Dir.from_local("/tmp/data")
return Dir.empty()

@env.task
async def consumer(d: Dir):
if d.is_empty:
print("No directory provided")
return
# Process directory

Async vs. Sync IO

To maintain reliability and performance in async tasks, the codebase provides both async and sync interfaces for File and Dir. Using sync methods like open_sync or download_sync inside an async task is discouraged as it blocks the event loop; the system may raise OnlyAsyncIOSupportedError in certain contexts to enforce this.