Your First SQL Workflow

In this tutorial, you will build a data pipeline that executes a parameterized SQL query against Google BigQuery and retrieves the results as a structured DataFrame. You will learn how to configure the BigQuery connection, use Flyte's templating engine to inject dynamic values into your SQL, and handle the query output.

Prerequisites

Before starting, ensure you have the following:

  • A Google Cloud Project ID with BigQuery enabled.
  • A BigQuery dataset and table to query.
  • Google Application Credentials configured (usually via a service account JSON file).
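A quick way to confirm the last prerequisite is to inspect your environment before running anything. The sketch below assumes credentials are supplied through the standard GOOGLE_APPLICATION_CREDENTIALS environment variable pointing at a service account key file. The helper name credentials_status is hypothetical, and other authentication methods would not be detected by it.

```python
import os


def credentials_status(env=None):
    """Report whether a service account key file appears to be configured.

    Assumption: credentials come from GOOGLE_APPLICATION_CREDENTIALS.
    """
    env = os.environ if env is None else env
    path = env.get("GOOGLE_APPLICATION_CREDENTIALS")
    if not path:
        return "unset"          # variable is absent or empty
    if not os.path.isfile(path):
        return "missing-file"   # variable is set but the file does not exist
    return "ok"


if __name__ == "__main__":
    print(f"GOOGLE_APPLICATION_CREDENTIALS: {credentials_status()}")
```

A result of "ok" only means the file exists; it does not verify that the service account has BigQuery permissions.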

Step 1: Configure the BigQuery Connection

The first step is to define where your query will run. You use the BigQueryConfig class to specify your Google Cloud project and, optionally, the geographic location of your data.

from flyteplugins.bigquery import BigQueryConfig

# Configure the connection to your GCP project
bq_config = BigQueryConfig(
    ProjectID="your-gcp-project-id",
    Location="US",  # Optional: defaults to project default
)

Flyte uses this BigQueryConfig object to determine which Google Cloud project, and optionally which location, the query runs in.

Step 2: Define the BigQuery Task

Next, you define the BigQueryTask. This task takes a SQL template and a set of inputs. You use Flyte's Golang-style templating syntax ({{ .input_name }}) to make your queries dynamic.

from flyteplugins.bigquery import BigQueryTask
from flyte.io import DataFrame

# Define the task with inputs and a templated query
analytics_task = BigQueryTask(
    name="user_event_query",
    plugin_config=bq_config,
    inputs={
        "user_id": int,
        "limit_count": int,
    },
    output_dataframe_type=DataFrame,
    query_template="""
        SELECT user_id, event_type, timestamp
        FROM `your-project.your_dataset.events`
        WHERE user_id = {{ .user_id }}
        LIMIT {{ .limit_count }}
    """,
)

In this step:

  • inputs: Defines the names and types of variables you can use in your query.
  • query_template: The SQL statement. Note that BigQueryTask automatically normalizes this string by removing extra whitespace and newlines before execution.
  • output_dataframe_type: Setting this to DataFrame tells Flyte to collect the query results and make them available to downstream tasks.
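To make the templating and normalization points concrete, here is a small standalone sketch of what whitespace normalization and placeholder substitution could look like. It is an illustration only, not the plugin's implementation or Flyte's templating engine: the helpers normalize_query and render_query are hypothetical, and the real substitution happens inside Flyte when the task runs.

```python
import re

# Matches placeholders of the form {{ .name }} used in this tutorial's queries.
PLACEHOLDER = re.compile(r"\{\{\s*\.(\w+)\s*\}\}")


def normalize_query(query):
    """Collapse runs of whitespace (including newlines) into single spaces."""
    return re.sub(r"\s+", " ", query).strip()


def render_query(template, declared_inputs, values):
    """Substitute {{ .name }} placeholders after checking declared types."""
    def substitute(match):
        name = match.group(1)
        if name not in declared_inputs:
            raise KeyError(f"undeclared input: {name}")
        value = values[name]
        if not isinstance(value, declared_inputs[name]):
            raise TypeError(f"{name} must be {declared_inputs[name].__name__}")
        return str(value)

    return PLACEHOLDER.sub(substitute, template)


template = """
    SELECT user_id, event_type, timestamp
    FROM `your-project.your_dataset.events`
    WHERE user_id = {{ .user_id }}
    LIMIT {{ .limit_count }}
"""
inputs = {"user_id": int, "limit_count": int}

# Normalize first, then fill in the placeholders (the order is an assumption).
sql = render_query(normalize_query(template), inputs, {"user_id": 12345, "limit_count": 10})
print(sql)
```

In this sketch, values are inserted as plain text, so it only makes sense for simple, type-checked inputs such as the int parameters declared above.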

Step 3: Understand the Task Output

When you specify an output_dataframe_type, the BigQueryTask automatically creates a single output named results. If you connect this task to another task in a workflow, you access the query data through this key.

# Example of how the output is structured internally
# The task interface will have: outputs={'results': DataFrame}
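As a mental model, the relationship between output_dataframe_type and the task's outputs can be sketched in plain Python. The function build_outputs below is hypothetical and only mirrors the behavior described in this step. The placeholder DataFrame class stands in for flyte.io.DataFrame so the snippet runs without Flyte installed, and the behavior when no output type is given (no outputs declared) is an assumption.

```python
class DataFrame:
    """Stand-in for flyte.io.DataFrame so this sketch runs without Flyte."""


def build_outputs(output_dataframe_type=None):
    """Return the output mapping a query task would expose.

    Assumption: with no output type, the task declares no outputs.
    """
    if output_dataframe_type is None:
        return {}
    return {"results": output_dataframe_type}


outputs = build_outputs(DataFrame)
print(outputs["results"].__name__)  # the single output is keyed by "results"
```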

Step 4: Execute the Task Locally

You can test your BigQuery task locally before deploying it to a Flyte cluster. This requires that your local environment has the necessary Google Cloud permissions.

import flyte

if __name__ == "__main__":
    # Initialize Flyte configuration
    flyte.init_from_config()

    # Run the task in local mode
    # Pass the inputs defined in the 'inputs' dictionary
    execution = flyte.with_runcontext(mode="local").run(
        analytics_task,
        user_id=12345,
        limit_count=10,
    )

    # Access the results
    print(f"Execution completed. Results available at: {execution.url}")

When running locally, Flyte executes the query against BigQuery using your local credentials and returns a reference to the resulting data.

Complete Example

Here is a compact, self-contained example that follows the same steps, using a different table and a single input:

from flyteplugins.bigquery import BigQueryConfig, BigQueryTask
from flyte.io import DataFrame
import flyte

# 1. Configuration
bq_config = BigQueryConfig(ProjectID="dogfood-gcp-dataplane")

# 2. Task Definition
bigquery_task = BigQueryTask(
    name="get_flyte_data",
    inputs={"version": int},
    output_dataframe_type=DataFrame,
    plugin_config=bq_config,
    query_template="SELECT * FROM dataset.flyte_table WHERE version = {{ .version }};",
)

# 3. Local Execution
if __name__ == "__main__":
    flyte.init_from_config()
    run = flyte.with_runcontext(mode="local").run(bigquery_task, version=1)
    print(f"Query results: {run.outputs['results']}")

Next Steps

  • Advanced Job Config: Use the QueryJobConfig attribute in BigQueryConfig to set destination tables or time partitioning.
  • Secrets: If running on a cluster, use the google_application_credentials parameter in BigQueryTask to specify the name of the Flyte secret containing your GCP service account key.