Configuring Ray Clusters
To execute distributed Python code using Ray on Flyte, you must configure a Ray cluster topology using RayJobConfig. This configuration defines the head node, worker groups, and resource allocations required for the Ray job.
Basic Cluster Configuration
The most common way to set up a Ray cluster is to define a RayJobConfig and pass it to a TaskEnvironment. This creates a transient cluster that exists for the duration of the task.
import typing

import ray
import flyte
from flyteplugins.ray.task import HeadNodeConfig, RayJobConfig, WorkerNodeConfig

# Define the cluster topology
ray_config = RayJobConfig(
    head_node_config=HeadNodeConfig(
        ray_start_params={"log-color": "True"},
        requests=flyte.Resources(cpu="1", memory="1Gi"),
        limits=flyte.Resources(cpu="2", memory="2Gi"),
    ),
    worker_node_config=[
        WorkerNodeConfig(
            group_name="compute-group",
            replicas=2,
            requests=flyte.Resources(cpu="2", memory="4Gi"),
        )
    ],
    shutdown_after_job_finishes=True,
    ttl_seconds_after_finished=300,
)

# Apply configuration to a TaskEnvironment
ray_env = flyte.TaskEnvironment(
    name="ray_env",
    plugin_config=ray_config,
)


@ray.remote
def compute_square(x: int) -> int:
    return x * x


@ray_env.task
async def run_ray_job(n: int = 5) -> typing.List[int]:
    # The plugin automatically calls ray.init() based on ray_config
    futures = [compute_square.remote(i) for i in range(n)]
    return ray.get(futures)
Configuring Node Resources
You can specify resources for both the head and worker nodes using the requests and limits parameters in HeadNodeConfig and WorkerNodeConfig.
- Head Node: Use HeadNodeConfig to define the orchestrator node's resources.
- Worker Groups: Use WorkerNodeConfig to define groups of identical worker nodes. You can have multiple worker groups by passing a list to worker_node_config.
If you provide requests or limits, the plugin internally generates a PodTemplate for that node group. Alternatively, you can provide a custom pod_template directly:
from flyte.core.pod_template import PodTemplate

worker_config = WorkerNodeConfig(
    group_name="gpu-workers",
    replicas=1,
    pod_template=PodTemplate(
        # Custom pod spec configuration
    ),
)
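When sizing node groups, it can help to add up what the whole cluster will request before submitting the task. The following is a minimal, standard-library-only sketch that totals CPU and memory requests for the topology used in the basic example. The `NodeGroup` class and the helper functions are hypothetical and are not part of the Flyte or Ray APIs; the parser handles only a small subset of Kubernetes quantity suffixes.

```python
from dataclasses import dataclass

# Binary suffixes used by Kubernetes memory quantities (subset, for illustration).
_MEMORY_UNITS = {"Ki": 2**10, "Mi": 2**20, "Gi": 2**30, "Ti": 2**40}


def parse_cpu(value: str) -> float:
    """Convert a CPU quantity such as "2" or "500m" to a number of cores."""
    if value.endswith("m"):
        return int(value[:-1]) / 1000
    return float(value)


def parse_memory(value: str) -> int:
    """Convert a memory quantity such as "4Gi" to bytes."""
    for suffix, factor in _MEMORY_UNITS.items():
        if value.endswith(suffix):
            return int(value[: -len(suffix)]) * factor
    return int(value)  # no suffix: treat as a plain byte count


@dataclass
class NodeGroup:
    """Hypothetical stand-in for the requests of a head or worker group."""
    name: str
    replicas: int
    cpu: str
    memory: str


def total_requests(groups: list[NodeGroup]) -> tuple[float, int]:
    """Sum CPU cores and memory bytes requested across every pod."""
    cpu = sum(parse_cpu(g.cpu) * g.replicas for g in groups)
    memory = sum(parse_memory(g.memory) * g.replicas for g in groups)
    return cpu, memory


# Same request values as the basic cluster configuration above.
groups = [
    NodeGroup("head", replicas=1, cpu="1", memory="1Gi"),
    NodeGroup("compute-group", replicas=2, cpu="2", memory="4Gi"),
]
cpu_total, memory_total = total_requests(groups)
print(f"{cpu_total} cores, {memory_total // 2**30} GiB requested")
```

The totals cover requests only; limits would be summed the same way if you need the worst-case footprint.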
Enabling Autoscaling
To allow the Ray cluster to scale dynamically based on workload, set enable_autoscaling=True on the RayJobConfig and define min_replicas and max_replicas on each WorkerNodeConfig that should scale.
ray_config = RayJobConfig(
    enable_autoscaling=True,
    worker_node_config=[
        WorkerNodeConfig(
            group_name="dynamic-group",
            replicas=2,  # Initial replicas
            min_replicas=1,
            max_replicas=10,
        )
    ],
)
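A quick pre-flight check can catch inconsistent replica bounds before the job is submitted. The sketch below assumes that the initial replicas value should fall between min_replicas and max_replicas; the source does not state whether the plugin or KubeRay enforces this, so treat the rule as an assumption. `WorkerGroupBounds` and `check_bounds` are hypothetical names, not plugin APIs.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class WorkerGroupBounds:
    """Hypothetical mirror of the replica fields on a worker group."""
    group_name: str
    replicas: int
    min_replicas: Optional[int] = None
    max_replicas: Optional[int] = None


def check_bounds(group: WorkerGroupBounds) -> list[str]:
    """Return a list of human-readable problems; empty means consistent."""
    problems = []
    low, high = group.min_replicas, group.max_replicas
    if low is not None and low < 0:
        problems.append(f"{group.group_name}: min_replicas must be >= 0")
    if low is not None and high is not None and low > high:
        problems.append(f"{group.group_name}: min_replicas exceeds max_replicas")
    if low is not None and group.replicas < low:
        problems.append(f"{group.group_name}: replicas is below min_replicas")
    if high is not None and group.replicas > high:
        problems.append(f"{group.group_name}: replicas is above max_replicas")
    return problems


ok_group = WorkerGroupBounds("dynamic-group", replicas=2, min_replicas=1, max_replicas=10)
bad_group = WorkerGroupBounds("dynamic-group", replicas=12, min_replicas=1, max_replicas=10)

print(check_bounds(ok_group))
print(check_bounds(bad_group))
```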
Managing Runtime Environments
You can manage dependencies for your Ray workers using the runtime_env field in RayJobConfig. This is useful for installing pip packages or setting environment variables across the cluster.
ray_config = RayJobConfig(
    worker_node_config=[WorkerNodeConfig(group_name="ray-group", replicas=2)],
    runtime_env={
        "pip": ["numpy", "pandas", "scikit-learn"],
        "env_vars": {"MY_ENV_VAR": "value"},
    },
)
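Because the runtime environment is ultimately handed to KubeRay as YAML text (see the KubeRay compatibility note under Troubleshooting and Best Practices), it can be useful to preview a serialized form of the dictionary. The sketch below is not the plugin's own conversion, which the source does not show. It relies on the assumption that YAML 1.2 is designed as a superset of JSON, so JSON output from the standard library is used as a YAML-compatible stand-in. It also checks that env_vars values are strings, which Ray's runtime_env expects. The function name is hypothetical.

```python
import json


def to_yaml_compatible(runtime_env: dict) -> str:
    """Serialize a runtime_env dict to JSON text (a YAML-compatible stand-in)."""
    env_vars = runtime_env.get("env_vars", {})
    for key, value in env_vars.items():
        # Ray expects environment variable names and values to be strings.
        if not isinstance(key, str) or not isinstance(value, str):
            raise TypeError(f"env_vars entry {key!r} must map a str to a str")
    return json.dumps(runtime_env, indent=2, sort_keys=True)


runtime_env = {
    "pip": ["numpy", "pandas", "scikit-learn"],
    "env_vars": {"MY_ENV_VAR": "value"},
}
runtime_env_yaml = to_yaml_compatible(runtime_env)
print(runtime_env_yaml)
```

Catching a non-string value such as `{"WORKERS": 4}` locally is cheaper than discovering it after the cluster has started.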
Connecting to Existing Clusters
If you want to connect to a Ray cluster that is already running (e.g., created by a previous Flyte task), you can provide the cluster address in the RayJobConfig.
# Option 1: Using RayJobConfig address
existing_cluster_config = RayJobConfig(
    address="ray://<cluster-head-ip>:10001",
    worker_node_config=[],  # No new workers needed
)


@flyte.TaskEnvironment(name="client_env", plugin_config=existing_cluster_config).task
async def use_existing_cluster():
    # ray.init(address=...) is called automatically by the plugin
    pass


# Option 2: Manual connection inside the task
@flyte.task
async def manual_connect(cluster_ip: str):
    ray.init(address=f"ray://{cluster_ip}:10001")
    # ... execute ray code ...
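Both options depend on a correctly formed `ray://` address. A small helper can build and validate it before it reaches RayJobConfig or ray.init. This is a sketch only: `ray_client_address` is a hypothetical function, the default port 10001 is taken from the examples above, and IPv6 literals are not handled.

```python
def ray_client_address(host: str, port: int = 10001) -> str:
    """Build a ray:// address from a host and port, with basic validation."""
    host = host.strip()
    if not host:
        raise ValueError("host must not be empty")
    if "://" in host:
        raise ValueError("pass a bare hostname or IP, without a scheme")
    if not 0 < port < 65536:
        raise ValueError(f"port {port} is outside the valid range 1-65535")
    return f"ray://{host}:{port}"


address = ray_client_address("10.0.0.5")
print(address)
```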
Troubleshooting and Best Practices
- Resource Overrides: If you provide both requests/limits and a pod_template in a node configuration, the requests and limits will take precedence and be used to construct a new pod specification.
- Automatic Working Directory: When running in a cluster environment, RayFunctionTask automatically sets the working_dir in the Ray runtime_env to the current working directory, while excluding Flyte-specific artifacts like script_mode.tar.gz.
- KubeRay Compatibility: The runtime_env field is internally converted to runtime_env_yaml for compatibility with KubeRay >= 1.1.0.
- Cluster Lifecycle: Use shutdown_after_job_finishes=True to ensure resources are cleaned up after the task completes. You can also set ttl_seconds_after_finished to keep the cluster alive for a short period for debugging or subsequent tasks.
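The resource override rule in the first item can be pictured with plain dictionaries. The sketch below models only the precedence described there: when requests or limits are present, a new spec is built from them and the supplied template is not used. The function name, the dictionary layout, and the container name are illustrative assumptions; the plugin itself works with real PodTemplate objects whose construction is not shown in this document.

```python
from typing import Optional


def effective_pod_spec(
    requests: Optional[dict] = None,
    limits: Optional[dict] = None,
    pod_template: Optional[dict] = None,
) -> dict:
    """Return the spec that would win under the documented precedence rule."""
    if requests or limits:
        resources = {}
        if requests:
            resources["requests"] = dict(requests)
        if limits:
            resources["limits"] = dict(limits)
        # A new spec is generated; any custom template is ignored.
        # The container name "ray" is a placeholder for illustration.
        return {"containers": [{"name": "ray", "resources": resources}]}
    return dict(pod_template or {})


custom = {"containers": [{"name": "ray", "image": "example/custom:latest"}]}

only_template = effective_pod_spec(pod_template=custom)
both = effective_pod_spec(requests={"cpu": "2", "memory": "4Gi"}, pod_template=custom)

print(only_template)
print(both)
```

Under this rule, settings that live only in the custom template (such as the image in the example) are dropped once requests or limits are set, so keep a node group on one mechanism or the other.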