krivaltsevich.com · Kafka Internals 4.4

11 · The KRaft Controller

Source: Apache Kafka 4.4.0-SNAPSHOT (git 04bfe7d, 2026-06-15), KRaft mode. Derived from source code, not copied from official documentation.

The KRaft controller is a deterministic replicated state machine driven by the cluster metadata log. A single class, QuorumController, runs a one-thread event loop that turns every administrative request into a list of metadata records, hands them to the Raft layer, and applies them to in-memory state under a strict write → commit → apply discipline. Because every controller in the quorum replays the same committed records in the same order, the active controller and the standbys converge on byte-identical state, and a failover is just another node deciding to start writing. This chapter dissects the event loop, the timeline data structures that make uncommitted state reversible, the per-domain control managers, broker liveness and fencing, leader/ISR recomputation, and the periodic background tasks.

Role & responsibilities

In a KRaft cluster a small set of nodes (typically 3 or 5) form the controller quorum. They run a Raft implementation (covered in KRaft Consensus (Raft)) over the internal __cluster_metadata topic. The Raft leader of that log becomes the active controller; the others are standbys that merely replay what the active controller writes. The controller owns all hard state that used to live in ZooKeeper: topic and partition definitions, ISR membership, leadership, broker and controller registrations, dynamic configs, client quotas, SCRAM credentials, delegation tokens, ACLs, finalized feature levels (including metadata.version), and producer-ID block allocation.

The class Javadoc states the contract plainly: the leader of the metadata log becomes the active controller, all other nodes stay in standby and cannot create new log entries; the controller is single-threaded so as to avoid complex locking; and the API is asynchronous and futures-based because many operations may be in flight, each future completing only once its results are durable in the metadata log metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:156.

Key idea

The controller never mutates durable ("hard") state directly in response to a request. It computes records, appends them to the Raft log, and only the act of replaying those records changes in-memory state. The same replay() path runs on the active controller (just after the write) and on every standby (after commit), which is what keeps the quorum identical.

Cluster formation & bootstrap

A KRaft cluster does not spring into existence when the first broker starts. Unlike in the ZooKeeper era, storage must be formatted first, and that step is where the cluster's identity and its seed metadata are minted. The whole bootstrap is offline tooling in kafka-storage (StorageTool.scala) plus the Formatter it drives.

  1. Generate a cluster id. A cluster is named by a 22-character base64 Uuid. kafka-storage random-uuid simply prints Uuid.randomUuid core/src/main/scala/kafka/tools/StorageTool.scala:100. The same id must be passed to every node so they agree on which cluster they belong to.
  2. Format every log directory. kafka-storage format --cluster-id X [--release-version Y] runs runFormatCommand, which refuses non-KRaft configs and builds a Formatter from the node's own server.properties (node id, the configured log.dirs, the controller listener name) StorageTool.scala:116, StorageTool.scala:175. --release-version selects the initial metadata.version (default MetadataVersion.LATEST_PRODUCTION, minimum MINIMUM_VERSION) StorageTool.scala:336.
  3. Write meta.properties into each directory. Formatter.doFormat constructs a V1 MetaProperties carrying cluster.id and node.id, then for every empty log dir mints a per-directory directory.id (a fresh Uuid, used by JBOD to identify the disk) and writes the file through a MetaPropertiesEnsemble.Copier metadata/src/main/java/org/apache/kafka/metadata/storage/Formatter.java:402, Formatter.java:436. The on-disk keys are version, cluster.id, node.id, directory.id server-common/src/main/java/org/apache/kafka/metadata/properties/MetaProperties.java:35; the file name is meta.properties server-common/src/main/java/org/apache/kafka/metadata/properties/MetaPropertiesEnsemble.java:74. At startup the ensemble is re-read and verify'd so a node never joins the wrong cluster or boots with a half-formatted disk.
  4. Seed the bootstrap metadata. The initial metadata.version (and any extra finalized features or --add-scram records) is packaged as a BootstrapMetadata, a list of records, the first always a FeatureLevelRecord for metadata.version metadata/src/main/java/org/apache/kafka/metadata/bootstrap/BootstrapMetadata.java:104, computed by calculateBootstrapMetadata Formatter.java:384. On a controller node, these records are written as the Raft bootstrap snapshot 00000000000000000000-0000000000.checkpoint (BOOTSTRAP_SNAPSHOT_ID = (0,0)) inside the __cluster_metadata-0 directory, via writeBoostrapSnapshot Formatter.java:506, raft/src/main/java/org/apache/kafka/snapshot/Snapshots.java:49. (A legacy binary file literally named bootstrap.checkpoint, BINARY_BOOTSTRAP_FILENAME, is still read as a fallback by BootstrapMetadata.fromDirectory when no snapshot is present BootstrapMetadata.java:46.) When the active controller first activates on an empty log, ActivationRecordsGenerator replays exactly these bootstrap records as the log's opening batch, which is why the controller refuses to start without a pinned metadata.version.
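
Steps 1 and 3 can be made concrete with a small JDK-only sketch. It shows why a 128-bit id becomes exactly 22 characters of unpadded base64, and which keys end up in a V1 meta.properties. The class and method names (MetaPropertiesSketch, newBase64Id, render) are hypothetical, the URL-safe alphabet is an assumption, and the real work is done by Uuid.randomUuid and Formatter, not by this code.

```java
import java.nio.ByteBuffer;
import java.util.Base64;
import java.util.Properties;
import java.util.UUID;

final class MetaPropertiesSketch {
    // Encode 16 random bytes as unpadded base64: ceil(128 / 6) = 22 characters.
    static String newBase64Id() {
        UUID u = UUID.randomUUID();
        ByteBuffer buf = ByteBuffer.allocate(16);
        buf.putLong(u.getMostSignificantBits());
        buf.putLong(u.getLeastSignificantBits());
        // Assumption: URL-safe alphabet, no padding.
        return Base64.getUrlEncoder().withoutPadding().encodeToString(buf.array());
    }

    // Build the four on-disk keys the chapter lists for meta.properties.
    static Properties render(String clusterId, int nodeId, String directoryId) {
        Properties p = new Properties();
        p.setProperty("version", "1");
        p.setProperty("cluster.id", clusterId);
        p.setProperty("node.id", Integer.toString(nodeId));
        p.setProperty("directory.id", directoryId);
        return p;
    }

    public static void main(String[] args) {
        String clusterId = newBase64Id();
        // Every log dir shares cluster.id and node.id but gets its own directory.id.
        for (String dir : new String[] {"/tmp/kraft-a", "/tmp/kraft-b"}) {
            System.out.println(dir + " -> " + render(clusterId, 1, newBase64Id()));
        }
    }
}
```

The point of the per-directory id is visible in main: two directories on the same node differ only in directory.id, which is what lets JBOD tell the disks apart.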

The same format step also decides how the initial controller quorum is established, and the two models are mutually exclusive:

  • Static quorum. Set controller.quorum.voters (a fixed id@host:port list) in the config and format with no quorum flag. kraft.version defaults to 0; the voter set is configuration, not metadata, and cannot be changed without a rolling config update Formatter.java:380.
  • Dynamic quorum (KIP-853). Leave controller.quorum.voters empty (use controller.quorum.bootstrap.servers instead) and pick exactly one of three mutually exclusive flags at format time StorageTool.scala:347: --standalone formats this controller as a single-node quorum (its voter entry is synthesized from its own advertised controller listener, id@host:port:directoryId) StorageTool.scala:289; --initial-controllers id@host:port:dirId,… bakes in a specific multi-node voter set (the identical string must be used to format every node so that they share one directory-id-stamped quorum); and --no-initial-controllers formats a node that will join an already-bootstrapped dynamic quorum later. Any of these raises kraft.version to 1, and the voter set is then stored in the metadata log, so controllers can be added or removed at runtime StorageTool.scala:148, Formatter.java:378. A controller with empty static voters and none of these flags is rejected StorageTool.scala:165.

Figure: Brand-new cluster bootstrap. Format mints identity and seed metadata into every log dir; the first elected controller replays the bootstrap records; brokers then register and catch up.
Flow: kafka-storage random-uuid (prints Uuid.randomUuid ⇒ cluster.id) → kafka-storage format --cluster-id X --release-version Y → meta.properties (cluster.id · node.id · directory.id) and bootstrap snapshot (00…0-0.checkpoint · FeatureLevelRecord(metadata.version)) → controller quorum, either controller.quorum.voters (fixed id@host:port in config) or --standalone / --initial-controllers (voter set stored in metadata log) → controllers run Raft (leader of __cluster_metadata = active controller) → ActivationRecordsGenerator replays bootstrap records on the empty log → brokers register (fenced) and fetch __cluster_metadata → brokers unfenced, cluster serving.

Where it lives in the code

Concern | Principal class | File
Event loop, write/commit/apply, failover | QuorumController | metadata/.../controller/QuorumController.java
Public async API | Controller (interface) | metadata/.../controller/Controller.java
Single-threaded event queue + timeouts/deferrals | KafkaEventQueue | server-common/.../queue/KafkaEventQueue.java
Pending-write "purgatory" | DeferredEventQueue | server-common/.../deferred/DeferredEventQueue.java
Offsets: write / committed / stable, snapshots, txns | OffsetControlManager | metadata/.../controller/OffsetControlManager.java
Topics, partitions, ISR, leader election, reassignment | ReplicationControlManager | metadata/.../controller/ReplicationControlManager.java
Broker / controller registration, fencing | ClusterControlManager | metadata/.../controller/ClusterControlManager.java
Broker liveness (soft state, off the log) | BrokerHeartbeatManager, BrokerHeartbeatTracker | metadata/.../controller/BrokerHeartbeat*.java
Recompute leader/ISR/ELR for one partition | PartitionChangeBuilder | metadata/.../controller/PartitionChangeBuilder.java
Dynamic configs | ConfigurationControlManager | metadata/.../controller/ConfigurationControlManager.java
Feature levels & metadata.version (KIP-584) | FeatureControlManager | metadata/.../controller/FeatureControlManager.java
Producer-ID block allocation | ProducerIdControlManager | metadata/.../controller/ProducerIdControlManager.java
ACLs / SCRAM / tokens / quotas | AclControlManager, ScramControlManager, DelegationTokenControlManager, ClientQuotaControlManager | metadata/.../controller/*.java
Replica placement | StripedReplicaPlacer | metadata/.../metadata/placement/StripedReplicaPlacer.java
Reversible in-memory state | SnapshotRegistry, TimelineHashMap/Set/Long/Object | server-common/.../timeline/*.java
Background work (leader balance, unclean recovery, no-op, token expiry) | PeriodicTaskControlManager, PeriodicTask | metadata/.../controller/PeriodicTask*.java

Core concepts & terminology

Active controller
The node whose curClaimEpoch != -1. It alone appends records. isActiveController(claimEpoch) is literally claimEpoch != -1 QuorumController.java:1159.
Claim epoch
curClaimEpoch, the Raft leader epoch at which this node is active, or -1. It is a volatile int: written only from the controller thread, but readable from RPC threads QuorumController.java:1483.
Record
An ApiMessageAndVersion wrapping a generated metadata message (e.g. TopicRecord, PartitionChangeRecord, FenceBrokerRecord); see Record Format & Batches.
Hard vs. soft state
Hard state appears in the metadata log and is replayed (broker registrations, ISR, configs). Soft state lives only in RAM on the active controller (heartbeat times, the active-broker set) and is rebuilt on failover.
Write / committed / stable offset
Three offsets the controller tracks: nextWriteOffset (next slot the active controller will append to), lastCommittedOffset (Raft high watermark), and lastStableOffset (the highest offset that reads may observe, accounting for open metadata transactions).
Purgatory
The DeferredEventQueue: write events parked by the offset they were appended at, completed only when the stable offset reaches that offset.

The event loop and the asynchronous API

All controller work funnels through one KafkaEventQueue served by a single thread named {prefix}event-handler, where the prefix defaults to quorum-controller-{nodeId}- QuorumController.java:406, KafkaEventQueue.java:43. The queue is a circular doubly-linked list of EventContext nodes guarded by a single ReentrantLock, plus a TreeMap<Long,EventContext> (deadlineMap) keyed by monotonic-nanosecond deadline for timeouts and deferred work KafkaEventQueue.java:173. Events can be APPENDed (tail), PREPENDed (head, used once, for activation) or DEFERRED (run when their scheduled time arrives) KafkaEventQueue.java:329.
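
The three insertion modes are easier to see in miniature. The sketch below is a deliberately simplified, lock-free, caller-driven stand-in for the queue described above: a deque for ready events and a TreeMap keyed by deadline for deferred ones. MiniEventQueue and runOne are hypothetical names, and running a due deferred event ahead of the deque head is an assumption of this sketch rather than a statement about KafkaEventQueue.

```java
import java.util.ArrayDeque;
import java.util.Map;
import java.util.TreeMap;

final class MiniEventQueue {
    private final ArrayDeque<Runnable> ready = new ArrayDeque<>();        // FIFO of runnable events
    private final TreeMap<Long, Runnable> deadlineMap = new TreeMap<>();  // deferred events by deadline (ns)

    void append(Runnable e)  { ready.addLast(e); }   // normal work goes to the tail
    void prepend(Runnable e) { ready.addFirst(e); }  // jumps the line (activation)

    void defer(long deadlineNs, Runnable e) {
        // Nudge colliding deadlines forward so the map never drops an event.
        while (deadlineMap.containsKey(deadlineNs)) deadlineNs++;
        deadlineMap.put(deadlineNs, e);
    }

    // Run one event on the calling thread; returns false when nothing is runnable yet.
    boolean runOne(long nowNs) {
        Map.Entry<Long, Runnable> first = deadlineMap.firstEntry();
        if (first != null && first.getKey() <= nowNs) {
            deadlineMap.pollFirstEntry();
            first.getValue().run();
            return true;
        }
        Runnable head = ready.pollFirst();
        if (head == null) return false;
        head.run();
        return true;
    }
}
```

Because only one caller ever invokes runOne, events never interleave, which is the property that lets the control managers get away without locks of their own.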

Three event kinds wrap the work:

  • ControllerReadEvent<T>: runs a Supplier<T> against committed in-memory state and completes a future. Reads use offsetControl.lastStableOffset() as the epoch so they never observe uncommitted data QuorumController.java:658.
  • ControllerWriteEvent<T>: runs a ControllerWriteOperation that returns a ControllerResult<T> = (records, response). This is the heart of the controller QuorumController.java:765.
  • ControllerEvent: internal control work (Raft callbacks, activation); it has no future.

The public Controller interface methods are thin wrappers that build a write/read event and return its CompletableFuture; e.g. alterPartition, createTopics, registerBroker, processBrokerHeartbeat, allocateProducerIds, updateFeatures QuorumController.java:1762, Controller.java:67. The future does not complete when the operation runs; it completes when the records are durable.

Figure: Lifecycle of a write. Lanes: KafkaApis (RPC thread), ControllerWriteEvent (event-handler thread), RaftClient, DeferredEventQueue (purgatory), CompletableFuture.

KafkaApis calls appendWriteEvent (queue.append); the event-handler thread then runs:

  1. check curClaimEpoch ≠ -1
  2. generateRecordsAndResult ⇒ records
  3. prepareAppend(epoch, records)
  4. replay() each record (apply in RAM)
  5. schedulePreparedAppend()
  6. offsetControl.handleScheduleAppend(last)
  7. add(offset, this) to park the future

Once the batch is replicated: handleCommit(batch) → offsetControl.handleCommitBatch(batch) → completeUpTo(lastStableOffset) → future.complete(response).

Caption: compute records, apply optimistically, stage and schedule the Raft append, park the future in purgatory, and complete it only once the batch commits and the stable offset advances.

Write → commit → apply, in detail

Inside ControllerWriteEvent.run() the active-controller check throws NotControllerException (via ControllerExceptions.newWrongControllerException) if this node lost leadership. Then op.generateRecordsAndResult() produces the records. Two branches follow QuorumController.java:791:

  1. No records: the "write" was really a read. If the purgatory is empty it completes immediately; otherwise it is parked at deferredEventQueue.highestPendingOffset() so the caller still sees a linearizable view that waits out any in-flight writes QuorumController.java:802.
  2. Records present: appendRecords drives the appender callback, which calls raftClient.prepareAppend(controllerEpoch, records) to stage them and learn the last offset; then immediately calls replay() on each record to update in-memory state; then calls raftClient.schedulePreparedAppend() to actually queue them for replication; and finally calls offsetControl.handleScheduleAppend(lastOffset) QuorumController.java:825.

The ordering (apply to memory before scheduling replication) is deliberate: if a record cannot be applied, that is a fatal bug, and it must be discovered before the data is replicated. A failure there goes through fatalFaultHandler QuorumController.java:843. Because the active controller has already applied the records, the commit callback does not re-apply them; it only advances offsets and drains the purgatory. Standbys, which never applied them, replay them in handleCommit QuorumController.java:991.
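
A toy model makes the asymmetry between active and standby explicit. In the sketch below the "log" is a plain list, a record is a key/value pair, and ToyController is a hypothetical class; it only illustrates the ordering described above (apply before append on the active node, replay on commit for standbys), not the real QuorumController.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class ToyController {
    final Map<String, String> state = new HashMap<>(); // in-memory state, changed only by replay()
    final boolean active;
    long lastCommittedOffset = -1;

    ToyController(boolean active) { this.active = active; }

    // A record is a {key, value} pair here; real records are generated message classes.
    void replay(String[] record) { state.put(record[0], record[1]); }

    // Active path: compute records, apply them optimistically, then hand them to the log.
    List<String[]> write(String key, String value, List<String[]> log) {
        if (!active) throw new IllegalStateException("not the active controller");
        List<String[]> records = new ArrayList<>();
        records.add(new String[] {key, value});
        for (String[] r : records) replay(r); // apply BEFORE scheduling replication
        log.addAll(records);                  // stands in for prepareAppend + schedulePreparedAppend
        return records;
    }

    // Commit callback: the active node only advances offsets; a standby replays.
    void handleCommit(List<String[]> log, long upToOffset) {
        for (long o = lastCommittedOffset + 1; o <= upToOffset; o++) {
            if (!active) replay(log.get((int) o));
        }
        lastCommittedOffset = upToOffset;
    }
}
```

After both nodes have seen the same commit, their state maps are equal even though they reached it through different code paths, which is the convergence property the chapter relies on.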

Invariant

A user-initiated write future completes successfully only after lastStableOffset >= the offset at which its records were appended. If the node renounces leadership first, the purgatory is failed with NotControllerException and the client retries against the new controller, so the client never sees a write that did not commit.
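
The invariant can be sketched as a tiny offset-keyed purgatory. ToyPurgatory and its methods are hypothetical names that mirror the behaviour described here (park by offset, complete up to the stable offset, fail everything on renounce); the real DeferredEventQueue holds events rather than bare futures.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;

final class ToyPurgatory {
    // Futures parked by the offset their records were appended at.
    private final TreeMap<Long, List<CompletableFuture<String>>> pending = new TreeMap<>();

    CompletableFuture<String> park(long offset) {
        CompletableFuture<String> f = new CompletableFuture<>();
        pending.computeIfAbsent(offset, k -> new ArrayList<>()).add(f);
        return f;
    }

    // Called when the stable offset advances: complete everything at or below it.
    void completeUpTo(long stableOffset) {
        Map<Long, List<CompletableFuture<String>>> done = pending.headMap(stableOffset, true);
        for (List<CompletableFuture<String>> fs : done.values()) {
            for (CompletableFuture<String> f : fs) f.complete("ok");
        }
        done.clear(); // headMap is a view, so this removes the entries from pending
    }

    // Called on renounce: nothing still parked may ever report success.
    void failAll(RuntimeException cause) {
        for (List<CompletableFuture<String>> fs : pending.values()) {
            for (CompletableFuture<String> f : fs) f.completeExceptionally(cause);
        }
        pending.clear();
    }
}
```

A future parked at offset 20 stays pending while the stable offset is 10, and is failed rather than completed if leadership is lost first, so a caller can never observe success for an uncommitted write.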

Atomic vs. non-atomic batches

appendRecords respects ControllerResult.isAtomic(). Atomic results must fit in one batch (≤ maxRecordsPerBatch, default 10000) and are written as a single Raft batch, "all or nothing" QuorumController.java:898, QuorumController.java:180. Non-atomic results are split across as many batches as needed, each producing its own in-memory snapshot so any prefix can be reverted on failure. Any single user operation is capped at MAX_RECORDS_PER_USER_OP (= 10000) via BoundedList backing in the managers QuorumController.java:187.
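
The batching rule reduces to a few lines. This is a minimal sketch under the assumptions stated in the paragraph above; BatchSplitter is a hypothetical helper, and the per-batch in-memory snapshots that the real appendRecords takes are omitted.

```java
import java.util.ArrayList;
import java.util.List;

final class BatchSplitter {
    // Split records into Raft batches following the atomic / non-atomic rule.
    static <T> List<List<T>> split(List<T> records, boolean atomic, int maxRecordsPerBatch) {
        List<List<T>> batches = new ArrayList<>();
        if (records.isEmpty()) return batches;
        if (atomic) {
            if (records.size() > maxRecordsPerBatch) {
                throw new IllegalStateException("atomic result of " + records.size()
                        + " records exceeds the batch limit of " + maxRecordsPerBatch);
            }
            batches.add(new ArrayList<>(records)); // all or nothing: exactly one batch
            return batches;
        }
        // Non-atomic: as many batches as needed, each one independently revertible.
        for (int i = 0; i < records.size(); i += maxRecordsPerBatch) {
            int end = Math.min(i + maxRecordsPerBatch, records.size());
            batches.add(new ArrayList<>(records.subList(i, end)));
        }
        return batches;
    }
}
```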

Timeline data structures: reversible in-memory state

The controller's whole correctness argument depends on being able to (a) read a consistent committed view while uncommitted writes pile up, and (b) discard uncommitted writes wholesale on failover or transaction abort. This is provided by the timeline collections in org.apache.kafka.timeline, all coordinated by a SnapshotRegistry server-common/.../timeline/SnapshotRegistry.java:37.

SnapshotRegistry and snapshots

A SnapshotRegistry holds a doubly-linked list of Snapshot objects, each keyed by an epoch which the controller equates to a metadata offset. Every timeline collection registers itself (via a WeakReference) as a Revertable SnapshotRegistry.java:120. Key operations:

  • idempotentCreateSnapshot(epoch) / getOrCreateSnapshot(epoch): append a snapshot tier; epochs must be non-decreasing SnapshotRegistry.java:220.
  • revertToSnapshot(epoch): delete all later snapshots and call handleRevert(), restoring every registered collection to its state at that epoch SnapshotRegistry.java:253.
  • deleteSnapshotsUpTo(epoch): merge/drop old tiers once they are below the stable offset and can never be reverted to SnapshotRegistry.java:297.
  • reset(): wipe everything (used before loading a Raft snapshot) SnapshotRegistry.java:351.

How a snapshotted hash table works

SnapshottableHashTable is the engine under TimelineHashMap and TimelineHashSet server-common/.../timeline/SnapshottableHashTable.java:89. The current tier holds live data (separate-chaining hash table); each snapshot tier holds only the entries that were overwritten or deleted since that snapshot, a copy-on-write delta. Every element carries a startEpoch. To read at epoch E, it first checks the current tier (returning the element if its startEpoch ≤ E), otherwise walks snapshot tiers from E forward; encountering an element with too-new an epoch proves the value at E was absent, so it returns null early SnapshottableHashTable.java:311. On mutation, if the previous value belonged to the most recent snapshot, it is copied into that snapshot's delta table before being overwritten SnapshottableHashTable.java:383. executeRevert drops every element whose startEpoch > targetEpoch and re-adds the saved deltas SnapshottableHashTable.java:413. There are sibling primitives TimelineLong, TimelineInteger, and TimelineObject<T> for scalar reversible state (e.g. FeatureControlManager.metadataVersion is a TimelineObject<Optional<MetadataVersion>>) FeatureControlManager.java:152.
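
The copy-on-write idea can be demonstrated with a much smaller structure. The sketch below is not the SnapshottableHashTable algorithm: it drops startEpoch and instead saves a key's old value into the newest snapshot's delta the first time the key changes after that snapshot. ToyTimelineMap and all its methods are hypothetical, and epochs passed to get and revertTo are assumed to name existing snapshots.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;

final class ToyTimelineMap {
    private final Map<String, Integer> current = new HashMap<>();
    // snapshot epoch -> values as they were at that snapshot, saved on first change after it
    private final TreeMap<Long, Map<String, Optional<Integer>>> deltas = new TreeMap<>();

    void snapshot(long epoch) { deltas.put(epoch, new HashMap<>()); }

    private void saveOld(String key) {
        if (deltas.isEmpty()) return;
        // Copy-on-write: only the newest snapshot can still lack this key's old value.
        deltas.lastEntry().getValue()
              .putIfAbsent(key, Optional.ofNullable(current.get(key)));
    }

    void put(String key, int value) { saveOld(key); current.put(key, value); }
    void remove(String key)         { saveOld(key); current.remove(key); }
    Integer get(String key)         { return current.get(key); }

    // Read as of a snapshot: the first delta at or after `epoch` holding the key wins.
    Integer get(String key, long epoch) {
        for (Map<String, Optional<Integer>> d : deltas.tailMap(epoch, true).values()) {
            Optional<Integer> old = d.get(key);
            if (old != null) return old.orElse(null);
        }
        return current.get(key);
    }

    // Undo every change made after `epoch`, newest delta first, and drop later snapshots.
    void revertTo(long epoch) {
        if (!deltas.containsKey(epoch)) throw new IllegalArgumentException("no snapshot " + epoch);
        for (Map<String, Optional<Integer>> d
                : deltas.tailMap(epoch, true).descendingMap().values()) {
            for (Map.Entry<String, Optional<Integer>> e : d.entrySet()) {
                if (e.getValue().isPresent()) current.put(e.getKey(), e.getValue().get());
                else current.remove(e.getKey());
            }
        }
        deltas.tailMap(epoch, false).clear(); // later snapshots no longer exist
        deltas.get(epoch).clear();            // the target snapshot now equals current
    }
}
```

The cost profile matches the real design in spirit: unmodified keys are never copied, a read at an old epoch touches only the deltas taken since, and a revert is proportional to the number of changed keys rather than to the size of the map.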

Figure: Copy-on-write tiers let one map serve many committed views and roll back uncommitted ones: a read at an old offset starts in the current tier and walks snapshot deltas; a revert discards every element newer than the target and restores the saved deltas. (eN = element startEpoch.)

  • current tier (live): topicA · e140, topicB · e105
  • snapshot @ off=130: overwrite delta, topicA · e120
  • snapshot @ off=100: topicX · e90 (deleted later)
  • read @ LATEST: serve from the current tier
  • read @ 130: topicB (e105) from current; topicA from snap130 (e120 ≤ 130)
  • revertToSnapshot(100): drop snap130; remove current elements with e > 100; re-add saved deltas

Invariant

The active controller always keeps an in-memory snapshot at lastStableOffset. OffsetControlManager.activate creates one before going active, and deactivate reverts to it, guaranteeing that renouncing leadership throws away exactly the records that were appended but not yet committed OffsetControlManager.java:252, OffsetControlManager.java:260.

The three offsets and metadata transactions

OffsetControlManager tracks nextWriteOffset, lastCommittedOffset/Epoch, lastStableOffset, and transactionStartOffset OffsetControlManager.java:118. On every committed batch it sets the committed offset and calls maybeAdvanceLastStableOffset: with no open transaction the stable offset equals the committed offset; with an open transaction it is clamped to min(transactionStartOffset-1, lastCommittedOffset) so readers never observe partially-applied transactional batches OffsetControlManager.java:319. Metadata transactions (KIP-868) are themselves records: BeginTransactionRecord snapshots the pre-transaction offset, EndTransactionRecord closes it, and AbortTransactionRecord calls revertToSnapshot(transactionStartOffset-1) to undo the whole transaction OffsetControlManager.java:382. These are used to make multi-record operations such as activation atomic across the log.
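
The clamping rule is small enough to state as a function. This is a sketch of the formula in the paragraph above; StableOffset and the -1 sentinel for "no open transaction" are assumptions made for illustration.

```java
final class StableOffset {
    static final long NO_TRANSACTION = -1L; // hypothetical sentinel: no transaction is open

    // With no open transaction the stable offset tracks the committed offset;
    // otherwise it stops just before the transaction's first record.
    static long lastStableOffset(long lastCommittedOffset, long transactionStartOffset) {
        if (transactionStartOffset == NO_TRANSACTION) {
            return lastCommittedOffset;
        }
        return Math.min(transactionStartOffset - 1, lastCommittedOffset);
    }
}
```

For example, with a transaction that began at offset 40 and a committed offset of 50, readers are held at 39 until the transaction ends or aborts.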

Becoming (and ceasing to be) the active controller

Leadership changes arrive through the Raft listener QuorumMetaLogListener.handleLeaderChange, which runs on the controller thread QuorumController.java:1107. The transitions:

  • Standby → active: when the new leader is this node, claim(epoch, raftClient.logEndOffset()) sets curClaimEpoch, calls offsetControl.activate and clusterControl.activate (which builds a fresh BrokerHeartbeatManager), and prepends a completeActivation write event so it runs before any queued client work QuorumController.java:1163.
  • Active → standby (a new epoch elects someone else, or a fault): renounce() calls raftClient.resign(curClaimEpoch), resets curClaimEpoch = -1, fails the entire purgatory with NotControllerException, and deactivates the offset/cluster/periodic managers (which reverts in-memory state to lastStableOffset) QuorumController.java:1213.

The prepended activation event runs ActivationRecordsGenerator.generate. On an empty log it writes the bootstrap metadata (including the initial FeatureLevelRecord for metadata.version from the kafka-storage tool), wrapped in a Begin/EndTransactionRecord pair when the metadata version supports metadata transactions; on a non-empty log it may finish or abort a transaction left dangling by a previous controller's crash metadata/.../controller/ActivationRecordsGenerator.java:154. Only after these records are applied does processBatchEndOffset call periodicControl.activate(), so background tasks see the correct metadata.version QuorumController.java:1203.

Key idea

Failover requires no state transfer. The new active controller has already replayed the entire committed log as a standby, so it is identical to the old one up to lastStableOffset. "Becoming active" is just: stop replaying others' records, start writing your own at logEndOffset, and rebuild the purely-soft heartbeat state from incoming heartbeats.

The replay switch: applying records

replay(ApiMessage, Optional<snapshotId>, offset) is the single dispatch point that routes each committed record to the owning manager QuorumController.java:1238. A representative slice:

Record type | Handler | Effect
RegisterBrokerRecord | clusterControl.replay(..., offset) | add/replace broker registration; remember its offset
FenceBrokerRecord / UnfenceBrokerRecord / BrokerRegistrationChangeRecord | clusterControl.replay | flip fenced / controlled-shutdown / directory state
TopicRecord / PartitionRecord / PartitionChangeRecord / RemoveTopicRecord | replicationControl.replay | create/modify/delete topics & partitions, ISR, ELR
ConfigRecord | configurationControl.replay | set/clear one dynamic config key
FeatureLevelRecord | featureControl.replay | finalize a feature / set metadata.version
ProducerIdsRecord | producerIdControlManager.replay | advance the next producer-ID block
Begin/End/AbortTransactionRecord | offsetControl.replay | open/close/roll back a metadata transaction
NoOpRecord | (none) | no state; used only to advance the log (liveness)
ZkMigrationStateRecord | (none) | retained as a no-op for clusters that migrated from ZK before 4.0

An unhandled type throws, which becomes a fatal fault: the controller refuses to silently skip metadata it does not understand QuorumController.java:1331.

ReplicationControlManager: topics, partitions, ISR, leadership

This is the largest manager. Its core timeline state (ReplicationControlManager.java:326):

  • topicsByName: TimelineHashMap<String,Uuid> and topics: TimelineHashMap<Uuid,TopicControlInfo>; each TopicControlInfo holds parts: TimelineHashMap<Integer,PartitionRegistration> ReplicationControlManager.java:253.
  • brokersToIsrs: BrokersToIsrs and brokersToElrs: BrokersToElrs, reverse indexes from a broker to the partitions where it sits in the ISR / ELR, plus a "partitions with no leader" set. These power fast fencing and leader-election sweeps ReplicationControlManager.java:353.
  • imbalancedPartitions: TimelineHashSet<TopicIdPartition>, partitions whose leader is not the preferred replica ReplicationControlManager.java:369.
  • reassigningTopics and directoriesToPartitions for in-progress reassignments and per-log-directory partition tracking.

PartitionChangeBuilder: the leader/ISR algorithm

Every change to a partition's leadership, ISR, ELR, replica set, or recovery state is computed by PartitionChangeBuilder, which takes the current PartitionRegistration, an isAcceptableLeader predicate, the metadataVersion, the effective min.insync.replicas, and whether ELR is enabled (the target ISR is supplied separately via setTargetIsr/setTargetIsrWithBrokerStates), and emits an Optional<ApiMessageAndVersion> wrapping a PartitionChangeRecord, empty when nothing changed PartitionChangeBuilder.java:106. Its Election enum picks the policy PartitionChangeBuilder.java:68:

PREFERRED
switch to the first (preferred) replica only if it is in the ISR, used by the leader-balancing task.
ONLINE
pick any ISR member to keep the partition online, the default, used on fencing/shutdown.
UNCLEAN
pick a replica even outside the ISR (potential data loss), used only when unclean election is enabled for the topic.

changeRecordIsNoOp guards against emitting empty records by checking every mutable field is unset PartitionChangeBuilder.java:53. With ELR enabled (KIP-966) the builder also maintains the ELR and "last known ELR" sets so the leader-candidate pool cannot silently shrink below min.insync.replicas.
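
The three policies can be compared side by side in a stripped-down election function. This is only a sketch of the policy descriptions above: ElectionSketch, the NO_LEADER sentinel, the "keep a healthy current leader" step, and the replica-order tie-break are all assumptions, and ELR, recovery state and reassignment are ignored entirely.

```java
import java.util.List;
import java.util.function.IntPredicate;

final class ElectionSketch {
    enum Election { PREFERRED, ONLINE, UNCLEAN }
    static final int NO_LEADER = -1; // hypothetical sentinel for "partition offline"

    static int electLeader(List<Integer> replicas, List<Integer> isr, int currentLeader,
                           IntPredicate isAcceptableLeader, Election election) {
        int preferred = replicas.get(0);
        // PREFERRED: move to the first replica only if it is in the ISR and usable.
        if (election == Election.PREFERRED && isr.contains(preferred)
                && isAcceptableLeader.test(preferred)) {
            return preferred;
        }
        // Assumption: keep a healthy current leader rather than churn leadership.
        if (isr.contains(currentLeader) && isAcceptableLeader.test(currentLeader)) {
            return currentLeader;
        }
        // ONLINE: any acceptable ISR member, in replica order.
        for (int r : replicas) {
            if (isr.contains(r) && isAcceptableLeader.test(r)) return r;
        }
        // UNCLEAN: fall back to an acceptable replica outside the ISR (may lose data).
        if (election == Election.UNCLEAN) {
            for (int r : replicas) {
                if (isAcceptableLeader.test(r)) return r;
            }
        }
        return NO_LEADER;
    }
}
```

The last branch is the only one that can pick a replica the ISR does not vouch for, which is why it is gated on an explicit per-topic opt-in.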

AlterPartition

When a partition leader proposes an ISR change, alterPartition validates the broker epoch and per-partition leader epoch, then runs a PartitionChangeBuilder with setTargetIsrWithBrokerStates ReplicationControlManager.java:1078. A subtle case: if applying the ISR change also completes an in-flight reassignment and thereby moves leadership away from the requesting broker, the controller both writes the record and returns NEW_LEADER_ELECTED to tell the old leader to refetch metadata, even though "we usually only do one or the other" ReplicationControlManager.java:1157. ISR shrink/expand from the broker side is detailed in Replication, ISR & High Watermark.

generateLeaderAndIsrUpdates: the fan-out primitive

Broker fencing, unfencing, unregistration, controlled shutdown, unclean shutdown, and directory-offline all funnel into generateLeaderAndIsrUpdates(context, brokerToRemove, brokerToAdd, brokerWithUncleanShutdown, records, partitionIterator), which walks the affected partitions (via the reverse indexes) and runs a PartitionChangeBuilder per partition ReplicationControlManager.java:2051. So a single fenced broker yields one BrokerRegistrationChangeRecord plus a PartitionChangeRecord for every partition it led or was in the ISR of ReplicationControlManager.java:1399.

ClusterControlManager & broker liveness

ClusterControlManager owns the hard registration state: brokerRegistrations: TimelineHashMap<Integer,BrokerRegistration>, the offset of each broker's registration record, the controller registrations, and a directory→broker map ClusterControlManager.java:234. Registration (registerBroker) checks the cluster ID, rejects ZK brokers, validates that the broker supports every finalized feature version, allocates a new broker epoch (the registration record's offset), and, when the incarnation ID is new, first emits cleanup records for the previous incarnation (including unclean-shutdown ISR removal) ClusterControlManager.java:349. A returning broker registers fenced; it must catch up before it can be unfenced.

Heartbeats: soft state off the log

Crucially, heartbeats are not written to the metadata log. The BrokerHeartbeatManager (and its BrokerHeartbeatTracker) keep the last-contact time and last-reported metadata offset of each broker in plain RAM, and exist only on the active controller BrokerHeartbeatManager.java:44. The Javadoc spells out the trade-off: keeping heartbeats off the log keeps the metadata partition small, at the cost of extra fencing latency right after a failover, since the new controller does not know when each broker last checked in BrokerHeartbeatManager.java:51.

processBrokerHeartbeat is special-cased for overload resilience. The request first updates the lock-free BrokerHeartbeatTracker (a ConcurrentHashMap<BrokerIdAndEpoch,Long>) directly from the RPC thread via clusterControl.trackBrokerHeartbeat, so the last-contact time lands even if the controller thread is backlogged; only then is a write event queued QuorumController.java:1978, BrokerHeartbeatTracker.java:32. If that event times out on the queue, a completion hook calls processExpiredBrokerHeartbeat which refreshes the contact time so the broker is not fenced for the controller's own slowness QuorumController.java:2025, ReplicationControlManager.java:1718.

The broker state machine

On the controller thread, calculateNextBrokerState turns the heartbeat's wantFence/wantShutDown flags and the broker's currentMetadataOffset into a transition over four states BrokerHeartbeatManager.java:402:

Figure: The broker state machine driven by calculateNextBrokerState. A fenced broker is unfenced only once currentMetadataOffset ≥ its registration record's offset; UNFENCED can re-fence on a missed session, shut down immediately when it leads nothing, or drain via controlled shutdown when it still leads partitions.

  • (initial) → FENCED: registerBroker
  • FENCED → UNFENCED: caught up to regBrokerOffset
  • UNFENCED → FENCED: wantFence / session timeout
  • UNFENCED → SHUTDOWN_NOW: wantShutDown & leads no partition
  • UNFENCED → CONTROLLED_SHUTDOWN: wantShutDown while still leading partitions
  • CONTROLLED_SHUTDOWN → SHUTDOWN_NOW: controlledShutdownOffset ≤ lowestActiveOffset & no leaderships
  • SHUTDOWN_NOW (terminal): broker may exit

A fenced broker is unfenced only when it reports a metadata offset at least equal to the offset of its own RegisterBrokerRecord, i.e. it has caught up on metadata BrokerHeartbeatManager.java:415. For controlled shutdown, the active set is a TreeSet ordered by reported metadata offset; the broker may finish only when its controlledShutdownOffset is ≤ the lowest active offset (every still-active broker has seen the leadership-move records) and it leads no partitions BrokerHeartbeatManager.java:456. The shutdown offset itself is set in processBatchEndOffset to the offset at which the leadership-move records were appended QuorumController.java:2018.
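
The transitions described in this section can be written as one pure function. The sketch below follows the guards given above; BrokerStateSketch and its parameter list are hypothetical, and the two branches marked as assumptions (a fenced broker that asks to shut down, and how wantFence combines with wantShutDown) are not spelled out in the text.

```java
final class BrokerStateSketch {
    enum State { FENCED, UNFENCED, CONTROLLED_SHUTDOWN, SHUTDOWN_NOW }

    static State next(State current, boolean wantFence, boolean wantShutDown,
                      long currentMetadataOffset, long registerBrokerRecordOffset,
                      boolean hasLeaderships,
                      long controlledShutdownOffset, long lowestActiveOffset) {
        switch (current) {
            case FENCED:
                // Assumption: a fenced broker leads nothing, so it may exit at once.
                if (wantShutDown) return State.SHUTDOWN_NOW;
                // Unfence only once the broker has replayed its own registration record.
                if (!wantFence && currentMetadataOffset >= registerBrokerRecordOffset) {
                    return State.UNFENCED;
                }
                return State.FENCED;
            case UNFENCED:
                if (wantFence) {
                    // Assumption: fencing takes precedence over draining leaderships.
                    return wantShutDown ? State.SHUTDOWN_NOW : State.FENCED;
                }
                if (wantShutDown) {
                    return hasLeaderships ? State.CONTROLLED_SHUTDOWN : State.SHUTDOWN_NOW;
                }
                return State.UNFENCED;
            case CONTROLLED_SHUTDOWN:
                // Exit only when no leaderships remain and every active broker
                // has seen the leadership-move records.
                if (!hasLeaderships && controlledShutdownOffset <= lowestActiveOffset) {
                    return State.SHUTDOWN_NOW;
                }
                return State.CONTROLLED_SHUTDOWN;
            default:
                return current; // SHUTDOWN_NOW is terminal
        }
    }
}
```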

Fencing stale brokers

The periodic maybeFenceStaleBroker task asks the tracker for one expired session (maybeRemoveExpired), validates the broker still exists at that epoch, and if so fences exactly one broker per run so each fence's effect is visible before the next ReplicationControlManager.java:1739. Expiry is (now > t) && (t + sessionTimeoutNs < now); the session timeout defaults to 9 s BrokerHeartbeatTracker.java:131, ClusterControlManager.java:84. The task is scheduled to sample 8× per timeout, so a dead broker is fenced within roughly 112.5% of the session timeout QuorumController.java:1684.
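
The 112.5% figure follows directly from the expiry predicate and the sampling rate. The sketch below restates both as code; SessionExpirySketch and its method names are hypothetical, and only the predicate, the factor of 8 and the 9 s default come from the text.

```java
import java.util.concurrent.TimeUnit;

final class SessionExpirySketch {
    // The chapter's predicate: the contact time is in the past and older than one timeout.
    static boolean hasExpired(long nowNs, long lastContactNs, long sessionTimeoutNs) {
        return nowNs > lastContactNs && lastContactNs + sessionTimeoutNs < nowNs;
    }

    // Sampling 8x per timeout gives the period between fencing checks.
    static long checkPeriodNs(long sessionTimeoutNs) {
        return sessionTimeoutNs / 8;
    }

    // Worst case: the session expires just after a check and waits one full period.
    static long worstCaseFenceDelayNs(long sessionTimeoutNs) {
        return sessionTimeoutNs + checkPeriodNs(sessionTimeoutNs);
    }

    public static void main(String[] args) {
        long timeout = TimeUnit.SECONDS.toNanos(9);
        System.out.println("check period (ms): " + TimeUnit.NANOSECONDS.toMillis(checkPeriodNs(timeout)));
        System.out.println("worst case (ms):   " + TimeUnit.NANOSECONDS.toMillis(worstCaseFenceDelayNs(timeout)));
    }
}
```

With the 9 s default this works out to a check every 1.125 s and a worst-case detection delay of 10.125 s, which is 9 s × (1 + 1/8) = 112.5% of the timeout. Because one broker is fenced per run, several brokers dying at once are fenced one check period apart.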

Other control managers

FeatureControlManager, metadata.version & features (KIP-584)

Holds finalizedVersions: TimelineHashMap<String,Short> and metadataVersion: TimelineObject<Optional<MetadataVersion>> FeatureControlManager.java:147. updateFeatures validates each requested level: upgrade vs. (safe/unsafe) downgrade rules, that the controller's own software supports it, and, via reasonNotSupported, that every registered broker and controller supports the target level; if any controller has not yet registered, the update is refused FeatureControlManager.java:321. metadata.version gets extra validation in updateMetadataVersion; kraft.version upgrades go through the Raft client. Feature updates are written as an atomic batch of FeatureLevelRecords FeatureControlManager.java:179. On replay, a record outside the controller's locally-supported range is a fatal error, guarding against a node applying a feature it cannot implement FeatureControlManager.java:447. metadata.version is the gate the whole controller waits on: many code paths call metadataVersionOrThrow(), and the controller's builder refuses to proceed without bootstrap metadata specifying it QuorumController.java:395.

ProducerIdControlManager

Allocates producer-ID blocks for the transaction/idempotence subsystem (Transactions & EOS). State is a TimelineObject<ProducerIdsBlock> plus a TimelineLong broker epoch ProducerIdControlManager.java:70. generateNextProducerId checks the broker epoch, carves a fixed-size block (PRODUCER_ID_BLOCK_SIZE) starting at the current high-water producer ID, and writes a ProducerIdsRecord recording the next block's first ID; replay rejects any record that does not advance the counter ProducerIdControlManager.java:84.
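A minimal sketch of the record-then-apply flow for block allocation follows. The names are hypothetical, the block size of 1000 is an assumed stand-in for PRODUCER_ID_BLOCK_SIZE, and the broker-epoch check is omitted for brevity.

```java
/** Illustrative only: hypothetical names; the block size is an assumed value. */
final class ProducerIdBlockSketch {
    static final long BLOCK_SIZE = 1000L; // assumed stand-in for PRODUCER_ID_BLOCK_SIZE

    private long nextProducerId = 0L; // first ID of the next unallocated block

    /** First ID of the block that would be handed out now. */
    long firstIdOfNextBlock() {
        return nextProducerId;
    }

    /**
     * Value the allocation record would carry: the first ID of the block after
     * the one being handed out. State does not change until replay.
     */
    long recordValueForAllocation() {
        return nextProducerId + BLOCK_SIZE;
    }

    /** Applies a committed record; a record that does not advance the counter is rejected. */
    void replay(long recordedNextProducerId) {
        if (recordedNextProducerId <= nextProducerId) {
            throw new IllegalStateException("record does not advance the counter: "
                    + recordedNextProducerId + " <= " + nextProducerId);
        }
        nextProducerId = recordedNextProducerId;
    }
}
```

Because the counter only moves on replay, a standby that replays the same records ends with the same next block, and no two brokers can be handed overlapping ranges.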

ConfigurationControlManager and the rest

Dynamic configs live in a nested TimelineHashMap<ConfigResource,TimelineHashMap<String,String>>, mutated by incrementalAlterConfigs/legacyAlterConfigs with optional policy and existence checks; a validateOnly request returns the result with the records stripped (withoutRecords()) ConfigurationControlManager.java:80, QuorumController.java:1919. AclControlManager, ScramControlManager, DelegationTokenControlManager, and ClientQuotaControlManager each own a slice of timeline state and a replay method, following the same record-then-apply pattern; see Security and Quotas.

Replica placement: StripedReplicaPlacer

When creating topics or partitions the controller asks a ReplicaPlacer (default StripedReplicaPlacer) to choose broker sets QuorumController.java:205. Its goals, in priority order, are to spread replicas evenly across racks (highest priority in multi-rack clusters), then evenly across brokers, then to prefer unfenced brokers. Two hard constraints apply: at most one replica per broker, and the leader (first replica) must be unfenced metadata/.../placement/StripedReplicaPlacer.java:33. The algorithm loads brokers into per-rack lists (fenced and unfenced kept separate), assigns racks round-robin with a random starting offset, and round-robins replicas within a rack, producing the characteristic "striped" assignment where each successive partition rotates its starting rack. The usable-broker stream comes from the heartbeat manager and excludes shutting-down brokers and those with no uncordoned directories BrokerHeartbeatManager.java:330.
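The striping idea can be illustrated with a much-simplified placer. This sketch is not the StripedReplicaPlacer algorithm: it ignores fenced brokers entirely, takes the starting offset as a parameter instead of drawing it at random (so the output is reproducible), and all names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Illustrative only: a simplified striping sketch, not Kafka's StripedReplicaPlacer. */
final class StripedPlacementSketch {
    /**
     * @param racks       rack name mapped to the unfenced broker ids in that rack
     * @param startOffset stand-in for the random starting rack offset
     * @return one replica list per partition; the first replica is the leader
     */
    static List<List<Integer>> place(Map<String, List<Integer>> racks,
                                     int partitions, int replicationFactor, int startOffset) {
        List<List<Integer>> rackBrokers = new ArrayList<>(new TreeMap<>(racks).values());
        int totalBrokers = rackBrokers.stream().mapToInt(List::size).sum();
        if (replicationFactor < 1 || replicationFactor > totalBrokers) {
            throw new IllegalArgumentException("replication factor must be in 1.." + totalBrokers);
        }
        int[] cursor = new int[rackBrokers.size()]; // per-rack round-robin position
        List<List<Integer>> result = new ArrayList<>();
        for (int p = 0; p < partitions; p++) {
            List<Integer> replicas = new ArrayList<>();
            // Each successive partition starts one rack further along.
            int rack = Math.floorMod(startOffset + p, rackBrokers.size());
            while (replicas.size() < replicationFactor) {
                List<Integer> brokers = rackBrokers.get(rack);
                // Take the next broker in this rack that is not already a replica.
                for (int tries = 0; tries < brokers.size(); tries++) {
                    int candidate = brokers.get(cursor[rack]);
                    cursor[rack] = (cursor[rack] + 1) % brokers.size();
                    if (!replicas.contains(candidate)) {
                        replicas.add(candidate);
                        break;
                    }
                }
                rack = (rack + 1) % rackBrokers.size();
            }
            result.add(replicas);
        }
        return result;
    }
}
```

For three racks of two brokers each (A: 1, 2; B: 3, 4; C: 5, 6), three partitions, replication factor 3 and offset 0, this sketch yields [1, 3, 5], [4, 6, 2], [5, 1, 3]: every partition spans all three racks, and the leader's rack rotates from A to B to C.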

Periodic background tasks

Background work is modeled as PeriodicTasks registered with the PeriodicTaskControlManager, which schedules each as a DEFERRED event via the scheduleDeferred/cancelDeferred hooks on the event queue and reschedules it after each run QuorumController.java:532, metadata/.../controller/PeriodicTaskControlManager.java:200. They are activated on claim and cancelled on renounce, so only the active controller runs them. Each task returns a ControllerResult; if its records are non-empty they are written like any other batch, and a task may signal "run me again immediately" by returning true.

| Task | Does | Default cadence | Code |
|---|---|---|---|
| maybeFenceStaleBroker | Fence one timed-out broker | sessionTimeout/8 (≈1.125 s) | QuorumController.java:1696 |
| electPreferred | Move imbalanced partitions back to their preferred leader (only if leader.imbalance.check.interval.seconds set) | opt-in | QuorumController.java:1711 |
| electUnclean | Unclean-elect leaders for leaderless partitions where the topic allows it | 5 min | QuorumController.java:1726 |
| writeNoOpRecord | Append a NoOpRecord so the log advances when idle (only if metadata.max.idle.interval.ms set) | opt-in | QuorumController.java:1667 |
| expireDelegationTokens | Sweep expired tokens | 5 min | QuorumController.java:1755 |
| generatePeriodicPerformanceMessage | Log event-processing stats | sample period (default 60 s) | QuorumController.java:1738 |

Both election tasks elect at most MAX_ELECTIONS_PER_IMBALANCE = 1000 per run, returning true (reschedule immediately) if they hit the cap ReplicationControlManager.java:151, ReplicationControlManager.java:1785.
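The cap-and-reschedule pattern can be sketched as below. The class and method names are hypothetical, and partitions are represented by plain integers; only the constant's name and value come from the text above.

```java
import java.util.Deque;
import java.util.List;

/** Illustrative only: hypothetical names, not Kafka's ReplicationControlManager. */
final class CappedElectionSketch {
    static final int MAX_ELECTIONS_PER_IMBALANCE = 1000;

    /**
     * Processes at most MAX_ELECTIONS_PER_IMBALANCE pending partitions, appending
     * each one handled to electedOut (standing in for the generated records).
     *
     * @return true if the cap was hit, meaning "reschedule me immediately"
     */
    static boolean runOnce(Deque<Integer> pendingPartitions, List<Integer> electedOut) {
        int elected = 0;
        while (!pendingPartitions.isEmpty() && elected < MAX_ELECTIONS_PER_IMBALANCE) {
            electedOut.add(pendingPartitions.poll());
            elected++;
        }
        return elected == MAX_ELECTIONS_PER_IMBALANCE;
    }
}
```

With 2,500 pending partitions, three runs handle 1000, 1000, and 500 partitions, and only the last returns false. Bounding each run keeps any single batch small, so other events get a turn on the controller thread between runs.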

Configuration reference

| Builder knob / config | Default | Effect |
|---|---|---|
| defaultReplicationFactor | 3 | Replication factor for topics that omit it QuorumController.java:203. |
| defaultNumPartitions | 1 | Partition count for topics that omit it QuorumController.java:204. |
| sessionTimeoutNs (broker.session.timeout.ms) | 9 s | How long without a heartbeat before a broker may be fenced ClusterControlManager.java:84. |
| leaderImbalanceCheckIntervalNs | empty (off) | Enables/period of the preferred-leader balancing task QuorumController.java:206. |
| maxIdleIntervalNs (metadata.max.idle.interval.ms) | empty (off) | Period of the NoOpRecord liveness writer QuorumController.java:207. |
| uncleanLeaderElectionCheckIntervalMs | 5 min | Period of the unclean-recovery task QuorumController.java:225. |
| maxRecordsPerBatch | 10000 | Cap on records per Raft batch and per user op QuorumController.java:180. |
| controllerPerformanceSamplePeriodMs | 60000 | Window for the event-performance monitor QuorumController.java:218. |
| controllerPerformanceAlwaysLogThresholdMs | 2000 | Any event slower than this is always logged QuorumController.java:219. |
| delegationTokenExpiryCheckIntervalMs | 5 min | Token-expiry sweep cadence QuorumController.java:224. |

Key JMX metrics from QuorumControllerMetrics/OffsetControlManager include ActiveControllerCount (1 on the active controller, 0 elsewhere), EventQueueTimeMs, EventQueueProcessingTimeMs, NewActiveControllersCount, LastAppliedRecordOffset/Timestamp/LagMs, LastCommittedRecordOffset, and per-broker TimeSinceLastHeartbeatReceivedMs metadata/.../controller/metrics/QuorumControllerMetrics.java:47, OffsetControlManager.java:41.

Concurrency & threading model

| State | Owner / guard | Accessed from |
|---|---|---|
| All timeline collections, SnapshotRegistry, offsets, every manager's hard state | the single event-handler thread (no locks) | controller thread only |
| curClaimEpoch | volatile; written on controller thread | read by RPC threads for active-check |
| Event queue internals (linked list, deadlineMap, size) | ReentrantLock + Condition | any thread enqueuing; handler thread dequeuing |
| BrokerHeartbeatTracker.contactTimes | ConcurrentHashMap (lock-free) | RPC threads (write) + controller thread (expire) |
| BrokerHeartbeatManager.heartbeatManager reference | volatile in ClusterControlManager | swapped on activate/deactivate |

The design's whole point is that the bulk of state needs no synchronization because exactly one thread touches it. The only deliberately multi-threaded component is the heartbeat tracker, precisely so liveness signals are never starved by a busy controller thread. Raft callbacks (handleCommit, handleLoadSnapshot, handleLeaderChange) arrive on Raft threads but immediately re-enqueue themselves as ControllerEvents, and they self-cancel if they belong to a superseded listener registration QuorumController.java:1144.
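The ownership rules in the table can be illustrated with a small sketch. It swaps in a plain single-thread executor for Kafka's event queue, and every name in it is hypothetical; it shows only the three access patterns: unsynchronized state confined to one thread, a volatile flag read from other threads, and a concurrent map written from other threads.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Illustrative only: a plain executor stands in for the controller's event queue. */
final class ThreadingSketch {
    // One daemon thread plays the role of the controller event-handler thread.
    private final ExecutorService eventLoop = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "controller-event-loop-sketch");
        t.setDaemon(true);
        return t;
    });

    // "Hard state": a plain HashMap, safe only because the event loop alone touches it.
    private final Map<String, Integer> partitionsByTopic = new HashMap<>();

    // Written on the event loop, read by any thread for the "am I active?" check.
    private volatile int curClaimEpoch = -1;

    // Soft liveness state, written directly by request threads without queueing.
    private final ConcurrentHashMap<Integer, Long> contactTimes = new ConcurrentHashMap<>();

    boolean isActive() {
        return curClaimEpoch != -1;
    }

    void recordContact(int brokerId, long nowNs) {
        contactTimes.put(brokerId, nowNs);
    }

    Long lastContact(int brokerId) {
        return contactTimes.get(brokerId);
    }

    CompletableFuture<Void> claim(int epoch) {
        return CompletableFuture.runAsync(() -> curClaimEpoch = epoch, eventLoop);
    }

    /** Mutations are enqueued as events and complete asynchronously. */
    CompletableFuture<Integer> createTopic(String name, int partitions) {
        return CompletableFuture.supplyAsync(() -> {
            partitionsByTopic.put(name, partitions);
            return partitionsByTopic.size();
        }, eventLoop);
    }
}
```

A call such as `createTopic("a", 3).join()` blocks the caller until the event loop has run the mutation, while `recordContact` returns immediately even if the loop is busy, which is the property the heartbeat tracker relies on.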

Failure modes, edge cases & recovery

  • Write applied but never committed (this node loses leadership mid-flight): renounce() fails the purgatory with NotControllerException and reverts in-memory state to the snapshot at lastStableOffset, discarding exactly the appended-but-uncommitted records. The client retries against the new controller.
  • Replay failure on the active controller: treated as a fatal bug and routed through fatalFaultHandler, because the controller must never diverge from its own log QuorumController.java:843.
  • Replay failure / fault that warrants failover but not process death: handleEventException routes faults whose causesFailover() is true to renounce(), handing leadership to another node QuorumController.java:595.
  • Snapshot load: a standby that falls too far behind loads a Raft snapshot; handleLoadSnapshot first reset()s all timeline state, replays the snapshot, and rebuilds offsets, and it is a fatal fault for the active controller to ever be asked to load one QuorumController.java:1032.
  • Dangling metadata transaction from a crashed controller: the activation generator finishes or aborts it before normal operation resumes QuorumController.java:1187.
  • Heartbeat overload: contact times still land lock-free; timed-out heartbeat events refresh liveness so brokers are not fenced for controller slowness ReplicationControlManager.java:1718.
  • Duplicate broker incarnation: a registration whose incarnation differs from a live session is rejected with DuplicateBrokerRegistrationException; a genuinely new incarnation triggers cleanup records for the old one ClusterControlManager.java:371.

Invariants & guarantees

Determinism

Given the same committed metadata log, every controller computes byte-identical in-memory state, because the only mutation path is replay() over committed records in offset order. Standbys are warm replicas; failover transfers no state.

Durability before acknowledgement

A write future never completes successfully before lastStableOffset reaches its append offset. Reads observe only lastStableOffset, never uncommitted or in-transaction records.
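The durability rule can be sketched as a small purgatory of futures keyed by append offset. This is an illustration under stated assumptions, not the controller's actual purgatory: the names are hypothetical, each offset is assumed to have at most one waiter, and the structure is assumed to be touched by a single thread.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;

/** Illustrative only: hypothetical names; assumes single-threaded use and unique offsets. */
final class PurgatorySketch {
    private final TreeMap<Long, CompletableFuture<Long>> pending = new TreeMap<>();
    private long lastStableOffset = -1L;

    /** Registers a write appended at appendOffset; the future completes once it is stable. */
    CompletableFuture<Long> awaitCommit(long appendOffset) {
        CompletableFuture<Long> future = new CompletableFuture<>();
        if (appendOffset <= lastStableOffset) {
            future.complete(appendOffset);
        } else {
            pending.put(appendOffset, future);
        }
        return future;
    }

    /** Called when the stable offset advances; releases every write at or below it. */
    void advanceLastStableOffset(long newOffset) {
        lastStableOffset = Math.max(lastStableOffset, newOffset);
        Map<Long, CompletableFuture<Long>> done = pending.headMap(lastStableOffset, true);
        done.forEach((offset, future) -> future.complete(offset));
        done.clear(); // headMap is a view, so this removes the entries from pending
    }

    /** Called on loss of leadership: nothing still pending may ever succeed. */
    void failAll(RuntimeException cause) {
        pending.values().forEach(future -> future.completeExceptionally(cause));
        pending.clear();
    }
}
```

Ordering the waiters by offset makes "everything up to the stable offset" a single range operation, and failing the remainder in one step matches the behaviour described for a controller that loses leadership mid-flight.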

Single writer

Only the node with curClaimEpoch != -1 appends, and prepareAppend is bound to that epoch, so a stale leader's writes are rejected by the Raft layer.

No partial reassignment visible

A leader-side AlterPartition that completes a reassignment and moves leadership returns NEW_LEADER_ELECTED, forcing the old leader to refetch before acting on stale leadership.

Interactions with other subsystems

Design rationale & evolution

Design rationale

Replacing the ZooKeeper-backed controller with a Raft-replicated metadata log is KIP-500, realized as the quorum/event-driven controller of KIP-631. The old ZK controller cached state by reading ZooKeeper, raced with watchers, and pushed full LeaderAndIsr/UpdateMetadata RPCs to brokers; the KRaft controller instead is the source of truth, a deterministic state machine over an ordered log, so recovery is replay rather than re-read, and brokers pull incremental metadata deltas. ZooKeeper was fully removed in 4.0; the lone ZkMigrationStateRecord no-op is the only vestige, kept so clusters that migrated under 3.x can roll forward.

Design rationale

The single-threaded event loop plus copy-on-write timeline structures is a conscious trade: give up parallelism inside the controller to get rid of locks entirely and make uncommitted state trivially reversible. Finalized feature levels and metadata.version gating come from KIP-584; atomic multi-record operations use metadata transactions (KIP-868); and the ELR / strict-min-ISR machinery in PartitionChangeBuilder is KIP-966, which prevents the leader-candidate pool from shrinking below min.insync.replicas under normal conditions.

Gotchas & operational notes

Gotcha

Heartbeat state is soft and rebuilt only from incoming heartbeats after a failover. The fresh active controller does not know past contact times, so it may take up to roughly a full session timeout before it begins fencing brokers that died around the failover. This is the price of keeping heartbeats out of the metadata log.

Gotcha

A feature upgrade (including metadata.version) will be refused if any registered broker or controller does not advertise support for the target level, or if any controller in the quorum has not yet registered. After adding a new controller, ensure it has registered before raising feature levels.

Caution

Unclean leader election (the UNCLEAN path and the electUnclean task) can elect a leader outside the ISR and therefore lose data. It runs only for topics where it is explicitly enabled, and exists to bring otherwise-permanently-offline partitions back online.

Note

The controller refuses to start without bootstrap metadata that pins an initial metadata.version (produced by kafka-storage format), a non-fatal and a fatal fault handler, and a configured quorum-features set QuorumController.java:392. An unhandled record type at replay is intentionally fatal rather than skipped.

krivaltsevich.com · Part of Apache Kafka Internals · derived from Apache Kafka 4.4 source · GitHub · MIT-licensed.

Apache Kafka® is a registered trademark of the Apache Software Foundation. This is an independent, unofficial guide, not affiliated with or endorsed by the ASF.