Local Testing for Connectors

To test and debug connector logic in a local environment without deploying to a remote Flyte cluster, you can use the AsyncConnectorExecutorMixin. This mixin allows a task to simulate the behavior of a Flyte agent by executing the connector's create and get methods locally.

Define a Task with Local Execution Support

To enable local testing, your custom task class must inherit from AsyncConnectorExecutorMixin in addition to TaskTemplate. This mixin overrides the execute method to handle the local polling loop.

from typing import Dict, Optional, Type

from flyte.connectors import AsyncConnectorExecutorMixin
from flyte.extend import TaskTemplate
from flyte.models import NativeInterface


class BatchJobTask(AsyncConnectorExecutorMixin, TaskTemplate):
    # Must match the task_type_name of the connector that handles this task
    _TASK_TYPE = "batch_job"

    def __init__(
        self,
        name: str,
        inputs: Optional[Dict[str, Type]] = None,
        outputs: Optional[Dict[str, Type]] = None,
        **kwargs,
    ):
        super().__init__(
            name=name,
            interface=NativeInterface(
                # Each input maps to a (type, default) pair; None means no default
                {k: (v, None) for k, v in inputs.items()} if inputs else {},
                outputs or {},
            ),
            task_type=self._TASK_TYPE,
            image=None,
            **kwargs,
        )

Implement and Register the Connector

The AsyncConnectorExecutorMixin relies on the ConnectorRegistry to find the implementation of the connector for a specific task type. You must implement the AsyncConnector interface and register it in the same process where you run your local tests.

import time
import uuid

from flyteidl2.core.execution_pb2 import TaskExecution
from flyte.connectors import AsyncConnector, ConnectorRegistry, Resource, ResourceMeta


class BatchJobMetadata(ResourceMeta):
    job_id: str
    created_at: float


class BatchJobConnector(AsyncConnector):
    name = "Batch Job Connector"
    task_type_name = "batch_job"
    metadata_type = BatchJobMetadata

    async def create(self, task_template, inputs=None, **kwargs) -> BatchJobMetadata:
        # Stand-in for submitting a job to an external service
        job_id = str(uuid.uuid4())[:8]
        return BatchJobMetadata(job_id=job_id, created_at=time.time())

    async def get(self, resource_meta: BatchJobMetadata, **kwargs) -> Resource:
        # Simulate polling logic: report success on the first poll
        return Resource(
            phase=TaskExecution.SUCCEEDED,
            outputs={"result": f"output-from-{resource_meta.job_id}"},
        )


# Registration is required for the mixin to find the connector locally
ConnectorRegistry.register(BatchJobConnector())

Execute the Task Locally

Once the task is defined and the connector is registered, you can use flyte.run() to trigger the local execution logic.

import flyte
from pathlib import Path

# Initialize Flyte context
flyte.init_from_config(root_dir=Path(__file__).parent)

# Define the task instance
batch_job = BatchJobTask(
    name="local_test_job",
    inputs={"val": str},
    outputs={"result": str},
)

# Run the task locally
if __name__ == "__main__":
    result = flyte.run(batch_job, val="hello")
    print(f"Task result: {result}")

How Local Execution Works

When flyte.run() is called on a task that includes AsyncConnectorExecutorMixin, the following sequence occurs:

  1. Connector Lookup: The mixin calls ConnectorRegistry.get_connector() using the task's task_type.
  2. Secret Resolution: It inspects the task template for required secrets. It attempts to resolve these secrets by looking up environment variables.
  3. Job Creation: It calls connector.create(...) to initiate the external job and receives a ResourceMeta object.
  4. Polling Loop: It enters a while loop, calling connector.get(resource_meta=...) every 3 seconds.
  5. Log Tracking: If the Resource returned by get() contains log_links, the mixin logs them to the console and records them in the internal task tracker.
  6. Completion: Once a terminal phase (like SUCCEEDED or FAILED) is reached, the loop exits and returns the outputs.
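
The create-then-poll part of this sequence (steps 3 to 6) can be illustrated with a small standalone sketch. It does not import flyte and is not the mixin's actual implementation: Phase, FakeResource, FakeConnector, and run_locally are hypothetical stand-ins invented for illustration, and the poll interval is a parameter here only so the example runs quickly.

```python
import asyncio
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Dict, List, Optional


class Phase(Enum):
    """Hypothetical stand-in for the task execution phases."""
    RUNNING = "RUNNING"
    SUCCEEDED = "SUCCEEDED"
    FAILED = "FAILED"


TERMINAL_PHASES = {Phase.SUCCEEDED, Phase.FAILED}


@dataclass
class FakeResource:
    """Hypothetical stand-in for the Resource returned by get()."""
    phase: Phase
    outputs: Optional[Dict[str, Any]] = None
    log_links: List[str] = field(default_factory=list)


class FakeConnector:
    """Stand-in connector: reports RUNNING twice, then SUCCEEDED."""

    def __init__(self) -> None:
        self.get_calls = 0

    async def create(self, **kwargs) -> str:
        # The returned string plays the role of a ResourceMeta object
        return "job-1234"

    async def get(self, resource_meta: str, **kwargs) -> FakeResource:
        self.get_calls += 1
        if self.get_calls < 3:
            return FakeResource(phase=Phase.RUNNING)
        return FakeResource(
            phase=Phase.SUCCEEDED,
            outputs={"result": f"output-from-{resource_meta}"},
        )


async def run_locally(connector: FakeConnector, poll_interval: float = 3.0) -> FakeResource:
    """Create the job, then poll get() until a terminal phase is reached."""
    resource_meta = await connector.create()
    while True:
        resource = await connector.get(resource_meta=resource_meta)
        for link in resource.log_links:
            print(f"log link: {link}")
        if resource.phase in TERMINAL_PHASES:
            return resource
        await asyncio.sleep(poll_interval)


if __name__ == "__main__":
    final = asyncio.run(run_locally(FakeConnector(), poll_interval=0.01))
    print(final.phase, final.outputs)
```

A fake connector like this, which only reaches a terminal phase after a few polls, is one way to check that your own get() implementation handles non-terminal phases before you point it at a real service.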

Troubleshooting and Gotchas

Missing Secrets

The mixin resolves secrets declared in the task's custom configuration by reading environment variables. If a connector requires a secret named API_KEY, make sure an environment variable with that name is set in your local shell:

export API_KEY=your_secret_value

If the environment variable is missing, the mixin will raise a ValueError.
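
To catch a missing secret before starting a run, you could add a small pre-flight check to your test script. The sketch below is an assumption about how such a check might look, not part of the Flyte API: resolve_secrets_from_env is a hypothetical helper that mirrors the behavior described above (look up each name in the environment, raise ValueError if any is absent).

```python
import os
from typing import Dict, Iterable, Mapping, Optional


def resolve_secrets_from_env(
    secret_names: Iterable[str],
    environ: Optional[Mapping[str, str]] = None,
) -> Dict[str, str]:
    """Hypothetical helper: read each secret from the environment.

    Raises ValueError listing every name that is not set.
    """
    env = os.environ if environ is None else environ
    names = list(secret_names)
    missing = [name for name in names if name not in env]
    if missing:
        raise ValueError(
            "Missing environment variables for secrets: " + ", ".join(missing)
        )
    return {name: env[name] for name in names}


if __name__ == "__main__":
    # Passing an explicit mapping keeps the demo independent of the real shell
    print(resolve_secrets_from_env(["API_KEY"], {"API_KEY": "your_secret_value"}))
```

Reporting every missing name at once avoids fixing one variable, rerunning, and then hitting the next missing one.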

Registry Scope

The ConnectorRegistry is a global singleton within the Python process. If your connector registration happens in a different module, ensure that module is imported before calling flyte.run().
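
The failure mode is easier to see with a toy registry. The sketch below is a simplified illustration that assumes a class-level dictionary keyed by task type; ToyRegistry and its methods are hypothetical and do not reproduce the real ConnectorRegistry. The point is only that a lookup fails until the code that performs the registration has actually run in the current process.

```python
from typing import Dict


class ToyRegistry:
    """Hypothetical process-wide registry keyed by task type."""

    _connectors: Dict[str, object] = {}

    @classmethod
    def register(cls, task_type: str, connector: object) -> None:
        cls._connectors[task_type] = connector

    @classmethod
    def get_connector(cls, task_type: str) -> object:
        if task_type not in cls._connectors:
            raise LookupError(f"No connector registered for task type {task_type!r}")
        return cls._connectors[task_type]


if __name__ == "__main__":
    try:
        # Nothing has registered "batch_job" yet, so this lookup fails
        ToyRegistry.get_connector("batch_job")
    except LookupError as exc:
        print(f"before registration: {exc}")

    # In a real project this call would live at module level in the
    # connector's module and run as a side effect of importing it
    ToyRegistry.register("batch_job", object())
    print("after registration:", ToyRegistry.get_connector("batch_job"))
```

Because registration usually happens as an import side effect, an explicit import of the connector module at the top of your test script is the simplest way to guarantee ordering.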

Polling Interval

The local execution loop has a hardcoded sleep interval of 3 seconds between calls to connector.get(). This is intended to simulate the polling behavior of the Flyte agent without overwhelming external APIs during local development.

Remote vs. Local Mode

If the task context mode is set to "remote", the mixin will attempt to delegate execution to AsyncFunctionTaskTemplate.execute instead of running the connector logic locally. Ensure your local environment is configured for local execution (usually the default for flyte.run()).