Generative AI & Agents

In this tutorial, you will build a Deep Research Agent that uses an LLM to search for information, processes the results using Flyte tasks as tools, and pauses for human approval before finalizing its report.

Prerequisites

To follow this tutorial, you need the following installed:

  • flyte
  • flyteplugins-anthropic
  • flyteplugins-hitl

You also need an Anthropic API key configured in your environment:

export ANTHROPIC_API_KEY="your-api-key"
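
A missing key usually only shows up as an authentication error once the agent makes its first model call. As a minimal sketch (the helper name require_api_key is hypothetical and not part of any Flyte plugin), you can fail fast with a clear message before launching anything:

```python
import os


def require_api_key(env=None, name="ANTHROPIC_API_KEY"):
    """Return the key from the environment, or raise a clear error if it is missing."""
    # `env` is injectable so the check can be exercised without touching os.environ.
    env = os.environ if env is None else env
    key = env.get(name, "").strip()
    if not key:
        raise RuntimeError(f"{name} is not set; export it before running the agent")
    return key
```

Calling require_api_key() at the top of a local launch script is enough; inside a task, the key comes from the secret configured on the environment in Step 1.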

Step 1: Define the Agent Environment

First, we define a TaskEnvironment. This centralizes the infrastructure configuration (image, resources, and secrets) for all tasks in our agentic workflow. We also add hitl.env to depends_on to enable Human-in-the-Loop capabilities.

import flyte
import flyteplugins.hitl as hitl

# Define the base environment for our agent and tools
agent_env = flyte.TaskEnvironment(
    name="research-agent-env",
    image=flyte.Image.from_debian_base(python_version=(3, 12)),
    resources=flyte.Resources(cpu="2", memory="4Gi"),
    secrets=[flyte.SecretRequest(key="ANTHROPIC_API_KEY")],
    depends_on=[hitl.env],  # Required for HITL events
)

The TaskEnvironment ensures that every task decorated with @agent_env.task inherits these settings, including the necessary dependencies for the Human-in-the-Loop plugin.

Step 2: Create Tools from Flyte Tasks

Agents become powerful when they can interact with external systems. In Flyte, any task can be converted into a tool that an LLM can understand using function_tool.

@agent_env.task
async def web_search(query: str) -> str:
    """
    Search the web for the given query.
    Returns a summary of the search results.
    """
    # In a real scenario, you would call a search API here
    return f"Search results for '{query}': Flyte is an open-source orchestrator..."

from flyteplugins.anthropic import function_tool

# Convert the Flyte task into a tool Claude can use
search_tool = function_tool(web_search)

The function_tool utility automatically extracts the task's docstring and type hints to create a JSON schema that the LLM uses to decide when and how to call the tool.
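
To make the idea concrete, here is a minimal sketch of how a schema can be derived from a plain function with the standard library. It is not the plugin's implementation: describe_tool, the type mapping, and the max_results parameter are assumptions for illustration. The output keys (name, description, input_schema) follow the shape Anthropic's tool-use API expects.

```python
import inspect
from typing import get_type_hints

# Assumed subset of Python annotations mapped to JSON Schema type names.
_JSON_TYPES = {str: "string", int: "integer", float: "number", bool: "boolean"}


def describe_tool(func):
    """Build a tool description from a function's name, docstring, and type hints."""
    hints = get_type_hints(func)
    hints.pop("return", None)  # the return type is not part of the input schema
    properties = {}
    required = []
    for name, param in inspect.signature(func).parameters.items():
        # Unknown or missing annotations fall back to "string" in this sketch.
        properties[name] = {"type": _JSON_TYPES.get(hints.get(name), "string")}
        if param.default is inspect.Parameter.empty:
            required.append(name)
    return {
        "name": func.__name__,
        "description": inspect.getdoc(func) or "",
        "input_schema": {
            "type": "object",
            "properties": properties,
            "required": required,
        },
    }


async def web_search(query: str, max_results: int = 5) -> str:
    """Search the web for the given query."""
    return f"results for {query}"


schema = describe_tool(web_search)
```

Because the docstring becomes the tool description, a vague docstring gives the model little to go on when deciding whether to call the tool. It pays to write docstrings for the model as much as for human readers.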

Step 3: Build the Agent Loop

Now we create the agent task. We use run_agent from the Anthropic plugin, which manages the conversation loop, tool execution, and response parsing.

from flyteplugins.anthropic import run_agent

@agent_env.task
async def research_agent(topic: str) -> str:
    prompt = f"Research the following topic and provide a summary: {topic}"

    # run_agent handles the loop of:
    # 1. Sending prompt to Claude
    # 2. Executing tools if Claude requests them
    # 3. Returning the final response
    return await run_agent(
        prompt=prompt,
        tools=[search_tool],
        model="claude-3-5-sonnet-20240620",
        max_iterations=5,
    )
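
The loop described in the comments above can be sketched in plain Python. This is an illustration of the general tool-calling pattern, not the plugin's code: the reply format, the scripted_model stand-in, and the choice to raise when the budget is exhausted are all assumptions (how run_agent itself behaves at max_iterations is not covered here).

```python
def run_tool_loop(model, tools, prompt, max_iterations=5):
    """Call `model` until it returns a final answer or the iteration budget runs out."""
    messages = [{"role": "user", "content": prompt}]
    for _ in range(max_iterations):
        reply = model(messages)
        if reply["type"] == "final":
            return reply["text"]
        # The model asked for a tool: run it and feed the result back.
        result = tools[reply["tool"]](**reply["args"])
        messages.append({"role": "assistant", "content": f"call {reply['tool']}"})
        messages.append({"role": "user", "content": f"tool result: {result}"})
    raise RuntimeError("agent did not finish within max_iterations")


def scripted_model(messages):
    """Hypothetical stand-in for an LLM: request one search, then answer."""
    if len(messages) == 1:
        return {"type": "tool", "tool": "web_search", "args": {"query": "Flyte"}}
    return {"type": "final", "text": f"Summary based on: {messages[-1]['content']}"}


def web_search(query):
    return f"Search results for '{query}'"


answer = run_tool_loop(scripted_model, {"web_search": web_search}, "Research Flyte")
```

The iteration cap matters because a model can keep requesting tools indefinitely; bounding the loop bounds both latency and API cost.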

Step 4: Add Human-in-the-Loop Approval

Before the workflow finishes, we want a human to review the agent's findings. We use hitl.new_event to pause the workflow and wait for input.

@agent_env.task(report=True)
async def main_workflow(topic: str) -> str:
    # 1. Run the agent
    report = await research_agent(topic=topic)

    # 2. Create a Human-in-the-Loop event
    event = await hitl.new_event.aio(
        name="approve_report",
        data_type=bool,
        prompt=f"The agent produced this report:\n\n{report}\n\nDo you approve?",
    )

    # 3. Wait for human input (the workflow pauses here)
    approved = await event.wait.aio()

    if approved:
        return f"Final Approved Report: {report}"
    return "Report rejected by human."

Setting report=True in the task decorator ensures that the Flyte UI displays the HITL form to the user.
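
The pause-and-resume behaviour can be simulated locally with asyncio, which is handy for testing the approve and reject branches without a running cluster. The ApprovalEvent class below is a hypothetical stand-in written for this sketch; it only mimics the create-then-wait shape of the HITL event and shares no code with the plugin.

```python
import asyncio


class ApprovalEvent:
    """Hypothetical stand-in for a HITL event: wait() blocks until submit() is called."""

    def __init__(self, prompt):
        self.prompt = prompt
        self._future = asyncio.get_running_loop().create_future()

    def submit(self, value: bool):
        self._future.set_result(value)

    async def wait(self) -> bool:
        return await self._future


async def gated_report(report, reviewer_decision):
    event = ApprovalEvent(f"Do you approve?\n\n{report}")
    # Simulate a reviewer answering shortly after the event is created.
    asyncio.get_running_loop().call_later(0.01, event.submit, reviewer_decision)
    approved = await event.wait()  # execution is suspended here until submit()
    if approved:
        return f"Final Approved Report: {report}"
    return "Report rejected by human."


approved_result = asyncio.run(gated_report("draft", True))
rejected_result = asyncio.run(gated_report("draft", False))
```

Both branches return a string, so callers of the workflow need to distinguish an approved report from the rejection message; returning a structured result would be a reasonable alternative design.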

Step 5: Organize with Groups

For complex agents that perform many sub-tasks, the Flyte UI can become cluttered. Use flyte.group to organize related actions into a single collapsible section in the UI.

@agent_env.task
async def multi_step_research(topic: str) -> str:
    with flyte.group("Initial Research"):
        # All tasks called inside this block are grouped in the UI
        raw_data = await web_search(query=topic)

    with flyte.group("Agent Analysis"):
        analysis = await run_agent(
            prompt=f"Analyze this data: {raw_data}",
            model="claude-3-5-sonnet-20240620",
        )

    return analysis

Step 6: Self-Hosting Models with vLLM

If you prefer to run your own models instead of using an external API, you can deploy a model server using VLLMAppEnvironment. This provides an OpenAI-compatible endpoint within your Flyte cluster.

from flyteplugins.vllm import VLLMAppEnvironment

vllm_app = VLLMAppEnvironment(
    name="llama-3-server",
    model_hf_path="meta-llama/Meta-Llama-3-8B-Instruct",
    model_id="llama3",
    resources=flyte.Resources(cpu="4", memory="16Gi", gpu="L40s:1"),
    stream_model=True,  # Stream model weights directly to GPU
)

# You can now use vllm_app.url in your agent tasks to call your local model
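
Because the endpoint is OpenAI-compatible, any HTTP client can talk to it. The sketch below builds a chat completion request with the standard library only. It assumes that the base URL has no trailing /v1 and that the served model name matches model_id ("llama3" above); check both against your deployment. The localhost URL is a placeholder for vllm_app.url.

```python
import json
import urllib.request


def build_chat_request(base_url, model_id, prompt):
    """Build (but do not send) a POST request for an OpenAI-compatible chat endpoint."""
    payload = {
        "model": model_id,
        "messages": [{"role": "user", "content": prompt}],
    }
    return urllib.request.Request(
        url=base_url.rstrip("/") + "/v1/chat/completions",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


# Placeholder URL; inside a task you would pass the app's URL instead.
request = build_chat_request("http://localhost:8000", "llama3", "Summarize Flyte.")
```

Sending the request with urllib.request.urlopen(request) returns a JSON body in which the generated text sits under choices[0].message.content, the same layout the OpenAI chat completions API uses.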

Complete Working Example

Combining these pieces gives an agent whose tool calls are grouped in the Flyte UI and whose report passes through a human review step before it is returned:

import flyte
import flyteplugins.hitl as hitl
from flyteplugins.anthropic import function_tool, run_agent

agent_env = flyte.TaskEnvironment(
    name="deep-research",
    image=flyte.Image.from_debian_base(python_version=(3, 12)),
    secrets=[flyte.SecretRequest(key="ANTHROPIC_API_KEY")],  # Same secret as in Step 1
    depends_on=[hitl.env],
)

@agent_env.task
async def get_data(query: str) -> str:
    """Fetch data for research."""
    return f"Data for {query}: Flyte simplifies AI orchestration."

@agent_env.task(report=True)
async def run_research(topic: str) -> str:
    # Grouping for UI observability
    with flyte.group("Agent Execution"):
        report = await run_agent(
            prompt=f"Research {topic}",
            tools=[function_tool(get_data)],
        )

    # Human-in-the-loop gate: show the draft so the reviewer has something to edit
    event = await hitl.new_event.aio(
        "review",
        data_type=str,
        prompt=f"Edit the report if needed:\n\n{report}",
    )
    final_report = await event.wait.aio()

    return final_report

Next Steps

  • Observability: Use @flyte.trace on internal Python functions to see detailed execution traces alongside your agent's tool calls in the Flyte UI.
  • Multi-Agent: Create multiple run_agent tasks that call each other or share tools to build complex multi-agent systems.
  • Scaling: Use VLLMAppEnvironment with flyte.app.Scaling to automatically scale your model servers based on demand.