Validating Pandas DataFrames

In this tutorial, you will build a Flyte workflow that uses Pandera to enforce data quality on Pandas DataFrames. You will learn how to define schemas, apply them to Flyte tasks, configure how validation errors are handled, and view rich visual reports in Flyte Decks.

Prerequisites

Before starting, ensure you have the following dependencies installed:

  • pandera
  • pandas
  • great_tables (required for the visual report renderer)
  • flytekit
  • the Flyte Pandera plugin that provides the flyteplugins.pandera module

Step 1: Define a Pandera Schema

First, define the structure and validation rules for your data using a pandera.DataFrameModel. This model acts as the source of truth for your data's shape and types.

import pandera as pa

class EmployeeSchema(pa.DataFrameModel):
    employee_id: int = pa.Field(ge=0)
    name: str
    department: str = pa.Field(isin=["Engineering", "Marketing", "Sales"])

By defining EmployeeSchema, you specify that every DataFrame validated against it must have an integer employee_id column (greater than or equal to 0), a string name column, and a department column restricted to the three listed values.

Step 2: Create a Task to Produce Validated Data

Next, create a task that returns a Pandas DataFrame. To trigger automatic validation, use the pandera.typing.pandas.DataFrame type hint specialized with your schema.

import pandas as pd
import pandera.typing.pandas as pt
from flytekit import task

@task(report=True)
def get_employees() -> pt.DataFrame[EmployeeSchema]:
    return pd.DataFrame({
        "employee_id": [101, 102, 103],
        "name": ["Alice", "Bob", "Charlie"],
        "department": ["Engineering", "Engineering", "Marketing"]
    })

When this task returns, the PanderaPandasDataFrameTransformer intercepts the output. It uses the schema to validate the DataFrame before it is serialized. Setting report=True in the @task decorator enables Flyte Decks, allowing the PanderaPandasReportRenderer to attach a validation summary to the task execution.

Step 3: Configure Validation Error Handling

In some cases, you may want a task to continue even if validation fails. You can use ValidationConfig within an Annotated type hint to change the behavior from raising an exception to merely issuing a warning.

from typing import Annotated
from flyteplugins.pandera import ValidationConfig

@task(report=True)
def process_employees(
    df: Annotated[pt.DataFrame[EmployeeSchema], ValidationConfig(on_error="warn")]
) -> pt.DataFrame[EmployeeSchema]:
    # The body runs even if 'df' has validation errors. The return annotation
    # keeps the default behavior, so an invalid return value still raises.
    print(f"Processing {len(df)} employees")
    return df

The ValidationConfig(on_error="warn") setting ensures that if the input df fails validation, the PanderaPandasDataFrameTransformer will log the errors and generate a report, but it will not stop the task execution. This is useful for monitoring data quality without breaking pipelines.
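To make the mechanism less opaque, here is a standard-library-only sketch of how metadata attached through Annotated can be read back and turned into a raise-or-warn decision. It is not the plugin's implementation: ErrorPolicy, policy_for, and report_errors are hypothetical stand-ins chosen for illustration.

```python
import warnings
from dataclasses import dataclass
from typing import Annotated, List, get_args, get_origin, get_type_hints


@dataclass(frozen=True)
class ErrorPolicy:
    """Hypothetical stand-in for a config object such as ValidationConfig."""
    on_error: str = "raise"


def policy_for(func, param: str) -> ErrorPolicy:
    """Find an ErrorPolicy in the Annotated metadata of `param`, else the default."""
    hint = get_type_hints(func, include_extras=True)[param]
    if get_origin(hint) is Annotated:
        # For Annotated hints, get_args returns (base_type, *metadata).
        for meta in get_args(hint)[1:]:
            if isinstance(meta, ErrorPolicy):
                return meta
    return ErrorPolicy()


def report_errors(errors: List[str], policy: ErrorPolicy) -> None:
    """Raise on errors by default; only warn when the policy says "warn"."""
    if not errors:
        return
    message = "; ".join(errors)
    if policy.on_error == "warn":
        warnings.warn(message, RuntimeWarning)
    else:
        raise ValueError(message)


def strict(rows: list) -> None:
    """No metadata: the default policy applies."""


def lenient(rows: Annotated[list, ErrorPolicy(on_error="warn")]) -> None:
    """Annotated metadata switches this parameter to warnings."""


print(policy_for(strict, "rows"))
print(policy_for(lenient, "rows"))
```

Because the policy lives in the type hint rather than in the function body, each input or output can opt into warnings independently.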

Step 4: View Validation Reports

The PanderaPandasReportRenderer automatically generates an HTML report using the great_tables library. This report includes:

  1. Summary: A high-level overview showing the schema name, data shape, and total error counts.
  2. Data Preview: A snapshot of the first few rows of the DataFrame.
  3. Schema-level Errors: Details on missing columns or incorrect data types.
  4. Data-level Errors: A breakdown of value-level failures (e.g., out-of-range integers), including the percentage of valid rows and specific failure cases.

To see these reports, navigate to the Flyte Deck tab in the Flyte UI after the task has executed.

Complete Working Example

Below is the complete code combining these steps into a workflow. Note the import order: pandera.typing.pandas is imported before flyteplugins.pandera to ensure the type engine correctly registers the transformer.

import pandas as pd
import pandera as pa
import pandera.typing.pandas as pt
from typing import Annotated
from flytekit import task, workflow
from flyteplugins.pandera import ValidationConfig

# 1. Define Schema
class EmployeeSchema(pa.DataFrameModel):
    employee_id: int = pa.Field(ge=0)
    name: str
    department: str = pa.Field(isin=["Engineering", "Marketing", "Sales"])

# Illustrative alias: warn instead of raising wherever it is used. With the
# default behavior, the invalid rows below would fail get_employees on output.
# This assumes the plugin honors ValidationConfig on return annotations as it
# does on inputs.
WarnedEmployees = Annotated[pt.DataFrame[EmployeeSchema], ValidationConfig(on_error="warn")]

# 2. Produce Data
@task(report=True)
def get_employees() -> WarnedEmployees:
    return pd.DataFrame({
        "employee_id": [101, 102, -1],  # -1 will trigger a validation error
        "name": ["Alice", "Bob", "Charlie"],
        "department": ["Engineering", "Engineering", "InvalidDept"]  # InvalidDept will trigger an error
    })

# 3. Consume with Warning Configuration
@task(report=True)
def process_employees(df: WarnedEmployees) -> WarnedEmployees:
    return df

@workflow
def employee_validation_wf() -> WarnedEmployees:
    df = get_employees()
    return process_employees(df=df)

if __name__ == "__main__":
    # Local execution will perform validation but won't show Flyte Decks
    print(employee_validation_wf())

Important Considerations

  • Import Order: Always import pandera.typing.pandas before flyteplugins.pandera to ensure the PanderaPandasDataFrameTransformer is correctly associated with the Pandas DataFrame type.
  • Lazy Validation: The transformer uses "lazy" validation (via schema.validate(data, lazy=True)), meaning it will collect all validation errors in the DataFrame rather than stopping at the first one it encounters.
  • Local Execution: While validation logic runs during local execution, the rich HTML reports generated by PanderaPandasReportRenderer are only visible within the Flyte UI via Flyte Decks.