Caching and Versioning
Caching in this SDK is designed to optimize performance by avoiding the redundant execution of tasks when their logic and inputs remain unchanged. This is achieved through a flexible versioning system that can automatically detect code changes or allow for manual control over cache invalidation.
Caching Behaviors
The Cache class in flyte._cache.cache defines how a task should interact with the Flyte cache. It supports three primary behaviors:
- auto: The SDK automatically computes a cache version based on the task's implementation. This is the most common setting for development, as it ensures that a change to the function's logic invalidates previous cached results.
- override: The user provides an explicit version_override string. The cache is only invalidated when this string is manually updated. This is useful for production workflows where you want stable caching regardless of minor code refactors.
- disable: Caching is completely turned off, and the task will execute every time it is called.
These behaviors are typically configured via the TaskEnvironment or directly in the @env.task decorator:
from flyte import Cache, TaskEnvironment

# env is a TaskEnvironment instance; the name used here is illustrative
env = TaskEnvironment(name="caching_example")

# Automatic versioning based on code content
@env.task(cache=Cache(behavior="auto"))
def compute_expensive_metric(data: list[int]) -> int:
    return sum(data)

# Manual versioning
@env.task(cache=Cache(behavior="override", version_override="v1.0"))
def stable_task(x: int) -> int:
    return x * 2
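To make the three behaviors concrete, the following is a minimal, SDK-independent sketch of how a version string might be resolved for each one. The function `resolve_cache_version` and its `auto_version` callback are hypothetical names introduced for illustration only; they are not part of the flyte API, and the SDK's real control flow may differ.

```python
from typing import Callable, Optional


def resolve_cache_version(
    behavior: str,
    auto_version: Callable[[], str],
    version_override: Optional[str] = None,
) -> Optional[str]:
    """Illustrative model only: return the cache version, or None if caching is off."""
    if behavior == "disable":
        return None  # no version means no cache lookup; the task always runs
    if behavior == "override":
        if version_override is None:
            raise ValueError("behavior='override' requires version_override")
        return version_override  # stable until the user edits the string
    if behavior == "auto":
        return auto_version()  # e.g. a hash derived from the task's code
    raise ValueError(f"unknown cache behavior: {behavior!r}")


print(resolve_cache_version("override", lambda: "abc123", version_override="v1.0"))
print(resolve_cache_version("auto", lambda: "abc123"))
print(resolve_cache_version("disable", lambda: "abc123"))
```

In this model, only the "auto" branch ever looks at the code; "override" ignores it entirely, which is why refactors do not disturb the cache under that behavior.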
Automatic Versioning via AST Hashing
When behavior="auto" is used, the SDK relies on CachePolicy implementations to generate a version string. By default, it uses the FunctionBodyPolicy (found in flyte._cache.policy_function_body).
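Instead of hashing the raw source code string, FunctionBodyPolicy uses Python's ast (Abstract Syntax Tree) module. This design choice is significant because it makes the cache resilient to changes that do not alter the parsed tree. For example, adding comments or altering whitespace and blank lines will not change the AST, and therefore will not invalidate the cache. Docstrings are an exception: Python stores a docstring as an ordinary string expression node in the AST, so editing a docstring does change the dumped tree and produces a new version.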
The implementation follows these steps:
- Retrieves the source code using inspect.getsource(params.func) and dedents it with textwrap.dedent.
- Parses the source into an AST using ast.parse.
- Dumps the AST into a canonical string representation using ast.dump(parsed_ast, include_attributes=False).
- Hashes the resulting bytes combined with an optional salt.
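# Logic from FunctionBodyPolicy.get_version (method excerpt; VersionParameters comes from the SDK)
import ast
import hashlib
import inspect
import textwrap

def get_version(self, salt: str, params: VersionParameters) -> str:
    source = inspect.getsource(params.func)
    dedented_source = textwrap.dedent(source)  # nested or method definitions are indented
    parsed_ast = ast.parse(dedented_source)
    ast_bytes = ast.dump(parsed_ast, include_attributes=False).encode("utf-8")  # drop line/column info
    combined_data = ast_bytes + salt.encode("utf-8")
    return hashlib.sha256(combined_data).hexdigest()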
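The same hashing steps can be tried outside the SDK. The sketch below is a standalone approximation that works on source strings rather than function objects (so it does not depend on inspect.getsource); `version_from_source` is a hypothetical helper name, not an SDK function.

```python
import ast
import hashlib
import textwrap


def version_from_source(source: str, salt: str = "") -> str:
    """Hash the AST of `source` (plus a salt), ignoring position attributes."""
    tree = ast.parse(textwrap.dedent(source))
    ast_bytes = ast.dump(tree, include_attributes=False).encode("utf-8")
    return hashlib.sha256(ast_bytes + salt.encode("utf-8")).hexdigest()


original = """
def double(x):
    return x * 2
"""

# Same logic with an added comment, a blank line, and different spacing.
reformatted = """
def double(x):
    # multiply by two

    return x*2
"""

# Same logic with a docstring added: the docstring is a node in the AST.
with_docstring = '''
def double(x):
    """Return twice x."""
    return x * 2
'''

# A real logic change.
changed = """
def double(x):
    return x * 3
"""

print(version_from_source(original) == version_from_source(reformatted))
print(version_from_source(original) == version_from_source(with_docstring))
print(version_from_source(original) == version_from_source(changed))
```

The first comparison prints True because comments and spacing never reach the AST. The other two print False: a docstring is an ordinary expression node in Python's AST, so in this sketch adding or editing one changes the hash just as a logic change does.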
Tradeoffs of AST Hashing
While AST hashing is more robust than raw string hashing, it depends on inspect.getsource. This can fail if the function is defined in an interactive environment (like a REPL) or if the source file is not accessible at runtime. Additionally, it only captures the logic inside the function; changes to external dependencies or global variables used by the function are not automatically detected by this policy.
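The blind spot for external state can be illustrated with a toy module. This is a sketch under simplifying assumptions: `function_version` is a hypothetical helper that hashes only the named function's AST node, approximating what a function-body policy sees, and the "modules" are plain strings executed with exec.

```python
import ast
import hashlib


def function_version(module_source: str, name: str) -> str:
    """Hash only the AST of the named top-level function."""
    tree = ast.parse(module_source)
    func = next(
        node for node in tree.body
        if isinstance(node, ast.FunctionDef) and node.name == name
    )
    dumped = ast.dump(func, include_attributes=False)
    return hashlib.sha256(dumped.encode("utf-8")).hexdigest()


MODULE_V1 = """
SCALE = 2

def scale(x):
    return x * SCALE
"""

# Only the module-level constant changes; the function text is identical.
MODULE_V2 = MODULE_V1.replace("SCALE = 2", "SCALE = 3")


def run_scale(module_source: str, x: int) -> int:
    namespace: dict = {}
    exec(module_source, namespace)  # execute the toy module to observe its behavior
    return namespace["scale"](x)


same_version = function_version(MODULE_V1, "scale") == function_version(MODULE_V2, "scale")
print(same_version)
print(run_scale(MODULE_V1, 10), run_scale(MODULE_V2, 10))
```

The version is identical for both modules (True) even though the results differ (20 versus 30), so a stale cached value would be served. When a task depends on globals or helper functions, changing the salt or supplying a custom policy is one way to force a new version.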
Fine-tuning Cache Keys
The Cache class provides several parameters to refine how cache hits are determined:
Ignored Inputs
Sometimes, a task has inputs that should not affect whether a cached result is reused—for example, a request_id or a timestamp used only for logging. The ignored_inputs parameter allows you to exclude these from the cache key calculation.
@env.task(cache=Cache(behavior="auto", ignored_inputs=["timestamp"]))
def process_data(data: str, timestamp: str):
    # This will hit the cache even if 'timestamp' changes,
    # provided 'data' and the function body remain the same.
    ...
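Conceptually, ignoring an input means dropping it before the key is computed. The sketch below shows one way such a key could be derived; `cache_key` and its JSON-based encoding are assumptions made for illustration and do not describe how the SDK or the Flyte backend actually builds cache keys.

```python
import hashlib
import json
from typing import Any, Iterable, Mapping


def cache_key(
    version: str,
    inputs: Mapping[str, Any],
    ignored_inputs: Iterable[str] = (),
) -> str:
    """Illustrative cache key: hash of the version plus all non-ignored inputs."""
    ignored = set(ignored_inputs)
    relevant = {name: value for name, value in inputs.items() if name not in ignored}
    payload = json.dumps({"version": version, "inputs": relevant}, sort_keys=True)
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


first = cache_key("v1", {"data": "abc", "timestamp": "2024-01-01T00:00:00"}, ["timestamp"])
second = cache_key("v1", {"data": "abc", "timestamp": "2024-06-30T12:00:00"}, ["timestamp"])
third = cache_key("v1", {"data": "xyz", "timestamp": "2024-01-01T00:00:00"}, ["timestamp"])

print(first == second)  # only the ignored input differs
print(first == third)   # a relevant input differs
```

The first comparison is True and the second is False: the timestamp never enters the hashed payload, while data does.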
Cache Serialization
By default, if multiple instances of the same task (with the same inputs) are triggered simultaneously, they might all execute in parallel before the first one can populate the cache. Setting serialize=True ensures that only one instance runs while the others wait for the result, preventing redundant heavy computation.
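The effect of serialization can be imitated in a single process with threads. This is a toy model only: `SerializedCache` is a hypothetical class using per-key locks, whereas the real coordination for serialize=True happens in the Flyte backend and is not shown here.

```python
import threading
import time
from typing import Any, Callable, Dict


class SerializedCache:
    """Toy in-process cache where concurrent callers for one key run the work once."""

    def __init__(self) -> None:
        self._results: Dict[str, Any] = {}
        self._locks: Dict[str, threading.Lock] = {}
        self._guard = threading.Lock()

    def get_or_compute(self, key: str, compute: Callable[[], Any]) -> Any:
        with self._guard:  # protect the lock table itself
            lock = self._locks.setdefault(key, threading.Lock())
        with lock:  # only one caller per key computes; the rest wait here
            if key not in self._results:
                self._results[key] = compute()
            return self._results[key]


calls = []


def expensive() -> int:
    calls.append(1)   # record each real execution
    time.sleep(0.05)  # simulate heavy work
    return 42


cache = SerializedCache()
results = []


def worker() -> None:
    results.append(cache.get_or_compute("task:inputs-hash", expensive))


threads = [threading.Thread(target=worker) for _ in range(5)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(len(calls), results)
```

Five callers receive the result, but the expensive function runs once. Without the per-key lock, several threads could pass the "not cached yet" check at the same time and all do the work.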
Salting
The salt parameter provides a way to globally invalidate caches for a specific task without changing the code or the manual version. By changing the salt (e.g., salt="v2"), you ensure the generated hash will be different, effectively creating a new cache namespace.
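A small sketch of why a new salt yields a new version, assuming the salt is simply appended to the hashed payload as in the FunctionBodyPolicy excerpt (ast_bytes + salt). The helper `salted_version` and the placeholder payload are illustrative, not SDK code.

```python
import hashlib


def salted_version(ast_bytes: bytes, salt: str = "") -> str:
    """Append the salt to the payload before hashing."""
    return hashlib.sha256(ast_bytes + salt.encode("utf-8")).hexdigest()


payload = b"Module(body=[...])"  # placeholder standing in for an unchanged ast.dump() result

v1 = salted_version(payload, salt="v1")
v2 = salted_version(payload, salt="v2")

print(v1 != v2)                                  # new salt, new version
print(v1 == salted_version(payload, salt="v1"))  # same salt, same version
```

Both lines print True: the code payload is untouched, yet the version changes with the salt and stays stable when the salt does.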
Custom Cache Policies
For advanced use cases, the SDK defines a CachePolicy protocol. You can implement this protocol to create versioning logic based on external factors, such as the current Git SHA or the version of a specific library.
import hashlib
import subprocess

# Assumed import path: VersionParameters is taken to live alongside Cache in flyte._cache.cache
from flyte._cache.cache import VersionParameters

class GitHashPolicy:
    def get_version(self, salt: str, params: VersionParameters) -> str:
        # Use the current git commit as the version
        git_hash = subprocess.check_output(["git", "rev-parse", "HEAD"]).decode().strip()
        return hashlib.sha256(f"{salt}{git_hash}".encode()).hexdigest()

# Apply the custom policy
@env.task(cache=Cache(behavior="auto", policies=[GitHashPolicy()]))
def my_task():
    ...
The VersionParameters object passed to get_version contains metadata about the task, including the function object itself (func), the container image (image), and the code_bundle, allowing policies to make informed decisions about the version string. When multiple policies are provided, Cache.get_version concatenates their individual outputs before producing a final SHA-256 hash.
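The combination step described above can be sketched without the SDK. Everything here is a stand-in: `Policy` is a simplified local protocol (the params argument is omitted), `FixedPolicy` is a toy policy, and the plain in-order string concatenation in `combined_version` is an assumption about the exact format used by Cache.get_version.

```python
import hashlib
from typing import List, Protocol


class Policy(Protocol):
    """Local stand-in for the SDK's CachePolicy protocol (params omitted for brevity)."""

    def get_version(self, salt: str) -> str: ...


class FixedPolicy:
    """Toy policy that returns a salted hash of a fixed label."""

    def __init__(self, label: str) -> None:
        self.label = label

    def get_version(self, salt: str) -> str:
        return hashlib.sha256(f"{salt}{self.label}".encode("utf-8")).hexdigest()


def combined_version(policies: List[Policy], salt: str = "") -> str:
    """Concatenate each policy's output in order, then hash the result once."""
    joined = "".join(policy.get_version(salt) for policy in policies)
    return hashlib.sha256(joined.encode("utf-8")).hexdigest()


code_and_git = combined_version([FixedPolicy("code-v1"), FixedPolicy("git-abc123")])
code_changed = combined_version([FixedPolicy("code-v2"), FixedPolicy("git-abc123")])

print(len(code_and_git))             # 64 hex characters
print(code_and_git != code_changed)  # any single policy changing alters the result
```

Because the final hash covers every policy's output, a change reported by any one policy is enough to produce a new cache version.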