DynamicBatcher

Batches records from many concurrent producers and runs them through a single async processing function to improve resource utilization.

Attributes

| Attribute | Type | Description |
| --- | --- | --- |
| stats | [BatchStats](batchstats.md?sid=flyte_extras__dynamic_batcher_batchstats) | Current BatchStats snapshot. |
| is_running | boolean | Whether the aggregation and processing loops are active. |

Constructor

Signature

def DynamicBatcher(
    process_fn: ProcessFn[RecordT, ResultT],
    cost_estimator: CostEstimatorFn[RecordT] | None = None,
    target_batch_cost: int = 32000,
    max_batch_size: int = 256,
    min_batch_size: int = 1,
    batch_timeout_s: float = 0.05,
    max_queue_size: int = 5000,
    prefetch_batches: int = 2,
    default_cost: int = 1,
) -> None

Parameters

| Name | Type | Description |
| --- | --- | --- |
| process_fn | `ProcessFn[RecordT, ResultT]` | An async function that processes a list of records and returns a list of results in the same order. |
| cost_estimator | `CostEstimatorFn[RecordT] \| None` = None | Optional function that estimates the cost of a record. |
| target_batch_cost | `int` = 32000 | The cost budget per batch; the aggregator fills batches up to this limit. |
| max_batch_size | `int` = 256 | The maximum number of records allowed in a single batch. |
| min_batch_size | `int` = 1 | The minimum number of records required before dispatching a batch, unless a timeout occurs. |
| batch_timeout_s | `float` = 0.05 | Maximum time in seconds to wait for a batch to reach its target cost or size. |
| max_queue_size | `int` = 5000 | The capacity of the internal submission queue, used for backpressure. |
| prefetch_batches | `int` = 2 | The number of assembled batches to buffer between the aggregation and processing loops. |
| default_cost | `int` = 1 | The fallback cost assigned to a record if no other estimate is available. |
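The way `target_batch_cost`, `max_batch_size`, and `default_cost` bound a batch can be pictured with a small greedy grouping routine. This is a minimal sketch of the idea, not the library's implementation: `group_by_cost` is a hypothetical helper, the rule that a batch closes when the next record would exceed the budget is an assumption, and timeouts, `min_batch_size`, and queueing are left out.

```python
from typing import Callable, Optional, Sequence, TypeVar

RecordT = TypeVar("RecordT")


def group_by_cost(
    records: Sequence[RecordT],
    cost_estimator: Optional[Callable[[RecordT], int]] = None,
    target_batch_cost: int = 32000,
    max_batch_size: int = 256,
    default_cost: int = 1,
) -> list[list[RecordT]]:
    """Greedily group records so each batch stays within the cost and size limits.

    Hypothetical helper for illustration only. A single record whose cost
    exceeds the budget still gets a batch of its own (an assumption made here).
    """
    batches: list[list[RecordT]] = []
    current: list[RecordT] = []
    current_cost = 0
    for record in records:
        # Fall back to default_cost when no estimator is supplied.
        cost = cost_estimator(record) if cost_estimator is not None else default_cost
        over_budget = current_cost + cost > target_batch_cost
        full = len(current) >= max_batch_size
        if current and (over_budget or full):
            # Close the current batch before it breaks a limit.
            batches.append(current)
            current, current_cost = [], 0
        current.append(record)
        current_cost += cost
    if current:
        batches.append(current)
    return batches


# Use string length as a stand-in cost (for example, a rough token count).
prompts = ["a" * 10, "b" * 25, "c" * 10, "d" * 40]
batches = group_by_cost(prompts, cost_estimator=len, target_batch_cost=40, max_batch_size=3)
print([[len(p) for p in batch] for batch in batches])  # [[10, 25], [10], [40]]
```

With a budget of 40, the first two records (cost 35) share a batch, the third would push the total to 45 and so starts a new one, and the fourth fills a batch by itself.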

Methods


stats()

@classmethod
def stats() -> [BatchStats](batchstats.md?sid=flyte_extras__dynamic_batcher_batchstats)

Current BatchStats snapshot.

Returns

| Type | Description |
| --- | --- |
| [BatchStats](batchstats.md?sid=flyte_extras__dynamic_batcher_batchstats) | A snapshot of performance metrics including throughput, utilization, and average batch sizes. |

is_running()

@classmethod
def is_running() -> bool

Whether the aggregation and processing loops are active.

Returns

| Type | Description |
| --- | --- |
| bool | True if the batcher has been started and is currently processing work, False otherwise. |

start()

@classmethod
def start() -> None

Start the aggregation and processing loops.

Returns

| Type | Description |
| --- | --- |
| None | |

stop()

@classmethod
def stop() -> None

Graceful shutdown: process all enqueued work, then stop.

Returns

| Type | Description |
| --- | --- |
| None | |

submit()

@classmethod
def submit(
    record: RecordT,
    estimated_cost: int | None = None,
) -> asyncio.Future[ResultT]

Submit a single record for batched processing.

Parameters

| Name | Type | Description |
| --- | --- | --- |
| record | `RecordT` | The input record to be added to a processing batch. |
| estimated_cost | `int \| None` = None | Optional cost estimate for this record. |

Returns

| Type | Description |
| --- | --- |
| asyncio.Future[ResultT] | A future whose result is the corresponding entry from the list returned by process_fn. |
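The link between a returned future and "the corresponding entry" of the result list can be illustrated with plain `asyncio`. This is a sketch under assumptions, not the batcher's actual code: `run_batch` and this `process_fn` are hypothetical stand-ins, and a single batch is processed immediately rather than being aggregated by cost or timeout.

```python
import asyncio


async def process_fn(records: list[str]) -> list[str]:
    """Stand-in batch function: returns one result per record, in input order."""
    return [record.upper() for record in records]


async def run_batch(records: list[str]) -> list[str]:
    loop = asyncio.get_running_loop()
    # One future per submitted record, created in submission order.
    futures = [loop.create_future() for _ in records]

    # Process the whole batch once, then hand result i to future i.
    results = await process_fn(records)
    for future, result in zip(futures, results):
        future.set_result(result)

    # Each caller would await only its own future; here we await them all.
    return [await future for future in futures]


print(asyncio.run(run_batch(["alpha", "beta", "gamma"])))  # ['ALPHA', 'BETA', 'GAMMA']
```

Because results are matched to futures by position, a `process_fn` that reorders or drops entries would deliver the wrong result to a caller, which is why the order-preserving contract matters.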

submit_batch()

@classmethod
def submit_batch(
    records: Sequence[RecordT],
    estimated_cost: Sequence[int] | None = None,
) -> list[asyncio.Future[ResultT]]

Convenience: submit multiple records and return their futures.

Parameters

| Name | Type | Description |
| --- | --- | --- |
| records | `Sequence[RecordT]` | A sequence of input records to be processed. |
| estimated_cost | `Sequence[int] \| None` = None | Optional sequence of cost estimates for the records. |

Returns

| Type | Description |
| --- | --- |
| list[asyncio.Future[ResultT]] | List of futures, one per record, that resolve when their respective batch completes. |
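A list of futures like the one returned here is typically awaited together. The sketch below uses manually created futures as stand-ins for the batcher's return value (the `collect` and `main` names are hypothetical) and resolves them out of order to mimic records that land in different batches.

```python
import asyncio


async def collect(futures: list[asyncio.Future]) -> list:
    # gather preserves argument order, so results line up with the submitted records.
    return await asyncio.gather(*futures)


async def main() -> list[int]:
    loop = asyncio.get_running_loop()
    # Stand-ins for the futures a batcher would return, one per record.
    futures = [loop.create_future() for _ in range(3)]
    # Resolve them out of order to mimic batches finishing at different times.
    for index in (2, 0, 1):
        loop.call_soon(futures[index].set_result, index * 10)
    return await collect(futures)


print(asyncio.run(main()))  # [0, 10, 20]
```

`asyncio.gather` returns results in the order of the futures passed in, regardless of completion order, so the output stays aligned with the original `records` sequence.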