krivaltsevich.com · Kafka Internals 4.4

08 · Replication, ISR & High Watermark

Source: Apache Kafka 4.4.0-SNAPSHOT (git 04bfe7d, 2026-06-15), KRaft mode. Derived from source code, not copied from official documentation.

Replication is the durability engine of Kafka: every topic-partition is a small replicated state machine whose authoritative metadata (assignment, leader, leader epoch, in-sync replica set) lives in the KRaft metadata log, while the actual log data is pushed from a single leader broker to its followers via the Fetch protocol. This chapter dissects how a broker materialises that metadata into live Partition objects through ReplicaManager.applyDelta, how the leader tracks each follower's log end offset and decides who is in-sync, how the high watermark (the committed-data boundary) advances as the minimum LEO over the ISR, how ISR changes are committed by round-tripping an AlterPartition RPC to the controller, how leader epochs and epoch-based truncation prevent silent log divergence, and how acks=all producers block in a purgatory until the high watermark crosses their offset.

Role & responsibilities

The replication subsystem answers one question for every byte a producer writes: is this record safe? "Safe" means it has been durably replicated to enough replicas that it survives the loss of any tolerated number of brokers and will never be silently dropped by a future leader. Concretely the subsystem owns:

  • Per-broker partition ownership. ReplicaManager holds every hosted partition, applies metadata deltas to drive each into a leader or follower role, routes produce/fetch to the right UnifiedLog, and periodically checkpoints high watermarks to disk.
  • Per-partition state & durability accounting. Partition is the state machine: it tracks the assignment, the ISR, the leader epoch, the local log, and one Replica object per remote follower carrying that follower's LEO and last-fetch timestamps.
  • ISR maintenance. Shrinking the ISR when a follower falls behind (replica.lag.time.max.ms), expanding it when a follower catches up, and committing both through the controller via AlterPartition.
  • High-watermark advancement. Computing the committed offset as the min LEO over the (maximal) ISR, and unblocking acks=all produce and follower fetch requests when it moves.
  • Divergence prevention. Leader epochs, the epoch→start-offset cache, and epoch-based truncation, so that a returning follower trims exactly the records that were never committed.

Key idea

In KRaft, the ISR is controller-authoritative. The leader broker proposes ISR changes; the controller commits them by appending a PartitionChangeRecord to the metadata log and bumping the partition epoch. The leader learns the committed ISR only in return, either via the AlterPartition response or via the replayed metadata delta. This makes the metadata log the single linearizable history of leadership and membership and removes the split-brain risks of the old ZooKeeper-era ISR writes (KIP-497).

Where it lives in the code

| Concern | Principal class | File |
|---|---|---|
| Per-broker partition owner, leader/follower transitions, HW checkpoint, produce/fetch dispatch | ReplicaManager | core/src/main/scala/kafka/server/ReplicaManager.scala |
| Per-partition state machine: ISR, assignment, leader epoch, HW logic, AlterPartition driver | Partition | core/src/main/scala/kafka/cluster/Partition.scala |
| Leader-side view of one remote follower (LEO, last fetch, broker epoch) | Replica | server/src/main/java/org/apache/kafka/server/replica/Replica.java |
| Immutable snapshot of a follower's offsets & caught-up time | ReplicaState (record) | server/src/main/java/org/apache/kafka/server/replica/ReplicaState.java |
| Committed vs. pending ISR states; maximal ISR | PartitionState, CommittedPartitionState, PendingExpandIsr, PendingShrinkIsr | server/src/main/java/org/apache/kafka/server/partition/*.java |
| Asynchronous ISR-change RPC to the controller | AlterPartitionManager / DefaultAlterPartitionManager | server/src/main/java/org/apache/kafka/server/partition/DefaultAlterPartitionManager.java |
| Leader-epoch → start-offset cache (truncation, OffsetsForLeaderEpoch) | LeaderEpochFileCache | storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java |
| Authoritative on-disk partition metadata (assignment, ISR, ELR, epochs) | PartitionRegistration | metadata/src/main/java/org/apache/kafka/metadata/PartitionRegistration.java |
| HW/LEO/LSO with segment position | LogOffsetMetadata | storage/src/main/java/org/apache/kafka/storage/internals/log/LogOffsetMetadata.java |
| Controller commit of ISR/leader changes; eligibility checks | ReplicationControlManager, PartitionChangeBuilder | metadata/src/main/java/org/apache/kafka/controller/*.java |

Core concepts & terminology

AR (assigned replicas)
The ordered broker-id list assigned to the partition. The first element is the preferred leader. Held in assignmentState.replicas (Partition.scala:196).
ISR (in-sync replicas)
The subset of AR the controller currently believes is caught up. Committed copy is partitionState.isr. Membership is the precondition for being elected leader without data loss (under clean election).
Maximal ISR
The ISR the leader uses locally for HW advancement and acks=all quorum, which during a pending expansion optimistically includes the not-yet-committed replica. Defined per pending state via PartitionState.maximalIsr().
LEO (log end offset)
The offset of the next record to be appended. The leader's LEO is its own; each follower's LEO is recorded from its Fetch requests in the corresponding Replica.
HW (high watermark)
The committed boundary: the offset below which every record is known to be replicated to all of the (maximal) ISR. Consumers (READ_UNCOMMITTED) may only read below it. HW = min LEO over the ISR, gated by min ISR.
LSO (last stable offset)
The boundary below which there are no open transactions; READ_COMMITTED consumers read below it. Tracked by UnifiedLog, surfaced via fetchOffsetSnapshot.
Leader epoch
A monotonically increasing integer the controller assigns on every leadership change. Stamped into each record batch and cached as epoch→start-offset, it is the anchor for truncation (KIP-101).
Partition epoch
A second monotonic counter bumped on every partition metadata change (including ISR-only changes). It is the optimistic-concurrency token for AlterPartition.
ELR (eligible leader replicas)
Replicas the controller knows are complete up to the HW but which dropped out of the ISR (e.g. a clean shutdown). They can be elected cleanly even though they are not currently in the ISR (KIP-966).

Data structures

PartitionRegistration, the authoritative metadata record

PartitionRegistration (PartitionRegistration.java:153-163) is the in-memory image of one partition's controller-owned metadata, reconstructed by replaying PartitionRecord/PartitionChangeRecord from the metadata log. Its fields:

| Field | Type | Meaning |
|---|---|---|
| replicas | int[] | Assigned replica broker ids; replicas[0] is the preferred leader. |
| directories | Uuid[] | Per-replica log-directory id (JBOD directory assignment). |
| isr | int[] | Committed in-sync replica set. |
| addingReplicas / removingReplicas | int[] | Reassignment deltas (KIP-455). |
| elr | int[] | Eligible leader replicas, complete up to HW but outside ISR (KIP-966). |
| lastKnownElr | int[] | Last-known ELR members; when configured for balanced recovery it retains the single last good leader, allowing recovery when ISR and ELR are empty. |
| leader | int | Current leader id, or NO_LEADER (-1) when none. |
| leaderRecoveryState | LeaderRecoveryState | RECOVERED or RECOVERING (set after an unclean election). |
| leaderEpoch | int | Bumped on every leader change. |
| partitionEpoch | int | Bumped on every change. See merge() at PartitionRegistration.java:259,274: a leader change increments both leaderEpoch and partitionEpoch; an ISR-only change increments only partitionEpoch. |

elr and lastKnownElr are persisted as tagged fields only when ELR is enabled and non-empty (PartitionRegistration.java:387-392), keeping records small on clusters that have not adopted KIP-966.

Replica & ReplicaState, the leader's view of a follower

On the leader, each remote follower is a Replica holding a single AtomicReference<ReplicaState> (Replica.java:36,42). All updates are lock-free compare-and-set via replicaState.updateAndGet(...); readers grab an immutable snapshot through stateSnapshot(). ReplicaState is a record (ReplicaState.java:32-39):

| Field | Meaning |
|---|---|
| logStartOffset | Follower's log start offset (drives the partition low watermark for DeleteRecords). |
| logEndOffsetMetadata | Follower's LEO (with segment position) as of its last Fetch, the value fed into HW computation. |
| lastFetchLeaderLogEndOffset | The leader's LEO at the moment that fetch arrived; used to detect "did this fetch reach what was then the end?". |
| lastFetchTimeMs | Wall-clock time of the last Fetch from this follower. |
| lastCaughtUpTimeMs | The crux of lag detection: the latest time t at which this follower's fetch offset was >= the leader's LEO at time t. |
| brokerEpoch | Broker epoch carried in the Fetch, used to fence stale fetch-state updates after a follower reboot. |

The "caught up" rule is computed in updateFetchStateOrThrow (Replica.java:84-91): if the follower's fetch offset reached the leader's current LEO, lastCaughtUpTime jumps to now; else if it reached the leader's LEO as of the previous fetch (lastFetchLeaderLogEndOffset), it advances to the previous fetch time; otherwise it is unchanged. This two-tier rule lets a follower remain "in sync" even though its fetch offset chronically trails a busy leader by a few records: what matters is that it keeps closing the gap that existed when it last asked.

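Invariant: caught-up definition

ReplicaState.isCaughtUp (ReplicaState.java:62-67) returns true iff the follower's LEO equals the leader's LEO or currentTimeMs - lastCaughtUpTimeMs <= replicaMaxLagMs. A follower becomes a candidate for removal from the ISR as soon as this test returns false; the removal itself takes effect once the next isr-expiration pass proposes the shrink and the controller commits it.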
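
The two-tier update rule and the caught-up test can be sketched in a few lines of Python. This is a simplified model, not Kafka's code: the snake_case names are illustrative stand-ins for the ReplicaState fields, broker-epoch fencing is left out, and taking the max when advancing to the previous fetch time is an assumption made here so that the timestamp never moves backwards.

```python
from dataclasses import dataclass, replace


@dataclass(frozen=True)
class ReplicaStateSketch:
    """Illustrative subset of the leader's per-follower snapshot."""
    log_end_offset: int
    last_fetch_leader_leo: int
    last_fetch_time_ms: int
    last_caught_up_time_ms: int

    def is_caught_up(self, leader_leo: int, now_ms: int, max_lag_ms: int) -> bool:
        # In sync if fully caught up right now, or caught up recently enough.
        return (self.log_end_offset == leader_leo
                or now_ms - self.last_caught_up_time_ms <= max_lag_ms)


def update_fetch_state(prev, fetch_offset, leader_leo, now_ms):
    """Return the next snapshot after a Fetch at fetch_offset arrives."""
    if fetch_offset >= leader_leo:
        # Tier 1: the follower reached the leader's current end.
        caught_up = now_ms
    elif fetch_offset >= prev.last_fetch_leader_leo:
        # Tier 2: it reached what was the end when it last fetched.
        caught_up = max(prev.last_caught_up_time_ms, prev.last_fetch_time_ms)
    else:
        caught_up = prev.last_caught_up_time_ms
    return replace(prev,
                   log_end_offset=fetch_offset,
                   last_fetch_leader_leo=leader_leo,
                   last_fetch_time_ms=now_ms,
                   last_caught_up_time_ms=caught_up)
```

A follower that always trails by a few records keeps hitting tier 2, so its lastCaughtUpTimeMs keeps advancing one fetch behind and it stays in sync; a follower that stops closing the previous gap freezes the timestamp and eventually fails the test.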

LeaderEpochFileCache, epoch → start-offset

Each log keeps a TreeMap<Integer,EpochEntry> mapping leader epoch → start offset of that epoch, guarded by its own ReentrantReadWriteLock and persisted to a leader-epoch-checkpoint file (LeaderEpochFileCache.java:59-60). assign(epoch, startOffset) appends a new entry and flushes synchronously; truncation flushes asynchronously to avoid fsync stalls on the fetcher/log threads (LeaderEpochFileCache.java:335-355,367-388). The pivotal query is endOffsetFor(requestedEpoch, logEndOffset) (LeaderEpochFileCache.java:284-328): it returns the largest cached epoch <= the request and the exclusive end offset of that epoch (= start offset of the next epoch, or the LEO for the latest epoch). That is precisely the answer to the OffsetsForLeaderEpoch RPC and the truncation pivot for a returning follower.
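
As an illustration of the endOffsetFor rule just described, here is a minimal Python sketch over a sorted list of (epoch, start offset) pairs. It is an assumption-laden model rather than the real method: returning None for an epoch outside the cached range is a choice of this sketch, and truncation_offset is a hypothetical helper showing how a follower might combine the answer with its own LEO.

```python
import bisect


def end_offset_for(entries, requested_epoch, log_end_offset):
    """entries: list of (epoch, start_offset), sorted by epoch.

    Returns (largest cached epoch <= requested_epoch, exclusive end offset),
    or None when the requested epoch is outside the cached range
    (a simplification made by this sketch).
    """
    epochs = [epoch for epoch, _ in entries]
    if not epochs or requested_epoch < epochs[0] or requested_epoch > epochs[-1]:
        return None
    i = bisect.bisect_right(epochs, requested_epoch) - 1
    if i == len(entries) - 1:
        # Latest epoch: it ends at the current log end offset.
        return epochs[i], log_end_offset
    # Otherwise the epoch ends where the next cached epoch starts.
    return epochs[i], entries[i + 1][1]


def truncation_offset(follower_leo, leader_answer):
    """Hypothetical helper: a follower never truncates beyond its own LEO."""
    _, end_offset = leader_answer
    return min(follower_leo, end_offset)


leader_cache = [(5, 100), (7, 130)]
answer = end_offset_for(leader_cache, requested_epoch=5, log_end_offset=150)
cut = truncation_offset(follower_leo=143, leader_answer=answer)
```

With the cache epoch 5 → 100, epoch 7 → 130, a follower asking about epoch 5 is told that epoch 5 ended at 130, so it cuts its log back to 130 before fetching.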

LogOffsetMetadata, an offset plus its physical position

HW, LEO and LSO are not bare longs; they are LogOffsetMetadata (LogOffsetMetadata.java:33-35): a messageOffset, the segmentBaseOffset of the segment containing it, and the relativePositionInSegment. Carrying the segment position lets the leader answer fetches and compute byte deltas without re-resolving the offset, and lets maybeIncrementHighWatermark advance the HW even when the offset is unchanged but the log has rolled to a new segment (UnifiedLog.java:573-574).

The ISR state hierarchy

partitionState is a @volatile PartitionState implementing a tiny state machine. CommittedPartitionState (steady state) reports maximalIsr() == isr() and isInflight() == false. While an AlterPartition is outstanding the field is swapped (under the write lock) to a pending record:

| State | isr() | maximalIsr() | isInflight() |
|---|---|---|---|
| CommittedPartitionState | committed ISR | = isr | false |
| PendingExpandIsr | last committed ISR | committed ISR + new replica | true |
| PendingShrinkIsr | last committed ISR | = last committed ISR (not shrunk) | true |

The asymmetry (PendingExpandIsr.java:43-47 vs PendingShrinkIsr.java:41-43) is a deliberate safety choice: optimistically counting the new replica during an expansion only makes the HW condition stricter, so if the controller rejects the expansion no committed offset was over-advanced; conversely a shrink is not applied to the maximal ISR until the controller confirms it, so a rejected shrink cannot have prematurely advanced the HW past a replica that is still required.
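
The three states and their asymmetric maximal ISR can be modelled as small immutable classes. The sketch below is illustrative only: the class and attribute names are Python stand-ins for the Java types named above, and proposed_isr is a hypothetical convenience property showing what would be sent to the controller.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Committed:
    isr: frozenset

    @property
    def maximal_isr(self):
        return self.isr

    @property
    def is_inflight(self):
        return False


@dataclass(frozen=True)
class PendingExpand:
    isr: frozenset          # last committed ISR
    new_replica: int

    @property
    def maximal_isr(self):
        # Optimistically count the joining replica: stricter HW condition.
        return self.isr | {self.new_replica}

    @property
    def proposed_isr(self):
        return self.isr | {self.new_replica}

    @property
    def is_inflight(self):
        return True


@dataclass(frozen=True)
class PendingShrink:
    isr: frozenset          # last committed ISR
    out_of_sync: frozenset

    @property
    def maximal_isr(self):
        # Not shrunk until the controller confirms.
        return self.isr

    @property
    def proposed_isr(self):
        return self.isr - self.out_of_sync

    @property
    def is_inflight(self):
        return True
```

In both pending states the maximal ISR is the larger of the old and the proposed sets, which is why a rejected proposal can never have advanced the HW too far.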

Architecture & control/data flow

[Figure: architecture diagram. KRaft Controller (commits ISR, bumps epoch; ch 11) → KRaft metadata log (PartitionChangeRecord: leader, isr, epochs, elr) → MetadataLoader → TopicsDelta (ch 12) → ReplicaManager (applyLocalLeadersDelta → makeLeader; applyLocalFollowersDelta → makeFollower; hw-checkpoint and isr-expiration threads) → Partition (leader: UnifiedLog, one Replica per follower, ISR, HW) and ReplicaFetcher / Partition (follower: UnifiedLog, ch 09). The leader serves the follower records up to the leader LEO, and the response carries the HW. AlterPartitionManager (one in-flight, retries) → KRaft Controller, which validates and commits, looping back to the metadata log.]

Figure: End-to-end replication control and data flow on one broker: metadata descends from the controller through the loader into ReplicaManager, which drives leader/follower roles; the leader↔follower Fetch loop carries data and the HW, while ISR changes round-trip back to the controller via AlterPartition.

Detailed mechanics

Becoming leader or follower: applyDelta

Metadata changes arrive as a TopicsDelta. ReplicaManager.applyDelta (ReplicaManager.scala:2360) takes replicaStateChangeLock, processes deletions first (so a recreated topic with the same name is not clobbered), then splits the local changes into leaders and followers and applies each:

  • Leaders (applyLocalLeadersDelta, ReplicaManager.scala:2418): remove any follower fetcher for the partition, then call partition.makeLeader(registration, isNew, checkpoints...).
  • Followers (applyLocalFollowersDelta, ReplicaManager.scala:2449): call partition.makeFollower(...); if the leader epoch changed, (re)start a ReplicaFetcherThread targeting the new leader at initialFetchOffset(log).

makeLeader (Partition.scala:588) runs under the per-partition leaderIsrUpdateLock write lock. It rejects stale registrations (partitionRegistration.partitionEpoch < partitionEpoch), updates the assignment and ISR from the registration via updateAssignmentAndIsr, ensures the local log exists, and, only if the leader epoch advanced, records the new epoch's start offset in the epoch cache (leaderLog.assignEpochStartOffset, Partition.scala:649), resets every follower's ReplicaState via resetReplicaState, and sets leaderEpoch/leaderEpochStartOffsetOpt. It finishes by calling maybeIncrementLeaderHW because an ISR that just dropped to one replica may allow the HW to jump to the local LEO. makeFollower (Partition.scala:694) is simpler: it sets leaderReplicaIdOpt/leaderEpoch, clears the local ISR (a follower does not track ISR), and returns whether the leader epoch changed so the caller knows to restart the fetcher.

Note: why followers fetch from the HW or LEO

initialFetchOffset (ReplicaManager.scala:2093-2098) picks the starting point: a follower whose epoch cache is non-empty begins fetching from its own LEO (epoch-based truncation in the fetch path will trim any uncommitted tail), while a follower with an empty epoch cache (e.g. a freshly created replica) starts from its high watermark. Truncation itself happens in the fetch path via OffsetsForLeaderEpoch; see the chapter The Fetch Path & Replica Fetchers.

Recording follower progress and advancing the high watermark

When a follower's Fetch is served, the leader calls updateFollowerFetchState (Partition.scala:767). Under the ISR read lock it pushes the follower's new offsets into its Replica (updateFetchStateOrThrow), then attempts two things outside that read lock: maybeExpandIsr (might this follower now rejoin the ISR?) and, if the follower's LEO actually advanced, maybeIncrementLeaderHW.

maybeIncrementLeaderHW (Partition.scala:1010) is the heart of commit. It refuses to advance while the partition is under min ISR (isUnderMinIsr), then walks the leader's LEO downward against every remote replica:

newHW = leader.LEO
for each remote replica r:
    if r.LEO < newHW and
       (maximalIsr.contains(r) or (r.isCaughtUp(leader.LEO, now, lagMs) and isReplicaIsrEligible(r))):
        newHW = r.LEO
leaderLog.maybeIncrementHighWatermark(newHW)   // monotonic; only moves up

So the HW is the minimum LEO over the maximal ISR, but it additionally waits for any replica that is "caught up and eligible" even though it has not yet been committed to the ISR. The comment at Partition.scala:992-1004 spells out the reason: a replica that is considered caught-up but whose LEO momentarily trails the HW should hold the HW back, otherwise it could be elected leader and serve a stale view; and counting newly-added replicas (the maximal ISR) makes the commit condition strictly safer (KIP-497). maybeIncrementHighWatermark (UnifiedLog.java:563-581) enforces monotonicity and that the new HW never exceeds the LEO.
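
A runnable rendering of this walk, as a minimal sketch under stated assumptions: replicas are plain integers, follower LEOs come in as a dict, and the "caught up and eligible" test is precomputed into a set. The function name and parameters are illustrative and do not mirror the Scala signature.

```python
def next_high_watermark(leader_leo, current_hw, follower_leos,
                        isr, maximal_isr, min_isr,
                        caught_up_eligible=frozenset()):
    """Return the HW after one maybeIncrementLeaderHW-style pass.

    follower_leos:       {replica_id: LEO} for remote replicas only
    isr / maximal_isr:   committed ISR and the leader's maximal ISR
    caught_up_eligible:  replicas outside the maximal ISR that still count
    """
    if len(isr) < min_isr:
        # Under min ISR: refuse to advance.
        return current_hw
    candidate = leader_leo
    for replica_id, leo in follower_leos.items():
        counts = replica_id in maximal_isr or replica_id in caught_up_eligible
        if counts and leo < candidate:
            candidate = leo
    # Monotonic: the HW only moves up and never passes the leader's LEO.
    return max(current_hw, min(candidate, leader_leo))


hw = next_high_watermark(
    leader_leo=105, current_hw=100,
    follower_leos={2: 104, 3: 102},
    isr={1, 2, 3}, maximal_isr={1, 2, 3}, min_isr=2,
)
```

With a leader LEO of 105 and followers at 104 and 102, hw is 102; once replica 3 has been removed from the ISR (and is no longer counted as caught up), the same call returns 104.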

[Figure: leader log with ISR = {1,2,3} and offsets 100‥104 appended. Replica 1 (leader) LEO = 105, replica 2 LEO = 104, replica 3 LEO = 102 (the minimum), so the high watermark sits at 102.]

HW = min(105, 104, 102) = 102. Records up to 101 are on all of {1,2,3} → committed, readable. Records 102‥103 are on replicas 1 and 2 but not on replica 3, and record 104 is on the leader only → not yet committed. Once replicas 2 and 3 both reach LEO 105, the HW moves to 105 and acks=all producers waiting on offsets ≤ 104 complete.

Figure: High watermark = min LEO over the in-sync set, gated by min ISR.

Shrinking the ISR (lag detection)

A scheduler task isr-expiration runs every replicaLagTimeMaxMs / 2 ms (ReplicaManager.scala:285) and calls maybeShrinkIsr() on every online partition (ReplicaManager.scala:2100-2107). Partition.maybeShrinkIsr (Partition.scala:1089) first checks under the read lock (cheap, no write contention) whether any follower is out of sync via getOutOfSyncReplicas(replicaLagTimeMaxMs) (Partition.scala:1155), which filters ISR members (excluding the leader) by !isCaughtUp(...). If so it re-checks under the write lock, builds a PendingShrinkIsr, swaps partitionState, and submits an AlterPartition. The lag test covers two failure shapes (Partition.scala:1142-1151): a stuck follower whose LEO has not moved for maxLagMs, and a slow follower that never closes the gap to the leader's LEO within maxLagMs. Both collapse to the single lastCaughtUpTimeMs test.

Expanding the ISR

Expansion is driven by fetches, not a timer. After recording a follower's progress, maybeExpandIsr (Partition.scala:876) checks canAddReplicaToIsr (not already in ISR, not in-flight, and isReplicaIsrEligible) and isFollowerInSync (Partition.scala:907-912): the follower's LEO must be >= the leader's HW and >= the leader-epoch start offset. The second clause is essential: a replica must have caught up within the current leader epoch before it joins, otherwise it could be elected and lose data that sits between the HW and LEO (Partition.scala:862-872). Eligibility (isReplicaIsrEligible, Partition.scala:914-932) additionally requires that the broker is not fenced, is not in controlled shutdown, and that its cached broker epoch matches the broker epoch carried in its Fetch (or the Fetch sent -1 to bypass the check).
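
The expansion test combines three conditions, which the following Python sketch spells out. It is a simplified model: FollowerView and its fields are hypothetical names that bundle what the leader knows about one follower, and the real checks consult the metadata cache rather than plain booleans.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class FollowerView:
    """Hypothetical bundle of the leader's knowledge about one follower."""
    log_end_offset: int
    fetch_broker_epoch: int       # -1 means the Fetch bypasses the check
    cached_broker_epoch: int
    fenced: bool = False
    in_controlled_shutdown: bool = False


def is_follower_in_sync(f, leader_hw, leader_epoch_start_offset):
    # Must reach the HW *and* the start of the current leader epoch.
    return (f.log_end_offset >= leader_hw
            and f.log_end_offset >= leader_epoch_start_offset)


def is_isr_eligible(f):
    epoch_ok = (f.fetch_broker_epoch == -1
                or f.fetch_broker_epoch == f.cached_broker_epoch)
    return not f.fenced and not f.in_controlled_shutdown and epoch_ok


def should_propose_expand(replica_id, f, isr, inflight,
                          leader_hw, leader_epoch_start_offset):
    return (replica_id not in isr
            and not inflight
            and is_isr_eligible(f)
            and is_follower_in_sync(f, leader_hw, leader_epoch_start_offset))
```

For example, with HW = 120 and a leader epoch that started at offset 130, a follower at LEO 125 has passed the HW but is still rejected, because it has not yet caught up within the current epoch.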

Committing the change: the AlterPartition round-trip

Both shrink and expand call submitAlterPartition(pendingState) (Partition.scala:1687), which builds a LeaderAndIsr carrying the proposed ISR with each member's broker epoch (addBrokerEpochToIsr, Partition.scala:1661) and hands it to AlterPartitionManager.submit. DefaultAlterPartitionManager (DefaultAlterPartitionManager.java:53) batches updates: a ConcurrentHashMap holds at most one pending item per partition, and an AtomicBoolean inflightRequest ensures only one AlterPartition RPC is on the wire at a time (DefaultAlterPartitionManager.java:60-64,118-125). The request is sent to the active controller through the NodeToControllerChannelManager and is never timed out: it retries indefinitely until answered or until a newer metadata update supersedes it (DefaultAlterPartitionManager.java:138-140).

On the controller, ReplicationControlManager validates the proposal (ReplicationControlManager.java:1300-1356): the partition epoch must match (else INVALID_UPDATE_VERSION); the new ISR must be a subset of the assignment and must contain the leader (else INVALID_REQUEST); and every proposed member must be eligible, meaning alive, unfenced, and with a matching broker epoch (else INELIGIBLE_REPLICA). If valid, it appends a PartitionChangeRecord (bumping partitionEpoch) and returns the committed LeaderAndIsr.

Back on the leader, the completion handler (Partition.scala:1693) runs under the write lock. If partitionState no longer equals the proposed state (a leader election or metadata replay moved it on), the response is ignored as stale. On success, handleAlterPartitionUpdate (Partition.scala:1794) installs a fresh CommittedPartitionState, updates the local partitionEpoch, and re-runs maybeIncrementLeaderHW (a shrink may now permit the HW to advance). On error, handleAlterPartitionError (Partition.scala:1733) maps the code to a retry-or-give-up decision; notably INELIGIBLE_REPLICA and OPERATION_NOT_ATTEMPTED are authoritative "not applied" signals that let the leader safely reset to the last committed state, while transient codes trigger a resubmit.

Invariant: partition epoch is the concurrency token

An AlterPartition only commits if its partitionEpoch matches the controller's current value. Because the controller bumps the epoch on every change and the metadata log is linearizable, two leaders (e.g. a deposed one and its successor) can never both have their ISR proposals accepted: the older epoch is rejected with INVALID_UPDATE_VERSION, and a successful AlterPartition response always carries an epoch >= the local one (Partition.scala:1803).
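
The controller-side checks and the optimistic-concurrency role of the partition epoch can be sketched as a pure function. This is a simplified illustration, not the controller's code: PartitionMeta and alter_partition are hypothetical names, only the three checks named in the text are modelled, and other validations (such as leader-epoch fencing and recovery-state rules) are omitted.

```python
from dataclasses import dataclass, replace


@dataclass(frozen=True)
class PartitionMeta:
    replicas: tuple
    leader: int
    isr: frozenset
    partition_epoch: int


def alter_partition(meta, proposed_isr, request_epoch, eligible):
    """Return (new_meta, error_code); meta is unchanged on any error."""
    proposed = frozenset(proposed_isr)
    if request_epoch != meta.partition_epoch:
        return meta, "INVALID_UPDATE_VERSION"
    if not proposed <= set(meta.replicas) or meta.leader not in proposed:
        return meta, "INVALID_REQUEST"
    if not proposed <= set(eligible):
        return meta, "INELIGIBLE_REPLICA"
    # Commit: install the ISR and bump the epoch (the concurrency token).
    committed = replace(meta, isr=proposed,
                        partition_epoch=meta.partition_epoch + 1)
    return committed, "NONE"


meta = PartitionMeta(replicas=(1, 2, 3), leader=1,
                     isr=frozenset({1, 2, 3}), partition_epoch=7)
meta, first = alter_partition(meta, {1, 2}, request_epoch=7, eligible={1, 2, 3})
meta, second = alter_partition(meta, {1, 3}, request_epoch=7, eligible={1, 2, 3})
```

Two proposals built against epoch 7 race here: the first commits and moves the partition to epoch 8, so the second is rejected as stale and the committed ISR stays {1, 2}.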

The acks=all produce path

A produce with acks=-1 must not be acknowledged until its records are committed (HW >= the produced offset). ReplicaManager.appendRecords (ReplicaManager.scala:638) appends to the leader's log via Partition.appendRecordsToLeader (Partition.scala:1219), which runs under the read lock and first guards durability: if the ISR is smaller than the effective min ISR and acks=-1, it throws NotEnoughReplicasException before writing (Partition.scala:1234-1238); otherwise it appends and re-checks the HW. If the append did not already satisfy the request, maybeAddDelayedProduce (ReplicaManager.scala:878) parks a DelayedProduce in the produce purgatory keyed by each partition. The operation completes when checkEnoughReplicasReachOffset(requiredOffset) (Partition.scala:947) sees leaderLog.highWatermark >= requiredOffset; if at that moment the maximal ISR has fallen below min ISR it completes with NOT_ENOUGH_REPLICAS_AFTER_APPEND (the records are committed, but to fewer replicas than the durability contract requires). The trigger that re-checks the purgatory is HW movement: whenever an append or follower fetch increases the HW, an action enqueued by addCompletePurgatoryAction (ReplicaManager.scala:853-876) calls delayedProducePurgatory.checkAndComplete.

[Figure: acks=all produce flow. Producer (acks=all) → KafkaApis → ReplicaManager.appendRecords → check ISR ≥ minISR (if not, throw NotEnoughReplicas before writing) → append to leader log at offset N → if HW already ≥ N, respond immediately; otherwise a DelayedProduce is parked in purgatory (key = tp) → follower Fetch advances the replica LEO → maybeIncrementLeaderHW reaches HW ≥ N → checkAndComplete (purgatory) → respond, committed.]

Figure: acks=all completes on high-watermark advancement, not on the local append: a parked DelayedProduce is only released once a follower fetch pushes the HW past the produced offset.

acks=0
Producer does not wait; the broker does not even send a response body. No durability guarantee.
acks=1
Leader responds as soon as its local append succeeds; delayedProduceRequestRequired is false (ReplicaManager.scala:1361-1367). Survives leader loss only if a follower had already replicated the record.
acks=-1 (all)
Leader waits for HW >= offset, i.e. replication to the whole (maximal) ISR, and refuses to write at all if ISR < min.insync.replicas.
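
The acks=all path described in this section can be condensed into a toy leader that parks producers until the high watermark passes their offset. It is a sketch under simplifying assumptions, not the broker's purgatory: LeaderSketch and its methods are hypothetical names, there is a single partition, no timeouts, and the ISR size is a plain integer the caller mutates.

```python
class NotEnoughReplicas(Exception):
    """Raised before writing when the ISR is below min.insync.replicas."""


class LeaderSketch:
    def __init__(self, isr_size: int, min_isr: int):
        self.isr_size = isr_size
        self.min_isr = min_isr
        self.leo = 0            # next offset to be appended
        self.hw = 0             # committed boundary
        self.purgatory = []     # parked (required_offset, callback) pairs

    def produce_acks_all(self, record_count, on_complete):
        if self.isr_size < self.min_isr:
            raise NotEnoughReplicas()       # refuse before writing
        self.leo += record_count            # local append
        required_offset = self.leo          # HW must reach this to ack
        if self.hw >= required_offset:
            on_complete("NONE")
        else:
            self.purgatory.append((required_offset, on_complete))

    def on_high_watermark(self, new_hw):
        # Monotonic HW, never beyond the LEO; then re-check parked requests.
        self.hw = max(self.hw, min(new_hw, self.leo))
        waiting = []
        for required_offset, on_complete in self.purgatory:
            if self.hw < required_offset:
                waiting.append((required_offset, on_complete))
            elif self.isr_size < self.min_isr:
                on_complete("NOT_ENOUGH_REPLICAS_AFTER_APPEND")
            else:
                on_complete("NONE")
        self.purgatory = waiting
```

Calling produce_acks_all(5, cb) on a fresh LeaderSketch(3, 2) appends immediately but invokes cb only after on_high_watermark(5), which mirrors the point that acknowledgement follows HW movement rather than the local write.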

Leader epochs & epoch-based truncation

Before KIP-101, a replica truncated its log to its own high watermark on becoming a follower, which could let logs diverge when leadership changed faster than the HW propagated. Now every batch carries the leader epoch in which it was written, and the LeaderEpochFileCache records where each epoch began. When a follower (re)starts fetching it asks the leader, via OffsetsForLeaderEpoch, for the end offset of the follower's latest epoch; endOffsetFor returns the start offset of the next epoch (the exact divergence point), and the follower truncates there. In the fetch path the leader detects divergence inline: readRecords (Partition.scala:1367) compares the follower's lastFetchedEpoch against the cache and, if they diverge, returns an empty fetch carrying a divergingEpoch (Partition.scala:1401-1413) instead of records, prompting the follower to truncate. The server-side answer is Partition.lastOffsetForLeaderEpoch (Partition.scala:1585).

[Figure: sequence diagram between a returning follower and the leader, time flowing top to bottom.]

Worked example: the leader's epoch cache holds epoch 5 → 100 and epoch 7 → 130, while the follower holds epoch 5 up to offset 142 (it wrote ahead, then lost leadership).

1. The follower sends OffsetsForLeaderEpoch; the leader evaluates endOffsetFor(epoch 5).
2. The leader answers 130 (the start of epoch 7).
3. The follower truncates 130‥142, the records written in epoch 5 that were never committed.
4. The follower fetches from 130 in epoch 7 and receives records (no divergence).

Figure: A returning follower trims exactly the records that were never committed: endOffsetFor returns the start offset of the next epoch, which is the precise divergence point.

Design rationale

Leader epochs and OffsetsForLeaderEpoch were introduced by KIP-101 to eliminate log divergence on leader change, and fixed for the multi-leader-change case by KIP-279. The cache stores start offsets so that the "end offset of epoch N" can always be derived as the start offset of epoch N+1, keeping the structure append-only except for monotonicity repair (maybeTruncateNonMonotonicEntries, LeaderEpochFileCache.java:150).

Unclean leader election & Eligible Leader Replicas

If every ISR member is unavailable, the partition has no safe leader. With unclean.leader.election.enable=false (the default, LogConfig.java:139) the partition stays offline until an ISR member returns. If unclean election is permitted, the controller may elect an out-of-sync replica, which can lose committed records; that replica's leaderRecoveryState becomes RECOVERING, and the controller refuses any AlterPartition that grows the ISR beyond one replica or flips RECOVERED→RECOVERING while recovering (ReplicationControlManager.java:1326-1344). PartitionRegistration.electionWasUnclean keys off this state (PartitionRegistration.java:165-167).

KIP-966 narrows the gap between "lose data" and "stay offline": the controller tracks an ELR, that is, the replicas it knows still hold all data up to the HW even after they leave the ISR. When the ISR empties, PartitionChangeBuilder can elect from the ELR cleanly (PartitionChangeBuilder.java:319-336): a replica is an acceptable leader if it is in the ISR, or, when the ISR is empty, in the ELR; on election from the ELR it becomes the sole ISR member and is removed from the ELR. As a last resort, lastKnownElr retains the last good leader so a single-member recovery can proceed even when both ISR and ELR are empty (PartitionChangeBuilder.java:247,270,299-312). ELR/lastKnownElr are persisted only when the feature is enabled.
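
The election preference described in this section (ISR first, ELR only when the ISR is empty, unclean election as an opt-in fallback) can be sketched as follows. This is an illustrative model and not PartitionChangeBuilder's logic: choose_leader is a hypothetical function, walking candidates in assignment order is an assumption of the sketch, and the lastKnownElr recovery path is left out.

```python
def choose_leader(replicas, isr, elr, alive, unclean_allowed=False):
    """Return (leader_id or None, how) for one partition.

    replicas: assignment order (preferred leader first)
    isr, elr: committed in-sync set and eligible leader replicas
    alive:    brokers currently able to lead
    """
    # 1. Clean election from the ISR.
    for r in replicas:
        if r in isr and r in alive:
            return r, "clean (ISR)"
    # 2. Clean election from the ELR, only when the ISR is empty.
    if not isr:
        for r in replicas:
            if r in elr and r in alive:
                return r, "clean (ELR)"
    # 3. Unclean election, only if explicitly permitted; may lose data.
    if unclean_allowed:
        for r in replicas:
            if r in alive:
                return r, "unclean"
    return None, "offline"
```

With the default unclean.leader.election.enable=false, a partition whose ISR and ELR are both empty stays offline in this model even if an out-of-sync replica is alive.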

Concurrency & threading

Each Partition is thread-safe via a ReentrantReadWriteLock leaderIsrUpdateLock (Partition.scala:183), with a documented lock-ordering contract (Partition.scala:148-162): the partition lock is acquired before the log lock. The read lock guards the common hot path (appends, follower fetches, HW reads); the write lock guards leadership transitions and ISR mutations. The double-checked pattern in maybeExpandIsr/maybeShrinkIsr (test under read lock, re-test and mutate under write lock) avoids serialising the fetch path on the rare ISR change. Per-follower state needs no partition lock at all: Replica uses an AtomicReference updated by CAS.

| Thread / pool | Responsibility | Touches |
|---|---|---|
| kafka-request-handler-* (KafkaApis) | Produce appends, leader-side fetch reads, recording follower progress | appendRecordsToLeader, fetchRecords, updateFollowerFetchState (read lock) |
| isr-expiration scheduler task | Detect lagging followers, propose ISR shrink | maybeShrinkIsr (read then write lock) |
| highwatermark-checkpoint scheduler task | Persist HW per log dir every replica.high.watermark.checkpoint.interval.ms | checkpointHighWatermarks (ReplicaManager.scala:2116) |
| ReplicaFetcherThread (follower) | Pull records from the leader, truncate on diverging epoch, append as follower | appendRecordsToFollowerOrFutureReplica, epoch-cache truncation |
| AlterPartition network client thread | Send the AlterPartition RPC and run the completion callback (which takes the write lock) | DefaultAlterPartitionManager response handler → Partition.handleAlterPartitionUpdate |
| Broker metadata-publishing thread | Apply metadata deltas, drive leader/follower transitions | applyDelta under replicaStateChangeLock |
| Purgatory reaper / completing threads | Complete DelayedProduce/DelayedFetch when HW advances or on timeout | delayedProducePurgatory.checkAndComplete |

Note: broker-epoch fencing on reboot

updateFollowerFetchState runs under the partition read lock specifically to interlock with ISR updates and prevent a rebooted follower's stale Fetch from corrupting the broker-epoch check during expansion (Partition.scala:779-789). A Fetch whose broker epoch is older than the cached alive epoch throws NotLeaderOrFollowerException and is rejected (Replica.java:78-82).

Configuration reference

| Key | Default | Effect |
|---|---|---|
| replica.lag.time.max.ms | 30000 | A follower not caught up to the leader's LEO within this window is removed from the ISR. The expiration task runs every half this interval. ReplicationConfigs.java:54-55. |
| min.insync.replicas | 1 | With acks=all, the minimum ISR size required to accept a write and to allow the HW to advance. Effective value is min(this, replicationFactor) (Partition.scala:246-248). ServerLogConfigs.MIN_IN_SYNC_REPLICAS_DEFAULT. |
| unclean.leader.election.enable | false | Whether the controller may elect a non-ISR replica when the ISR is empty, risking data loss. LogConfig.java:139. |
| replica.high.watermark.checkpoint.interval.ms | 5000 | How often the HW for every partition is flushed to the replication-offset-checkpoint file. ReplicationConfigs.java:102-103. |
| replica.fetch.wait.max.ms | 500 | Max time a follower's Fetch blocks on the leader waiting for data; bounds replication latency. ReplicationConfigs.java:74-75. |
| replica.fetch.max.bytes | 1048576 | Per-partition byte budget a follower attempts to fetch. ReplicationConfigs.java:67-68. |
| replica.fetch.min.bytes | 1 | Minimum bytes before a follower Fetch returns early. ReplicationConfigs.java:79-80. |
| replica.fetch.backoff.ms | 1000 | Sleep after a fetch error before retrying. ReplicationConfigs.java:83-84. |

Note that min.insync.replicas and unclean.leader.election.enable are topic-level overridable; the fetcher tunings are broker-level. The HW checkpoint filename is replication-offset-checkpoint (ReplicaManager.scala:89).

Failure modes, edge cases & recovery

  • Leader crash. The controller fences the broker, elects a new leader from the ISR (or ELR), bumps the leader epoch, and replays the change to all brokers. Followers restart fetchers and truncate to the new leader's epoch boundary; no committed record (below the old HW) is lost because every ISR member held all committed data.
  • Follower falls behind. Dropped from the ISR after replica.lag.time.max.ms; the HW can then advance using the smaller ISR. The follower re-enters once its LEO reaches the HW and the current epoch's start offset.
  • ISR shrinks below min ISR. acks=all produces are refused with NotEnoughReplicas before writing; in-flight delayed produces may complete with NOT_ENOUGH_REPLICAS_AFTER_APPEND. The HW stops advancing (maybeIncrementLeaderHW returns early when isUnderMinIsr).
  • AlterPartition rejected. INVALID_UPDATE_VERSION means another change won the race, so the leader waits for fresh metadata. INELIGIBLE_REPLICA and OPERATION_NOT_ATTEMPTED are safe "not applied" signals: the leader resets to the last committed state. Transient errors are retried after 50 ms (DefaultAlterPartitionManager.java:174).
  • Returning zombie leader. A deposed leader that still thinks it leads will have its AlterPartition rejected (stale partition epoch) and its produce/fetch rejected by epoch checks; its followers' fetches carry a newer leader epoch and it learns it is no longer leader.
  • Log directory failure (JBOD). handleLogDirFailure (ReplicaManager.scala:2157) marks affected partitions offline, removes their fetchers, drops the dir from the HW checkpoint map, and notifies the controller via the directory event handler so leadership moves elsewhere.
  • Append offset below log start (delete-records race). A follower whose first fetched batch straddles the leader's advanced log start offset triggers truncateFullyAndStartAt to restart its log from the batch's base offset (Partition.scala:1197-1216).
  • Stale HW after unclean election. If the HW lags the epoch start offset, ListOffsets for "latest" returns OFFSET_NOT_AVAILABLE until the HW recovers (Partition.scala:1455-1460).
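
The AlterPartition rejection handling described in the list above can be summarised as a small dispatch. This is an illustrative sketch only: Action and on_alter_partition_error are hypothetical names, and treating every other error as transient is a simplifying assumption, since the real broker distinguishes more cases than the three groups named in the text.

```python
from enum import Enum

RETRY_BACKOFF_MS = 50  # retry delay cited in the text


class Action(Enum):
    AWAIT_METADATA = "await_metadata"          # another change won the race
    RESET_TO_COMMITTED = "reset_to_committed"  # safe "not applied" signal
    RETRY = "retry"                            # assumed transient failure


def on_alter_partition_error(error):
    """Return (action, delay_ms) for an AlterPartition error name."""
    if error == "INVALID_UPDATE_VERSION":
        return Action.AWAIT_METADATA, None
    if error in ("INELIGIBLE_REPLICA", "OPERATION_NOT_ATTEMPTED"):
        return Action.RESET_TO_COMMITTED, None
    # Assumption: anything else is retried after the fixed backoff.
    return Action.RETRY, RETRY_BACKOFF_MS
```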

Invariants & guarantees

Invariants

1. HW is monotonically non-decreasing on a given log and never exceeds the LEO (UnifiedLog.java:564,573).
2. HW = min LEO over the maximal ISR, and never advances while ISR.size < effectiveMinIsr (Partition.scala:1010-1014,1028-1031).
3. Every committed record (offset < HW) is present on every ISR member, so any clean leader election preserves all committed data.
4. The ISR always contains the leader and is always a subset of the assignment (controller-enforced, ReplicationControlManager.java:1310-1325).
5. partitionEpoch and leaderEpoch are strictly monotonic per partition; an AlterPartition commits only at the matching partitionEpoch.
6. A replica may not join the ISR until its LEO reaches both the HW and the current epoch's start offset (Partition.scala:907-912).
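
Invariants 1, 2 and 6 can be expressed as two small pure functions. The following Python sketch is a model of the rules as stated above, not the Partition code itself; next_high_watermark, can_join_isr and all parameter names are hypothetical.

```python
def next_high_watermark(current_hw, leader_id, leader_leo, follower_leos,
                        isr, maximal_isr, effective_min_isr):
    """Return the HW after one evaluation on the leader.

    follower_leos maps replica id -> last known LEO of that follower.
    isr is the committed ISR; maximal_isr also includes pending additions.
    """
    # Invariant 2: no advance while the ISR is below the effective min ISR.
    if len(isr) < effective_min_isr:
        return current_hw
    # Candidate is the minimum LEO over the maximal ISR, leader included,
    # so the result can never exceed the leader's LEO (invariant 1).
    leos = [leader_leo] + [follower_leos[r] for r in maximal_isr if r != leader_id]
    candidate = min(leos)
    # Invariant 1: the HW never moves backwards.
    return max(current_hw, candidate)


def can_join_isr(follower_leo, high_watermark, epoch_start_offset):
    """Invariant 6: the follower must reach both the HW and the epoch start offset."""
    return follower_leo >= high_watermark and follower_leo >= epoch_start_offset
```

Taking the minimum over the maximal ISR rather than the committed ISR is the conservative choice: a replica whose addition is still in flight already holds the HW back, so nothing is committed that the replica might not have.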

Interactions with other subsystems

  • The Log Storage Engine: appends, HW/LEO/LSO bookkeeping, and the leader-epoch checkpoint all live in UnifiedLog; Partition is a thin replication wrapper over it.
  • The Fetch Path & Replica Fetchers: the follower side that pulls records, performs epoch-based truncation, and reports LEO back; the leader's fetchRecords emits diverging-epoch responses.
  • The KRaft Controller: owns the authoritative ISR/leader/ELR, validates and commits AlterPartition, runs leader election and reassignment.
  • Metadata Propagation & Broker Lifecycle: delivers the TopicsDelta that applyDelta turns into leader/follower transitions; broker registration/fencing feeds ISR eligibility.
  • Request Processing (KafkaApis): dispatches Produce/Fetch into ReplicaManager and wires the delayed-operation purgatories.
  • Transactions & Exactly-Once Semantics: the LSO and the min-ISR durability gate underpin transactional commit visibility.
  • The Producer Client: the acks setting selects which of the durability paths above the producer relies on.

Design rationale & evolution

Design rationale

KIP-497 moved ISR changes from direct ZooKeeper writes to the AlterPartition (originally AlterIsr) RPC, making the controller the sole writer of ISR state and introducing the "maximal ISR" so a leader can advance the HW optimistically during an expansion without risking premature commit. KIP-101/KIP-279 introduced leader epochs and OffsetsForLeaderEpoch to make follower truncation precise. KIP-966 adds Eligible Leader Replicas so the controller can elect a guaranteed-complete replica even after it leaves the ISR, shrinking the window where operators must choose between unavailability and unclean (lossy) election. In KRaft these all compose cleanly because the metadata log is a single linearizable source of truth; there is no separate consensus system to reconcile.

Gotchas / operational notes

Gotcha: min.insync.replicas needs acks=all to matter

min.insync.replicas is only consulted on the acks=-1 path (Partition.scala:1234). With acks=1 a leader will happily accept writes even when it is the only ISR member, so a topic that wants the strong guarantee must set both min.insync.replicas > 1 and produce with acks=all.
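
A minimal sketch of that admission check, assuming the gate is evaluated before the append and only for acks=-1. The function name and the string results are hypothetical labels for illustration, not broker identifiers.

```python
ACKS_ALL = -1  # acks=all is sent on the wire as -1


def admit_produce(acks, isr_size, min_isr):
    """Decide whether the leader appends a produce request (simplified model)."""
    if acks == ACKS_ALL and isr_size < min_isr:
        return "NOT_ENOUGH_REPLICAS"   # refused before anything is written
    return "APPEND"                    # acks=0 and acks=1 never consult min ISR


# A leader that is the only ISR member, on a topic with min.insync.replicas=2:
print(admit_produce(acks=-1, isr_size=1, min_isr=2))  # NOT_ENOUGH_REPLICAS
print(admit_produce(acks=1, isr_size=1, min_isr=2))   # APPEND
```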

Gotcha: effective min ISR is capped by replication factor

Setting min.insync.replicas above the replication factor does not deadlock writes: effectiveMinIsr caps it at remoteReplicasMap.size + 1 (Partition.scala:246-248). The UnderMinIsr/AtMinIsr gauges reflect this effective value.
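
The cap, and the way the two gauges could follow from it, fits in a few lines. This is a sketch under assumptions: the helper names are hypothetical, and the gauge definitions (strictly below for UnderMinIsr, exactly equal for AtMinIsr) are an assumed reading of the gauge names rather than something stated above.

```python
def effective_min_isr(configured_min_isr, remote_replica_count):
    """Cap the configured value at the replication factor (remote replicas + leader)."""
    return min(configured_min_isr, remote_replica_count + 1)


def under_min_isr(isr_size, configured_min_isr, remote_replica_count):
    return isr_size < effective_min_isr(configured_min_isr, remote_replica_count)


def at_min_isr(isr_size, configured_min_isr, remote_replica_count):
    return isr_size == effective_min_isr(configured_min_isr, remote_replica_count)
```

For example, with min.insync.replicas=5 on a topic with replication factor 3 (two remote replicas), the effective value is 3, so a full ISR of three replicas is at min ISR rather than under it, and writes proceed.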

Caution: HW checkpoint is periodic, not synchronous

The HW is flushed only every replica.high.watermark.checkpoint.interval.ms (default 5 s) and on shutdown. A hard crash can lose the most recent HW advance from disk; on restart the partition recovers a slightly stale HW and re-derives the true value from fetches. This is safe (the HW only moves up) but means the on-disk HW is a lower bound, not the live value.
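
A toy simulation shows why the on-disk value is only a lower bound. Everything here is hypothetical: the function name, the timeline, and the assumption that flushes fire at exact multiples of the interval starting from time zero.

```python
def hw_at_crash(advances, crash_ms, interval_ms=5000):
    """Return (live_hw, on_disk_hw) at the moment of a hard crash.

    advances is a time-sorted list of (time_ms, new_hw) pairs.
    """
    live = on_disk = 0
    next_flush = interval_ms
    for t, hw in advances:
        if t > crash_ms:
            break
        # Run every periodic flush that was due before this advance.
        while next_flush <= t:
            on_disk = live
            next_flush += interval_ms
        live = hw
    # Flushes that were due between the last advance and the crash.
    while next_flush <= crash_ms:
        on_disk = live
        next_flush += interval_ms
    return live, on_disk


timeline = [(1000, 10), (4000, 20), (6000, 35), (9000, 50)]
print(hw_at_crash(timeline, crash_ms=9500))   # (50, 20)
```

In this run the last flush happened at 5000 ms, so the checkpoint holds 20 while the live HW had already reached 50; the restarted partition starts from 20 and catches up through fetches.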

Note: useful JMX gauges

Partition exports UnderReplicated, UnderMinIsr, AtMinIsr, InSyncReplicasCount, ReplicasCount, and LastStableOffsetLag per topic-partition (Partition.scala:221-226); ReplicaManager defines and exports IsrExpandsPerSec, IsrShrinksPerSec and FailedIsrUpdatesPerSec (ReplicaManager.scala:100-102,257-259), marked through the AlterPartitionListener (Partition.scala:101-111). Sustained UnderMinIsr > 0 on a busy topic is the canonical "producers are being rejected" alarm.

krivaltsevich.com · Part of Apache Kafka Internals · derived from Apache Kafka 4.4 source · GitHub · MIT-licensed.

Apache Kafka® is a registered trademark of the Apache Software Foundation. This is an independent, unofficial guide, not affiliated with or endorsed by the ASF.