Connectors
The Connector architecture in this SDK provides a standardized way to extend Flyte's capabilities by offloading task execution to external services. Instead of running a task within a Flyte worker pod, a connector acts as an intermediary that manages the lifecycle of a job on an external platform like BigQuery, Databricks, or Snowflake.
This design solves the problem of resource inefficiency and long-running job management. By using an asynchronous polling model, Flyte Propeller can release worker resources while waiting for a remote job to complete, only checking back periodically via the connector service.
The Async Lifecycle
The core of this system is the AsyncConnector abstract base class found in src/flyte/connectors/_connector.py. Every connector must implement three primary methods that map to the lifecycle of an external job:
- create: Submits the job to the external service. It receives the TaskTemplate and inputs, and must return a ResourceMeta object.
- get: Polls the external service for the current status of the job. It uses the ResourceMeta returned by create to identify the job and returns a Resource object containing the current phase (e.g., RUNNING, SUCCEEDED).
- delete: Handles job cancellation or cleanup. This is called if a Flyte workflow is aborted.
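The three-method lifecycle can be sketched with simplified stand-ins for the SDK types. This is a minimal illustration, not the SDK's implementation: SketchConnector, InMemoryConnector, JobMeta, and JobStatus are hypothetical names, and the real AsyncConnector signatures may differ.

```python
import asyncio
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Dict


@dataclass
class JobMeta:
    """Hypothetical stand-in for ResourceMeta: identifies the remote job."""
    job_id: str


@dataclass
class JobStatus:
    """Hypothetical stand-in for Resource: carries the current phase."""
    phase: str


class SketchConnector(ABC):
    """Illustrative interface only; not the SDK's AsyncConnector."""

    @abstractmethod
    async def create(self, task_template: dict, inputs: dict) -> JobMeta: ...

    @abstractmethod
    async def get(self, meta: JobMeta) -> JobStatus: ...

    @abstractmethod
    async def delete(self, meta: JobMeta) -> None: ...


class InMemoryConnector(SketchConnector):
    """Pretends to be an external service by tracking jobs in a dict."""

    def __init__(self) -> None:
        self._jobs: Dict[str, int] = {}  # job_id -> number of polls seen

    async def create(self, task_template: dict, inputs: dict) -> JobMeta:
        job_id = f"job-{len(self._jobs) + 1}"
        self._jobs[job_id] = 0
        return JobMeta(job_id=job_id)

    async def get(self, meta: JobMeta) -> JobStatus:
        self._jobs[meta.job_id] += 1
        # The fake job finishes on the second poll.
        phase = "SUCCEEDED" if self._jobs[meta.job_id] >= 2 else "RUNNING"
        return JobStatus(phase=phase)

    async def delete(self, meta: JobMeta) -> None:
        # pop with a default so deleting an unknown job is not an error.
        self._jobs.pop(meta.job_id, None)


async def demo() -> list:
    connector = InMemoryConnector()
    meta = await connector.create({"sql": "SELECT 1"}, {})
    first = await connector.get(meta)
    second = await connector.get(meta)
    await connector.delete(meta)
    return [meta.job_id, first.phase, second.phase]


print(asyncio.run(demo()))  # ['job-1', 'RUNNING', 'SUCCEEDED']
```

Note that get and delete receive only the metadata object; nothing else about the job is kept on the connector side between calls in a real deployment.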
State Persistence with ResourceMeta
Because the connector service is stateless, may be restarted, and may serve polls for the same job from different service instances, the SDK uses ResourceMeta to persist job identity.
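import json
from dataclasses import asdict, dataclass

# dataclass_from_dict is an SDK helper; its import is omitted in this excerpt.

@dataclass
class ResourceMeta:
    def encode(self) -> bytes:
        # Serialize all dataclass fields to UTF-8 encoded JSON.
        return json.dumps(asdict(self)).encode("utf-8")

    @classmethod
    def decode(cls, data: bytes) -> "ResourceMeta":
        # Rebuild the dataclass from the stored JSON blob.
        return dataclass_from_dict(cls, json.loads(data.decode("utf-8")))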
As seen in src/flyte/connectors/_connector.py, ResourceMeta provides built-in JSON serialization. When a connector's create method returns a metadata object, the Flyte control plane stores this encoded blob. Subsequent calls to get or delete pass this metadata back to the connector, allowing it to reconnect to the remote job without maintaining local state.
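The round trip described above can be tried in isolation. The following is a minimal sketch, assuming flat metadata fields only: SketchMeta and QueryJobMeta are hypothetical names, and decode uses plain `cls(**...)` in place of the SDK's dataclass_from_dict helper.

```python
import json
from dataclasses import asdict, dataclass


@dataclass
class SketchMeta:
    """Stand-in for ResourceMeta with the same JSON encode/decode shape."""

    def encode(self) -> bytes:
        return json.dumps(asdict(self)).encode("utf-8")

    @classmethod
    def decode(cls, data: bytes) -> "SketchMeta":
        # Assumption: flat fields only, so cls(**dict) is enough here.
        # The SDK uses its own dataclass_from_dict helper instead.
        return cls(**json.loads(data.decode("utf-8")))


@dataclass
class QueryJobMeta(SketchMeta):
    job_id: str
    location: str


original = QueryJobMeta(job_id="abc123", location="US")
blob = original.encode()              # what the control plane would store
restored = QueryJobMeta.decode(blob)  # what get/delete would receive

print(blob)
print(restored == original)  # True
```

Because the blob is the only state carried between calls, every field needed to find the remote job again has to live on the metadata class.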
Implementation Example: BigQuery
The BigQueryConnector in plugins/bigquery/src/flyteplugins/bigquery/connector.py demonstrates a production implementation. It defines a specific metadata class to track the BigQuery job ID, project, location, and user agent:
@dataclass
class BigQueryMetadata(ResourceMeta):
    job_id: str
    project: str
    location: str
    user_agent: str

class BigQueryConnector(AsyncConnector):
    name = "Bigquery Connector"
    task_type_name = "bigquery_query_job_task"
    metadata_type = BigQueryMetadata

    async def create(self, task_template, inputs=None, **kwargs) -> BigQueryMetadata:
        # ... client initialization ...
        query_job = client.query(task_template.sql.statement, job_config=job_config)
        return BigQueryMetadata(
            job_id=str(query_job.job_id),
            location=location,
            project=project,
            user_agent=user_agent
        )
In the get method, the connector retrieves the job status and maps it to Flyte's internal execution phases using convert_to_flyte_phase. It also populates log_links, which allows the Flyte UI to display a direct link to the BigQuery Console.
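The status mapping step can be illustrated with a small lookup. This is a hedged sketch, not the SDK's convert_to_flyte_phase: to_phase is a hypothetical function, the phase names are assumed strings rather than the SDK's phase enum, and the real helper may accept a different set of states.

```python
from typing import Optional

# Assumed Flyte-side phase names; the SDK uses its own phase enum.
_STATE_TO_PHASE = {
    "pending": "INITIALIZING",
    "running": "RUNNING",
    "done": "SUCCEEDED",
}


def to_phase(state: str, error: Optional[str] = None) -> str:
    """Hypothetical mapping from a BigQuery job state to a phase name."""
    if error:
        # A BigQuery job can be DONE and still carry an error result.
        return "FAILED"
    try:
        return _STATE_TO_PHASE[state.lower()]
    except KeyError:
        raise ValueError(f"Unrecognized job state: {state!r}") from None


print(to_phase("RUNNING"))                     # RUNNING
print(to_phase("DONE"))                        # SUCCEEDED
print(to_phase("DONE", error="Syntax error"))  # FAILED
```

Raising on unknown states, rather than defaulting to a phase, keeps an unexpected status from being silently reported as success or failure.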
Connector Registration and Discovery
For a connector to be usable, it must be registered with the ConnectorRegistry. The registry maintains a mapping of task_type_name and task_type_version to the specific AsyncConnector implementation.
class ConnectorRegistry(object):
    _REGISTRY: typing.ClassVar[Dict[ConnectorRegistryKey, AsyncConnector]] = {}

    @staticmethod
    def register(connector: AsyncConnector, override: bool = False):
        key = ConnectorRegistryKey(
            task_type_name=connector.task_type_name,
            task_type_version=connector.task_type_version
        )
        ConnectorRegistry._REGISTRY[key] = connector
This registration typically happens at the module level in the connector's file (e.g., ConnectorRegistry.register(BigQueryConnector())). When the Connector Service receives a request to create a task, it looks up the appropriate connector in this registry based on the task type defined in the workflow.
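Registration and lookup can be sketched as a keyed dictionary. Everything here is illustrative: SketchRegistry, RegistryKey, get_connector, and FakeBigQueryConnector are hypothetical names, the default version of 0 is an assumption, and rejecting duplicates unless override=True is a guess at what the override flag is for, since the excerpt above does not show it being used.

```python
from dataclasses import dataclass
from typing import ClassVar, Dict


@dataclass(frozen=True)  # frozen makes the key hashable
class RegistryKey:
    task_type_name: str
    task_type_version: int


class SketchRegistry:
    _REGISTRY: ClassVar[Dict[RegistryKey, object]] = {}

    @classmethod
    def register(cls, connector, override: bool = False) -> None:
        key = RegistryKey(connector.task_type_name, connector.task_type_version)
        # Assumption: duplicates are rejected unless override=True.
        if key in cls._REGISTRY and not override:
            raise ValueError(f"Duplicate connector for {key}")
        cls._REGISTRY[key] = connector

    @classmethod
    def get_connector(cls, task_type_name: str, task_type_version: int = 0):
        key = RegistryKey(task_type_name, task_type_version)
        if key not in cls._REGISTRY:
            raise LookupError(f"No connector registered for {key}")
        return cls._REGISTRY[key]


class FakeBigQueryConnector:
    task_type_name = "bigquery_query_job_task"
    task_type_version = 0  # assumed default version


connector = FakeBigQueryConnector()
SketchRegistry.register(connector)  # typically done at module import time
found = SketchRegistry.get_connector("bigquery_query_job_task")
print(found is connector)  # True
```

Keying on both name and version lets two versions of the same task type resolve to different connector instances.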
Local Execution and Testing
One of the significant design challenges with offloading tasks to external services is local development. To address this, the SDK provides the AsyncConnectorExecutorMixin in src/flyte/connectors/_connector.py.
When a task class inherits from this mixin, its execute method is overridden to simulate the behavior of the Flyte Connector Service locally. It performs the following steps:
- Looks up the connector in the ConnectorRegistry.
- Calls connector.create to start the remote job.
- Enters a while loop that polls connector.get every few seconds until a terminal phase is reached.
- Records log links to the local context so they can be viewed during development.
This allows developers to test the full logic of their connector—including remote job submission and status polling—without needing a deployed Flyte cluster.
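The create-then-poll loop that the mixin simulates can be sketched with asyncio. This is not the mixin's code: run_locally and CountdownConnector are hypothetical names, the terminal phase names are assumed, and the job handle is a plain string rather than a ResourceMeta object.

```python
import asyncio

TERMINAL_PHASES = {"SUCCEEDED", "FAILED", "ABORTED"}  # assumed phase names


class CountdownConnector:
    """Hypothetical connector whose job succeeds after a fixed number of polls."""

    def __init__(self, polls_until_done: int) -> None:
        self._remaining = polls_until_done

    async def create(self, task_template, inputs=None) -> str:
        return "job-1"

    async def get(self, job_id: str) -> str:
        self._remaining -= 1
        return "SUCCEEDED" if self._remaining <= 0 else "RUNNING"


async def run_locally(connector, task_template, poll_interval: float = 3.0) -> tuple:
    """Submit the job, then poll until a terminal phase is reported."""
    job_id = await connector.create(task_template)
    polls = 0
    while True:
        phase = await connector.get(job_id)
        polls += 1
        if phase in TERMINAL_PHASES:
            return phase, polls
        # Yield to the event loop between polls instead of busy-waiting.
        await asyncio.sleep(poll_interval)


result = asyncio.run(
    run_locally(CountdownConnector(3), {"sql": "SELECT 1"}, poll_interval=0.01)
)
print(result)  # ('SUCCEEDED', 3)
```

A short poll_interval keeps the example fast; against a real service the interval trades responsiveness for request volume.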
Tradeoffs and Constraints
The primary tradeoff in this architecture is the shift from synchronous execution to asynchronous polling.
- Complexity: Implementing a connector is more complex than a standard Python task because it requires managing remote state and handling transient network errors during polling.
- Latency: There is a slight delay introduced by the polling interval (defaulting to 3 seconds in the local executor).
- Reliability: The delete method must be idempotent. Since network failures can occur during cancellation, the system relies on the connector to handle "already deleted" or "not found" errors gracefully to ensure the workflow engine can reach a consistent state.
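The idempotency requirement on delete can be sketched as follows. FakeClient and JobNotFound are hypothetical stand-ins for an external service client and its "not found" error; a real connector would catch whatever exception its client library raises for a missing job.

```python
import asyncio


class JobNotFound(Exception):
    """Hypothetical error raised by the fake client for unknown jobs."""


class FakeClient:
    def __init__(self) -> None:
        self.jobs = {"job-1"}

    async def cancel(self, job_id: str) -> None:
        if job_id not in self.jobs:
            raise JobNotFound(job_id)
        self.jobs.remove(job_id)


async def delete(client: FakeClient, job_id: str) -> bool:
    """Cancel the job; return True if cancelled now, False if already gone."""
    try:
        await client.cancel(job_id)
        return True
    except JobNotFound:
        # A retried delete lands here; treat it as success, not failure.
        return False


async def demo() -> list:
    client = FakeClient()
    # The second call simulates a retry after a lost response.
    return [await delete(client, "job-1"), await delete(client, "job-1")]


print(asyncio.run(demo()))  # [True, False]
```

Only the "not found" case is swallowed; other errors still propagate so that genuine cancellation failures remain visible.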
Despite these tradeoffs, the architecture provides a robust foundation for integrating heavy-duty external compute engines while keeping the Flyte control plane lightweight and responsive.