Working with Remote Files
To handle individual data files in Flyte tasks, use the File class from flyte.io. This class provides a unified interface for interacting with files across local and remote storage (like S3 or GCS), supporting both asynchronous and synchronous operations.
Quick Start: Creating and Reading Remote Files
The most common way to handle files is to create a new remote reference and write data directly to it, or read from an existing File object passed into a task.
import flyte
from flyte.io import File

env = flyte.TaskEnvironment(name="file-example")

@env.task
async def process_file(content: str) -> str:
    # 1. Create a new remote file reference
    f = File.new_remote()
    # 2. Write to it asynchronously
    async with f.open("wb") as fh:
        await fh.write(content.encode("utf-8"))
    # 3. Read it back
    async with f.open("rb") as fh:
        # NOTE: read() returns a memoryview; convert to bytes for decoding
        contents = bytes(await fh.read())
    return contents.decode("utf-8")
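The read-back step wraps the result in bytes() because, as the inline comment notes, read() returns a memoryview, and a memoryview has no .decode() method. The stdlib-only sketch below uses a plain memoryview as a stand-in for the buffer returned by await fh.read() (an assumption for illustration; no Flyte code runs here). decode_buffer is a hypothetical helper name.

```python
def decode_buffer(buf: memoryview, encoding: str = "utf-8") -> str:
    """Hypothetical helper: turn a memoryview (as returned by the async read) into text."""
    # memoryview exposes the raw bytes but has no .decode() method,
    # so copy it into a bytes object first.
    return bytes(buf).decode(encoding)


# Stand-in for the buffer returned by `await fh.read()`
buf = memoryview("hello, flyte".encode("utf-8"))
print(hasattr(buf, "decode"))  # False
print(decode_buffer(buf))      # hello, flyte
```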
Uploading Local Files
If a file already exists on your local filesystem, you can upload it to Flyte's remote storage with File.from_local (async) or File.from_local_sync (sync).
import os
import tempfile

import flyte
from flyte.io import File

env = flyte.TaskEnvironment(name="file-example")

@env.task
async def upload_data() -> File:
    # Create a temporary local file; delete=False keeps it on disk after the with block
    with tempfile.NamedTemporaryFile(mode="w", delete=False) as tmp:
        tmp.write("local data content")
        tmp_path = tmp.name
    try:
        # Upload to a randomly generated remote path
        remote_file = await File.from_local(tmp_path)
        return remote_file
    finally:
        # Remove the local copy once the upload has finished
        os.unlink(tmp_path)

@env.task
def upload_data_sync() -> File:
    # Synchronous version; replace the placeholder with a real local path
    return File.from_local_sync("/path/to/local/file.csv")
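To make "a randomly generated remote path" concrete, here is a minimal local stand-in, assuming a temporary directory plays the role of remote storage. upload_to_random_path is a hypothetical name, and keeping the original file name under the random prefix is a choice of this sketch, not documented Flyte behavior.

```python
import shutil
import tempfile
import uuid
from pathlib import Path


def upload_to_random_path(local_path: str, storage_root: str) -> str:
    """Hypothetical stand-in for File.from_local, with a local directory as "remote" storage."""
    # A fresh UUID prefix per call mirrors "a randomly generated remote path"
    dest_dir = Path(storage_root) / uuid.uuid4().hex
    dest_dir.mkdir(parents=True)
    # Keeping the original file name is a choice of this sketch only
    dest = dest_dir / Path(local_path).name
    shutil.copyfile(local_path, dest)
    return str(dest)


with tempfile.TemporaryDirectory() as storage, tempfile.TemporaryDirectory() as work:
    src = Path(work) / "data.txt"
    src.write_text("local data content")
    first = upload_to_random_path(str(src), storage)
    second = upload_to_random_path(str(src), storage)
    copied = Path(first).read_text()

print(first != second)  # True: every upload lands at a new path
print(copied)           # local data content
```

Because every call produces a new location, calling an upload like this twice (for example, on a task retry) leaves two copies behind, which is the problem the named-remote section addresses.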
Streaming Large Files
For large files that do not fit in memory, pass the block_size parameter to open() to stream the data in chunks instead of loading it all at once.
@env.task
async def stream_large_file(f: File) -> int:
    total_bytes = 0
    # A 1 MiB block_size makes reads proceed in chunks rather than all at once
    async with f.open("rb", block_size=1024 * 1024) as fh:
        while True:
            chunk = await fh.read()
            if not chunk:
                break
            total_bytes += len(chunk)
            # Process chunk...
    return total_bytes
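For comparison, the same loop over a local file with Python's built-in open() looks like the sketch below. Here the chunk size is passed explicitly to read(size), and an empty bytes object marks end of file. It is a local stand-in for trying out chunk-processing logic, not Flyte's streaming implementation; count_bytes_chunked is a hypothetical name.

```python
import os
import tempfile

CHUNK_SIZE = 1024 * 1024  # 1 MiB, matching block_size in the task above


def count_bytes_chunked(path: str, chunk_size: int = CHUNK_SIZE) -> tuple[int, int]:
    """Read `path` in fixed-size chunks; return (total_bytes, chunk_count)."""
    total_bytes = 0
    chunk_count = 0
    with open(path, "rb") as fh:
        while True:
            chunk = fh.read(chunk_size)  # at most chunk_size bytes per call
            if not chunk:                # b"" signals end of file
                break
            total_bytes += len(chunk)
            chunk_count += 1
    return total_bytes, chunk_count


# Usage: a 2.5 MiB file is read as three chunks (1 MiB, 1 MiB, 0.5 MiB)
with tempfile.NamedTemporaryFile(delete=False) as tmp:
    tmp.write(b"\0" * (5 * 1024 * 1024 // 2))
    path = tmp.name
try:
    total, chunks = count_bytes_chunked(path)
    print(total, chunks)  # 2621440 3
finally:
    os.unlink(path)
```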
Ensuring Retry Safety with Named Remotes
By default, File.new_remote() generates a random path every time it is called. If a task fails and retries, a new path is generated, which can lead to orphaned files. Use File.named_remote() to create a deterministic path based on the task's node ID, ensuring retries use the same location.
@env.task
async def retry_safe_task(data: str) -> File:
    # This path is stable across task retries
    f = File.named_remote(name="results.json")
    if await f.exists():
        return f  # Skip work if already done in previous attempt
    async with f.open("wb") as fh:
        await fh.write(data.encode("utf-8"))
    return f
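The idea behind named_remote can be illustrated without Flyte: derive the storage path only from inputs that stay the same across retries, then check for existing output before writing. In this sketch a dict stands in for the object store, the hashed node identifier and the s3://my-bucket/runs prefix are hypothetical, and the path layout is an assumption rather than the layout Flyte actually uses.

```python
import hashlib
import uuid

BASE = "s3://my-bucket/runs"  # hypothetical storage prefix


def random_remote_path(name: str) -> str:
    # new_remote()-style: a fresh random prefix on every call
    return f"{BASE}/{uuid.uuid4().hex}/{name}"


def named_remote_path(node_id: str, name: str) -> str:
    # named_remote()-style: the prefix depends only on stable inputs,
    # so a retry of the same node computes the same path
    digest = hashlib.sha256(node_id.encode("utf-8")).hexdigest()[:16]
    return f"{BASE}/{digest}/{name}"


storage: dict[str, bytes] = {}  # in-memory stand-in for the object store


def retry_safe_write(node_id: str, data: str) -> str:
    path = named_remote_path(node_id, "results.json")
    if path in storage:  # plays the role of `await f.exists()`
        return path      # an earlier attempt already produced the output
    storage[path] = data.encode("utf-8")
    return path


first_attempt = retry_safe_write("hypothetical-node-id", '{"ok": true}')
retry_attempt = retry_safe_write("hypothetical-node-id", '{"ok": true}')
print(first_attempt == retry_attempt)  # True
print(len(storage))                    # 1
```

With random_remote_path in place of named_remote_path, the simulated retry would write a second object and leave the first one orphaned.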
Synchronous File Operations
While async is preferred for I/O-bound tasks, you can use the synchronous methods (those ending in _sync) in regular, non-async task functions.
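@env.task
def sync_processing(f: File) -> str:
    # Check existence before doing any I/O
    if not f.exists_sync():
        return ""
    # Open using the sync context manager and read the text directly
    with f.open_sync("rt") as fh:
        return fh.read()

@env.task
def sync_download(f: File) -> str:
    # Alternatively, download to a local temporary path and use built-in file APIs
    local_path = f.download_sync()
    with open(local_path, "r", encoding="utf-8") as fh:
        return fh.read()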
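One common way a library can offer a synchronous twin of an async method is to drive the coroutine with asyncio.run(). The sketch below shows that generic asyncio pattern with hypothetical exists_async and exists_sync functions and a set standing in for storage; it is not a description of how flyte.io implements its _sync methods. Note that asyncio.run() refuses to start when the current thread already has a running event loop, so a wrapper built this way belongs in plain def code.

```python
import asyncio


async def exists_async(keys: set[str], key: str) -> bool:
    # Placeholder for a real async storage call (e.g. an object HEAD request)
    await asyncio.sleep(0)
    return key in keys


def exists_sync(keys: set[str], key: str) -> bool:
    # Run the coroutine to completion on a new event loop.
    # asyncio.run() raises RuntimeError if this thread already has a running loop.
    return asyncio.run(exists_async(keys, key))


stored = {"s3://my-bucket/key.txt"}
print(exists_sync(stored, "s3://my-bucket/key.txt"))      # True
print(exists_sync(stored, "s3://my-bucket/missing.txt"))  # False
```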
Troubleshooting and Best Practices
- Memoryview Return Type: In asynchronous mode, await fh.read() returns a memoryview object rather than bytes. Always wrap the result in bytes() if you need to perform operations like .decode(). Correct usage:
    content = bytes(await fh.read()).decode("utf-8")
- Downloading to Directories: When using download() or download_sync(), if the local_path ends with a trailing slash (e.g., /tmp/data/), the SDK treats it as a directory and preserves the remote filename inside that directory.
- Existing Remote References: To reference a file that already exists in S3/GCS without uploading anything, use File.from_existing_remote("s3://my-bucket/key.txt").
- Performance: The File class uses obstore for high-performance asynchronous storage operations, which bypasses standard Python filesystem overhead when possible.
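The trailing-slash rule for downloads can be expressed as a small path-resolution function. This is a hypothetical helper that mirrors the documented behavior for the directory case; treating a path without a trailing slash as the exact destination file is an assumption of this sketch.

```python
import posixpath


def resolve_download_target(remote_uri: str, local_path: str) -> str:
    """Hypothetical helper mirroring the documented trailing-slash rule."""
    if local_path.endswith("/"):
        # Directory target: keep the remote file name inside it
        remote_name = posixpath.basename(remote_uri.rstrip("/"))
        return posixpath.join(local_path, remote_name)
    # Assumed: any other path is used as the destination file itself
    return local_path


print(resolve_download_target("s3://my-bucket/key.txt", "/tmp/data/"))    # /tmp/data/key.txt
print(resolve_download_target("s3://my-bucket/key.txt", "/tmp/out.txt"))  # /tmp/out.txt
```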