Skip to main content

Resource Management for Distributed Clusters

In vik-advani-flyte-sdk-9b3ce04, resource management for distributed clusters is handled through specialized plugin configuration classes. These classes allow you to define the infrastructure requirements—such as CPU, memory, and custom Kubernetes pod specifications—for different components of a distributed system (e.g., Spark drivers, Ray head nodes, or Dask workers).

These configurations are typically applied to a task using the TaskEnvironment class, which binds the plugin-specific settings to the task's execution context.

Spark Resource Management

The Spark configuration class in flyteplugins.spark.task provides two levels of resource control: Spark-native configurations and Kubernetes-native pod templates.

Spark and Hadoop Configuration

You can tune the Spark engine itself using the spark_conf and hadoop_conf dictionaries. These are passed directly to the Spark cluster manager.

from flyteplugins.spark.task import Spark

spark_config = Spark(
spark_conf={
"spark.driver.memory": "2g",
"spark.executor.memory": "4g",
"spark.executor.instances": "3",
},
hadoop_conf={
"fs.s3a.endpoint": "s3.amazonaws.com",
}
)

Fine-grained Pod Control

For advanced Kubernetes requirements (like node selectors, tolerations, or sidecars), the Spark class supports driver_pod and executor_pod attributes. These accept a PodTemplate object, allowing you to customize the underlying K8s pods for the driver and executors independently.

# plugins/spark/src/flyteplugins/spark/task.py
class Spark(object):
# ...
driver_pod: Optional[PodTemplate] = None
executor_pod: Optional[PodTemplate] = None

Ray Cluster Configuration

Ray clusters are managed via the RayJobConfig class in flyteplugins.ray.task. This implementation separates the configuration of the head node from the worker nodes, allowing for heterogeneous cluster setups.

Head and Worker Node Specs

The HeadNodeConfig and WorkerNodeConfig classes allow you to specify resources in two ways:

  1. Direct Resources: Using requests and limits (of type flyte.Resources).
  2. Pod Templates: Using pod_template for full K8s specification.

If requests or limits are provided, the plugin automatically generates a pod specification using pod_spec_from_resources.

from flyteplugins.ray.task import RayJobConfig, HeadNodeConfig, WorkerNodeConfig
from flyte import Resources

ray_config = RayJobConfig(
head_node_config=HeadNodeConfig(
requests=Resources(cpu="2", memory="4Gi"),
limits=Resources(cpu="4", memory="8Gi"),
),
worker_node_config=[
WorkerNodeConfig(
group_name="gpu-workers",
replicas=2,
requests=Resources(cpu="4", memory="16Gi", gpu="1"),
)
]
)

Dask Cluster Management

Dask resource management is defined in flyteplugins.dask.task through the Dask configuration class, which contains a Scheduler and a WorkerGroup.

Scheduler and Worker Resources

Both the Scheduler and WorkerGroup classes accept a resources parameter of type flyte.Resources. This allows you to define the compute requirements for the Dask scheduler pod and the pool of worker pods.

from flyteplugins.dask.task import Dask, Scheduler, WorkerGroup
from flyte import Resources

dask_config = Dask(
scheduler=Scheduler(
resources=Resources(cpu="1", memory="2Gi")
),
workers=WorkerGroup(
number_of_workers=5,
resources=Resources(cpu="2", memory="4Gi")
)
)

The Dask plugin also handles code synchronization automatically. In DaskTask.pre, it registers DownloadCodeBundleWorkerPlugin and DownloadCodeBundleSchedulerPlugin to ensure the task's code bundle is available across the cluster.

Databricks Integration

The Databricks class in flyteplugins.databricks.task extends the Spark configuration. It allows you to manage resources on managed Databricks clusters by providing a databricks_conf dictionary that complies with the Databricks Jobs API.

from flyteplugins.databricks.task import Databricks

databricks_config = Databricks(
spark_conf={"spark.executor.instances": "2"},
databricks_conf={
"new_cluster": {
"spark_version": "13.3.x-scala2.12",
"node_type_id": "m6i.large",
"num_workers": 2,
},
},
databricks_instance="my-org.cloud.databricks.com",
databricks_token="DATABRICKS_SECRET_TOKEN"
)

Applying Configurations via TaskEnvironment

To use these resource configurations, they must be passed to a TaskEnvironment. This environment then wraps the task function, ensuring the cluster is provisioned with the specified resources when the task is executed.

import flyte

# Binding a Spark configuration to a task environment
spark_env = flyte.TaskEnvironment(
name="spark_resource_env",
plugin_config=spark_config, # The Spark object defined above
)

@spark_env.task
async def my_distributed_task():
# This task will run on a Spark cluster configured with the resources defined in spark_config
pass

You can also override these configurations at the call-site using the .override method:

# Overriding the number of executors for a specific execution
my_distributed_task.override(
plugin_config=Spark(spark_conf={"spark.executor.instances": "10"})
)()