13 · Group Coordination & Rebalance Protocols
Source: Apache Kafka 4.4.0-SNAPSHOT (git 04bfe7d, 2026-06-15), KRaft mode. Derived from source code, not copied from official documentation.
The group coordinator is the broker-resident subsystem that decides who consumes what and remembers how far each consumer has read. In modern Kafka it is a pure-Java replicated state machine: each of the __consumer_offsets partitions hosts one coordinator shard, group and offset mutations are written as records to that partition's log and replayed to rebuild in-memory state, and a request's response is withheld until the record that produced it has been replicated past the partition high watermark. This chapter dissects the runtime (CoordinatorRuntime), the shard (GroupCoordinatorShard) and its two managers (GroupMetadataManager, OffsetMetadataManager), and then the two membership protocols they implement: the legacy classic protocol (JoinGroup/SyncGroup, generations, client-side assignors, eager vs cooperative rebalancing) and the modern consumer group protocol of KIP-848 (incremental server-side heartbeat reconciliation driven by epochs).
Role & responsibilities
The group coordinator owns four intertwined concerns, all keyed by a groupId that is hashed to exactly one __consumer_offsets partition (its coordinator):
- Membership: tracking which consumers belong to a group, detecting their liveness via heartbeats and session timeouts, and admitting/fencing members.
- Assignment: deciding which topic-partitions each member should own and driving the group from one assignment to the next without two members ever holding the same partition simultaneously.
- Committed offsets: persisting and serving the committed offset (the next offset to consume) per (group, topic, partition), including transactional offset commits (see Transactions & Exactly-Once Semantics), and expiring them under a retention policy.
- Group lifecycle: creating, listing, describing and deleting groups, and migrating a group between the classic and consumer protocols.
The same machinery also backs Share Groups (KIP-932) and Streams groups (KIP-1071); this chapter focuses on consumer groups and references the others where the shared code makes that natural.
There is no separate coordinator database. The coordinator's durable state is the __consumer_offsets log. Every mutation is a record; in-memory state is a deterministic replay of those records; failover is "load the partition and replay". The runtime guarantees a response is only released once its records are committed (replicated), so a client that sees success can rely on it surviving coordinator failover.
Where it lives in the code
| Concern | Principal class | File |
|---|---|---|
| Front-door service (one per broker) | GroupCoordinatorService | group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java |
| Replicated state machine (one per partition) | GroupCoordinatorShard | .../group/GroupCoordinatorShard.java |
| Group state & membership | GroupMetadataManager | .../group/GroupMetadataManager.java (9,437 lines) |
| Committed offsets | OffsetMetadataManager | .../group/OffsetMetadataManager.java |
| Record (de)serialization helpers | GroupCoordinatorRecordHelpers, GroupCoordinatorRecordSerde | .../group/GroupCoordinatorRecordHelpers.java |
| Classic group + state enum | ClassicGroup, ClassicGroupState | .../group/classic/ClassicGroup.java |
| Consumer group + member | ConsumerGroup, ConsumerGroupMember | .../group/modern/consumer/ConsumerGroup.java |
| KIP-848 reconciliation engine | CurrentAssignmentBuilder | .../group/modern/consumer/CurrentAssignmentBuilder.java |
| Target assignment computation | TargetAssignmentBuilder | .../group/modern/TargetAssignmentBuilder.java |
| Server-side assignors | UniformAssignor, RangeAssignor | .../group/assignor/ |
| Generic coordinator runtime | CoordinatorRuntime | coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java (2,540 lines) |
| Config keys & defaults | GroupCoordinatorConfig | .../group/GroupCoordinatorConfig.java |
Core concepts & terminology
- Coordinator (for a group): The broker that leads the __consumer_offsets partition p = abs(groupId.hashCode()) % numPartitions. Computed in GroupCoordinatorService.partitionFor (GroupCoordinatorService.java:424).
- Shard: The in-memory state machine for one such partition: a GroupCoordinatorShard wrapping a GroupMetadataManager + OffsetMetadataManager, hosted by a CoordinatorRuntime.CoordinatorContext.
- Record / replay: A key/value pair (a CoordinatorRecord) written to the log. The same replay path applies records during request handling and during initial loading (GroupCoordinatorShard.replay, GroupCoordinatorShard.java:1163).
- Generation (classic): A monotonically increasing int per group; bumped on every rebalance. Stale generations are rejected with ILLEGAL_GENERATION.
- Group epoch / target-assignment epoch / member epoch (KIP-848): Three integers that replace the classic generation. The group epoch advances when membership or subscription changes; the target-assignment epoch advances when a new assignment is computed; each member's epoch tracks how far it has reconciled.
- Hard vs soft state: Hard state is what the replay methods build (durable, snapshot-versioned). Soft state is request-handling scratch. The shard javadoc draws this distinction explicitly (GroupCoordinatorShard.java:145).
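The hashing rule in the Coordinator entry can be illustrated with a few lines of Java. This is a minimal sketch, not the code of GroupCoordinatorService.partitionFor: the class name PartitionForSketch, the helper safeAbs and the partition count of 50 are assumptions made for the example.

```java
// Sketch of the hashing rule; class and helper names are illustrative only.
final class PartitionForSketch {
    // Math.abs(Integer.MIN_VALUE) is still negative, so that single value is
    // mapped to 0 here (an assumption of this sketch).
    static int safeAbs(int n) {
        return n == Integer.MIN_VALUE ? 0 : Math.abs(n);
    }

    // p = abs(groupId.hashCode()) % numPartitions
    static int partitionFor(String groupId, int numPartitions) {
        return safeAbs(groupId.hashCode()) % numPartitions;
    }

    public static void main(String[] args) {
        // 50 partitions is an illustrative value, not one taken from the text.
        System.out.println(partitionFor("abc", 50)); // prints 4
    }
}
```

Because the result depends on numPartitions, every broker must agree on that value for a group to map to the same coordinator.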
The runtime: a partition as a replicated state machine
GroupCoordinatorService constructs a single CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> (GroupCoordinatorService.java:272). The runtime manages a map of CoordinatorContext, one per __consumer_offsets partition the broker leads. Each context carries (CoordinatorRuntime.java:398 onward):
- a ReentrantLock guarding all mutable fields of the context;
- a SnapshotRegistry and the GroupCoordinatorShard built on top of it (all hard state lives in timeline data structures backed by this registry, so reads can be served at an arbitrary committed offset);
- a CoordinatorState, INITIAL → LOADING → ACTIVE, with FAILED/CLOSED as escape states; legal transitions are encoded per-enum-constant in canTransitionFrom (CoordinatorRuntime.java:265);
- a DeferredEventQueue ordering pending responses by the log offset they depend on;
- a HighWatermarkListener registered on the PartitionWriter for that partition.
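The per-constant canTransitionFrom idea can be sketched as an enum with an abstract method. The predecessor sets below are an assumption: they follow the INITIAL → LOADING → ACTIVE path and the FAILED → LOADING reload described in this chapter, but they are not copied from CoordinatorRuntime, and the name CoordinatorStateSketch is hypothetical.

```java
// Sketch: each constant declares which predecessors it accepts.
// The predecessor sets are assumed for illustration.
enum CoordinatorStateSketch {
    INITIAL {
        @Override boolean canTransitionFrom(CoordinatorStateSketch previous) {
            return false; // nothing transitions back to INITIAL
        }
    },
    LOADING {
        @Override boolean canTransitionFrom(CoordinatorStateSketch previous) {
            return previous == INITIAL || previous == FAILED;
        }
    },
    ACTIVE {
        @Override boolean canTransitionFrom(CoordinatorStateSketch previous) {
            return previous == LOADING;
        }
    },
    FAILED {
        @Override boolean canTransitionFrom(CoordinatorStateSketch previous) {
            return previous == LOADING || previous == ACTIVE;
        }
    },
    CLOSED {
        @Override boolean canTransitionFrom(CoordinatorStateSketch previous) {
            return true; // universal exit
        }
    };

    abstract boolean canTransitionFrom(CoordinatorStateSketch previous);

    // Validates a transition and returns the new state.
    static CoordinatorStateSketch transition(CoordinatorStateSketch from,
                                             CoordinatorStateSketch to) {
        if (!to.canTransitionFrom(from)) {
            throw new IllegalStateException(from + " -> " + to + " is not allowed");
        }
        return to;
    }
}
```

Keeping the rule on the target constant means an illegal transition fails at the single place where the state field is changed.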
The write path
Figure: the write path into the __consumer_offsets log. Replay happens before the append; the response is released only after replication.
The single-threaded-per-partition discipline is what makes the handlers lock-free. A write handler is a pure function of current state to (records, response); it never blocks and never appends directly. The runtime applies the records through the same replay method used at load time (so the in-memory state always tracks "what the log will say"), accumulates them into a batch, and appends the batch to the log via the PartitionWriter (CoordinatorRuntime.java:712). The returned offset is checked against the expected next offset; a mismatch means the state machine has diverged from the log, and the runtime force-transitions FAILED → LOADING to reload from scratch (CoordinatorRuntime.java:721).
A client future is completed only after the high watermark of the coordinator partition reaches the offset of the record that produced its response (HighWatermarkListener.onHighWatermarkUpdated → deferredEventQueue.completeUpTo, CoordinatorRuntime.java:1830). Therefore any acknowledged commit, join, or assignment survives the loss of the coordinator broker: a new coordinator replays the same committed records and reconstructs identical state.
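The "withhold the response until the high watermark passes" rule can be pictured as an offset-ordered queue of pending completions. The sketch below is an illustration under assumptions, not the real DeferredEventQueue: the class DeferredResponsesSketch, the String responses, and the choice to key each response by the offset the high watermark must reach are all hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;

// Sketch: responses wait in offset order until the high watermark reaches them.
final class DeferredResponsesSketch {
    private final TreeMap<Long, List<Runnable>> pending = new TreeMap<>();

    // Registers a response that may be released once the high watermark
    // reaches requiredOffset.
    CompletableFuture<String> defer(long requiredOffset, String response) {
        CompletableFuture<String> future = new CompletableFuture<>();
        pending.computeIfAbsent(requiredOffset, k -> new ArrayList<>())
               .add(() -> future.complete(response));
        return future;
    }

    // Called when the high watermark advances; completes everything at or
    // below it and returns how many responses were released.
    int completeUpTo(long highWatermark) {
        NavigableMap<Long, List<Runnable>> ready = pending.headMap(highWatermark, true);
        int completed = 0;
        for (List<Runnable> completions : ready.values()) {
            for (Runnable completion : completions) {
                completion.run();
                completed++;
            }
        }
        ready.clear(); // the view is backed by 'pending', so this removes them
        return completed;
    }
}
```

A response keyed at offset 5 stays pending while the high watermark is 4 and is released as soon as it reaches 5; later offsets remain queued.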
Read path and snapshots
Reads (e.g. OffsetFetch, ListGroups, *Describe) use scheduleReadOperation/scheduleReadAllOperation and receive a lastCommittedOffset; the handler queries the timeline structures at that snapshot offset (GroupCoordinatorService.listGroups at GroupCoordinatorService.java:1113; GroupCoordinatorShard.fetchOffsets at GroupCoordinatorShard.java:817). This yields read-your-committed-writes semantics without locking against the writer thread.
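The read semantics can be illustrated with a toy versioned map in which every write is tagged with its log offset and a read returns the newest version at or below lastCommittedOffset. This shows only the observable behaviour; the real timeline structures and SnapshotRegistry are implemented differently, and SnapshotReadSketch with its methods is a hypothetical name.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Sketch: a value history per key, read at a committed offset.
final class SnapshotReadSketch {
    // key -> (log offset of the write -> value written)
    private final Map<String, TreeMap<Long, Long>> history = new HashMap<>();

    // Records a write that was applied at the given log offset.
    void write(String key, long logOffset, long value) {
        history.computeIfAbsent(key, k -> new TreeMap<>()).put(logOffset, value);
    }

    // Returns the value visible at lastCommittedOffset, or null if the key
    // had not been written by then.
    Long readAt(String key, long lastCommittedOffset) {
        TreeMap<Long, Long> versions = history.get(key);
        if (versions == null) {
            return null;
        }
        Map.Entry<Long, Long> visible = versions.floorEntry(lastCommittedOffset);
        return visible == null ? null : visible.getValue();
    }
}
```

A write at offset 20 that has been applied in memory but not yet committed is invisible to a reader holding lastCommittedOffset 15, which still sees the value written at offset 10.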
Batching & linger
Records are accumulated into a currentBatch and flushed either when the batch fills or after an append-linger interval (group.coordinator.append.linger.ms, default -1 = adaptive). An adaptive flush is scheduled as a normal event so it serializes with writes (enqueueAdaptiveFlush, CoordinatorRuntime.java:668). Transactional writes are never accumulated; they flush immediately.
The shard and its managers
GroupCoordinatorShard implements CoordinatorShard<CoordinatorRecord>. Its request handlers are thin delegations to the two managers; its replay(offset, producerId, producerEpoch, record) is a big switch over CoordinatorRecordType that dispatches each record to the correct manager's typed replay overload (GroupCoordinatorShard.java:1163). Lifecycle hooks onLoaded/onUnloaded register or cancel the periodic timers (group-metadata/offset expiration and the group-size gauge) and activate the metrics shard (GroupCoordinatorShard.java:1102).
The full set of record types the shard understands (each = one apiKey in the __consumer_offsets namespace):
| apiKey | Record | Purpose |
|---|---|---|
| 1 | OffsetCommit (Key/Value) | A committed offset for (group,topic,partition). |
| 2 | GroupMetadata | Classic group: generation, leader, protocol, all member subscriptions+assignments. |
| 3 | ConsumerGroupMetadata | Consumer group epoch + metadata hash. |
| 4 | ConsumerGroupPartitionMetadata | (Legacy ≤4.0) subscription metadata; now tombstoned in favour of the hash. |
| 5 | ConsumerGroupMemberMetadata | A consumer-group member's subscription & config. |
| 6 | ConsumerGroupTargetAssignmentMetadata | Target-assignment epoch + computation timestamp. |
| 7 | ConsumerGroupTargetAssignmentMember | One member's slice of the target assignment. |
| 8 | ConsumerGroupCurrentMemberAssignment | A member's current reconciled assignment + epoch + state. |
| 16 | ConsumerGroupRegularExpression | A resolved RE2 subscription regex and the topics it matched. |
| 9–15, 17+ | Share & Streams records | Analogous records for share/streams groups. |
Persistent record layouts (field level)
OffsetCommit, apiKey 1
The bread-and-butter record. Key uniquely identifies the slot; value carries the offset and bookkeeping. Schemas from OffsetCommitKey.json / OffsetCommitValue.json:
| Component | Field | Type | Notes |
|---|---|---|---|
| Key (v0) | group | string | Consumer group id. |
| Key (v0) | topic | string | Topic name (offsets are keyed by name, not id). |
| Key (v0) | partition | int32 | Partition index. |
| Value (v0–4) | offset | int64 | The next offset to consume. |
| Value (v0–4) | leaderEpoch | int32 (v3+, default −1) | Leader epoch of the last consumed record; enables log-truncation detection on the consumer. |
| Value (v0–4) | metadata | string | Opaque client metadata; bounded by offset.metadata.max.bytes. |
| Value (v0–4) | commitTimestamp | int64 | When the commit was written; drives retention. |
| Value (v0–4) | expireTimestamp | int64 (v1 only, default −1) | Legacy explicit expiry. |
| Value (v0–4) | topicId | uuid (tagged, v4+) | Topic id; used to avoid wiping offsets across a same-name topic re-creation. |
A null value is a tombstone: replaying it deletes the offset (OffsetMetadataManager.replay, OffsetMetadataManager.java:1149). LegacyOffsetCommitKey/Value (an older apiKey) are converted on replay into the modern shapes by convertLegacyOffsetCommitKey/Value (GroupCoordinatorShard.java:1132).
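Replay of an OffsetCommit record, including the tombstone case, reduces to a put-or-remove on the slot identified by the key. The sketch below assumes a flat map keyed by (group, topic, partition) and keeps only the offset; OffsetReplaySketch and its methods are hypothetical names, and the real manager stores richer values in timeline structures.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch: replaying commit records and tombstones into an in-memory map.
final class OffsetReplaySketch {
    private final Map<List<Object>, Long> offsets = new HashMap<>();

    private static List<Object> slot(String group, String topic, int partition) {
        return List.of(group, topic, partition);
    }

    // committedOffset == null models a tombstone (a record with a null value).
    void replay(String group, String topic, int partition, Long committedOffset) {
        if (committedOffset == null) {
            offsets.remove(slot(group, topic, partition));
        } else {
            offsets.put(slot(group, topic, partition), committedOffset);
        }
    }

    // Returns the committed offset, or null if none is stored.
    Long committed(String group, String topic, int partition) {
        return offsets.get(slot(group, topic, partition));
    }
}
```

Because the same method runs during request handling and during loading, replaying the log from the start ends with the same map the original coordinator held.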
GroupMetadata (classic), apiKey 2
A single record holds the entire classic group: protocolType, generation, selected protocol, leader, currentStateTimestamp, and an array of MemberMetadata. Each entry carries memberId, optional groupInstanceId (static membership), clientId/clientHost, rebalanceTimeout, sessionTimeout, and two opaque byte blobs: subscription (the member's Subscription as serialized by the client assignor) and assignment (the partitions the leader gave it). Because the assignment is opaque bytes, the broker does not interpret classic assignments; the client-side assignor does.
Consumer-group records (KIP-848)
The consumer group spreads its state across many small records so a single member change rewrites only that member's records, not the whole group:
| Record | Salient fields |
|---|---|
| ConsumerGroupMetadataValue | Epoch (int32, the group epoch); MetadataHash (int64, a hash of all subscribed topics' metadata, replaces the old subscription-metadata record). |
| ConsumerGroupMemberMetadataValue | InstanceId?, RackId?, ClientId, ClientHost, SubscribedTopicNames[], SubscribedTopicRegex?, RebalanceTimeoutMs, ServerAssignor?, and a nullable ClassicMemberMetadata (SessionTimeoutMs + SupportedProtocols[]), non-null only for a classic-protocol member living inside a consumer group. |
| ConsumerGroupTargetAssignmentMetadataValue | AssignmentEpoch (int32); AssignmentTimestamp (int64, tagged), when the assignor last ran, used to throttle recomputation. |
| ConsumerGroupTargetAssignmentMemberValue | TopicPartitions[] = list of { TopicId (uuid), Partitions (int32[]) }, the desired assignment for one member. |
| ConsumerGroupCurrentMemberAssignmentValue | MemberEpoch, PreviousMemberEpoch, State (int8 → MemberState), AssignedPartitions[], PartitionsPendingRevocation[]; each TopicPartitions additionally carries a tagged AssignmentEpochs[] array, the epoch at which each partition was assigned, used to fence zombie commits. |
PreviousMemberEpoch exists for idempotency: if an epoch-bump response is lost in transit, the member retries its heartbeat with the previous epoch and the coordinator recognizes it rather than fencing it.
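A simplified version of that epoch check might look as follows. It is a sketch under assumptions: the names MemberEpochCheckSketch and Outcome are hypothetical, and the real validation may apply further conditions (for example on the partitions the member reports as owned) before accepting a retry.

```java
// Sketch: deciding how to treat the member epoch carried by a heartbeat.
final class MemberEpochCheckSketch {
    enum Outcome { ACCEPT, ACCEPT_AS_RETRY, FENCE }

    static Outcome check(int requestEpoch, int memberEpoch, int previousMemberEpoch) {
        if (requestEpoch == memberEpoch) {
            return Outcome.ACCEPT;            // the member saw the latest bump
        }
        if (requestEpoch == previousMemberEpoch) {
            return Outcome.ACCEPT_AS_RETRY;   // the bump response was probably lost
        }
        return Outcome.FENCE;                 // anything else is stale or bogus
    }

    public static void main(String[] args) {
        // The coordinator bumped the member from 6 to 7, but the response was lost.
        System.out.println(check(6, 7, 6)); // prints ACCEPT_AS_RETRY
    }
}
```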
The classic protocol
The classic protocol predates KIP-848 and remains the fallback for older clients and for Kafka Streams/Connect components not yet on the new protocol. It is a client-coordinated protocol: the broker orchestrates a barrier but the elected group leader (a client) computes the assignment.
Group state machine
A ClassicGroup moves through five states (ClassicGroupState, ClassicGroupState.java:27); valid predecessors are enforced statically in the enum's static block:
Figure: classic group states (ClassicGroupState). The straight path is EMPTY → PREPARING_REBALANCE → COMPLETING_REBALANCE → STABLE; the lower transitions list the back-edges and the universal exit to DEAD. PREPARING_REBALANCE parks all join requests until the barrier closes; COMPLETING_REBALANCE parks follower SyncGroups until the leader posts the assignment.
transitionTo asserts the predecessor is legal and stamps currentStateTimestamp (ClassicGroup.java:995). The state determines how each RPC is answered: e.g. heartbeats and SyncGroups in either rebalance state return REBALANCE_IN_PROGRESS, telling the client to (re)join.
The rebalance dance: JoinGroup → SyncGroup
Figure: the JoinGroup → SyncGroup sequence (labels: group.initial.rebalance.delay.ms or all joined · selectProtocol() by vote · leader = first to join).
Mechanics worth noting:
- Member-id allocation. A dynamic member first joins with an empty memberId; if the request version requires a known id, the coordinator allocates a Uuid and replies MEMBER_ID_REQUIRED, forcing a second join with that id (classicGroupJoinToConsumerGroup shows the same pattern at GroupMetadataManager.java:2637).
- Leader election. The first member admitted becomes leader; leaderId is set when the member list was empty (ClassicGroup.java:458). maybeElectNewJoinedLeader re-elects if the current leader did not re-join (ClassicGroup.java:510).
- Protocol selection. selectProtocol() computes the candidate protocols supported by all members, then lets each member vote among candidates and picks the plurality winner (ClassicGroup.java around line 1010). This is how a group agrees on, say, range vs cooperative-sticky.
- Session & rebalance timeouts. The session timeout (validated against group.min/max.session.timeout.ms, defaults 6000/1800000 ms, in GroupCoordinatorService.joinGroup at GroupCoordinatorService.java:935) bounds how long a member may be silent; the rebalance timeout bounds how long the coordinator waits for everyone to (re)join.
- Heartbeats. Between rebalances, Heartbeat RPCs reset the session timer. If the group is mid-load, the coordinator blindly answers NONE so clients do not storm it (GroupCoordinatorService.heartbeat, GroupCoordinatorService.java:1037).
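The protocol-selection step (candidates supported by all members, one vote per member, plurality wins) can be sketched as below. It is an illustration, not ClassicGroup.selectProtocol: the class ProtocolVoteSketch is hypothetical, each member is assumed to vote for the first candidate in its own preference order, ties go to the protocol that received a vote first, and the member list is assumed non-empty.

```java
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Sketch: pick a group protocol by intersection followed by a plurality vote.
final class ProtocolVoteSketch {
    // Each inner list is one member's supported protocols in preference order.
    static String selectProtocol(List<List<String>> memberPreferences) {
        // Candidates are the protocols every member supports.
        Set<String> candidates = new LinkedHashSet<>(memberPreferences.get(0));
        for (List<String> preferences : memberPreferences) {
            candidates.retainAll(preferences);
        }
        if (candidates.isEmpty()) {
            throw new IllegalStateException("members share no common protocol");
        }
        // Each member votes for its most preferred candidate.
        Map<String, Integer> votes = new LinkedHashMap<>();
        for (List<String> preferences : memberPreferences) {
            for (String protocol : preferences) {
                if (candidates.contains(protocol)) {
                    votes.merge(protocol, 1, Integer::sum);
                    break;
                }
            }
        }
        // Plurality winner; ties resolve to the earliest-voted protocol.
        String winner = null;
        int best = -1;
        for (Map.Entry<String, Integer> entry : votes.entrySet()) {
            if (entry.getValue() > best) {
                best = entry.getValue();
                winner = entry.getKey();
            }
        }
        return winner;
    }
}
```

With preferences [range, cooperative-sticky], [cooperative-sticky, range] and [cooperative-sticky, range], both protocols are candidates and cooperative-sticky wins two votes to one.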
Eager vs cooperative-incremental rebalancing (client side)
Within the classic protocol the assignor chosen by the group dictates revocation behaviour:
- Eager (e.g. RangeAssignor, RoundRobinAssignor): on every rebalance each consumer revokes all partitions before re-joining, a global stop-the-world pause. RangeAssignor assigns contiguous partition ranges so co-subscribed topics co-partition (clients/.../RangeAssignor.java).
- Cooperative incremental (KIP-429, CooperativeStickyAssignor): the consumer keeps partitions it will retain and only revokes the ones being moved away, across two rebalances. It advertises both protocols (supportedProtocols() returns [COOPERATIVE, EAGER]) and stitches the previous generation into its user-data so the assignor can be sticky (clients/src/main/java/org/apache/kafka/clients/consumer/CooperativeStickyAssignor.java:52, :67).
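The difference between the two modes comes down to which partitions a consumer gives up when a rebalance starts. A minimal sketch, assuming a single topic with partitions identified by integers and a hypothetical class RevocationSketch:

```java
import java.util.Set;
import java.util.TreeSet;

// Sketch: what one consumer revokes under each rebalance mode.
final class RevocationSketch {
    // Eager: everything owned is revoked before re-joining.
    static Set<Integer> eagerRevoked(Set<Integer> owned) {
        return new TreeSet<>(owned);
    }

    // Cooperative: only partitions absent from the new assignment are revoked.
    static Set<Integer> cooperativeRevoked(Set<Integer> owned, Set<Integer> newAssignment) {
        Set<Integer> revoked = new TreeSet<>(owned);
        revoked.removeAll(newAssignment);
        return revoked;
    }

    public static void main(String[] args) {
        Set<Integer> owned = Set.of(0, 1, 2);
        Set<Integer> next = Set.of(1, 2, 3);
        System.out.println(eagerRevoked(owned));             // prints [0, 1, 2]
        System.out.println(cooperativeRevoked(owned, next)); // prints [0]
    }
}
```

In the cooperative case partitions 1 and 2 keep being consumed throughout; only partition 0 pauses while it changes hands.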
Cooperative rebalancing (KIP-429) was the first attack on stop-the-world rebalances: only moved partitions pause. KIP-848 finishes the job by removing the global synchronization barrier altogether and moving assignment to the broker, confirmed GA in 4.0 with server-side assignors selected via group.consumer.assignors (default uniform, also range).
The consumer group protocol (KIP-848)
The new protocol replaces JoinGroup/SyncGroup with a single repeated RPC, ConsumerGroupHeartbeat, that doubles as join, sync, poll-for-assignment, and liveness probe. There is no leader and no client-side assignor: the broker computes the target assignment and each member incrementally reconciles toward it. Coordination becomes a conversation about three epochs.
Three epochs
- Group epoch: Bumped whenever the group's inputs change: a member joins/leaves, a subscription changes, a server-assignor preference changes, or topic metadata (the metadata hash) changes. Persisted in ConsumerGroupMetadataValue.Epoch.
- Target-assignment epoch: The group epoch for which the current target assignment was computed. When it lags the group epoch, the assignor must run again. Persisted in ConsumerGroupTargetAssignmentMetadataValue.
- Member epoch: How far an individual member has reconciled. A member is "fully reconciled" when its epoch equals the target-assignment epoch. Persisted in ConsumerGroupCurrentMemberAssignmentValue.MemberEpoch.
Heartbeat handler, the three steps
GroupMetadataManager.consumerGroupHeartbeat (the dispatcher at GroupMetadataManager.java:5290, then the worker at :2441) executes a fixed pipeline for every non-leave heartbeat:
- Create/update the member & maybe bump the group epoch. A new ConsumerGroupMember is built from the request (instance id, rack, rebalance timeout, server assignor, subscription). If the subscription, regex, or preferred assignor changed, or the metadata hash changed, the group epoch is bumped and a fresh ConsumerGroupMetadataValue record is written (updateSubscriptionMetadata, GroupMetadataManager.java:3982; the hash check & bump at :4019–:4032). A member-metadata record is written if the member is new or changed.
- Maybe recompute the target assignment. If assignmentEpoch < groupEpoch and enough time has elapsed since the last computation (group.consumer.assignment.interval.ms, default 1000 ms; gate in canComputeNextTargetAssignment, :4059), the chosen server assignor runs over the whole group via TargetAssignmentBuilder.ConsumerTargetAssignmentBuilder (:4093). The delta is persisted as per-member target-assignment records plus a new target-assignment-metadata record.
- Reconcile this member. maybeReconcile (:3796) short-circuits if the member is already reconciled to the target epoch and its subscription is unchanged; otherwise it runs the CurrentAssignmentBuilder state machine (below) and, if the member changed, writes a ConsumerGroupCurrentMemberAssignmentValue record.
Finally the handler reschedules the session timer (:2583) and builds the response: memberId, memberEpoch, heartbeatIntervalMs, and, only when the member is new, sent a full request, or its assigned partitions changed, the new assignment (:2597).
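The gate on the second step can be written as a single predicate. This is a sketch of the condition as described here, not the body of canComputeNextTargetAssignment: the class AssignmentGateSketch, the parameter names, and the use of >= for the interval comparison are assumptions.

```java
// Sketch: when may the target assignment be recomputed?
final class AssignmentGateSketch {
    static boolean canCompute(int groupEpoch,
                              int assignmentEpoch,
                              long lastAssignmentTimestampMs,
                              long nowMs,
                              long assignmentIntervalMs) {
        boolean assignorOwesWork = assignmentEpoch < groupEpoch;
        boolean intervalElapsed = nowMs - lastAssignmentTimestampMs >= assignmentIntervalMs;
        return assignorOwesWork && intervalElapsed;
    }

    public static void main(String[] args) {
        // Group epoch moved to 5 while the assignment is still at 4,
        // but the assignor ran only 500 ms ago (interval 1000 ms).
        System.out.println(canCompute(5, 4, 9_500L, 10_000L, 1_000L)); // prints false
    }
}
```

A stored timestamp of 0 makes the interval check pass for any realistic clock value, so the next computation is not delayed.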
The reconciliation engine: member state machine
CurrentAssignmentBuilder (CurrentAssignmentBuilder.java:41) is the heart of KIP-848 safety. It moves a member among four MemberState values (modern/MemberState.java): STABLE(0), UNREVOKED_PARTITIONS(1), UNRELEASED_PARTITIONS(2), UNKNOWN(127). The core computation (computeNextAssignment, :368) partitions the member's view into three sets per topic:
newAssigned = currentAssigned ∩ target
pendingRevocation = currentAssigned − newAssigned
pendingAssignment = target − newAssigned − (partitions still owned by others)
(Set computation in CurrentAssignmentBuilder.computeNextAssignment.) A member never bumps to the target epoch while it still owes a revocation; it never claims a partition still owned by another member.
Two safety properties fall out of this directly:
- Revoke-before-assign. While any partition must be revoked, the member stays on its current epoch in UNREVOKED_PARTITIONS and the response asks it to drop those partitions. Only after the next heartbeat reports (via ownedTopicPartitions) that the partitions are gone does ownsRevokedPartitions return false (:270) and the member advance.
- No double ownership. A target partition still owned by another member (its currentPartitionEpoch != -1) is held back into UNRELEASED_PARTITIONS rather than handed over (:404). ConsumerGroup.currentPartitionEpoch (ConsumerGroup.java:145) is the global map of partition → owning epoch that makes this check possible.
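The three-set computation and the resulting member state can be reproduced for a single topic in a few lines. The sketch is an illustration only: ReconcileSketch is a hypothetical class, partitions are plain integers, "owned by others" is passed in as a set rather than looked up in currentPartitionEpoch, and epoch handling is left out.

```java
import java.util.Set;
import java.util.TreeSet;

// Sketch: one reconciliation step for one member and one topic.
final class ReconcileSketch {
    enum State { STABLE, UNREVOKED_PARTITIONS, UNRELEASED_PARTITIONS }

    final Set<Integer> newAssigned;
    final Set<Integer> pendingRevocation;
    final Set<Integer> pendingAssignment;
    final State state;

    ReconcileSketch(Set<Integer> currentAssigned, Set<Integer> target, Set<Integer> ownedByOthers) {
        // newAssigned = currentAssigned ∩ target
        newAssigned = new TreeSet<>(currentAssigned);
        newAssigned.retainAll(target);

        // pendingRevocation = currentAssigned − newAssigned
        pendingRevocation = new TreeSet<>(currentAssigned);
        pendingRevocation.removeAll(newAssigned);

        // pendingAssignment = target − newAssigned − (partitions still owned by others)
        Set<Integer> wanted = new TreeSet<>(target);
        wanted.removeAll(newAssigned);
        pendingAssignment = new TreeSet<>(wanted);
        pendingAssignment.removeAll(ownedByOthers);
        boolean hasUnreleased = pendingAssignment.size() < wanted.size();

        if (!pendingRevocation.isEmpty()) {
            state = State.UNREVOKED_PARTITIONS;   // must revoke first
        } else if (hasUnreleased) {
            state = State.UNRELEASED_PARTITIONS;  // waiting on another member
        } else {
            state = State.STABLE;
        }
    }
}
```

For a member owning {0, 1, 2} with target {1, 2, 3, 4} while partition 4 is still owned elsewhere, the sketch keeps {1, 2}, asks for 0 to be revoked, and would hand over only 3; the state is UNREVOKED_PARTITIONS until 0 is released.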
There is no global barrier. Each member's heartbeat advances only that member by at most one step of the state machine. A slow or dead member parked in UNREVOKED_PARTITIONS blocks only the specific partitions it must release; every other member keeps converging. That is the structural reason KIP-848 rebalances are incremental and fast.
Server-side assignors
The target assignment is produced by a ConsumerGroupPartitionAssignor. Two are built in (GroupCoordinatorConfig.java:215); the first listed is the default when a member does not request one:
- UniformAssignor (name "uniform", the default): balances partitions evenly and can be rack-aware. It picks a homogeneous builder when all members share the same subscription, else a heterogeneous one (UniformAssignor.java:78).
- RangeAssignor (name "range"): contiguous ranges with co-partitioning, the server-side analogue of the classic range assignor (RangeAssignor.java:85).
A member names its preferred assignor in ConsumerGroupHeartbeatRequest.serverAssignor; the service validates it against the configured set and rejects unknowns with UnsupportedAssignorException (GroupCoordinatorService.java:473). The effective group assignor is recomputed when the preference changes (computePreferredServerAssignor, used at GroupMetadataManager.java:4115), and changing the default does not force a rebalance of existing groups.
Group state (consumer)
A ConsumerGroup exposes EMPTY / ASSIGNING / RECONCILING / STABLE / DEAD (ConsumerGroup.java:84). The state is derived, not stored: maybeUpdateGroupState (:959) sets EMPTY if there are no members, ASSIGNING if groupEpoch > assignmentEpoch (assignor owes work), RECONCILING if any member's epoch lags the assignment epoch, else STABLE.
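The derivation rule reads naturally as a pure function of the three epochs. The sketch below follows the order of checks given above; ConsumerGroupStateSketch is a hypothetical class, members are represented only by their epochs, and DEAD is omitted because the rule above does not derive it.

```java
import java.util.List;

// Sketch: consumer group state derived from epochs, never stored.
final class ConsumerGroupStateSketch {
    enum State { EMPTY, ASSIGNING, RECONCILING, STABLE }

    static State derive(List<Integer> memberEpochs, int groupEpoch, int assignmentEpoch) {
        if (memberEpochs.isEmpty()) {
            return State.EMPTY;
        }
        if (groupEpoch > assignmentEpoch) {
            return State.ASSIGNING;       // the assignor owes work
        }
        for (int memberEpoch : memberEpochs) {
            if (memberEpoch < assignmentEpoch) {
                return State.RECONCILING; // at least one member lags
            }
        }
        return State.STABLE;
    }

    public static void main(String[] args) {
        System.out.println(derive(List.of(3, 2), 3, 3)); // prints RECONCILING
    }
}
```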
Liveness, fencing & leaving
Each heartbeat (re)arms a per-member session timer keyed by group + memberId (scheduleConsumerGroupSessionTimeout, GroupMetadataManager.java:5067). On expiry the timer runs consumerGroupFenceMemberOperation. A rebalance timer is armed only while a member sits in UNREVOKED_PARTITIONS (it must finish revoking within its rebalance timeout) and fires only if the member is still on the same epoch (:5122). Fencing (consumerGroupFenceMembers, :4605) writes tombstones for the member's metadata/current/target records, recomputes the metadata hash without the member, and bumps the group epoch; if every member is gone it also advances the target-assignment epoch with a timestamp of 0 so the next assignment is not throttled (:4665). A member leaves by heartbeating with epoch -1 (dynamic) or -2 (static), constants LEAVE_GROUP_MEMBER_EPOCH / LEAVE_GROUP_STATIC_MEMBER_EPOCH (clients/src/main/java/org/apache/kafka/common/requests/ConsumerGroupHeartbeatRequest.java:31); the dispatcher routes these to consumerGroupLeave (GroupMetadataManager.java:5294).
A consumer-group member that fails to advance from epoch e within its rebalance timeout, or stops heartbeating within its session timeout, is fenced: its partitions are released and reassigned. Conversely, a fenced or zombie member that resumes is rejected with FencedMemberEpochException/UNKNOWN_MEMBER_ID and must rejoin from epoch 0.
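The special epoch values give the heartbeat a simple routing rule. A minimal sketch, assuming that epoch 0 means "join", that positive epochs are regular heartbeats, and that anything below -2 is invalid; the class HeartbeatRoutingSketch and the Route enum are hypothetical, while the two constants carry the names and values given above.

```java
// Sketch: routing a ConsumerGroupHeartbeat by the member epoch it carries.
final class HeartbeatRoutingSketch {
    static final int LEAVE_GROUP_MEMBER_EPOCH = -1;
    static final int LEAVE_GROUP_STATIC_MEMBER_EPOCH = -2;

    enum Route { JOIN, HEARTBEAT, LEAVE, STATIC_LEAVE }

    static Route route(int memberEpoch) {
        if (memberEpoch == LEAVE_GROUP_MEMBER_EPOCH) {
            return Route.LEAVE;
        }
        if (memberEpoch == LEAVE_GROUP_STATIC_MEMBER_EPOCH) {
            return Route.STATIC_LEAVE;
        }
        if (memberEpoch == 0) {
            return Route.JOIN;      // new or rejoining member
        }
        if (memberEpoch > 0) {
            return Route.HEARTBEAT; // established member
        }
        // Treating other negative values as invalid is an assumption.
        throw new IllegalArgumentException("unexpected member epoch " + memberEpoch);
    }
}
```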
Static membership (KIP-345)
A member that supplies group.instance.id is static: its identity survives a restart. In the classic protocol this lives in MemberMetadata.groupInstanceId; in the consumer protocol the coordinator maps instanceId → memberId (ConsumerGroup.staticMembers, ConsumerGroup.java:123) so a returning instance reclaims its assignment instead of triggering a full rebalance. When a new memberId claims an existing instanceId, the previous member is removed from the target-assignment computation (GroupMetadataManager.java:4134). The validation path treats a missing instance id on a static-leave as an error (GroupCoordinatorService.java:467).
Static membership (KIP-345) decouples a consumer's logical identity from its session, so rolling restarts and brief network blips do not reshuffle the whole group. Both protocols honour it; the consumer protocol simply re-keys the member by group.instance.id.
Offset management
Offsets are handled entirely by OffsetMetadataManager and are independent of which membership protocol a group uses. Even a "simple" group that never joins can commit: replaying an OffsetCommit for an unknown group auto-creates an empty classic group (OffsetMetadataManager.java:1166).
In-memory structure
Committed offsets live in a three-level timeline map group → topic → partition → OffsetAndMetadata (the inner Offsets class, OffsetMetadataManager.java:350). Transactional offsets are staged separately in pendingTransactionalOffsets keyed by producer id, plus an OpenTransactions index (:185, :203); on a transaction marker they are either merged into the main map or discarded (replayEndTransactionMarker, :1223). See Transactions & Exactly-Once Semantics.
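The staging of transactional offsets can be pictured with two maps: the visible committed offsets and a per-producer holding area that is merged or dropped when the transaction ends. The sketch is a simplification with hypothetical names (PendingTxnOffsetsSketch, stage, endTransaction); slots are plain strings and the OpenTransactions index is not modelled.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch: transactional offsets stay invisible until the marker arrives.
final class PendingTxnOffsetsSketch {
    private final Map<String, Long> committed = new HashMap<>();
    private final Map<Long, Map<String, Long>> pendingByProducer = new HashMap<>();

    // Stages an offset written inside an open transaction.
    void stage(long producerId, String slot, long offset) {
        pendingByProducer.computeIfAbsent(producerId, id -> new HashMap<>()).put(slot, offset);
    }

    // Applies the end-of-transaction marker: merge on commit, discard on abort.
    void endTransaction(long producerId, boolean commit) {
        Map<String, Long> staged = pendingByProducer.remove(producerId);
        if (staged != null && commit) {
            committed.putAll(staged);
        }
    }

    // Returns the visible committed offset, or null if there is none.
    Long committed(String slot) {
        return committed.get(slot);
    }
}
```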
Commit path
commitOffset (:610) validates the request (group exists, member epoch/generation matches via a CommitPartitionValidator), rejects metadata longer than offset.metadata.max.bytes (default 4096) with OFFSET_METADATA_TOO_LARGE, computes an expiry timestamp from the request retention (or leaves it open to the broker default), and emits one OffsetCommit record per partition. Transactional commits (commitTransactionalOffset, :681) additionally map a stale member epoch to STALE_MEMBER_EPOCH on v6+ (KIP-1319) or to the legacy ILLEGAL_GENERATION on older versions (:710).
Fetch path
fetchOffsets / fetchAllOffsets (:886, :970) read at the supplied committed snapshot. OffsetFetch is the only path here that is a pure read scheduled through scheduleReadOperation (GroupCoordinatorShard.fetchOffsets, GroupCoordinatorShard.java:817), so it sees only committed (replicated) offsets.
Retention & cleanup
A periodic timer (offsets.retention.check.interval.ms, default 600000 ms) runs cleanupGroupMetadata (GroupCoordinatorShard.java:1005), which for each group whose shouldExpire() holds calls cleanupExpiredOffsets (OffsetMetadataManager.java:1046). An offset is expired only if its group is not subscribed to that topic and the offset is older than offsets.retention.minutes (default 7 days), and there is no pending transactional offset for it; an expired offset becomes a tombstone. If a group ends up with all offsets expired and is otherwise empty, maybeDeleteGroup tombstones the group itself. Topic deletion separately tombstones offsets whose stored topicId matches the deleted incarnation (onTopicsDeleted, :1093), guarding against wiping a re-created same-name topic.
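The expiry condition combines three checks, which the following predicate spells out. It is a simplified sketch: OffsetExpirySketch is a hypothetical class, the age is measured from the commit timestamp (the real code may pick a different baseline in some cases), and the comparison operator is an assumption.

```java
import java.util.concurrent.TimeUnit;

// Sketch: is a committed offset eligible for expiration?
final class OffsetExpirySketch {
    static boolean isExpired(boolean groupSubscribedToTopic,
                             boolean hasPendingTransactionalOffset,
                             long commitTimestampMs,
                             long nowMs,
                             long retentionMs) {
        boolean oldEnough = nowMs - commitTimestampMs >= retentionMs;
        return !groupSubscribedToTopic && !hasPendingTransactionalOffset && oldEnough;
    }

    public static void main(String[] args) {
        long retentionMs = TimeUnit.MINUTES.toMillis(10_080); // 7 days
        long eightDaysMs = TimeUnit.DAYS.toMillis(8);
        // Unsubscribed topic, no open transaction, committed eight days ago.
        System.out.println(isExpired(false, false, 0L, eightDaysMs, retentionMs)); // prints true
    }
}
```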
Concurrency & threading model
- Event processor. A MultiThreadedEventProcessor with group.coordinator.threads threads (default 4) runs the per-partition event queues (GroupCoordinatorService.java:257). Events for one partition are processed one at a time and in order, so a handler observing/mutating that shard's state needs no locks.
- Background pool. CoordinatorBackgroundThreadPoolExecutor (group.coordinator.background.threads, default 2) runs long tasks, resolving subscription regexes and offloaded assignor computation (group.consumer.assignor.offload.enable, default true), off the event threads, then folds results back in as write events.
- Timeline data structures. All hard state is in TimelineHashMap/TimelineObject/TimelineInteger tied to the context's SnapshotRegistry. Writes advance the live version; reads at a committed offset see the snapshot; concurrent reads on other threads never tear.
- Context lock. The runtime takes context.lock around state transitions (LOADING/ACTIVE/FAILED) and around applying a new high watermark (CoordinatorRuntime.java:1823). Handlers themselves do not take it.
- Timers. Session, rebalance, join, sync and expiration timeouts are TimerTasks on the server timer; when they fire they schedule a write operation on the owning partition, so the actual state change still happens on the serialized event thread.
Configuration reference
| Key | Default | Effect |
|---|---|---|
| group.coordinator.threads | 4 | Event-processor threads servicing partition queues. |
| group.coordinator.background.threads | 2 | Background pool (regex resolution, offloaded assignment). |
| group.coordinator.append.linger.ms | -1 (adaptive) | How long to accumulate records before flushing; trades latency for batch size. |
| offsets.commit.timeout.ms | 5000 | Write timeout for a coordinator operation (the runtime's write timeout). |
| offset.metadata.max.bytes | 4096 | Max length of per-offset client metadata; over → OFFSET_METADATA_TOO_LARGE. |
| offsets.retention.minutes | 10080 (7 days) | How long a committed offset survives once its topic is unsubscribed. |
| offsets.retention.check.interval.ms | 600000 | Period of the expiration sweep. |
| offsets.load.buffer.size | 5 MiB | Read buffer used when loading a partition's records on failover. |
| Classic protocol | | |
| group.min.session.timeout.ms / group.max.session.timeout.ms | 6000 / 1800000 | Bounds on a classic member's session timeout. |
| group.initial.rebalance.delay.ms | 3000 | Grace window in PREPARING_REBALANCE to let members join before closing the barrier. |
| group.max.size | Integer.MAX_VALUE | Max members in a classic group. |
| Consumer protocol (KIP-848) | | |
| group.consumer.session.timeout.ms | 45000 | Default member session timeout (bounded by min/max 45000/60000). |
| group.consumer.heartbeat.interval.ms | 5000 | Heartbeat cadence the coordinator tells members (bounded by min/max 5000/15000). |
| group.consumer.assignors | [uniform, range] | Server-side assignors; first is the default. |
| group.consumer.assignment.interval.ms | 1000 | Minimum spacing between target-assignment recomputations. |
| group.consumer.assignor.offload.enable | true | Run the assignor on the background pool rather than the event thread. |
| group.consumer.max.size | Integer.MAX_VALUE | Max members in a consumer group. |
| group.consumer.migration.policy | bidirectional | Whether non-empty groups may upgrade classic→consumer and/or downgrade. |
| group.consumer.regex.refresh.interval.ms | 600000 | How often RE2 subscription regexes are re-resolved against topic metadata. |
Values and keys are from GroupCoordinatorConfig.java (e.g. session/heartbeat at :183–:205, assignors at :219, assignment interval at :242, migration policy at :229). The number of __consumer_offsets partitions (numPartitions) is supplied at startup and fixes the hashing in partitionFor.
Failure modes, edge cases & recovery
- Coordinator failover. When a broker becomes leader of a __consumer_offsets partition, the runtime transitions that context INITIAL/FAILED → LOADING, builds a fresh shard on a new snapshot registry, and replays every record in the partition through GroupCoordinatorShard.replay before going ACTIVE (CoordinatorRuntime.java:530, :579). Until then, RPCs for that partition are answered COORDINATOR_LOAD_IN_PROGRESS or COORDINATOR_NOT_AVAILABLE (the service checks isActive first, e.g. GroupCoordinatorService.java:488).
- Not the coordinator. If the broker does not lead the group's partition, handlers surface NOT_COORDINATOR; clients re-issue FindCoordinator.
- State/log divergence. If partitionWriter.append returns an unexpected offset, the runtime logs an error and force-reloads the partition (CoordinatorRuntime.java:721); the in-memory state is treated as untrustworthy and rebuilt from the log.
- Lost epoch bump. A consumer-protocol response that bumps the member epoch but never reaches the client is recovered: the member retries with its previous epoch, which is matched against PreviousMemberEpoch.
- Zombie commit. A partition's AssignmentEpochs fence commits from a member that no longer owns the partition; a stale member epoch yields STALE_MEMBER_EPOCH.
- Stuck revocation. A member parked in UNREVOKED_PARTITIONS that never acknowledges revocation is fenced when its rebalance timer fires (:5129), releasing the partitions to others.
- Simple groups. Tools and frameworks that commit offsets without joining create an empty, protocol-less classic group on first commit, which is later GC'd by expiration.
- Protocol migration. A consumer group may be downgraded to a classic group when the remaining live members all speak the classic protocol (validateOnlineDowngradeWithFencedMembers, GroupMetadataManager.java:1266; conversion at :4616), governed by group.consumer.migration.policy.
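The lost-epoch-bump and stale-epoch cases can be pictured as one comparison on the coordinator side. This is a simplified sketch under stated assumptions: MemberEpochCheck, its Outcome values and the check signature are hypothetical names for illustration, and the real check may impose further conditions that are not modelled here.

```java
/** Hypothetical, simplified model of a coordinator-side member epoch check. */
final class MemberEpochCheck {
    enum Outcome { ACCEPTED, ACCEPTED_AS_RETRY, FENCED }

    /**
     * @param received epoch carried by the incoming heartbeat
     * @param current  member epoch the coordinator has recorded
     * @param previous epoch recorded before the last bump (PreviousMemberEpoch)
     */
    static Outcome check(int received, int current, int previous) {
        if (received == current) {
            return Outcome.ACCEPTED;
        }
        // The response carrying the bumped epoch may never have reached the
        // client, so a request that is exactly one bump behind is tolerated.
        if (received < current && received == previous) {
            return Outcome.ACCEPTED_AS_RETRY;
        }
        // Older than the previous epoch, or ahead of the coordinator: fence.
        return Outcome.FENCED;
    }
}
```

With a recorded epoch of 5 and a previous epoch of 4, a heartbeat carrying 4 is treated as a retry of a lost response, whereas one carrying 3 or 6 is fenced.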
Invariants & guarantees
- A group is served by exactly one coordinator at a time, the leader of its hashed __consumer_offsets partition.
- At most one member owns a given topic-partition at any instant within a group; ownership is handed over only after the previous owner relinquishes it (enforced by currentPartitionEpoch + the revoke-before-assign state machine).
- Acknowledged commits, joins and assignments are durable across coordinator failover, because responses are gated on replication past the high watermark.
- The committed view of in-memory state is a deterministic function of the committed prefix of the partition log; replay during loading and during request handling share one code path, so they cannot diverge.
- Member/group epochs are monotonically non-decreasing; a member presenting an epoch older than the one the coordinator holds for it is treated as a zombie and fenced (the previous-epoch retry described under failure modes excepted).
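The single-owner invariant can be illustrated with a small ownership table. This is a toy model rather than the coordinator's data structure: PartitionOwnership and its methods are hypothetical, and only the field name currentPartitionEpoch echoes the text above.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical model of revoke-before-assign; names are illustrative only. */
final class PartitionOwnership {
    // partition id -> epoch of the member that currently owns it
    private final Map<Integer, Integer> currentPartitionEpoch = new HashMap<>();

    /** Grants the partition only if nobody owns it; returns whether the grant happened. */
    boolean tryAssign(int partition, int memberEpoch) {
        return currentPartitionEpoch.putIfAbsent(partition, memberEpoch) == null;
    }

    /** Called once the owner has acknowledged revocation; returns whether it was owned. */
    boolean release(int partition) {
        return currentPartitionEpoch.remove(partition) != null;
    }

    boolean isOwned(int partition) {
        return currentPartitionEpoch.containsKey(partition);
    }
}
```

In this model a second tryAssign for the same partition fails until release has been called, which is the revoke-before-assign ordering in miniature.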
Interactions with other subsystems
- Request Processing (KafkaApis) routes FindCoordinator, JoinGroup, SyncGroup, Heartbeat, LeaveGroup, OffsetCommit, OffsetFetch, ConsumerGroupHeartbeat, and the various *Describe/*Delete RPCs into GroupCoordinatorService.
- The Log Storage Engine and replication back the __consumer_offsets partitions; the high watermark from replication is exactly the trigger that releases coordinator responses.
- Metadata Propagation feeds the coordinator a CoordinatorMetadataImage (topics, partitions, ids); subscription resolution and the group metadata hash depend on it (onMetadataUpdate, GroupCoordinatorShard.java:1128). Topic deletions arrive here and tombstone offsets.
- Transactions & EOS use TxnOffsetCommit plus transaction markers replayed through replayEndTransactionMarker to make offset commits part of a transaction.
- The Consumer Client is the counterpart that drives both protocols; Kafka Streams consumes via streams groups (KIP-1071), and Share Groups reuse this very runtime and shard.
Design rationale & evolution
- KIP-848: moved assignment to the broker and replaced JoinGroup/SyncGroup with the incremental ConsumerGroupHeartbeat; no global barrier, server-side assignors, epoch-based reconciliation. GA in 4.0.
- KIP-429: cooperative incremental rebalancing on the client side, the precursor that made classic rebalances non-stop-the-world for the assignor's own moves.
- KIP-345: static membership via group.instance.id so restarts do not reshuffle the group.
- KIP-932: share groups (queue semantics) built on the same coordinator runtime and record machinery.
- KIP-1071: streams groups, i.e. server-side task assignment for Kafka Streams, sharing the shard.
- The rewrite from the Scala ZooKeeper-era coordinator to this Java, log-backed, KRaft-native design, which was in place by 4.0, is what makes the "coordinator is a replicated state machine over __consumer_offsets" model literal rather than a metaphor.
Gotchas & operational notes
partitionFor hashes groupId.hashCode() modulo the offsets-topic partition count. Changing the number of __consumer_offsets partitions after groups exist re-maps groups to different coordinators and effectively orphans their state, so treat the partition count as fixed for the life of the cluster.
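To see why the partition count matters, the hashing can be sketched in a few lines. This is a stand-alone illustration, not the broker's code: GroupPartitioner is a hypothetical class, and the handling of negative hash codes (absolute value, with Integer.MIN_VALUE mapped to 0) is an assumption, since the text only states "hash modulo partition count".

```java
/** Hypothetical stand-alone sketch of group-to-partition hashing. */
final class GroupPartitioner {
    /** Maps a group id to an index in [0, numPartitions). */
    static int partitionFor(String groupId, int numPartitions) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        int hash = groupId.hashCode();
        // Assumption: negative hashes are folded to non-negative before the modulo.
        // Math.abs(Integer.MIN_VALUE) is still negative, so that value is special-cased.
        int nonNegative = (hash == Integer.MIN_VALUE) ? 0 : Math.abs(hash);
        return nonNegative % numPartitions;
    }
}
```

Under this sketch the group id "abc" (hash code 96354) maps to partition 4 when there are 50 partitions and to partition 54 when there are 100, so a coordinator looking for the group's records after a resize would read the wrong partition.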
Consumer-protocol offset commits are fenced by member epoch and per-partition assignment epoch. A consumer that pauses long enough to be fenced and then tries to commit will be rejected (STALE_MEMBER_EPOCH / UNKNOWN_MEMBER_ID) and must rejoin. Its in-flight commits are lost, which is the intended exactly-once-friendly behaviour, not a bug.
Because a write handler mutates in-memory state via replay before the response is released, in-memory state can be momentarily ahead of the committed log. Anything read on the write thread sees that optimistic state, but external reads (OffsetFetch, describes) are served at the committed snapshot. Do not reason about coordinator state from logs of optimistic mutations; only committed records are authoritative.
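The gap between optimistic and committed state can be modelled with versioned copies of a map. This is a toy sketch only: ToyOffsetShard and its methods are hypothetical, it copies the whole map on every write for clarity, and the real runtime relies on its own snapshot machinery rather than anything shown here.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical toy model of optimistic vs committed reads. */
final class ToyOffsetShard {
    // log offset of a write -> full state as of that write
    private final TreeMap<Long, Map<String, Long>> versions = new TreeMap<>();
    private long nextLogOffset = 0L;
    private long highWatermark = 0L; // records with offset < highWatermark are committed

    ToyOffsetShard() {
        versions.put(-1L, new HashMap<>()); // empty state before any record
    }

    /** Applies a commit to in-memory state immediately and returns the record's log offset. */
    long write(String partitionKey, long committedOffset) {
        Map<String, Long> next = new HashMap<>(versions.lastEntry().getValue());
        next.put(partitionKey, committedOffset);
        versions.put(nextLogOffset, next);
        return nextLogOffset++;
    }

    /** Replication progress: everything below the new high watermark is durable. */
    void advanceHighWatermark(long hw) {
        highWatermark = Math.max(highWatermark, hw);
    }

    /** What the write path sees: includes records not yet replicated. */
    Long readOptimistic(String partitionKey) {
        return versions.lastEntry().getValue().get(partitionKey);
    }

    /** What external reads see: state as of the last committed record. */
    Long readCommitted(String partitionKey) {
        return versions.floorEntry(highWatermark - 1).getValue().get(partitionKey);
    }
}
```

After a single write, readOptimistic returns the new value while readCommitted still returns null; the two agree only once advanceHighWatermark has moved past the record's offset.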
Subscription metadata was stored per-group before 4.0 (ConsumerGroupPartitionMetadata, apiKey 4) and is now replaced by a 64-bit metadata hash on the group record; the coordinator writes a tombstone for any lingering subscription-metadata record it loads (GroupMetadataManager.java:4039).