Execution Context
The execution context in this SDK provides a mechanism for tasks to access runtime metadata, manage state across retries, and propagate custom information through the execution graph. This is primarily achieved through two layers: the user-facing TaskContext and the internal Context management system.
Accessing the Task Context
The primary entry point for developers is the flyte.ctx() function, defined in src/flyte/_context.py. This function returns an instance of flyte.models.TaskContext when called within a running task.
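import flyte

# Hypothetical environment name; any TaskEnvironment works here
env = flyte.TaskEnvironment(name="ctx_example")

@env.task
def my_task():
    ctx = flyte.ctx()
    # ctx is None when this code runs outside an active task
    if ctx:
        print(f"Running action: {ctx.action}")
        print(f"Attempt number: {ctx.attempt_number}")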
If flyte.ctx() is called outside of an active task execution (for example, at the module level during import), it returns None.
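The None-versus-TaskContext behavior follows naturally if the active task context is stored in a context variable whose default is None. The stdlib-only sketch below illustrates that idea under this assumption; it is not the SDK's code, and TaskContextSketch, _current_task, and run_as_task are hypothetical names.

```python
import contextvars
from dataclasses import dataclass
from typing import Callable, Optional, TypeVar

T = TypeVar("T")


@dataclass(frozen=True)
class TaskContextSketch:
    # Hypothetical stand-in for flyte.models.TaskContext
    action: str
    attempt_number: int


# Hypothetical variable; its default of None means "no task is running"
_current_task = contextvars.ContextVar("current_task", default=None)


def ctx() -> Optional[TaskContextSketch]:
    """Return the active task context, or None outside a task."""
    return _current_task.get()


def run_as_task(fn: Callable[[], T], tctx: TaskContextSketch) -> T:
    def _runner() -> T:
        # Only the copied context sees this set() call
        _current_task.set(tctx)
        return fn()

    # Running in a copy keeps the task context from leaking to the caller
    return contextvars.copy_context().run(_runner)


print(ctx())  # None: module level, no task running
print(run_as_task(lambda: ctx().action, TaskContextSketch("a0", 0)))  # a0
print(ctx())  # None: the task context did not leak
```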
The TaskContext Interface
The TaskContext class in src/flyte/models.py is a dataclass that aggregates metadata provided by the Flyte runtime. Key attributes include:
- action: A unique identifier (ActionID) for the current execution.
- attempt_number: Retrieved from the FLYTE_ATTEMPT_NUMBER environment variable, indicating how many times the task has been retried.
- raw_data_path: The prefix used for storing raw data (e.g., offloaded literals).
- custom_context: A dictionary of strings used for user-defined metadata propagation.
- checkpoint: A helper for managing durable state.
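A minimal sketch of how a subset of these attributes might be modelled as a dataclass. It assumes FLYTE_ATTEMPT_NUMBER holds an integer string and treats a missing variable as attempt 0; both are assumptions for illustration. TaskContextSketch and read_attempt_number are hypothetical names, the paths are placeholders, and the real TaskContext in src/flyte/models.py has more fields (including the lazily built checkpoint).

```python
import os
from dataclasses import dataclass, field
from typing import Dict, Mapping


def read_attempt_number(environ: Mapping[str, str] = os.environ) -> int:
    # Hypothetical helper: parse FLYTE_ATTEMPT_NUMBER, defaulting to 0 (assumption)
    return int(environ.get("FLYTE_ATTEMPT_NUMBER", "0"))


@dataclass
class TaskContextSketch:
    # Hypothetical subset of the TaskContext attributes listed above
    action: str
    raw_data_path: str
    attempt_number: int = field(default_factory=read_attempt_number)
    custom_context: Dict[str, str] = field(default_factory=dict)


tctx = TaskContextSketch(action="a0", raw_data_path="s3://placeholder-bucket/raw")
print(tctx.attempt_number)  # depends on the current environment
print(read_attempt_number({"FLYTE_ATTEMPT_NUMBER": "2"}))  # 2
```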
State Management with Checkpoints
The checkpoint property of TaskContext returns a flyte.Checkpoint object (defined in src/flyte/_checkpoint.py) that lets a task save its progress and resume after a failure.
The example in examples/checkpoint/generic_data_checkpoint.py uses this helper to carry state across retries:
import flyte

# Hypothetical environment name
env = flyte.TaskEnvironment(name="checkpoint_example")

@env.task(retries=3)
def use_checkpoint(n_iterations: int) -> int:
    checkpoint = flyte.ctx().checkpoint
    # Load previous state if it exists
    path = checkpoint.load_sync()
    if path:
        with open(path, "rb") as f:
            index = int(f.read().decode())
    else:
        index = 0
    # ... perform work ...
    # Save state for the next attempt
    checkpoint.save_sync(f"{index + 1}".encode())
The TaskContext.checkpoint property lazily constructs the Checkpoint object using checkpoint_paths provided by the runtime.
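To make the resume-on-retry pattern concrete without a Flyte runtime, here is a local simulation. LocalCheckpoint is a hypothetical file-backed stand-in that only mimics the load_sync/save_sync calls used above (load_sync returns a path or None); fail_at and run_with_retries are also hypothetical and exist only to inject a failure and replay attempts.

```python
import os
import tempfile
from typing import List, Optional


class LocalCheckpoint:
    """Hypothetical stand-in for flyte.Checkpoint, backed by a local file."""

    def __init__(self, directory: str):
        self._file = os.path.join(directory, "state")

    def load_sync(self) -> Optional[str]:
        # Path of the previously saved state, or None on the first attempt
        return self._file if os.path.exists(self._file) else None

    def save_sync(self, data: bytes) -> None:
        with open(self._file, "wb") as f:
            f.write(data)


def use_checkpoint(checkpoint: LocalCheckpoint, n_iterations: int, fail_at: int) -> int:
    path = checkpoint.load_sync()
    if path:
        with open(path, "rb") as f:
            index = int(f.read().decode())
    else:
        index = 0
    while index < n_iterations:
        if index == fail_at:
            raise RuntimeError(f"simulated failure at iteration {index}")
        index += 1
        # Persist progress after every completed iteration
        checkpoint.save_sync(f"{index}".encode())
    return index


def run_with_retries(n_iterations: int, fail_points: List[int]) -> int:
    # Each entry in fail_points drives one attempt; -1 means "do not fail"
    with tempfile.TemporaryDirectory() as d:
        cp = LocalCheckpoint(d)
        for attempt, fail_at in enumerate(fail_points):
            try:
                return use_checkpoint(cp, n_iterations, fail_at)
            except RuntimeError as exc:
                print(f"attempt {attempt} failed: {exc}")
    raise RuntimeError("all attempts failed")


print(run_with_retries(5, [3, -1]))  # 5, after resuming from index 3
```

Saving after every iteration trades extra writes for minimal lost work; a real task might checkpoint less often.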
Metadata Propagation
The SDK supports propagating custom metadata through the execution tree using custom_context. This is useful for passing configuration or tracing information to downstream tasks without explicitly adding them to every function signature.
Using the custom_context Manager
The flyte.custom_context context manager (found in src/flyte/_custom_context.py) injects metadata into the context of any task invoked within its scope.
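import flyte

# Hypothetical environment; downstream_task is assumed to be another task defined in it
env = flyte.TaskEnvironment(name="custom_context_example")

@env.task
async def main(x: int) -> int:
    vals = []
    for i in range(3):
        # Injects 'increment' into the context of downstream_task
        with flyte.custom_context(increment=str(i)):
            vals.append(await downstream_task(x))
    return sum(vals)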
Inside the downstream task, this metadata is retrieved via flyte.ctx().custom_context or the helper flyte.get_custom_context().
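The scoping behavior can be sketched in-process with contextvars. This is a hypothetical, stdlib-only model of the idea, not the SDK's implementation: custom_context, get_custom_context, and downstream_task below are local stand-ins, and the sketch does not model how Flyte carries the values across task boundaries via TaskContext.custom_context.

```python
import asyncio
import contextvars
from contextlib import contextmanager
from types import MappingProxyType
from typing import Dict

# Hypothetical storage for propagated metadata (immutable snapshots)
_custom_ctx = contextvars.ContextVar("custom_ctx", default=MappingProxyType({}))


@contextmanager
def custom_context(**values: str):
    # Merge new keys over inherited ones; inner scopes win on conflicts
    merged = MappingProxyType({**_custom_ctx.get(), **values})
    token = _custom_ctx.set(merged)
    try:
        yield
    finally:
        # Restore the outer scope's metadata on exit
        _custom_ctx.reset(token)


def get_custom_context() -> Dict[str, str]:
    return dict(_custom_ctx.get())


async def downstream_task(x: int) -> int:
    # Hypothetical downstream task reading the injected value
    return x + int(get_custom_context().get("increment", "0"))


async def main(x: int) -> int:
    vals = []
    for i in range(3):
        with custom_context(increment=str(i)):
            vals.append(await downstream_task(x))
    return sum(vals)


print(asyncio.run(main(10)))  # 33, i.e. (10+0) + (10+1) + (10+2)
print(get_custom_context())  # {}: nothing leaks out of the scopes
```

Storing immutable snapshots means an inner scope can never mutate what an outer scope or a sibling branch sees.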
Internal Context Architecture
The underlying implementation relies on the Context class in src/flyte/_context.py and Python's contextvars module.
Context and ContextData
The Context class wraps ContextData, which holds the actual state. The SDK maintains a global root_context_var to track the current context across synchronous and asynchronous boundaries.
# From src/flyte/_context.py
root_context_var = contextvars.ContextVar("root", default=Context(data=ContextData()))
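A sketch of how a ContextVar with a default root Context behaves across asynchronous boundaries. It assumes, for illustration only, that ContextData is an immutable dataclass with a single raw_data_path field; the real ContextData holds more state. The demo relies on standard asyncio behavior: each Task copies the current context when it is created.

```python
import asyncio
import contextvars
from dataclasses import dataclass, replace
from typing import Optional, Tuple


@dataclass(frozen=True)
class ContextData:
    # Hypothetical subset of fields for this sketch
    raw_data_path: Optional[str] = None


@dataclass(frozen=True)
class Context:
    data: ContextData


root_context_var = contextvars.ContextVar("root", default=Context(data=ContextData()))


async def read_path() -> Optional[str]:
    # A Task sees the Context that was active when the Task was created
    return root_context_var.get().data.raw_data_path


async def demo() -> Tuple[Optional[str], Optional[str]]:
    before = asyncio.create_task(read_path())
    new_data = replace(root_context_var.get().data, raw_data_path="/tmp/raw")
    root_context_var.set(Context(data=new_data))
    after = asyncio.create_task(read_path())
    return await before, await after


print(asyncio.run(demo()))  # (None, '/tmp/raw')
```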
Contextual Execution
Because a Context is not safe to mutate directly while multiple coroutines share it, the SDK uses contextual_run to execute a function within a new context tree. Mutations made there (such as updating the TaskContext or raw_data_path) stay isolated to that execution branch.
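The isolation that contextual_run provides can be approximated with contextvars.copy_context().run. The function below is a hypothetical, synchronous-only sketch that reuses the name for illustration; the SDK's actual contextual_run may have a different signature and may handle coroutines differently. current_path and branch are also hypothetical.

```python
import contextvars
from typing import Any, Callable

# Hypothetical stand-in for the SDK's root context variable
current_path = contextvars.ContextVar("current_path", default="root")


def contextual_run(fn: Callable[..., Any], *args: Any, **kwargs: Any) -> Any:
    # Run fn in a copy of the current context: its ContextVar.set calls
    # apply only to that copy and are discarded when fn returns
    return contextvars.copy_context().run(fn, *args, **kwargs)


def branch(name: str) -> str:
    current_path.set(f"root/{name}")
    return current_path.get()


print(contextual_run(branch, "a"))  # root/a
print(contextual_run(branch, "b"))  # root/b, unaffected by branch "a"
print(current_path.get())  # root
```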
When a modified Context is used as a context manager (for example, the Context returned by replace_task_context(new_tctx) in a with block), Context.__enter__ calls root_context_var.set(self) to make it the active context for the duration of the block, and Context.__exit__ restores the previous context using the token returned by set().
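A minimal sketch of the set/reset pattern described above. The classes here are hypothetical simplifications; the shape of replace_task_context (returning a new Context rather than mutating the current one) is an assumption for illustration. Keeping a stack of tokens lets the same Context be entered more than once.

```python
import contextvars
from dataclasses import dataclass, replace
from typing import List, Optional


@dataclass(frozen=True)
class TaskContext:
    # Hypothetical minimal task context
    action: str


@dataclass(frozen=True)
class ContextData:
    task_context: Optional[TaskContext] = None


class Context:
    def __init__(self, data: ContextData):
        self.data = data
        self._tokens: List[contextvars.Token] = []

    def replace_task_context(self, tctx: TaskContext) -> "Context":
        # Build a new Context; the current one is left untouched
        return Context(replace(self.data, task_context=tctx))

    def __enter__(self) -> "Context":
        # Make this Context active and keep the token needed to undo it
        self._tokens.append(root_context_var.set(self))
        return self

    def __exit__(self, *exc) -> None:
        # Restore whichever Context was active before __enter__
        root_context_var.reset(self._tokens.pop())


root_context_var = contextvars.ContextVar("root", default=Context(ContextData()))


def current_action() -> Optional[str]:
    tctx = root_context_var.get().data.task_context
    return tctx.action if tctx else None


with root_context_var.get().replace_task_context(TaskContext("a1")):
    print(current_action())  # a1
print(current_action())  # None
```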
Raw Data Resolution
The Context class also manages the resolution of raw_data_path. It prioritizes the path defined in the TaskContext over the general context-level path:
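# Excerpt from the Context class in src/flyte/_context.py
@property
def raw_data(self) -> RawDataPath:
    # A task-specific raw_data_path takes precedence
    if self.data and self.data.task_context and self.data.task_context.raw_data_path:
        return self.data.task_context.raw_data_path
    # Otherwise fall back to the context-level path
    if self.data and self.data.raw_data_path:
        return self.data.raw_data_path
    raise ValueError("Raw data path has not been set in the context.")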
This hierarchy ensures that task-specific data storage configurations take precedence during execution.
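The precedence rule can be exercised end to end with minimal stand-in classes. This is a hypothetical, self-contained model: raw-data paths are plain strings here (the SDK uses a RawDataPath object), the field layout is simplified, and the bucket names are placeholders. The property body mirrors the excerpt above.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class TaskContext:
    # Hypothetical: modelled as str instead of RawDataPath
    raw_data_path: Optional[str] = None


@dataclass(frozen=True)
class ContextData:
    task_context: Optional[TaskContext] = None
    raw_data_path: Optional[str] = None


@dataclass(frozen=True)
class Context:
    data: Optional[ContextData] = None

    @property
    def raw_data(self) -> str:
        # Task-level path first, then the context-level fallback
        if self.data and self.data.task_context and self.data.task_context.raw_data_path:
            return self.data.task_context.raw_data_path
        if self.data and self.data.raw_data_path:
            return self.data.raw_data_path
        raise ValueError("Raw data path has not been set in the context.")


both = Context(ContextData(TaskContext("s3://task-bucket/raw"), "s3://default/raw"))
fallback = Context(ContextData(TaskContext(), "s3://default/raw"))
print(both.raw_data)  # s3://task-bucket/raw
print(fallback.raw_data)  # s3://default/raw
```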