Data & Type System
Flyte's data and type system is designed around the concept of offloaded types. Instead of passing large datasets directly between tasks, Flyte passes references to data stored in remote object stores (like S3, GCS, or Azure Blob Storage). The core entities—File, Dir, and DataFrame—act as these references, providing a unified interface for both local and remote data access.
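Here is a minimal sketch of what offloading means in practice. It uses only the standard library and is not Flyte's implementation. A dictionary stands in for the object store, and the names `OBJECT_STORE`, `upload`, `produce`, and `consume` are hypothetical. The producing "task" writes its payload once, and only a short URI string passes between the two tasks.

```python
import uuid

# Hypothetical in-memory stand-in for a remote object store such as S3 or GCS.
OBJECT_STORE: dict[str, bytes] = {}


def upload(data: bytes) -> str:
    """Store bytes under a random key and return only the reference (URI)."""
    uri = f"s3://toy-bucket/{uuid.uuid4().hex}"
    OBJECT_STORE[uri] = data
    return uri


def produce() -> str:
    # The producing "task" writes a 10 MB payload once and returns a small URI.
    return upload(b"x" * 10_000_000)


def consume(uri: str) -> int:
    # The consuming "task" receives only the URI and fetches data on demand.
    return len(OBJECT_STORE[uri])


ref = produce()
print(len(ref), consume(ref))  # 48 10000000
```

Only the 48-character URI moves between the two functions. The payload stays in the store until the consumer asks for it.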
Files
The flyte.io.File class represents a single file. It is a generic class where the type parameter T can specify the file format (e.g., File["csv"]).
Creating File References
src/flyte/io/_file.py provides three primary ways to create a File object:
- new_remote(): Generates a random remote path. Use this when you want to stream data directly to the cloud without creating a local file first.
- from_local(): Uploads an existing local file to remote storage and returns a reference to the new remote location.
- from_existing_remote(): Creates a reference to a file that already exists in remote storage.
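The three creation paths differ mainly in whether data moves and where the path comes from. The following stdlib-only sketch uses a hypothetical `ToyFileRef` class, not Flyte's `File`, with a temporary directory standing in for the remote bucket. Its three classmethods mirror the behavior described above.

```python
import shutil
import tempfile
import uuid
from dataclasses import dataclass
from pathlib import Path

# Hypothetical local directory standing in for a remote bucket.
REMOTE_ROOT = Path(tempfile.mkdtemp(prefix="toy-remote-"))


@dataclass(frozen=True)
class ToyFileRef:
    """Hypothetical reference type: it stores a path, never the file contents."""

    path: str

    @classmethod
    def new_remote(cls) -> "ToyFileRef":
        # Random, not-yet-written location; the caller streams data into it later.
        return cls(str(REMOTE_ROOT / uuid.uuid4().hex))

    @classmethod
    def from_local(cls, local_path: str) -> "ToyFileRef":
        # "Upload" an existing local file by copying it, then reference the copy.
        dest = REMOTE_ROOT / uuid.uuid4().hex
        shutil.copyfile(local_path, dest)
        return cls(str(dest))

    @classmethod
    def from_existing_remote(cls, remote_path: str) -> "ToyFileRef":
        # No data movement: just wrap a path that already exists.
        return cls(remote_path)


with tempfile.NamedTemporaryFile("wb", delete=False) as tmp:
    tmp.write(b"a,b\n1,2\n")

uploaded = ToyFileRef.from_local(tmp.name)
fresh = ToyFileRef.new_remote()
wrapped = ToyFileRef.from_existing_remote(uploaded.path)
print(Path(uploaded.path).read_bytes(), Path(fresh.path).exists(), wrapped == uploaded)
```

Because the reference is only a path, wrapping an existing remote location gives a value equal to the one produced by the upload.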
Reading and Writing
File provides both asynchronous and synchronous APIs. Asynchronous methods are preferred in async tasks for better performance.
```python
import flyte
from flyte.io import File

env = flyte.TaskEnvironment(name="io_examples")


@env.task
async def process_file(input_file: File) -> File:
    # Reading from a file
    async with input_file.open("rb") as f:
        # Note: .read() returns a memoryview; convert to bytes before decoding
        content = bytes(await f.read()).decode("utf-8")

    # Writing to a new remote file
    output_file = File.new_remote()
    async with output_file.open("wb") as f:
        await f.write(f"Processed: {content}".encode("utf-8"))
    return output_file
```
Stability Across Retries
In distributed systems, tasks may be retried. If a task generates a random path using new_remote() and fails after uploading, a retry will generate a different path. To ensure stability, use named_remote(name: str). This method derives a deterministic path based on the task execution context, ensuring that retries point to the same location.
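The difference between a random path and a named path can be sketched with the standard library. This is an illustration under stated assumptions, not Flyte's code. `ToyExecutionContext`, `random_path`, and `named_path` are hypothetical, and the sketch assumes the stable inputs are a run ID and a task name. The key design choice is that the retry attempt number is left out of the hash.

```python
import hashlib
import uuid
from dataclasses import dataclass


@dataclass(frozen=True)
class ToyExecutionContext:
    """Hypothetical stand-in for the task execution context."""

    run_id: str
    task_name: str
    attempt: int  # increments on every retry


def random_path(bucket: str) -> str:
    # Like new_remote(): a fresh UUID on every call, so every retry differs.
    return f"{bucket}/{uuid.uuid4().hex}"


def named_path(bucket: str, ctx: ToyExecutionContext, name: str) -> str:
    # Like named_remote(name): hash only fields that stay fixed across retries
    # (run and task identity plus the caller's name), never the attempt.
    key = f"{ctx.run_id}/{ctx.task_name}/{name}".encode("utf-8")
    return f"{bucket}/{hashlib.sha256(key).hexdigest()[:32]}"


first = ToyExecutionContext("run-123", "train", attempt=0)
retry = ToyExecutionContext("run-123", "train", attempt=1)

print(random_path("s3://bucket") == random_path("s3://bucket"))  # False
print(named_path("s3://bucket", first, "model.pt") == named_path("s3://bucket", retry, "model.pt"))  # True
```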
Directories
The flyte.io.Dir class handles collections of files. Like File, it supports from_local(), from_existing_remote(), and both sync/async operations.
Iterating vs. Downloading
You can interact with a directory in two ways:
- Streaming Iteration: Use walk() or list_files() to iterate over File objects within the directory. This is efficient because it doesn't require downloading the entire directory.
- Full Download: Use download() to bring the entire directory structure to the local filesystem. This is useful when using legacy tools that require local path access.
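The trade-off between streaming and downloading can be seen with local files alone. The following stdlib sketch is only an analogy and does not use Flyte's Dir. The hypothetical `walk_files` generator plays the role of walk(), and `download_all` plays the role of download(). The generator can stop after a few files without touching the rest, while the full copy must finish before any processing starts.

```python
import itertools
import os
import shutil
import tempfile
from collections.abc import Iterator
from pathlib import Path


def walk_files(root: str) -> Iterator[Path]:
    """Lazily yield file paths, analogous to iterating with walk()."""
    for dirpath, _dirnames, filenames in os.walk(root):
        for filename in sorted(filenames):
            yield Path(dirpath) / filename


def download_all(src: str, dest: str) -> Path:
    """Copy the whole tree at once, analogous to download()."""
    return Path(shutil.copytree(src, dest))


# Build a small hypothetical "remote" directory: 100 files in two subfolders.
remote = Path(tempfile.mkdtemp(prefix="toy-dir-"))
for i in range(100):
    sub = remote / f"part{i % 2}"
    sub.mkdir(exist_ok=True)
    (sub / f"{i:03d}.txt").write_text(str(i))

# Streaming: stop after three files; the rest are never opened.
first_three = list(itertools.islice(walk_files(str(remote)), 3))

# Full download: every file is copied before processing can begin.
local = download_all(str(remote), str(Path(tempfile.mkdtemp()) / "copy"))
print(len(first_three), sum(1 for _ in walk_files(str(local))))  # 3 100
```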
```python
import flyte
from flyte.io import Dir

env = flyte.TaskEnvironment(name="io_examples")


@env.task
async def process_directory(d: Dir) -> int:
    count = 0
    # Walk through files lazily instead of downloading the whole directory
    async for file in d.walk(recursive=True):
        count += 1
    return count
```
Handling Optional Directories
Annotating a task with a standard Optional[Dir] can sometimes run into serialization issues. As an alternative, src/flyte/io/_dir.py provides a sentinel pattern: return Dir.empty() instead of None, and check the is_empty property in the consuming task:
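```python
import flyte
from flyte.io import Dir

env = flyte.TaskEnvironment(name="io_examples")


@env.task
def maybe_output(condition: bool) -> Dir:
    if condition:
        return Dir.from_local_sync("/tmp/data")
    # Return the sentinel instead of None
    return Dir.empty()


@env.task
def consumer(d: Dir) -> str:
    if d.is_empty:
        return "No data provided"
    # ... process data
    return "Processed data"
```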
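The sentinel idea is not specific to Flyte. In the sketch below, a hypothetical `ToyDir` class uses JSON as a stand-in serialization format, chosen only for illustration. The point is that the empty value is an ordinary instance of the type, so serialization and downstream checks never need a special case for None.

```python
import json
from dataclasses import asdict, dataclass


@dataclass(frozen=True)
class ToyDir:
    """Hypothetical directory reference using an 'empty' sentinel instead of None."""

    path: str

    @classmethod
    def empty(cls) -> "ToyDir":
        # The sentinel is a real instance, so it round-trips like any other value.
        return cls(path="")

    @property
    def is_empty(self) -> bool:
        return self.path == ""


def serialize(d: ToyDir) -> str:
    # One code path for real and empty directories alike.
    return json.dumps(asdict(d))


def deserialize(payload: str) -> ToyDir:
    return ToyDir(**json.loads(payload))


def consumer(d: ToyDir) -> str:
    if d.is_empty:
        return "No data provided"
    return f"Processing {d.path}"


print(consumer(deserialize(serialize(ToyDir.empty()))))  # No data provided
print(consumer(deserialize(serialize(ToyDir("s3://bucket/data")))))  # Processing s3://bucket/data
```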
DataFrames
The flyte.io.DataFrame class is a meta-dataframe object. It doesn't implement dataframe logic itself but wraps various implementations like Pandas, Polars, PyArrow, or Spark.
Interoperability
The primary goal of flyte.io.DataFrame is to allow tasks written with different libraries to communicate. A task can return a pandas.DataFrame wrapped in a Flyte DataFrame, and a subsequent task can consume it as a polars.DataFrame.
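One way to picture this interoperability is a registry of per-type encoders and decoders that meet in a shared storage format. The sketch below is a hypothetical illustration and not Flyte's transformer machinery. `RowTable` and `ColumnTable` stand in for two dataframe libraries, CSV text stands in for the storage format, and `ENCODERS`, `DECODERS`, and `hand_off` are invented names.

```python
import csv
import io
from dataclasses import dataclass
from typing import Any, Callable


@dataclass
class RowTable:
    """Hypothetical row-oriented library: one dict per record."""

    rows: list[dict[str, str]]


@dataclass
class ColumnTable:
    """Hypothetical column-oriented library: one list per column."""

    columns: dict[str, list[str]]


def encode_rows(table: RowTable) -> str:
    # Encoder: native row table -> shared storage format (CSV text here).
    buf = io.StringIO()
    writer = csv.DictWriter(buf, fieldnames=list(table.rows[0]))
    writer.writeheader()
    writer.writerows(table.rows)
    return buf.getvalue()


def decode_columns(text: str) -> ColumnTable:
    # Decoder: shared storage format -> native column table.
    reader = csv.DictReader(io.StringIO(text))
    columns: dict[str, list[str]] = {name: [] for name in reader.fieldnames or []}
    for row in reader:
        for name, value in row.items():
            columns[name].append(value)
    return ColumnTable(columns)


# Registries keyed by native type. A full system would register both
# directions for every library; one of each is enough for this hand-off.
ENCODERS: dict[type, Callable[[Any], str]] = {RowTable: encode_rows}
DECODERS: dict[type, Callable[[str], Any]] = {ColumnTable: decode_columns}


def hand_off(value: Any, wanted: type) -> Any:
    stored = ENCODERS[type(value)](value)  # producer side: encode by value type
    return DECODERS[wanted](stored)  # consumer side: decode to requested type


produced = RowTable([{"a": "1", "b": "3"}, {"a": "2", "b": "4"}])
consumed = hand_off(produced, ColumnTable)
print(consumed.columns)  # {'a': ['1', '2'], 'b': ['3', '4']}
```

Neither table class knows about the other. Each side only needs a converter to and from the shared format.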
Lazy Evaluation and Materialization
Flyte DataFrames are lazy by default. When a task receives a DataFrame, it receives only a reference (URI), and no data is read. To materialize the data, first call .open(type) to choose the local representation you want, then call .all() to load the entire dataset or .iter() to iterate over it.
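```python
import flyte.io  # binds both `flyte` and `flyte.io`
import pandas as pd

env = flyte.TaskEnvironment(name="io_examples")

@env.task
async def consume_dataframe(fdf: flyte.io.DataFrame) -> None:
    # Open as pandas; .all() materializes the entire dataset
    df = await fdf.open(pd.DataFrame).all()
    print(df.head())
```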
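The laziness described above can be modeled with a reference object that reads nothing until a terminal call. The following stdlib sketch is hypothetical. `LazyFrame`, `OpenedFrame`, `STORE`, and `FETCHES` are invented names, `tuple` stands in for a target type such as pd.DataFrame, and the methods are synchronous for brevity, whereas the Flyte call above is awaited. The fetch log shows that holding and opening the reference read no data, that .iter() reads batch by batch, and that .all() reads everything.

```python
from collections.abc import Callable, Iterator
from dataclasses import dataclass

# Hypothetical remote storage: URI -> row batches. FETCHES logs every batch read.
STORE: dict[str, list[list[int]]] = {"s3://bucket/df": [[1, 2], [3, 4], [5, 6]]}
FETCHES: list[int] = []


def _fetch(uri: str) -> Iterator[list[int]]:
    for index, batch in enumerate(STORE[uri]):
        FETCHES.append(index)  # record that this batch was actually read
        yield batch


@dataclass
class OpenedFrame:
    uri: str
    convert: Callable[[list[int]], object]  # builds the chosen local representation

    def all(self) -> object:
        # Read every batch and build one local object.
        return self.convert([row for batch in _fetch(self.uri) for row in batch])

    def iter(self) -> Iterator[object]:
        # Read and convert one batch at a time.
        for batch in _fetch(self.uri):
            yield self.convert(batch)


@dataclass
class LazyFrame:
    uri: str  # holding the reference reads nothing

    def open(self, convert: Callable[[list[int]], object]) -> OpenedFrame:
        return OpenedFrame(self.uri, convert)  # choosing a representation reads nothing


ref = LazyFrame("s3://bucket/df")
opened = ref.open(tuple)
print(FETCHES)  # []
first_batch = next(opened.iter())
print(first_batch, FETCHES)  # (1, 2) [0]
print(opened.all())  # (1, 2, 3, 4, 5, 6)
```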
Wrapping Native DataFrames
To pass a native dataframe (like Pandas) as a Flyte DataFrame, use wrap_df() or from_local().
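```python
import flyte.io  # binds both `flyte` and `flyte.io`
import pandas as pd
env = flyte.TaskEnvironment(name="io_examples")

@env.task
def create_df() -> flyte.io.DataFrame:
    df = pd.DataFrame({"a": [1, 2], "b": [3, 4]})
    return flyte.io.DataFrame.wrap_df(df)
```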
Content-Based Caching with HashMethod
For tasks where caching depends on the content of the data rather than just the path, you can use HashMethod. This is particularly useful for File and DataFrame.
In src/flyte/io/_file.py, new_remote accepts a hash_method. If a HashMethod is provided, Flyte will compute a hash as the data is written, which can then be used as part of the task's cache key.
```python
import flyte
from flyte.io import File, HashMethod

env = flyte.TaskEnvironment(name="io_examples")


@env.task
async def hash_aware_task() -> File:
    # The hash is computed as the data is written
    f = File.new_remote(hash_method=HashMethod.MD5)
    async with f.open("wb") as fh:
        await fh.write(b"important data")
    return f
```
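Computing a hash "as the data is written" amounts to updating a digest in the same pass as each write. The stdlib sketch below shows the idea. It is not Flyte's implementation, and `HashingWriter`, `write_chunks`, and `cache_key` are hypothetical names. The same bytes produce the same key regardless of how they were chunked or which random path they landed in, which is why a content hash can serve as a cache key when a path cannot.

```python
import hashlib
import io
from collections.abc import Iterable
from typing import BinaryIO


class HashingWriter:
    """Hypothetical wrapper that hashes bytes in the same pass that writes them."""

    def __init__(self, sink: BinaryIO) -> None:
        self._sink = sink
        self._hasher = hashlib.md5()

    def write(self, data: bytes) -> int:
        self._hasher.update(data)  # incremental update, no second read needed
        return self._sink.write(data)

    def hexdigest(self) -> str:
        return self._hasher.hexdigest()


def write_chunks(chunks: Iterable[bytes]) -> str:
    # Stream chunks to a throwaway in-memory sink; return the content hash.
    writer = HashingWriter(io.BytesIO())
    for chunk in chunks:
        writer.write(chunk)
    return writer.hexdigest()


def cache_key(task_name: str, content_hash: str) -> str:
    # Hypothetical cache key built from task identity and content, not the path.
    return f"{task_name}:{content_hash}"


# Same bytes, chunked differently (think: two different random paths).
a = write_chunks([b"important ", b"data"])
b = write_chunks([b"important data"])
print(cache_key("hash_aware_task", a) == cache_key("hash_aware_task", b))  # True
```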
Configuration and Performance
- FLYTE_IO_BATCH_SIZE: When uploading or downloading directories, this environment variable controls the concurrency limit for file transfers (default is 32).
- Memory Management: File.open().read() returns a memoryview. For large files, it is recommended to read in chunks using the block_size parameter in open() to avoid loading the entire file into memory.
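Both settings describe standard techniques: bounded concurrency across files and chunked I/O within a file. The following stdlib sketch combines them under stated assumptions and does not reproduce Flyte's transfer code. It reads the same environment variable name with the documented default of 32, but `BLOCK_SIZE`, `copy_in_chunks`, and `transfer_all` are hypothetical, and the 64 KiB chunk size is an arbitrary choice rather than Flyte's block_size default. An `asyncio.Semaphore` caps the number of transfers in flight, and each copy holds only one block in memory at a time.

```python
import asyncio
import os
import tempfile
from pathlib import Path

# Read the documented knob, falling back to the documented default of 32.
BATCH_SIZE = int(os.environ.get("FLYTE_IO_BATCH_SIZE", "32"))
BLOCK_SIZE = 64 * 1024  # hypothetical chunk size, not Flyte's default


def copy_in_chunks(src: Path, dst: Path, block_size: int = BLOCK_SIZE) -> int:
    """Copy block by block so only one block per file is held in memory."""
    copied = 0
    with src.open("rb") as fin, dst.open("wb") as fout:
        while chunk := fin.read(block_size):
            fout.write(chunk)
            copied += len(chunk)
    return copied


async def transfer_all(pairs: list[tuple[Path, Path]], limit: int = BATCH_SIZE) -> tuple[int, int]:
    """Copy all pairs with at most `limit` transfers in flight; return (bytes, peak)."""
    semaphore = asyncio.Semaphore(limit)
    in_flight = 0
    peak = 0

    async def transfer(src: Path, dst: Path) -> int:
        nonlocal in_flight, peak
        async with semaphore:
            in_flight += 1
            peak = max(peak, in_flight)
            try:
                # Run the blocking copy in a worker thread.
                return await asyncio.to_thread(copy_in_chunks, src, dst)
            finally:
                in_flight -= 1

    sizes = await asyncio.gather(*(transfer(src, dst) for src, dst in pairs))
    return sum(sizes), peak


src_dir, dst_dir = Path(tempfile.mkdtemp()), Path(tempfile.mkdtemp())
pairs = []
for i in range(10):
    src = src_dir / f"{i}.bin"
    src.write_bytes(os.urandom(200_000))
    pairs.append((src, dst_dir / f"{i}.bin"))

total, peak = asyncio.run(transfer_all(pairs, limit=4))
print(total, peak <= 4)  # 2000000 True
```

Raising the limit trades memory and open connections for throughput. The per-file block size bounds memory independently of file size.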