Managing the Connector Service
To host multiple asynchronous connectors in a production environment, use the gRPC connector service provided by this SDK. The service acts as a central hub: it manages the task lifecycle (creation, status tracking, deletion) and serves metadata about the available connectors.
Starting the Connector Service
The most direct way to run the service is using the flyte-connect CLI. This command initializes the gRPC server and starts a Prometheus metrics endpoint.
flyte-connect c0 \
    --port 8000 \
    --prometheus_port 9090 \
    --worker 10 \
    --modules my_custom_connector.plugin
Programmatic Execution
If you need to start the service from within a Python script, use the ConnectorService.run method. This method is decorated with @syncify, allowing it to be called in both synchronous and asynchronous contexts.
from flyte.connectors import ConnectorService
# Start the service with custom configuration
ConnectorService.run(
    port=8000,
    prometheus_port=9090,
    worker=20,
    timeout=None,
    modules=["my_project.connectors.bigquery"],
)
Configuring Connector Discovery
The service must be able to find and register your connectors to handle requests. It uses two primary discovery mechanisms:
- Entry Points: The service automatically loads any connectors registered under the flyte.connectors entry point group in your setup.py or pyproject.toml.
- Explicit Modules: Use the --modules flag (CLI) or the modules argument (ConnectorService.run) to specify additional Python modules to import. The service appends the current working directory to sys.path so that local modules are importable.
Service Architecture
The connector service hosts two primary gRPC services defined in flyte.connectors._server:
AsyncConnectorService
This service handles the core task lifecycle by delegating requests to the appropriate connector registered in the ConnectorRegistry. It implements:
- CreateTask: Translates Flyte literals to native types and calls connector.create.
- GetTask: Retrieves the current status and resource information.
- DeleteTask: Cleans up external resources.
- GetTaskMetrics and GetTaskLogs: Fetch execution-specific telemetry.
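The delegation pattern described above can be illustrated with a toy model. This is a sketch under stated assumptions, not the SDK's implementation: ToyRegistry, EchoConnector, and create_task are hypothetical names, the registry is assumed to be keyed by (task_type, version), and a plain LookupError stands in for the gRPC error the real service would return.

```python
import asyncio
from typing import Any, Dict, Tuple


class EchoConnector:
    """Hypothetical connector; a real one would call an external system."""

    task_type = "echo"
    version = 0

    async def create(self, inputs: Dict[str, Any], **kwargs: Any) -> str:
        # Return a made-up job identifier as the resource handle.
        return f"job-{inputs['message']}"


class ToyRegistry:
    """Stand-in for ConnectorRegistry, keyed by (task_type, version)."""

    def __init__(self) -> None:
        self._connectors: Dict[Tuple[str, int], Any] = {}

    def register(self, connector: Any) -> None:
        self._connectors[(connector.task_type, connector.version)] = connector

    def get(self, task_type: str, version: int) -> Any:
        try:
            return self._connectors[(task_type, version)]
        except KeyError:
            # Stand-in for the error the real service reports to the client.
            raise LookupError(f"no connector for {task_type!r} v{version}") from None


async def create_task(
    registry: ToyRegistry,
    task_type: str,
    version: int,
    inputs: Dict[str, Any],
    **connection_kwargs: Any,
) -> str:
    """Look up the connector for the request and delegate to its create method."""
    connector = registry.get(task_type, version)
    return await connector.create(inputs, **connection_kwargs)


if __name__ == "__main__":
    registry = ToyRegistry()
    registry.register(EchoConnector())
    print(asyncio.run(create_task(registry, "echo", 0, {"message": "hi"})))
```

In this model, a request whose task_type or version differs from the registered values fails at the lookup step, before any connector code runs.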
ConnectorMetadataService
This service provides discovery capabilities for clients:
- ListConnectors: Returns a list of all registered connectors.
- GetConnector: Returns detailed metadata for a specific connector by name.
Monitoring and Metrics
The service automatically exports Prometheus metrics on the configured prometheus_port. Key metrics include:
- flyte_connector_requests_success_total: Counter of successful requests, labeled by task_type and operation.
- flyte_connector_requests_failure_total: Counter of failed requests, labeled by task_type, operation, and error_code.
- flyte_connector_request_latency_seconds: Summary of time spent processing requests.
- flyte_connector_input_literal_bytes: Size of input payloads for CreateTask operations.
These metrics are captured via the @record_connector_metrics decorator applied to the AsyncConnectorService methods.
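For readers unfamiliar with decorator-based instrumentation, the general idea can be sketched with the standard library alone. This is not the SDK's @record_connector_metrics: the record_metrics decorator, the in-memory SUCCESS, FAILURE, and LATENCIES stores, and the get_task function are all hypothetical, and the exception class name is used as a stand-in for error_code.

```python
import asyncio
import functools
import time
from collections import Counter
from typing import Any, Awaitable, Callable, List

# In-memory stand-ins for Prometheus counters and a latency summary.
SUCCESS: Counter = Counter()
FAILURE: Counter = Counter()
LATENCIES: List[float] = []


def record_metrics(operation: str) -> Callable:
    """Hypothetical decorator: count outcomes and time each async call."""

    def decorator(fn: Callable[..., Awaitable[Any]]) -> Callable[..., Awaitable[Any]]:
        @functools.wraps(fn)
        async def wrapper(task_type: str, *args: Any, **kwargs: Any) -> Any:
            start = time.perf_counter()
            try:
                result = await fn(task_type, *args, **kwargs)
            except Exception as exc:
                # The exception class name stands in for error_code.
                FAILURE[(task_type, operation, type(exc).__name__)] += 1
                raise
            else:
                SUCCESS[(task_type, operation)] += 1
                return result
            finally:
                # Latency is recorded for successes and failures alike.
                LATENCIES.append(time.perf_counter() - start)

        return wrapper

    return decorator


@record_metrics("get")
async def get_task(task_type: str, job_id: str) -> str:
    if not job_id:
        raise ValueError("job_id is required")
    return "SUCCEEDED"


if __name__ == "__main__":
    print(asyncio.run(get_task("echo", "job-1")))
    print(dict(SUCCESS))
```

Wrapping the handler keeps the counting and timing logic out of each method body, which is the usual reason to instrument with a decorator.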
Troubleshooting
Connector Not Found
If the service returns a NOT_FOUND gRPC status code, ensure that:
- The connector class is correctly decorated or registered in the ConnectorRegistry.
- The module containing the connector is passed to the --modules flag or is discoverable via entry points.
- The task_type and version in the request match the registration values.
Connection Configuration
The service extracts secrets and configurations from the gRPC Connection object and passes them as keyword arguments to the connector's methods (create, get, etc.). If your connector fails to authenticate, verify that the Connection object in the CreateTaskRequest contains the expected keys.
# Internal mapping of connection secrets/configs to kwargs
def _get_connection_kwargs(request: Connection) -> Dict[str, str]:
    kwargs = {}
    for k, v in request.secrets.items():
        kwargs[k] = v
    for k, v in request.configs.items():
        kwargs[k] = v
    return kwargs
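One consequence of the mapping above is that configs are applied after secrets, so a key present in both ends up with the config value. The sketch below reproduces that merge order with a stand-in object. FakeConnection, connection_kwargs, and the key names are hypothetical and exist only for illustration; the real Connection is a gRPC message.

```python
from dataclasses import dataclass, field
from typing import Dict


@dataclass
class FakeConnection:
    """Hypothetical stand-in for the gRPC Connection message."""

    secrets: Dict[str, str] = field(default_factory=dict)
    configs: Dict[str, str] = field(default_factory=dict)


def connection_kwargs(conn: FakeConnection) -> Dict[str, str]:
    # Same merge order as the mapping above: secrets first, then configs.
    return {**conn.secrets, **conn.configs}


conn = FakeConnection(
    secrets={"api_key": "s3cr3t", "region": "from-secret"},
    configs={"region": "us-east-1"},
)
kwargs = connection_kwargs(conn)

# Keys a hypothetical connector expects; anything missing hints at a bad Connection.
expected_keys = {"api_key", "project"}
missing = expected_keys - kwargs.keys()

if __name__ == "__main__":
    print(kwargs["region"])
    print(sorted(missing))
```

Comparing the merged keys against what the connector's methods expect is a quick way to narrow down an authentication failure to a missing or shadowed key.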