krivaltsevich.com · Kafka Internals 4.4

18 · Security: Authentication & Authorization

Source: Apache Kafka 4.4.0-SNAPSHOT (git 04bfe7d, 2026-06-15), KRaft mode. Derived from source code, not copied from official documentation.

Kafka's security model has two largely independent halves. Wire security decides who a connection belongs to: every listener carries a SecurityProtocol that selects a pluggable ChannelBuilder, which wraps each socket in a KafkaChannel with an optional TLS SslTransportLayer and/or a SASL Authenticator exchanging SaslHandshake/SaslAuthenticate frames; the outcome is a KafkaPrincipal. Access control then decides what that principal may do: the broker calls a pluggable Authorizer — in KRaft the built-in StandardAuthorizer, whose ACL bindings are sourced from AccessControlEntryRecords in the replicated metadata log, so ACLs propagate exactly like topic metadata. This chapter traces both paths from the byte level up: TLS and SASL channel construction, the SCRAM/PLAIN/GSSAPI/OAUTHBEARER mechanisms, principal building, and the deny-wins ACL evaluation algorithm invoked on every request.

Role & responsibilities

The security subsystem sits at the boundary between the network layer and request processing. Its responsibilities:

  • Confidentiality & integrity via TLS on SSL and SASL_SSL listeners (the JDK SSLEngine wrapped by SslTransportLayer).
  • Authentication: prove client (and broker) identity, either from a TLS client certificate (mutual TLS) or a SASL exchange (PLAIN, SCRAM-SHA-256/512, GSSAPI, OAUTHBEARER). The result is a KafkaPrincipal attached to the channel.
  • Principal building: map a TLS X.500 subject or a SASL authorization ID to a KafkaPrincipal via a configurable KafkaPrincipalBuilder.
  • Authorization: for each request action, evaluate ACLs and return ALLOWED/DENIED. ACL storage (in KRaft, the metadata log) and ACL evaluation are both owned here.
  • Credential management: persist SCRAM credentials and delegation tokens (as replicated metadata records / a token cache) and feed them to the SASL servers.

Key idea

Authentication and authorization are decoupled. Authentication runs once per connection (plus periodic re-auth) inside the network thread before any application request is read; authorization runs per-action on the request-handler thread using a locally cached, metadata-replicated ACL set. A connection can be authenticated yet authorized for nothing.

Where it lives in the code

| Concern | Key classes | File |
|---|---|---|
| Channel factory dispatch | ChannelBuilders | clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java |
| TLS channel builder | SslChannelBuilder | clients/.../common/network/SslChannelBuilder.java |
| SASL channel builder | SaslChannelBuilder | clients/.../common/network/SaslChannelBuilder.java |
| TLS transport / handshake | SslTransportLayer | clients/.../common/network/SslTransportLayer.java |
| SSL engine factory | SslFactory, DefaultSslEngineFactory | clients/.../common/security/ssl/SslFactory.java |
| SASL server / client state machine | SaslServerAuthenticator, SaslClientAuthenticator | clients/.../common/security/authenticator/ |
| SCRAM server / crypto | ScramSaslServer, ScramFormatter | clients/.../common/security/scram/internals/ |
| Principal builder | DefaultKafkaPrincipalBuilder | clients/.../common/security/authenticator/DefaultKafkaPrincipalBuilder.java |
| Authorizer SPI | Authorizer, Action, AuthorizationResult | clients/.../server/authorizer/ |
| KRaft authorizer | StandardAuthorizer, StandardAuthorizerData, AclCache | metadata/.../metadata/authorizer/ |
| ACL record on the wire/disk | StandardAcl, AccessControlEntryRecord | metadata/.../metadata/authorizer/StandardAcl.java, metadata/src/main/resources/common/metadata/AccessControlEntryRecord.json |
| Controller ACL/SCRAM managers | AclControlManager, ScramControlManager | metadata/.../controller/ |
| Metadata-to-runtime publishers | AclPublisher, ScramPublisher | metadata/.../metadata/publisher/ |
| Broker-side authorize helper | AuthHelper | core/src/main/scala/kafka/server/AuthHelper.scala |

Core concepts & terminology

SecurityProtocol
Per-listener enum: PLAINTEXT(0), SSL(1), SASL_PLAINTEXT(2), SASL_SSL(3) (SecurityProtocol.java:28). The numeric ids are permanent and must match across releases.
KafkaPrincipal
A (principalType, name) pair; type is "User" for the default authorizer (KafkaPrincipal.java:44). Its toString() is "User:name", which is exactly the string form used in ACLs and super.users.
Authorization ID
The SASL-level identity (SaslServer.getAuthorizationID()) that the principal builder turns into a KafkaPrincipal.
ACL binding
A ResourcePattern (type + name + PatternType) joined to an AccessControlEntry (principal, host, operation, permission). Stored as StandardAcl.
PatternType
LITERAL(3) (exact name, or * wildcard) or PREFIXED(4) (name prefix). MATCH/ANY exist only in filters.
Action
What the broker asks the authorizer about: an AclOperation on a literal ResourcePattern, with resourceReferenceCount and logIfAllowed/logIfDenied hints (Action.java:25).

Architecture & control flow

Figure: Listener (name + SecurityProtocol) → create(securityProtocol, mode) (ChannelBuilders.java:109) → PlaintextChannelBuilder ∣ SslChannelBuilder ∣ SaslChannelBuilder (one JaasContext per mechanism) → KafkaChannel = TransportLayer (Plaintext ∣ SslTransportLayer) + Authenticator (lazy Supplier: none from TLS ∣ SASL FSM) → Authenticator.principal() ⇒ KafkaPrincipal, attached to every RequestContext → Authorizer.authorize(ctx, actions).
From listener security protocol to an authenticated principal feeding authorization: create() dispatches on the SecurityProtocol to one of three channel builders, each producing a KafkaChannel whose transport layer and authenticator together yield the KafkaPrincipal.

The two factory entry points are serverChannelBuilder(...) and clientChannelBuilder(...) in ChannelBuilders.java:95 / :63; both funnel into the private create(...) at ChannelBuilders.java:109, which switches on the SecurityProtocol. For server SASL it loads one JaasContext per enabled mechanism (ChannelBuilders.java:138). Notably, for a SASL_SSL server listener, TLS client authentication is forced off unless explicitly re-enabled with a listener-prefixed ssl.client.auth: the code computes sslClientAuthOverride = NONE (ChannelBuilders.java:152) because SASL is the source of identity.
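A minimal sketch of the server-side dispatch, under stated assumptions: Protocol, BuilderChoice and ServerChannelDispatch are hypothetical stand-ins (not Kafka types), builders are represented only by their class names, and the only behaviour modelled is which builder is chosen plus the effective ssl.client.auth, including the SASL_SSL override described above.

```java
import java.util.Optional;

// Hypothetical stand-ins: NOT Kafka's classes; only the dispatch shape is mirrored.
enum Protocol { PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL }

record BuilderChoice(String builder, String effectiveSslClientAuth) {}

final class ServerChannelDispatch {
    /**
     * configuredClientAuth: the broker-wide ssl.client.auth value.
     * listenerClientAuth: a listener-prefixed ssl.client.auth, if one was set.
     */
    static BuilderChoice create(Protocol protocol, String configuredClientAuth,
                                Optional<String> listenerClientAuth) {
        switch (protocol) {
            case PLAINTEXT:
                return new BuilderChoice("PlaintextChannelBuilder", "n/a");
            case SSL:
                return new BuilderChoice("SslChannelBuilder",
                        listenerClientAuth.orElse(configuredClientAuth));
            case SASL_PLAINTEXT:
                return new BuilderChoice("SaslChannelBuilder", "n/a");
            case SASL_SSL:
                // SASL is the source of identity, so TLS client auth is forced to "none"
                // unless the listener explicitly re-enables it.
                return new BuilderChoice("SaslChannelBuilder", listenerClientAuth.orElse("none"));
            default:
                throw new IllegalArgumentException("Unexpected protocol " + protocol);
        }
    }
}
```

With a broker-wide ssl.client.auth=required, a SASL_SSL listener in this model still ends up with "none" unless a listener-prefixed value is supplied.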

Where the principal comes from

For PLAINTEXT, the principal is always KafkaPrincipal.ANONYMOUS (DefaultKafkaPrincipalBuilder.java:70). For SSL, the principal is derived from the peer certificate's X.500 subject and run through the SslPrincipalMapper rules (DefaultKafkaPrincipalBuilder.java:100); if the peer presented no certificate (SSLPeerUnverifiedException), it falls back to ANONYMOUS. For SASL, GSSAPI authorization IDs are shortened via Kerberos rules, while all other mechanisms wrap the authorization ID directly as User:<authzId> (DefaultKafkaPrincipalBuilder.java:78).
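The three principal sources can be summarised in a small sketch. All types here (Principal, AuthContext and its records, PrincipalBuilderSketch) are hypothetical, and the SSL mapper and Kerberos short-namer are passed in as plain functions instead of modelling SslPrincipalMapper or KerberosShortNamer. Keep in mind that the real DEFAULT SSL mapping rule keeps the full DN as the name; any CN-extracting mapper you plug in is just an example rule.

```java
import java.util.Optional;
import java.util.function.UnaryOperator;

// Hypothetical types; only the branch structure follows the description above.
record Principal(String type, String name) {
    static final Principal ANONYMOUS = new Principal("User", "ANONYMOUS");

    @Override public String toString() { return type + ":" + name; }
}

interface AuthContext {}
record PlaintextContext() implements AuthContext {}
record SslContext(Optional<String> peerSubjectDn) implements AuthContext {} // empty: no client cert
record SaslContext(String mechanism, String authorizationId) implements AuthContext {}

final class PrincipalBuilderSketch {
    static Principal build(AuthContext ctx,
                           UnaryOperator<String> sslMapper,       // stands in for SslPrincipalMapper
                           UnaryOperator<String> kerberosShort) { // stands in for KerberosShortNamer
        if (ctx instanceof PlaintextContext) return Principal.ANONYMOUS;
        if (ctx instanceof SslContext) {
            SslContext ssl = (SslContext) ctx;
            return ssl.peerSubjectDn()
                      .map(dn -> new Principal("User", sslMapper.apply(dn)))
                      .orElse(Principal.ANONYMOUS);          // no certificate presented
        }
        SaslContext sasl = (SaslContext) ctx;
        String name = "GSSAPI".equals(sasl.mechanism())
                ? kerberosShort.apply(sasl.authorizationId()) // Kerberos names are shortened
                : sasl.authorizationId();                     // other mechanisms: used as-is
        return new Principal("User", name);
    }
}
```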

Note

The principal builder is itself pluggable via principal.builder.class (default DefaultKafkaPrincipalBuilder, BrokerSecurityConfigs.java:72). Custom builders enable group/role principals, but the built-in StandardAuthorizer only understands a single (type, name) pair per ACL.

TLS: SslTransportLayer, SslFactory and mutual TLS

TLS is provided by the JDK SSLEngine. SslFactory (SslFactory.java:54) owns a pluggable SslEngineFactory (default DefaultSslEngineFactory, selected by ssl.engine.factory.class, SslFactory.java:135) which loads the keystore and truststore and stamps ssl.client.auth onto each engine. On a server listener, createServerSslEngine(peerHost, peerPort) is used; on a client, createClientSslEngine(..., endpointIdentification) is used and the engine performs hostname verification when ssl.endpoint.identification.algorithm is non-empty (default https).

SslTransportLayer drives the handshake as a non-blocking state machine inside the network thread. handshake() (SslTransportLayer.java:280) loops over doHandshake() (:335) dispatching on the engine's HandshakeStatus:

Figure: doHandshake() switches on the engine's HandshakeStatus (SslTransportLayer.java:335):
  • NEED_WRAP → handshakeWrap(): produce TLS records into netWriteBuffer and flush.
  • NEED_UNWRAP → handshakeUnwrap(): read peer records from netReadBuffer and feed them to the engine.
  • NEED_TASK → runDelegatedTasks(): run the engine's blocking crypto tasks (e.g. certificate validation).
  • FINISHED → handshake complete; application data may flow.
Non-blocking TLS handshake: doHandshake() loops, dispatching on the engine's HandshakeStatus. OP_WRITE/OP_READ interest is toggled as wrap/unwrap need more I/O; the three work branches re-enter the switch until the engine reports FINISHED.

The ssl.client.auth setting (enum SslClientAuth: REQUIRED, REQUESTED, NONE; default NONE, BrokerSecurityConfigs.java:86) controls mutual TLS. With REQUIRED, the client must present a trusted certificate; its X.500 subject becomes the principal. After the handshake, SslTransportLayer.sslSession() exposes the SSLSession that the principal builder reads.
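For illustration, the settings that enable mutual TLS can be expressed as java.util.Properties. The property names are standard Kafka configs mentioned in this section; the helper class, listener port, file paths and passwords are placeholders, and the broker side shows only the subset relevant to client authentication.

```java
import java.util.Properties;

// Hypothetical helper; property names are real Kafka configs, values are placeholders.
final class MutualTlsConfigs {
    static Properties broker() {
        Properties p = new Properties();
        p.setProperty("listeners", "SSL://:9093");
        p.setProperty("ssl.keystore.location", "/etc/kafka/broker.keystore.jks");
        p.setProperty("ssl.keystore.password", "changeit");
        p.setProperty("ssl.truststore.location", "/etc/kafka/truststore.jks");
        p.setProperty("ssl.truststore.password", "changeit");
        p.setProperty("ssl.client.auth", "required"); // clients must present a trusted certificate
        return p;
    }

    static Properties client() {
        Properties p = new Properties();
        p.setProperty("security.protocol", "SSL");
        p.setProperty("ssl.truststore.location", "/etc/kafka/truststore.jks");
        p.setProperty("ssl.truststore.password", "changeit");
        // Client certificate: its X.500 subject becomes the principal on the broker.
        p.setProperty("ssl.keystore.location", "/etc/kafka/alice.keystore.jks");
        p.setProperty("ssl.keystore.password", "changeit");
        p.setProperty("ssl.endpoint.identification.algorithm", "https"); // default; "" disables
        return p;
    }
}
```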

Dynamic keystore reconfiguration

Brokers can hot-swap keystores without restart. SslFactory.reconfigure() (SslFactory.java:125) builds a fresh engine factory and, crucially, validates compatibility: you cannot add or remove a keystore/truststore on an existing listener, and by default the new certificate's Distinguished Name and Subject Alternative Names must match the old one's (CertificateEntries.ensureCompatible, SslFactory.java:313). These checks are relaxed only by ssl.allow.dn.changes / ssl.allow.san.changes (both default false, BrokerSecurityConfigs.java:123/:128). For inter-broker listeners, SslEngineValidator (SslFactory.java:402) runs a full in-memory client/server handshake to prove the new config interoperates before committing it.
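A hedged sketch of the DN/SAN compatibility rule. CertEntry and KeystoreCompat are hypothetical; DNs are compared as javax.security.auth.x500.X500Principal values and SANs as plain string sets, which approximates, but is not, Kafka's CertificateEntries comparison. The two boolean flags play the role of ssl.allow.dn.changes and ssl.allow.san.changes.

```java
import java.util.Objects;
import java.util.Set;
import javax.security.auth.x500.X500Principal;

// Hypothetical model of the reconfiguration check; Kafka's CertificateEntries holds more state.
record CertEntry(X500Principal subject, Set<String> subjectAltNames) {}

final class KeystoreCompat {
    static void ensureCompatible(CertEntry oldCert, CertEntry newCert,
                                 boolean allowDnChanges, boolean allowSanChanges) {
        if (!allowDnChanges && !Objects.equals(oldCert.subject(), newCert.subject()))
            throw new IllegalStateException("DN changed: " + oldCert.subject() + " -> " + newCert.subject());
        if (!allowSanChanges && !Objects.equals(oldCert.subjectAltNames(), newCert.subjectAltNames()))
            throw new IllegalStateException("SubjectAltNames changed");
    }

    // Convenience wrapper: true if the new certificate may replace the old one.
    static boolean isCompatible(CertEntry oldCert, CertEntry newCert, boolean allowDn, boolean allowSan) {
        try {
            ensureCompatible(oldCert, newCert, allowDn, allowSan);
            return true;
        } catch (IllegalStateException e) {
            return false;
        }
    }
}
```

A routine renewal that keeps the same subject and SANs passes; swapping in a certificate for a different CN is rejected unless the DN flag is set.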

SASL: the authentication state machine

On a SASL listener, application bytes cannot be read until authentication completes. SaslServerAuthenticator (SaslServerAuthenticator.java:93) implements a precise state machine, driven by repeated authenticate() calls from the network thread as bytes arrive. Each SASL frame on the wire is a 4-byte big-endian length followed by an opaque payload (SaslServerAuthenticator.java:245).
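The framing rule (4-byte big-endian length, then payload) is easy to reproduce with java.nio.ByteBuffer, whose default byte order is big-endian. This sketch is not Kafka's NetworkReceive; SaslFrames is a hypothetical name, the maxReceiveSize check only mirrors the role of sasl.server.max.receive.size, and the exception type is our choice.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Sketch of length-prefixed framing: 4-byte big-endian size + opaque payload.
final class SaslFrames {
    static ByteBuffer encode(byte[] payload) {
        ByteBuffer buf = ByteBuffer.allocate(4 + payload.length); // big-endian by default
        buf.putInt(payload.length);
        buf.put(payload);
        buf.flip();
        return buf;
    }

    /** Returns the payload once it is fully buffered, or null if more bytes are needed. */
    static byte[] tryDecode(ByteBuffer in, int maxReceiveSize) {
        if (in.remaining() < 4) return null;
        int size = in.getInt(in.position());   // peek at the length without consuming it
        if (size < 0 || size > maxReceiveSize)
            throw new IllegalStateException("Invalid frame size " + size);
        if (in.remaining() < 4 + size) return null;
        in.position(in.position() + 4);
        byte[] payload = new byte[size];
        in.get(payload);
        return payload;
    }
}
```

Returning null for a partial frame matches the non-blocking style above: the network thread simply calls again when more bytes arrive.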

Figure: server SASL state machine.
Main spine: INITIAL_REQUEST → (read first frame: ApiVersions or SaslHandshake) HANDSHAKE_OR_VERSIONS_REQUEST → (ApiVersions; the client discovers versions first) HANDSHAKE_REQUEST → (SaslHandshake → createSaslServer; v1+ wraps tokens in SaslAuthenticate) AUTHENTICATE → (saslServer.isComplete()) COMPLETE.
Shortcut and failure edges (off the main spine):
  • HANDSHAKE_OR_VERSIONS_REQUEST → AUTHENTICATE: the first frame is already a SaslHandshake, so createSaslServer runs immediately (skipping HANDSHAKE_REQUEST).
  • AUTHENTICATE → FAILED: AuthenticationException; send the failure response, then throw (terminal).
Re-authentication (KIP-368, long-lived connection):
  • (re-auth start) → REAUTH_PROCESS_HANDSHAKE: a re-auth begins on the established connection.
  • REAUTH_PROCESS_HANDSHAKE → AUTHENTICATE: the same mechanism rejoins the normal authenticate loop.
  • REAUTH_PROCESS_HANDSHAKE → REAUTH_BAD_MECHANISM: the mechanism changed, so throw (terminal).
Server SASL state machine (SaslServerAuthenticator.java:108), driven by repeated authenticate() calls as frames arrive. The happy path runs INITIAL_REQUEST → … → AUTHENTICATE → COMPLETE; bad credentials reach the terminal FAILED state, and re-authentication rejoins at AUTHENTICATE only if the mechanism is unchanged.
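The transitions of the server state machine can be encoded as a small enum-driven function. This is a hypothetical model, not SaslServerAuthenticator: frames are reduced to an enum, the three token outcomes stand in for what saslServer.evaluateResponse and isComplete() report, and INITIAL_REQUEST is simply treated like HANDSHAKE_OR_VERSIONS_REQUEST once the first frame has been read.

```java
import java.util.List;

// Hypothetical model of the server SASL transitions, not Kafka's SaslServerAuthenticator.
enum SaslState { INITIAL_REQUEST, HANDSHAKE_OR_VERSIONS_REQUEST, HANDSHAKE_REQUEST,
                 AUTHENTICATE, COMPLETE, FAILED, REAUTH_PROCESS_HANDSHAKE, REAUTH_BAD_MECHANISM }

// SASL_TOKEN_* summarise the outcome of evaluating one client token.
enum Frame { API_VERSIONS, SASL_HANDSHAKE, SASL_TOKEN_MORE, SASL_TOKEN_DONE, SASL_TOKEN_BAD }

final class SaslFsm {
    static SaslState next(SaslState s, Frame f) {
        switch (s) {
            case INITIAL_REQUEST:                // simplification: first frame handled like the next state
            case HANDSHAKE_OR_VERSIONS_REQUEST:
                if (f == Frame.API_VERSIONS) return SaslState.HANDSHAKE_REQUEST;
                if (f == Frame.SASL_HANDSHAKE) return SaslState.AUTHENTICATE; // shortcut edge
                break;
            case HANDSHAKE_REQUEST:
                if (f == Frame.SASL_HANDSHAKE) return SaslState.AUTHENTICATE;
                break;
            case AUTHENTICATE:
                if (f == Frame.SASL_TOKEN_MORE) return SaslState.AUTHENTICATE;
                if (f == Frame.SASL_TOKEN_DONE) return SaslState.COMPLETE;    // isComplete()
                if (f == Frame.SASL_TOKEN_BAD) return SaslState.FAILED;       // terminal
                break;
            default:
                break;
        }
        throw new IllegalStateException("Unexpected " + f + " in state " + s);
    }

    static SaslState run(List<Frame> frames) {
        SaslState s = SaslState.INITIAL_REQUEST;
        for (Frame f : frames) s = next(s, f);
        return s;
    }

    /** Re-authentication may only continue with the mechanism used originally. */
    static SaslState reauthHandshake(String originalMechanism, String requestedMechanism) {
        return originalMechanism.equals(requestedMechanism)
                ? SaslState.AUTHENTICATE : SaslState.REAUTH_BAD_MECHANISM;
    }
}
```

For example, ApiVersions, SaslHandshake and two tokens drive the model to COMPLETE, while a token arriving before any handshake is rejected.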

Handshake details

The very first frame must be either an ApiVersions request (so the client can discover supported versions before authenticating — the broker answers with apiVersionSupplier.apply(version) at SaslServerAuthenticator.java:583) or a SaslHandshakeRequest (apiKey 17) naming a mechanism (handleHandshakeRequest, :549). If the requested mechanism is not in sasl.enabled.mechanisms, the server replies UNSUPPORTED_SASL_MECHANISM with the list of supported mechanisms and throws. When the handshake is v1 or higher, the server sets enableKafkaSaslAuthenticateHeaders=true, meaning subsequent SASL tokens are carried inside Kafka SaslAuthenticate requests/responses (apiKey 36) rather than raw bytes (:552). The opaque SaslServer.evaluateResponse challenge/response loop then runs in handleSaslToken (:421).
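The mechanism check and the version-dependent framing decision can be sketched as one pure function. HandshakeOutcome and HandshakeSketch are hypothetical names; the error string and the "v1 or higher" rule come from the description above, and the real server additionally throws after sending the unsupported-mechanism response.

```java
import java.util.List;

// Hypothetical result shape; the error name and the v1+ rule follow the text.
record HandshakeOutcome(String error, List<String> enabledMechanisms, boolean useSaslAuthenticateHeaders) {}

final class HandshakeSketch {
    static HandshakeOutcome handle(short version, String mechanism, List<String> enabledMechanisms) {
        if (!enabledMechanisms.contains(mechanism))
            // Reply with the supported list so a well-behaved client can renegotiate.
            return new HandshakeOutcome("UNSUPPORTED_SASL_MECHANISM", enabledMechanisms, false);
        // v1+: subsequent tokens are carried in SaslAuthenticate (apiKey 36) requests/responses.
        return new HandshakeOutcome("NONE", enabledMechanisms, version >= 1);
    }
}
```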

Gotcha

If the first frame is not parseable as a Kafka request, the server raises an InvalidRequestException with a pointed message: the client is likely configured with the wrong security protocol, doesn't support KIP-43, or isn't a Kafka client at all (e.g. an HTTP probe). A raw GSSAPI token starts with 0x60 and would trigger exactly this (SaslServerAuthenticator.java:537).

Caution

On a SaslException caused by bad credentials, the server deliberately does not echo the exception text to the client — it returns a generic "invalid credentials" message (SaslServerAuthenticator.java:487) to avoid leaking whether a user exists. Custom ScramSaslServer code carries the same warning in its Javadoc.

Re-authentication (KIP-368)

SASL connections are long-lived, so a token (e.g. OAUTHBEARER) embedded at connect time could outlive its validity. KIP-368 adds connections.max.reauth.ms (default 0 = disabled, BrokerSecurityConfigs.java:109). When positive, on each successful authentication the server computes a session lifetime as min(credentialExpirationMs - now, connectionsMaxReauthMs) and ships it to the client in the SaslAuthenticate response's SessionLifetimeMs field (calcCompletionTimesAndReturnSessionLifetimeMs, SaslServerAuthenticator.java:667). After expiry, any non-re-auth use of the connection is rejected. Re-authentication starts in REAUTH_PROCESS_HANDSHAKE; the new mechanism must equal the original or it transitions to REAUTH_BAD_MECHANISM (:656), and ReauthInfo.ensurePrincipalUnchanged forbids changing identity across re-auth (:642).
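The lifetime rule reduces to a small arithmetic function. This sketch follows the description above (a lifetime is only computed when connections.max.reauth.ms is positive); ReauthLifetime and its method are hypothetical, a missing credential expiry is represented by null, and clamping a negative result to zero is an assumption of the sketch.

```java
// Hypothetical helper following the min(...) rule described above.
final class ReauthLifetime {
    /**
     * @param nowMs                  time of successful authentication
     * @param credentialExpirationMs absolute credential expiry, or null if the mechanism has none
     * @param maxReauthMs            connections.max.reauth.ms (0 = disabled)
     * @return session lifetime in ms; 0 means no forced re-authentication
     */
    static long sessionLifetimeMs(long nowMs, Long credentialExpirationMs, long maxReauthMs) {
        if (maxReauthMs <= 0) return 0L;                 // re-authentication disabled
        long lifetime = (credentialExpirationMs == null)
                ? maxReauthMs
                : Math.min(credentialExpirationMs - nowMs, maxReauthMs);
        return Math.max(0L, lifetime);                   // clamp an already-expired credential (assumption)
    }
}
```

For example, authenticating at 1,000 ms with a token that expires at 61,000 ms and connections.max.reauth.ms = 3,600,000 yields min(60,000, 3,600,000) = 60,000 ms.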

SASL mechanisms

The server callback handler is chosen per mechanism in SaslChannelBuilder.createServerCallbackHandlers (SaslChannelBuilder.java:316):

| Mechanism | Server callback handler | Credential source | TLS recommended? |
|---|---|---|---|
| PLAIN | PlainServerCallbackHandler | JAAS config / custom handler | Yes (password in cleartext on wire) |
| SCRAM-SHA-256/512 | ScramServerCallbackHandler | CredentialCache fed from metadata log | Optional (challenge-response) |
| GSSAPI | SaslServerCallbackHandler | Kerberos KDC / keytab subject | Optional |
| OAUTHBEARER | OAuthBearerUnsecuredValidatorCallbackHandler (default) or custom | JWT validated against signing key | Yes (bearer token) |

The enabled set comes from sasl.enabled.mechanisms (default [GSSAPI], BrokerSecurityConfigs.java:97); the inter-broker mechanism is sasl.mechanism.inter.broker.protocol (default GSSAPI, from SaslConfigs.DEFAULT_SASL_MECHANISM, wired at BrokerSecurityConfigs.java:175). Clients pick one via sasl.mechanism (default GSSAPI, SaslConfigs.java:35).
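As an example of the client side, the properties below select SCRAM-SHA-512 over SASL_SSL. The keys and the ScramLoginModule class name are standard Kafka client settings; the helper class is hypothetical, the credentials are placeholders, and the broker must also list SCRAM-SHA-512 in sasl.enabled.mechanisms for the handshake to succeed.

```java
import java.util.Properties;

// Hypothetical helper; keys and the login module class are standard Kafka client settings.
final class ScramClientConfig {
    static Properties forUser(String username, String password) {
        Properties p = new Properties();
        p.setProperty("security.protocol", "SASL_SSL");   // SCRAM itself does not encrypt the channel
        p.setProperty("sasl.mechanism", "SCRAM-SHA-512"); // the client default is GSSAPI
        // No escaping is done here, so credentials must not contain double quotes.
        p.setProperty("sasl.jaas.config",
                "org.apache.kafka.common.security.scram.ScramLoginModule required "
                + "username=\"" + username + "\" password=\"" + password + "\";");
        return p;
    }
}
```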

SCRAM: salted challenge-response

SCRAM (RFC 5802) never sends the password and never stores it. Two mechanisms exist, differing only in hash/HMAC and iteration bounds (ScramMechanism.java:32):

| Mechanism | Type byte | Hash | MAC | Min / max iterations |
|---|---|---|---|---|
| SCRAM-SHA-256 | 1 | SHA-256 | HmacSHA256 | 4096 / 16384 |
| SCRAM-SHA-512 | 2 | SHA-512 | HmacSHA512 | 4096 / 16384 |

The stored credential is a ScramCredential = {salt, storedKey, serverKey, iterations} (ScramCredential.java:24). The crypto in ScramFormatter (ScramFormatter.java:40) derives these from the password:

saltedPassword = Hi(Normalize(password), salt, iterations)   // PBKDF2-style, ScramFormatter.java:93
clientKey      = HMAC(saltedPassword, "Client Key")          // :97
storedKey      = H(clientKey)                                 // :101
serverKey      = HMAC(saltedPassword, "Server Key")           // :144
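The derivation above can be made runnable with the JDK's javax.crypto.Mac and MessageDigest. This is a sketch for SCRAM-SHA-256 only, not Kafka's ScramFormatter: ScramKeys is a hypothetical name, Normalize() is approximated as plain UTF-8 encoding (an assumption), and the pbkdf2 helper uses the JDK's PBKDF2WithHmacSHA256 purely as a cross-check, since Hi() producing a single 256-bit block is the same computation as PBKDF2 with a 256-bit output.

```java
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.security.MessageDigest;
import javax.crypto.Mac;
import javax.crypto.SecretKeyFactory;
import javax.crypto.spec.PBEKeySpec;
import javax.crypto.spec.SecretKeySpec;

// Sketch of the SCRAM-SHA-256 key derivation (not Kafka's ScramFormatter).
final class ScramKeys {
    private static Mac hmacSha256(byte[] key) throws GeneralSecurityException {
        Mac mac = Mac.getInstance("HmacSHA256");
        mac.init(new SecretKeySpec(key, "HmacSHA256"));
        return mac;
    }

    /** Hi() per RFC 5802: U1 = HMAC(p, salt || INT(1)), Ui = HMAC(p, U(i-1)), result = XOR of all Ui. */
    static byte[] hi(byte[] password, byte[] salt, int iterations) throws GeneralSecurityException {
        Mac mac = hmacSha256(password);
        byte[] block = new byte[salt.length + 4];
        System.arraycopy(salt, 0, block, 0, salt.length);
        block[block.length - 1] = 1;                // INT(1), big-endian
        byte[] u = mac.doFinal(block);
        byte[] result = u.clone();
        for (int i = 1; i < iterations; i++) {
            u = mac.doFinal(u);                      // doFinal also resets the Mac for reuse
            for (int j = 0; j < result.length; j++) result[j] ^= u[j];
        }
        return result;
    }

    /** Returns {saltedPassword, storedKey, serverKey}; Normalize() is approximated as UTF-8. */
    static byte[][] derive(String password, byte[] salt, int iterations) {
        try {
            byte[] salted = hi(password.getBytes(StandardCharsets.UTF_8), salt, iterations);
            byte[] clientKey = hmacSha256(salted).doFinal("Client Key".getBytes(StandardCharsets.UTF_8));
            byte[] storedKey = MessageDigest.getInstance("SHA-256").digest(clientKey);
            byte[] serverKey = hmacSha256(salted).doFinal("Server Key".getBytes(StandardCharsets.UTF_8));
            return new byte[][] {salted, storedKey, serverKey};
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException(e);
        }
    }

    /** JDK PBKDF2 reference: with a 256-bit output it computes the same block as hi(). */
    static byte[] pbkdf2(String password, byte[] salt, int iterations) {
        try {
            SecretKeyFactory f = SecretKeyFactory.getInstance("PBKDF2WithHmacSHA256");
            return f.generateSecret(new PBEKeySpec(password.toCharArray(), salt, iterations, 256)).getEncoded();
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

Only salt, storedKey, serverKey and the iteration count need to be kept; the salted password and clientKey are intermediate values.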
Figure: SCRAM exchange between the client and the server (ScramSaslServer), top to bottom:
  1. client → server, client-first: n,,n=user,r=cNonce. The server (RECEIVE_CLIENT_FIRST_MESSAGE) looks up the ScramCredential via a NameCallback.
  2. server → client, server-first: r=cNonce+sNonce, s=salt, i=iters (ServerFirstMessage.toBytes(), :135).
  3. client → server, client-final: c=biws, r=…, p=clientProof. The server (RECEIVE_CLIENT_FINAL_MESSAGE) runs verifyClientProof() (:227): storedKey' = H(clientSig XOR proof), then MessageDigest.isEqual(storedKey', storedKey).
  4. server → client, server-final: v=serverSignature. The server reaches COMPLETE, and the client verifies the server via serverSignature.
SCRAM exchange (top → bottom). The server proves the client knows the password without ever seeing it: it compares a reconstructed storedKey' against the stored one in constant time, and the client verifies the server via serverSignature.

Verification is constant-time via MessageDigest.isEqual (ScramSaslServer.java:232). The server rejects credentials whose stored iteration count is below the mechanism minimum (:133) and rejects a mismatched authorization id (:130). On any failure it scrubs the in-memory credential (clearCredentials, :239). SCRAM provides neither integrity nor privacy for the channel itself — wrap/unwrap throw (:204), so use SASL_SSL for confidentiality.
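The client-proof round trip can be sketched as follows (SCRAM-SHA-256, per RFC 5802). ScramProof and its methods are hypothetical, the salted password is assumed to be derived already, and any AuthMessage you feed it is treated as opaque bytes rather than a byte-exact SCRAM transcript. The server side follows the steps described above: recompute ClientSignature, XOR it with the proof to recover ClientKey, hash it, and compare with MessageDigest.isEqual.

```java
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.security.MessageDigest;
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;

// Sketch of the SCRAM-SHA-256 proof round trip (RFC 5802); names are hypothetical.
final class ScramProof {
    static byte[] hmac(byte[] key, byte[] data) {
        try {
            Mac mac = Mac.getInstance("HmacSHA256");
            mac.init(new SecretKeySpec(key, "HmacSHA256"));
            return mac.doFinal(data);
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException(e);
        }
    }

    static byte[] sha256(byte[] data) {
        try {
            return MessageDigest.getInstance("SHA-256").digest(data);
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException(e);
        }
    }

    static byte[] xor(byte[] a, byte[] b) {
        byte[] out = new byte[a.length];
        for (int i = 0; i < a.length; i++) out[i] = (byte) (a[i] ^ b[i]);
        return out;
    }

    /** Client: ClientProof = ClientKey XOR HMAC(StoredKey, AuthMessage). */
    static byte[] clientProof(byte[] saltedPassword, byte[] authMessage) {
        byte[] clientKey = hmac(saltedPassword, "Client Key".getBytes(StandardCharsets.UTF_8));
        return xor(clientKey, hmac(sha256(clientKey), authMessage));
    }

    /** Server: ClientKey' = ClientSignature XOR proof; accept iff H(ClientKey') equals StoredKey. */
    static boolean verify(byte[] storedKey, byte[] authMessage, byte[] proof) {
        if (proof.length != storedKey.length) return false;
        byte[] clientSignature = hmac(storedKey, authMessage);
        return MessageDigest.isEqual(sha256(xor(clientSignature, proof)), storedKey); // constant time
    }
}
```

Note that the server only ever holds storedKey: a stolen credential record does not let an attacker compute a valid proof, because that requires ClientKey.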

Where SCRAM credentials live in KRaft

SCRAM credentials are cluster metadata. The controller's ScramControlManager handles AlterUserScramCredentials (KIP-554): it validates iteration bounds (4096..16384, ScramControlManager.java:290), converts the client-supplied saltedPassword into storedKey/serverKey (finishUpsertion, :260), and emits a UserScramCredentialRecord — whose fields are exactly {Name, Mechanism(int8), Salt(bytes), StoredKey(bytes), ServerKey(bytes), Iterations(int32)} (UserScramCredentialRecord.json). These records replicate through the metadata log; on every broker the ScramPublisher applies the delta to a CredentialProvider/CredentialCache (ScramPublisher.java:51), which is precisely the cache the ScramServerCallbackHandler reads at authentication time (ScramServerCallbackHandler.java). The toString() of ScramCredentialData redacts every field as [hidden] (ScramCredentialData.java:82).
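Two properties stated above, the iteration bounds and the redacted toString, can be sketched in a single record. CredentialData is hypothetical and shaped like ScramCredentialData's fields (salt, storedKey, serverKey, iterations); in Kafka the bounds are enforced by ScramControlManager, so placing the check in the constructor is a simplification.

```java
// Hypothetical record shaped like ScramCredentialData's fields; the bounds check placement is ours.
record CredentialData(byte[] salt, byte[] storedKey, byte[] serverKey, int iterations) {
    static final int MIN_ITERATIONS = 4096, MAX_ITERATIONS = 16384;

    CredentialData {
        // In Kafka the controller (ScramControlManager) enforces these bounds.
        if (iterations < MIN_ITERATIONS || iterations > MAX_ITERATIONS)
            throw new IllegalArgumentException("iterations out of range: " + iterations);
    }

    // Redact every field so credential material never leaks into logs.
    @Override public String toString() {
        return "CredentialData(salt=[hidden], storedKey=[hidden], serverKey=[hidden], iterations=[hidden])";
    }
}
```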

Invariant

SCRAM requires a metadata version that supports it (metadataVersion.isScramSupported(), ScramControlManager.java:183); otherwise alterations fail with UNSUPPORTED_VERSION. The plaintext password never reaches the controller — the client computes saltedPassword, and only the derived keys are persisted.

OAUTHBEARER: JWT bearer tokens

OAUTHBEARER (RFC 7628) carries an OAuth2 bearer token. The client obtains an OAuthBearerToken (interface: value(), scope(), lifetimeMs(), principalName(), OAuthBearerToken.java:42) via its login module and stores it in the Subject's private credentials (OAuthBearerLoginModule.commit, OAuthBearerLoginModule.java:387); a refreshing login (OAuthBearerRefreshingLogin) renews it before expiry. The server validates the token through a JwtValidator. The production validator BrokerJwtValidator wraps a jose4j JwtConsumer (BrokerJwtValidator.java:79) that resolves the signing key, requires an expiration time, and optionally enforces expected audience/issuer and clock skew; the principal is the configured subject claim (default sub). The token's expiry feeds the KIP-368 re-auth lifetime via the negotiated property CREDENTIAL_LIFETIME_MS. The default callback handler is the unsecured validator, intended only for testing.

GSSAPI / Kerberos

For GSSAPI, SaslServerAuthenticator.createSaslKerberosServer (SaslServerAuthenticator.java:219) determines the service principal and hostname from the broker's JAAS-authenticated Subject (keytab) and creates a JDK GSS SaslServer. Kerberos authorization IDs like user/host@REALM are reduced to short names by KerberosShortNamer using sasl.kerberos.principal.to.local.rules (default DEFAULT, BrokerSecurityConfigs.java:63). Optional native GSS credentials are wired up when sun.security.jgss.native=true (SaslChannelBuilder.java:373).
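Only the DEFAULT rule is sketched here: if the principal's realm equals the default realm, the short name is the first component. KerberosDefaultRule is hypothetical, it handles only name[/host]@REALM strings, the default realm is passed in explicitly (in practice it comes from the Kerberos configuration), and explicit RULE:[...] entries are not modelled.

```java
// Sketch of only the DEFAULT short-name rule; explicit RULE:[...] rules are not modelled.
final class KerberosDefaultRule {
    static String shortName(String principal, String defaultRealm) {
        int at = principal.lastIndexOf('@');
        if (at < 0) throw new IllegalArgumentException("Expected name[/host]@REALM: " + principal);
        String realm = principal.substring(at + 1);
        String nameAndHost = principal.substring(0, at);
        if (!realm.equals(defaultRealm))
            throw new IllegalArgumentException("No rules apply to " + principal);
        int slash = nameAndHost.indexOf('/');
        return slash >= 0 ? nameAndHost.substring(0, slash) : nameAndHost; // first component
    }
}
```

Under this rule both kafka/broker1.example.com@EXAMPLE.COM and kafka@EXAMPLE.COM map to User:kafka when EXAMPLE.COM is the default realm.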

Delegation tokens (KIP-48)

Delegation tokens are a lightweight, SCRAM-backed secondary credential issued to an already-authenticated principal — useful for distributed jobs that should not ship a keytab. A token authenticates via the SCRAM mechanisms: when a client sets the SCRAM extension tokenauth=true, ScramSaslServer uses a DelegationTokenCredentialCallback instead of the normal credential callback, looks the token up in the DelegationTokenCache, and sets the authorization id to the token owner (ScramSaslServer.java:112). The resulting KafkaPrincipal is flagged tokenAuthenticated(true) (SaslServerAuthenticator.java:313), a flag carried through to authorization and serialized in DefaultPrincipalData. Token metadata and lifecycle are managed by DelegationTokenManager (server-common/.../security/DelegationTokenManager.java) and replicated like SCRAM via a DelegationTokenPublisher.
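The token branch in ScramSaslServer can be sketched as a lookup that consults either the user credential cache or the token cache. Every type here (TokenInfo, AuthOutcome, ScramCredentialLookup) and both maps are hypothetical stand-ins, and the proof verification that follows the lookup is omitted; only the tokenauth decision, the owner-as-authorization-id rule and the tokenAuthenticated flag follow the text.

```java
import java.util.Map;
import java.util.Optional;

// Hypothetical stand-ins; only the tokenauth branching mirrors the description above.
record TokenInfo(String tokenId, String owner) {}
record AuthOutcome(String authorizationId, boolean tokenAuthenticated) {}

final class ScramCredentialLookup {
    static Optional<AuthOutcome> lookup(String username, Map<String, String> extensions,
                                        Map<String, byte[]> userCredentials,
                                        Map<String, TokenInfo> tokenCache) {
        if (Boolean.parseBoolean(extensions.get("tokenauth"))) {
            // The SCRAM username carries the token id; the authorization id becomes the owner.
            return Optional.ofNullable(tokenCache.get(username))
                           .map(t -> new AuthOutcome(t.owner(), true));
        }
        return userCredentials.containsKey(username)
                ? Optional.of(new AuthOutcome(username, false))
                : Optional.empty();
    }
}
```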

Authorization: the Authorizer SPI

The pluggable contract is org.apache.kafka.server.authorizer.Authorizer (Authorizer.java:81), enabled with authorizer.class.name. The lifecycle is explicit in its Javadoc: the broker constructs and configures the authorizer, the authorizer begins loading its metadata, and for each listener the broker waits on the CompletionStage returned by start(AuthorizerServerInfo) before accepting connections on that listener. Key methods:

  • List<AuthorizationResult> authorize(ctx, List<Action>) — synchronous, called on the request thread; must avoid blocking.
  • createAcls / deleteAcls — asynchronous (return CompletionStages) so updates can round-trip to a controller without tying up request threads.
  • acls(filter) — enumerate bindings for DescribeAcls.
  • authorizeByResourceType(ctx, op, resourceType) — default method (Authorizer.java:184) answering "may this principal do op on any resource of this type?", used e.g. for idempotent-producer checks; it implements its own prefix-domination logic over allow/deny patterns.
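The prefix-domination idea behind authorizeByResourceType can be sketched as below. This is a simplified reading, not the Authorizer default method: the bindings are assumed to be already filtered to the requesting principal (or User:*), host and operation, the super-user pre-check is omitted, and Perm, PType, Binding and ByResourceType are hypothetical. An ALLOW counts only if no DENY dominates it: a wildcard LITERAL DENY denies everything, a same-named LITERAL DENY cancels a LITERAL ALLOW, and a PREFIXED DENY dominates every allowed name that starts with it.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical types; bindings are assumed pre-filtered to this principal, host and operation.
enum Perm { ALLOW, DENY }
enum PType { LITERAL, PREFIXED }
record Binding(Perm perm, PType type, String name) {}

final class ByResourceType {
    /** True if some resource of this type could be allowed without a dominating DENY. */
    static boolean anyAllowed(List<Binding> bindings) {
        Set<String> denyLiteral = new HashSet<>(), denyPrefix = new HashSet<>();
        Set<String> allowLiteral = new HashSet<>(), allowPrefix = new HashSet<>();
        boolean wildcardAllow = false;
        for (Binding b : bindings) {
            if (b.perm() == Perm.DENY) {
                if (b.type() == PType.LITERAL && b.name().equals("*")) return false; // deny-all wins
                (b.type() == PType.LITERAL ? denyLiteral : denyPrefix).add(b.name());
            } else if (b.type() == PType.LITERAL && b.name().equals("*")) {
                wildcardAllow = true;
            } else {
                (b.type() == PType.LITERAL ? allowLiteral : allowPrefix).add(b.name());
            }
        }
        if (wildcardAllow) return true;
        for (String a : allowLiteral)
            if (!denyLiteral.contains(a) && !dominated(a, denyPrefix)) return true;
        for (String a : allowPrefix)
            if (!dominated(a, denyPrefix)) return true;
        return false;
    }

    /** An allowed name is dominated if it, or any prefix of it, is a DENY prefix. */
    static boolean dominated(String allowed, Set<String> denyPrefix) {
        for (int i = 1; i <= allowed.length(); i++)
            if (denyPrefix.contains(allowed.substring(0, i))) return true;
        return false;
    }
}
```

So ALLOW PREFIXED "app-orders" plus DENY PREFIXED "app-" answers "no", whereas ALLOW PREFIXED "app-" plus DENY PREFIXED "app-orders" answers "yes", since some app- names remain allowed.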

On the broker side, the Scala AuthHelper (AuthHelper.scala:42) adapts KafkaApis calls into Action lists. authorize(...) builds a single literal ResourcePattern action and returns a boolean (:43); filterByAuthorized groups N resource references by name, authorizes each unique name once with the count as resourceReferenceCount, and returns the allowed set (:100). If no authorizer is configured, every call short-circuits to "allowed" (authorizer.forall). KafkaApis invokes these helpers densely — e.g. READ GROUP for offset commit, WRITE TRANSACTIONAL_ID and WRITE TOPIC for produce, CLUSTER_ACTION CLUSTER for inter-broker fetch (KafkaApis.scala:284, :402, :589).
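The grouping step in filterByAuthorized can be pictured like this. NamedAction is a hypothetical stand-in for Action (only the name and resourceReferenceCount are kept), and the Function plays the role of a single authorize(ctx, actions) call whose results line up with the actions by index.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

// Hypothetical stand-in for Action: only the name and the reference count are kept.
record NamedAction(String resourceName, int resourceReferenceCount) {}

final class FilterByAuthorizedSketch {
    static Set<String> filter(List<String> resourceNames,
                              Function<List<NamedAction>, List<Boolean>> authorizeBatch) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String name : resourceNames) counts.merge(name, 1, Integer::sum); // N refs -> unique names
        List<NamedAction> actions = new ArrayList<>();
        counts.forEach((name, count) -> actions.add(new NamedAction(name, count)));
        List<Boolean> results = authorizeBatch.apply(actions); // one call; results align by index
        Set<String> allowed = new LinkedHashSet<>();
        for (int i = 0; i < actions.size(); i++)
            if (results.get(i)) allowed.add(actions.get(i).resourceName());
        return allowed;
    }
}
```

Four references to [orders, payments, orders, orders] thus become two actions, (orders, 3) and (payments, 1), evaluated in one authorizer call.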

Operation implication and resource types

Operations are a fixed enum (AclOperation): READ(3), WRITE(4), CREATE(5), DELETE(6), plus ALTER, DESCRIBE, CLUSTER_ACTION, DESCRIBE_CONFIGS, ALTER_CONFIGS, IDEMPOTENT_WRITE, token operations, etc. Which operations are meaningful per resource type is encoded in AclEntry.supportedOperations (AclEntry.java:53) — e.g. TOPIC supports READ, WRITE, CREATE, DESCRIBE, DELETE, ALTER, DESCRIBE_CONFIGS, ALTER_CONFIGS; TRANSACTIONAL_ID supports DESCRIBE, WRITE, TWO_PHASE_COMMIT; USER supports the delegation-token operations CREATE_TOKENS, DESCRIBE_TOKENS.

The KRaft StandardAuthorizer

StandardAuthorizer (KIP-801) is the built-in authorizer for KRaft clusters; it stores ACLs in __cluster_metadata rather than any external store. It is a thin, thread-safe shell around an immutable StandardAuthorizerData held in a single volatile field (StandardAuthorizer.java:72). Every mutation (config change, ACL add/remove, snapshot load) produces a new StandardAuthorizerData and reassigns the volatile reference; readers grab the current reference once per call (curData = data, :144) and run lock-free.
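The "one writer, many readers" pattern can be sketched with a volatile reference to an immutable snapshot. SnapshotHolder and its nested Data are hypothetical, and copying a HashSet on every add is a simplification: Kafka's AclCache uses persistent collections that share structure between versions. Unlike StandardAuthorizer, the sketch answers false rather than throwing while the initial load is incomplete.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch of "one writer, multiple readers" via a volatile immutable snapshot.
final class SnapshotHolder {
    /** Never mutated after construction, so it can be shared freely between threads. */
    static final class Data {
        final Set<String> acls;
        final boolean loadingComplete;

        Data(Set<String> acls, boolean loadingComplete) {
            this.acls = Collections.unmodifiableSet(new HashSet<>(acls));
            this.loadingComplete = loadingComplete;
        }

        Data withAcl(String acl) {                 // returns a new snapshot; this one is untouched
            Set<String> next = new HashSet<>(acls);
            next.add(acl);
            return new Data(next, loadingComplete);
        }
    }

    private volatile Data data = new Data(Set.of(), false);

    // Writer side: lock-free read-modify-write is safe only because there is a single writer.
    void addAcl(String acl) { data = data.withAcl(acl); }
    void completeInitialLoad() { data = new Data(data.acls, true); }

    // Reader side: read the volatile field once, then use only that snapshot.
    boolean isAllowed(String acl) {
        Data cur = data;
        return cur.loadingComplete && cur.acls.contains(acl);
    }
}
```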

On-disk / replicated ACL representation

An ACL on the metadata log is an AccessControlEntryRecord (apiKey 18) with fields {Id(uuid), ResourceType(int8), ResourceName(string), PatternType(int8), Principal(string), Host(string), Operation(int8), PermissionType(int8)} (AccessControlEntryRecord.json). In memory this becomes a StandardAcl record (StandardAcl.java:34) plus its UUID. Deletions emit a RemoveAccessControlEntryRecord referencing the UUID. The controller's AclControlManager validates new bindings (no UNKNOWN/ANY types, only LITERAL/PREFIXED patterns, non-empty name, parseable principal — AclControlManager.java:135), assigns a random unique UUID, and on replay maintains both a TimelineHashMap<Uuid, StandardAcl> and a TimelineHashSet<StandardAcl> for dedup (:228).
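A hedged sketch of the validation rules listed above. ResType, PatType and AclValidation are hypothetical, trimmed-down stand-ins for Kafka's ResourceType and PatternType, and the principal check only requires a "Type:" part in the string, which approximates "parseable principal".

```java
// Hypothetical, trimmed-down enums; the checks follow the validation rules in the text.
enum ResType { UNKNOWN, ANY, TOPIC, GROUP, CLUSTER, TRANSACTIONAL_ID }
enum PatType { UNKNOWN, ANY, MATCH, LITERAL, PREFIXED }

final class AclValidation {
    static void validate(ResType type, PatType pattern, String name, String principal) {
        if (type == ResType.UNKNOWN || type == ResType.ANY)
            throw new IllegalArgumentException("Invalid resource type " + type);
        if (pattern != PatType.LITERAL && pattern != PatType.PREFIXED)  // MATCH/ANY are filter-only
            throw new IllegalArgumentException("Invalid pattern type " + pattern);
        if (name == null || name.isEmpty())
            throw new IllegalArgumentException("Resource name should not be empty");
        if (principal == null || principal.indexOf(':') < 0)            // expects "Type:name"
            throw new IllegalArgumentException("Could not parse principal " + principal);
    }

    static boolean isValid(ResType type, PatType pattern, String name, String principal) {
        try {
            validate(type, pattern, name, principal);
            return true;
        } catch (IllegalArgumentException e) {
            return false;
        }
    }
}
```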

Figure: AdminClient CreateAcls → QuorumController → AclControlManager.createAcls() → __cluster_metadata (Raft-replicated) → controller in-memory TimelineHashMap; on every broker, MetadataLoader ⇒ AclPublisher.onMetadataUpdate → loadSnapshot(allAcls) (replace the whole set atomically) or addAcl / removeAcl (in LinkedHashMap order) → StandardAuthorizer.data (volatile reference).
ACLs are ordinary metadata: written by the controller, replicated by Raft, and applied on every node by AclPublisher, either as an atomic snapshot load or as in-order deltas swapped into the authorizer's single volatile data reference.

AclPublisher (AclPublisher.java:57) is careful about ordering. A snapshot load replaces the whole set atomically via loadSnapshot; an incremental delta applies addAcl/removeAcl in the LinkedHashMap iteration order so that, e.g., a DENY ALL followed by an ALLOW topic foo is never transiently visible in reverse order. The first snapshot load also calls completeInitialLoad(), which flips loadingComplete=true and completes the future that gates request acceptance (StandardAuthorizer.java:91).

Invariant

Until the initial ACL load finishes, authorize() throws AuthorizerNotReadyException for non-super-users (StandardAuthorizerData.java:239). Super-users are still allowed, and early-start listeners (e.g. the controller-to-controller listener) bypass the wait via earlyStartListeners (StandardAuthorizer.java:129). This prevents a fail-open window during startup.

In-memory index: the reverse-sorted NavigableSet

AclCache (AclCache.java:32) holds two persistent structures: an ImmutableMap<Uuid, StandardAcl> by id, and an ImmutableNavigableSet<StandardAcl> ordered by StandardAcl.compareTo. The ordering is the clever part (StandardAcl.java:83): sort by resourceType, then by resource name in descending (reverse lexicographic) order, then pattern type, operation, principal, host, permission. Because every prefix of a name sorts lexicographically below that name, descending order places every literal or prefix ACL that could match a given resource name at or after that name in iteration order, so the matcher can start at the request name and walk forward with a bounded scan (re-seeking past unrelated names) instead of examining every ACL.

The authorize() evaluation algorithm

StandardAuthorizerData.authorize (StandardAuthorizerData.java:226) requires the action's pattern to be LITERAL (the request side always passes a concrete resource name) and proceeds:

  1. Super-user shortcut. If principal.toString() is in super.users, return ALLOWED immediately (rule = SuperUserRule). Super-users bypass even DENY ACLs (:237).
  2. Readiness check. Else if loading is incomplete, throw AuthorizerNotReadyException.
  3. Find matching ACLs. findAclRule scans the index twice: first for ACLs whose resource name is a prefix of (or equal to) the request name (checkSection starting at an exemplar keyed on the resource name), then for wildcard ACLs stored as LITERAL "*" (:354).
  4. Match each candidate. findResult (:483) checks: principal ∈ {requestPrincipal, User:*}; host equals the ACL host or *; and operation matches — with the implication rule below.
  5. Deny wins. The first matching DENY short-circuits and returns DENIED (checkSection returns immediately on deny, :428). Otherwise a matching ALLOW yields ALLOWED.
  6. Default. If no ACL referenced the resource at all, return the configured default (allow.everyone.if.no.acl.found); if ACLs existed for the resource but none matched the principal/op, return DENIED (MatchingRuleBuilder.build, :584).
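The six steps can be condensed into a linear-scan sketch. It is deliberately naive: it loops over every ACL instead of using the sorted index, and Op, Permission, Pat, StdAcl, Result and AuthorizeSketch are hypothetical, trimmed-down types. The super-user shortcut, the readiness check, principal/host matching, deny-wins and the default rule follow the list, and opMatches applies the ALLOW-only implication rule described in this section.

```java
import java.util.EnumSet;
import java.util.List;
import java.util.Set;

// Hypothetical, trimmed-down types; not Kafka's AclOperation/StandardAcl.
enum Op { ALL, READ, WRITE, CREATE, DELETE, ALTER, DESCRIBE, DESCRIBE_CONFIGS, ALTER_CONFIGS }
enum Permission { ALLOW, DENY }
enum Pat { LITERAL, PREFIXED }
enum Result { ALLOWED, DENIED }

record StdAcl(Pat pattern, String resource, String principal, String host, Op op, Permission perm) {}

final class AuthorizeSketch {
    static final Set<Op> IMPLIES_DESCRIBE = EnumSet.of(Op.DESCRIBE, Op.READ, Op.WRITE, Op.DELETE, Op.ALTER);
    static final Set<Op> IMPLIES_DESCRIBE_CONFIGS = EnumSet.of(Op.DESCRIBE_CONFIGS, Op.ALTER_CONFIGS);

    static Result authorize(List<StdAcl> acls, Set<String> superUsers, boolean loadingComplete,
                            boolean allowIfNoAclFound, String principal, String host,
                            Op op, String resource) {
        if (superUsers.contains(principal)) return Result.ALLOWED;          // 1. even over DENY
        if (!loadingComplete) throw new IllegalStateException("not ready"); // 2. AuthorizerNotReadyException
        boolean resourceHasAcls = false, allowed = false;
        for (StdAcl acl : acls) {                                           // 3. linear scan, no index
            boolean resourceMatch = acl.pattern() == Pat.LITERAL
                    ? acl.resource().equals(resource) || acl.resource().equals("*")
                    : resource.startsWith(acl.resource());
            if (!resourceMatch) continue;
            resourceHasAcls = true;
            if (!acl.principal().equals(principal) && !acl.principal().equals("User:*")) continue; // 4.
            if (!acl.host().equals(host) && !acl.host().equals("*")) continue;
            if (!opMatches(acl, op)) continue;
            if (acl.perm() == Permission.DENY) return Result.DENIED;        // 5. deny wins
            allowed = true;                                                 // keep scanning for a DENY
        }
        if (allowed) return Result.ALLOWED;
        if (!resourceHasAcls) return allowIfNoAclFound ? Result.ALLOWED : Result.DENIED; // 6.
        return Result.DENIED;
    }

    // ALL matches everything; otherwise implication applies to ALLOW ACLs only.
    static boolean opMatches(StdAcl acl, Op requested) {
        if (acl.op() == Op.ALL || acl.op() == requested) return true;
        if (acl.perm() == Permission.DENY) return false;
        if (requested == Op.DESCRIBE) return IMPLIES_DESCRIBE.contains(acl.op());
        if (requested == Op.DESCRIBE_CONFIGS) return IMPLIES_DESCRIBE_CONFIGS.contains(acl.op());
        return false;
    }
}
```

With ALLOW READ on PREFIXED "orders-" and DENY READ on LITERAL "orders-eu" for User:alice, the sketch allows READ on orders-us, denies READ on orders-eu, and still allows DESCRIBE on orders-eu, because the DENY READ does not imply DENY DESCRIBE.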
Figure: authorize(ctx, action) (StandardAuthorizerData.java:226): principal ∈ super.users? yes → ALLOWED (SuperUserRule, bypasses DENY); else !loadingComplete? yes → throw AuthorizerNotReadyException; else findAclRule(principals = {user, User:*}, host, action), scanning §1 prefix/literal ACLs (reverse-sorted scan) and §2 wildcard ACLs (LITERAL '*'); for each candidate, check principal + host + operation (+ implication): a matching DENY → DENIED (deny wins, stop); otherwise a matching ALLOW → ALLOWED; ACLs on the resource but no match → DENIED; no ACL on the resource → default (allow.everyone.if.no.acl.found).
Deny-wins evaluation: super-user > DENY > ALLOW > default, over a name-prefix-contiguous scan (operation matching applies implicit DESCRIBE on ALLOW ACLs only). The first matching DENY short-circuits to DENIED.

Operation implication

For ALLOW ACLs only, certain operations imply others (StandardAuthorizerData.java:436): an allow of READ/WRITE/DELETE/ALTER (or DESCRIBE itself) implies DESCRIBE; an allow of ALTER_CONFIGS (or DESCRIBE_CONFIGS) implies DESCRIBE_CONFIGS; and ALL implies everything. This implication does not apply to DENY ACLs — a DENY READ does not deny DESCRIBE (:502).

Key idea

Precedence is super-user > DENY > ALLOW > default. A single matching DENY beats any number of ALLOWs, regardless of LITERAL vs PREFIXED specificity. The only thing that beats a DENY is being a super-user.

Prefix-matching walk (the tricky bit)

checkSection (StandardAuthorizerData.java:380) exploits the reverse-name sort. Starting from an exemplar at (resourceType, resourceName, UNKNOWN...) — where PatternType.UNKNOWN sorts first — it iterates forward. For each ACL it computes matchesUpTo (the common-prefix length, :366):

  • If the ACL name is fully a prefix of the request name: a LITERAL ACL only matches on an exact full-length match (otherwise it is skipped while continuing to scan for PREFIXED ACLs); a PREFIXED ACL matches.
  • If the ACL name diverges from the request name (and is not the wildcard *), the scan has left the relevant region; it jumps by rebuilding the exemplar truncated to the divergence point and re-seeking, so it can still find shorter prefix ACLs (:411).
  • It stops as soon as the resourceType changes (the section boundary, :392).

The worked example in the source comment (:324) shows how, scanning candidates for topic foobar, the iterator visits the relevant PREFIX/LITERAL entries and skips irrelevant ones by re-seeking. This keeps authorization sub-linear in the total ACL count.
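The walk can be reproduced with a java.util.TreeSet ordered by descending name. This is a simplified model of checkSection for a single resource type: IndexedAcl, Kind and PrefixWalk are hypothetical, and the wildcard pass, principals, hosts and operations are left out. When an entry diverges from the request name at position k, no longer prefix can follow it, so the iterator re-seeks to the first k characters of the request name.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.NavigableSet;
import java.util.TreeSet;

// Simplified model of the checkSection walk for ONE resource type.
enum Kind { UNKNOWN, LITERAL, PREFIXED }   // UNKNOWN sorts first; used only in seek exemplars

record IndexedAcl(String name, Kind kind) {}

final class PrefixWalk {
    // Descending by name, then ascending by pattern kind (mirrors the reverse-name sort).
    static final Comparator<IndexedAcl> ORDER = (a, b) -> {
        int c = b.name().compareTo(a.name());
        return c != 0 ? c : a.kind().compareTo(b.kind());
    };

    static int commonPrefix(String a, String b) {
        int n = Math.min(a.length(), b.length()), i = 0;
        while (i < n && a.charAt(i) == b.charAt(i)) i++;
        return i;
    }

    /** Collects entries matching the request name; visited[0] counts entries examined. */
    static List<IndexedAcl> matches(NavigableSet<IndexedAcl> index, String name, int[] visited) {
        List<IndexedAcl> out = new ArrayList<>();
        Iterator<IndexedAcl> it = index.tailSet(new IndexedAcl(name, Kind.UNKNOWN), true).iterator();
        while (it.hasNext()) {
            IndexedAcl e = it.next();
            visited[0]++;
            int k = commonPrefix(e.name(), name);
            if (k == e.name().length()) {
                // e.name() is a prefix of (or equal to) the request name:
                // PREFIXED always matches, LITERAL only on a full-length match.
                if (e.kind() == Kind.PREFIXED || k == name.length()) out.add(e);
            } else {
                // Diverged at position k: re-seek to the request name truncated to k chars.
                it = index.tailSet(new IndexedAcl(name.substring(0, k), Kind.UNKNOWN), true).iterator();
            }
        }
        return out;
    }
}
```

For example, with PREFIXED entries f, foo, fooa, fooa1, fooa2, fooa3, foob and zzz plus LITERAL entries fo, foobar and foobaz, a lookup for foobar returns foobar, foob, foo and f after examining 6 of the 11 entries: the divergent fooa3 triggers a re-seek straight to foo, skipping fooa2, fooa1 and fooa.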

Concurrency & threading

| State | Owner thread(s) | Guard |
|---|---|---|
| TLS handshake buffers, SASL state machine | The single network (Processor) thread owning the connection | None needed: one thread per KafkaChannel (see ch.06) |
| StandardAuthorizer.data | Read on request-handler threads; replaced on the metadata-publish thread | volatile field + fully immutable StandardAuthorizerData/AclCache |
| Controller ACL/SCRAM maps | The single QuorumController event-loop thread | TimelineHashMap/TimelineHashSet (snapshot-versioned, single-writer) |
| SCRAM CredentialCache | Written by ScramPublisher; read by SASL servers on network threads | Concurrent cache; per-user replace |

The authorizer's design comment is explicit: "we expect one writer and multiple readers" (StandardAuthorizer.java:68). Because each data snapshot is immutable and swapped atomically, readers never see a partially-updated ACL set; the cost is that each ACL add allocates a new persistent-collection node rather than mutating in place (AclCache.addAcl returns a new AclCache, AclCache.java:75).

Configuration reference

Config key | Default | Effect
authorizer.class.name | unset (no authz) | Authorizer implementation; org.apache.kafka.metadata.authorizer.StandardAuthorizer for KRaft.
super.users | "" | Semicolon-separated User:name list; these bypass all ACLs incl. DENY (StandardAuthorizer.java:58).
allow.everyone.if.no.acl.found | false (→ DENIED) | Default result when no ACL references a resource (StandardAuthorizer.java:210).
principal.builder.class | DefaultKafkaPrincipalBuilder | Maps auth context → KafkaPrincipal (BrokerSecurityConfigs.java:72).
ssl.client.auth | none | required/requested/none mutual-TLS policy (BrokerSecurityConfigs.java:86).
ssl.principal.mapping.rules | DEFAULT | Rewrite X.500 DN → principal name (BrokerSecurityConfigs.java:53).
ssl.endpoint.identification.algorithm | https | Client-side hostname verification of the server cert.
ssl.allow.dn.changes / ssl.allow.san.changes | false / false | Permit DN/SAN changes on dynamic keystore update (BrokerSecurityConfigs.java:123/:128).
sasl.enabled.mechanisms | [GSSAPI] | Mechanisms the broker will accept (BrokerSecurityConfigs.java:97).
sasl.mechanism.inter.broker.protocol | GSSAPI | Mechanism brokers use to talk to each other (BrokerSecurityConfigs.java:175).
sasl.mechanism | GSSAPI | Client-selected mechanism (SaslConfigs.java:35).
connections.max.reauth.ms | 0 (disabled) | KIP-368 forced re-auth interval (BrokerSecurityConfigs.java:109).
sasl.server.max.receive.size | 524288 bytes (512 KiB) | Max bytes accepted before/during initial SASL auth (BrokerSecurityConfigs.java:117).
sasl.kerberos.principal.to.local.rules | DEFAULT | Kerberos short-namer rules (BrokerSecurityConfigs.java:63).
sasl.jaas.config | unset | Inline JAAS for a listener/mechanism (SaslConfigs.java:37).
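To make the client-side rows of the table concrete, here is a hypothetical java.util.Properties set for a SASL_SSL connection using SCRAM-SHA-512. The keys are standard Kafka client configs and ScramLoginModule is Kafka's SCRAM login module; the broker address, credentials and truststore path are placeholders, and the broker is assumed to list SCRAM-SHA-512 in sasl.enabled.mechanisms.

```java
import java.util.Properties;

// Hypothetical client settings for a SASL_SSL listener using SCRAM-SHA-512.
// Broker address, credentials and truststore path are placeholders.
public class ScramClientConfigSketch {
    static Properties scramSslClientProps(String user, String password) {
        Properties p = new Properties();
        p.setProperty("bootstrap.servers", "broker1.example.com:9094");
        p.setProperty("security.protocol", "SASL_SSL");
        // Overrides the GSSAPI default; the broker must list it in sasl.enabled.mechanisms.
        p.setProperty("sasl.mechanism", "SCRAM-SHA-512");
        // Inline JAAS instead of a JAAS file; the trailing ';' is part of the syntax.
        // (Assumes the password contains no double quotes.)
        p.setProperty("sasl.jaas.config",
            "org.apache.kafka.common.security.scram.ScramLoginModule required "
                + "username=\"" + user + "\" password=\"" + password + "\";");
        // Already the default; an empty value would disable server hostname verification.
        p.setProperty("ssl.endpoint.identification.algorithm", "https");
        p.setProperty("ssl.truststore.location", "/etc/kafka/client.truststore.jks");
        return p;
    }

    public static void main(String[] args) {
        Properties p = scramSslClientProps("alice", "alice-secret");
        System.out.println(p.getProperty("sasl.mechanism")); // SCRAM-SHA-512
    }
}
```

The resulting Properties object can be handed to a KafkaProducer or KafkaConsumer constructor.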

Failure modes, edge cases & recovery

  • Authentication failure delivery. On failure, the server builds an error response but defers sending it; handleAuthenticationFailure() → sendAuthenticationFailureResponse() flushes the error, and then the connection is closed (SaslServerAuthenticator.java:599). The client surfaces a SaslAuthenticationException; a short delay before the disconnect (DelayedResponseAuthenticationException, sized by connection.failed.authentication.delay.ms) slows down brute-force credential guessing.
  • Wrong security protocol. A plaintext client hitting a TLS port (or vice-versa) fails the handshake/first-frame parse with the diagnostic InvalidRequestException described above.
  • Unsupported mechanism. Returns UNSUPPORTED_SASL_MECHANISM plus the supported list, letting well-behaved clients renegotiate.
  • Re-auth mechanism/principal change. Rejected with explicit messages (REAUTH_BAD_MECHANISM; ensurePrincipalUnchanged).
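  • Authorizer not yet loaded. Non-super-user requests get AuthorizerNotReadyException rather than a fail-open allow; in addition, the per-endpoint futures returned by Authorizer.start() keep listeners that are not early-start listeners from accepting connections until the ACLs have loaded.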
  • Oversized SASL frame. A receive exceeding sasl.server.max.receive.size raises an InvalidReceiveException rethrown as SaslAuthenticationException (SaslServerAuthenticator.java:265), preventing pre-auth memory exhaustion.
  • Bulk ACL delete cap. A single deleteAcls filter that would remove more than MAX_RECORDS_PER_USER_OP ACLs fails the whole request (BoundedListTooLongException, AclControlManager.java:207) rather than partially applying.
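The oversized-frame defence comes down to checking a length prefix before allocating anything. A hypothetical sketch, assuming a 4-byte big-endian size followed by the payload and a frame that is already fully buffered (a real non-blocking receive would wait for more bytes). It is not Kafka's NetworkReceive, and OversizedFrameException stands in for InvalidReceiveException.

```java
import java.nio.ByteBuffer;

// Hypothetical size-prefixed receive that rejects oversized frames before allocating,
// in the spirit of sasl.server.max.receive.size. Not Kafka's NetworkReceive.
public class BoundedReceiveSketch {
    // Stand-in for Kafka's InvalidReceiveException.
    static final class OversizedFrameException extends RuntimeException {
        OversizedFrameException(String message) {
            super(message);
        }
    }

    // Reads one [int32 length][payload] frame; assumes the whole frame is already buffered.
    static byte[] readFrame(ByteBuffer in, int maxSize) {
        int size = in.getInt(); // big-endian by ByteBuffer default
        if (size < 0 || size > maxSize) {
            // Fail BEFORE allocating, so an unauthenticated peer cannot force a huge buffer.
            throw new OversizedFrameException("Invalid receive (size = " + size + ", max = " + maxSize + ")");
        }
        byte[] payload = new byte[size];
        in.get(payload);
        return payload;
    }

    public static void main(String[] args) {
        int max = 524288; // the default of sasl.server.max.receive.size

        ByteBuffer ok = ByteBuffer.allocate(4 + 3);
        ok.putInt(3).put(new byte[] {1, 2, 3});
        ok.flip();
        System.out.println(readFrame(ok, max).length); // 3

        ByteBuffer huge = ByteBuffer.allocate(4);
        huge.putInt(Integer.MAX_VALUE); // claims a ~2 GiB payload
        huge.flip();
        try {
            readFrame(huge, max);
        } catch (OversizedFrameException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

The key design point is ordering: the four header bytes cost nothing to read, so the check runs before any attacker-controlled allocation.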

Invariants & guarantees

Invariant

ACL consistency. Because ACLs are metadata-log records, every node converges to the same ACL set at a given log offset, and AclPublisher applies deltas in commit order. There is no window where a later ALLOW is visible before an earlier DENY (AclPublisher.java:80).

Invariant

Deny supremacy. For non-super-users, a matching DENY always wins; ALLOW can never override DENY regardless of pattern specificity (StandardAuthorizerData.java:424).
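A minimal sketch of deny-wins evaluation, assuming the caller has already selected every ACL whose resource pattern matches the request (for any principal). Principals are plain strings such as User:alice, User:* is the wildcard principal, and operation ALL matches any operation; host matching and implied operations (for example READ implying DESCRIBE) are left out. DenyWinsSketch and its members are hypothetical.

```java
import java.util.List;
import java.util.Set;

// Hypothetical deny-wins evaluation for one (principal, operation) pair.
// `matchingAcls` is assumed to hold every ACL whose resource pattern matches the request,
// for any principal. Host matching and implied operations are omitted.
public class DenyWinsSketch {
    enum Permission { ALLOW, DENY }

    record AclEntry(String principal, String operation, Permission permission) {}

    static boolean authorize(String principal, String operation, List<AclEntry> matchingAcls,
                             Set<String> superUsers, boolean allowEveryoneIfNoAclFound) {
        if (superUsers.contains(principal)) {
            return true; // super users bypass every ACL, DENY included
        }
        if (matchingAcls.isEmpty()) {
            return allowEveryoneIfNoAclFound; // no ACL references the resource at all
        }
        boolean allowed = false;
        for (AclEntry acl : matchingAcls) {
            boolean principalMatches = acl.principal().equals(principal) || acl.principal().equals("User:*");
            boolean operationMatches = acl.operation().equals(operation) || acl.operation().equals("ALL");
            if (!principalMatches || !operationMatches) {
                continue;
            }
            if (acl.permission() == Permission.DENY) {
                return false; // a matching DENY wins, whatever ALLOWs exist
            }
            allowed = true; // remember the ALLOW but keep scanning for a DENY
        }
        return allowed;
    }

    public static void main(String[] args) {
        List<AclEntry> acls = List.of(
            new AclEntry("User:*", "READ", Permission.ALLOW),
            new AclEntry("User:mallory", "READ", Permission.DENY));
        Set<String> superUsers = Set.of("User:admin");
        System.out.println(authorize("User:alice", "READ", acls, superUsers, false));   // true
        System.out.println(authorize("User:mallory", "READ", acls, superUsers, false)); // false
    }
}
```

Because the loop returns on the first matching DENY and only records ALLOWs, the result does not depend on the order of the ACL list or on how specific each pattern is.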

Invariant

Identity stability. A connection's KafkaPrincipal is fixed at authentication and cannot change across re-authentication (SaslServerAuthenticator.java:642).
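The two re-authentication checks (same mechanism, same principal) can be sketched as follows. ReauthCheckSketch, Session and ReauthRejectedException are hypothetical names; in Kafka the mechanism check surfaces as the REAUTH_BAD_MECHANISM state and the principal check as ensurePrincipalUnchanged, as listed under the failure modes above.

```java
// Hypothetical model of the re-authentication checks: a re-authenticated session
// must keep both the SASL mechanism and the principal of the original one.
public class ReauthCheckSketch {
    record Session(String mechanism, String principal) {}

    static final class ReauthRejectedException extends RuntimeException {
        ReauthRejectedException(String message) {
            super(message);
        }
    }

    static Session reauthenticate(Session original, String newMechanism, String newPrincipal) {
        if (!original.mechanism().equals(newMechanism)) {
            // Kafka models this case with its REAUTH_BAD_MECHANISM state.
            throw new ReauthRejectedException(
                "SASL mechanism changed from " + original.mechanism() + " to " + newMechanism);
        }
        if (!original.principal().equals(newPrincipal)) {
            // Counterpart of ensurePrincipalUnchanged(): identity is fixed for the connection.
            throw new ReauthRejectedException(
                "Principal changed from " + original.principal() + " to " + newPrincipal);
        }
        return new Session(newMechanism, newPrincipal); // same identity, fresh credentials
    }

    public static void main(String[] args) {
        Session s = new Session("SCRAM-SHA-512", "User:alice");
        System.out.println(reauthenticate(s, "SCRAM-SHA-512", "User:alice").equals(s)); // true
        try {
            reauthenticate(s, "SCRAM-SHA-512", "User:bob");
        } catch (ReauthRejectedException e) {
            System.out.println(e.getMessage()); // Principal changed from User:alice to User:bob
        }
    }
}
```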

Invariant

Secret minimization. Passwords are never stored; only SCRAM-derived storedKey/serverKey reach the controller, and credential toString() redacts them (ScramCredentialData.java:82).
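To illustrate why the controller never needs the password, here is a sketch of the RFC 5802 derivation for SCRAM-SHA-256 using only JDK crypto: Hi() is PBKDF2 with HMAC-SHA-256, storedKey = H(HMAC(SaltedPassword, "Client Key")) and serverKey = HMAC(SaltedPassword, "Server Key"). ScramKeysSketch is a hypothetical class (Kafka's own equivalent logic lives in ScramFormatter), the SASLprep normalisation that RFC 5802 requires is omitted, and the 4096 iterations in main are an illustrative choice. The redacting toString mirrors the behaviour the invariant describes.

```java
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.security.MessageDigest;
import java.security.SecureRandom;
import java.util.Arrays;
import javax.crypto.Mac;
import javax.crypto.SecretKeyFactory;
import javax.crypto.spec.PBEKeySpec;
import javax.crypto.spec.SecretKeySpec;

// Sketch of RFC 5802 credential derivation for SCRAM-SHA-256 using JDK crypto only.
// The password is consumed once; only salt, storedKey, serverKey and iterations are kept.
// SASLprep normalisation of the password (required by RFC 5802) is omitted.
public class ScramKeysSketch {
    record ScramCredential(byte[] salt, byte[] storedKey, byte[] serverKey, int iterations) {
        @Override
        public String toString() { // never print key material
            return "ScramCredential(salt=[redacted], storedKey=[redacted], serverKey=[redacted], iterations="
                + iterations + ")";
        }
    }

    static byte[] hmacSha256(byte[] key, String message) throws GeneralSecurityException {
        Mac mac = Mac.getInstance("HmacSHA256");
        mac.init(new SecretKeySpec(key, "HmacSHA256"));
        return mac.doFinal(message.getBytes(StandardCharsets.UTF_8));
    }

    static ScramCredential derive(char[] password, byte[] salt, int iterations) {
        try {
            // SaltedPassword := Hi(password, salt, i) = PBKDF2 with HMAC-SHA-256, 256-bit output
            PBEKeySpec spec = new PBEKeySpec(password, salt, iterations, 256);
            byte[] salted = SecretKeyFactory.getInstance("PBKDF2WithHmacSHA256")
                .generateSecret(spec).getEncoded();
            spec.clearPassword();
            byte[] clientKey = hmacSha256(salted, "Client Key");                      // ClientKey
            byte[] storedKey = MessageDigest.getInstance("SHA-256").digest(clientKey); // StoredKey := H(ClientKey)
            byte[] serverKey = hmacSha256(salted, "Server Key");                      // ServerKey
            Arrays.fill(salted, (byte) 0);
            Arrays.fill(clientKey, (byte) 0);
            return new ScramCredential(salt.clone(), storedKey, serverKey, iterations);
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException("SCRAM derivation failed", e);
        }
    }

    public static void main(String[] args) {
        byte[] salt = new byte[16];
        new SecureRandom().nextBytes(salt);
        ScramCredential cred = derive("pencil".toCharArray(), salt, 4096);
        System.out.println(cred);                    // key material is redacted
        System.out.println(cred.storedKey().length); // 32
    }
}
```

storedKey is a hash of ClientKey, so a leaked credential record lets nobody compute the ClientProof a client must send; that is what lets SCRAM credentials live in the replicated metadata log.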

Interactions with other subsystems

  • Network & threading: the authenticator and TLS transport are driven non-blockingly by the same processor thread that owns the socket; the principal lands in the channel before any request is dispatched.
  • Request processing (KafkaApis): every handler funnels through AuthHelper/Authorizer.authorize; AclApis serves CreateAcls/DeleteAcls/DescribeAcls.
  • KRaft controller: AclControlManager and ScramControlManager live in the controller and emit the ACL/credential records.
  • Metadata propagation: ACLs and SCRAM credentials ride the same metadata-log replication and are applied locally by AclPublisher/ScramPublisher.
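  • KRaft consensus: the controller-quorum listener typically authenticates inter-controller traffic and is, by default, an early-start listener (early.start.listeners), so it accepts connections before the authorizer has finished loading ACLs from the metadata log.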
  • Transactions and group coordination rely on WRITE on TRANSACTIONAL_ID and READ/DESCRIBE on GROUP ACLs respectively; quotas are keyed partly on the same principal.

Design rationale & evolution

Design rationale

Storing ACLs in __cluster_metadata (KIP-801) removes the last reason to keep an external coordination store for security in KRaft: ACLs become ordinary replicated records with the same durability, ordering and snapshot semantics as topic metadata, and the immutable-snapshot authorizer gives lock-free reads on the hot authorize path.

Design rationale

SCRAM (RFC 5802) was adopted so that a password store can live in the cluster without the cluster ever holding the cleartext password; KIP-554 exposes SCRAM credential management over the Kafka API (AlterUserScramCredentials) so it works in a ZooKeeper-free world.

Design rationale

KIP-368 added periodic SASL re-authentication chiefly to bound the validity window of OAUTHBEARER tokens on long-lived connections; the SASL_AUTHENTICATE API was bumped to v1 to carry SessionLifetimeMs back to the client.

Other relevant history: KIP-12 introduced SSL and SASL/Kerberos, and KIP-43 added the SaslHandshake request and mechanism negotiation; KIP-48 added delegation tokens; KIP-371 added the ssl.principal.mapping.rules used by the default builder; KIP-684 enabled mutual TLS on SASL_SSL listeners; and ZooKeeper-based ACL/SCRAM storage was removed entirely in the 4.0 KRaft-only line.

Gotchas / operational notes

Gotcha

On a SASL_SSL listener, TLS client-certificate authentication is disabled by default even if ssl.client.auth=required is set broker-wide. To enable it, set the listener-prefixed listener.name.<name>.ssl.client.auth; otherwise the broker logs a warning and forces NONE (ChannelBuilders.java:152). When both SASL and mTLS are in use, the SASL authorization id wins as the principal.
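A hypothetical model of this per-listener rule, assuming the broker config is a flat Map<String, String> and that listener prefixes take the form listener.name.<lower-case listener name>. It reproduces the behaviour described in this gotcha rather than the actual ChannelBuilders code, and ClientAuthResolutionSketch is an illustrative name.

```java
import java.util.Locale;
import java.util.Map;

// Hypothetical model of how the effective ssl.client.auth is chosen for one listener,
// following the rule in the gotcha above; the real logic lives in ChannelBuilders.
public class ClientAuthResolutionSketch {
    static String effectiveClientAuth(Map<String, String> brokerConfig,
                                      String listenerName, String securityProtocol) {
        if (!securityProtocol.equals("SSL") && !securityProtocol.equals("SASL_SSL")) {
            return "none"; // no TLS on this listener, so no client certificates
        }
        // Listener-prefixed form: listener.name.<lower-case listener name>.ssl.client.auth
        String prefixedKey = "listener.name." + listenerName.toLowerCase(Locale.ROOT) + ".ssl.client.auth";
        if (brokerConfig.containsKey(prefixedKey)) {
            return brokerConfig.get(prefixedKey); // an explicit per-listener value always wins
        }
        if (securityProtocol.equals("SASL_SSL")) {
            return "none"; // the broker-wide value is NOT applied to SASL_SSL listeners
        }
        return brokerConfig.getOrDefault("ssl.client.auth", "none");
    }

    public static void main(String[] args) {
        Map<String, String> cfg = Map.of("ssl.client.auth", "required");
        System.out.println(effectiveClientAuth(cfg, "INTERNAL", "SSL"));      // required
        System.out.println(effectiveClientAuth(cfg, "EXTERNAL", "SASL_SSL")); // none
    }
}
```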

Gotcha

An empty super.users plus the default allow.everyone.if.no.acl.found=false means that the moment you enable the authorizer, everything is denied until you add ACLs — including inter-broker traffic, unless your broker principals are listed as super-users or have explicit CLUSTER_ACTION ACLs.

Gotcha

The authorize fast path requires a LITERAL resource pattern in each requested Action and throws otherwise (StandardAuthorizerData.java:230); PREFIXED patterns exist only in stored ACLs, never in the per-request action. Custom authorizers see the same contract: AuthHelper always passes literal resources.

Note

Audit logging goes to the kafka.authorizer.logger logger: denials of explicitly requested resources log at INFO, allows that actually grant access log at DEBUG, and filter/describe-style checks (where no access is actually granted) log only at TRACE (StandardAuthorizerData.java:279). The logIfAllowed/logIfDenied flags on each Action select between these levels.
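The level selection reduces to a small decision table. The sketch below uses the JDK's System.Logger.Level purely for illustration (Kafka's own code logs through an SLF4J logger named kafka.authorizer.logger), and AuditLevelSketch is a hypothetical name.

```java
// Hypothetical sketch of the audit-level choice; System.Logger.Level is used only
// as a convenient enum, not because Kafka logs through it.
public class AuditLevelSketch {
    static System.Logger.Level auditLevel(boolean allowed, boolean logIfAllowed, boolean logIfDenied) {
        if (allowed) {
            // Access actually granted -> DEBUG; authorized-operations or filter checks -> TRACE.
            return logIfAllowed ? System.Logger.Level.DEBUG : System.Logger.Level.TRACE;
        }
        // Explicitly requested resource denied -> INFO; filtering (e.g. regex subscription) -> TRACE.
        return logIfDenied ? System.Logger.Level.INFO : System.Logger.Level.TRACE;
    }

    public static void main(String[] args) {
        System.out.println(auditLevel(false, true, true));  // INFO
        System.out.println(auditLevel(true, true, true));   // DEBUG
        System.out.println(auditLevel(true, false, false)); // TRACE
    }
}
```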

krivaltsevich.com · Part of Apache Kafka Internals · derived from Apache Kafka 4.4 source · GitHub · MIT-licensed.

Apache Kafka® is a registered trademark of the Apache Software Foundation. This is an independent, unofficial guide, not affiliated with or endorsed by the ASF.