09 · The Fetch Path & Replica Fetchers
Source: Apache Kafka 4.4.0-SNAPSHOT (git 04bfe7d, 2026-06-15), KRaft mode. Derived from source code, not copied from official documentation.
There is exactly one read path in Kafka, served by a single piece of machinery, and two kinds of clients walk it: follower brokers replicating from a partition leader, and consumers draining committed data. Both speak the Fetch API; both land in ReplicaManager.fetchMessages, which reads from the log engine, optionally parks the request in a purgatory until min.bytes/max.wait.ms are satisfied, and streams record batches back with zero-copy sendfile. This chapter dissects the follower side (the AbstractFetcherThread pull loop, work partitioning, the leader-epoch truncation handshake, and intra-broker JBOD moves) and the leader-serving side (log reads, DelayedFetch, the high-watermark/last-stable-offset bounds, incremental fetch sessions per KIP-227, and rack-aware fetch-from-follower per KIP-392).
Role & responsibilities
The fetch subsystem has two distinct halves that meet at the wire:
- Follower replication (the fetcher side). Each broker that is a follower for some partitions runs a small pool of long-lived threads that continuously pull records from the partition's leader, append them to the local log, and advance their own high watermark from the leader's reported value. This is how the ISR stays caught up (see Replication, ISR & High Watermark).
- Serving fetches (the leader/broker side). When a follower's fetch request or a consumer's fetch request arrives, the broker reads from its local log and returns data, either immediately or after a bounded wait in purgatory. Consumers and followers differ only in a handful of parameters (isolation bound, who may be throttled, whether a preferred read replica is computed).
A third, closely-related actor reuses the same fetcher framework for an entirely local purpose: ReplicaAlterLogDirsThread copies a partition from one local log directory to another (JBOD intra-broker reassignment), fetching from a local leader endpoint instead of a remote socket.
Follower replication is just a fetch loop. A follower is a client of its leader's Fetch API; the very same ReplicaManager.fetchMessages path that serves consumers serves followers, distinguished by FetchParams.replicaId being a valid broker id rather than the consumer sentinel.
Where it lives in the code
| Class / file | Responsibility |
|---|---|
| core/src/main/scala/kafka/server/AbstractFetcherThread.scala | The pull loop: truncate, build fetch, send, process responses, advance state. Shared by replica and alter-dir fetchers. |
| core/src/main/scala/kafka/server/AbstractFetcherManager.scala | Partitions topic-partitions across N fetcher threads keyed by (leader, fetcherId); thread-pool resize; failed-partition tracking. |
| core/src/main/scala/kafka/server/ReplicaFetcherThread.scala | Follower-specific append, HW advance, and post-fetch delayed-fetch completion. |
| core/src/main/scala/kafka/server/ReplicaFetcherManager.scala | Creates ReplicaFetcherThreads with a RemoteLeaderEndPoint + FetchSessionHandler. |
| core/src/main/scala/kafka/server/RemoteLeaderEndPoint.scala | Talks to a remote leader broker over a blocking socket; builds incremental fetch requests; issues ListOffsets / OffsetsForLeaderEpoch. |
| core/src/main/scala/kafka/server/LocalLeaderEndPoint.scala | "Fetches" from a local replica for the alter-log-dirs path by calling ReplicaManager.fetchMessages in-process. |
| core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala | Copies a partition between local log directories; promotes the future replica. |
| server/src/main/java/org/apache/kafka/server/LeaderEndPoint.java | Interface abstracting "the leader I fetch from" (remote vs local). |
| server/src/main/java/org/apache/kafka/server/PartitionFetchState.java | Per-partition fetcher state record: offset, epoch, lag, FETCHING/TRUNCATING, delay. |
| core/src/main/scala/kafka/server/ReplicaManager.scala | fetchMessages, readFromLog, findPreferredReadReplica, throttling, purgatory wiring. |
| core/src/main/scala/kafka/server/DelayedFetch.scala | The delayed operation that waits for min.bytes or expiry in the fetch purgatory. |
| server/src/main/java/org/apache/kafka/server/FetchSession.java | Leader-side incremental fetch session + CachedPartition + FetchSessionCache. |
| server/src/main/java/org/apache/kafka/server/FetchManager.java / FetchContext.java / FetchSessionCacheShard.java | Server-side session creation, epoch validation, eviction. |
| clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java | Client/follower-side session bookkeeping (which partitions to send vs forget). |
| clients/src/main/java/org/apache/kafka/common/replica/RackAwareReplicaSelector.java | Default pluggable rack-locality selector (KIP-392). |
Core concepts & terminology
- Fetcher thread: A ShutdownableThread that loops doWork() = truncate-if-needed, then fetch, for a set of partitions sharing one leader. Named ReplicaFetcherThread-{fetcherId}-{leaderId} (ReplicaFetcherManager.scala:42).
- Leader endpoint: The LeaderEndPoint a fetcher reads from. RemoteLeaderEndPoint wraps a TCP BlockingSend; LocalLeaderEndPoint reads in-process for JBOD moves.
- High watermark (HW): The largest offset known to be replicated to all in-sync replicas; the consumer-visible end of the log. Followers learn it from the leader's fetch response and copy it locally (ReplicaFetcherThread.scala:143).
- Last stable offset (LSO): The bound for read_committed consumers: the offset below which there are no open transactions. See Transactions & EOS.
- Fetch offset / fetched epoch: The follower's next-to-read offset and the leader epoch of its last appended batch, sent so the leader can detect log divergence.
- Diverging epoch: A signal in the fetch response telling the follower its log has diverged from the leader and must truncate (KIP-595 truncation-on-fetch).
- Incremental fetch session: A stateful "subscription" (KIP-227): after a full fetch establishes a sessionId, subsequent requests omit unchanged partitions and the response omits partitions with no news.
- Preferred read replica: A follower the leader nominates for a consumer to read from, for rack locality (KIP-392).
Data structures
PartitionFetchState (in-memory, per partition per fetcher)
The follower's complete per-partition fetch state is the record PartitionFetchState (server/.../PartitionFetchState.java:32):
| Field | Type | Meaning |
|---|---|---|
| topicId | Optional<Uuid> | Topic ID for topic-ID-aware fetch (FetchRequest v13+). |
| fetchOffset | long | Next offset to fetch; equals the follower's log-end offset once caught up. |
| lag | Optional<Long> | leaderHW − nextOffset; isReplicaInSync ⇔ lag ≤ 0 (:71). |
| currentLeaderEpoch | int | The leader epoch this follower believes is current; sent in every fetch/epoch request for fencing. |
| delay / dueMs | Optional<Long> | Backoff: if set and dueMs > now, the partition is skipped (isDelayed, :78). |
| state | ReplicaState | FETCHING or TRUNCATING. isReadyForFetch ⇔ FETCHING && !delayed (:66). |
| lastFetchedEpoch | Optional<Integer> | Leader epoch of the last batch appended; lets the leader detect divergence. |
All states live in a PartitionStates<PartitionFetchState> inside the fetcher (AbstractFetcherThread.scala:70), an insertion-ordered map; updateAndMoveToEnd rotates a just-served partition to the back for round-robin fairness.
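The predicates in the table can be read as small pure functions over the fields. The following is a minimal sketch rather than the real class: `FetchStateSketch` is a hypothetical, reduced stand-in, the explicit `nowMs` parameter is an assumption made to keep the example deterministic, and treating an unknown lag as "not in sync" is likewise assumed.

```java
import java.util.Optional;

// Simplified, hypothetical stand-in for PartitionFetchState: it keeps only the
// fields needed to express the three predicates listed in the table.
final class FetchStateSketch {
    final long fetchOffset;
    final Optional<Long> lag;    // leaderHW - nextOffset, empty when unknown
    final Optional<Long> dueMs;  // backoff deadline, empty when not delayed
    final boolean fetching;      // true = FETCHING, false = TRUNCATING

    FetchStateSketch(long fetchOffset, Optional<Long> lag,
                     Optional<Long> dueMs, boolean fetching) {
        this.fetchOffset = fetchOffset;
        this.lag = lag;
        this.dueMs = dueMs;
        this.fetching = fetching;
    }

    // Delayed while the backoff deadline is still in the future.
    boolean isDelayed(long nowMs) {
        return dueMs.isPresent() && dueMs.get() > nowMs;
    }

    // Eligible for the next fetch request only when FETCHING and not backing off.
    boolean isReadyForFetch(long nowMs) {
        return fetching && !isDelayed(nowMs);
    }

    // In sync when the lag is known and not positive (assumption for unknown lag).
    boolean isReplicaInSync() {
        return lag.isPresent() && lag.get() <= 0;
    }
}
```

A partition with `dueMs = 2000` is skipped at `nowMs = 1000` and becomes eligible again at `nowMs = 2000`, which is the behaviour the backoff row describes.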
CachedPartition (leader-side, per fetch session)
For every partition in an incremental fetch session, the broker keeps a memory-frugal CachedPartition (FetchSession.java:220). It deliberately stores topic/topicId/partition as primitives rather than a TopicPartition object, because a broker may hold millions of them. Key fields: maxBytes, fetchOffset, highWatermark, leaderEpoch, fetcherLogStartOffset (the follower's log start offset), localLogStartOffset (this broker's log start offset for the partition), and lastFetchedEpoch. Its maybeUpdateResponseData decides whether a partition must appear in the response (see below). Equality/hash switch on whether topic IDs are in use (:380, :397).
FetchSession & the sharded cache
A FetchSession (FetchSession.java:42) carries an id, a privileged flag (true for follower sessions, false for consumers), the partitionMap of CachedPartitions, a monotonically increasing epoch, and creationMs/lastUsedMs. Each session is guarded by its own monitor lock. Sessions are kept in a FetchSessionCache split into NumFetchSessionCacheShards = 8 shards (KafkaBroker.scala:79), each owning a contiguous sessionIdRange = Int.MaxValue / 8 band of IDs so the shard for an ID is sessionId / sessionIdRange (FetchSession.java:471). Total capacity is max.incremental.fetch.session.cache.slots (default 1000) divided across shards (BrokerServer.scala:465).
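The shard arithmetic is simple enough to work through by hand. This sketch only restates the division described above; `SessionShardSketch` and its method names are hypothetical, the bounds check is an assumption about how ids are allocated (ids at or above 8 × sessionIdRange would otherwise map to a ninth, non-existent shard), and the even integer split of slots is also assumed.

```java
// Hypothetical sketch of session-id sharding; the constants follow the text above.
final class SessionShardSketch {
    static final int NUM_SHARDS = 8;
    // Int.MaxValue / 8 with integer division: 2147483647 / 8 = 268435455.
    static final int SESSION_ID_RANGE = Integer.MAX_VALUE / NUM_SHARDS;

    // Each shard owns one contiguous band of ids, so lookup is a single division.
    static int shardFor(int sessionId) {
        // Assumption: ids are only ever allocated inside the eight bands.
        if (sessionId <= 0 || sessionId >= NUM_SHARDS * SESSION_ID_RANGE) {
            throw new IllegalArgumentException("session id outside sharded range: " + sessionId);
        }
        return sessionId / SESSION_ID_RANGE;
    }

    // Per-shard capacity, assuming the total is split evenly with integer division.
    static int slotsPerShard(int totalSlots) {
        return totalSlots / NUM_SHARDS;
    }
}
```

With the default of 1000 slots this gives 125 slots per shard, and session id 268435455 is the first id owned by shard 1.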
Architecture & control/data flow
The fetcher manager: partitioning work across threads
AbstractFetcherManager owns a fetcherThreadMap: HashMap[BrokerIdAndFetcherId, T] (AbstractFetcherManager.scala:40). When partitions are assigned (addFetcherForPartitions, :131) each is bucketed by BrokerAndFetcherId(leader, getFetcherId(tp)). The fetcher id is a stable hash:
fetcherId = Utils.abs(31 * topic.hashCode + partition) % numFetchersPerBroker // :113
So a partition's fetcher is a function of (its leader broker, hash of its name). With num.replica.fetchers = 1 (default) a follower runs one fetcher thread per source leader broker; raising it fans partitions from the same leader across more threads. A thread is reused if it already targets that broker; otherwise the stale one is shut down and a fresh one started (:147–:156). resizeThreadPool drains every thread's partitions and re-adds them under the new modulus so the hash mapping stays consistent across add/delete (:69–:99). shutdownIdleFetcherThreads reaps threads with zero partitions (:207).
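To make the hash concrete, here is a small self-contained sketch of the same formula. `FetcherIdSketch` and `safeAbs` are hypothetical names; `safeAbs` is assumed to behave like `Utils.abs`, mapping `Integer.MIN_VALUE` to 0 because `Math.abs` cannot represent its absolute value.

```java
// Hypothetical re-statement of the fetcher-id formula quoted above.
final class FetcherIdSketch {
    // Overflow-safe absolute value: Math.abs(Integer.MIN_VALUE) stays negative,
    // so that single input is mapped to 0 (assumed to mirror Utils.abs).
    static int safeAbs(int n) {
        return n == Integer.MIN_VALUE ? 0 : Math.abs(n);
    }

    // Same partition name always lands on the same fetcher id for a given pool size.
    static int fetcherId(String topic, int partition, int numFetchersPerBroker) {
        return safeAbs(31 * topic.hashCode() + partition) % numFetchersPerBroker;
    }
}
```

For a topic named `a` (hash code 97), partition 3, and four fetchers per broker, the id is (31 × 97 + 3) mod 4 = 3010 mod 4 = 2. Changing the pool size changes the modulus, which is why a resize has to re-bucket every partition.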
The total number of replica-fetcher threads on a broker is bounded by num.replica.fetchers × (number of distinct leader brokers it follows), not by partition count. Many partitions multiplex onto each thread via one incremental fetch session per (follower, leader) pair.
The pull loop in detail
Every iteration of AbstractFetcherThread.doWork() is maybeTruncate(); maybeFetch() (:112). ReplicaFetcherThread appends a third step, completeDelayedFetchRequests() (:107).
- maybeFetch (:117) takes partitionMapLock, calls leader.buildFetch(partitionStates), and, if no partition is ready, waits on partitionMapCond for up to fetchBackOffMs (= replica.fetch.backoff.ms, default 1000) before looping. If a request was built, it is sent outside the lock by processFetchRequest.
- processFetchRequest (:318) calls leader.fetch(request) (a blocking round-trip), then re-takes partitionMapLock and, per partition, validates that the response is still relevant: the partition must still be present, its fetchOffset and currentLeaderEpoch must match what was requested, and it must be isReadyForFetch (:348). This guards against the partition being moved/truncated/re-added while the request was in flight.
- On Errors.NONE, if the response carries a diverging epoch and the leader supports truncation-on-fetch, the partition is recorded for truncation but not appended (HW/LSO are left untouched until the next fetch) (:356). Otherwise it hands the batch to processPartitionData and advances fetchOffset = lastOffset + 1, recomputes lag = max(0, respHW − nextOffset), and rotates the partition to the back of partitionStates (:380–:393).
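The relevance check and the state advance described in the list can be sketched as three small functions. This is an illustration under stated assumptions, not the real code: `ResponseGuardSketch` and its nested `Current` type are hypothetical, and a `null` state stands in for "partition no longer owned by this fetcher".

```java
// Hypothetical sketch of the per-partition checks applied after a fetch round-trip.
final class ResponseGuardSketch {
    // Hypothetical view of the fetcher's current state for one partition.
    static final class Current {
        final long fetchOffset;
        final int leaderEpoch;
        final boolean readyForFetch;

        Current(long fetchOffset, int leaderEpoch, boolean readyForFetch) {
            this.fetchOffset = fetchOffset;
            this.leaderEpoch = leaderEpoch;
            this.readyForFetch = readyForFetch;
        }
    }

    // A response is applied only if nothing changed while the request was in flight.
    static boolean stillRelevant(long requestedOffset, int requestedEpoch, Current current) {
        return current != null
            && current.fetchOffset == requestedOffset
            && current.leaderEpoch == requestedEpoch
            && current.readyForFetch;
    }

    // After a successful append the next fetch starts just past the last appended offset.
    static long nextFetchOffset(long lastAppendedOffset) {
        return lastAppendedOffset + 1;
    }

    // Lag is clamped at zero, as in lag = max(0, respHW - nextOffset).
    static long lag(long leaderHighWatermark, long nextOffset) {
        return Math.max(0L, leaderHighWatermark - nextOffset);
    }
}
```

A response requested at offset 100, epoch 7 is dropped if the partition has since moved to epoch 8 or been removed, which is exactly the in-flight race the second bullet guards against.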
Follower append (ReplicaFetcherThread.processPartitionData, :113): it asserts fetchOffset == log.logEndOffset (a follower must always fetch exactly at its LEO; mismatch throws IllegalStateException, :124), converts FileRecords to MemoryRecords if needed, appends via partition.appendRecordsToFollowerOrFutureReplica, then log.maybeUpdateHighWatermark(partitionData.highWatermark) and log.maybeIncrementLogStartOffset(leaderLogStartOffset...). Critically the batch is only appended up to the leader epoch that was current when the FETCH was built (passed as partitionLeaderEpoch) to avoid appending across a truncation/append race, see KAFKA-18723 (AbstractFetcherThread.scala:367). Replication-quota bytes are recorded if the partition is throttled.
A follower's high watermark never exceeds the leader's: it is set strictly from the value the leader reports in the fetch response (ReplicaFetcherThread.scala:143), and only ever moves forward (maybeUpdateHighWatermark). A follower also fetches strictly at its own log-end offset (:124), so, once any divergence has been truncated, its log is always a prefix of the leader's log.
The truncation handshake (becoming a follower)
When a replica becomes a follower (or the leader changes), its log may contain records the new leader never had (e.g. after an unclean election). It must truncate to the largest common prefix before appending anything new. Two mechanisms exist:
1. Truncation-on-fetch (the modern path, KIP-595/KIP-320)
RemoteLeaderEndPoint.isTruncationOnFetchSupported = true (:68). The follower includes lastFetchedEpoch in each FetchRequest.PartitionData (:191). The leader compares it against its own leader-epoch history; if they diverge it returns a diverging epoch (epoch + endOffset) in the fetch response rather than data. The follower's processFetchRequest collects these and calls truncateOnFetchResponse (:471, :233), which runs getOffsetTruncationState and truncates; no separate round-trip is needed.
2. The explicit OffsetsForLeaderEpoch handshake
For partitions that enter TRUNCATING state, maybeTruncate (:172) builds an OffsetForLeaderEpoch request from each partition's latestEpoch (fetchTruncatingPartitions, :150), sends it via leader.fetchEpochEndOffsets, then, re-checking under the lock that no leadership change occurred meanwhile (:216), truncates to the offset computed by getOffsetTruncationState (:604). That function encodes the full KIP-101/KIP-279 logic:
| Leader reply | Truncate to |
|---|---|
| endOffset == UNDEFINED_EPOCH_OFFSET | the follower's initial/HW fetch offset (leader has no such epoch). |
| leaderEpoch == UNDEFINED_EPOCH | min(leaderEndOffset, followerLEO) (legacy protocol). |
| follower knows an epoch < the replied epoch | truncate to that smaller epoch's end offset, then send another epoch request (truncationCompleted = false). |
| follower's epoch matches | min(followerEpochEndOffset, leaderEndOffset, followerLEO), the common prefix. |
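The four rows of the table can be sketched as one decision function. This is a simplified illustration, not getOffsetTruncationState itself: `TruncationSketch`, its `Result` type, and the `TreeMap` of epoch to end offset are hypothetical; the value -1 for both undefined sentinels is an assumption; and the branch for a follower with no epoch at or below the replied one is not covered by the table and is filled in here as an assumption.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch of the truncation decision summarised in the table.
final class TruncationSketch {
    static final int UNDEFINED_EPOCH = -1;          // assumed sentinel value
    static final long UNDEFINED_EPOCH_OFFSET = -1L; // assumed sentinel value

    static final class Result {
        final long offset;
        final boolean completed;
        Result(long offset, boolean completed) {
            this.offset = offset;
            this.completed = completed;
        }
    }

    // followerEpochEnds maps each epoch in the follower's log to that epoch's end offset.
    static Result truncationState(int leaderEpoch, long leaderEndOffset,
                                  TreeMap<Integer, Long> followerEpochEnds,
                                  long followerLogEndOffset, long initialFetchOffset) {
        if (leaderEndOffset == UNDEFINED_EPOCH_OFFSET) {
            // Row 1: the leader has no such epoch, fall back to the initial/HW offset.
            return new Result(initialFetchOffset, true);
        }
        if (leaderEpoch == UNDEFINED_EPOCH) {
            // Row 2: legacy reply without an epoch.
            return new Result(Math.min(leaderEndOffset, followerLogEndOffset), true);
        }
        // Largest follower epoch that is <= the epoch the leader replied with.
        Map.Entry<Integer, Long> known = followerEpochEnds.floorEntry(leaderEpoch);
        if (known == null) {
            // Not in the table; assumed to behave like the legacy row.
            return new Result(Math.min(leaderEndOffset, followerLogEndOffset), true);
        }
        if (known.getKey() != leaderEpoch) {
            // Row 3: truncate to the smaller epoch's end and ask again.
            return new Result(Math.min(known.getValue(), followerLogEndOffset), false);
        }
        // Row 4: epochs match, keep the common prefix.
        long common = Math.min(known.getValue(), leaderEndOffset);
        return new Result(Math.min(common, followerLogEndOffset), true);
    }
}
```

For example, a follower whose epoch 5 ends at offset 120 and whose leader reports epoch 5 ending at 90 truncates to 90 in a single step, whereas a follower that only knows epoch 3 (ending at 50) truncates to 50 and issues another epoch request.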
On the new follower, partitionFetchState (:508) decides the initial state: with truncation-on-fetch and a known latestEpoch it starts directly in FETCHING (divergence will be caught on the first fetch); otherwise it starts in TRUNCATING.
FENCED_LEADER_EPOCH is handled by comparing the request's epoch to the current state: if they match, the follower is genuinely behind and the partition is marked failed until a fresh LeaderAndIsr-equivalent metadata update arrives; if the local epoch is already newer, it just retries later (onPartitionFenced, :301). A failed partition is removed from the fetcher and tracked in FailedPartitions (:494).
Out-of-range and unclean-election recovery
handleOutOfRangeError → fetchOffsetAndTruncate (:660) covers the corner cases. If the leader's LEO is behind the follower's (the follower was an old leader that over-ran), it truncates down to the leader's LEO and resumes (:681). If the follower is below the leader's log start offset (it was offline while old data was deleted), it truncates fully and restarts at the leader's start offset (truncateFullyAndStartAt, :719). Otherwise it fetches from max(leaderStartOffset, followerLEO).
Tiered-storage interplay on the fetch path
If the leader has moved the requested offset to remote storage, it returns OFFSET_MOVED_TO_TIERED_STORAGE. The follower's handleOffsetsMovedToTieredStorage (:786) drives a TierStateMachine.start to rebuild the partition's auxiliary state (leader-epoch cache, producer snapshot) from the leader's earliest-local or earliest-pending-upload offset, then resumes fetching from local data. A special "fetch from last tiered offset" mode (follower.fetch.last.tiered.offset.enable, default off; ReplicaFetcherThread.scala:71) lets a brand-new empty replica (replicaEndOffset == 0) skip ahead instead of replaying everything. See Tiered Storage.
Serving fetches on the leader
ReplicaManager.fetchMessages
fetchMessages (ReplicaManager.scala:1665) first does a synchronous readFromLog(..., readFromPurgatory = false), accumulating bytesReadable. It then responds immediately if there is no remote fetch pending and any of: maxWaitMs ≤ 0, no partitions requested, bytesReadable ≥ minBytes, a read error, a diverging epoch, or a preferred read replica was chosen (:1705). Otherwise it constructs a DelayedFetch and registers it in the purgatory keyed by every partition (:1728–:1745), so a later produce to any of those partitions can complete it.
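The respond-now-or-park decision reduces to a single boolean expression. The sketch below restates the conditions listed above; `FetchResponseDecisionSketch` and its flattened parameter list are hypothetical, chosen only to keep the example self-contained.

```java
// Hypothetical sketch of the "respond immediately or park in purgatory" decision.
final class FetchResponseDecisionSketch {
    static boolean respondImmediately(boolean remoteFetchPending,
                                      long maxWaitMs,
                                      int partitionCount,
                                      long bytesReadable,
                                      int minBytes,
                                      boolean hasReadError,
                                      boolean hasDivergingEpoch,
                                      boolean hasPreferredReadReplica) {
        // A pending remote (tiered) read rules out the immediate path.
        if (remoteFetchPending) {
            return false;
        }
        // Otherwise any one of the listed conditions is enough to answer right away.
        return maxWaitMs <= 0
            || partitionCount == 0
            || bytesReadable >= minBytes
            || hasReadError
            || hasDivergingEpoch
            || hasPreferredReadReplica;
    }
}
```

A consumer fetch with max.wait.ms = 500 and min.bytes = 1 against an idle partition (zero readable bytes) therefore parks, while the same fetch returns at once if a diverging epoch or a preferred read replica is present.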
readFromLog and the isolation bound
readFromLog (:1753) iterates partitions, shrinking a shared limitBytes budget (so one fetch response never exceeds fetch.max.bytes) but always returning at least one message for the first partition (minOneMessage, :1878) to guarantee progress past a large batch. For each partition it calls partition.fetchRecords, whose upper read bound is the FetchIsolation derived from the request (FetchIsolation.java:31):
| Requester | Isolation | Reads up to |
|---|---|---|
| Follower (valid broker id) | LOG_END | the leader's log-end offset; followers see uncommitted data so the HW can advance. |
| Consumer, read_uncommitted | HIGH_WATERMARK | the high watermark. |
| Consumer, read_committed | TXN_COMMITTED | the last stable offset (LSO). |
fetchOnlyLeader() (FetchParams.java:79) forces leader-only reads for followers and for consumers that did not supply client metadata; a consumer with rack metadata may be steered to a follower.
The HW/LSO bound is enforced on the leader at read time, not by the client. A consumer literally cannot read past the high watermark (read_uncommitted) or the last stable offset (read_committed), because fetchRecords clamps the read to that offset.
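The isolation table maps directly onto a two-step lookup: pick the isolation from the requester, then pick the read bound from the isolation. In this sketch the enum is redeclared locally so the example stands alone; `IsolationSketch`, `isolationFor`, and `upperBound` are hypothetical names, and treating any non-negative replica id as a valid broker id is an assumption.

```java
// Hypothetical sketch of how the requester determines the upper read bound.
final class IsolationSketch {
    enum FetchIsolation { LOG_END, HIGH_WATERMARK, TXN_COMMITTED }

    // Assumption: non-negative replica ids are brokers, negative ids are consumers.
    static FetchIsolation isolationFor(int replicaId, boolean readCommitted) {
        if (replicaId >= 0) {
            return FetchIsolation.LOG_END;
        }
        return readCommitted ? FetchIsolation.TXN_COMMITTED : FetchIsolation.HIGH_WATERMARK;
    }

    // The offset a read is clamped to for each isolation.
    static long upperBound(FetchIsolation isolation, long logEndOffset,
                           long highWatermark, long lastStableOffset) {
        switch (isolation) {
            case LOG_END:
                return logEndOffset;
            case HIGH_WATERMARK:
                return highWatermark;
            default:
                return lastStableOffset;
        }
    }
}
```

With a log end of 120, a high watermark of 100, and a last stable offset of 90, a follower may read up to 120, a read_uncommitted consumer up to 100, and a read_committed consumer up to 90.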
DelayedFetch: waiting for min.bytes
DelayedFetch.tryComplete (DelayedFetch.scala:70) re-evaluates completability whenever poked. It recomputes each partition's bound offset for the request's isolation and force-completes if any partition has rolled to a newer segment (Case F), or if data is now available below the bound; it accumulates min(bytesAvailable, partitionMaxBytes) and completes once the total reaches params.minBytes (Case G, :142). It short-circuits on leadership loss, offline dir, or fenced epoch (Cases A–E), and force-completes if a diverging epoch appeared while parked (Case H, :109), so truncation is not delayed. On expiry (max.wait.ms elapsed) or completion, onComplete re-reads with readFromPurgatory = true and fires the response callback (:158). Followers are excluded from triggering completion while throttled (:99), to avoid ISR thrashing.
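The min.bytes accounting in Case G is worth seeing in isolation, because the per-partition cap means a single busy partition cannot satisfy min.bytes beyond its own maxBytes. The sketch below covers only that accumulation; `DelayedFetchSketch` and its parallel-array parameters are hypothetical simplifications.

```java
// Hypothetical sketch of the Case G accumulation: sum of min(available, partitionMaxBytes).
final class DelayedFetchSketch {
    static boolean minBytesSatisfied(long[] bytesAvailable, int[] partitionMaxBytes, int minBytes) {
        if (bytesAvailable.length != partitionMaxBytes.length) {
            throw new IllegalArgumentException("one maxBytes entry is required per partition");
        }
        long accumulated = 0L;
        for (int i = 0; i < bytesAvailable.length; i++) {
            // Each partition contributes at most its own per-partition cap.
            accumulated += Math.min(bytesAvailable[i], (long) partitionMaxBytes[i]);
        }
        return accumulated >= minBytes;
    }
}
```

Two partitions with 5000 and 10 readable bytes and a 1024-byte per-partition cap accumulate 1034 bytes, so a request with min.bytes = 2000 keeps waiting even though 5010 bytes are physically available.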
How does a parked consumer fetch ever wake? Two ways. A produce that advances the HW calls delayedFetchPurgatory.checkAndComplete on the affected key (ReplicaManager.scala:864). And a follower, after appending and advancing its local HW to the value the leader reported, collects partitionsWithNewHighWatermark and calls completeDelayedFetchRequests at the end of its loop (ReplicaFetcherThread.scala:166), so replication progress on the follower is what releases consumer fetches parked on that broker (the fetch-from-follower case) and waiting on freshly committed data.
Incremental fetch sessions (KIP-227)
Sending the full partition set on every fetch does not scale to thousands of partitions. KIP-227 makes fetches incremental.
Server side: FetchManager.newContext
Each fetch carries FetchMetadata(sessionId, epoch). FetchManager.newContext (FetchManager.java:58) dispatches:
- Full request (isFull()): any existing session with that id is removed; a new FullFetchContext is created (or SessionlessFetchContext if epoch == FINAL_EPOCH). A FullFetchContext.updateAndGenerateResponseData calls maybeCreateSession to allocate a session and assign a fresh id (FetchContext.java:215).
- Incremental request: the session is looked up in its shard under lock; the request epoch must equal the stored epoch or it is rejected with INVALID_FETCH_SESSION_EPOCH (FetchManager.java:95). A topic-ID/version mismatch yields FETCH_SESSION_TOPIC_ID_ERROR. On success the session's partitionMap is patched with added/updated/removed partitions, lastUsedMs is touched, the epoch is bumped via nextEpoch, and an IncrementalFetchContext is returned (:106–:125). If the request emptied the session it is removed and treated as sessionless.
When building the response, CachedPartition.maybeUpdateResponseData (FetchSession.java:336) returns true, i.e. the partition is included, only if it has new records, a changed high watermark or log-start offset, an error, a diverging epoch, or a preferred-replica nomination. Unchanged partitions are silently dropped from the response, which is the entire point.
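The epoch check in the incremental branch behaves like a tiny state machine. The sketch below models only that check: `SessionEpochSketch` is a hypothetical class, and the details of `nextEpoch` (the final epoch being -1 and sticky, and wrap-around from `Integer.MAX_VALUE` back to 1) are assumptions about the real helper rather than facts taken from this chapter.

```java
// Hypothetical sketch of the stored-epoch check for incremental fetch requests.
final class SessionEpochSketch {
    static final int FINAL_EPOCH = -1; // assumed sentinel for "close the session"

    private int storedEpoch;

    SessionEpochSketch(int initialStoredEpoch) {
        this.storedEpoch = initialStoredEpoch;
    }

    // Assumed behaviour: the final epoch is sticky, and the counter wraps to 1.
    static int nextEpoch(int previous) {
        if (previous < 0) {
            return FINAL_EPOCH;
        }
        if (previous == Integer.MAX_VALUE) {
            return 1;
        }
        return previous + 1;
    }

    // Accepts only an exact match; on success the stored epoch is bumped.
    // A false return corresponds to INVALID_FETCH_SESSION_EPOCH.
    boolean acceptIncremental(int requestEpoch) {
        if (requestEpoch != storedEpoch) {
            return false;
        }
        storedEpoch = nextEpoch(storedEpoch);
        return true;
    }

    int currentEpoch() {
        return storedEpoch;
    }
}
```

A session stored at epoch 1 accepts a request carrying epoch 1 and moves to 2; a retransmission of epoch 1 is then rejected and leaves the stored epoch at 2, which is what pushes the client back to a full fetch.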
Session lifecycle: a full fetch creates session S at epoch 1; each incremental fetch must echo the exact stored epoch, which the broker bumps on every accepted request; any gap is rejected with INVALID_FETCH_SESSION_EPOCH, forcing the client back to a full fetch. A final-epoch full fetch closes the session.
Cache eviction
When a shard is full, FetchSessionCacheShard.tryEvict (:204) evicts in priority order: (1) a session idle longer than evictionMs (= MIN_INCREMENTAL_FETCH_SESSION_EVICTION_MS = 120000 ms, KafkaBroker.scala:73); else (2) the least-valuable evictable entry if the newcomer outranks it. The EvictableKey ordering is unprivileged-before-privileged, then smaller, then by id (FetchSession.java:438), so follower (privileged) sessions and larger sessions survive over small consumer sessions. Evictions increment IncrementalFetchSessionEvictionsPerSec.
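The eviction ordering can be expressed as a three-level comparison in which "smaller" means "evicted first". This is an illustrative sketch only: `EvictionOrderSketch` and its `Key` type are hypothetical stand-ins for EvictableKey, with just the three fields the ordering needs.

```java
// Hypothetical sketch of the EvictableKey ordering: least valuable sessions sort first.
final class EvictionOrderSketch {
    static final class Key {
        final boolean privileged; // true for follower sessions
        final int size;           // number of partitions in the session
        final int id;             // session id, used only as a tie-breaker

        Key(boolean privileged, int size, int id) {
            this.privileged = privileged;
            this.size = size;
            this.id = id;
        }
    }

    // Negative result: a is evicted before b.
    static int compare(Key a, Key b) {
        if (a.privileged != b.privileged) {
            return a.privileged ? 1 : -1;            // unprivileged before privileged
        }
        if (a.size != b.size) {
            return Integer.compare(a.size, b.size);  // then smaller sessions first
        }
        return Integer.compare(a.id, b.id);          // then by id
    }
}
```

Under this ordering a 50-partition consumer session still sorts before a one-partition follower session, because privilege is compared before size.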
Client/follower side: FetchSessionHandler
The follower's RemoteLeaderEndPoint.buildFetch (:179) uses a FetchSessionHandler builder: it adds the current FetchRequest.PartitionData for every ready, un-throttled partition, then build() diffs against the previous session to produce toSend, toForget, toReplace, and the session metadata (:207–:222). If nothing is to send or forget, no request is issued at all (:208). After the response, fetch calls fetchSessionHandler.handleResponse to validate the session id/epoch before decoding partition data (RemoteLeaderEndPoint.scala:85).
Fetch-from-follower for rack locality (KIP-392)
When a consumer sends FetchRequest v11+ it includes a rackId; KafkaApis wraps it in DefaultClientMetadata (KafkaApis.scala:747). In readFromLog, if the broker is the leader and the requester is a consumer (not a valid broker id), findPreferredReadReplica (ReplicaManager.scala:1964) is consulted. It builds a PartitionView of the leader plus every in-sync remote replica whose logEndOffset ≥ fetchOffset ≥ logStartOffset (:1985) and passes it to the configured ReplicaSelector.select; restricting the view to in-sync replicas means the consumer never gets steered to a lagging follower. The default RackAwareReplicaSelector returns the most caught-up replica sharing the client's rack, or the leader if none (RackAwareReplicaSelector.java:31).
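The default selection rule, as described above, can be sketched in a few lines. This is a simplified illustration and not the real RackAwareReplicaSelector: `RackSelectorSketch` and its `Replica` type are hypothetical, "most caught-up" is reduced to "largest log-end offset", and the candidate list is assumed to contain only the leader and eligible in-sync replicas.

```java
import java.util.List;

// Hypothetical sketch of rack-aware selection: the most caught-up same-rack replica, else the leader.
final class RackSelectorSketch {
    static final class Replica {
        final int brokerId;
        final String rack;
        final long logEndOffset;

        Replica(int brokerId, String rack, long logEndOffset) {
            this.brokerId = brokerId;
            this.rack = rack;
            this.logEndOffset = logEndOffset;
        }
    }

    // candidates: the leader plus every eligible in-sync replica (assumed pre-filtered).
    static Replica select(String clientRack, Replica leader, List<Replica> candidates) {
        Replica best = null;
        if (clientRack != null) {
            for (Replica r : candidates) {
                boolean sameRack = clientRack.equals(r.rack);
                if (sameRack && (best == null || r.logEndOffset > best.logEndOffset)) {
                    best = r;
                }
            }
        }
        // No rack supplied, or nothing in the client's rack: read from the leader.
        return best != null ? best : leader;
    }
}
```

With a leader in rack-a and two followers in rack-b at log-end offsets 90 and 95, a rack-b consumer is steered to the follower at 95, while a consumer in rack-c (or one that sends no rack) stays on the leader.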
If a non-leader replica is chosen, the leader returns no data, only a preferredReadReplica id and an offset snapshot (ReplicaManager.scala:1806). The consumer then redirects its fetches to that follower (which serves them at the HIGH_WATERMARK/TXN_COMMITTED bound like any leader-serving read). This nomination always force-completes the fetch immediately (it is one of the immediate-response conditions, :1706). No replica.selector.class configured ⇒ replicaSelectorPlugin is None and all consumers read from the leader (createReplicaSelector, ReplicaManager.scala).
Followers serve consumer reads only up to their own high watermark, which lags the leader's by one replication round-trip. Fetch-from-follower trades a small extra staleness for reduced cross-rack/cross-AZ bandwidth. A follower that has fallen out of the ISR is excluded from selection precisely so the consumer cannot get pinned to a stalled replica indefinitely (ReplicaManager.scala:1985).
Zero-copy delivery (sendfile)
Records read from a log segment are returned as FileRecords, a window over the segment's FileChannel, not a heap copy. When the network layer writes a fetch response, FileRecords.writeTo (FileRecords.java:291) delegates to destChannel.transferFrom(channel, position, count), which on Linux bottoms out in the sendfile(2) syscall: bytes go from page cache straight to the socket without traversing user space or the JVM heap. The follower's fetcher converts FileRecords to MemoryRecords only because it must inspect and re-append them (AbstractFetcherThread.toMemoryRecords, :910); consumers receive the file-backed bytes untouched. See Network Layer & Threading for how the Send is driven.
Concurrency & threading
| Thread | Count | Does what | Guards |
|---|---|---|---|
| ReplicaFetcherThread-{id}-{leader} | num.replica.fetchers × #leaders | Runs the truncate→fetch→append loop for its partitions. | partitionMapLock (a ReentrantLock) + partitionMapCond. |
| ReplicaAlterLogDirsThread-… | ≤ num.replica.alter.log.dirs.threads | Copies partitions between local dirs; promotes future replicas. | same lock; promotionStates is a ConcurrentHashMap. |
| Request handler (KafkaRequestHandler) | num.io.threads | Runs handleFetchRequest → fetchMessages → synchronous readFromLog. | per-session monitor; shard monitor in FetchSessionCacheShard. |
| ExpirationReaper-…-Fetch | 1 per purgatory | Times out DelayedFetch ops at max.wait.ms. | purgatory internal locks. |
The fetcher thread is a ShutdownableThread; its run() simply loops doWork() while isRunning() (ShutdownableThread.java:131). All mutation of partitionStates (add, remove, truncate, advance offset, delay) happens under partitionMapLock, while the blocking network fetch in processFetchRequest happens without it; the thread then re-acquires the lock to apply results, re-validating each partition because it may have changed meanwhile. FailedPartitions and the manager's fetcherThreadMap are each guarded by their own monitor. FetcherLagStats uses a ConcurrentHashMap (AbstractFetcherThread.scala:950) whose per-partition FetcherLagMetrics each hold an AtomicLong gauge (:933).
A response is applied to partitionStates only if, under partitionMapLock, the partition is still present and its fetchOffset + currentLeaderEpoch still match what was requested (AbstractFetcherThread.scala:348). This prevents a stale in-flight response from clobbering state for a partition that was reassigned to a different fetcher thread or truncated in the interim.
Configuration reference
| Key | Default | Effect |
|---|---|---|
| num.replica.fetchers | 1 | Fetcher threads per source broker; raises replication parallelism per leader. ReplicationConfigs.java:96. |
| replica.fetch.max.bytes | 1 MiB (1048576) | Per-partition fetch size cap (soft; one batch is always returned). :67. |
| replica.fetch.response.max.bytes | 10 MiB | Cap on a whole follower fetch response. :87. |
| replica.fetch.wait.max.ms | 500 | Follower's max.wait.ms on the leader: how long the leader parks the follower fetch. :74. |
| replica.fetch.min.bytes | 1 | Follower's min.bytes: the leader waits for this much before responding. :79. |
| replica.fetch.backoff.ms | 1000 | Sleep when no partition is ready or after an error. :83. |
| fetch.max.bytes | 50 MiB (52428800) | Server cap on any single consumer fetch response. ConsumerConfig.java:200. |
| max.incremental.fetch.session.cache.slots | 1000 | Total cached fetch sessions, split across 8 shards. ServerConfigs.java:102. |
| replica.selector.class | null (leader only) | Pluggable ReplicaSelector enabling KIP-392 fetch-from-follower. ReplicationConfigs.java:140. |
| follower.fetch.last.tiered.offset.enable | false | New empty replicas with tiered storage may start at the last tiered offset. KafkaConfig.scala:255. |
Failure modes, edge cases & recovery
- Corrupt/invalid records. CorruptRecordException/InvalidRecordException during append are logged and the partition is added to partitionsWithError (backed off), not failed; one bad batch must not stall a thread's other partitions (AbstractFetcherThread.scala:399).
- Storage failure. A KafkaStorageException on truncate or append marks the partition failed and removes it from the fetcher; the offline directory is later taken down by ReplicaManager.handleLogDirFailure (:408, :494).
- Leadership churn. NOT_LEADER_OR_FOLLOWER, UNKNOWN_TOPIC_OR_PARTITION, UNKNOWN_TOPIC_ID, INCONSISTENT_TOPIC_ID are all transient-during-reassignment and simply trigger backoff (:437–:458).
- Lost session. If the leader evicted the follower's session, the next incremental request gets FETCH_SESSION_ID_NOT_FOUND/INVALID_FETCH_SESSION_EPOCH; FetchSessionHandler.handleResponse returns false and the follower falls back to a full fetch.
- Diverging epoch while parked. A consumer/follower request sitting in purgatory is force-completed the moment truncation creates a divergence, so the follower truncates promptly (DelayedFetch.scala:109).
- Offset out of range / moved to tier. Recovered via fetchOffsetAndTruncate or the TierStateMachine as described above; for a consumer, handleOffsetOutOfRangeError on the leader may convert it into a remote-storage read (ReplicaManager.scala:1895).
Invariants & guarantees
- A follower always fetches at exactly its log-end offset; once any divergence has been truncated, its log is a prefix of the leader's log, which may extend past the high watermark (ReplicaFetcherThread.scala:124).
- Follower HW = the leader's reported HW, monotonically non-decreasing; never ahead of the leader.
- Consumers cannot observe data beyond the HW (read_uncommitted) or LSO (read_committed); the bound is enforced at read time on the serving broker.
- Fetch session epochs are gap-free per session; any mismatch forces a full re-sync, so client and broker never silently disagree about the partition set.
- A fetch response is applied to fetcher state only if the partition's offset/epoch are unchanged since the request was built.
- Only in-sync replicas are eligible preferred read replicas, so fetch-from-follower cannot pin a consumer to a lagging follower.
Interactions with other subsystems
- Replication, ISR & HW: the fetcher is the mechanism by which followers stay in the ISR; the leader advances its HW from follower fetch offsets recorded during these reads.
- Log Storage Engine: fetchRecords, appendRecordsToFollowerOrFutureReplica, maybeUpdateHighWatermark, truncateTo, and the leader-epoch cache all live there.
- Request Processing: KafkaApis.handleFetchRequest wires the session context, authorization, throttling, and the response callback.
- Tiered Storage: OFFSET_MOVED_TO_TIERED_STORAGE handling and the TierStateMachine.
- Transactions & EOS: the LSO bound and aborted-transaction metadata in fetch responses.
- Quotas & Throttling: replication and consumer fetch quotas gate both buildFetch (follower) and the response path (consumer).
- Consumer Client / Producer Client: the consumer drives this path for reads; producer appends are what wake parked consumer fetches.
- Share Groups: share-fetch reuses FetchParams with the trailing shareFetchRequest = true flag (KafkaApis.scala:3510) but routes through SharePartitionManager.fetchMessages (KafkaApis.scala:3522), not ReplicaManager.fetchMessages directly.
Design rationale & evolution
Incremental fetch sessions (KIP-227). With thousands of partitions, re-listing every partition on every fetch made request size and broker CPU scale with total partition count rather than with the number of active partitions. A stateful session lets a request send only changed partitions and a response return only partitions with news, decoupling steady-state cost from fleet size.
Fetch from closest replica (KIP-392). In multi-AZ deployments, leader-only reads forced consumers to cross availability-zone boundaries, incurring cost and latency. A pluggable ReplicaSelector lets the leader nominate a same-rack in-sync follower, keeping reads local; the leader returns only the nomination so the consumer redirects itself.
Truncation on fetch (KIP-595, building on leader-epoch fencing in KIP-320/KIP-101). Sending lastFetchedEpoch in the fetch and returning a diverging epoch in the response folds divergence detection into the data path, eliminating a separate OffsetsForLeaderEpoch round-trip in the common case and shrinking the window during which a follower could append diverged data.
Other lineage: tiered-storage fetch handling is KIP-405; the JBOD intra-broker directory move and future-replica promotion machinery in ReplicaAlterLogDirsThread use the directory-assignment events of KIP-858 (KRaft JBOD).
Gotchas & operational notes
- One fetcher thread per leader by default. If many partitions share a single hot leader, replication can bottleneck on one thread; raise num.replica.fetchers. Resizing reshuffles all partitions to keep the hash mapping consistent (AbstractFetcherManager.scala:69).
- replica.fetch.wait.max.ms should be below replica.lag.time.max.ms (default 30000) or a follower could be marked out-of-sync merely because its fetch was parked too long.
- Alter-log-dirs copies one partition at a time on a thread (LocalLeaderEndPoint.selectPartitionToFetch, :207) to maximize a single replica's catch-up rate, and uses maxWait = minBytes = 0 so the local fetch returns instantly once the future log has caught up (:262).
- Session eviction favours followers. Under cache pressure, consumer (unprivileged) sessions are evicted before follower (privileged) ones; a flood of short-lived consumers can churn the cache, visible via IncrementalFetchSessionEvictionsPerSec.
- Preferred-replica reads add staleness. A same-rack follower lags the leader's HW by a replication round-trip; latency-sensitive consumers that need the freshest committed data should not enable fetch-from-follower.
- The follower-to-leader fetch is privileged. It requires ClusterAction on the cluster resource (KafkaApis.scala:589); consumer fetches require Read on each topic (see Security).
For the broader picture of how leaders advance the high watermark from these fetch offsets and manage the ISR, continue to Replication, ISR & High Watermark; for the segment-level read mechanics, see The Log Storage Engine.