Managing the Connector Service

To host multiple asynchronous connectors in a production environment, use the gRPC connector service provided by this SDK. The service acts as a central hub: it manages the task lifecycle (creation, status tracking, deletion) and serves metadata about the available connectors.

Starting the Connector Service

The most direct way to run the service is with the flyte-connect CLI. The command initializes the gRPC server and starts a Prometheus metrics endpoint.

flyte-connect c0 \
    --port 8000 \
    --prometheus_port 9090 \
    --worker 10 \
    --modules my_custom_connector.plugin

Programmatic Execution

If you need to start the service from within a Python script, use the ConnectorService.run method. This method is decorated with @syncify, allowing it to be called in both synchronous and asynchronous contexts.

from flyte.connectors import ConnectorService

# Start the service with custom configuration
ConnectorService.run(
    port=8000,
    prometheus_port=9090,
    worker=20,
    timeout=None,
    modules=["my_project.connectors.bigquery"],
)

Configuring Connector Discovery

The service must be able to find and register your connectors to handle requests. It uses two primary discovery mechanisms:

  1. Entry Points: The service automatically loads any connectors registered under the flyte.connectors entry point group in your setup.py or pyproject.toml.
  2. Explicit Modules: Use the --modules flag (CLI) or the modules argument (ConnectorService.run) to specify additional Python modules to import. The service appends the current working directory to sys.path so that local modules are importable.

Service Architecture

The connector service hosts two primary gRPC services defined in flyte.connectors._server:

AsyncConnectorService

This service handles the core task lifecycle by delegating requests to the appropriate connector registered in the ConnectorRegistry. It implements:

  • CreateTask: Translates Flyte literals to native types and calls connector.create.
  • GetTask: Retrieves the current status and resource information.
  • DeleteTask: Cleans up external resources.
  • GetTaskMetrics and GetTaskLogs: Fetch execution-specific telemetry.
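
The delegation pattern can be sketched as follows. Everything in the block is a stand-in written for illustration: Registry, EchoConnector, ConnectorNotFound, and handle_create are hypothetical names, not the SDK's ConnectorRegistry API. The sketch assumes connectors are looked up by task type and version, and that a failed lookup is what the server reports as NOT_FOUND.

```python
import asyncio
from typing import Dict, Tuple


class ConnectorNotFound(LookupError):
    """Stand-in for the error a server would map to gRPC NOT_FOUND."""


class EchoConnector:
    """Hypothetical connector exposing create/get/delete."""

    task_type = "echo"
    version = 0

    def __init__(self) -> None:
        self.jobs: Dict[str, str] = {}

    async def create(self, job_id: str, **kwargs) -> str:
        self.jobs[job_id] = "RUNNING"
        return job_id

    async def get(self, job_id: str, **kwargs) -> str:
        return self.jobs[job_id]

    async def delete(self, job_id: str, **kwargs) -> None:
        self.jobs.pop(job_id, None)


class Registry:
    """Toy registry keyed by (task_type, version)."""

    def __init__(self) -> None:
        self._connectors: Dict[Tuple[str, int], EchoConnector] = {}

    def register(self, connector: EchoConnector) -> None:
        self._connectors[(connector.task_type, connector.version)] = connector

    def lookup(self, task_type: str, version: int = 0) -> EchoConnector:
        try:
            return self._connectors[(task_type, version)]
        except KeyError:
            raise ConnectorNotFound(f"no connector for {task_type!r} v{version}") from None


async def handle_create(registry: Registry, task_type: str, version: int, job_id: str) -> str:
    # The service method only routes: find the connector, then delegate.
    connector = registry.lookup(task_type, version)
    return await connector.create(job_id)


async def demo():
    registry = Registry()
    registry.register(EchoConnector())
    job_id = await handle_create(registry, "echo", 0, "job-1")
    status = await registry.lookup("echo").get(job_id)
    try:
        registry.lookup("spark", 0)  # never registered
        missing = False
    except ConnectorNotFound:
        missing = True
    return job_id, status, missing
```

Running asyncio.run(demo()) exercises both paths: a registered task type is routed to its connector, while an unregistered one fails at lookup before any connector code runs.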

ConnectorMetadataService

This service provides discovery capabilities for clients:

  • ListConnectors: Returns a list of all registered connectors.
  • GetConnector: Returns detailed metadata for a specific connector by name.

Monitoring and Metrics

The service automatically exports Prometheus metrics on the configured prometheus_port. Key metrics include:

  • flyte_connector_requests_success_total: Counter of successful requests labeled by task_type and operation.
  • flyte_connector_requests_failure_total: Counter of failed requests labeled by task_type, operation, and error_code.
  • flyte_connector_request_latency_seconds: Summary of time spent processing requests.
  • flyte_connector_input_literal_bytes: Size of input payloads for CreateTask operations.

These metrics are captured via the @record_connector_metrics decorator applied to the AsyncConnectorService methods.
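
To illustrate how a decorator can capture these signals without touching the handler body, here is a minimal sketch. It is not the SDK's @record_connector_metrics: the decorator record_metrics is hypothetical, plain in-memory containers stand in for the Prometheus counters and summary, and the exception class name stands in for the error_code label.

```python
import asyncio
import functools
import time
from collections import Counter, defaultdict

# In-memory stand-ins for the Prometheus counters and latency summary.
SUCCESS = Counter()
FAILURE = Counter()
LATENCY = defaultdict(list)


def record_metrics(operation: str):
    """Hypothetical decorator: count outcomes and time each call."""

    def decorator(fn):
        @functools.wraps(fn)
        async def wrapper(task_type: str, *args, **kwargs):
            start = time.perf_counter()
            try:
                result = await fn(task_type, *args, **kwargs)
            except Exception as exc:
                # Failures carry one extra label; the exception is re-raised.
                FAILURE[(task_type, operation, type(exc).__name__)] += 1
                raise
            else:
                SUCCESS[(task_type, operation)] += 1
                return result
            finally:
                # Latency is recorded for successes and failures alike.
                LATENCY[(task_type, operation)].append(time.perf_counter() - start)

        return wrapper

    return decorator


@record_metrics("create")
async def create_task(task_type: str, fail: bool = False) -> str:
    if fail:
        raise ValueError("simulated failure")
    return "job-1"
```

Because the bookkeeping lives in the wrapper, every decorated method reports the same labels, and a handler that raises still contributes a latency sample.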

Troubleshooting

Connector Not Found

If the service returns a NOT_FOUND gRPC status code, ensure that:

  1. The connector class is correctly decorated or registered in the ConnectorRegistry.
  2. The module containing the connector is passed to the --modules flag or is discoverable via entry points.
  3. The task_type and version in the request match the registration values.

Connection Configuration

The service extracts secrets and configurations from the gRPC Connection object and passes them as keyword arguments to the connector's methods (create, get, etc.). If your connector fails to authenticate, verify that the Connection object in the CreateTaskRequest contains the expected keys.

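# Internal mapping of connection secrets/configs to kwargs
from typing import Dict

# `Connection` is the gRPC message carried by the request; its import is omitted in this excerpt.
def _get_connection_kwargs(request: Connection) -> Dict[str, str]:
    kwargs = {}
    # Secrets are copied first.
    for k, v in request.secrets.items():
        kwargs[k] = v
    # Configs are copied second, so a config overrides a secret that shares its key.
    for k, v in request.configs.items():
        kwargs[k] = v
    return kwargs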
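
One consequence of this merge order is worth checking when debugging authentication: if a secret and a config share a key, the config value is the one the connector receives. The sketch below reproduces the same order with a stand-in type. FakeConnection and the key names are hypothetical and only mirror the two mapping fields the helper reads.

```python
from dataclasses import dataclass, field
from typing import Dict


@dataclass
class FakeConnection:
    """Stand-in with the two mapping fields the helper reads."""

    secrets: Dict[str, str] = field(default_factory=dict)
    configs: Dict[str, str] = field(default_factory=dict)


def get_connection_kwargs(conn: FakeConnection) -> Dict[str, str]:
    # Same order as the helper: secrets first, then configs on top.
    return {**conn.secrets, **conn.configs}


conn = FakeConnection(
    secrets={"api_key": "s3cr3t", "project": "from-secret"},
    configs={"project": "from-config", "region": "us-east-1"},
)
kwargs = get_connection_kwargs(conn)
```

Here kwargs["project"] comes from configs, so a connector that expects the secret value under that key would see the wrong credential. Using distinct key names for secrets and configs avoids the collision.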