
Snowflake SQL Tasks

The Snowflake plugin allows for the execution of parameterized SQL queries on Snowflake as asynchronous Flyte tasks. It leverages the Snowflake Python Connector's asynchronous execution capabilities to submit queries and poll for their completion, ensuring that Flyte workers are not blocked during long-running database operations.

Connection Configuration

All Snowflake tasks require a SnowflakeConfig object that defines the connection parameters: the account, user, database, schema, and warehouse the query runs against.

from flyteplugins.snowflake import SnowflakeConfig

sf_config = SnowflakeConfig(
    account="your-account-identifier",
    user="your-username",
    database="YOUR_DATABASE",
    schema="PUBLIC",
    warehouse="COMPUTE_WH",
    # Optional: pass additional parameters like role or authenticator
    connection_kwargs={"role": "SYSADMIN"},
)

The connection_kwargs field accepts any additional connection parameters supported by the Snowflake Python Connector, such as role or authenticator.
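To illustrate how connection_kwargs relates to the other fields, here is a minimal sketch that assumes the configuration is ultimately flattened into keyword arguments for snowflake.connector.connect(). SnowflakeConfigSketch and to_connect_kwargs are hypothetical names, not part of the plugin, and the merge order (extra options applied last) is an assumption.

```python
from dataclasses import dataclass, field
from typing import Any


@dataclass
class SnowflakeConfigSketch:
    """Hypothetical stand-in mirroring the fields of SnowflakeConfig."""

    account: str
    user: str
    database: str
    schema: str
    warehouse: str
    connection_kwargs: dict[str, Any] = field(default_factory=dict)


def to_connect_kwargs(cfg: SnowflakeConfigSketch) -> dict[str, Any]:
    """Build keyword arguments for snowflake.connector.connect() (assumed mapping)."""
    kwargs: dict[str, Any] = {
        "account": cfg.account,
        "user": cfg.user,
        "database": cfg.database,
        "schema": cfg.schema,
        "warehouse": cfg.warehouse,
    }
    # Assumption: extra options such as role or authenticator are layered on top.
    kwargs.update(cfg.connection_kwargs)
    return kwargs
```

Under this assumption, the example configuration above would produce the five core fields plus role="SYSADMIN".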

Defining Snowflake Tasks

The Snowflake class in flyteplugins.snowflake.task is the primary interface for defining SQL tasks. Its query_template is parameterized with printf-style named placeholders (e.g., %(param_name)s); the task inputs are passed alongside the query to the Snowflake Python Connector, which binds them using its pyformat parameter style.
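The sketch below approximates what client-side pyformat binding does with a template such as STATUS = %(status)s: each %(name)s placeholder is replaced by the corresponding input rendered as a SQL literal. It is a simplified, hypothetical stand-in (bind_pyformat and quote_literal are illustrative names), not the connector's own escaping code, but it shows why placeholders are written without surrounding quotes: string values are quoted during binding.

```python
import re
from typing import Any, Mapping

# Matches printf-style named placeholders such as %(status)s.
_PLACEHOLDER = re.compile(r"%\((\w+)\)s")


def quote_literal(value: Any) -> str:
    """Render a Python value as a SQL literal (simplified, hypothetical)."""
    if value is None:
        return "NULL"
    if isinstance(value, bool):  # check bool before int: bool subclasses int
        return "TRUE" if value else "FALSE"
    if isinstance(value, (int, float)):
        return repr(value)
    # Strings: escape backslashes, double embedded single quotes, then wrap in quotes.
    text = str(value).replace("\\", "\\\\").replace("'", "''")
    return f"'{text}'"


def bind_pyformat(template: str, params: Mapping[str, Any]) -> str:
    """Replace each %(name)s placeholder with the quoted value of params[name]."""
    missing = set(_PLACEHOLDER.findall(template)) - set(params)
    if missing:
        raise KeyError(f"no input provided for placeholders: {sorted(missing)}")
    return _PLACEHOLDER.sub(lambda m: quote_literal(params[m.group(1)]), template)
```

For example, binding {"status": "O'Brien"} into "SELECT * FROM MY_TABLE WHERE STATUS = %(status)s;" yields STATUS = 'O''Brien';. In a real task the connector performs this step, so the template should not be formatted by hand.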

Basic Query Task

A standard query task defines its inputs and an optional output type. If output_dataframe_type is provided, the results of the query are returned as a DataFrame.

import pandas as pd
from flyteplugins.snowflake import Snowflake

snowflake_select_task = Snowflake(
    name="snowflake_select_data",
    plugin_config=sf_config,
    query_template="SELECT * FROM MY_TABLE WHERE STATUS = %(status)s;",
    inputs={"status": str},
    output_dataframe_type=pd.DataFrame,
    snowflake_private_key="my-snowflake-key",
)

Batch Insertions

The plugin supports a specialized "batch" mode for multi-row insertions. When batch=True is set, the task expects list inputs of equal length. The query_template must contain a single VALUES clause, which the plugin expands into a multi-row SQL statement.

snowflake_insert_task = Snowflake(
    name="snowflake_batch_insert",
    inputs={"id": list[int], "name": list[str]},
    plugin_config=sf_config,
    query_template="INSERT INTO MY_TABLE (ID, NAME) VALUES (%(id)s, %(name)s);",
    batch=True,
    snowflake_private_key="my-snowflake-key",
)

In this mode, the SnowflakeConnector uses an internal helper, _expand_batch_query, to expand the single VALUES row into multi-row syntax before execution.
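The expansion can be pictured with the following sketch. It is not the plugin's _expand_batch_query: expand_batch_query is a hypothetical reimplementation that assumes per-row placeholders are renamed with an index suffix (id_0, id_1, ...) and that the list inputs are flattened into a single parameter dict for the connector to bind.

```python
import re
from typing import Any, Mapping, Sequence

# A VALUES row whose contents are plain text or %(name)s placeholders
# (rows containing other parentheses, e.g. function calls, are not handled).
_VALUES_ROW = re.compile(r"VALUES\s*\((?P<row>(?:%\(\w+\)|[^()])*)\)", re.IGNORECASE)
_PLACEHOLDER = re.compile(r"%\((\w+)\)s")


def expand_batch_query(
    template: str, inputs: Mapping[str, Sequence[Any]]
) -> tuple[str, dict[str, Any]]:
    """Hypothetical sketch: expand one VALUES row into one row per list element."""
    lengths = {len(values) for values in inputs.values()}
    if len(lengths) != 1:
        raise ValueError("batch inputs must be non-empty lists of equal length")
    n_rows = lengths.pop()
    if n_rows == 0:
        raise ValueError("batch inputs must be non-empty lists of equal length")

    match = _VALUES_ROW.search(template)
    if match is None:
        raise ValueError("query_template must contain a VALUES (...) clause")
    row = match.group("row")

    rows: list[str] = []
    params: dict[str, Any] = {}
    for i in range(n_rows):
        # Rename %(id)s -> %(id_0)s, %(id_1)s, ... so each row binds its own values.
        rows.append("(" + _PLACEHOLDER.sub(lambda m: f"%({m.group(1)}_{i})s", row) + ")")
        for name, values in inputs.items():
            params[f"{name}_{i}"] = values[i]

    expanded = template[: match.start()] + "VALUES " + ", ".join(rows) + template[match.end():]
    return expanded, params
```

With the insert task above and inputs {"id": [1, 2], "name": ["a", "b"]}, this sketch produces VALUES (%(id_0)s, %(name_0)s), (%(id_1)s, %(name_1)s) together with the parameters id_0=1, name_0="a", id_1=2, name_1="b".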

Authentication and Secrets

The plugin primarily supports key-pair authentication via Flyte secrets. You can specify the secret keys for the private key and its passphrase directly in the task definition.

  • snowflake_private_key: The name of the secret containing the private key.
  • snowflake_private_key_passphrase: The name of the secret containing the passphrase (if the key is encrypted).
  • secret_group: An optional prefix for the secret environment variables.

The task automatically generates environment variable names using the logic in Snowflake._to_env_var. For example, if secret_group="PROD" and snowflake_private_key="SF_KEY", the plugin will look for an environment variable named PROD_SF_KEY.
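A minimal sketch of the naming rule from the example above, assuming the group is simply joined to the key with an underscore. to_env_var and read_secret are hypothetical helpers; the real Snowflake._to_env_var may normalize names further (for example, case or hyphens), which is not modeled here.

```python
import os
from typing import Optional


def to_env_var(key: str, group: Optional[str] = None) -> str:
    """Hypothetical: join an optional secret group and a secret key into an env var name."""
    # Assumption: a plain "_" join, matching PROD + SF_KEY -> PROD_SF_KEY;
    # any additional normalization done by the plugin is not reproduced.
    return f"{group}_{key}" if group else key


def read_secret(key: str, group: Optional[str] = None) -> str:
    """Hypothetical: read the secret value from the derived environment variable."""
    name = to_env_var(key, group)
    try:
        return os.environ[name]
    except KeyError:
        raise KeyError(f"secret environment variable {name!r} is not set") from None
```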

Execution Lifecycle

The execution of a Snowflake task is managed by the SnowflakeConnector in flyteplugins.snowflake.connector.

  1. Submission: The create method extracts configuration from the task template, establishes a connection using _get_snowflake_connection, and calls cursor.execute_async(query, inputs).
  2. Metadata: It returns a SnowflakeJobMetadata object containing the Snowflake Query ID (sfqid). This metadata is used to track the job across Flyte's asynchronous polling cycles.
  3. Polling: The get method uses the query_id to check the status via conn.get_query_status_throw_if_error.
  4. Results: Once the query succeeds, if the task defines outputs, the connector constructs a URI with a custom snowflake:// scheme, snowflake://{user}/{account}/{warehouse}/{database}/{schema}/{query_id}. Flyte's data movement layer then uses this URI to load the results into the requested DataFrame type.
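The four steps can be sketched against the Snowflake Python Connector as follows. execute_async, sfqid, and get_query_status_throw_if_error are the connector features named above; is_still_running is another SnowflakeConnection method, used here to tell a running query from a finished one. JobMetadataSketch, submit, poll, and output_uri are hypothetical names, and the "RUNNING"/"SUCCEEDED" strings are placeholders rather than Flyte phase values.

```python
from dataclasses import dataclass


@dataclass
class JobMetadataSketch:
    """Hypothetical stand-in for SnowflakeJobMetadata: just the tracked query ID."""

    query_id: str


def submit(conn, query: str, inputs: dict) -> JobMetadataSketch:
    """Step 1-2: submit without waiting and remember the Snowflake query ID."""
    cursor = conn.cursor()
    cursor.execute_async(query, inputs)
    return JobMetadataSketch(query_id=cursor.sfqid)


def poll(conn, meta: JobMetadataSketch) -> str:
    """Step 3: one status check; the connector call raises if the query failed."""
    status = conn.get_query_status_throw_if_error(meta.query_id)
    return "RUNNING" if conn.is_still_running(status) else "SUCCEEDED"


def output_uri(user: str, account: str, warehouse: str,
               database: str, schema: str, query_id: str) -> str:
    """Step 4: build the snowflake:// URI described above."""
    return f"snowflake://{user}/{account}/{warehouse}/{database}/{schema}/{query_id}"
```

Each poll performs a single non-blocking check rather than looping, which matches the idea that Flyte drives repeated get calls across its own polling cycles instead of holding a worker while the query runs.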

Cancellation

If a Flyte execution is aborted, the SnowflakeConnector.delete method is triggered. It uses the stored query_id to issue a SYSTEM$CANCEL_QUERY command to Snowflake, ensuring that resources are not wasted on abandoned computations.

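# Simplified cancellation logic from SnowflakeConnector.delete
def _cancel_query(conn, query_id: str) -> None:
    # conn is an open Snowflake connection; query_id is the sfqid stored
    # in the job metadata when the query was submitted.
    cursor = conn.cursor()
    try:
        cursor.execute(f"SELECT SYSTEM$CANCEL_QUERY('{query_id}')")
    finally:
        cursor.close()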