Compute Resource Management
To manage compute resources for your tasks, use the Resources class to specify CPU, memory, ephemeral disk, and accelerator requirements. These can be set as defaults for an entire environment or overridden for specific task executions.
Configure Basic Resources
You can define resource requirements when creating a TaskEnvironment. This ensures all tasks associated with that environment have the necessary compute capacity.
import flyte
# Define an environment with 1 CPU, 1GiB memory, and 1GiB ephemeral disk
env = flyte.TaskEnvironment(
    name="standard-compute",
    resources=flyte.Resources(
        cpu="1",
        memory="1Gi",
        disk="1Gi",
    ),
)

@env.task
async def process_data(x: int) -> int:
    return x + 1
Specify Resource Requests and Limits
The Resources class supports both single values and request/limit ranges for CPU and memory. Providing a tuple allows you to request a minimum amount of resources while setting a maximum limit.
- CPU: Accepts int, float, or Kubernetes-style strings (e.g., "500m").
- Memory: Accepts strings with binary (Mi, Gi) or decimal (M, G) units.
import flyte
resources = flyte.Resources(
    # Request 1 CPU core, limit to 2
    cpu=(1, 2),
    # Request 2GiB memory, limit to 4GiB
    memory=("2Gi", "4Gi"),
)
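To make the unit conventions concrete, here is a minimal sketch of how Kubernetes-style quantities map to numbers. The helpers cpu_to_cores and memory_to_bytes are hypothetical names written for illustration only; they are not part of flyte, and the library's own parsing may differ in detail.

```python
from decimal import Decimal

# Multipliers for Kubernetes-style memory suffixes (binary vs. decimal).
_BINARY = {"Ki": 2**10, "Mi": 2**20, "Gi": 2**30, "Ti": 2**40}
_DECIMAL = {"k": 10**3, "M": 10**6, "G": 10**9, "T": 10**12}


def cpu_to_cores(quantity) -> float:
    """Convert 1, 0.5, "2", or "500m" (millicores) to a number of cores."""
    if isinstance(quantity, (int, float)):
        return float(quantity)
    if quantity.endswith("m"):
        return int(quantity[:-1]) / 1000
    return float(quantity)


def memory_to_bytes(quantity: str) -> int:
    """Convert a string such as "2Gi" or "500M" to a byte count."""
    # Binary suffixes are checked first so "Gi" is never mistaken for "G".
    for suffix, factor in {**_BINARY, **_DECIMAL}.items():
        if quantity.endswith(suffix):
            return int(Decimal(quantity[: -len(suffix)]) * factor)
    return int(Decimal(quantity))  # no suffix: plain bytes


print(cpu_to_cores("500m"))    # 0.5
print(memory_to_bytes("2Gi"))  # 2147483648
print(memory_to_bytes("2G"))   # 2000000000
```

The gap between "2Gi" and "2G" is about 7%, which is worth keeping in mind when a limit is set close to a workload's real peak usage.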
Allocate Accelerators (GPU, TPU, Neuron)
Accelerators can be specified using simple strings for common configurations or specialized helper functions for advanced partitioning.
Simple GPU Allocation
Use a formatted string "Type:Quantity" or a simple integer for generic GPUs.
import flyte
# Request 8 A100 GPUs
gpu_resources = flyte.Resources(gpu="A100:8")
# Request 1 generic GPU
generic_gpu = flyte.Resources(gpu=1)
Advanced Accelerator Configuration
For hardware partitioning (like NVIDIA MIG) or specific TPU slices, use the GPU, TPU, or Neuron helper functions.
import flyte
# GPU with MIG partitioning (1g.5gb partition on an A100)
mig_gpu = flyte.Resources(
    gpu=flyte.GPU(device="A100", quantity=1, partition="1g.5gb")
)
# TPU with specific slice configuration
tpu_resources = flyte.Resources(
    gpu=flyte.TPU(device="V5P", partition="2x2x1")
)
# AWS Neuron (Inferentia/Trainium)
neuron_resources = flyte.Resources(
    gpu=flyte.Neuron(device="Trn1")
)
Manage Shared Memory and Ephemeral Disk
For workloads such as deep learning that need high-throughput data loading, you can set the size of shared memory (/dev/shm) alongside the ephemeral disk allocation.
import flyte
resources = flyte.Resources(
    disk="100Gi",  # Ephemeral storage cleaned up after task completion
    shm="16Gi",  # Specific shared memory size
    # OR
    # shm="auto"  # Automatically use maximum available shared memory
)
Override Resources at Call-Time
You can dynamically adjust resources for a specific task call using the .override() method. This is particularly useful for handling tasks that might encounter Out-Of-Memory (OOM) errors with default settings.
import flyte
import flyte.errors

# Tasks are defined on an environment, as in the first example
env = flyte.TaskEnvironment(name="memory-intensive")

@env.task
async def memory_intensive_task(data: list):
    # ... processing ...
    pass

async def run_with_retry(data):
    try:
        # Try with standard resources
        await memory_intensive_task.override(
            resources=flyte.Resources(memory="2Gi")
        )(data=data)
    except flyte.errors.OOMError:
        # Retry with significantly more memory if it fails
        await memory_intensive_task.override(
            resources=flyte.Resources(memory="16Gi")
        )(data=data)
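The two-step retry above can be generalized to a ladder of memory sizes. The following is a library-free sketch of that control flow only: FakeOOMError, run_with_escalation, and fake_attempt are hypothetical stand-ins so the example runs without a cluster, and none of them are flyte APIs.

```python
import asyncio


class FakeOOMError(Exception):
    """Stand-in for an out-of-memory failure (hypothetical, not flyte.errors.OOMError)."""


async def run_with_escalation(attempt, memory_ladder):
    """Call attempt(memory) with each size in order until one succeeds."""
    if not memory_ladder:
        raise ValueError("memory_ladder must contain at least one size")
    last_error = None
    for memory in memory_ladder:
        try:
            return await attempt(memory)
        except FakeOOMError as err:
            last_error = err  # fall through to the next, larger size
    raise last_error  # every size failed; surface the final error


async def fake_attempt(memory: str) -> str:
    # Simulated task: pretend the two smallest sizes run out of memory.
    if memory in ("2Gi", "4Gi"):
        raise FakeOOMError(memory)
    return f"succeeded with {memory}"


result = asyncio.run(
    run_with_escalation(fake_attempt, ["2Gi", "4Gi", "8Gi", "16Gi"])
)
print(result)  # succeeded with 8Gi
```

In real code, attempt would wrap the .override(resources=flyte.Resources(memory=memory)) call and the except clause would catch flyte.errors.OOMError. A bounded ladder keeps a task that can never fit from retrying indefinitely.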
Troubleshooting
- GPU Quantity: When using the Device class or GPU() helper, the quantity must be at least 1.
- Invalid Accelerator Strings: If using the string format (e.g., "H100:2"), the device name must match one of the supported types defined in flyte._resources.Accelerators.
- Tuple Length: CPU and Memory ranges must be exactly two elements: (request, limit). Providing more or fewer elements will raise a ValueError.
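When resource values come from user input or a config file, some of the checks listed above can be mirrored before a Resources object is built. This is a rough sketch under that assumption: check_range and check_gpu_string are hypothetical helpers, not flyte functions, and they do not check device names against the supported list, which only the library knows.

```python
def check_range(name, value):
    """Raise ValueError unless value is a single value or a (request, limit) pair."""
    if isinstance(value, tuple) and len(value) != 2:
        raise ValueError(
            f"{name} must be (request, limit), got {len(value)} elements"
        )
    return value


def check_gpu_string(spec: str):
    """Split "Type:Quantity" and require a quantity of at least 1."""
    device, sep, count = spec.partition(":")
    if not device or not sep:
        raise ValueError(f"expected 'Type:Quantity', got {spec!r}")
    quantity = int(count)  # a non-numeric count raises ValueError here
    if quantity < 1:
        raise ValueError(f"GPU quantity must be at least 1, got {quantity}")
    return device, quantity


print(check_range("cpu", (1, 2)))  # (1, 2)
print(check_gpu_string("H100:2"))  # ('H100', 2)
```

Failing early in this way gives a clearer message at the point where the bad value enters the program, rather than when the task environment is constructed.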