Polars and PySpark Integration

This guide demonstrates how to integrate Pandera validation with Polars and PySpark SQL DataFrames in Flyte. The flyteplugins-pandera plugin provides specialized transformers and renderers to handle these distributed and lazy-evaluated data structures.

Validating Polars DataFrames and LazyFrames

You can validate both eager polars.DataFrame and lazy polars.LazyFrame values by annotating task inputs and outputs with the generic types in pandera.typing.polars. For a LazyFrame, the plugin collects only a small sample of the data to generate visual reports and leaves the lazy execution plan intact for the actual task logic.

import polars as pl
import pandera.polars as pa
import pandera.typing.polars as pt
from flyteplugins.pandera import ValidationConfig
import flyte

class EmployeeSchema(pa.DataFrameModel):
    employee_id: int = pa.Field(ge=0)
    name: str

@flyte.task(report=True)
async def build_employees() -> pt.DataFrame[EmployeeSchema]:
    # Validates an eager Polars DataFrame
    return pl.DataFrame({
        "employee_id": [1, 2, 3],
        "name": ["Ada", "Grace", "Barbara"],
    })

@flyte.task(report=True)
async def process_lazy(lf: pt.LazyFrame[EmployeeSchema]) -> pt.LazyFrame[EmployeeSchema]:
    # Validates a Polars LazyFrame
    # The plugin collects head(5) for the report preview
    return lf.filter(pl.col("employee_id") > 1)

The PanderaPolarsDataFrameTransformer handles the conversion and validation. For LazyFrame inputs, it ensures the schema is respected before the data is processed in your task.

Validating PySpark SQL DataFrames

For Spark workloads, use pandera.typing.pyspark_sql.DataFrame. This integration requires the flyteplugins-spark plugin to be installed for underlying data movement.

import pyspark.sql.types as T
from pyspark.sql import SparkSession
import pandera.typing.pyspark_sql as pt
import pandera.pyspark as pandera_pyspark
import flyte

class RowSchema(pandera_pyspark.DataFrameModel):
    value: int = pandera_pyspark.Field()

@flyte.task(report=True)
def validate_spark_df(df: pt.DataFrame[RowSchema]) -> pt.DataFrame[RowSchema]:
    # Pandera validation is performed by calling schema.validate(data, lazy=True),
    # which triggers Spark execution to verify the schema.
    return df

The PanderaPySparkSqlDataFrameTransformer manages the async validation flow and generates reports by converting a limited subset of the Spark DataFrame to Pandas for rendering.

Configuring Validation Behavior

You can customize how validation failures are handled by wrapping the annotated type in typing.Annotated together with a ValidationConfig. By default, a validation error raises a RuntimeError; setting on_error="warn" logs a warning instead and lets the task continue.

from typing import Annotated

import flyte
import pandera.typing.polars as pt
from flyteplugins.pandera import ValidationConfig

# EmployeeSchema is the Polars model defined in the first example.
@flyte.task(report=True)
async def validate_with_warning(
    df: Annotated[pt.DataFrame[EmployeeSchema], ValidationConfig(on_error="warn")]
) -> pt.DataFrame[EmployeeSchema]:
    # If validation fails, a warning is logged and the task continues
    return df
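
The difference between the two failure modes can be sketched in plain Python. This is not the plugin's implementation: handle_validation_failure and the logger name are hypothetical, and "raise" as the name of the default mode is an assumption. Only the on_error="warn" value and the default RuntimeError come from the text above.

```python
import logging

logger = logging.getLogger("pandera_validation_sketch")  # hypothetical logger name


def handle_validation_failure(message: str, on_error: str = "raise") -> None:
    """Hypothetical helper: react to a failed validation according to on_error."""
    if on_error == "warn":
        # Log the failure and let the task continue with the data as-is.
        logger.warning("Validation failed: %s", message)
        return
    if on_error == "raise":
        # Assumed name for the default mode: stop the task with a RuntimeError.
        raise RuntimeError(f"Validation failed: {message}")
    raise ValueError(f"Unknown on_error value: {on_error!r}")
```

With "warn", downstream code receives data that did not pass validation, so this mode is probably best reserved for cases where a logged warning is enough to act on.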

Visual Reports and Previews

The plugin automatically generates HTML reports in the Flyte UI using great-tables. These reports include data previews and failure case summaries.

  • Polars: PanderaPolarsReportRenderer collects the first 5 rows (DATA_PREVIEW_HEAD) of a LazyFrame to show a preview. It also converts Polars failure cases to Pandas for consistent rendering.
  • PySpark: PanderaPySparkSqlReportRenderer uses limit(5).toPandas() to generate previews and limit(10) for failure cases to avoid pulling massive amounts of data into the driver memory.
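
The idea behind both preview limits is to materialize only a handful of rows from a source that could be arbitrarily large. The following is a pure-Python analogy, not the renderers' code: a generator stands in for a LazyFrame or Spark DataFrame, and lazy_rows, preview, and FAILURE_CASE_LIMIT are hypothetical names. DATA_PREVIEW_HEAD is the constant named above.

```python
from itertools import islice
from typing import Iterable, Iterator

DATA_PREVIEW_HEAD = 5    # preview size named in the text
FAILURE_CASE_LIMIT = 10  # hypothetical name for the limit(10) cap on failure cases


def lazy_rows(n: int, produced: list) -> Iterator[dict]:
    """Stand-in for a lazy source: rows are only computed when consumed."""
    for i in range(n):
        produced.append(i)  # record which rows were actually materialized
        yield {"employee_id": i, "name": f"user-{i}"}


def preview(rows: Iterable[dict], limit: int = DATA_PREVIEW_HEAD) -> list:
    """Materialize at most `limit` rows for rendering."""
    return list(islice(rows, limit))


produced: list = []
sample = preview(lazy_rows(1_000_000, produced))
# Only the previewed rows were ever computed; the rest of the source is untouched.
```

The same helper with limit=FAILURE_CASE_LIMIT would mirror the cap on failure cases.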

Troubleshooting and Performance

Performance Considerations

  • LazyFrame Collection: When validating a LazyFrame, the plugin must call collect() on a small portion of the data to generate the report. Although the preview is limited to 5 rows, collecting it still triggers partial execution of the lazy plan.
  • Validation Memoization: The transformers use an internal _validation_memo (a set of URI and type tuples) to avoid re-validating the same data multiple times within the same execution context, such as when a value is passed between tasks.
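
The memoization described above can be illustrated with a minimal sketch. It assumes the memo is a set of (URI, type) tuples, as stated; validate_once, the example URI, and the choice to record a key only after validation succeeds are assumptions made for illustration, not the transformers' actual code.

```python
from typing import Callable, Set, Tuple

# Assumed shape of the memo: (data URI, Python type) pairs already validated.
_validation_memo: Set[Tuple[str, type]] = set()


def validate_once(uri: str, expected_type: type, validate: Callable[[], None]) -> bool:
    """Run `validate` unless this (uri, type) pair was already checked.

    Returns True if validation ran, False if it was skipped.
    """
    key = (uri, expected_type)
    if key in _validation_memo:
        return False
    validate()  # may raise; the key is only recorded after success
    _validation_memo.add(key)
    return True


calls = []
uri = "s3://bucket/employees.parquet"  # hypothetical URI
ran_first = validate_once(uri, dict, lambda: calls.append(1))
ran_again = validate_once(uri, dict, lambda: calls.append(1))
# The second call is skipped because the same (uri, type) pair is already in the memo.
```

Keying on both the URI and the type means the same stored data would be validated again if a downstream task annotated it with a different schema type.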

Dependencies

  • Polars: Requires flyteplugins-polars and pandera[polars].
  • PySpark: Requires flyteplugins-spark and pandera[pyspark].

Common Errors

If you receive a TypeTransformerFailedError stating "Only pandera.typing.polars.DataFrame and LazyFrame are supported", check your task annotations: they must use the types from the correct typing module (pandera.typing.polars or pandera.typing.pyspark_sql) rather than the standard Polars or PySpark classes.
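
One way to see why a plain library class is rejected is that it carries no schema parameter for the transformer to read. The sketch below uses stand-in classes only (PlainDataFrame, TypedDataFrame, TypedLazyFrame, and check_annotation are all hypothetical), and it raises a TypeError instead of the plugin's TypeTransformerFailedError. It shows one plausible form of such a check, not the plugin's actual logic.

```python
from typing import Generic, TypeVar, get_origin

T = TypeVar("T")


class PlainDataFrame:
    """Stand-in for a library class such as polars.DataFrame."""


class TypedDataFrame(Generic[T]):
    """Stand-in for a schema-carrying type such as pandera.typing.polars.DataFrame."""


class TypedLazyFrame(Generic[T]):
    """Stand-in for pandera.typing.polars.LazyFrame."""


class EmployeeSchema:
    """Stand-in schema class."""


SUPPORTED_ORIGINS = (TypedDataFrame, TypedLazyFrame)


def check_annotation(annotation: object) -> None:
    """Reject annotations whose base type is not a supported typed frame."""
    # get_origin returns the generic base for TypedDataFrame[EmployeeSchema]
    # and None for a plain, non-generic class.
    origin = get_origin(annotation) or annotation
    if origin not in SUPPORTED_ORIGINS:
        raise TypeError(
            "Only pandera.typing.polars.DataFrame and LazyFrame are supported"
        )
```

Here check_annotation(TypedDataFrame[EmployeeSchema]) passes, while check_annotation(PlainDataFrame) raises.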