Core Concepts
The Flyte SDK is designed around the principle of declarative execution environments. Instead of defining tasks in isolation, developers group them within a TaskEnvironment, which provides a consistent set of defaults for resources, images, and execution policies. This design ensures that tasks are not just code, but code coupled with the infrastructure requirements necessary to run them reliably.
Task Environments
The TaskEnvironment class (defined in src/flyte/_task_environment.py) is the primary organizational unit in the SDK. It acts as a configuration provider for all tasks defined within its scope. When you define a task using the @env.task decorator, it inherits the environment's settings, such as the Docker image, compute resources, and environment variables.
A key design choice in the SDK is the naming convention for tasks. Every task's fully-qualified name (FQN) is automatically prefixed with the environment name: <env_name>.<function_name>. This prevents naming collisions across different modules and makes it clear which environment a task belongs to in the Flyte UI.
import flyte

# Define an environment with shared defaults
env = flyte.TaskEnvironment(
    name="data_processing",
    image="ghcr.io/my-org/my-image:latest",
    resources=flyte.Resources(cpu=2, memory="4Gi"),
)

@env.task
def process_data(data: list[int]) -> list[int]:
    return [x * 2 for x in data]
In this example, the task's FQN will be data_processing.process_data.
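To make the naming rule concrete, here is a minimal pure-Python sketch of how an environment-scoped decorator could derive such a name. MiniEnvironment and the fqn attribute are hypothetical names invented for illustration; the sketch does not import flyte and is not the SDK's implementation.

```python
from typing import Callable, Dict


class MiniEnvironment:
    """Hypothetical stand-in for an environment; not part of the Flyte SDK."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.tasks: Dict[str, Callable] = {}

    def task(self, fn: Callable) -> Callable:
        # Build the name as <env_name>.<function_name>.
        fqn = f"{self.name}.{fn.__name__}"
        if fqn in self.tasks:
            raise ValueError(f"duplicate task name: {fqn}")
        fn.fqn = fqn  # illustrative attribute, assumed for this sketch
        self.tasks[fqn] = fn
        return fn


data_env = MiniEnvironment("data_processing")
report_env = MiniEnvironment("reporting")


@data_env.task
def process(values: list[int]) -> list[int]:
    return [v * 2 for v in values]


data_process = process  # keep a handle before the name is reused below


@report_env.task
def process(values: list[int]) -> int:
    return sum(values)


print(data_process.fqn)  # data_processing.process
print(process.fqn)       # reporting.process
```

Two functions that share the name process end up with distinct fully-qualified names because each is registered under a different environment.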
Resource Management
Resource management in Flyte is handled via the Resources class in src/flyte/_resources.py. It provides a high-level abstraction over Kubernetes resource requests and limits, supporting standard units for CPU, Memory, Disk, and Shared Memory (shm).
Simple and Range-based Resources
Resources can be specified as single values (which Flyte treats as both request and limit) or as (request, limit) tuples to define a range.
# CPU as a range: request 1, limit 2. Memory and disk as single values.
resources = flyte.Resources(
    cpu=(1, 2),
    memory="2Gi",
    disk="10Gi",
)
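The single-value and range forms can be thought of as two spellings of the same (request, limit) pair. The sketch below illustrates that reading with a hypothetical helper, to_request_limit, which is not part of the SDK and does not reflect how Resources is implemented internally.

```python
from typing import Tuple, Union

Scalar = Union[int, float, str]


def to_request_limit(spec: Union[Scalar, Tuple[Scalar, Scalar]]) -> Tuple[Scalar, Scalar]:
    """Return (request, limit) for a single value or a 2-tuple (hypothetical helper)."""
    if isinstance(spec, tuple):
        if len(spec) != 2:
            raise ValueError("a range must be a (request, limit) pair")
        request, limit = spec
        return request, limit
    # A single value serves as both request and limit.
    return spec, spec


print(to_request_limit((1, 2)))  # (1, 2)
print(to_request_limit("2Gi"))   # ('2Gi', '2Gi')
```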
Advanced Accelerators
The SDK provides specialized support for various hardware accelerators, including NVIDIA GPUs, Google TPUs, and AWS Neuron devices. These are configured using helper functions like GPU(), TPU(), and Neuron().
# Advanced GPU configuration with partitioning (MIG)
env = flyte.TaskEnvironment(
    name="ml_training",
    resources=flyte.Resources(
        gpu=flyte.GPU(device="A100", quantity=1, partition="1g.5gb"),
        shm="auto",  # Automatically sets shared memory to the node maximum
    ),
)
The shm parameter is particularly important for deep learning tasks that use multi-process data loading (e.g., PyTorch DataLoader), as it prevents "bus error" crashes by providing sufficient shared memory.
Configuration Hierarchy
Flyte implements a three-level hierarchy for task configuration, allowing for both broad defaults and fine-grained overrides:
- TaskEnvironment: Sets the baseline defaults for all tasks in the environment.
- @env.task Decorator: Overrides settings for a specific task at definition time.
- task.override(): Overrides settings at the call site for a specific execution.
The implementation in TaskTemplate.override (found in src/flyte/_task.py) applies these overrides on top of the task's existing configuration. There is one significant constraint: when an environment is marked as reusable, certain settings (such as resources, env_vars, and secrets) cannot be overridden unless reusability is explicitly disabled for that call.
@env.task(retries=3)  # Override environment default for retries
def my_task(x: int) -> int:
    return x + 1

# Override resources at the call site
result = my_task.override(resources=flyte.Resources(cpu=4))(x=10)
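The precedence rules above can be sketched as a plain dictionary merge in which later levels win. This is only a model of the behavior described in the text: resolve_settings and LOCKED_WHEN_REUSABLE are hypothetical names, settings are represented as plain dicts, and the real logic in TaskTemplate.override may differ.

```python
from typing import Any, Dict, Optional

# Settings the text says cannot be overridden while the environment is reusable.
LOCKED_WHEN_REUSABLE = {"resources", "env_vars", "secrets"}


def resolve_settings(
    env_defaults: Dict[str, Any],
    decorator_overrides: Optional[Dict[str, Any]] = None,
    call_overrides: Optional[Dict[str, Any]] = None,
    reusable: bool = False,
) -> Dict[str, Any]:
    """Merge the three levels; later levels win (hypothetical helper)."""
    call_overrides = call_overrides or {}
    if reusable:
        blocked = LOCKED_WHEN_REUSABLE.intersection(call_overrides)
        if blocked:
            raise ValueError(
                f"cannot override {sorted(blocked)} on a reusable environment"
            )
    return {**env_defaults, **(decorator_overrides or {}), **call_overrides}


env_defaults = {"retries": 0, "resources": {"cpu": 2, "memory": "4Gi"}}
settings = resolve_settings(
    env_defaults,
    decorator_overrides={"retries": 3},
    call_overrides={"resources": {"cpu": 4}},
)
print(settings)  # {'retries': 3, 'resources': {'cpu': 4}}
```

Note that this sketch replaces a setting wholesale at each level rather than merging nested fields, so the call-site resources value drops the environment's memory entry. Whether the SDK merges or replaces nested fields is not stated in this section.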
Instrumentation and Tracing
While @env.task is used for the primary units of execution that Flyte manages, developers often need to instrument regular Python functions that are called within a task. The trace decorator (in src/flyte/_trace.py) allows for this without the overhead of creating a full Flyte task.
Traced functions appear in the Flyte UI with timing information and input/output tracking, providing visibility into the internal execution flow of a complex task.
@flyte.trace
def internal_helper(x: int) -> int:
    # This function is not a Flyte task, but its execution is tracked
    return x * x

@env.task
def main_task(n: int) -> int:
    return internal_helper(n)
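For intuition about what a tracing decorator records, the following is a minimal sketch of a wrapper that captures inputs, output, and wall-clock duration for each call. mini_trace and TRACE_LOG are hypothetical names for this illustration; the sketch does not use flyte and says nothing about how flyte.trace is actually implemented or how it reports to the UI.

```python
import functools
import time
from typing import Any, Callable, Dict, List

TRACE_LOG: List[Dict[str, Any]] = []  # hypothetical in-memory sink for this sketch


def mini_trace(fn: Callable) -> Callable:
    """Record inputs, output, and duration of each call (illustrative only)."""

    @functools.wraps(fn)
    def wrapper(*args: Any, **kwargs: Any) -> Any:
        start = time.perf_counter()
        result = fn(*args, **kwargs)
        TRACE_LOG.append(
            {
                "name": fn.__name__,
                "args": args,
                "kwargs": kwargs,
                "output": result,
                "seconds": time.perf_counter() - start,
            }
        )
        return result

    return wrapper


@mini_trace
def square(x: int) -> int:
    return x * x


square(7)
print(TRACE_LOG[0]["name"], TRACE_LOG[0]["output"])  # square 49
```

Because the wrapper is an ordinary function call with a small amount of bookkeeping, it adds far less overhead than scheduling a separate unit of execution, which is the trade-off the section describes.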
Initialization
Before interacting with the Flyte platform (e.g., for remote execution or deployment), the SDK must be initialized. The init and init_from_config functions in src/flyte/_initialize.py establish the connection to the Flyte backend, setting the default project and domain for all subsequent operations.
import flyte
# Initialize connection to a specific project and domain
flyte.init(project="my_project", domain="development")
This initialization step is required for the SDK to resolve remote resources and handle authentication via the FLYTE_API_KEY environment variable.
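Since authentication depends on FLYTE_API_KEY, a script may want to fail fast with a clear message before attempting to initialize. The helper below is a hypothetical pre-flight check written for this page, not an SDK function; it only reads a mapping of environment variables and never calls flyte.

```python
import os
from typing import Mapping


def api_key_from_env(environ: Mapping[str, str] = os.environ) -> str:
    """Return FLYTE_API_KEY or raise a descriptive error (hypothetical helper)."""
    key = environ.get("FLYTE_API_KEY", "")
    if not key:
        raise RuntimeError(
            "FLYTE_API_KEY is not set; export it before initializing the SDK"
        )
    return key


# Demonstrated with an explicit mapping so the example does not depend on
# the real process environment.
print(api_key_from_env({"FLYTE_API_KEY": "example-key"}))  # example-key
```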