Task Discovery and Inspection

In the vik-advani-flyte-sdk-9b3ce04 codebase, task discovery and inspection are handled primarily through the Task and TaskDetails classes located in src/flyte/remote/_task.py. This implementation allows developers to interact with tasks that have already been deployed to a Flyte cluster, enabling dynamic workflow construction and resource management without needing the original source code of the tasks.

Task Discovery

The SDK provides two primary ways to discover tasks: retrieving a specific task by its identifier or listing tasks within a project and domain.

Retrieving Specific Tasks

The Task.get method is the entry point for fetching a remote task. It returns a LazyEntity, which acts as a proxy for the task definition.

from flyte.remote import Task

# Retrieve a specific version of a task
lazy_task = Task.get(
    name="my_math_task",
    project="flytesnacks",
    domain="development",
    version="v1",
)

# Retrieve the latest version automatically
latest_task = Task.get(
    name="my_math_task",
    project="flytesnacks",
    domain="development",
    auto_version="latest",
)

The auto_version parameter supports two modes:

  • "latest": The SDK queries the task service using Task.listall (sorted by created_at descending) to find the most recent version.
  • "current": The version is derived from the current execution context (flyte.ctx().version). This is specifically designed for use within a task context where you want to reference other tasks deployed in the same deployment cycle.
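The two modes can be illustrated with a small, self-contained sketch. It does not call the SDK: TaskRecord, resolve_version, and the context_version parameter are hypothetical names introduced here, and the error types are assumptions. It only models the selection logic described above.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import Iterable, Optional


@dataclass(frozen=True)
class TaskRecord:
    """Hypothetical stand-in for one entry returned by a task listing."""
    name: str
    version: str
    created_at: datetime


def resolve_version(
    auto_version: str,
    records: Iterable[TaskRecord],
    context_version: Optional[str] = None,
) -> str:
    """Pick a version following the two auto_version modes (illustrative only)."""
    if auto_version == "latest":
        # Newest first, mirroring a listing sorted by created_at descending.
        ordered = sorted(records, key=lambda r: r.created_at, reverse=True)
        if not ordered:
            raise LookupError("no versions found for this task")
        return ordered[0].version
    if auto_version == "current":
        # In the SDK this value is described as coming from flyte.ctx().version.
        if context_version is None:
            raise RuntimeError("'current' needs an active task context")
        return context_version
    raise ValueError(f"unsupported auto_version: {auto_version!r}")


records = [
    TaskRecord("my_math_task", "v1", datetime(2024, 1, 1)),
    TaskRecord("my_math_task", "v2", datetime(2024, 3, 1)),
]
latest = resolve_version("latest", records)
current = resolve_version("current", records, context_version="v1")
```

Here "latest" selects v2 because it has the newer timestamp, while "current" ignores the listing and returns whatever version the surrounding context reports.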

Listing Tasks

To browse available tasks, Task.listall provides an asynchronous iterator (wrapped for synchronous use via @syncify) that communicates with the task_service.

# List all tasks in a project
tasks = Task.listall(project="flytesnacks", domain="development")

for task in tasks:
    print(f"Found task: {task.name} (Version: {task.version})")

The listall method supports filtering by name (by_task_name), environment prefix (by_task_env), and whether the task is an entrypoint.
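The combination of an asynchronous iterator, a synchronous wrapper, and server-side filters can be sketched without the SDK. Everything below is an assumption made for illustration: TaskSummary, list_tasks, list_tasks_sync, the in-memory backend, the "<env>.<task>" naming scheme, and the exact matching rules are hypothetical. The wrapper also collects results into a list, which is simpler than what a real sync adapter would do.

```python
import asyncio
from dataclasses import dataclass
from typing import AsyncIterator, List, Optional


@dataclass(frozen=True)
class TaskSummary:
    """Hypothetical listing entry; names are assumed to look like '<env>.<task>'."""
    name: str
    version: str


# In-memory stand-in for the remote task service.
_FAKE_BACKEND = [
    TaskSummary("math_env.add", "v1"),
    TaskSummary("math_env.multiply", "v1"),
    TaskSummary("io_env.load", "v3"),
]


async def list_tasks(
    by_task_name: Optional[str] = None,
    by_task_env: Optional[str] = None,
) -> AsyncIterator[TaskSummary]:
    """Yield matching tasks one at a time, as a paginated RPC might."""
    for task in _FAKE_BACKEND:
        await asyncio.sleep(0)  # cooperative yield, standing in for network I/O
        if by_task_name is not None and task.name != by_task_name:
            continue
        if by_task_env is not None and not task.name.startswith(by_task_env + "."):
            continue
        yield task


def list_tasks_sync(**filters: Optional[str]) -> List[TaskSummary]:
    """Drain the async iterator from synchronous code."""
    async def _collect() -> List[TaskSummary]:
        return [task async for task in list_tasks(**filters)]

    return asyncio.run(_collect())


math_tasks = list_tasks_sync(by_task_env="math_env")
exact = list_tasks_sync(by_task_name="io_env.load")
```

Filtering inside the generator keeps the caller's loop simple and, in a real client, would let the service skip sending entries the caller does not want.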

The Lazy Loading Pattern

A key architectural decision in this SDK is the use of LazyEntity. When you call Task.get, the SDK does not immediately perform a network request to fetch the full task definition. Instead, it returns a LazyEntity instance.

The actual fetch occurs only when:

  1. The .fetch() method is explicitly called.
  2. The task is invoked (e.g., lazy_task(arg1=val1)).
  3. The .override() method is called.

This pattern prevents unnecessary network overhead when building complex workflows where only a subset of discovered tasks might actually be used or inspected.
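A minimal sketch of the deferred-fetch idea follows. LazyProxy, load_adder, and fetch_count are hypothetical names, not the SDK's LazyEntity implementation, and caching the fetched entity after the first load is an assumption made here.

```python
from typing import Any, Callable, Generic, Optional, TypeVar

T = TypeVar("T")


class LazyProxy(Generic[T]):
    """Hypothetical stand-in for a lazy entity: defers loading until needed."""

    def __init__(self, name: str, getter: Callable[[], T]) -> None:
        self.name = name
        self._getter = getter
        self._entity: Optional[T] = None
        self.fetch_count = 0  # bookkeeping for this demonstration only

    def fetch(self) -> T:
        # Load once, then reuse the cached entity on later calls (assumed behavior).
        if self._entity is None:
            self._entity = self._getter()
            self.fetch_count += 1
        return self._entity

    def __call__(self, **kwargs: Any) -> Any:
        # Invoking the proxy forces a fetch, then delegates to the entity.
        return self.fetch()(**kwargs)


def load_adder() -> Callable[..., int]:
    # Pretend this is the expensive network round trip.
    return lambda *, x, y: x + y


lazy_add = LazyProxy("adder", load_adder)
count_before = lazy_add.fetch_count  # nothing has been fetched yet
result = lazy_add(x=2, y=3)          # first call triggers the fetch
lazy_add(x=1, y=1)                   # second call reuses the cached entity
count_after = lazy_add.fetch_count
```

Constructing the proxy costs nothing beyond storing the getter, so a program can hold references to many remote tasks and pay only for the ones it actually touches.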

Task Inspection

Once a task is fetched, it is represented as a TaskDetails object. This class provides properties to inspect the task's requirements and interface, derived from the underlying task_definition_pb2.TaskDetails protobuf.

Interface and Arguments

The interface property uses flyte.types.guess_interface to convert the protobuf interface into a NativeInterface, allowing you to inspect input and output types.

task_details = lazy_task.fetch()

# Inspect inputs and outputs
print(f"Inputs: {task_details.interface.inputs}")
print(f"Outputs: {task_details.interface.outputs}")

# Identify required vs optional arguments
print(f"Required: {task_details.required_args}")
print(f"Defaults: {task_details.default_input_args}")
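To make the required/default split concrete, the sketch below derives both groups from a plain mapping of inputs. It is an illustration only: the (type, default) tuple layout, the NO_DEFAULT sentinel, and split_args are hypothetical, and the actual return types of required_args and default_input_args may differ.

```python
from typing import Any, Dict, Tuple

# Sentinel marking an input that has no default value (hypothetical).
NO_DEFAULT = object()


def split_args(
    inputs: Dict[str, Tuple[type, Any]],
) -> Tuple[Tuple[str, ...], Tuple[str, ...]]:
    """Return (required, defaulted) input names from an interface-like mapping."""
    required = tuple(
        name for name, (_, default) in inputs.items() if default is NO_DEFAULT
    )
    defaulted = tuple(
        name for name, (_, default) in inputs.items() if default is not NO_DEFAULT
    )
    return required, defaulted


# Interface of an imagined task: x is mandatory, scale and label are optional.
inputs = {
    "x": (int, NO_DEFAULT),
    "scale": (float, 1.0),
    "label": (str, None),  # None is a legitimate default, hence the sentinel
}
required, defaulted = split_args(inputs)
```

A dedicated sentinel is used instead of None so that an input whose default really is None is still classified as optional.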

Resources and Metadata

You can inspect the infrastructure requirements and execution policies defined for the task:

  • Resources: task_details.resources returns a tuple of (requests, limits) for CPU, memory, and GPU.
  • Cache: task_details.cache returns a flyte.Cache object describing the discovery version and serializability.
  • Secrets: task_details.secrets provides a list of secret keys the task expects to access.

Task Overrides

The TaskDetails.override method allows you to create a modified version of a discovered task. This is useful for adjusting resource requirements or retry strategies for a specific workflow execution without modifying the original task registration.

from flyte import Resources, RetryStrategy

# Create a version of the task with more memory and a retry policy
overridden_task = lazy_task.override(
    resources=Resources(cpu="2", memory="1Gi"),
    retries=RetryStrategy(retries=3),
    env_vars={"LOG_LEVEL": "DEBUG"},
)

The override method performs a CopyFrom on the existing protobuf and updates the relevant fields in the TaskTemplate. Supported overrides include:

  • resources: CPU, Memory, GPU, and Ephemeral Storage.
  • retries: Integer or RetryStrategy object.
  • timeout: Execution timeout duration.
  • env_vars: Environment variables passed to the container.
  • cache: Overriding cache behavior (must be "disable" or "override" for remote tasks).
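The copy-then-modify behavior can be sketched with plain dataclasses in place of protobuf messages. TemplateSketch and apply_override are hypothetical, the field set is reduced, the error type is an assumption, and merging env_vars over existing values is a choice made here that the SDK may not share.

```python
import copy
from dataclasses import dataclass, field
from typing import Dict, Optional


@dataclass
class TemplateSketch:
    """Hypothetical, simplified stand-in for the protobuf task template."""
    memory: str = "512Mi"
    retries: int = 0
    env_vars: Dict[str, str] = field(default_factory=dict)
    cache: str = "auto"


# Cache values the text lists as valid overrides for remote tasks.
_ALLOWED_REMOTE_CACHE = {"disable", "override"}


def apply_override(
    template: TemplateSketch,
    *,
    memory: Optional[str] = None,
    retries: Optional[int] = None,
    env_vars: Optional[Dict[str, str]] = None,
    cache: Optional[str] = None,
) -> TemplateSketch:
    """Return a modified copy; the input template is never mutated."""
    if cache is not None and cache not in _ALLOWED_REMOTE_CACHE:
        raise ValueError(f"cache must be one of {sorted(_ALLOWED_REMOTE_CACHE)}")
    updated = copy.deepcopy(template)  # plays the role of CopyFrom
    if memory is not None:
        updated.memory = memory
    if retries is not None:
        updated.retries = retries
    if env_vars is not None:
        updated.env_vars.update(env_vars)  # assumption: merge, not replace
    if cache is not None:
        updated.cache = cache
    return updated


original = TemplateSketch(env_vars={"REGION": "eu"})
tuned = apply_override(
    original, memory="1Gi", retries=3, env_vars={"LOG_LEVEL": "DEBUG"}
)
```

Because the override works on a deep copy, the registered definition stays untouched and several differently tuned variants of one task can coexist.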

Execution Context and Constraints

Remote tasks discovered via Task.get are intended for remote execution. The TaskDetails.__call__ implementation enforces several constraints:

  1. No Local Execution: Calling a remote task outside of a Flyte task context will raise a flyte.errors.RemoteTaskUsageError. These tasks are executed by submitting a task reference to the Flyte controller.
  2. Keyword Arguments Only: Positional arguments are not supported for remote task calls. You must use keyword arguments that match the task's interface.
  3. Required Arguments: The SDK validates that all required_args are provided in the kwargs before attempting to submit the task to the controller.

import flyte

# Correct usage within a workflow
# (overridden_task is the object created in the override example above)
@flyte.workflow
def my_workflow(val: int) -> int:
    # This will be submitted to the controller for remote execution
    return overridden_task(x=val)
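The three constraints listed above can be modeled as a small validation routine. This is a sketch, not the TaskDetails.__call__ implementation: the local RemoteTaskUsageError class merely stands in for flyte.errors.RemoteTaskUsageError, call_remote and its in_task_context flag are hypothetical, and raising TypeError for argument problems as well as the order of the checks are assumptions.

```python
from typing import Any, Dict, Sequence


class RemoteTaskUsageError(RuntimeError):
    """Local stand-in for flyte.errors.RemoteTaskUsageError."""


def call_remote(
    required_args: Sequence[str],
    in_task_context: bool,
    *args: Any,
    **kwargs: Any,
) -> Dict[str, Any]:
    """Validate a remote task call and return the payload that would be submitted."""
    # 1. Remote tasks cannot run locally; a task context is mandatory.
    if not in_task_context:
        raise RemoteTaskUsageError("remote tasks must be called from a task context")
    # 2. Positional arguments are rejected outright.
    if args:
        raise TypeError("remote tasks accept keyword arguments only")
    # 3. Every required input must be present before submission.
    missing = [name for name in required_args if name not in kwargs]
    if missing:
        raise TypeError(f"missing required arguments: {', '.join(missing)}")
    return {"submitted_inputs": dict(kwargs)}


payload = call_remote(["x"], True, x=5)

errors = []
for bad_call in (
    lambda: call_remote(["x"], False, x=5),  # outside a task context
    lambda: call_remote(["x"], True, 5),     # positional argument
    lambda: call_remote(["x", "y"], True, x=5),  # y is missing
):
    try:
        bad_call()
    except (RemoteTaskUsageError, TypeError) as exc:
        errors.append(type(exc).__name__)
```

Validating on the client side surfaces mistakes before anything is sent to the controller, which is cheaper than discovering them in a failed remote execution.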