Tasks and Templates
In the Flyte SDK, the TaskTemplate is the fundamental unit of work. It serves as a blueprint that encapsulates both the logic to be executed and the infrastructure requirements needed to run that logic. By separating the definition of a task from its execution context, the SDK allows for a flexible configuration hierarchy that can be adjusted at definition time or dynamically at the point of invocation.
The Configuration Hierarchy
The SDK implements a tiered approach to task configuration. This design allows developers to set sensible defaults at a broad level and refine them as needed for specific units of work.
- Environment Defaults: A TaskEnvironment defines the baseline for all tasks associated with it, including the default container image and resource requirements.
- Task Definition: The @env.task decorator (or direct instantiation of a template) allows for per-task overrides.
- Invocation Overrides: The .override() method on a task instance allows for dynamic adjustments at the call site.
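The precedence order of these three layers can be illustrated with a small, self-contained sketch. It does not use the Flyte SDK: the resolve_config helper and the representation of settings as plain dictionaries are assumptions made for illustration only. Later layers win, and any key a layer leaves unset falls through to the layer beneath it.

```python
from collections import ChainMap
from typing import Any, Dict, Optional


def resolve_config(
    env_defaults: Dict[str, Any],
    task_settings: Optional[Dict[str, Any]] = None,
    call_overrides: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
    """Hypothetical helper: merge config layers, highest precedence first.

    ChainMap looks keys up left to right, so call-site overrides shadow
    task-level settings, which in turn shadow environment defaults.
    """
    layers = ChainMap(call_overrides or {}, task_settings or {}, env_defaults)
    return dict(layers)


env_defaults = {"image": "python:3.12-slim", "cpu": "1", "memory": "1Gi"}
task_settings = {"memory": "2Gi"}
call_overrides = {"cpu": "4", "gpu": "1"}

print(resolve_config(env_defaults, task_settings, call_overrides))
```

The image comes from the environment, memory from the task definition, and cpu and gpu from the call site.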
For example, a task might be defined with standard resources but overridden to use a GPU for a specific high-compute run:
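```python
from flyte import Resources, TaskEnvironment

env = TaskEnvironment(name="compute")  # environment-level defaults live here

@env.task(resources=Resources(cpu="1", memory="1Gi"))
def compute_task(data: dict):
    ...

# Overriding at the call site
compute_task.override(resources=Resources(cpu="4", gpu="1"))(data=my_data)
```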
The TaskTemplate.override method (found in src/flyte/_task.py) creates a new instance of the template using dataclasses.replace, ensuring that the original task definition remains immutable while providing a specialized version for a specific execution.
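The immutability pattern behind override can be shown with the standard library alone. The sketch below is not the SDK's TaskTemplate; MiniTaskTemplate and ResourceSpec are hypothetical stand-ins that only demonstrate how dataclasses.replace yields a specialized copy while the original definition stays unchanged.

```python
from dataclasses import dataclass, field, replace
from typing import Dict, Optional


@dataclass(frozen=True)
class ResourceSpec:
    """Hypothetical resource description (not flyte.Resources)."""
    cpu: Optional[str] = None
    memory: Optional[str] = None
    gpu: Optional[str] = None


@dataclass(frozen=True)
class MiniTaskTemplate:
    """Hypothetical stand-in for a task template."""
    name: str
    resources: ResourceSpec = field(default_factory=ResourceSpec)
    env_vars: Dict[str, str] = field(default_factory=dict)

    def override(self, **changes) -> "MiniTaskTemplate":
        # replace() builds a new instance; self is never mutated.
        return replace(self, **changes)


base = MiniTaskTemplate(name="compute_task", resources=ResourceSpec(cpu="1", memory="1Gi"))
gpu_variant = base.override(resources=ResourceSpec(cpu="4", gpu="1"))

print(base.resources)         # original definition untouched
print(gpu_variant.resources)  # specialized copy for one execution
```

Because replace() rejects unknown field names with a TypeError, a misspelled override fails loudly instead of being silently ignored.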
AsyncFunctionTaskTemplate
The most common implementation of a task is the AsyncFunctionTaskTemplate. This class wraps standard Python functions (both synchronous and asynchronous) to make them compatible with the Flyte execution engine.
Execution and Local Forwarding
The TaskTemplate implements __call__, which acts as a router. When a task is called, the SDK calls internal_ctx() to determine the execution context:
- Task Context: If the code is running inside a Flyte execution (e.g., on a remote cluster or within a managed local run), the task is submitted to a controller for orchestration.
- Local Context: If the task is called as a regular Python function, it triggers the forward() method. In AsyncFunctionTaskTemplate, forward() simply executes the underlying function directly, staying "out of the way" of standard Python execution.
Handling Synchronous Functions
While the SDK is built on an asynchronous foundation, it also supports plain synchronous functions. AsyncFunctionTaskTemplate.__post_init__ checks whether the wrapped function is a coroutine function and, if it is not, sets a _call_as_synchronous flag. During execution, the execute method uses run_sync_with_loop to bridge the sync-async gap:
```python
# From src/flyte/_task.py
async def execute(self, *args, **kwargs) -> R:
    ...
    if iscoroutinefunction(self.func):
        v = await self.func(*args, **kwargs)
    else:
        v = await run_sync_with_loop(self.func, *args, **kwargs)
    ...
```
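The same dispatch can be reproduced outside the SDK. The sketch below swaps in the standard library's asyncio.to_thread (Python 3.9+) for run_sync_with_loop, whose internals are not described here; MiniAsyncFunctionTask is a hypothetical class, not the SDK's AsyncFunctionTaskTemplate. Running the blocking function in a worker thread lets a synchronous task and a coroutine task make progress concurrently on one event loop.

```python
import asyncio
import inspect
import time
from typing import Any, Callable


class MiniAsyncFunctionTask:
    """Hypothetical sketch, not the SDK's AsyncFunctionTaskTemplate."""

    def __init__(self, func: Callable[..., Any]) -> None:
        self.func = func
        # Mirrors the idea of a _call_as_synchronous flag set at construction.
        self._call_as_synchronous = not inspect.iscoroutinefunction(func)

    async def execute(self, *args: Any, **kwargs: Any) -> Any:
        if self._call_as_synchronous:
            # Stand-in for run_sync_with_loop: run the blocking function in a
            # worker thread so the event loop stays responsive.
            return await asyncio.to_thread(self.func, *args, **kwargs)
        return await self.func(*args, **kwargs)


def slow_square(x: int) -> int:
    time.sleep(0.1)  # blocking call
    return x * x


async def fast_double(x: int) -> int:
    await asyncio.sleep(0)
    return 2 * x


async def main() -> None:
    results = await asyncio.gather(
        MiniAsyncFunctionTask(slow_square).execute(3),
        MiniAsyncFunctionTask(fast_double).execute(3),
    )
    print(results)  # → [9, 6]


asyncio.run(main())
```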
Extensibility and Plugins
The TaskTemplate is designed to be subclassed to support specialized execution engines or external services. This is achieved through several extension points that define how the task is serialized and what arguments are passed to the container.
Key extension methods include:
- custom_config: Returns a dictionary of configuration specific to a plugin (e.g., BigQuery job settings).
- container_args: Defines the command-line arguments for the Flyte container.
- sql: Used by database-centric tasks to provide the raw query string.
For instance, a BigQuery plugin might implement custom_config to pass project and location metadata to the Flyte backend:
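```python
from typing import Dict

# TaskTemplate and SerializationContext come from the Flyte SDK (imports omitted).
class BigQueryTask(TaskTemplate):
    def custom_config(self, sctx: SerializationContext) -> Dict[str, str]:
        # Plugin metadata forwarded to the Flyte backend
        return {
            "Location": self.plugin_config.Location,
            "ProjectID": self.plugin_config.ProjectID,
        }
```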
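The value of these extension points is that the base class owns the serialization flow and subclasses only fill in hooks. The following sketch shows that template-method pattern in plain Python; BaseTemplate, QueryTask, QueryConfig, the local SerializationContext dataclass, and the "run-task" container arguments are all hypothetical and do not reflect the SDK's real serialization format.

```python
import json
from dataclasses import dataclass
from typing import Any, Dict, List


@dataclass
class SerializationContext:
    """Hypothetical stand-in for the SDK's serialization context."""
    project: str
    domain: str


class BaseTemplate:
    """Hypothetical base class exposing overridable extension points."""

    def __init__(self, name: str) -> None:
        self.name = name

    def custom_config(self, sctx: SerializationContext) -> Dict[str, Any]:
        return {}  # plain tasks carry no plugin-specific config

    def container_args(self, sctx: SerializationContext) -> List[str]:
        return ["run-task", "--name", self.name]  # hypothetical entrypoint

    def serialize(self, sctx: SerializationContext) -> str:
        # The base class calls the hooks; subclasses only override them.
        return json.dumps(
            {
                "name": self.name,
                "custom": self.custom_config(sctx),
                "args": self.container_args(sctx),
            },
            sort_keys=True,
        )


@dataclass
class QueryConfig:
    Location: str
    ProjectID: str


class QueryTask(BaseTemplate):
    def __init__(self, name: str, plugin_config: QueryConfig) -> None:
        super().__init__(name)
        self.plugin_config = plugin_config

    def custom_config(self, sctx: SerializationContext) -> Dict[str, Any]:
        return {
            "Location": self.plugin_config.Location,
            "ProjectID": self.plugin_config.ProjectID,
        }


task = QueryTask("daily_report", QueryConfig("US", "my-gcp-project"))
print(task.serialize(SerializationContext(project="demo", domain="development")))
```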
Design Constraints and Tradeoffs
The implementation of tasks in this SDK includes several deliberate constraints to ensure reliability and predictability in distributed environments.
Reusability vs. Overrides
A significant constraint exists when using reusable environments. If a task is marked as reusable (via ReusePolicy), the SDK prevents overriding resources, env_vars, or secrets at the call site. This is because reusable environments are pre-warmed with specific configurations; allowing these to change at invocation would break the isolation and consistency guarantees of the environment.
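A guard of this kind can be sketched as a check inside override. Everything below is hypothetical: ReuseSettings stands in for ReusePolicy, GuardedTask for a task template, and raising ValueError is an assumption, since the SDK's actual exception type is not stated here. Only resources, env_vars, and secrets are blocked; other fields such as retries can still be overridden.

```python
from dataclasses import dataclass, replace
from typing import Any, Dict, Optional


@dataclass(frozen=True)
class ReuseSettings:
    """Hypothetical stand-in for a reuse policy."""
    replicas: int = 1


@dataclass(frozen=True)
class GuardedTask:
    name: str
    reusable: Optional[ReuseSettings] = None
    resources: Optional[Dict[str, str]] = None
    env_vars: Optional[Dict[str, str]] = None
    secrets: Optional[tuple] = None
    retries: int = 0

    # Fields fixed by a pre-warmed environment (class attribute, not a field).
    _LOCKED_WHEN_REUSABLE = ("resources", "env_vars", "secrets")

    def override(self, **changes: Any) -> "GuardedTask":
        if self.reusable is not None:
            blocked = sorted(set(changes) & set(self._LOCKED_WHEN_REUSABLE))
            if blocked:
                raise ValueError(
                    f"Cannot override {', '.join(blocked)} on reusable task {self.name!r}"
                )
        return replace(self, **changes)


reusable = GuardedTask(name="embed", reusable=ReuseSettings(replicas=2))
print(reusable.override(retries=3).retries)  # → 3 (allowed)
try:
    reusable.override(resources={"cpu": "4"})
except ValueError as err:
    print(err)
```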
Serialization Limits
The max_inline_io_bytes parameter (defaulting to 10 MiB via MAX_INLINE_IO_BYTES) limits the size of data passed directly between tasks as primitives or dictionaries. This design choice forces developers to use Flyte's off-loaded data types (like files or dataframes) for large datasets, preventing the orchestration layer from being overwhelmed by large payloads.
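The effect of the limit can be sketched as a size check before a value is passed inline. JSON encoding is used here only as a rough proxy for the SDK's real serialization, and check_inline_size and InlineIOTooLargeError are hypothetical names; the 10 MiB default matches the value stated above.

```python
import json
from typing import Any

# 10 MiB, matching the documented default.
MAX_INLINE_IO_BYTES = 10 * 1024 * 1024


class InlineIOTooLargeError(ValueError):
    """Hypothetical error type for this sketch."""


def check_inline_size(value: Any, limit: int = MAX_INLINE_IO_BYTES) -> int:
    """Return the encoded size of value, or raise if it exceeds limit.

    JSON is only a rough proxy for the SDK's real encoding.
    """
    size = len(json.dumps(value).encode("utf-8"))
    if size > limit:
        raise InlineIOTooLargeError(
            f"Inline payload is {size} bytes; limit is {limit} bytes. "
            "Pass large data as a file or dataframe instead."
        )
    return size


print(check_inline_size({"rows": 3, "label": "ok"}))
```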
Nested Tasks in Traces
The SDK explicitly forbids invoking tasks from within a flyte.trace context. If a task is detected inside a trace, it raises a TraceDoesNotAllowNestedTasksError. This ensures that tracing remains a lightweight observation tool rather than a mechanism for complex nested orchestration, which should instead be handled via workflows or sub-tasks.
Migration with .aio
To support the transition from synchronous to asynchronous codebases, TaskTemplate provides an .aio() method. This allows synchronous tasks to be awaited in an asynchronous parent task, facilitating a gradual migration to asyncio patterns without requiring a complete rewrite of existing task logic.
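The shape of this migration path can be sketched with a wrapper that keeps the ordinary synchronous call and adds an awaitable aio() method. SyncTask is hypothetical, and offloading the body with asyncio.to_thread is an assumption; how the SDK's .aio() actually runs the synchronous function is not described here.

```python
import asyncio
import time
from typing import Any, Callable, List


class SyncTask:
    """Hypothetical sync task wrapper with an aio() companion method."""

    def __init__(self, func: Callable[..., Any]) -> None:
        self.func = func

    def __call__(self, *args: Any, **kwargs: Any) -> Any:
        return self.func(*args, **kwargs)

    async def aio(self, *args: Any, **kwargs: Any) -> Any:
        # Assumption: offload the blocking body so awaiting it does not stall the loop.
        return await asyncio.to_thread(self.func, *args, **kwargs)


def legacy_step(x: int) -> int:
    time.sleep(0.05)  # existing blocking code, left unchanged
    return x + 10


step = SyncTask(legacy_step)


async def parent() -> List[int]:
    # The sync task is awaited concurrently from an async parent.
    return list(await asyncio.gather(step.aio(1), step.aio(2)))


print(step(0))                # → 10 (synchronous call still works)
print(asyncio.run(parent()))  # → [11, 12]
```

Existing callers keep using step(...) unchanged, while new asynchronous parents can await step.aio(...).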