08 · Replication, ISR & High Watermark
Source: Apache Kafka 4.4.0-SNAPSHOT (git 04bfe7d, 2026-06-15), KRaft mode. Derived from source code, not copied from official documentation.
Replication is the durability engine of Kafka: every topic-partition is a small replicated state machine whose authoritative metadata (assignment, leader, leader epoch, in-sync replica set) lives in the KRaft metadata log, while the actual log data is pushed from a single leader broker to its followers via the Fetch protocol. This chapter dissects how a broker materialises that metadata into live Partition objects through ReplicaManager.applyDelta, how the leader tracks each follower's log end offset and decides who is in-sync, how the high watermark (the committed-data boundary) advances as the minimum LEO over the ISR, how ISR changes are committed by round-tripping an AlterPartition RPC to the controller, how leader epochs and epoch-based truncation prevent silent log divergence, and how acks=all producers block in a purgatory until the high watermark crosses their offset.
Role & responsibilities
The replication subsystem answers one question for every byte a producer writes: is this record safe? "Safe" means it has been durably replicated to enough replicas that it survives the loss of any tolerated number of brokers and will never be silently dropped by a future leader. Concretely the subsystem owns:
- Per-broker partition ownership. ReplicaManager holds every hosted partition, applies metadata deltas to drive each into a leader or follower role, routes produce/fetch to the right UnifiedLog, and periodically checkpoints high watermarks to disk.
- Per-partition state & durability accounting. Partition is the state machine: it tracks the assignment, the ISR, the leader epoch, the local log, and one Replica object per remote follower carrying that follower's LEO and last-fetch timestamps.
- ISR maintenance. Shrinking the ISR when a follower falls behind (replica.lag.time.max.ms), expanding it when a follower catches up, and committing both through the controller via AlterPartition.
- High-watermark advancement. Computing the committed offset as the min LEO over the (maximal) ISR, and unblocking acks=all produce and follower fetch requests when it moves.
- Divergence prevention. Leader epochs, the epoch→start-offset cache, and epoch-based truncation so a returning follower trims exactly the records that were never committed.
In KRaft, the ISR is controller-authoritative. The leader broker proposes ISR changes; the controller commits them by appending a PartitionChangeRecord to the metadata log and bumping the partition epoch. The leader learns the committed ISR only afterwards, either from the AlterPartition response or from the replayed metadata delta. This makes the metadata log the single linearizable history of leadership and membership and removes the split-brain risks of the old ZooKeeper-era ISR writes (KIP-497).
Where it lives in the code
| Concern | Principal class | File |
|---|---|---|
| Per-broker partition owner, leader/follower transitions, HW checkpoint, produce/fetch dispatch | ReplicaManager | core/src/main/scala/kafka/server/ReplicaManager.scala |
| Per-partition state machine: ISR, assignment, leader epoch, HW logic, AlterPartition driver | Partition | core/src/main/scala/kafka/cluster/Partition.scala |
| Leader-side view of one remote follower (LEO, last fetch, broker epoch) | Replica | server/src/main/java/org/apache/kafka/server/replica/Replica.java |
| Immutable snapshot of a follower's offsets & caught-up time | ReplicaState (record) | server/src/main/java/org/apache/kafka/server/replica/ReplicaState.java |
| Committed vs. pending ISR states; maximal ISR | PartitionState, CommittedPartitionState, PendingExpandIsr, PendingShrinkIsr | server/src/main/java/org/apache/kafka/server/partition/*.java |
| Asynchronous ISR-change RPC to the controller | AlterPartitionManager / DefaultAlterPartitionManager | server/src/main/java/org/apache/kafka/server/partition/DefaultAlterPartitionManager.java |
| Leader-epoch → start-offset cache (truncation, OffsetsForLeaderEpoch) | LeaderEpochFileCache | storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java |
| Authoritative on-disk partition metadata (assignment, ISR, ELR, epochs) | PartitionRegistration | metadata/src/main/java/org/apache/kafka/metadata/PartitionRegistration.java |
| HW/LEO/LSO with segment position | LogOffsetMetadata | storage/src/main/java/org/apache/kafka/storage/internals/log/LogOffsetMetadata.java |
| Controller commit of ISR/leader changes; eligibility checks | ReplicationControlManager, PartitionChangeBuilder | metadata/src/main/java/org/apache/kafka/controller/*.java |
Core concepts & terminology
- AR (assigned replicas): The ordered broker-id list assigned to the partition. The first element is the preferred leader. Held in assignmentState.replicas (Partition.scala:196).
- ISR (in-sync replicas): The subset of AR the controller currently believes is caught up. The committed copy is partitionState.isr. Membership is the precondition for being elected leader without data loss (under clean election).
- Maximal ISR: The ISR the leader uses locally for HW advancement and acks=all quorum, which during a pending expansion optimistically includes the not-yet-committed replica. Defined per pending state via PartitionState.maximalIsr().
- LEO (log end offset): The offset of the next record to be appended. The leader's LEO is its own; each follower's LEO is recorded from its Fetch requests in the corresponding Replica.
- HW (high watermark): The committed boundary: every offset below it is known to be replicated to all of the (maximal) ISR. Consumers (READ_UNCOMMITTED) may only read below it. HW = min LEO over the ISR, gated by min ISR.
- LSO (last stable offset): The boundary below which there are no open transactions; READ_COMMITTED consumers read below it. Tracked by UnifiedLog, surfaced via fetchOffsetSnapshot.
- Leader epoch: A monotonically increasing integer the controller assigns on every leadership change. Stamped into each record batch and cached as epoch→start-offset, it is the anchor for truncation (KIP-101).
- Partition epoch: A second monotonic counter bumped on every partition metadata change (including ISR-only changes). It is the optimistic-concurrency token for AlterPartition.
- ELR (eligible leader replicas): Replicas the controller knows are complete up to the HW but which dropped out of the ISR (e.g. a clean shutdown). They can be elected cleanly even though they are not currently in the ISR (KIP-966).
Data structures
PartitionRegistration, the authoritative metadata record
PartitionRegistration (PartitionRegistration.java:153-163) is the in-memory image of one partition's controller-owned metadata, reconstructed by replaying PartitionRecord/PartitionChangeRecord from the metadata log. Its fields:
| Field | Type | Meaning |
|---|---|---|
| replicas | int[] | Assigned replica broker ids; replicas[0] is the preferred leader. |
| directories | Uuid[] | Per-replica log-directory id (JBOD directory assignment). |
| isr | int[] | Committed in-sync replica set. |
| addingReplicas / removingReplicas | int[] | Reassignment deltas (KIP-455). |
| elr | int[] | Eligible leader replicas, complete up to HW but outside ISR (KIP-966). |
| lastKnownElr | int[] | Last-known ELR members; when configured for balanced recovery it retains the single last good leader, allowing recovery when ISR and ELR are empty. |
| leader | int | Current leader id, or NO_LEADER (-1) when none. |
| leaderRecoveryState | LeaderRecoveryState | RECOVERED or RECOVERING (set after an unclean election). |
| leaderEpoch | int | Bumped on every leader change. |
| partitionEpoch | int | Bumped on every change. See merge() at PartitionRegistration.java:259,274: a leader change increments both leaderEpoch and partitionEpoch; an ISR-only change increments only partitionEpoch. |
elr and lastKnownElr are persisted as tagged fields only when ELR is enabled and non-empty (PartitionRegistration.java:387-392), keeping records small on clusters that have not adopted KIP-966.
Replica & ReplicaState, the leader's view of a follower
On the leader, each remote follower is a Replica holding a single AtomicReference<ReplicaState> (Replica.java:36,42). All updates are lock-free compare-and-set via replicaState.updateAndGet(...); readers grab an immutable snapshot through stateSnapshot(). ReplicaState is a record (ReplicaState.java:32-39):
| Field | Meaning |
|---|---|
| logStartOffset | Follower's log start offset (drives the partition low watermark for DeleteRecords). |
| logEndOffsetMetadata | Follower's LEO (with segment position) as of its last Fetch, the value fed into HW computation. |
| lastFetchLeaderLogEndOffset | The leader's LEO at the moment that fetch arrived; used to detect "did this fetch reach what was then the end?". |
| lastFetchTimeMs | Wall-clock time of the last Fetch from this follower. |
| lastCaughtUpTimeMs | The crux of lag detection: the latest time t at which this follower's fetch offset was >= the leader's LEO at time t. |
| brokerEpoch | Broker epoch carried in the Fetch, used to fence stale fetch-state updates after a follower reboot. |
The "caught up" rule is computed in updateFetchStateOrThrow (Replica.java:84-91): if the follower's fetch offset reached the leader's current LEO, lastCaughtUpTime jumps to now; else if it reached the leader's LEO as of the previous fetch (lastFetchLeaderLogEndOffset), it advances to the previous fetch time; otherwise it is unchanged. This two-tier rule lets a follower remain "in sync" even though its fetch offset chronically trails a busy leader by a few records: what matters is that it keeps closing the gap that existed when it last asked.
ReplicaState.isCaughtUp (ReplicaState.java:62-67) returns true iff the follower's LEO equals the leader's LEO or currentTimeMs - lastCaughtUpTimeMs <= replicaMaxLagMs. Once this returns false the follower becomes a candidate for removal from the ISR; the removal itself is proposed by the next isr-expiration pass.
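The two-tier rule is easier to follow when traced over a few fetches. The following is a minimal sketch, not the Kafka implementation: FollowerState and record_fetch are hypothetical names, offsets are plain integers rather than LogOffsetMetadata, and the max(...) guard that keeps the timestamp from moving backwards is an assumption of this sketch.

```python
from dataclasses import dataclass, replace


@dataclass(frozen=True)
class FollowerState:
    """Simplified stand-in for ReplicaState (offsets as plain ints)."""
    log_end_offset: int = 0
    last_fetch_leader_leo: int = 0
    last_fetch_time_ms: int = 0
    last_caught_up_time_ms: int = 0

    def is_caught_up(self, leader_leo: int, now_ms: int, max_lag_ms: int) -> bool:
        # In sync if fully caught up right now, or caught up recently enough.
        return (self.log_end_offset == leader_leo
                or now_ms - self.last_caught_up_time_ms <= max_lag_ms)


def record_fetch(state: FollowerState, fetch_offset: int,
                 leader_leo: int, now_ms: int) -> FollowerState:
    """Apply one follower Fetch and return the new immutable snapshot."""
    if fetch_offset >= leader_leo:
        # Tier 1: the follower reached the leader's current end.
        caught_up = max(state.last_caught_up_time_ms, now_ms)
    elif fetch_offset >= state.last_fetch_leader_leo:
        # Tier 2: it reached where the leader's end was at the previous fetch.
        caught_up = max(state.last_caught_up_time_ms, state.last_fetch_time_ms)
    else:
        # Neither tier applies: the timestamp stays where it was.
        caught_up = state.last_caught_up_time_ms
    return replace(state,
                   log_end_offset=fetch_offset,
                   last_fetch_leader_leo=leader_leo,
                   last_fetch_time_ms=now_ms,
                   last_caught_up_time_ms=caught_up)
```

A follower that fetches offset 100 at t=2000 ms, after the leader's LEO was 100 at the previous fetch (t=1000 ms), has its caught-up time advanced to 1000 ms even though the leader has since moved on to 110.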
LeaderEpochFileCache, epoch → start-offset
Each log keeps a TreeMap<Integer,EpochEntry> mapping leader epoch → start offset of that epoch, guarded by its own ReentrantReadWriteLock and persisted to a leader-epoch-checkpoint file (LeaderEpochFileCache.java:59-60). assign(epoch, startOffset) appends a new entry and flushes synchronously; truncation flushes asynchronously to avoid fsync stalls on the fetcher/log threads (LeaderEpochFileCache.java:335-355,367-388). The pivotal query is endOffsetFor(requestedEpoch, logEndOffset) (LeaderEpochFileCache.java:284-328): it returns the largest cached epoch <= the request and the exclusive end offset of that epoch (= start offset of the next epoch, or the LEO for the latest epoch). That is precisely the answer to the OffsetsForLeaderEpoch RPC and the truncation pivot for a returning follower.
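The endOffsetFor lookup can be illustrated with a sorted list in place of the TreeMap. This is a simplified sketch under stated assumptions: end_offset_for and the (-1, -1) "undefined" result are illustrative choices, and locking and persistence are left out.

```python
import bisect

UNDEFINED = (-1, -1)  # assumed sentinel for "no answer" in this sketch


def end_offset_for(entries, requested_epoch, log_end_offset):
    """Return (epoch, exclusive end offset) for requested_epoch.

    entries: list of (epoch, start_offset) tuples sorted by epoch ascending.
    """
    if not entries or requested_epoch < 0:
        return UNDEFINED
    epochs = [epoch for epoch, _ in entries]
    if requested_epoch == epochs[-1]:
        # The latest epoch is still open, so it ends at the log end offset.
        return (requested_epoch, log_end_offset)
    # Index of the first cached epoch strictly greater than the request.
    higher = bisect.bisect_right(epochs, requested_epoch)
    if higher == len(entries):
        # The request is newer than anything cached: no answer.
        return UNDEFINED
    next_start = entries[higher][1]
    if higher == 0:
        # The request is older than anything cached.
        return (requested_epoch, next_start)
    # Largest cached epoch <= request, ending where the next epoch starts.
    return (epochs[higher - 1], next_start)
```

With entries [(0, 0), (2, 50), (5, 120)] and an LEO of 200, a request for epoch 3 resolves to epoch 2 ending at offset 120, because epoch 3 never wrote to this log.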
LogOffsetMetadata, an offset plus its physical position
HW, LEO and LSO are not bare longs; they are LogOffsetMetadata (LogOffsetMetadata.java:33-35): a messageOffset, the segmentBaseOffset of the segment containing it, and the relativePositionInSegment. Carrying the segment position lets the leader answer fetches and compute byte deltas without re-resolving the offset, and lets maybeIncrementHighWatermark advance the HW even when the offset is unchanged but the log has rolled to a new segment (UnifiedLog.java:573-574).
The ISR state hierarchy
partitionState is a @volatile PartitionState implementing a tiny state machine. CommittedPartitionState (steady state) reports maximalIsr() == isr() and isInflight() == false. While an AlterPartition is outstanding the field is swapped (under the write lock) to a pending record:
| State | isr() | maximalIsr() | isInflight() |
|---|---|---|---|
| CommittedPartitionState | committed ISR | = isr | false |
| PendingExpandIsr | last committed ISR | committed ISR + new replica | true |
| PendingShrinkIsr | last committed ISR | = last committed ISR (not shrunk) | true |
The asymmetry (PendingExpandIsr.java:43-47 vs PendingShrinkIsr.java:41-43) is a deliberate safety choice: optimistically counting the new replica during an expansion only makes the HW condition stricter, so if the controller rejects the expansion no committed offset was over-advanced; conversely a shrink is not applied to the maximal ISR until the controller confirms it, so a rejected shrink cannot have prematurely advanced the HW past a replica that is still required.
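The table and the asymmetry argument can be restated as three small immutable types. This is a sketch with hypothetical class names (Committed, PendingExpand, PendingShrink) that mirrors only the isr / maximalIsr / isInflight behaviour described above, not the real Java records.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Committed:
    isr: frozenset

    @property
    def maximal_isr(self) -> frozenset:
        return self.isr

    @property
    def is_inflight(self) -> bool:
        return False


@dataclass(frozen=True)
class PendingExpand:
    last_committed: Committed
    new_replica: int

    @property
    def isr(self) -> frozenset:
        # The committed view does not change until the controller confirms.
        return self.last_committed.isr

    @property
    def maximal_isr(self) -> frozenset:
        # Optimistically count the new replica: a stricter HW condition.
        return self.last_committed.isr | {self.new_replica}

    @property
    def is_inflight(self) -> bool:
        return True


@dataclass(frozen=True)
class PendingShrink:
    last_committed: Committed
    out_of_sync: frozenset

    @property
    def isr(self) -> frozenset:
        return self.last_committed.isr

    @property
    def maximal_isr(self) -> frozenset:
        # Keep waiting for the lagging replicas until the shrink is committed.
        return self.last_committed.isr

    @property
    def is_inflight(self) -> bool:
        return True
```

In both pending states the maximal ISR is a superset of the committed ISR, which is why a rejected proposal can never have let the HW advance too far.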
Architecture & control/data flow
Figure: ReplicaManager drives leader/follower roles; the leader↔follower Fetch loop carries data and the HW, while ISR changes round-trip back to the controller via AlterPartition.
Detailed mechanics
Becoming leader or follower: applyDelta
Metadata changes arrive as a TopicsDelta. ReplicaManager.applyDelta (ReplicaManager.scala:2360) takes replicaStateChangeLock, processes deletions first (so a recreated topic with the same name is not clobbered), then splits the local changes into leaders and followers and applies each:
- Leaders (applyLocalLeadersDelta, ReplicaManager.scala:2418): remove any follower fetcher for the partition, then call partition.makeLeader(registration, isNew, checkpoints...).
- Followers (applyLocalFollowersDelta, ReplicaManager.scala:2449): call partition.makeFollower(...); if the leader epoch changed, (re)start a ReplicaFetcherThread targeting the new leader at initialFetchOffset(log).
makeLeader (Partition.scala:588) runs under the per-partition leaderIsrUpdateLock write lock. It rejects stale registrations (partitionRegistration.partitionEpoch < partitionEpoch), updates the assignment and ISR from the registration via updateAssignmentAndIsr, ensures the local log exists, and, only if the leader epoch advanced, records the new epoch's start offset in the epoch cache (leaderLog.assignEpochStartOffset, Partition.scala:649), resets every follower's ReplicaState via resetReplicaState, and sets leaderEpoch/leaderEpochStartOffsetOpt. It finishes by calling maybeIncrementLeaderHW because an ISR that just dropped to one replica may allow the HW to jump to the local LEO. makeFollower (Partition.scala:694) is simpler: it sets leaderReplicaIdOpt/leaderEpoch, clears the local ISR (a follower does not track ISR), and returns whether the leader epoch changed so the caller knows to restart the fetcher.
initialFetchOffset (ReplicaManager.scala:2093-2098): a follower whose epoch cache is non-empty begins fetching from its own LEO (epoch-based truncation in the fetch path will trim any uncommitted tail). A follower with an empty epoch cache (e.g. a freshly created replica) starts from its high watermark. Truncation itself happens in the fetch path via OffsetsForLeaderEpoch; see The Fetch Path & Replica Fetchers.
Recording follower progress and advancing the high watermark
When a follower's Fetch is served, the leader calls updateFollowerFetchState (Partition.scala:767). Under the ISR read lock it pushes the follower's new offsets into its Replica (updateFetchStateOrThrow), then attempts two things outside that read lock: maybeExpandIsr (might this follower now rejoin the ISR?) and, if the follower's LEO actually advanced, maybeIncrementLeaderHW.
maybeIncrementLeaderHW (Partition.scala:1010) is the heart of commit. It refuses to advance while the partition is under min ISR (isUnderMinIsr), then walks the leader's LEO downward against every remote replica:
    newHW = leader.LEO
    for each remote replica r:
        if r.LEO < newHW and
           (maximalIsr.contains(r) or (r.isCaughtUp(leader.LEO, now, lagMs) and isReplicaIsrEligible(r))):
            newHW = r.LEO
    leaderLog.maybeIncrementHighWatermark(newHW)  // monotonic; only moves up
So the HW is the minimum LEO over the maximal ISR, but it additionally waits for any replica that is "caught up and eligible" even though it has not yet been committed to the ISR. The comment at Partition.scala:992-1004 spells out the reason: a replica that is considered caught-up but whose LEO momentarily trails the HW should hold the HW back, otherwise it could be elected leader and serve a stale view; and counting newly-added replicas (the maximal ISR) makes the commit condition strictly safer (KIP-497). maybeIncrementHighWatermark (UnifiedLog.java:563-581) enforces monotonicity and that the new HW never exceeds the LEO.
Figure: worked example with ISR = {1,2,3}. Leader LEO = 105, replica 2 LEO = 104, replica 3 LEO = 102 (min).
HW = min(105, 104, 102) = 102. Records up to 101 are on all of {1,2,3} → committed, readable. Records [102‥103] are on replica 2 but not replica 3, and record 104 is on the leader only → not yet committed. Once replicas 2 and 3 both reach LEO 105 the HW moves to 105 and acks=all producers whose records end at offsets ≤ 104 complete.
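The same computation in runnable form, using the numbers from the example above. This is a sketch under assumptions: Follower and next_high_watermark are hypothetical names, offsets are plain integers, and the min-ISR gate is modelled as a simple size check on the committed ISR.

```python
from dataclasses import dataclass


@dataclass
class Follower:
    broker_id: int
    leo: int
    last_caught_up_ms: int = 0
    isr_eligible: bool = True


def next_high_watermark(current_hw, leader_leo, followers, isr, maximal_isr,
                        min_isr, now_ms=0, max_lag_ms=30_000):
    """Return the HW after one recomputation (never lower than current_hw)."""
    if len(isr) < min_isr:
        # Under min ISR: refuse to advance.
        return current_hw
    candidate = leader_leo
    for f in followers:
        caught_up = (f.leo == leader_leo
                     or now_ms - f.last_caught_up_ms <= max_lag_ms)
        # A follower holds the HW back if it is in the maximal ISR, or if it
        # is caught up and eligible even though not yet committed to the ISR.
        counts = f.broker_id in maximal_isr or (caught_up and f.isr_eligible)
        if counts and f.leo < candidate:
            candidate = f.leo
    # Monotonic: the HW only ever moves up.
    return max(current_hw, candidate)
```

With the leader at 105 and followers at 104 and 102 the result is 102; if only replica 3 then catches up to 105, the result is 104, because replica 2 is still the minimum.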
Shrinking the ISR (lag detection)
A scheduler task isr-expiration runs every replicaLagTimeMaxMs / 2 ms (ReplicaManager.scala:285) and calls maybeShrinkIsr() on every online partition (ReplicaManager.scala:2100-2107). Partition.maybeShrinkIsr (Partition.scala:1089) first checks under the read lock (cheap, no write contention) whether any follower is out of sync via getOutOfSyncReplicas(replicaLagTimeMaxMs) (Partition.scala:1155), which filters ISR members (excluding the leader) by !isCaughtUp(...). If so it re-checks under the write lock, builds a PendingShrinkIsr, swaps partitionState, and submits an AlterPartition. The lag test covers two failure shapes (Partition.scala:1142-1151): a stuck follower whose LEO has not moved for maxLagMs, and a slow follower that never closes the gap to the leader's LEO within maxLagMs. Both collapse to the single lastCaughtUpTimeMs test.
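To see how one timestamp test catches both failure shapes, consider this sketch of the out-of-sync filter. The function name and the tuple layout of followers are assumptions made for the example; locking and the AlterPartition submission are omitted.

```python
def out_of_sync_replicas(isr, leader_id, leader_leo, followers, now_ms, max_lag_ms):
    """Return the ISR members (excluding the leader) that are no longer caught up.

    followers: dict of broker_id -> (leo, last_caught_up_time_ms).
    """
    lagging = set()
    for broker_id in isr:
        if broker_id == leader_id:
            continue  # the leader is never out of sync with itself
        leo, last_caught_up_ms = followers[broker_id]
        caught_up = (leo == leader_leo
                     or now_ms - last_caught_up_ms <= max_lag_ms)
        if not caught_up:
            lagging.add(broker_id)
    return lagging


# Leader LEO is 1000 at t = 31500 ms, with a 30 s lag budget.
followers = {
    2: (50, 0),        # stuck: stopped fetching long ago
    3: (900, 1000),    # slow: still fetching but never reaches the leader's end
    4: (990, 30500),   # healthy: trails slightly but caught up 1 s ago
}
lagging = out_of_sync_replicas({1, 2, 3, 4}, 1, 1000, followers, 31_500, 30_000)
```

Here lagging contains brokers 2 and 3: the stuck and the slow follower are indistinguishable to the test, which is the point of keying it on lastCaughtUpTimeMs.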
Expanding the ISR
Expansion is driven by fetches, not a timer. After recording a follower's progress, maybeExpandIsr (Partition.scala:876) checks canAddReplicaToIsr (not already in ISR, not in-flight, and isReplicaIsrEligible) and isFollowerInSync (Partition.scala:907-912): the follower's LEO must be >= the leader's HW and >= the leader-epoch start offset. The second clause is essential: a replica must have caught up within the current leader epoch before it joins, otherwise it could be elected and lose data that sits between the HW and LEO (Partition.scala:862-872). Eligibility (isReplicaIsrEligible, Partition.scala:914-932) additionally requires the broker not be fenced, not be in controlled shutdown, and that its cached broker epoch matches the broker epoch carried in its Fetch (or the Fetch sent -1 to bypass).
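The expansion conditions compose into a single predicate. The sketch below uses hypothetical function names and flattens the broker-state lookups into boolean parameters; it mirrors only the checks listed above.

```python
def is_follower_in_sync(follower_leo, leader_hw, epoch_start_offset):
    # Must have reached the HW and the start of the current leader epoch.
    return follower_leo >= leader_hw and follower_leo >= epoch_start_offset


def is_isr_eligible(fenced, in_controlled_shutdown,
                    cached_broker_epoch, fetch_broker_epoch):
    # A Fetch carrying -1 bypasses the broker-epoch comparison.
    epoch_ok = fetch_broker_epoch == -1 or fetch_broker_epoch == cached_broker_epoch
    return not fenced and not in_controlled_shutdown and epoch_ok


def should_propose_expansion(replica_id, isr, alter_partition_inflight,
                             follower_leo, leader_hw, epoch_start_offset,
                             eligible):
    can_add = (replica_id not in isr
               and not alter_partition_inflight
               and eligible)
    return can_add and is_follower_in_sync(follower_leo, leader_hw,
                                           epoch_start_offset)
```

A follower at LEO 105 with the HW at 100 but an epoch start offset of 110 is rejected: it has passed the HW yet has not replicated anything written in the current leader epoch.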
Committing the change: the AlterPartition round-trip
Both shrink and expand call submitAlterPartition(pendingState) (Partition.scala:1687), which builds a LeaderAndIsr carrying the proposed ISR with each member's broker epoch (addBrokerEpochToIsr, Partition.scala:1661) and hands it to AlterPartitionManager.submit. DefaultAlterPartitionManager (DefaultAlterPartitionManager.java:53) batches updates: a ConcurrentHashMap holds at most one pending item per partition, and an AtomicBoolean inflightRequest ensures only one AlterPartition RPC is on the wire at a time (DefaultAlterPartitionManager.java:60-64,118-125). The request is sent to the active controller through the NodeToControllerChannelManager and is never timed out: it retries indefinitely until answered or until a newer metadata update supersedes it (DefaultAlterPartitionManager.java:138-140).
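The two batching rules (one pending item per partition, one RPC in flight) can be sketched as follows. This is a single-threaded illustration with hypothetical names; the send callback stands in for the controller channel, and retries and error handling are not modelled.

```python
class AlterPartitionBatcher:
    """Single-threaded sketch; the real manager uses concurrent primitives."""

    def __init__(self, send):
        self._send = send      # hypothetical transport: send(batch_dict)
        self._pending = {}     # partition -> proposed state
        self._inflight = False

    def submit(self, partition, proposal):
        """Queue a proposal; refuse if one is already pending for the partition."""
        if partition in self._pending:
            return False
        self._pending[partition] = proposal
        self._maybe_send()
        return True

    def on_response(self, answered_partitions):
        """Drop answered items, then send whatever accumulated meanwhile."""
        for partition in answered_partitions:
            self._pending.pop(partition, None)
        self._inflight = False
        self._maybe_send()

    def _maybe_send(self):
        if self._inflight or not self._pending:
            return
        self._inflight = True
        # Everything pending goes out together in one request.
        self._send(dict(self._pending))
```

Proposals submitted while a request is on the wire simply accumulate and leave together in the next request, which is what keeps controller load bounded during a mass ISR change.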
On the controller, ReplicationControlManager validates the proposal (ReplicationControlManager.java:1300-1356): the partition epoch must match (else INVALID_UPDATE_VERSION); the new ISR must be a subset of the assignment and must contain the leader (else INVALID_REQUEST); and every proposed member must be eligible, alive, unfenced, with a matching broker epoch, else INELIGIBLE_REPLICA. If valid, it appends a PartitionChangeRecord (bumping partitionEpoch) and returns the committed LeaderAndIsr.
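The validation order described above can be sketched as a pure function. The dictionary layout and the function name are assumptions for illustration; only the three checks named in the text are modelled, and the real controller performs further checks that are omitted here.

```python
def validate_alter_partition(partition, proposed_isr, request_partition_epoch,
                             eligible_brokers):
    """Return (error_code, resulting_partition_state)."""
    if request_partition_epoch != partition["partition_epoch"]:
        # Optimistic concurrency: someone else changed the partition first.
        return "INVALID_UPDATE_VERSION", partition
    proposed = set(proposed_isr)
    if not proposed <= set(partition["replicas"]) or partition["leader"] not in proposed:
        return "INVALID_REQUEST", partition
    if not proposed <= set(eligible_brokers):
        return "INELIGIBLE_REPLICA", partition
    # Commit: new ISR and a bumped partition epoch (a PartitionChangeRecord).
    committed = dict(partition,
                     isr=sorted(proposed),
                     partition_epoch=partition["partition_epoch"] + 1)
    return "NONE", committed
```

Because a successful commit bumps the epoch, a second proposal built against the old epoch fails the first check, whichever broker sent it.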
Back on the leader, the completion handler (Partition.scala:1693) runs under the write lock. If partitionState no longer equals the proposed state (a leader election or metadata replay moved it on), the response is ignored as stale. On success, handleAlterPartitionUpdate (Partition.scala:1794) installs a fresh CommittedPartitionState, updates the local partitionEpoch, and re-runs maybeIncrementLeaderHW (a shrink may now permit the HW to advance). On error, handleAlterPartitionError (Partition.scala:1733) maps the code to a retry-or-give-up decision; notably INELIGIBLE_REPLICA and OPERATION_NOT_ATTEMPTED are authoritative "not applied" signals that let the leader safely reset to the last committed state, while transient codes trigger a resubmit.
An AlterPartition only commits if its partitionEpoch matches the controller's current value. Because the controller bumps the epoch on every change and the metadata log is linearizable, two leaders (e.g. a deposed one and its successor) can never both have their ISR proposals accepted: the older epoch is rejected with INVALID_UPDATE_VERSION, and a successful AlterPartition response always carries an epoch >= the local one (Partition.scala:1803).
The acks=all produce path
A produce with acks=-1 must not be acknowledged until its records are committed (HW >= the produced offset). ReplicaManager.appendRecords (ReplicaManager.scala:638) appends to the leader's log via Partition.appendRecordsToLeader (Partition.scala:1219), which under the read lock first guards durability: if the ISR is smaller than the effective min ISR and acks=-1, it throws NotEnoughReplicasException before writing (Partition.scala:1234-1238). It then appends and re-checks the HW. If the append did not already satisfy the request, maybeAddDelayedProduce (ReplicaManager.scala:878) parks a DelayedProduce in the produce purgatory keyed by each partition. The operation completes when checkEnoughReplicasReachOffset(requiredOffset) (Partition.scala:947) sees leaderLog.highWatermark >= requiredOffset; if at that moment the maximal ISR has fallen below min ISR it completes with NOT_ENOUGH_REPLICAS_AFTER_APPEND (the data was replicated, but the durability contract has since weakened). The trigger that re-checks the purgatory is HW movement: whenever an append or follower fetch increases the HW, an action enqueued by addCompletePurgatoryAction (ReplicaManager.scala:853-876) calls delayedProducePurgatory.checkAndComplete.
Figure: acks=all completes on high-watermark advancement, not on the local append: a parked DelayedProduce is only released once a follower fetch pushes the HW past the produced offset.
- acks=0: Producer does not wait; the broker sends no response at all. No durability guarantee.
- acks=1: Leader responds as soon as its local append succeeds; delayedProduceRequestRequired is false (ReplicaManager.scala:1361-1367). Survives leader loss only if a follower had already replicated the record.
- acks=-1 (all): Leader waits for HW >= offset, i.e. replication to the whole (maximal) ISR, and refuses to write at all if ISR < min.insync.replicas.
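The park-then-release behaviour of an acks=all produce can be sketched with a small in-memory purgatory. Names here are hypothetical, timeouts are not modelled, and the required offset is taken to be the batch's last offset plus one, which is an assumption of this sketch.

```python
class ProducePurgatory:
    """Parks acks=-1 produces until the high watermark covers them."""

    def __init__(self):
        self.high_watermark = 0
        self._parked = []  # list of (required_offset, on_complete)

    def produce(self, last_offset, acks, on_complete):
        if acks == 0:
            return  # fire-and-forget: nothing is sent back
        required = last_offset + 1
        if acks == 1 or self.high_watermark >= required:
            on_complete("NONE")  # the local append is enough
        else:
            self._parked.append((required, on_complete))

    def check_and_complete(self, high_watermark, maximal_isr_size, min_isr):
        """Re-check parked produces after the HW (possibly) moved."""
        self.high_watermark = max(self.high_watermark, high_watermark)
        still_parked = []
        for required, on_complete in self._parked:
            if self.high_watermark >= required:
                ok = maximal_isr_size >= min_isr
                on_complete("NONE" if ok else "NOT_ENOUGH_REPLICAS_AFTER_APPEND")
            else:
                still_parked.append((required, on_complete))
        self._parked = still_parked
```

A produce ending at offset 9 stays parked while the HW is 9 and completes once the HW reaches 10; an acks=1 produce returns immediately regardless of the HW.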
Leader epochs & epoch-based truncation
Before KIP-101, a follower truncated to its own last-known HW on becoming a follower, which could diverge if leadership changed faster than the HW propagated. Now every batch carries the leader epoch in which it was written, and the LeaderEpochFileCache records where each epoch began. When a follower (re)starts fetching it asks the leader, via OffsetsForLeaderEpoch, for the end offset of the follower's latest epoch; endOffsetFor returns the start offset of the next epoch (the exact divergence point), and the follower truncates there. In the fetch path the leader detects divergence inline: readRecords (Partition.scala:1367) compares the follower's lastFetchedEpoch against the cache and, if they diverge, returns an empty fetch carrying a divergingEpoch (Partition.scala:1401-1413) instead of records, prompting the follower to truncate. The server-side answer is Partition.lastOffsetForLeaderEpoch (Partition.scala:1585).
Figure: endOffsetFor returns the start offset of the next epoch, which is the precise divergence point.
Leader epochs and OffsetsForLeaderEpoch were introduced by KIP-101 to eliminate log divergence on leader change, and fixed for the multi-leader-change case by KIP-279. The cache stores start offsets so that the "end offset of epoch N" can always be derived as the start offset of epoch N+1, keeping the structure append-only except for monotonicity repair (maybeTruncateNonMonotonicEntries, LeaderEpochFileCache.java:150).
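A small simulation makes the truncation point concrete. This sketch models a log as a list of leader epochs indexed by offset, handles only a single round of the exchange (the multi-step case addressed by KIP-279 is not modelled), and uses hypothetical function names.

```python
def leader_end_offset(leader_epochs, requested_epoch, leader_leo):
    """End offset of requested_epoch on the leader.

    leader_epochs: list of (epoch, start_offset) sorted by epoch ascending.
    Simplification: an epoch unknown to the leader is treated as still open.
    """
    for epoch, start_offset in leader_epochs:
        if epoch > requested_epoch:
            return start_offset  # the next epoch starts where this one ended
    return leader_leo


def truncate_follower(follower_log, leader_epochs, leader_leo):
    """follower_log: list where index = offset and value = leader epoch."""
    if not follower_log:
        return follower_log
    latest_epoch = follower_log[-1]
    end = leader_end_offset(leader_epochs, latest_epoch, leader_leo)
    # Never extend the log, only trim the divergent tail.
    return follower_log[:min(end, len(follower_log))]


# The old leader wrote offsets 0..9 in epoch 1, but only 0..7 were replicated
# before it was deposed. The new leader started epoch 2 at offset 8.
old_leader_log = [1] * 10
new_leader_epochs = [(1, 0), (2, 8)]
trimmed = truncate_follower(old_leader_log, new_leader_epochs, leader_leo=12)
```

The returning replica keeps offsets 0..7 and discards 8..9, which were never committed; it then fetches the epoch-2 records from offset 8.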
Unclean leader election & Eligible Leader Replicas
If every ISR member is unavailable, the partition has no safe leader. With unclean.leader.election.enable=false (the default, LogConfig.java:139) the partition stays offline until an ISR member returns. If unclean election is permitted, the controller may elect an out-of-sync replica, which can lose committed records; that replica's leaderRecoveryState becomes RECOVERING, and the controller refuses any AlterPartition that grows the ISR beyond one replica or flips RECOVERED→RECOVERING while recovering (ReplicationControlManager.java:1326-1344). PartitionRegistration.electionWasUnclean keys off this state (PartitionRegistration.java:165-167).
KIP-966 narrows the gap between "lose data" and "stay offline": the controller tracks an ELR, replicas it knows still hold all data up to the HW even after they leave the ISR. When the ISR empties, PartitionChangeBuilder can elect from the ELR cleanly (PartitionChangeBuilder.java:319-336): a replica is an acceptable leader if it is in the ISR, or, when the ISR is empty, in the ELR; on election from the ELR it becomes the sole ISR member and is removed from the ELR. As a last resort, lastKnownElr retains the last good leader so a single-member recovery can proceed even when both ISR and ELR are empty (PartitionChangeBuilder.java:247,270,299-312). ELR/lastKnownElr are persisted only when the feature is enabled.
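The acceptable-leader rule can be sketched as below. This is an illustration only: elect_leader is a hypothetical name, candidates are tried in assignment order as an assumption, and unclean election and the lastKnownElr fallback are deliberately left out.

```python
NO_LEADER = -1


def elect_leader(replicas, isr, elr, alive):
    """Return (leader, new_isr, new_elr) for a clean election.

    replicas: assignment order; isr / elr: current members;
    alive: brokers that are currently able to lead.
    """
    # First choice: any live member of the ISR.
    for r in replicas:
        if r in alive and r in isr:
            return r, list(isr), list(elr)
    # The ELR is consulted only when the ISR is empty.
    if not isr:
        for r in replicas:
            if r in alive and r in elr:
                # The elected replica becomes the sole ISR member
                # and is removed from the ELR.
                return r, [r], [e for e in elr if e != r]
    return NO_LEADER, list(isr), list(elr)
```

With an empty ISR, an ELR of [2, 3] and only brokers 1 and 3 alive, broker 3 is elected: broker 1 is alive but is not known to hold all committed data.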
Concurrency & threading
Each Partition is thread-safe via a ReentrantReadWriteLock leaderIsrUpdateLock (Partition.scala:183), with a documented lock-ordering contract (Partition.scala:148-162): the partition lock is acquired before the log lock. The read lock guards the common hot path (appends, follower fetches, HW reads); the write lock guards leadership transitions and ISR mutations. The double-checked pattern in maybeExpandIsr/maybeShrinkIsr (test under read lock, re-test and mutate under write lock) avoids serialising the fetch path on the rare ISR change. Per-follower state needs no partition lock at all: Replica uses an AtomicReference updated by CAS.
| Thread / pool | Responsibility | Touches |
|---|---|---|
| kafka-request-handler-* (KafkaApis) | Produce appends, leader-side fetch reads, recording follower progress | appendRecordsToLeader, fetchRecords, updateFollowerFetchState (read lock) |
| isr-expiration scheduler task | Detect lagging followers, propose ISR shrink | maybeShrinkIsr (read then write lock) |
| highwatermark-checkpoint scheduler task | Persist HW per log dir every replica.high.watermark.checkpoint.interval.ms | checkpointHighWatermarks (ReplicaManager.scala:2116) |
| ReplicaFetcherThread (follower) | Pull records from the leader, truncate on diverging epoch, append as follower | appendRecordsToFollowerOrFutureReplica, epoch-cache truncation |
| AlterPartition network client thread | Send the AlterPartition RPC and run the completion callback (which takes the write lock) | DefaultAlterPartitionManager response handler → Partition.handleAlterPartitionUpdate |
| Broker metadata-publishing thread | Apply metadata deltas, drive leader/follower transitions | applyDelta under replicaStateChangeLock |
| Purgatory reaper / completing threads | Complete DelayedProduce/DelayedFetch when HW advances or on timeout | delayedProducePurgatory.checkAndComplete |
updateFollowerFetchState runs under the partition read lock specifically to interlock with ISR updates and prevent a rebooted follower's stale Fetch from corrupting the broker-epoch check during expansion (Partition.scala:779-789). A Fetch whose broker epoch is older than the cached alive epoch throws NotLeaderOrFollowerException and is rejected (Replica.java:78-82).
Configuration reference
| Key | Default | Effect |
|---|---|---|
| replica.lag.time.max.ms | 30000 | A follower not caught up to the leader's LEO within this window is removed from the ISR. The expiration task runs every half this interval. ReplicationConfigs.java:54-55. |
| min.insync.replicas | 1 | With acks=all, the minimum ISR size required to accept a write and to allow the HW to advance. Effective value is min(this, replicationFactor) (Partition.scala:246-248). ServerLogConfigs.MIN_IN_SYNC_REPLICAS_DEFAULT. |
| unclean.leader.election.enable | false | Whether the controller may elect a non-ISR replica when the ISR is empty, risking data loss. LogConfig.java:139. |
| replica.high.watermark.checkpoint.interval.ms | 5000 | How often the HW for every partition is flushed to the replication-offset-checkpoint file. ReplicationConfigs.java:102-103. |
| replica.fetch.wait.max.ms | 500 | Max time a follower's Fetch blocks on the leader waiting for data; bounds replication latency. ReplicationConfigs.java:74-75. |
| replica.fetch.max.bytes | 1048576 | Per-partition byte budget a follower attempts to fetch. ReplicationConfigs.java:67-68. |
| replica.fetch.min.bytes | 1 | Minimum bytes before a follower Fetch returns early. ReplicationConfigs.java:79-80. |
| replica.fetch.backoff.ms | 1000 | Sleep after a fetch error before retrying. ReplicationConfigs.java:83-84. |
Note that min.insync.replicas and unclean.leader.election.enable are topic-level overridable; the fetcher tunings are broker-level. The HW checkpoint filename is replication-offset-checkpoint (ReplicaManager.scala:89).
Failure modes, edge cases & recovery
- Leader crash. The controller fences the broker, elects a new leader from the ISR (or ELR), bumps the leader epoch, and replays the change to all brokers. Followers restart fetchers and truncate to the new leader's epoch boundary; no committed record (below the old HW) is lost because every ISR member held all committed data.
- Follower falls behind. Dropped from the ISR after replica.lag.time.max.ms; the HW can then advance using the smaller ISR. The follower re-enters once its LEO reaches the HW and the current epoch's start offset.
- ISR shrinks below min ISR. acks=all produces are refused with NotEnoughReplicas before writing; in-flight delayed produces may complete with NOT_ENOUGH_REPLICAS_AFTER_APPEND. The HW stops advancing (maybeIncrementLeaderHW returns early when isUnderMinIsr).
- AlterPartition rejected. INVALID_UPDATE_VERSION means another change won the race; the leader awaits fresh metadata. INELIGIBLE_REPLICA / OPERATION_NOT_ATTEMPTED are safe "not applied" signals and reset to the last committed state. Transient errors retry after 50 ms (DefaultAlterPartitionManager.java:174).
- Returning zombie leader. A deposed leader that still thinks it leads will have its AlterPartition rejected (stale partition epoch) and its produce/fetch rejected by epoch checks; its followers' fetches carry a newer leader epoch and it learns it is no longer leader.
- Log directory failure (JBOD). handleLogDirFailure (ReplicaManager.scala:2157) marks affected partitions offline, removes their fetchers, drops the dir from the HW checkpoint map, and notifies the controller via the directory event handler so leadership moves elsewhere.
- Append offset below log start (delete-records race). A follower whose first fetched batch straddles the leader's advanced log start offset triggers truncateFullyAndStartAt to restart its log from the batch's base offset (Partition.scala:1197-1216).
- Stale HW after unclean election. If the HW lags the epoch start offset, ListOffsets for "latest" returns OFFSET_NOT_AVAILABLE until the HW recovers (Partition.scala:1455-1460).
Invariants & guarantees
1. HW is monotonically non-decreasing on a given log and never exceeds the LEO (UnifiedLog.java:564,573).
2. HW = min LEO over the maximal ISR, and never advances while ISR.size < effectiveMinIsr (Partition.scala:1010-1014,1028-1031).
3. Every committed record (offset < HW) is present on every ISR member, so any clean leader election preserves all committed data.
4. The ISR always contains the leader and is always a subset of the assignment (controller-enforced, ReplicationControlManager.java:1310-1325).
5. partitionEpoch and leaderEpoch are strictly monotonic per partition; an AlterPartition commits only at the matching partitionEpoch.
6. A replica may not join the ISR until its LEO reaches both the HW and the current epoch's start offset (Partition.scala:907-912).
Interactions with other subsystems
- The Log Storage Engine: appends, HW/LEO/LSO bookkeeping, and the leader-epoch checkpoint all live in UnifiedLog; Partition is a thin replication wrapper over it.
- The Fetch Path & Replica Fetchers: the follower side that pulls records, performs epoch-based truncation, and reports LEO back; the leader's fetchRecords emits diverging-epoch responses.
- The KRaft Controller: owns the authoritative ISR/leader/ELR, validates and commits AlterPartition, runs leader election and reassignment.
- Metadata Propagation & Broker Lifecycle: delivers the TopicsDelta that applyDelta turns into leader/follower transitions; broker registration/fencing feeds ISR eligibility.
- Request Processing (KafkaApis): dispatches Produce/Fetch into ReplicaManager and wires the delayed-operation purgatories.
- Transactions & Exactly-Once Semantics: the LSO and the min-ISR durability gate underpin transactional commit visibility.
- The Producer Client: the acks setting selects which of the durability paths above the producer relies on.
Design rationale & evolution
KIP-497 moved ISR changes from direct ZooKeeper writes to the AlterPartition (originally AlterIsr) RPC, making the controller the sole writer of ISR state and introducing the "maximal ISR" so a leader can advance the HW optimistically during an expansion without risking premature commit. KIP-101/KIP-279 introduced leader epochs and OffsetsForLeaderEpoch to make follower truncation precise. KIP-966 adds Eligible Leader Replicas so the controller can elect a guaranteed-complete replica even after it leaves the ISR, shrinking the window where operators must choose between unavailability and unclean (lossy) election. In KRaft these all compose cleanly because the metadata log is a single linearizable source of truth; there is no separate consensus system to reconcile.
Gotchas / operational notes
min.insync.replicas is only consulted on the acks=-1 path (Partition.scala:1234). With acks=1 a leader will happily accept writes even when it is the only ISR member, so a topic that wants the strong guarantee must set both min.insync.replicas > 1 and produce with acks=all.
Setting min.insync.replicas above the replication factor does not deadlock writes: effectiveMinIsr caps it at remoteReplicasMap.size + 1 (Partition.scala:246-248). The UnderMinIsr/AtMinIsr gauges reflect this effective value.
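The cap is a one-line calculation. In this sketch the function names are hypothetical, and remote_replica_count stands for remoteReplicasMap.size, so adding one accounts for the leader itself.

```python
def effective_min_isr(configured_min_isr: int, remote_replica_count: int) -> int:
    # The replication factor is the remote replicas plus the leader.
    return min(configured_min_isr, remote_replica_count + 1)


def is_under_min_isr(isr_size: int, configured_min_isr: int,
                     remote_replica_count: int) -> bool:
    return isr_size < effective_min_isr(configured_min_isr, remote_replica_count)
```

For a topic with replication factor 3 and min.insync.replicas=5, the effective value is 3, so a fully in-sync partition still accepts acks=all writes.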
The HW is flushed only every replica.high.watermark.checkpoint.interval.ms (default 5 s) and on shutdown. A hard crash can lose the most recent HW advance from disk; on restart the partition recovers a slightly stale HW and re-derives the true value from fetches. This is safe (the HW only moves up) but means the on-disk HW is a lower bound, not the live value.
Partition exports UnderReplicated, UnderMinIsr, AtMinIsr, InSyncReplicasCount, ReplicasCount, and LastStableOffsetLag per topic-partition (Partition.scala:221-226); ReplicaManager defines and exports IsrExpandsPerSec, IsrShrinksPerSec and FailedIsrUpdatesPerSec (ReplicaManager.scala:100-102,257-259), marked through the AlterPartitionListener (Partition.scala:101-111). Sustained UnderMinIsr > 0 on a busy topic is the canonical "producers are being rejected" alarm.