Two functions for a Kafka consumer

A Kafka consumer service does roughly five things: connect to the broker, deserialize messages, run the business logic, retry on failure, and route dead messages to a DLQ. Four of those five are identical in every service.

I kept writing the same ~200 lines of consumer plumbing across services, with the actual message handler being maybe 10 of them. The fix is not a base class. It is to make the broker setup itself a one-liner and let each consumer be just two functions: a lifespan (set up clients, stash them in context) and an on_message (do the work).

The factory

class KafkaBrokerProvider:
    @classmethod
    def init_broker(cls, settings: KafkaSettings, topic: str | None = None) -> KafkaBroker:
        broker = KafkaBroker(
            bootstrap_servers=settings.bootstrap_servers,
            security=cls._get_security(settings),
            config=cls._get_broker_config(settings),
        )
        if topic:
            cls._add_retry_middleware(broker, topic, max_retries=settings.max_consumer_retries)
        return broker

Auth, security, retry middleware, DLQ routing: all decided here, once. Every consumer in the org gets the same behaviour without copying anything.
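For illustration, the settings object the factory consumes might look like this. This is a minimal sketch using a stdlib dataclass; the field names and the class itself are assumptions (in practice a pydantic-settings model reading from the environment would be typical):

```python
from dataclasses import dataclass

# Hypothetical sketch of KafkaSettings -- field names are assumptions,
# chosen to match what init_broker reads in the factory above.
@dataclass
class KafkaSettings:
    bootstrap_servers: str = "localhost:9092"
    security_protocol: str = "SASL_SSL"  # consumed by _get_security
    max_consumer_retries: int = 3        # consumed by _add_retry_middleware

# Each service overrides only what differs; everything else is the org default.
settings = KafkaSettings(bootstrap_servers="kafka-1:9092,kafka-2:9092")
```

Because every behavioural knob lives on this one object, changing org-wide policy (say, bumping the default retry count) is a change in one place, not in every service.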

The retry middleware

A BaseMiddleware that wraps the consume scope with tenacity.AsyncRetrying, and on final failure republishes to f"{topic}.DLQ" with the exception baked into the payload:

from faststream import BaseMiddleware
from tenacity import AsyncRetrying, RetryError, stop_after_attempt

class RetryMiddleware(BaseMiddleware):
    async def consume_scope(self, call_next, msg):
        try:
            async for attempt in AsyncRetrying(stop=stop_after_attempt(self.max_retries), ...):
                with attempt:
                    return await call_next(msg)
        except RetryError as exc:
            await self.broker.publish(
                {"original": msg.decoded_body, "error": repr(exc)},
                topic=f"{self.topic}.DLQ",
            )

No service has to think about retries or DLQs again. They happen because the broker was created by the factory.
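The retry-then-DLQ semantics can be demonstrated standalone, stripped of FastStream and tenacity. This is a sketch of the behaviour, not the middleware itself; all names here are illustrative:

```python
import asyncio

# Standalone sketch of the retry-then-DLQ contract the middleware enforces:
# try the handler up to max_retries times, then republish payload + error
# to "<topic>.DLQ". Plain asyncio stands in for tenacity here.
async def consume_with_retry(handler, msg, publish, topic, max_retries=3):
    last_exc = None
    for _ in range(max_retries):
        try:
            return await handler(msg)
        except Exception as exc:  # final failure goes to the DLQ, not up the stack
            last_exc = exc
    await publish({"original": msg, "error": repr(last_exc)}, topic=f"{topic}.DLQ")

async def main():
    dlq = []

    async def always_fails(msg):
        raise ValueError("bad payload")

    async def publish(body, topic):
        dlq.append((topic, body))

    await consume_with_retry(always_fails, {"id": 1}, publish, "items")
    return dlq

dlq = asyncio.run(main())
# dlq now holds one entry routed to "items.DLQ"
```

The handler never sees the retry loop or the DLQ topic name, which is exactly the property the middleware gives every consumer for free.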

What a consumer looks like now

import asyncio
from contextlib import asynccontextmanager
from typing import cast

from faststream import ContextRepo, FastStream

# Item, EventProcessor, and settings come from the service's own modules.

@asynccontextmanager
async def lifespan(context: ContextRepo):
    processor = EventProcessor(...)
    context.set_global("processor", processor)
    yield
    await processor.aclose()

async def on_message(item: Item, context: ContextRepo) -> None:
    processor = cast(EventProcessor, context.get("processor"))
    await processor.handle(item)

def main():
    broker = KafkaBrokerProvider.init_broker(settings, topic="items")
    broker.subscriber("items")(on_message)
    app = FastStream(broker, lifespan=lifespan)
    asyncio.run(app.run())

That is the whole file, about 60 lines including imports. The same shape works for push (real-time events) and pull (batch ETL triggers); only the Pydantic model in the handler signature changes.
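To make the "only the model changes" point concrete, here is a sketch of the two handler signatures side by side. Dataclasses stand in for the Pydantic models, and all field names are assumptions:

```python
from dataclasses import dataclass

# Illustrative stand-ins for the Pydantic models -- field names are
# assumptions. The point: only this type differs between the two consumers.
@dataclass
class Item:                # push: one real-time event per message
    id: str
    payload: dict

@dataclass
class BatchTrigger:        # pull: a message that kicks off a batch ETL run
    job_id: str
    partition_date: str

# The handler shape is identical either way; deserialization into the
# annotated type is the framework's job, not the handler's.
async def on_item(item: Item) -> None: ...
async def on_trigger(trigger: BatchTrigger) -> None: ...
```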

Why this beats a base class

A base class forces every consumer to inherit a hierarchy, override hooks in the right order, and fight the framework whenever the shape does not fit. The factory approach has none of that. The consumer is just a module with two top-level functions, and FastStream does the wiring. There is nothing to override, nothing to subclass, and the entire “framework” is one file that you can read in five minutes.

Most of a consumer service has nothing to do with the consumer itself. Once you accept that, the only code worth writing is the handler.