krivaltsevich.com · Kafka Internals 4.4

17 · The Consumer Client

Source: Apache Kafka 4.4.0-SNAPSHOT (git 04bfe7d, 2026-06-15), KRaft mode. Derived from source code, not copied from official documentation.

The KafkaConsumer reads records from topic partitions, tracks a per-partition position, optionally manages group membership and committed offsets, and turns raw fetch responses into deserialized ConsumerRecords. As of Kafka 4.x the public class is a thin façade that delegates to one of two engines: the legacy single-threaded ClassicKafkaConsumer (the entire coordinator + fetch loop runs on the user's poll() thread) or the KIP-848 AsyncKafkaConsumer, which splits work across the application thread and a dedicated ConsumerNetworkThread communicating over event queues. This chapter dissects both engines, along with the shared SubscriptionState, the fetch pipeline, and offset management, all grounded in the source.

Role & responsibilities

The consumer client is the client-side half of Kafka's read path. Its jobs are:

  • Position tracking: for every assigned partition, remember the offset of the next record to deliver, plus the high watermark (HW), log-start offset, and last stable offset (LSO) reported by the broker.
  • Fetching: build Fetch RPCs that respect fetch.min.bytes/fetch.max.bytes/max.partition.fetch.bytes/fetch.max.wait.ms, parse and decompress the returned batches, skip aborted transactions under read_committed, and hand back at most max.poll.records records per poll().
  • Group membership (optional): join a consumer group, receive a partition assignment, run ConsumerRebalanceListener callbacks, send heartbeats, and detect liveness via max.poll.interval.ms.
  • Offset management (optional): commit positions to the group coordinator (automatically or manually) and fetch committed offsets on startup.

A consumer with no group.id can still assign() partitions manually and fetch; it simply cannot commit offsets or use group management. See Group Coordination & Rebalance Protocols for the broker side and The Fetch Path & Replica Fetchers for how the broker serves these fetches.

Key idea

The consumer is not thread-safe. Both engines enforce single-threaded access with a lightweight CAS lock that throws ConcurrentModificationException rather than blocking (AsyncKafkaConsumer.acquire(), clients/.../internals/AsyncKafkaConsumer.java:2216). The only sanctioned multi-thread interaction is calling wakeup() from another thread.
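A minimal sketch of such a fail-fast guard follows. It assumes only two atomics: an owner-thread ID (AtomicLong) and a re-entrancy count (AtomicInteger). The class SingleThreadGuard and its methods are hypothetical and are not Kafka's API. The sketch shows a second thread being rejected immediately rather than queued.

```java
import java.util.ConcurrentModificationException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical fail-fast, re-entrant guard in the spirit of acquire()/release().
final class SingleThreadGuard {
    private static final long NO_OWNER = -1L;
    private final AtomicLong currentThread = new AtomicLong(NO_OWNER); // owning thread's id
    private final AtomicInteger refCount = new AtomicInteger(0);        // re-entrancy depth

    /** Claims the guard for the calling thread, or throws instead of blocking. */
    void acquire() {
        long me = Thread.currentThread().getId();
        if (me != currentThread.get() && !currentThread.compareAndSet(NO_OWNER, me)) {
            throw new ConcurrentModificationException("KafkaConsumer is not safe for multi-threaded access");
        }
        refCount.incrementAndGet();
    }

    /** Drops one level of ownership; the outermost release frees the guard. */
    void release() {
        if (refCount.decrementAndGet() == 0) {
            currentThread.set(NO_OWNER);
        }
    }
}
```

Failing fast turns an accidental cross-thread call into an immediate, visible error. The alternative would be silently serializing two threads' work on one consumer.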

Where it lives in the code

Class / interface | File | Role
KafkaConsumer | clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java | Public façade; holds a ConsumerDelegate and forwards every method.
ConsumerDelegateCreator | .../consumer/internals/ConsumerDelegateCreator.java | Factory; picks the engine from group.protocol.
ClassicKafkaConsumer | .../consumer/internals/ClassicKafkaConsumer.java | Legacy single-threaded engine (CLASSIC protocol).
AsyncKafkaConsumer | .../consumer/internals/AsyncKafkaConsumer.java | KIP-848 engine (CONSUMER protocol); application thread + background thread.
ConsumerNetworkThread | .../consumer/internals/ConsumerNetworkThread.java | Background event loop; drives request managers + network I/O.
SubscriptionState | .../consumer/internals/SubscriptionState.java | Thread-safe per-partition state machine; subscription type, positions, pause.
FetchRequestManager / AbstractFetch | .../consumer/internals/{FetchRequestManager,AbstractFetch}.java | Builds fetch requests, manages fetch sessions, fills the buffer.
FetchBuffer / CompletedFetch / FetchCollector | .../consumer/internals/{FetchBuffer,CompletedFetch,FetchCollector}.java | Thread-safe buffer; per-partition decode/iterate; collection into a Fetch.
CommitRequestManager | .../consumer/internals/CommitRequestManager.java | Auto/manual commit and committed-offset fetch (async engine).
OffsetsRequestManager | .../consumer/internals/OffsetsRequestManager.java | updateFetchPositions: committed-offset fetch, list-offsets, validation (async engine).
ConsumerMembershipManager / AbstractMembershipManager | .../consumer/internals/{ConsumerMembershipManager,AbstractMembershipManager}.java | KIP-848 member state machine & assignment reconciliation.
ConsumerHeartbeatRequestManager / AbstractHeartbeatRequestManager | .../consumer/internals/{ConsumerHeartbeatRequestManager,AbstractHeartbeatRequestManager}.java | Sends ConsumerGroupHeartbeat; owns the poll timer.
ConsumerCoordinator / AbstractCoordinator | .../consumer/internals/{ConsumerCoordinator,AbstractCoordinator}.java | CLASSIC-protocol JoinGroup/SyncGroup + heartbeat thread.
event classes | .../consumer/internals/events/*.java | ApplicationEvent / BackgroundEvent hierarchy crossing the two threads.

Choosing an engine: the delegator

KafkaConsumer stores a single field private final ConsumerDelegate<K, V> delegate (KafkaConsumer.java:543) and every public method simply forwards (e.g. poll at KafkaConsumer.java:916 returns delegate.poll(timeout)). The delegate is built by ConsumerDelegateCreator.create(), which reads group.protocol and dispatches:

GroupProtocol gp = GroupProtocol.valueOf(config.getString(GROUP_PROTOCOL_CONFIG).toUpperCase());
if (gp == GroupProtocol.CONSUMER) return new AsyncKafkaConsumer<>(...);   // KIP-848
else                             return new ClassicKafkaConsumer<>(...);  // legacy

ConsumerDelegateCreator.java:61. The enum has exactly two values, CLASSIC("CLASSIC") and CONSUMER("CONSUMER") (GroupProtocol.java:30), and group.protocol defaults to classic (ConsumerConfig.java:115, DEFAULT_GROUP_PROTOCOL = GroupProtocol.CLASSIC.name().toLowerCase()). Both engines implement ConsumerDelegate<K,V>, so the façade is oblivious to which one it holds.

Design rationale

KIP-848 set out to move rebalancing logic from the client into a broker-side group coordinator and to give the consumer a new threading model that never blocks the user's thread on coordination. The façade/delegate split lets the two implementations co-exist, so users opt in with a single config (group.protocol=consumer) without changing code. See KIP-848 for the design; the new client metrics come from KIP-1068.

The classic engine: one thread does everything

ClassicKafkaConsumer owns a ConsumerNetworkClient (client), a Fetcher, an OffsetFetcher, and, when grouped, a ConsumerCoordinator (ClassicKafkaConsumer.java:127). Its poll(Timer) loop runs entirely on the caller's thread (ClassicKafkaConsumer.java:648):

  1. client.maybeTriggerWakeup(): honor a pending wakeup().
  2. updateAssignmentMetadataIfNeeded(timer, false): runs coordinator.poll(timer, waitForJoinGroup=false) (joins the group, sends heartbeats, runs rebalance callbacks, auto-commits), then updateFetchPositions.
  3. pollForFetches(timer): fetcher.collectFetch() returns buffered data immediately if present; otherwise sendFetches() issues new fetch RPCs and client.poll(pollTimer, () -> !fetcher.hasAvailableFetches()) blocks for responses.
  4. If the fetch is non-empty, pipeline: call sendFetches() and client.transmitSends() before returning, so the next batch is already in flight while the user processes records.
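The four steps above can be modelled in a few lines. This is a simplified sketch, not ClassicKafkaConsumer itself. ToyFetcher is a hypothetical stand-in for the Fetcher/ConsumerNetworkClient pair, and the wakeup, coordinator and position steps are reduced to a comment. The sketch shows two things: every blocking call sits on the caller's thread, and a non-empty result triggers a pipelined sendFetches().

```java
import java.util.List;

// Hypothetical stand-in for the classic Fetcher + ConsumerNetworkClient pair.
interface ToyFetcher {
    List<String> collectFetch();      // drain already-buffered records (possibly empty)
    void sendFetches();               // stage Fetch RPCs for fetchable partitions
    void pollNetwork(long timeoutMs); // blocking network step, run on the caller's thread
}

final class ClassicPollSketch {
    /** One poll(): every step, including blocking I/O, runs on the calling thread. */
    static List<String> poll(ToyFetcher fetcher, long timeoutMs) {
        // Steps 1-2 (wakeup check, coordinator.poll, updateFetchPositions) are omitted here.
        List<String> records = fetcher.collectFetch(); // step 3: buffered data first
        if (records.isEmpty()) {
            fetcher.sendFetches();
            fetcher.pollNetwork(timeoutMs);
            records = fetcher.collectFetch();
        }
        if (!records.isEmpty()) {
            fetcher.sendFetches(); // step 4: pipeline the next fetch before returning
        }
        return records;
    }
}
```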

Crucially, although heartbeats in the classic protocol are sent from a separate background thread named kafka-coordinator-heartbeat-thread | <groupId> (AbstractCoordinator.java:124,1471), JoinGroup/SyncGroup and all rebalance work happen on the user thread inside coordinator.poll. This is the structural limitation KIP-848 removes.

Figure: Classic engine. On the user thread, poll(timeout) runs the following chain against the broker / coordinator:
  • coordinator.poll(timer): JoinGroup / SyncGroup, rebalance callbacks, auto-commit
  • updateFetchPositions: OffsetFetch / ListOffsets / validate
  • fetcher.sendFetches()
  • client.poll(…): send / await responses
  • fetcher.collectFetch() ⇒ ConsumerRecords
Coordination, position resolution and blocking fetch I/O all share the application thread; only heartbeats run on the separate kafka-coordinator-heartbeat-thread.

The async engine: two threads, two queues

AsyncKafkaConsumer divides responsibilities between the application thread (whatever calls poll()/commit*()/etc.) and a single background thread, ConsumerNetworkThread, whose Java thread name is consumer_background_thread (ConsumerNetworkThread.java:68). They communicate through two blocking queues:

  • Application event queue (LinkedBlockingQueue<ApplicationEvent>): app thread → background. Wrapped by ApplicationEventHandler; add() enqueues and wakes the network thread (ApplicationEventHandler.java:94), addAndGet() enqueues a CompletableApplicationEvent and blocks on its future (ApplicationEventHandler.java:135).
  • Background event queue (LinkedBlockingQueue<BackgroundEvent>): background → app. Carries errors to surface to the user and rebalance-callback requests, drained by AsyncKafkaConsumer.BackgroundEventProcessor on the app thread (AsyncKafkaConsumer.java:192).
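The add()/addAndGet() split can be sketched with a LinkedBlockingQueue and a CompletableFuture per event. This is a hypothetical model:

  • ToyEvent and ToyEventHandler are invented names.
  • The background "processing" just doubles a number.
  • A short timed poll() stands in for the explicit network-thread wakeup that the real add() performs.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical application event carrying a result future.
final class ToyEvent {
    final int payload;
    final CompletableFuture<Integer> future = new CompletableFuture<>();
    ToyEvent(int payload) { this.payload = payload; }
}

// Hypothetical app-thread -> background-thread handler with add() and addAndGet().
final class ToyEventHandler {
    private final LinkedBlockingQueue<ToyEvent> queue = new LinkedBlockingQueue<>();
    private volatile boolean running = true;
    private final Thread background = new Thread(this::runLoop, "toy_background_thread");

    ToyEventHandler() {
        background.setDaemon(true);
        background.start();
    }

    private void runLoop() {
        while (running) {
            try {
                // Timed poll stands in for "drain the queue, then sleep until woken".
                ToyEvent e = queue.poll(20, TimeUnit.MILLISECONDS);
                if (e != null) e.future.complete(e.payload * 2); // "process" on the background thread
            } catch (InterruptedException ie) {
                return;
            }
        }
    }

    /** Fire-and-forget: enqueue and return immediately. */
    void add(ToyEvent e) { queue.add(e); }

    /** Enqueue, then block the caller on the event's future, bounded by a timeout. */
    int addAndGet(ToyEvent e, long timeoutMs) {
        add(e);
        return e.future.orTimeout(timeoutMs, TimeUnit.MILLISECONDS).join();
    }

    void close() {
        running = false;
        try { background.join(); } catch (InterruptedException ie) { Thread.currentThread().interrupt(); }
    }
}
```

addAndGet() is what lets a blocking API sit on top of a queue. The caller waits on the future while the work itself still happens on the background thread.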

The background thread holds the actual networking machinery (the NetworkClientDelegate and a list of RequestManagers), none of which the app thread is allowed to touch directly.

Figure: Async engine. The application thread never blocks on network I/O; it exchanges events with the background loop.
  • Application thread: poll() adds an AsyncPollEvent, commitAsync() adds an AsyncCommitEvent, and processBackgroundEvents() surfaces errors and runs rebalance callbacks.
  • consumer_background_thread, in runOnce(): applicationEventQueue.drainTo(…), then ApplicationEventProcessor.process() (one handler per ApplicationEvent), then RequestManager.poll(now) on each manager (Coordinator, Commit, Heartbeat, Membership, Offsets, TopicMetadata, Fetch), then a single networkClientDelegate.poll(…) step (send, read, fire callbacks).
  • Fetch responses fill the FetchBuffer (one CompletedFetch per partition), from which the ConsumerRecords returned to the caller of poll() are built.
  • Errors and callbacks flow back to the application thread over the background event queue.

The background event loop

ConsumerNetworkThread.runOnce() (ConsumerNetworkThread.java:210) is the heart of the engine and executes repeatedly:

  1. processApplicationEvents(): drainTo the application queue and run applicationEventProcessor.process(event) on each event; CompletableEvents are registered with a CompletableEventReaper so they can be timed out.
  2. For each RequestManager rm : requestManagers.entries(): PollResult pr = rm.poll(now), then networkClientDelegate.addAll(pr), accumulating the minimum poll timeout.
  3. networkClientDelegate.poll(pollWaitTimeMs, now): a single network step that sends staged requests, reads responses, and fires callbacks.
  4. Recompute cachedMaximumTimeToWait as the minimum of rm.maximumTimeToWait(now) across managers, so the app thread can later read the safe block duration via maximumTimeToWait() without touching the managers (ConsumerNetworkThread.java:340).
  5. reapExpiredApplicationEvents(now), and surface metadata errors to events implementing MetadataErrorNotifiableEvent.

The managers iterated each pass include (in order) the CoordinatorRequestManager, CommitRequestManager, ConsumerHeartbeatRequestManager, ConsumerMembershipManager, OffsetsRequestManager, TopicMetadataRequestManager, and FetchRequestManager (RequestManagers.java:91). Each is Optional except offsets/metadata/fetch, so a groupless consumer simply has no coordinator/commit/heartbeat/membership managers.
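The timeout bookkeeping in steps 2 and 4 can be sketched as follows. ToyRequestManager, ToyPollResult and ToyNetworkThread are hypothetical simplifications, and maxPollTimeoutMs is an assumed upper bound on the network wait. Event draining (step 1), the actual network poll (step 3) and reaping (step 5) are left as comments.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, pared-down request manager contract.
interface ToyRequestManager {
    ToyPollResult poll(long nowMs);     // requests to send now + time until it needs another pass
    long maximumTimeToWait(long nowMs); // how long the app thread may block without starving it
}

final class ToyPollResult {
    final long timeUntilNextPollMs;
    final List<String> requests;
    ToyPollResult(long timeUntilNextPollMs, List<String> requests) {
        this.timeUntilNextPollMs = timeUntilNextPollMs;
        this.requests = requests;
    }
}

final class ToyNetworkThread {
    private final List<ToyRequestManager> managers;
    final List<String> staged = new ArrayList<>();   // stands in for networkClientDelegate
    private volatile long cachedMaximumTimeToWait = Long.MAX_VALUE;

    ToyNetworkThread(List<ToyRequestManager> managers) { this.managers = managers; }

    /** One loop pass; returns the wait that the network poll would use. */
    long runOnce(long nowMs, long maxPollTimeoutMs) {
        // step 1 (drain + process application events) omitted
        long pollWaitTimeMs = maxPollTimeoutMs;
        for (ToyRequestManager rm : managers) {
            ToyPollResult pr = rm.poll(nowMs);
            staged.addAll(pr.requests);
            pollWaitTimeMs = Math.min(pollWaitTimeMs, pr.timeUntilNextPollMs);
        }
        // step 3: networkClientDelegate.poll(pollWaitTimeMs, nowMs) would send/receive here
        long maxWait = Long.MAX_VALUE;
        for (ToyRequestManager rm : managers) {
            maxWait = Math.min(maxWait, rm.maximumTimeToWait(nowMs));
        }
        cachedMaximumTimeToWait = maxWait; // step 4: snapshot for the app thread
        return pollWaitTimeMs;
    }

    /** Safe to call from the application thread: reads only the cached snapshot. */
    long maximumTimeToWait() { return cachedMaximumTimeToWait; }
}
```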

poll() on the application thread: the AsyncPollEvent

Rather than block waiting for coordination, AsyncKafkaConsumer.poll() (AsyncKafkaConsumer.java:933) submits a single multi-stage AsyncPollEvent and then polls the fetch buffer non-blocking. The event encapsulates "do a poll round-trip": reconciliation check, position validation, and fetch-request creation, executed sequentially on the background thread. Its lifecycle is managed by checkInflightPoll():

  • On the first pass of a poll() call, if a previous inflightPoll exists, maybeClearPreviousInflightPoll() inspects it: complete-with-error → throw; complete-and-buffer-empty → clear and submit a fresh one; complete-and-buffer-non-empty → keep it (avoids starvation when poll(0) is called repeatedly).
  • If no event is inflight, create new AsyncPollEvent(deadlineMs, now) and applicationEventHandler.add(it) (fire-and-forget, no blocking).
  • Run offsetCommitCallbackInvoker.executeCallbacks() and processBackgroundEvents() (user callbacks and errors); both run on the app thread.
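The decision made on the first pass can be written as a small pure function. This is a hypothetical restatement of the cases listed above, not the real maybeClearPreviousInflightPoll(). The "still running, keep it" branch is an assumption. It is implied by the fact that a new event is only created when none is inflight.

```java
// Hypothetical outcome of inspecting a previous inflight AsyncPollEvent.
enum InflightAction { SUBMIT_NEW, KEEP, THROW }

final class InflightPollPolicy {
    /**
     * @param hasInflight a previous poll event exists
     * @param done        that event has completed
     * @param failed      it completed with an error
     * @param bufferEmpty the fetch buffer currently holds no data
     */
    static InflightAction decide(boolean hasInflight, boolean done, boolean failed, boolean bufferEmpty) {
        if (!hasInflight) return InflightAction.SUBMIT_NEW; // nothing inflight: fire a fresh event
        if (!done) return InflightAction.KEEP;               // assumption: still running, leave it alone
        if (failed) return InflightAction.THROW;             // surface the background error
        return bufferEmpty ? InflightAction.SUBMIT_NEW       // finished and drained: new round
                           : InflightAction.KEEP;            // data still buffered: avoid starving poll(0)
    }
}
```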

On the background side, ApplicationEventProcessor.process(AsyncPollEvent) (events/ApplicationEventProcessor.java:757) runs the stages:

  1. consumerMembershipManager.maybeReconcile(true): safely commit and mark revoked partitions before any new fetching, then event.markReconciliationCheckComplete().
  2. commitRequestManager.updateTimerAndMaybeCommit(pollTimeMs) (auto-commit on interval), then membershipManager.onConsumerPoll() and heartbeatRequestManager.resetPollTimer(pollTimeMs); the latter is what keeps the poll timer alive.
  3. offsetsRequestManager.updateFetchPositions(deadlineMs), then event.markValidatePositionsComplete().
  4. Once that completes, fetchRequestManager.createFetchRequests(), then event.completeSuccessfully().

The two completion checks isValidatePositionsComplete (a volatile boolean, events/AsyncPollEvent.java:50) and isReconciliationCheckComplete() (a method returning reconciliationCheckFuture.isDone(), backed by a final CompletableFuture<Void>, events/AsyncPollEvent.java:51,104) let the app thread know when it is safe to read the buffer.

Invariant

The application thread must not collect records until the background thread has finished validating positions for the inflight poll. collectFetch() returns Fetch.empty() while inflightPoll != null && !inflightPoll.isValidatePositionsComplete() (AsyncKafkaConsumer.java:2072). This prevents both threads from racing to write SubscriptionState.position() for the same partition.
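The two completion signals and the collectFetch() gate can be sketched like this. ToyPollEvent and GatedCollector are hypothetical names. The field types (a volatile boolean and a final CompletableFuture<Void>) follow the description above. An empty list plays the role of Fetch.empty().

```java
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical mini AsyncPollEvent with the two completion signals.
final class ToyPollEvent {
    private volatile boolean validatePositionsComplete = false;
    private final CompletableFuture<Void> reconciliationCheckFuture = new CompletableFuture<>();

    // Written by the background thread, in stage order.
    void markReconciliationCheckComplete() { reconciliationCheckFuture.complete(null); }
    void markValidatePositionsComplete() { validatePositionsComplete = true; }

    // Read by the application thread.
    boolean isReconciliationCheckComplete() { return reconciliationCheckFuture.isDone(); }
    boolean isValidatePositionsComplete() { return validatePositionsComplete; }
}

final class GatedCollector {
    /** Returns nothing while the inflight poll has not finished validating positions. */
    static List<String> collectFetch(ToyPollEvent inflightPoll, List<String> buffered) {
        if (inflightPoll != null && !inflightPoll.isValidatePositionsComplete()) {
            return Collections.emptyList(); // plays the role of Fetch.empty()
        }
        return buffered;
    }
}
```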

pollForFetches and prefetch pipelining

After checkInflightPoll, poll() calls pollForFetches(timer) (AsyncKafkaConsumer.java:1975). It first tries collectFetch(). If that is empty, it computes a poll timeout bounded by the background thread's maximumTimeToWait(), blocks on fetchBuffer.awaitWakeup(pollTimer) until data arrives or a wakeup fires, and then calls collectFetch() again. When a non-empty fetch is returned, sendPrefetches(timer) enqueues a CreateFetchRequestsEvent so the next batch is requested while the user processes the current one (AsyncKafkaConsumer.java:2123); this is the async analogue of the classic transmitSends() pipelining. Records are passed through interceptors.onConsume(...) before being returned.

SubscriptionState: the shared core

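Both engines keep their per-partition state in a SubscriptionState, which is documented as thread-safe: every public method is synchronized (SubscriptionState.java:74). In the async engine, one instance is shared by the application and background threads. It models what the consumer is reading and where.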

Subscription type

A single enum tracks mutually-exclusive modes (SubscriptionState.java:80): NONE, AUTO_TOPICS (collection subscribe), AUTO_PATTERN (java Pattern), AUTO_PATTERN_RE2J (broker-evaluated regex, KIP-848), USER_ASSIGNED (manual assign), and AUTO_TOPICS_SHARE (share groups). setSubscriptionType() throws "Subscription to topics, partitions and pattern are mutually exclusive" if you mix modes (SubscriptionState.java:185). This is why you cannot call both subscribe() and assign() on one consumer.
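The mutual-exclusion check amounts to a first-writer-wins rule on a single field. This minimal sketch assumes two things: the guard throws IllegalStateException, and re-requesting the current mode is allowed. ToySubscription is a hypothetical name; the enum constants and the message come from the text above.

```java
// Subscription modes as listed above.
enum SubscriptionType { NONE, AUTO_TOPICS, AUTO_PATTERN, AUTO_PATTERN_RE2J, USER_ASSIGNED, AUTO_TOPICS_SHARE }

// Hypothetical reduction of SubscriptionState's subscription-type guard.
final class ToySubscription {
    private SubscriptionType type = SubscriptionType.NONE;

    synchronized void setSubscriptionType(SubscriptionType requested) {
        if (type == SubscriptionType.NONE) {
            type = requested; // first caller picks the mode
        } else if (type != requested) {
            throw new IllegalStateException("Subscription to topics, partitions and pattern are mutually exclusive");
        } // same mode again is allowed (e.g. re-subscribe with new topics)
    }

    synchronized void unsubscribe() { type = SubscriptionType.NONE; }

    synchronized SubscriptionType type() { return type; }
}
```

Because unsubscribe() resets the mode to NONE, switching from subscribe() to assign() is possible, but only through an explicit unsubscribe in between.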

Per-partition state & the fetch-state machine

Each assigned partition has a TopicPartitionState (SubscriptionState.java:1011) held in an ordered PartitionStates map (order matters for fetch building and round-robin fairness). Its fields:

Field | Meaning
FetchState fetchState | One of INITIALIZING, FETCHING, AWAIT_RESET, AWAIT_VALIDATION.
FetchPosition position | Last consumed position = offset of the next record to deliver, plus offsetEpoch and currentLeader (SubscriptionState.java:1382).
Long highWatermark | HW from the last fetch response (used for lag and as the read_uncommitted end).
Long logStartOffset | Log-start offset (earliest available).
Long lastStableOffset | LSO; under read_committed this bounds delivery and lag.
boolean paused | User called pause(); not fetchable until resume().
boolean pendingRevocation | Set during a rebalance to stop fetching before the partition is given up.
AutoOffsetResetStrategy resetStrategy | Strategy to apply when in AWAIT_RESET.
Integer preferredReadReplica + expiry | Follower-fetch target with a lease (KIP-392).

A partition is fetchable only when !paused && !pendingRevocation && !pendingOnAssignedCallback && hasValidPosition() (SubscriptionState.java:1250). Transitions are validated against an explicit table encoded in the FetchStates enum (SubscriptionState.java:1306):

Figure: FetchState transitions. The spine is INITIALIZING → FETCHING, and FETCHING self-loops to advance the position. AWAIT_VALIDATION guards against log truncation via leader-epoch checks. AWAIT_RESET resolves a concrete offset via auto.offset.reset before returning to FETCHING.
  • INITIALIZING → FETCHING: seekValidated / position set
  • FETCHING → FETCHING: consume, advance position
  • INITIALIZING → AWAIT_VALIDATION: needs epoch check
  • INITIALIZING → AWAIT_RESET: no position
  • FETCHING ⇄ AWAIT_VALIDATION: new leader epoch / validated
  • FETCHING → AWAIT_RESET: offset out of range / seekToX
  • AWAIT_VALIDATION → AWAIT_RESET: truncation detected
  • AWAIT_RESET → FETCHING: ListOffsets earliest / latest

transitionState() enforces an interesting invariant: states that requiresPosition() (FETCHING, AWAIT_VALIDATION) must have a non-null position, and states that don't (INITIALIZING, AWAIT_RESET) have their position nulled out on entry (SubscriptionState.java:1055).
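The transition table, the position invariant, and the fetchability predicate fit in a short sketch. It is hypothetical: ToyFetchState and ToyPartitionState are invented names, and positions are bare offsets without epochs. The table encodes only the edges drawn in the FetchState figure; the real FetchStates enum may permit additional self-transitions. Treating only FETCHING as having a valid position is an assumption standing in for hasValidPosition().

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical encoding of the FetchState transitions drawn in the figure.
enum ToyFetchState {
    INITIALIZING(false), FETCHING(true), AWAIT_RESET(false), AWAIT_VALIDATION(true);

    final boolean requiresPosition;
    ToyFetchState(boolean requiresPosition) { this.requiresPosition = requiresPosition; }

    Set<ToyFetchState> validTransitions() {
        switch (this) {
            case INITIALIZING:     return EnumSet.of(FETCHING, AWAIT_VALIDATION, AWAIT_RESET);
            case FETCHING:         return EnumSet.of(FETCHING, AWAIT_VALIDATION, AWAIT_RESET);
            case AWAIT_VALIDATION: return EnumSet.of(FETCHING, AWAIT_RESET);
            default:               return EnumSet.of(FETCHING); // AWAIT_RESET
        }
    }
}

final class ToyPartitionState {
    private ToyFetchState state = ToyFetchState.INITIALIZING;
    private Long position; // next offset to deliver; null in states that don't require one
    boolean paused, pendingRevocation, pendingOnAssignedCallback;

    void transitionState(ToyFetchState next, Long newPosition) {
        if (!state.validTransitions().contains(next)) {
            throw new IllegalStateException("Invalid transition " + state + " -> " + next);
        }
        if (next.requiresPosition && newPosition == null) {
            throw new IllegalStateException(next + " requires a position");
        }
        state = next;
        position = next.requiresPosition ? newPosition : null; // no-position states null it out
    }

    boolean isFetchable() {
        // assumption: only FETCHING counts as hasValidPosition()
        return !paused && !pendingRevocation && !pendingOnAssignedCallback
                && state == ToyFetchState.FETCHING;
    }

    ToyFetchState state() { return state; }
    Long position() { return position; }
}
```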

position vs committed

These two notions are easy to conflate but distinct:

position
The in-memory offset of the next record poll() will return for a partition. Advanced locally by every successful fetch. Read via consumer.position(tp), which will block to fetch/reset a position if none is set yet (AsyncKafkaConsumer.java:1230).
committed
The offset durably stored at the group coordinator, the recovery point after a restart. Read via consumer.committed(partitions) (a FetchCommittedOffsetsEvent in the async engine, AsyncKafkaConsumer.java:1260). Has nothing to do with the live position until a commit happens.

The fetch pipeline in detail

Building fetch requests

AbstractFetch.prepareFetchRequests() (AbstractFetch.java:421) computes the partitions that are fetchable and not already buffered, groups them by their leader (or preferred read replica), and builds one FetchSessionHandler.Builder per node. For each partition it adds a FetchRequest.PartitionData(topicId, position.offset, INVALID_LOG_START_OFFSET, fetchConfig.fetchSize, position.currentLeader.epoch, …), where fetchSize = max.partition.fetch.bytes. It deliberately skips a node if (a) there is already an inflight fetch to it, or (b) it currently hosts buffered partitions. In case (b), fetching other partitions from that node would drop the buffered ones from the broker's fetch session and risk session eviction (AbstractFetch.java:461).

The request-level knobs are stamped in createFetchRequest() (AbstractFetch.java:310): forConsumer(maxVersion, fetchConfig.maxWaitMs, fetchConfig.minBytes, …), .isolationLevel(...), .setMaxBytes(fetchConfig.maxBytes), .rackId(clientRackId). So fetch.max.wait.ms, fetch.min.bytes, and fetch.max.bytes are all request-level; max.partition.fetch.bytes is per-partition. The version caps at 12 unless the session can use topic IDs.
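The node-selection rule from prepareFetchRequests() can be sketched over plain maps. This is a hypothetical simplification: FetchPlanner is an invented name, partitions are strings, nodes are integer IDs, and the per-partition PartitionData is reduced to the partition name. The sketch shows the two skip conditions, an inflight fetch to the node and buffered data hosted by the node.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class FetchPlanner {
    /**
     * Groups unbuffered partitions by target node (leader or preferred read replica),
     * skipping nodes with an inflight fetch or with partitions still in the buffer.
     */
    static Map<Integer, List<String>> plan(Map<String, Integer> nodeOf,
                                           Set<String> bufferedPartitions,
                                           Set<Integer> nodesWithInflightFetch) {
        Set<Integer> nodesWithBuffered = new HashSet<>();
        for (String tp : bufferedPartitions) {
            Integer node = nodeOf.get(tp);
            if (node != null) nodesWithBuffered.add(node);
        }
        Map<Integer, List<String>> byNode = new LinkedHashMap<>();
        for (Map.Entry<String, Integer> e : nodeOf.entrySet()) {
            String tp = e.getKey();
            int node = e.getValue();
            if (bufferedPartitions.contains(tp)) continue;       // already has undelivered data
            if (nodesWithInflightFetch.contains(node)) continue; // one fetch per node at a time
            if (nodesWithBuffered.contains(node)) continue;      // would shrink that node's fetch session
            byNode.computeIfAbsent(node, k -> new ArrayList<>()).add(tp);
        }
        return byNode;
    }
}
```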

Receiving and buffering

On a successful response, AbstractFetch.handleFetchSuccess() wraps each partition's data in a CompletedFetch and calls fetchBuffer.add(completedFetch) (AbstractFetch.java:225). FetchBuffer is a ConcurrentLinkedQueue<CompletedFetch> guarded by a ReentrantLock with a Condition (FetchBuffer.java:53); addAll() sets a wokenup flag and signalAll()s so a waiting app thread in awaitWakeup() unblocks. There is at most one CompletedFetch per partition in the queue.
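The hand-off between the background producer and the application-thread reader is a classic lock-plus-condition pattern. This minimal sketch assumes a generic element type in place of CompletedFetch; ToyFetchBuffer is a hypothetical name. The wokenup flag is modelled as lock-guarded state, so a wakeup() without data also releases the waiter.

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

final class ToyFetchBuffer<T> {
    private final ConcurrentLinkedQueue<T> completedFetches = new ConcurrentLinkedQueue<>();
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notEmpty = lock.newCondition();
    private boolean wokenup = false; // guarded by lock

    /** Background thread: publish a completed fetch and wake any waiting reader. */
    void add(T completedFetch) {
        lock.lock();
        try {
            completedFetches.add(completedFetch);
            wokenup = true;
            notEmpty.signalAll();
        } finally {
            lock.unlock();
        }
    }

    /** Any thread: release a waiting reader without adding data (wakeup() path). */
    void wakeup() {
        lock.lock();
        try {
            wokenup = true;
            notEmpty.signalAll();
        } finally {
            lock.unlock();
        }
    }

    /** Application thread: non-blocking take of the head entry, or null. */
    T poll() { return completedFetches.poll(); }

    /** Application thread: wait for data or a wakeup; false if the timeout elapsed first. */
    boolean awaitWakeup(long timeoutMs) {
        long remainingNanos = TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        lock.lock();
        try {
            while (completedFetches.isEmpty() && !wokenup) {
                if (remainingNanos <= 0) return false;
                remainingNanos = notEmpty.awaitNanos(remainingNanos);
            }
            wokenup = false;
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            lock.unlock();
        }
    }
}
```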

Decoding, max.poll.records, and aborted transactions

FetchCollector.collectFetch(buffer) (FetchCollector.java:91) pulls up to maxPollRecords records across partitions: it takes the head CompletedFetch, lazily initialize()s it (validates that the response's first offset still matches the live position, updates HW/LSO/log-start), then fetchRecords() from it. If the partition was paused after the fetch arrived, the records are stashed and re-queued for a later poll (FetchCollector.java:125). It also re-checks isAssigned/isFetchable per partition so records for a partition revoked mid-flight are dropped, and only advances the position when nextInLineFetch.nextFetchOffset() == position.offset (i.e. not a stale response) (FetchCollector.java:168).
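The budgeted drain across partitions can be sketched over plain offsets. This is hypothetical: ToyCompletedFetch carries only a partition name and an offset range instead of decoded batches. The pause/stash path is omitted, and a stale or revoked entry is simply dropped. The sketch shows two rules stated above:

  • Stop at maxPollRecords and leave the rest buffered.
  • Advance a position only when the chunk's nextFetchOffset equals it.

```java
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical per-partition chunk of records, standing in for CompletedFetch.
final class ToyCompletedFetch {
    final String partition;
    long nextFetchOffset; // next undelivered offset in this chunk
    final long endOffset; // one past the last offset in this chunk
    ToyCompletedFetch(String partition, long firstOffset, long endOffset) {
        this.partition = partition;
        this.nextFetchOffset = firstOffset;
        this.endOffset = endOffset;
    }
}

final class ToyCollector {
    /** Returns up to maxPollRecords offsets per call, grouped by partition. */
    static Map<String, List<Long>> collect(Deque<ToyCompletedFetch> buffer,
                                           Map<String, Long> positions,
                                           int maxPollRecords) {
        Map<String, List<Long>> out = new LinkedHashMap<>();
        int remaining = maxPollRecords;
        while (remaining > 0 && !buffer.isEmpty()) {
            ToyCompletedFetch cf = buffer.peekFirst();
            Long position = positions.get(cf.partition);
            if (position == null || cf.nextFetchOffset != position) {
                buffer.pollFirst(); // revoked partition or stale response: drop it
                continue;
            }
            while (remaining > 0 && cf.nextFetchOffset < cf.endOffset) {
                out.computeIfAbsent(cf.partition, k -> new ArrayList<>()).add(cf.nextFetchOffset++);
                remaining--;
            }
            positions.put(cf.partition, cf.nextFetchOffset); // position = next offset to deliver
            if (cf.nextFetchOffset >= cf.endOffset) {
                buffer.pollFirst(); // fully consumed; otherwise it stays for the next poll
            }
        }
        return out;
    }
}
```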

CompletedFetch.fetchRecords() (CompletedFetch.java:257) iterates batches via a streaming, decompressing iterator (currentBatch.streamingIterator(decompressionBufferSupplier)). The transaction filtering lives in nextFetchedRecord() (CompletedFetch.java:187): under read_committed, for each batch with a producer ID it (1) drains the per-partition abortedTransactions priority queue (ordered by firstOffset) up to the batch's last offset, adding their producer IDs to abortedProducerIds; (2) if the batch is a control batch carrying an abort marker, removes that producer ID; (3) if isBatchAborted() (transactional & producer ID is in the aborted set), skips the entire batch by advancing nextFetchOffset to currentBatch.nextOffset(). Control batches are never returned to the user. Out-of-range records (offset < nextFetchOffset) are skipped.
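The three-step filter can be reproduced over a simplified batch model. This is a sketch, not CompletedFetch. ToyBatch and ToyAbortedTxn are hypothetical types, and a batch is either a data batch or a control batch (abort or commit marker). The result is the base offsets of the data batches that would be delivered. Record-level offset skipping and batches without a producer ID are omitted.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.PriorityQueue;
import java.util.Set;

// Hypothetical batch model; control batches carry a commit or abort marker.
final class ToyBatch {
    final long producerId, baseOffset, lastOffset;
    final boolean transactional, control, abortMarker;
    ToyBatch(long producerId, long baseOffset, long lastOffset,
             boolean transactional, boolean control, boolean abortMarker) {
        this.producerId = producerId; this.baseOffset = baseOffset; this.lastOffset = lastOffset;
        this.transactional = transactional; this.control = control; this.abortMarker = abortMarker;
    }
}

// Hypothetical entry of the broker-supplied aborted-transaction list.
final class ToyAbortedTxn {
    final long producerId, firstOffset;
    ToyAbortedTxn(long producerId, long firstOffset) { this.producerId = producerId; this.firstOffset = firstOffset; }
}

final class ReadCommittedFilter {
    /** Base offsets of the data batches a read_committed consumer would deliver. */
    static List<Long> deliverable(List<ToyBatch> batches, List<ToyAbortedTxn> abortedTxns) {
        PriorityQueue<ToyAbortedTxn> aborted =
                new PriorityQueue<>(Comparator.comparingLong((ToyAbortedTxn t) -> t.firstOffset));
        aborted.addAll(abortedTxns);
        Set<Long> abortedProducerIds = new HashSet<>();
        List<Long> delivered = new ArrayList<>();
        for (ToyBatch b : batches) {
            // (1) aborted txns starting at or before this batch's end become active
            while (!aborted.isEmpty() && aborted.peek().firstOffset <= b.lastOffset) {
                abortedProducerIds.add(aborted.poll().producerId);
            }
            if (b.control) {
                if (b.abortMarker) abortedProducerIds.remove(b.producerId); // (2) marker closes the txn
                continue;                                                   // control batches never reach the user
            }
            if (b.transactional && abortedProducerIds.contains(b.producerId)) {
                continue;                                                   // (3) skip the whole aborted batch
            }
            delivered.add(b.baseOffset);
        }
        return delivered;
    }
}
```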

Figure: Aborted-batch skipping under read_committed. For each next batch with a producerId present:
  1. Drain the abortedTransactions priority queue (by firstOffset, up to batch.lastOffset) and add those producerIds.
  2. If it is a control batch with an abort marker, remove its producerId from abortedProducerIds.
  3. If isBatchAborted() (transactional and producerId in the set), skip the batch by setting nextFetchOffset = batch.nextOffset().
  4. Otherwise emit records with offset ≥ nextFetchOffset, skipping control records.
The broker sends the aborted-transaction list and the client filters client-side, bounded by the LSO. Control batches are consumed for their markers but never returned to the user.
Note

max.poll.records caps how many records each poll() returns but does not change the wire fetch: the broker still ships up to max.partition.fetch.bytes per partition, and leftover records stay buffered for the next poll. CRC validation per batch/record is governed by check.crcs (default true).

Offset management

Auto vs manual commit

enable.auto.commit (default true, ConsumerConfig.java:458) and auto.commit.interval.ms (default 5000, ConsumerConfig.java:463) drive periodic background commits. In the async engine, CommitRequestManager holds an Optional<AutoCommitState> with a Timer and a hasInflightCommit guard (CommitRequestManager.java:1497). On each poll round, updateTimerAndMaybeCommit() invokes maybeAutoCommitAsync(). That method fires a commit of subscriptions.allConsumed() only if the timer has expired and no commit is already inflight (CommitRequestManager.java:277). A retriable failure resets the timer with the retry backoff rather than the full interval.

Manual commits: commitSync() blocks until the commit lands (or times out); commitAsync(callback) enqueues an AsyncCommitEvent and returns immediately, invoking the callback later on the app thread via OffsetCommitCallbackInvoker (AsyncKafkaConsumer.java:1112). With no explicit offsets, both commit subscriptions.allConsumed() resolved on the background thread; the sync path blocks on commitEvent.offsetsReady() first so the committed set isn't perturbed by fetches that begin after the call (AsyncKafkaConsumer.java:1135).
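The auto-commit gate can be sketched as a small timer object. It fires only when the interval has elapsed and nothing is inflight, and uses a shorter backoff after a retriable failure. ToyAutoCommitState is hypothetical. Restarting the interval at send time is an assumption about where the real manager resets its Timer.

```java
// Hypothetical model of the auto-commit gate, using caller-supplied timestamps.
final class ToyAutoCommitState {
    private final long intervalMs;
    private long nextCommitAtMs;
    private boolean hasInflightCommit = false;

    ToyAutoCommitState(long intervalMs, long nowMs) {
        this.intervalMs = intervalMs;
        this.nextCommitAtMs = nowMs + intervalMs;
    }

    /** True if a commit of allConsumed() should be sent now. */
    boolean maybeAutoCommit(long nowMs) {
        if (hasInflightCommit || nowMs < nextCommitAtMs) return false;
        hasInflightCommit = true;
        nextCommitAtMs = nowMs + intervalMs; // assumption: interval restarts when the commit is sent
        return true;
    }

    void onCommitSucceeded() { hasInflightCommit = false; }

    /** Retriable failure: try again after the backoff instead of a full interval. */
    void onRetriableFailure(long nowMs, long retryBackoffMs) {
        hasInflightCommit = false;
        nextCommitAtMs = nowMs + retryBackoffMs;
    }
}
```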

Gotcha

Auto-commit commits the position (next offset to fetch) as of poll time, regardless of whether your application finished processing those records. If you crash mid-batch after an auto-commit, those records are skipped on restart (at-most-once for the un-processed tail). For at-least-once, disable auto-commit and commitSync() only after processing.

Committing before rebalance

Before revoking partitions, the async membership manager calls CommitRequestManager.maybeAutoCommitSyncBeforeRebalance(deadlineMs) (CommitRequestManager.java:330), which retries until success/fatal/timeout. Notably it treats STALE_MEMBER_EPOCH as retriable (re-sends with the latest epoch) but UNKNOWN_TOPIC_OR_PARTITION as fatal so a deleted topic can't stall the rebalance forever.

updateFetchPositions: where positions come from

OffsetsRequestManager.updateFetchPositions(deadlineMs) (OffsetsRequestManager.java:235) is the canonical "find a position for every partition" routine:

  1. validatePositionsIfNeeded(): for partitions in AWAIT_VALIDATION, send OffsetsForLeaderEpoch and detect log truncation.
  2. If hasAllFetchPositions(), done.
  3. Otherwise, for the initializing partitions: if grouped, initWithCommittedOffsetsIfNeeded() issues an OffsetFetch and seeds positions from committed offsets; then initWithPartitionOffsetsIfNeeded() handles any still-missing positions by marking them AWAIT_RESET and issuing ListOffsets per auto.offset.reset (OffsetsRequestManager.java:280,337).

resetInitializingPositions() throws NoOffsetForPartitionException if no reset strategy is configured and there is no committed offset; this is exactly how auto.offset.reset=none surfaces. The default reset is latest (ConsumerConfig.java:546, AutoOffsetResetStrategy.LATEST).
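The resolution order for a single partition can be sketched as a pure function: committed offset first, then the reset strategy, else an error. Everything here is hypothetical:

  • MissingOffsetException stands in for NoOffsetForPartitionException.
  • The log bounds stand in for what a ListOffsets lookup would return.
  • Epoch validation is left out.

```java
// Hypothetical stand-ins; the real client uses AutoOffsetResetStrategy and NoOffsetForPartitionException.
enum ToyResetStrategy { EARLIEST, LATEST, NONE }

final class MissingOffsetException extends RuntimeException {
    MissingOffsetException(String message) { super(message); }
}

final class PositionResolver {
    /** Chooses the starting position for one partition that has none yet. */
    static long resolve(String partition, Long committedOffset, ToyResetStrategy reset,
                        long logStartOffset, long logEndOffset) {
        if (committedOffset != null) {
            return committedOffset;               // OffsetFetch result seeds the position
        }
        switch (reset) {
            case EARLIEST: return logStartOffset; // ListOffsets(earliest)
            case LATEST:   return logEndOffset;   // ListOffsets(latest), the default strategy
            default: throw new MissingOffsetException("No committed offset and no reset policy for " + partition);
        }
    }
}
```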

Group membership & rebalancing (async engine)

The KIP-848 member is a state machine in ConsumerMembershipManager/AbstractMembershipManager. The states (MemberState.java:27) are UNSUBSCRIBED, JOINING, RECONCILING, ACKNOWLEDGING, STABLE, FENCED, PREPARE_LEAVING, LEAVING, FATAL, and STALE, with explicit previousValidStates tables enforcing legal transitions.

Figure: Async member state machine (MemberState). Reconciliation runs entirely in the background but defers revocations and callbacks to the application thread.

Spine:
  • UNSUBSCRIBED → JOINING: subscribe
  • JOINING → RECONCILING: heartbeat response with epoch ≥ 1 and a target assignment
  • RECONCILING → ACKNOWLEDGING: commit ⇒ onPartitionsRevoked ⇒ onPartitionsAssigned
  • ACKNOWLEDGING → STABLE: ack via heartbeat

Back-edges:
  • STABLE → RECONCILING: new target assignment
  • STABLE → FENCED: UNKNOWN_MEMBER_ID / FENCED_MEMBER_EPOCH
  • FENCED → JOINING: onPartitionsLost ⇒ re-join
  • STABLE → PREPARE_LEAVING: unsubscribe / close
  • PREPARE_LEAVING → LEAVING: callbacks done
  • LEAVING → UNSUBSCRIBED (terminal, left group): heartbeat with epoch -1 / -2
  • LEAVING → STALE: poll timer (max.poll.interval.ms) expired
  • STALE → JOINING: next poll

Reconciliation (AbstractMembershipManager.maybeReconcile(canCommit), AbstractMembershipManager.java:875) diffs the broker-provided target assignment against currently owned partitions to compute the added and revoked sets. The ordering is strict: markPendingRevocationToPauseFetching(revoked) → auto-commit (signalReconciliationStarted()) → onPartitionsRevoked → onPartitionsAssigned (revokeAndAssign, AbstractMembershipManager.java:993). Revocations and user callbacks must run on the application thread, so the manager emits two background events. PartitionsRemoved carries both revocations and losses, dispatched by methodName(); the other is PartitionsAssigned. AsyncKafkaConsumer.BackgroundEventProcessor runs the listener and signals completion back with a ConsumerRebalanceListenerCallbackCompletedEvent (AsyncKafkaConsumer.java:274).

Key idea

A reconciliation that only adds partitions can run from the plain background poll (new partitions have no buffered records). A reconciliation that revokes partitions, or any auto-commit, is only triggered from the AsyncPollEvent path (canCommit=true), because the application thread may already be returning records for a to-be-revoked partition (AbstractMembershipManager.java:929). The reconciliationCheckFuture on the poll event gates collectFetch() so the app thread waits for revocation handling before reading the buffer.
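The diff and the ordering rules, including the canCommit gate, can be sketched as a function that returns the steps it would take. ToyReconciler is hypothetical and only records step names. Running the auto-commit step only when canCommit is true is an assumption. It is drawn from the statement that auto-commits are triggered only from the AsyncPollEvent path.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;

final class ToyReconciler {
    /** Returns the ordered steps of one reconciliation, or an empty list if it must wait. */
    static List<String> reconcile(Set<String> owned, Set<String> target, boolean canCommit) {
        SortedSet<String> added = new TreeSet<>(target);
        added.removeAll(owned);
        SortedSet<String> revoked = new TreeSet<>(owned);
        revoked.removeAll(target);

        List<String> steps = new ArrayList<>();
        if (added.isEmpty() && revoked.isEmpty()) return steps;               // already in sync
        if (!revoked.isEmpty() && !canCommit) return steps;                   // revocations wait for the poll-event path
        if (!revoked.isEmpty()) steps.add("markPendingRevocation " + revoked); // stop fetching first
        if (canCommit) steps.add("autoCommit");                               // assumption: only when canCommit
        if (!revoked.isEmpty()) steps.add("onPartitionsRevoked " + revoked);   // runs on the app thread
        steps.add("onPartitionsAssigned " + added);                            // runs on the app thread
        return steps;
    }
}
```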

Liveness: the poll timer and max.poll.interval.ms

AbstractHeartbeatRequestManager owns a pollTimer initialized to max.poll.interval.ms (default 300000, ConsumerConfig.java:629) (AbstractHeartbeatRequestManager.java:119). Each poll(now) updates it. If it has expired and the member isn't already leaving, it logs the well-known warning and calls membershipManager().transitionToSendingLeaveGroup(true), sending a leave heartbeat (AbstractHeartbeatRequestManager.java:171). resetPollTimer(pollMs), called from the AsyncPollEvent handler, resets it on every user poll(); if it had expired (member is STALE), the next poll transitions the member back toward JOINING. To guarantee that the timer never silently expires while the app blocks in poll(), maximumTimeToWait() returns at most pollTimer.remainingMs() / 2 (AbstractHeartbeatRequestManager.java:262). Heartbeats themselves are sent on an interval. Under the classic protocol that is heartbeat.interval.ms (default 3000), with session.timeout.ms (default 45000) as the liveness bound (ConsumerConfig.java:438,443); the CONSUMER protocol uses a broker-controlled interval.
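The poll-timer arithmetic can be sketched as follows: reset on every user poll, expiry triggers a leave, and the app thread blocks for at most half of what remains. ToyPollTimer is a hypothetical name and only models the poll-timer component of maximumTimeToWait(). As "at most" implies, the real method may return less.

```java
// Hypothetical model of the max.poll.interval.ms budget, using caller-supplied timestamps.
final class ToyPollTimer {
    private final long maxPollIntervalMs;
    private long deadlineMs;

    ToyPollTimer(long maxPollIntervalMs, long nowMs) {
        this.maxPollIntervalMs = maxPollIntervalMs;
        this.deadlineMs = nowMs + maxPollIntervalMs;
    }

    /** Called for every user poll(), like resetPollTimer(pollMs). */
    void reset(long nowMs) { deadlineMs = nowMs + maxPollIntervalMs; }

    long remainingMs(long nowMs) { return Math.max(0, deadlineMs - nowMs); }

    /** Once true, the member would send a leave heartbeat and become STALE. */
    boolean isExpired(long nowMs) { return nowMs >= deadlineMs; }

    /** Bound on how long the application thread may block inside poll(). */
    long maximumTimeToWait(long nowMs) { return remainingMs(nowMs) / 2; }
}
```

Halving leaves slack. A poll() that blocks for the full allowed wait still returns with half of the remaining budget intact.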

Concurrency & threading model

State / resource | Owner / guard | Notes
Public API re-entrancy | AtomicLong currentThread + AtomicInteger refCount | CAS lock; throws ConcurrentModificationException on cross-thread use (AsyncKafkaConsumer.java:2216).
SubscriptionState | synchronized methods | Shared by app thread (seek/pause/position read) and background (assignment, HW updates).
FetchBuffer | ReentrantLock + Condition | Producer = background (adds CompletedFetch); consumer = app thread (collect/await).
Application event queue | LinkedBlockingQueue | App thread enqueues; background drains.
Background event queue | LinkedBlockingQueue | Background enqueues errors/callbacks; app thread drains in processBackgroundEvents().
RequestManagers / NetworkClient | background thread only | App thread reads only the cached maximumTimeToWait() snapshot.
groupMetadata, groupAssignmentSnapshot | AtomicReference | Updated from background via MemberStateListener; read by app thread.
wakeupTrigger | WakeupTrigger | Lets another thread interrupt a blocking poll/commit via WakeupException.

The background thread is a daemon (super(BACKGROUND_THREAD_NAME, true)) and swallows per-iteration exceptions so one bad event cannot kill the loop (ConsumerNetworkThread.java:160). Resources are created inside the thread (initializeResources()) and an initializationLatch + initializationError propagate construction failures (e.g. a bad SASL JAAS config) back to the constructor rather than hanging.

Configuration reference

Key | Default | Effect
group.protocol | classic | consumer selects AsyncKafkaConsumer (KIP-848); classic selects ClassicKafkaConsumer.
group.id | null | Enables group management and offset commits. With null, the consumer is assign()-only and cannot commit.
group.remote.assignor | (unset) | Server-side assignor name; CONSUMER protocol only.
enable.auto.commit | true | Periodic background commit of consumed positions. When group.id is null it is overridden to false if left unset, and an explicit true is rejected as invalid configuration.
auto.commit.interval.ms | 5000 | Interval between auto-commits.
auto.offset.reset | latest | earliest/latest/none when no committed offset exists; none throws NoOffsetForPartitionException.
max.poll.records | 500 | Max records returned per poll(); does not change the wire fetch.
max.poll.interval.ms | 300000 | Max time between polls before the member is removed from the group.
fetch.min.bytes | 1 | Broker waits for this many bytes (or fetch.max.wait.ms) before responding.
fetch.max.bytes | 52428800 (50 MiB) | Soft cap on total bytes per fetch response.
fetch.max.wait.ms | 500 | Max broker wait when fetch.min.bytes isn't yet satisfied.
max.partition.fetch.bytes | 1048576 (1 MiB) | Per-partition byte cap (= FetchConfig.fetchSize).
isolation.level | read_uncommitted | read_committed bounds delivery to the LSO and skips aborted batches.
session.timeout.ms / heartbeat.interval.ms | 45000 / 3000 | CLASSIC-protocol liveness; CONSUMER protocol uses a broker-controlled interval.
check.crcs | true | Validate batch/record CRCs during decode.
default.api.timeout.ms | 60000 | Default deadline for blocking calls without an explicit timeout.
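As an illustration, the keys above can be assembled into a java.util.Properties for a manual-commit, read_committed consumer on the KIP-848 engine. The group name, broker address and lowered max.poll.records are assumptions chosen for the example, not recommendations, and the sketch assumes a cluster that supports the CONSUMER protocol. In an application, the same object would be passed to new KafkaConsumer<>(props); the self-contained sketch stops at building it.

```java
import java.util.Properties;

// Hypothetical helper: builds consumer properties from the plain config keys in the table.
final class SketchConsumerConfig {
    static Properties manualCommitReadCommitted(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.protocol", "consumer");        // opt into AsyncKafkaConsumer
        props.setProperty("group.id", groupId);                 // required for offset commits
        props.setProperty("enable.auto.commit", "false");       // commit explicitly after processing
        props.setProperty("auto.offset.reset", "earliest");     // only used when no committed offset exists
        props.setProperty("isolation.level", "read_committed"); // stop at the LSO, skip aborted batches
        props.setProperty("max.poll.records", "100");           // smaller poll() batches; wire fetch unchanged
        props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return props;                                           // unset keys keep their defaults
    }
}
```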

Failure modes, edge cases & recovery

  • Wakeup & interrupt, wakeup() from another thread makes a blocking poll/commitSync/committed throw WakeupException; the async engine routes this through WakeupTrigger on the fetch buffer and the active task future. A blocking call must not be interrupted between updating the position and returning records, so both engines check the wakeup trigger before polling for fetches, never between collecting records and returning them (AsyncKafkaConsumer.java:953).
  • Offset out of range, an OFFSET_OUT_OF_RANGE fetch error moves the partition to AWAIT_RESET and triggers a ListOffsets request per auto.offset.reset; with none, the user instead sees OffsetOutOfRangeException/NoOffsetForPartitionException.
  • Log truncation, AWAIT_VALIDATION + OffsetsForLeaderEpoch detect a divergent epoch; a LogTruncationException (clients/.../consumer/LogTruncationException.java) carries the divergent offset, built from the LogTruncation holder (fields topicPartition, fetchPosition, divergentOffsetOpt; SubscriptionState.java:1422).
  • Fenced member, UNKNOWN_MEMBER_ID/FENCED_MEMBER_EPOCH moves the member to FENCED, invokes onPartitionsLost, and re-joins as a fresh member.
  • Poll-loop too slow, exceeding max.poll.interval.ms proactively leaves the group (member → STALE), invoking onPartitionsLost; the next poll() rejoins. Deserialization and processing time both count against this budget, so a large max.poll.records combined with slow processing is the usual trigger.
  • Stale fetch responses, responses whose first offset no longer matches the live position (after a seek/rebalance) are discarded in FetchCollector.handleInitializeSuccess() (FetchCollector.java:259).
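  • Deserialization failure, a RecordDeserializationException is thrown only if no records were already collected; otherwise the records collected so far are returned first and the exception surfaces on a later poll(). Either way, the offending offset must be skipped via seek() to make progress (CompletedFetch.java:295).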
  • Async-engine construction failure, surfaced from the background thread via the init latch; partially-built objects are closed to avoid leaks (AsyncKafkaConsumer.java:597).
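The wakeup behavior in the first bullet can be modeled with a small state holder. This is a sketch under stated assumptions, not Kafka's WakeupTrigger: SketchWakeupTrigger and SketchWakeupException are hypothetical, and the blocking call waits on a CompletableFuture that stands in for the active task. Matching the public wakeup() contract, a wakeup with no blocked caller is remembered and fires on the next blocking call.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for org.apache.kafka.common.errors.WakeupException.
class SketchWakeupException extends RuntimeException {
}

// Hypothetical sketch: wakeup() either aborts the future a blocking call is waiting
// on, or, if nothing is blocked, arms the next blocking call.
final class SketchWakeupTrigger {
    private static final Object WAKEUP_PENDING = new Object();
    // null = idle, WAKEUP_PENDING = armed, otherwise the CompletableFuture being awaited.
    private final AtomicReference<Object> state = new AtomicReference<>();

    void wakeup() {
        Object prev = state.getAndUpdate(s -> s == null ? WAKEUP_PENDING : s);
        if (prev instanceof CompletableFuture) {
            ((CompletableFuture<?>) prev).completeExceptionally(new SketchWakeupException());
        }
    }

    // Blocks on the future for up to timeoutMs; throws SketchWakeupException if woken.
    // Returns null on timeout to keep the sketch free of checked exceptions.
    <T> T await(CompletableFuture<T> future, long timeoutMs) {
        Object prev = state.getAndUpdate(s -> s == WAKEUP_PENDING ? null : future);
        if (prev == WAKEUP_PENDING) {
            throw new SketchWakeupException(); // wakeup() happened before we blocked
        }
        try {
            return future.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (ExecutionException e) {
            if (e.getCause() instanceof SketchWakeupException) {
                throw (SketchWakeupException) e.getCause();
            }
            throw new RuntimeException(e.getCause());
        } catch (TimeoutException e) {
            return null;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } finally {
            state.compareAndSet(future, null); // disarm so a later wakeup() targets the next call
        }
    }
}
```

A wakeup that lands after the awaited future has already completed is dropped in this sketch; a production implementation would need to handle that window as well.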

Invariants & guarantees

Invariant

A partition's position only advances after its records have been collected for return to the user, and only when the response offset matches the current position; the position is therefore the offset of the next undelivered record. Returning an empty Fetch guarantees the position is unchanged (FetchCollector.java:82).
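The invariant, together with the stale-response check in FetchCollector, can be illustrated with a toy model. It is a sketch with hypothetical names (SketchPartitionPosition, collect) that treats a fetch as a list of record offsets. The position moves only past records actually handed out, and a response issued for an offset other than the current position is dropped without touching it.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical, simplified model of the position invariant; not the real FetchCollector.
final class SketchPartitionPosition {
    private long position; // offset of the next record to deliver

    SketchPartitionPosition(long startOffset) {
        this.position = startOffset;
    }

    long position() {
        return position;
    }

    void seek(long offset) {
        position = offset;
    }

    // fetchOffset: the offset this fetch was issued for; offsets: record offsets in the
    // response, ascending. Returns at most maxRecords offsets to deliver.
    List<Long> collect(long fetchOffset, List<Long> offsets, int maxRecords) {
        if (fetchOffset != position) {
            return Collections.emptyList(); // stale response (e.g. after seek): position unchanged
        }
        List<Long> delivered = new ArrayList<>();
        for (long offset : offsets) {
            if (delivered.size() == maxRecords) {
                break;
            }
            if (offset < position) {
                continue; // a batch may start before the position; skip already-delivered records
            }
            delivered.add(offset);
            position = offset + 1; // advance only past records actually returned
        }
        return delivered;
    }
}
```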

Invariant

Subscription type is single-valued: subscribe() by topic list, subscribe() by pattern, and assign() (manual) are mutually exclusive. Once one style is in use, switching to another throws IllegalStateException until unsubscribe() resets the type.
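A minimal sketch of such a single-valued subscription type follows. The class, enum and method names are hypothetical and only mirror the idea of SubscriptionState's subscription type; repeating the same style replaces the previous targets, while switching styles is rejected until the state is reset.

```java
import java.util.Collection;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical sketch of single-valued subscription state; not the real SubscriptionState.
final class SketchSubscription {
    enum Type { NONE, AUTO_TOPICS, AUTO_PATTERN, USER_ASSIGNED }

    private Type type = Type.NONE;
    private final Set<String> targets = new TreeSet<>();

    private void setType(Type requested) {
        if (type == Type.NONE) {
            type = requested;
        } else if (type != requested) {
            throw new IllegalStateException("subscribe() and assign() styles are mutually exclusive");
        }
    }

    void subscribe(Collection<String> topics) {
        setType(Type.AUTO_TOPICS);
        targets.clear();
        targets.addAll(topics);
    }

    void subscribePattern(String regex) {
        setType(Type.AUTO_PATTERN);
        targets.clear();
        targets.add(regex);
    }

    void assign(Collection<String> partitions) {
        setType(Type.USER_ASSIGNED);
        targets.clear();
        targets.addAll(partitions);
    }

    void unsubscribe() {
        type = Type.NONE; // the only way back to a state where any style is allowed
        targets.clear();
    }

    Type type() {
        return type;
    }
}
```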

Invariant

At most one CompletedFetch per partition exists in the FetchBuffer, and a node with buffered partitions is excluded from new fetch requests, preserving the broker's fetch-session incrementality.
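The node-exclusion rule can be sketched as a planning step that groups partitions by leader and skips every node that still has buffered data. The names are hypothetical, partitions are plain strings, and the rationale in the comment reflects the text's fetch-session argument as an assumption: leaving a buffered partition out of a request to its leader would disturb that node's incremental fetch session.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical sketch of choosing what to fetch next; not the real AbstractFetch logic.
final class SketchFetchPlanner {
    // leaders: partition -> leader node id; buffered: partitions with an unconsumed CompletedFetch.
    static Map<Integer, List<String>> plan(Map<String, Integer> leaders, Set<String> buffered) {
        // Any node leading a buffered partition is skipped for this round, so no request
        // to it omits a partition that is still sitting in the buffer.
        Set<Integer> busyNodes = new HashSet<>();
        for (String tp : buffered) {
            Integer node = leaders.get(tp);
            if (node != null) {
                busyNodes.add(node);
            }
        }
        Map<Integer, List<String>> requests = new TreeMap<>();
        for (Map.Entry<String, Integer> e : leaders.entrySet()) {
            if (busyNodes.contains(e.getValue())) {
                continue; // also guarantees at most one CompletedFetch per buffered partition
            }
            requests.computeIfAbsent(e.getValue(), n -> new ArrayList<>()).add(e.getKey());
        }
        return requests;
    }
}
```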

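Within a partition, records are delivered in strictly increasing offset order and no deliverable record is skipped, although offsets themselves can jump: control records are never returned, aborted records are skipped under read_committed, and compaction removes records. Across partitions there is no ordering guarantee. Delivery semantics depend on when the application commits relative to processing: committing after processing gives at-least-once delivery, committing before gives at-most-once.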
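The effect of commit placement can be shown with a broker-free simulation. It is a sketch under simplifying assumptions (hypothetical SketchDeliverySemantics class, one partition, one injected crash, restart from the committed offset), not consumer code. With a crash while the batch starting at offset 2 is in flight, committing after processing replays that batch, while committing before processing loses it.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation: a partition with offsets [0, logEnd), a committed offset, and one
// crash while the batch starting at crashAtOffset is in flight. After the crash the consumer
// resumes from the committed offset, as a restarted member would.
final class SketchDeliverySemantics {
    static List<Long> run(boolean commitAfterProcessing, long logEnd, int batchSize, long crashAtOffset) {
        List<Long> processed = new ArrayList<>(); // side effects that survive the crash
        long committed = 0;
        long position = 0;
        boolean crashed = false;
        while (position < logEnd) {
            long batchEnd = Math.min(position + batchSize, logEnd);
            boolean crashNow = !crashed && position == crashAtOffset;
            if (!commitAfterProcessing) {
                committed = batchEnd; // at-most-once: commit before processing
            }
            if (crashNow && !commitAfterProcessing) {
                crashed = true;       // crash after the commit: this batch is never processed
                position = committed;
                continue;
            }
            for (long offset = position; offset < batchEnd; offset++) {
                processed.add(offset);
            }
            if (crashNow) {
                crashed = true;       // crash before the commit: this batch is processed again
                position = committed;
                continue;
            }
            if (commitAfterProcessing) {
                committed = batchEnd; // at-least-once: commit after processing
            }
            position = batchEnd;
        }
        return processed;
    }
}
```

With logEnd = 6, batchSize = 2 and crashAtOffset = 2, the at-least-once run processes 0, 1, 2, 3, 2, 3, 4, 5 (offsets 2 and 3 twice), and the at-most-once run processes 0, 1, 4, 5 (offsets 2 and 3 never).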

Interactions with other subsystems

  • Group Coordination, the membership manager and heartbeat/commit managers speak ConsumerGroupHeartbeat/OffsetCommit/OffsetFetch to the group coordinator; the classic engine uses JoinGroup/SyncGroup.
  • The Fetch Path, the broker side that serves the Fetch RPCs this chapter builds, including follower-fetch (preferred read replica) and HW/LSO semantics.
  • Record Format & Batches, CompletedFetch decodes the very batches/records defined there, including control batches and transactional markers.
  • Transactions & EOS, isolation.level=read_committed and the aborted-transaction list make the consumer the read side of exactly-once.
  • Wire Protocol & RPC Framework, every request the managers emit (FetchRequest.Builder, etc.) is framed by that layer.
  • Share Groups, the parallel KafkaShareConsumer reuses much of this machinery (ShareConsumeRequestManager, ShareFetchBuffer) with acknowledge-based delivery.
  • Kafka Streams, drives the async engine through the same event model but with a StreamsRebalanceListener and task-based callbacks.

Design rationale & evolution

Design rationale

The async engine exists because the classic consumer interleaves coordination (join, sync, callbacks) with the user's poll() thread, so a long rebalance or a slow callback stalls record processing and a slow processing loop can trip the session timeout. KIP-848 moves assignment computation to the broker-side coordinator and introduces the ConsumerNetworkThread so that, in steady state, most members keep fetching during a rebalance. The non-blocking AsyncPollEvent design specifically targets parity with the classic poll() latency while keeping all network I/O off the application thread. New observability for this engine arrived with KIP-1068 (e.g. application-event queue size/time, time-between-network-thread-polls in AsyncConsumerMetrics).

The async engine remains feature-compatible: the same public Consumer contract, the same ConsumerRebalanceListener, the same SubscriptionState and fetch pipeline. The differences are confined to how coordination is threaded and where state transitions happen.

Gotchas & operational notes

Gotcha

Calling consumer methods from a rebalance callback is constrained: the async engine runs callbacks on the application thread inside poll(), and re-entrant API calls take the same single-thread lock via refCount (nested acquisition by the same thread is allowed). Blocking calls such as commitSync() inside onPartitionsRevoked therefore remain the supported way to commit before giving up partitions.
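The AtomicLong currentThread + AtomicInteger refCount lock from the thread-safety table can be sketched as below. SketchSingleThreadGuard and guarded are hypothetical names; the guard is not a blocking mutex but an ownership check, so a second thread fails fast with ConcurrentModificationException while the owning thread may re-enter, which is what lets a callback invoked from poll() call commitSync().

```java
import java.util.ConcurrentModificationException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;

// Hypothetical sketch of a CAS ownership guard with a reference count.
final class SketchSingleThreadGuard {
    private static final long NO_CURRENT_THREAD = -1L;
    private final AtomicLong currentThread = new AtomicLong(NO_CURRENT_THREAD);
    private final AtomicInteger refCount = new AtomicInteger(0);

    void acquire() {
        long threadId = Thread.currentThread().getId();
        // The owner re-entering just bumps the count; anyone else must win the CAS from "no owner".
        if (threadId != currentThread.get() && !currentThread.compareAndSet(NO_CURRENT_THREAD, threadId)) {
            throw new ConcurrentModificationException("consumer is not safe for multi-threaded access");
        }
        refCount.incrementAndGet();
    }

    void release() {
        if (refCount.decrementAndGet() == 0) {
            currentThread.set(NO_CURRENT_THREAD); // outermost call finished: give up ownership
        }
    }

    // Each public method would wrap its body like this.
    <T> T guarded(Supplier<T> body) {
        acquire();
        try {
            return body.get();
        } finally {
            release();
        }
    }
}
```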

Caution

poll(Duration.ZERO) with the async engine relies on a subtle rule: if a previous in-flight poll has already filled the buffer, no new poll event is submitted; otherwise a zero-timeout poll could spin forever re-validating positions and never return buffered data (AsyncKafkaConsumer.java:1045). Don't assume poll(Duration.ZERO) does no work: it still drains background events and may complete a prior poll's stages.

Note

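To size fetches: raise fetch.min.bytes (accepting up to fetch.max.wait.ms of extra latency) for throughput. Raise max.partition.fetch.bytes if single records or batches exceed 1 MiB, ideally to at least the broker message.max.bytes / topic max.message.bytes; the broker still returns an oversized batch when it is the first batch of the first non-empty partition in a fetch, so the consumer can make progress, but less efficiently. Tune max.poll.records down (not max.partition.fetch.bytes) to bound per-poll processing time against max.poll.interval.ms.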
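The max.poll.records budget is simple arithmetic: with the defaults (max.poll.records = 500, max.poll.interval.ms = 300000), a full batch stays within budget only if processing averages under 300000 / 500 = 600 ms per record. The hypothetical helper below inverts that relationship; the worst-case per-record time and the safety factor are inputs you would have to measure or choose, not values Kafka provides.

```java
// Hypothetical helper: the largest max.poll.records that keeps one loop iteration within
// max.poll.interval.ms, given an assumed worst-case per-record processing time and a
// safety factor in (0, 1] (e.g. 0.5 = use only half the interval).
final class SketchPollBudget {
    static int maxSafePollRecords(long maxPollIntervalMs, double worstCaseMsPerRecord, double safetyFactor) {
        if (worstCaseMsPerRecord <= 0 || safetyFactor <= 0 || safetyFactor > 1) {
            throw new IllegalArgumentException("per-record time must be > 0 and safety factor in (0, 1]");
        }
        double budgetMs = maxPollIntervalMs * safetyFactor;
        return (int) Math.max(1, Math.floor(budgetMs / worstCaseMsPerRecord)); // always allow one record
    }
}
```

For example, at a worst case of 1000 ms per record and a safety factor of 0.5, the default 300000 ms interval supports 150 records per poll, well below the default of 500.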

Note

The thread names consumer_background_thread (async engine) and kafka-coordinator-heartbeat-thread | <groupId> (classic engine) are useful when reading thread dumps: a hot background thread points at fetch or commit volume, while an application thread parked in awaitWakeup is simply an idle poll waiting for data.

krivaltsevich.com · Part of Apache Kafka Internals · derived from Apache Kafka 4.4 source · GitHub · MIT-licensed.

Apache Kafka® is a registered trademark of the Apache Software Foundation. This is an independent, unofficial guide, not affiliated with or endorsed by the ASF.