Local Testing for Connectors
To test and debug connector logic in a local environment without deploying to a remote Flyte cluster, you can use the AsyncConnectorExecutorMixin. This mixin allows a task to simulate the behavior of a Flyte agent by executing the connector's create and get methods locally.
Define a Task with Local Execution Support
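To enable local testing, your custom task class must inherit from AsyncConnectorExecutorMixin in addition to TaskTemplate. The mixin overrides the execute method to run the local polling loop. List it before TaskTemplate in the base classes so that Python's method resolution order picks the mixin's execute over the one inherited from TaskTemplate.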
```python
from typing import Dict, Optional, Type

from flyte.connectors import AsyncConnectorExecutorMixin
from flyte.extend import TaskTemplate
from flyte.models import NativeInterface


class BatchJobTask(AsyncConnectorExecutorMixin, TaskTemplate):
    # Must match the task_type_name of the connector registered for this task.
    _TASK_TYPE = "batch_job"

    def __init__(
        self,
        name: str,
        inputs: Optional[Dict[str, Type]] = None,
        outputs: Optional[Dict[str, Type]] = None,
        **kwargs,
    ):
        super().__init__(
            name=name,
            interface=NativeInterface(
                {k: (v, None) for k, v in inputs.items()} if inputs else {},
                outputs or {},
            ),
            task_type=self._TASK_TYPE,
            # The work runs in the external system via the connector, so no image is set.
            image=None,
            **kwargs,
        )
```
Implement and Register the Connector
The AsyncConnectorExecutorMixin relies on the ConnectorRegistry to find the implementation of the connector for a specific task type. You must implement the AsyncConnector interface and register it in the same process where you run your local tests.
```python
import time
import uuid
from dataclasses import dataclass

from flyteidl2.core.execution_pb2 import TaskExecution

from flyte.connectors import AsyncConnector, ConnectorRegistry, Resource, ResourceMeta


@dataclass
class BatchJobMetadata(ResourceMeta):
    # State needed to find the job again in get().
    job_id: str
    created_at: float


class BatchJobConnector(AsyncConnector):
    name = "Batch Job Connector"
    task_type_name = "batch_job"  # matched against the task's task_type
    metadata_type = BatchJobMetadata

    async def create(self, task_template, inputs=None, **kwargs) -> BatchJobMetadata:
        # Start the external job; this stub only generates a fake job ID.
        job_id = str(uuid.uuid4())[:8]
        return BatchJobMetadata(job_id=job_id, created_at=time.time())

    async def get(self, resource_meta: BatchJobMetadata, **kwargs) -> Resource:
        # This stub reports success on the first poll; a real connector
        # would query the external job's status here.
        return Resource(
            phase=TaskExecution.SUCCEEDED,
            outputs={"result": f"output-from-{resource_meta.job_id}"},
        )


# Registration is required for the mixin to find the connector locally.
ConnectorRegistry.register(BatchJobConnector())
```
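The get() above reports SUCCEEDED on its first call, so the polling loop never actually waits. To exercise the polling path, a stub can derive its phase from the created_at timestamp already stored in the metadata. The sketch below uses only the standard library so it runs without Flyte. JobMeta, JobStatus, and SimulatedBatchConnector are hypothetical stand-ins, and the phase strings stand in for the TaskExecution phases (for example TaskExecution.RUNNING and TaskExecution.SUCCEEDED) that a real connector would return inside a Resource. The clock is injectable so the behavior can be checked without waiting.

```python
import asyncio
import time
import uuid
from dataclasses import dataclass
from typing import Callable, Dict, List, Optional


@dataclass
class JobMeta:
    # Mirrors BatchJobMetadata: enough state to answer get() without extra bookkeeping.
    job_id: str
    created_at: float


@dataclass
class JobStatus:
    # Hypothetical stand-in for Resource; phase is "RUNNING" or "SUCCEEDED" here.
    phase: str
    outputs: Optional[Dict[str, str]] = None


class SimulatedBatchConnector:
    """Reports RUNNING until duration_s seconds have passed since create()."""

    def __init__(self, duration_s: float, clock: Callable[[], float] = time.time):
        self.duration_s = duration_s
        self.clock = clock  # injectable so tests can control time

    async def create(self, inputs: Optional[Dict[str, str]] = None) -> JobMeta:
        return JobMeta(job_id=str(uuid.uuid4())[:8], created_at=self.clock())

    async def get(self, resource_meta: JobMeta) -> JobStatus:
        elapsed = self.clock() - resource_meta.created_at
        if elapsed < self.duration_s:
            return JobStatus(phase="RUNNING")
        return JobStatus(
            phase="SUCCEEDED",
            outputs={"result": f"output-from-{resource_meta.job_id}"},
        )


async def demo() -> List[str]:
    now = [0.0]  # fake clock value, advanced by hand
    connector = SimulatedBatchConnector(duration_s=5.0, clock=lambda: now[0])
    meta = await connector.create({"val": "hello"})
    phases = []
    for t in (0.0, 3.0, 6.0):  # polls roughly 3 seconds apart
        now[0] = t
        phases.append((await connector.get(meta)).phase)
    return phases


if __name__ == "__main__":
    print(asyncio.run(demo()))  # ['RUNNING', 'RUNNING', 'SUCCEEDED']
```

Because the phase is computed from the metadata rather than stored on the connector instance, this stub does not depend on the same connector object handling every poll.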
Execute the Task Locally
Once the task is defined and the connector is registered, you can use flyte.run() to trigger the local execution logic.
```python
from pathlib import Path

import flyte

# Assumes BatchJobTask is defined in (or imported into) this module and that
# BatchJobConnector has already been registered with ConnectorRegistry.
batch_job = BatchJobTask(
    name="local_test_job",
    inputs={"val": str},
    outputs={"result": str},
)

if __name__ == "__main__":
    # Initialize the Flyte context, then run the task locally.
    flyte.init_from_config(root_dir=Path(__file__).parent)
    result = flyte.run(batch_job, val="hello")
    print(f"Task result: {result}")
```
How Local Execution Works
When flyte.run() is called on a task that includes AsyncConnectorExecutorMixin, the following sequence occurs:
- Connector Lookup: The mixin calls ConnectorRegistry.get_connector() using the task's task_type.
- Secret Resolution: It inspects the task template for required secrets and attempts to resolve each one from an environment variable.
- Job Creation: It calls connector.create(...) to start the external job and receives a ResourceMeta object.
- Polling Loop: It enters a while loop, calling connector.get(resource_meta=...) every 3 seconds.
- Log Tracking: If the Resource returned by get() contains log_links, the mixin logs them to the console and records them in the internal task tracker.
- Completion: Once a terminal phase (such as SUCCEEDED or FAILED) is reached, the loop exits and returns the outputs.
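The sequence above can be modeled in a few lines of asyncio. This is a simplified sketch under stated assumptions, not the mixin's actual source: REGISTRY stands in for ConnectorRegistry, Status stands in for Resource, secrets are handed to create() as a plain dict, and the poll interval is a parameter (defaulting to 3 seconds) so the example can run without sleeping. It returns the terminal phase alongside the outputs so a caller can tell success from failure; how the real mixin surfaces a FAILED phase is not covered here.

```python
import asyncio
import os
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional, Set, Tuple

TERMINAL_PHASES = {"SUCCEEDED", "FAILED"}


@dataclass
class Status:
    # Hypothetical stand-in for flyte.connectors.Resource.
    phase: str
    outputs: Optional[Dict[str, Any]] = None
    log_links: List[str] = field(default_factory=list)


# Hypothetical stand-in for ConnectorRegistry: task type -> connector instance.
REGISTRY: Dict[str, Any] = {}


async def run_locally(
    task_type: str,
    inputs: Dict[str, Any],
    secret_names: List[str],
    poll_interval_s: float = 3.0,
) -> Tuple[str, Optional[Dict[str, Any]]]:
    # 1. Connector lookup by task type.
    connector = REGISTRY.get(task_type)
    if connector is None:
        raise ValueError(f"No connector registered for task type {task_type!r}")

    # 2. Secret resolution from environment variables.
    secrets: Dict[str, str] = {}
    for name in secret_names:
        if name not in os.environ:
            raise ValueError(f"Secret {name!r} is not set in the environment")
        secrets[name] = os.environ[name]

    # 3. Job creation.
    resource_meta = await connector.create(inputs, secrets)

    tracked_links: Set[str] = set()
    while True:
        # 4. Polling loop.
        status = await connector.get(resource_meta)
        # 5. Log tracking: print each new link once and remember it.
        for link in status.log_links:
            if link not in tracked_links:
                tracked_links.add(link)
                print(f"Log link: {link}")
        # 6. Completion: stop on a terminal phase and hand back the outputs.
        if status.phase in TERMINAL_PHASES:
            return status.phase, status.outputs
        await asyncio.sleep(poll_interval_s)


class ToyConnector:
    """Reports RUNNING twice, then SUCCEEDED."""

    def __init__(self) -> None:
        self.phases = ["RUNNING", "RUNNING", "SUCCEEDED"]

    async def create(self, inputs: Dict[str, Any], secrets: Dict[str, str]) -> str:
        return "job-1"

    async def get(self, resource_meta: str) -> Status:
        phase = self.phases.pop(0)
        outputs = {"result": f"output-from-{resource_meta}"} if phase == "SUCCEEDED" else None
        return Status(phase, outputs, [f"https://logs.example.invalid/{resource_meta}"])


if __name__ == "__main__":
    REGISTRY["batch_job"] = ToyConnector()
    print(asyncio.run(run_locally("batch_job", {"val": "hello"}, [], poll_interval_s=0)))
    # Prints the log link once, then ('SUCCEEDED', {'result': 'output-from-job-1'})
```

Passing poll_interval_s=0 only keeps this toy example fast; the real mixin's interval is fixed at 3 seconds, as described under Polling Interval.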
Troubleshooting and Gotchas
Missing Secrets
The mixin automatically maps secrets defined in the task's custom configuration to environment variables. If a connector requires a secret named API_KEY, you must ensure that an environment variable with that name is set in your local shell:
```shell
export API_KEY=your_secret_value
```
If the environment variable is missing, the mixin will raise a ValueError.
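In automated tests, a shell export is easy to forget. One option is to set the variable only for the duration of a test with unittest.mock.patch.dict from the standard library, which restores the previous environment afterwards. In the sketch below, resolve_secrets is a hypothetical helper that mirrors the behavior described above (environment lookup, ValueError when a secret is missing); it is not part of Flyte.

```python
import os
from typing import Dict, Iterable, Mapping
from unittest import mock


def resolve_secrets(names: Iterable[str], env: Mapping[str, str] = os.environ) -> Dict[str, str]:
    """Hypothetical helper: read each secret from the environment, fail loudly if absent."""
    resolved: Dict[str, str] = {}
    for name in names:
        if name not in env:
            raise ValueError(f"Secret {name!r} is not set as an environment variable")
        resolved[name] = env[name]
    return resolved


if __name__ == "__main__":
    # API_KEY exists only inside this block; the previous environment is restored afterwards.
    with mock.patch.dict(os.environ, {"API_KEY": "dummy-value-for-tests"}):
        print(resolve_secrets(["API_KEY"]))  # {'API_KEY': 'dummy-value-for-tests'}
        # A real test would call flyte.run(...) here, inside the same block.
```

Since the mixin reads secrets from environment variables, wrapping the flyte.run(...) call in the same with block should make the secret visible to it without touching your shell configuration.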
Registry Scope
The ConnectorRegistry is a global singleton within the Python process. If your connector registration happens in a different module, ensure that module is imported before calling flyte.run().
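Because registration is a side effect of executing the module that calls ConnectorRegistry.register(...), a lookup only succeeds after that module has been imported. The sketch below models this with a hypothetical ToyRegistry that keeps one class-level dict for the whole process; it illustrates the pattern and is not Flyte's implementation. In a real project, the equivalent fix is an import such as import my_connectors at the top of the script that calls flyte.run() (the module name is hypothetical).

```python
from typing import Any, Dict


class ToyRegistry:
    """Hypothetical stand-in for ConnectorRegistry: one dict shared process-wide."""

    _connectors: Dict[str, Any] = {}

    @classmethod
    def register(cls, connector: Any) -> None:
        cls._connectors[connector.task_type_name] = connector

    @classmethod
    def get_connector(cls, task_type: str) -> Any:
        # This toy raises ValueError; the exception type Flyte uses is not specified here.
        if task_type not in cls._connectors:
            raise ValueError(f"No connector registered for task type {task_type!r}")
        return cls._connectors[task_type]


class BatchJobConnector:
    task_type_name = "batch_job"


def lookup_succeeds(task_type: str) -> bool:
    try:
        ToyRegistry.get_connector(task_type)
        return True
    except ValueError:
        return False


if __name__ == "__main__":
    print(lookup_succeeds("batch_job"))  # False: nothing registered yet
    # In a real project, this call would run when the connector module is imported.
    ToyRegistry.register(BatchJobConnector())
    print(lookup_succeeds("batch_job"))  # True
```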
Polling Interval
The local execution loop has a hardcoded sleep interval of 3 seconds between calls to connector.get(). This is intended to simulate the polling behavior of the Flyte agent without overwhelming external APIs during local development.
Remote vs. Local Mode
If the task context mode is set to "remote", the mixin will attempt to delegate execution to AsyncFunctionTaskTemplate.execute instead of running the connector logic locally. Ensure your local environment is configured for local execution (usually the default for flyte.run()).