Skip to main content

Remote Management & Serving

Interact with the Flyte platform programmatically to manage task executions, discover remote entities, and deploy long-running applications.

Initializing the Remote Client

Before interacting with remote entities, initialize the client using your Flyte configuration. This typically uses the FLYTE_CONFIG environment variable to locate your configuration file.

import flyte

# Initializes the global client context
flyte.init_from_config()

Execution Management

Manage remote task executions using the Run and Action classes. You can list, filter, wait for, and abort runs programmatically.

Listing and Filtering Runs

Use Run.listall to retrieve executions. You can filter by phase, task name, or time ranges using TimeFilter.

from flyte.remote import Run
from flyte.remote._common import TimeFilter
import datetime

# List all runs that are currently in the 'running' phase
runs = [r async for r in Run.listall.aio(in_phase=("running",))]

# Filter by creation time (e.g., runs created in the last 24 hours)
yesterday = datetime.datetime.now() - datetime.timedelta(days=1)
recent_runs = Run.listall_sync(
created_at=TimeFilter(after=yesterday),
limit=10
)

Aborting Long-Running Executions

You can programmatically terminate runs based on custom logic, such as duration thresholds.

from flyte.remote import Run
import datetime

async def abort_stale_runs(hours_threshold: int):
threshold = datetime.datetime.now() - datetime.timedelta(hours=hours_threshold)

async for run in Run.listall.aio(in_phase=("running",)):
# Access the underlying Action to check start time
if run.action.start_time < threshold:
print(f"Aborting stale run: {run.name}")
await run.abort.aio(reason=f"Exceeded {hours_threshold} hour limit.")

# Usage
# await abort_stale_runs(10)

Tracking Progress and Logs

The wait method provides a blocking (or async) way to track execution until it reaches a terminal state. You can also stream logs directly from the run.

from flyte.remote import Run

run = Run.get_sync(name="my-run-id")

# Wait for completion with a progress bar
run.wait_sync(quiet=False)

# Stream logs to the console
async for line in run.get_logs.aio(filter_system=True):
print(line)

Remote Entity Discovery

The Task and TaskDetails classes allow you to inspect metadata for tasks already deployed on the platform.

Fetching Task Metadata

You can retrieve a task reference using Task.get. By default, this returns a LazyEntity that fetches data only when accessed.

from flyte.remote import Task

# Get a reference to the latest version of a task
lazy_task = Task.get("my_task_name", project="p1", domain="d1", auto_version="latest")

# Fetch the full TaskDetails
task_details = await lazy_task.fetch.aio()

print(f"Task Type: {task_details.task_type}")
print(f"Interface: {task_details.interface}")
print(f"Resources: {task_details.resources}")

Auto-Versioning Modes

When fetching tasks, you can use auto_version to avoid hardcoding version strings:

  • latest: Fetches the most recently created version of the task.
  • current: Fetches the version matching the current execution context (only works when called from within a running Flyte task).
# Works only inside a Flyte task to reference 'self' or related tasks in the same deployment
task = Task.get("helper_task", auto_version="current")

Long-running Applications (Apps)

The App class manages the lifecycle of long-running services, such as model servers or web applications.

Managing App Lifecycle

You can activate or deactivate apps and wait for them to reach the desired state.

from flyte.remote import App

app = App.get_sync(name="my-model-server", project="p1", domain="d1")

# Activate the app and wait for it to be ready
app.activate_sync(wait=True)
print(f"App is live at: {app.endpoint}")

# Deactivate when finished
app.deactivate_sync(wait=True)

Using Ephemeral Contexts

For testing or temporary serving, use ephemeral_ctx to ensure the app is deactivated automatically after use.

from flyte.remote import App

app = App.get_sync(name="temp-server")

# Automatically activates on enter and deactivates on exit
with app.ephemeral_ctx_sync():
# Perform requests against app.endpoint
print(f"Serving at {app.endpoint}...")

# App is now deactivated

Troubleshooting

Positional Arguments

Remote task calls do not support positional arguments. Always use keyword arguments when triggering or overriding remote tasks.

# Incorrect
# await task(val1, val2)

# Correct
await task(input_a=val1, input_b=val2)

Async vs Sync API

Most methods in Run, Task, and App support both synchronous and asynchronous calls via the @syncify decorator.

  • Use method_name() for synchronous blocking calls.
  • Use method_name.aio() for asynchronous calls within an async function.

Auto-version 'current' Errors

If you receive a ValueError stating auto_version=current can only be used within a task context, ensure you are not calling Task.get(..., auto_version="current") from a local script or outside of a Flyte execution environment. Use auto_version="latest" for local management scripts.