Webhook and FastAPI Integration
This tutorial guides you through building and deploying authenticated webhooks using FastAPI and Flyte. You will learn how to create a custom FastAPI application that executes Flyte tasks on behalf of users and how to use the pre-built webhook environment for rapid deployment.
By the end of this guide, you will have a secure, scalable webhook service that integrates seamlessly with Flyte's authentication system.
Prerequisites
To follow this tutorial, you need the following installed:
fastapiuvicornflyte(this SDK)httpx(for testing)
Step 1: Create a Custom FastAPI App with Passthrough Auth
The core of a Flyte-integrated FastAPI app is the FastAPIPassthroughAuthMiddleware. This middleware extracts credentials (like Bearer tokens) from incoming requests and ensures that any Flyte operations performed within the request context use those credentials.
Create a file named webhook_app.py and define your FastAPI application:
import flyte
import flyte.remote as remote
from fastapi import FastAPI, HTTPException
from contextlib import asynccontextmanager
from flyte.app.extras import FastAPIPassthroughAuthMiddleware
@asynccontextmanager
async def lifespan(app: FastAPI):
# Initialize Flyte with passthrough authentication on startup
await flyte.init_passthrough.aio(
project=flyte.current_project(),
domain=flyte.current_domain(),
)
yield
app = FastAPI(title="My Flyte Webhook", lifespan=lifespan)
# Add middleware to automatically handle user credentials
app.add_middleware(FastAPIPassthroughAuthMiddleware, excluded_paths={"/health"})
@app.get("/health")
async def health():
return {"status": "ok"}
@app.post("/run-my-task")
async def run_task(inputs: dict):
# This task runs using the credentials of the person calling the webhook
tk = remote.Task.get(name="my_project.my_task")
run = await flyte.run.aio(tk, **inputs)
return {"run_id": run.name, "url": run.url}
What's happening here?
lifespan: We use the FastAPI lifespan to callflyte.init_passthrough.aio(). This prepares the Flyte SDK to accept and propagate credentials from the HTTP request context.FastAPIPassthroughAuthMiddleware: This middleware (found inflyte.app.extras._auth_middleware) intercepts requests, looks forAuthorizationorCookieheaders, and wraps the request in a Flyteauth_metadatacontext.remote.Task.get: Because of the middleware, this call and the subsequentflyte.run.aioare authenticated as the end-user, not the webhook service itself.
Step 2: Deploy with FastAPIAppEnvironment
To run this FastAPI app within the Flyte ecosystem, you wrap it in a FastAPIAppEnvironment. This class handles the boilerplate of starting a uvicorn server and ensures the FastAPI application can be serialized correctly.
Add the following to your webhook_app.py:
from flyte.app.extras import FastAPIAppEnvironment
image = flyte.Image.from_debian_base().with_pip_packages("fastapi", "uvicorn")
app_env = FastAPIAppEnvironment(
name="custom-webhook-service",
app=app,
image=image,
requires_auth=True,
resources=flyte.Resources(cpu=1, memory="512Mi"),
)
if __name__ == "__main__":
flyte.serve(app_env)
What's happening here?
FastAPIAppEnvironment: This environment (defined inflyte.app.extras._fastapi) automatically patches the FastAPIapp.statewith aPicklableState. This is critical because standard FastAPI apps contain circular references that prevent them from being moved across the network.requires_auth=True: This tells the Flyte platform to enforce authentication at the gateway level before the request even reaches your app.flyte.serve: This command deploys your environment and provides a public URL for your webhook.
Step 3: Use the Pre-built FlyteWebhookAppEnvironment
If you don't need custom logic and just want standard Flyte operations (like running tasks, checking status, or aborting runs) exposed via HTTP, use the FlyteWebhookAppEnvironment.
from flyte.app.extras import FlyteWebhookAppEnvironment
# Create a restricted webhook that only allows running specific tasks
webhook_env = FlyteWebhookAppEnvironment(
name="restricted-webhook",
endpoint_groups=["core", "task", "run"],
task_allowlist=["my-project/my-task"],
requires_auth=True,
)
if __name__ == "__main__":
flyte.serve(webhook_env)
What's happening here?
endpoint_groups: You can limit the exposed API. Available groups include"core"(health/user info),"task"(running/getting tasks), and"run"(status/IO/abort).task_allowlist: This provides fine-grained security, ensuring the webhook can only be used to trigger specific, approved tasks.- Automatic Setup: This environment automatically includes the
FastAPIPassthroughAuthMiddlewareand the necessary lifespan logic.
Step 4: Authenticate and Call the Webhook
Once your webhook is served, you can call it using any HTTP client. You must provide a valid Flyte token in the Authorization header.
import httpx
import os
# Obtain your token via 'flyte get api-key' or your auth provider
token = os.getenv("FLYTE_API_KEY")
endpoint = "https://your-webhook-url.flyte.org"
headers = {
"Authorization": f"Bearer {token}",
}
with httpx.Client(headers=headers) as client:
# 1. Verify who you are via the webhook
me = client.get(f"{endpoint}/me")
print(f"Authenticated as: {me.json()['name']}")
# 2. Trigger a task
payload = {"x": 10, "y": "test"}
resp = client.post(
f"{endpoint}/run-task/my-project/development/my-task",
json=payload
)
print(f"Task launched: {resp.json()['url']}")
Visible Result:
When you run this client, the /me endpoint will return the identity associated with the FLYTE_API_KEY. The /run-task call will create a new execution in the Flyte UI, and that execution will show your user identity as the owner, thanks to the passthrough authentication mechanism.
Next Steps
- Custom Extractors: If your credentials are in a custom header, pass a
header_extractortoFastAPIPassthroughAuthMiddlewareusingFastAPIPassthroughAuthMiddleware.extract_custom_header("X-My-Header"). - Scaling: Configure the
scalingparameter in yourAppEnvironmentto handle higher request volumes. - OpenAPI Docs: Visit
{endpoint}/docson your deployed webhook to see the automatically generated interactive documentation.