krivaltsevich.com · Kafka Internals 4.4

15 · Share Groups (Queues / KIP-932)

Source: Apache Kafka 4.4.0-SNAPSHOT (git 04bfe7d, 2026-06-15), KRaft mode. Derived from source code, not copied from official documentation.

Share groups are Kafka's queue-like consumption model: many share consumers cooperatively read the same partitions, and records are delivered individually under a time-bounded acquisition lock rather than committed by offset. Each record carries a per-record delivery state (Available → Acquired → Acknowledged/Archived) and a delivery count, giving at-least-once queue semantics with redelivery, max-attempt limits and an optional dead-letter queue. The broker's SharePartition tracks in-flight record state and the share-partition start offset (SPSO); the durable state lives in the internal __share_group_state topic, written through a persister SPI to the ShareCoordinator. This chapter follows the data and control flow from the ShareConsumer client through ShareFetch/ShareAcknowledge RPCs into SharePartition and on to the coordinator.

Role & responsibilities

A consumer group (chapter 13) assigns each partition to exactly one member; progress is a single committed offset per partition; reprocessing is coarse (rewind the offset). A share group inverts this: the unit of bookkeeping is the individual record, not the partition cursor. The broker that leads a topic-partition keeps an in-memory map of which records are acquired (leased to a member), acknowledged, archived, or available for (re)delivery, and the consumer acknowledges records one at a time as ACCEPT, RELEASE or REJECT.

The subsystem must therefore:

  • Lease records to members with an expiring lock so a crashed/slow consumer cannot block the queue (AcquisitionLockTimerTask).
  • Track and persist per-record delivery state and delivery count so leases survive a broker/leader change.
  • Advance the share-partition start offset (SPSO) as a contiguous prefix becomes terminal, so the log can age out and lag can be computed.
  • Cap concurrency: a maximum number of in-flight (locked) records per share-partition.
  • Redeliver records up to a max delivery count, then archive them, or route them to a dead-letter queue (KIP-1191).
Key idea

Share groups decouple parallelism from partition count. Because acknowledgement is per-record and held in broker memory (durably backed), a group can run more consumers than there are partitions, and several consumers can make progress on the same partition simultaneously. This is true queue fan-out, but it comes without per-key ordering guarantees.

Where it lives in the code

| Concern | Principal class | Path |
| --- | --- | --- |
| Per-partition in-flight state engine | SharePartition | core/src/main/java/kafka/server/share/SharePartition.java |
| Broker orchestration of fetch/ack/sessions | SharePartitionManager | core/src/main/java/kafka/server/share/SharePartitionManager.java |
| Record state machine | RecordState, InFlightState, InFlightBatch | server/src/main/java/org/apache/kafka/server/share/fetch/ |
| Lease-expiry timer | AcquisitionLockTimerTask | server/src/main/java/org/apache/kafka/server/share/fetch/AcquisitionLockTimerTask.java |
| Persister SPI / RPC client | Persister, DefaultStatePersister, PersisterStateManager | server-common/src/main/java/org/apache/kafka/server/share/persister/ |
| Share coordinator runtime | ShareCoordinatorService, ShareCoordinatorShard | share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ |
| Durable state record / batch merge | ShareGroupOffset, PersisterStateBatchCombiner | share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ |
| Dead-letter queue writer | ShareGroupDLQStateManager | server/src/main/java/org/apache/kafka/server/share/dlq/ShareGroupDLQStateManager.java |
| Share consumer client | ShareConsumerImpl, ShareConsumeRequestManager | clients/src/main/java/org/apache/kafka/clients/consumer/internals/ |
| Group config (dynamic) | ShareGroupConfig, GroupConfig | group-coordinator/src/main/java/org/apache/kafka/coordinator/group/ |

Core concepts & terminology

Share-partition
The (groupId, topic-partition) pair as seen by one share group. State is held on the partition leader by a SharePartition instance.
Acquisition lock
A time-bounded lease on acquired records. On expiry the records return to AVAILABLE (or are archived if the delivery count is exhausted). Implemented as a TimerTask.
Delivery count
How many times a record has been handed to a consumer. Incremented on each acquire; bounded by group.share.delivery.count.limit (default 5).
SPSO
Share-Partition Start Offset, the lowest offset still in-flight. Records below it are terminal (acknowledged/archived) and dropped from the cache. Persisted as startOffset.
State epoch
Fencing version of the durable state, bumped on (re)initialization; protects against stale writers after a coordinator/leader change.
Snapshot / update record
The two coordinator record types in __share_group_state: a full ShareSnapshotValue and an incremental ShareUpdateValue.

The record state machine

RecordState (server/.../fetch/RecordState.java:26) defines five states with an explicit byte id, persisted in the state batches:

| State | id | Meaning | Terminal? |
| --- | --- | --- | --- |
| AVAILABLE | 0 | Eligible to be acquired/(re)delivered. | no |
| ACQUIRED | 1 | Leased to a member under an acquisition lock. | no |
| ACKNOWLEDGED | 2 | Consumer ACCEPTed it; successfully processed. | yes |
| ARCHIVING | 3 | Transient (KIP-1191): destined for the DLQ before becoming ARCHIVED. | no (transient) |
| ARCHIVED | 4 | Permanently dropped: REJECTed, delivery-count exhausted, or compacted away. | yes |

Transitions are policed by RecordState.validateTransition (RecordState.java:50): ACKNOWLEDGED and ARCHIVED are sinks; AVAILABLE may only go to ACQUIRED; a record may never transition to itself. The acknowledgement-byte → state mapping is fixed in SharePartition.ACK_TYPE_TO_RECORD_STATE (SharePartition.java:149): byte 0 (a gap) → ARCHIVED, ACCEPT → ACKNOWLEDGED, RELEASE → AVAILABLE, REJECT → ARCHIVED.
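The rules above are small enough to sketch directly. The enum below is a minimal, hypothetical stand-in (RecordStateSketch is not a Kafka class). It encodes only the three constraints just listed plus the acknowledgement-byte mapping, and it assumes that ACCEPT, RELEASE and REJECT use wire ids 1, 2 and 3, as in the client's AcknowledgeType. The real RecordState.validateTransition may reject further moves that this sketch allows.

```java
import java.util.Map;

/** Hypothetical stand-in for RecordState; byte ids follow the state table above. */
enum RecordStateSketch {
    AVAILABLE((byte) 0), ACQUIRED((byte) 1), ACKNOWLEDGED((byte) 2),
    ARCHIVING((byte) 3), ARCHIVED((byte) 4);

    final byte id;

    RecordStateSketch(byte id) { this.id = id; }

    boolean isTerminal() { return this == ACKNOWLEDGED || this == ARCHIVED; }

    /** Enforces only the three rules stated in the text; any other move is allowed here. */
    RecordStateSketch validateTransition(RecordStateSketch next) {
        if (isTerminal())
            throw new IllegalStateException(this + " is a sink state");
        if (next == this)
            throw new IllegalStateException("self-transition on " + this);
        if (this == AVAILABLE && next != ACQUIRED)
            throw new IllegalStateException("AVAILABLE may only move to ACQUIRED");
        return next;
    }

    /** Acknowledgement byte to target state: 0 = gap, 1 = ACCEPT, 2 = RELEASE, 3 = REJECT (assumed ids). */
    static final Map<Byte, RecordStateSketch> ACK_TYPE_TO_STATE = Map.of(
            (byte) 0, ARCHIVED,
            (byte) 1, ACKNOWLEDGED,
            (byte) 2, AVAILABLE,
            (byte) 3, ARCHIVED);
}
```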

Figure: per-record / per-batch state machine in InFlightState.tryUpdateState. A record starts AVAILABLE; each acquire leases it (ACQUIRED) and increments the delivery count. From ACQUIRED it can be accepted (terminal ACKNOWLEDGED), or released or lock-timed-out back to AVAILABLE for redelivery while under the limit. On REJECT, or once the delivery count is exhausted, it is archived, either directly (DLQ off) or via the transient ARCHIVING hop (DLQ on).

The decisive rule lives in InFlightState.tryUpdateState (InFlightState.java:150). When a record is being made AVAILABLE (RELEASE or lock timeout) but its delivery count has already reached the maximum, it is diverted instead: to ARCHIVING when DLQ support is enabled for the group, otherwise straight to ARCHIVED. The delivery count is adjusted via DeliveryCountOps (INCREASE/DECREASE/NO_OP, DeliveryCountOps.java:23); it is incremented on acquire and explicitly not incremented when a record reaches ARCHIVED.
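A minimal sketch of the divert rule for a single record, assuming no member, lock or persistence. InFlightStateSketch and RecState are hypothetical names; the real tryUpdateState also deals with rollback snapshots and ownership, which are omitted here.

```java
/** Hypothetical record states for this sketch. */
enum RecState { AVAILABLE, ACQUIRED, ACKNOWLEDGED, ARCHIVING, ARCHIVED }

/** Hypothetical, single-record model of the divert rule in InFlightState.tryUpdateState. */
final class InFlightStateSketch {
    RecState state = RecState.AVAILABLE;
    int deliveryCount = 0;

    /** Acquire leases the record and increments its delivery count. */
    void acquire() {
        if (state != RecState.AVAILABLE) throw new IllegalStateException("not available: " + state);
        state = RecState.ACQUIRED;
        deliveryCount++;
    }

    /** RELEASE or lock timeout: back to AVAILABLE, unless the delivery count is exhausted. */
    RecState release(int maxDeliveryCount, boolean dlqEnabled) {
        if (state != RecState.ACQUIRED) throw new IllegalStateException("not acquired: " + state);
        if (deliveryCount >= maxDeliveryCount) {
            state = dlqEnabled ? RecState.ARCHIVING : RecState.ARCHIVED; // diverted, never AVAILABLE
        } else {
            state = RecState.AVAILABLE;                                  // eligible for redelivery
        }
        return state;
    }
}
```

With a limit of 5, the first four releases return the record to AVAILABLE and the fifth archives it. No sequence of releases can keep the record circulating forever.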

Invariant

A record's delivery count never decreases on the normal path (the only operation applied is INCREASE, on acquire), and a record can be delivered at most maxDeliveryCount times before it is archived. There is no infinite-redelivery loop: every RELEASE or lock timeout either makes the record available again, to be re-acquired with a higher count, or archives it.

In-memory data structures

A SharePartition (SharePartition.java:103) holds the authoritative in-flight state for one share-partition on its leader. Its salient fields:

cachedState : NavigableMap<Long, InFlightBatch>
A ConcurrentSkipListMap keyed by the first offset of each in-flight batch (SharePartition.java:178,392). This is the heart of the engine.
startOffset, endOffset
SPSO and the highest fetched offset; bound the live region of the cache (SharePartition.java:276,282).
lock : ReentrantReadWriteLock
Guards all access to cachedState/offsets. Mutations take the write lock (SharePartition.java:184,393).
fetchLock : AtomicReference<Uuid>
Ensures a given share-partition is processed by only one in-flight share-fetch at a time; the acquirer's id is stored (SharePartition.java:191).
findNextFetchOffset : boolean
Dirty flag: when records become available out of order, the next fetch offset must be recomputed by scanning the cache (SharePartition.java:224).
deliveryCompleteCount : AtomicInteger
Count of terminal records at/after SPSO; feeds share-partition lag (KIP-1226) (SharePartition.java:305).
stateEpoch, partitionState
Fencing epoch and lifecycle state (EMPTY → INITIALIZING → ACTIVE / FAILED / FENCED, SharePartition.java:123).
timer : Timer
Hashed-wheel timer that fires acquisition-lock expirations (SharePartition.java:230).

InFlightBatch (server/.../fetch/InFlightBatch.java:34) stores firstOffset/lastOffset plus either a single batchState : InFlightState (the whole batch shares one state) or an offsetState : NavigableMap<Long, InFlightState> (per-offset state). The per-offset map is created lazily by maybeInitializeOffsetStateUpdate only when a client acknowledges a subset of a batch, so the common path (whole-batch acks) stays compact. Each InFlightState (InFlightState.java:33) carries state, deliveryCount, memberId, an optional acquisitionLockTimeoutTask, and a rollbackState snapshot used for the persist-then-commit protocol.
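The lazy split can be illustrated with a simplified batch that holds either one batchState or a per-offset TreeMap. InFlightBatchSketch is hypothetical. It tracks only the state (no delivery count, member or lock), and it assumes the per-offset map is materialized on the first update that covers only part of the batch, as maybeInitializeOffsetStateUpdate is described to do.

```java
import java.util.NavigableMap;
import java.util.TreeMap;

enum BatchRecState { AVAILABLE, ACQUIRED, ACKNOWLEDGED, ARCHIVED }

/** Hypothetical model of InFlightBatch: one shared state until a partial update forces a split. */
final class InFlightBatchSketch {
    final long firstOffset, lastOffset;
    BatchRecState batchState;                       // valid while the whole batch shares one state
    NavigableMap<Long, BatchRecState> offsetState;  // null until the first partial update

    InFlightBatchSketch(long firstOffset, long lastOffset, BatchRecState initial) {
        this.firstOffset = firstOffset;
        this.lastOffset = lastOffset;
        this.batchState = initial;
    }

    /** Apply a new state to [from, to]; split into per-offset state only for a strict subset. */
    void update(long from, long to, BatchRecState next) {
        boolean wholeBatch = from <= firstOffset && to >= lastOffset;
        if (offsetState == null && wholeBatch) {
            batchState = next;                      // compact path: one state for the batch
            return;
        }
        if (offsetState == null) {                  // first partial update: materialize the map
            offsetState = new TreeMap<>();
            for (long o = firstOffset; o <= lastOffset; o++) offsetState.put(o, batchState);
            batchState = null;
        }
        for (long o = Math.max(from, firstOffset); o <= Math.min(to, lastOffset); o++) {
            offsetState.put(o, next);
        }
    }

    BatchRecState stateOf(long offset) {
        return offsetState == null ? batchState : offsetState.get(offset);
    }
}
```

Releasing only offset 131 of an acquired 130..139 batch forces the split. This is the shape of batch 130 in the example cache.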

Figure: SharePartition for group g, topic-partition t-p on the leader broker; startOffset=100 (SPSO), endOffset=139; nextFetchOffset=131 (first AVAILABLE), findNextFetchOffset=true. A strip of cachedState offsets shows the owning member and delivery count (dc) under each range.
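| cachedState key (firstOffset) | Batch range | State | dc | Member · lock | Notes |
| --- | --- | --- | --- | --- | --- |
| 100 | 100..119 | batchState=ACKNOWLEDGED | 1 | none | whole-batch ack |
| 120 | 120..129 | batchState=ACQUIRED | 2 | M1 · lock→t+30s | whole batch leased |
| 130 | 130 | offsetState: ACQUIRED | 1 | M2 · lock | partial ack split this batch into per-offset state |
| 130 (cont.) | 131 | offsetState: AVAILABLE | 1 | none | |
| 130 (cont.) | 132..139 | offsetState: ACQUIRED | 1 | M2 · lock | |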
Example cache: batches keyed by first offset, with per-offset state only where batch 130 was partially acknowledged. The strip shows the live region startOffset=100 … endOffset=139; the first AVAILABLE offset (131) is where the next fetch resumes.

On-disk / on-wire data structures

Wire: ShareFetch / ShareAcknowledge / AcquiredRecords

The client multiplexes fetch and acknowledge over two RPCs (SHARE_FETCH, SHARE_ACKNOWLEDGE) routed in KafkaApis (core/src/main/scala/kafka/server/KafkaApis.scala:240,241; membership over SHARE_GROUP_HEARTBEAT at :238). ShareFetchRequest carries MaxWaitMs, MaxBytes (default 0x7fffffff), and, since v1, MaxRecords and BatchSize; acknowledgements piggyback on the same request as AcknowledgementBatches (clients/.../message/ShareFetchRequest.json). Each AcknowledgementBatch is a (firstOffset, lastOffset, []acknowledgeType) run, modelled client-side as ShareAcknowledgementBatch.

The leader returns AcquiredRecords ranges (FirstOffset, LastOffset, DeliveryCount as int16) plus a per-response AcquisitionLockTimeoutMs (v1+) giving the lease length (clients/.../message/ShareFetchResponse.json). Note that the records themselves ride inside the normal fetch RecordBatch payload; AcquiredRecords is just the offset-range overlay describing what was leased.

Persistent: ShareSnapshotValue / ShareUpdateValue

Durable state is two coordinator-value record types in __share_group_state (topic name Topic.SHARE_GROUP_STATE_TOPIC_NAME = "__share_group_state", clients/.../internals/Topic.java:29). The snapshot is the complete state; the update is an incremental delta.

| Field | Type | In Snapshot | In Update | Notes |
| --- | --- | --- | --- | --- |
| SnapshotEpoch | int32 | yes | yes | Bumped per new snapshot; an update reuses the last snapshot's value. |
| StateEpoch | int32 | yes | no | Fencing epoch; updates carry no state epoch. |
| LeaderEpoch | int32 | yes | yes | Partition leader epoch at write time. |
| StartOffset | int64 | yes | yes | SPSO; -1 in an update means "not changing". |
| DeliveryCompleteCount | int32 (tag 0) | yes | yes | KIP-1226 lag input; default -1 (uninitialized). |
| CreateTimestamp / WriteTimestamp | int64 | yes | no | For cold-partition snapshotting/pruning. |
| StateBatches[] | array | yes | yes | Run-length state: see below. |

Each StateBatch is FirstOffset (int64), LastOffset (int64), DeliveryState (int8, 0:Available, 2:Acked, 4:Archived), DeliveryCount (int16) (share-coordinator/.../message/ShareSnapshotValue.json:38). This run-length encoding is why whole-batch acknowledgement is cheap: a contiguous run of identically-stated offsets is one batch entry. In-memory these map to PersisterStateBatch, and to the immutable ShareGroupOffset container (ShareGroupOffset.java:34) which the coordinator keeps in its soft state.
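To make the run-length encoding concrete, here is a sketch that turns per-offset (DeliveryState, DeliveryCount) pairs into StateBatch-like runs. StateBatchSketch and StateBatchEncoder are hypothetical; the state bytes follow the 0/2/4 values listed above, and the real persister code works on PersisterStateBatch objects rather than arrays.

```java
import java.util.ArrayList;
import java.util.List;

/** One run of contiguous offsets sharing a delivery state and delivery count. */
record StateBatchSketch(long firstOffset, long lastOffset, byte deliveryState, short deliveryCount) {}

final class StateBatchEncoder {
    /** states[i] and counts[i] describe offset baseOffset + i; returns the minimal list of runs. */
    static List<StateBatchSketch> encode(long baseOffset, byte[] states, short[] counts) {
        if (states.length != counts.length) throw new IllegalArgumentException("length mismatch");
        List<StateBatchSketch> runs = new ArrayList<>();
        int start = 0;
        for (int i = 1; i <= states.length; i++) {
            // A run ends at the array end or when either the state or the count changes.
            boolean runEnds = i == states.length
                    || states[i] != states[start] || counts[i] != counts[start];
            if (runEnds) {
                runs.add(new StateBatchSketch(baseOffset + start, baseOffset + i - 1,
                                              states[start], counts[start]));
                start = i;
            }
        }
        return runs;
    }
}
```

Six offsets with states 2, 2, 2, 0, 4, 4 (and matching counts within each group) collapse into three entries. A whole-batch acknowledgement of thousands of offsets would likewise be a single entry.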

Mechanics: the acquire algorithm

SharePartition.acquire (SharePartition.java:755) runs under the write lock and works at batch granularity: it always acquires whole fetched batches, leaving the client to report intra-batch gaps. The high-level steps:

  1. Bail out if the partition is not ACTIVE, if maxFetchRecords ≤ 0, or if the fetched data is entirely below startOffset (SharePartition.java:766,780).
  2. Compute the cap via lastOffsetAndMaxRecordsToAcquire; this enforces the in-flight limit (see below).
  3. maybeArchiveStaleBatches(fetchOffset, baseOffset): for compacted topics, any cached batch between the fetch offset and the first returned base offset has vanished from the log and is archived so the SPSO can advance (SharePartition.java:803; rationale at :733).
  4. Find the overlapping region with cachedState.floorEntry/subMap. If there is no overlap, acquireNewBatchRecords leases the fresh batch (SharePartition.java:858).
  5. If there is overlap, iterate the sub-map: batches/offsets currently AVAILABLE are re-leased (delivery count incremented), and gaps inside the persister read-gap window are filled with new acquisitions (SharePartition.java:874).
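The overlap handling in steps 4 and 5 can be sketched with a TreeMap keyed by first offset. This is a simplified model under stated assumptions: AcquireSketch is hypothetical, every cached batch carries a single whole-batch state, and the in-flight cap, stale-batch archiving and persister read-gap filling are left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

/** Hypothetical model of the overlap logic in SharePartition.acquire (whole-batch states only). */
final class AcquireSketch {
    enum St { AVAILABLE, ACQUIRED, ACKNOWLEDGED, ARCHIVED }

    static final class Batch {
        final long first, last;
        St state;
        int deliveryCount;
        Batch(long first, long last, St state, int deliveryCount) {
            this.first = first; this.last = last; this.state = state; this.deliveryCount = deliveryCount;
        }
    }

    final NavigableMap<Long, Batch> cachedState = new TreeMap<>(); // keyed by first offset
    long endOffset = -1;                                           // highest cached offset; -1 = empty

    /** Leases what it can in [fetchFirst, fetchLast] and returns the leased [first, last] ranges. */
    List<long[]> acquire(long fetchFirst, long fetchLast) {
        if (fetchFirst > fetchLast) throw new IllegalArgumentException("empty fetch range");
        List<long[]> acquired = new ArrayList<>();
        // A batch starting before fetchFirst may still overlap it, so start from the floor key.
        Long floor = cachedState.floorKey(fetchFirst);
        long from = floor != null ? floor : fetchFirst;
        for (Batch b : cachedState.subMap(from, true, fetchLast, true).values()) {
            if (b.last < fetchFirst) continue;      // floor batch ends before the fetch range
            if (b.state == St.AVAILABLE) {          // re-lease and bump the delivery count
                b.state = St.ACQUIRED;
                b.deliveryCount++;
                acquired.add(new long[] {b.first, b.last});
            }
        }
        // Offsets beyond the cached region are new: lease them as a fresh batch.
        long newFirst = Math.max(fetchFirst, endOffset + 1);
        if (newFirst <= fetchLast) {
            cachedState.put(newFirst, new Batch(newFirst, fetchLast, St.ACQUIRED, 1));
            endOffset = fetchLast;
            acquired.add(new long[] {newFirst, fetchLast});
        }
        return acquired;
    }
}
```

Acquiring 0..9, releasing it, and then fetching 0..14 re-leases batch 0 with delivery count 2 and leases 10..14 as a new batch. Repeating the fetch leases nothing, because everything in range is already ACQUIRED.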

acquireNewBatchRecords (SharePartition.java:1767) decides the acquired range, splits it on BatchSize in BATCH_OPTIMIZED mode (createBatches:1838), inserts the new InFlightBatch (state ACQUIRED, delivery count 1) into cachedState, advances endOffset, schedules the acquisition-lock timer, and returns ShareAcquiredRecords with the count of offsets leased.
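The BatchSize split performed by createBatches can be approximated by chopping the acquired range into consecutive chunks. BatchSplitter is a hypothetical helper. The real createBatches may also take record-batch boundaries into account, which this sketch ignores.

```java
import java.util.ArrayList;
import java.util.List;

final class BatchSplitter {
    /** Splits [first, last] into consecutive [start, end] ranges of at most batchSize offsets. */
    static List<long[]> split(long first, long last, int batchSize) {
        if (batchSize <= 0) throw new IllegalArgumentException("batchSize must be positive");
        List<long[]> out = new ArrayList<>();
        for (long start = first; start <= last; start += batchSize) {
            long end = Math.min(last, start + batchSize - 1); // final chunk may be shorter
            out.add(new long[] {start, end});
        }
        return out;
    }
}
```

For example, split(100, 124, 10) yields 100..109, 110..119 and 120..124.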

Note

Two acquire modes (client config share.acquire.mode, default batch_optimized, clients/.../ShareAcquireMode.java:26): BATCH_OPTIMIZED may split into BatchSize-sized AcquiredRecords for efficient acks but can over-deliver whole batches; RECORD_LIMIT honours MaxRecords precisely. Either way the InFlightBatch in the cache reflects whole batches.

The next-fetch-offset computation

nextFetchOffset (SharePartition.java:608) returns where the leader should read next. The fast path returns endOffset (or endOffset+1) when nothing is re-deliverable. When findNextFetchOffset has been set (by a RELEASE, a lock timeout, or a session close), it scans cachedState for the first AVAILABLE offset that has no in-progress transition, also accounting for gaps recorded during the initial persister read. If no available record is found, the flag is cleared and the offset reverts to endOffset+1.
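A sketch of the dirty-flag logic, assuming a per-offset view of the cache and taking endOffset+1 as the fast-path result. NextFetchOffsetSketch is hypothetical and ignores in-progress transitions and read gaps.

```java
import java.util.NavigableMap;
import java.util.TreeMap;

/** Hypothetical model of nextFetchOffset over a per-offset view of cachedState. */
final class NextFetchOffsetSketch {
    enum St { AVAILABLE, ACQUIRED, ACKNOWLEDGED, ARCHIVED }

    final NavigableMap<Long, St> offsetStates = new TreeMap<>();
    final long endOffset;
    boolean findNextFetchOffset;     // dirty flag set by RELEASE, lock timeout or session close

    NextFetchOffsetSketch(long endOffset) { this.endOffset = endOffset; }

    long nextFetchOffset() {
        if (!findNextFetchOffset) {
            return endOffset + 1;                       // fast path: nothing re-deliverable
        }
        for (var e : offsetStates.entrySet()) {         // ascending offset order
            if (e.getValue() == St.AVAILABLE) {
                return e.getKey();                      // lowest re-deliverable offset wins
            }
        }
        findNextFetchOffset = false;                    // nothing AVAILABLE: clear the flag
        return endOffset + 1;
    }
}
```

Populated like the example cache earlier in this chapter (100..119 acknowledged, 131 AVAILABLE, the rest acquired), nextFetchOffset() returns 131. Once 131 is acquired again, it returns 140 and clears the flag.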

Mechanics: acknowledgement & the persist protocol

acknowledge (SharePartition.java:1016) takes a list of ShareAcknowledgementBatch. Under the write lock it resolves each batch to its sub-map of cachedState, validates ownership/type, and for each affected batch/offset calls startStateTransition, which both attempts the new state and records a rollbackState snapshot (InFlightState.java:203). The actual durable write and final commit happen in rollbackOrProcessStateUpdates (SharePartition.java:2528):

  1. If local validation already failed, every started transition is rolled back via completeStateTransition(false) and the future fails (SharePartition.java:2540).
  2. Otherwise writeShareGroupState(stateBatches) is called, an RPC to the share coordinator (SharePartition.java:2858).
  3. On RPC success, every transition is committed with completeStateTransition(true), the SPSO is advanced (maybeUpdateCachedStateAndOffsets), and any delayed share-fetch waiting on this partition is notified (SharePartition.java:2597,2608).
  4. On RPC failure, transitions roll back to ACQUIRED, unless the acquisition lock expired in the meantime, in which case the rollback target is AVAILABLE/ARCHIVED (InFlightState.completeStateTransition:219; SharePartition.java:2578).
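The two-phase commit around the persister write can be sketched for a single record. TransitionSketch is hypothetical and simplified: the write future stands in for writeShareGroupState, and when the lease has lapsed it always rolls back to AVAILABLE. The real completeStateTransition may choose ARCHIVED instead if the delivery count is exhausted.

```java
import java.util.concurrent.CompletableFuture;

enum AckState { AVAILABLE, ACQUIRED, ACKNOWLEDGED, ARCHIVED }

/** Hypothetical single-record model of startStateTransition / completeStateTransition. */
final class TransitionSketch {
    AckState state = AckState.ACQUIRED;
    AckState rollbackState;            // non-null while a started transition awaits the persister
    volatile boolean lockExpired;      // set if the acquisition lock lapses in the meantime

    boolean hasOngoingStateTransition() { return rollbackState != null; }

    /** Phase 1: tentatively apply the new state and remember where to roll back to. */
    void startStateTransition(AckState next) {
        if (hasOngoingStateTransition()) throw new IllegalStateException("transition in progress");
        rollbackState = state;
        state = next;
    }

    /** Phase 2: keep the new state if persisted; otherwise restore it (or release if the lease lapsed). */
    void completeStateTransition(boolean persisted) {
        if (!persisted) {
            state = lockExpired ? AckState.AVAILABLE : rollbackState;
        }
        rollbackState = null;
    }

    /** Runs both phases around an asynchronous durable write. */
    CompletableFuture<Void> acknowledge(AckState next, CompletableFuture<Void> write) {
        startStateTransition(next);
        return write.whenComplete((ignored, err) -> completeStateTransition(err == null));
    }
}
```

Until the write future completes, the record reports an ongoing transition, so a second acknowledgement is refused rather than applied on top of an uncommitted state.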
Invariant: persist before commit

An acknowledgement is reflected in the in-memory cache as committed only after the share coordinator has durably persisted the new state. A coordinator write failure leaves the record back in ACQUIRED (still leased) or, if the lease lapsed concurrently, AVAILABLE. The cache and the durable log never silently diverge on the happy path.

Advancing the SPSO

maybeUpdateCachedStateAndOffsets (SharePartition.java:2643) is the garbage collector. After checking canMoveStartOffset (the offset at SPSO is terminal and has no in-progress transition:2728), it calls findLastOffsetAcknowledgedAndMetadata to find the highest contiguous terminal offset (:2794). If all cached records are terminal, the cache is cleared and startOffset=endOffset=lastCachedOffset+1. Otherwise only full terminal batches below the boundary are removed; a batch partially acknowledged keeps its entry while startOffset still advances into it. The terminal-record tally is decremented from deliveryCompleteCount as records leave the cache.

Gotcha

SPSO only advances over a contiguous prefix of terminal records. A single un-acknowledged record (e.g. acquired by a stuck consumer, or sitting in a compaction/initialization gap) at the head pins the SPSO and prevents the log from aging out beneath it, even though thousands of later records may be acknowledged. This is the share-group analogue of consumer-group "stuck lag".
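The contiguous-prefix rule in maybeUpdateCachedStateAndOffsets, and the pinning effect described in the gotcha above, can be sketched with a per-offset map. SpsoSketch is hypothetical: it drops terminal offsets one at a time rather than whole batches, and it does not track deliveryCompleteCount.

```java
import java.util.NavigableMap;
import java.util.TreeMap;

/** Hypothetical model of SPSO advancement over a per-offset cache. */
final class SpsoSketch {
    enum St { AVAILABLE, ACQUIRED, ACKNOWLEDGED, ARCHIVED }

    final NavigableMap<Long, St> cache = new TreeMap<>(); // keys at or above startOffset
    long startOffset;

    SpsoSketch(long startOffset) { this.startOffset = startOffset; }

    static boolean terminal(St s) { return s == St.ACKNOWLEDGED || s == St.ARCHIVED; }

    /** Advances startOffset over the contiguous terminal prefix and evicts it from the cache. */
    long maybeAdvance() {
        while (!cache.isEmpty()) {
            var head = cache.firstEntry();
            // Stop at a gap or at the first non-terminal record: it pins the SPSO.
            if (head.getKey() != startOffset || !terminal(head.getValue())) break;
            cache.pollFirstEntry();
            startOffset++;
        }
        return startOffset;
    }
}
```

A single ACQUIRED offset at 103 stops startOffset at 103 even though 104..109 are acknowledged. Once that offset is archived, the whole prefix drains and startOffset moves to 110.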

Mechanics: the acquisition lock & timeout

When records are acquired, scheduleAcquisitionLockTimeout (SharePartition.java:2941) adds an AcquisitionLockTimerTask to the timer with delay = the group's record-lock duration (dynamic share.record.lock.duration.ms, else the broker default; resolved in configProvider.recordLockDurationMsOrDefault:2929). The task stores its absolute expirationMs and the (memberId, firstOffset, lastOffset) it covers.

On expiry (AcquisitionLockTimerTask.run, AcquisitionLockTimerTask.java:68) the task first sets a volatile hasExpired=true, so that a racing acknowledgement can detect that the lease has already lapsed, and then calls releaseAcquisitionLockOnTimeout (SharePartition.java:2961). Under the write lock, that handler walks the affected sub-map, transitions each still-ACQUIRED offset back to AVAILABLE (or to ARCHIVING/ARCHIVED if the delivery count is exhausted), and persists the change. It then advances the SPSO and, for any ARCHIVING batches, kicks off the DLQ flow.
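The ordering that matters in AcquisitionLockTimerTask.run (publish hasExpired, then invoke the handler) can be shown with a plain Runnable. LockTimerTaskSketch is a hypothetical stand-in, not the Kafka TimerTask subclass. The handler is a Consumer so the example runs without a timer wheel.

```java
import java.util.function.Consumer;

/** Hypothetical stand-in for AcquisitionLockTimerTask: flags expiry before running the handler. */
final class LockTimerTaskSketch implements Runnable {
    final String memberId;
    final long firstOffset, lastOffset, expirationMs;
    private volatile boolean hasExpired;
    private volatile boolean cancelled;
    private final Consumer<LockTimerTaskSketch> onTimeout;

    LockTimerTaskSketch(String memberId, long firstOffset, long lastOffset,
                        long nowMs, long lockDurationMs, Consumer<LockTimerTaskSketch> onTimeout) {
        this.memberId = memberId;
        this.firstOffset = firstOffset;
        this.lastOffset = lastOffset;
        this.expirationMs = nowMs + lockDurationMs;   // absolute deadline
        this.onTimeout = onTimeout;
    }

    boolean hasExpired() { return hasExpired; }
    boolean isCancelled() { return cancelled; }
    void cancel() { cancelled = true; }               // an acknowledgement arrived first

    @Override
    public void run() {
        if (cancelled) return;
        hasExpired = true;          // publish expiry first so a racing ack can observe it
        onTimeout.accept(this);     // stands in for releaseAcquisitionLockOnTimeout
    }
}
```

A task created at t=1000 with a 30 s lock expires at 31000. Cancelling it before it fires means the handler never runs and hasExpired stays false.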

Figure: lifecycle of one acquisition lease. Acquire ⇒ ACQUIRED (member M, TimerTask set for t + lockMs). An acknowledgement before the lease expires cancels the timer task and persists the chosen state. Otherwise the timer fires and marks the lease expired (hasExpired=true). Under the write lock, it either returns the record to AVAILABLE (count below max) or archives it, directly (DLQ off) or via the ARCHIVING → DLQ → ARCHIVED flow (DLQ on). It then persists via writeShareGroupState, sets findNextFetchOffset=true and advances the SPSO.
Caution

Because the lock timeout returns records to AVAILABLE for any member, a consumer that processes a record but is too slow to acknowledge before the lease lapses can cause a duplicate delivery to another consumer. Share groups are at-least-once: applications must tolerate redelivery (idempotent processing), and the lock duration should comfortably exceed worst-case per-record processing time.

Concurrency & threading

| Thread / actor | Touches | Synchronization |
| --- | --- | --- |
| Request-handler (KafkaApis) threads calling acquire/acknowledge | cachedState, offsets, delivery count | SharePartition.lock write lock; per-partition fetchLock serializes concurrent fetches |
| Timer (hashed-wheel) thread firing lock timeouts | Same cache; transitions ACQUIRED→AVAILABLE/ARCHIVED | Re-checks timerTask.isCancelled() / hasExpired under the write lock |
| Persister callback threads (RPC completion) | Commit/rollback of started transitions, SPSO advance | lock write lock inside whenComplete; commit gated on rollbackState |
| Share coordinator runtime threads | Coordinator soft state (shareStateMap) | Single-writer per partition via CoordinatorRuntime event loop (share.coordinator.threads, default 1) |
| ShareGroupDLQSendThread (one per broker DLQ manager) | DLQ produce/create-topic RPC batching | InterBrokerSendThread; nodeMapLock guards the coalescing map |
| Client application thread (poll) | currentFetch, acknowledgements | ShareConsumerImpl single-threaded access guard (acquireAndEnsureOpen) |
| Client background/network thread | Sends ShareFetch/ShareAcknowledge, heartbeats | Event queue between app and network threads (ShareConsumeRequestManager) |

The two AtomicReference/AtomicInteger fields earn their keep: fetchLock stops the same share-partition entering the delayed-fetch purgatory twice concurrently, and deliveryCompleteCount is read for lag without taking the big lock. The rollbackState field plus the hasOngoingStateTransition() guard form a lightweight per-record mutex: a second acknowledgement of a record whose first transition has not yet committed is rejected (InFlightState.java:153) rather than corrupting state.

The share coordinator & persister path

Share-partition leaders never write __share_group_state directly. They go through the persister SPI (Persister interface), whose production implementation is DefaultStatePersister (server-common/.../persister/DefaultStatePersister.java:46, configured by group.share.persister.class.name). It delegates to PersisterStateManager, an inter-broker RPC client that (a) finds the share coordinator for the key via a FindCoordinator-style lookup, then (b) issues WriteShareGroupState/ReadShareGroupState/InitializeShareGroupState/DeleteShareGroupState RPCs, batching by destination node and RPC type (PersisterStateManager.java:90,110). A NoOpStatePersister exists for tests/disabled state.

The owning coordinator is chosen by hashing the share-partition key onto the state topic: ShareCoordinatorService.partitionFor returns Utils.abs(key.asCoordinatorKey().hashCode()) % numPartitions (ShareCoordinatorService.java:284), and the request lands on the leader of that __share_group_state partition. The state topic defaults to 50 partitions, replication factor 3, min ISR 2 (ShareCoordinatorConfig.java:38,42,47).
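The coordinator-partition choice can be sketched as follows. The sketch assumes that Utils.abs maps Integer.MIN_VALUE to 0, so the modulus is never negative. It also assumes a hypothetical "groupId:topicId:partition" coordinator key; the real asCoordinatorKey() format may differ, and CoordinatorPartitionSketch is not a Kafka class.

```java
/** Hypothetical model of ShareCoordinatorService.partitionFor. */
final class CoordinatorPartitionSketch {
    /** Like Kafka's Utils.abs: Integer.MIN_VALUE maps to 0 instead of staying negative. */
    static int abs(int n) {
        return n == Integer.MIN_VALUE ? 0 : Math.abs(n);
    }

    /** Hypothetical key format; the real SharePartitionKey string may differ. */
    static String coordinatorKey(String groupId, String topicId, int partition) {
        return groupId + ":" + topicId + ":" + partition;
    }

    /** Index of the __share_group_state partition whose leader owns this key. */
    static int partitionFor(String coordinatorKey, int numStatePartitions) {
        return abs(coordinatorKey.hashCode()) % numStatePartitions;
    }
}
```

Because the result depends on the partition count, changing share.coordinator.state.topic.num.partitions after deployment would move keys to different coordinators. That is consistent with the setting being treated as fixed once deployed.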

Snapshot/update compaction inside the coordinator

ShareCoordinatorShard.writeState (ShareCoordinatorShard.java:316) handles a single key per call. generateShareStateRecord (:648) chooses the record type: if the number of update records written since the last snapshot has reached share.coordinator.snapshot.update.records.per.snapshot (default 500, ShareCoordinatorConfig.java:58) it writes a fresh ShareSnapshotValue with snapshotEpoch+1, merging the accumulated batches; otherwise it writes a lightweight ShareUpdateValue delta. On replay, handleShareSnapshot replaces the soft state and handleShareUpdate merges the delta and bumps the per-key update counter (ShareCoordinatorShard.java:245,272).
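The snapshot-versus-update choice reduces to a per-key counter. Below is a minimal sketch that assumes the key already has a snapshot (initialization is out of scope). SnapshotPolicySketch is hypothetical.

```java
/** Hypothetical per-key model of the record-type choice in generateShareStateRecord. */
final class SnapshotPolicySketch {
    final int updatesPerSnapshot;   // plays the role of share.coordinator.snapshot.update.records.per.snapshot
    int snapshotEpoch = 0;
    int updatesSinceSnapshot = 0;

    SnapshotPolicySketch(int updatesPerSnapshot) { this.updatesPerSnapshot = updatesPerSnapshot; }

    /** Returns "snapshot" or "update" for the next write of this key. */
    String nextRecordType() {
        if (updatesSinceSnapshot >= updatesPerSnapshot) {
            snapshotEpoch++;            // full ShareSnapshotValue with merged state
            updatesSinceSnapshot = 0;
            return "snapshot";
        }
        updatesSinceSnapshot++;         // lightweight ShareUpdateValue delta
        return "update";
    }
}
```

With the limit set to 3, five writes produce update, update, update, snapshot, update. A limit of 0 makes every write a snapshot.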

Overlapping/adjacent state batches are normalized by PersisterStateBatchCombiner before persistence (ShareCoordinatorShard.java:729), keeping the run-length encoding minimal; without it, repeated partial acks would fragment the batch list unboundedly. Two background mechanisms keep the log healthy: cold-partition force-snapshotting (share.coordinator.cold.partition.snapshot.interval.ms, default 5 min) and state-topic pruning of records below the SPSO (share.coordinator.state.topic.prune.interval.ms, default 5 min).

Figure: the durable-state write path. The flow runs share consumer → SharePartition (leader broker, in-flight state engine) → DefaultStatePersister / PersisterStateManager (leader broker) → ShareCoordinatorShard.writeState (leader of __share_group_state[partitionFor(key)]) → __share_group_state log. A share-partition leader never writes the state topic itself: it calls the persister SPI, which finds the owning ShareCoordinator and issues WriteShareGroupState. The coordinator then appends either a full ShareSnapshotValue (every Nth write, snapshotEpoch+1) or a lightweight ShareUpdateValue delta.
Invariant: recovery

On (re)assignment the leader rebuilds SharePartition by calling persister.readState in maybeInitialize (SharePartition.java:426), seeding startOffset, stateEpoch and the in-flight batches from the latest snapshot plus replayed updates. Records that were merely ACQUIRED (leased, not acknowledged) are not persisted as such; after a leader change they reappear as AVAILABLE and may be redelivered. Acknowledged/archived terminal states survive.

Dead-letter queue (KIP-1191)

When DLQ support is enabled (the broker's share.version feature reaches SV_2) and the group has a DLQ topic configured (isDLQEnabledForGroup, SharePartition.java:3356), records that would otherwise be archived for exceeding the delivery limit instead pass through ARCHIVING and are produced to a DLQ topic. The group config keys are errors.deadletterqueue.topic.name (default empty = no DLQ) and errors.deadletterqueue.copy.record.enable (default false) (GroupConfig.java:113,117).

ShareGroupDLQStateManager (server/.../dlq/ShareGroupDLQStateManager.java:88) owns the write path on its own InterBrokerSendThread. For each ShareGroupDLQRecordParameter (offset range, group, delivery count, cause) it: validates/creates the DLQ topic (a CreateTopics RPC if absent and auto-create is enabled), then produces records to partition % numDlqPartitions with acks=-1. It coalesces multiple pending produce handlers destined for the same node into one ProduceRequest (coalesceProduceRequests:922). Each DLQ record gets headers describing the original location: __dlq.errors.topic, ...partition, ...offset, ...group, ...delivery.count, ...message (ShareGroupDLQStateManager.java:242). When copy-record is enabled the original key/value are fetched from the source log (maybeFetchRecordData:690) and copied; otherwise only the context headers are written. Only after the DLQ produce succeeds does the source record move ARCHIVING → ARCHIVED; the DLQ manager retries with exponential backoff up to MAX_REQUEST_ATTEMPTS = 5 (:98).
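The DLQ partition mapping and the bounded retry can be sketched as below. DlqSendSketch is hypothetical. MAX_REQUEST_ATTEMPTS = 5 comes from the text, but the backoff base, cap and doubling schedule are assumptions. The sleeper is injected so the example runs without real delays.

```java
import java.util.function.IntPredicate;
import java.util.function.LongConsumer;

/** Hypothetical model of DLQ partition selection and retry in ShareGroupDLQStateManager. */
final class DlqSendSketch {
    static final int MAX_REQUEST_ATTEMPTS = 5;   // value quoted in the text

    /** Source partition p is written to DLQ partition p % numDlqPartitions. */
    static int dlqPartition(int sourcePartition, int numDlqPartitions) {
        return sourcePartition % numDlqPartitions;
    }

    /** Assumed schedule: baseMs doubled per failed attempt, capped at maxMs. */
    static long backoffMs(int attempt, long baseMs, long maxMs) {
        return Math.min(maxMs, baseMs << (attempt - 1));
    }

    /** Returns the attempt number that succeeded, or -1 after MAX_REQUEST_ATTEMPTS failures. */
    static int sendWithRetry(IntPredicate send, LongConsumer sleeper, long baseMs, long maxMs) {
        for (int attempt = 1; attempt <= MAX_REQUEST_ATTEMPTS; attempt++) {
            if (send.test(attempt)) return attempt;   // DLQ produce acknowledged (acks=-1)
            if (attempt < MAX_REQUEST_ATTEMPTS) sleeper.accept(backoffMs(attempt, baseMs, maxMs));
        }
        return -1;   // what happens to the source records after giving up is not modelled here
    }
}
```

A success on the third attempt sleeps 100 ms and then 200 ms with a 100 ms base. Only that success would let the source records move from ARCHIVING to ARCHIVED.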

Gotcha

The two-phase ARCHIVING → ARCHIVED design (KIP-1191) is resumable: a leader that restarts mid-DLQ reads back batches in ARCHIVING state and restarts phase 2 (SharePartition.java:457, the dlqBatches handling in maybeInitialize). This avoids losing a record that was destined for the DLQ but not yet produced, at the cost of a possible duplicate DLQ write after a crash.

The share consumer client

ShareConsumerImpl.poll (clients/.../ShareConsumerImpl.java:597) drives the cycle on the application thread: process background errors, handle completed acknowledgements, then, depending on mode, either implicitly acknowledge the previously returned batch or assert all in-flight records were explicitly acknowledged, before fetching the next batch. Acknowledgements are not a separate user step in the default mode; they piggyback on the next fetch.

The acknowledgement mode is client config share.acknowledgement.mode with values implicit (default behaviour: poll auto-acks the prior batch as ACCEPT) or explicit (the app must call acknowledge(record, type) for every record before the next poll) (ShareAcknowledgementMode.java:29; ShareConsumerImpl.java:796,804,817). In explicit mode, commitSync/commitAsync flush acknowledgements to the broker (ShareConsumerImpl.java:831,886).
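The difference between the two modes at poll time can be modelled without a broker. AckModeSketch is hypothetical and is not the KafkaShareConsumer API. It shows only that implicit mode ACCEPTs the previously returned records on the next poll, while explicit mode refuses to poll until every record has been acknowledged. Restricting acknowledge() to explicit mode is a simplification made for this sketch.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical model of implicit vs. explicit acknowledgement handling in poll(). */
final class AckModeSketch {
    enum Mode { IMPLICIT, EXPLICIT }
    enum AckType { ACCEPT, RELEASE, REJECT }

    final Mode mode;
    private final Map<Long, AckType> delivered = new LinkedHashMap<>(); // offset -> ack; null = none yet
    final List<String> pendingAcks = new ArrayList<>();                  // would ride on the next fetch

    AckModeSketch(Mode mode) { this.mode = mode; }

    void acknowledge(long offset, AckType type) {
        if (mode != Mode.EXPLICIT)
            throw new IllegalStateException("this sketch only allows acknowledge() in explicit mode");
        if (!delivered.containsKey(offset))
            throw new IllegalStateException("offset " + offset + " was not in the last batch");
        delivered.put(offset, type);
    }

    /** Settles the previously returned records, then hands out newOffsets. */
    void poll(long... newOffsets) {
        if (mode == Mode.EXPLICIT && delivered.containsValue(null))
            throw new IllegalStateException("every record must be acknowledged before the next poll");
        // In implicit mode nothing was acknowledged by the app, so everything is ACCEPTed.
        delivered.forEach((offset, type) ->
                pendingAcks.add(offset + ":" + (type == null ? AckType.ACCEPT : type)));
        delivered.clear();
        for (long o : newOffsets) delivered.put(o, null);
    }
}
```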

ShareConsumeRequestManager (ShareConsumeRequestManager.java:76) on the network thread keeps a per-node map fetchAcknowledgementsToSend and attaches pending acks to the next ShareFetch to that node, or sends a standalone ShareAcknowledge when there is no fetch to ride on (:181,1285). Share-group membership is a separate concern, carried over the SHARE_GROUP_HEARTBEAT RPC and handled by ShareMembershipManager/ShareHeartbeatRequestManager. There is no eager rebalance: members come and go and the assignment is computed server-side (the modern protocol of chapter 13).

Configuration reference

Broker/group-level (ShareGroupConfig, with per-group dynamic overrides in GroupConfig):

| Key | Default | Effect |
| --- | --- | --- |
| group.share.record.lock.duration.ms | 30000 | Acquisition-lock lease length; dynamic per-group override share.record.lock.duration.ms. |
| group.share.min/max.record.lock.duration.ms | 15000 / 60000 | Bounds on the per-group override. |
| group.share.delivery.count.limit | 5 | Max delivery attempts before archive/DLQ (range 2–10). |
| group.share.min/max.delivery.count.limit | 2 / 10 | Bounds on the per-group override (override max range 5–25). |
| group.share.partition.max.record.locks | 2000 | Max in-flight (locked) records per share-partition: the concurrency cap. |
| group.share.min/max.partition.max.record.locks | 100 / 4000 | Bounds on the per-group override. |
| group.share.max.share.sessions | 2000 | Max share sessions per broker. |
| group.share.persister.class.name | DefaultStatePersister | Persister SPI implementation (internal). |
| share.auto.offset.reset (group) | latest | Initial SPSO strategy when none is persisted. |
| share.isolation.level (group) | read_uncommitted | Whether to skip aborted transactional records when acquiring. |
| errors.deadletterqueue.topic.name (group) | "" (off) | DLQ topic; empty disables the DLQ. |
| errors.deadletterqueue.copy.record.enable (group) | false | Copy the original key/value to the DLQ vs. headers only. |

Share-coordinator level (ShareCoordinatorConfig):

| Key | Default | Effect |
| --- | --- | --- |
| share.coordinator.state.topic.num.partitions | 50 | Partitions of __share_group_state (fixed after deploy). |
| share.coordinator.state.topic.replication.factor / .min.isr | 3 / 2 | Durability of the state topic. |
| share.coordinator.snapshot.update.records.per.snapshot | 500 | Update records between full snapshots (range 0–500). |
| share.coordinator.threads | 1 | Coordinator runtime threads. |
| share.coordinator.write.timeout.ms | 5000 | Timeout for a state write. |
| share.coordinator.cold.partition.snapshot.interval.ms | 300000 | Force-snapshot idle share-partitions. |
| share.coordinator.state.topic.prune.interval.ms | 300000 | Prune state-topic records below the SPSO. |

Client (ConsumerConfig for the share consumer): share.acknowledgement.mode (default implicit), share.acquire.mode (default batch_optimized) (ConsumerConfig.java:375,386,391).

Failure modes, edge cases & recovery

  • Consumer crash / slow ack. The acquisition lock expires; records return to AVAILABLE and are redelivered (or archived/DLQ'd if exhausted). No manual intervention; this is the queue's liveness mechanism.
  • Coordinator write fails. The acknowledgement is not committed; the record stays ACQUIRED (or AVAILABLE if the lease lapsed). The persister surfaces coordinator errors as retryable (CoordinatorNotAvailableException, NotLeaderOrFollowerException for fenced epochs) via fetchPersisterError (SharePartition.java:2906).
  • Leader change. The new leader re-reads durable state; non-terminal leases evaporate to AVAILABLE; partially-acknowledged batches with gaps are reconstructed and may produce an initial read-gap window that the engine fills before advancing the SPSO (SharePartition.java:288,2691).
  • Compaction. Batches that vanish from the log are detected on the next acquire and archived so the SPSO is not pinned (maybeArchiveStaleBatches); intra-batch holes are reported by the client.
  • Fenced state epoch. A stale leader/persister write is rejected by the coordinator; the partition transitions toward FENCED and is re-initialized.
  • Duplicate acknowledgement. A second ack for an offset with an in-progress transition is dropped (hasOngoingStateTransition()), preventing double state changes from a misbehaving or retrying client.
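The lease-expiry, delivery-count and duplicate-ack behaviour above can be modeled as a per-offset state machine. This is a hypothetical, heavily simplified sketch; the LeaseModel class and its methods are invented. It ignores batches, persistence, the DLQ and real timers. It also rejects any acknowledgement for a record the caller does not hold in ACQUIRED state, which stands in for the broker's own duplicate and in-progress checks rather than copying them. deliveryCountLimit plays the role of delivery.count.limit.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical per-record model: no batches, persistence, DLQ or real timers.
// Names are illustrative; this is not Kafka's SharePartition.
final class LeaseModel {
    enum State { AVAILABLE, ACQUIRED, ACKNOWLEDGED, ARCHIVED }
    enum Ack { ACCEPT, RELEASE, REJECT }

    private static final class Entry {
        State state = State.AVAILABLE;
        int deliveryCount;
        String owner;      // member currently holding the lease
        long lockExpiryMs; // when the acquisition lock lapses
    }

    private final Map<Long, Entry> records = new HashMap<>();
    private final int deliveryCountLimit;
    private final long lockDurationMs;

    LeaseModel(int deliveryCountLimit, long lockDurationMs) {
        this.deliveryCountLimit = deliveryCountLimit;
        this.lockDurationMs = lockDurationMs;
    }

    /** Leases an AVAILABLE record to a member; each acquisition counts as one delivery. */
    boolean acquire(long offset, String memberId, long nowMs) {
        Entry e = records.computeIfAbsent(offset, o -> new Entry());
        if (e.state != State.AVAILABLE) return false;
        e.state = State.ACQUIRED;
        e.deliveryCount++;
        e.owner = memberId;
        e.lockExpiryMs = nowMs + lockDurationMs;
        return true;
    }

    /** Applies an ack from the lease holder; any other ack (wrong member, duplicate) is rejected. */
    boolean acknowledge(long offset, String memberId, Ack type) {
        Entry e = records.get(offset);
        if (e == null || e.state != State.ACQUIRED || !memberId.equals(e.owner)) return false;
        switch (type) {
            case ACCEPT -> e.state = State.ACKNOWLEDGED;
            case REJECT -> e.state = State.ARCHIVED;
            case RELEASE -> releaseOrArchive(e);
        }
        e.owner = null;
        return true;
    }

    /** Timer sweep: lapsed leases return to AVAILABLE, or ARCHIVED once the budget is spent. */
    void expireLocks(long nowMs) {
        for (Entry e : records.values()) {
            if (e.state == State.ACQUIRED && nowMs >= e.lockExpiryMs) {
                releaseOrArchive(e);
                e.owner = null;
            }
        }
    }

    private void releaseOrArchive(Entry e) {
        e.state = e.deliveryCount >= deliveryCountLimit ? State.ARCHIVED : State.AVAILABLE;
    }

    State state(long offset) {
        Entry e = records.get(offset);
        return e == null ? State.AVAILABLE : e.state;
    }

    int deliveryCount(long offset) {
        Entry e = records.get(offset);
        return e == null ? 0 : e.deliveryCount;
    }
}
```

In the model, a lapsed lease and a RELEASE take the same path, so both are bounded by the delivery budget. REJECT archives immediately regardless of the count.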

Invariants & guarantees

  • At-least-once. Every record below the SPSO has reached a terminal state; any record at or above it that is not yet terminal may be (re)delivered. Share-group consumption has no exactly-once mode (contrast with chapter 14).
  • No ordering guarantee across consumers. Cooperative consumption of one partition by many members means records are processed out of partition order; per-key ordering is not preserved.
  • Bounded in-flight. At most partition.max.record.locks records are simultaneously leased per share-partition.
  • Bounded redelivery. At most delivery.count.limit deliveries per record before archive/DLQ.
  • Durable terminal state. Acknowledged/archived decisions survive broker and leader restarts via __share_group_state.
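The SPSO invariant and the lease cap fit in a few lines. The following hypothetical sketch assumes one record per map entry, whereas the real SharePartition tracks batches. SpsoModel, tryAcquire and complete are invented names, and maxRecordLocks stands in for partition.max.record.locks. Offsets absent from the map count as AVAILABLE.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of SPSO advancement and the lease cap; names are illustrative.
final class SpsoModel {
    enum State { ACQUIRED, ACKNOWLEDGED, ARCHIVED } // absent from the map = AVAILABLE

    private final Map<Long, State> inFlight = new HashMap<>();
    private final int maxRecordLocks; // stands in for partition.max.record.locks
    private long spso;

    SpsoModel(long startOffset, int maxRecordLocks) {
        this.spso = startOffset;
        this.maxRecordLocks = maxRecordLocks;
    }

    long spso() { return spso; }

    /** Leases a free offset at or above the SPSO if the lease cap allows it. */
    boolean tryAcquire(long offset) {
        if (offset < spso || inFlight.containsKey(offset)) return false;
        long leased = inFlight.values().stream().filter(s -> s == State.ACQUIRED).count();
        if (leased >= maxRecordLocks) return false;
        inFlight.put(offset, State.ACQUIRED);
        return true;
    }

    /** Moves a leased offset to a terminal state, then advances the SPSO over the terminal prefix. */
    void complete(long offset, boolean accepted) {
        if (inFlight.get(offset) != State.ACQUIRED) return;
        inFlight.put(offset, accepted ? State.ACKNOWLEDGED : State.ARCHIVED);
        while (true) {
            State head = inFlight.get(spso);
            if (head != State.ACKNOWLEDGED && head != State.ARCHIVED) break;
            inFlight.remove(spso); // records below the SPSO need no in-memory state
            spso++;
        }
    }
}
```

Acknowledging offset 101 before 100 leaves the SPSO at 100. Once 100 becomes terminal (accepted or archived), the SPSO moves to 102 in one step, and the freed lease lets the next offset be acquired.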

Share groups vs. consumer groups vs. transactions

Dimension | Consumer group (ch 13/17) | Share group (this chapter) | Transactions / EOS (ch 14)
Partition → consumer | 1:1 (exclusive) | N:M (cooperative on same partition) | n/a (producer concern)
Progress unit | One committed offset / partition | Per-record state + delivery count | Producer txn markers + offsets
Delivery semantics | At-least / at-most-once (commit timing) | At-least-once with redelivery | Exactly-once
Redelivery control | Manual rewind of offset | Lock timeout + RELEASE + max attempts + DLQ | Abort / retry whole txn
Durable state topic | __consumer_offsets | __share_group_state | __transaction_state
Broker memory model | Offset map | In-flight record state cache (SharePartition) | Producer/txn state

Design rationale & evolution

Design rationale

Share groups (KIP-932, "Queues for Kafka") add point-to-point queue semantics without abandoning Kafka's log: records stay in the normal topic log, and only lightweight per-record delivery state lives in a compacted internal topic. Keeping the in-flight state on the partition leader (not in the client) is what allows more consumers than partitions and centralized, fenced redelivery. KIP-1226 added DeliveryCompleteCount to the persisted state so operators can compute share-partition lag. KIP-1191 introduced the transient ARCHIVING state and the dead-letter-queue flow so that records exhausting their delivery budget can be inspected rather than silently dropped.

The feature shipped as a preview in 4.1 and matured through 4.2; this 4.4-SNAPSHOT tree contains the full set including DLQ, lag, and the batch-vs-record acquire modes.

Gotchas & operational notes

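  • Lock duration vs. processing time. Set share.record.lock.duration.ms above the p99 time between acquiring records and acknowledging them. In implicit mode that covers the whole polled batch, because the batch is acknowledged only when the application polls again (or commits). Too short a lock causes spurious redelivery; too long a lock delays recovery from a dead consumer.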
  • SPSO pinning. Monitor share-partition lag (KIP-1226): a non-advancing SPSO indicates a head-of-line record stuck in repeated redelivery, often a poison record that should land in the DLQ.
  • State-topic sizing. The __share_group_state topic's partition count is effectively fixed; size it for the number of share-partitions and ack throughput, since every ack flush is a coordinator write.
  • DLQ topic must not start with __ and, if not auto-created, must exist with DLQ enabled, otherwise the DLQ flow fails validation (ShareGroupDLQStateManager.java:417,424).
  • Implicit vs. explicit acks. In the default implicit mode, simply re-polling acknowledges the prior batch as ACCEPT; to RELEASE/REJECT individual records you must switch to explicit mode and call acknowledge(...).
  • Not exactly-once. Do not treat share groups as a drop-in for transactional consume-process-produce; design consumers to be idempotent.
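The lock-duration rule of thumb can be checked against measured processing times. The following minimal operator-side sketch assumes you collect, in milliseconds, the time from a poll returning records to their acknowledgement. LockDurationCheck is a hypothetical helper, not a Kafka API, and it uses the nearest-rank definition of a percentile.

```java
import java.util.Arrays;

// Hypothetical operator-side helper (not part of Kafka) for the lock-duration rule of thumb.
final class LockDurationCheck {
    private LockDurationCheck() {}

    /** Nearest-rank percentile: the smallest sample with at least pct% of samples <= it. */
    static long percentile(long[] samplesMs, int pct) {
        if (samplesMs.length == 0 || pct < 1 || pct > 100) {
            throw new IllegalArgumentException("need samples and 1 <= pct <= 100");
        }
        long[] sorted = samplesMs.clone();
        Arrays.sort(sorted);
        int rank = (pct * sorted.length + 99) / 100; // ceil(pct * n / 100), 1-based
        return sorted[rank - 1];
    }

    /** True if the configured share.record.lock.duration.ms exceeds the measured p99. */
    static boolean lockCoversP99(long[] processingMs, long lockDurationMs) {
        return lockDurationMs > percentile(processingMs, 99);
    }
}
```

Nearest-rank avoids interpolation, so the reported p99 is always an observed sample. How much headroom to add above it remains a judgment call.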

krivaltsevich.com · Part of Apache Kafka Internals · derived from Apache Kafka 4.4 source · GitHub · MIT-licensed.

Apache Kafka® is a registered trademark of the Apache Software Foundation. This is an independent, unofficial guide, not affiliated with or endorsed by the ASF.