Skip to main content

Configuring Ray Clusters

To execute distributed Python code using Ray on Flyte, you must configure a Ray cluster topology using RayJobConfig. This configuration defines the head node, worker groups, and resource allocations required for the Ray job.

Basic Cluster Configuration

The most common way to set up a Ray cluster is to define a RayJobConfig and pass it to a TaskEnvironment. This creates a transient cluster that exists for the duration of the task.

import typing
import ray
import flyte
from flyteplugins.ray.task import HeadNodeConfig, RayJobConfig, WorkerNodeConfig

# Define the cluster topology
ray_config = RayJobConfig(
head_node_config=HeadNodeConfig(
ray_start_params={"log-color": "True"},
requests=flyte.Resources(cpu="1", memory="1Gi"),
limits=flyte.Resources(cpu="2", memory="2Gi"),
),
worker_node_config=[
WorkerNodeConfig(
group_name="compute-group",
replicas=2,
requests=flyte.Resources(cpu="2", memory="4Gi"),
)
],
shutdown_after_job_finishes=True,
ttl_seconds_after_finished=300,
)

# Apply configuration to a TaskEnvironment
ray_env = flyte.TaskEnvironment(
name="ray_env",
plugin_config=ray_config,
)

@ray.remote
def compute_square(x: int) -> int:
return x * x

@ray_env.task
async def run_ray_job(n: int = 5) -> typing.List[int]:
# The plugin automatically calls ray.init() based on ray_config
futures = [compute_square.remote(i) for i in range(n)]
return ray.get(futures)

Configuring Node Resources

You can specify resources for both the head and worker nodes using the requests and limits parameters in HeadNodeConfig and WorkerNodeConfig.

  • Head Node: Use HeadNodeConfig to define the orchestrator node's resources.
  • Worker Groups: Use WorkerNodeConfig to define groups of identical worker nodes. You can have multiple worker groups by passing a list to worker_node_config.

If you provide requests or limits, the plugin internally generates a PodTemplate for that node group. Alternatively, you can provide a custom pod_template directly:

from flyte.core.pod_template import PodTemplate

worker_config = WorkerNodeConfig(
group_name="gpu-workers",
replicas=1,
pod_template=PodTemplate(
# Custom pod spec configuration
)
)

Enabling Autoscaling

To allow the Ray cluster to scale dynamically based on workload, set enable_autoscaling=True and define min_replicas and max_replicas in your WorkerNodeConfig.

ray_config = RayJobConfig(
enable_autoscaling=True,
worker_node_config=[
WorkerNodeConfig(
group_name="dynamic-group",
replicas=2, # Initial replicas
min_replicas=1,
max_replicas=10,
)
],
)

Managing Runtime Environments

You can manage dependencies for your Ray workers using the runtime_env field in RayJobConfig. This is useful for installing pip packages or setting environment variables across the cluster.

ray_config = RayJobConfig(
worker_node_config=[WorkerNodeConfig(group_name="ray-group", replicas=2)],
runtime_env={
"pip": ["numpy", "pandas", "scikit-learn"],
"env_vars": {"MY_ENV_VAR": "value"}
},
)

Connecting to Existing Clusters

If you want to connect to a Ray cluster that is already running (e.g., created by a previous Flyte task), you can provide the cluster address in the RayJobConfig.

# Option 1: Using RayJobConfig address
existing_cluster_config = RayJobConfig(
address="ray://<cluster-head-ip>:10001",
worker_node_config=[] # No new workers needed
)

@flyte.TaskEnvironment(name="client_env", plugin_config=existing_cluster_config).task
async def use_existing_cluster():
# ray.init(address=...) is called automatically by the plugin
pass

# Option 2: Manual connection inside the task
@flyte.task
async def manual_connect(cluster_ip: str):
ray.init(address=f"ray://{cluster_ip}:10001")
# ... execute ray code ...

Troubleshooting and Best Practices

  • Resource Overrides: If you provide both requests/limits and a pod_template in a node configuration, the requests and limits will take precedence and be used to construct a new pod specification.
  • Automatic Working Directory: When running in a cluster environment, RayFunctionTask automatically sets the working_dir in the Ray runtime_env to the current working directory, while excluding Flyte-specific artifacts like script_mode.tar.gz.
  • KubeRay Compatibility: The runtime_env field is internally converted to runtime_env_yaml for compatibility with KubeRay >= 1.1.0.
  • Cluster Lifecycle: Use shutdown_after_job_finishes=True to ensure resources are cleaned up after the task completes. You can also set ttl_seconds_after_finished to keep the cluster alive for a short period for debugging or subsequent tasks.