Working with Remote Files

To handle individual data files in Flyte tasks, use the File class from flyte.io. This class provides a unified interface for interacting with files across local and remote storage (like S3 or GCS), supporting both asynchronous and synchronous operations.

Quick Start: Creating and Reading Remote Files

The most common way to handle files is to create a new remote reference and write data directly to it, or read from an existing File object passed into a task.

import flyte
from flyte.io import File

# Define the task environment once so later tasks can reuse it via @env.task
env = flyte.TaskEnvironment(name="file-example")

@env.task
async def process_file(content: str) -> str:
    # 1. Create a new remote file reference
    f = File.new_remote()

    # 2. Write to it asynchronously
    async with f.open("wb") as fh:
        await fh.write(content.encode("utf-8"))

    # 3. Read it back
    async with f.open("rb") as fh:
        # NOTE: read() returns a memoryview; convert to bytes for decoding
        contents = bytes(await fh.read())
    return contents.decode("utf-8")
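The bytes() conversion in step 3 matters because a memoryview has no decode method. The snippet below illustrates this using only the standard library: a plain memoryview stands in for the value that fh.read() is described as returning, and decode_payload is a hypothetical helper name, not part of flyte.io.

```python
def decode_payload(payload, encoding="utf-8"):
    """Decode bytes-like data (bytes, bytearray, or memoryview) to str."""
    # bytes() copies the underlying buffer into an object that has .decode()
    return bytes(payload).decode(encoding)


# Stand-in for the result of an async read (assumption for illustration)
view = memoryview("héllo".encode("utf-8"))

# A memoryview exposes the buffer but not the bytes string methods
assert not hasattr(view, "decode")

text = decode_payload(view)
```

Because bytes() accepts any bytes-like object, the same helper also works unchanged if a read returns bytes directly.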

Uploading Local Files

If a file already exists on your local filesystem, you can upload it to Flyte's remote storage using from_local (async) or from_local_sync (sync).

import os
import tempfile

from flyte.io import File

# env is assumed to be a flyte.TaskEnvironment,
# e.g. env = flyte.TaskEnvironment(name="file-example")

@env.task
async def upload_data() -> File:
    # Create a temporary local file
    with tempfile.NamedTemporaryFile(mode="w", delete=False) as tmp:
        tmp.write("local data content")
        tmp_path = tmp.name

    try:
        # Upload to a randomly generated remote path
        remote_file = await File.from_local(tmp_path)
        return remote_file
    finally:
        # Remove the local copy whether or not the upload succeeded
        os.unlink(tmp_path)

@env.task
def upload_data_sync() -> File:
    # Synchronous version
    return File.from_local_sync("/path/to/local/file.csv")

Streaming Large Files

For large files that do not fit in memory, use the block_size parameter in the open method to stream data in chunks.

@env.task
async def stream_large_file(f: File) -> int:
    chunk_size = 1024 * 1024  # 1 MiB
    total_bytes = 0
    # Use a modest block_size so data is fetched in chunks
    async with f.open("rb", block_size=chunk_size) as fh:
        while True:
            # Request at most chunk_size bytes per call so the whole file
            # is never held in memory at once
            chunk = await fh.read(chunk_size)
            if not chunk:
                break
            total_bytes += len(chunk)
            # Process chunk...
    return total_bytes
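The memory benefit of streaming depends on how much data each read call returns. As a rough illustration using only the standard library, the sketch below shows that a sized read yields bounded chunks, while an unsized read on a standard file-like object returns everything at once. Here io.BytesIO stands in for the remote handle and count_bytes_in_chunks is a hypothetical helper; whether the Flyte file handle behaves identically is an assumption worth checking against the SDK reference.

```python
import io


def count_bytes_in_chunks(fh, chunk_size):
    """Read fh in pieces of at most chunk_size bytes; return (total, reads)."""
    total = 0
    reads = 0
    while True:
        chunk = fh.read(chunk_size)
        if not chunk:  # an empty result signals end of file
            break
        total += len(chunk)
        reads += 1
    return total, reads


payload = b"x" * 2500

# Sized reads: three calls return 1024, 1024, and 452 bytes
total, reads = count_bytes_in_chunks(io.BytesIO(payload), chunk_size=1024)

# Unsized read: a single call returns the entire payload
whole = io.BytesIO(payload).read()
```

Peak memory in the loop is bounded by chunk_size rather than by the file size, which is the property a streaming task relies on.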

Ensuring Retry Safety with Named Remotes

By default, File.new_remote() generates a random path every time it is called. If a task fails and retries, a new path is generated, which can lead to orphaned files. Use File.named_remote() to create a deterministic path based on the task's node ID, ensuring retries use the same location.

@env.task
async def retry_safe_task(data: str) -> File:
    # This path is stable across task retries
    f = File.named_remote(name="results.json")

    if await f.exists():
        return f  # Skip work if already done in previous attempt

    async with f.open("wb") as fh:
        await fh.write(data.encode("utf-8"))
    return f
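The check-then-write pattern above only helps when every attempt resolves to the same path. The following sketch reproduces that idea on the local filesystem with the standard library. The directory layout (node-n0/results.json) and the write_once helper are hypothetical stand-ins, not how Flyte derives or writes named remote paths.

```python
import tempfile
from pathlib import Path


def write_once(target: Path, data: bytes) -> bool:
    """Write data to target unless it already exists. Return True if written."""
    if target.exists():
        return False  # an earlier attempt already produced the output
    # Write to a temporary sibling, then rename, so a crash mid-write
    # never leaves a partial file at the final path (local filesystem only).
    tmp = target.with_name(target.name + ".tmp")
    tmp.write_bytes(data)
    tmp.replace(target)
    return True


with tempfile.TemporaryDirectory() as root:
    # Hypothetical deterministic location: the same inputs give the same path
    target = Path(root) / "node-n0" / "results.json"
    target.parent.mkdir(parents=True)

    first_attempt = write_once(target, b'{"ok": true}')
    retry_attempt = write_once(target, b'{"ok": true}')
    files_after_retry = sorted(p.name for p in target.parent.iterdir())
```

The retry finds the existing output and skips the work, so only one file exists afterwards. With a random path per attempt, each retry would leave another file behind.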

Synchronous File Operations

While async is preferred for I/O-bound tasks, you can use the synchronous methods in regular (non-async) task functions.

@env.task
def sync_processing(f: File) -> str:
    # Check existence before doing any work
    if not f.exists_sync():
        return ""

    # Download to a local temporary path (available for local processing)
    local_path = f.download_sync()

    # Open using the sync context manager
    with f.open_sync("rt") as fh:
        return fh.read()
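To make the preference for async in I/O-bound work concrete, here is a small standard-library sketch in which several simulated reads overlap on one event loop. asyncio.sleep stands in for a storage round trip, and run_concurrently and simulated_read are hypothetical names; no Flyte API is involved.

```python
import asyncio


async def run_concurrently(n):
    """Run n simulated reads at once and report the peak overlap."""
    state = {"in_flight": 0, "peak": 0}

    async def simulated_read(i):
        state["in_flight"] += 1
        state["peak"] = max(state["peak"], state["in_flight"])
        await asyncio.sleep(0.01)  # stand-in for a network round trip
        state["in_flight"] -= 1
        return i

    # gather schedules all coroutines and returns results in input order
    results = await asyncio.gather(*(simulated_read(i) for i in range(n)))
    return results, state["peak"]


results, peak = asyncio.run(run_concurrently(3))
```

All three reads are in flight at the same moment, whereas equivalent synchronous calls would run one after another and the waits would add up.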

Troubleshooting and Best Practices

  • Memoryview Return Type: In asynchronous mode, await fh.read() returns a memoryview object rather than bytes. Wrap the result in bytes() before calling bytes methods such as .decode().
    # Correct
    content = bytes(await fh.read()).decode("utf-8")
  • Downloading to Directories: When using download() or download_sync(), if the local_path ends with a trailing slash (e.g., /tmp/data/), the SDK treats it as a directory and preserves the remote filename inside that directory.
  • Existing Remote References: To reference a file that already exists in S3/GCS without uploading anything, use File.from_existing_remote("s3://my-bucket/key.txt").
  • Performance: The File class uses obstore for high-performance asynchronous storage operations, which bypasses standard Python filesystem overhead when possible.
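The trailing-slash rule from the "Downloading to Directories" item can be expressed as a small path calculation. This is a sketch of the rule as described there, not the SDK's implementation, and resolve_download_target is a hypothetical helper name.

```python
import posixpath


def resolve_download_target(remote_uri: str, local_path: str) -> str:
    """Return the local file path a download would write to."""
    if local_path.endswith("/"):
        # Directory target: keep the remote object's file name
        return posixpath.join(local_path, posixpath.basename(remote_uri))
    # File target: use the path exactly as given
    return local_path


as_directory = resolve_download_target("s3://my-bucket/key.txt", "/tmp/data/")
as_file = resolve_download_target("s3://my-bucket/key.txt", "/tmp/data/out.txt")
```

Under this rule, omitting the trailing slash from /tmp/data would name the downloaded file "data" inside /tmp instead of placing key.txt in a directory.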