Skip to main content

TokenBatcher

Token-aware batcher for LLM inference workloads.

Constructor

Signature

def TokenBatcher(
    inference_fn: ProcessFn[RecordT, ResultT] | None = None,
    process_fn: ProcessFn[RecordT, ResultT] | None = None,
    token_estimator: CostEstimatorFn[RecordT] | None = None,
    cost_estimator: CostEstimatorFn[RecordT] | None = None,
    target_batch_tokens: int | None = None,
    target_batch_cost: int = 32000,
    default_token_estimate: int | None = None,
    default_cost: int = 1,
    max_batch_size: int = 256,
    min_batch_size: int = 1,
    batch_timeout_s: float = 0.05,
    max_queue_size: int = 5000,
    prefetch_batches: int = 2,
) -> None

Parameters

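| Name | Type | Description |
| --- | --- | --- |
| `inference_fn` | `ProcessFn[RecordT, ResultT] \| None` = `None` | Function invoked on a batch of records; each submitted record's result is the corresponding entry of the list it returns. |
| `process_fn` | `ProcessFn[RecordT, ResultT] \| None` = `None` | |
| `token_estimator` | `CostEstimatorFn[RecordT] \| None` = `None` | |
| `cost_estimator` | `CostEstimatorFn[RecordT] \| None` = `None` | |
| `target_batch_tokens` | `int \| None` = `None` | Capacity limit for a single batch (`target_batch_cost` is an alias). |
| `target_batch_cost` | `int` = `32000` | The target cost per batch. |
| `default_token_estimate` | `int \| None` = `None` | Default weight for a single record (`default_cost` is an alias). |
| `default_cost` | `int` = `1` | Default cost if estimation fails. |
| `max_batch_size` | `int` = `256` | Maximum number of records in a single batch. |
| `min_batch_size` | `int` = `1` | Minimum number of records required to trigger processing before the timeout. |
| `batch_timeout_s` | `float` = `0.05` | Seconds to wait before processing a partial batch. |
| `max_queue_size` | `int` = `5000` | Maximum number of records allowed in the queue. |
| `prefetch_batches` | `int` = `2` | Number of batches to pre-calculate and keep ready. |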

Signature

def TokenBatcher(
    inference_fn: ProcessFn[RecordT, ResultT] | None = None,
    process_fn: ProcessFn[RecordT, ResultT] | None = None,
    token_estimator: CostEstimatorFn[RecordT] | None = None,
    cost_estimator: CostEstimatorFn[RecordT] | None = None,
    target_batch_tokens: int | None = None,
    target_batch_cost: int = 32000,
    default_token_estimate: int | None = None,
    default_cost: int = 1,
    max_batch_size: int = 256,
    min_batch_size: int = 1,
    batch_timeout_s: float = 0.05,
    max_queue_size: int = 5000,
    prefetch_batches: int = 2,
) -> None

Parameters

| Name | Type | Description |
| --- | --- | --- |
| `inference_fn` | `ProcessFn[RecordT, ResultT] \| None` = `None` | Function invoked on a batch of records; each submitted record's result is the corresponding entry of the list it returns. |
| `process_fn` | `ProcessFn[RecordT, ResultT] \| None` = `None` | |
| `token_estimator` | `CostEstimatorFn[RecordT] \| None` = `None` | |
| `cost_estimator` | `CostEstimatorFn[RecordT] \| None` = `None` | |
| `target_batch_tokens` | `int \| None` = `None` | Capacity limit for a single batch. |
| `target_batch_cost` | `int` = `32000` | Alias for `target_batch_tokens`; defines the capacity limit for a single batch. |
| `default_token_estimate` | `int \| None` = `None` | Default weight for any single record. |
| `default_cost` | `int` = `1` | Alias for `default_token_estimate`; the default weight for any single record. |
| `max_batch_size` | `int` = `256` | The maximum number of individual records allowed in a single batch. |
| `min_batch_size` | `int` = `1` | The minimum number of records required to process a batch before the timeout is reached. |
| `batch_timeout_s` | `float` = `0.05` | The maximum time, in seconds, to wait for a batch to fill before processing it anyway. |
| `max_queue_size` | `int` = `5000` | The maximum number of pending records allowed in the queue before backpressure is applied. |
| `prefetch_batches` | `int` = `2` | The number of batches to prepare in advance to keep the inference function saturated. |

Methods


submit()

@classmethod
def submit(
    record: RecordT,
    estimated_tokens: int | None,
    estimated_cost: int | None,
) -> asyncio.Future[ResultT]

Submit a single record for batched inference. Accepts either estimated_tokens or estimated_cost.

Parameters

| Name | Type | Description |
| --- | --- | --- |
| `record` | `RecordT` | The input record. |
| `estimated_tokens` | `int \| None` | |
| `estimated_cost` | `int \| None` | |

Returns

| Type | Description |
| --- | --- |
| `asyncio.Future[ResultT]` | A future whose result is the corresponding entry from the list returned by the inference function. |