Tabular Data with DataFrames
Flyte's meta-DataFrame system, implemented primarily in flyte.io._dataframe.dataframe, provides a unified interface for handling tabular data across different libraries and storage backends. It decouples the physical storage format from the Python representation. As a result, tasks that use different dataframe libraries, such as Pandas, PyArrow, Polars, and Spark, can exchange data without converting it by hand.
The Meta-DataFrame Concept
The core of this system is the DataFrame class. Unlike a standard dataframe object that holds data in memory, the Flyte DataFrame serves as a reference to remote tabular data. It wraps various dataframe types and supports non-materialized access, meaning data is only downloaded or converted when explicitly requested.
Core Attributes
A DataFrame instance tracks three primary pieces of information:
- uri: The remote location of the data (e.g., an S3 or GCS path).
- format: The storage format (e.g., Parquet, CSV).
- hash: An optional content-based hash used for cache key computation.
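The sketch below shows how these three attributes fit together. DataFrameRef is a hypothetical stand-in, not Flyte's class. Its cache_key fallback (hashing the URI and format when no content hash is supplied) is also an assumption, made only to show why a content hash is useful: two uploads of the same data at different URIs share a key only if a hash is attached.

```python
import hashlib
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class DataFrameRef:
    """Hypothetical stand-in for the three attributes above (not Flyte's class)."""

    uri: str                    # remote location, e.g. "s3://bucket/key.parquet"
    format: str                 # storage format, e.g. "parquet"
    hash: Optional[str] = None  # optional content-based hash

    def cache_key(self) -> str:
        # Assumption: use the content hash when one was supplied ...
        if self.hash is not None:
            return self.hash
        # ... otherwise fall back to identifying the data by location and format.
        return hashlib.sha256(f"{self.format}:{self.uri}".encode("utf-8")).hexdigest()
```

With a hash attached, s3://bucket/run1/data.parquet and s3://bucket/run2/data.parquet resolve to the same key. Without one, they resolve to different keys.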
Lazy Evaluation and Materialization
The meta-DataFrame system evaluates lazily. A DataFrame object received by a task is only a reference, and no data is transferred until you "open" it as a specific Python type and request its contents.
Materializing Data
The .open() method prepares the dataframe for a specific library, and .all() (or .all_sync()) triggers the actual download and deserialization.
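```python
import flyte
import pandas as pd
from flyte.io import DataFrame

env = flyte.TaskEnvironment(name="dataframes")  # illustrative environment name

@env.task
async def process_data(df: DataFrame):
    # Only a reference at this point: no data has been downloaded yet
    # Materialize as a Pandas DataFrame (download and deserialization happen here)
    pdf = await df.open(pd.DataFrame).all()
    print(f"Loaded {len(pdf)} rows")
```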
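To make the open-then-all split concrete, here is a minimal sketch of the same pattern in plain Python. LazyTable, _Reader, and the loader callback are hypothetical and do not reflect Flyte's internals. The sketch only demonstrates that choosing a target type fetches nothing, and that the fetch happens when .all() is awaited.

```python
import asyncio
from typing import Any, Callable, List


class _Reader:
    """Returned by open(); holds the target type but has fetched nothing yet."""

    def __init__(self, ref: "LazyTable", target: Callable[[List[dict]], Any]):
        self._ref = ref
        self._target = target

    async def all(self) -> Any:
        rows = await self._ref._download()  # the only place data is fetched
        return self._target(rows)           # convert to the requested type


class LazyTable:
    """Hypothetical lazy reference: a URI plus a loader, no data in memory."""

    def __init__(self, uri: str, loader: Callable[[str], List[dict]]):
        self.uri = uri
        self._loader = loader
        self.downloads = 0  # how many times data was actually fetched

    def open(self, target: Callable[[List[dict]], Any]) -> _Reader:
        # Choosing a target type is cheap: it only records intent.
        return _Reader(self, target)

    async def _download(self) -> List[dict]:
        self.downloads += 1
        await asyncio.sleep(0)  # stand-in for network I/O
        return self._loader(self.uri)
```

Calling t.open(list) leaves t.downloads at 0. Awaiting .all() on the result performs exactly one fetch. Here list plays the role that pd.DataFrame plays in the task above.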
Streaming Access
For large datasets that do not fit in memory, the system supports iterative access via .iter():
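```python
import flyte
import pandas as pd
from flyte.io import DataFrame
env = flyte.TaskEnvironment(name="dataframes")  # illustrative environment name
@env.task
async def stream_data(df: DataFrame):
    # Iterate over chunks of the dataframe instead of loading it all at once
    async for chunk in await df.open(pd.DataFrame).iter():
        print(f"Processing {len(chunk)} rows")
```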
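The consumption pattern behind .iter() can be sketched with a plain async generator. iter_chunks and running_total are hypothetical helpers, and the fixed chunk size is an assumption, since Flyte's decoders decide their own batching. The sketch shows that a consumer can aggregate over the whole table while holding only one batch at a time.

```python
import asyncio
from typing import AsyncIterator, List, Sequence, Tuple


async def iter_chunks(rows: Sequence[int], chunk_size: int) -> AsyncIterator[List[int]]:
    """Hypothetical chunked reader yielding at most chunk_size rows per batch."""
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    for start in range(0, len(rows), chunk_size):
        await asyncio.sleep(0)  # stand-in for fetching the next batch
        yield list(rows[start:start + chunk_size])


async def running_total(rows: Sequence[int], chunk_size: int) -> Tuple[int, int]:
    """Sum all rows chunk by chunk; also report the largest batch seen."""
    total = 0
    largest_chunk = 0
    async for chunk in iter_chunks(rows, chunk_size):
        total += sum(chunk)  # process one chunk, then let it go
        largest_chunk = max(largest_chunk, len(chunk))
    return total, largest_chunk
```

For ten rows with a chunk size of 3, the batches have sizes 3, 3, 3, and 1. The total matches the full-table sum.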
Interoperability and the Transformer Engine
The DataFrameTransformerEngine acts as a central registry and dispatcher. It manages a collection of DataFrameEncoder and DataFrameDecoder implementations, matching them based on the Python type, storage protocol (e.g., s3, file), and file format.
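The matching rule described here can be illustrated with a small registry keyed on (Python type, protocol, format). HandlerRegistry and protocol_of are hypothetical names. The real engine also handles defaults, overrides, and fallbacks, which this sketch does not model.

```python
from typing import Callable, Dict, Tuple

Key = Tuple[type, str, str]  # (Python type, storage protocol, file format)


def protocol_of(uri: str) -> str:
    # "s3://bucket/key" -> "s3"; bare paths are treated as local files.
    return uri.split("://", 1)[0] if "://" in uri else "file"


class HandlerRegistry:
    """Hypothetical dispatcher keyed on (type, protocol, format)."""

    def __init__(self) -> None:
        self._encoders: Dict[Key, Callable] = {}
        self._decoders: Dict[Key, Callable] = {}

    def register_encoder(self, python_type: type, protocol: str, fmt: str, fn: Callable) -> None:
        self._encoders[(python_type, protocol, fmt)] = fn

    def register_decoder(self, python_type: type, protocol: str, fmt: str, fn: Callable) -> None:
        self._decoders[(python_type, protocol, fmt)] = fn

    @staticmethod
    def _find(table: Dict[Key, Callable], python_type: type, uri: str, fmt: str) -> Callable:
        key = (python_type, protocol_of(uri), fmt)
        if key not in table:
            raise ValueError(f"no handler for {python_type.__name__}, {key[1]}, {fmt}")
        return table[key]

    def get_encoder(self, python_type: type, uri: str, fmt: str) -> Callable:
        return self._find(self._encoders, python_type, uri, fmt)

    def get_decoder(self, python_type: type, uri: str, fmt: str) -> Callable:
        return self._find(self._decoders, python_type, uri, fmt)
```

Because the protocol is part of the key, a handler registered for s3 is not selected for a gs:// URI. In that case the lookup fails loudly instead of silently picking the wrong handler.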
How Interoperability Works
Because Flyte uses a common intermediate representation (StructuredDataset in the IDL), a task can return a pandas.DataFrame and a subsequent task can accept it as a polars.DataFrame, provided handlers for both libraries are registered. The engine selects the matching encoder for the output and the matching decoder for the input.
- Task A returns a pd.DataFrame. The DataFrameTransformerEngine finds the Pandas-to-Parquet encoder.
- The data is uploaded to S3 as Parquet.
- Task B accepts a pl.DataFrame. The DataFrameTransformerEngine finds the Parquet-to-Polars decoder.
- The data is downloaded and loaded into Polars.
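The steps above can be sketched without any dataframe library. In this sketch, a row-oriented list of dicts stands in for the producer's Pandas frame and a column-oriented dict of lists stands in for the consumer's Polars frame. CSV text replaces the Parquet file from the walkthrough. encode_rows and decode_columns are hypothetical names. The point is that the two sides share only the stored format, never an in-memory object.

```python
import csv
import io
from typing import Dict, List


def encode_rows(rows: List[Dict[str, str]]) -> str:
    """Producer side ("Task A"): serialize row records to the shared format."""
    if not rows:
        return ""
    buf = io.StringIO()
    writer = csv.DictWriter(buf, fieldnames=list(rows[0].keys()))
    writer.writeheader()
    writer.writerows(rows)
    return buf.getvalue()


def decode_columns(blob: str) -> Dict[str, List[str]]:
    """Consumer side ("Task B"): read the same bytes into a columnar layout."""
    reader = csv.DictReader(io.StringIO(blob))
    columns: Dict[str, List[str]] = {name: [] for name in reader.fieldnames or []}
    for row in reader:
        for name, value in row.items():
            columns[name].append(value)
    return columns
```

Rows [{"a": "1", "b": "4"}, {"a": "2", "b": "5"}] come back as {"a": ["1", "2"], "b": ["4", "5"]}. CSV carries no types, so values return as strings, which is one reason typed formats such as Parquet are preferred for this hand-off.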
Content-Based Hashing for Caching
Flyte can derive cache keys from the actual content of a dataframe rather than only its metadata, so identical data produced by different runs can map to the same cache entry. This is achieved using the HashFunction class from flyte.io._hashing_io.
Eager Upload with Hashing
The from_local and from_local_sync methods allow you to upload a dataframe eagerly while computing a hash for caching purposes.
```python
import flyte
import pandas as pd
from flyte.io import DataFrame, HashFunction

env = flyte.TaskEnvironment(name="dataframes")  # illustrative environment name

def hash_pandas_dataframe(df: pd.DataFrame) -> str:
    # Deterministic fingerprint: sum of pandas' per-row hashes (index included)
    return str(pd.util.hash_pandas_object(df).sum())

@env.task
async def produce_dataframe() -> DataFrame:
    df = pd.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6]})
    hash_method = HashFunction.from_fn(hash_pandas_dataframe)
    # Uploads the data now and attaches the computed hash for cache-key use
    return await DataFrame.from_local(df, hash_method=hash_method)
```
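The effect of a content hash on caching can be sketched with the standard library alone. content_hash and ResultCache are hypothetical. Canonical JSON with sorted keys is an assumed serialization, chosen so that column order does not change the hash; the task above uses pandas' hash_pandas_object instead. Two runs that produce the same data compute the same key, so the second run is served from the cache.

```python
import hashlib
import json
from typing import Any, Callable, Dict, List


def content_hash(columns: Dict[str, List[Any]]) -> str:
    """Hash the table's contents; sort_keys makes column order irrelevant."""
    canonical = json.dumps(columns, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


class ResultCache:
    """Hypothetical cache keyed by content hash rather than by URI."""

    def __init__(self) -> None:
        self._store: Dict[str, Any] = {}
        self.hits = 0

    def get_or_compute(self, key: str, compute: Callable[[], Any]) -> Any:
        if key in self._store:
            self.hits += 1
            return self._store[key]
        value = compute()
        self._store[key] = value
        return value
```

Changing a single value produces a different hash, so stale results are not reused.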
Extending the System
You can add support for new dataframe libraries or storage formats by implementing custom handlers and registering them with the DataFrameTransformerEngine.
Implementing Encoders and Decoders
- DataFrameEncoder: Responsible for serializing a Python object to remote storage.
- DataFrameDecoder: Responsible for deserializing remote data back into a Python object.
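The split of responsibilities can be sketched with abstract base classes. Encoder, Decoder, the in-memory STORE, and the TSV pair are all hypothetical, and Flyte's DataFrameEncoder and DataFrameDecoder have their own signatures. The sketch only mirrors the contract: an encoder writes a value to a location, and a matching decoder reads that location back into a Python object.

```python
from abc import ABC, abstractmethod
from typing import Any, Dict, List, Tuple

STORE: Dict[str, str] = {}  # hypothetical stand-in for remote storage


class Encoder(ABC):
    def __init__(self, python_type: type, supported_format: str):
        self.python_type = python_type
        self.supported_format = supported_format

    @abstractmethod
    def encode(self, value: Any, uri: str) -> str:
        """Write value to uri and return the uri."""


class Decoder(ABC):
    def __init__(self, python_type: type, supported_format: str):
        self.python_type = python_type
        self.supported_format = supported_format

    @abstractmethod
    def decode(self, uri: str) -> Any:
        """Read uri back into a Python object."""


class TsvEncoder(Encoder):
    def encode(self, value: List[Tuple[str, ...]], uri: str) -> str:
        STORE[uri] = "\n".join("\t".join(row) for row in value)
        return uri


class TsvDecoder(Decoder):
    def decode(self, uri: str) -> List[Tuple[str, ...]]:
        text = STORE[uri]
        return [tuple(line.split("\t")) for line in text.split("\n")] if text else []
```

A value written by TsvEncoder reads back unchanged through TsvDecoder. The abstract bases cannot be instantiated directly, which forces each handler to implement its half of the contract.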
Registration
Handlers are registered using DataFrameTransformerEngine.register(). This allows you to specify defaults for specific types or formats.
```python
# Example of how a plugin might register a handler
from flyte.io._dataframe.dataframe import DataFrameEncoder, DataFrameTransformerEngine

class MyCustomType:
    """Hypothetical placeholder for the Python type the plugin supports."""

class MyCustomEncoder(DataFrameEncoder):
    async def encode(self, dataframe, structured_dataset_type):
        # Write the data to remote storage (implementation omitted)
        ...

# Register the encoder for a specific type and format
DataFrameTransformerEngine.register(
    MyCustomEncoder(python_type=MyCustomType, supported_format="custom-fmt"),
    default_for_type=True,
)
```
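One plausible reading of default_for_type is sketched below. DefaultingRegistry is hypothetical, and Flyte's own register() semantics, including its override rules, may differ. In this reading, a handler registered as the default for a type is the one used when a caller names the type but no format, while explicit format requests still reach the other handlers. The "most recent default wins" rule is an assumption.

```python
from typing import Dict, Optional, Tuple


class DefaultingRegistry:
    """Hypothetical registry illustrating a default_for_type-style flag."""

    def __init__(self) -> None:
        self._handlers: Dict[Tuple[type, str], str] = {}
        self._default_format: Dict[type, str] = {}

    def register(self, python_type: type, fmt: str, handler: str,
                 default_for_type: bool = False) -> None:
        self._handlers[(python_type, fmt)] = handler
        if default_for_type:
            # Assumption: the most recent default registration wins.
            self._default_format[python_type] = fmt

    def lookup(self, python_type: type, fmt: Optional[str] = None) -> str:
        if fmt is None:
            if python_type not in self._default_format:
                raise LookupError(f"no default format for {python_type.__name__}")
            fmt = self._default_format[python_type]
        return self._handlers[(python_type, fmt)]
```

Registering a "custom-fmt" handler with default_for_type=True makes it the answer for lookup(list). A previously registered "csv" handler remains reachable via lookup(list, "csv").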
Column Subsetting and Schema
The system supports column subsetting through type annotations. When a task signature specifies a subset of columns using Annotated, the DataFrameTransformerEngine passes this information to the decoder, which is then responsible for loading only the requested columns.
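```python
from typing import Annotated

import flyte
from flyte.io import DataFrame

env = flyte.TaskEnvironment(name="dataframes")  # illustrative environment name

# The decoder is told that only 'col_a' and 'col_b' are requested
@env.task
def subset_task(df: Annotated[DataFrame, {"col_a": int, "col_b": float}]):
    ...
```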
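This mechanism avoids transferring unneeded data when a task requires only a portion of a large table. The savings depend on the decoder and storage format: columnar formats such as Parquet let a decoder skip unrequested columns entirely.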
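A decoder that honors a requested column subset might look like the sketch below. decode_subset is a hypothetical helper; it takes the same name-to-type mapping used in the annotation above and casts each kept column to its annotated type. With CSV, as here, every line is still scanned and only the requested fields are converted and kept. A columnar format can avoid reading the other columns at all.

```python
import csv
import io
from typing import Dict, Optional


def decode_subset(blob: str, columns: Optional[Dict[str, type]] = None) -> Dict[str, list]:
    """Hypothetical decoder returning only the requested, type-cast columns."""
    reader = csv.DictReader(io.StringIO(blob))
    available = reader.fieldnames or []
    # No annotation: keep every column as text.
    wanted = columns if columns is not None else {name: str for name in available}
    missing = [name for name in wanted if name not in available]
    if missing:
        raise KeyError(f"requested columns not in data: {missing}")
    out: Dict[str, list] = {name: [] for name in wanted}
    for row in reader:
        for name, cast in wanted.items():
            out[name].append(cast(row[name]))  # convert only the requested fields
    return out
```

Given a table with col_a, col_b, and col_c, requesting {"col_a": int, "col_b": float} returns just those two columns with int and float values, and col_c is dropped.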