11 · The KRaft Controller
Source: Apache Kafka 4.4.0-SNAPSHOT (git 04bfe7d, 2026-06-15), KRaft mode. Derived from source code, not copied from official documentation.
The KRaft controller is a deterministic replicated state machine driven by the cluster metadata log. A single class, QuorumController, runs a one-thread event loop that turns every administrative request into a list of metadata records, hands them to the Raft layer, and applies them to in-memory state under a strict write → commit → apply discipline. Because every controller in the quorum replays the same committed records in the same order, the active controller and the standbys converge on byte-identical state, and a failover is just another node deciding to start writing. This chapter dissects the event loop, the timeline data structures that make uncommitted state reversible, the per-domain control managers, broker liveness and fencing, leader/ISR recomputation, and the periodic background tasks.
Role & responsibilities
In a KRaft cluster a small set of nodes (typically 3 or 5) form the controller quorum. They run a Raft implementation (covered in KRaft Consensus (Raft)) over the internal __cluster_metadata topic. The Raft leader of that log becomes the active controller; the others are standbys that merely replay what the active controller writes. The controller owns all hard state that used to live in ZooKeeper: topic and partition definitions, ISR membership, leadership, broker and controller registrations, dynamic configs, client quotas, SCRAM credentials, delegation tokens, ACLs, finalized feature levels (including metadata.version), and producer-ID block allocation.
The class Javadoc states the contract plainly: the leader of the metadata log becomes the active controller, all other nodes stay in standby and cannot create new log entries; the controller is single-threaded so as to avoid complex locking; and the API is asynchronous and futures-based because many operations may be in flight, each future completing only once its results are durable in the metadata log metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:156.
The controller never mutates durable ("hard") state directly in response to a request. It computes records, appends them to the Raft log, and only the act of replaying those records changes in-memory state. The same replay() path runs on the active controller (just after the write) and on every standby (after commit), which is what keeps the quorum identical.
Cluster formation & bootstrap
Unlike in the ZooKeeper era, a KRaft cluster does not spring into existence when the first broker starts: storage must be formatted first, and that step is where the cluster's identity and its seed metadata are minted. The whole bootstrap is offline tooling in kafka-storage (StorageTool.scala) plus the Formatter it drives.
- Generate a cluster id. A cluster is named by a 22-character base64 Uuid; kafka-storage random-uuid simply prints Uuid.randomUuid core/src/main/scala/kafka/tools/StorageTool.scala:100. The same id must be passed to every node so they agree on which cluster they belong to.
- Format every log directory. kafka-storage format --cluster-id X [--release-version Y] runs runFormatCommand, which refuses non-KRaft configs and builds a Formatter from the node's own server.properties (node id, the configured log.dirs, the controller listener name) StorageTool.scala:116, StorageTool.scala:175. --release-version selects the initial metadata.version (default MetadataVersion.LATEST_PRODUCTION, minimum MINIMUM_VERSION) StorageTool.scala:336.
- Write meta.properties into each directory. Formatter.doFormat constructs a V1 MetaProperties carrying cluster.id and node.id, then for every empty log dir mints a per-directory directory.id (a fresh Uuid, used by JBOD to identify the disk) and writes the file through a MetaPropertiesEnsemble.Copier metadata/src/main/java/org/apache/kafka/metadata/storage/Formatter.java:402, Formatter.java:436. The on-disk keys are version, cluster.id, node.id, and directory.id server-common/src/main/java/org/apache/kafka/metadata/properties/MetaProperties.java:35; the file name is meta.properties server-common/src/main/java/org/apache/kafka/metadata/properties/MetaPropertiesEnsemble.java:74. At startup the ensemble is re-read and validated with verify, so a node never joins the wrong cluster or boots with a half-formatted disk.
- Seed the bootstrap metadata. The initial metadata.version (and any extra finalized features or --add-scram records) is packaged as a BootstrapMetadata, a list of records whose first entry is always a FeatureLevelRecord for metadata.version metadata/src/main/java/org/apache/kafka/metadata/bootstrap/BootstrapMetadata.java:104, computed by calculateBootstrapMetadata Formatter.java:384. On a controller node, these records are written as the Raft bootstrap snapshot 00000000000000000000-0000000000.checkpoint (BOOTSTRAP_SNAPSHOT_ID = (0,0)) inside the __cluster_metadata-0 directory, via writeBoostrapSnapshot Formatter.java:506, raft/src/main/java/org/apache/kafka/snapshot/Snapshots.java:49. (A legacy binary file literally named bootstrap.checkpoint, BINARY_BOOTSTRAP_FILENAME, is still read as a fallback by BootstrapMetadata.fromDirectory when no snapshot is present BootstrapMetadata.java:46.) When the active controller first activates on an empty log, ActivationRecordsGenerator replays exactly these bootstrap records as the log's opening batch, which is why the controller refuses to start without a pinned metadata.version.
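The on-disk identity these steps produce can be sketched with the JDK alone. This is a minimal sketch assuming Java 17: MetaPropertiesSketch, newId, metaProperties, and render are hypothetical names, not Kafka APIs, and Kafka's own Uuid.randomUuid applies extra rules (such as avoiding reserved values) that are not reproduced here. It shows why an id is 22 characters (16 bytes in unpadded URL-safe base64) and which four keys land in a V1 meta.properties file.

```java
import java.io.IOException;
import java.io.StringWriter;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.util.Base64;
import java.util.Properties;
import java.util.UUID;

// Hypothetical helper illustrating the format steps; not Kafka's StorageTool or Formatter.
final class MetaPropertiesSketch {
    // 16 bytes from a random (version 4) UUID, URL-safe base64 without padding: 22 characters.
    static String newId() {
        UUID uuid = UUID.randomUUID();
        ByteBuffer buf = ByteBuffer.allocate(16);
        buf.putLong(uuid.getMostSignificantBits());
        buf.putLong(uuid.getLeastSignificantBits());
        return Base64.getUrlEncoder().withoutPadding().encodeToString(buf.array());
    }

    // The four keys of a V1 meta.properties file for one log directory.
    static Properties metaProperties(String clusterId, int nodeId, String directoryId) {
        Properties props = new Properties();
        props.setProperty("version", "1");
        props.setProperty("cluster.id", clusterId);
        props.setProperty("node.id", Integer.toString(nodeId));
        props.setProperty("directory.id", directoryId);
        return props;
    }

    // Render in java.util.Properties text format (a timestamp comment line comes first).
    static String render(Properties props) {
        StringWriter out = new StringWriter();
        try {
            props.store(out, null);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return out.toString();
    }
}
```

In this model a formatting run would call newId() once for the cluster and share the result with every node, but call it again for each log directory's directory.id.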
The same format step also decides how the initial controller quorum is established, and the two models are mutually exclusive:
- Static quorum. Set controller.quorum.voters (a fixed id@host:port list) in the config and format with no quorum flag. kraft.version defaults to 0; the voter set is configuration, not metadata, and cannot be changed without a rolling config update Formatter.java:380.
- Dynamic quorum (KIP-853). Leave controller.quorum.voters empty (use controller.quorum.bootstrap.servers instead) and pick one of three mutually exclusive flags at format time StorageTool.scala:347: --standalone formats this controller as a single-node quorum (its voter entry is synthesized from its own advertised controller listener, id@host:port:directoryId) StorageTool.scala:289; --initial-controllers id@host:port:dirId,… bakes in a specific multi-node voter set, and the identical string must be used to format every node so they share one directory-id-stamped quorum; --no-initial-controllers formats a node that will later join an already-bootstrapped dynamic quorum. Any of these raises kraft.version to 1, and the voter set is then stored in the metadata log, so controllers can be added or removed at runtime StorageTool.scala:148, Formatter.java:378. A controller with empty static voters and none of these flags is rejected StorageTool.scala:165.
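The format-time rules for the two quorum models amount to a small decision table. The sketch below, assuming Java 17, encodes them exactly as stated above for a controller node; QuorumFormatRules and its members are hypothetical, and the real StorageTool reports its own error messages.

```java
import java.util.Optional;

// Hypothetical model of the format-time quorum rules for a controller node.
final class QuorumFormatRules {
    enum Mode { STATIC, STANDALONE, INITIAL_CONTROLLERS, NO_INITIAL_CONTROLLERS }

    record Decision(Mode mode, int kraftVersion) {}

    static Decision decide(boolean staticVotersConfigured, boolean standalone,
                           Optional<String> initialControllers, boolean noInitialControllers) {
        int flags = (standalone ? 1 : 0)
                + (initialControllers.isPresent() ? 1 : 0)
                + (noInitialControllers ? 1 : 0);
        if (flags > 1) {
            throw new IllegalArgumentException("pick at most one dynamic-quorum flag");
        }
        if (staticVotersConfigured) {
            // Static and dynamic quorums are mutually exclusive.
            if (flags == 1) {
                throw new IllegalArgumentException("static voters cannot be combined with a dynamic-quorum flag");
            }
            return new Decision(Mode.STATIC, 0);   // voter set lives in configuration
        }
        if (flags == 0) {
            throw new IllegalArgumentException("empty static voters require a dynamic-quorum flag");
        }
        Mode mode = standalone ? Mode.STANDALONE
                : initialControllers.isPresent() ? Mode.INITIAL_CONTROLLERS
                : Mode.NO_INITIAL_CONTROLLERS;
        return new Decision(mode, 1);              // voter set lives in the metadata log
    }
}
```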
[Figure: cluster bootstrap: cluster.id; meta.properties (cluster.id · node.id · directory.id); bootstrap snapshot 00…0-0.checkpoint · FeatureLevelRecord(metadata.version); static voters as id@host:port in config; Raft leader of __cluster_metadata = active controller]
Where it lives in the code
| Concern | Principal class | File |
|---|---|---|
| Event loop, write/commit/apply, failover | QuorumController | metadata/.../controller/QuorumController.java |
| Public async API | Controller (interface) | metadata/.../controller/Controller.java |
| Single-threaded event queue + timeouts/deferrals | KafkaEventQueue | server-common/.../queue/KafkaEventQueue.java |
| Pending-write "purgatory" | DeferredEventQueue | server-common/.../deferred/DeferredEventQueue.java |
| Offsets: write / committed / stable, snapshots, txns | OffsetControlManager | metadata/.../controller/OffsetControlManager.java |
| Topics, partitions, ISR, leader election, reassignment | ReplicationControlManager | metadata/.../controller/ReplicationControlManager.java |
| Broker / controller registration, fencing | ClusterControlManager | metadata/.../controller/ClusterControlManager.java |
| Broker liveness (soft state, off the log) | BrokerHeartbeatManager, BrokerHeartbeatTracker | metadata/.../controller/BrokerHeartbeat*.java |
| Recompute leader/ISR/ELR for one partition | PartitionChangeBuilder | metadata/.../controller/PartitionChangeBuilder.java |
| Dynamic configs | ConfigurationControlManager | metadata/.../controller/ConfigurationControlManager.java |
| Feature levels & metadata.version (KIP-584) | FeatureControlManager | metadata/.../controller/FeatureControlManager.java |
| Producer-ID block allocation | ProducerIdControlManager | metadata/.../controller/ProducerIdControlManager.java |
| ACLs / SCRAM / tokens / quotas | AclControlManager, ScramControlManager, DelegationTokenControlManager, ClientQuotaControlManager | metadata/.../controller/*.java |
| Replica placement | StripedReplicaPlacer | metadata/.../metadata/placement/StripedReplicaPlacer.java |
| Reversible in-memory state | SnapshotRegistry, TimelineHashMap/Set/Long/Object | server-common/.../timeline/*.java |
| Background work (leader balance, unclean recovery, no-op, token expiry) | PeriodicTaskControlManager, PeriodicTask | metadata/.../controller/PeriodicTask*.java |
Core concepts & terminology
- Active controller: the node whose curClaimEpoch != -1. It alone appends records; isActiveController(claimEpoch) is literally claimEpoch != -1 QuorumController.java:1159.
- Claim epoch: curClaimEpoch, the Raft leader epoch at which this node is active, or -1. It is a volatile int: written only from the controller thread, but readable from RPC threads QuorumController.java:1483.
- Record: an ApiMessageAndVersion wrapping a generated metadata message (e.g. TopicRecord, PartitionChangeRecord, FenceBrokerRecord); see Record Format & Batches.
- Hard vs. soft state: hard state appears in the metadata log and is replayed (broker registrations, ISR, configs). Soft state lives only in RAM on the active controller (heartbeat times, the active-broker set) and is rebuilt on failover.
- Write / committed / stable offset: three offsets the controller tracks, namely nextWriteOffset (the next slot the active controller will append to), lastCommittedOffset (the Raft high watermark), and lastStableOffset (the highest offset that reads may observe, accounting for open metadata transactions).
- Purgatory: the DeferredEventQueue, where write events are parked by the offset they were appended at and completed only when the stable offset reaches that offset.
The event loop and the asynchronous API
All controller work funnels through one KafkaEventQueue served by a single thread named {prefix}event-handler, where the prefix defaults to quorum-controller-{nodeId}- QuorumController.java:406, KafkaEventQueue.java:43. The queue is a circular doubly-linked list of EventContext nodes guarded by a single ReentrantLock, plus a TreeMap<Long,EventContext> (deadlineMap) keyed by monotonic-nanosecond deadline for timeouts and deferred work KafkaEventQueue.java:173. Events can be APPENDed (tail), PREPENDed (head, used once, for activation) or DEFERRED (run when their scheduled time arrives) KafkaEventQueue.java:329.
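To make the three insertion types concrete, here is a hypothetical single-consumer model of the queue (ToyEventQueue is not a Kafka class). It assumes Java 17, replaces the circular linked list with an ArrayDeque, omits per-event timeouts, and drives the loop through an explicit runOnce(nowNs) call instead of a dedicated thread so the ordering is easy to follow; running a due deferred event ahead of the ready queue is this sketch's own choice.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical, single-consumer model of the queue's ordering rules (not KafkaEventQueue itself).
final class ToyEventQueue {
    private final ReentrantLock lock = new ReentrantLock();
    private final Deque<Runnable> ready = new ArrayDeque<>();
    // Deferred events keyed by scheduled time; events with equal times keep insertion order.
    private final TreeMap<Long, Deque<Runnable>> deadlineMap = new TreeMap<>();

    void append(Runnable event) { withLock(() -> ready.addLast(event)); }

    void prepend(Runnable event) { withLock(() -> ready.addFirst(event)); }

    void scheduleDeferred(long whenNs, Runnable event) {
        withLock(() -> deadlineMap.computeIfAbsent(whenNs, k -> new ArrayDeque<>()).addLast(event));
    }

    // One iteration of the handler loop; the real queue runs its loop on one event-handler thread.
    boolean runOnce(long nowNs) {
        Runnable next;
        lock.lock();
        try {
            Map.Entry<Long, Deque<Runnable>> due = deadlineMap.firstEntry();
            if (due != null && due.getKey() <= nowNs) {
                next = due.getValue().pollFirst();   // this sketch runs due deferred work first
                if (due.getValue().isEmpty()) {
                    deadlineMap.remove(due.getKey());
                }
            } else {
                next = ready.pollFirst();
            }
        } finally {
            lock.unlock();
        }
        if (next == null) {
            return false;
        }
        next.run();   // executed outside the lock, so producers never wait on an event
        return true;
    }

    private void withLock(Runnable action) {
        lock.lock();
        try {
            action.run();
        } finally {
            lock.unlock();
        }
    }
}
```

Because PREPEND puts an event at the head, an activation event inserted this way runs before client writes that were queued earlier.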
Three event kinds wrap the work:
- ControllerReadEvent<T>: runs a Supplier<T> against committed in-memory state and completes a future. Reads use offsetControl.lastStableOffset() as the epoch so they never observe uncommitted data QuorumController.java:658.
- ControllerWriteEvent<T>: runs a ControllerWriteOperation that returns a ControllerResult<T> = (records, response). This is the heart of the controller QuorumController.java:765.
- ControllerEvent: internal control work (Raft callbacks, activation), with no future.
The public Controller interface methods are thin wrappers that build a write/read event and return its CompletableFuture; e.g. alterPartition, createTopics, registerBroker, processBrokerHeartbeat, allocateProducerIds, updateFeatures QuorumController.java:1762, Controller.java:67. The future does not complete when the operation runs; it completes when the records are durable.
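[Figure: the write path, from a ControllerWriteEvent created on the RPC thread, through the event-handler thread (step 2: generateRecordsAndResult ⇒ records), to the RaftClient, the DeferredEventQueue (purgatory), and finally the caller's CompletableFuture]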
Write → commit → apply, in detail
Inside ControllerWriteEvent.run() the active-controller check throws NotControllerException (via ControllerExceptions.newWrongControllerException) if this node lost leadership. Then op.generateRecordsAndResult() produces the records. Two branches follow QuorumController.java:791:
- No records: the "write" was really a read. If the purgatory is empty it completes immediately; otherwise it is parked at deferredEventQueue.highestPendingOffset() so the caller still sees a linearizable view that waits out any in-flight writes QuorumController.java:802.
- Records present: appendRecords drives the appender callback, which calls raftClient.prepareAppend(controllerEpoch, records) to stage them and learn the last offset; then immediately calls replay() on each record to update in-memory state; then calls raftClient.schedulePreparedAppend() to actually queue them for replication; and finally calls offsetControl.handleScheduleAppend(lastOffset) QuorumController.java:825.
The ordering (apply to memory before scheduling replication) is deliberate: if a record cannot be applied, that is a fatal bug, and it must be discovered before the data is replicated. A failure there goes through fatalFaultHandler QuorumController.java:843. Because the active controller has already applied the records, the commit callback does not re-apply them; it only advances offsets and drains the purgatory. Standbys, which never applied them, replay them in handleCommit QuorumController.java:991.
A user-initiated write future completes successfully only after lastStableOffset >= the offset at which its records were appended. If the node renounces leadership first, the purgatory is failed with NotControllerException and the client retries against the new controller, so the client never sees a write that did not commit.
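The write → commit → apply discipline and the purgatory can be modeled in a few dozen lines. This is a simplified sketch, assuming Java 17: ToyWritePath is hypothetical, one record occupies one offset, the Raft client is elided (the value prepareAppend would report is computed locally), and there are no metadata transactions, so the stable offset simply follows the committed offset.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;

// Hypothetical model of the active controller's write path: apply first, complete on commit.
final class ToyWritePath {
    static final class NotControllerException extends RuntimeException {
        NotControllerException(String message) { super(message); }
    }

    // Index i holds the record at log offset i: a stand-in for the managers' in-memory state.
    private final List<String> applied = new ArrayList<>();
    // Purgatory: futures keyed by the last offset of the batch they wait for.
    private final TreeMap<Long, List<CompletableFuture<Void>>> purgatory = new TreeMap<>();
    private long lastStableOffset = -1;
    private int claimEpoch = -1;

    void claim(int epoch) { claimEpoch = epoch; }

    CompletableFuture<Void> write(List<String> records) {
        if (claimEpoch == -1) {
            return CompletableFuture.failedFuture(new NotControllerException("not the active controller"));
        }
        CompletableFuture<Void> future = new CompletableFuture<>();
        if (records.isEmpty()) {
            // A read in disguise: complete now, or wait behind the highest pending write.
            if (purgatory.isEmpty()) {
                future.complete(null);
            } else {
                purgatory.lastEntry().getValue().add(future);
            }
            return future;
        }
        long lastOffset = applied.size() + records.size() - 1L;  // what prepareAppend would report
        applied.addAll(records);                                // replay before replication is scheduled
        purgatory.computeIfAbsent(lastOffset, k -> new ArrayList<>()).add(future);
        return future;
    }

    // Commit callback on the active controller: no re-apply, just advance and drain.
    void handleCommit(long committedOffset) {
        lastStableOffset = Math.max(lastStableOffset, committedOffset);
        NavigableMap<Long, List<CompletableFuture<Void>>> done = purgatory.headMap(lastStableOffset, true);
        done.values().forEach(futures -> futures.forEach(f -> f.complete(null)));
        done.clear();
    }

    // Losing leadership: fail every pending write and drop uncommitted in-memory state.
    void renounce() {
        claimEpoch = -1;
        purgatory.values().forEach(futures -> futures.forEach(
                f -> f.completeExceptionally(new NotControllerException("leadership lost"))));
        purgatory.clear();
        applied.subList((int) (lastStableOffset + 1), applied.size()).clear();
    }

    List<String> applied() { return applied; }
}
```

In this model a write's records are visible in memory as soon as write returns, yet its future resolves only after handleCommit covers its last offset; renounce fails whatever is still pending and truncates memory back to the last stable offset.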
Atomic vs. non-atomic batches
appendRecords respects ControllerResult.isAtomic(). Atomic results must fit in one batch (≤ maxRecordsPerBatch, default 10000) and are written as a single Raft batch, "all or nothing" QuorumController.java:898, QuorumController.java:180. Non-atomic results are split across as many batches as needed, each producing its own in-memory snapshot so any prefix can be reverted on failure. Any single user operation is capped at MAX_RECORDS_PER_USER_OP (= 10000) via BoundedList backing in the managers QuorumController.java:187.
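The batching rule can be sketched as a pure function. BatchSplitter is a hypothetical helper (Java 17); the limit plays the role of maxRecordsPerBatch, and the exception type thrown for an oversized atomic result is this sketch's choice, not taken from the source.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper mirroring the atomic / non-atomic batching rule.
final class BatchSplitter {
    static <R> List<List<R>> toBatches(List<R> records, boolean atomic, int maxRecordsPerBatch) {
        if (atomic) {
            // All or nothing: the whole result must fit in one Raft batch.
            if (records.size() > maxRecordsPerBatch) {
                throw new IllegalStateException("atomic result of " + records.size()
                        + " records exceeds " + maxRecordsPerBatch);
            }
            return List.of(records);
        }
        // Non-atomic: consecutive chunks; the controller would take an in-memory snapshot for each.
        List<List<R>> batches = new ArrayList<>();
        for (int start = 0; start < records.size(); start += maxRecordsPerBatch) {
            batches.add(records.subList(start, Math.min(records.size(), start + maxRecordsPerBatch)));
        }
        return batches;
    }
}
```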
Timeline data structures: reversible in-memory state
The controller's whole correctness argument depends on being able to (a) read a consistent committed view while uncommitted writes pile up, and (b) discard uncommitted writes wholesale on failover or transaction abort. This is provided by the timeline collections in org.apache.kafka.timeline, all coordinated by a SnapshotRegistry server-common/.../timeline/SnapshotRegistry.java:37.
SnapshotRegistry and snapshots
A SnapshotRegistry holds a doubly-linked list of Snapshot objects, each keyed by an epoch which the controller equates to a metadata offset. Every timeline collection registers itself (via a WeakReference) as a Revertable SnapshotRegistry.java:120. Key operations:
- idempotentCreateSnapshot(epoch) / getOrCreateSnapshot(epoch): append a snapshot tier; epochs must be non-decreasing SnapshotRegistry.java:220.
- revertToSnapshot(epoch): delete all later snapshots and call handleRevert(), restoring every registered collection to its state at that epoch SnapshotRegistry.java:253.
- deleteSnapshotsUpTo(epoch): merge/drop old tiers once they are below the stable offset and can never be reverted to SnapshotRegistry.java:297.
- reset(): wipe everything (used before loading a Raft snapshot) SnapshotRegistry.java:351.
How a snapshotted hash table works
SnapshottableHashTable is the engine under TimelineHashMap and TimelineHashSet server-common/.../timeline/SnapshottableHashTable.java:89. The current tier holds live data (separate-chaining hash table); each snapshot tier holds only the entries that were overwritten or deleted since that snapshot, a copy-on-write delta. Every element carries a startEpoch. To read at epoch E, it first checks the current tier (returning the element if its startEpoch ≤ E), otherwise walks snapshot tiers from E forward; encountering an element with too-new an epoch proves the value at E was absent, so it returns null early SnapshottableHashTable.java:311. On mutation, if the previous value belonged to the most recent snapshot, it is copied into that snapshot's delta table before being overwritten SnapshottableHashTable.java:383. executeRevert drops every element whose startEpoch > targetEpoch and re-adds the saved deltas SnapshottableHashTable.java:413. There are sibling primitives TimelineLong, TimelineInteger, and TimelineObject<T> for scalar reversible state (e.g. FeatureControlManager.metadataVersion is a TimelineObject<Optional<MetadataVersion>>) FeatureControlManager.java:152.
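A much-simplified model of the copy-on-write idea follows. Assumptions: ToyTimelineMap is hypothetical (Java 17) and folds the registry and one collection into a single class; instead of per-element startEpoch values it keeps, per snapshot, the pre-image of each key at its first mutation after that snapshot; and values must be non-null. Reading at an epoch walks the deltas from that snapshot forward, and reverting replays the deltas newest-to-oldest, the same shape of algorithm the paragraph above describes.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;

// Hypothetical, simplified timeline map: a registry and one collection folded into one class.
final class ToyTimelineMap<K, V> {
    private final Map<K, V> current = new HashMap<>();
    // Per snapshot epoch: each key's value at that snapshot, saved on its first mutation after it.
    // Optional.empty() records "absent at that snapshot"; values themselves must be non-null.
    private final TreeMap<Long, Map<K, Optional<V>>> deltas = new TreeMap<>();

    void createSnapshot(long epoch) {
        if (!deltas.isEmpty() && epoch < deltas.lastKey()) {
            throw new IllegalArgumentException("snapshot epochs must not decrease");
        }
        deltas.putIfAbsent(epoch, new HashMap<>());
    }

    void put(K key, V value) { saveOldValue(key); current.put(key, value); }

    void remove(K key) { saveOldValue(key); current.remove(key); }

    V get(K key) { return current.get(key); }

    // Value as of snapshot `epoch`: the first delta at or after it that mentions the key wins.
    V get(K key, long epoch) {
        requireSnapshot(epoch);
        for (Map<K, Optional<V>> delta : deltas.tailMap(epoch, true).values()) {
            Optional<V> saved = delta.get(key);
            if (saved != null) {
                return saved.orElse(null);
            }
        }
        return current.get(key);   // untouched since the snapshot
    }

    // Undo every mutation made after `epoch` and drop the newer snapshots.
    void revertToSnapshot(long epoch) {
        requireSnapshot(epoch);
        for (Map<K, Optional<V>> delta : deltas.tailMap(epoch, true).descendingMap().values()) {
            delta.forEach((key, saved) -> {
                if (saved.isPresent()) {
                    current.put(key, saved.get());
                } else {
                    current.remove(key);
                }
            });
        }
        deltas.tailMap(epoch, false).clear();
        deltas.get(epoch).clear();
    }

    private void saveOldValue(K key) {
        if (!deltas.isEmpty()) {
            deltas.lastEntry().getValue().putIfAbsent(key, Optional.ofNullable(current.get(key)));
        }
    }

    private void requireSnapshot(long epoch) {
        if (!deltas.containsKey(epoch)) {
            throw new IllegalArgumentException("no snapshot at epoch " + epoch);
        }
    }
}
```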
The active controller always keeps an in-memory snapshot at lastStableOffset. OffsetControlManager.activate creates one before going active, and deactivate reverts to it, guaranteeing that renouncing leadership throws away exactly the records that were appended but not yet committed OffsetControlManager.java:252, OffsetControlManager.java:260.
The three offsets and metadata transactions
OffsetControlManager tracks nextWriteOffset, lastCommittedOffset/Epoch, lastStableOffset, and transactionStartOffset OffsetControlManager.java:118. On every committed batch it sets the committed offset and calls maybeAdvanceLastStableOffset: with no open transaction the stable offset equals the committed offset; with an open transaction it is clamped to min(transactionStartOffset-1, lastCommittedOffset) so readers never observe partially-applied transactional batches OffsetControlManager.java:319. Metadata transactions (KIP-868) are themselves records: BeginTransactionRecord snapshots the pre-transaction offset, EndTransactionRecord closes it, and AbortTransactionRecord calls revertToSnapshot(transactionStartOffset-1) to undo the whole transaction OffsetControlManager.java:382. These are used to make multi-record operations such as activation atomic across the log.
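The stable-offset rule is small enough to state as code. ToyOffsets is a hypothetical model (Java 17) that reuses the field names from the text; it omits the in-memory snapshots, and lifting the transaction clamp only at the next commit is a simplification of this sketch.

```java
// Hypothetical model of the stable-offset rule; ToyOffsets does not exist in Kafka.
final class ToyOffsets {
    long lastCommittedOffset = -1;
    long lastStableOffset = -1;
    long transactionStartOffset = -1;   // -1: no open metadata transaction

    // BeginTransactionRecord at `offset`; the real manager also snapshots state at offset - 1.
    void beginTransaction(long offset) { transactionStartOffset = offset; }

    // EndTransactionRecord: the clamp is lifted and takes effect at the next commit.
    void endTransaction() { transactionStartOffset = -1; }

    void handleCommit(long committedOffset) {
        lastCommittedOffset = committedOffset;
        long candidate = (transactionStartOffset == -1)
                ? lastCommittedOffset
                : Math.min(transactionStartOffset - 1, lastCommittedOffset);
        if (candidate > lastStableOffset) {
            lastStableOffset = candidate;   // only ever moves forward
        }
    }
}
```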
Becoming (and ceasing to be) the active controller
Leadership changes arrive through the Raft listener QuorumMetaLogListener.handleLeaderChange, which runs on the controller thread QuorumController.java:1107. The transitions:
- Standby → active: when the new leader is this node, claim(epoch, raftClient.logEndOffset()) sets curClaimEpoch, calls offsetControl.activate and clusterControl.activate (which builds a fresh BrokerHeartbeatManager), and prepends a completeActivation write event so it runs before any queued client work QuorumController.java:1163.
- Active → standby (a new epoch elects someone else, or a fault): renounce() calls raftClient.resign(curClaimEpoch), resets curClaimEpoch = -1, fails the entire purgatory with NotControllerException, and deactivates the offset/cluster/periodic managers (which reverts in-memory state to lastStableOffset) QuorumController.java:1213.
The prepended activation event runs ActivationRecordsGenerator.generate. On an empty log it writes the bootstrap metadata (including the initial FeatureLevelRecord for metadata.version from the kafka-storage tool), wrapped in a Begin/EndTransactionRecord pair when the metadata version supports metadata transactions; on a non-empty log it may finish or abort a transaction left dangling by a previous controller's crash metadata/.../controller/ActivationRecordsGenerator.java:154. Only after these records are applied does processBatchEndOffset call periodicControl.activate(), so background tasks see the correct metadata.version QuorumController.java:1203.
Failover requires no state transfer. The new active controller has already replayed the entire committed log as a standby, so it is identical to the old one up to lastStableOffset. "Becoming active" is just: stop replaying others' records, start writing your own at logEndOffset, and rebuild the purely-soft heartbeat state from incoming heartbeats.
The replay switch: applying records
replay(ApiMessage, Optional<snapshotId>, offset) is the single dispatch point that routes each committed record to the owning manager QuorumController.java:1238. A representative slice:
| Record type | Handler | Effect |
|---|---|---|
| RegisterBrokerRecord | clusterControl.replay(..., offset) | add/replace broker registration; remember its offset |
| FenceBrokerRecord / UnfenceBrokerRecord / BrokerRegistrationChangeRecord | clusterControl.replay | flip fenced / controlled-shutdown / directory state |
| TopicRecord / PartitionRecord / PartitionChangeRecord / RemoveTopicRecord | replicationControl.replay | create/modify/delete topics & partitions, ISR, ELR |
| ConfigRecord | configurationControl.replay | set/clear one dynamic config key |
| FeatureLevelRecord | featureControl.replay | finalize a feature / set metadata.version |
| ProducerIdsRecord | producerIdControlManager.replay | advance the next producer-ID block |
| Begin/End/AbortTransactionRecord | offsetControl.replay | open/close/roll back a metadata transaction |
| NoOpRecord | (none) | no state; used only to advance the log (liveness) |
| ZkMigrationStateRecord | (none) | retained as a no-op for clusters that migrated from ZK before 4.0 |
An unhandled type throws, which becomes a fatal fault: the controller refuses to silently skip metadata it does not understand QuorumController.java:1331.
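The dispatch-or-fail behavior of replay can be sketched with a switch. The RecordType enum and ToyReplay are hypothetical stand-ins (Java 17) for the generated record types and the real managers; only the routing and the fail-on-unknown rule follow the table above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical dispatcher; the enum stands in for Kafka's generated record types.
final class ToyReplay {
    enum RecordType {
        REGISTER_BROKER, FENCE_BROKER, TOPIC, PARTITION_CHANGE,
        CONFIG, FEATURE_LEVEL, NO_OP, ZK_MIGRATION_STATE, SOME_FUTURE_TYPE
    }

    final List<String> routedTo = new ArrayList<>();

    void replay(RecordType type, long offset) {
        switch (type) {
            case REGISTER_BROKER, FENCE_BROKER -> routedTo.add("clusterControl@" + offset);
            case TOPIC, PARTITION_CHANGE -> routedTo.add("replicationControl@" + offset);
            case CONFIG -> routedTo.add("configurationControl@" + offset);
            case FEATURE_LEVEL -> routedTo.add("featureControl@" + offset);
            case NO_OP, ZK_MIGRATION_STATE -> { }   // accepted, but no state changes
            // Anything else is a fatal fault rather than something to skip silently.
            default -> throw new IllegalStateException("Unhandled record type " + type);
        }
    }
}
```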
ReplicationControlManager: topics, partitions, ISR, leadership
This is the largest manager. Its core timeline state ReplicationControlManager.java:326:
- topicsByName: TimelineHashMap<String,Uuid> and topics: TimelineHashMap<Uuid,TopicControlInfo>; each TopicControlInfo holds parts: TimelineHashMap<Integer,PartitionRegistration> ReplicationControlManager.java:253.
- brokersToIsrs: BrokersToIsrs and brokersToElrs: BrokersToElrs, reverse indexes from a broker to the partitions where it sits in the ISR / ELR, plus a "partitions with no leader" set. These power fast fencing and leader-election sweeps ReplicationControlManager.java:353.
- imbalancedPartitions: TimelineHashSet<TopicIdPartition>, partitions whose leader is not the preferred replica ReplicationControlManager.java:369.
- reassigningTopics and directoriesToPartitions, for in-progress reassignments and per-log-directory partition tracking.
PartitionChangeBuilder: the leader/ISR algorithm
Every change to a partition's leadership, ISR, ELR, replica set, or recovery state is computed by PartitionChangeBuilder, which takes the current PartitionRegistration, an isAcceptableLeader predicate, the metadataVersion, the effective min.insync.replicas, and whether ELR is enabled (the target ISR is supplied separately via setTargetIsr/setTargetIsrWithBrokerStates), and emits an Optional<ApiMessageAndVersion> wrapping a PartitionChangeRecord, empty when nothing changed PartitionChangeBuilder.java:106. Its Election enum picks the policy PartitionChangeBuilder.java:68:
- PREFERRED: switch to the first (preferred) replica only if it is in the ISR; used by the leader-balancing task.
- ONLINE: pick any ISR member to keep the partition online; the default, used on fencing/shutdown.
- UNCLEAN: pick a replica even outside the ISR (potential data loss); used only when unclean election is enabled for the topic.
changeRecordIsNoOp guards against emitting empty records by checking every mutable field is unset PartitionChangeBuilder.java:53. With ELR enabled (KIP-966) the builder also maintains the ELR and "last known ELR" sets so the leader-candidate pool cannot silently shrink below min.insync.replicas.
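The three policies can be sketched as one function. This is a simplified, hypothetical ToyElection (Java 17): it ignores ELR, last-known-ELR, and recovery state, and the rule of keeping a still-valid current leader before scanning other ISR members is an assumption of the sketch rather than a quotation of PartitionChangeBuilder.

```java
import java.util.List;
import java.util.Set;
import java.util.function.IntPredicate;

// Hypothetical, ELR-free sketch of the three election policies; not PartitionChangeBuilder itself.
final class ToyElection {
    enum Election { PREFERRED, ONLINE, UNCLEAN }

    static final int NO_LEADER = -1;

    // `replicas` is in preference order, so replicas.get(0) is the preferred leader.
    static int electLeader(Election election, List<Integer> replicas, Set<Integer> isr,
                           int currentLeader, IntPredicate isAcceptableLeader) {
        IntPredicate validInIsr = r -> isr.contains(r) && isAcceptableLeader.test(r);
        int preferred = replicas.get(0);
        if (election == Election.PREFERRED && validInIsr.test(preferred)) {
            return preferred;
        }
        if (currentLeader != NO_LEADER && validInIsr.test(currentLeader)) {
            return currentLeader;               // a healthy leader is kept
        }
        for (int r : replicas) {
            if (validInIsr.test(r)) {
                return r;                        // any ISR member keeps the partition online
            }
        }
        if (election == Election.UNCLEAN) {
            for (int r : replicas) {
                if (isAcceptableLeader.test(r)) {
                    return r;                    // outside the ISR: committed data may be lost
                }
            }
        }
        return NO_LEADER;
    }
}
```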
AlterPartition
When a partition leader proposes an ISR change, alterPartition validates the broker epoch and per-partition leader epoch, then runs a PartitionChangeBuilder with setTargetIsrWithBrokerStates ReplicationControlManager.java:1078. A subtle case: if applying the ISR change also completes an in-flight reassignment and thereby moves leadership away from the requesting broker, the controller both writes the record and returns NEW_LEADER_ELECTED to tell the old leader to refetch metadata, even though, as the source notes, "we usually only do one or the other" ReplicationControlManager.java:1157. ISR shrink/expand from the broker side is detailed in Replication, ISR & High Watermark.
generateLeaderAndIsrUpdates: the fan-out primitive
Broker fencing, unfencing, unregistration, controlled shutdown, unclean shutdown, and directory-offline all funnel into generateLeaderAndIsrUpdates(context, brokerToRemove, brokerToAdd, brokerWithUncleanShutdown, records, partitionIterator), which walks the affected partitions (via the reverse indexes) and runs a PartitionChangeBuilder per partition ReplicationControlManager.java:2051. So a single fenced broker yields one BrokerRegistrationChangeRecord plus a PartitionChangeRecord for every partition it led or was in the ISR of ReplicationControlManager.java:1399.
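A reverse index turns fencing into a walk over only the affected partitions. The sketch below (Java 17) is hypothetical: ToyFanOut keys partitions by plain strings, emits descriptive PartitionChange values instead of PartitionChangeRecords, omits the accompanying BrokerRegistrationChangeRecord, and handles the last-ISR-member case with a deliberate simplification.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical model of the reverse-index fan-out; changes are only described, nothing is applied.
final class ToyFanOut {
    record PartitionChange(String partition, List<Integer> newIsr, int newLeader) {}

    private final Map<String, List<Integer>> isrs = new HashMap<>();
    private final Map<String, Integer> leaders = new HashMap<>();
    // Reverse index (cf. BrokersToIsrs): broker -> partitions whose ISR contains it.
    private final Map<Integer, Set<String>> brokersToIsrs = new HashMap<>();

    void addPartition(String partition, List<Integer> isr, int leader) {
        isrs.put(partition, List.copyOf(isr));
        leaders.put(partition, leader);
        for (int broker : isr) {
            brokersToIsrs.computeIfAbsent(broker, k -> new LinkedHashSet<>()).add(partition);
        }
    }

    // Visit only the partitions the fenced broker touches, instead of scanning every partition.
    List<PartitionChange> changesForFencing(int broker) {
        List<PartitionChange> changes = new ArrayList<>();
        for (String partition : brokersToIsrs.getOrDefault(broker, Set.of())) {
            List<Integer> newIsr = new ArrayList<>(isrs.get(partition));
            newIsr.remove(Integer.valueOf(broker));
            int leader = leaders.get(partition);
            if (newIsr.isEmpty()) {
                // Simplification: keep the last ISR member and go leaderless; the real builder
                // (including ELR handling) is more involved.
                changes.add(new PartitionChange(partition, isrs.get(partition), -1));
            } else {
                int newLeader = (leader == broker) ? newIsr.get(0) : leader;
                changes.add(new PartitionChange(partition, List.copyOf(newIsr), newLeader));
            }
        }
        return changes;
    }
}
```

changesForFencing leaves both the index and the ISR map untouched: in the controller, state changes only when the emitted records are replayed.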
ClusterControlManager & broker liveness
ClusterControlManager owns the hard registration state: brokerRegistrations: TimelineHashMap<Integer,BrokerRegistration>, the offset of each broker's registration record, the controller registrations, and a directory→broker map ClusterControlManager.java:234. Registration (registerBroker) checks the cluster ID, rejects ZK brokers, validates that the broker supports every finalized feature version, allocates a new broker epoch (the registration record's offset), and, when the incarnation ID is new, first emits cleanup records for the previous incarnation (including unclean-shutdown ISR removal) ClusterControlManager.java:349. A returning broker registers fenced; it must catch up before it can be unfenced.
Heartbeats: soft state off the log
Crucially, heartbeats are not written to the metadata log. The BrokerHeartbeatManager (and its BrokerHeartbeatTracker) keep the last-contact time and last-reported metadata offset of each broker in plain RAM, and exist only on the active controller BrokerHeartbeatManager.java:44. The Javadoc spells out the trade-off: keeping heartbeats off the log keeps the metadata partition small, at the cost of extra fencing latency right after a failover, since the new controller does not know when each broker last checked in BrokerHeartbeatManager.java:51.
processBrokerHeartbeat is special-cased for overload resilience. The request first updates the lock-free BrokerHeartbeatTracker (a ConcurrentHashMap<BrokerIdAndEpoch,Long>) directly from the RPC thread via clusterControl.trackBrokerHeartbeat, so the last-contact time lands even if the controller thread is backlogged; only then is a write event queued QuorumController.java:1978, BrokerHeartbeatTracker.java:32. If that event times out on the queue, a completion hook calls processExpiredBrokerHeartbeat which refreshes the contact time so the broker is not fenced for the controller's own slowness QuorumController.java:2025, ReplicationControlManager.java:1718.
The broker state machine
On the controller thread, calculateNextBrokerState turns the heartbeat's wantFence/wantShutDown flags and the broker's currentMetadataOffset into a transition over four states BrokerHeartbeatManager.java:402:
[Figure: state diagram of calculateNextBrokerState: a fenced broker is unfenced only once currentMetadataOffset ≥ its registration record's offset; UNFENCED can re-fence on a missed session, shut down immediately when it leads nothing, or drain via controlled shutdown when it still leads partitions]
A fenced broker is unfenced only when it reports a metadata offset at least equal to the offset of its own RegisterBrokerRecord, i.e. it has caught up on metadata BrokerHeartbeatManager.java:415. For controlled shutdown, the active set is a TreeSet ordered by reported metadata offset; the broker may finish only when its controlledShutdownOffset is ≤ the lowest active offset (every still-active broker has seen the leadership-move records) and it leads no partitions BrokerHeartbeatManager.java:456. The shutdown offset itself is set in processBatchEndOffset to the offset at which the leadership-move records were appended QuorumController.java:2018.
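The transitions described above can be sketched as a pure function. ToyBrokerStates is hypothetical (Java 17); the enum names FENCED, UNFENCED, CONTROLLED_SHUTDOWN, and SHUTDOWN_NOW are assumed labels for the four states, and the precise handling of wantFence and wantShutDown combinations is this sketch's assumption rather than a transcription of calculateNextBrokerState. Session expiry is not handled here because it is driven by the periodic fencing task.

```java
// Hypothetical transition function; not BrokerHeartbeatManager.calculateNextBrokerState itself.
final class ToyBrokerStates {
    enum State { FENCED, UNFENCED, CONTROLLED_SHUTDOWN, SHUTDOWN_NOW }

    record Heartbeat(boolean wantFence, boolean wantShutDown, long currentMetadataOffset) {}

    static State next(State current, Heartbeat hb, long registerBrokerRecordOffset,
                      boolean hasLeaderships, long controlledShutdownOffset, long lowestActiveOffset) {
        switch (current) {
            case FENCED:
                if (hb.wantShutDown()) {
                    return State.SHUTDOWN_NOW;   // nothing to drain
                }
                // Unfence only once the broker has caught up to its own registration record.
                if (!hb.wantFence() && hb.currentMetadataOffset() >= registerBrokerRecordOffset) {
                    return State.UNFENCED;
                }
                return State.FENCED;
            case UNFENCED:
                if (hb.wantFence()) {
                    return hb.wantShutDown() ? State.SHUTDOWN_NOW : State.FENCED;
                }
                if (hb.wantShutDown()) {
                    return hasLeaderships ? State.CONTROLLED_SHUTDOWN : State.SHUTDOWN_NOW;
                }
                return State.UNFENCED;
            case CONTROLLED_SHUTDOWN:
                // Finish only when leadership has moved and every active broker has seen it.
                if (!hasLeaderships && controlledShutdownOffset <= lowestActiveOffset) {
                    return State.SHUTDOWN_NOW;
                }
                return State.CONTROLLED_SHUTDOWN;
            default:
                return State.SHUTDOWN_NOW;
        }
    }
}
```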
Fencing stale brokers
The periodic maybeFenceStaleBroker task asks the tracker for one expired session (maybeRemoveExpired), validates the broker still exists at that epoch, and if so fences exactly one broker per run so each fence's effect is visible before the next ReplicationControlManager.java:1739. Expiry is (now > t) && (t + sessionTimeoutNs < now); the session timeout defaults to 9 s BrokerHeartbeatTracker.java:131, ClusterControlManager.java:84. The task is scheduled to sample 8× per timeout, so a dead broker is fenced within roughly 112.5% of the session timeout QuorumController.java:1684.
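The tracker, the expiry test, and the sampling arithmetic fit in one sketch. ToyHeartbeatTracker is hypothetical (Java 17); it uses a ConcurrentHashMap as the text describes, and it removes an entry with the two-argument remove(key, value) so that a heartbeat landing concurrently is not lost, which is a design choice of this sketch. With a 9 s timeout sampled every 9/8 s, the worst case is 9 + 1.125 = 10.125 s, i.e. 112.5% of the timeout.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical tracker modeled on the description above; not Kafka's BrokerHeartbeatTracker.
final class ToyHeartbeatTracker {
    record BrokerIdAndEpoch(int id, long epoch) {}

    private final ConcurrentHashMap<BrokerIdAndEpoch, Long> contactTimes = new ConcurrentHashMap<>();
    private final long sessionTimeoutNs;

    ToyHeartbeatTracker(long sessionTimeoutNs) { this.sessionTimeoutNs = sessionTimeoutNs; }

    // Lock-free: callable directly from RPC threads, even while the controller thread is busy.
    void updateContactTime(BrokerIdAndEpoch broker, long nowNs) { contactTimes.put(broker, nowNs); }

    boolean isExpired(long lastContactNs, long nowNs) {
        return nowNs > lastContactNs && lastContactNs + sessionTimeoutNs < nowNs;
    }

    // Remove and return at most one expired session, so the caller fences one broker per run.
    Optional<BrokerIdAndEpoch> maybeRemoveExpired(long nowNs) {
        for (Map.Entry<BrokerIdAndEpoch, Long> e : contactTimes.entrySet()) {
            // remove(key, value) fails if an RPC thread refreshed this entry in the meantime.
            if (isExpired(e.getValue(), nowNs) && contactTimes.remove(e.getKey(), e.getValue())) {
                return Optional.of(e.getKey());
            }
        }
        return Optional.empty();
    }

    // Worst case: the session expires just after a sample, and the next sample is timeout/8 later.
    static long worstCaseFenceDelayNs(long sessionTimeoutNs) {
        return sessionTimeoutNs + sessionTimeoutNs / 8;
    }
}
```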
Other control managers
FeatureControlManager, metadata.version & features (KIP-584)
Holds finalizedVersions: TimelineHashMap<String,Short> and metadataVersion: TimelineObject<Optional<MetadataVersion>> FeatureControlManager.java:147. updateFeatures validates each requested level: upgrade vs. (safe/unsafe) downgrade rules, that the controller's own software supports it, and, via reasonNotSupported, that every registered broker and controller supports the target level; if any controller has not yet registered, the update is refused FeatureControlManager.java:321. metadata.version gets extra validation in updateMetadataVersion; kraft.version upgrades go through the Raft client. Feature updates are written as an atomic batch of FeatureLevelRecords FeatureControlManager.java:179. On replay, a record outside the controller's locally-supported range is a fatal error, guarding against a node applying a feature it cannot implement FeatureControlManager.java:447. metadata.version is the gate the whole controller waits on: many code paths call metadataVersionOrThrow(), and the controller builder refuses to construct a controller without bootstrap metadata specifying it QuorumController.java:395.
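The cross-node part of that validation can be sketched as follows. ToyFeatureCheck is hypothetical (Java 17): it reduces each node's support to a min/max range per feature and the controller-registration check to a boolean, and treating an unlisted feature as supporting only level 0 is an assumption. The level numbers used with it are arbitrary.

```java
import java.util.Map;
import java.util.Optional;

// Hypothetical shape of the "every node must support it" check; not FeatureControlManager's code.
final class ToyFeatureCheck {
    record Range(short min, short max) {
        boolean contains(short level) { return level >= min && level <= max; }
    }

    static Optional<String> reasonNotSupported(String feature, short level,
                                               Map<Integer, Map<String, Range>> supportedByNode,
                                               boolean allControllersRegistered) {
        if (!allControllersRegistered) {
            return Optional.of("not all controllers have registered yet");
        }
        for (Map.Entry<Integer, Map<String, Range>> node : supportedByNode.entrySet()) {
            Range range = node.getValue().get(feature);
            // Assumption: a node that does not list the feature supports only level 0.
            boolean ok = (range == null) ? level == 0 : range.contains(level);
            if (!ok) {
                return Optional.of("node " + node.getKey() + " does not support "
                        + feature + " level " + level);
            }
        }
        return Optional.empty();
    }
}
```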
ProducerIdControlManager
Allocates producer-ID blocks for the transaction/idempotence subsystem (Transactions & EOS). State is a TimelineObject<ProducerIdsBlock> plus a TimelineLong broker epoch ProducerIdControlManager.java:70. generateNextProducerId checks the broker epoch, carves a fixed-size block (PRODUCER_ID_BLOCK_SIZE) starting at the current high-water producer ID, and writes a ProducerIdsRecord recording the next block's first ID; replay rejects any record that does not advance the counter ProducerIdControlManager.java:84.
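Block allocation is a read of replayed state plus a record that moves the counter forward. ToyProducerIds is a hypothetical model (Java 17); BLOCK_SIZE = 1000 is an assumed value for PRODUCER_ID_BLOCK_SIZE, and the broker-epoch check is omitted.

```java
// Hypothetical model of block allocation; BLOCK_SIZE is an assumed value for PRODUCER_ID_BLOCK_SIZE.
final class ToyProducerIds {
    static final long BLOCK_SIZE = 1000L;

    record Block(int assignedBrokerId, long firstProducerId, long size) {
        long nextBlockFirstId() { return firstProducerId + size; }
    }

    private long nextProducerId = 0L;   // replayed state: first ID of the next unallocated block

    // Write side: describe the block; the ProducerIdsRecord would carry block.nextBlockFirstId().
    Block generateNextBlock(int brokerId) {
        return new Block(brokerId, nextProducerId, BLOCK_SIZE);
    }

    // Replay side: the counter may only move forward.
    void replay(long recordNextProducerId) {
        if (recordNextProducerId <= nextProducerId) {
            throw new IllegalStateException("next producer ID " + recordNextProducerId
                    + " does not advance past " + nextProducerId);
        }
        nextProducerId = recordNextProducerId;
    }
}
```

Calling generateNextBlock twice without replaying the first record would hand out the same block; in the controller this cannot happen because the active controller replays each record before the next event on its single thread is processed.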
ConfigurationControlManager and the rest
Dynamic configs live in a nested TimelineHashMap<ConfigResource,TimelineHashMap<String,String>>, mutated by incrementalAlterConfigs/legacyAlterConfigs with optional policy and existence checks; a validateOnly request returns the result with the records stripped (withoutRecords()) ConfigurationControlManager.java:80, QuorumController.java:1919. AclControlManager, ScramControlManager, DelegationTokenControlManager, and ClientQuotaControlManager each own a slice of timeline state and a replay method, following the same record-then-apply pattern; see Security and Quotas.
Replica placement: StripedReplicaPlacer
When creating topics or partitions the controller asks a ReplicaPlacer (default StripedReplicaPlacer) to choose broker sets QuorumController.java:205. Its goals, in priority order, are: spread replicas evenly across racks (highest priority in multi-rack clusters), then evenly across brokers, then prefer unfenced brokers; with two hard constraints, at most one replica per broker, and the leader (first replica) must be unfenced metadata/.../placement/StripedReplicaPlacer.java:33. The algorithm loads brokers into per-rack lists (fenced and unfenced kept separate), assigns racks round-robin with a random starting offset, and round-robins replicas within a rack, producing the characteristic "striped" assignment where each successive partition rotates its starting rack. The usable-broker stream comes from the heartbeat manager and excludes shutting-down brokers and those with no uncordoned directories BrokerHeartbeatManager.java:330.
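The striping pattern can be reproduced with two round-robins. ToyStripedPlacer is a hypothetical simplification (Java 17): it takes only unfenced brokers already grouped by rack, uses a fixed startRack in place of the random starting offset, and enforces the one-replica-per-broker constraint with a per-partition set.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical, simplified striping; it ignores fenced brokers and uses a fixed start rack.
final class ToyStripedPlacer {
    static List<List<Integer>> place(List<List<Integer>> racks, int numPartitions,
                                     int replicationFactor, int startRack) {
        int totalBrokers = racks.stream().mapToInt(List::size).sum();
        if (replicationFactor > totalBrokers) {
            throw new IllegalArgumentException("replication factor exceeds broker count");
        }
        int[] cursor = new int[racks.size()];   // per-rack round-robin position
        List<List<Integer>> assignments = new ArrayList<>();
        for (int p = 0; p < numPartitions; p++) {
            List<Integer> replicas = new ArrayList<>();
            Set<Integer> used = new HashSet<>();            // hard constraint: one replica per broker
            int rack = (startRack + p) % racks.size();      // each partition starts one rack later
            while (replicas.size() < replicationFactor) {
                List<Integer> brokers = racks.get(rack);
                for (int i = 0; i < brokers.size(); i++) {
                    int broker = brokers.get((cursor[rack] + i) % brokers.size());
                    if (used.add(broker)) {
                        replicas.add(broker);
                        cursor[rack] = (cursor[rack] + i + 1) % brokers.size();
                        break;
                    }
                }
                rack = (rack + 1) % racks.size();
            }
            assignments.add(replicas);
        }
        return assignments;
    }
}
```

With three racks of two brokers each ([1, 2], [3, 4], [5, 6]) and replication factor 3, this sketch yields [1, 3, 5], [4, 6, 2], [5, 1, 3]: every partition spans all racks, and the leader's rack rotates.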
Periodic background tasks
Background work is modeled as PeriodicTasks registered with the PeriodicTaskControlManager, which schedules each as a DEFERRED event via the scheduleDeferred/cancelDeferred hooks on the event queue and reschedules it after each run QuorumController.java:532, metadata/.../controller/PeriodicTaskControlManager.java:200. They are activated on claim and cancelled on renounce, so only the active controller runs them. Each task returns a ControllerResult; if its records are non-empty they are written like any other batch, and a task may signal "run me again immediately" by returning true.
| Task | Does | Default cadence | Code |
|---|---|---|---|
| maybeFenceStaleBroker | Fence one timed-out broker | sessionTimeout/8 (≈1.125 s) | QuorumController.java:1696 |
| electPreferred | Move imbalanced partitions back to their preferred leader (only if leader.imbalance.check.interval.seconds is set) | opt-in | QuorumController.java:1711 |
| electUnclean | Unclean-elect leaders for leaderless partitions where the topic allows it | 5 min | QuorumController.java:1726 |
| writeNoOpRecord | Append a NoOpRecord so the log advances when idle (only if metadata.max.idle.interval.ms is set) | opt-in | QuorumController.java:1667 |
| expireDelegationTokens | Sweep expired tokens | 5 min | QuorumController.java:1755 |
| generatePeriodicPerformanceMessage | Log event-processing stats | sample period (default 60 s) | QuorumController.java:1738 |
Both election tasks perform at most MAX_ELECTIONS_PER_IMBALANCE = 1000 elections per run and return true (reschedule immediately) when they hit that cap ReplicationControlManager.java:151, ReplicationControlManager.java:1785.
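The bounded-batch behavior can be sketched as below. `BoundedElectionSketch` and `electBatch` are hypothetical, and the strings stand in for whatever records a real election produces. The sketch returns true exactly when the cap is reached, as the text describes; if exactly 1000 partitions were pending, that costs one extra, empty run.

```java
import java.util.*;

final class BoundedElectionSketch {
    static final int MAX_ELECTIONS_PER_IMBALANCE = 1000; // same value as the constant cited above

    // Elects leaders for at most MAX_ELECTIONS_PER_IMBALANCE partitions taken from 'pending',
    // appending one placeholder record per election. Returns true if the cap was hit,
    // which tells the scheduler to run the task again immediately.
    static boolean electBatch(Deque<String> pending, List<String> records) {
        int elected = 0;
        while (!pending.isEmpty() && elected < MAX_ELECTIONS_PER_IMBALANCE) {
            records.add("election(" + pending.poll() + ")");
            elected++;
        }
        return elected == MAX_ELECTIONS_PER_IMBALANCE;
    }
}
```

Capping each run keeps a single event from monopolizing the controller thread; the remaining work is picked up by immediate re-runs interleaved with other events.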
Configuration reference
| Builder knob / config | Default | Effect |
|---|---|---|
| defaultReplicationFactor | 3 | Replication factor for topics that omit it QuorumController.java:203. |
| defaultNumPartitions | 1 | Partition count for topics that omit it QuorumController.java:204. |
| sessionTimeoutNs (broker.session.timeout.ms) | 9 s | How long a broker may go without a heartbeat before it can be fenced ClusterControlManager.java:84. |
| leaderImbalanceCheckIntervalNs | empty (off) | When set, enables the preferred-leader balancing task and sets its period QuorumController.java:206. |
| maxIdleIntervalNs (metadata.max.idle.interval.ms) | empty (off) | Period of the NoOpRecord liveness writer QuorumController.java:207. |
| uncleanLeaderElectionCheckIntervalMs | 5 min | Period of the unclean-recovery task QuorumController.java:225. |
| maxRecordsPerBatch | 10000 | Cap on records per Raft batch and per user operation QuorumController.java:180. |
| controllerPerformanceSamplePeriodMs | 60000 | Window for the event-performance monitor QuorumController.java:218. |
| controllerPerformanceAlwaysLogThresholdMs | 2000 | Any event slower than this is always logged QuorumController.java:219. |
| delegationTokenExpiryCheckIntervalMs | 5 min | Token-expiry sweep cadence QuorumController.java:224. |
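The cadence arithmetic behind the fencing defaults can be worked through in code. `CadenceSketch` is hypothetical; the /8 ratio comes from the periodic-task table, and the worst-case bound assumes a single dead broker and no event-queue delay, so treat it as an estimate rather than a guarantee.

```java
import java.util.concurrent.TimeUnit;

final class CadenceSketch {
    static final long SESSION_TIMEOUT_MS = 9_000; // default broker.session.timeout.ms

    static long sessionTimeoutNs(long sessionTimeoutMs) {
        return TimeUnit.MILLISECONDS.toNanos(sessionTimeoutMs);
    }

    // maybeFenceStaleBroker runs every sessionTimeout / 8.
    static long fenceCheckPeriodNs(long sessionTimeoutMs) {
        return sessionTimeoutNs(sessionTimeoutMs) / 8;
    }

    // A broker becomes stale one session timeout after its last heartbeat and is noticed
    // by the next fence check, so (for one dead broker, ignoring queueing) fencing happens
    // at most one check period after it goes stale.
    static long worstCaseFenceDelayNs(long sessionTimeoutMs) {
        return sessionTimeoutNs(sessionTimeoutMs) + fenceCheckPeriodNs(sessionTimeoutMs);
    }
}
```

With the 9 s default, the check runs every 1.125 s and a single dead broker is fenced within about 10.125 s of its last heartbeat.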
Key JMX metrics from QuorumControllerMetrics/OffsetControlManager include ActiveControllerCount (1 on the active controller, 0 elsewhere), EventQueueTimeMs, EventQueueProcessingTimeMs, NewActiveControllersCount, LastAppliedRecordOffset/Timestamp/LagMs, LastCommittedRecordOffset, and per-broker TimeSinceLastHeartbeatReceivedMs metadata/.../controller/metrics/QuorumControllerMetrics.java:47, OffsetControlManager.java:41.
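Operators often want to know which quorum member is active. The helper below reads the ActiveControllerCount gauge over JMX using only the standard javax.management API. The MBean name kafka.controller:type=KafkaController,name=ActiveControllerCount and the "Value" attribute follow Kafka's usual gauge naming; `ActiveControllerProbe` is hypothetical, and the name is worth confirming against your node's MBean list before relying on it.

```java
import java.io.IOException;
import java.util.Optional;
import javax.management.InstanceNotFoundException;
import javax.management.JMException;
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;

final class ActiveControllerProbe {
    static final String NAME = "kafka.controller:type=KafkaController,name=ActiveControllerCount";

    // Optional.of(true) on the active controller, Optional.of(false) on a standby,
    // and empty if the connected JVM does not expose the gauge at all.
    static Optional<Boolean> isActiveController(MBeanServerConnection conn) {
        try {
            Object value = conn.getAttribute(new ObjectName(NAME), "Value");
            return Optional.of(((Number) value).intValue() == 1);
        } catch (InstanceNotFoundException e) {
            return Optional.empty();
        } catch (JMException | IOException e) {
            throw new IllegalStateException("JMX query failed", e);
        }
    }
}
```

Across a healthy quorum, exactly one node should report true; a sum of 0 or more than 1 across all controllers is worth alerting on.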
Concurrency & threading model
| State | Owner / guard | Accessed from |
|---|---|---|
| All timeline collections, SnapshotRegistry, offsets, every manager's hard state | the single event-handler thread (no locks) | controller thread only |
| curClaimEpoch | volatile; written on the controller thread | read by RPC threads for the is-active check |
| Event queue internals (linked list, deadlineMap, size) | ReentrantLock + Condition | any thread enqueuing; handler thread dequeuing |
| BrokerHeartbeatTracker.contactTimes | ConcurrentHashMap (lock-free) | RPC threads (write) + controller thread (expire) |
| ClusterControlManager.heartbeatManager (a BrokerHeartbeatManager reference) | volatile | swapped on activate/deactivate |
The design's whole point is that the bulk of state needs no synchronization because exactly one thread touches it. The only deliberately multi-threaded component is the heartbeat tracker, precisely so liveness signals are never starved by a busy controller thread. Raft callbacks (handleCommit, handleLoadSnapshot, handleLeaderChange) arrive on Raft threads but immediately re-enqueue themselves as ControllerEvents, and they self-cancel if they belong to a superseded listener registration QuorumController.java:1144.
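The threading split in the table can be mimicked with standard java.util.concurrent pieces. This is an illustration, not Kafka's KafkaEventQueue: `ControllerLoopSketch` is hypothetical, a single-thread executor plays the event-handler thread, an unsynchronized HashMap plays the timeline state, a volatile field plays curClaimEpoch, and a ConcurrentHashMap plays the heartbeat tracker's contact times.

```java
import java.util.*;
import java.util.concurrent.*;

final class ControllerLoopSketch {
    // The single event-handler thread: every mutation of 'state' runs here.
    private final ExecutorService eventQueue = Executors.newSingleThreadExecutor();
    private final Map<String, Integer> state = new HashMap<>(); // unsynchronized on purpose

    // Written only on the controller thread, read from any thread (like curClaimEpoch).
    private volatile int curClaimEpoch = -1;

    // Written directly by RPC threads, bypassing the queue (like contactTimes).
    final ConcurrentHashMap<Integer, Long> contactTimes = new ConcurrentHashMap<>();

    // An "admin request": enqueued, executed on the handler thread, answered via a future.
    CompletableFuture<Integer> increment(String key) {
        return CompletableFuture.supplyAsync(() -> state.merge(key, 1, Integer::sum), eventQueue);
    }

    CompletableFuture<Void> claim(int epoch) {
        return CompletableFuture.runAsync(() -> curClaimEpoch = epoch, eventQueue);
    }

    boolean isActive() {
        return curClaimEpoch != -1; // safe from any thread because the field is volatile
    }

    void recordHeartbeat(int brokerId, long nowNs) {
        contactTimes.put(brokerId, nowNs); // lock-free; never waits behind queued events
    }

    void close() {
        eventQueue.shutdown();
    }
}
```

Many RPC threads can submit work concurrently, yet the HashMap stays consistent because only one thread ever touches it; heartbeats, meanwhile, are recorded even if that thread is busy.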
Failure modes, edge cases & recovery
- Write applied but never committed (this node loses leadership mid-flight): renounce() fails the purgatory with NotControllerException and reverts in-memory state to the snapshot at lastStableOffset, discarding exactly the appended-but-uncommitted records. The client retries against the new controller.
- Replay failure on the active controller: treated as a fatal bug through fatalFaultHandler, because the controller must never diverge from its own log QuorumController.java:843.
- Replay failure or other fault that warrants failover but not process death: handleEventException routes faults whose causesFailover() is true to renounce(), handing leadership to another node QuorumController.java:595.
- Snapshot load: a standby that falls too far behind loads a Raft snapshot. handleLoadSnapshot first reset()s all timeline state, replays the snapshot, and rebuilds offsets; it is a fatal fault for the active controller ever to be asked to load one QuorumController.java:1032.
- Dangling metadata transaction from a crashed controller: the activation generator finishes or aborts it before normal operation resumes QuorumController.java:1187.
- Heartbeat overload: contact times still land lock-free, and timed-out heartbeat events still refresh liveness, so brokers are not fenced merely because the controller is slow ReplicationControlManager.java:1718.
- Duplicate broker incarnation: a registration whose incarnation ID differs from that of a broker whose session is still live is rejected with DuplicateBrokerRegistrationException; a genuinely new incarnation (one that does not collide with a live session) triggers cleanup records for the old one ClusterControlManager.java:371.
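The first failure mode depends on being able to roll in-memory state back to lastStableOffset. The sketch below shows that contract with an undo log, which is a swapped-in, simpler technique: Kafka's SnapshotRegistry and TimelineHashMap keep per-snapshot deltas instead. `TimelineMapSketch` and its methods are hypothetical.

```java
import java.util.*;

// A toy "timeline" map: each write records an undo entry tagged with the offset that caused it,
// so the map can be rolled back to any offset that has not yet been discarded.
final class TimelineMapSketch<K, V> {
    private record Undo<K, V>(long offset, K key, V previous, boolean existed) {}

    private final Map<K, V> current = new HashMap<>();
    private final Deque<Undo<K, V>> undoLog = new ArrayDeque<>(); // newest entry at the head

    void put(long offset, K key, V value) {
        boolean existed = current.containsKey(key);
        undoLog.push(new Undo<>(offset, key, current.get(key), existed));
        current.put(key, value);
    }

    V get(K key) {
        return current.get(key);
    }

    // Once an offset is committed, undo entries at or below it can never be needed again.
    void deleteSnapshotsUpTo(long committedOffset) {
        undoLog.removeIf(u -> u.offset() <= committedOffset);
    }

    // renounce(): discard every write made after 'stableOffset'.
    void revertToSnapshot(long stableOffset) {
        while (!undoLog.isEmpty() && undoLog.peek().offset() > stableOffset) {
            Undo<K, V> u = undoLog.pop();
            if (u.existed()) {
                current.put(u.key(), u.previous());
            } else {
                current.remove(u.key());
            }
        }
    }
}
```

Reverting removes exactly the writes above the stable offset and nothing else, which is what lets the new active controller's log, not this node's speculative state, decide what survives.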
Invariants & guarantees
Given the same committed metadata log, every controller computes byte-identical in-memory state, because the only mutation path is replay() over committed records in offset order. Standbys are warm replicas; failover transfers no state.
A write future never completes successfully before lastStableOffset reaches its append offset. Reads are served from state as of lastStableOffset, so they never observe uncommitted or in-transaction records.
Only the node with curClaimEpoch != -1 appends, and prepareAppend is bound to that epoch, so a stale leader's writes are rejected by the Raft layer.
A leader-side AlterPartition that completes a reassignment and moves leadership returns NEW_LEADER_ELECTED, forcing the old leader to refetch before acting on stale leadership.
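The write-future guarantee can be sketched as a purgatory keyed by append offset. `PurgatorySketch` and its methods are hypothetical, and IllegalStateException stands in for the NotControllerException that renounce() uses.

```java
import java.util.*;
import java.util.concurrent.CompletableFuture;

// A future is parked under the offset at which its records were appended and completes
// only once the stable offset reaches that offset.
final class PurgatorySketch {
    private final TreeMap<Long, List<CompletableFuture<Long>>> waiting = new TreeMap<>();
    private long lastStableOffset = -1;

    CompletableFuture<Long> add(long appendOffset) {
        CompletableFuture<Long> f = new CompletableFuture<>();
        waiting.computeIfAbsent(appendOffset, o -> new ArrayList<>()).add(f);
        return f;
    }

    // On commit: complete every future whose offset is now at or below the stable offset.
    void completeUpTo(long offset) {
        lastStableOffset = offset;
        NavigableMap<Long, List<CompletableFuture<Long>>> done = waiting.headMap(offset, true);
        done.forEach((off, fs) -> fs.forEach(f -> f.complete(off)));
        done.clear(); // clearing the view removes the entries from 'waiting'
    }

    // On renounce: whatever is still waiting may never commit, so it fails.
    void failAll(RuntimeException e) {
        waiting.values().forEach(fs -> fs.forEach(f -> f.completeExceptionally(e)));
        waiting.clear();
    }

    long lastStableOffset() {
        return lastStableOffset;
    }
}
```

Because completion is driven only by the commit path, a caller that sees success knows its records are durable in the metadata log.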
Interactions with other subsystems
- KRaft Consensus (Raft), supplies
RaftClient.prepareAppend/schedulePreparedAppend/resignand the commit/leader-change callbacks the controller is built around. - Metadata Propagation & Broker Lifecycle, brokers consume the committed log the controller produces, ack progress via heartbeats, and drive the registration/fencing lifecycle described here.
- Replication, ISR & High Watermark and The Fetch Path, the leader/ISR/ELR records this controller emits are exactly what replicas act on.
- Request Processing (KafkaApis), controller-bound admin RPCs are forwarded here and answered via the futures returned by the
Controllerinterface. - Transactions & EOS, consumes producer-ID blocks from
ProducerIdControlManager. - Security / Quotas, ACL, SCRAM, token and quota managers live inside the controller and replay through the same path.
Design rationale & evolution
Replacing the ZooKeeper-backed controller with a Raft-replicated metadata log is KIP-500, realized as the quorum/event-driven controller of KIP-631. The old ZK controller cached state by reading ZooKeeper, raced with watchers, and pushed full LeaderAndIsr/UpdateMetadata RPCs to brokers; the KRaft controller instead is the source of truth, a deterministic state machine over an ordered log, so recovery is replay rather than re-read, and brokers pull incremental metadata deltas. ZooKeeper was fully removed in 4.0; the lone ZkMigrationStateRecord no-op is the only vestige, kept so clusters that migrated under 3.x can roll forward.
The single-threaded event loop plus copy-on-write timeline structures is a conscious trade: give up parallelism inside the controller to get rid of locks entirely and make uncommitted state trivially reversible. Finalized feature levels and metadata.version gating come from KIP-584; atomic multi-record operations use metadata transactions (KIP-868); and the ELR / strict-min-ISR machinery in PartitionChangeBuilder is KIP-966, which prevents the leader-candidate pool from shrinking below min.insync.replicas under normal conditions.
Gotchas & operational notes
Heartbeat state is soft and rebuilt only from incoming heartbeats after a failover. The fresh active controller does not know past contact times, so it may take up to roughly a full session timeout before it begins fencing brokers that died around the failover. This is the price of keeping heartbeats out of the metadata log.
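The failover delay can be illustrated with a small liveness model. It assumes, for illustration only, that the new active controller starts every known unfenced broker's clock at activation time because it has no earlier history; `FailoverLivenessSketch` and its methods are hypothetical.

```java
import java.util.*;

final class FailoverLivenessSketch {
    private final long sessionTimeoutNs;
    private final Map<Integer, Long> lastContactNs = new HashMap<>();

    // Assumption: with no history, every unfenced broker is treated as alive at activation.
    FailoverLivenessSketch(long sessionTimeoutNs, Collection<Integer> unfencedBrokers, long activationNs) {
        this.sessionTimeoutNs = sessionTimeoutNs;
        for (int id : unfencedBrokers) {
            lastContactNs.put(id, activationNs);
        }
    }

    void heartbeat(int brokerId, long nowNs) {
        lastContactNs.put(brokerId, nowNs);
    }

    // Brokers whose last known contact is older than one session timeout.
    List<Integer> stale(long nowNs) {
        List<Integer> out = new ArrayList<>();
        lastContactNs.forEach((id, t) -> {
            if (nowNs - t > sessionTimeoutNs) {
                out.add(id);
            }
        });
        Collections.sort(out);
        return out;
    }
}
```

A broker that died just before the failover looks healthy to the new controller until one full session timeout after activation, which is the delay the note above describes.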
A feature upgrade (including a metadata.version bump) is refused if any registered broker does not advertise support for the target level; a controller that has not yet registered cannot advertise support either, so it also blocks the upgrade. After adding a new controller, ensure it has registered before raising feature levels.
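The rule above can be expressed as a small check. This models only what the note states; `SupportedRange`, `FeatureUpgradeCheckSketch`, and the error strings are hypothetical, and Kafka's real validation has more inputs.

```java
import java.util.*;

// Hypothetical supported-level range advertised by one node.
record SupportedRange(int min, int max) {
    boolean contains(int level) {
        return level >= min && level <= max;
    }
}

final class FeatureUpgradeCheckSketch {
    // Returns an error message, or empty if 'target' may be finalized.
    static Optional<String> check(int target,
                                  Map<Integer, SupportedRange> registeredBrokers,
                                  Set<Integer> quorumControllerIds,
                                  Map<Integer, SupportedRange> registeredControllers) {
        for (var e : registeredBrokers.entrySet()) {
            if (!e.getValue().contains(target)) {
                return Optional.of("broker " + e.getKey() + " does not support " + target);
            }
        }
        for (int id : quorumControllerIds) {
            SupportedRange r = registeredControllers.get(id);
            if (r == null) {
                return Optional.of("controller " + id + " has not registered"); // unknown support blocks
            }
            if (!r.contains(target)) {
                return Optional.of("controller " + id + " does not support " + target);
            }
        }
        return Optional.empty();
    }
}
```

Treating an unregistered controller as "no support" is the conservative choice: finalizing a level that a future leader cannot understand would leave that node unable to replay the log.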
Unclean leader election (the UNCLEAN path and the electUnclean task) can elect a leader outside the ISR and therefore lose data. It runs only for topics where it is explicitly enabled, and exists to bring otherwise-permanently-offline partitions back online.
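The difference between the clean and unclean paths can be shown with a simplified leader choice. `LeaderChoiceSketch` is hypothetical and ignores ELR (KIP-966) and other details of PartitionChangeBuilder; it only contrasts "live ISR member" with "any live replica".

```java
import java.util.*;

final class LeaderChoiceSketch {
    // 'replicas' is in preference order. Clean path: first live replica that is in the ISR.
    // Unclean path (only if the topic allows it): first live replica at all, which may lack
    // committed data and therefore risks data loss.
    static OptionalInt chooseLeader(List<Integer> replicas, Set<Integer> isr, Set<Integer> live,
                                    boolean uncleanAllowed) {
        for (int r : replicas) {
            if (isr.contains(r) && live.contains(r)) {
                return OptionalInt.of(r);
            }
        }
        if (uncleanAllowed) {
            for (int r : replicas) {
                if (live.contains(r)) {
                    return OptionalInt.of(r);
                }
            }
        }
        return OptionalInt.empty(); // the partition stays offline
    }
}
```

When the only ISR member is down, the clean path leaves the partition offline, while the unclean path brings it back on a replica that may be missing committed records.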
The controller refuses to start unless it is given bootstrap metadata that pins an initial metadata.version (produced by kafka-storage format), both a non-fatal and a fatal fault handler, and a configured quorum-features set QuorumController.java:392. An unhandled record type at replay is intentionally fatal rather than skipped.