Managing Directories and File Collections

To manage collections of files and directory structures in Flyte, use the Dir class. It provides a unified interface for interacting with directories across local and remote filesystems, supporting both asynchronous and synchronous operations.

Uploading and Iterating through Files

The most common pattern involves uploading a local directory to remote storage and then iterating through its contents in a downstream task.

import os
from flyte import env
from flyte.io import Dir

@env.task
async def process_directory() -> int:
    # 1. Create a local directory with files
    os.makedirs("/tmp/my_data", exist_ok=True)
    with open("/tmp/my_data/file1.txt", "w") as f:
        f.write("data1")
    with open("/tmp/my_data/file2.txt", "w") as f:
        f.write("data2")

    # 2. Upload to remote storage
    remote_dir = await Dir.from_local("/tmp/my_data/")

    # 3. Walk through files recursively
    file_count = 0
    async for file in remote_dir.walk(recursive=True):
        async with file.open("rb") as f:
            content = await f.read()
            print(f"Processing {file.name}: {content}")
        file_count += 1

    return file_count

Creating Directory References

You can instantiate Dir objects using several class methods depending on where the data currently resides.

From Local Filesystem

Use from_local (async) or from_local_sync (sync) to upload a local directory. If no remote_destination is provided, Flyte automatically generates a path under the configured raw data prefix.

@env.task
def upload_sync() -> Dir:
    # Synchronous upload to a specific S3 path
    return Dir.from_local_sync(
        local_path="/tmp/data_dir/",
        remote_destination="s3://my-bucket/data/"
    )
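To make the upload behavior more concrete, the following stdlib-only sketch shows one way the files under a local directory could map onto keys beneath a remote destination. It does not call Flyte: the helper name `plan_upload` and the key layout (each file's relative path appended to the destination) are assumptions for illustration, not the library's documented behavior.

```python
import os
import tempfile
from pathlib import Path


def plan_upload(local_path: str, remote_destination: str) -> dict[str, str]:
    """Map each local file to a hypothetical remote key (illustrative only)."""
    root = Path(local_path)
    prefix = remote_destination.rstrip("/")
    plan = {}
    for path in sorted(root.rglob("*")):
        if path.is_file():
            # Use POSIX separators so keys look the same on every platform.
            relative = path.relative_to(root).as_posix()
            plan[str(path)] = f"{prefix}/{relative}"
    return plan


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        os.makedirs(os.path.join(tmp, "nested"))
        Path(tmp, "a.txt").write_text("a")
        Path(tmp, "nested", "b.txt").write_text("b")
        for local, remote in plan_upload(tmp, "s3://my-bucket/data/").items():
            print(local, "->", remote)
```

Planning the mapping before transferring anything makes it easy to inspect where nested files would land.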

From Existing Remote Storage

If the data is already in remote storage (e.g., S3, GCS), use from_existing_remote to create a reference without moving any data.

@env.task
async def reference_existing():
    d = Dir.from_existing_remote("s3://my-bucket/existing-dataset/")
    if await d.exists():
        files = await d.list_files()
        print(f"Found {len(files)} top-level files.")

Creating a New Remote Reference

Use new_remote when you want to generate a remote path for a directory that you intend to write to directly (e.g., using a library that writes to S3).

@env.task
async def create_output_ref() -> Dir:
    # Generates a unique remote path like s3://bucket/raw/uuid/my-output/
    d = Dir.new_remote(dir_name="my-output")
    return d
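The comment in the example above suggests a layout of the form prefix/uuid/dir_name/. A minimal sketch of building such a path is shown below. The helper `make_remote_path` and its `raw_data_prefix` parameter are hypothetical, and the exact layout Flyte generates may differ.

```python
import uuid


def make_remote_path(raw_data_prefix: str, dir_name: str) -> str:
    """Build a unique directory path under a prefix (hypothetical helper)."""
    # A random UUID keeps two calls with the same dir_name from colliding.
    unique = uuid.uuid4().hex
    return f"{raw_data_prefix.rstrip('/')}/{unique}/{dir_name.strip('/')}/"


if __name__ == "__main__":
    print(make_remote_path("s3://bucket/raw", "my-output"))
```

Because the unique segment is random, repeated calls yield distinct paths, which is what lets several task executions write outputs with the same directory name without overwriting one another.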

The Dir class provides methods to explore the directory structure without downloading the entire collection.

  • walk / walk_sync: Recursively (default) or non-recursively iterate through File objects.
  • list_files / list_files_sync: Get a list of File objects in the top-level directory.
  • get_file / get_file_sync: Retrieve a specific File by its relative name.

@env.task
async def find_config(d: Dir) -> str:
    # Get a specific file by name
    config_file = await d.get_file("config.json")
    if config_file:
        async with config_file.open("r") as f:
            return await f.read()
    return "{}"
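The difference between a recursive walk and a top-level listing can be illustrated with a local analogue built on pathlib. This sketch does not use Flyte: `walk_files` and `build_sample_tree` are hypothetical helpers, and they only mirror the recursive and non-recursive semantics described in the list above.

```python
import tempfile
from pathlib import Path
from typing import Iterator


def walk_files(root: Path, recursive: bool = True) -> Iterator[Path]:
    """Yield files under root; descend into subdirectories only if recursive."""
    pattern = "**/*" if recursive else "*"
    for path in sorted(root.glob(pattern)):
        if path.is_file():
            yield path


def build_sample_tree(root: Path) -> None:
    """Create one top-level file and one nested file for the demo."""
    (root / "sub").mkdir()
    (root / "config.json").write_text("{}")
    (root / "sub" / "part-0.txt").write_text("data")


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        root = Path(tmp)
        build_sample_tree(root)
        print([p.name for p in walk_files(root, recursive=True)])
        print([p.name for p in walk_files(root, recursive=False)])
```

The recursive call reaches the nested file, while the non-recursive call returns only the file at the top level.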

Downloading for Local Processing

If you need to use local tools or legacy libraries that require a local filesystem path, use the download method (async) or download_sync (sync).

@env.task
def process_locally(d: Dir):
    # Downloads the entire remote directory to a local temporary path
    local_path = d.download_sync()

    # local_path is a string pointing to the downloaded directory
    for filename in os.listdir(local_path):
        print(f"Local file: {filename}")

Handling Optional Directories

In Flyte, Optional[Dir] can encounter serialization issues. Instead of returning None, use the EmptyDir sentinel to represent the absence of a directory.

from flyte.io import Dir, EmptyDir
from flyte import Output

@env.task
async def maybe_produce_dir(should_output: bool) -> Output:
    if should_output:
        return Output(output_dir=await Dir.from_local("/tmp/data"))
    # Return the sentinel instead of None
    return Output(output_dir=EmptyDir())

@env.task
async def consumer_task(d: Dir):
    # Check the is_empty property rather than using isinstance
    if d.is_empty:
        print("No directory was provided.")
        return

    async for file in d.walk():
        ...

Troubleshooting and Best Practices

Checking for Empty Directories

Always use the dir.is_empty property to check for EmptyDir. The EmptyDir subclass identity is not preserved during Flyte serialization; it is reconstructed as a standard Dir object with a sentinel path (::flyte-empty-dir::).
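The following sketch illustrates why a path-based check survives serialization while a subclass check does not. The classes `SketchDir` and `SketchEmptyDir` and the function `round_trip` are stand-ins invented for this example, not the real flyte.io types. The only detail taken from the text above is the sentinel path.

```python
from dataclasses import dataclass

EMPTY_SENTINEL = "::flyte-empty-dir::"  # sentinel path quoted in the text above


@dataclass
class SketchDir:
    """Stand-in for Dir; not the real flyte.io class."""

    path: str

    @property
    def is_empty(self) -> bool:
        # The check depends only on the path, which survives serialization.
        return self.path == EMPTY_SENTINEL


class SketchEmptyDir(SketchDir):
    """Stand-in for EmptyDir: a SketchDir fixed to the sentinel path."""

    def __init__(self) -> None:
        super().__init__(path=EMPTY_SENTINEL)


def round_trip(d: SketchDir) -> SketchDir:
    """Simulate serialization: only the path survives, not the subclass."""
    wire = {"path": d.path}
    return SketchDir(**wire)


if __name__ == "__main__":
    received = round_trip(SketchEmptyDir())
    print(isinstance(received, SketchEmptyDir))
    print(received.is_empty)
```

After the simulated round trip, the received object is a plain `SketchDir`, so only the property check still identifies it as empty.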

Upload Performance

When using Dir.from_local, you can control the concurrency of file uploads using the batch_size parameter or the FLYTE_IO_BATCH_SIZE environment variable (default is 32).

# Increase concurrency for directories with many small files
remote_dir = await Dir.from_local("/tmp/large_dir/", batch_size=64)
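As a rough illustration of how a batch size can bound upload concurrency, the sketch below limits simulated uploads with an asyncio semaphore. How Flyte implements batching internally is not stated here, so the semaphore approach, the helper names, and the precedence of the explicit argument over the environment variable are all assumptions.

```python
import asyncio
import os


def resolve_batch_size(batch_size=None) -> int:
    """Prefer the explicit argument, then the env var, then the default of 32."""
    if batch_size is not None:
        return batch_size
    return int(os.environ.get("FLYTE_IO_BATCH_SIZE", "32"))


async def upload_all(names, batch_size=None):
    """Run fake uploads with at most batch_size in flight; report the peak."""
    limit = asyncio.Semaphore(resolve_batch_size(batch_size))
    in_flight = 0
    peak = 0

    async def fake_upload(name):
        nonlocal in_flight, peak
        async with limit:
            in_flight += 1
            peak = max(peak, in_flight)
            await asyncio.sleep(0.001)  # placeholder for network I/O
            in_flight -= 1
        return name

    done = await asyncio.gather(*(fake_upload(n) for n in names))
    return done, peak


if __name__ == "__main__":
    files = [f"file{i}.txt" for i in range(100)]
    uploaded, peak = asyncio.run(upload_all(files, batch_size=8))
    print(len(uploaded), peak)
```

A larger limit tends to help when a directory holds many small files, since per-file latency dominates. For a few large files, bandwidth is usually the constraint and raising the limit gains little.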

Generic Types and Formats

Dir is a generic class Dir[T]. The type T represents the format of the files within the directory (e.g., Dir[PARQUET]). This format is captured in the Flyte BlobType metadata during serialization.
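To show how a type parameter can carry format information, here is a small sketch using the standard typing module. `SketchDir`, `Parquet`, and `format_of` are hypothetical names. The real PARQUET symbol and the way Flyte records the format in BlobType are not reproduced here.

```python
from typing import Generic, TypeVar, get_args, get_origin

T = TypeVar("T")


class Parquet:
    """Hypothetical format marker; the real PARQUET symbol is not shown here."""

    extension = "parquet"


class SketchDir(Generic[T]):
    """Stand-in for Dir[T]; not the real flyte.io class."""

    def __init__(self, path: str) -> None:
        self.path = path


def format_of(annotation) -> str:
    """Return the format name carried by a SketchDir[...] annotation, or ""."""
    if get_origin(annotation) is SketchDir:
        (fmt,) = get_args(annotation)
        return getattr(fmt, "extension", "")
    return ""


if __name__ == "__main__":
    print(format_of(SketchDir[Parquet]))
    print(repr(format_of(SketchDir)))
```

A type engine could inspect task annotations in a similar way to decide which format string to attach to the serialized directory.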