A 100-line async request coalescer

Embedding models are several times faster on a batch of 32 inputs than on 32 sequential calls of size 1. The GPU loads the weights once, runs one forward pass, returns. Sequential calls pay the kernel-launch and memory-transfer overhead 32 times.

This is well-known on the training side and annoyingly under-served on the serving side, because the natural API for callers is “embed this one thing.” If you make them batch manually, half of them will not, and your throughput collapses.

The fix is a small async primitive. Callers await evaluator.evaluate(items) with whatever they have in hand, often a single item, as if it were an ordinary direct call. Inside, the primitive holds requests for a few milliseconds, accumulates whatever arrives, and dispatches everything as a single batch. Each caller's future resolves to its own slice of the result.
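
Concretely, the caller's side looks something like this. It is a sketch that assumes the DelayedEvaluator class defined in the next section; embed_batch is a stand-in for whatever batched call you actually have. Three concurrent callers end up sharing one batched call.

import asyncio

async def embed_batch(texts: list[str]) -> list[list[float]]:
    # Stand-in for the real model: one forward pass over the whole batch.
    return [[float(len(t))] for t in texts]

evaluator = DelayedEvaluator(embed_batch, delay_ms=5)

async def handle_request(text: str) -> list[float]:
    # Reads like a single-item call; under the hood it is coalesced
    # with whatever else arrives in the same 5ms window.
    (vector,) = await evaluator.evaluate([text])
    return vector

async def main():
    vectors = await asyncio.gather(*(handle_request(t) for t in ["a", "bb", "ccc"]))
    print(vectors)  # one embed_batch call served all three

asyncio.run(main())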

The interface

import asyncio
from collections.abc import Awaitable, Callable


class DelayedEvaluator[InputT, OutputT]:
    def __init__(
        self,
        process_batch: Callable[[list[InputT]], Awaitable[list[OutputT]]],
        delay_ms: int = 5,
    ):
        self._process_batch = process_batch
        self._delay_ms = delay_ms
        self._lock = asyncio.Lock()
        self._pending: list[_Pending[InputT, OutputT]] = []
        self._task: asyncio.Task | None = None

    async def evaluate(self, items: list[InputT]) -> list[OutputT]:
        future = asyncio.get_running_loop().create_future()
        async with self._lock:
            self._pending.append(_Pending(items, future))
            # The first caller in a window starts the dispatch timer; later callers just queue.
            if self._task is None:
                self._task = asyncio.create_task(self._dispatch_after_delay())
        return await future

_Pending is a tiny dataclass holding the per-call inputs and the future that resolves to that call’s outputs. The lock is there so two callers arriving in the same event loop tick can both register before the first dispatch fires.
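
A minimal version of that dataclass, as a sketch consistent with how it is constructed in evaluate and read in the dispatch below (items and future are the only fields the code touches):

import asyncio
from dataclasses import dataclass


@dataclass
class _Pending[InputT, OutputT]:
    items: list[InputT]                    # this caller's inputs, in order
    future: asyncio.Future[list[OutputT]]  # resolves to this caller's slice of the batch output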

The dispatch

    async def _dispatch_after_delay(self):
        await asyncio.sleep(self._delay_ms / 1000)
        async with self._lock:
            # Take ownership of everything that arrived in this window and reset
            # the task so the next caller starts a fresh one.
            pending, self._pending = self._pending, []
            self._task = None

        all_inputs = [item for p in pending for item in p.items]
        try:
            all_outputs = await self._process_batch(all_inputs)
        except Exception as exc:
            # A failed batch fails every caller in it, with the same exception.
            for p in pending:
                p.future.set_exception(exc)
            return

        # Split results back per caller, in order: each future gets the slice
        # corresponding to the inputs that caller contributed.
        i = 0
        for p in pending:
            n = len(p.items)
            p.future.set_result(all_outputs[i : i + n])
            i += n

A few things matter here.

The inputs are concatenated and the outputs are split back by length. No sorting, no IDs. itertools.accumulate of len(p.items) gives you the slice boundaries in O(n).
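
If you prefer the accumulate form to the running index, the split at the end of the dispatch can be rewritten equivalently like this (same pending and all_outputs as above):

from itertools import accumulate

# Cumulative lengths give the slice ends, e.g. lengths [2, 1, 3] -> ends [2, 3, 6].
ends = list(accumulate(len(p.items) for p in pending))
starts = [0, *ends[:-1]]
for p, lo, hi in zip(pending, starts, ends):
    p.future.set_result(all_outputs[lo:hi])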

Exceptions fan out. A failed batch fails every caller with the same exception. Do not swallow it on some callers and not others.

The task is set back to None when the pending list is swapped out, so the next caller starts a fresh sleep and a fresh batch. If you forget this, you dispatch one batch and then every later caller hangs forever; ask me how I know.

Choosing the delay

5ms is a reasonable default for a model that takes 50ms or more to evaluate. A 10% latency tax for 5-10x more throughput is a good trade. For very fast models (under 10ms) the delay should be smaller, or the coalescer is just the wrong tool.

The cost shows up most under low load. A single caller still waits 5ms for nothing. If your service has lulls, that latency is visible. For services that are always busy the delay is paid only by the first request in each window and amortised across the rest.

There are libraries that do this kind of thing, but they tend to be wrappers around HTTP servers, or tied to a specific ML framework, or they expect inputs of a fixed shape. The primitive itself is around 100 lines and fits into any async codebase: inference, database access, external API rate-limiting, anything where a batched call is faster than N individual ones.
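
For instance, the same class can sit in front of a batched database lookup. In this sketch fetch_users_by_id is a hypothetical helper that resolves a whole list of ids in one round trip:

async def fetch_users_by_id(ids: list[int]) -> list[dict]:
    # Hypothetical batched query: one round trip for all ids,
    # with rows returned in the same order as the ids.
    return [{"id": i} for i in ids]  # stand-in for the real query

user_loader = DelayedEvaluator(fetch_users_by_id, delay_ms=2)

async def get_user(user_id: int) -> dict:
    # Each caller asks for one user; concurrent callers share one query.
    (user,) = await user_loader.evaluate([user_id])
    return user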

Once it is in your toolbox you stop writing batching logic at the call sites. The caller writes await x.evaluate([item]), and the speedup is invisible.