Skip to main content

Webhook and FastAPI Integration

This tutorial guides you through building and deploying authenticated webhooks using FastAPI and Flyte. You will learn how to create a custom FastAPI application that executes Flyte tasks on behalf of users and how to use the pre-built webhook environment for rapid deployment.

By the end of this guide, you will have a secure, scalable webhook service that integrates seamlessly with Flyte's authentication system.

Prerequisites

To follow this tutorial, you need the following installed:

  • fastapi
  • uvicorn
  • flyte (this SDK)
  • httpx (for testing)

Step 1: Create a Custom FastAPI App with Passthrough Auth

The core of a Flyte-integrated FastAPI app is the FastAPIPassthroughAuthMiddleware. This middleware extracts credentials (like Bearer tokens) from incoming requests and ensures that any Flyte operations performed within the request context use those credentials.

Create a file named webhook_app.py and define your FastAPI application:

import flyte
import flyte.remote as remote
from fastapi import FastAPI, HTTPException
from contextlib import asynccontextmanager
from flyte.app.extras import FastAPIPassthroughAuthMiddleware

@asynccontextmanager
async def lifespan(app: FastAPI):
# Initialize Flyte with passthrough authentication on startup
await flyte.init_passthrough.aio(
project=flyte.current_project(),
domain=flyte.current_domain(),
)
yield

app = FastAPI(title="My Flyte Webhook", lifespan=lifespan)

# Add middleware to automatically handle user credentials
app.add_middleware(FastAPIPassthroughAuthMiddleware, excluded_paths={"/health"})

@app.get("/health")
async def health():
return {"status": "ok"}

@app.post("/run-my-task")
async def run_task(inputs: dict):
# This task runs using the credentials of the person calling the webhook
tk = remote.Task.get(name="my_project.my_task")
run = await flyte.run.aio(tk, **inputs)
return {"run_id": run.name, "url": run.url}

What's happening here?

  • lifespan: We use the FastAPI lifespan to call flyte.init_passthrough.aio(). This prepares the Flyte SDK to accept and propagate credentials from the HTTP request context.
  • FastAPIPassthroughAuthMiddleware: This middleware (found in flyte.app.extras._auth_middleware) intercepts requests, looks for Authorization or Cookie headers, and wraps the request in a Flyte auth_metadata context.
  • remote.Task.get: Because of the middleware, this call and the subsequent flyte.run.aio are authenticated as the end-user, not the webhook service itself.

Step 2: Deploy with FastAPIAppEnvironment

To run this FastAPI app within the Flyte ecosystem, you wrap it in a FastAPIAppEnvironment. This class handles the boilerplate of starting a uvicorn server and ensures the FastAPI application can be serialized correctly.

Add the following to your webhook_app.py:

from flyte.app.extras import FastAPIAppEnvironment

image = flyte.Image.from_debian_base().with_pip_packages("fastapi", "uvicorn")

app_env = FastAPIAppEnvironment(
name="custom-webhook-service",
app=app,
image=image,
requires_auth=True,
resources=flyte.Resources(cpu=1, memory="512Mi"),
)

if __name__ == "__main__":
flyte.serve(app_env)

What's happening here?

  • FastAPIAppEnvironment: This environment (defined in flyte.app.extras._fastapi) automatically patches the FastAPI app.state with a PicklableState. This is critical because standard FastAPI apps contain circular references that prevent them from being moved across the network.
  • requires_auth=True: This tells the Flyte platform to enforce authentication at the gateway level before the request even reaches your app.
  • flyte.serve: This command deploys your environment and provides a public URL for your webhook.

Step 3: Use the Pre-built FlyteWebhookAppEnvironment

If you don't need custom logic and just want standard Flyte operations (like running tasks, checking status, or aborting runs) exposed via HTTP, use the FlyteWebhookAppEnvironment.

from flyte.app.extras import FlyteWebhookAppEnvironment

# Create a restricted webhook that only allows running specific tasks
webhook_env = FlyteWebhookAppEnvironment(
name="restricted-webhook",
endpoint_groups=["core", "task", "run"],
task_allowlist=["my-project/my-task"],
requires_auth=True,
)

if __name__ == "__main__":
flyte.serve(webhook_env)

What's happening here?

  • endpoint_groups: You can limit the exposed API. Available groups include "core" (health/user info), "task" (running/getting tasks), and "run" (status/IO/abort).
  • task_allowlist: This provides fine-grained security, ensuring the webhook can only be used to trigger specific, approved tasks.
  • Automatic Setup: This environment automatically includes the FastAPIPassthroughAuthMiddleware and the necessary lifespan logic.

Step 4: Authenticate and Call the Webhook

Once your webhook is served, you can call it using any HTTP client. You must provide a valid Flyte token in the Authorization header.

import httpx
import os

# Obtain your token via 'flyte get api-key' or your auth provider
token = os.getenv("FLYTE_API_KEY")
endpoint = "https://your-webhook-url.flyte.org"

headers = {
"Authorization": f"Bearer {token}",
}

with httpx.Client(headers=headers) as client:
# 1. Verify who you are via the webhook
me = client.get(f"{endpoint}/me")
print(f"Authenticated as: {me.json()['name']}")

# 2. Trigger a task
payload = {"x": 10, "y": "test"}
resp = client.post(
f"{endpoint}/run-task/my-project/development/my-task",
json=payload
)
print(f"Task launched: {resp.json()['url']}")

Visible Result: When you run this client, the /me endpoint will return the identity associated with the FLYTE_API_KEY. The /run-task call will create a new execution in the Flyte UI, and that execution will show your user identity as the owner, thanks to the passthrough authentication mechanism.

Next Steps

  • Custom Extractors: If your credentials are in a custom header, pass a header_extractor to FastAPIPassthroughAuthMiddleware using FastAPIPassthroughAuthMiddleware.extract_custom_header("X-My-Header").
  • Scaling: Configure the scaling parameter in your AppEnvironment to handle higher request volumes.
  • OpenAPI Docs: Visit {endpoint}/docs on your deployed webhook to see the automatically generated interactive documentation.