Polars and PySpark Integration
This guide demonstrates how to integrate Pandera validation with Polars and PySpark SQL DataFrames in Flyte. The flyteplugins-pandera plugin provides specialized transformers and renderers to handle these distributed and lazily evaluated data structures.
Validating Polars DataFrames and LazyFrames
You can validate both eager polars.DataFrame and lazy polars.LazyFrame types by using pandera.typing.polars. When using LazyFrame, the plugin automatically collects a small sample of the data to generate visual reports while maintaining the lazy execution plan for the actual task logic.
import polars as pl
import pandera.polars as pa
import pandera.typing.polars as pt
from flyteplugins.pandera import ValidationConfig
import flyte
class EmployeeSchema(pa.DataFrameModel):
    employee_id: int = pa.Field(ge=0)
    name: str


@flyte.task(report=True)
async def build_employees() -> pt.DataFrame[EmployeeSchema]:
    # Validates an eager Polars DataFrame
    return pl.DataFrame({
        "employee_id": [1, 2, 3],
        "name": ["Ada", "Grace", "Barbara"],
    })


@flyte.task(report=True)
async def process_lazy(lf: pt.LazyFrame[EmployeeSchema]) -> pt.LazyFrame[EmployeeSchema]:
    # Validates a Polars LazyFrame
    # The plugin collects head(5) for the report preview
    return lf.filter(pl.col("employee_id") > 1)
The PanderaPolarsDataFrameTransformer handles the conversion and validation. For LazyFrame inputs, it ensures the schema is respected before the data is processed in your task.
Validating PySpark SQL DataFrames
For Spark workloads, use pandera.typing.pyspark_sql.DataFrame. This integration requires the flyteplugins-spark plugin to be installed for underlying data movement.
import pyspark.sql.types as T
from pyspark.sql import SparkSession
import pandera.typing.pyspark_sql as pt
import pandera.pyspark as pandera_pyspark
import flyte
class RowSchema(pandera_pyspark.DataFrameModel):
    value: int = pandera_pyspark.Field()


@flyte.task(report=True)
def validate_spark_df(df: pt.DataFrame[RowSchema]) -> pt.DataFrame[RowSchema]:
    # Pandera validation is performed by calling schema.validate(data, lazy=True)
    # which triggers Spark execution to verify the schema.
    return df
The PanderaPySparkSqlDataFrameTransformer manages the async validation flow and generates reports by converting a limited subset of the Spark DataFrame to Pandas for rendering.
Configuring Validation Behavior
You can customize how validation failures are handled using ValidationConfig and Annotated. By default, validation errors raise a RuntimeError, but you can configure them to only log a warning.
from typing import Annotated
from flyteplugins.pandera import ValidationConfig

# Reuses flyte, pt (pandera.typing.polars), and EmployeeSchema from the Polars example above.
@flyte.task(report=True)
async def validate_with_warning(
    df: Annotated[pt.DataFrame[EmployeeSchema], ValidationConfig(on_error="warn")]
) -> pt.DataFrame[EmployeeSchema]:
    # If validation fails, a warning is logged and the task continues
    return df
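The difference between the two failure modes can be modelled in a few lines of standard-library Python. This is only a simplified sketch of the behavior described above, not the plugin's implementation: SketchConfig and handle_failure are hypothetical names, and the mode name "raise" for the default is an assumption.

```python
import logging
from dataclasses import dataclass
from typing import Literal

logger = logging.getLogger("validation_sketch")


@dataclass(frozen=True)
class SketchConfig:
    """Hypothetical stand-in for ValidationConfig; models only on_error."""

    # Assumption: "raise" is used here as the name of the default mode.
    on_error: Literal["raise", "warn"] = "raise"


def handle_failure(message: str, config: SketchConfig) -> None:
    """Log a warning in "warn" mode; raise RuntimeError otherwise."""
    if config.on_error == "warn":
        logger.warning("Validation failed: %s", message)
        return
    raise RuntimeError(f"Validation failed: {message}")


# "warn" mode: the failure is logged and execution continues.
handle_failure("employee_id must be >= 0", SketchConfig(on_error="warn"))
continued = True  # reached only because "warn" mode did not raise

# Default mode: the same failure stops execution with a RuntimeError.
try:
    handle_failure("employee_id must be >= 0", SketchConfig())
    raised = False
except RuntimeError:
    raised = True
```

Warning mode suits exploratory runs where a partial result is still useful; the default is the safer choice when downstream tasks rely on the schema.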
Visual Reports and Previews
The plugin automatically generates HTML reports in the Flyte UI using great-tables. These reports include data previews and failure case summaries.
- Polars: PanderaPolarsReportRenderer collects the first 5 rows (DATA_PREVIEW_HEAD) of a LazyFrame to show a preview. It also converts Polars failure cases to Pandas for consistent rendering.
- PySpark: PanderaPySparkSqlReportRenderer uses limit(5).toPandas() to generate previews and limit(10) for failure cases to avoid pulling massive amounts of data into the driver memory.
Troubleshooting and Performance
Performance Considerations
- LazyFrame Collection: When validating a LazyFrame, the plugin must collect() a small portion of the data to generate the report. While this is limited to 5 rows, it still triggers a partial execution of the lazy plan.
- Validation Memoization: The transformers use an internal _validation_memo (a set of URI and type tuples) to avoid re-validating the same data multiple times within the same execution context, such as when a value is passed between tasks.
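The memoization idea in the list above can be sketched with a plain Python set keyed by (URI, type) tuples. This is an illustrative model under stated assumptions, not the plugin's code: validation_memo and validate_once are hypothetical names, and the URI is a made-up example.

```python
from typing import Any, Callable

# Hypothetical stand-in for the plugin's internal _validation_memo.
validation_memo: set[tuple[str, type]] = set()


def validate_once(uri: str, expected_type: type, validate: Callable[[], Any]) -> bool:
    """Run `validate` unless this (uri, type) pair was already validated.

    Returns True if validation ran, False if it was skipped.
    """
    key = (uri, expected_type)
    if key in validation_memo:
        return False
    validate()
    # Record the key only after validation succeeds, so failures are retried.
    validation_memo.add(key)
    return True


calls: list[str] = []
uri = "s3://example-bucket/employees.parquet"  # made-up URI for illustration

ran_first = validate_once(uri, dict, lambda: calls.append("validated"))
ran_second = validate_once(uri, dict, lambda: calls.append("validated"))

print(ran_first, ran_second, calls)
```

Keying on both the URI and the type means the same stored data is validated again when it is requested under a different schema-bearing type.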
Dependencies
- Polars: Requires flyteplugins-polars and pandera[polars].
- PySpark: Requires flyteplugins-spark and pandera[pyspark].
Common Errors
If you receive a TypeTransformerFailedError stating "Only pandera.typing.polars.DataFrame and LazyFrame are supported", ensure you are using the correct typing module (pandera.typing.polars or pandera.typing.pyspark_sql) rather than the standard Polars or PySpark classes.