Resource Management for Distributed Clusters
In vik-advani-flyte-sdk-9b3ce04, resource management for distributed clusters is handled through plugin configuration classes. These classes define the infrastructure requirements (CPU, memory, and custom Kubernetes pod specifications) for each component of a distributed system, such as Spark drivers, Ray head nodes, or Dask workers.
These configurations are typically applied to a task using the TaskEnvironment class, which binds the plugin-specific settings to the task's execution context.
Spark Resource Management
The Spark configuration class in flyteplugins.spark.task provides two levels of resource control: Spark-native configurations and Kubernetes-native pod templates.
Spark and Hadoop Configuration
You can tune the Spark engine itself using the spark_conf and hadoop_conf dictionaries. These are passed directly to the Spark cluster manager.
from flyteplugins.spark.task import Spark
spark_config = Spark(
    spark_conf={
        "spark.driver.memory": "2g",
        "spark.executor.memory": "4g",
        "spark.executor.instances": "3",
    },
    hadoop_conf={
        "fs.s3a.endpoint": "s3.amazonaws.com",
    },
)
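Spark property values are passed as strings, so it can help to derive them from numeric sizing inputs in one place. The following is a minimal sketch of that idea; `build_spark_conf` is a hypothetical helper written for this illustration and is not part of flyteplugins.

```python
from typing import Dict


def build_spark_conf(executors: int, executor_memory_gib: int, driver_memory_gib: int) -> Dict[str, str]:
    """Build a spark_conf-style dictionary whose values are all strings.

    Hypothetical helper for illustration; not part of the Spark plugin.
    """
    if executors < 1:
        raise ValueError("executors must be at least 1")
    if executor_memory_gib < 1 or driver_memory_gib < 1:
        raise ValueError("memory sizes must be at least 1 GiB")
    return {
        "spark.driver.memory": f"{driver_memory_gib}g",
        "spark.executor.memory": f"{executor_memory_gib}g",
        "spark.executor.instances": str(executors),
    }


# Reproduces the spark_conf dictionary used in the example above.
spark_conf = build_spark_conf(executors=3, executor_memory_gib=4, driver_memory_gib=2)
```

The resulting dictionary can be passed as the spark_conf argument of Spark in the same way as the literal dictionary.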
Fine-grained Pod Control
For advanced Kubernetes requirements (like node selectors, tolerations, or sidecars), the Spark class supports driver_pod and executor_pod attributes. These accept a PodTemplate object, allowing you to customize the underlying K8s pods for the driver and executors independently.
# plugins/spark/src/flyteplugins/spark/task.py
class Spark(object):
    # ...
    driver_pod: Optional[PodTemplate] = None
    executor_pod: Optional[PodTemplate] = None
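To make the node selector and toleration use case concrete, the sketch below builds the relevant fragment of a Kubernetes pod spec as a plain dictionary. The field names (nodeSelector, tolerations) are standard Kubernetes PodSpec fields; the label key, taint key, and the `executor_pod_spec` helper are placeholders invented for this example. How such a spec is wrapped in a PodTemplate is not shown in this section, so that step is left out.

```python
from typing import Any, Dict


def executor_pod_spec(node_pool: str, taint_key: str) -> Dict[str, Any]:
    """Return a pod-spec fragment that pins pods to a node pool.

    Hypothetical helper; the label key "example.com/node-pool" is a placeholder.
    """
    return {
        # Schedule only onto nodes carrying this label.
        "nodeSelector": {"example.com/node-pool": node_pool},
        # Allow scheduling onto nodes tainted with taint_key:NoSchedule.
        "tolerations": [
            {"key": taint_key, "operator": "Exists", "effect": "NoSchedule"}
        ],
    }


spec = executor_pod_spec(node_pool="spark-executors", taint_key="example.com/dedicated")
```

Because driver_pod and executor_pod are separate attributes, a fragment like this could be applied to executors only, leaving the driver on general-purpose nodes.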
Ray Cluster Configuration
Ray clusters are managed via the RayJobConfig class in flyteplugins.ray.task. This implementation separates the configuration of the head node from the worker nodes, allowing for heterogeneous cluster setups.
Head and Worker Node Specs
The HeadNodeConfig and WorkerNodeConfig classes allow you to specify resources in two ways:
- Direct Resources: Using requests and limits (of type flyte.Resources).
- Pod Templates: Using pod_template for full K8s specification.
If requests or limits are provided, the plugin automatically generates a pod specification using pod_spec_from_resources.
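The sketch below illustrates the kind of mapping such a conversion performs: requests and limits become the resources block of a container in the pod spec. It is not the plugin's pod_spec_from_resources; `SimpleResources` and `pod_spec_sketch` are hypothetical stand-ins that cover only cpu and memory, and the container name is an assumption.

```python
from dataclasses import dataclass
from typing import Any, Dict, Optional


@dataclass
class SimpleResources:
    """Hypothetical stand-in for flyte.Resources, limited to cpu and memory."""
    cpu: Optional[str] = None
    memory: Optional[str] = None


def _as_k8s(res: SimpleResources) -> Dict[str, str]:
    # Keep only the fields that were actually set.
    fields = {"cpu": res.cpu, "memory": res.memory}
    return {name: value for name, value in fields.items() if value is not None}


def pod_spec_sketch(
    container_name: str,
    requests: Optional[SimpleResources] = None,
    limits: Optional[SimpleResources] = None,
) -> Dict[str, Any]:
    """Build a one-container pod spec carrying the given requests and limits."""
    resources: Dict[str, Dict[str, str]] = {}
    if requests is not None:
        resources["requests"] = _as_k8s(requests)
    if limits is not None:
        resources["limits"] = _as_k8s(limits)
    return {"containers": [{"name": container_name, "resources": resources}]}


spec = pod_spec_sketch(
    "ray-head",  # assumed container name, for illustration only
    requests=SimpleResources(cpu="2", memory="4Gi"),
    limits=SimpleResources(cpu="4", memory="8Gi"),
)
```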
from flyteplugins.ray.task import RayJobConfig, HeadNodeConfig, WorkerNodeConfig
from flyte import Resources
ray_config = RayJobConfig(
    head_node_config=HeadNodeConfig(
        requests=Resources(cpu="2", memory="4Gi"),
        limits=Resources(cpu="4", memory="8Gi"),
    ),
    worker_node_config=[
        WorkerNodeConfig(
            group_name="gpu-workers",
            replicas=2,
            requests=Resources(cpu="4", memory="16Gi", gpu="1"),
        )
    ],
)
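When sizing a heterogeneous cluster, it is useful to know the total the cluster will request from Kubernetes: the head node's requests plus replicas times the per-worker requests for each group. The sketch below does that arithmetic for the configuration above. The helper names are hypothetical, and the quantity parsers handle only plain and millicore CPU values and binary memory suffixes (Ki, Mi, Gi, Ti).

```python
from typing import List, Tuple

_MEMORY_UNITS = {"Ki": 2**10, "Mi": 2**20, "Gi": 2**30, "Ti": 2**40}


def parse_cpu(quantity: str) -> float:
    """Parse a Kubernetes CPU quantity such as "2" or "500m" into cores."""
    if quantity.endswith("m"):
        return int(quantity[:-1]) / 1000
    return float(quantity)


def parse_memory_bytes(quantity: str) -> int:
    """Parse a memory quantity with a binary suffix (or plain bytes)."""
    for suffix, factor in _MEMORY_UNITS.items():
        if quantity.endswith(suffix):
            return int(quantity[: -len(suffix)]) * factor
    return int(quantity)  # no suffix: value is already in bytes


def cluster_requests(
    head: Tuple[str, str],
    worker_groups: List[Tuple[int, str, str]],
) -> Tuple[float, int]:
    """Sum requests for a head node (cpu, memory) and groups of (replicas, cpu, memory)."""
    total_cpu = parse_cpu(head[0])
    total_mem = parse_memory_bytes(head[1])
    for replicas, cpu, memory in worker_groups:
        total_cpu += replicas * parse_cpu(cpu)
        total_mem += replicas * parse_memory_bytes(memory)
    return total_cpu, total_mem


# Head: 2 CPU / 4Gi; one group of 2 workers at 4 CPU / 16Gi each.
total_cpu, total_mem = cluster_requests(("2", "4Gi"), [(2, "4", "16Gi")])
total_mem_gib = total_mem // 2**30
```

For the example configuration this comes to 10 CPU cores and 36 GiB of memory in requests, not counting the GPUs.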
Dask Cluster Management
Dask resource management is defined in flyteplugins.dask.task through the Dask configuration class, which contains a Scheduler and a WorkerGroup.
Scheduler and Worker Resources
Both the Scheduler and WorkerGroup classes accept a resources parameter of type flyte.Resources. This allows you to define the compute requirements for the Dask scheduler pod and the pool of worker pods.
from flyteplugins.dask.task import Dask, Scheduler, WorkerGroup
from flyte import Resources
dask_config = Dask(
    scheduler=Scheduler(
        resources=Resources(cpu="1", memory="2Gi")
    ),
    workers=WorkerGroup(
        number_of_workers=5,
        resources=Resources(cpu="2", memory="4Gi")
    ),
)
The Dask plugin also handles code synchronization automatically. In DaskTask.pre, it registers DownloadCodeBundleWorkerPlugin and DownloadCodeBundleSchedulerPlugin to ensure the task's code bundle is available across the cluster.
Databricks Integration
The Databricks class in flyteplugins.databricks.task extends the Spark configuration. It allows you to manage resources on managed Databricks clusters by providing a databricks_conf dictionary that complies with the Databricks Jobs API.
from flyteplugins.databricks.task import Databricks
databricks_config = Databricks(
    spark_conf={"spark.executor.instances": "2"},
    databricks_conf={
        "new_cluster": {
            "spark_version": "13.3.x-scala2.12",
            "node_type_id": "m6i.large",
            "num_workers": 2,
        },
    },
    databricks_instance="my-org.cloud.databricks.com",
    databricks_token="DATABRICKS_SECRET_TOKEN",
)
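Since databricks_conf is a free-form dictionary, a typo in it may only surface once the job is submitted. A lightweight pre-flight check can catch the most obvious mistakes earlier. The sketch below is not a schema validation and is not part of the plugin: `check_databricks_conf` is a hypothetical helper, and it assumes the job targets a classic cluster described by either new_cluster or existing_cluster_id, checking only the fields used in the example above.

```python
from typing import Any, Dict, List


def check_databricks_conf(conf: Dict[str, Any]) -> List[str]:
    """Return a list of human-readable problems; an empty list means none found.

    Hypothetical helper; covers only a small subset of the Jobs API fields.
    """
    problems: List[str] = []
    has_new = "new_cluster" in conf
    has_existing = "existing_cluster_id" in conf
    # Assumption: classic compute, so exactly one cluster source is expected.
    if has_new == has_existing:
        problems.append("specify exactly one of 'new_cluster' or 'existing_cluster_id'")
    if has_new:
        cluster = conf["new_cluster"]
        if "spark_version" not in cluster:
            problems.append("'new_cluster' is missing 'spark_version'")
        if "num_workers" not in cluster and "autoscale" not in cluster:
            problems.append("'new_cluster' needs 'num_workers' or 'autoscale'")
    return problems


example_conf = {
    "new_cluster": {
        "spark_version": "13.3.x-scala2.12",
        "node_type_id": "m6i.large",
        "num_workers": 2,
    },
}
problems = check_databricks_conf(example_conf)
```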
Applying Configurations via TaskEnvironment
To use these resource configurations, they must be passed to a TaskEnvironment. This environment then wraps the task function, ensuring the cluster is provisioned with the specified resources when the task is executed.
import flyte
# Binding a Spark configuration to a task environment
spark_env = flyte.TaskEnvironment(
    name="spark_resource_env",
    plugin_config=spark_config,  # The Spark object defined above
)
@spark_env.task
async def my_distributed_task():
    # This task will run on a Spark cluster configured with the resources defined in spark_config
    pass
You can also override these configurations at the call-site using the .override method:
# Overriding the number of executors for a specific execution
my_distributed_task.override(
    plugin_config=Spark(spark_conf={"spark.executor.instances": "10"})
)()
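This section does not state whether an overriding plugin_config is merged with the environment's configuration or replaces it outright. If the other settings (such as driver and executor memory) must be preserved, one defensive approach is to build the complete dictionary before constructing the override. The sketch below shows that merge with plain dictionaries; `merged_spark_conf` is a hypothetical helper, not an SDK function.

```python
from typing import Dict


def merged_spark_conf(base: Dict[str, str], overrides: Dict[str, str]) -> Dict[str, str]:
    """Return a new dictionary with overrides applied on top of base.

    Neither input is modified. Hypothetical helper for illustration.
    """
    merged = dict(base)
    merged.update(overrides)
    return merged


base_conf = {
    "spark.driver.memory": "2g",
    "spark.executor.memory": "4g",
    "spark.executor.instances": "3",
}
call_site_conf = merged_spark_conf(base_conf, {"spark.executor.instances": "10"})
```

The merged dictionary keeps the memory settings and changes only the executor count, and it can then be passed as spark_conf when constructing the Spark object for the override.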