
Data Integration & SQL

This guide demonstrates how to integrate with external data warehouses like BigQuery and Snowflake, and how to process structured data formats such as JSONL and Parquet using the Flyte SDK.

Querying BigQuery

To execute SQL queries on Google BigQuery, use the BigQueryTask. This task returns results as a Flyte DataFrame, which can then be consumed by downstream tasks as Pandas or Polars objects.

from flyte.io import DataFrame
from flyteplugins.bigquery import BigQueryTask, BigQueryConfig

# Configure the BigQuery connection
bq_config = BigQueryConfig(
    ProjectID="my-gcp-project",
    Location="US",
)

# Define the query task
# The query_template uses Golang templating syntax for inputs
bigquery_task = BigQueryTask(
    name="query_users",
    inputs={"min_age": int},
    output_dataframe_type=DataFrame,
    plugin_config=bq_config,
    query_template="SELECT id, name, age FROM `my-project.my_dataset.users` WHERE age >= {{ .inputs.min_age }}",
    google_application_credentials="gcp-secret-name",
)

Key configuration points:

  • google_application_credentials: The name of the Flyte secret containing your GCP service account JSON.
  • output_dataframe_type: When specified, the task returns a DataFrame containing the query results.
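
To make the templating step concrete, the sketch below mimics how a `{{ .inputs.<name> }}` placeholder could map to a declared input. It is an illustration only: `render_query_template` is a hypothetical helper written for this guide, not part of the Flyte SDK, and the plugin's real rendering (including how it quotes or binds values) may differ.

```python
import re

# Hypothetical helper, not part of the Flyte SDK: it only mimics how a
# Go-style `{{ .inputs.<name> }}` placeholder maps to a task input.
_PLACEHOLDER = re.compile(r"\{\{\s*\.inputs\.(\w+)\s*\}\}")


def render_query_template(template: str, inputs: dict) -> str:
    """Substitute each {{ .inputs.<name> }} with the matching input value."""

    def substitute(match: re.Match) -> str:
        name = match.group(1)
        if name not in inputs:
            raise KeyError(f"template references undeclared input: {name}")
        return str(inputs[name])

    return _PLACEHOLDER.sub(substitute, template)


query = render_query_template(
    "SELECT id, name, age FROM `my-project.my_dataset.users` "
    "WHERE age >= {{ .inputs.min_age }}",
    {"min_age": 21},
)
print(query)
```

Plain string substitution like this is reasonable for previewing a query with numeric inputs, but it does no escaping, so it should not be used to build queries from untrusted string values.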

Integrating with Snowflake

The Snowflake task supports both standard queries and high-performance batch inserts.

from flyteplugins.snowflake import Snowflake, SnowflakeConfig

sf_config = SnowflakeConfig(
    user="MY_USER",
    account="xy12345.us-east-1",
    database="FLYTE_DB",
    schema="PUBLIC",
    warehouse="COMPUTE_WH",
)

# Batch insert example
snowflake_insert_task = Snowflake(
    name="snowflake_batch_insert",
    inputs={"id": list[int], "name": list[str], "age": list[int]},
    plugin_config=sf_config,
    query_template="INSERT INTO FLYTE_DB.PUBLIC.TEST (ID, NAME, AGE) VALUES (%(id)s, %(name)s, %(age)s);",
    snowflake_private_key="snowflake-secret-key",
    batch=True,
)

Batch Insert Requirements

When using batch=True:

  • The query_template must contain exactly one VALUES clause with placeholders matching the input keys (e.g., %(col_name)s).
  • The inputs must be provided as lists of values for each column.
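
The two requirements above can be checked locally before a task is submitted. The following is a minimal sketch using only the standard library; `check_batch_inputs` is a hypothetical helper, not an SDK function, and the equal-length check is an assumption about what a batch insert needs rather than documented plugin behavior.

```python
import re

# Matches pyformat-style placeholders such as %(id)s.
_PLACEHOLDER = re.compile(r"%\((\w+)\)s")


def check_batch_inputs(query_template: str, inputs: dict) -> list:
    """Validate the batch requirements and return the inputs as per-row dicts."""
    values_clauses = re.findall(r"\bVALUES\b", query_template, flags=re.IGNORECASE)
    if len(values_clauses) != 1:
        raise ValueError("query_template must contain exactly one VALUES clause")

    placeholders = set(_PLACEHOLDER.findall(query_template))
    if placeholders != set(inputs):
        raise ValueError(
            f"placeholders {sorted(placeholders)} do not match inputs {sorted(inputs)}"
        )

    # Assumption: every column list describes the same number of rows.
    lengths = {len(column) for column in inputs.values()}
    if len(lengths) > 1:
        raise ValueError("every input list must have the same length")

    # Transpose the column lists into one dict per row.
    keys = list(inputs)
    return [dict(zip(keys, row)) for row in zip(*inputs.values())]


template = (
    "INSERT INTO FLYTE_DB.PUBLIC.TEST (ID, NAME, AGE) "
    "VALUES (%(id)s, %(name)s, %(age)s);"
)
rows = check_batch_inputs(
    template, {"id": [1, 2], "name": ["Ada", "Lin"], "age": [36, 41]}
)
print(rows)
```

Running a check like this in a unit test catches a misspelled placeholder or a ragged input before the query reaches Snowflake.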

Processing JSONL Files

The JsonlFile type provides efficient, streaming access to JSON Lines data, supporting both plain text and zstd compression.

Streaming Reads and Writes

Use iter_records to process large files without loading them entirely into memory.

from flyteplugins.jsonl import JsonlFile
from flyte import env

@env.task
async def process_jsonl(f: JsonlFile) -> int:
    count = 0
    # Transparently handles .jsonl and .jsonl.zst
    async for record in f.iter_records(on_error="skip"):
        if record.get("active"):
            count += 1
    return count

@env.task
async def write_compressed_jsonl() -> JsonlFile:
    # Extension .zst triggers automatic compression
    f = JsonlFile.new_remote("data.jsonl.zst")
    async with f.writer(compression_level=3) as w:
        for i in range(1000):
            await w.write({"id": i, "data": "sample"})
    return f
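
For readers who want to see what record-by-record streaming with skipped errors amounts to, here is a standard-library sketch that works on a local, uncompressed file. `iter_jsonl_records` is a hypothetical stand-in, not the plugin's `iter_records`; in particular, the exact meaning of `on_error="skip"` in the plugin is assumed here to be "ignore lines that fail to parse".

```python
import json
import os
import tempfile
from typing import Iterator


def iter_jsonl_records(path: str, on_error: str = "raise") -> Iterator[dict]:
    """Yield one parsed record per non-empty line, reading the file lazily."""
    with open(path, "r", encoding="utf-8") as handle:
        for line_number, line in enumerate(handle, start=1):
            if not line.strip():
                continue
            try:
                record = json.loads(line)
            except json.JSONDecodeError as exc:
                if on_error == "skip":
                    continue
                raise ValueError(f"invalid JSON on line {line_number}") from exc
            yield record


# Write a small file containing one corrupt line, then count active records.
with tempfile.NamedTemporaryFile(
    "w", suffix=".jsonl", delete=False, encoding="utf-8"
) as tmp:
    tmp.write('{"id": 1, "active": true}\n')
    tmp.write("not json\n")
    tmp.write('{"id": 2, "active": false}\n')
    tmp.write('{"id": 3, "active": true}\n')
    sample_path = tmp.name

active_count = sum(
    1
    for record in iter_jsonl_records(sample_path, on_error="skip")
    if record.get("active")
)
os.remove(sample_path)
print(active_count)
```

Because the generator holds only one line at a time, memory use stays flat regardless of file size, which is the property the streaming API is designed to provide.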

Arrow Batch Iteration

For higher performance, you can iterate over JSONL data as Arrow RecordBatch objects. This requires installing the plugin with arrow support: pip install 'flyteplugins-jsonl[arrow]'.

@env.task
async def process_arrow(f: JsonlFile):
    async for batch in f.iter_arrow_batches(batch_size=1024):
        # 'batch' is a pyarrow.RecordBatch
        print(f"Processed batch with {batch.num_rows} rows")
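
The effect of `batch_size` can be pictured with a plain-Python stand-in that groups records into fixed-size chunks. This sketch does not use Arrow at all: `iter_batches` is a hypothetical helper, and it assumes `batch_size` means the maximum number of rows per batch, with a shorter final batch.

```python
from itertools import islice
from typing import Iterable, Iterator, List


def iter_batches(records: Iterable[dict], batch_size: int) -> Iterator[List[dict]]:
    """Group an iterable of records into lists of at most batch_size items."""
    if batch_size <= 0:
        raise ValueError("batch_size must be positive")
    iterator = iter(records)
    while True:
        batch = list(islice(iterator, batch_size))
        if not batch:
            return
        yield batch


# 2500 records in batches of 1024 leave a shorter final batch.
sizes = [
    len(batch)
    for batch in iter_batches(({"id": i} for i in range(2500)), batch_size=1024)
]
print(sizes)  # [1024, 1024, 452]
```

Larger batches amortize per-batch overhead but hold more rows in memory at once, so the right value depends on row width and available memory.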

Data Processing with Polars

Flyte DataFrame objects can be materialized as Polars DataFrame or LazyFrame for efficient in-memory processing.

import polars as pl
from flyte.io import DataFrame
from flyte import env

@env.task
async def process_with_polars(df: DataFrame) -> pl.DataFrame:
    # Materialize as a Polars DataFrame
    # Note: .open() is required if the plugin is loaded inside the task
    polars_df = await df.open(pl.DataFrame).all()

    return polars_df.filter(pl.col("value") > 10)

@env.task
async def lazy_processing(df: DataFrame) -> pl.LazyFrame:
    # Materialize as a Polars LazyFrame for query optimization
    lf = await df.open(pl.LazyFrame).all()

    # group_by replaces the deprecated groupby spelling in current Polars releases
    return lf.group_by("category").agg(pl.col("score").mean())

Polars Storage Options

When Polars interacts with remote storage (S3, GCS, ABFS), it requires storage_options to be a flat dictionary of strings. The Flyte Polars plugin handles this conversion automatically via get_polars_storage_options, mapping Flyte's internal storage configuration to the format Polars expects (e.g., mapping access_key_id to aws_access_key_id).
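
The conversion described above can be sketched in a few lines. This is not the plugin's `get_polars_storage_options`: `to_flat_storage_options` and its rename table are hypothetical, and only the `access_key_id` to `aws_access_key_id` mapping comes from the description above; the other entries and the handling of booleans and `None` are assumptions for illustration.

```python
# Hypothetical rename table. Only the first entry is taken from this guide;
# the remaining entries are assumed for the sake of the example.
KEY_RENAMES = {
    "access_key_id": "aws_access_key_id",
    "secret_access_key": "aws_secret_access_key",
    "endpoint": "aws_endpoint_url",
}


def to_flat_storage_options(config: dict) -> dict:
    """Return a flat str-to-str dict, renaming keys and dropping unset values."""
    options = {}
    for key, value in config.items():
        if value is None:
            continue  # unset values are omitted rather than sent as "None"
        if isinstance(value, bool):
            value = "true" if value else "false"
        options[KEY_RENAMES.get(key, key)] = str(value)
    return options


options = to_flat_storage_options(
    {
        "access_key_id": "example-key",
        "secret_access_key": "example-secret",
        "endpoint": "http://localhost:9000",
        "allow_http": True,
        "region": None,
    }
)
print(options)
```

Keeping every value a string matters because a nested dictionary or a raw boolean in `storage_options` is a common source of errors when reading from object storage.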

Troubleshooting

Missing Dataframe Handlers

If you import data processing libraries (like Pandas or Polars) inside your task function rather than at the top level, you must call .open() before materializing the data:

@env.task
async def my_task(df: DataFrame):
    import pandas as pd
    # This ensures the Pandas handler is registered within the task's scope
    data = await df.open(pd.DataFrame).all()

Snowflake Batch Template Errors

If a Snowflake batch insert fails, ensure your query_template uses the %(key)s syntax. Standard SQL ? or $1 placeholders are not supported for batch operations in this SDK.

JSONL Arrow Support

If iter_arrow_batches raises a ModuleNotFoundError, ensure you have installed the specific extra:

pip install 'flyteplugins-jsonl[arrow]'
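
To confirm whether the dependency is actually visible to the interpreter running the task, a quick check with the standard library can help. `has_module` is a hypothetical helper name; the assumption here is that the `arrow` extra provides the `pyarrow` package.

```python
import importlib.util


def has_module(name: str) -> bool:
    """Return True if a top-level module can be found without importing it."""
    return importlib.util.find_spec(name) is not None


# Assumption: the 'arrow' extra installs the pyarrow package.
print("pyarrow available:", has_module("pyarrow"))
```

If this prints False inside the task's container but True locally, the extra is most likely missing from the task image rather than from the development environment.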