
Running Jobs on Databricks

To execute distributed Spark tasks on managed Databricks infrastructure, pass a Databricks configuration object as the plugin_config of a TaskEnvironment. The configuration holds the cluster specification and authentication details, while the task body remains standard PySpark code.

Basic Databricks Task Configuration

The following example demonstrates how to configure a task to run on a new Databricks cluster.

```python
import flyte
from flyteplugins.databricks import Databricks

# Define the Databricks configuration
databricks_conf = Databricks(
    spark_conf={
        "spark.driver.memory": "2000M",
        "spark.executor.memory": "1000M",
    },
    executor_path="/databricks/python3/bin/python",
    databricks_conf={
        "run_name": "flyte databricks execution",
        "new_cluster": {
            "spark_version": "13.3.x-scala2.12",
            "node_type_id": "m6i.large",
            "num_workers": 1,
        },
        "timeout_seconds": 3600,
    },
    # Bare domain: no https:// prefix and no trailing slash
    databricks_instance="myorg.cloud.databricks.com",
    # Name of the Flyte secret that holds the token, not the token itself
    databricks_token="DATABRICKS_SECRET_NAME",
)

# Create a TaskEnvironment using the plugin configuration
databricks_env = flyte.TaskEnvironment(
    name="databricks_env",
    plugin_config=databricks_conf,
)


@databricks_env.task
async def spark_task(n: int) -> float:
    # Access the Spark session provided by the plugin
    spark = flyte.ctx().data["spark_session"]

    # Build a one-column DataFrame with n rows and count them on the cluster
    df = spark.createDataFrame([(i,) for i in range(n)], ["id"])
    return float(df.count())
```

Configuration Components

The Databricks class (found in flyteplugins.databricks.task) accepts several key parameters:

  • databricks_conf: A dictionary compliant with the Databricks Jobs API v2.1. You must specify either new_cluster or existing_cluster_id.
  • databricks_instance: The domain name of your Databricks deployment (e.g., dbc-xxxx.cloud.databricks.com). This can also be set via the FLYTE_DATABRICKS_INSTANCE environment variable.
  • databricks_token: The name of the Flyte secret containing your Databricks API token. The DatabricksConnector uses this name to retrieve the actual token at runtime.
  • spark_conf: Standard Spark configuration key-value pairs. If using new_cluster, these are automatically injected into the cluster's Spark configuration.
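The spark_conf behavior for new clusters can be pictured with a small pure-Python sketch. The helper merge_spark_conf below is hypothetical and not part of flyteplugins; it only models the documented rule that, with new_cluster, spark_conf entries end up in the cluster's own Spark configuration. Letting task-level values win on conflicting keys is an assumption of this sketch, and the real connector's precedence may differ.

```python
import copy
from typing import Any, Dict


def merge_spark_conf(databricks_conf: Dict[str, Any], spark_conf: Dict[str, str]) -> Dict[str, Any]:
    """Return a copy of databricks_conf with spark_conf merged into new_cluster.

    Hypothetical helper for illustration only. A configuration that targets an
    existing cluster is returned unchanged, since there is no cluster spec to
    inject into.
    """
    conf = copy.deepcopy(databricks_conf)
    cluster = conf.get("new_cluster")
    if cluster is None:
        return conf
    merged = dict(cluster.get("spark_conf", {}))
    merged.update(spark_conf)  # assumption: task-level values win on conflicts
    cluster["spark_conf"] = merged
    return conf


conf = merge_spark_conf(
    {"new_cluster": {"spark_version": "13.3.x-scala2.12", "num_workers": 1}},
    {"spark.driver.memory": "2000M"},
)
print(conf["new_cluster"]["spark_conf"])  # {'spark.driver.memory': '2000M'}
```

Copying the input keeps the caller's dictionary untouched, which matters if the same base configuration is reused across several environments.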

Cluster Management Variations

You can choose between provisioning a temporary cluster or reusing an existing one.

Using an Existing Cluster

To reduce startup time, you can target an existing cluster by its ID:

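```python
from flyteplugins.databricks import Databricks

databricks_conf = Databricks(
    databricks_conf={
        "run_name": "existing cluster task",
        # Run on an already-running cluster instead of provisioning one
        "existing_cluster_id": "1234-567890-batch123",
    },
    databricks_instance="myorg.cloud.databricks.com",
    databricks_token="DATABRICKS_SECRET_NAME",
)
```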
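If a project switches between the two modes (for example, an existing cluster during development and a fresh one in production), a small builder can enforce the requirement from Configuration Components that databricks_conf names a cluster source. The function make_databricks_conf is a hypothetical helper, not part of flyteplugins, and treating new_cluster and existing_cluster_id as mutually exclusive is this sketch's reading of the "either ... or" wording.

```python
from typing import Any, Dict, Optional


def make_databricks_conf(
    run_name: str,
    existing_cluster_id: Optional[str] = None,
    new_cluster: Optional[Dict[str, Any]] = None,
    timeout_seconds: Optional[int] = None,
) -> Dict[str, Any]:
    """Build a databricks_conf dict that targets exactly one cluster source.

    Hypothetical helper for illustration; not part of flyteplugins.
    """
    # Reject both "neither given" and "both given"
    if (existing_cluster_id is None) == (new_cluster is None):
        raise ValueError("specify exactly one of existing_cluster_id or new_cluster")
    conf: Dict[str, Any] = {"run_name": run_name}
    if existing_cluster_id is not None:
        conf["existing_cluster_id"] = existing_cluster_id
    else:
        conf["new_cluster"] = new_cluster
    if timeout_seconds is not None:
        conf["timeout_seconds"] = timeout_seconds
    return conf


# Reproduces the databricks_conf dictionary from the existing-cluster example
conf = make_databricks_conf("existing cluster task", existing_cluster_id="1234-567890-batch123")
```

The returned dictionary is what you would pass as databricks_conf= to Databricks(...); failing early with a ValueError surfaces a misconfiguration at import time rather than when the job is submitted.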

Customizing a New Cluster

When you use new_cluster and the new_cluster dictionary does not include docker_image, the DatabricksConnector sets docker_image to your task's container image. To run a different image, specify it explicitly:

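```python
from flyteplugins.databricks import Databricks

databricks_conf = Databricks(
    databricks_conf={
        "new_cluster": {
            "spark_version": "13.3.x-scala2.12",
            "node_type_id": "m6i.large",
            "num_workers": 2,
            # An explicit image overrides the default (the task's container image)
            "docker_image": {
                "url": "your-custom-image:latest",
            },
        },
    },
    databricks_instance="myorg.cloud.databricks.com",
    databricks_token="DATABRICKS_SECRET_NAME",
)
```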
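The defaulting rule for docker_image amounts to "fill it in only when a new cluster lacks one". The sketch below models that rule in plain Python; default_docker_image and the image name in the usage line are hypothetical, and this is an illustration of the documented behavior, not the DatabricksConnector's actual code.

```python
import copy
from typing import Any, Dict


def default_docker_image(databricks_conf: Dict[str, Any], task_image: str) -> Dict[str, Any]:
    """Return a copy of databricks_conf whose new_cluster has a docker_image.

    Hypothetical illustration of the documented defaulting rule; not the
    DatabricksConnector's actual code.
    """
    conf = copy.deepcopy(databricks_conf)
    cluster = conf.get("new_cluster")
    # Only new clusters get a default; an explicit docker_image is left alone
    if cluster is not None and "docker_image" not in cluster:
        cluster["docker_image"] = {"url": task_image}
    return conf


# Hypothetical task image name, used only to show the filled-in value
filled = default_docker_image({"new_cluster": {"num_workers": 2}}, "ghcr.io/example/task:abc")
```

Because an explicit docker_image is never overwritten, the custom-image configuration above keeps "your-custom-image:latest", and configurations that use existing_cluster_id pass through unchanged.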

Accessing the Spark Session

Inside a task defined in a Databricks-configured TaskEnvironment, the plugin initializes the Spark session automatically. Retrieve it from the Flyte context:

```python
spark = flyte.ctx().data["spark_session"]
```

Under the hood, DatabricksFunctionTask converts your Python task into a form that the Databricks entrypoint can execute.

Troubleshooting and Gotchas

  • Secret Name vs. Token: The databricks_token field in the Databricks config must be the name of the secret stored in Flyte, not the raw token string.
  • Instance URL: Ensure the databricks_instance does not include https:// or trailing slashes; it should be the bare domain (e.g., myorg.cloud.databricks.com).
  • Cluster Requirements: If using new_cluster, ensure the node_type_id and spark_version are valid for your workspace's cloud provider and region (for example, m6i.large is an AWS instance type).
  • Job Lifecycle: The DatabricksConnector monitors the job through the Databricks API. If the job fails on the Databricks side, the connector retrieves the run's state_message and reports it as the failure reason of the Flyte task. The Flyte console logs include a direct link to the run in the Databricks UI, labeled "Databricks Console".
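Two of these gotchas can be checked locally. The sketch below is a minimal illustration under stated assumptions: normalize_instance and to_phase are hypothetical helpers, not flyteplugins APIs. The state names (life_cycle_state values such as TERMINATED, SKIPPED, INTERNAL_ERROR, the result_state SUCCESS, and state_message) come from the Databricks Jobs API run state, while the three-way phase mapping is a simplification and not the connector's actual logic.

```python
from typing import Optional, Tuple

# Life-cycle states after which a Databricks run will not change again
TERMINAL_STATES = {"TERMINATED", "SKIPPED", "INTERNAL_ERROR"}


def normalize_instance(instance: str) -> str:
    """Strip a URL scheme and trailing slashes so only the bare domain remains."""
    value = instance.strip()
    for prefix in ("https://", "http://"):
        if value.startswith(prefix):
            value = value[len(prefix):]
            break
    return value.rstrip("/")


def to_phase(life_cycle_state: str, result_state: Optional[str], state_message: str = "") -> Tuple[str, str]:
    """Map a Databricks run state to a coarse (phase, message) pair.

    Hypothetical simplification for illustration only.
    """
    if life_cycle_state not in TERMINAL_STATES:
        # PENDING, RUNNING, TERMINATING, and similar non-terminal states
        return "RUNNING", ""
    if life_cycle_state == "TERMINATED" and result_state == "SUCCESS":
        return "SUCCEEDED", ""
    # Skipped, internal errors, and terminated runs that did not succeed
    return "FAILED", state_message


print(normalize_instance("https://myorg.cloud.databricks.com/"))  # myorg.cloud.databricks.com
```

Surfacing state_message only on failure mirrors the behavior described above, where the Databricks-side error text becomes the Flyte task's failure reason.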