Human-in-the-Loop (HITL) Events

Human-in-the-Loop (HITL) events in this SDK provide a mechanism to pause workflow execution and wait for human input. This implementation is designed to be crash-resilient and integrates directly with the Flyte UI through embedded FastAPI applications and Flyte Reports.

Core Concepts

The HITL system is built around the Event class and a specialized FastAPI application that serves as the interface for human interaction.

The Event Lifecycle

A HITL interaction follows a specific lifecycle managed by the Event class in flyteplugins.hitl._event:

  1. Creation and Serving: When an event is created using hitl.new_event (or Event.create), the SDK automatically initializes and serves a FastAPI application if one is not already running for the current project/domain. This app is defined by the event_app_env (a FastAPIAppEnvironment) in _event.py.
  2. Metadata Persistence: The event details (prompt, data type, request ID) are serialized and stored in object storage using flyte.storage.
  3. Waiting and Reporting: When event.wait() is called, the SDK triggers a reporting task (show_form) that updates the Flyte UI with a link to the input form. It then enters a polling loop.
  4. Input Collection: A human provides input via the web form or a JSON API endpoint. The FastAPI app writes this response back to object storage.
  5. Resumption: The polling loop detects the response in storage, validates the data type, and returns the value to the workflow.
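The persistence side of this lifecycle can be sketched with the standard library alone. This is a minimal illustration, not the SDK's code. A local temporary directory stands in for object storage (the SDK uses flyte.storage). The helpers event_paths, create_event, submit_response, and read_response, and the request.json/response.json layout, are all hypothetical. The response format ({"status": "completed", "value": ...}) follows the polling excerpt shown under Crash Resilience.

```python
import json
import tempfile
from pathlib import Path

# Hypothetical stand-in for object storage: a local directory.
# The real SDK uses flyte.storage; paths and field names here are illustrative only.


def event_paths(root: Path, run_id: str, event_name: str) -> tuple[Path, Path]:
    """Return (request_path, response_path) for an event in a hypothetical layout."""
    base = root / run_id / event_name
    return base / "request.json", base / "response.json"


def create_event(root: Path, run_id: str, event_name: str, prompt: str, data_type: type) -> Path:
    """Step 2: persist event metadata so any process can find it later."""
    request_path, response_path = event_paths(root, run_id, event_name)
    request_path.parent.mkdir(parents=True, exist_ok=True)
    request_path.write_text(json.dumps({
        "event_name": event_name,
        "prompt": prompt,
        "data_type": data_type.__name__,
    }))
    return response_path


def submit_response(response_path: Path, value) -> None:
    """Step 4: what a form or API handler might do, writing the answer back to storage."""
    response_path.write_text(json.dumps({"status": "completed", "value": value}))


def read_response(response_path: Path):
    """Step 5 (one check): return the value if a completed response exists, else None."""
    if not response_path.exists():
        return None
    response = json.loads(response_path.read_text())
    return response["value"] if response.get("status") == "completed" else None


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        resp = create_event(Path(tmp), "run-123", "integer_input_event", "Enter a value:", int)
        print(read_response(resp))  # None: nobody has answered yet
        submit_response(resp, 32)
        print(read_response(resp))  # 32
```

Because the request and response live in storage rather than in the task's memory, any process that knows the run ID and event name can locate both.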

Crash Resilience

The HITL implementation is durable and crash-resilient because it relies on object storage rather than in-memory state for synchronization. The wait_for_input_event function (called by Event.wait) polls a specific response_path in storage:

```python
# Excerpt from plugins/hitl/src/flyteplugins/hitl/_event.py
while elapsed < timeout_seconds:
    # Check if response exists in object storage
    if await storage.exists(response_path):
        async for chunk in storage.get_stream(response_path):
            response = json.loads(chunk.decode())
            if response.get("status") == "completed":
                return response["value"]
    await asyncio.sleep(poll_interval_seconds)
```

If a task waiting for human input crashes and restarts, it will simply resume polling the same storage location. If the human has already submitted the input, the restarted task will find the response immediately and continue.
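To see why a restart is harmless, here is a self-contained sketch of the same polling pattern, again using a local directory in place of object storage. wait_for_response and demo are hypothetical names. The excerpt above does not show how elapsed advances; this version increments it after each sleep so the timeout is enforced, and it reads the whole response before parsing it. The demo cancels a first waiter to simulate a crash, writes the human's answer while nothing is waiting, and shows that a fresh waiter returns it on its first check.

```python
import asyncio
import json
import tempfile
from pathlib import Path


async def wait_for_response(response_path: Path, timeout_seconds: float = 3600,
                            poll_interval_seconds: float = 5):
    """Poll a storage location until a completed response appears or time runs out.

    Illustrative stand-in for wait_for_input_event: all state lives in the file,
    so a new waiter can pick up exactly where a crashed one left off.
    """
    elapsed = 0.0
    while elapsed < timeout_seconds:
        if response_path.exists():
            response = json.loads(response_path.read_text())
            if response.get("status") == "completed":
                return response["value"]
        await asyncio.sleep(poll_interval_seconds)
        elapsed += poll_interval_seconds  # counts sleep time only (approximate)
    raise TimeoutError(f"No response at {response_path} after {timeout_seconds}s")


async def demo() -> int:
    with tempfile.TemporaryDirectory() as tmp:
        response_path = Path(tmp) / "response.json"

        # Start a waiter, then cancel it to simulate the task crashing mid-wait.
        first = asyncio.create_task(wait_for_response(response_path, poll_interval_seconds=0.01))
        await asyncio.sleep(0.03)
        first.cancel()
        try:
            await first
        except asyncio.CancelledError:
            pass

        # The human answers while no waiter is running.
        response_path.write_text(json.dumps({"status": "completed", "value": 32}))

        # A restarted waiter polls the same path and returns on its first check.
        return await wait_for_response(response_path, poll_interval_seconds=0.01)


if __name__ == "__main__":
    print(asyncio.run(demo()))  # 32
```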

Implementing HITL Events

To use HITL events, you typically use the hitl.new_event factory function within a Flyte task.

Defining the Task Environment

HITL tasks require a specific environment that includes the HITL plugin dependencies. The plugin provides a pre-configured hitl.env (aliased from event_task_env) for this purpose.

```python
import flyte
import flyteplugins.hitl as hitl

task_env = flyte.TaskEnvironment(
    name="hitl-workflow",
    image=flyte.Image.from_debian_base(python_version=(3, 12)),
    depends_on=[hitl.env],  # Includes the HITL FastAPI app environment
)
```

Using the Event API

The Event class supports both synchronous and asynchronous usage. The new_event function lets you specify the expected data_type (int, float, str, or bool), a prompt for the user, and a timeout_seconds value.

```python
@task_env.task(report=True)
async def main() -> int:
    # Create the event: this serves the FastAPI app automatically
    event = await hitl.new_event.aio(
        "integer_input_event",
        data_type=int,
        scope="run",
        prompt="Please enter a value to add to the result:",
    )

    # Pause execution and wait for human input
    # This triggers a Flyte Report with the input form link
    y = await event.wait.aio()

    return y + 10
```
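The lifecycle notes that the returned value is validated against data_type before the workflow resumes. The SDK's exact rules are not described here, so the following is a hypothetical helper, coerce_value, showing one reasonable way to convert form strings or JSON values to the four supported types. Two Python details drive the design: bool("false") is True, so booleans are parsed explicitly, and bool is a subclass of int, so True is rejected where a number is expected.

```python
SUPPORTED_TYPES = (int, float, str, bool)

_TRUE = {"true", "1", "yes"}
_FALSE = {"false", "0", "no"}


def coerce_value(raw, data_type: type):
    """Convert a submitted value (JSON value or form string) to data_type.

    Hypothetical helper; the SDK's own validation may differ. Raises ValueError
    when the value cannot be represented as data_type.
    """
    if data_type not in SUPPORTED_TYPES:
        raise ValueError(f"Unsupported data_type: {data_type!r}")
    if data_type is bool:
        # bool("false") is True, so strings must be parsed explicitly.
        if isinstance(raw, bool):
            return raw
        text = str(raw).strip().lower()
        if text in _TRUE:
            return True
        if text in _FALSE:
            return False
        raise ValueError(f"Not a boolean: {raw!r}")
    if isinstance(raw, bool) and data_type is not str:
        # bool is a subclass of int in Python; reject it for numeric types.
        raise ValueError(f"Expected {data_type.__name__}, got bool")
    if data_type is int and isinstance(raw, float) and not raw.is_integer():
        # Avoid silently truncating 3.5 to 3.
        raise ValueError(f"Expected int, got non-integral float {raw!r}")
    try:
        return data_type(raw)
    except (TypeError, ValueError) as exc:
        raise ValueError(f"Expected {data_type.__name__}, got {raw!r}") from exc
```

For instance, coerce_value("32", int) returns 32, so the example task above would return 42 for a form submission of "32".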

User Interface and Reporting

The SDK surfaces HITL events in the Flyte UI through two mechanisms: a form link attached to the task and a Flyte Report.

Event Form Links

The EventFormLink class is a specialized flyte.Link that generates the URL of the human input form. It is attached to the task execution automatically when event.wait() is called.

Flyte Reports

The show_form task in _event.py uses flyte.report.replace to embed the HITL interface directly into the Flyte Deck. This report includes:

  • A direct link to the hosted web form.
  • The API endpoint for programmatic submissions.
  • A sample curl command for submitting JSON data manually.

The HTML for this report is generated by get_event_report_html (imported from ._html_templates), ensuring a consistent user experience across different HITL events.
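The report's three items (form link, API endpoint, and curl command) can be approximated as below. render_event_report_html and build_curl_command are hypothetical stand-ins for get_event_report_html, whose signature is not documented here, and the {"value": ...} request body is an assumed payload shape, not the app's documented API. The sketch escapes every interpolated string with html.escape and quotes shell arguments with shlex.quote, so a prompt or URL containing special characters cannot break the page or the command.

```python
import html
import json
import shlex


def build_curl_command(endpoint: str, value) -> str:
    """Return a shell-safe curl command that POSTs a JSON body to endpoint."""
    body = json.dumps({"value": value})  # assumed payload shape
    return " ".join([
        "curl", "-X", "POST",
        "-H", shlex.quote("Content-Type: application/json"),
        "-d", shlex.quote(body),
        shlex.quote(endpoint),
    ])


def render_event_report_html(form_url: str, api_endpoint: str, prompt: str,
                             example_value: object = 0) -> str:
    """Render a minimal report with a form link, the API endpoint, and a sample curl command."""
    curl = build_curl_command(api_endpoint, example_value)
    return (
        f"<h3>{html.escape(prompt)}</h3>"
        f'<p><a href="{html.escape(form_url, quote=True)}">Open input form</a></p>'
        f"<p>API endpoint: <code>{html.escape(api_endpoint)}</code></p>"
        f"<pre>{html.escape(curl)}</pre>"
    )
```

Centralizing the markup in one function, as the SDK does with get_event_report_html, keeps escaping and layout identical for every event.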

Configuration and Scoping

  • Scope: Currently, the SDK supports the "run" scope, meaning the event is associated with the specific execution run.
  • Timeouts: The default timeout is 3600 seconds (1 hour). This can be customized during event creation.
  • Polling: The SDK polls storage every 5 seconds by default, controlled by poll_interval_seconds.
  • Organization: The subdomain for the HITL app can be influenced by the _U_ORG_NAME environment variable, which is used in Event._serve_app to construct a unique URL for the project and domain.
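The settings listed above can be captured in a small configuration object. HitlSettings and app_subdomain are hypothetical: the defaults (3600 seconds, 5-second polling, "run" scope) come from this section, but the way _U_ORG_NAME is combined with the project and domain is invented for illustration, since the URL scheme used by Event._serve_app is not described. max_polls shows the practical consequence of the defaults: assuming each loop iteration advances elapsed by one poll interval, a one-hour wait checks storage at most 720 times.

```python
import math
import os
from dataclasses import dataclass


@dataclass(frozen=True)
class HitlSettings:
    """Defaults mirror the documented values; the class itself is hypothetical."""
    scope: str = "run"
    timeout_seconds: int = 3600
    poll_interval_seconds: int = 5

    def __post_init__(self):
        # Assumption: reject scopes other than the one documented as supported.
        if self.scope != "run":
            raise ValueError('Only the "run" scope is documented as supported')
        if self.timeout_seconds <= 0 or self.poll_interval_seconds <= 0:
            raise ValueError("timeout and poll interval must be positive")

    @property
    def max_polls(self) -> int:
        """Upper bound on storage checks before the wait times out."""
        return math.ceil(self.timeout_seconds / self.poll_interval_seconds)


def app_subdomain(project: str, domain: str) -> str:
    """Hypothetical scheme: org (if set), project, and domain joined by '-'."""
    org = os.environ.get("_U_ORG_NAME", "")
    parts = [p for p in (org, project, domain) if p]
    return "-".join(parts).lower()
```

Shortening poll_interval_seconds makes resumption more responsive at the cost of more storage requests; the bound above makes that trade-off easy to estimate.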