Automatic Pickling Fallback
The Automatic Pickling Fallback system in this SDK serves as a safety net for Python types that do not have a native Flyte transformer. While Flyte prefers strongly-typed interfaces for better cross-language compatibility and performance, the pickling system ensures that arbitrary Python objects can still be passed between tasks without requiring custom transformer implementations for every user-defined class.
The Fallback Mechanism
The entry point for this system is the TypeEngine.get_transformer method in src/flyte/types/_type_engine.py. When the engine evaluates a Python type and fails to find a registered transformer (including built-ins, dataclasses, and plugins), it defaults to the FlytePickleTransformer.
# src/flyte/types/_type_engine.py
if dataclasses.is_dataclass(python_type):
    return cls._DATACLASS_TRANSFORMER

display_pickle_warning(str(python_type))
from flyte.types._pickle import FlytePickleTransformer

return FlytePickleTransformer()
When this fallback occurs, the SDK issues a warning via display_pickle_warning to alert the developer that the type is being handled via pickling. This is important because pickling introduces a dependency on the Python version; objects pickled in one version of Python may not be deserializable in another.
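The lookup-then-fallback pattern described above can be sketched in isolation. This is a minimal illustration, not the SDK's implementation: the Transformer class, the REGISTRY dictionary, and this standalone get_transformer function are hypothetical stand-ins, and the warning is raised with the standard warnings module rather than display_pickle_warning.

```python
import dataclasses
import warnings
from typing import Any, Dict, Type


class Transformer:
    """Hypothetical stand-in for a type transformer; it only carries a name."""

    def __init__(self, name: str) -> None:
        self.name = name


# Hypothetical registry mapping Python types to their transformers.
REGISTRY: Dict[type, Transformer] = {
    int: Transformer("int"),
    str: Transformer("str"),
}
DATACLASS_TRANSFORMER = Transformer("dataclass")


def get_transformer(python_type: Type[Any]) -> Transformer:
    """Return a registered transformer, or fall back to a pickle transformer."""
    if python_type in REGISTRY:
        return REGISTRY[python_type]
    if dataclasses.is_dataclass(python_type):
        return DATACLASS_TRANSFORMER
    # Nothing matched: warn the developer, then fall back to pickling.
    warnings.warn(f"No transformer found for {python_type}; falling back to pickle")
    return Transformer("pickle")


@dataclasses.dataclass
class Point:
    x: int
    y: int


class Opaque:
    """A plain class with no registered transformer."""
```

Under these assumptions, get_transformer(int) and get_transformer(Point) resolve to dedicated transformers, while get_transformer(Opaque) emits a warning and returns the pickle stand-in.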
Serialization Strategy and Optimization
The FlytePickleTransformer (found in src/flyte/types/_pickle.py) implements a dual-storage strategy to balance performance and resource usage. It serializes values with cloudpickle and chooses where to store them based on size, governed by DEFAULT_PICKLE_BYTES_LIMIT (set to 10KB). Note that the excerpt below applies the limit to sys.getsizeof(python_val), which is the in-memory size of the Python object rather than the length of the pickled bytes.
- Inlining (Binary): If the size is at most 10KB, the pickled object is stored directly within the Flyte Literal as a Binary scalar. This avoids the overhead of an external file transfer.
- Offloading (Blob): If the size exceeds 10KB, the transformer uses FlytePickle.to_pickle to upload the data to the configured storage (e.g., S3, GCS, or local disk) and stores a Blob URI in the Flyte Literal.
# src/flyte/types/_pickle.py
async def to_literal(
    self, python_val: T, python_type: Type[T], expected: types_pb2.LiteralType
) -> literals_pb2.Literal:
    if python_val is None:
        raise AssertionError("Cannot pickle None Value.")
    # ... metadata setup ...
    if sys.getsizeof(python_val) > DEFAULT_PICKLE_BYTES_LIMIT:
        remote_path = await FlytePickle.to_pickle(python_val)
        return literals_pb2.Literal(
            scalar=literals_pb2.Scalar(blob=literals_pb2.Blob(metadata=meta, uri=remote_path))
        )
    else:
        return literals_pb2.Literal(
            scalar=literals_pb2.Scalar(binary=literals_pb2.Binary(value=cloudpickle.dumps(python_val)))
        )
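To see how a sys.getsizeof threshold behaves, the following sketch reproduces only the branching decision from the excerpt. It is an illustration under stated assumptions: choose_storage and PICKLE_BYTES_LIMIT are hypothetical names, the limit is assumed to be 10 * 1024 bytes, and the standard pickle module stands in for cloudpickle so the example has no third-party dependencies.

```python
import pickle
import sys

# Assumed value for illustration; the document describes the limit as 10KB.
PICKLE_BYTES_LIMIT = 10 * 1024


def choose_storage(value: object) -> str:
    """Return "blob" or "binary" using the same kind of check as the excerpt."""
    if value is None:
        raise AssertionError("Cannot pickle None Value.")
    # sys.getsizeof reports the shallow in-memory size of the object itself,
    # not the size of anything it references.
    if sys.getsizeof(value) > PICKLE_BYTES_LIMIT:
        return "blob"
    return "binary"


small = "x" * 10
large = b"x" * (20 * 1024)
# A one-element list: the list object is small even though its content,
# and therefore its pickled form, is large.
nested = [large]

print(choose_storage(small))
print(choose_storage(large))
print(choose_storage(nested), len(pickle.dumps(nested)) > PICKLE_BYTES_LIMIT)
```

The last case shows why the distinction matters: a container can fall under the in-memory threshold while its pickled bytes are well above it.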
The FlytePickle Type
While the system is designed to be automatic, the SDK provides a FlytePickle class that acts as a generic wrapper. Users can explicitly use FlytePickle or typing.Any in their task signatures to signal that pickling is acceptable for a specific input or output.
The FlytePickle class handles the low-level storage interactions. Its to_pickle method names the file after an MD5 hash of the pickled bytes, so identical objects yield the same file name and can potentially share storage paths.
# src/flyte/types/_pickle.py
@classmethod
async def to_pickle(cls, python_val: typing.Any) -> str:
    h = hashlib.md5()
    str_bytes = cloudpickle.dumps(python_val)
    h.update(str_bytes)
    uri = storage.get_random_local_path(file_path_or_file_name=h.hexdigest())
    # ... file writing and storage upload ...
    return await storage.put(str(uri))
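The hash-based naming step can be tried locally with the standard library alone. This is a rough sketch rather than the SDK's storage layer: write_pickle and read_pickle are hypothetical helpers, a temporary directory replaces the configured storage backend, and pickle stands in for cloudpickle.

```python
import hashlib
import pickle
import tempfile
from pathlib import Path


def write_pickle(value: object, directory: Path) -> Path:
    """Pickle value into directory, naming the file after the MD5 of its bytes."""
    payload = pickle.dumps(value)
    name = hashlib.md5(payload).hexdigest()
    path = directory / name
    path.write_bytes(payload)
    return path


def read_pickle(path: Path) -> object:
    """Load a value previously written by write_pickle."""
    return pickle.loads(path.read_bytes())


with tempfile.TemporaryDirectory() as tmp:
    first = write_pickle({"a": 1}, Path(tmp))
    second = write_pickle({"a": 1}, Path(tmp))
    # Identical payloads hash to the same file name, so the paths coincide.
    print(first == second)
    print(read_pickle(first))
```

Because the name depends only on the serialized bytes, two values share a file name only when they pickle to exactly the same byte string.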
Practical Example
In practice, this allows developers to use custom classes in Flyte tasks without any additional configuration. The following example from examples/basics/all_types.py demonstrates a task that accepts a custom object and a typing.Any type, both of which are handled by the pickling system.
from typing import Any, Dict

from flyte.types._pickle import FlytePickle


class CustomObject:
    def __init__(self, value):
        self.value = value


# `env` is the task environment defined elsewhere in the example file (not shown here).
@env.task
async def process_any_pickle_types(my_any: Any, my_pickle: FlytePickle) -> Dict[str, str]:
    """Process Any and FlytePickle types"""
    return {
        "any": f"Any type: {my_any} (actual type: {type(my_any).__name__})",
        "pickle": f"FlytePickle: {my_pickle} (type: {type(my_pickle).__name__})",
    }


# Usage
sample_pickle = CustomObject("Hello from pickle!")
# The TypeEngine will automatically use FlytePickleTransformer for sample_pickle
Tradeoffs and Constraints
- Python Version Sensitivity: As noted in the system's warnings, cloudpickle is sensitive to Python versions. Tasks producing and consuming pickled data should ideally run in environments with matching Python versions.
- None Values: The FlytePickleTransformer explicitly forbids pickling None. If a value is None, it must be handled by the UnionTransformer (as an Optional type) or another appropriate transformer.
- Type Erasure: For collection types like list or dict that lack generic arguments (e.g., just list instead of list[int]), the TypeEngine cannot determine a specific transformer and will fall back to pickling the entire collection.
- Performance: While inlining helps for small objects, pickling large objects involves disk I/O and network transfers that are slower than using Flyte's native optimized types (like FlyteDirectory or StructuredDataset).
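The type-erasure point can be checked at runtime with the typing module. The helper below, is_unparameterized_collection, is a hypothetical name and does not reflect how the TypeEngine is implemented; it only shows how a bare list or dict differs from a parameterized one when inspected. It assumes Python 3.9 or later for the list[int] syntax.

```python
from typing import Any, Dict, List, get_args, get_origin


def is_unparameterized_collection(python_type: Any) -> bool:
    """True for bare list/dict (or typing.List/Dict) with no element types."""
    # get_origin returns None for plain classes such as list, so fall back
    # to the type itself in that case.
    origin = get_origin(python_type) or python_type
    return origin in (list, dict) and not get_args(python_type)


print(is_unparameterized_collection(list))
print(is_unparameterized_collection(list[int]))
print(is_unparameterized_collection(Dict[str, int]))
```

Annotating task signatures with the parameterized form (for example list[int] rather than list) gives the type system the element type it needs to avoid the pickle fallback described above.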