Data Types and IO

Flyte handles large-scale data by offloading it to blob storage (such as S3, GCS, or a local blob store). The flyte.io package provides three core types, File, Dir, and DataFrame, that act as references to these offloaded objects. Tasks use them to stream data, download only what they need, and pass data between dataframe frameworks.

Remote File Handling

The File class in flyte.io._file represents a single blob in storage. It is designed to handle files that may be too large to fit in memory by providing both streaming and local-download interfaces.

Streaming vs. Local Access

You can interact with a File in two primary ways:

  1. Streaming: Use open() (async) or open_sync() to get a file-like object. This is ideal for processing data without downloading the entire file to the local disk.
  2. Downloading: Use download() (async) or download_sync() to copy the remote file to a local path. This is necessary when using external tools that require a standard filesystem path.

@env.task
async def process_file(f: File) -> str:
    # Streaming access (async)
    async with f.open("rb") as fh:
        # Note: fh.read() returns a memoryview; convert to bytes if needed
        content = bytes(await fh.read())
        return content.decode("utf-8")

@env.task
def process_file_sync(f: File) -> str:
    # Local access (sync)
    local_path = f.download_sync()
    with open(local_path, "r") as fh:
        return fh.read()
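
The trade-off between the two access modes can be illustrated without Flyte at all. The following is a minimal sketch using only the standard library: an in-memory io.BytesIO stands in for a remote blob, and count_bytes_streaming and count_bytes_downloaded are hypothetical helpers, not part of flyte.io.

```python
import io
import os
import shutil
import tempfile
from typing import BinaryIO


def count_bytes_streaming(source: BinaryIO, chunk_size: int = 4) -> int:
    """Consume the source chunk by chunk; only one chunk is held at a time."""
    total = 0
    while True:
        chunk = source.read(chunk_size)
        if not chunk:
            break
        total += len(chunk)
    return total


def count_bytes_downloaded(source: BinaryIO) -> int:
    """Copy the whole source to a local file first, then work with the path."""
    with tempfile.NamedTemporaryFile(delete=False) as tmp:
        shutil.copyfileobj(source, tmp)
        local_path = tmp.name
    try:
        # A real filesystem path now exists and could be handed to an external tool.
        return os.path.getsize(local_path)
    finally:
        os.remove(local_path)


payload = b"hello, flyte"
streamed = count_bytes_streaming(io.BytesIO(payload))
downloaded = count_bytes_downloaded(io.BytesIO(payload))
```

Both helpers see the same bytes. The streaming variant never needs local disk space, while the download variant pays for a full copy in exchange for a path that any tool can open.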

Creating and Uploading Files

To produce a file as a task output, you can either create a reference to a new remote location or upload an existing local file:

  • File.new_remote(): Generates a random remote path where you can stream data directly.
  • File.from_local(): Uploads a file from the local filesystem to Flyte's managed storage.
  • File.named_remote(name): Produces a deterministic path based on the provided name. This is critical for retry safety, ensuring that retried tasks write to the same location rather than generating new random paths.
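
Why a deterministic path matters for retries can be sketched as follows. This is an illustration only: random_remote_path and named_remote_path are hypothetical helpers, the s3://bucket/run-1 prefix is made up, and the hashing scheme is an assumption rather than how Flyte actually derives paths.

```python
import hashlib
import uuid


def random_remote_path(prefix: str) -> str:
    """Hypothetical: a fresh path on every call, so a retry writes somewhere new."""
    return f"{prefix}/{uuid.uuid4().hex}"


def named_remote_path(prefix: str, name: str) -> str:
    """Hypothetical: the path depends only on its inputs, so a retry reuses it."""
    digest = hashlib.sha256(name.encode("utf-8")).hexdigest()[:16]
    return f"{prefix}/{digest}/{name}"


prefix = "s3://bucket/run-1"  # made-up location for illustration

# Two attempts of the same task with the same name resolve to one location.
first_attempt = named_remote_path(prefix, "report.csv")
retry_attempt = named_remote_path(prefix, "report.csv")

# Two attempts with random paths leave two separate objects behind.
first_random = random_remote_path(prefix)
retry_random = random_remote_path(prefix)
```

With the deterministic variant a retried task overwrites its earlier partial output; with the random variant each attempt leaves a new object in storage.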

Managed Directories

The Dir class in flyte.io._dir manages collections of files. Like File, it avoids unnecessary data movement by allowing you to inspect and process contents without downloading the entire directory.

Efficient Directory Walking

The walk() and walk_sync() methods are the most efficient ways to process a Dir. They yield File objects for each file found in the remote directory. Because these yielded File objects are themselves remote references, you can open them individually without ever downloading the rest of the directory.

@env.task
async def process_directory(d: Dir):
    async for file in d.walk(recursive=True):
        # Only the content of this specific file is streamed
        async with file.open("rt") as f:
            print(await f.read())
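
The lazy, one-file-at-a-time behavior described above resembles a generator over a directory tree. Here is a minimal local sketch of that idea using os.walk; walk_files is a hypothetical helper operating on the local filesystem, not the Dir.walk() implementation.

```python
import os
import tempfile
from typing import Iterator


def walk_files(root: str, recursive: bool = True) -> Iterator[str]:
    """Yield file paths one at a time instead of building the full listing."""
    for dirpath, dirnames, filenames in os.walk(root):
        for filename in sorted(filenames):
            yield os.path.join(dirpath, filename)
        if not recursive:
            dirnames.clear()  # stop os.walk from descending into subdirectories


with tempfile.TemporaryDirectory() as root:
    # Build a tiny tree: a.txt at the top level and nested/b.txt below it.
    os.makedirs(os.path.join(root, "nested"))
    for rel in ("a.txt", os.path.join("nested", "b.txt")):
        with open(os.path.join(root, rel), "w") as fh:
            fh.write(rel)

    all_files = [os.path.relpath(p, root) for p in walk_files(root)]
    top_level = [os.path.relpath(p, root) for p in walk_files(root, recursive=False)]
```

Because walk_files is a generator, a caller that stops after the first match never touches the remaining entries.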

Handling Optional Outputs

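Flyte's type engine can sometimes struggle with Optional[Dir] during serialization. For tasks that may or may not produce a directory, keep the return type as Dir and use the EmptyDir sentinel, created here with Dir.empty() and detected in the consumer with is_empty: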

@env.task
def optional_output(condition: bool) -> Dir:
    if not condition:
        return Dir.empty()
    return Dir.from_local_sync("/tmp/results")

@env.task
def consumer(d: Dir):
    if d.is_empty:
        return "No data produced"
    # Process real directory...
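
The sentinel pattern itself is independent of Flyte. A minimal sketch, assuming a hypothetical DirRef class that stands in for a directory reference (it is not flyte.io.Dir and does not reflect its internals):

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class DirRef:
    """Hypothetical directory reference used only for this illustration."""

    path: Optional[str]

    @classmethod
    def empty(cls) -> "DirRef":
        # The sentinel is a normal instance, so the declared type never changes.
        return cls(path=None)

    @property
    def is_empty(self) -> bool:
        return self.path is None


def produce(condition: bool) -> DirRef:
    # The return type is always DirRef, never Optional[DirRef].
    return DirRef("/tmp/results") if condition else DirRef.empty()


def consume(d: DirRef) -> str:
    if d.is_empty:
        return "No data produced"
    return f"Processing {d.path}"


skipped = consume(produce(False))
processed = consume(produce(True))
```

Keeping a single concrete return type means the producer's signature stays simple, and the consumer makes the "nothing produced" case explicit with one check.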

DataFrames and Interoperability

The DataFrame class in flyte.io._dataframe.dataframe serves as a meta-type that wraps various dataframe implementations (Pandas, PyArrow, Polars, Spark). It enables a task written in Pandas to pass data seamlessly to a task expecting a PyArrow table.

Materialization and Streaming

A flyte.io.DataFrame is non-materialized by default. It only downloads and converts data when you explicitly request it using all() or iter(). You must call open() first to specify the desired local representation.

@env.task
async def consume_df(fdf: flyte.io.DataFrame):
    # Materialize the entire dataframe as Pandas
    df = await fdf.open(pd.DataFrame).all()

    # Or stream the dataframe in chunks
    async for chunk in await fdf.open(pd.DataFrame).iter():
        process(chunk)

Format Hints

You can use Annotated to provide hints to the Flyte engine about the preferred storage format (e.g., "csv", "parquet").

@env.task
async def produce_csv() -> Annotated[flyte.io.DataFrame, "csv"]:
    df = pd.DataFrame({"a": [1, 2], "b": [3, 4]})
    return flyte.io.DataFrame.from_df(df)

Async and Sync Interfaces

Most IO operations in this SDK provide both asynchronous and synchronous versions. Methods without a suffix (e.g., open, download, walk) are async and should be used within async tasks. Methods with the _sync suffix (e.g., open_sync, download_sync, walk_sync) are blocking and intended for standard Python tasks.

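| Async Method      | Sync Equivalent        | Purpose              |
|-------------------|------------------------|----------------------|
| File.open()       | File.open_sync()       | Stream file contents |
| File.download()   | File.download_sync()   | Copy to local disk   |
| Dir.walk()        | Dir.walk_sync()        | Iterate over files   |
| DataFrame.all()   | DataFrame.all_sync()   | Materialize data     |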
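
The naming convention can be sketched with a small class that exposes the same operation in both styles. This is an illustration under assumptions: Blob is a hypothetical stand-in, and wrapping the coroutine with asyncio.run is one simple way to build a blocking variant, not necessarily how the SDK implements its _sync methods.

```python
import asyncio


class Blob:
    """Hypothetical stand-in for a remote reference; not a flyte.io class."""

    def __init__(self, data: bytes) -> None:
        self._data = data

    async def read(self) -> bytes:
        # Simulate non-blocking IO by yielding control to the event loop once.
        await asyncio.sleep(0)
        return self._data

    def read_sync(self) -> bytes:
        # Blocking variant: runs the coroutine to completion on its own event loop.
        return asyncio.run(self.read())


async def async_caller(blob: Blob) -> bytes:
    # Inside async code, await the unsuffixed method directly.
    return await blob.read()


blob = Blob(b"payload")
from_sync = blob.read_sync()
from_async = asyncio.run(async_caller(blob))
```

In this sketch, calling read_sync() from inside a running event loop would raise a RuntimeError, which is one reason to use the unsuffixed methods in async code and the suffixed ones in regular functions.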

Content-Based Caching

For tasks that use Flyte's caching mechanism, the SDK supports custom hashing through the hash_method argument. Passing a HashFunction built from your own function lets you compute cache keys from the actual content of a file or dataframe rather than just its metadata.

import pandas as pd

import flyte.io
from flyte.io import HashFunction

def hash_pandas(df: pd.DataFrame) -> str:
    # Combine the per-row hashes into a single content-derived value
    return str(pd.util.hash_pandas_object(df).sum())

@env.task
async def cached_task() -> flyte.io.DataFrame:
    df = pd.DataFrame(...)
    # The hash is computed and stored with the reference
    return await flyte.io.DataFrame.from_local(
        df,
        hash_method=HashFunction.from_fn(hash_pandas)
    )
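
The same idea applies to files: a cache key derived from bytes stays stable when a file is renamed or copied and changes only when the content does. A minimal sketch using hashlib; content_hash is a hypothetical helper and is not wired into Flyte's caching here.

```python
import hashlib
import os
import tempfile


def content_hash(path: str, chunk_size: int = 1 << 16) -> str:
    """Hash a file in fixed-size chunks so large files never load fully into memory."""
    hasher = hashlib.sha256()
    with open(path, "rb") as fh:
        for chunk in iter(lambda: fh.read(chunk_size), b""):
            hasher.update(chunk)
    return hasher.hexdigest()


with tempfile.TemporaryDirectory() as tmp:
    first = os.path.join(tmp, "first.bin")
    copy = os.path.join(tmp, "renamed_copy.bin")
    changed = os.path.join(tmp, "changed.bin")
    samples = ((first, b"same bytes"), (copy, b"same bytes"), (changed, b"other bytes"))
    for path, data in samples:
        with open(path, "wb") as fh:
            fh.write(data)

    key_first = content_hash(first)
    key_copy = content_hash(copy)
    key_changed = content_hash(changed)
```

Here key_first and key_copy match even though the file names differ, while key_changed does not, which is the property a content-based cache key relies on.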