TokenBatcher
Token-aware batcher for LLM inference workloads.
Constructor
Signature
def TokenBatcher(
    inference_fn: ProcessFn[RecordT, ResultT] | None = None,
    process_fn: ProcessFn[RecordT, ResultT] | None = None,
    token_estimator: CostEstimatorFn[RecordT] | None = None,
    cost_estimator: CostEstimatorFn[RecordT] | None = None,
    target_batch_tokens: int | None = None,
    target_batch_cost: int = 32000,
    default_token_estimate: int | None = None,
    default_cost: int = 1,
    max_batch_size: int = 256,
    min_batch_size: int = 1,
    batch_timeout_s: float = 0.05,
    max_queue_size: int = 5000,
    prefetch_batches: int = 2
) -> None
Parameters
| Name | Type | Description |
|---|---|---|
| inference_fn | `ProcessFn[RecordT, ResultT] \| None` = None | Function that runs inference on a batch of records and returns a list of results, one per record. |
| process_fn | `ProcessFn[RecordT, ResultT] \| None` = None | Alias for inference_fn. |
| token_estimator | `CostEstimatorFn[RecordT] \| None` = None | Function that estimates the token count of a single record. |
| cost_estimator | `CostEstimatorFn[RecordT] \| None` = None | Alias for token_estimator. |
| target_batch_tokens | `int \| None` = None | Target number of estimated tokens per batch. |
| target_batch_cost | `int` = 32000 | Alias for target_batch_tokens; defines the capacity limit for a single batch. |
| default_token_estimate | `int \| None` = None | Token estimate assigned to a record when no estimate is available. |
| default_cost | `int` = 1 | Alias for default_token_estimate; the default weight for a record when estimation fails. |
| max_batch_size | `int` = 256 | Maximum number of records in a single batch. |
| min_batch_size | `int` = 1 | Minimum number of records required to process a batch before the timeout is reached. |
| batch_timeout_s | `float` = 0.05 | Maximum time, in seconds, to wait for a batch to fill before processing a partial batch. |
| max_queue_size | `int` = 5000 | Maximum number of pending records allowed in the queue before backpressure is applied. |
| prefetch_batches | `int` = 2 | Number of batches to prepare in advance so the inference function stays saturated. |
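The exact packing policy is not spelled out above. The following is a minimal sketch of token-aware batching, assuming a greedy policy in which a batch closes once the next record would push its summed cost past `target_batch_cost` or its length past `max_batch_size`. `pack_batches` and `cost_of` are hypothetical names for illustration, not part of TokenBatcher.

```python
from typing import Callable, List, Sequence, TypeVar

T = TypeVar("T")


def pack_batches(
    records: Sequence[T],
    cost_of: Callable[[T], int],
    target_batch_cost: int = 32000,
    max_batch_size: int = 256,
) -> List[List[T]]:
    """Greedily group records into batches bounded by total cost and count.

    Hypothetical helper illustrating token-aware batching; not TokenBatcher's API.
    """
    batches: List[List[T]] = []
    current: List[T] = []
    current_cost = 0
    for record in records:
        cost = cost_of(record)
        # Close the current batch if adding this record would exceed either limit.
        # A record whose cost alone exceeds the target still gets its own batch.
        if current and (
            current_cost + cost > target_batch_cost or len(current) >= max_batch_size
        ):
            batches.append(current)
            current, current_cost = [], 0
        current.append(record)
        current_cost += cost
    if current:
        batches.append(current)
    return batches
```

For example, with `cost_of=len` and `target_batch_cost=30`, strings of length 10, 20, and 15 pack into two batches: the first two fill the budget exactly, and the third starts a new batch. Greedy packing keeps submission order, which makes it easy to map results back to callers.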
Methods
submit()
@classmethod
def submit(
    record: RecordT,
    estimated_tokens: int | None,
    estimated_cost: int | None
) -> asyncio.Future[ResultT]
Submits a single record for batched inference. The record's size estimate may be passed as either estimated_tokens or estimated_cost.
Parameters
| Name | Type | Description |
|---|---|---|
| record | RecordT | The input record. |
| estimated_tokens | `int \| None` | Estimated token count for the record. |
| estimated_cost | `int \| None` | Alias for estimated_tokens. |
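How these two arguments interact with the constructor's `token_estimator` and `default_cost` is not documented here. A minimal sketch, assuming a precedence of explicit estimate first, then the estimator, then the default (which is also used when the estimator raises). `resolve_cost` is a hypothetical helper, not TokenBatcher's API.

```python
from typing import Callable, Optional, TypeVar

T = TypeVar("T")


def resolve_cost(
    record: T,
    estimated_tokens: Optional[int] = None,
    estimated_cost: Optional[int] = None,
    estimator: Optional[Callable[[T], int]] = None,
    default_cost: int = 1,
) -> int:
    """Pick a record's cost: explicit estimate, then estimator, then default.

    Hypothetical helper; the precedence order is an assumption for illustration.
    """
    # Either keyword may carry the explicit estimate; estimated_tokens wins if both are set.
    explicit = estimated_tokens if estimated_tokens is not None else estimated_cost
    if explicit is not None:
        return explicit
    if estimator is not None:
        try:
            return estimator(record)
        except Exception:
            # Fall back to the default when estimation fails.
            return default_cost
    return default_cost
```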
Returns
| Type | Description |
|---|---|
| asyncio.Future[ResultT] | A future whose result is the corresponding entry from the list returned by the inference function. |
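The returned future resolves to the entry at the same position in the list produced by the inference function. The sketch below illustrates that future-per-record pattern with asyncio. `MiniBatcher` and `run_once` are hypothetical names; the sketch assumes an async `process_fn` that takes a list of records and returns a list of results, and it deliberately omits cost-based packing, `min_batch_size`, prefetching, and backpressure.

```python
import asyncio
from typing import Awaitable, Callable, Generic, List, Tuple, TypeVar

RecordT = TypeVar("RecordT")
ResultT = TypeVar("ResultT")


class MiniBatcher(Generic[RecordT, ResultT]):
    """Hypothetical, simplified batcher showing the future-per-record pattern.

    Not TokenBatcher itself: no cost accounting, prefetching, or backpressure.
    """

    def __init__(
        self,
        process_fn: Callable[[List[RecordT]], Awaitable[List[ResultT]]],
        max_batch_size: int = 256,
        batch_timeout_s: float = 0.05,
    ) -> None:
        self._process_fn = process_fn
        self._max_batch_size = max_batch_size
        self._timeout = batch_timeout_s
        self._queue: "asyncio.Queue[Tuple[RecordT, asyncio.Future]]" = asyncio.Queue()

    def submit(self, record: RecordT) -> "asyncio.Future[ResultT]":
        # Must be called inside a running event loop; each record gets its own future.
        future = asyncio.get_running_loop().create_future()
        self._queue.put_nowait((record, future))
        return future

    async def run_once(self) -> None:
        # Wait for the first record, then gather more until the batch is full
        # or batch_timeout_s has elapsed since the first record was taken.
        items = [await self._queue.get()]
        loop = asyncio.get_running_loop()
        deadline = loop.time() + self._timeout
        while len(items) < self._max_batch_size:
            remaining = deadline - loop.time()
            if remaining <= 0:
                break
            try:
                items.append(await asyncio.wait_for(self._queue.get(), remaining))
            except asyncio.TimeoutError:
                break
        records = [record for record, _ in items]
        try:
            results = await self._process_fn(records)
            if len(results) != len(records):
                raise ValueError("process_fn must return one result per record")
        except Exception as exc:
            # Propagate the failure to every caller in this batch.
            for _, future in items:
                if not future.done():
                    future.set_exception(exc)
            return
        # Result i belongs to record i.
        for (_, future), result in zip(items, results):
            if not future.done():
                future.set_result(result)
```

In use, callers would `await batcher.submit(record)` while a background task calls `run_once()` in a loop. Failing every future in a batch when `process_fn` raises is one reasonable design choice for the sketch, not necessarily what TokenBatcher does.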