Execution Context

The execution context in this SDK provides a mechanism for tasks to access runtime metadata, manage state across retries, and propagate custom information through the execution graph. This is primarily achieved through two layers: the user-facing TaskContext and the internal Context management system.

Accessing the Task Context

The primary entry point for developers is the flyte.ctx() function, defined in src/flyte/_context.py. This function returns an instance of flyte.models.TaskContext when called within a running task.

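import flyte

# The environment name is illustrative.
env = flyte.TaskEnvironment(name="context_example")

@env.task
def my_task():
    ctx = flyte.ctx()
    # ctx is None when no task is running, so guard before use.
    if ctx:
        print(f"Running action: {ctx.action}")
        print(f"Attempt number: {ctx.attempt_number}")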

If flyte.ctx() is called outside of an active task execution (for example, at the module level during import), it returns None.

The TaskContext Interface

The TaskContext class in src/flyte/models.py is a dataclass that aggregates metadata provided by the Flyte runtime. Key attributes include:

  • action: A unique identifier (ActionID) for the current execution.
  • attempt_number: Retrieved from the FLYTE_ATTEMPT_NUMBER environment variable, indicating how many times the task has been retried.
  • raw_data_path: The prefix used for storing raw data (e.g., offloaded literals).
  • custom_context: A dictionary of strings used for user-defined metadata propagation.
  • checkpoint: A helper for managing durable state.
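
As a rough illustration of the shape described in this list, the following stand-alone sketch models a few of these attributes with a plain dataclass. SketchTaskContext and its field types are hypothetical stand-ins, not the SDK's TaskContext. Only the attribute names and the FLYTE_ATTEMPT_NUMBER variable come from the text above, and falling back to 0 when the variable is unset is an assumption.

```python
import os
from dataclasses import dataclass, field
from typing import Dict


@dataclass(frozen=True)
class SketchTaskContext:
    """Hypothetical stand-in for TaskContext; not the SDK class."""

    action: str  # the SDK uses an ActionID; a plain string is assumed here
    raw_data_path: str  # prefix under which raw data would be stored
    custom_context: Dict[str, str] = field(default_factory=dict)

    @property
    def attempt_number(self) -> int:
        # Assumption: fall back to 0 when the variable is unset.
        return int(os.environ.get("FLYTE_ATTEMPT_NUMBER", "0"))


# Simulate a runtime that has set the attempt number for this process.
os.environ["FLYTE_ATTEMPT_NUMBER"] = "2"
sketch_ctx = SketchTaskContext(action="a0", raw_data_path="/tmp/raw")
print(sketch_ctx.attempt_number)
```

The dataclass is frozen so that the metadata cannot be changed after construction. That is a design choice of this sketch, not a statement about the SDK.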

State Management with Checkpoints

The checkpoint property of TaskContext returns a flyte.Checkpoint object (defined in src/flyte/_checkpoint.py) that lets a task save its progress and resume from that point after a failure.

As seen in examples/checkpoint/generic_data_checkpoint.py, the checkpoint helper is used to handle state across retries:

@env.task(retries=3)
def use_checkpoint(n_iterations: int) -> int:
    checkpoint = flyte.ctx().checkpoint

    # Load previous state if it exists
    path = checkpoint.load_sync()
    if path:
        with open(path, "rb") as f:
            index = int(f.read().decode())
    else:
        index = 0

    # ... perform work ...

    # Save state for the next attempt
    checkpoint.save_sync(f"{index + 1}".encode())

The TaskContext.checkpoint property lazily constructs the Checkpoint object using checkpoint_paths provided by the runtime.
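
To make the save-and-resume pattern concrete without a running backend, here is a minimal local sketch. LocalCheckpoint and run_attempt are hypothetical names. The class only mimics the load_sync/save_sync calls used in the example above with a file in a temporary directory, and it says nothing about how flyte.Checkpoint stores data.

```python
import tempfile
from pathlib import Path
from typing import Optional


class LocalCheckpoint:
    """Hypothetical file-backed stand-in for a checkpoint helper."""

    def __init__(self, directory: Path) -> None:
        self._file = directory / "state.bin"

    def load_sync(self) -> Optional[Path]:
        # Return the checkpoint file path, or None if nothing was saved yet.
        return self._file if self._file.exists() else None

    def save_sync(self, payload: bytes) -> None:
        self._file.write_bytes(payload)


def run_attempt(checkpoint: LocalCheckpoint, n_iterations: int,
                fail_at: Optional[int]) -> int:
    """Count up to n_iterations, resuming from the last saved index."""
    path = checkpoint.load_sync()
    index = int(path.read_bytes().decode()) if path else 0
    while index < n_iterations:
        if fail_at is not None and index == fail_at:
            raise RuntimeError(f"simulated failure at iteration {index}")
        index += 1
        checkpoint.save_sync(str(index).encode())  # persist progress after each step
    return index


with tempfile.TemporaryDirectory() as tmp:
    cp = LocalCheckpoint(Path(tmp))
    try:
        run_attempt(cp, n_iterations=5, fail_at=3)  # first attempt fails part-way
    except RuntimeError:
        pass
    resumed_from = int(cp.load_sync().read_bytes().decode())
    result = run_attempt(cp, n_iterations=5, fail_at=None)  # retry resumes
```

The second call starts from the saved index instead of 0, which is the behavior a retried task relies on.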

Metadata Propagation

The SDK supports propagating custom metadata through the execution tree using custom_context. This is useful for passing configuration or tracing information to downstream tasks without adding it explicitly to every function signature.

Using the custom_context Manager

The flyte.custom_context manager (found in src/flyte/_custom_context.py) allows you to inject metadata into the context of any tasks spawned within its scope.

@env.task
async def main(x: int) -> int:
    vals = []
    for i in range(3):
        # Injects 'increment' into the context of downstream_task
        with flyte.custom_context(increment=str(i)):
            vals.append(await downstream_task(x))
    return sum(vals)

Inside the downstream task, this metadata is retrieved via flyte.ctx().custom_context or the helper flyte.get_custom_context().
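
The scoping behavior can be approximated with the standard library alone. The sketch below is not the SDK's implementation: custom_context_sketch, get_custom_context_sketch, and downstream are hypothetical names, and merging new entries over the outer ones is an assumption made for illustration.

```python
import contextlib
import contextvars
from typing import Dict, Iterator

# Hypothetical storage for the propagated metadata. The default dict is never mutated.
_custom_ctx = contextvars.ContextVar("custom_ctx_sketch", default={})


@contextlib.contextmanager
def custom_context_sketch(**entries: str) -> Iterator[None]:
    """Merge entries into the active metadata for the duration of the block."""
    merged = {**_custom_ctx.get(), **entries}
    token = _custom_ctx.set(merged)
    try:
        yield
    finally:
        _custom_ctx.reset(token)  # restore the outer metadata


def get_custom_context_sketch() -> Dict[str, str]:
    # Return a copy so callers cannot mutate the shared mapping.
    return dict(_custom_ctx.get())


def downstream(x: int) -> int:
    # Values are strings, so convert before using them numerically.
    increment = int(get_custom_context_sketch().get("increment", "0"))
    return x + increment


vals = []
for i in range(3):
    with custom_context_sketch(increment=str(i)):
        vals.append(downstream(10))
total = sum(vals)
```

Outside the with block the metadata is back to its previous value, so the entries do not leak into unrelated calls.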

Internal Context Architecture

The underlying implementation relies on the Context class in src/flyte/_context.py and Python's contextvars module.

Context and ContextData

The Context class wraps ContextData, which holds the actual state. The SDK maintains a global root_context_var to track the current context across synchronous and asynchronous boundaries.

# From src/flyte/_context.py
root_context_var = contextvars.ContextVar("root", default=Context(data=ContextData()))

Contextual Execution

Because Context is not coroutine-safe for direct mutations, the SDK uses contextual_run to execute functions within a new context tree. This ensures that mutations (like updating TaskContext or raw_data_path) are isolated to the specific execution branch.
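
The isolation idea can be illustrated with contextvars.copy_context() from the standard library. The internals of contextual_run are not shown in this document, so the following is only an analogy under that assumption. isolated_run, branch, and current_label are hypothetical names.

```python
import contextvars

# Hypothetical variable standing in for the SDK's root context variable.
current_label = contextvars.ContextVar("current_label", default="root")


def branch(label: str) -> str:
    # This write is visible only inside the copied context.
    current_label.set(label)
    return current_label.get()


def isolated_run(func, *args):
    """Run func in a copy of the current context so its writes do not leak out."""
    return contextvars.copy_context().run(func, *args)


seen_a = isolated_run(branch, "branch-a")
seen_b = isolated_run(branch, "branch-b")
after_runs = current_label.get()  # the caller's value is untouched
```

Each branch sees its own value, while the caller still sees the original one once both runs finish.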

When a Context is used as a context manager, for example with ctx.replace_task_context(new_tctx): where ctx is an internal Context instance rather than the TaskContext returned by flyte.ctx(), the Context.__enter__ method calls root_context_var.set(self) to make the modified context active for the duration of the block, and __exit__ restores the previous context using the token returned by set().
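
The set-and-reset token pattern can be sketched as follows. ContextSketch, DataSketch, replace_task_name, and active_var are hypothetical stand-ins for Context, ContextData, replace_task_context, and root_context_var. The sketch shows only the ContextVar mechanics and makes no claim about the SDK's real fields.

```python
import contextvars
from dataclasses import dataclass, replace
from typing import Optional


@dataclass(frozen=True)
class DataSketch:
    """Hypothetical stand-in for ContextData."""

    task_name: Optional[str] = None


class ContextSketch:
    """Hypothetical stand-in for Context; it activates itself on entry."""

    def __init__(self, data: DataSketch) -> None:
        self.data = data
        self._token = None

    def replace_task_name(self, name: str) -> "ContextSketch":
        # Build a new context instead of mutating the current one.
        return ContextSketch(replace(self.data, task_name=name))

    def __enter__(self) -> "ContextSketch":
        self._token = active_var.set(self)  # remember how to undo this set
        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        if self._token is not None:
            active_var.reset(self._token)  # restore the previous context
            self._token = None


active_var = contextvars.ContextVar(
    "active_sketch", default=ContextSketch(DataSketch())
)

before = active_var.get().data.task_name
with active_var.get().replace_task_name("inner"):
    inside = active_var.get().data.task_name
after_block = active_var.get().data.task_name
```

Resetting with the token, rather than setting a remembered value by hand, restores whatever was active before the block, even when blocks are nested.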

Raw Data Resolution

The Context class also manages the resolution of raw_data_path. It prioritizes the path defined in the TaskContext over the general context-level path:

@property
def raw_data(self) -> RawDataPath:
    if self.data and self.data.task_context and self.data.task_context.raw_data_path:
        return self.data.task_context.raw_data_path
    if self.data and self.data.raw_data_path:
        return self.data.raw_data_path
    raise ValueError("Raw data path has not been set in the context.")

This hierarchy ensures that task-specific data storage configurations take precedence during execution.