BigQuery SQL Tasks
The vik-advani-flyte-sdk-9b3ce04 codebase provides a specialized integration for Google BigQuery through the BigQueryTask and BigQueryConnector classes. This implementation allows developers to execute SQL queries asynchronously, leveraging Flyte's AsyncConnector framework to manage long-running BigQuery jobs without consuming worker resources during the wait period.
Core Task Definition
The primary interface for BigQuery operations is the BigQueryTask class found in plugins/bigquery/src/flyteplugins/bigquery/task.py. This class inherits from TaskTemplate and AsyncConnectorExecutorMixin, enabling it to delegate execution to a backend connector.
When defining a BigQueryTask, you provide a query template, a configuration object, and optional input/output specifications.
from flyteplugins.bigquery import BigQueryConfig, BigQueryTask
from flyte.io import DataFrame

# Basic BigQuery Task definition
bigquery_task = BigQueryTask(
    name="my_bq_query",
    inputs={"min_count": int},
    output_dataframe_type=DataFrame,
    plugin_config=BigQueryConfig(
        ProjectID="my-gcp-project",
        Location="US",
    ),
    query_template="SELECT * FROM my_dataset.my_table WHERE count > {{ .min_count }}",
)
Query Templating and Normalization
BigQueryTask supports parameterization using a Golang-style templating syntax: {{ .input_name }}. During initialization, the task automatically normalizes the query_template by replacing newlines, tabs, and multiple spaces with a single space using re.sub(r"\s+", " ", ...).
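The normalization step can be reproduced in isolation. The following is a minimal sketch, assuming only the re.sub call quoted above; the helper name normalize_query is hypothetical and is not part of the plugin.

```python
import re


def normalize_query(query_template: str) -> str:
    """Collapse every run of whitespace (newlines, tabs, spaces) into one space.

    Hypothetical helper: it applies the same regular expression the text
    describes, nothing more.
    """
    return re.sub(r"\s+", " ", query_template)


raw = """
    SELECT *
    FROM my_dataset.my_table
    WHERE count > {{ .min_count }}
"""

normalized = normalize_query(raw)
print(repr(normalized))
```

Under this sketch the leading and trailing newlines each become a single space rather than being removed, and the {{ .min_count }} placeholder is left untouched. Because the pattern matches all whitespace, runs of spaces inside SQL string literals would be collapsed as well.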
Configuration with BigQueryConfig
The BigQueryConfig class manages the connection and job settings. It allows for both basic project identification and advanced job tuning via the standard Google Cloud BigQuery library.
- ProjectID: The GCP project that owns the dataset and is billed for the job.
- Location: The geographic location of the job (e.g., "US", "EU").
- QueryJobConfig: An optional google.cloud.bigquery.QueryJobConfig object for advanced settings like destination tables, caching, or priority.
Example of advanced configuration:
from google.cloud import bigquery
from flyteplugins.bigquery import BigQueryConfig, BigQueryTask

job_config = bigquery.QueryJobConfig(
    use_query_cache=False,
    maximum_bytes_billed=1000000,
)

config = BigQueryConfig(
    ProjectID="test-project",
    Location="EU",
    QueryJobConfig=job_config,
)

task = BigQueryTask(
    name="advanced_task",
    query_template="SELECT * FROM table",
    plugin_config=config,
)
Execution Lifecycle and Connector
The actual interaction with GCP is handled by the BigQueryConnector in plugins/bigquery/src/flyteplugins/bigquery/connector.py. The lifecycle follows the AsyncConnector pattern:
- Create: The connector's create method initializes a BigQuery client, maps Flyte inputs to ScalarQueryParameter objects, and submits the query job. It returns a BigQueryMetadata object containing the job_id.
- Get (Polling): Flyte periodically calls get with the BigQueryMetadata. The connector checks the job status using client.get_job.
- Succeed/Fail: If the job succeeds, the connector identifies the destination table and returns a DataFrame with a bq:// URI (e.g., bq://project:dataset.table). If it fails, it extracts error messages from job.errors.
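The polling and completion steps above can be illustrated without touching GCP. The sketch below is not the connector's code: TableRef, FakeJob, FakeClient, JobMetadata, and the poll driver are all hypothetical stand-ins, and the only details taken from the text are the job_id field, the client.get_job call, the job.errors list, and the bq://project:dataset.table URI shape.

```python
from dataclasses import dataclass
from typing import Iterable, List, Optional


@dataclass
class TableRef:
    """Hypothetical stand-in for a destination table reference."""
    project: str
    dataset_id: str
    table_id: str


@dataclass
class FakeJob:
    """Hypothetical stand-in for a query job snapshot."""
    state: str  # "PENDING", "RUNNING", or "DONE"
    errors: Optional[List[dict]] = None
    destination: Optional[TableRef] = None


@dataclass
class JobMetadata:
    """Hypothetical stand-in for BigQueryMetadata (only job_id is from the text)."""
    job_id: str


class FakeClient:
    """Replays a scripted sequence of job snapshots, one per get_job call."""

    def __init__(self, snapshots: Iterable[FakeJob]):
        self._snapshots = iter(snapshots)

    def get_job(self, job_id: str) -> FakeJob:
        return next(self._snapshots)


def get(client: FakeClient, metadata: JobMetadata) -> Optional[str]:
    """One polling step: None while running, a bq:// URI on success."""
    job = client.get_job(metadata.job_id)
    if job.state != "DONE":
        return None
    if job.errors:
        # Surface every error message reported for the failed job.
        raise RuntimeError("; ".join(err["message"] for err in job.errors))
    dest = job.destination
    return f"bq://{dest.project}:{dest.dataset_id}.{dest.table_id}"


def poll(client: FakeClient, metadata: JobMetadata, max_polls: int = 10) -> str:
    """Simulates the framework calling get() repeatedly until the job finishes."""
    for _ in range(max_polls):
        uri = get(client, metadata)
        if uri is not None:
            return uri
    raise TimeoutError(f"job {metadata.job_id} did not finish in {max_polls} polls")


client = FakeClient([
    FakeJob("PENDING"),
    FakeJob("RUNNING"),
    FakeJob("DONE", destination=TableRef("my-gcp-project", "my_dataset", "my_table")),
])
print(poll(client, JobMetadata("job-1")))  # bq://my-gcp-project:my_dataset.my_table
```

In the real plugin the repeated calls come from Flyte rather than from a local loop, which is why the worker holds no resources between polls; the poll function here exists only to make the sequence visible.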
Type Mapping
The connector maintains a mapping (pythonTypeToBigQueryType) to ensure Python types are correctly converted to BigQuery types for parameterized queries:
- list -> ARRAY
- bool -> BOOL
- bytes -> BYTES
- datetime -> DATETIME
- float -> FLOAT64
- int -> INT64
- str -> STRING
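As an illustration of how such a lookup behaves, here is a minimal sketch that mirrors the pairs listed above. It is not the connector's pythonTypeToBigQueryType itself: the constant name, the bigquery_type_for helper, and the choice of datetime.datetime as the key for DATETIME are assumptions.

```python
import datetime

# Assumed shape of the mapping: Python type -> BigQuery type name.
PYTHON_TYPE_TO_BIGQUERY_TYPE = {
    list: "ARRAY",
    bool: "BOOL",
    bytes: "BYTES",
    datetime.datetime: "DATETIME",  # assumption: the datetime entry means datetime.datetime
    float: "FLOAT64",
    int: "INT64",
    str: "STRING",
}


def bigquery_type_for(value: object) -> str:
    """Return the BigQuery type name for a Python value (hypothetical helper).

    The lookup uses the exact type of the value, so True resolves to BOOL
    even though bool is a subclass of int.
    """
    try:
        return PYTHON_TYPE_TO_BIGQUERY_TYPE[type(value)]
    except KeyError:
        raise TypeError(f"unsupported input type: {type(value).__name__}") from None


print(bigquery_type_for(10))    # INT64
print(bigquery_type_for(True))  # BOOL
```

Keying on the exact type rather than using isinstance avoids the bool/int ambiguity, at the cost of rejecting subclasses of the supported types.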
Authentication and Secrets
Authentication is managed via the google_application_credentials parameter in BigQueryTask. This parameter should be the name of a Flyte secret.
The BigQueryConnector retrieves this secret and uses it to initialize the BigQuery client. The secret is expected to contain the JSON service account key required for GCP authentication.
Local Execution
For testing purposes, BigQueryTask can be executed locally if the environment has appropriate GCP credentials configured.
import flyte
from flyteplugins.bigquery import BigQueryTask, BigQueryConfig

# ... task definition ...

if __name__ == "__main__":
    flyte.init_from_config()
    # Local execution triggers the connector's create/get logic locally
    run = flyte.with_runcontext(mode="local").run(bigquery_task, min_count=10)
    print(f"Job Results: {run.results}")
When running locally, the connector still generates a TaskLog with a URI pointing to the BigQuery Console, allowing you to track the job in the Google Cloud UI.