Custom Infrastructure with Container Tasks
In many machine learning and data engineering workflows, the standard Python-based task execution environment is not enough. This happens when a workload needs specialized system binaries (such as s5cmd for high-speed S3 I/O), code that runs outside the Python runtime (such as C++ programs or shell scripts), or strict infrastructure constraints such as network isolation.
The ContainerTask class in flyte.extras._container provides a mechanism to execute arbitrary pre-containerized applications. Unlike standard @task decorated functions, ContainerTask does not require the Flyte SDK to be installed inside the container, making it the primary tool for integrating legacy code or highly optimized infrastructure into a Flyte workflow.
The Architecture of SDK-less Execution
The design of ContainerTask centers on a raw-container task type. Instead of serializing Python code and executing it via a wrapper, Flyte Propeller directly invokes the specified container image with a defined command and arguments.
Data exchange is handled through the filesystem. The task defines an input_data_dir (defaulting to /var/inputs) and an output_data_dir (defaulting to /var/outputs). Flyte Propeller populates the input directory with data before the container starts and collects results from the output directory after it exits.
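Because the container does not run the Flyte SDK, its side of the contract is just arguments and files. The sketch below is a hypothetical entrypoint for the greeting example that follows. It assumes the scalar input arrives as a command-line argument (rendered from a template) and that each output is a file in the output directory named after the output key, which is the pattern the examples in this section use. The function name main and the argument layout are illustrative only.

```python
import sys
from pathlib import Path


def main(argv: list) -> int:
    """Hypothetical SDK-less entrypoint: no flyte import, only argv and files.

    argv[1] is assumed to be the rendered value of {{.inputs.name}};
    argv[2] (optional) overrides the output directory, normally /var/outputs.
    """
    name = argv[1]
    output_dir = Path(argv[2]) if len(argv) > 2 else Path("/var/outputs")
    output_dir.mkdir(parents=True, exist_ok=True)
    # One file per declared output; the file name is the output key.
    (output_dir / "greeting").write_text(f"Hello, my name is {name}.")
    return 0
```

Inside an image, a script like this would be called as sys.exit(main(sys.argv)) from the task's command.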
Basic Implementation
A ContainerTask is defined by specifying its interface (inputs and outputs) and the command to run. The following example from examples/advanced/container_task.py demonstrates a simple shell-based task:
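import flyte
from flyte.extras import ContainerTask

# A raw-container task: Flyte runs this image directly; no Flyte SDK is needed inside it.
greeting_task = ContainerTask(
    name="echo_and_return_greeting",
    image=flyte.Image.from_base("alpine:3.18"),
    input_data_dir="/var/inputs",    # where inputs are made available inside the container
    output_data_dir="/var/outputs",  # where the container must write its outputs
    inputs={"name": str},
    outputs={"greeting": str},
    # {{.inputs.name}} is replaced with the input value before the container starts;
    # the result is written to a file named after the output key ("greeting").
    command=["/bin/sh", "-c", "echo 'Hello, my name is {{.inputs.name}}.' | tee -a /var/outputs/greeting"],
)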
Input Mapping and Volume Binding
The ContainerTask handles two distinct types of data injection: template rendering for scalar values and volume binding for complex data types.
Scalar Templating
For simple types like str, int, or float, the SDK uses a template syntax: {{.inputs.key}}. During execution, the _render_command_and_volume_binding method replaces these placeholders with the actual values passed to the task.
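The substitution step can be sketched as a regular-expression replacement over every element of the command list. This is a hypothetical stand-in for _render_command_and_volume_binding, not its actual code: the function name render_command, the use of str() for formatting, the tolerance for whitespace inside the braces, and the KeyError on unknown keys are all assumptions.

```python
import re
from typing import List, Mapping

# Matches {{.inputs.<key>}}; allowing whitespace inside the braces is an assumption.
_TEMPLATE = re.compile(r"\{\{\s*\.inputs\.(\w+)\s*\}\}")


def render_command(command: List[str], inputs: Mapping[str, object]) -> List[str]:
    """Hypothetical renderer: replace {{.inputs.key}} with str(value) in each element."""

    def substitute(match: re.Match) -> str:
        key = match.group(1)
        if key not in inputs:
            raise KeyError(f"command references unknown input {key!r}")
        return str(inputs[key])

    return [_TEMPLATE.sub(substitute, part) for part in command]
```

Applied to the greeting task's command with {"name": "Ada"}, only the third element changes, becoming echo 'Hello, my name is Ada.' | tee -a /var/outputs/greeting.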
File and Directory Binding
When dealing with flyte.io.File or flyte.io.Dir, the implementation shifts from templating to volume mounting. The ContainerTask logic in _render_command_and_volume_binding detects these types and creates a mapping between the local path on the host and a path inside the container's input_data_dir.
A critical constraint in this implementation is that File and Dir inputs must use path-like syntax in the command string (e.g., /var/inputs/myfile) rather than the {{.inputs.myfile}} template syntax. As noted in the source code, this is due to how Flyte Propeller processes template syntax inputs for complex types.
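The binding step can be sketched as building a host-to-container path map. In this minimal sketch, LocalFile and LocalDir are hypothetical stand-ins for flyte.io.File and flyte.io.Dir, each input is assumed to be mounted at <input_data_dir>/<input name> (matching the /var/inputs/myfile example), and the read-only "ro" mode is an assumption. The returned dict follows the docker-py volumes format, {host_path: {"bind": container_path, "mode": ...}}. The guard against template syntax is also hypothetical; it simply encodes the constraint described above.

```python
import posixpath
from dataclasses import dataclass


@dataclass(frozen=True)
class LocalFile:
    """Hypothetical stand-in for flyte.io.File: only a host-side path."""
    path: str


@dataclass(frozen=True)
class LocalDir:
    """Hypothetical stand-in for flyte.io.Dir."""
    path: str


def build_volume_bindings(inputs: dict, input_data_dir: str = "/var/inputs") -> dict:
    """Mount each File/Dir input at <input_data_dir>/<input name> (docker-py volumes format)."""
    volumes = {}
    for name, value in inputs.items():
        if isinstance(value, (LocalFile, LocalDir)):
            volumes[value.path] = {"bind": posixpath.join(input_data_dir, name), "mode": "ro"}
    return volumes


def check_no_templated_file_inputs(command: list, inputs: dict, input_data_dir: str = "/var/inputs") -> None:
    """Hypothetical guard: File/Dir inputs must be referenced by path, not {{.inputs.x}}."""
    for name, value in inputs.items():
        if isinstance(value, (LocalFile, LocalDir)):
            placeholder = "{{.inputs." + name + "}}"
            if any(placeholder in part for part in command):
                expected = posixpath.join(input_data_dir, name)
                raise ValueError(f"reference {name!r} as {expected}, not {placeholder}")
```

Scalar inputs such as an int are skipped by build_volume_bindings because they travel through templating instead.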
Network Isolation and Security
For sensitive ML environments, ContainerTask supports network blocking through the block_network parameter. This is particularly useful for ensuring that data processing tasks do not exfiltrate data to external endpoints.
The implementation handles this differently based on the execution environment:
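- Local Execution: In the execute method, if block_network is True, the Docker client runs the container with network_mode="none".
- On-Cluster Execution: The __init__ method routes the pod to a sandboxed configuration, either by assigning a default pod template or by labeling an existing PodTemplate object: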
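if block_network:
    existing = kwargs.get("pod_template")
    if existing is None:
        # No template supplied: fall back to the cluster's sandboxed template by name.
        kwargs["pod_template"] = "sandboxed-pod-template"
    elif isinstance(existing, str):
        # A template referenced by name cannot be modified here, so reject it.
        raise ValueError(
            "block_network=True cannot be combined with a string pod_template reference. "
            "Use a PodTemplate object instead..."
        )
    else:
        # A PodTemplate object: merge in the "sandboxed" label, keeping existing labels.
        existing.labels = {**(existing.labels or {}), "sandboxed": "true"}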
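In both cases the pod is tied to the sandboxed configuration: either the sandboxed-pod-template by name, or a sandboxed: "true" label on a custom template. The network restriction itself is enforced by the cluster, which is expected to provide that template and act on that label.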
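The two branches can be exercised without a cluster or Docker daemon. The sketch below mirrors the on-cluster logic of the __init__ excerpt above using FakePodTemplate, a hypothetical stand-in for a real PodTemplate object, and builds (but does not submit) the keyword arguments for docker-py's client.containers.run() in the local case. The helper names apply_block_network and local_run_kwargs are illustrative, and the real execute method likely passes additional arguments.

```python
from dataclasses import dataclass
from typing import Optional

SANDBOXED_TEMPLATE_NAME = "sandboxed-pod-template"


@dataclass
class FakePodTemplate:
    """Hypothetical stand-in for a PodTemplate object; only labels matter here."""
    labels: Optional[dict] = None


def apply_block_network(kwargs: dict, block_network: bool) -> dict:
    """Hypothetical mirror of the on-cluster branch: default template, reject names, or label."""
    if not block_network:
        return kwargs
    existing = kwargs.get("pod_template")
    if existing is None:
        kwargs["pod_template"] = SANDBOXED_TEMPLATE_NAME
    elif isinstance(existing, str):
        raise ValueError("block_network=True cannot be combined with a string pod_template reference.")
    else:
        existing.labels = {**(existing.labels or {}), "sandboxed": "true"}
    return kwargs


def local_run_kwargs(image: str, command: list, block_network: bool) -> dict:
    """Keyword arguments one might pass to docker-py's client.containers.run()."""
    run_kwargs = {"image": image, "command": command}
    if block_network:
        # network_mode="none" leaves the container with only a loopback interface.
        run_kwargs["network_mode"] = "none"
    return run_kwargs
```

With a real client, the local case would then be docker.from_env().containers.run(**run_kwargs).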
High-Performance ML Environments
One of the most powerful applications of ContainerTask is in performance-critical data movement. In examples/stress/benchmark/large_io_comparison.py, ContainerTask is used to wrap s5cmd, a high-performance S3 client.
from flyte.extras import ContainerTask

# s5cmd_image is defined elsewhere in the benchmark (not shown): an image with s5cmd installed.
s5cmd_file_task = ContainerTask(
    name="s5cmd_download_file",
    image=s5cmd_image,
    inputs={"remote_path": str, "file_size_mb": int},
    outputs={"duration": float, "throughput_mbps": float},
    command=[
        "/bin/bash",
        "-c",
        """
        set -e
        START=$(date +%s%N)
        s5cmd cp -c 32 "$0" /tmp/downloaded_file
        END=$(date +%s%N)
        # ... calculation logic ...
        echo "$DURATION" > /var/outputs/duration
        echo "$THROUGHPUT" > /var/outputs/throughput_mbps
        """,
        # Passed after the script string, so bash binds the rendered path to $0.
        "{{.inputs.remote_path}}",
    ],
)
This approach bypasses the overhead of the Python SDK's data transformers, allowing the task to utilize the full throughput of specialized system tools.
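The script reads the path as "$0" rather than "$1" because of how sh -c and bash -c bind arguments: the first argument after the script string becomes $0, the next becomes $1, and so on. Passing the rendered value as its own argv element, instead of splicing it into the script text, also avoids shell quoting problems. The hypothetical helper below demonstrates the binding with /bin/sh, which behaves the same way as bash here.

```python
import subprocess


def run_inline_script(script: str, *args: str) -> str:
    """Run `/bin/sh -c script arg0 arg1 ...` and return its stdout.

    With -c, the first argument after the script string is bound to $0,
    the second to $1, which is why the s5cmd example reads the path as "$0".
    """
    result = subprocess.run(
        ["/bin/sh", "-c", script, *args],
        capture_output=True,
        text=True,
        check=True,
    )
    return result.stdout
```

A common alternative convention passes a throwaway value such as "_" as $0 so that real arguments start at $1; the benchmark uses $0 directly instead.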
Tradeoffs and Constraints
While ContainerTask offers maximum flexibility, it introduces several constraints:
- Strict Typing: All elements in the command and arguments lists must be strings. The SDK raises a ValueError during initialization if non-string types are detected.
- Manual Output Management: Unlike @task functions that return values, a ContainerTask must explicitly write its outputs as files in the output_data_dir, one file per output named after the output key. The SDK reads these files and converts them back to the expected types in _convert_output_val_to_correct_type.
- Local Dependencies: Local execution of these tasks requires the docker Python package and a running Docker daemon. The execute method raises an ImportError if the library is missing.
- Metadata Format: The metadata_format (JSON, YAML, or PROTO) determines how Flyte Propeller interprets the data loading configuration. It must match the format the containerized application uses to read its input metadata.
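The first two constraints can be sketched in plain Python. validate_command and read_outputs are hypothetical helpers, not the SDK's code: the first reproduces the string-only check, and the second reads one file per declared output and converts it with the declared type, loosely in the spirit of _convert_output_val_to_correct_type. Stripping surrounding whitespace (such as the trailing newline that echo adds) and supporting only types constructible from a string (str, int, float) are assumptions.

```python
from pathlib import Path
from typing import Dict, List, Optional


def validate_command(command: List[object], arguments: Optional[List[object]] = None) -> None:
    """Hypothetical check: every command/argument element must be a str."""
    for part in [*command, *(arguments or [])]:
        if not isinstance(part, str):
            raise ValueError(f"command and arguments must be strings, got {part!r} ({type(part).__name__})")


def read_outputs(output_dir: str, outputs: Dict[str, type]) -> Dict[str, object]:
    """Hypothetical reader: one file per declared output, converted to the declared type."""
    values = {}
    for name, typ in outputs.items():
        path = Path(output_dir) / name
        if not path.exists():
            raise FileNotFoundError(f"container did not write output {name!r} to {path}")
        # Assumption: surrounding whitespace, e.g. echo's trailing newline, is dropped.
        values[name] = typ(path.read_text().strip())
    return values
```

Under these assumptions, a duration file containing "1.5\n" would be read back as the float 1.5.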