Action-Level Monitoring

In the vik-advani-flyte-sdk-9b3ce04 codebase, monitoring execution at the action level provides granular visibility into how individual tasks within a run are performing. A Run represents the overall execution; an Action (defined in src/flyte/remote/_action.py) is a single unit of work within it, such as a task or a trace, that moves through a series of lifecycle phases.

The Action Lifecycle

Every execution unit is represented by the Action class. Actions are nested within runs and can be retrieved individually or listed for an entire run.

Retrieving Actions

You can retrieve a specific action using its name and the name of the run it belongs to, or list all actions associated with a run.

from flyte.remote import Action

# Retrieve a specific action
action = Action.get(run_name="my-flyte-run", name="task-node-1")

# List all actions for a run, optionally filtering by phase
actions = Action.listall(for_run_name="my-flyte-run", in_phase=("SUCCEEDED", "FAILED"))
for a in actions:
    print(f"Action: {a.name}, Phase: {a.phase}")
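When a run contains many actions, a per-phase tally is often the quickest health check. The following is a minimal sketch that relies only on each listed item exposing a `phase` attribute, as in the loop above. `FakeAction` and `count_by_phase` are hypothetical names introduced for illustration so the example runs without a Flyte backend; they are not part of the SDK. If the real `phase` is an enum rather than a string, the keys may need to be adapted.

```python
from collections import Counter
from dataclasses import dataclass
from typing import Iterable


@dataclass
class FakeAction:
    """Hypothetical stand-in for an action; only `name` and `phase` are modeled."""
    name: str
    phase: str


def count_by_phase(actions: Iterable) -> Counter:
    """Tally actions by the string form of their phase."""
    return Counter(str(a.phase) for a in actions)


# Illustrative sample data, not real run output.
sample_actions = [
    FakeAction("task-node-1", "SUCCEEDED"),
    FakeAction("task-node-2", "FAILED"),
    FakeAction("task-node-3", "SUCCEEDED"),
]

phase_counts = count_by_phase(sample_actions)
for phase, count in sorted(phase_counts.items()):
    print(f"{phase}: {count}")
```

In real use, the iterable returned by `Action.listall(...)` would presumably take the place of `sample_actions`.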

Monitoring State Transitions

The SDK provides two primary ways to monitor an action's progress: wait() and watch().

  • wait(): A blocking call that displays a rich progress panel (using the rich library) showing status transitions, elapsed time, and attempt counts. It handles both interactive environments (with spinners) and non-interactive environments (plain status lines).
  • watch(): An asynchronous generator that yields ActionDetails objects as the action's state changes.

# Blocking wait with a progress UI
action.wait(wait_for="terminal")

# Asynchronous watching for updates (must run inside an async function)
async for details in action.watch(wait_for="logs-ready"):
    print(f"Current Phase: {details.phase}")

The state of an action is represented by the ActionPhase enum, which includes states like QUEUED, INITIALIZING, RUNNING, SUCCEEDED, and FAILED.
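To illustrate how an asynchronous generator such as watch() is typically consumed, here is a self-contained sketch that stops as soon as a terminal phase is seen. `fake_watch`, `FakeDetails`, and `collect_until_terminal` are hypothetical stand-ins, and `TERMINAL_PHASES` is an assumption built only from the phases named above; the real ActionPhase enum may define additional terminal states.

```python
import asyncio
from dataclasses import dataclass
from typing import AsyncIterator, List

# Assumption: only the terminal phases named in the text above.
TERMINAL_PHASES = {"SUCCEEDED", "FAILED"}


@dataclass
class FakeDetails:
    """Hypothetical stand-in for ActionDetails; only `phase` is modeled."""
    phase: str


async def fake_watch() -> AsyncIterator[FakeDetails]:
    """Hypothetical stand-in for action.watch(); yields a fixed phase sequence."""
    for phase in ("QUEUED", "INITIALIZING", "RUNNING", "SUCCEEDED"):
        await asyncio.sleep(0)  # hand control back to the event loop
        yield FakeDetails(phase)


async def collect_until_terminal(stream: AsyncIterator[FakeDetails]) -> List[str]:
    """Record each observed phase and stop at the first terminal one."""
    seen: List[str] = []
    async for details in stream:
        seen.append(details.phase)
        if details.phase in TERMINAL_PHASES:
            break
    return seen


observed = asyncio.run(collect_until_terminal(fake_watch()))
print(" -> ".join(observed))
```

The `async for` loop has to live inside a coroutine, which is why the sketch drives it with `asyncio.run`.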

Granular Performance Analysis

For deep-dive monitoring, the ActionDetails class provides access to the internal metadata and timing of an execution. This is where you can analyze bottlenecks, such as long queue times or initialization overhead.

Phase Transitions

The SDK tracks every transition an action makes. The PhaseTransitionInfo class captures the start_time, end_time, and calculated duration for each phase within a specific attempt.

details = action.details()
transitions = details.get_phase_transitions(attempt=1)

for t in transitions:
    print(f"Phase {t.phase} took {t.duration.total_seconds()} seconds")
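Once transitions are available, finding the bottleneck reduces to picking the phase with the largest duration. The sketch below shows that calculation with a hypothetical `FakeTransition` class that mirrors the start_time, end_time, and duration fields described above. It assumes end_time may be missing for a phase that is still in progress, which the source does not confirm, and `slowest_phase` is an illustrative helper rather than an SDK function.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Iterable, Optional


@dataclass
class FakeTransition:
    """Hypothetical stand-in mirroring the fields of PhaseTransitionInfo."""
    phase: str
    start_time: datetime
    end_time: Optional[datetime]  # assumption: None while the phase is still open

    @property
    def duration(self) -> Optional[timedelta]:
        if self.end_time is None:
            return None
        return self.end_time - self.start_time


def slowest_phase(transitions: Iterable[FakeTransition]) -> Optional[FakeTransition]:
    """Return the completed transition with the longest duration, or None."""
    finished = [t for t in transitions if t.duration is not None]
    return max(finished, key=lambda t: t.duration, default=None)


# Illustrative timings, not measurements from a real run.
t0 = datetime(2024, 1, 1, 12, 0, 0)
sample_transitions = [
    FakeTransition("QUEUED", t0, t0 + timedelta(seconds=5)),
    FakeTransition("INITIALIZING", t0 + timedelta(seconds=5), t0 + timedelta(seconds=47)),
    FakeTransition("RUNNING", t0 + timedelta(seconds=47), t0 + timedelta(seconds=60)),
]

bottleneck = slowest_phase(sample_transitions)
print(f"Slowest phase: {bottleneck.phase} ({bottleneck.duration.total_seconds():.0f}s)")
```

In this made-up sample, initialization dominates, which would point at environment setup rather than the task logic itself.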

Built-in Duration Metrics

ActionDetails provides convenience properties to quickly access time spent in critical phases for the latest attempt:

  • queued_time: Time spent waiting in the queue.
  • waiting_for_resources_time: Time spent waiting for hardware/cluster resources.
  • initializing_time: Time spent setting up the execution environment.
  • running_time: Actual execution time of the task logic.

Inspecting Inputs and Outputs

Actions manage their data through ActionInputs and ActionOutputs. These classes handle the conversion from remote protobuf formats to native Python types.

ActionInputs

ActionInputs behaves like a standard Python dictionary. It is fetched from the remote service when action.inputs() is called.

inputs = action.inputs()
print(inputs["my_input_param"])

ActionOutputs

ActionOutputs is a specialized tuple that also supports named access. Outputs are only available once the action has finished successfully, that is, once it has reached the terminal SUCCEEDED phase.

if action.done():
    outputs = action.outputs()

    # Access as a tuple
    val1, val2 = outputs

    # Access by name
    print(outputs.named_outputs["o0"])

    # Access as attributes (auto-generated names like o0, o1)
    print(outputs.o0)
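To make the "tuple with named access" idea concrete, the sketch below shows one way such a type could be built in plain Python. `SketchOutputs` is a hypothetical class written for illustration only; it is not the SDK's ActionOutputs implementation and may differ from it in every detail except the three access styles shown above.

```python
from typing import Any, Dict


class SketchOutputs(tuple):
    """Hypothetical illustration: a tuple that also exposes its values by name."""

    def __new__(cls, named_outputs: Dict[str, Any]) -> "SketchOutputs":
        # Positional order follows the insertion order of the dictionary.
        obj = super().__new__(cls, named_outputs.values())
        obj.named_outputs = dict(named_outputs)
        return obj

    def __getattr__(self, name: str) -> Any:
        # Only called when normal attribute lookup fails, e.g. for `o0`.
        try:
            return self.__dict__["named_outputs"][name]
        except KeyError:
            raise AttributeError(name) from None


# Illustrative values, not real task outputs.
sketch = SketchOutputs({"o0": 42, "o1": "done"})

first, second = sketch               # tuple unpacking
by_name = sketch.named_outputs["o1"]  # dictionary-style access
by_attr = sketch.o0                   # attribute-style access
print(first, second, by_name, by_attr)
```

Subclassing tuple keeps unpacking and indexing behavior intact, while the extra dictionary carries the names.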

Troubleshooting and Logs

When an action fails, the ActionDetails object contains error_info (an ErrorInfo protobuf message) which includes the error kind and message.

To inspect execution logs, the Action class provides:

  • show_logs(): An interactive viewer for logs.
  • get_logs(): An iterator that yields log lines as strings.

# Stream logs for the latest attempt
for line in action.get_logs(show_ts=True):
    print(line)
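Because get_logs() yields plain strings, standard iterator techniques apply when searching for the cause of a failure. The sketch below filters any iterable of log lines for a set of keywords. `matching_lines`, the default keywords, and the sample lines are all hypothetical; real log content and format will differ.

```python
from typing import Iterable, Iterator, Tuple


def matching_lines(
    lines: Iterable[str],
    keywords: Tuple[str, ...] = ("ERROR", "Traceback"),
) -> Iterator[str]:
    """Lazily yield only the lines that contain at least one keyword."""
    for line in lines:
        if any(keyword in line for keyword in keywords):
            yield line


# Made-up log lines for illustration; not output from a real action.
sample_logs = [
    "2024-01-01T12:00:00Z INFO starting task",
    "2024-01-01T12:00:03Z ERROR failed to read input",
    "Traceback (most recent call last):",
    "2024-01-01T12:00:04Z INFO shutting down",
]

suspicious = list(matching_lines(sample_logs))
for line in suspicious:
    print(line)
```

Since the helper is a generator, it can wrap a live log stream without buffering the whole output in memory.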

If an action is retried, you can specify the attempt number (1-indexed) in both log and transition methods to investigate previous failures.