Data Integration & SQL
This guide demonstrates how to integrate with external data warehouses like BigQuery and Snowflake, and how to process structured data formats such as JSONL and Parquet using the Flyte SDK.
Querying BigQuery
To execute SQL queries on Google BigQuery, use the BigQueryTask. This task returns results as a Flyte DataFrame, which can then be consumed by downstream tasks as Pandas or Polars objects.
from flyte.io import DataFrame
from flyteplugins.bigquery import BigQueryTask, BigQueryConfig

# Configure the BigQuery connection
bq_config = BigQueryConfig(
    ProjectID="my-gcp-project",
    Location="US",
)

# Define the query task.
# The query_template uses Go templating syntax to reference inputs.
bigquery_task = BigQueryTask(
    name="query_users",
    inputs={"min_age": int},
    output_dataframe_type=DataFrame,
    plugin_config=bq_config,
    query_template="SELECT id, name, age FROM `my-project.my_dataset.users` WHERE age >= {{ .inputs.min_age }}",
    google_application_credentials="gcp-secret-name",
)
Key configuration points:
- google_application_credentials: The name of the Flyte secret containing your GCP service account JSON.
- output_dataframe_type: When specified, the task returns a DataFrame containing the query results.
Integrating with Snowflake
The Snowflake task supports both standard queries and high-performance batch inserts.
from flyteplugins.snowflake import Snowflake, SnowflakeConfig

sf_config = SnowflakeConfig(
    user="MY_USER",
    account="xy12345.us-east-1",
    database="FLYTE_DB",
    schema="PUBLIC",
    warehouse="COMPUTE_WH",
)

# Batch insert example
snowflake_insert_task = Snowflake(
    name="snowflake_batch_insert",
    inputs={"id": list[int], "name": list[str], "age": list[int]},
    plugin_config=sf_config,
    query_template="INSERT INTO FLYTE_DB.PUBLIC.TEST (ID, NAME, AGE) VALUES (%(id)s, %(name)s, %(age)s);",
    snowflake_private_key="snowflake-secret-key",
    batch=True,
)
Batch Insert Requirements
When using batch=True:
- The query_template must contain exactly one VALUES clause with placeholders matching the input keys (e.g., %(col_name)s).
- The inputs must be provided as lists of values, one list per column.
Processing JSONL Files
The JsonlFile type provides efficient, streaming access to JSON Lines data and supports both uncompressed and zstd-compressed files.
Streaming Reads and Writes
Use iter_records to process large files without loading them entirely into memory.
import flyte
from flyteplugins.jsonl import JsonlFile

# Task environment used by the examples; the name is illustrative
env = flyte.TaskEnvironment(name="data-integration")

@env.task
async def process_jsonl(f: JsonlFile) -> int:
    count = 0
    # Transparently handles both .jsonl and .jsonl.zst files
    async for record in f.iter_records(on_error="skip"):
        if record.get("active"):
            count += 1
    return count

@env.task
async def write_compressed_jsonl() -> JsonlFile:
    # The .zst extension triggers automatic zstd compression
    f = JsonlFile.new_remote("data.jsonl.zst")
    async with f.writer(compression_level=3) as w:
        for i in range(1000):
            await w.write({"id": i, "data": "sample"})
    return f
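To see the streaming pattern without the plugin, here is a synchronous standard-library sketch of what process_jsonl does: read one line at a time, parse it, and drop malformed lines when on_error="skip". The function iter_jsonl and its blank-line handling are assumptions modeled on the description of iter_records, not the plugin's code, and compression is not handled here.

```python
import io
import json
from typing import Any, Iterator, TextIO


def iter_jsonl(stream: TextIO, on_error: str = "raise") -> Iterator[dict[str, Any]]:
    """Yield one parsed record per non-blank line without reading the whole file.

    on_error="skip" drops malformed lines; any other value raises ValueError.
    """
    for lineno, line in enumerate(stream, start=1):
        if not line.strip():
            continue  # assumption: blank lines are ignored
        try:
            yield json.loads(line)
        except json.JSONDecodeError:
            if on_error == "skip":
                continue
            raise ValueError(f"invalid JSON on line {lineno}") from None


def count_active(stream: TextIO) -> int:
    # Same counting logic as process_jsonl, in plain synchronous Python
    return sum(1 for rec in iter_jsonl(stream, on_error="skip") if rec.get("active"))


sample = io.StringIO(
    '{"id": 1, "active": true}\n'
    "not json\n"
    '{"id": 2, "active": false}\n'
    "\n"
    '{"id": 3, "active": true}\n'
)
print(count_active(sample))  # 2
```

Because the generator holds only the current line, memory use stays flat regardless of file size.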
Arrow Batch Iteration
For higher performance, you can iterate over JSONL data as Arrow RecordBatch objects. This requires installing the plugin with arrow support: pip install 'flyteplugins-jsonl[arrow]'.
@env.task
async def process_arrow(f: JsonlFile):
    async for batch in f.iter_arrow_batches(batch_size=1024):
        # 'batch' is a pyarrow.RecordBatch
        print(f"Processed batch with {batch.num_rows} rows")
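The batch_size argument controls how many rows go into each RecordBatch. The sketch below illustrates that partitioning with plain Python lists, assuming fixed-size chunks where only the final batch may be shorter; iter_batches is a hypothetical helper, not the plugin's API. Each chunk of dicts could then be converted with pyarrow.RecordBatch.from_pylist if pyarrow is installed.

```python
from itertools import islice
from typing import Iterable, Iterator, TypeVar

T = TypeVar("T")


def iter_batches(records: Iterable[T], batch_size: int = 1024) -> Iterator[list[T]]:
    """Yield consecutive lists of at most batch_size items; the last may be shorter."""
    if batch_size < 1:
        raise ValueError("batch_size must be >= 1")
    it = iter(records)
    # islice pulls the next batch_size items; an empty list means the input is exhausted
    while batch := list(islice(it, batch_size)):
        yield batch


sizes = [len(b) for b in iter_batches(range(2500), batch_size=1024)]
print(sizes)  # [1024, 1024, 452]
```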
Data Processing with Polars
Flyte DataFrame objects can be materialized as Polars DataFrame or LazyFrame for efficient in-memory processing.
import flyte
import polars as pl
from flyte.io import DataFrame

env = flyte.TaskEnvironment(name="data-integration")

@env.task
async def process_with_polars(df: DataFrame) -> pl.DataFrame:
    # Materialize as a Polars DataFrame.
    # .open(pl.DataFrame) selects the target type; it is required when the
    # Polars plugin is only loaded inside the task.
    polars_df = await df.open(pl.DataFrame).all()
    return polars_df.filter(pl.col("value") > 10)

@env.task
async def lazy_processing(df: DataFrame) -> pl.LazyFrame:
    # Materialize as a Polars LazyFrame so the query can be optimized before execution
    lf = await df.open(pl.LazyFrame).all()
    # group_by replaces groupby, which was deprecated in Polars 0.19 and later removed
    return lf.group_by("category").agg(pl.col("score").mean())
Polars Storage Options
When Polars interacts with remote storage (S3, GCS, ABFS), it requires storage_options to be a flat dictionary of strings. The Flyte Polars plugin handles this conversion automatically via get_polars_storage_options, mapping Flyte's internal storage configuration to the format Polars expects (e.g., mapping access_key_id to aws_access_key_id).
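The conversion described above can be pictured as flattening a nested configuration and renaming keys. The sketch below is a hypothetical stand-in for get_polars_storage_options, not its implementation: the nested input layout is assumed, and of the renames only access_key_id to aws_access_key_id comes from this guide; the others are assumptions based on Polars' S3 option names.

```python
from collections.abc import Mapping
from typing import Any

# Hypothetical rename table; only the first entry is taken from the text above.
_S3_KEY_MAP = {
    "access_key_id": "aws_access_key_id",
    "secret_access_key": "aws_secret_access_key",
    "region": "aws_region",
    "endpoint": "aws_endpoint_url",
}


def to_polars_storage_options(config: Mapping[str, Any]) -> dict[str, str]:
    """Flatten a possibly nested config into the flat str -> str dict Polars expects."""
    flat: dict[str, str] = {}

    def _walk(section: Mapping[str, Any]) -> None:
        for key, value in section.items():
            if isinstance(value, Mapping):
                _walk(value)  # nested sections are merged into one flat level
            elif value is None:
                continue  # unset values are omitted rather than sent as "None"
            elif isinstance(value, bool):
                flat[_S3_KEY_MAP.get(key, key)] = str(value).lower()  # "true"/"false"
            else:
                flat[_S3_KEY_MAP.get(key, key)] = str(value)

    _walk(config)
    return flat


opts = to_polars_storage_options({"s3": {"access_key_id": "EXAMPLEKEY", "region": "us-east-1"}})
print(opts)  # {'aws_access_key_id': 'EXAMPLEKEY', 'aws_region': 'us-east-1'}
```

A dictionary in this shape could then be passed as storage_options= to a Polars reader such as pl.scan_parquet.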
Troubleshooting
Missing DataFrame Handlers
If you import data processing libraries (like Pandas or Polars) inside your task function rather than at the top level, you must call .open() before materializing the data:
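import flyte
from flyte.io import DataFrame

env = flyte.TaskEnvironment(name="data-integration")

@env.task
async def my_task(df: DataFrame):
    import pandas as pd
    # Calling .open(pd.DataFrame) ensures the Pandas handler is registered within the task's scope
    data = await df.open(pd.DataFrame).all()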
Snowflake Batch Template Errors
If a Snowflake batch insert fails, check that your query_template uses the %(key)s placeholder syntax. Positional placeholders such as ? or $1 are not supported for batch operations in this SDK.
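A quick way to catch this before running the task is to scan the template for the unsupported placeholder styles. The lint_batch_template function below is a hypothetical helper, not part of the SDK. It is a rough text check: a ? or $1 that appears inside a SQL string literal would also be flagged.

```python
import re

# Placeholder styles the batch path does not accept: "?" (qmark) and "$1" (numeric).
_UNSUPPORTED = {
    "qmark (?)": re.compile(r"\?"),
    "numeric ($1)": re.compile(r"\$\d+"),
}
_PYFORMAT = re.compile(r"%\(\w+\)s")


def lint_batch_template(template: str) -> list[str]:
    """Return problems found in a Snowflake batch template; an empty list means none."""
    problems = [
        f"uses unsupported {style} placeholders"
        for style, pattern in _UNSUPPORTED.items()
        if pattern.search(template)
    ]
    if not _PYFORMAT.search(template):
        problems.append("no %(key)s placeholders found")
    return problems


print(lint_batch_template("INSERT INTO T (ID) VALUES (?)"))
# ['uses unsupported qmark (?) placeholders', 'no %(key)s placeholders found']
```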
JSONL Arrow Support
If iter_arrow_batches raises a ModuleNotFoundError, ensure you have installed the specific extra:
pip install 'flyteplugins-jsonl[arrow]'