Skip to main content

Logging and Metrics

Implement structured logging and performance monitoring in your Flyte applications using the built-in JSON formatter and the Stopwatch utility.

Configuring Structured Logging

To enable structured JSON logging, initialize the Flyte system with the json log format. This is typically done during the initial setup of your application.

import flyte

# Initialize Flyte with JSON logging
flyte.init(
log_level="info",
log_format="json"
)

Alternatively, you can configure this via environment variables without changing your code:

export LOG_FORMAT=json
export LOG_LEVEL=info

When log_format="json" is set, the initialize_logger function in flyte._logging attaches the JSONFormatter to the 'flyte' logger.

Automatic Context Enrichment

The JSONFormatter (found in src/flyte/_logging.py) automatically enriches every log record with execution context. This is achieved through the ContextFilter, which injects metadata from the current Flyte execution context into the log record.

Standard fields included in every JSON log entry:

  • timestamp: ISO 8601 formatted time.
  • level: Log level (e.g., INFO, ERROR).
  • message: The log message.
  • run_name: The name of the current Flyte run.
  • action_name: The name of the specific action/task being executed.
  • filename, lineno, funcName: Source code location.

Measuring Execution Time

Use the Stopwatch utility from flyte._metrics to time specific blocks of code. When the stopwatch is stopped, it automatically emits a structured log record containing the duration.

from flyte._metrics import Stopwatch

async def process_data(data):
# Create and start the stopwatch
sw = Stopwatch("data_processing_phase")
sw.start()

# ... perform work ...

# Stop the stopwatch to emit the metric
sw.stop()

The resulting JSON log will include specific metric fields:

{
"timestamp": "2023-10-27T10:00:00.000000",
"level": "INFO",
"message": "Stopwatch: data_processing_phase completed in 1.2345s",
"metric_type": "timer",
"metric_name": "data_processing_phase",
"duration_seconds": 1.2345,
"run_name": "my-flyte-run",
"action_name": "my-task"
}

Adding Custom Metadata to Metrics

You can attach additional metadata to your timing metrics by passing a dictionary to the extra_fields parameter of the Stopwatch.

from flyte._metrics import Stopwatch

def download_file(file_id: str):
sw = Stopwatch(
"file_download",
extra_fields={"file_id": file_id, "provider": "s3"}
)
sw.start()
# download logic
sw.stop()

These extra fields are merged into the JSON output, allowing you to filter and aggregate metrics by custom dimensions in your logging backend.

Troubleshooting

Stopwatch Not Started

If you call stop() on a Stopwatch instance that was never started, it will raise a RuntimeError.

sw = Stopwatch("invalid_usage")
sw.stop() # Raises RuntimeError: Stopwatch 'invalid_usage' was never started

Rich Logging Conflicts

In local development, Flyte often uses RichHandler for beautiful console output. However, if log_format="json" is specified, the JSONFormatter takes precedence, and Rich logging is disabled to ensure the output remains valid JSON.

In-Cluster Execution

When running inside a Flyte cluster (detected via the FLYTE_INTERNAL_EXECUTION_PROJECT environment variable), Rich logging is automatically disabled by _initialize_logger in src/flyte/_initialize.py to prevent issues with stdout capture and terminal formatting.