10 · KRaft Consensus (Raft)
Source: Apache Kafka 4.4.0-SNAPSHOT (git 04bfe7d, 2026-06-15), KRaft mode. Derived from source code, not copied from official documentation.
KRaft is Kafka's hand-rolled implementation of the Raft consensus algorithm, used to replicate the cluster's metadata log across a quorum of controllers. Its centerpiece, KafkaRaftClient, is a single-threaded, non-blocking event loop that drives a precise state machine through the roles Unattached, Voted, Prospective, Candidate, Follower, Leader and Resigned. Unlike textbook Raft, replication is pull-based: followers and observers issue Fetch requests to the leader, which never pushes data. This chapter dissects the poll loop, leader election (with KIP-996 pre-vote), the persisted quorum-state file, the high-watermark commit rule, snapshots (KIP-630), and dynamic quorum reconfiguration (KIP-853), all grounded in the raft/ module source.
Role & responsibilities
KRaft replaces ZooKeeper (removed entirely since 4.0) as the system of record for cluster metadata. A small set of controllers form a voter quorum that runs Raft over a single topic partition, __cluster_metadata-0. The Raft library here is generic over a record type T (KafkaRaftClient<T> implements RaftClient<T>); the controller plugs in its own RecordSerde and a RaftClient.Listener to receive committed batches. See The KRaft Controller for the state machine that consumes this log and Metadata Propagation & Broker Lifecycle for how brokers (as observers) replicate it.
The class header itself frames the design (raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:129): "Leader election is more or less pure Raft, but replication is driven by replica fetching and we use Kafka's log reconciliation protocol to truncate the log to a common point following each leader election." KRaft reuses Kafka's existing log/segment storage, record-batch format, and Fetch RPC, layering Raft semantics on top.
Voters take part in elections and are the only nodes eligible to handle Vote/BeginQuorumEpoch/EndQuorumEpoch requests. Observers (brokers, and controllers being added) never vote; they only fetch. Whether a node is a voter is not a static flag but is derived continuously from the latest VoterSet in the replicated log (QuorumState.isVoter(), QuorumState.java:332).
Where it lives in the code
| Concern | Class | File |
|---|---|---|
| Event-loop Raft state machine | KafkaRaftClient<T> | raft/.../raft/KafkaRaftClient.java |
| Single-threaded driver (the IO thread) | KafkaRaftClientDriver<T> | raft/.../raft/KafkaRaftClientDriver.java |
| Role/epoch state & transition rules | QuorumState | raft/.../raft/QuorumState.java |
| Per-role state objects | EpochState impls: UnattachedState, ProspectiveState, CandidateState, FollowerState, LeaderState<T>, ResignedState | raft/.../raft/*State.java |
| Election vote tally | EpochElection (+ NomineeState) | raft/.../raft/internals/EpochElection.java |
| Voter membership | VoterSet, ReplicaKey, Endpoints | raft/.../raft/VoterSet.java |
| Persisted election state | FileQuorumStateStore, ElectionState | raft/.../raft/FileQuorumStateStore.java |
| Leader batch building | BatchAccumulator<T>, BatchBuilder<T> | raft/.../raft/internals/BatchAccumulator.java |
| Control-record tracking (voters, kraft.version) | KRaftControlRecordStateMachine, VoterSetHistory | raft/.../raft/internals/KRaftControlRecordStateMachine.java |
| Connection backoff & in-flight tracking | RequestManager | raft/.../raft/RequestManager.java |
| Snapshots (file naming, read/write) | Snapshots, RecordsSnapshotWriter, FileRawSnapshotReader | raft/.../snapshot/*.java |
| Log over segment storage | KafkaRaftLog (implements RaftLog) | raft/.../raft/internals/KafkaRaftLog.java |
| Reconfiguration handlers | AddVoterHandler, RemoveVoterHandler, UpdateVoterHandler | raft/.../raft/internals/*VoterHandler.java |
| Configuration | QuorumConfig | raft/.../raft/QuorumConfig.java |
Core concepts & terminology
- Voter vs Observer: a voter participates in elections and quorum commit; an observer only fetches. Membership comes from the latest VoterSet. Brokers are always observers; controllers are normally voters.
- Epoch: the Raft term, a monotonically increasing integer. Each election bumps the epoch by exactly one (QuorumState.transitionToCandidate(), QuorumState.java:641: int newEpoch = epoch() + 1). Records are uniquely identified by (offset, epoch).
- ReplicaKey: a pair of id (int) and an optional directoryUuid (the node's log directory). KIP-853 added directory ids so that a re-formatted node with the same broker id is treated as a distinct voter.
- High watermark (HWM): the largest offset replicated to a majority of voters; the commit point. Exposed via RaftClient.highWatermark().
- Leader endpoints: the advertised listener addresses of the current leader, carried in RPC responses so observers and unattached voters can discover where to fetch.
- kraft.version: a feature level, either KRAFT_VERSION_0 (static voters, no reconfiguration) or KRAFT_VERSION_1 (dynamic quorums). isReconfigSupported() returns true for v1+ (server-common/.../KRaftVersion.java:59).
- Bootstrap snapshot: the synthetic snapshot id (offset=0, epoch=0) (BOOTSTRAP_SNAPSHOT_ID, Snapshots.java:49) used to seed initial cluster state; it is never replicated over the wire.
Data structures
In-memory: the role objects
All six role classes implement EpochState; the current one is held in a single volatile EpochState state field inside QuorumState (QuorumState.java:99). Each role carries the data relevant to it:
| Role | Key fields | Timer |
|---|---|---|
| UnattachedState | epoch, OptionalInt leaderId, Optional<ReplicaKey> votedKey, voter id set, HWM | election timer |
| ProspectiveState | as Unattached + VoterSet voters, EpochElection epochElection (pre-vote tally) | election timer |
| CandidateState | localId, localDirectoryId, epoch, EpochElection epochElection | election timer |
| FollowerState | epoch, leaderId, Endpoints leaderEndpoints, votedKey, HWM, hasFetchedFromLeader, fetchingSnapshot | fetch timer, update-voter-set timer |
| LeaderState<T> | epoch, epochStartOffset, grantingVoters, per-replica ReplicaState map, BatchAccumulator, HWM | check-quorum timer, begin-quorum-epoch timer |
| ResignedState | localId, epoch, unackedVoters, preferredSuccessors | election timer |
Notably there is no separate VotedState class. A binding vote is represented as an UnattachedState (or ProspectiveState) carrying a non-empty votedKey (see QuorumState.unattachedAddVotedState(), QuorumState.java:428 and isUnattachedAndVoted(), QuorumState.java:850). The "Voted" role in the protocol is thus Unattached-with-a-vote.
The leader's per-replica view is LeaderState.ReplicaState (LeaderState.java:978): replicaKey, Endpoints listeners, Optional<LogOffsetMetadata> endOffset, lastFetchTimestamp, lastFetchLeaderLogEndOffset, lastCaughtUpTimestamp, and hasAcknowledgedLeader. ReplicaState is Comparable and sorts by descending end offset; this ordering is how the leader finds the majority offset for the HWM.
Persistent: the quorum-state file
Election state survives restarts in a small JSON file named quorum-state (FileQuorumStateStore.DEFAULT_FILE_NAME, FileQuorumStateStore.java:80) inside the metadata log directory. Its schema is QuorumStateData (raft/src/main/resources/common/message/QuorumStateData.json), with a data_version field appended for forward-compat. The version is tied to kraft.version via KRaftVersion.quorumStateVersion().
| Field | Type | Versions | Meaning |
|---|---|---|---|
| ClusterId | string | 0 only | Cluster id (dropped in v1) |
| LeaderId | int32 | 0+ | Elected leader id, default -1 |
| LeaderEpoch | int32 | 0+ | Current epoch, default -1 |
| VotedId | int32 | 0+ | Id this replica voted for, -1 if none |
| VotedDirectoryId | uuid | 1+ | Directory id of the candidate voted for (KIP-853) |
| AppliedOffset | int64 | 0 only | Legacy; unused in v1 |
| CurrentVoters | []Voter | 0 only | Legacy static voter list; v1 stores voters in the log instead |
The quorum-state file is written before the in-memory transition takes effect (QuorumState.durableTransitionTo(), QuorumState.java:729): it calls store.writeElectionState(...) and then memoryTransitionTo(...). The write is durable: FileQuorumStateStore writes to a .tmp file, calls fileOutputStream.getFD().sync(), then atomically renames it into place (FileQuorumStateStore.java:181-183). This guarantees that once a replica has voted (or become a candidate at epoch N), that fact cannot be lost across a crash, preserving "at most one vote per epoch".
Note the Resigned and Prospective states do not trigger a durable write: transitionToResigned and transitionToProspective use memoryTransitionTo directly (QuorumState.java:358, :622), because Resigned does not change the persisted ElectionState and Prospective is a pre-vote phase that has not yet bumped the epoch.
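The write-temp, fsync, rename pattern can be sketched with plain java.io and java.nio. This is a minimal illustration, not FileQuorumStateStore itself: the class name DurableStateFile is hypothetical, the JSON payload is arbitrary, and the sketch does not fsync the parent directory, which POSIX file systems may also require for the rename itself to survive a power loss.

```java
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

/** Hypothetical helper: replace a small state file so a crash leaves either the old or the new contents. */
final class DurableStateFile {
    private DurableStateFile() {}

    static void writeDurably(Path target, String json) throws IOException {
        Path tmp = target.resolveSibling(target.getFileName() + ".tmp");
        try (FileOutputStream out = new FileOutputStream(tmp.toFile())) {
            out.write(json.getBytes(StandardCharsets.UTF_8));
            out.flush();
            // Force the bytes to stable storage before the file becomes visible under its real name.
            out.getFD().sync();
        }
        // Atomic rename: a restart sees either the previous file or the complete new one, never a torn write.
        Files.move(tmp, target, StandardCopyOption.ATOMIC_MOVE, StandardCopyOption.REPLACE_EXISTING);
    }

    static String read(Path target) throws IOException {
        return Files.readString(target, StandardCharsets.UTF_8);
    }
}
```

The ordering is the point: because the vote is on disk before the in-memory role changes, a crash can only lose a vote that was never acted on.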
On-wire: voter membership records
Under kraft.version 1, the voter set lives in the log as control records, not in the quorum file. The VotersRecord (clients/.../message/VotersRecord.json) carries, per voter: VoterId, VoterDirectoryId (uuid), a list of Endpoints (name/host/port), and a KRaftVersionFeature range (min/max supported version). A companion KRaftVersionRecord records the finalized feature level. These are read back by KRaftControlRecordStateMachine.handleBatch() (KRaftControlRecordStateMachine.java:301).
Architecture & control/data flow
[Figure: poll loop running on kafka-raft-io-thread; the message queue is the sole inbound concurrency boundary. Step 2 emits outbound RPCs; step 5 delivers committed records to the listener.]
The driver is a ShutdownableThread whose name is threadNamePrefix + "-io-thread" (KafkaRaftClientDriver.java:55); the default prefix is kafka-raft (core/.../KafkaRaftManager.scala:104), giving the thread name kafka-raft-io-thread. Its doWork() simply calls client.poll() in a loop; any uncaught exception is routed to a fatal FaultHandler (KafkaRaftClientDriver.java:62-67), which typically halts the process.
The pull-based replication contrast
[Figure: textbook Raft, where the leader pushes AppendEntries to followers and uses empty AppendEntries as periodic heartbeats to assert leadership, versus KRaft, where followers pull with Fetch and BeginQuorumEpoch substitutes for the empty-AppendEntries heartbeat, announcing a new leader once per epoch.]
Reusing Kafka's Fetch path lets the metadata quorum share the broker's mature replication machinery (log segments, batch format, zero-copy sends, fetch purgatory). Because the leader never pushes, it needs a separate way to announce itself after an election, hence BeginQuorumEpoch (KafkaRaftClient.java:144). The overall protocol is KIP-595 ("A Raft Protocol for the Metadata Quorum"); pull-based fetch and log reconciliation are its defining deviations from Diego Ongaro's Raft.
The state machine in detail
[Figure: voter role state machine (QuorumState.java:35-83). A resigning leader sends EndQuorumEpoch to voters. Observers use only Unattached ⇄ Follower.]
Transitions off the diagram's main path (label = trigger):
- Unattached → Follower: a leader with an equal or higher epoch and a known endpoint is learned
- Prospective → Follower: PreVote lost, but a last-known leader exists
- Prospective → Unattached: PreVote lost and no leader is known
- Candidate → Prospective: election loss or timeout
- Follower → Prospective: fetch timeout
Initialization
QuorumState.initialize() (QuorumState.java:141) reads the persisted ElectionState and reconstructs a starting role:
- If the stored epoch is below the log's last epoch, the file is stale → start Unattached at the log's epoch.
- If we were the leader → start Resigned in the same epoch. This is deliberate (QuorumState.java:173-189): it prevents voting for someone else in that epoch and protects the (offset, epoch) uniqueness invariant if unflushed data was lost.
- If we had voted for ourselves as a candidate → start Candidate.
- If a leader is known and its endpoints are in the voter set → start Follower; otherwise start Unattached (we cannot fetch without an endpoint) and discover the leader via Fetch-to-bootstrap or an inbound BeginQuorumEpoch.
KafkaRaftClient.initialize() then short-circuits: a single-voter quorum becomes leader immediately by going Prospective→Candidate→Leader (KafkaRaftClient.java:567-571).
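The restart rules above can be condensed into one decision function. This is a hedged sketch, not QuorumState.initialize(): the enum, record, and boolean inputs are hypothetical stand-ins for the persisted ElectionState and the log, and the single-voter short-circuit in KafkaRaftClient is deliberately left out.

```java
/** Roles a voter may start in after a restart (illustrative subset). */
enum StartRole { UNATTACHED, RESIGNED, CANDIDATE, FOLLOWER }

/** Starting role plus the epoch it applies to. */
record StartState(StartRole role, int epoch) {}

final class StartupSketch {
    private StartupSketch() {}

    /** Hypothetical restatement of the initialization rules; inputs summarize on-disk state. */
    static StartState startingState(int storedEpoch, int logLastEpoch, boolean wasLeader,
                                    boolean votedForSelf, boolean knownLeaderHasEndpoint) {
        if (storedEpoch < logLastEpoch) {
            // Stale quorum-state file: trust the log and start fresh at its epoch.
            return new StartState(StartRole.UNATTACHED, logLastEpoch);
        }
        if (wasLeader) {
            // Never lead or vote again in an epoch we may have led with unflushed data.
            return new StartState(StartRole.RESIGNED, storedEpoch);
        }
        if (votedForSelf) {
            return new StartState(StartRole.CANDIDATE, storedEpoch);
        }
        if (knownLeaderHasEndpoint) {
            return new StartState(StartRole.FOLLOWER, storedEpoch);
        }
        // No usable endpoint: wait for BeginQuorumEpoch or discover the leader via a bootstrap Fetch.
        return new StartState(StartRole.UNATTACHED, storedEpoch);
    }
}
```

The staleness check comes first because every later rule trusts the stored epoch.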
Pre-Vote and the Prospective role (KIP-996)
When a voter's fetch timeout expires, it does not immediately bump the epoch and become a candidate. Instead it transitions to Prospective (pollFollowerAsVoter → transitionToProspective, KafkaRaftClient.java:3326-3328) and sends Vote requests with PreVote=true at the current epoch. The vote request schema added this field at version 2 (VoteRequest.json:51; comment at VoteRequest.json:22: "Version 2 adds PreVote field and renames candidate to replica").
A pre-vote is granted only if the prospective's log is up-to-date (ProspectiveState.canGrantVote → unattachedOrProspectiveCanGrantVote, QuorumState.java:886), and crucially a follower will grant a pre-vote even though it has a leader as long as it has not yet successfully fetched from that leader (FollowerState.canGrantVote, FollowerState.java:228-231: isPreVote && !hasFetchedFromLeader && isLogUpToDate). Only when a majority grant the pre-vote does the prospective call transitionToCandidate, bumping the epoch (maybeTransitionToCandidate, KafkaRaftClient.java:688). On pre-vote loss/timeout it falls back to Follower (if a leader is known with endpoints) or Unattached (prospectiveTransitionAfterElectionLoss, KafkaRaftClient.java:3277).
Pre-vote (KIP-996) prevents a partitioned or bouncing voter from repeatedly incrementing the epoch and disrupting a healthy leader. The prospective voter canvasses for support at the current epoch before committing to a disruptive epoch bump. KIP-853 reuses the same PreVote flag to fence a removed voter from forcing elections. If any voter responds UNSUPPORTED_VERSION to a pre-vote (i.e. it doesn't understand PreVote), the prospective falls back to a real candidacy immediately (handleVoteResponse, KafkaRaftClient.java:954-961).
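Both sides of the pre-vote exchange can be sketched as pure functions. followerGrantsVote restates the FollowerState.canGrantVote condition quoted above; afterPreVoteResponses is a hypothetical condensation of the prospective's outcomes, with boolean flags standing in for real client state. Neither is the actual KafkaRaftClient code.

```java
/** Illustrative next step for a Prospective voter after some PreVote responses. */
enum ProspectiveNext { STAY_PROSPECTIVE, BECOME_CANDIDATE, BECOME_FOLLOWER, BECOME_UNATTACHED }

final class PreVoteSketch {
    private PreVoteSketch() {}

    /** Follower side: mirrors isPreVote && !hasFetchedFromLeader && isLogUpToDate. */
    static boolean followerGrantsVote(boolean isPreVote, boolean hasFetchedFromLeader, boolean nomineeLogUpToDate) {
        // A follower that is successfully fetching from its leader sees no reason for an election.
        return isPreVote && !hasFetchedFromLeader && nomineeLogUpToDate;
    }

    /** Prospective side: hypothetical summary of the transitions described above. */
    static ProspectiveNext afterPreVoteResponses(boolean anyUnsupportedVersion, boolean majorityGranted,
                                                 boolean electionLostOrTimedOut, boolean leaderWithEndpointsKnown) {
        if (anyUnsupportedVersion || majorityGranted) {
            return ProspectiveNext.BECOME_CANDIDATE; // only now is the epoch bumped
        }
        if (electionLostOrTimedOut) {
            return leaderWithEndpointsKnown ? ProspectiveNext.BECOME_FOLLOWER : ProspectiveNext.BECOME_UNATTACHED;
        }
        return ProspectiveNext.STAY_PROSPECTIVE;
    }
}
```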
Vote tallying
EpochElection (EpochElection.java) tracks each voter as UNRECORDED/GRANTED/REJECTED. The local node records a grant for itself on construction (e.g. CandidateState.java:77). isVoteGranted() is numGranted >= majoritySize where majoritySize = voterStates.size()/2 + 1; isVoteRejected() short-circuits the election when numGranted + numUnrecorded < majoritySize, i.e. winning is already impossible (EpochElection.java:85-97). On each response, handleVoteResponse records the grant/reject and calls maybeTransitionForward (Prospective→Candidate or Candidate→Leader) or maybeHandleElectionLoss (KafkaRaftClient.java:1006-1014).
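A minimal tally in the spirit of EpochElection shows how the majority and the early-rejection rule interact. The class and method names here are illustrative, not Kafka's API; the arithmetic follows the formulas quoted above.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

/** Minimal vote tally modelled on the rules above; names are hypothetical. */
final class ElectionTally {
    enum VoteState { UNRECORDED, GRANTED, REJECTED }

    private final Map<Integer, VoteState> voterStates = new HashMap<>();

    ElectionTally(Set<Integer> voterIds, int localId) {
        if (!voterIds.contains(localId)) {
            throw new IllegalArgumentException("local replica " + localId + " is not a voter");
        }
        for (int id : voterIds) {
            voterStates.put(id, VoteState.UNRECORDED);
        }
        voterStates.put(localId, VoteState.GRANTED); // a nominee always votes for itself
    }

    /** Records a response; returns false if this voter had already answered. */
    boolean recordVote(int voterId, boolean granted) {
        VoteState previous = voterStates.get(voterId);
        if (previous == null) {
            throw new IllegalArgumentException("unknown voter " + voterId);
        }
        if (previous != VoteState.UNRECORDED) {
            return false;
        }
        voterStates.put(voterId, granted ? VoteState.GRANTED : VoteState.REJECTED);
        return true;
    }

    int majoritySize() {
        return voterStates.size() / 2 + 1;
    }

    private long count(VoteState state) {
        return voterStates.values().stream().filter(s -> s == state).count();
    }

    boolean isVoteGranted() {
        return count(VoteState.GRANTED) >= majoritySize();
    }

    /** True once winning is impossible even if every outstanding voter grants. */
    boolean isVoteRejected() {
        return count(VoteState.GRANTED) + count(VoteState.UNRECORDED) < majoritySize();
    }
}
```

With three voters, one rejection still leaves the election open; the second rejection makes it unwinnable, so the nominee can give up without waiting for a timeout.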
Vote granting rules (handleVoteRequest)
For an incoming Vote (handleVoteRequest, KafkaRaftClient.java:829):
- Validate cluster id, topic-partition, and epoch sanity. For a standard vote, the requested epoch must exceed the nominee's last log epoch (lastEpoch >= replicaEpoch is illegal); for a pre-vote, it must be at least that epoch (lastEpoch > replicaEpoch is illegal) (KafkaRaftClient.java:861).
- If replicaEpoch > quorum.epoch() and it is a standard vote, first transition to Unattached at the new epoch (KafkaRaftClient.java:890).
- Verify that the request's voter key matches the local replica key (id + directory id); otherwise reply INVALID_VOTER_KEY.
- Grant only if the candidate's log is at least as up-to-date: lastEpochEndOffsetAndEpoch.compareTo(endOffset()) >= 0 (KafkaRaftClient.java:919-923).
- For a granted standard vote, persist the vote by transitioning to Unattached/Prospective with the candidate's key (unattachedAddVotedState/prospectiveAddVotedState).
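The epoch-sanity and up-to-dateness checks reduce to a comparison on (epoch, offset) pairs, epoch first. The sketch below is standalone: LogEnd is a hypothetical record that mimics the ordering described above rather than Kafka's OffsetAndEpoch class itself.

```java
/** (offset, epoch) ordered epoch-first, then offset; a standalone stand-in, not Kafka's class. */
record LogEnd(long offset, int epoch) implements Comparable<LogEnd> {
    @Override
    public int compareTo(LogEnd other) {
        int byEpoch = Integer.compare(epoch, other.epoch);
        return byEpoch != 0 ? byEpoch : Long.compare(offset, other.offset);
    }
}

final class VoteRules {
    private VoteRules() {}

    /** Standard vote: nominee's last log epoch must be below the requested epoch; pre-vote: at most equal. */
    static boolean isEpochSane(int requestEpoch, int nomineeLastEpoch, boolean preVote) {
        return preVote ? nomineeLastEpoch <= requestEpoch : nomineeLastEpoch < requestEpoch;
    }

    /** Grant only if the nominee's log end is at least ours in (epoch, offset) order. */
    static boolean isLogUpToDate(LogEnd nominee, LogEnd local) {
        return nominee.compareTo(local) >= 0;
    }
}
```

Comparing the epoch first is what lets a shorter log win: a log ending in a later epoch has seen a more recent leader's data, regardless of length.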
The pull-based commit: how the leader advances the HWM
The leader holds a ReplicaState per voter. On each Fetch from a follower, updateReplicaState records the follower's fetch offset and timestamp (LeaderState.java:846), then maybeUpdateHighWatermark (LeaderState.java:727):
- Sort all voter ReplicaState entries by descending end offset.
- Take the element at index voterStates.size()/2: the offset that a majority (including the leader, which counts itself) has reached.
- Apply the leader-epoch commit restriction: only advance the HWM past epochStartOffset, i.e. the leader must have committed at least one record from its own epoch before exposing any earlier-epoch records (LeaderState.java:747).
- The HWM is monotonic; a computed value below the current HWM is logged and ignored (it can only legitimately drop on a voter-set change), guarding against committed-data loss (LeaderState.java:762-768).
A freshly-elected leader does not inherit the previous HWM (QuorumState.transitionToLeader comment, QuorumState.java:699-708). To advance the global HWM it must replicate a record from its new epoch to a majority. To force this without waiting for client writes, onBecomeLeader immediately appends a LeaderChange control record via appendStartOfEpochControlRecords (KafkaRaftClient.java:667, LeaderState.java:392).
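The leader's HWM rule can be sketched as a pure function over voter end offsets. This is a hypothetical restatement of the steps above, not LeaderState.maybeUpdateHighWatermark: the array includes the leader's own end offset, and the strict "greater than epochStartOffset" test is our reading of "advance past".

```java
import java.util.Arrays;
import java.util.OptionalLong;

final class HighWatermarkSketch {
    private HighWatermarkSketch() {}

    /**
     * @param voterEndOffsets  end offsets of all voters, including the leader's own
     * @param epochStartOffset offset of the first record written in the leader's epoch
     * @param currentHwm       HWM already exposed, if any
     */
    static OptionalLong maybeAdvance(long[] voterEndOffsets, long epochStartOffset, OptionalLong currentHwm) {
        long[] sorted = voterEndOffsets.clone();
        Arrays.sort(sorted); // ascending
        // Index n/2 in descending order is index n-1-n/2 in ascending order: a majority has reached it.
        long majorityOffset = sorted[sorted.length - 1 - sorted.length / 2];

        // Leader-epoch rule: expose nothing until a record from this epoch is on a majority.
        if (majorityOffset <= epochStartOffset) {
            return currentHwm;
        }
        // Monotonicity: a lower value is ignored rather than exposed.
        if (currentHwm.isPresent() && majorityOffset < currentHwm.getAsLong()) {
            return currentHwm;
        }
        return OptionalLong.of(majorityOffset);
    }
}
```

For example, with three voters at end offsets 10, 8 and 5 and an epoch starting at 7, two voters have reached 8, so the HWM becomes 8. This is also why the LeaderChange record matters: until it reaches a majority, the second check holds the HWM back.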
The leader batch pipeline
Client appends (from the controller) enter via RaftClient.prepareAppend → append (KafkaRaftClient.java:3697), which writes into the LeaderState's BatchAccumulator. The accumulator coalesces records into batches under an appendLock (ReentrantLock), bounded by MAX_BATCH_SIZE_BYTES = 8 MiB and MAX_NUMBER_OF_BATCHES = 10 (KafkaRaftClient.java:170-173). A linger timer (default controller.quorum.append.linger.ms = 25) is tracked with an AtomicLong so the IO thread can read the deadline without locking (BatchAccumulator.SimpleTimer, BatchAccumulator.java:629).
On the next poll, pollLeader → maybeAppendBatches drains ready batches, appends them to the log as leader, parks each in the appendPurgatory awaiting commit, then flushLeaderLog updates the end offset and fsyncs (KafkaRaftClient.java:3082-3107). When the HWM later crosses a batch's last offset, the purgatory completes and the listener is notified (onUpdateLeaderHighWatermark → appendPurgatory.maybeComplete, KafkaRaftClient.java:383). The leader optimizes by routing the in-memory record objects straight to its own listener rather than re-reading from disk (ListenerContext.fireHandleCommit(...) overload, KafkaRaftClient.java:4073).
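The locking split described above (writers take a lock, the IO thread reads a deadline lock-free) can be sketched as follows. LingerAccumulator is a hypothetical class, not BatchAccumulator's API; in particular, a record-count cap stands in for the real byte-size limit, and batches are plain lists rather than encoded record batches.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical linger-based accumulator; a record-count cap replaces the real byte limit. */
final class LingerAccumulator<T> {
    private final ReentrantLock appendLock = new ReentrantLock();
    private final AtomicLong drainDeadlineMs = new AtomicLong(Long.MAX_VALUE); // MAX_VALUE = nothing pending
    private final ConcurrentLinkedQueue<List<T>> completed = new ConcurrentLinkedQueue<>();
    private final long lingerMs;
    private final int maxRecordsPerBatch;
    private List<T> current = new ArrayList<>(); // guarded by appendLock

    LingerAccumulator(long lingerMs, int maxRecordsPerBatch) {
        this.lingerMs = lingerMs;
        this.maxRecordsPerBatch = maxRecordsPerBatch;
    }

    /** Called from application threads. */
    void append(List<T> records, long nowMs) {
        appendLock.lock();
        try {
            for (T record : records) {
                if (current.isEmpty()) {
                    // The first record of a batch starts the linger clock; keep any earlier deadline.
                    drainDeadlineMs.accumulateAndGet(nowMs + lingerMs, Math::min);
                }
                current.add(record);
                if (current.size() == maxRecordsPerBatch) {
                    completed.add(current); // a full batch is ready immediately
                    current = new ArrayList<>();
                }
            }
        } finally {
            appendLock.unlock();
        }
    }

    /** Called from the IO thread without taking the lock, e.g. to size its poll timeout. */
    long timeUntilDrainMs(long nowMs) {
        if (!completed.isEmpty()) {
            return 0;
        }
        long deadline = drainDeadlineMs.get();
        return deadline == Long.MAX_VALUE ? Long.MAX_VALUE : Math.max(0, deadline - nowMs);
    }

    /** Seals the open batch and hands every ready batch to the caller. */
    List<List<T>> drain() {
        appendLock.lock();
        try {
            if (!current.isEmpty()) {
                completed.add(current);
                current = new ArrayList<>();
            }
            drainDeadlineMs.set(Long.MAX_VALUE);
        } finally {
            appendLock.unlock();
        }
        List<List<T>> ready = new ArrayList<>();
        List<T> batch;
        while ((batch = completed.poll()) != null) {
            ready.add(batch);
        }
        return ready;
    }
}
```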
The follower fetch loop & log reconciliation
A follower builds a Fetch request carrying its log end offset, last fetched epoch, the current leader epoch, its directory id, and its current HWM (buildFetchRequest, KafkaRaftClient.java:2985). The leader validates the (fetchOffset, lastFetchedEpoch) against its log (validateOffsetAndEpoch). Three outcomes drive reconciliation in handleFetchResponse (KafkaRaftClient.java:1689):
- Diverging epoch: the leader returns a divergingEpoch(epoch, endOffset). The follower truncates to that point (log.truncateToEndOffset), but never below its own HWM (a violation throws KafkaException), then re-fetches (KafkaRaftClient.java:1742-1764).
- Snapshot id: the follower's offset is below the leader's log start; the leader returns a snapshotId and the follower switches to FetchSnapshot (KafkaRaftClient.java:1765-1807).
- Records: appended via appendAsFollower, which writes to the log, fsyncs if the node is (or can become) a voter, updates the control-record state machine, and advances the follower HWM to min(LEO, leaderHWM) (KafkaRaftClient.java:1808-1814; updateFollowerHighWatermark, KafkaRaftClient.java:341).
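The two arithmetic rules on the follower side, truncation that never cuts committed data and an HWM capped by the local log, can be sketched as below. This is a hypothetical helper: the real client throws KafkaException, which we replace with IllegalStateException to stay dependency-free, and clamping the truncation target to the current LEO is our assumption.

```java
final class FollowerReconcileSketch {
    private FollowerReconcileSketch() {}

    /** Offset to truncate to after a divergingEpoch response; refuses to discard committed records. */
    static long truncationOffset(long divergingEndOffset, long logEndOffset, long highWatermark) {
        long target = Math.min(divergingEndOffset, logEndOffset);
        if (target < highWatermark) {
            // Committed data must never be removed; the real client fails loudly here too.
            throw new IllegalStateException("asked to truncate to " + target + " below HWM " + highWatermark);
        }
        return target;
    }

    /** Follower HWM after an append: never ahead of its own log, never moving backwards. */
    static long nextHighWatermark(long currentHwm, long logEndOffset, long leaderHwm) {
        long candidate = Math.min(logEndOffset, leaderHwm);
        if (candidate < currentHwm) {
            throw new IllegalStateException("non-monotonic HWM update " + currentHwm + " -> " + candidate);
        }
        return candidate;
    }
}
```

The min(LEO, leaderHWM) cap matters because the leader may report a commit point the follower has not fetched yet; a follower can only expose what it actually holds.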
There is at most one in-flight Fetch per follower for its LEO. RequestManager tracks a single in-flight correlation id per connection, and maybeSendFetchToBestNode only sends when !hasAnyInflightRequest (KafkaRaftClient.java:3410). Two concurrent Fetches could cause the follower to append the same offset twice; the comment at RequestManager.java:66-68 spells this out.
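The single-in-flight guard amounts to refusing to send while any correlation id is outstanding, and clearing the slot only for the matching response. SingleInflightGuard is a hypothetical class illustrating that idea; it is not RequestManager's real API and omits backoff and timeouts.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalInt;

/** Illustrative single-in-flight guard; not RequestManager's real API. */
final class SingleInflightGuard {
    private final Map<Integer, Integer> inflightByNode = new HashMap<>(); // nodeId -> correlation id
    private int nextCorrelationId = 0;

    boolean hasAnyInflightRequest() {
        return !inflightByNode.isEmpty();
    }

    /** Returns the correlation id of a newly sent Fetch, or empty if one is already outstanding. */
    OptionalInt maybeSendFetch(int nodeId) {
        if (hasAnyInflightRequest()) {
            return OptionalInt.empty();
        }
        int correlationId = nextCorrelationId++;
        inflightByNode.put(nodeId, correlationId);
        return OptionalInt.of(correlationId);
    }

    /** Clears the slot only for the matching response; a stale correlation id is ignored. */
    boolean onResponse(int nodeId, int correlationId) {
        return inflightByNode.remove(nodeId, correlationId);
    }
}
```

Matching on the correlation id, not just the node, keeps a late response from an abandoned request from freeing the slot while a newer Fetch is still outstanding.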
BeginQuorumEpoch & EndQuorumEpoch
After winning, the leader periodically sends BeginQuorumEpoch to voters that haven't fetched recently (maybeSendBeginQuorumEpochRequests, interval fetchTimeoutMs/2, LeaderState.java:156). It retries until each voter acknowledges (addAcknowledgementFrom) or a new election occurs. This is how a voter that was Unattached (because it didn't know the leader's endpoint) discovers and follows the new leader (maybeTransition, KafkaRaftClient.java:2691).
On graceful shutdown or forced resignation, the leader transitions to Resigned and sends EndQuorumEpoch to all voters (pollResigned, KafkaRaftClient.java:3141). The request includes a preferred successors list ordered by descending fetch offset (nonLeaderVotersByDescendingFetchOffset). A receiving follower shortens its fetch timeout using a strict-exponential backoff keyed on its position in that list, so the most caught-up voter is most likely to win the next election quickly (endEpochElectionBackoff / strictExponentialElectionBackoffMs, KafkaRaftClient.java:1320, :1058).
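A strict-exponential backoff keyed on successor position can be sketched as below. This is a hedged illustration: we assume the base is controller.quorum.retry.backoff.ms and the cap is controller.quorum.election.backoff.max.ms, that the first successor waits zero, and that a voter absent from the list waits the full cap. The exact formula in KafkaRaftClient may differ.

```java
final class ElectionBackoffSketch {
    private ElectionBackoffSketch() {}

    /**
     * Hypothetical backoff by position in the preferred-successors list:
     * position 0 waits 0 ms; position p waits baseMs * 2^(p-1), capped at maxMs;
     * a negative position (not in the list) waits maxMs.
     */
    static long backoffMs(int position, long baseMs, long maxMs) {
        if (position < 0) {
            return maxMs;
        }
        if (position == 0) {
            return 0;
        }
        int shift = Math.min(position - 1, 32); // bound the shift so long lists cannot overflow
        return Math.min(maxMs, baseMs << shift);
    }
}
```

With a 20 ms base and a 1000 ms cap, successors wait 0, 20, 40, 80 ms and so on, so the best-placed voter normally times out first and wins without a split vote.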
Snapshots (KIP-630)
Because the metadata log would grow unbounded, KRaft periodically snapshots state and truncates the log prefix. Snapshot files are named <20-digit-offset>-<10-digit-epoch>.checkpoint (Snapshots.filenameFromSnapshotId, Snapshots.java:63), e.g. 00000000000000001234-0000000007.checkpoint, living alongside the log segments. Partial downloads use .checkpoint.part; pending deletions use .checkpoint.deleted (Snapshots.java:40-41).
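The naming scheme can be reproduced with String.format zero-padding. SnapshotName is a hypothetical stand-in for Snapshots.filenameFromSnapshotId and its parser, not Kafka's code; it only recognizes finished files, so .checkpoint.part and .checkpoint.deleted names are skipped.

```java
import java.util.Optional;

/** Snapshot id as (end offset, epoch); a standalone stand-in for Kafka's types. */
record SnapshotName(long endOffset, int epoch) {
    static final String SUFFIX = ".checkpoint";

    /** "<20-digit offset>-<10-digit epoch>.checkpoint"; zero-padding makes names sort in offset order. */
    String filename() {
        return String.format("%020d-%010d%s", endOffset, epoch, SUFFIX);
    }

    /** Parses a finished snapshot file name; other files (e.g. .checkpoint.part) yield empty. */
    static Optional<SnapshotName> parse(String filename) {
        if (!filename.endsWith(SUFFIX)) {
            return Optional.empty();
        }
        String stem = filename.substring(0, filename.length() - SUFFIX.length());
        int dash = stem.indexOf('-');
        if (dash < 0) {
            return Optional.empty();
        }
        return Optional.of(new SnapshotName(Long.parseLong(stem.substring(0, dash)),
                                            Integer.parseInt(stem.substring(dash + 1))));
    }
}
```

For instance, new SnapshotName(1234, 7).filename() produces 00000000000000001234-0000000007.checkpoint, the example above.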
[Figure: snapshot file layout, <offset>-<epoch>.checkpoint, in batch order. Identified by snapshotId = (endOffset, epoch); covers offsets [0, endOffset). The KRaftVersion and Voters batches are written only when kraft.version 1 (reconfiguration) is supported. Built by RecordsSnapshotWriter; voters/version are sourced from KRaftControlRecordStateMachine at the snapshot's last contained offset.]
Generating: a state-machine client calls RaftClient.createSnapshot(snapshotId, lastContainedLogTimestamp) (KafkaRaftClient.java:3798). The writer is wrapped in a NotifyingRawSnapshotWriter whose freeze callback trims the in-memory control-record history (partitionState.truncateOldEntries). The voter set and kraft.version embedded come from voterSetAtOffset/kraftVersionAtOffset at snapshotId.offset() - 1 (KafkaRaftClient.java:3821-3822).
Fetching: when a lagging follower is told to fetch a snapshot, it issues FetchSnapshot requests carrying the SnapshotId and a byte Position (FetchSnapshotRequest.json:41-50); the leader replies with a slice of the file as UnalignedRecords (records are not offset-aligned). The follower appends slices until sizeInBytes == size, freezes the snapshot, then fully truncates its log to the snapshot via log.truncateToLatestSnapshot() and reloads control-record state (handleFetchSnapshotResponse, KafkaRaftClient.java:2181-2210).
Loading on startup: KRaftControlRecordStateMachine.updateState() reads the latest snapshot first, then replays log batches up to the LEO (KRaftControlRecordStateMachine.java:116, :259), maintaining the invariant that this listener has always read to the LEO (described at KafkaRaftClient.java:202-213).
Retention & cleaning: a timer-driven RaftMetadataLogCleanerManager calls log.maybeClean() every 60 s (KafkaRaftClient.java:314). KafkaRaftLog.maybeClean deletes old snapshots/segments by retention size then retention time, but always keeps at least one snapshot and respects segment invariants (KafkaRaftLog.java:538-583). Snapshot generation cadence is governed by metadata.log.max.snapshot.interval.ms (default 1 hour) and metadata.log.max.record.bytes.between.snapshots (default 20 MiB), defined in MetadataLogConfig.java:38-41 (the controller decides when to call createSnapshot; see Log Management, Retention & Compaction).
Dynamic quorum reconfiguration (KIP-853)
Under kraft.version 1, the voter set can change at runtime without restarting controllers. Three RPCs are handled by dedicated handlers: AddRaftVoter, RemoveRaftVoter, and UpdateRaftVoter. Each appends a new VotersRecord to the log; the change takes effect as soon as the KRaftControlRecordStateMachine reads the record, even before it is committed, and is finalized when it commits.
The add-voter algorithm (AddVoterHandler.java:45-64) is exacting:
- Reject if a voter change is already pending, or if the leader has no HWM yet (epoch not committed) → REQUEST_TIMED_OUT.
- Reject if the cluster doesn't support kraft.version 1 → UNSUPPORTED_VERSION; if the latest VotersRecord isn't committed yet → REQUEST_TIMED_OUT; if the voter id already exists → DUPLICATE_VOTER.
- Send ApiVersions to the new voter to learn its supported kraft.version range; abort if it doesn't support the finalized version, or if it isn't caught up to the leader's LEO (isReplicaCaughtUp, fetched within the last hour, LeaderState.java:487).
- Append the updated VotersRecord, then wait for it to commit using the new majority before acking (highWatermarkUpdated, AddVoterHandler.java:334).
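The leader-side preconditions can be sketched as an ordered series of checks. The boolean and set inputs are hypothetical flags summarizing leader state, and the enum only names the error codes listed above; this is not AddVoterHandler's actual structure, and the second phase returns a plain boolean because the text does not name its error code.

```java
import java.util.Set;

/** Error codes named in the add-voter steps; the enum itself is illustrative. */
enum AddVoterError { NONE, REQUEST_TIMED_OUT, UNSUPPORTED_VERSION, DUPLICATE_VOTER }

final class AddVoterChecks {
    private AddVoterChecks() {}

    /** First phase, checked in the order listed above (inputs are hypothetical flags). */
    static AddVoterError validate(boolean voterChangePending, boolean leaderHasHighWatermark,
                                  boolean reconfigSupported, boolean latestVotersRecordCommitted,
                                  Set<Integer> currentVoterIds, int newVoterId) {
        if (voterChangePending || !leaderHasHighWatermark) {
            return AddVoterError.REQUEST_TIMED_OUT; // retry later: one change at a time, epoch committed first
        }
        if (!reconfigSupported) {
            return AddVoterError.UNSUPPORTED_VERSION;
        }
        if (!latestVotersRecordCommitted) {
            return AddVoterError.REQUEST_TIMED_OUT;
        }
        if (currentVoterIds.contains(newVoterId)) {
            return AddVoterError.DUPLICATE_VOTER;
        }
        return AddVoterError.NONE;
    }

    /** Second phase, after ApiVersions: the new voter must support the finalized version and be caught up. */
    static boolean readyToAppend(int finalizedVersion, int minSupported, int maxSupported, boolean caughtUp) {
        return finalizedVersion >= minSupported && finalizedVersion <= maxSupported && caughtUp;
    }
}
```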
KRaft adds or removes one voter at a time. A newly-added voter must be caught up before it joins, and the change must commit under the new majority. Removing a voter when the quorum is already at minimum availability can lose the quorum. The leader requires its current epoch to be committed (HWM established) before processing any membership change; this is the mechanism that prevents split-brain reconfiguration.
Observers configured with controller.quorum.auto.join.enable=true automatically send AddRaftVoter (or RemoveRaftVoter to evict a stale entry sharing their id) once reconfiguration is supported (pollFollowerAsObserver, KafkaRaftClient.java:3365). Voters also periodically send UpdateRaftVoter when their endpoints or supported-version range drift from what's recorded (shouldSendUpdateVoteRequest, KafkaRaftClient.java:3300). Upgrading kraft.version 0→1 is coordinated through LeaderState.maybeAppendUpgradedKRaftVersion using optimistic locking on an AtomicReference<KRaftVersionUpgrade> (LeaderState.java:107, :528); the upgrade requires every voter to support v1 and to have a known directory id.
Concurrency & threading
KRaft is overwhelmingly single-threaded by design. Almost all mutable state (QuorumState, every role object, RequestManager, LeaderState's replica map, and the listener contexts) is touched only on the kafka-raft-io-thread. The crossing points are:
| Shared state | Guard | Other thread(s) |
|---|---|---|
| Inbound messages & responses | BlockingMessageQueue (LinkedBlockingQueue) + wakeup() | network threads enqueue; IO thread dequeues |
| Listener registration | ConcurrentLinkedQueue<Registration> drained in pollListeners | any thread calling register/unregister |
| Leader append buffer | BatchAccumulator.appendLock (ReentrantLock); linger via AtomicLong; completed is a ConcurrentLinkedQueue | controller threads calling prepareAppend |
| Resignation request | volatile boolean resignRequested (set via requestResign()) | external thread calling RaftClient.resign |
| Graceful shutdown flag | AtomicReference<GracefulShutdown> | thread calling shutdown() |
| kraft.version upgrade | AtomicReference<KRaftVersionUpgrade> with CAS | client calling upgradeKRaftVersion |
| Control-record history | synchronized(voterSetHistory) / synchronized(kraftVersionHistory) + volatile nextOffset | snapshot-freezing threads reading voterSetAtOffset |
| Listener offsets | synchronized(ListenerContext) for nextOffset/lastSent | the listener thread via BatchReader.onClose |
Fetch and append purgatories (ThresholdPurgatory<Long> backed by a TimingWheelExpirationService) complete futures from an expiration thread, but the success-path completion of a parked Fetch is always re-evaluated on the IO thread (handleFetchRequest's comment, KafkaRaftClient.java:1586-1589). The IO thread sleeps in messageQueue.poll(timeoutMs) where the timeout is the minimum of every role timer and the snapshot cleaner deadline; any state change that should shorten the sleep calls wakeup() (KafkaRaftClient.java:3645).
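The sleep-until-earliest-timer pattern with a wakeup can be sketched with a LinkedBlockingQueue. PollLoopSketch is a toy, not KafkaRaftClient's structure: the WAKEUP sentinel object is our assumption for how a wakeup can interrupt the blocking poll, and message handling is reduced to a counter.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Toy single-threaded event loop; names and the sentinel-based wakeup are illustrative. */
final class PollLoopSketch {
    private static final Object WAKEUP = new Object(); // sentinel that only ends the sleep

    private final LinkedBlockingQueue<Object> messageQueue = new LinkedBlockingQueue<>();
    private int handled = 0; // touched only by the polling thread

    /** Any thread may enqueue an inbound request or response. */
    void add(Object message) {
        messageQueue.add(message);
    }

    /** Any thread may cut the current sleep short, e.g. after new appends become ready. */
    void wakeup() {
        messageQueue.add(WAKEUP);
    }

    /** One iteration: sleep until the earliest timer deadline or a message, then drain the queue. */
    void poll(long... timerDeadlinesMs) throws InterruptedException {
        long now = System.currentTimeMillis();
        long timeoutMs = Long.MAX_VALUE;
        for (long deadline : timerDeadlinesMs) {
            timeoutMs = Math.min(timeoutMs, Math.max(0, deadline - now));
        }
        Object message = messageQueue.poll(timeoutMs, TimeUnit.MILLISECONDS);
        while (message != null) {
            if (message != WAKEUP) {
                handled++; // a real client would dispatch on the message type here
            }
            message = messageQueue.poll();
        }
        // A real loop would now expire timers, send outbound RPCs and notify listeners.
    }

    int handledCount() {
        return handled;
    }
}
```

Because all state changes happen inside poll, the only synchronization the loop needs is the queue itself.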
Configuration reference
All knobs are defined in QuorumConfig.java under the controller.quorum. prefix (snapshot knobs are in MetadataLogConfig).
| Key | Default | Effect |
|---|---|---|
| controller.quorum.voters | empty | Static {id}@{host}:{port} voter map (legacy / kraft.version 0). Should NOT be set with dynamic quorums. |
| controller.quorum.bootstrap.servers | empty | {host}:{port} endpoints for discovering the metadata quorum (dynamic quorums). Bootstrap nodes get decreasing negative ids (KafkaRaftClient.java:318). |
| controller.quorum.election.timeout.ms | 1000 | Max time without fetching from the leader before starting an election. Actual timeout is randomized: t + random.nextInt(t) (QuorumState.java:745). |
| controller.quorum.fetch.timeout.ms | 2000 | Follower: max time without a successful fetch before becoming Prospective. Leader: max time without a fetch from a majority before resigning (check-quorum = 1.5×, LeaderState.java:65). |
| controller.quorum.election.backoff.max.ms | 1000 | Cap for the strict-exponential election backoff that prevents gridlocked elections. |
| controller.quorum.append.linger.ms | 25 | How long the leader lets writes accumulate in the BatchAccumulator before flushing to disk. |
| controller.quorum.request.timeout.ms | 2000 | RPC timeout used by RequestManager; a timed-out connection is reset. |
| controller.quorum.retry.backoff.ms | 20 | Backoff after a failed RPC before the connection is retried. |
| controller.quorum.auto.join.enable | false | If true, a controller observer auto-issues AddRaftVoter to join its cluster's quorum (KIP-853). |
| controller.quorum.fetch.max.bytes | 1048576 | Max bytes per Fetch (always returns ≥1 batch). |
| controller.quorum.fetch.snapshot.max.bytes | 1048576 | Max bytes per FetchSnapshot. |
| metadata.log.max.snapshot.interval.ms | 3600000 | Max wall-clock time between snapshot generations (MetadataLogConfig.java:38). |
| metadata.log.max.record.bytes.between.snapshots | 20971520 | Max log bytes accumulated before a snapshot is generated. |
The defaults are deliberately aggressive (election ~1 s, fetch ~2 s). The QuorumConfig header explains the philosophy (QuorumConfig.java:42-50): leader changes should be quick because the KIP-631 controller standby is "hot": it has the metadata in memory and can take over without a cold reload.
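The two timer formulas from the table, the randomized election timeout and the 1.5× check-quorum window, can be sketched as below. The election timeout follows the quoted t + random.nextInt(t); rounding the 1.5× product down and tracking a single lastMajorityFetchMs timestamp are assumptions of this hypothetical sketch, not how LeaderState actually records fetches.

```java
import java.util.Random;

final class QuorumTimersSketch {
    private QuorumTimersSketch() {}

    /** Randomized election timeout t + random.nextInt(t): uniform in [t, 2t). */
    static long electionTimeoutMs(int electionTimeoutMs, Random random) {
        return electionTimeoutMs + random.nextInt(electionTimeoutMs);
    }

    /** Check-quorum window: 1.5x the fetch timeout (rounding down is an assumption). */
    static long checkQuorumTimeoutMs(int fetchTimeoutMs) {
        return (long) (fetchTimeoutMs * 1.5);
    }

    /** True if the leader should resign; lastMajorityFetchMs is a hypothetical summary of voter fetches. */
    static boolean checkQuorumExpired(long nowMs, long lastMajorityFetchMs, int fetchTimeoutMs, int numVoters) {
        if (numVoters == 1) {
            return false; // a single-voter quorum never expires this timer
        }
        return nowMs - lastMajorityFetchMs >= checkQuorumTimeoutMs(fetchTimeoutMs);
    }
}
```

With the defaults, each voter picks an election timeout between 1000 and 1999 ms, which staggers candidacies, while an isolated leader steps down after 3000 ms without majority fetches.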
Failure modes, edge cases & recovery
- Leader isolation (check-quorum): if a leader stops receiving fetches from a majority of voters within 1.5 × fetch.timeout.ms, timeUntilCheckQuorumExpires hits zero and the leader resigns (pollLeader, KafkaRaftClient.java:3176-3179). A single-voter quorum never expires this timer (LeaderState.java:225).
- Stale leader endpoint: if a restarted voter knows a leader id but not its endpoint (e.g. the leader is not in the local static voter set), it starts Unattached and learns the endpoint from a BeginQuorumEpoch or a bootstrap Fetch rather than getting stuck (QuorumState.java:204-231).
- Log divergence after election: handled inline on the Fetch path via diverging-epoch truncation; a follower will never truncate below its HWM (it throws if asked to, KafkaRaftClient.java:1749-1753).
- Snapshot deleted mid-download: if the leader GC'd the snapshot, FetchSnapshot returns SNAPSHOT_NOT_FOUND; the follower resets its fetching-snapshot state and retries from a plain Fetch (KafkaRaftClient.java:2122-2138).
- Crash before flush: starting a former leader as Resigned in the same epoch protects against re-using an (offset, epoch) if unflushed records were lost (QuorumState.java:173-189).
- Graceful-shutdown timeout: GracefulShutdown uses a timer; if a resigning leader can't hand off within the window, the future completes exceptionally with TimeoutException (KafkaRaftClient.java:3931). A follower/observer or sole voter can shut down immediately (maybeCompleteShutdown, KafkaRaftClient.java:3604-3610).
- Disruptive rejoining voter: pre-vote (above) prevents it from bumping the epoch; a partitioned voter spins in Prospective without disturbing the leader.
- Cluster-id mismatch: every RPC validates the cluster id and returns INCONSISTENT_CLUSTER_ID on mismatch (e.g. handleVoteRequest, KafkaRaftClient.java:834).
Invariants & guarantees
- At most one leader per epoch. Epochs increase by exactly one per election; the durable quorum-state write before each transition means a vote is never forgotten across a crash. Voted/leader state is retained when transitioning within the same epoch (QuorumState.transitionToFollower preserves votedKey, QuorumState.java:583).
- Election safety via log up-to-dateness. A vote is granted only if the candidate's (lastEpoch, lastOffset), compared epoch first and then offset, is ≥ the voter's, so any new leader holds all committed records (KafkaRaftClient.java:919).
- Leader completeness via the epoch-start commit rule. The HWM cannot advance past prior-epoch records until the leader commits a record from its own epoch (LeaderState.java:747).
- Monotonic HWM. Both leader and follower reject non-monotonic HWM updates (FollowerState.updateHighWatermark throws, FollowerState.java:190).
- Records uniquely identified by (offset, epoch). This underpins the correctness of truncation and reconciliation.
- One in-flight Fetch per follower for its LEO (prevents double-append).
- The control-record listener always reads to the LEO, including uncommitted records, so the latest voter set and kraft.version are always current (KafkaRaftClient.java:202).
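The election-safety and epoch-start rules reduce to two small comparisons. This is a minimal sketch under stated assumptions: LogPosition and RaftSafetyRules are hypothetical names, offsets are exclusive log end offsets, and the leader's first record of its own epoch is assumed to sit at epochStartOffset, so the HWM must move strictly past it before anything commits.

```java
import java.util.Arrays;
import java.util.OptionalLong;

// Hypothetical sketch of two Raft safety rules; names are illustrative, not Kafka's API.
record LogPosition(int epoch, long offset) implements Comparable<LogPosition> {
    @Override
    public int compareTo(LogPosition other) {
        // Epoch first, then offset: a longer log from an older epoch is less up to date.
        int byEpoch = Integer.compare(epoch, other.epoch);
        return byEpoch != 0 ? byEpoch : Long.compare(offset, other.offset);
    }
}

final class RaftSafetyRules {
    private RaftSafetyRules() {}

    /** Election safety: grant only if the candidate's log is at least as up to date as ours. */
    static boolean candidateLogIsUpToDate(LogPosition candidate, LogPosition voter) {
        return candidate.compareTo(voter) >= 0;
    }

    /**
     * Leader-side HWM update. voterEndOffsets holds every voter's exclusive log end offset,
     * the leader's own included. Returns the new HWM, or empty if it must not move.
     */
    static OptionalLong nextHighWatermark(long[] voterEndOffsets, long epochStartOffset, long currentHwm) {
        long[] sorted = voterEndOffsets.clone();
        Arrays.sort(sorted);  // ascending
        // At least floor(n/2) + 1 voters (a strict majority) have reached this offset.
        long majorityOffset = sorted[(sorted.length - 1) / 2];
        // Epoch-start rule: nothing commits until the leader's own first record
        // (at epochStartOffset) is on a majority, i.e. the HWM moves past it.
        if (majorityOffset <= epochStartOffset) {
            return OptionalLong.empty();
        }
        // Monotonicity: never move the HWM backwards or re-announce the same value.
        if (majorityOffset <= currentHwm) {
            return OptionalLong.empty();
        }
        return OptionalLong.of(majorityOffset);
    }
}
```

For example, with voter end offsets {120, 110, 90} and an epoch start of 100, two of three voters have reached 110, so the HWM can advance to 110; with {120, 100, 90} it stays put because the majority has not yet passed the leader's first own-epoch record.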
Interactions with other subsystems
- The KRaft Controller registers a RaftClient.Listener; it receives handleCommit for committed metadata batches, handleSnapshot/handleLoadBootstrap, and handleLeaderChange (fired only once the listener has caught up to the new epoch's start offset, KafkaRaftClient.java:4121-4137). The controller, in turn, decides when to call createSnapshot and upgradeKRaftVersion.
- Metadata Propagation: brokers run a KafkaRaftClient as an observer, fetching the same log to build their MetadataImage.
- The Log Storage Engine and Log Management: KafkaRaftLog is a thin RaftLog over the same segment storage, batch format, and indexes used by ordinary partitions.
- The Fetch Path and Replication, ISR & HWM: KRaft reuses the Fetch RPC and an HWM/commit concept analogous to ISR replication, but the quorum here is a strict majority of voters, not an ISR.
- Wire Protocol & RPC Framework: the Vote/BeginQuorumEpoch/EndQuorumEpoch/FetchSnapshot/Add|Remove|UpdateRaftVoter RPCs are all defined as message JSON and listed on the controller listener.
- Network Layer & Threading: inbound requests arrive via KafkaRaftClientDriver.handleRequest and are placed on the message queue; outbound requests go through a KafkaNetworkChannel.
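The deferred handleLeaderChange delivery described in the first bullet can be sketched as a gate that holds a leader change until the listener's next offset reaches the new epoch's start offset. LeaderChangeGate and its methods are hypothetical, the callback is a plain Consumer standing in for the listener's handleLeaderChange, and the sketch assumes at most one leader per epoch, so a second notification for an already-delivered epoch is dropped.

```java
import java.util.function.Consumer;

// Hypothetical sketch: hold a leader-change notification until the listener has
// consumed every record that precedes the new epoch's start offset.
final class LeaderChangeGate {
    record LeaderAndEpoch(int leaderId, int epoch) {}

    private final Consumer<LeaderAndEpoch> onLeaderChange; // stands in for handleLeaderChange
    private long nextOffset = 0;          // next offset the listener expects to read
    private LeaderAndEpoch pending;       // change learned but not yet delivered
    private long pendingEpochStartOffset;
    private int lastFiredEpoch = -1;

    LeaderChangeGate(Consumer<LeaderAndEpoch> onLeaderChange) {
        this.onLeaderChange = onLeaderChange;
    }

    /** This node learned that `leader` leads an epoch whose first record is at epochStartOffset. */
    void onNewLeader(LeaderAndEpoch leader, long epochStartOffset) {
        if (leader.epoch() <= lastFiredEpoch) {
            return;                       // one leader per epoch: already delivered (or stale)
        }
        pending = leader;
        pendingEpochStartOffset = epochStartOffset;
        maybeFire();
    }

    /** A committed batch ending at lastOffset (inclusive) was handed to the listener. */
    void onCommitDelivered(long lastOffset) {
        nextOffset = lastOffset + 1;
        maybeFire();
    }

    private void maybeFire() {
        if (pending != null && nextOffset >= pendingEpochStartOffset) {
            LeaderAndEpoch leader = pending;
            pending = null;
            lastFiredEpoch = leader.epoch();
            onLeaderChange.accept(leader);
        }
    }
}
```

Deferring the callback this way means a consumer such as the controller never acts on a new leadership while it still holds an incomplete view of the records written in earlier epochs.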
RPC summary
| API | ApiKey | Direction | Purpose |
|---|---|---|---|
| Vote | 52 | nominee → voters | Request a vote or pre-vote; carries the nominee's last (offset, epoch) and the PreVote flag (v2). |
| BeginQuorumEpoch | 53 | leader → voters | Announce leadership of a new epoch; carries leader endpoints (v1). |
| EndQuorumEpoch | 54 | leader → voters | Graceful resignation plus preferred-successor ordering. |
| Fetch | 1 | follower/observer → leader | Pull records; piggybacks leader/epoch, HWM, divergence and snapshot id. |
| FetchSnapshot | 59 | follower → leader | Download a snapshot by (snapshotId, byte position). |
| DescribeQuorum | - | admin → leader | Report voter/observer replication state. |
| AddRaftVoter / RemoveRaftVoter / UpdateRaftVoter | - | client/observer → leader | Dynamic reconfiguration (KIP-853). |
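The divergence information piggybacked on a Fetch response can be illustrated with a leader-side check against its epoch cache, in the spirit of Kafka's log reconciliation. This is a simplified, hypothetical sketch (DivergenceCheck, EpochEntry and Result are illustrative names): it ignores snapshots, assumes the epoch cache is non-empty and ascending, treats an empty follower log as trivially valid, and includes the follower-side guard that refuses to truncate below the HWM.

```java
import java.util.List;

// Hypothetical, simplified sketch of the leader's check on a follower Fetch position.
// Not Kafka's implementation: snapshots and many edge cases are ignored.
final class DivergenceCheck {
    /** One epoch-cache entry: records of `epoch` start at `startOffset`. */
    record EpochEntry(int epoch, long startOffset) {}

    /** Leader's answer: valid, or diverging with the (epoch, endOffset) to reconcile to. */
    record Result(boolean diverging, int epoch, long endOffset) {}

    /**
     * @param epochs           leader epoch cache, non-empty and ascending by epoch
     * @param logEndOffset     leader LEO (exclusive)
     * @param fetchOffset      follower's next offset to fetch
     * @param lastFetchedEpoch epoch of the follower's last record
     */
    static Result validate(List<EpochEntry> epochs, long logEndOffset,
                           long fetchOffset, int lastFetchedEpoch) {
        if (fetchOffset == 0) {
            return new Result(false, lastFetchedEpoch, 0);   // empty follower log: nothing to reconcile
        }
        int matchedEpoch = -1;
        long endOffset = -1;
        // Find the largest leader epoch <= lastFetchedEpoch and where that epoch ends.
        for (int i = 0; i < epochs.size() && epochs.get(i).epoch() <= lastFetchedEpoch; i++) {
            matchedEpoch = epochs.get(i).epoch();
            endOffset = (i + 1 < epochs.size()) ? epochs.get(i + 1).startOffset() : logEndOffset;
        }
        if (matchedEpoch < 0) {
            throw new IllegalArgumentException("lastFetchedEpoch predates the epoch cache");
        }
        // Diverged if the leader never had that epoch, or the follower has more of it than the leader.
        boolean diverging = matchedEpoch != lastFetchedEpoch || fetchOffset > endOffset;
        return new Result(diverging, matchedEpoch, endOffset);
    }

    /** Follower side: accept a truncation target only if it does not cut below the HWM. */
    static long checkedTruncationOffset(Result result, long followerHighWatermark) {
        if (result.endOffset() < followerHighWatermark) {
            throw new IllegalStateException("leader asked to truncate below the high watermark");
        }
        return result.endOffset();
    }
}
```

With a leader cache of epoch 1 at offset 0, epoch 3 at 100 and epoch 5 at 150, a follower whose last record is from epoch 2 is told to reconcile to (epoch 1, offset 100), because the leader never wrote epoch 2.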
Design rationale & evolution
KRaft has accreted across several KIPs, all visible in the code and message schemas:
- KIP-595 Raft metadata quorum: the core protocol, pull-based fetch, log reconciliation, and BeginQuorumEpoch.
- KIP-630 Snapshots: the FetchSnapshot RPC, .checkpoint files, and the SnapshotHeader/SnapshotFooter control records.
- KIP-853 Dynamic KRaft quorums: directory ids in ReplicaKey/VotersRecord, leader endpoints in responses, AddRaftVoter/RemoveRaftVoter/UpdateRaftVoter, and kraft.version 1 (the JSON comments cite KIP-853 explicitly, e.g. VoteRequest.json:21).
- KIP-996 Pre-Vote: the Prospective role and the PreVote field, which prevent disruptive epoch bumps.
- KIP-631 Quorum controller: the design that motivates the low default timeouts (hot standby).
"Voter" is not a config you flip per node at runtime; it is whether your ReplicaKey appears in the latest committed (or even uncommitted) VotersRecord. A node whose broker id is in the voter set but with a different directory id is treated as an observer (the directory ids don't match in VoterSet.isVoter), which is exactly why auto-join may issue a RemoveRaftVoter for the stale entry before adding itself (KafkaRaftClient.java:3375-3385). Mixing the legacy controller.quorum.voters with dynamic quorums is unsupported and the config doc warns against it.
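The membership rule in the paragraph above can be sketched with a key of (id, directory id). ReplicaKey here is a hypothetical local record, not Kafka's class, and the sketch assumes that a key without a directory id matches on id alone, as a pre-KIP-853 static configuration would. autoJoinPlan only illustrates why a stale entry with the same id must be removed before the node adds itself; it is not the actual auto-join code.

```java
import java.util.Map;
import java.util.Optional;
import java.util.UUID;

// Hypothetical sketch of voter membership keyed by (replica id, directory id).
final class VoterMembership {
    /** Illustrative stand-in for a replica key; not Kafka's ReplicaKey class. */
    record ReplicaKey(int id, Optional<UUID> directoryId) {}

    enum JoinAction { NONE, ADD, REMOVE_STALE_THEN_ADD }

    /**
     * voters maps replica id to the directory id stored in the latest VotersRecord
     * (empty when none was recorded). Assumption: a key without a directory id
     * matches on the id alone.
     */
    static boolean isVoter(Map<Integer, Optional<UUID>> voters, ReplicaKey key) {
        Optional<UUID> recorded = voters.get(key.id());
        if (recorded == null) {
            return false;                    // id not in the voter set at all
        }
        return key.directoryId().isEmpty() || key.directoryId().equals(recorded);
    }

    /** Why auto-join may remove a stale entry first: same id, different directory. */
    static JoinAction autoJoinPlan(Map<Integer, Optional<UUID>> voters, ReplicaKey local) {
        if (isVoter(voters, local)) {
            return JoinAction.NONE;
        }
        return voters.containsKey(local.id()) ? JoinAction.REMOVE_STALE_THEN_ADD : JoinAction.ADD;
    }
}
```

A controller whose disk was wiped keeps its node id but gets a fresh directory id, so under this rule it is an observer until the old (id, directory) entry is replaced.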
The quorum-state file stores only the leader, epoch, and vote; under kraft.version 1 it no longer stores the voter set (the CurrentVoters field is version-0 only). The authoritative voter set lives in the metadata log and snapshots. Hand-editing quorum-state to "fix" membership will not work and risks violating the at-most-one-leader-per-epoch invariant.
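The "write before transition" rule from the invariants list relies on updating this small file durably and atomically. The sketch below assumes a common temp-file, fsync, atomic-rename pattern; QuorumStateFile, ElectionState and the JSON field names are illustrative only and do not reproduce the real file's schema. Callers would persist the new election state first and change their in-memory role only after write returns.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.nio.file.StandardOpenOption;

// Hypothetical sketch: durable, atomic replacement of a small election-state file.
// Field names are illustrative and do not reproduce the real quorum-state schema.
final class QuorumStateFile {
    /** -1 stands for "none" in this sketch. */
    record ElectionState(int leaderEpoch, int leaderId, int votedId) {}

    private final Path path;

    QuorumStateFile(Path path) {
        this.path = path;
    }

    /** Write to a temp file, fsync it, then rename it over the previous file. */
    void write(ElectionState state) throws IOException {
        String json = "{\"leaderEpoch\":" + state.leaderEpoch()
                + ",\"leaderId\":" + state.leaderId()
                + ",\"votedId\":" + state.votedId() + "}";
        Path tmp = path.resolveSibling(path.getFileName() + ".tmp");
        try (FileChannel channel = FileChannel.open(tmp, StandardOpenOption.CREATE,
                StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING)) {
            ByteBuffer buffer = ByteBuffer.wrap(json.getBytes(StandardCharsets.UTF_8));
            while (buffer.hasRemaining()) {
                channel.write(buffer);
            }
            channel.force(true);   // contents are on disk before the rename publishes them
        }
        // Readers see either the old file or the new one, never a partial write.
        // A stricter version would also fsync the parent directory after the rename.
        Files.move(tmp, path, StandardCopyOption.ATOMIC_MOVE, StandardCopyOption.REPLACE_EXISTING);
    }

    String readRaw() throws IOException {
        return Files.readString(path, StandardCharsets.UTF_8);
    }
}
```

Because the rename happens only after the fsync, a crash leaves either the previous election state or the new one on disk, which is what lets a restarted voter remember whom it voted for in the current epoch.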