
BigQuery SQL Tasks

The vik-advani-flyte-sdk-9b3ce04 codebase provides a specialized integration for Google BigQuery through the BigQueryTask and BigQueryConnector classes. This implementation allows developers to execute SQL queries asynchronously, leveraging Flyte's AsyncConnector framework to manage long-running BigQuery jobs without consuming worker resources during the wait period.

Core Task Definition

The primary interface for BigQuery operations is the BigQueryTask class found in plugins/bigquery/src/flyteplugins/bigquery/task.py. This class inherits from TaskTemplate and AsyncConnectorExecutorMixin, enabling it to delegate execution to a backend connector.

When defining a BigQueryTask, you provide a query template, a configuration object, and optional input/output specifications.

from flyteplugins.bigquery import BigQueryConfig, BigQueryTask
from flyte.io import DataFrame

# Basic BigQuery Task definition
bigquery_task = BigQueryTask(
    name="my_bq_query",
    inputs={"min_count": int},
    output_dataframe_type=DataFrame,
    plugin_config=BigQueryConfig(
        ProjectID="my-gcp-project",
        Location="US",
    ),
    query_template="SELECT * FROM my_dataset.my_table WHERE count > {{ .min_count }}",
)

Query Templating and Normalization

BigQueryTask supports parameterization using a Golang-style templating syntax: {{ .input_name }}. During initialization, the task automatically normalizes the query_template by replacing newlines, tabs, and multiple spaces with a single space using re.sub(r"\s+", " ", ...).
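The normalization step can be reproduced in isolation. The following is a minimal sketch that applies the same regular expression to a multi-line query; the helper name normalize_query is hypothetical and is not part of the plugin.

```python
import re


def normalize_query(query_template: str) -> str:
    """Collapse every run of whitespace (newlines, tabs, spaces) into one space.

    Hypothetical helper: it only mirrors the re.sub call described above.
    """
    return re.sub(r"\s+", " ", query_template)


raw = "SELECT *\n  FROM my_dataset.my_table\n\tWHERE count   > {{ .min_count }}"
normalized = normalize_query(raw)
print(normalized)
```

Because the pattern matches any whitespace run, leading or trailing whitespace in the template would be reduced to a single space rather than removed. The template placeholder itself is left untouched.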

Configuration with BigQueryConfig

The BigQueryConfig class manages the connection and job settings. It allows for both basic project identification and advanced job tuning via the standard Google Cloud BigQuery library.

  • ProjectID: The GCP project that owns the dataset and is billed for the job.
  • Location: The geographic location in which the job runs (e.g., "US", "EU").
  • QueryJobConfig: An optional google.cloud.bigquery.QueryJobConfig object for advanced settings like destination tables, caching, or priority.

Example of advanced configuration:

from google.cloud import bigquery
from flyteplugins.bigquery import BigQueryConfig, BigQueryTask

job_config = bigquery.QueryJobConfig(
    use_query_cache=False,
    maximum_bytes_billed=1000000,
)

config = BigQueryConfig(
    ProjectID="test-project",
    Location="EU",
    QueryJobConfig=job_config,
)

task = BigQueryTask(
    name="advanced_task",
    query_template="SELECT * FROM table",
    plugin_config=config,
)

Execution Lifecycle and Connector

The actual interaction with GCP is handled by the BigQueryConnector in plugins/bigquery/src/flyteplugins/bigquery/connector.py. The lifecycle follows the AsyncConnector pattern:

  1. Create: The connector's create method initializes a BigQuery client, maps Flyte inputs to ScalarQueryParameter objects, and submits the query job. It returns a BigQueryMetadata object containing the job_id.
  2. Get (Polling): Flyte periodically calls get with the BigQueryMetadata. The connector checks the job status using client.get_job.
  3. Succeed/Fail: If the job succeeds, the connector identifies the destination table and returns a DataFrame with a bq:// URI (e.g., bq://project:dataset.table). If it fails, it extracts error messages from job.errors.
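The create/get cycle above can be illustrated without any GCP dependency. The sketch below is a simplified, synchronous simulation: JobMetadata, FakeJobBackend, and the dictionary returned by get are hypothetical stand-ins chosen for illustration, not the connector's actual classes or return types, and the real connector methods run asynchronously.

```python
from dataclasses import dataclass


@dataclass
class JobMetadata:
    """Hypothetical stand-in for BigQueryMetadata: just enough state to poll."""
    job_id: str


class FakeJobBackend:
    """Hypothetical in-memory job service; each job finishes after N polls."""

    def __init__(self, polls_until_done: int):
        self._polls_until_done = polls_until_done
        self._remaining = {}
        self._next_id = 0

    def submit(self, query: str) -> str:
        self._next_id += 1
        job_id = f"job-{self._next_id}"
        self._remaining[job_id] = self._polls_until_done
        return job_id

    def state(self, job_id: str) -> str:
        if self._remaining[job_id] > 0:
            self._remaining[job_id] -= 1
            return "RUNNING"
        return "DONE"


def create(backend: FakeJobBackend, query: str) -> JobMetadata:
    # Step 1: submit the job and return only the metadata needed to find it later.
    return JobMetadata(job_id=backend.submit(query))


def get(backend: FakeJobBackend, metadata: JobMetadata) -> dict:
    # Step 2: look the job up by ID; nothing blocks between polls.
    if backend.state(metadata.job_id) != "DONE":
        return {"phase": "RUNNING", "output_uri": None}
    # Step 3: on success, report where the results live (illustrative URI).
    return {"phase": "SUCCEEDED", "output_uri": "bq://my-project:my_dataset.my_table"}


backend = FakeJobBackend(polls_until_done=2)
metadata = create(backend, "SELECT 1")
phases = []
while True:
    result = get(backend, metadata)
    phases.append(result["phase"])
    if result["phase"] != "RUNNING":
        break
print(metadata.job_id, phases, result["output_uri"])
```

The point of the design is that only the small metadata object has to survive between polls, which is why no worker needs to stay occupied while the job runs.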

Type Mapping

The connector maintains a mapping (pythonTypeToBigQueryType) to ensure Python types are correctly converted to BigQuery types for parameterized queries:

  • list -> ARRAY
  • bool -> BOOL
  • bytes -> BYTES
  • datetime -> DATETIME
  • float -> FLOAT64
  • int -> INT64
  • str -> STRING
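As a rough illustration of how such a mapping can drive parameter construction, the sketch below rebuilds the table as a plain dictionary and resolves the BigQuery type name for each input value. The names PYTHON_TYPE_TO_BIGQUERY_TYPE, bigquery_type_for, and describe_parameters are hypothetical, and the dictionary's shape is an assumption based only on the list above; the connector's own pythonTypeToBigQueryType may be structured differently.

```python
from datetime import datetime

# Assumed shape of the mapping, reconstructed from the list above.
PYTHON_TYPE_TO_BIGQUERY_TYPE = {
    list: "ARRAY",
    bool: "BOOL",
    bytes: "BYTES",
    datetime: "DATETIME",
    float: "FLOAT64",
    int: "INT64",
    str: "STRING",
}


def bigquery_type_for(value) -> str:
    """Return the BigQuery type name for a Python value.

    Uses an exact type lookup so that True maps to BOOL rather than INT64
    (bool is a subclass of int in Python).
    """
    try:
        return PYTHON_TYPE_TO_BIGQUERY_TYPE[type(value)]
    except KeyError:
        raise TypeError(f"No BigQuery type mapping for {type(value).__name__}") from None


def describe_parameters(inputs: dict) -> list:
    """Build (name, bigquery_type, value) triples for each task input."""
    return [(name, bigquery_type_for(value), value) for name, value in inputs.items()]


params = describe_parameters({"min_count": 10, "region": "EU", "active": True})
print(params)
```

Each triple follows the (name, type, value) argument order of google.cloud.bigquery.ScalarQueryParameter, so in a real connector the triples could be passed to that constructor; that final step is omitted here to keep the sketch free of external dependencies.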

Authentication and Secrets

Authentication is managed via the google_application_credentials parameter in BigQueryTask. This parameter should be the name of a Flyte secret.

The BigQueryConnector retrieves this secret and uses it to initialize the BigQuery client. The secret is expected to contain the JSON service account key required for GCP authentication.

Local Execution

For testing purposes, BigQueryTask can be executed locally if the environment has appropriate GCP credentials configured.

import flyte
from flyteplugins.bigquery import BigQueryTask, BigQueryConfig

# ... task definition ...

if __name__ == "__main__":
    flyte.init_from_config()
    # Local execution triggers the connector's create/get logic locally
    run = flyte.with_runcontext(mode="local").run(bigquery_task, min_count=10)
    print(f"Job Results: {run.results}")

When running locally, the connector still generates a TaskLog with a URI pointing to the BigQuery Console, allowing you to track the job in the Google Cloud UI.