OAuth to GCP Managed Kafka, no keys

If you run Kafka on GCP’s Managed Service for Apache Kafka and you want a Cloud Run service to consume from it, you have a problem. The Confluent Python client (librdkafka under the hood) speaks SASL/OAUTHBEARER, which expects a token in JWT shape. Google’s Application Default Credentials give you a plain OAuth 2.0 access token. The two simply do not line up.

The official tutorials use SASL/PLAIN with a service account key file. That works, sure, but I did not want a static key file baked into the deployment when ADC was already available. So I went looking for a way to make OAUTHBEARER work with an ADC token.

It turns out librdkafka does not really verify the JWT. It expects three base64url-encoded segments separated by dots. It parses the header to read the algorithm, parses the payload to read exp, and treats the third segment as opaque. The token presented to the broker in the SASL handshake is the raw concatenation of all three. That is enough information to write the bridge.

The bridge

import base64
import json
import time
from datetime import timezone

import google.auth
from google.auth.transport.requests import Request


class GcpKafkaTokenProvider:
    # Unverified JWT header; librdkafka only reads the alg field.
    HEADER = json.dumps({"typ": "JWT", "alg": "GOOG_OAUTH2_TOKEN"})

    def __init__(self):
        self._creds, _ = google.auth.default(
            scopes=["https://www.googleapis.com/auth/cloud-platform"]
        )

    def get_token(self, _config_str: str) -> tuple[str, float]:
        if not self._creds.valid:
            self._creds.refresh(Request())

        # google.auth hands back a naive datetime in UTC; attach the zone
        # before converting, or .timestamp() will assume local time.
        expiry = self._creds.expiry.replace(tzinfo=timezone.utc).timestamp()

        payload = json.dumps({
            "exp": int(expiry),
            "iat": int(time.time()),
            "iss": "google",
            "sub": self._creds.service_account_email,
        })

        token = ".".join([
            self._b64(self.HEADER),
            self._b64(payload),
            self._b64(self._creds.token),  # the actual OAuth token, in the signature slot
        ])
        return token, expiry

    @staticmethod
    def _b64(s: str) -> str:
        return base64.urlsafe_b64encode(s.encode()).rstrip(b"=").decode()
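
To sanity-check what comes out, you can split the token and decode the first two segments yourself; the third is the access token. A quick inspection sketch, assuming ADC credentials are available wherever you run it:

import base64
import json

def unb64(seg: str) -> bytes:
    # Re-pad before decoding; the segments were emitted without padding.
    return base64.urlsafe_b64decode(seg + "=" * (-len(seg) % 4))

token, expiry = GcpKafkaTokenProvider().get_token("")
header, claims, access = token.split(".")
print(json.loads(unb64(header)))  # {"typ": "JWT", "alg": "GOOG_OAUTH2_TOKEN"}
print(json.loads(unb64(claims)))  # the exp/iat/iss/sub payload
# unb64(access) is the raw ya29. OAuth access token; avoid logging it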

The Confluent client takes a callback in its config:

from confluent_kafka import Consumer

provider = GcpKafkaTokenProvider()

consumer = Consumer({
    "bootstrap.servers": "...:9092",
    "security.protocol": "SASL_SSL",
    "sasl.mechanisms": "OAUTHBEARER",
    "oauth_cb": provider.get_token,  # invoked by librdkafka whenever it needs a fresh token
    "group.id": "my-group",
})
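
From there it is an ordinary consumer loop (the topic name is a placeholder). The callback fires from inside poll(), which is also what keeps the token fresh:

consumer.subscribe(["my-topic"])
try:
    while True:
        msg = consumer.poll(1.0)  # servicing poll also services oauth_cb
        if msg is None:
            continue
        if msg.error():
            print(msg.error())
            continue
        print(msg.value())
finally:
    consumer.close()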

That is the whole integration. No key file is needed; ADC resolves the credential through the metadata server.

On the IAM side, the service account that runs the consumer needs roles/managedkafka.client to actually call the broker. Add roles/iam.serviceAccountTokenCreator only if you mint tokens for another identity, which most setups do not.
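
Granting the client role is one binding (the project and service account names here are placeholders):

gcloud projects add-iam-policy-binding my-project \
  --member="serviceAccount:kafka-consumer@my-project.iam.gserviceaccount.com" \
  --role="roles/managedkafka.client"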

The Cloud Run revision picks up the service account from spec.template.spec.serviceAccountName. ADC inside the container resolves to the metadata server automatically.
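
If you deploy with gcloud rather than YAML, the equivalent is the --service-account flag (names again placeholders):

gcloud run deploy kafka-consumer \
  --image=us-docker.pkg.dev/my-project/app/consumer \
  --service-account=kafka-consumer@my-project.iam.gserviceaccount.com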

I lost most of a day figuring all of this out. The librdkafka docs do not mention GCP. The GCP docs hand you a key file. The Confluent samples are all in Java. The thing I needed, “synthesize a JWT shape from an ADC token in Python”, was 30 lines once I knew what to do, and not written down anywhere I could find. So now it is written down.

If you hit the same wall, the trick is just remembering that nobody is verifying the signature. Three base64url segments with a real token in the third one is all the broker needs.