Running Jobs on Databricks
To execute distributed Spark tasks on managed Databricks infrastructure, pass a Databricks configuration object as the plugin_config of a TaskEnvironment. This lets you define cluster specifications and authentication details while writing standard PySpark code.
Basic Databricks Task Configuration
The following example demonstrates how to configure a task to run on a new Databricks cluster.
```python
import flyte
from flyteplugins.databricks import Databricks

# Define the Databricks configuration
databricks_conf = Databricks(
    spark_conf={
        "spark.driver.memory": "2000M",
        "spark.executor.memory": "1000M",
    },
    executor_path="/databricks/python3/bin/python",
    databricks_conf={
        "run_name": "flyte databricks execution",
        "new_cluster": {
            "spark_version": "13.3.x-scala2.12",
            "node_type_id": "m6i.large",
            "num_workers": 1,
        },
        "timeout_seconds": 3600,
    },
    # Bare domain only: no https:// and no trailing slash
    databricks_instance="myorg.cloud.databricks.com",
    # Name of the Flyte secret holding the token, not the raw token itself
    databricks_token="DATABRICKS_SECRET_NAME",
)

# Create a TaskEnvironment using the plugin configuration
databricks_env = flyte.TaskEnvironment(
    name="databricks_env",
    plugin_config=databricks_conf,
)


@databricks_env.task
async def spark_task(n: int) -> float:
    # Access the Spark session provided by the plugin
    spark = flyte.ctx().data["spark_session"]
    df = spark.createDataFrame([(i,) for i in range(n)], ["id"])
    return float(df.count())
```
Configuration Components
The Databricks class (found in flyteplugins.databricks.task) accepts several key parameters:
- databricks_conf: A dictionary compliant with the Databricks Jobs API v2.1. You must specify either new_cluster or existing_cluster_id.
- databricks_instance: The domain name of your Databricks deployment (e.g., dbc-xxxx.cloud.databricks.com). This can also be set via the FLYTE_DATABRICKS_INSTANCE environment variable.
- databricks_token: The name of the Flyte secret containing your Databricks API token. The DatabricksConnector uses this name to retrieve the actual token at runtime.
- spark_conf: Standard Spark configuration key-value pairs. If using new_cluster, these are automatically injected into the cluster's Spark configuration.
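The rules above can be checked on your own config values before anything is submitted. The following is a minimal sketch in plain Python under stated assumptions: normalize_instance, resolve_instance, and check_cluster_choice are hypothetical helpers, not part of flyteplugins.databricks; the precedence of an explicit databricks_instance over FLYTE_DATABRICKS_INSTANCE is assumed; and rejecting a config that sets both new_cluster and existing_cluster_id is stricter than the wording above.

```python
import os
from typing import Optional

# Hypothetical helpers for illustration; not part of flyteplugins.databricks.


def normalize_instance(instance: str) -> str:
    """Drop an http(s):// scheme and trailing slashes, leaving the bare domain."""
    for scheme in ("https://", "http://"):
        if instance.startswith(scheme):
            instance = instance[len(scheme):]
    return instance.rstrip("/")


def resolve_instance(explicit: Optional[str] = None) -> str:
    """Use the explicit value if given, else FLYTE_DATABRICKS_INSTANCE (assumed precedence)."""
    value = explicit or os.environ.get("FLYTE_DATABRICKS_INSTANCE")
    if not value:
        raise ValueError("set databricks_instance or FLYTE_DATABRICKS_INSTANCE")
    return normalize_instance(value)


def check_cluster_choice(databricks_conf: dict) -> str:
    """Return which cluster key is used; require exactly one of the two."""
    has_new = "new_cluster" in databricks_conf
    has_existing = "existing_cluster_id" in databricks_conf
    if has_new == has_existing:
        # Neither or both were given (rejecting both is an assumption).
        raise ValueError("specify exactly one of new_cluster or existing_cluster_id")
    return "new_cluster" if has_new else "existing_cluster_id"
```

For example, resolve_instance("https://myorg.cloud.databricks.com/") returns "myorg.cloud.databricks.com", the bare-domain form the plugin expects.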
Cluster Management Variations
You can choose between provisioning a temporary cluster or reusing an existing one.
Using an Existing Cluster
To reduce startup time, you can target an existing cluster by its ID:
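```python
from flyteplugins.databricks import Databricks

databricks_conf = Databricks(
    databricks_conf={
        "run_name": "existing cluster task",
        # Reuse a running cluster instead of provisioning a new one
        "existing_cluster_id": "1234-567890-batch123",
    },
    databricks_instance="myorg.cloud.databricks.com",
    databricks_token="DATABRICKS_SECRET_NAME",
)
```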
Customizing a New Cluster
When using new_cluster, the DatabricksConnector automatically sets the docker_image to match your task's container image if it is not explicitly provided in the new_cluster dictionary.
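```python
from flyteplugins.databricks import Databricks

databricks_conf = Databricks(
    databricks_conf={
        "new_cluster": {
            "spark_version": "13.3.x-scala2.12",
            "node_type_id": "m6i.large",
            "num_workers": 2,
            # An explicit docker_image replaces the default (the task's container image)
            "docker_image": {
                "url": "your-custom-image:latest",
            },
        },
    },
    databricks_instance="myorg.cloud.databricks.com",
    databricks_token="DATABRICKS_SECRET_NAME",
)
```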
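To make the new_cluster defaults concrete, here is a hypothetical sketch of how a connector could fill them in. prepare_new_cluster is not a real flyteplugins function: it assumes the task's container image is available as a string, and it assumes task-level spark_conf keys override any spark_conf already present in new_cluster (the actual merge order is not documented on this page).

```python
import copy
from typing import Any, Dict, Optional


def prepare_new_cluster(
    databricks_conf: Dict[str, Any],
    container_image: str,
    spark_conf: Optional[Dict[str, str]] = None,
) -> Dict[str, Any]:
    """Hypothetical: return a copy of databricks_conf with new_cluster defaults applied."""
    conf = copy.deepcopy(databricks_conf)  # never mutate the caller's dict
    cluster = conf.get("new_cluster")
    if cluster is None:
        # existing_cluster_id case: nothing is injected.
        return conf
    # Default docker_image to the task image only if the user did not set one.
    cluster.setdefault("docker_image", {"url": container_image})
    if spark_conf:
        # Assumed precedence: task-level spark_conf wins over cluster-level keys.
        merged = dict(cluster.get("spark_conf", {}))
        merged.update(spark_conf)
        cluster["spark_conf"] = merged
    return conf
```

Working on a deep copy keeps the user's dictionary untouched, so the same configuration can be reused by several tasks with different images.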
Accessing the Spark Session
Inside a task decorated with a Databricks environment, the Spark session is initialized automatically. You access it via the Flyte context:
```python
spark = flyte.ctx().data["spark_session"]
```
The DatabricksFunctionTask handles the transformation of your local Python code into a format compatible with the Databricks entrypoint.
Troubleshooting and Gotchas
- Secret Name vs. Token: The databricks_token field in the Databricks config must be the name of the secret stored in Flyte, not the raw token string.
- Instance URL: Ensure that databricks_instance does not include https:// or trailing slashes; it should be the bare domain (e.g., myorg.cloud.databricks.com).
- Cluster Requirements: If using new_cluster, ensure that node_type_id and spark_version are valid for your Databricks workspace region.
- Job Lifecycle: The DatabricksConnector monitors the job via the Databricks API. If a job fails on the Databricks side, the connector retrieves the state_message and propagates it back to Flyte along with the task phase. You can find a direct link to the Databricks UI in the Flyte console logs under "Databricks Console".
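The lifecycle note above can be illustrated with the state object the Databricks Jobs API returns for a run (life_cycle_state, result_state, state_message). This is a sketch under assumptions, not the DatabricksConnector's code: to_phase is a hypothetical helper, and its phase strings are placeholders rather than Flyte's actual phase enum.

```python
from typing import Any, Dict, Tuple

# Hypothetical helper; phase names are placeholders, not Flyte's enum.

# Databricks life_cycle_state values that mean the run has finished.
_TERMINAL = {"TERMINATED", "SKIPPED", "INTERNAL_ERROR"}
# Values that mean the run has not started executing yet.
_WAITING = {"PENDING", "QUEUED", "BLOCKED", "WAITING_FOR_RETRY"}


def to_phase(run: Dict[str, Any]) -> Tuple[str, str]:
    """Map a runs/get-style response to a (phase, state_message) pair."""
    state = run.get("state", {})
    life_cycle = state.get("life_cycle_state", "PENDING")
    message = state.get("state_message", "")
    if life_cycle in _WAITING:
        return "QUEUED", message
    if life_cycle not in _TERMINAL:
        return "RUNNING", message  # e.g. RUNNING or TERMINATING
    if state.get("result_state") == "SUCCESS":
        return "SUCCEEDED", message
    # Any other terminal outcome surfaces the Databricks state_message.
    return "FAILED", message
```

Returning the state_message alongside the phase mirrors the behavior described above: a failure on the Databricks side reaches Flyte with its original explanation attached.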