Streaming JSONL Files

The JsonlFile class provides a memory-efficient way to process large JSON Lines (JSONL) datasets by streaming records one at a time. It supports transparent Zstandard compression, configurable error handling for corrupt lines, and integration with Apache Arrow for data science workflows.

Basic Streaming Writes and Reads

To write data without loading the entire dataset into memory, use the writer() async context manager. To read it back, use the iter_records() async generator.

from flyteplugins.jsonl import JsonlFile

# `env` is assumed to be a task environment defined elsewhere in your module.
@env.task
async def process_jsonl() -> int:
    # 1. Create a remote file reference
    f = JsonlFile.new_remote("data.jsonl")

    # 2. Stream write records
    async with f.writer() as w:
        for i in range(1000):
            await w.write({"id": i, "data": f"item-{i}"})

    # 3. Stream read records back
    count = 0
    async for record in f.iter_records():
        count += 1

    return count

The JsonlWriter uses an internal _JsonlBuffer (default 1MB) to batch writes to the underlying storage, minimizing I/O overhead.
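
To make the batching idea concrete, here is a minimal standard-library sketch of a size-thresholded line buffer. It is not the library's _JsonlBuffer: the BufferedLineWriter class, its max_bytes parameter, and the flush_count counter are hypothetical names chosen for illustration, and the threshold is set to 64 bytes only so that the batching is visible on a tiny input.

```python
import io
import json


class BufferedLineWriter:
    """Hypothetical stand-in for a buffered JSONL writer (not the library's class)."""

    def __init__(self, sink, max_bytes=1024 * 1024):
        self._sink = sink            # any binary file-like object
        self._max_bytes = max_bytes  # flush threshold in bytes
        self._chunks = []            # encoded lines waiting to be written
        self._size = 0               # total bytes currently buffered
        self.flush_count = 0         # exposed only to make the batching visible

    def write(self, record):
        line = json.dumps(record).encode("utf-8") + b"\n"
        self._chunks.append(line)
        self._size += len(line)
        # Only touch the sink once enough bytes have accumulated.
        if self._size >= self._max_bytes:
            self.flush()

    def flush(self):
        if self._chunks:
            self._sink.write(b"".join(self._chunks))
            self.flush_count += 1
            self._chunks.clear()
            self._size = 0

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        # Write whatever is still buffered when the block ends.
        self.flush()
        return False


sink = io.BytesIO()
with BufferedLineWriter(sink, max_bytes=64) as w:
    for i in range(10):
        w.write({"id": i})

lines = sink.getvalue().splitlines()
```

In this sketch ten small records reach the sink in two writes instead of ten. The final write happens only because the with block calls flush() on exit, which is why a writer of this kind should always be used as a context manager.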

Using Zstandard Compression

Compression is handled transparently based on the file extension. If the path ends in .jsonl.zst or .jsonl.zstd, JsonlFile automatically uses _ZstdJsonlWriter for writes and performs streaming decompression during reads.

@env.task
async def write_compressed() -> JsonlFile:
    # The .zst extension triggers Zstandard compression
    f = JsonlFile.new_remote("data.jsonl.zst")

    # compression_level defaults to 3; higher is slower but smaller
    async with f.writer(compression_level=5) as w:
        for i in range(100):
            await w.write({"id": i, "compressed": True})

    return f
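
The extension rule described in this section can be pictured as a plain suffix check. The sketch below is an assumption about the behavior, not the library's code: uses_zstd and ZSTD_SUFFIXES are hypothetical names, and the check is case-sensitive because the text does not say how mixed-case extensions are treated.

```python
# Suffixes named in the documentation as triggering Zstandard handling.
ZSTD_SUFFIXES = (".jsonl.zst", ".jsonl.zstd")


def uses_zstd(path: str) -> bool:
    """Return True when the path ends in one of the documented suffixes."""
    return path.endswith(ZSTD_SUFFIXES)


checks = {
    "s3://bucket/data.jsonl.zst": uses_zstd("s3://bucket/data.jsonl.zst"),
    "data.jsonl.zstd": uses_zstd("data.jsonl.zstd"),
    "data.jsonl": uses_zstd("data.jsonl"),
    "data.zst": uses_zstd("data.zst"),
}
```

A check like this can be run over your own paths before writing, to catch a compressed file that would otherwise be stored under a plain .jsonl name.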

Synchronous Operations

If you are working in a non-async environment, JsonlFile provides synchronous equivalents for both reading and writing.

@env.task
def sync_workflow(f: JsonlFile):
    # Synchronous streaming write
    with f.writer_sync() as w:
        w.write({"status": "starting"})
        w.write_many([{"id": 1}, {"id": 2}])

    # Synchronous streaming read
    for record in f.iter_records_sync():
        print(record)

Streaming to Arrow RecordBatches

For high-performance data processing, you can stream JSONL records directly into pyarrow.RecordBatch objects. For tabular data this is typically faster than handling records one Python dictionary at a time, and because only one batch is materialized at a time, memory usage is bounded by batch_size.

@env.task
async def process_as_arrow(f: JsonlFile):
    import pyarrow as pa

    # Yields Arrow RecordBatches of 4096 rows each
    async for batch in f.iter_arrow_batches(batch_size=4096):
        # Process the batch (e.g., filter, aggregate)
        print(f"Processed batch with {batch.num_rows} rows")
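
The bounded-memory property comes from grouping rows into fixed-size batches rather than collecting the whole file. The standard-library sketch below shows only that grouping step, under the assumption that records arrive as an ordinary iterator. batched_rows is a hypothetical helper, not part of JsonlFile, and it yields plain lists rather than Arrow RecordBatches.

```python
from itertools import islice


def batched_rows(records, batch_size):
    """Yield lists of at most batch_size records from any iterable."""
    it = iter(records)
    while True:
        # islice pulls at most batch_size items, so only one batch is held at a time.
        batch = list(islice(it, batch_size))
        if not batch:
            return
        yield batch


# A generator source: rows are produced lazily, never all at once.
source = ({"id": i} for i in range(10))
sizes = [len(batch) for batch in batched_rows(source, batch_size=4)]
```

The last batch is smaller than batch_size whenever the row count is not an exact multiple, so downstream code should not assume that every batch is full.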

Handling Parsing Errors

By default, iter_records() raises an exception if it encounters a malformed JSON line. You can change this behavior using the on_error parameter.

Skipping Corrupt Lines

Use on_error="skip" to ignore malformed lines and continue processing the rest of the file.

async for record in f.iter_records(on_error="skip"):
    # Corrupt lines are logged as warnings but do not stop the loop
    process(record)

Custom Error Handlers

Provide a callable to perform custom logging or error tracking.

def my_handler(line_number: int, raw_line: bytes, exc: Exception):
    print(f"Error on line {line_number}: {exc}")

async for record in f.iter_records(on_error=my_handler):
    process(record)
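
The three modes can be compared side by side with a small standard-library sketch. It is an illustration of the semantics described above, not the library's parser: parse_lines is a hypothetical function, the "raise" mode name is an assumption (the text only says that the default raises), and 1-based line numbers and continuing after the handler returns are also assumptions.

```python
import json


def parse_lines(lines, on_error="raise"):
    """Yield parsed records; handle malformed lines according to on_error."""
    for line_number, raw_line in enumerate(lines, start=1):
        try:
            yield json.loads(raw_line)
        except json.JSONDecodeError as exc:
            if on_error == "raise":
                raise
            if callable(on_error):
                # Same argument order as my_handler: line number, raw bytes, exception.
                on_error(line_number, raw_line, exc)
            # For "skip" or a callable, move on to the next line.


data = [b'{"id": 1}', b'{broken', b'{"id": 2}']

seen = []
records = list(parse_lines(data, on_error=lambda n, raw, exc: seen.append(n)))
skipped = list(parse_lines(data, on_error="skip"))
```

In this sketch both non-raising modes return the two valid records, and the handler additionally records which line failed. That makes a callable the better choice when you need to count or report corrupt lines rather than silently drop them.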

Troubleshooting

Compression Not Working

Zstandard compression is extension-based. If your file is compressed but does not end in .jsonl.zst or .jsonl.zstd, JsonlFile will attempt to read it as plain text and fail. Ensure your remote paths use the correct extension.

Missing Data on Write

Always use writer() or writer_sync() as context managers. The JsonlWriter buffers data in memory; the context manager's __aexit__ or __exit__ ensures that flush() is called to write the final remaining bytes to storage.

PyArrow Dependency

The iter_arrow_batches and iter_arrow_batches_sync methods require the pyarrow package. If it is not installed in your environment, these methods will raise a ModuleNotFoundError.
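
If you would rather detect the missing dependency up front than hit the error mid-task, a check along these lines can help. It is a generic standard-library sketch, and has_module is a hypothetical helper name rather than anything provided by the plugin.

```python
import importlib.util


def has_module(name: str) -> bool:
    """Return True if a top-level module can be found without importing it."""
    return importlib.util.find_spec(name) is not None


if not has_module("pyarrow"):
    print("pyarrow is not installed; Arrow batch iteration will be unavailable")
```

Installing the package into the task's environment, for example with pip install pyarrow, resolves the error.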

Memory Usage with Large Records

While JsonlFile streams line-by-line, a single extremely large JSON line (e.g., a multi-gigabyte single record) must still fit into memory to be parsed by orjson. For such cases, consider sharding the data into multiple files using JsonlDir.