17 · The Consumer Client
Source: Apache Kafka 4.4.0-SNAPSHOT (git 04bfe7d, 2026-06-15), KRaft mode. Derived from source code, not copied from official documentation.
The KafkaConsumer reads records from topic partitions, tracks a per-partition position, optionally manages group membership and committed offsets, and turns raw fetch responses into deserialized ConsumerRecords. As of Kafka 4.x the public class is a thin façade that delegates to one of two engines: the legacy single-threaded ClassicKafkaConsumer (the entire coordinator + fetch loop runs on the user's poll() thread) or the KIP-848 AsyncKafkaConsumer, which splits work across the application thread and a dedicated ConsumerNetworkThread communicating over event queues. This chapter dissects both, the shared SubscriptionState, the fetch pipeline, and offset management, all grounded in the source.
Role & responsibilities
The consumer client is the client-side half of Kafka's read path. Its jobs are:
- Position tracking: for every assigned partition, remember the offset of the next record to deliver, plus the high watermark (HW), log-start offset, and last-stable-offset (LSO) reported by the broker.
- Fetching: build Fetch RPCs that respect fetch.min.bytes / fetch.max.bytes / max.partition.fetch.bytes / fetch.max.wait.ms, parse and decompress the returned batches, skip aborted transactions under read_committed, and hand back at most max.poll.records records per poll().
- Group membership (optional): join a consumer group, receive a partition assignment, run ConsumerRebalanceListener callbacks, send heartbeats, and detect liveness via max.poll.interval.ms.
- Offset management (optional): commit positions to the group coordinator (auto or manual) and fetch committed offsets on startup.
A consumer with no group.id can still assign() partitions manually and fetch; it simply cannot commit offsets or use group management. See Group Coordination & Rebalance Protocols for the broker side and The Fetch Path & Replica Fetchers for how the broker serves these fetches.
The consumer is not thread-safe. Both engines enforce single-threaded access with a lightweight CAS lock that throws ConcurrentModificationException rather than blocking (AsyncKafkaConsumer.acquire(), clients/.../internals/AsyncKafkaConsumer.java:2216). The only sanctioned multi-thread interaction is calling wakeup() from another thread.
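The fail-fast guard described above can be sketched with two atomics. This is a minimal JDK-only illustration, not the Kafka source: the class name SingleThreadGuard and the depth() accessor are hypothetical, and the exception message is made up.

```java
import java.util.ConcurrentModificationException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

/** Illustrative re-entrant, non-blocking ownership guard (hypothetical class name). */
final class SingleThreadGuard {
    private static final long NO_CURRENT_THREAD = -1L;
    private final AtomicLong currentThread = new AtomicLong(NO_CURRENT_THREAD);
    private final AtomicInteger refCount = new AtomicInteger(0);

    /** Claims ownership for the calling thread, or fails fast if another thread holds it. */
    void acquire() {
        long threadId = Thread.currentThread().getId();
        // Re-entrant: the owner skips the CAS; any other thread must win it from the free state.
        if (threadId != currentThread.get()
                && !currentThread.compareAndSet(NO_CURRENT_THREAD, threadId)) {
            throw new ConcurrentModificationException("not safe for multi-threaded access");
        }
        refCount.incrementAndGet();
    }

    /** Releases one level of nesting; the outermost release frees the guard. */
    void release() {
        if (refCount.decrementAndGet() == 0) {
            currentThread.set(NO_CURRENT_THREAD);
        }
    }

    /** Current nesting depth (hypothetical accessor, for illustration only). */
    int depth() {
        return refCount.get();
    }
}
```

The point of the design is that a second thread never waits: it gets an exception immediately, which turns accidental sharing into a loud bug instead of a silent stall.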
Where it lives in the code
| Class / interface | File | Role |
|---|---|---|
| KafkaConsumer | clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java | Public façade; holds a ConsumerDelegate and forwards every method. |
| ConsumerDelegateCreator | .../consumer/internals/ConsumerDelegateCreator.java | Factory; picks the engine from group.protocol. |
| ClassicKafkaConsumer | .../consumer/internals/ClassicKafkaConsumer.java | Legacy single-threaded engine (CLASSIC protocol). |
| AsyncKafkaConsumer | .../consumer/internals/AsyncKafkaConsumer.java | KIP-848 engine (CONSUMER protocol); application thread + background thread. |
| ConsumerNetworkThread | .../consumer/internals/ConsumerNetworkThread.java | Background event loop; drives request managers + network I/O. |
| SubscriptionState | .../consumer/internals/SubscriptionState.java | Thread-safe per-partition state machine; subscription type, positions, pause. |
| FetchRequestManager / AbstractFetch | .../consumer/internals/{FetchRequestManager,AbstractFetch}.java | Builds fetch requests, manages fetch sessions, fills the buffer. |
| FetchBuffer / CompletedFetch / FetchCollector | .../consumer/internals/{FetchBuffer,CompletedFetch,FetchCollector}.java | Thread-safe buffer; per-partition decode/iterate; collection into a Fetch. |
| CommitRequestManager | .../consumer/internals/CommitRequestManager.java | Auto/manual commit, committed-offset fetch (async engine). |
| OffsetsRequestManager | .../consumer/internals/OffsetsRequestManager.java | updateFetchPositions: committed-offset / list-offsets / validation (async engine). |
| ConsumerMembershipManager / AbstractMembershipManager | .../consumer/internals/{ConsumerMembershipManager,AbstractMembershipManager}.java | KIP-848 member state machine & assignment reconciliation. |
| ConsumerHeartbeatRequestManager / AbstractHeartbeatRequestManager | .../consumer/internals/{ConsumerHeartbeatRequestManager,AbstractHeartbeatRequestManager}.java | Sends ConsumerGroupHeartbeat; owns the poll timer. |
| ConsumerCoordinator / AbstractCoordinator | .../consumer/internals/{ConsumerCoordinator,AbstractCoordinator}.java | CLASSIC-protocol JoinGroup/SyncGroup + heartbeat thread. |
| event classes | .../consumer/internals/events/*.java | ApplicationEvent / BackgroundEvent hierarchy crossing the two threads. |
Choosing an engine: the delegator
KafkaConsumer stores a single field private final ConsumerDelegate<K, V> delegate (KafkaConsumer.java:543) and every public method simply forwards (e.g. poll at KafkaConsumer.java:916 returns delegate.poll(timeout)). The delegate is built by ConsumerDelegateCreator.create(), which reads group.protocol and dispatches:
GroupProtocol gp = GroupProtocol.valueOf(config.getString(GROUP_PROTOCOL_CONFIG).toUpperCase());
if (gp == GroupProtocol.CONSUMER) return new AsyncKafkaConsumer<>(...); // KIP-848
else return new ClassicKafkaConsumer<>(...); // legacy
ConsumerDelegateCreator.java:61. The enum has exactly two values, CLASSIC("CLASSIC") and CONSUMER("CONSUMER") (GroupProtocol.java:30), and group.protocol defaults to classic (ConsumerConfig.java:115, DEFAULT_GROUP_PROTOCOL = GroupProtocol.CLASSIC.name().toLowerCase()). Both engines implement ConsumerDelegate<K,V>, so the façade is oblivious to which one it holds.
KIP-848 set out to move rebalancing logic from the client into a broker-side group coordinator and to give the consumer a new threading model that never blocks the user's thread on coordination. The façade/delegate split lets the two implementations co-exist so users opt in with a single config (group.protocol=consumer) without changing code. See KIP-848 for the protocol design; the new client metrics come from KIP-1068.
The classic engine: one thread does everything
ClassicKafkaConsumer owns a ConsumerNetworkClient (client), a Fetcher, an OffsetFetcher, and, when grouped, a ConsumerCoordinator (ClassicKafkaConsumer.java:127). Its poll(Timer) loop runs entirely on the caller's thread (ClassicKafkaConsumer.java:648):
- client.maybeTriggerWakeup(): honor a pending wakeup().
- updateAssignmentMetadataIfNeeded(timer, false): runs coordinator.poll(timer, waitForJoinGroup=false) (joins the group, sends heartbeats, runs rebalance callbacks, auto-commits), then updateFetchPositions.
- pollForFetches(timer): fetcher.collectFetch() returns buffered data immediately if present; otherwise sendFetches() issues new fetch RPCs and client.poll(pollTimer, () -> !fetcher.hasAvailableFetches()) blocks for responses.
- If the fetch is non-empty, pipeline: sendFetches() + client.transmitSends() before returning, so the next batch is already in flight while the user processes records.
Classic-protocol heartbeats are the one exception: they are sent from a background heartbeat thread named kafka-coordinator-heartbeat-thread | <groupId> (AbstractCoordinator.java:124,1471). JoinGroup/SyncGroup and all rebalance work, however, happen on the user thread inside coordinator.poll. This is the structural limitation KIP-848 removes.
The async engine: two threads, two queues
AsyncKafkaConsumer divides responsibilities between the application thread (whatever calls poll()/commit*()/etc.) and a single background thread, ConsumerNetworkThread, whose Java thread name is consumer_background_thread (ConsumerNetworkThread.java:68). They communicate through two blocking queues:
- Application event queue (LinkedBlockingQueue<ApplicationEvent>): app thread → background. Wrapped by ApplicationEventHandler; add() enqueues and wakes the network thread (ApplicationEventHandler.java:94), addAndGet() enqueues a CompletableApplicationEvent and blocks on its future (ApplicationEventHandler.java:135).
- Background event queue (LinkedBlockingQueue<BackgroundEvent>): background → app. Carries errors to surface to the user and rebalance-callback requests, drained by AsyncKafkaConsumer.BackgroundEventProcessor on the app thread (AsyncKafkaConsumer.java:192).
The background thread holds the actual networking machinery, the NetworkClientDelegate and a list of RequestManagers, none of which the app thread is allowed to touch directly.
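The application-to-background handoff can be modeled with nothing but a LinkedBlockingQueue and CompletableFuture. The sketch below is a toy, not the Kafka classes: EventBridge, CompletableEvent, and the thread name are hypothetical, and unlike the real add() this one returns the future so that addAndGet() can reuse it.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Supplier;

/** Toy model of the application-to-background event handoff (all names hypothetical). */
final class EventBridge implements AutoCloseable {
    /** An event whose result the application thread can wait on. */
    static final class CompletableEvent<T> {
        final Supplier<T> work;
        final CompletableFuture<T> future = new CompletableFuture<>();

        CompletableEvent(Supplier<T> work) {
            this.work = work;
        }

        void run() {
            try {
                future.complete(work.get());
            } catch (RuntimeException e) {
                future.completeExceptionally(e);
            }
        }
    }

    private final BlockingQueue<CompletableEvent<?>> applicationEvents = new LinkedBlockingQueue<>();
    private final Thread background;
    private volatile boolean running = true;

    EventBridge() {
        background = new Thread(() -> {
            while (running) {
                try {
                    // Blocks until the application thread enqueues something.
                    applicationEvents.take().run();
                } catch (InterruptedException e) {
                    return; // close() interrupts the loop
                }
            }
        }, "toy-background-thread");
        background.setDaemon(true);
        background.start();
    }

    /** Fire-and-forget style: enqueue and return immediately. */
    <T> CompletableFuture<T> add(Supplier<T> work) {
        CompletableEvent<T> event = new CompletableEvent<>(work);
        applicationEvents.add(event);
        return event.future;
    }

    /** Enqueue, then block the caller until the background thread completes the event. */
    <T> T addAndGet(Supplier<T> work) {
        return add(work).join();
    }

    @Override
    public void close() {
        running = false;
        background.interrupt();
    }
}
```

All work submitted through the bridge runs on the single background thread, so state it touches needs no extra locking; the caller only ever sees the completed result.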
The background event loop
ConsumerNetworkThread.runOnce() (ConsumerNetworkThread.java:210) is the heart of the engine and executes repeatedly:
- processApplicationEvents(): drainTo the application queue and applicationEventProcessor.process(event) each one; CompletableEvents are registered with a CompletableEventReaper so they can be timed out.
- For each RequestManager rm : requestManagers.entries(): PollResult pr = rm.poll(now) then networkClientDelegate.addAll(pr), accumulating the minimum poll timeout.
- networkClientDelegate.poll(pollWaitTimeMs, now): single network step that sends staged requests, reads responses, and fires callbacks.
- Recompute cachedMaximumTimeToWait as min(rm.maximumTimeToWait(now)) so the app thread can later read the safe block duration via maximumTimeToWait() without touching the managers (ConsumerNetworkThread.java:340).
- reapExpiredApplicationEvents(now) and surface metadata errors to events implementing MetadataErrorNotifiableEvent.
The managers iterated each pass include (in order) the CoordinatorRequestManager, CommitRequestManager, ConsumerHeartbeatRequestManager, ConsumerMembershipManager, OffsetsRequestManager, TopicMetadataRequestManager, and FetchRequestManager (RequestManagers.java:91). Each is Optional except offsets/metadata/fetch, so a groupless consumer simply has no coordinator/commit/heartbeat/membership managers.
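One pass of such a loop can be sketched single-threaded with an explicit clock. Everything here is a toy under stated assumptions: ToyNetworkLoop, ToyRequestManager, and fixed() are hypothetical, managers return a plain long instead of a PollResult, and the network step is only a comment.

```java
import java.util.ArrayDeque;
import java.util.List;
import java.util.Queue;

/** Single-threaded toy of one background-loop pass (hypothetical names throughout). */
final class ToyNetworkLoop {
    /** A manager reports how long the loop may wait before it needs attention again. */
    interface ToyRequestManager {
        long poll(long nowMs);
        long maximumTimeToWait(long nowMs);
    }

    /** Test helper: a manager that always answers with the same two values. */
    static ToyRequestManager fixed(long pollMs, long maxWaitMs) {
        return new ToyRequestManager() {
            public long poll(long nowMs) { return pollMs; }
            public long maximumTimeToWait(long nowMs) { return maxWaitMs; }
        };
    }

    private final Queue<Runnable> applicationEvents = new ArrayDeque<>();
    private final List<ToyRequestManager> managers;
    private volatile long cachedMaximumTimeToWaitMs = Long.MAX_VALUE;

    ToyNetworkLoop(List<ToyRequestManager> managers) {
        this.managers = managers;
    }

    void enqueue(Runnable event) {
        applicationEvents.add(event);
    }

    /** Runs one pass and returns the timeout a real loop would hand to its network poll. */
    long runOnce(long nowMs) {
        // Drain and process everything queued by the application side.
        Runnable event;
        while ((event = applicationEvents.poll()) != null) {
            event.run();
        }
        // Poll each manager, keeping the smallest requested timeout.
        long pollWaitMs = Long.MAX_VALUE;
        for (ToyRequestManager rm : managers) {
            pollWaitMs = Math.min(pollWaitMs, rm.poll(nowMs));
        }
        // (A real loop would perform network I/O here, blocking up to pollWaitMs.)
        // Cache the safe blocking time so another thread can read it cheaply.
        long maxWait = Long.MAX_VALUE;
        for (ToyRequestManager rm : managers) {
            maxWait = Math.min(maxWait, rm.maximumTimeToWait(nowMs));
        }
        cachedMaximumTimeToWaitMs = maxWait;
        return pollWaitMs;
    }

    long maximumTimeToWait() {
        return cachedMaximumTimeToWaitMs;
    }
}
```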
poll() on the application thread: the AsyncPollEvent
Rather than block waiting for coordination, AsyncKafkaConsumer.poll() (AsyncKafkaConsumer.java:933) submits a single multi-stage AsyncPollEvent and then polls the fetch buffer non-blocking. The event encapsulates "do a poll round-trip": reconciliation check, position validation, and fetch-request creation, executed sequentially on the background thread. Its lifecycle is managed by checkInflightPoll():
- On the first pass of a poll() call, if a previous inflightPoll exists, maybeClearPreviousInflightPoll() inspects it: complete-with-error → throw; complete-and-buffer-empty → clear and submit a fresh one; complete-and-buffer-non-empty → keep it (avoids starvation when poll(0) is called repeatedly).
- If no event is inflight, create new AsyncPollEvent(deadlineMs, now) and applicationEventHandler.add(it) (fire-and-forget, no blocking).
- Run offsetCommitCallbackInvoker.executeCallbacks() and processBackgroundEvents() (user callbacks / errors); these run on the app thread.
On the background side, ApplicationEventProcessor.process(AsyncPollEvent) (events/ApplicationEventProcessor.java:757) runs the stages:
- consumerMembershipManager.maybeReconcile(true): safely commit + mark revoked partitions before any new fetching, then event.markReconciliationCheckComplete().
- commitRequestManager.updateTimerAndMaybeCommit(pollTimeMs) (auto-commit on interval) and membershipManager.onConsumerPoll() + heartbeatRequestManager.resetPollTimer(pollTimeMs); this is what keeps the poll timer alive.
- offsetsRequestManager.updateFetchPositions(deadlineMs), then event.markValidatePositionsComplete().
- On completion, fetchRequestManager.createFetchRequests(); then event.completeSuccessfully().
The two completion checks isValidatePositionsComplete (a volatile boolean, events/AsyncPollEvent.java:50) and isReconciliationCheckComplete() (a method returning reconciliationCheckFuture.isDone(), backed by a final CompletableFuture<Void>, events/AsyncPollEvent.java:51,104) let the app thread know when it is safe to read the buffer.
The application thread must not collect records until the background thread has finished validating positions for the inflight poll. collectFetch() returns Fetch.empty() while inflightPoll != null && !inflightPoll.isValidatePositionsComplete() (AsyncKafkaConsumer.java:2072). This prevents both threads from racing to write SubscriptionState.position() for the same partition.
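The gating rule can be illustrated with a volatile flag and a future, mirroring the two completion signals described above. ToyPollEvent and ToyCollector are hypothetical stand-ins; the buffer is just a list of strings.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;

/** Toy poll event with the two completion signals (hypothetical names). */
final class ToyPollEvent {
    private final CompletableFuture<Void> reconciliationCheck = new CompletableFuture<>();
    private volatile boolean validatePositionsComplete;

    void markReconciliationCheckComplete() { reconciliationCheck.complete(null); }
    void markValidatePositionsComplete() { validatePositionsComplete = true; }
    boolean isReconciliationCheckComplete() { return reconciliationCheck.isDone(); }
    boolean isValidatePositionsComplete() { return validatePositionsComplete; }
}

/** Toy application-side collector that refuses to read while positions may still change. */
final class ToyCollector {
    private ToyPollEvent inflightPoll; // touched only by the application thread
    private final List<String> buffered;

    ToyCollector(List<String> buffered) {
        this.buffered = buffered;
    }

    void submit(ToyPollEvent event) {
        inflightPoll = event;
    }

    /** Returns nothing until the inflight poll has finished validating positions. */
    List<String> collectFetch() {
        if (inflightPoll != null && !inflightPoll.isValidatePositionsComplete()) {
            return List.of();
        }
        return buffered;
    }
}
```

Returning an empty result rather than blocking keeps the application thread free to process background events while it waits for the flag to flip.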
pollForFetches and prefetch pipelining
After checkInflightPoll, poll() calls pollForFetches(timer) (AsyncKafkaConsumer.java:1975): it first tries collectFetch(); if empty, it computes a poll timeout bounded by the background thread's maximumTimeToWait(), then blocks on fetchBuffer.awaitWakeup(pollTimer) until data arrives or a wakeup fires, and finally calls collectFetch() again. When a non-empty fetch is returned, sendPrefetches(timer) enqueues a CreateFetchRequestsEvent so the next batch is requested while the user processes the current one (AsyncKafkaConsumer.java:2123); this is the async analogue of the classic transmitSends() pipelining. Records are passed through interceptors.onConsume(...) before return.
SubscriptionState: the shared core
Both engines share one SubscriptionState instance, documented as thread-safe: every public method is synchronized (SubscriptionState.java:74). It models what the consumer is reading and where.
Subscription type
A single enum tracks mutually-exclusive modes (SubscriptionState.java:80): NONE, AUTO_TOPICS (collection subscribe), AUTO_PATTERN (java Pattern), AUTO_PATTERN_RE2J (broker-evaluated regex, KIP-848), USER_ASSIGNED (manual assign), and AUTO_TOPICS_SHARE (share groups). setSubscriptionType() throws "Subscription to topics, partitions and pattern are mutually exclusive" if you mix modes (SubscriptionState.java:185). This is why you cannot call both subscribe() and assign() on one consumer.
Per-partition state & the fetch-state machine
Each assigned partition has a TopicPartitionState (SubscriptionState.java:1011) held in an ordered PartitionStates map (order matters for fetch building and round-robin fairness). Its fields:
| Field | Meaning |
|---|---|
| FetchState fetchState | One of INITIALIZING, FETCHING, AWAIT_RESET, AWAIT_VALIDATION. |
| FetchPosition position | Last consumed position = offset of next record to deliver, plus offsetEpoch and currentLeader (SubscriptionState.java:1382). |
| Long highWatermark | HW from the last fetch response (used for lag and for read_uncommitted end). |
| Long logStartOffset | Log-start offset (earliest available). |
| Long lastStableOffset | LSO; under read_committed this bounds delivery and lag. |
| boolean paused | User called pause(); not fetchable until resume(). |
| boolean pendingRevocation | Set during a rebalance to stop fetching before the partition is given up. |
| AutoOffsetResetStrategy resetStrategy | Strategy to apply when in AWAIT_RESET. |
| Integer preferredReadReplica + expiry | Follower-fetch target with a lease (KIP-392). |
A partition is fetchable only when !paused && !pendingRevocation && !pendingOnAssignedCallback && hasValidPosition() (SubscriptionState.java:1250). Transitions are validated against an explicit table encoded in the FetchStates enum (SubscriptionState.java:1306):
[Figure: FetchStates transition diagram. Caption fragment: "... auto.offset.reset before returning to FETCHING."]
transitionState() enforces an interesting invariant: states that requiresPosition() (FETCHING, AWAIT_VALIDATION) must have a non-null position, and states that don't (INITIALIZING, AWAIT_RESET) have their position nulled out on entry (SubscriptionState.java:1055).
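The position invariant and the fetchable predicate can be sketched as a small state machine. The requiresPosition flags and the predicate follow the text above; the validTransitions() sets and the rule that only FETCHING counts as a valid position are illustrative assumptions, not copied from the Kafka source, and all type names are hypothetical.

```java
import java.util.EnumSet;
import java.util.Set;

/** Toy fetch-state machine; the transition sets are illustrative assumptions. */
enum ToyFetchState {
    INITIALIZING(false), FETCHING(true), AWAIT_RESET(false), AWAIT_VALIDATION(true);

    final boolean requiresPosition;

    ToyFetchState(boolean requiresPosition) {
        this.requiresPosition = requiresPosition;
    }

    /** Assumed legal targets from this state (hypothetical table). */
    Set<ToyFetchState> validTransitions() {
        switch (this) {
            case AWAIT_RESET:
                return EnumSet.of(FETCHING, AWAIT_RESET);
            default:
                return EnumSet.of(FETCHING, AWAIT_RESET, AWAIT_VALIDATION);
        }
    }
}

/** Toy per-partition state holding the flags used by the fetchable predicate. */
final class ToyPartitionState {
    ToyFetchState state = ToyFetchState.INITIALIZING;
    Long position; // null whenever the state does not carry a position
    boolean paused, pendingRevocation, pendingOnAssignedCallback;

    /** Moves to newState if the table allows it; illegal requests are ignored in this sketch. */
    void transition(ToyFetchState newState, Long newPosition) {
        if (!state.validTransitions().contains(newState)) {
            return;
        }
        if (newState.requiresPosition && newPosition == null) {
            throw new IllegalStateException(newState + " requires a position");
        }
        state = newState;
        // States that do not require a position have it cleared on entry.
        position = newState.requiresPosition ? newPosition : null;
    }

    boolean isFetchable() {
        boolean hasValidPosition = state == ToyFetchState.FETCHING; // assumption
        return !paused && !pendingRevocation && !pendingOnAssignedCallback && hasValidPosition;
    }
}
```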
position vs committed
These two notions are easy to conflate but distinct:
- position: The in-memory offset of the next record poll() will return for a partition. Advanced locally by every successful fetch. Read via consumer.position(tp), which will block to fetch/reset a position if none is set yet (AsyncKafkaConsumer.java:1230).
- committed: The offset durably stored at the group coordinator, the recovery point after a restart. Read via consumer.committed(partitions) (a FetchCommittedOffsetsEvent in the async engine, AsyncKafkaConsumer.java:1260). Has nothing to do with the live position until a commit happens.
The fetch pipeline in detail
Building fetch requests
AbstractFetch.prepareFetchRequests() (AbstractFetch.java:421) computes the fetchable-and-unbuffered partitions, groups them by their leader (or preferred read replica), and builds one FetchSessionHandler.Builder per node. Per partition it adds a FetchRequest.PartitionData(topicId, position.offset, INVALID_LOG_START_OFFSET, fetchConfig.fetchSize, position.currentLeader.epoch, …), where fetchSize = max.partition.fetch.bytes. It deliberately skips a node if (a) there is already an inflight fetch to it, or (b) it currently hosts buffered partitions. The reason for (b) is that fetching other partitions from a node with buffered data would drop the buffered ones from the broker's fetch session and risk session eviction (AbstractFetch.java:461).
The request-level knobs are stamped in createFetchRequest() (AbstractFetch.java:310): forConsumer(maxVersion, fetchConfig.maxWaitMs, fetchConfig.minBytes, …), .isolationLevel(...), .setMaxBytes(fetchConfig.maxBytes), .rackId(clientRackId). So fetch.max.wait.ms, fetch.min.bytes, and fetch.max.bytes are all request-level; max.partition.fetch.bytes is per-partition. The version caps at 12 unless the session can use topic IDs.
Receiving and buffering
On a successful response, AbstractFetch.handleFetchSuccess() wraps each partition's data in a CompletedFetch and calls fetchBuffer.add(completedFetch) (AbstractFetch.java:225). FetchBuffer is a ConcurrentLinkedQueue<CompletedFetch> guarded by a ReentrantLock with a Condition (FetchBuffer.java:53); addAll() sets a wokenup flag and signalAll()s so a waiting app thread in awaitWakeup() unblocks. There is at most one CompletedFetch per partition in the queue.
Decoding, max.poll.records, and aborted transactions
FetchCollector.collectFetch(buffer) (FetchCollector.java:91) pulls up to maxPollRecords records across partitions: it takes the head CompletedFetch, lazily initialize()s it (validates that the response's first offset still matches the live position, updates HW/LSO/log-start), then fetchRecords() from it. If the partition was paused after the fetch arrived, the records are stashed and re-queued for a later poll (FetchCollector.java:125). It also re-checks isAssigned/isFetchable per partition so records for a partition revoked mid-flight are dropped, and only advances the position when nextInLineFetch.nextFetchOffset() == position.offset (i.e. not a stale response) (FetchCollector.java:168).
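The record cap is easiest to see in a toy collector that drains buffered per-partition fetches and leaves the remainder queued. This sketch omits initialization, pause handling, and position checks; ToyFetchCollector and ToyCompletedFetch are hypothetical names and records are reduced to offsets.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Toy collector: drains buffered per-partition fetches up to a record cap. */
final class ToyFetchCollector {
    /** Records already fetched for one partition, with a cursor for partial drains. */
    static final class ToyCompletedFetch {
        final String partition;
        final List<Long> offsets;
        int cursor = 0;

        ToyCompletedFetch(String partition, List<Long> offsets) {
            this.partition = partition;
            this.offsets = offsets;
        }

        boolean isConsumed() {
            return cursor >= offsets.size();
        }
    }

    private final Deque<ToyCompletedFetch> buffer = new ArrayDeque<>();
    private final int maxPollRecords;

    ToyFetchCollector(int maxPollRecords) {
        this.maxPollRecords = maxPollRecords;
    }

    void add(ToyCompletedFetch fetch) {
        buffer.add(fetch);
    }

    /** Returns "partition@offset" strings, at most maxPollRecords of them. */
    List<String> collectFetch() {
        List<String> out = new ArrayList<>();
        int remaining = maxPollRecords;
        while (remaining > 0 && !buffer.isEmpty()) {
            ToyCompletedFetch head = buffer.peek();
            while (remaining > 0 && !head.isConsumed()) {
                out.add(head.partition + "@" + head.offsets.get(head.cursor++));
                remaining--;
            }
            if (head.isConsumed()) {
                buffer.poll(); // fully drained, so drop it from the buffer
            }
        }
        return out;
    }

    int bufferedFetches() {
        return buffer.size();
    }
}
```

With a cap of 3 and two buffered fetches of 2 and 3 records, the first call returns three records and the second returns the remaining two, without any new wire fetch in between.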
CompletedFetch.fetchRecords() (CompletedFetch.java:257) iterates batches via a streaming, decompressing iterator (currentBatch.streamingIterator(decompressionBufferSupplier)). The transaction filtering lives in nextFetchedRecord() (CompletedFetch.java:187): under read_committed, for each batch with a producer ID it (1) drains the per-partition abortedTransactions priority queue (ordered by firstOffset) up to the batch's last offset, adding their producer IDs to abortedProducerIds; (2) if the batch is a control batch carrying an abort marker, removes that producer ID; (3) if isBatchAborted() (transactional & producer ID is in the aborted set), skips the entire batch by advancing nextFetchOffset to currentBatch.nextOffset(). Control batches are never returned to the user. Out-of-range records (offset < nextFetchOffset) are skipped.
[Figure caption fragment: "... read_committed. The broker sends the aborted-txn list; the client filters client-side, bounded by the LSO; control batches are consumed for their markers but never returned to the user."]
max.poll.records caps how many records each poll() returns but does not change the wire fetch: the broker still ships up to max.partition.fetch.bytes per partition, and leftover records stay buffered for the next poll. CRC validation per batch/record is governed by check.crcs (default true).
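The three-step read_committed filter described for nextFetchedRecord() can be sketched over whole batches. This is a simplified model under stated assumptions: ToyReadCommittedFilter, AbortedTxn, and Batch are hypothetical types, batches carry no records, and LSO bounding is left out.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.PriorityQueue;
import java.util.Set;

/** Toy client-side filter for aborted transactional batches (hypothetical names). */
final class ToyReadCommittedFilter {
    static final class AbortedTxn {
        final long producerId, firstOffset;

        AbortedTxn(long producerId, long firstOffset) {
            this.producerId = producerId;
            this.firstOffset = firstOffset;
        }
    }

    static final class Batch {
        final long producerId, baseOffset, lastOffset;
        final boolean transactional, control, abortMarker;

        private Batch(long producerId, long baseOffset, long lastOffset,
                      boolean transactional, boolean control, boolean abortMarker) {
            this.producerId = producerId;
            this.baseOffset = baseOffset;
            this.lastOffset = lastOffset;
            this.transactional = transactional;
            this.control = control;
            this.abortMarker = abortMarker;
        }

        static Batch data(long producerId, long baseOffset, long lastOffset, boolean transactional) {
            return new Batch(producerId, baseOffset, lastOffset, transactional, false, false);
        }

        static Batch marker(long producerId, long offset, boolean abort) {
            return new Batch(producerId, offset, offset, true, true, abort);
        }
    }

    private final PriorityQueue<AbortedTxn> abortedTransactions =
            new PriorityQueue<>(Comparator.comparingLong((AbortedTxn t) -> t.firstOffset));
    private final Set<Long> abortedProducerIds = new HashSet<>();

    ToyReadCommittedFilter(List<AbortedTxn> aborted) {
        abortedTransactions.addAll(aborted);
    }

    /** Returns only the batches a read_committed reader would see. */
    List<Batch> visible(List<Batch> batches) {
        List<Batch> out = new ArrayList<>();
        for (Batch b : batches) {
            // (1) Activate aborted transactions that begin at or before this batch's last offset.
            while (!abortedTransactions.isEmpty()
                    && abortedTransactions.peek().firstOffset <= b.lastOffset) {
                abortedProducerIds.add(abortedTransactions.poll().producerId);
            }
            // (2) Control batches are never returned; an abort marker ends the aborted range.
            if (b.control) {
                if (b.abortMarker) {
                    abortedProducerIds.remove(b.producerId);
                }
                continue;
            }
            // (3) Skip data written by a producer whose transaction is currently marked aborted.
            if (b.transactional && abortedProducerIds.contains(b.producerId)) {
                continue;
            }
            out.add(b);
        }
        return out;
    }
}
```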
Offset management
Auto vs manual commit
enable.auto.commit (default true, ConsumerConfig.java:458) and auto.commit.interval.ms (default 5000, ConsumerConfig.java:463) drive periodic background commits. In the async engine, CommitRequestManager holds an Optional<AutoCommitState> with a Timer and a hasInflightCommit guard (CommitRequestManager.java:1497). On each poll round, updateTimerAndMaybeCommit() → maybeAutoCommitAsync() fires a commit of subscriptions.allConsumed() only if the timer expired and no commit is already inflight (CommitRequestManager.java:277). A retriable failure resets the timer with the retry backoff rather than the full interval.
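The timer-plus-inflight-guard logic can be sketched with an explicit clock. ToyAutoCommitState and its method names are hypothetical, and resetting the timer at send time is an assumption of this sketch rather than a claim about where the real code resets it.

```java
/** Toy auto-commit gate driven by an explicit clock (hypothetical names). */
final class ToyAutoCommitState {
    private final long intervalMs;
    private final long retryBackoffMs;
    private long nextCommitAtMs;
    private boolean hasInflightCommit;

    ToyAutoCommitState(long nowMs, long intervalMs, long retryBackoffMs) {
        this.intervalMs = intervalMs;
        this.retryBackoffMs = retryBackoffMs;
        this.nextCommitAtMs = nowMs + intervalMs;
    }

    /** Returns true if a commit should be sent now, and marks it inflight if so. */
    boolean shouldCommit(long nowMs) {
        if (hasInflightCommit || nowMs < nextCommitAtMs) {
            return false;
        }
        hasInflightCommit = true;
        nextCommitAtMs = nowMs + intervalMs; // assumption: timer restarts when the commit is sent
        return true;
    }

    /** Clears the inflight guard; a retriable failure shortens the wait to the retry backoff. */
    void onCommitCompleted(long nowMs, boolean retriableFailure) {
        hasInflightCommit = false;
        if (retriableFailure) {
            nextCommitAtMs = nowMs + retryBackoffMs;
        }
    }
}
```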
Manual commits: commitSync() blocks until the commit lands (or times out); commitAsync(callback) enqueues an AsyncCommitEvent and returns immediately, invoking the callback later on the app thread via OffsetCommitCallbackInvoker (AsyncKafkaConsumer.java:1112). With no explicit offsets, both commit subscriptions.allConsumed() resolved on the background thread; the sync path blocks on commitEvent.offsetsReady() first so the committed set isn't perturbed by fetches that begin after the call (AsyncKafkaConsumer.java:1135).
Auto-commit commits the position (next offset to fetch) as of poll time, regardless of whether your application finished processing those records. If you crash mid-batch after an auto-commit, those records are skipped on restart (at-most-once for the un-processed tail). For at-least-once, disable auto-commit and commitSync() only after processing.
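A small replay makes the difference concrete. The sketch below is a pure simulation, not consumer code: ToyDeliverySemantics is hypothetical, a "crash" is a single simulated restart from the committed offset, and commit-before-processing models the case where a commit covering the batch lands before processing finishes.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy replay of one crash to contrast commit-before with commit-after processing. */
final class ToyDeliverySemantics {
    /**
     * Processes offsets [0, logEnd) in batches, crashing once after crashAfter
     * records of a batch, then restarting from the committed offset.
     * Returns every offset the application actually processed, in order.
     */
    static List<Long> run(long logEnd, int batchSize, int crashAfter, boolean commitBeforeProcessing) {
        List<Long> processed = new ArrayList<>();
        long committed = 0;
        long position = committed;
        boolean crashed = false;
        while (position < logEnd) {
            long batchEnd = Math.min(position + batchSize, logEnd);
            if (commitBeforeProcessing) {
                committed = batchEnd; // commit covers records not yet processed
            }
            boolean crashNow = false;
            for (long offset = position; offset < batchEnd; offset++) {
                if (!crashed && offset - position == crashAfter) {
                    crashNow = true;
                    break;
                }
                processed.add(offset);
            }
            if (crashNow) {
                crashed = true;
                position = committed; // restart resumes from the committed offset
                continue;
            }
            if (!commitBeforeProcessing) {
                committed = batchEnd; // commit only after the batch is fully processed
            }
            position = batchEnd;
        }
        return processed;
    }
}
```

With six records, batches of three, and a crash after two records, committing first yields 0, 1, 3, 4, 5 (offset 2 is lost), while committing afterwards yields 0, 1, 0, 1, 2, 3, 4, 5 (duplicates, nothing lost).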
Committing before rebalance
Before revoking partitions, the async membership manager calls CommitRequestManager.maybeAutoCommitSyncBeforeRebalance(deadlineMs) (CommitRequestManager.java:330), which retries until success/fatal/timeout. Notably it treats STALE_MEMBER_EPOCH as retriable (re-sends with the latest epoch) but UNKNOWN_TOPIC_OR_PARTITION as fatal so a deleted topic can't stall the rebalance forever.
updateFetchPositions: where positions come from
OffsetsRequestManager.updateFetchPositions(deadlineMs) (OffsetsRequestManager.java:235) is the canonical "find a position for every partition" routine:
- validatePositionsIfNeeded(): for partitions in AWAIT_VALIDATION, send OffsetsForLeaderEpoch and detect log truncation.
- If hasAllFetchPositions(), done.
- Otherwise, for the initializing partitions: if grouped, initWithCommittedOffsetsIfNeeded() issues an OffsetFetch and seeds positions from committed offsets; then initWithPartitionOffsetsIfNeeded() handles any still-missing positions by marking them AWAIT_RESET and issuing ListOffsets per auto.offset.reset (OffsetsRequestManager.java:280,337).
resetInitializingPositions() throws NoOffsetForPartitionException if no reset strategy is configured and there is no committed offset; that is exactly the case in which auto.offset.reset=none surfaces. The default reset is latest (ConsumerConfig.java:546, AutoOffsetResetStrategy.LATEST).
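The resolution order (committed offset first, then the reset strategy) reduces to a few lines. ToyPositionResolver is hypothetical, the log bounds are passed in rather than fetched via ListOffsets, and IllegalStateException stands in for NoOffsetForPartitionException so the sketch needs only the JDK.

```java
import java.util.OptionalLong;

/** Toy starting-position resolver (hypothetical names; no network calls). */
final class ToyPositionResolver {
    enum ResetStrategy { EARLIEST, LATEST, NONE }

    /** Picks the first position for a partition that has none yet. */
    static long resolve(OptionalLong committed, ResetStrategy strategy, long logStart, long logEnd) {
        if (committed.isPresent()) {
            return committed.getAsLong(); // a committed offset always wins
        }
        switch (strategy) {
            case EARLIEST:
                return logStart;
            case LATEST:
                return logEnd;
            default:
                // Stand-in for NoOffsetForPartitionException in the real client.
                throw new IllegalStateException("no committed offset and no reset strategy");
        }
    }
}
```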
Group membership & rebalancing (async engine)
The KIP-848 member is a state machine in ConsumerMembershipManager/AbstractMembershipManager. The states (MemberState.java:27) are UNSUBSCRIBED, JOINING, RECONCILING, ACKNOWLEDGING, STABLE, FENCED, PREPARE_LEAVING, LEAVING, FATAL, and STALE, with explicit previousValidStates tables enforcing legal transitions.
[Figure: member state diagram. Caption fragment: "... MemberState). The spine is UNSUBSCRIBED → JOINING → RECONCILING → ACKNOWLEDGING → STABLE; the lower rows list the back-edges (STABLE → RECONCILING on a new assignment, FENCED, the leave path, and the STALE poll-timer recovery). Reconciliation runs entirely in the background but defers revocations / callbacks to the application thread."]
Reconciliation (AbstractMembershipManager.maybeReconcile(canCommit), AbstractMembershipManager.java:875) diffs the broker-provided target assignment against currently owned partitions to compute added and revoked sets. The ordering is strict: markPendingRevocationToPauseFetching(revoked) → auto-commit (signalReconciliationStarted()) → onPartitionsRevoked → onPartitionsAssigned (revokeAndAssign, AbstractMembershipManager.java:993). Because revocations and user callbacks must run on the application thread, the manager emits PartitionsRemoved (carrying both revocations and losses, dispatched by methodName())/PartitionsAssigned background events; AsyncKafkaConsumer.BackgroundEventProcessor runs the listener and signals completion back with a ConsumerRebalanceListenerCallbackCompletedEvent (AsyncKafkaConsumer.java:274).
A reconciliation that only adds partitions can run from the plain background poll (new partitions have no buffered records). A reconciliation that revokes partitions, or any auto-commit, is only triggered from the AsyncPollEvent path (canCommit=true), because the application thread may already be returning records for a to-be-revoked partition (AbstractMembershipManager.java:929). The reconciliationCheckFuture on the poll event gates collectFetch() so the app thread waits for revocation handling before reading the buffer.
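The assignment diff and its strict ordering can be sketched with two set subtractions. ToyReconciler is hypothetical, partitions are plain strings, and the step labels are illustrative; as a simplification, this sketch only includes the commit step when something is revoked.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

/** Toy assignment reconciliation: computes the diff and the ordered steps. */
final class ToyReconciler {
    /** Returns the steps a reconciliation would take, in order (labels are illustrative). */
    static List<String> reconcile(Set<String> owned, Set<String> target) {
        Set<String> revoked = new TreeSet<>(owned);
        revoked.removeAll(target); // owned but no longer assigned
        Set<String> added = new TreeSet<>(target);
        added.removeAll(owned); // assigned but not yet owned

        List<String> steps = new ArrayList<>();
        if (!revoked.isEmpty()) {
            // Stop fetching first, so no new records arrive for partitions being given up.
            steps.add("pause-fetching " + revoked);
            steps.add("auto-commit");
            steps.add("onPartitionsRevoked " + revoked);
        }
        if (!added.isEmpty()) {
            steps.add("onPartitionsAssigned " + added);
        }
        return steps;
    }
}
```

An add-only diff produces a single step, which matches the observation that such a reconciliation needs no commit and can run without the application thread having handed out records for the affected partitions.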
Liveness: the poll timer and max.poll.interval.ms
AbstractHeartbeatRequestManager owns a pollTimer initialized to max.poll.interval.ms (default 300000, ConsumerConfig.java:629) (AbstractHeartbeatRequestManager.java:119). Each poll(now) updates it; if it has expired and the member isn't already leaving, it logs the well-known warning and calls membershipManager().transitionToSendingLeaveGroup(true), sending a leave heartbeat (AbstractHeartbeatRequestManager.java:171). resetPollTimer(pollMs), called from the AsyncPollEvent handler, resets it on every user poll(); if it had expired (member is STALE), the next poll transitions it back toward JOINING. To guarantee the timer never silently expires while the app blocks in poll(), maximumTimeToWait() returns at most pollTimer.remainingMs() / 2 (AbstractHeartbeatRequestManager.java:262). Heartbeats themselves go out on the interval (heartbeat.interval.ms default 3000, session.timeout.ms default 45000 for the classic protocol; ConsumerConfig.java:438,443).
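The halving rule is simple arithmetic over the remaining poll budget. In this sketch ToyPollTimer is hypothetical, time is passed in explicitly, and maximumTimeToWait() returns exactly half the remaining time, whereas the text above only states that half is the upper bound.

```java
/** Toy poll timer with an explicit clock (hypothetical names). */
final class ToyPollTimer {
    private final long maxPollIntervalMs;
    private long deadlineMs;

    ToyPollTimer(long nowMs, long maxPollIntervalMs) {
        this.maxPollIntervalMs = maxPollIntervalMs;
        this.deadlineMs = nowMs + maxPollIntervalMs;
    }

    /** Called on every user poll: restarts the full interval. */
    void reset(long nowMs) {
        deadlineMs = nowMs + maxPollIntervalMs;
    }

    long remainingMs(long nowMs) {
        return Math.max(0L, deadlineMs - nowMs);
    }

    boolean isExpired(long nowMs) {
        return remainingMs(nowMs) == 0L;
    }

    /** Upper bound on how long a caller may block: half of the remaining budget. */
    long maximumTimeToWait(long nowMs) {
        return remainingMs(nowMs) / 2;
    }
}
```

With the default 300000 ms interval, a caller may block for at most 150000 ms right after a poll, and for at most 50000 ms once 200000 ms have elapsed, so the loop always wakes up before the deadline.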
Concurrency & threading model
| State / resource | Owner / guard | Notes |
|---|---|---|
| Public API re-entrancy | AtomicLong currentThread + AtomicInteger refCount | CAS lock; throws ConcurrentModificationException on cross-thread use (AsyncKafkaConsumer.java:2216). |
| SubscriptionState | synchronized methods | Shared by app thread (seek/pause/position read) and background (assignment, HW updates). |
| FetchBuffer | ReentrantLock + Condition | Producer = background (adds CompletedFetch); consumer = app thread (collect/await). |
| Application event queue | LinkedBlockingQueue | App thread enqueues; background drains. |
| Background event queue | LinkedBlockingQueue | Background enqueues errors/callbacks; app thread drains in processBackgroundEvents(). |
| RequestManagers / NetworkClient | background thread only | App thread reads only the cached maximumTimeToWait() snapshot. |
| groupMetadata, groupAssignmentSnapshot | AtomicReference | Updated from background via MemberStateListener; read by app thread. |
| wakeupTrigger | WakeupTrigger | Lets another thread interrupt a blocking poll/commit via WakeupException. |
The background thread is a daemon (super(BACKGROUND_THREAD_NAME, true)) and swallows per-iteration exceptions so one bad event cannot kill the loop (ConsumerNetworkThread.java:160). Resources are created inside the thread (initializeResources()) and an initializationLatch + initializationError propagate construction failures (e.g. a bad SASL JAAS config) back to the constructor rather than hanging.
Configuration reference
| Key | Default | Effect |
|---|---|---|
| group.protocol | classic | consumer selects AsyncKafkaConsumer (KIP-848); classic selects ClassicKafkaConsumer. |
| group.id | null | Enables group management + offset commits. null ⇒ assign()-only, no commits. |
| group.remote.assignor | (unset) | Server-side assignor name; CONSUMER protocol only. |
| enable.auto.commit | true | Periodic background commit of consumed positions. Forced false when group.id is null. |
| auto.commit.interval.ms | 5000 | Interval between auto-commits. |
| auto.offset.reset | latest | earliest/latest/none when no committed offset exists; none throws NoOffsetForPartitionException. |
| max.poll.records | 500 | Max records returned per poll(); does not change the wire fetch. |
| max.poll.interval.ms | 300000 | Max time between polls before the member is removed from the group. |
| fetch.min.bytes | 1 | Broker waits for this many bytes (or fetch.max.wait.ms) before responding. |
| fetch.max.bytes | 52428800 (50 MiB) | Soft cap on total bytes per fetch response. |
| fetch.max.wait.ms | 500 | Max broker wait when fetch.min.bytes isn't yet satisfied. |
| max.partition.fetch.bytes | 1048576 (1 MiB) | Per-partition byte cap (= FetchConfig.fetchSize). |
| isolation.level | read_uncommitted | read_committed bounds delivery to the LSO and skips aborted batches. |
| session.timeout.ms / heartbeat.interval.ms | 45000 / 3000 | CLASSIC-protocol liveness; CONSUMER protocol uses a broker-controlled interval. |
| check.crcs | true | Validate batch/record CRCs during decode. |
| default.api.timeout.ms | 60000 | Default deadline for blocking calls without an explicit timeout. |
Failure modes, edge cases & recovery
- Wakeup & interrupt: wakeup() from another thread makes a blocking poll/commitSync/committed throw WakeupException; the async engine routes this through WakeupTrigger on the fetch buffer and the active task future. A blocking call must not be interrupted between updating the position and returning records; both engines re-check the wakeup trigger before polling fetches, never between collect and return (AsyncKafkaConsumer.java:953).
- Offset out of range: a fetch error transitions the partition to AWAIT_RESET and triggers a ListOffsets per auto.offset.reset; with none, the user sees OffsetOutOfRangeException/NoOffsetForPartitionException.
- Log truncation: AWAIT_VALIDATION + OffsetsForLeaderEpoch detect a divergent epoch; a LogTruncationException (clients/.../consumer/LogTruncationException.java) carries the divergent offset, built from the LogTruncation holder (fields topicPartition, fetchPosition, divergentOffsetOpt; SubscriptionState.java:1422).
- Fenced member: UNKNOWN_MEMBER_ID/FENCED_MEMBER_EPOCH moves the member to FENCED, invokes onPartitionsLost, and re-joins as a fresh member.
- Poll-loop too slow: exceeding max.poll.interval.ms proactively leaves the group (member → STALE), invoking onPartitionsLost; the next poll() rejoins. The deserializer/processing time counts against this budget, so a large max.poll.records with slow processing is the classic trigger.
- Stale fetch responses: responses whose first offset no longer matches the live position (after a seek/rebalance) are discarded in FetchCollector.handleInitializeSuccess() (FetchCollector.java:259).
- Deserialization failure: a RecordDeserializationException is thrown only if no records were already collected; the offending offset must be skipped via seek() to make progress (CompletedFetch.java:295).
- Async-engine construction failure: surfaced from the background thread via the init latch; partially-built objects are closed to avoid leaks (AsyncKafkaConsumer.java:597).
Invariants & guarantees
A partition's position only advances after its records have been collected for return to the user, and only when the response offset matches the current position; the position is therefore the offset of the next undelivered record. Returning an empty Fetch guarantees the position is unchanged (FetchCollector.java:82).
Subscription type is single-valued: subscribe() (topics/pattern) and assign() (manual) are mutually exclusive for a consumer's lifetime until unsubscribe() resets it.
At most one CompletedFetch per partition exists in the FetchBuffer, and a node with buffered partitions is excluded from new fetch requests, preserving the broker's fetch-session incrementality.
Within a partition, records are delivered in strictly increasing offset order with no gaps the consumer can observe (aborted/control records are skipped invisibly under read_committed). Across partitions there is no ordering guarantee. Delivery semantics are application-determined by when you commit relative to processing.
Interactions with other subsystems
- Group Coordination: the membership manager and heartbeat/commit managers speak ConsumerGroupHeartbeat/OffsetCommit/OffsetFetch to the group coordinator; the classic engine uses JoinGroup/SyncGroup.
- The Fetch Path: the broker side that serves the Fetch RPCs this chapter builds, including follower-fetch (preferred read replica) and HW/LSO semantics.
- Record Format & Batches: CompletedFetch decodes the very batches/records defined there, including control batches and transactional markers.
- Transactions & EOS: isolation.level=read_committed and the aborted-transaction list make the consumer the read side of exactly-once.
- Wire Protocol & RPC Framework: every request the managers emit (FetchRequest.Builder, etc.) is framed by that layer.
- Share Groups: the parallel KafkaShareConsumer reuses much of this machinery (ShareConsumeRequestManager, ShareFetchBuffer) with acknowledge-based delivery.
- Kafka Streams: drives the async engine through the same event model but with a StreamsRebalanceListener and task-based callbacks.
Design rationale & evolution
The async engine exists because the classic consumer interleaves coordination (join, sync, callbacks) with the user's poll() thread, so a long rebalance or a slow callback stalls record processing and a slow processing loop can trip the session timeout. KIP-848 moves assignment computation to the broker-side coordinator and introduces the ConsumerNetworkThread so that, in steady state, most members keep fetching during a rebalance. The non-blocking AsyncPollEvent design specifically targets parity with the classic poll() latency while keeping all network I/O off the application thread. New observability for this engine arrived with KIP-1068 (e.g. application-event queue size/time, time-between-network-thread-polls in AsyncConsumerMetrics).
The async engine remains feature-compatible: the same public Consumer contract, the same ConsumerRebalanceListener, the same SubscriptionState and fetch pipeline. The differences are confined to how coordination is threaded and where state transitions happen.
Gotchas & operational notes
Calling consumer methods from a rebalance callback is constrained: the async engine runs callbacks on the application thread inside the poll, and re-entrant API calls take the same single-thread lock via refCount (nested acquire by the same thread is allowed). But blocking calls (e.g. commitSync) inside onPartitionsRevoked remain the supported pattern for committing before giving up partitions.
poll(Duration.ZERO) with the async engine relies on a subtle rule: if a previous inflight poll already filled the buffer, a new event is not submitted; otherwise a zero-timeout poll could spin forever re-validating positions and never return buffered data (AsyncKafkaConsumer.java:1045). Don't assume poll(0) performs no work: it still drains background events and may complete a prior poll's stages.
To size fetches: raise fetch.min.bytes (and accept up to fetch.max.wait.ms latency) for throughput; raise max.partition.fetch.bytes if single records/batches exceed 1 MiB (it must be ≥ the broker/topic max.message.bytes or a partition can stall). Tune max.poll.records down (not max.partition.fetch.bytes) to bound per-poll processing time against max.poll.interval.ms.
Background thread name consumer_background_thread and (classic) kafka-coordinator-heartbeat-thread | <groupId> are useful when reading thread dumps: a hot background thread points at fetch/commit volume; a stuck application thread in awaitWakeup is simply an idle poll waiting for data.