krivaltsevich.com · Kafka Internals 4.4

10 · KRaft Consensus (Raft)

Source: Apache Kafka 4.4.0-SNAPSHOT (git 04bfe7d, 2026-06-15), KRaft mode. Derived from source code, not copied from official documentation.

KRaft is Kafka's hand-rolled implementation of the Raft consensus algorithm, used to replicate the cluster's metadata log across a quorum of controllers. Its centerpiece, KafkaRaftClient, is a single-threaded, non-blocking event loop that drives a precise state machine through the roles Unattached, Voted, Prospective, Candidate, Follower, Leader and Resigned. Unlike textbook Raft, replication is pull-based: followers and observers issue Fetch requests to the leader, which never pushes data. This chapter dissects the poll loop, leader election (with KIP-996 pre-vote), the persisted quorum-state file, the high-watermark commit rule, snapshots (KIP-630), and dynamic quorum reconfiguration (KIP-853), all grounded in the raft/ module source.

Role & responsibilities

KRaft replaces ZooKeeper (removed entirely since 4.0) as the system of record for cluster metadata. A small set of controllers form a voter quorum that runs Raft over a single topic partition, __cluster_metadata-0. The Raft library here is generic over a record type T (KafkaRaftClient<T> implements RaftClient<T>); the controller plugs in its own RecordSerde and a RaftClient.Listener to receive committed batches. See The KRaft Controller for the state machine that consumes this log and Metadata Propagation & Broker Lifecycle for how brokers (as observers) replicate it.

The class header itself frames the design (raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:129): "Leader election is more or less pure Raft, but replication is driven by replica fetching and we use Kafka's log reconciliation protocol to truncate the log to a common point following each leader election." KRaft reuses Kafka's existing log/segment storage, record-batch format, and Fetch RPC, layering Raft semantics on top.

Key idea

Voters take part in elections and are the only nodes eligible to handle Vote/BeginQuorumEpoch/EndQuorumEpoch requests. Observers (brokers, and controllers being added) never vote; they only fetch. Whether a node is a voter is not a static flag but is derived continuously from the latest VoterSet in the replicated log (QuorumState.isVoter(), QuorumState.java:332).

Where it lives in the code

Concern | Class | File
Event-loop Raft state machine | KafkaRaftClient<T> | raft/.../raft/KafkaRaftClient.java
Single-threaded driver (the IO thread) | KafkaRaftClientDriver<T> | raft/.../raft/KafkaRaftClientDriver.java
Role/epoch state & transition rules | QuorumState | raft/.../raft/QuorumState.java
Per-role state objects | EpochState impls: UnattachedState, ProspectiveState, CandidateState, FollowerState, LeaderState<T>, ResignedState | raft/.../raft/*State.java
Election vote tally | EpochElection (+ NomineeState) | raft/.../raft/internals/EpochElection.java
Voter membership | VoterSet, ReplicaKey, Endpoints | raft/.../raft/VoterSet.java
Persisted election state | FileQuorumStateStore, ElectionState | raft/.../raft/FileQuorumStateStore.java
Leader batch building | BatchAccumulator<T>, BatchBuilder<T> | raft/.../raft/internals/BatchAccumulator.java
Control-record tracking (voters, kraft.version) | KRaftControlRecordStateMachine, VoterSetHistory | raft/.../raft/internals/KRaftControlRecordStateMachine.java
Connection backoff & in-flight tracking | RequestManager | raft/.../raft/RequestManager.java
Snapshots (file naming, read/write) | Snapshots, RecordsSnapshotWriter, FileRawSnapshotReader | raft/.../snapshot/*.java
Log over segment storage | KafkaRaftLog (implements RaftLog) | raft/.../raft/internals/KafkaRaftLog.java
Reconfiguration handlers | AddVoterHandler, RemoveVoterHandler, UpdateVoterHandler | raft/.../raft/internals/*VoterHandler.java
Configuration | QuorumConfig | raft/.../raft/QuorumConfig.java

Core concepts & terminology

Voter vs Observer
A voter participates in elections and quorum commit; an observer only fetches. Membership comes from the latest VoterSet. Brokers are always observers; controllers are normally voters.
Epoch
The Raft term, a monotonically increasing integer. Each election bumps the epoch by exactly one (QuorumState.transitionToCandidate(), QuorumState.java:641: int newEpoch = epoch() + 1). Records are uniquely identified by (offset, epoch).
ReplicaKey
A pair of id (int) and an optional directory Uuid (the node's log directory). KIP-853 added directory ids so that a re-formatted node with the same broker id is treated as a distinct voter.
High watermark (HWM)
The largest offset replicated to a majority of voters; the commit point. Exposed via RaftClient.highWatermark().
Leader endpoints
The advertised listener addresses of the current leader, carried in RPC responses so observers and unattached voters can discover where to fetch.
kraft.version
A feature level: KRAFT_VERSION_0 (static voters, no reconfiguration) or KRAFT_VERSION_1 (dynamic quorums). isReconfigSupported() returns true for v1+ (server-common/.../KRaftVersion.java:59).
Bootstrap snapshot
The synthetic snapshot id (offset=0, epoch=0) (BOOTSTRAP_SNAPSHOT_ID, Snapshots.java:49) used to seed initial cluster state; it is never replicated over the wire.

Data structures

In-memory: the role objects

All six role classes implement EpochState; the current one is held in a single volatile EpochState state field inside QuorumState (QuorumState.java:99). Each role carries the data relevant to it:

Role | Key fields | Timer
UnattachedState | epoch, OptionalInt leaderId, Optional<ReplicaKey> votedKey, voter id set, HWM | election timer
ProspectiveState | as Unattached + VoterSet voters, EpochElection epochElection (pre-vote tally) | election timer
CandidateState | localId, localDirectoryId, epoch, EpochElection epochElection | election timer
FollowerState | epoch, leaderId, Endpoints leaderEndpoints, votedKey, HWM, hasFetchedFromLeader, fetchingSnapshot | fetch timer, update-voter-set timer
LeaderState<T> | epoch, epochStartOffset, grantingVoters, per-replica ReplicaState map, BatchAccumulator, HWM | check-quorum timer, begin-quorum-epoch timer
ResignedState | localId, epoch, unackedVoters, preferredSuccessors | election timer

Notably there is no separate VotedState class. A binding vote is represented as an UnattachedState (or ProspectiveState) carrying a non-empty votedKey (see QuorumState.unattachedAddVotedState(), QuorumState.java:428 and isUnattachedAndVoted(), QuorumState.java:850). The "Voted" role in the protocol is thus Unattached-with-a-vote.

The leader's per-replica view is LeaderState.ReplicaState (LeaderState.java:978): replicaKey, Endpoints listeners, Optional<LogOffsetMetadata> endOffset, lastFetchTimestamp, lastFetchLeaderLogEndOffset, lastCaughtUpTimestamp, and hasAcknowledgedLeader. ReplicaState is Comparable and sorts by descending end offset, which is how the leader finds the majority offset for the HWM.

Persistent: the quorum-state file

Election state survives restarts in a small JSON file named quorum-state (FileQuorumStateStore.DEFAULT_FILE_NAME, FileQuorumStateStore.java:80) inside the metadata log directory. Its schema is QuorumStateData (raft/src/main/resources/common/message/QuorumStateData.json), with a data_version field appended for forward-compat. The version is tied to kraft.version via KRaftVersion.quorumStateVersion().

Field | Type | Versions | Meaning
ClusterId | string | 0 only | Cluster id (dropped in v1)
LeaderId | int32 | 0+ | Elected leader id, default -1
LeaderEpoch | int32 | 0+ | Current epoch, default -1
VotedId | int32 | 0+ | Id this replica voted for, -1 if none
VotedDirectoryId | uuid | 1+ | Directory id of the candidate voted for (KIP-853)
AppliedOffset | int64 | 0 only | Legacy; unused in v1
CurrentVoters | []Voter | 0 only | Legacy static voter list; v1 stores voters in the log instead
Invariant

The quorum-state file is written before the in-memory transition takes effect (QuorumState.durableTransitionTo(), QuorumState.java:729): it calls store.writeElectionState(...) and then memoryTransitionTo(...). The write is durable: FileQuorumStateStore writes to a .tmp file, calls fileOutputStream.getFD().sync(), then atomically renames it (FileQuorumStateStore.java:181-183). This guarantees that once a replica has voted (or become a candidate at epoch N), that fact cannot be lost across a crash, preserving "at most one vote per epoch".

Note the Resigned and Prospective states do not trigger a durable write: transitionToResigned and transitionToProspective use memoryTransitionTo directly (QuorumState.java:358, :622), because Resigned does not change the persisted ElectionState and Prospective is a pre-vote phase that has not yet bumped the epoch.
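The write-then-rename pattern and the "persist first, then expose in memory" ordering can be sketched as follows. This is a minimal illustration, not the actual FileQuorumStateStore: the class QuorumStateFileSketch, its methods, and the JSON payload used with it are hypothetical, and the sketch assumes a filesystem where an atomic rename replaces an existing target (as on Linux).

```java
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

/** Hypothetical sketch of a durable state file; names are illustrative only. */
final class QuorumStateFileSketch {
    private final Path target;
    private volatile String inMemory; // updated only after the durable write succeeds

    QuorumStateFileSketch(Path target) {
        this.target = target;
    }

    /** Persist first, then expose the new state in memory. */
    void durableTransitionTo(String json) throws IOException {
        write(json);
        inMemory = json;
    }

    /** A crash leaves either the old file or the new file, never a torn one. */
    private void write(String json) throws IOException {
        Path tmp = target.resolveSibling(target.getFileName() + ".tmp");
        try (FileOutputStream out = new FileOutputStream(tmp.toFile())) {
            out.write(json.getBytes(StandardCharsets.UTF_8));
            out.flush();
            out.getFD().sync(); // force the bytes to stable storage before the rename
        }
        // Atomically swap the fully synced temp file into place.
        Files.move(tmp, target, StandardCopyOption.ATOMIC_MOVE);
    }

    String inMemory() {
        return inMemory;
    }

    String readFromDisk() throws IOException {
        return new String(Files.readAllBytes(target), StandardCharsets.UTF_8);
    }
}
```

Because the in-memory field is assigned only after the rename returns, an exception during the write leaves the previous state visible both on disk and in memory.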

On-wire: voter membership records

Under kraft.version 1, the voter set lives in the log as control records, not in the quorum file. The VotersRecord (clients/.../message/VotersRecord.json) carries, per voter: VoterId, VoterDirectoryId (uuid), a list of Endpoints (name/host/port), and a KRaftVersionFeature range (min/max supported version). A companion KRaftVersionRecord records the finalized feature level. These are read back by KRaftControlRecordStateMachine.handleBatch() (KRaftControlRecordStateMachine.java:301).

Architecture & control/data flow

Figure: the single-threaded poll loop. Inbound RPCs arrive on network threads and are enqueued on the BlockingMessageQueue (a LinkedBlockingQueue), which is the sole inbound concurrency boundary. KafkaRaftClientDriver (thread kafka-raft-io-thread) loops on client.poll(), and each KafkaRaftClient.poll() runs five steps:

  1. maybeCompleteShutdown
  2. pollCurrentState (pollLeader / pollFollower / pollCandidate …)
  3. snapshotCleaner.maybeClean
  4. messageQueue.poll(timeout) → handleInboundMessage
  5. pollListeners (handleCommit / handleLeaderChange / snapshot)

All Raft state is mutated only on the kafka-raft-io-thread. Step 2 emits outbound RPCs to remote voters or the leader; step 5 delivers committed records to the RaftClient.Listener (the controller or broker state machine).

The driver is a ShutdownableThread whose name is threadNamePrefix + "-io-thread" (KafkaRaftClientDriver.java:55); the default prefix is kafka-raft (core/.../KafkaRaftManager.scala:104), giving the thread name kafka-raft-io-thread. Its doWork() simply calls client.poll() in a loop; any uncaught exception is routed to a fatal FaultHandler (KafkaRaftClientDriver.java:62-67), which typically halts the process.
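The shape of that loop can be illustrated with a toy version. This is only a sketch under simplifying assumptions: MiniPollLoop and its methods are hypothetical names, messages are plain strings, and the role-specific and listener steps are reduced to comments.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical miniature of a single-threaded poll loop; names are illustrative only. */
final class MiniPollLoop {
    private final LinkedBlockingQueue<String> inbound = new LinkedBlockingQueue<>();
    private final List<String> handled = new ArrayList<>(); // touched only by the polling thread

    /** Called from any thread (for example a network thread): the only cross-thread entry point. */
    void enqueue(String message) {
        inbound.add(message);
    }

    /** One loop iteration; must be called from the single IO thread only. */
    void poll(long timeoutMs) throws InterruptedException {
        // Role-specific work (timers, outbound requests) would run here.
        String message = inbound.poll(timeoutMs, TimeUnit.MILLISECONDS);
        if (message != null) {
            handled.add(message); // all state mutation happens on this thread
        }
        // Listener callbacks for newly committed data would run here.
    }

    List<String> handled() {
        return handled;
    }
}
```

The design choice worth noting is that the queue is the only structure shared with other threads, so the state behind it needs no locking.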

The pull-based replication contrast

Figure: classic Raft (push). The leader drives replication: it sends AppendEntries(records) to each follower, the follower acks, and empty AppendEntries serve as periodic heartbeats that assert leadership.

Figure: KRaft (pull). The data direction is inverted: the follower sends Fetch(offset, epoch) and the leader answers with records + HWM + leaderEpoch. The leader pushes nothing and only answers Fetch. BeginQuorumEpoch substitutes for the empty-AppendEntries heartbeat, announcing a new leader once per epoch.
Design rationale

Reusing Kafka's Fetch path lets the metadata quorum share the broker's mature replication machinery (log segments, batch format, zero-copy sends, fetch purgatory). Because the leader never pushes, it needs a separate way to announce itself after an election, hence BeginQuorumEpoch (KafkaRaftClient.java:144). The overall protocol is KIP-595 ("A Raft Protocol for the Metadata Quorum"); pull-based fetch and log reconciliation are its defining deviations from Diego Ongaro's Raft.

The state machine in detail

Figure: voter state machine (simplified). The main spine is a voter's election happy path; branch transitions and the higher-epoch fallback are listed after it. Full rules: QuorumState.java:35-83.

Main spine:

  • initialize() → Unattached (the initial state)
  • Unattached → Prospective, election timeout
  • Prospective → Candidate, majority PreVotes granted (epoch++)
  • Candidate → Leader, majority votes granted
  • Leader → Resigned, graceful shutdown
  • Resigned → Unattached, election timeout (epoch++)

Branch transitions (off the main spine):

  • Unattached → Follower, a leader with ≥ epoch and a known endpoint is learned
  • Prospective → Follower, PreVote lost but a last-known leader exists
  • Prospective → Unattached, PreVote lost and no leader is known
  • Candidate → Prospective, election loss or timeout
  • Follower → Prospective, fetch timeout

Cross-cutting rules. Any role steps back to Unattached (or Follower) on learning a higher epoch. A Resigned leader sends EndQuorumEpoch to voters. Observers use only Unattached ⇄ Follower.

Initialization

QuorumState.initialize() (QuorumState.java:141) reads the persisted ElectionState and reconstructs a starting role:

  • If the stored epoch is below the log's last epoch, the file is stale → start Unattached at the log's epoch.
  • If we were the leader → start Resigned in the same epoch. This is deliberate (QuorumState.java:173-189): it prevents voting for someone else in that epoch and protects the (offset, epoch) uniqueness invariant if unflushed data was lost.
  • If we were a voted candidate for ourselves → start Candidate.
  • If a leader is known and its endpoints are in the voter set → start Follower; otherwise start Unattached (we cannot fetch without an endpoint) and discover the leader via Fetch-to-bootstrap or an inbound BeginQuorumEpoch.

KafkaRaftClient.initialize() then short-circuits: a single-voter quorum becomes leader immediately by going Prospective→Candidate→Leader (KafkaRaftClient.java:567-571).
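The startup rules above amount to an ordered decision, sketched below. This is a simplification under stated assumptions: InitialRoleSketch and its parameters are hypothetical, ids use -1 for "none" as in the quorum-state schema, directory ids are ignored, and a persisted vote for another replica is folded into the plain Unattached result.

```java
/** Hypothetical decision helper mirroring the startup rules; not the real QuorumState API. */
final class InitialRoleSketch {
    enum Role { UNATTACHED, RESIGNED, CANDIDATE, FOLLOWER }

    static Role choose(int storedEpoch, int logLastEpoch, int localId,
                       int storedLeaderId, int storedVotedId, boolean leaderEndpointsKnown) {
        if (storedEpoch < logLastEpoch) {
            return Role.UNATTACHED;   // stale file: restart at the log's epoch
        }
        if (storedLeaderId == localId) {
            return Role.RESIGNED;     // former leader: never vote for someone else in this epoch
        }
        if (storedVotedId == localId) {
            return Role.CANDIDATE;    // we had voted for ourselves
        }
        if (storedLeaderId >= 0 && leaderEndpointsKnown) {
            return Role.FOLLOWER;     // leader known and reachable
        }
        return Role.UNATTACHED;       // no usable endpoint: wait to discover the leader
    }
}
```

The order of the checks matters: the stale-file test has to come first, since every later rule trusts the persisted epoch.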

Pre-Vote and the Prospective role (KIP-996)

When a voter's fetch timeout expires, it does not immediately bump the epoch and become a candidate. Instead it transitions to Prospective (pollFollowerAsVoter → transitionToProspective, KafkaRaftClient.java:3326-3328) and sends Vote requests with PreVote=true at the current epoch. The vote request schema added this field at version 2 (VoteRequest.json:51; comment at VoteRequest.json:22: "Version 2 adds PreVote field and renames candidate to replica").

A pre-vote is granted only if the prospective's log is up-to-date (ProspectiveState.canGrantVote → unattachedOrProspectiveCanGrantVote, QuorumState.java:886). Crucially, a follower will grant a pre-vote even though it has a leader, as long as it has not yet successfully fetched from that leader (FollowerState.canGrantVote, FollowerState.java:228-231: isPreVote && !hasFetchedFromLeader && isLogUpToDate). Only when a majority grant the pre-vote does the prospective call transitionToCandidate, bumping the epoch (maybeTransitionToCandidate, KafkaRaftClient.java:688). On pre-vote loss or timeout it falls back to Follower (if a leader is known with endpoints) or Unattached (prospectiveTransitionAfterElectionLoss, KafkaRaftClient.java:3277).

Design rationale

Pre-vote (KIP-996) prevents a partitioned or bouncing voter from repeatedly incrementing the epoch and disrupting a healthy leader. The candidate canvasses for support at the current epoch before committing to a disruptive epoch bump. KIP-853 reuses the same PreVote flag to fence a removed voter from forcing elections. If any voter responds UNSUPPORTED_VERSION to a pre-vote (i.e. doesn't understand PreVote), the prospective falls back to a real candidacy immediately (handleVoteResponse, KafkaRaftClient.java:954-961).

Vote tallying

EpochElection (EpochElection.java) tracks each voter as UNRECORDED/GRANTED/REJECTED. The local node records a grant for itself on construction (e.g. CandidateState.java:77). isVoteGranted() is numGranted >= majoritySize where majoritySize = voterStates.size()/2 + 1; isVoteRejected() short-circuits the election when numGranted + numUnrecorded < majoritySize, i.e. winning is already impossible (EpochElection.java:85-97). On each response, handleVoteResponse records the grant/reject and calls maybeTransitionForward (Prospective→Candidate or Candidate→Leader) or maybeHandleElectionLoss (KafkaRaftClient.java:1006-1014).
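The tallying arithmetic can be sketched in a few lines. This is an illustration of the majority and early-rejection formulas quoted above, not the real EpochElection: the class VoteTallySketch and its methods are hypothetical, and voters are identified by plain integer ids.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical vote tally using the majority formulas from the text. */
final class VoteTallySketch {
    enum State { UNRECORDED, GRANTED, REJECTED }

    private final Map<Integer, State> voterStates = new HashMap<>();

    VoteTallySketch(int localId, int... otherVoterIds) {
        voterStates.put(localId, State.GRANTED); // the local node votes for itself
        for (int id : otherVoterIds) {
            voterStates.put(id, State.UNRECORDED);
        }
    }

    void record(int voterId, boolean granted) {
        if (!voterStates.containsKey(voterId)) {
            throw new IllegalArgumentException("Unknown voter " + voterId);
        }
        voterStates.put(voterId, granted ? State.GRANTED : State.REJECTED);
    }

    int majoritySize() {
        return voterStates.size() / 2 + 1;
    }

    private long count(State state) {
        return voterStates.values().stream().filter(s -> s == state).count();
    }

    /** True once a majority has granted. */
    boolean isVoteGranted() {
        return count(State.GRANTED) >= majoritySize();
    }

    /** True once winning is impossible even if every outstanding voter grants. */
    boolean isVoteRejected() {
        return count(State.GRANTED) + count(State.UNRECORDED) < majoritySize();
    }
}
```

With three voters, one rejection leaves the election open (1 granted + 1 unrecorded = 2, which still reaches the majority of 2); a second rejection closes it.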

Vote granting rules (handleVoteRequest)

For an incoming Vote (handleVoteRequest, KafkaRaftClient.java:829):

  1. Validate cluster id, topic-partition, and epoch sanity. For a standard vote, the request's epoch must exceed the last log epoch it reports (lastEpoch >= replicaEpoch is illegal); for a pre-vote the two may be equal, so only lastEpoch > replicaEpoch is illegal (KafkaRaftClient.java:861).
  2. If replicaEpoch > quorum.epoch() and it is a standard vote, transition to Unattached at the new epoch first (KafkaRaftClient.java:890).
  3. Verify the request's voter key matches the local replica key (id + directory id) → else INVALID_VOTER_KEY.
  4. Grant only if the candidate's log is at least as up-to-date: lastEpochEndOffsetAndEpoch.compareTo(endOffset()) >= 0 (KafkaRaftClient.java:919-923).
  5. For a granted standard vote, persist the vote by transitioning to Unattached/Prospective with the candidate's key (unattachedAddVotedState / prospectiveAddVotedState).
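Two of these checks are pure comparisons and can be sketched directly. The class VoteRequestCheckSketch and its methods are hypothetical. The log ordering shown (epoch first, then offset) is the standard Raft "at least as up-to-date" rule and is assumed here to match what the compareTo call in step 4 does.

```java
/** Hypothetical helpers for the epoch sanity check (step 1) and the log comparison (step 4). */
final class VoteRequestCheckSketch {

    /** Standard votes need lastEpoch < replicaEpoch; pre-votes also allow equality. */
    static boolean isEpochSane(boolean preVote, int lastEpoch, int replicaEpoch) {
        return preVote ? lastEpoch <= replicaEpoch : lastEpoch < replicaEpoch;
    }

    /** Orders two log ends by epoch first, then by offset (assumed ordering). */
    static int compareLogEnds(long offsetA, int epochA, long offsetB, int epochB) {
        if (epochA != epochB) {
            return Integer.compare(epochA, epochB);
        }
        return Long.compare(offsetA, offsetB);
    }

    /** Grant only if the candidate's log is at least as up-to-date as the local log. */
    static boolean isLogUpToDate(long candidateOffset, int candidateEpoch,
                                 long localOffset, int localEpoch) {
        return compareLogEnds(candidateOffset, candidateEpoch, localOffset, localEpoch) >= 0;
    }
}
```

Under this ordering a candidate with a shorter log but a higher last epoch still counts as more up-to-date than a longer log from an older epoch.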

The pull-based commit: how the leader advances the HWM

The leader holds a ReplicaState per voter. On each Fetch from a follower, updateReplicaState records the follower's fetch offset and timestamp (LeaderState.java:846), then maybeUpdateHighWatermark (LeaderState.java:727):

  1. Sort all voter ReplicaState by descending end offset.
  2. Take the element at index voterStates.size()/2: this is the offset that a majority (including the leader, which counts itself) has reached.
  3. Apply the leader-epoch commit restriction: only advance the HWM past epochStartOffset, i.e. the leader must have committed at least one record from its own epoch before exposing any earlier-epoch records (LeaderState.java:747).
  4. The HWM is monotonic; a computed value below the current HWM is logged and ignored (it can only legitimately drop on a voter-set change), guarding against committed-data loss (LeaderState.java:762-768).
Invariant

A freshly-elected leader does not inherit the previous HWM (QuorumState.transitionToLeader comment, QuorumState.java:699-708). To advance the global HWM it must replicate a record from its new epoch to a majority. To force this without waiting for client writes, onBecomeLeader immediately appends a LeaderChange control record via appendStartOfEpochControlRecords (KafkaRaftClient.java:667, LeaderState.java:392).
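The four-step HWM computation listed above can be sketched as a pure function. This is a simplified illustration, not the real LeaderState code: HighWatermarkSketch and its method are hypothetical, end offsets are plain longs rather than LogOffsetMetadata, and "past epochStartOffset" is read as strictly greater than.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.OptionalLong;

/** Hypothetical sketch of the majority-offset rule for advancing the high watermark. */
final class HighWatermarkSketch {

    static OptionalLong maybeAdvance(long[] voterEndOffsets, long epochStartOffset,
                                     OptionalLong currentHwm) {
        // Step 1: sort all voter end offsets in descending order (leader included).
        long[] descending = Arrays.stream(voterEndOffsets)
            .boxed()
            .sorted(Comparator.reverseOrder())
            .mapToLong(Long::longValue)
            .toArray();

        // Step 2: the element at index n/2 has been reached by a majority.
        long candidate = descending[descending.length / 2];

        // Step 3: do not commit until a record from the leader's own epoch is replicated.
        if (candidate <= epochStartOffset) {
            return currentHwm;
        }

        // Step 4: the high watermark never moves backwards.
        if (currentHwm.isPresent() && candidate < currentHwm.getAsLong()) {
            return currentHwm;
        }
        return OptionalLong.of(candidate);
    }
}
```

For three voters at end offsets 10, 7 and 5, index 1 of the descending order is 7, so two of three replicas hold everything below offset 7.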

The leader batch pipeline

Client appends (from the controller) enter via RaftClient.prepareAppend → append (KafkaRaftClient.java:3697), which writes into the LeaderState's BatchAccumulator. The accumulator coalesces records into batches under an appendLock (ReentrantLock), bounded by MAX_BATCH_SIZE_BYTES = 8 MiB and MAX_NUMBER_OF_BATCHES = 10 (KafkaRaftClient.java:170-173). A linger timer (default controller.quorum.append.linger.ms = 25) is tracked with an AtomicLong so the IO thread can read the deadline without locking (BatchAccumulator.SimpleTimer, BatchAccumulator.java:629).

On the next poll, pollLeader → maybeAppendBatches drains ready batches, appends them to the log as leader, parks each in the appendPurgatory awaiting commit, then flushLeaderLog updates the end offset and fsyncs (KafkaRaftClient.java:3082-3107). When the HWM later crosses a batch's last offset, the purgatory completes and the listener is notified (onUpdateLeaderHighWatermark → appendPurgatory.maybeComplete, KafkaRaftClient.java:383). The leader optimizes by routing the in-memory record objects straight to its own listener rather than re-reading from disk (ListenerContext.fireHandleCommit(...) overload, KafkaRaftClient.java:4073).
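The split between a locked append path and a lock-free deadline read can be sketched as follows. This is a heavily reduced illustration, not the real BatchAccumulator: LingerAccumulatorSketch is a hypothetical class, records are strings, batch size limits are omitted, and time is passed in explicitly so the behaviour is deterministic.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical accumulator: appends take a lock, the drain deadline is read without one. */
final class LingerAccumulatorSketch {
    private static final long NOT_SET = Long.MAX_VALUE;

    private final ReentrantLock appendLock = new ReentrantLock();
    private final AtomicLong lingerDeadlineMs = new AtomicLong(NOT_SET);
    private final long lingerMs;
    private List<String> current = new ArrayList<>();

    LingerAccumulatorSketch(long lingerMs) {
        this.lingerMs = lingerMs;
    }

    /** Called by producer threads. The first record of a batch starts the linger timer. */
    void append(String record, long nowMs) {
        appendLock.lock();
        try {
            current.add(record);
            lingerDeadlineMs.compareAndSet(NOT_SET, nowMs + lingerMs);
        } finally {
            appendLock.unlock();
        }
    }

    /** Called by the IO thread to size its sleep; reads the deadline without locking. */
    long timeUntilDrainMs(long nowMs) {
        long deadline = lingerDeadlineMs.get();
        return deadline == NOT_SET ? Long.MAX_VALUE : Math.max(0L, deadline - nowMs);
    }

    /** Called by the IO thread; returns the batch once the linger time has elapsed. */
    List<String> drainIfReady(long nowMs) {
        if (timeUntilDrainMs(nowMs) > 0) {
            return Collections.emptyList();
        }
        appendLock.lock();
        try {
            List<String> batch = current;
            current = new ArrayList<>();
            lingerDeadlineMs.set(NOT_SET);
            return batch;
        } finally {
            appendLock.unlock();
        }
    }
}
```

Keeping the deadline in an atomic means the polling thread can compute its next wake-up time on every iteration without contending with appenders.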

The follower fetch loop & log reconciliation

A follower builds a Fetch request carrying its log end offset, last fetched epoch, the current leader epoch, its directory id, and its current HWM (buildFetchRequest, KafkaRaftClient.java:2985). The leader validates the (fetchOffset, lastFetchedEpoch) against its log (validateOffsetAndEpoch). Three outcomes drive reconciliation in handleFetchResponse (KafkaRaftClient.java:1689):

  • Diverging epoch: the leader returns a divergingEpoch (epoch, endOffset). The follower truncates to that point via log.truncateToEndOffset, but never below its own HWM (a violation throws KafkaException), then re-fetches (KafkaRaftClient.java:1742-1764).
  • Snapshot id: the follower's offset is below the leader's log start; the leader returns a snapshotId and the follower switches to FetchSnapshot (KafkaRaftClient.java:1765-1807).
  • Records: appended via appendAsFollower, which writes to the log, fsyncs if the node is (or can become) a voter, updates the control-record state machine, and advances the follower HWM to min(LEO, leaderHWM) (KafkaRaftClient.java:1808-1814, updateFollowerHighWatermark, KafkaRaftClient.java:341).
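Two rules from the list above are simple enough to state as code: the truncation guard and the follower's high-watermark update. This is a sketch under simplifying assumptions. FollowerFetchSketch is hypothetical, offsets are plain longs, the truncation point is taken directly as the diverging end offset, and IllegalStateException stands in for the KafkaException mentioned in the text.

```java
/** Hypothetical helpers for the follower side of log reconciliation. */
final class FollowerFetchSketch {

    /** Returns the offset to truncate to, refusing to drop data below the local HWM. */
    static long truncationOffset(long divergingEndOffset, long localHighWatermark) {
        if (divergingEndOffset < localHighWatermark) {
            // Stand-in for the KafkaException described in the text.
            throw new IllegalStateException("Refusing to truncate to " + divergingEndOffset
                + ", which is below the high watermark " + localHighWatermark);
        }
        return divergingEndOffset;
    }

    /** After appending, the follower can only expose what it actually holds. */
    static long followerHighWatermark(long logEndOffset, long leaderHighWatermark) {
        return Math.min(logEndOffset, leaderHighWatermark);
    }
}
```

The min() matters for a lagging follower: with a log end offset of 50 and a leader HWM of 80, it may only treat offsets below 50 as committed locally.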
Invariant

There is at most one in-flight Fetch per follower for its LEO. RequestManager tracks a single in-flight correlation id per connection and maybeSendFetchToBestNode only sends when !hasAnyInflightRequest (KafkaRaftClient.java:3410). Two concurrent Fetches could cause the follower to append the same offset twice; the comment at RequestManager.java:66-68 spells this out.

BeginQuorumEpoch & EndQuorumEpoch

After winning, the leader periodically sends BeginQuorumEpoch to voters that haven't fetched recently (maybeSendBeginQuorumEpochRequests, interval fetchTimeoutMs/2, LeaderState.java:156). It retries until each voter acknowledges (addAcknowledgementFrom) or a new election occurs. This is how a voter that was Unattached (because it didn't know the leader's endpoint) discovers and follows the new leader (maybeTransition, KafkaRaftClient.java:2691).

On graceful shutdown or forced resignation, the leader transitions to Resigned and sends EndQuorumEpoch to all voters (pollResigned, KafkaRaftClient.java:3141). The request includes a preferred successors list ordered by descending fetch offset (nonLeaderVotersByDescendingFetchOffset). A receiving follower shortens its fetch timeout using a strict-exponential backoff keyed on its position in that list, so the most caught-up voter is most likely to win the next election quickly (endEpochElectionBackoff / strictExponentialElectionBackoffMs, KafkaRaftClient.java:1320, :1058).
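To make the idea of a position-keyed backoff concrete, here is one possible shape for it. This is an assumption-laden sketch and not the formula from the source: SuccessorBackoffSketch, the baseMs parameter, the zero delay for the first successor, and the doubling per position are all illustrative choices; only the cap corresponds to a documented setting (controller.quorum.election.backoff.max.ms).

```java
/** Hypothetical position-keyed exponential backoff; the real formula may differ. */
final class SuccessorBackoffSketch {

    /**
     * @param position index in the preferred-successor list (0 = most caught-up voter)
     * @param baseMs   assumed base delay for position 1
     * @param maxMs    cap on the delay
     */
    static int backoffMs(int position, int baseMs, int maxMs) {
        if (position <= 0) {
            return 0; // assumed: the preferred successor starts its election immediately
        }
        // Double the delay for each further position, guarding against shift overflow.
        long delay = (long) baseMs << Math.min(position - 1, 30);
        return (int) Math.min(delay, (long) maxMs);
    }
}
```

Whatever the exact constants, the point is that delays grow strictly with list position, so voters further behind the leader start their elections later.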

Snapshots (KIP-630)

Because the metadata log would grow unbounded, KRaft periodically snapshots state and truncates the log prefix. Snapshot files are named <20-digit-offset>-<10-digit-epoch>.checkpoint (Snapshots.filenameFromSnapshotId, Snapshots.java:63), e.g. 00000000000000001234-0000000007.checkpoint, living alongside the log segments. Partial downloads use .checkpoint.part; pending deletions use .checkpoint.deleted (Snapshots.java:40-41).
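The naming scheme is easy to reproduce. The helper below is a hypothetical stand-in for Snapshots.filenameFromSnapshotId: it follows the 20-digit offset and 10-digit epoch layout described above, and the parsing methods assume a well-formed name.

```java
import java.util.Locale;

/** Hypothetical helper for the <20-digit-offset>-<10-digit-epoch>.checkpoint naming scheme. */
final class SnapshotNameSketch {
    private static final String SUFFIX = ".checkpoint";

    static String filename(long endOffset, int epoch) {
        // Zero-padding makes lexicographic order match numeric order.
        return String.format(Locale.ROOT, "%020d-%010d%s", endOffset, epoch, SUFFIX);
    }

    /** Assumes a name produced by filename(): 20 digits, a dash, 10 digits, the suffix. */
    static long offsetOf(String filename) {
        return Long.parseLong(filename.substring(0, 20));
    }

    static int epochOf(String filename) {
        return Integer.parseInt(filename.substring(21, 31));
    }
}
```

For example, filename(1234, 7) yields 00000000000000001234-0000000007.checkpoint, matching the example in the text.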

Figure: layout of a snapshot file <offset>-<epoch>.checkpoint, in on-disk batch order:

  1. SnapshotHeaderRecord (control batch): lastContainedLogTime
  2. KRaftVersionRecord (control batch): finalized kraft.version (v1+)
  3. VotersRecord (control batch): voter set at snapshot (v1+)
  4. State-machine records (data batches): serialized state
  5. SnapshotFooterRecord (control batch)

A snapshot is identified by snapshotId = (endOffset, epoch) and covers offsets [0, endOffset). The KRaftVersion and Voters batches are written only when kraft.version 1 (reconfiguration) is supported. The file is built by RecordsSnapshotWriter; voters and version are sourced from KRaftControlRecordStateMachine at the snapshot's last contained offset.

Generating: a state-machine client calls RaftClient.createSnapshot(snapshotId, lastContainedLogTimestamp) (KafkaRaftClient.java:3798). The writer is wrapped in a NotifyingRawSnapshotWriter whose freeze callback trims the in-memory control-record history (partitionState.truncateOldEntries). The voter set and kraft.version embedded come from voterSetAtOffset/kraftVersionAtOffset at snapshotId.offset() - 1 (KafkaRaftClient.java:3821-3822).

Fetching: when a lagging follower is told to fetch a snapshot, it issues FetchSnapshot requests carrying the SnapshotId and a byte Position (FetchSnapshotRequest.json:41-50); the leader replies with a slice of the file as UnalignedRecords (records are not offset-aligned). The follower appends slices until sizeInBytes == size, freezes the snapshot, then fully truncates its log to the snapshot via log.truncateToLatestSnapshot() and reloads control-record state (handleFetchSnapshotResponse, KafkaRaftClient.java:2181-2210).
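The position-based download loop can be sketched with in-memory byte arrays. This is a toy model, not the real FetchSnapshot handling: SnapshotDownloadSketch is hypothetical, the "leader" is just a byte array, and network errors, retries and the final freeze are left out.

```java
import java.io.ByteArrayOutputStream;
import java.util.Arrays;

/** Hypothetical sketch of downloading a snapshot in byte slices keyed by position. */
final class SnapshotDownloadSketch {

    /** Leader side: return up to maxBytes of the snapshot starting at the given position. */
    static byte[] slice(byte[] snapshot, int position, int maxBytes) {
        int end = Math.min(snapshot.length, position + maxBytes);
        return Arrays.copyOfRange(snapshot, position, end);
    }

    /** Follower side: keep requesting from the current size until the full size is reached. */
    static byte[] download(byte[] leaderSnapshot, int maxBytes) {
        if (maxBytes <= 0) {
            throw new IllegalArgumentException("maxBytes must be positive");
        }
        ByteArrayOutputStream partial = new ByteArrayOutputStream();
        while (partial.size() < leaderSnapshot.length) {
            // The next request position is simply the number of bytes already held.
            byte[] chunk = slice(leaderSnapshot, partial.size(), maxBytes);
            partial.write(chunk, 0, chunk.length);
        }
        return partial.toByteArray();
    }
}
```

Because the position is a raw byte offset, a slice boundary can fall in the middle of a record batch, which is why the text describes the payload as unaligned.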

Loading on startup: KRaftControlRecordStateMachine.updateState() reads the latest snapshot first, then replays log batches up to the LEO (KRaftControlRecordStateMachine.java:116, :259), maintaining the invariant that this listener has always read to the LEO (described at KafkaRaftClient.java:202-213).

Retention & cleaning: a timer-driven RaftMetadataLogCleanerManager calls log.maybeClean() every 60 s (KafkaRaftClient.java:314). KafkaRaftLog.maybeClean deletes old snapshots/segments by retention size then retention time, but always keeps at least one snapshot and respects segment invariants (KafkaRaftLog.java:538-583). Snapshot generation cadence is governed by metadata.log.max.snapshot.interval.ms (default 1 hour) and metadata.log.max.record.bytes.between.snapshots (default 20 MiB), defined in MetadataLogConfig.java:38-41 (the controller decides when to call createSnapshot; see Log Management, Retention & Compaction).

Dynamic quorum reconfiguration (KIP-853)

Under kraft.version 1, the voter set can change at runtime without restarting controllers. Three RPCs are handled by dedicated handlers: AddRaftVoter, RemoveRaftVoter, UpdateRaftVoter. Each appends a new VotersRecord to the log; the change takes effect when the KRaftControlRecordStateMachine reads the (even uncommitted) record, and is finalized when it commits.

The add-voter algorithm (AddVoterHandler.java:45-64) is exacting:

  1. Reject if a voter change is already pending, or if the leader has no HWM yet (epoch not committed) → REQUEST_TIMED_OUT.
  2. Reject if the cluster doesn't support kraft.version 1 → UNSUPPORTED_VERSION; if the latest VotersRecord isn't committed yet → REQUEST_TIMED_OUT; if the voter id already exists → DUPLICATE_VOTER.
  3. Send ApiVersions to the new voter to learn its supported kraft.version range; abort if it doesn't support the finalized version, or if it isn't caught up to the leader's LEO (isReplicaCaughtUp, fetched within the last hour, LeaderState.java:487).
  4. Append the updated VotersRecord, then wait for it to commit using the new majority before acking (highWatermarkUpdated, AddVoterHandler.java:334).
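The rejection order in steps 1 and 2 can be sketched as a single pre-check. This is an illustration only: AddVoterPrecheckSketch and its boolean parameters are hypothetical, the error names are taken from the steps above, and steps 3 and 4 (the ApiVersions round trip and waiting for commit) are not modelled.

```java
import java.util.Optional;
import java.util.Set;

/** Hypothetical pre-check covering steps 1 and 2 of the add-voter algorithm. */
final class AddVoterPrecheckSketch {
    enum Error { REQUEST_TIMED_OUT, UNSUPPORTED_VERSION, DUPLICATE_VOTER }

    static Optional<Error> check(boolean changePending, boolean leaderHasHighWatermark,
                                 boolean reconfigSupported, boolean latestVotersCommitted,
                                 Set<Integer> currentVoterIds, int newVoterId) {
        // Step 1: one change at a time, and only once the leader's epoch is committed.
        if (changePending || !leaderHasHighWatermark) {
            return Optional.of(Error.REQUEST_TIMED_OUT);
        }
        // Step 2: feature level, committed voter set, then duplicate detection.
        if (!reconfigSupported) {
            return Optional.of(Error.UNSUPPORTED_VERSION);
        }
        if (!latestVotersCommitted) {
            return Optional.of(Error.REQUEST_TIMED_OUT);
        }
        if (currentVoterIds.contains(newVoterId)) {
            return Optional.of(Error.DUPLICATE_VOTER);
        }
        return Optional.empty(); // proceed to the ApiVersions and catch-up checks
    }
}
```

An empty result means only that the cheap local checks passed; the request can still be aborted by the later steps.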
Caution

KRaft adds/removes one voter at a time. A newly-added voter must be caught up before it joins, and the change must commit under the new majority. Removing a voter when the quorum is already at minimum availability can lose the quorum. The leader requires its current epoch to be committed (HWM established) before processing any membership change, which is the mechanism that prevents split-brain reconfiguration.

Observers configured with controller.quorum.auto.join.enable=true automatically send AddRaftVoter (or RemoveRaftVoter to evict a stale entry sharing their id) once reconfiguration is supported (pollFollowerAsObserver, KafkaRaftClient.java:3365). Voters also periodically send UpdateRaftVoter when their endpoints or supported-version range drift from what's recorded (shouldSendUpdateVoteRequest, KafkaRaftClient.java:3300). Upgrading kraft.version 0→1 is coordinated through LeaderState.maybeAppendUpgradedKRaftVersion using optimistic locking on an AtomicReference<KRaftVersionUpgrade> (LeaderState.java:107, :528); it requires every voter to support v1 and to have a known directory id.

Concurrency & threading

KRaft is overwhelmingly single-threaded by design. Almost all mutable state (QuorumState, every role object, RequestManager, LeaderState's replica map, the listener contexts) is touched only on the kafka-raft-io-thread. The crossing points are:

Shared state | Guard | Other thread(s)
Inbound messages & responses | BlockingMessageQueue (LinkedBlockingQueue) + wakeup() | network threads enqueue; IO thread dequeues
Listener registration | ConcurrentLinkedQueue<Registration> drained in pollListeners | any thread calling register/unregister
Leader append buffer | BatchAccumulator.appendLock (ReentrantLock); linger via AtomicLong; completed is a ConcurrentLinkedQueue | controller threads calling prepareAppend
Resignation request | volatile boolean resignRequested (set via requestResign()) | external thread calling RaftClient.resign
Graceful shutdown flag | AtomicReference<GracefulShutdown> | thread calling shutdown()
kraft.version upgrade | AtomicReference<KRaftVersionUpgrade> with CAS | client calling upgradeKRaftVersion
Control-record history | synchronized(voterSetHistory) / synchronized(kraftVersionHistory) + volatile nextOffset | snapshot-freezing threads reading voterSetAtOffset
Listener offsets | synchronized(ListenerContext) for nextOffset/lastSent | the listener thread via BatchReader.onClose

Fetch and append purgatories (ThresholdPurgatory<Long> backed by a TimingWheelExpirationService) complete futures from an expiration thread, but the success-path completion of a parked Fetch is always re-evaluated on the IO thread (handleFetchRequest's comment, KafkaRaftClient.java:1586-1589). The IO thread sleeps in messageQueue.poll(timeoutMs) where the timeout is the minimum of every role timer and the snapshot cleaner deadline; any state change that should shorten the sleep calls wakeup() (KafkaRaftClient.java:3645).

Configuration reference

All knobs are defined in QuorumConfig.java under the controller.quorum. prefix (snapshot knobs are in MetadataLogConfig).

Key | Default | Effect
controller.quorum.voters | empty | Static {id}@{host}:{port} voter map (legacy / kraft.version 0). Should NOT be set with dynamic quorums.
controller.quorum.bootstrap.servers | empty | {host}:{port} endpoints for discovering the metadata quorum (dynamic quorums). Bootstrap nodes get decreasing negative ids (KafkaRaftClient.java:318).
controller.quorum.election.timeout.ms | 1000 | Max time without fetching from the leader before starting an election. Actual timeout is randomized: t + random.nextInt(t) (QuorumState.java:745).
controller.quorum.fetch.timeout.ms | 2000 | Follower: max time without a successful fetch before becoming Prospective. Leader: max time without a fetch from a majority before resigning (check-quorum = 1.5×, LeaderState.java:65).
controller.quorum.election.backoff.max.ms | 1000 | Cap for the strict-exponential election backoff that prevents gridlocked elections.
controller.quorum.append.linger.ms | 25 | How long the leader lets writes accumulate in the BatchAccumulator before flushing to disk.
controller.quorum.request.timeout.ms | 2000 | RPC timeout used by RequestManager; a timed-out connection is reset.
controller.quorum.retry.backoff.ms | 20 | Backoff after a failed RPC before the connection is retried.
controller.quorum.auto.join.enable | false | If true, a controller observer auto-issues AddRaftVoter to join its cluster's quorum (KIP-853).
controller.quorum.fetch.max.bytes | 1048576 | Max bytes per Fetch (always returns ≥1 batch).
controller.quorum.fetch.snapshot.max.bytes | 1048576 | Max bytes per FetchSnapshot.
metadata.log.max.snapshot.interval.ms | 3600000 | Max wall-clock between snapshot generations (MetadataLogConfig.java:38).
metadata.log.max.record.bytes.between.snapshots | 20971520 | Max log bytes accumulated before a snapshot is generated.
Note

The defaults are deliberately aggressive (election ~1 s, fetch ~2 s). The QuorumConfig header explains the philosophy (QuorumConfig.java:42-50): leader changes should be quick because the KIP-631 controller standby is "hot": it has the metadata in memory and can take over without a cold reload.
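The timers derived from these settings are simple arithmetic, sketched below with the formulas quoted in this chapter: the randomized election timeout t + random.nextInt(t), the 1.5× check-quorum window, and the fetchTimeoutMs/2 BeginQuorumEpoch interval. The class QuorumTimersSketch and its method names are hypothetical.

```java
import java.util.Random;

/** Hypothetical helpers deriving effective timers from the configured values. */
final class QuorumTimersSketch {

    /** Randomized in [t, 2t) so that voters rarely time out at the same instant. */
    static int randomizedElectionTimeoutMs(int electionTimeoutMs, Random random) {
        return electionTimeoutMs + random.nextInt(electionTimeoutMs);
    }

    /** A leader resigns after 1.5x the fetch timeout without fetches from a majority. */
    static int checkQuorumTimeoutMs(int fetchTimeoutMs) {
        return (int) (fetchTimeoutMs * 1.5);
    }

    /** Interval at which the leader re-sends BeginQuorumEpoch to voters that have not fetched. */
    static int beginQuorumEpochIntervalMs(int fetchTimeoutMs) {
        return fetchTimeoutMs / 2;
    }
}
```

With the defaults, the election timeout lands between 1000 and 1999 ms, the check-quorum window is 3000 ms, and BeginQuorumEpoch is retried every 1000 ms.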

Failure modes, edge cases & recovery

  • Leader isolation (check-quorum): if a leader stops receiving fetches from a majority of voters within 1.5 × fetch.timeout.ms, timeUntilCheckQuorumExpires hits zero and the leader resigns (pollLeader, KafkaRaftClient.java:3176-3179). A single-voter quorum never expires this timer (LeaderState.java:225).
  • Stale leader endpoint: if a restarted voter knows a leader id but not its endpoint (e.g. leader not in the local static voter set), it starts Unattached and learns the endpoint from a BeginQuorumEpoch or a bootstrap Fetch rather than getting stuck (QuorumState.java:204-231).
  • Log divergence after election: handled inline on the Fetch path via diverging-epoch truncation; a follower will never truncate below its HWM (throws if asked to, KafkaRaftClient.java:1749-1753).
  • Snapshot deleted mid-download: if the leader GC'd the snapshot, FetchSnapshot returns SNAPSHOT_NOT_FOUND; the follower resets its fetching-snapshot state and retries from a plain Fetch (KafkaRaftClient.java:2122-2138).
  • Crash before flush: starting a former leader as Resigned in the same epoch protects against re-using an (offset, epoch) if unflushed records were lost (QuorumState.java:173-189).
  • Graceful-shutdown timeout: GracefulShutdown uses a timer; if a resigning leader can't hand off within the window the future completes exceptionally with TimeoutException (KafkaRaftClient.java:3931). A follower/observer or sole voter can shut down immediately (maybeCompleteShutdown, KafkaRaftClient.java:3604-3610).
  • Disruptive rejoining voter: pre-vote (above) prevents it from bumping the epoch; a partitioned voter spins in Prospective without disturbing the leader.
  • Cluster-id mismatch: every RPC validates the cluster id and returns INCONSISTENT_CLUSTER_ID on mismatch (e.g. handleVoteRequest, KafkaRaftClient.java:834).
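
The check-quorum behaviour in the first bullet can be sketched as a timer that is reset only once a majority has been heard from. This is a simplified model under stated assumptions, not the LeaderState implementation: the class CheckQuorumSketch and its methods are hypothetical, voters are identified by plain ints, and the leader is assumed to count itself toward the majority (so it needs fetches from voterCount / 2 other voters).

```java
import java.util.HashSet;
import java.util.Set;

// Simplified, hypothetical model of a leader-side check-quorum timer.
final class CheckQuorumSketch {
    private final int voterCount;            // voters in the quorum, including the leader
    private final long checkQuorumTimeoutMs; // 1.5 x fetch timeout
    private final Set<Integer> fetchedVoters = new HashSet<>();
    private long deadlineMs;

    CheckQuorumSketch(int voterCount, long fetchTimeoutMs, long nowMs) {
        this.voterCount = voterCount;
        this.checkQuorumTimeoutMs = fetchTimeoutMs * 3 / 2;
        this.deadlineMs = nowMs + checkQuorumTimeoutMs;
    }

    // Record a fetch from a voter; reset the timer once a majority has been heard from.
    void onFetchFromVoter(int voterId, long nowMs) {
        fetchedVoters.add(voterId);
        // Assumption: the leader counts itself, so voterCount / 2 followers complete a majority.
        if (fetchedVoters.size() >= voterCount / 2) {
            fetchedVoters.clear();
            deadlineMs = nowMs + checkQuorumTimeoutMs;
        }
    }

    // A single-voter quorum never expires the timer.
    long timeUntilExpiryMs(long nowMs) {
        if (voterCount == 1) return Long.MAX_VALUE;
        return Math.max(0L, deadlineMs - nowMs);
    }

    boolean shouldResign(long nowMs) {
        return timeUntilExpiryMs(nowMs) == 0L;
    }

    public static void main(String[] args) {
        CheckQuorumSketch leader = new CheckQuorumSketch(3, 2000, 0);
        leader.onFetchFromVoter(2, 1000);               // majority reached, deadline moves to 4000
        System.out.println(leader.shouldResign(3999));  // false
        System.out.println(leader.shouldResign(4000));  // true
    }
}
```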

Invariants & guarantees

  • At most one leader per epoch. Epochs increase by exactly one per election; the durable quorum-state write before transition means a vote is never forgotten across a crash. Voted/leader state is retained when transitioning within the same epoch (QuorumState.transitionToFollower preserves votedKey, QuorumState.java:583).
  • Election safety via log up-to-dateness. A vote is granted only if the candidate's (lastEpoch, lastOffset) is ≥ the voter's, so any new leader has all committed records (KafkaRaftClient.java:919).
  • Leader completeness via the epoch-start commit rule. The HWM cannot advance past prior-epoch records until the leader commits a record from its own epoch (LeaderState.java:747).
  • Monotonic HWM. Both leader and follower reject non-monotonic HWM updates (FollowerState.updateHighWatermark throws, FollowerState.java:190).
  • Records uniquely identified by (offset, epoch). Underpins truncation/reconciliation correctness.
  • One in-flight Fetch per follower for its LEO (prevents double-append).
  • Control-record listener is always read to the LEO, so the latest voter set and kraft.version are always current (KafkaRaftClient.java:202).
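
Two of the invariants above lend themselves to a compact sketch: the log up-to-dateness check behind vote granting, and the high-watermark rule that combines majority replication, the epoch-start condition, and monotonicity. The code below is a hedged illustration, not the KafkaRaftClient or LeaderState code. The class RaftInvariantsSketch and its method names are hypothetical, and the strict "greater than epoch start offset" comparison is an assumption about how the epoch-start rule is expressed.

```java
import java.util.Arrays;
import java.util.OptionalLong;

// Hypothetical sketch of two safety checks; not the actual Kafka implementation.
final class RaftInvariantsSketch {

    // Grant a vote only if the candidate's (lastEpoch, lastOffset) is >= the voter's,
    // comparing epoch first and offset second.
    static boolean isCandidateLogUpToDate(int candEpoch, long candOffset,
                                          int voterEpoch, long voterOffset) {
        if (candEpoch != voterEpoch) return candEpoch > voterEpoch;
        return candOffset >= voterOffset;
    }

    // Compute the next high watermark from the voters' log end offsets (leader included).
    static OptionalLong nextHighWatermark(long[] voterEndOffsets,
                                          long epochStartOffset,
                                          OptionalLong current) {
        long[] sorted = voterEndOffsets.clone();
        Arrays.sort(sorted); // ascending
        // Largest offset that a strict majority of voters has reached.
        long majorityOffset = sorted[sorted.length - 1 - sorted.length / 2];
        // Epoch-start rule (assumed form): nothing commits until the leader's own epoch has progress.
        if (majorityOffset <= epochStartOffset) return current;
        // Monotonicity: never move the high watermark backwards.
        if (current.isPresent() && majorityOffset <= current.getAsLong()) return current;
        return OptionalLong.of(majorityOffset);
    }

    public static void main(String[] args) {
        System.out.println(isCandidateLogUpToDate(3, 10, 2, 50));
        System.out.println(nextHighWatermark(new long[] {10, 7, 5}, 6, OptionalLong.empty()));
    }
}
```

Note that a candidate with a higher last epoch wins the comparison even if its log is shorter, which is why records are identified by the (offset, epoch) pair rather than by offset alone.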

Interactions with other subsystems

  • The KRaft Controller registers a RaftClient.Listener; it receives handleCommit for committed metadata batches, handleSnapshot/handleLoadBootstrap, and handleLeaderChange (fired only once the listener has caught up to the new epoch's start offset, KafkaRaftClient.java:4121-4137). The controller, in turn, decides when to call createSnapshot and upgradeKRaftVersion.
  • Metadata Propagation: brokers run a KafkaRaftClient as an observer, fetching the same log to build their MetadataImage.
  • The Log Storage Engine and Log Management: KafkaRaftLog is a thin RaftLog over the same segment storage, batch format, and indexes used by ordinary partitions.
  • The Fetch Path and Replication, ISR & HWM: KRaft reuses the Fetch RPC and a HWM/commit concept analogous to ISR replication, but the quorum here is a strict majority of voters, not an ISR.
  • Wire Protocol & RPC Framework: the Vote/BeginQuorumEpoch/EndQuorumEpoch/FetchSnapshot/Add|Remove|UpdateRaftVoter RPCs are all defined as message JSON and listed on the controller listener.
  • Network Layer & Threading: inbound requests arrive via KafkaRaftClientDriver.handleRequest and are placed on the message queue; outbound requests go through a KafkaNetworkChannel.

RPC summary

API | ApiKey | Direction | Purpose
Vote | 52 | nominee → voters | Request a (pre-)vote; carries last (offset, epoch) and PreVote flag (v2).
BeginQuorumEpoch | 53 | leader → voters | Announce leadership of a new epoch; carries leader endpoints (v1).
EndQuorumEpoch | 54 | leader → voters | Graceful resignation + preferred-successor ordering.
Fetch | 1 | follower/observer → leader | Pull records; piggybacks leader/epoch, HWM, divergence & snapshot id.
FetchSnapshot | 59 | follower → leader | Download a snapshot by (snapshotId, byte position).
DescribeQuorum |  | admin → leader | Report voter/observer replication state.
AddRaftVoter / RemoveRaftVoter / UpdateRaftVoter |  | client/observer → leader | Dynamic reconfiguration (KIP-853).

Design rationale & evolution

KRaft has accreted across several KIPs, all visible in the code and message schemas:

  • KIP-595: The core Raft protocol for the metadata quorum, pull-based fetch, log reconciliation, BeginQuorumEpoch.
  • KIP-630 (Snapshots): the FetchSnapshot RPC, .checkpoint files, and the SnapshotHeader/Footer control records.
  • KIP-853 (Dynamic KRaft quorums): directory ids in ReplicaKey/VotersRecord, leader endpoints in responses, AddVoter/RemoveVoter/UpdateVoter, and kraft.version 1 (the JSON comments cite KIP-853 explicitly, e.g. VoteRequest.json:21).
  • KIP-996 (Pre-Vote): the Prospective role and the PreVote field, preventing disruptive epoch bumps.
  • KIP-631: The quorum-controller design that motivates the low default timeouts (hot standby).
Gotcha

"Voter" is not a config you flip per node at runtime; it is whether your ReplicaKey appears in the latest committed (or even uncommitted) VotersRecord. A node whose broker id is in the voter set but with a different directory id is treated as an observer (the directory ids don't match in VoterSet.isVoter), which is exactly why auto-join may issue a RemoveRaftVoter for the stale entry before adding itself (KafkaRaftClient.java:3375-3385). Mixing the legacy controller.quorum.voters with dynamic quorums is unsupported and the config doc warns against it.
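
The identity rule described above (node id plus directory id) can be sketched as follows. This is a minimal model, not VoterSet itself: the classes are hypothetical, directory ids are plain strings rather than Kafka's Uuid type, and the fallback that accepts an id-only match when the voter entry carries no directory id is an assumption about how statically configured voter sets behave.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical model of voter membership keyed by (node id, directory id).
final class VoterIdentitySketch {

    static final class ReplicaKey {
        final int id;
        final Optional<String> directoryId;

        ReplicaKey(int id, Optional<String> directoryId) {
            this.id = id;
            this.directoryId = directoryId;
        }
    }

    private final Map<Integer, ReplicaKey> voters = new HashMap<>();

    void addVoter(ReplicaKey key) {
        voters.put(key.id, key);
    }

    boolean isVoter(ReplicaKey candidate) {
        ReplicaKey voter = voters.get(candidate.id);
        if (voter == null) return false;
        // Assumption: a voter entry without a directory id matches on node id alone.
        if (!voter.directoryId.isPresent()) return true;
        // Otherwise the directory ids must match; a mismatch means "observer".
        return voter.directoryId.equals(candidate.directoryId);
    }

    public static void main(String[] args) {
        VoterIdentitySketch voterSet = new VoterIdentitySketch();
        voterSet.addVoter(new ReplicaKey(1, Optional.of("dir-A")));
        // Same node id, new disk: no longer recognised as a voter.
        System.out.println(voterSet.isVoter(new ReplicaKey(1, Optional.of("dir-B"))));
    }
}
```

In this model a controller that comes back with a reformatted metadata directory keeps its node id but fails the directory-id comparison, which is the situation where the stale entry has to be removed before the node can be added again.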

Gotcha

The quorum-state file stores only leader/epoch/vote; under kraft.version 1 it no longer stores the voter set (the CurrentVoters field is version-0 only). The authoritative voter set lives in the metadata log/snapshot. Hand-editing quorum-state to "fix" membership will not work and risks violating the at-most-one-leader-per-epoch invariant.

krivaltsevich.com · Part of Apache Kafka Internals · derived from Apache Kafka 4.4 source · GitHub · MIT-licensed.

Apache Kafka® is a registered trademark of the Apache Software Foundation. This is an independent, unofficial guide, not affiliated with or endorsed by the ASF.