Custom Infrastructure with Container Tasks

In many machine learning and data engineering workflows, the standard Python-based task execution environment is not enough. This is the case when a workload requires specialized system binaries (such as s5cmd for high-speed I/O), non-Python programs (such as C++ binaries or shell scripts), or strict infrastructure constraints such as network isolation.

The ContainerTask class in flyte.extras._container (imported as flyte.extras.ContainerTask) provides a mechanism to execute arbitrary pre-containerized applications. Unlike standard @task-decorated functions, ContainerTask does not require the Flyte SDK to be installed inside the container, making it the primary tool for integrating legacy code or highly optimized infrastructure into a Flyte workflow.

The Architecture of SDK-less Execution

The design of ContainerTask centers on a raw-container task type. Instead of serializing Python code and executing it via a wrapper, Flyte Propeller directly invokes the specified container image with a defined command and arguments.

Data exchange is handled through the filesystem. The task defines an input_data_dir (defaulting to /var/inputs) and an output_data_dir (defaulting to /var/outputs). Flyte Propeller populates the input directory with data before the container starts and collects results from the output directory after it exits.
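From the container's side, this contract is nothing more than files. The sketch below assumes that a File input named data appears at input_data_dir/data (the path-style convention this article describes for File inputs) and that each output is a plain-text file named after its output key, as the greeting example writes /var/outputs/greeting. The input name data, the output name line_count, and the function count_lines_task are all hypothetical. The directory parameters default to the ContainerTask defaults, so the script needs no Flyte SDK inside the container, and tests can point it at temporary directories.

```python
from pathlib import Path

# Defaults mirror ContainerTask's input_data_dir / output_data_dir.
INPUT_DIR = Path("/var/inputs")
OUTPUT_DIR = Path("/var/outputs")


def count_lines_task(input_dir: Path = INPUT_DIR, output_dir: Path = OUTPUT_DIR) -> int:
    """Count lines in a hypothetical File input "data" and write a
    hypothetical output "line_count" as a plain-text file."""
    data_file = input_dir / "data"  # File inputs are read by path, not via templating
    with data_file.open() as fh:
        count = sum(1 for _ in fh)
    output_dir.mkdir(parents=True, exist_ok=True)
    # One file per declared output, named after the output key.
    (output_dir / "line_count").write_text(str(count))
    return count


if __name__ == "__main__":
    count_lines_task()
```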

Basic Implementation

A ContainerTask is defined by specifying its interface (inputs and outputs) and the command to run. The following example from examples/advanced/container_task.py demonstrates a simple shell-based task:

```python
import flyte
from flyte.extras import ContainerTask

greeting_task = ContainerTask(
    name="echo_and_return_greeting",
    image=flyte.Image.from_base("alpine:3.18"),
    input_data_dir="/var/inputs",
    output_data_dir="/var/outputs",
    inputs={"name": str},
    outputs={"greeting": str},
    # {{.inputs.name}} is replaced with the value of the "name" input; the result
    # is written to /var/outputs/greeting, matching the "greeting" output key.
    command=[
        "/bin/sh",
        "-c",
        "echo 'Hello, my name is {{.inputs.name}}.' | tee -a /var/outputs/greeting",
    ],
)
```

Input Mapping and Volume Binding

The ContainerTask handles two distinct types of data injection: template rendering for scalar values and volume binding for complex data types.

Scalar Templating

For simple types like str, int, or float, the SDK uses a template syntax: {{.inputs.key}}. During execution, the _render_command_and_volume_binding method replaces these placeholders with the actual values passed to the task.
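The substitution step can be pictured as a regular-expression replacement over each command element. This is a minimal sketch of the idea, not the SDK's _render_command_and_volume_binding; the name render_command and the choice to raise KeyError on an unknown placeholder are assumptions made for illustration.

```python
import re
from typing import List, Mapping, Sequence

# Matches placeholders such as {{.inputs.name}}, tolerating inner whitespace.
_PLACEHOLDER = re.compile(r"\{\{\s*\.inputs\.(\w+)\s*\}\}")


def render_command(command: Sequence[str], inputs: Mapping[str, object]) -> List[str]:
    """Replace every {{.inputs.key}} placeholder with str() of the matching input."""

    def substitute(match: "re.Match[str]") -> str:
        key = match.group(1)
        if key not in inputs:
            raise KeyError(f"command references unknown input {key!r}")
        return str(inputs[key])

    return [_PLACEHOLDER.sub(substitute, part) for part in command]


if __name__ == "__main__":
    cmd = [
        "/bin/sh",
        "-c",
        "echo 'Hello, my name is {{.inputs.name}}.' | tee -a /var/outputs/greeting",
    ]
    print(render_command(cmd, {"name": "Ada"}))
    # prints ['/bin/sh', '-c', "echo 'Hello, my name is Ada.' | tee -a /var/outputs/greeting"]
```

Because each list element is rendered independently, a value containing spaces stays inside a single argument; it is the shell in `/bin/sh -c` that later re-parses the string.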

File and Directory Binding

When dealing with flyte.io.File or flyte.io.Dir, the implementation shifts from templating to volume mounting. The ContainerTask logic in _render_command_and_volume_binding detects these types and creates a mapping between the local path on the host and a path inside the container's input_data_dir.
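For local runs, that mapping ends up as a Docker volume specification. As a sketch, assuming a hypothetical LocalPath stand-in for flyte.io.File and flyte.io.Dir (it only carries a host path), the split between templated scalars and mounted paths might look like this. The dictionary shape `{host_path: {"bind": container_path, "mode": ...}}` is the one docker-py accepts for the volumes argument of `client.containers.run`; mounting read-only is a choice made here, not necessarily what the SDK does.

```python
from dataclasses import dataclass
from pathlib import PurePosixPath
from typing import Dict, Mapping, Tuple


@dataclass(frozen=True)
class LocalPath:
    """Hypothetical stand-in for flyte.io.File / flyte.io.Dir: just a host path."""
    path: str


def split_inputs(
    inputs: Mapping[str, object], input_data_dir: str = "/var/inputs"
) -> Tuple[Dict[str, object], Dict[str, dict]]:
    """Separate scalar inputs (for templating) from path inputs (for mounting).

    Each path input is bind-mounted at <input_data_dir>/<input name>.
    """
    scalars: Dict[str, object] = {}
    volumes: Dict[str, dict] = {}
    for name, value in inputs.items():
        if isinstance(value, LocalPath):
            target = str(PurePosixPath(input_data_dir) / name)
            volumes[value.path] = {"bind": target, "mode": "ro"}
        else:
            scalars[name] = value
    return scalars, volumes


if __name__ == "__main__":
    print(split_inputs({"name": "Ada", "myfile": LocalPath("/tmp/data.csv")}))
    # prints ({'name': 'Ada'}, {'/tmp/data.csv': {'bind': '/var/inputs/myfile', 'mode': 'ro'}})
```

The scalars would then go through template rendering, while the volumes dictionary could be passed as `volumes=` to `client.containers.run(...)`.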

A critical constraint in this implementation is that File and Dir inputs must be referenced with path-like syntax in the command string (e.g., /var/inputs/myfile for an input named myfile) rather than with the {{.inputs.myfile}} template syntax. As noted in the source code, this is due to how Flyte Propeller processes template-syntax inputs for complex types.
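Because this mistake only surfaces when the task runs, a pre-flight check can catch it earlier. The following sketch is not part of the SDK: check_file_references is a hypothetical helper, and the caller is assumed to know which input names are declared as File or Dir. It reports every {{.inputs.X}} placeholder whose X is one of those names.

```python
import re
from typing import Iterable, List, Sequence

# Same placeholder shape as the {{.inputs.key}} template syntax.
_PLACEHOLDER = re.compile(r"\{\{\s*\.inputs\.(\w+)\s*\}\}")


def check_file_references(command: Sequence[str], file_inputs: Iterable[str]) -> List[str]:
    """Return File/Dir input names that are wrongly referenced via template syntax."""
    file_names = set(file_inputs)
    bad: List[str] = []
    for part in command:
        for name in _PLACEHOLDER.findall(part):
            if name in file_names and name not in bad:
                bad.append(name)
    return bad


if __name__ == "__main__":
    print(check_file_references(["cat", "{{.inputs.myfile}}"], {"myfile"}))  # prints ['myfile']
    print(check_file_references(["cat", "/var/inputs/myfile"], {"myfile"}))  # prints []
```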

Network Isolation and Security

For sensitive ML environments, ContainerTask supports network blocking through the block_network parameter. This is particularly useful for ensuring that data processing tasks do not exfiltrate data to external endpoints.

The implementation handles this differently based on the execution environment:

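  • Local Execution: In the execute method, if block_network is True, the container is started through the Docker client with network_mode="none", so it has no network access beyond its own loopback interface.
  • On-Cluster Execution: The __init__ method either points the task at a sandboxed pod template or adds a sandboxed label to the PodTemplate object you supply:

```python
if block_network:
    existing = kwargs.get("pod_template")
    if existing is None:
        # No template given: reference the cluster's sandboxed template by name.
        kwargs["pod_template"] = "sandboxed-pod-template"
    elif isinstance(existing, str):
        # A template referenced by name cannot be relabeled, so reject this combination.
        raise ValueError(
            "block_network=True cannot be combined with a string pod_template reference. "
            "Use a PodTemplate object instead..."
        )
    else:
        # A PodTemplate object: merge in the sandboxed label, keeping existing labels.
        existing.labels = {**(existing.labels or {}), "sandboxed": "true"}
```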

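On the cluster, the SDK does not enforce isolation by itself: it only selects the sandboxed-pod-template or adds the sandboxed: "true" label. The cluster must provide that template and apply the actual network restriction to matching pods (for example, through a Kubernetes NetworkPolicy), which is what places the pod in a restricted network environment.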
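To make both paths concrete, here is a runnable sketch that mirrors the branching shown above. It is an illustration, not the SDK code: PodTemplateStub is a hypothetical stand-in for a PodTemplate object (only its labels attribute matters here), and docker_run_kwargs is a hypothetical helper showing the keyword argument that the local path adds to docker-py's `containers.run`.

```python
from dataclasses import dataclass
from typing import Dict, Optional, Union


@dataclass
class PodTemplateStub:
    """Hypothetical stand-in for a PodTemplate object; only labels matter here."""
    labels: Optional[Dict[str, str]] = None


def docker_run_kwargs(block_network: bool) -> dict:
    """Local path: extra keyword arguments for containers.run()."""
    return {"network_mode": "none"} if block_network else {}


def resolve_pod_template(
    block_network: bool, pod_template: Union[None, str, PodTemplateStub]
) -> Union[None, str, PodTemplateStub]:
    """Cluster path: mirror the pod_template handling described above."""
    if not block_network:
        return pod_template
    if pod_template is None:
        # Fall back to the sandboxed template the cluster is expected to define.
        return "sandboxed-pod-template"
    if isinstance(pod_template, str):
        raise ValueError(
            "block_network=True cannot be combined with a string pod_template reference."
        )
    # Merge the label without discarding labels the caller already set.
    pod_template.labels = {**(pod_template.labels or {}), "sandboxed": "true"}
    return pod_template
```

Passing PodTemplateStub(labels={"team": "ml"}) with block_network=True returns the same object with labels {"team": "ml", "sandboxed": "true"}, which is why a PodTemplate object, unlike a string reference, can be combined with network blocking.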

High-Performance ML Environments

One of the most powerful applications of ContainerTask is in performance-critical data movement. In examples/stress/benchmark/large_io_comparison.py, ContainerTask is used to wrap s5cmd, a high-performance S3 client.

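```python
from flyte.extras import ContainerTask

# s5cmd_image is defined elsewhere in the benchmark script (not shown here).
s5cmd_file_task = ContainerTask(
    name="s5cmd_download_file",
    image=s5cmd_image,
    inputs={"remote_path": str, "file_size_mb": int},
    outputs={"duration": float, "throughput_mbps": float},
    command=[
        "/bin/bash",
        "-c",
        # With `bash -c SCRIPT ARG`, ARG is bound to $0 inside SCRIPT,
        # so "$0" below is the rendered remote_path.
        """
            set -e
            START=$(date +%s%N)
            s5cmd cp -c 32 "$0" /tmp/downloaded_file
            END=$(date +%s%N)
            # ... calculation logic ...
            echo "$DURATION" > /var/outputs/duration
            echo "$THROUGHPUT" > /var/outputs/throughput_mbps
        """,
        "{{.inputs.remote_path}}",
    ],
)
```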

This approach bypasses the overhead of the Python SDK's data transformers, allowing the task to utilize the full throughput of specialized system tools.

Tradeoffs and Constraints

While ContainerTask offers maximum flexibility, it introduces several constraints:

  1. Strict Typing: All elements in the command and arguments lists must be strings. The SDK will raise a ValueError during initialization if non-string types are detected.
  2. Manual Output Management: Unlike @task functions that return values, a ContainerTask must explicitly write its outputs to files in the output_data_dir. The SDK reads these files and converts them back to the expected types in _convert_output_val_to_correct_type.
  3. Local Dependencies: Local execution of these tasks requires the docker Python package and a running Docker daemon. The execute method will raise an ImportError if the library is missing.
  4. Metadata Format: The metadata_format (JSON, YAML, or PROTO) determines how Flyte Propeller interprets the data loading configuration. This must match the format the containerized application uses to read its input metadata.
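Constraints 1 and 2 can be sketched together. The helpers below are hypothetical and are not _convert_output_val_to_correct_type itself: they assume scalar outputs are stored as plain-text files named after each output key, as in the examples above, and they handle only str, int, float, and bool. This sketch strips surrounding whitespace, which also drops the trailing newline that echo writes; whether the SDK does the same for str outputs is an assumption.

```python
from pathlib import Path
from typing import Dict, Mapping, Sequence


def validate_command(command: Sequence[object]) -> None:
    """Reject non-string elements, mirroring the strict-typing constraint."""
    bad = [item for item in command if not isinstance(item, str)]
    if bad:
        raise ValueError(f"command elements must be strings, got: {bad!r}")


def read_outputs(output_dir: Path, outputs: Mapping[str, type]) -> Dict[str, object]:
    """Read one text file per declared output and convert it to the declared type."""
    results: Dict[str, object] = {}
    for name, typ in outputs.items():
        raw = (output_dir / name).read_text().strip()
        if typ is bool:
            # bool("false") is True, so parse common spellings explicitly.
            if raw.lower() not in ("true", "false", "1", "0"):
                raise ValueError(f"cannot parse {raw!r} as bool for output {name!r}")
            results[name] = raw.lower() in ("true", "1")
        elif typ in (str, int, float):
            results[name] = typ(raw)
        else:
            raise TypeError(f"output {name!r}: unsupported type {typ!r} in this sketch")
    return results
```

For the s5cmd task above, read_outputs(Path("/var/outputs"), {"duration": float, "throughput_mbps": float}) would turn the two echoed values back into floats.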