DynamicBatcher
Batches records from many concurrent producers and runs them through a single async processing function, maximizing resource utilization.
Attributes
| Attribute | Type | Description |
|---|---|---|
| stats | [BatchStats](batchstats.md?sid=flyte_extras__dynamic_batcher_batchstats) | Current BatchStats snapshot. |
| is_running | bool | Whether the aggregation and processing loops are active. |
Constructor
Signature
def DynamicBatcher(
    process_fn: ProcessFn[RecordT, ResultT],
    cost_estimator: CostEstimatorFn[RecordT] | None = None,
    target_batch_cost: int = 32000,
    max_batch_size: int = 256,
    min_batch_size: int = 1,
    batch_timeout_s: float = 0.05,
    max_queue_size: int = 5000,
    prefetch_batches: int = 2,
    default_cost: int = 1,
) -> None
Parameters
| Name | Type | Description |
|---|---|---|
| process_fn | ProcessFn[RecordT, ResultT] | An async function that processes a list of records and returns a list of results in the same order. |
| cost_estimator | CostEstimatorFn[RecordT] \| None = None | Optional function that returns the estimated cost of a single record. |
| target_batch_cost | int = 32000 | The cost budget per batch; the aggregator fills batches up to this limit. |
| max_batch_size | int = 256 | The maximum number of records allowed in a single batch. |
| min_batch_size | int = 1 | The minimum number of records required before dispatching a batch, unless a timeout occurs. |
| batch_timeout_s | float = 0.05 | Maximum time in seconds to wait for a batch to reach its target cost or size. |
| max_queue_size | int = 5000 | The capacity of the internal submission queue for backpressure management. |
| prefetch_batches | int = 2 | The number of assembled batches to buffer between aggregation and processing loops. |
| default_cost | int = 1 | The fallback cost assigned to a record if no other estimator is available. |
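To make the cost-related parameters concrete, the sketch below shows a `process_fn` that honors the same-order contract, a simple cost estimator, and a simplified greedy grouping driven by `target_batch_cost` and `max_batch_size`. The names `embed_batch`, `token_cost`, and `group_by_cost` are hypothetical. The grouping rule (close the current batch before a record would push it over budget) is an assumption for illustration, not the library's documented algorithm, and it ignores `min_batch_size` and `batch_timeout_s`.

```python
import asyncio
from typing import Callable, Sequence


async def embed_batch(records: list) -> list:
    """Hypothetical process_fn: returns one result per record, in input order."""
    await asyncio.sleep(0)  # stand-in for real async work (model call, RPC, ...)
    return [len(r) for r in records]


def token_cost(record: str) -> int:
    """Hypothetical cost estimator: whitespace-separated token count."""
    return max(1, len(record.split()))


def group_by_cost(
    records: Sequence[str],
    cost_fn: Callable[[str], int],
    target_batch_cost: int,
    max_batch_size: int,
) -> list:
    """Greedy grouping under a cost budget (assumed rule, for illustration only)."""
    batches = []
    current = []
    current_cost = 0
    for record in records:
        cost = cost_fn(record)
        over_budget = current_cost + cost > target_batch_cost
        full = len(current) >= max_batch_size
        # Close the current batch before it would exceed either limit.
        if current and (over_budget or full):
            batches.append(current)
            current, current_cost = [], 0
        current.append(record)
        current_cost += cost
    if current:
        batches.append(current)
    return batches
```

Under this rule a record whose cost alone exceeds the budget still forms its own single-record batch, so no record is ever dropped.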
Methods
stats()
@property
def stats() -> [BatchStats](batchstats.md?sid=flyte_extras__dynamic_batcher_batchstats)
Current BatchStats snapshot.
Returns
| Type | Description |
|---|---|
| [BatchStats](batchstats.md?sid=flyte_extras__dynamic_batcher_batchstats) | A snapshot of performance metrics including throughput, utilization, and average batch sizes. |
is_running()
@property
def is_running() -> bool
Whether the aggregation and processing loops are active.
Returns
| Type | Description |
|---|---|
| bool | True if the batcher has been started and is currently processing work, False otherwise. |
start()
@classmethod
def start() -> None
Start the aggregation and processing loops.
Returns
| Type | Description |
|---|---|
| None | This method does not return a value. |
stop()
@classmethod
def stop() -> None
Graceful shutdown: process all enqueued work, then stop.
Returns
| Type | Description |
|---|---|
| None | This method does not return a value. |
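Because `stop()` drains enqueued work before shutting down, it is worth making sure it runs even when a producer fails. The sketch below wraps `start()` and `stop()` in an async context manager. The signatures above do not say whether these methods are coroutines, so the hypothetical helper `_maybe_await` handles both cases. `running` and `FakeBatcher` are illustrative names, not part of the library.

```python
import asyncio
import inspect
from contextlib import asynccontextmanager


async def _maybe_await(value):
    """Await the value if it is awaitable; otherwise return it unchanged."""
    if inspect.isawaitable(value):
        return await value
    return value


@asynccontextmanager
async def running(batcher):
    """Start the batcher on entry and always stop it on exit."""
    await _maybe_await(batcher.start())
    try:
        yield batcher
    finally:
        await _maybe_await(batcher.stop())


class FakeBatcher:
    """Hypothetical stand-in that only records lifecycle calls."""

    def __init__(self):
        self.calls = []

    async def start(self):
        self.calls.append("start")

    async def stop(self):
        self.calls.append("stop")


async def demo():
    batcher = FakeBatcher()
    try:
        async with running(batcher):
            raise RuntimeError("producer failed")
    except RuntimeError:
        pass  # stop() has already run by the time the error reaches this point
    return batcher.calls
```

With a real batcher instance, `async with running(batcher): ...` would replace explicit `start()` and `stop()` calls around the producer code.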
submit()
@classmethod
def submit(
    record: RecordT,
    estimated_cost: int | None = None
) -> asyncio.Future[ResultT]
Submit a single record for batched processing.
Parameters
| Name | Type | Description |
|---|---|---|
| record | RecordT | The input record to be added to a processing batch. |
| estimated_cost | int \| None = None | Optional explicit cost for this record. |
Returns
| Type | Description |
|---|---|
asyncio.Future[ResultT] | A future whose result is the corresponding entry from the list returned by process_fn. |
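The relationship between a submitted record and its future can be illustrated with a heavily simplified stand-in. `MiniBatcher` below is hypothetical and is not the library's implementation: it has no cost budget, timeout, or bounded queue, and it assumes `submit` is a coroutine that returns the future. It only shows how each future receives the entry at the matching position in the list returned by `process_fn`.

```python
import asyncio


class MiniBatcher:
    """Hypothetical, simplified stand-in for illustration only."""

    def __init__(self, process_fn, max_batch_size=4):
        self._process_fn = process_fn
        self._max_batch_size = max_batch_size
        self._queue = asyncio.Queue()
        self._task = None

    async def start(self):
        self._task = asyncio.create_task(self._loop())

    async def stop(self):
        await self._queue.put(None)  # sentinel: finish queued work, then exit
        if self._task is not None:
            await self._task

    async def submit(self, record):
        future = asyncio.get_running_loop().create_future()
        await self._queue.put((record, future))
        return future

    async def _loop(self):
        done = False
        while not done:
            item = await self._queue.get()
            if item is None:
                break
            batch = [item]
            # Take whatever else is already queued, up to max_batch_size.
            while len(batch) < self._max_batch_size and not self._queue.empty():
                nxt = self._queue.get_nowait()
                if nxt is None:
                    done = True
                    break
                batch.append(nxt)
            try:
                results = await self._process_fn([rec for rec, _ in batch])
            except Exception as exc:
                # In this sketch a failing batch fails every future in it.
                for _, fut in batch:
                    if not fut.done():
                        fut.set_exception(exc)
                continue
            # Position i of the results resolves the future of record i.
            for (_, fut), res in zip(batch, results):
                if not fut.done():
                    fut.set_result(res)


async def double_all(records):
    """Hypothetical process_fn: results are returned in input order."""
    return [r * 2 for r in records]


async def main():
    batcher = MiniBatcher(double_all)
    await batcher.start()
    futures = [await batcher.submit(n) for n in (1, 2, 3)]
    results = [await f for f in futures]
    await batcher.stop()
    return results
```

The caller awaits each future independently, so producers do not need to know which batch their record ended up in.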
submit_batch()
@classmethod
def submit_batch(
    records: Sequence[RecordT],
    estimated_cost: Sequence[int] | None = None
) -> list[asyncio.Future[ResultT]]
Convenience: submit multiple records and return their futures.
Parameters
| Name | Type | Description |
|---|---|---|
| records | Sequence[RecordT] | The sequence of input records to be processed. |
| estimated_cost | Sequence[int] \| None = None | Optional explicit costs, one per record in records. |
Returns
| Type | Description |
|---|---|
list[asyncio.Future[ResultT]] | List of futures, one per record, that resolve when their respective batch completes. |
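A list of futures like the one returned here can be awaited together with `asyncio.gather`. The sketch below uses plain futures created on the running loop as stand-ins for the output of `submit_batch`. Whether the batcher fails individual futures or every future in a failing batch is not stated above, so the example only demonstrates the collection pattern. `collect` and `demo` are hypothetical names.

```python
import asyncio


async def collect(futures):
    """Await every future; failures are returned as exception objects."""
    return await asyncio.gather(*futures, return_exceptions=True)


async def demo():
    loop = asyncio.get_running_loop()
    # Stand-ins for the futures a submit_batch call would return.
    futures = [loop.create_future() for _ in range(3)]
    futures[0].set_result("a")
    futures[1].set_exception(ValueError("bad record"))
    futures[2].set_result("c")
    return await collect(futures)
```

Passing `return_exceptions=True` keeps one failed record from hiding the results of the others; `gather` preserves the order of the input futures, so results line up with the submitted records.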