Container Reuse
Container reuse in this SDK is a mechanism designed to minimize cold-start overhead by keeping execution environments "warm" across multiple task invocations. When environment creation (e.g., loading large ML models or installing complex dependencies) is expensive relative to the task runtime, reusable containers allow subsequent tasks to reuse the same Python process.
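To make "expensive relative to the task runtime" concrete, here is a back-of-the-envelope cost model. It is an illustration with made-up numbers, not something the SDK computes. Suppose setup costs C seconds and each task runs for T seconds. Then N tasks executed one after another on a single replica cost N(C + T) without reuse, but only C + NT with a warm container. The helper name total_runtime is hypothetical.

```python
def total_runtime(num_tasks: int, setup_s: float, task_s: float, reuse: bool) -> float:
    """Total seconds for num_tasks sequential tasks on one replica.

    Hypothetical cost model: without reuse every task pays setup_s (a cold start);
    with reuse only the first task does.
    """
    if num_tasks <= 0:
        return 0.0
    setups = 1 if reuse else num_tasks
    return setups * setup_s + num_tasks * task_s


# Illustrative numbers: a 120 s model load in front of a 5 s task, run 20 times.
cold = total_runtime(20, setup_s=120.0, task_s=5.0, reuse=False)  # 20 * 125 = 2500.0
warm = total_runtime(20, setup_s=120.0, task_s=5.0, reuse=True)   # 120 + 100 = 220.0
print(f"without reuse: {cold:.0f}s, with reuse: {warm:.0f}s")
```

With these numbers, reuse cuts total time by roughly an order of magnitude. When C is small compared with T, the two totals converge and reuse buys little.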
The Reuse Policy
The core of this feature is the ReusePolicy class defined in flyte._reusable_environment. It provides fine-grained control over how many replicas are maintained, how long they stay alive, and how many tasks they can handle simultaneously.
```python
from flyte import ReusePolicy, TaskEnvironment, Resources

# Example: Auto-scaling reusable environment
env = TaskEnvironment(
    name="fast_env",
    reusable=ReusePolicy(
        replicas=(1, 5),   # Scale between 1 and 5 replicas
        concurrency=1,     # One task per replica at a time
        idle_ttl=300,      # Shut down all replicas after 5 minutes of environment-wide idle time
        scaledown_ttl=60,  # Wait 60s before removing an individual idle replica
    ),
)
```
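In the example above, idle_ttl=300 and scaledown_ttl=60 govern two different shutdown decisions. They can be modeled as a small decision function. This is a hypothetical sketch of the documented semantics, not the SDK's scheduler. The names TTLs and replica_action are illustrative, and so is the assumption that auto-scaling never removes replicas below the configured minimum.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class TTLs:
    idle_ttl: float       # environment-wide idle timeout, in seconds
    scaledown_ttl: float  # per-replica idle delay before removal, in seconds


def replica_action(
    env_idle_s: float,
    replica_idle_s: float,
    live_replicas: int,
    min_replicas: int,
    ttls: TTLs,
) -> str:
    """Return 'shutdown_all', 'remove', or 'keep' for one idle replica (hypothetical model)."""
    # The whole environment has been idle for idle_ttl: tear every replica down.
    if env_idle_s >= ttls.idle_ttl:
        return "shutdown_all"
    # Assumption: scale-down only removes replicas above the minimum, and only
    # once this replica has itself been idle for scaledown_ttl.
    if live_replicas > min_replicas and replica_idle_s >= ttls.scaledown_ttl:
        return "remove"
    return "keep"


policy = TTLs(idle_ttl=300, scaledown_ttl=60)
print(replica_action(45, 45, live_replicas=3, min_replicas=1, ttls=policy))    # keep
print(replica_action(90, 90, live_replicas=3, min_replicas=1, ttls=policy))    # remove
print(replica_action(90, 90, live_replicas=1, min_replicas=1, ttls=policy))    # keep
print(replica_action(310, 310, live_replicas=1, min_replicas=1, ttls=policy))  # shutdown_all
```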
Configuration Parameters
The ReusePolicy supports the following configuration:
- replicas: Defines the pool size.
  - int: A fixed number of replicas (e.g., replicas=3).
  - tuple(min, max): An auto-scaling range (e.g., replicas=(1, 10)).
- concurrency: The maximum number of concurrent tasks a single replica can handle. Note: values greater than 1 are only supported for async tasks. If you use concurrency > 1 with a synchronous function, the SDK raises a ValueError during task definition in TaskEnvironment.task.
- idle_ttl: The environment-level timeout. If the entire environment remains idle for this duration, all replicas are shut down. The minimum value is 30 seconds.
- scaledown_ttl: The per-replica scale-down delay. It prevents rapid "flapping" by keeping an individual replica alive for at least this long after it becomes idle during auto-scaling. The minimum value is 30 seconds.
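The constraints listed above can be expressed as a standalone validator. This is a minimal, standard-library-only sketch. PolicySketch and validate are hypothetical names, not part of the flyte SDK, and the default values are illustrative. The real checks live in the SDK itself; the concurrency check, for instance, runs in TaskEnvironment.task.

```python
import inspect
from dataclasses import dataclass
from typing import Callable, Tuple, Union

MIN_TTL_S = 30  # documented minimum for idle_ttl and scaledown_ttl, in seconds


@dataclass(frozen=True)
class PolicySketch:
    """Hypothetical stand-in for ReusePolicy; defaults are illustrative only."""

    replicas: Union[int, Tuple[int, int]]
    concurrency: int = 1
    idle_ttl: int = 30
    scaledown_ttl: int = 30

    def replica_range(self) -> Tuple[int, int]:
        """Normalize replicas to (min, max); a plain int means a fixed-size pool."""
        if isinstance(self.replicas, int):
            return (self.replicas, self.replicas)
        return self.replicas


def validate(policy: PolicySketch, fn: Callable) -> None:
    """Raise ValueError if `policy` cannot be applied to the task function `fn`."""
    low, high = policy.replica_range()
    if low > high:  # assumed sanity check, not taken from the SDK
        raise ValueError("replicas must be a (min, max) pair with min <= max")
    # Documented rule: concurrency > 1 is only allowed for async tasks.
    if policy.concurrency > 1 and not inspect.iscoroutinefunction(fn):
        raise ValueError("concurrency > 1 requires an async task function")
    # Documented rule: both TTLs must be at least 30 seconds.
    for name in ("idle_ttl", "scaledown_ttl"):
        if getattr(policy, name) < MIN_TTL_S:
            raise ValueError(f"{name} must be at least {MIN_TTL_S} seconds")


async def async_task(x: int) -> int:
    return x


def sync_task(x: int) -> int:
    return x


print(PolicySketch(replicas=3).replica_range())  # (3, 3)
validate(PolicySketch(replicas=(1, 5), concurrency=4), async_task)  # passes silently
try:
    validate(PolicySketch(replicas=2, concurrency=4), sync_task)
except ValueError as err:
    print(err)  # concurrency > 1 requires an async task function
```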
Starvation Prevention
A critical consideration when using container reuse is "starvation." This occurs when a parent task occupies a replica while waiting for child tasks to complete, but no other replicas are available to run those child tasks.
As noted in src/flyte/_reusable_environment.py, the SDK logs a warning if you configure a policy whose maximum replica count is 1 and whose concurrency is 1:
```python
if self.replicas[1] == 1 and self.concurrency == 1:
    logger.warning(
        "It is recommended to use a minimum of 2 replicas, to avoid starvation. "
        "Starvation can occur if a task is running and no other replicas are available to handle new tasks."
    )
```
To avoid this, do one of the following:
- Increase replicas to at least 2.
- Increase concurrency (requires async tasks).
- Disable reuse for the parent task using task.override(reusable='off').
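The starvation scenario can be reproduced in miniature with asyncio by modeling the pool as a fixed number of execution slots (replicas × concurrency). This is a hypothetical simulation, not how the SDK schedules work. Here run_parent holds a slot and then waits on a child that needs a slot of its own, and a timeout stands in for a workflow that hangs.

```python
import asyncio


async def run_child(slots: asyncio.Semaphore) -> str:
    # The child needs its own slot on some replica before it can run.
    async with slots:
        await asyncio.sleep(0.01)
        return "child done"


async def run_parent(slots: asyncio.Semaphore) -> str:
    # The parent holds its slot for its whole lifetime, including while it waits.
    async with slots:
        return await run_child(slots)


async def completes(num_slots: int, timeout_s: float = 0.5) -> bool:
    """Return True if a parent with one child finishes given num_slots total slots."""
    slots = asyncio.Semaphore(num_slots)
    try:
        await asyncio.wait_for(run_parent(slots), timeout=timeout_s)
        return True
    except asyncio.TimeoutError:
        return False  # the child never obtained a slot: starvation


# 1 replica x concurrency 1 -> 1 slot; 2 replicas (or concurrency 2) -> 2 slots.
print(asyncio.run(completes(1)))  # False
print(asyncio.run(completes(2)))  # True
```

In this model, the third remedy corresponds to the parent not consuming a slot at all, so the child always finds one free.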
Task Overrides and Restrictions
Tasks defined within a TaskEnvironment inherit its ReusePolicy. You can override this policy at the call site using the .override() method on the task template.
However, src/flyte/_task.py enforces strict rules when reusable is active. Because a reusable container is a shared process with fixed resources, you cannot override compute resources, environment variables, or secrets for a specific invocation unless you first disable reusability.
```python
# This raises a ValueError: resources cannot be overridden while reuse is active
my_task.override(resources=Resources(cpu="4"), reusable=ReusePolicy(replicas=1))

# This is the correct way to change resources for a reusable task
my_task.override(resources=Resources(cpu="4"), reusable="off")
```
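The restriction can be restated as a small guard. This is a hypothetical re-statement of the documented behavior, not the code in src/flyte/_task.py. check_override and its parameters are illustrative. reusable stands for the effective policy after the override is applied, which is inherited from the TaskEnvironment unless the call site replaces it.

```python
from typing import Any, Optional


def check_override(
    reusable: Any,
    resources: Optional[Any] = None,
    env_vars: Optional[dict] = None,
    secrets: Optional[Any] = None,
) -> None:
    """Raise ValueError if per-invocation settings change while reuse is active.

    `reusable` is None (no reuse), "off" (reuse disabled), or a policy object.
    """
    reuse_active = reusable is not None and reusable != "off"
    # Collect the names of every per-invocation setting the caller tried to change.
    changed = [
        name
        for name, value in (("resources", resources), ("env_vars", env_vars), ("secrets", secrets))
        if value is not None
    ]
    if reuse_active and changed:
        raise ValueError(
            f"Cannot override {', '.join(changed)} on a reusable task; "
            "set reusable='off' first."
        )


check_override("off", resources={"cpu": "4"})  # allowed: reuse disabled
try:
    check_override({"replicas": 1}, resources={"cpu": "4"})
except ValueError as err:
    print(err)  # Cannot override resources on a reusable task; set reusable='off' first.
```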
Advanced Pattern: Resource Sharing
One of the most powerful uses of container reuse is sharing expensive resources, such as GPUs, across multiple small tasks. By setting concurrency > 1, multiple async tasks can run on the same replica, allowing them to share process-level state (like a loaded model) or keep a GPU saturated.
In examples/ml/batch_inference_saturate.py, this pattern is used for high-throughput inference:
```python
from flyte import ReusePolicy, Resources, TaskEnvironment

gpu_env = TaskEnvironment(
    name="gpu_worker",
    resources=Resources(cpu=4, memory="16Gi", gpu="L4:1"),
    reusable=ReusePolicy(
        replicas=2,
        concurrency=10,  # Up to 10 tasks share the GPU on each replica
    ),
)


@gpu_env.task
async def infer_batch(prompts: list[str]) -> list[str]:
    # `model` is not defined in this excerpt. Load it once at module level or
    # cache it in a process-level singleton so that every invocation landing
    # on this replica reuses the already-loaded model.
    return await model.generate(prompts)
```
In this scenario, concurrency=10 allows up to 10 invocations to be in flight on each replica at once. While one invocation waits on I/O, the others can keep the GPU busy.