Managing Directories and File Collections
To manage collections of files and directory structures in Flyte, use the Dir class. It provides a unified interface for interacting with directories across local and remote filesystems, supporting both asynchronous and synchronous operations.
Uploading and Iterating through Files
The most common pattern involves uploading a local directory to remote storage and then iterating through its contents in a downstream task.
import os
import flyte
from flyte.io import Dir
# Task environment that the @env.task decorators below refer to (the name is illustrative)
env = flyte.TaskEnvironment(name="dir_example")
@env.task
async def process_directory() -> int:
    # 1. Create a local directory with files
    os.makedirs("/tmp/my_data", exist_ok=True)
    with open("/tmp/my_data/file1.txt", "w") as f:
        f.write("data1")
    with open("/tmp/my_data/file2.txt", "w") as f:
        f.write("data2")
    # 2. Upload to remote storage
    remote_dir = await Dir.from_local("/tmp/my_data/")
    # 3. Walk through files recursively
    file_count = 0
    async for file in remote_dir.walk(recursive=True):
        async with file.open("rb") as f:
            content = await f.read()
            print(f"Processing {file.name}: {content}")
        file_count += 1
    return file_count
Creating Directory References
You can instantiate Dir objects using several class methods depending on where the data currently resides.
From Local Filesystem
Use from_local (async) or from_local_sync (sync) to upload a local directory. If no remote_destination is provided, Flyte automatically generates a path in the configured raw data prefix.
@env.task
def upload_sync() -> Dir:
    # Synchronous upload to a specific S3 path
    return Dir.from_local_sync(
        local_path="/tmp/data_dir/",
        remote_destination="s3://my-bucket/data/"
    )
From Existing Remote Storage
If the data is already in remote storage (e.g., S3, GCS), use from_existing_remote to create a reference without moving any data.
@env.task
async def reference_existing():
    d = Dir.from_existing_remote("s3://my-bucket/existing-dataset/")
    if await d.exists():
        files = await d.list_files()
        print(f"Found {len(files)} top-level files.")
Creating a New Remote Reference
Use new_remote when you want to generate a remote path for a directory that you intend to write to directly (e.g., using a library that writes to S3).
@env.task
async def create_output_ref() -> Dir:
    # Generates a unique remote path like s3://bucket/raw/uuid/my-output/
    d = Dir.new_remote(dir_name="my-output")
    return d
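To make the shape of such a generated path concrete, here is a minimal sketch of how a unique location under a storage prefix could be composed. The helper make_remote_path and the prefix value are hypothetical; this is not Flyte's implementation, only an illustration of the prefix/unique-id/dir_name/ layout suggested by the comment in the example above.

```python
import uuid


def make_remote_path(raw_data_prefix: str, dir_name: str) -> str:
    """Compose a unique directory path under a storage prefix (hypothetical helper)."""
    unique_id = uuid.uuid4().hex  # random 32-character hex string
    # Strip stray slashes so the pieces join with exactly one separator each.
    return f"{raw_data_prefix.rstrip('/')}/{unique_id}/{dir_name.strip('/')}/"


# The prefix below is assumed for illustration only.
path_a = make_remote_path("s3://bucket/raw/", "my-output")
path_b = make_remote_path("s3://bucket/raw/", "my-output")
print(path_a)
print(path_a != path_b)  # separate calls produce separate locations
```

Because each call embeds a fresh random identifier, two outputs with the same dir_name do not collide.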
Navigating and Accessing Files
The Dir class provides methods to explore the directory structure without downloading the entire collection.
walk/walk_sync: Recursively (default) or non-recursively iterate through File objects.
list_files/list_files_sync: Get a list of File objects in the top-level directory.
get_file/get_file_sync: Retrieve a specific File by its relative name.
@env.task
async def find_config(d: Dir) -> str:
    # Get a specific file by name
    config_file = await d.get_file("config.json")
    if config_file:
        async with config_file.open("r") as f:
            return await f.read()
    return "{}"
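The difference between a recursive walk, a top-level listing, and a lookup by relative name can be tried out on a local directory with the standard library alone. The sketch below is only an analogy: walk_files and get_file are hypothetical helpers built on pathlib, not the Dir methods themselves, and the file names are made up.

```python
import tempfile
from pathlib import Path
from typing import List, Optional


def walk_files(root: Path, recursive: bool = True) -> List[Path]:
    """Return files under root, descending into subdirectories only if recursive."""
    pattern = "**/*" if recursive else "*"
    return sorted(p for p in root.glob(pattern) if p.is_file())


def get_file(root: Path, name: str) -> Optional[Path]:
    """Look up a file by its name relative to root; None if it does not exist."""
    candidate = root / name
    return candidate if candidate.is_file() else None


with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    (root / "nested").mkdir()
    (root / "config.json").write_text("{}")
    (root / "nested" / "part-0.txt").write_text("data")

    # Recursive walk sees both files; the top-level listing sees only one.
    all_files = [p.relative_to(root).as_posix() for p in walk_files(root)]
    top_level = [p.relative_to(root).as_posix() for p in walk_files(root, recursive=False)]
    found = get_file(root, "config.json") is not None
    missing = get_file(root, "absent.json") is None

print(all_files)
print(top_level)
```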
Downloading for Local Processing
If you need to use local tools or legacy libraries that require a local filesystem path, use download (async) or download_sync (sync) to copy the directory to local disk.
@env.task
def process_locally(d: Dir):
    # Downloads the entire remote directory to a local temporary path
    local_path = d.download_sync()
    # local_path is a string pointing to the downloaded directory
    for filename in os.listdir(local_path):
        print(f"Local file: {filename}")
Handling Optional Directories
In Flyte, Optional[Dir] can encounter serialization issues. Instead, use the EmptyDir sentinel to represent the absence of a directory.
from flyte.io import Dir, EmptyDir
from flyte import Output
@env.task
async def maybe_produce_dir(should_output: bool) -> Output:
    if should_output:
        return Output(output_dir=await Dir.from_local("/tmp/data"))
    # Return the sentinel instead of None
    return Output(output_dir=EmptyDir())
@env.task
async def consumer_task(d: Dir):
    # Check the is_empty property rather than using isinstance
    if d.is_empty:
        print("No directory was provided.")
        return
    async for file in d.walk():
        ...
Troubleshooting and Best Practices
Checking for Empty Directories
Always use the dir.is_empty property to check for EmptyDir. The EmptyDir subclass identity is not preserved during Flyte serialization; it is reconstructed as a standard Dir object with a sentinel path (::flyte-empty-dir::).
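The reason an isinstance check fails can be shown with a small stand-alone model. DirRef and EmptyDirRef below are hypothetical stand-ins written with dataclasses, not Flyte's classes, and round_trip only imitates a serializer that keeps field values and rebuilds the declared base type. Under those assumptions, the subclass is lost while the sentinel path survives.

```python
from dataclasses import asdict, dataclass

EMPTY_SENTINEL = "::flyte-empty-dir::"  # sentinel path quoted in the text above


@dataclass
class DirRef:
    """Hypothetical stand-in for a directory reference; not Flyte's Dir."""
    path: str

    @property
    def is_empty(self) -> bool:
        return self.path == EMPTY_SENTINEL


@dataclass
class EmptyDirRef(DirRef):
    """Hypothetical stand-in for the empty-directory sentinel subclass."""
    path: str = EMPTY_SENTINEL


def round_trip(ref: DirRef) -> DirRef:
    """Serialize to a plain dict, then rebuild as the declared base type."""
    payload = asdict(ref)  # only field values are kept, not the subclass
    return DirRef(**payload)


restored = round_trip(EmptyDirRef())
print(isinstance(restored, EmptyDirRef))  # False: subclass identity is lost
print(restored.is_empty)                  # True: the sentinel path survives
```

A check based on the path value works on both sides of the round trip, which is why a property such as is_empty is the more robust test.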
Upload Performance
When using Dir.from_local, you can control the concurrency of file uploads using the batch_size parameter or the FLYTE_IO_BATCH_SIZE environment variable (default is 32).
# Increase concurrency for directories with many small files
remote_dir = await Dir.from_local("/tmp/large_dir/", batch_size=64)
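To build intuition for what a batch size means for concurrency, the sketch below caps the number of in-flight uploads by processing paths in fixed-size batches with asyncio.gather. It is an illustration under assumptions, not Flyte's upload code: upload_in_batches and fake_upload are hypothetical, and the fallback order (explicit argument, then the FLYTE_IO_BATCH_SIZE variable, then 32) is assumed from the description above.

```python
import asyncio
import os


async def upload_in_batches(paths, upload_one, batch_size=None):
    """Run upload_one over paths with at most batch_size calls in flight."""
    if batch_size is None:
        # Assumed fallback: environment variable, then the default of 32.
        batch_size = int(os.environ.get("FLYTE_IO_BATCH_SIZE", "32"))
    results = []
    for start in range(0, len(paths), batch_size):
        batch = paths[start:start + batch_size]
        # gather runs the batch concurrently and preserves input order.
        results.extend(await asyncio.gather(*(upload_one(p) for p in batch)))
    return results


stats = {"in_flight": 0, "peak": 0}


async def fake_upload(path):
    """Pretend upload that records how many calls overlap."""
    stats["in_flight"] += 1
    stats["peak"] = max(stats["peak"], stats["in_flight"])
    await asyncio.sleep(0)  # yield control so other uploads in the batch can start
    stats["in_flight"] -= 1
    return f"remote/{path}"


files = [f"f{i}.txt" for i in range(10)]
uploaded = asyncio.run(upload_in_batches(files, fake_upload, batch_size=4))
print(len(uploaded), stats["peak"])
```

A larger batch size lets more small files overlap their network latency, at the cost of more open connections and memory at once.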
Generic Types and Formats
Dir is a generic class Dir[T]. The type T represents the format of the files within the directory (e.g., Dir[PARQUET]). This format is captured in the Flyte BlobType metadata during serialization.