Data Validation Overview
The Pandera integration in this SDK enforces data integrity across Flyte tasks. Developers use pandera.typing annotations to define schemas, and these schemas are validated automatically whenever data enters or leaves a task.
Core Architecture
The integration is built around the PanderaDataFrameTransformer class found in plugins/pandera/src/flyteplugins/pandera/transformers/base.py. This class acts as a wrapper around Flyte's standard DataFrameTransformerEngine.
When a task is defined with a Pandera-annotated type (e.g., pt.DataFrame[MySchema]), Flyte's TypeEngine uses the specialized Pandera transformer to handle the conversion between Python objects and Flyte literals.
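The dispatch step can be modeled with the standard library alone. The following is a hypothetical sketch, not Flyte's TypeEngine API: `ToyDataFrame`, `ToyPanderaTransformer`, `REGISTRY`, and `get_transformer` are illustrative names. In this model, transformers are looked up by the annotation's generic origin, and the schema class is recovered from an annotation shaped like `pt.DataFrame[MySchema]` with `typing.get_origin` and `typing.get_args`.

```python
from typing import Generic, TypeVar, get_args, get_origin

S = TypeVar("S")


class ToyDataFrame(Generic[S]):
    """Hypothetical stand-in for a schema-parameterized type like pt.DataFrame[S]."""


class MySchema:
    """Hypothetical schema class, used only as a type argument."""


class ToyPanderaTransformer:
    """Hypothetical transformer for ToyDataFrame[...] annotations."""

    def schema_for(self, annotation):
        # The schema is the single type argument of the annotation.
        (schema,) = get_args(annotation)
        return schema


# Hypothetical registry keyed by the unparameterized generic type.
REGISTRY = {ToyDataFrame: ToyPanderaTransformer()}


def get_transformer(annotation):
    # ToyDataFrame[MySchema] has origin ToyDataFrame; plain classes have no origin.
    origin = get_origin(annotation) or annotation
    try:
        return REGISTRY[origin]
    except KeyError:
        raise TypeError(f"no transformer registered for {annotation!r}") from None


transformer = get_transformer(ToyDataFrame[MySchema])
print(type(transformer).__name__, transformer.schema_for(ToyDataFrame[MySchema]).__name__)
# prints: ToyPanderaTransformer MySchema
```

Keying on the generic origin means one transformer serves every schema; the schema itself travels in the type argument.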
The Validation Lifecycle
The PanderaDataFrameTransformer intercepts the data at two critical points:
- Input Validation (to_python_value): When a task receives a dataframe, the transformer first uses the underlying DataFrameTransformerEngine to load the data into a native dataframe (such as a pandas DataFrame). It then calls _validate to ensure the input matches the expected schema before the task code ever runs.
- Output Validation (to_literal): When a task returns a dataframe, the transformer validates the data against the return type's schema. If validation passes, it delegates the actual serialization and upload to the DataFrameTransformerEngine.
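The ordering of the two interception points (load then validate on input, validate then serialize on output) can be illustrated with a small synchronous model. This is a hypothetical sketch: `ToyEngine` stands in for DataFrameTransformerEngine, tables are lists of dicts, the schema is reduced to a set of required columns, and all names are illustrative. The real transformer methods are async and work with Flyte literals.

```python
import itertools


class SchemaError(Exception):
    """Hypothetical stand-in for a schema validation failure."""


class ToyEngine:
    """Hypothetical stand-in for DataFrameTransformerEngine: stores tables by URI."""

    def __init__(self):
        self._store = {}
        self._ids = itertools.count()

    def to_literal(self, table):
        uri = f"mem://table/{next(self._ids)}"
        self._store[uri] = [dict(row) for row in table]  # "serialize" a copy
        return uri

    def to_python_value(self, uri):
        return [dict(row) for row in self._store[uri]]  # "load" a copy


class ToyValidatingTransformer:
    """Hypothetical wrapper mirroring the validate-and-delegate lifecycle."""

    def __init__(self, engine, required_columns):
        self.engine = engine
        self.required = set(required_columns)

    def _validate(self, table):
        for i, row in enumerate(table):
            missing = self.required - set(row)
            if missing:
                raise SchemaError(f"row {i} is missing columns {sorted(missing)}")
        return table

    def to_literal(self, table):
        # Output: validate first; the engine serializes only if validation passes.
        return self.engine.to_literal(self._validate(table))

    def to_python_value(self, uri):
        # Input: load through the engine, then validate before user code sees the data.
        return self._validate(self.engine.to_python_value(uri))


engine = ToyEngine()
employees = ToyValidatingTransformer(engine, {"employee_id", "name"})
uri = employees.to_literal([{"employee_id": 1, "name": "Ada"}])
print(uri, employees.to_python_value(uri))
```

Because output validation runs before delegation, an invalid frame never reaches storage in this model.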
Validation Memoization
To prevent redundant validation, the transformer maintains a _validation_memo: a set of (URI, report title) pairs identifying data that has already been validated within the current execution context.
```python
# From plugins/pandera/src/flyteplugins/pandera/transformers/base.py (excerpt)
async def to_python_value(self, lv, expected_python_type: type[DF]) -> DF:
    # ... (elided: raw_df, base, config and report_title are set up here)
    uri = lv.scalar.structured_dataset.uri
    if (uri, report_title) in self._validation_memo:
        # Already validated in this execution context: skip re-validation
        return raw_df
    return await self._validate(raw_df, base, config, report_title)
```
If a dataframe is validated as the output of one task and then passed as input to another task in the same process, the second validation can be skipped because the same (URI, report title) pair is already in the memo.
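A minimal model of this memoization, assuming only what the excerpt shows (a set keyed by `(uri, report_title)`). `ToyMemoValidator` and its `validations_run` counter are hypothetical, and recording the key only after a successful check is a choice made in this sketch, not something the source specifies.

```python
class ToyMemoValidator:
    """Hypothetical model of validation memoized by (uri, report_title)."""

    def __init__(self, check):
        self.check = check              # callable that raises on invalid data
        self._validation_memo = set()   # set of (uri, report_title) tuples
        self.validations_run = 0        # how often the real check executed

    def validate(self, uri, report_title, data):
        key = (uri, report_title)
        if key in self._validation_memo:
            return data  # already validated in this process: skip the check
        self.check(data)
        self.validations_run += 1
        self._validation_memo.add(key)  # recorded only after the check passes
        return data


def non_empty(data):
    if not data:
        raise ValueError("empty dataframe")


memo = ToyMemoValidator(non_empty)
memo.validate("mem://df/1", "EmployeeSchema", [1, 2])  # output of task A: checked
memo.validate("mem://df/1", "EmployeeSchema", [1, 2])  # input of task B: skipped
print(memo.validations_run)  # prints: 1
```

Including the report title in the key means the same data validated against a different schema is still checked.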
Schema Definition and Usage
Schemas are defined using Pandera's DataFrameModel. These schemas are then used as type hints in Flyte tasks using pandera.typing.
In examples/plugins/pandera/pandas_schema.py, a schema is defined and used as follows:
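```python
import pandas as pd
import pandera.pandas as pa
import pandera.typing.pandas as pt

# `env` is the task environment defined elsewhere in the example (not shown here).


class EmployeeSchema(pa.DataFrameModel):
    employee_id: int = pa.Field(ge=0)  # non-negative integer IDs
    name: str


@env.task(report=True)
async def build_valid_employees() -> pt.DataFrame[EmployeeSchema]:
    # The returned frame is validated against EmployeeSchema on output.
    return pd.DataFrame(
        {
            "employee_id": [1, 2, 3],
            "name": ["Ada", "Grace", "Barbara"],
        }
    )
```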
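To make the constraints in EmployeeSchema concrete, the following stdlib-only function models what they check on column-oriented data like the dict passed to `pd.DataFrame`. It is a hypothetical illustration, not Pandera: `check_employees` and its `(column, row_index, problem)` failure tuples are invented for this sketch, and Pandera checks column dtypes rather than individual Python values, so the per-value type test is a simplification.

```python
def check_employees(columns):
    """Hypothetical model of EmployeeSchema's checks.

    `columns` maps column names to lists of values.
    Returns (column, row_index, problem) tuples; an empty list means valid.
    """
    failures = []
    for col in ("employee_id", "name"):
        if col not in columns:
            failures.append((col, None, "column missing"))
    for i, value in enumerate(columns.get("employee_id", [])):
        # employee_id: int = pa.Field(ge=0)
        if not isinstance(value, int) or isinstance(value, bool):
            failures.append(("employee_id", i, "not an int"))
        elif value < 0:
            failures.append(("employee_id", i, "violates ge=0"))
    for i, value in enumerate(columns.get("name", [])):
        # name: str
        if not isinstance(value, str):
            failures.append(("name", i, "not a str"))
    return failures


print(check_employees({"employee_id": [1, -5], "name": ["Ada", 7]}))
```

Collecting every failure instead of stopping at the first mirrors the idea behind Pandera's plural SchemaErrors.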
Error Handling and Configuration
By default, validation failures raise a pandera.errors.SchemaErrors exception, causing the Flyte task to fail. This behavior can be customized using the ValidationConfig class and Python's Annotated type.
The ValidationConfig (defined in plugins/pandera/src/flyteplugins/pandera/config.py) allows you to switch the error behavior to "warn", which logs the validation errors but allows the task to proceed.
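```python
from typing import Annotated

import pandera.typing.pandas as pt
from flyteplugins.pandera import ValidationConfig

# env, EmployeeSchema and EmployeeSchemaWithStatus come from the surrounding example.


@env.task(report=True)
async def pass_through_with_error_warn(
    df: Annotated[pt.DataFrame[EmployeeSchema], ValidationConfig(on_error="warn")],
) -> Annotated[pt.DataFrame[EmployeeSchemaWithStatus], ValidationConfig(on_error="warn")]:
    # With on_error="warn", a failed input or output validation is logged as a
    # warning and the task continues instead of raising SchemaErrors.
    del df["name"]  # if the output schema requires "name", the returned frame fails validation
    return df
```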
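The difference between the two error behaviors can be sketched in plain Python. Everything here is hypothetical: `ToyValidationConfig` only mirrors the `on_error` switch described above, its `"raise"` default string is an assumption of this sketch, and `ToySchemaErrors` stands in for `pandera.errors.SchemaErrors`.

```python
import logging
from dataclasses import dataclass
from typing import Literal

logger = logging.getLogger("toy_validation")


class ToySchemaErrors(Exception):
    """Hypothetical stand-in for pandera.errors.SchemaErrors."""


@dataclass(frozen=True)
class ToyValidationConfig:
    """Hypothetical mirror of ValidationConfig's on_error switch."""
    on_error: Literal["raise", "warn"] = "raise"  # "raise" string is assumed


def apply_validation(data, failures, config):
    """Return `data`, raising or warning if `failures` (a list) is non-empty."""
    if not failures:
        return data
    if config.on_error == "raise":
        # Default behavior: the error propagates and the task fails.
        raise ToySchemaErrors(failures)
    # "warn": log the problems and pass the data through unchanged.
    logger.warning("schema validation failed: %s", failures)
    return data


frame = {"employee_id": [1]}
result = apply_validation(frame, ["name: column missing"], ToyValidationConfig(on_error="warn"))
print(result is frame)  # prints: True
```

In "warn" mode the downstream code still receives the unvalidated data, so it must tolerate frames that break the schema.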
Flyte Deck Reporting
The integration also generates Flyte Deck reports automatically. The _validate method in the base transformer uses a _report_renderer to produce HTML summaries of the validation results:
- Success: A report is generated showing the validated data.
- Failure: A detailed report is generated highlighting the specific rows and columns that violated the schema constraints.
These reports are attached to the Flyte task as a new tab, providing immediate visibility into data quality issues directly within the Flyte UI.
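The shape of such a report can be approximated with a tiny HTML renderer. This is a hypothetical sketch, not the plugin's _report_renderer: `render_report` and its `(column, row_index, problem)` failure tuples are invented here. It summarizes rather than displays the validated data on success and tabulates each violation on failure, escaping every value with `html.escape`.

```python
import html


def render_report(title, columns, failures):
    """Hypothetical renderer: summarize a validation result as an HTML fragment.

    columns: dict of column name -> list of values (the validated data).
    failures: list of (column, row_index, problem) tuples; empty means success.
    """
    parts = [f"<h2>{html.escape(title)}</h2>"]
    if not failures:
        n_rows = max((len(v) for v in columns.values()), default=0)
        parts.append(f"<p>Validation passed: {n_rows} rows, {len(columns)} columns.</p>")
    else:
        parts.append(f"<p>Validation failed with {len(failures)} error(s).</p>")
        parts.append("<table><tr><th>column</th><th>row</th><th>problem</th></tr>")
        for column, row, problem in failures:
            cells = (column, "" if row is None else str(row), problem)
            parts.append("<tr>" + "".join(f"<td>{html.escape(c)}</td>" for c in cells) + "</tr>")
        parts.append("</table>")
    return "\n".join(parts)


print(render_report("EmployeeSchema", {}, [("employee_id", 1, "violates ge=0")]))
```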
Supported Dataframe Libraries
The SDK provides specialized implementations for several popular dataframe libraries, each inheriting from PanderaDataFrameTransformer:
- Pandas: PanderaPandasDataFrameTransformer (uses pandera.typing.pandas)
- Polars: PanderaPolarsDataFrameTransformer (uses pandera.typing.polars)
- PySpark SQL: PanderaPySparkSqlDataFrameTransformer (uses pandera.typing.pyspark_sql)
Each specialized transformer implements _resolve_native_df_type to ensure the correct underlying dataframe type is used during the validation and conversion process.
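The split between shared base logic and a per-library hook follows the template-method pattern, sketched below. The classes are hypothetical stand-ins, and the zero-argument `_resolve_native_df_type` signature is an assumption of this sketch; the real method may take parameters such as the annotated type.

```python
from abc import ABC, abstractmethod


class ToyPandasFrame:
    """Stand-in for pandas.DataFrame."""


class ToyPolarsFrame:
    """Stand-in for polars.DataFrame."""


class ToyBaseTransformer(ABC):
    """Hypothetical base: shared loading logic, library-specific native type."""

    @abstractmethod
    def _resolve_native_df_type(self):
        """Return the native dataframe class this transformer loads into."""

    def load(self, raw):
        native_type = self._resolve_native_df_type()
        # A real engine would decode `raw`; the toy just wraps it in the native type.
        frame = native_type()
        frame.raw = raw
        return frame


class ToyPandasTransformer(ToyBaseTransformer):
    def _resolve_native_df_type(self):
        return ToyPandasFrame


class ToyPolarsTransformer(ToyBaseTransformer):
    def _resolve_native_df_type(self):
        return ToyPolarsFrame


print(type(ToyPolarsTransformer().load(b"...")).__name__)  # prints: ToyPolarsFrame
```

The base class never names a concrete library, so adding support for another dataframe type only requires a new subclass.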