Managing Sharded JSONL Datasets
The JsonlDir class in flyteplugins.jsonl._jsonl_dir provides a high-level interface for managing large datasets split across multiple JSONL files (shards). It automates the complexities of shard rotation during writes and provides optimized, concurrent reading across shards.
Sharded Directory Structure
A JsonlDir represents a directory containing files named according to the pattern part-NNNNN.jsonl (e.g., part-00000.jsonl, part-00001.jsonl). It also supports zstd-compressed shards using the .jsonl.zst extension.
The class inherits from Flyte's Dir class, meaning it natively supports remote storage and standard directory operations like walk() and download().
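The naming scheme matters for ordering. With five-digit zero-padding, sorting shard filenames as plain strings gives the same order as sorting them by index. The helpers below (`shard_name`, `is_shard`, `SHARD_RE`) are hypothetical illustrations of the pattern described above and are not part of the flyteplugins.jsonl API.

```python
import re

# Hypothetical pattern for shard filenames: part-NNNNN.jsonl or part-NNNNN.jsonl.zst
# (not taken from flyteplugins.jsonl; \d+ also tolerates indices past 99999).
SHARD_RE = re.compile(r"^part-(\d+)\.jsonl(\.zst)?$")


def shard_name(index: int, extension: str = ".jsonl") -> str:
    """Build a shard filename such as part-00007.jsonl or part-00007.jsonl.zst."""
    return f"part-{index:05d}{extension}"


def is_shard(filename: str) -> bool:
    """Return True if the filename follows the part-NNNNN shard pattern."""
    return SHARD_RE.match(filename) is not None


# Zero-padding makes lexicographic order match numeric order. Without it,
# "part-10" would sort before "part-2".
names = [shard_name(i) for i in (10, 2, 1)]
print(sorted(names))  # ['part-00001.jsonl', 'part-00002.jsonl', 'part-00010.jsonl']
```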
Writing with Automatic Rotation
Writing to a JsonlDir is handled by JsonlDirWriter (async) or JsonlDirWriterSync (synchronous). These writers automatically "rotate" shards—closing the current file and opening a new one—based on configurable thresholds.
Rotation can be triggered by:
- Record count: controlled by max_records_per_shard.
- Byte size: controlled by max_bytes_per_shard (defaults to 256 MB uncompressed).
When a threshold is reached, the writer increments the shard index and creates a new part-NNNNN.jsonl file.
import flyte
from flyteplugins.jsonl import JsonlDir

# Task environment referenced by the @env.task decorator below
env = flyte.TaskEnvironment(name="jsonl_example")

@env.task
async def create_sharded_dataset() -> JsonlDir:
    # Create a new remote directory for shards
    d = JsonlDir.new_remote("output_shards")
    # The writer context manager ensures all shards are flushed and closed
    async with d.writer(max_records_per_shard=1000) as w:
        for i in range(5000):
            # Automatically rotates every 1000 records
            await w.write({"id": i, "data": "..."})
    return d
The writer lazily opens the first shard only when write() is called and uses orjson for high-performance serialization.
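Rotation and lazy opening can be illustrated with a small local-disk writer. `RotatingJsonlWriter` is a hypothetical, simplified stand-in for JsonlDirWriterSync and is not the library's implementation. It uses the standard-library `json` module instead of orjson so that it has no dependencies, and it writes only plain `.jsonl` shards.

```python
import json
import os


class RotatingJsonlWriter:
    """Hypothetical sketch of a shard-rotating writer (local files only).

    Rotates to the next part-NNNNN.jsonl when the current shard has reached
    max_records_per_shard records or max_bytes_per_shard uncompressed bytes.
    A shard may overshoot the byte limit by less than one record.
    """

    def __init__(self, directory, max_records_per_shard=None,
                 max_bytes_per_shard=256 * 1024 * 1024, start_index=0):
        self.directory = directory
        self.max_records = max_records_per_shard
        self.max_bytes = max_bytes_per_shard
        self.index = start_index
        self._fh = None  # opened lazily on the first write()
        self._records = 0
        self._bytes = 0

    def _open_current(self):
        path = os.path.join(self.directory, f"part-{self.index:05d}.jsonl")
        self._fh = open(path, "wb")
        self._records = 0
        self._bytes = 0

    def _shard_full(self):
        return (self.max_records is not None and self._records >= self.max_records) or (
            self.max_bytes is not None and self._bytes >= self.max_bytes
        )

    def write(self, record):
        # Encode first so that byte accounting uses the serialized line.
        line = json.dumps(record, separators=(",", ":")).encode("utf-8") + b"\n"
        if self._fh is None:
            self._open_current()  # lazy open: no file exists until now
        elif self._shard_full():
            self._fh.close()  # rotate: close this shard, open the next index
            self.index += 1
            self._open_current()
        self._fh.write(line)
        self._records += 1
        self._bytes += len(line)

    def close(self):
        if self._fh is not None:
            self._fh.close()
            self._fh = None

    def __enter__(self):
        return self

    def __exit__(self, *exc):
        self.close()
```

Because rotation is checked before a record is written rather than after, a writer that fills its last shard exactly does not leave an empty trailing shard behind.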
High-Throughput Reading with Prefetching
The iter_records() method provides a transparent way to iterate over all records in a directory, regardless of how many shards exist. It automatically sorts shards by filename to ensure a deterministic processing order.
To maximize throughput, JsonlDir implements a prefetch-one-ahead strategy. While the consumer is processing records from the current shard, a background task (using _prefetch_shard) reads the next shard into an asyncio.Queue.
async def process_dataset(d: JsonlDir):
    # prefetch=True (default) overlaps network I/O with processing
    async for record in d.iter_records(prefetch=True):
        # do_heavy_processing is a placeholder for your own per-record coroutine
        await do_heavy_processing(record)
As a result, the consumer rarely stalls while the next file is downloaded and parsed. Memory use is bounded by queue_size (default 8192 records). The prefetch queue never holds more than that many records, and because only the next shard is prefetched, it never holds more than one shard's worth.
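The following sketch shows bounded read-ahead with a background task and an `asyncio.Queue(maxsize=queue_size)`. It simplifies the design described above. A single hypothetical producer (`_produce`, loosely modeled on `_prefetch_shard`) reads shards in filename order into one bounded queue. While the consumer works through the current shard, the producer moves on to the next one, up to `queue_size` records ahead. This is not the library's code, and it reads local files with blocking I/O only to keep the example short.

```python
import asyncio
import json
from typing import AsyncIterator, Iterable, List

_END = object()  # sentinel: the producer has finished every shard


async def _produce(paths: List[str], queue: asyncio.Queue) -> None:
    """Hypothetical background reader: push parsed records from each shard in order.

    queue.put() suspends while the queue is full, so the producer never gets
    more than queue.maxsize records ahead of the consumer.
    """
    try:
        for path in paths:
            # Blocking local reads for brevity; a remote reader would await
            # network I/O here, which is where the overlap pays off.
            with open(path, "rb") as fh:
                for line in fh:
                    if line.strip():
                        await queue.put(json.loads(line))
    except Exception as exc:
        await queue.put(exc)  # forward the error instead of leaving the consumer hanging
        return
    await queue.put(_END)


async def iter_records(paths: Iterable[str], queue_size: int = 8192) -> AsyncIterator[dict]:
    """Yield all records from all shards in sorted order with bounded read-ahead."""
    queue: asyncio.Queue = asyncio.Queue(maxsize=queue_size)
    # Paths share one directory, so sorting paths is the same as sorting filenames.
    producer = asyncio.create_task(_produce(sorted(paths), queue))
    try:
        while True:
            item = await queue.get()
            if item is _END:
                return
            if isinstance(item, Exception):
                raise item
            yield item
    finally:
        producer.cancel()  # no-op if already done; stops read-ahead on early exit
```

The sentinel and the forwarded exception ensure that the consumer always learns when the producer has stopped, whether it finished normally or failed.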
Appending and Resuming Datasets
JsonlDir is designed for incremental data processing. When you open a writer on an existing directory, it calls _next_index() to scan the directory for existing part-NNNNN files, then resumes numbering at one past the highest index found.
# If the directory already contains part-00000.jsonl and part-00001.jsonl
async with d.writer() as w:
    # This writer will start at part-00002.jsonl
    await w.write({"new": "data"})
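This makes it safe for multiple tasks to append to the same output directory one after another without overwriting existing data. Because the starting index comes from a directory scan when the writer opens, two writers opened at the same time on the same directory could compute the same index.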
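The resume step amounts to "scan, take the maximum index, add one". `next_index` below is a hypothetical local-directory equivalent of `_next_index()`, not its actual source. It counts both plain and zstd shards. As with any scan-then-create scheme, nothing here prevents two concurrent writers from reaching the same answer.

```python
import os
import re

# Hypothetical pattern matching plain and zstd shards, e.g. part-00003.jsonl(.zst).
_SHARD_RE = re.compile(r"^part-(\d+)\.jsonl(?:\.zst)?$")


def next_index(directory: str) -> int:
    """Return one past the highest existing shard index, or 0 if there are none."""
    if not os.path.isdir(directory):
        return 0  # a directory that does not exist yet starts at part-00000
    indices = [
        int(m.group(1))
        for name in os.listdir(directory)
        if (m := _SHARD_RE.match(name))  # ignore files that are not shards
    ]
    return max(indices, default=-1) + 1
```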
Compression and Advanced Iteration
Shards can be compressed using Zstandard by specifying the extension in the writer:
async with d.writer(shard_extension=".jsonl.zst", compression_level=3) as w:
    await w.write(record)
JsonlDir supports mixed compression; iter_records() will correctly detect and decompress .jsonl.zst files even if they are mixed with plain .jsonl files in the same directory.
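Supporting mixed shards comes down to choosing a decoder per file by its extension. The hypothetical helper `iter_shard_records` sketches this idea for a single local shard. It uses the third-party `zstandard` package (imported only when a `.jsonl.zst` file is encountered) and assumes that each compressed shard contains a single zstd frame. It does not describe how JsonlDir performs the detection internally.

```python
import io
import json
from typing import Iterator


def iter_shard_records(path: str) -> Iterator[dict]:
    """Yield records from one shard, decompressing .jsonl.zst files on the fly."""
    if path.endswith(".jsonl.zst"):
        import zstandard  # third-party 'zstandard' package, only needed for .zst shards

        with open(path, "rb") as raw:
            # Streaming decompression: the shard is never fully expanded in memory.
            reader = zstandard.ZstdDecompressor().stream_reader(raw)
            with io.TextIOWrapper(reader, encoding="utf-8") as text:
                for line in text:
                    if line.strip():
                        yield json.loads(line)
    else:
        with open(path, "r", encoding="utf-8") as text:
            for line in text:
                if line.strip():  # skip blank lines
                    yield json.loads(line)
```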
For batch processing or analytics, JsonlDir provides specialized iterators:
- iter_batches(): Yields lists of records of a given size (batch_size).
- iter_arrow_batches(): Yields PyArrow RecordBatch objects across all shards, ideal for high-performance data science workflows.
async for batch in d.iter_batches(batch_size=500):
    # Process 500 records at a time
    pass

async for arrow_batch in d.iter_arrow_batches():
    # Work with PyArrow RecordBatches
    pass
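Batching an async record stream is a small buffering loop. `batched` and `to_record_batch` are hypothetical helpers that sketch what iter_batches()- and iter_arrow_batches()-style iterators do. They are not the library's implementations. In this sketch, the final batch is shorter when the record count is not a multiple of `batch_size`. The Arrow conversion uses `pyarrow.RecordBatch.from_pylist` and requires pyarrow 7.0 or later.

```python
from typing import AsyncIterable, AsyncIterator, List, TypeVar

T = TypeVar("T")


async def batched(records: AsyncIterable[T], batch_size: int) -> AsyncIterator[List[T]]:
    """Group an async stream into lists of batch_size (the last may be shorter)."""
    if batch_size < 1:
        raise ValueError("batch_size must be >= 1")
    batch: List[T] = []
    async for record in records:
        batch.append(record)
        if len(batch) == batch_size:
            yield batch
            batch = []  # start a fresh list so yielded batches are not mutated
    if batch:
        yield batch  # flush the remainder


def to_record_batch(batch: List[dict]):
    """Convert one batch of dict records to a PyArrow RecordBatch (optional dependency)."""
    import pyarrow as pa

    return pa.RecordBatch.from_pylist(batch)
```

For example, 1,200 records with `batch_size=500` yield batches of 500, 500, and 200 records.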