Data Hashing and Cache Management

Data hashing in this SDK is the foundational mechanism for content-based caching. By computing stable identifiers for data, from simple files to complex DataFrames, the system can determine whether a task's inputs have changed without necessarily downloading or inspecting the entire dataset. This infrastructure is primarily implemented in src/flyte/io/_hashing_io.py.

The Hashing Protocol

The core of the hashing system is the HashMethod protocol. It defines a simple, incremental interface that allows different hashing strategies to be used interchangeably by the SDK's I/O utilities.

from typing import Any, Protocol

class HashMethod(Protocol):
    def update(self, data: Any, /) -> None: ...
    def result(self) -> str: ...

This protocol allows the SDK to support various hashing scenarios:

  • Standard Algorithms: HashlibAccumulator wraps Python's built-in hashlib (e.g., SHA-256) to provide cryptographic hashing of byte streams.
  • Custom Logic: HashFunction allows developers to define domain-specific hashing, such as summing the per-row hashes of a DataFrame.
  • Optimized Bypasses: PrecomputedValue is used when a hash is already known (e.g., from external metadata), allowing the system to satisfy the protocol without re-reading data.
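
The interchangeability described above can be sketched with stand-in classes. The names Sha256Accumulator, Precomputed, and hash_chunks below are hypothetical and are not the SDK's HashlibAccumulator or PrecomputedValue; they only mirror the two-method protocol shown earlier.

```python
import hashlib
from typing import Any, Iterable, Protocol


class HashMethod(Protocol):
    def update(self, data: Any, /) -> None: ...
    def result(self) -> str: ...


class Sha256Accumulator:
    """Hypothetical stand-in: feeds every chunk into hashlib.sha256."""

    def __init__(self) -> None:
        self._h = hashlib.sha256()

    def update(self, data: Any, /) -> None:
        self._h.update(data)

    def result(self) -> str:
        return self._h.hexdigest()


class Precomputed:
    """Hypothetical stand-in: ignores the data and returns a known hash."""

    def __init__(self, value: str) -> None:
        self._value = value

    def update(self, data: Any, /) -> None:
        pass  # nothing to do, the hash is already known

    def result(self) -> str:
        return self._value


def hash_chunks(chunks: Iterable[bytes], method: HashMethod) -> str:
    """Works with any object that satisfies the HashMethod protocol."""
    for chunk in chunks:
        method.update(chunk)
    return method.result()


chunks = [b"hello ", b"world"]
incremental = hash_chunks(chunks, Sha256Accumulator())
known = hash_chunks(chunks, Precomputed("etag-123"))
```

Because hash_chunks depends only on update() and result(), the caller can swap a real digest for a known value without changing the I/O code.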

Content-Based Hashing for Complex Types

For complex data types like pandas or Polars DataFrames, standard byte-level hashing of the serialized file is often insufficient or non-deterministic. The SDK uses HashFunction in conjunction with Python's typing.Annotated to attach custom hashing logic to type hints.

When a task returns a type annotated with a HashFunction, the SDK executes the provided callable to generate a cache key.
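
One way such a lookup could work is sketched below using only the standard typing module. FnHash, hash_pairs, and cache_key_for_return are hypothetical names used for illustration; how the SDK actually discovers and invokes a HashFunction is not shown in this document.

```python
from typing import Annotated, Any, Callable, Optional, get_args, get_origin, get_type_hints


class FnHash:
    """Hypothetical stand-in for a function-backed hash attached to a type."""

    def __init__(self, fn: Callable[[Any], str]) -> None:
        self.fn = fn


def hash_pairs(pairs: list) -> str:
    # Toy content hash for a list of (id, value) tuples.
    return str(sum(i * 31 + v for i, v in pairs))


# The hashing logic travels with the type, not with the function body.
HashedPairs = Annotated[list, FnHash(hash_pairs)]


def produce() -> HashedPairs:
    return [(1, 100), (2, 200)]


def cache_key_for_return(fn: Callable[..., Any], value: Any) -> Optional[str]:
    """Look for a FnHash in the return annotation and apply it to the value."""
    hint = get_type_hints(fn, include_extras=True).get("return")
    if get_origin(hint) is Annotated:
        # get_args yields (base_type, metadata_1, metadata_2, ...).
        for meta in get_args(hint)[1:]:
            if isinstance(meta, FnHash):
                return meta.fn(value)
    return None


key = cache_key_for_return(produce, produce())
```

A function whose return type carries no such metadata yields None here, which a caller could treat as "no custom hash available".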

Example: Custom DataFrame Hashing

In examples/advanced/hash_pandas_dataframe.py, a custom hash is defined using pandas' internal hashing utilities:

def hash_pandas_dataframe(df: pd.DataFrame) -> str:
    # Uses pandas' built-in hash_pandas_object to compute a content-based hash
    return str(pd.util.hash_pandas_object(df).sum())

# Create a type alias that includes the HashFunction
HashedPandasDataFrame = Annotated[pd.DataFrame, HashFunction.from_fn(hash_pandas_dataframe)]

@env.task
async def produce_dataframe() -> HashedPandasDataFrame:
    df = pd.DataFrame({"id": [1, 2, 3], "value": [100, 200, 300]})
    return df

This design choice decouples the hashing logic from the task implementation, allowing the same DataFrame to be hashed differently depending on the requirements of the workflow.

Incremental I/O Hashing

A significant design challenge in data processing is the "double-read" problem: reading a file once to compute its hash and a second time to upload it to storage. The SDK solves this using HashingReader and HashingWriter.

These classes act as transparent wrappers around file-like objects. They intercept read() and write() calls to update an internal HashMethod accumulator as data passes through.
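
The single-pass idea can be illustrated with a minimal reader wrapper. SketchHashingReader and copy_with_hash are hypothetical and simplified; they are not the SDK's HashingReader, and the accumulator here is a plain hashlib object, which already exposes update().

```python
import hashlib
import io


class SketchHashingReader:
    """Hypothetical wrapper: hashes bytes as they are read."""

    def __init__(self, fh, accumulator) -> None:
        self._fh = fh
        self._acc = accumulator

    def read(self, size: int = -1) -> bytes:
        data = self._fh.read(size)
        if data:
            self._acc.update(data)
        return data


def copy_with_hash(src, dst, chunk_size: int = 4) -> str:
    """Copy src to dst in chunks, hashing each chunk exactly once."""
    acc = hashlib.sha256()
    reader = SketchHashingReader(src, acc)
    while chunk := reader.read(chunk_size):
        dst.write(chunk)
    return acc.hexdigest()


payload = b"single pass over the data"
source = io.BytesIO(payload)
destination = io.BytesIO()
digest = copy_with_hash(source, destination)
```

The source is read once: each chunk is hashed on its way to the destination, so the digest is ready when the copy finishes.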

Implementation Details

The HashingWriter ensures that hashing is performed on the exact bytes being committed to storage. When it receives str data, it encodes the text before hashing, using the explicitly passed encoding if there is one, otherwise the underlying file handle's encoding, and falling back to UTF-8 when neither is available.

class HashingWriter:
    def __init__(self, fh, accumulator: HashMethod, *, encoding: Optional[str] = None):
        self._fh = fh
        self._acc = accumulator
        self._encoding = encoding or getattr(fh, "encoding", None)

    def write(self, data):
        mv = self._to_bytes_mv(data)
        self._acc.update(mv)
        return self._fh.write(data)

By using memoryview (via _to_bytes_mv), the implementation minimizes memory copying during the hashing process, which is critical for performance when handling large files.
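
The following sketch shows why a memoryview avoids copies for binary data. The helper to_bytes_mv is a hypothetical, simplified version written for illustration; it is not the SDK's _to_bytes_mv.

```python
import hashlib


def to_bytes_mv(data, encoding: str = "utf-8") -> memoryview:
    """Hypothetical helper: return a memoryview over data's bytes."""
    if isinstance(data, str):
        # Text has to be encoded, which necessarily allocates new bytes.
        return memoryview(data.encode(encoding))
    # bytes, bytearray, and other buffer objects are wrapped without copying.
    return memoryview(data)


buf = bytearray(b"large payload")
view = to_bytes_mv(buf)

# The view refers to the original buffer rather than to a copy of it.
shares_buffer = view.obj is buf

# hashlib accepts any bytes-like object, including a memoryview.
digest_from_view = hashlib.sha256(view).hexdigest()
digest_from_bytes = hashlib.sha256(bytes(buf)).hexdigest()
```

Only the str branch allocates, which is unavoidable because text must be turned into bytes before it can be hashed.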

Design Tradeoffs and Constraints

Determinism

The effectiveness of the caching system relies entirely on the determinism of the HashFunction. If a custom hash function incorporates non-deterministic elements (like timestamps or unseeded random values), identical data will produce a different cache key on every run, so lookups that should hit will miss. The SDK assumes that any HashFunction provided via Annotated is deterministic across different execution environments.
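
A small contrast makes the failure mode concrete. Both functions below are hypothetical examples written for this illustration, and the random run identifier stands in for any non-deterministic input.

```python
import hashlib
import uuid


def stable_hash(rows: list) -> str:
    """Depends only on the content; sorting makes it order-insensitive."""
    payload = repr(sorted(rows)).encode("utf-8")
    return hashlib.sha256(payload).hexdigest()


def unstable_hash(rows: list) -> str:
    """Anti-pattern: mixes a random run identifier into the hash input."""
    payload = repr((sorted(rows), uuid.uuid4().hex)).encode("utf-8")
    return hashlib.sha256(payload).hexdigest()


rows = [(1, 100), (2, 200), (3, 300)]

# Same content, same key: a cache lookup can succeed.
stable_a = stable_hash(rows)
stable_b = stable_hash(list(reversed(rows)))

# Same content, different keys: every lookup misses.
unstable_a = unstable_hash(rows)
unstable_b = unstable_hash(rows)
```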

Encoding Sensitivity

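HashingReader and HashingWriter must handle the transition between text and binary modes. If a file is opened in text mode, the SDK must encode the strings into bytes before hashing. The implementation encodes with the wrapper's stored encoding, which for HashingWriter is the explicit encoding argument or else the file handle's own, and falls back to UTF-8 when neither is set: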

# From HashingReader._to_bytes_mv
if isinstance(data, str):
    return memoryview(data.encode(self._encoding or "utf-8", self._errors))

This ensures that the hash computed by the SDK matches the hash of the file as it exists on disk or in remote storage, provided the encoding remains consistent.
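
The dependence on a consistent encoding can be checked with the standard library alone. This is an illustrative sketch that does not use the SDK; an in-memory buffer stands in for the file on disk.

```python
import hashlib
import io


def hash_text(text: str, encoding: str) -> str:
    """Hash the bytes that the text becomes under the given encoding."""
    return hashlib.sha256(text.encode(encoding)).hexdigest()


text = "naïve café"

# Write the text through a text-mode wrapper, as a file opened with
# encoding="utf-8" would; newline="" disables newline translation.
raw = io.BytesIO()
wrapper = io.TextIOWrapper(raw, encoding="utf-8", newline="")
wrapper.write(text)
wrapper.flush()
stored_hash = hashlib.sha256(raw.getvalue()).hexdigest()

utf8_hash = hash_text(text, "utf-8")
latin1_hash = hash_text(text, "latin-1")
```

Hashing with the same encoding used for writing reproduces the hash of the stored bytes, while a different encoding yields a different hash for any text that contains non-ASCII characters.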

Performance vs. Integrity

The use of HashingWriter adds a small overhead to every write operation. While this is generally negligible compared to network I/O latency, it is a tradeoff made to ensure that every file uploaded to the Flyte platform has a verified content hash available for downstream caching. In scenarios where the hash is already known, PrecomputedValue can be used to bypass this overhead entirely while still fulfilling the HashMethod interface required by the I/O utilities.