krivaltsevich.com · Kafka Internals 4.4

14 · Transactions & Exactly-Once Semantics

Source: Apache Kafka 4.4.0-SNAPSHOT (git 04bfe7d, 2026-06-15), KRaft mode. Derived from source code, not copied from official documentation.

Kafka builds exactly-once semantics (EOS) from two composable layers. The lower layer is the idempotent producer: every record batch carries a (producerId, producerEpoch, baseSequence) triple, and the partition leader's ProducerStateManager deduplicates retries and enforces a strictly increasing per-partition sequence, giving exactly-once, in-order delivery to a single partition. The upper layer is transactions (KIP-98): a stable transactional.id is mapped to a producer id whose epoch fences zombie instances; a TransactionCoordinator drives a two-phase commit over the internal __transaction_state log and writes transaction markers (control records) into every partition the transaction touched. Consumers reading at read_committed use the Last Stable Offset and a per-segment aborted-transaction index to filter out aborted data. This chapter traces the whole path end to end, grounded in the source.

Role & responsibilities

The transaction/EOS subsystem must deliver three guarantees:

  • Idempotence: a producer retrying a batch (e.g. after a network hiccup) must not create a duplicate. This is enforced per (producerId, epoch) per partition by the leader's sequence-number check and a 5-batch dedup cache.
  • Atomic multi-partition writes: a set of writes across many partitions (and, for consume-transform-produce pipelines, the consumer offset commit) either all become visible or none do. This is the job of the TransactionCoordinator, the __transaction_state log, and the transaction markers.
  • Zombie fencing: a stale producer instance (one that crashed and was replaced) must be prevented from writing into a transaction owned by its successor. This is achieved with the producer epoch: InitProducerId bumps the epoch, and the leader rejects any batch or marker with a lower epoch.

Together these implement consume-transform-produce EOS: a Kafka Streams (see Kafka Streams Architecture) or hand-written application reads from input partitions, produces derived records, and commits its input offsets, all inside one transaction. The offsets are written to __consumer_offsets by the group coordinator but only become visible to other consumers when the transaction commits.

Where it lives in the code

Concern | Principal class | File
Coordinator request handling (state machine driver) | TransactionCoordinator | core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
Txn metadata cache + __transaction_state log read/write/load | TransactionStateManager | core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
Marker fan-out to partition leaders (sender thread) | TransactionMarkerChannelManager | core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
Per-txn metadata + transition state machine | TransactionMetadata | transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionMetadata.java
States enum & valid transitions | TransactionState | transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionState.java
Txn log key/value (de)serialization | TransactionLog | transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionLog.java
Broker-side PID block allocation | RPCProducerIdManager | transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/RPCProducerIdManager.java
Controller-side PID block generator | ProducerIdControlManager | metadata/src/main/java/org/apache/kafka/controller/ProducerIdControlManager.java
Idempotence dedup + sequence/epoch validation | ProducerStateManager, ProducerAppendInfo, ProducerStateEntry | storage/src/main/java/org/apache/kafka/storage/internals/log/
Per-segment aborted-txn index (LSO support) | TransactionIndex | storage/src/main/java/org/apache/kafka/storage/internals/log/TransactionIndex.java
Producer client state machine + sequence assignment | TransactionManager | clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
Consumer-side aborted-record filtering | CompletedFetch | clients/src/main/java/org/apache/kafka/clients/consumer/internals/CompletedFetch.java

Core concepts & terminology

Producer ID (PID)
A cluster-unique int64 stamped in every batch produced by an idempotent/transactional producer. Allocated in blocks of 1000 (ProducerIdsBlock.PRODUCER_ID_BLOCK_SIZE = 1000, server-common/.../ProducerIdsBlock.java:30).
Producer epoch
An int16 generation counter for a PID, used to fence zombies. Bumped on InitProducerId and (under Transaction V2) on every commit/abort.
Sequence number
A per-partition int32 that starts at 0 and advances by the number of records in each batch. The leader requires each batch's first sequence to equal the previous batch's last sequence + 1.
transactional.id
A stable, user-supplied string identifying a logical producer across restarts. It maps deterministically to one __transaction_state partition (and thus one coordinator).
Transaction marker
A control record (COMMIT or ABORT) the coordinator writes into each data partition to atomically end a transaction. Carries the coordinator epoch for fencing.
Last Stable Offset (LSO)
min(highWatermark, firstUnstableOffset), the highest offset below which there are no in-flight (undecided) transactions. read_committed consumers may not read past it.
Coordinator epoch
The leader epoch of the __transaction_state partition; rises on every coordinator failover and fences stale marker writes / log appends.
Transaction Version (TV)
A cluster feature flag (transaction.version). TV_0/TV_1 are the legacy protocol; TV_2 (KIP-890) bumps the epoch per transaction so each transaction is uniquely identified by (pid, epoch) (server-common/.../TransactionVersion.java:24-31).

The idempotent producer

PID allocation: controller block-allocation

PIDs are allocated by the active KRaft controller in monotonic blocks, never one at a time, so allocation is cheap and survives controller failover via the metadata log. ProducerIdControlManager.generateNextProducerId takes the next start id, builds a ProducerIdsBlock of size 1000, and emits a ProducerIdsRecord whose nextProducerId field advances past the block. The new value is committed to the metadata log before the block is handed out (metadata/.../ProducerIdControlManager.java:84-101). On replay, the manager asserts strict monotonicity: a replayed nextProducerId must exceed the current block's first id (:111-120). This is the persistent invariant that guarantees no PID is ever reused.

On the broker side, RPCProducerIdManager caches the current block and serves generateProducerId() lock-free by currentProducerIdBlock.get().claimNextId(), an atomic getAndIncrement on the block's counter (server-common/.../ProducerIdsBlock.java:50-56). When 90% of the block is consumed (PID_PREFETCH_THRESHOLD = 0.90) it asynchronously requests the next block from the controller via AllocateProducerIds, gated by a single-flight requestInFlight CAS; if the block runs dry before the next arrives it throws a retriable COORDINATOR_LOAD_IN_PROGRESS rather than blocking (transaction-coordinator/.../RPCProducerIdManager.java:80-109).
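The broker-side pattern can be modelled in a few lines of Python: claim from the cached block, prefetch the next block at 90%, and fail retriably when the block runs dry. This is a minimal single-process sketch under stated assumptions, not Kafka's code. ControllerStub, BrokerPidManager and RetriableError are hypothetical names. The AllocateProducerIds RPC is modelled as completing inline whenever the controller is reachable, and a threading.Lock stands in for the atomic getAndIncrement.

```python
import threading

BLOCK_SIZE = 1000          # mirrors ProducerIdsBlock.PRODUCER_ID_BLOCK_SIZE
PREFETCH_THRESHOLD = 0.90  # mirrors PID_PREFETCH_THRESHOLD


class RetriableError(Exception):
    """Hypothetical stand-in for the COORDINATOR_LOAD_IN_PROGRESS error code."""


class ProducerIdsBlock:
    def __init__(self, first_id, size=BLOCK_SIZE):
        self.first_id = first_id
        self.size = size
        self._next = first_id
        self._lock = threading.Lock()  # stands in for an atomic getAndIncrement

    def claim_next_id(self):
        """Return the next PID in the block, or None once the block is used up."""
        with self._lock:
            if self._next >= self.first_id + self.size:
                return None
            pid = self._next
            self._next += 1
            return pid

    def used_fraction(self):
        with self._lock:
            return (self._next - self.first_id) / self.size


class ControllerStub:
    """Hypothetical controller: hands out contiguous, never-reused blocks."""

    def __init__(self):
        self.next_producer_id = 0  # in Kafka this lives in the metadata log
        self.reachable = True

    def allocate_block(self):
        block = ProducerIdsBlock(self.next_producer_id)
        self.next_producer_id += BLOCK_SIZE  # advance past the block before handing it out
        return block


class BrokerPidManager:
    def __init__(self, controller):
        self.controller = controller
        self.current = controller.allocate_block()
        self.next_block = None
        self.request_in_flight = False  # single-flight guard for the prefetch

    def generate_producer_id(self):
        pid = self.current.claim_next_id()
        if pid is None:
            if self.next_block is None:
                # Block ran dry before the prefetch arrived: fail retriably, never block.
                raise RetriableError("COORDINATOR_LOAD_IN_PROGRESS")
            self.current, self.next_block = self.next_block, None
            pid = self.current.claim_next_id()
        self._maybe_prefetch()
        return pid

    def _maybe_prefetch(self):
        if (self.next_block is None and not self.request_in_flight
                and self.current.used_fraction() >= PREFETCH_THRESHOLD):
            self.request_in_flight = True
            if self.controller.reachable:  # models the async RPC completing
                self.next_block = self.controller.allocate_block()
                self.request_in_flight = False
```

Uniqueness comes only from the controller advancing its counter past each block. The prefetch only decides whether a caller ever sees the retriable error.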

Gotcha

generateProducerId() deliberately returns COORDINATOR_LOAD_IN_PROGRESS when the block is exhausted, not REQUEST_TIMED_OUT, because older clients treat the latter as fatal when it should be retriable (RPCProducerIdManager.java:97-100).

Sequence validation & deduplication on the leader

The partition leader's per-PID state lives in ProducerStateEntry, which keeps a bounded deque of the last 5 batches (ProducerStateEntry.NUM_BATCHES_TO_RETAIN = 5, storage/.../ProducerStateEntry.java:35). Each entry records producerEpoch, coordinatorEpoch, lastTimestamp, an optional currentTxnFirstOffset, and for each retained batch a BatchMetadata(lastSeq, lastOffset, offsetDelta, timestamp) (storage/.../BatchMetadata.java:21).

On a client produce, UnifiedLog.analyzeAndValidateProducerState first checks for an exact duplicate: for each batch with a producer id it asks ProducerStateEntry.findDuplicateBatch, which (for the matching epoch) looks for a cached batch with the identical [baseSequence, lastSequence] range (storage/.../ProducerStateEntry.java:128-137). If found, the append short-circuits, no data is written, and the leader returns the original offset/timestamp of the already-stored batch to the client, so a retry looks like a success (storage/.../UnifiedLog.java:1247-1254).
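The duplicate check is a short scan over a bounded deque. The Python sketch below assumes one producer entry, ignores sequence wraparound, and leaves out the ordering check that runs when no duplicate is found. BatchMetadata mirrors the fields named above. ProducerStateEntry here is a simplified stand-in, and append_batch is a hypothetical helper that writes to a plain list instead of a log segment.

```python
from collections import deque
from dataclasses import dataclass

NUM_BATCHES_TO_RETAIN = 5  # mirrors ProducerStateEntry.NUM_BATCHES_TO_RETAIN


@dataclass(frozen=True)
class BatchMetadata:
    last_seq: int
    last_offset: int
    offset_delta: int  # last_offset - first_offset (equal to last_seq - first_seq here)
    timestamp: int

    @property
    def first_seq(self):
        return self.last_seq - self.offset_delta  # ignores wraparound

    @property
    def first_offset(self):
        return self.last_offset - self.offset_delta


class ProducerStateEntry:
    def __init__(self, producer_epoch):
        self.producer_epoch = producer_epoch
        # deque(maxlen=5) silently evicts the oldest batch on overflow
        self.batches = deque(maxlen=NUM_BATCHES_TO_RETAIN)

    def find_duplicate_batch(self, epoch, first_seq, last_seq):
        if epoch != self.producer_epoch:
            return None
        for b in self.batches:
            if b.first_seq == first_seq and b.last_seq == last_seq:
                return b
        return None


def append_batch(entry, log, epoch, first_seq, records, timestamp):
    """Hypothetical append: returns the batch's base offset and writes nothing for a retry."""
    last_seq = first_seq + len(records) - 1
    dup = entry.find_duplicate_batch(epoch, first_seq, last_seq)
    if dup is not None:
        return dup.first_offset  # a retry gets the original offset back
    base_offset = len(log)
    log.extend(records)
    entry.batches.append(BatchMetadata(last_seq, base_offset + len(records) - 1,
                                       len(records) - 1, timestamp))
    return base_offset
```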

If it is not a duplicate, ProducerAppendInfo.checkSequence enforces ordering (storage/.../ProducerAppendInfo.java:156-194):

  • Same epoch: require inSequence(currentLastSeq, appendFirstSeq), i.e. appendFirstSeq == currentLastSeq + 1 (with wraparound at Integer.MAX_VALUE → 0). A gap throws OutOfOrderSequenceException (:196-198, :188-192).
  • New epoch: if the first sequence is non-zero and a prior epoch is known, the leader throws OutOfOrderSequenceException, because a fresh epoch must restart at sequence 0 (:169-176).
  • If all prior records for the PID were deleted (epoch reset to NO_PRODUCER_EPOCH) the leader accepts any sequence (:188), which is why a long-idle producer can hit UNKNOWN_PRODUCER_ID and recover.
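The three rules fit in one function. This Python sketch assumes the caller has already resolved the producer's current epoch and last sequence. The parameter names and the accepts() helper are illustrative and do not match Kafka's signatures. The "accept any sequence" case is modelled as current_epoch == NO_PRODUCER_EPOCH.

```python
INT_MAX = 2**31 - 1       # Integer.MAX_VALUE
NO_PRODUCER_EPOCH = -1    # RecordBatch.NO_PRODUCER_EPOCH


class OutOfOrderSequenceException(Exception):
    pass


def in_sequence(last_seq, next_seq):
    """True if next_seq directly follows last_seq, wrapping MAX_VALUE -> 0."""
    return next_seq == last_seq + 1 or (last_seq == INT_MAX and next_seq == 0)


def check_sequence(current_epoch, current_last_seq, append_epoch, append_first_seq):
    if append_epoch != current_epoch:
        # A new epoch must restart at 0, unless no prior epoch is known at all.
        if append_first_seq != 0 and current_epoch != NO_PRODUCER_EPOCH:
            raise OutOfOrderSequenceException(
                f"epoch {append_epoch} must start at 0, got {append_first_seq}")
    elif current_epoch != NO_PRODUCER_EPOCH and not in_sequence(current_last_seq, append_first_seq):
        # Same epoch: any gap (or overlap) is rejected.
        raise OutOfOrderSequenceException(
            f"expected {current_last_seq + 1}, got {append_first_seq}")


def accepts(*args):
    """Convenience wrapper: True if check_sequence would accept the append."""
    try:
        check_sequence(*args)
        return True
    except OutOfOrderSequenceException:
        return False
```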
Invariant

For a fixed (producerId, epoch) on a single partition, the sequence of persisted batches has strictly contiguous sequence ranges starting at 0. Retries are absorbed by the 5-batch dedup window; gaps are rejected. This is exactly-once-in-order per partition.

Caution

The dedup window is only 5 batches deep. If a producer has more than 5 in-flight batches to a partition and a low-sequence batch needs to retry after higher ones committed, the low batch can fall outside the window and loop on OutOfOrderSequenceException. This is why the idempotent producer caps max.in.flight.requests.per.connection at 5 (see The Producer Client).

Epoch fencing at the leader

ProducerAppendInfo.checkProducerEpoch rejects stale writers. The comparison depends on the transaction version of the incoming marker. For TV_2 a marker is valid only if markerEpoch > currentEpoch (strictly greater, because TV_2 bumps the epoch before writing the marker); for legacy TV_0/TV_1 it accepts markerEpoch >= currentEpoch (storage/.../ProducerAppendInfo.java:117-153). A client data batch whose epoch is below the leader's last-seen epoch throws InvalidProducerEpochException (a recoverable error: the client aborts and retries), not the older, fatal ProducerFenced (:145-152).
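The epoch rules reduce to two predicates. This Python sketch encodes only the comparisons described above, and the function names are hypothetical. Where it returns False, Kafka raises an exception instead, for example InvalidProducerEpochException for a stale data batch.

```python
TV_2 = 2  # transaction.version level that bumps the epoch per transaction


def marker_epoch_valid(transaction_version, marker_epoch, current_epoch):
    """Validity of a COMMIT/ABORT marker's producer epoch at the partition leader."""
    if transaction_version >= TV_2:
        # TV_2 bumps the epoch before writing the marker, so it must be strictly newer.
        return marker_epoch > current_epoch
    # Legacy TV_0 / TV_1: an equal epoch is also accepted.
    return marker_epoch >= current_epoch


def data_batch_epoch_valid(batch_epoch, current_epoch):
    """False corresponds to the leader throwing InvalidProducerEpochException."""
    return batch_epoch >= current_epoch
```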

Transactions: data structures

In-memory: TransactionMetadata

Each transactional.id the coordinator owns has one TransactionMetadata, guarded by its own ReentrantLock exposed via inLock(...) (transaction-coordinator/.../TransactionMetadata.java:62, 111-118). Key fields:

Field | Type | Meaning
producerId | long | Current PID bound to this transactional id.
prevProducerId | long | PID of the last committed transaction (for retry-after-overflow detection).
nextProducerId | long | Pre-allocated PID to switch to when the epoch overflows.
producerEpoch | short | Current epoch; fences zombies.
lastProducerEpoch | short | Previous epoch (retry detection).
txnTimeoutMs | int | Per-txn timeout; Integer.MAX_VALUE marks a distributed 2PC txn (never auto-aborted).
state | TransactionState | Current state (see the state machine below).
pendingState | Optional<TransactionState> | The target state of an in-flight transition; blocks concurrent transitions.
topicPartitions | HashSet<TopicPartition> | Partitions registered in the current transaction.
txnStartTimestamp / txnLastUpdateTimestamp | volatile long | Start time (for timeout) and last-update time (for transactional-id expiration). Volatile so the background timeout scan reads them lock-free.
hasFailedEpochFence | boolean | Set if a fence-epoch log write failed; prevents bumping the epoch twice (TV_0/TV_1).
clientTransactionVersion | TransactionVersion | TV the client used when the state was transitioned.

A prospective transition is captured in an immutable TxnTransitMetadata record (transaction-coordinator/.../TxnTransitMetadata.java:27-41). Transitions follow a two-step protocol (TransactionMetadata.java:318-446). First, prepareXxx() sets pendingState and returns a TxnTransitMetadata. Then, once the log write succeeds, completeTransitionTo(transit) validates it and atomically overwrites the live fields. completeTransitionTo re-checks state validity, epoch progression, timestamp monotonicity, and that the old partition set is a subset of the new one. A corrupt transition throws an IllegalStateException marked FATAL.
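The prepare/complete split can be sketched in Python for the AddPartitions case only. TxnTransit, TxnMetadata, IllegalTransition and ConcurrentTransition are hypothetical, trimmed-down stand-ins, and the sketch models only the success path after the log append. It also assumes that a second prepare while a transition is pending is refused, which is what surfaces to clients as CONCURRENT_TRANSACTIONS. The partition-subset check is shown for the ONGOING case.

```python
import threading
from dataclasses import dataclass


class IllegalTransition(Exception):
    """Stands in for the FATAL IllegalStateException."""


class ConcurrentTransition(Exception):
    """Raised while another transition is pending (assumed CONCURRENT_TRANSACTIONS)."""


@dataclass(frozen=True)
class TxnTransit:  # hypothetical, trimmed-down TxnTransitMetadata
    state: str
    producer_epoch: int
    partitions: frozenset
    last_update_ms: int


class TxnMetadata:
    def __init__(self, producer_epoch, now_ms):
        self._lock = threading.RLock()  # per-transactional-id lock
        self.state = "EMPTY"
        self.producer_epoch = producer_epoch
        self.partitions = frozenset()
        self.last_update_ms = now_ms
        self.pending_state = None

    def prepare_add_partitions(self, new_partitions, now_ms):
        """Step 1: reserve the transition; the live fields are untouched."""
        with self._lock:
            if self.pending_state is not None:
                raise ConcurrentTransition(self.pending_state)
            self.pending_state = "ONGOING"
            return TxnTransit("ONGOING", self.producer_epoch,
                              self.partitions | frozenset(new_partitions), now_ms)

    def complete_transition_to(self, transit):
        """Step 2: run only after the __transaction_state append succeeded."""
        with self._lock:
            if self.pending_state != transit.state:
                raise IllegalTransition("transit does not match pending state")
            if transit.producer_epoch < self.producer_epoch:
                raise IllegalTransition("epoch went backwards")
            if transit.last_update_ms < self.last_update_ms:
                raise IllegalTransition("timestamp went backwards")
            if not self.partitions <= transit.partitions:
                raise IllegalTransition("partition set shrank")
            self.state = transit.state
            self.producer_epoch = transit.producer_epoch
            self.partitions = transit.partitions
            self.last_update_ms = transit.last_update_ms
            self.pending_state = None
```

Between the two steps, readers still see the old state. Only pending_state records that a write is in flight.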

Persistent: the __transaction_state log

Transaction metadata is durably stored in the internal compacted topic __transaction_state. Its topic config is set in TransactionCoordinator.transactionStateTopicConfigs (TransactionCoordinator.scala:1014-1022): min.insync.replicas=2 (from transaction.state.log.min.isr), uncompressed, cleanup.policy=compact, unclean.leader.election=false, plus the segment size. The partition count (50 by default) and replication factor (3 by default) are not set there. They come from transaction.state.log.num.partitions and transaction.state.log.replication.factor (TransactionLogConfig.java:32, 40) and are applied when the internal topic is created. The record key is the transactional id, prefixed with a coordinator record type id; the value is a versioned TransactionLogValue (transaction-coordinator/.../TransactionLog.java:49-95).

On-disk transaction-state record (transaction-coordinator/src/main/resources/common/message/TransactionLogValue.json), fields in schema order:

Field | Type | Notes
ProducerId | int64 | PID bound to this transactional id
PreviousProducerId | int64 | tag 0; last committed PID
NextProducerId | int64 | tag 1; PID after epoch overflow
ProducerEpoch | int16 | current epoch
NextProducerEpoch | int16 | tag 3
TransactionTimeoutMs | int32 | per-transaction timeout
TransactionStatus | int8 | TransactionState.id()
TransactionPartitions | [ {Topic, [PartitionIds]} ] | nullable
TransactionLastUpdateTimestampMs | int64 | last update time
TransactionStartTimestampMs | int64 | transaction start time
ClientTransactionVersion | int16 | tag 2

The value version is 1 when TV ≥ 1, else 0. Tagged fields (tags 0–3) exist only in v1 (flexible) for backward compatibility and may be absent. A null value is a tombstone: the transactional id has expired or been removed.

Because the topic is compacted, only the latest record per transactional id is retained, which is sufficient to rebuild the coordinator's in-memory state after failover. TransactionLog.read decodes a record into a TxnRecord (id + metadata), a TxnTombstone, or an unknown-version marker that is logged and skipped (TransactionLog.java:118-164).
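Compaction is sufficient because replaying records in order with last-write-wins semantics builds the same cache from the full log as from its compacted form. This is a Python sketch under simplifying assumptions. Values are reduced to plain dicts, and load_partition and compact are hypothetical names; the real loader decodes each record through TransactionLog.read.

```python
def load_partition(records):
    """Rebuild the transactional-id cache from (key, value) records in log order.

    value is None for a tombstone; the last record per key wins.
    """
    cache = {}
    for txn_id, value in records:
        if value is None:
            cache.pop(txn_id, None)  # id expired or removed
        else:
            cache[txn_id] = value
    return cache


def compact(records):
    """Keep only the latest record per key, as log compaction eventually does."""
    last_index = {key: i for i, (key, _) in enumerate(records)}
    return [rec for i, rec in enumerate(records) if last_index[rec[0]] == i]
```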

The transaction state machine

TransactionState defines 8 states and a VALID_PREVIOUS_STATES map that prepareTransitionTo enforces (transaction-coordinator/.../TransactionState.java:31-103):

Commit spine: (new metadata) → EMPTY → [AddPartitions / AddOffsets] → ONGOING → [EndTxn(COMMIT), persist prepare] → PREPARE_COMMIT → [all markers acked, persist complete] → COMPLETE_COMMIT → [transactional.id expiry, tombstone] → DEAD
Abort spine (mirrors the commit path):
  • ONGOING → PREPARE_ABORT: EndTxn(ABORT)
  • PREPARE_ABORT → COMPLETE_ABORT: all markers acked
  • EMPTY → PREPARE_ABORT: TV_2 direct abort (client unsure of server state, no partitions added)
Reuse & fence branches (off the commit spine):
  • COMPLETE_COMMIT / COMPLETE_ABORT → ONGOING: a new transaction reuses the metadata
  • ONGOING → PREPARE_EPOCH_FENCE: InitProducerId arrives mid-transaction, or the timeout reaper fires
  • PREPARE_EPOCH_FENCE → PREPARE_ABORT: internal step (bump the epoch, then abort the dangling transaction)
  • COMPLETE_COMMIT / COMPLETE_ABORT → DEAD: transactional.id expiry
Transaction state machine. PREPARE_* states are persisted before markers are sent; COMPLETE_* states are persisted after all markers are acknowledged. PREPARE_EPOCH_FENCE is an internal step reachable only from ONGOING that always proceeds to PREPARE_ABORT. DEAD is transient during transactional.id expiry and is never persisted. Source: transaction-coordinator/.../TransactionState.java:31-103.

Notable edges from the code:

  • EMPTY, COMPLETE_COMMIT, COMPLETE_ABORT can all transition to ONGOING (a new transaction reuses the metadata) and, under TV_2, directly to PREPARE_ABORT, because a client uncertain of server state may send EndTxn(abort) with no partitions added (TransactionState.java:94-103; logic in TransactionCoordinator.endTransaction, :894-899).
  • PREPARE_EPOCH_FENCE is reachable only from ONGOING and is used when a new InitProducerId arrives while a transaction is live, or when the background timeout reaper fires: the coordinator bumps the epoch, aborts the dangling transaction, then returns CONCURRENT_TRANSACTIONS to force the client to retry (TransactionCoordinator.scala:281-287, 190-207).
  • DEAD is a transient state during transactional-id expiration; on success the metadata is removed from the cache, never persisted as DEAD (TransactionMetadata.java:421-426).
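The edges above can be collected into a "target state → allowed previous states" table, which is the shape prepareTransitionTo checks against. This Python sketch is a partial reconstruction from this section, not a copy of TransactionState.java. It includes the edges listed above plus two that the protocol flow implies: a repeated AddPartitions keeps a transaction ONGOING, and InitProducerId on an idle id re-persists EMPTY with a bumped epoch. PREVIOUS_STATES and can_transition are hypothetical names.

```python
# Partial table: target state -> states it may be entered from.
PREVIOUS_STATES = {
    "EMPTY": {"EMPTY", "COMPLETE_COMMIT", "COMPLETE_ABORT"},               # InitProducerId epoch bump
    "ONGOING": {"EMPTY", "ONGOING", "COMPLETE_COMMIT", "COMPLETE_ABORT"},  # AddPartitions / AddOffsets
    "PREPARE_COMMIT": {"ONGOING"},
    "PREPARE_ABORT": {"ONGOING", "PREPARE_EPOCH_FENCE",
                      "EMPTY", "COMPLETE_COMMIT", "COMPLETE_ABORT"},       # last three: TV_2 only
    "COMPLETE_COMMIT": {"PREPARE_COMMIT"},
    "COMPLETE_ABORT": {"PREPARE_ABORT"},
    "PREPARE_EPOCH_FENCE": {"ONGOING"},
    "DEAD": {"COMPLETE_COMMIT", "COMPLETE_ABORT"},                         # transactional.id expiry
}


def can_transition(current, target):
    """True if the (partial) table allows moving from current to target."""
    return current in PREVIOUS_STATES.get(target, set())
```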

The protocol flow, step by step

Happy-path commit, in time order (participants: producer, transaction coordinator, __transaction_state log, partition leaders):
  1. Producer → coordinator: InitProducerId.
  2. Coordinator → log: append EMPTY, epoch+1 (prepareIncrementProducerEpoch); the log acks once durable.
  3. Coordinator → producer: pid, epoch.
  4. Producer → coordinator: AddPartitionsToTxn(p).
  5. Coordinator → log: append ONGOING, {p} (prepareAddPartitions); durable.
  6. Coordinator → producer: ok.
  7. Producer → partition leader: produce(p) [transactional]. The leader verifies the partition is in the transaction, appends the batch, and returns its offset.
  8. Producer → coordinator: EndTxn(COMMIT).
  9. Coordinator → log: append PREPARE_COMMIT (prepareAbortOrCommit), the commit point; durable.
  10. Coordinator → producer: ok (pid, epoch), responding early.
  11. Coordinator → partition leaders: WriteTxnMarkers (COMMIT control record); the leaders ack.
  12. Coordinator → log: append COMPLETE_COMMIT (prepareComplete); durable.
Steps map to handleInitProducerId, handleAddPartitionsToTransaction, the produce path, and handleEndTransaction. The PREPARE_COMMIT append is the durable commit point: the coordinator responds to the client immediately after it and finishes the markers asynchronously.

1 · InitProducerId, bind, fence, abort dangling

TransactionCoordinator.handleInitProducerId (TransactionCoordinator.scala:117-226) routes on the transactional id:

  • Null id → a pure idempotent (non-transactional) producer: blindly allocate a fresh PID with epoch 0 and return (:125-132).
  • Real id → look up or create the metadata (a new metadata gets a freshly generated PID, epoch -1, state EMPTY, :154-166), then prepareInitProducerIdTransit decides the transition under the metadata lock.

prepareInitProducerIdTransit (:228-296) is where fencing and recovery live. If the state is EMPTY/COMPLETE_*, it bumps the epoch via prepareIncrementProducerEpoch; if the epoch is exhausted, it instead rotates to a new PID via prepareProducerIdRotation (:264-271). If a transaction is ONGOING, it returns prepareFenceProducerEpoch(): the coordinator aborts that transaction first and replies CONCURRENT_TRANSACTIONS so the client retries (:281-287, 190-207). An expectedProducerIdAndEpoch that does not match, and is not a valid epoch-exhaustion retry, yields PRODUCER_FENCED (:252-253, isValidProducerId at :234-246).
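The routing on state can be summarized in Python. This is a hypothetical sketch that returns action labels (not Kafka types) and omits the expectedProducerIdAndEpoch / PRODUCER_FENCED check. Two parts are assumptions and are marked in the code: the exhaustion threshold (one step below Short.MAX_VALUE) and the handling of states not discussed above.

```python
SHORT_MAX = 32767  # Short.MAX_VALUE; producer epochs are int16


def is_epoch_exhausted(epoch):
    # Assumption: rotation kicks in one step below Short.MAX_VALUE,
    # leaving room for a final fencing bump.
    return epoch >= SHORT_MAX - 1


def init_producer_id_action(state, epoch):
    """Hypothetical summary of prepareInitProducerIdTransit's routing."""
    if state in ("EMPTY", "COMPLETE_COMMIT", "COMPLETE_ABORT"):
        return "ROTATE_PRODUCER_ID" if is_epoch_exhausted(epoch) else "BUMP_EPOCH"
    if state == "ONGOING":
        # Fence the zombie: bump the epoch, abort the dangling txn, make the client retry.
        return "FENCE_THEN_CONCURRENT_TRANSACTIONS"
    # Assumption: remaining states (e.g. PREPARE_*) just ask the client to retry.
    return "CONCURRENT_TRANSACTIONS"
```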

Key idea

Epoch bumping on InitProducerId is the zombie fence: when a crashed producer restarts and calls initTransactions(), the epoch jumps, so any in-flight write from the dead instance (carrying the old epoch) is now rejected by every partition leader. The coordinator also rolls back whatever transaction the zombie left open.

2 · AddPartitionsToTxn, register partitions

Before producing to a partition, the client tells the coordinator about it. handleAddPartitionsToTransaction validates producerId/producerEpoch against the metadata (mismatches → INVALID_PRODUCER_ID_MAPPING / PRODUCER_FENCED), short-circuits with NONE if the partitions are already present, otherwise calls prepareAddPartitions and appends an ONGOING record (TransactionCoordinator.scala:416-468). The first AddPartitions sets txnStartTimestamp, which arms the timeout clock (TransactionMetadata.java:214-233).

There is also a server-internal verification path (handleVerifyPartitionsInTransaction, :361-414): when a partition leader receives a transactional produce it can ask the coordinator "is this partition really in this producer's open transaction?". This closes the hanging transaction hole where a delayed produce could land after a marker. The leader's side uses a VerificationGuard, a unique token created in maybeStartTransactionVerification and checked in analyzeAndValidateProducerState, to defeat the ABA race between a verified response and an abort marker (storage/.../UnifiedLog.java:1410-1440, see the long comment at :1414-1426). Verification is on by default (transaction.partition.verification.enable=true, TransactionLogConfig.java:52-54).
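The ABA defence works because the guard is compared by identity, not by value. A verified response that carries an old guard can never match a guard minted after a marker. This Python sketch is a simplified model. LeaderProducerState and its methods are hypothetical, and two behaviours are assumptions: that an outstanding guard is reused, and that an appended end marker invalidates it. The exact point where UnifiedLog clears its verification state may differ.

```python
class VerificationGuard:
    """Opaque token compared by identity (object() semantics), never by value."""
    __slots__ = ()


class LeaderProducerState:
    """Hypothetical per-partition view of verification state, keyed by producer id."""

    def __init__(self):
        self._guards = {}

    def start_verification(self, producer_id):
        # Assumption: reuse an outstanding guard, otherwise mint a fresh one.
        return self._guards.setdefault(producer_id, VerificationGuard())

    def on_end_marker(self, producer_id):
        # Assumption: an appended COMMIT/ABORT marker invalidates outstanding guards.
        self._guards.pop(producer_id, None)

    def may_append(self, producer_id, guard):
        # Identity check: a guard minted before the marker can never match again.
        return self._guards.get(producer_id) is guard
```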

3 · Produce, transactional data batches

Produced batches carry the PID, epoch, sequence and the transactional flag. The leader runs the idempotence checks above and the verification check. The first transactional batch on a partition sets currentTxnFirstOffset in the producer state and starts a TxnMetadata in the ProducerStateManager.ongoingTxns map (storage/.../ProducerAppendInfo.java:234-243, ProducerStateManager.java:398). A non-transactional write while a transaction is open throws InvalidTxnStateException. The converse, a transactional write with no open transaction, is not an error: it is exactly how a transaction begins, setting currentTxnFirstOffset and adding a TxnMetadata (ProducerAppendInfo.java:235-243).

4 · Consume-transform-produce: binding offsets (KIP-447)

For EOS pipelines the application commits its input consumer offsets inside the transaction. The producer client method sendOffsetsToTransaction(offsets, groupMetadata) (clients/.../TransactionManager.java:432-463) takes the full ConsumerGroupMetadata (group id, generation, member id). This is the KIP-447 improvement that lets the group coordinator fence offset commits by generation, so an application no longer needs one transactional producer per input partition. Under TV_1 the client sends AddOffsetsToTxn (which registers the __consumer_offsets partition with the txn coordinator, exactly like AddPartitionsToTxn) and then TxnOffsetCommit; under TV_2 it skips AddOffsetsToTxn and sends TxnOffsetCommit directly (:443-459). The committed offsets are written to __consumer_offsets by the group coordinator but stay invisible (LSO-gated) until the transaction's COMMIT marker lands on that partition.

5 · EndTxn, two-phase commit

handleEndTransaction → endTransaction implements the 2PC. There are two implementations, selected by the client's TV: endTransactionWithTV1 (TransactionCoordinator.scala:543-710) and the TV_2 endTransaction (:758-1007). Both follow the same shape:

  1. Phase 1 (prepare). Under the metadata lock, validate pid/epoch and current state, compute the target (PREPARE_COMMIT or PREPARE_ABORT) via prepareAbortOrCommit, and append that record to __transaction_state (:854-860, 1003-1004). Under TV_2, prepareAbortOrCommit bumps the epoch and, if the epoch is exhausted, rotates to nextProducerId (:818-841, TransactionMetadata.java:235-263).
  2. Respond early. Once the prepare record is durable, the coordinator replies to the client immediately (returning the post-bump pid/epoch under TV_2, :975) and continues asynchronously; the client does not wait for markers.
  3. Phase 2 (commit). addTxnMarkersToSend hands the transaction to the TransactionMarkerChannelManager, which writes COMMIT/ABORT markers to every involved partition leader. When all markers are acknowledged, the coordinator calls prepareComplete and appends COMPLETE_COMMIT/COMPLETE_ABORT (TransactionMarkerChannelManager.scala:276-315, TransactionMetadata.java:265-288).
Design rationale

The prepare record is the commit point of the 2PC. Once PREPARE_COMMIT is durable, the outcome is decided even if the coordinator crashes, because a new coordinator that loads PREPARE_COMMIT will re-drive the markers and write COMPLETE_COMMIT (see recovery). Responding to the client before the markers finish keeps commit latency low while preserving atomicity, because the markers are guaranteed to be (re)sent. The two-phase, marker-based design is KIP-98. The per-transaction epoch bump, which makes each transaction uniquely identifiable by (pid, epoch) and closes the hanging-transaction and duplicate-across-transactions holes, is KIP-890 (TV_2).

The transaction marker (control record)

Transaction marker layout (clients/.../record/internal/EndTransactionMarker.java:34-110, ControlRecordType.java:43-83), carried in a control batch (isControlBatch=true) appended to each data partition:

Field | Type | Byte offset | Meaning
key.version | int16 | 0 | ControlRecordType version
key.type | int16 | 2 | 0 = ABORT, 1 = COMMIT
value.version | int16 | 0 | EndTxnMarker version
value.coordinatorEpoch | int32 | 2 | fences zombie coordinators

The key is a ControlRecordType (4 bytes); the value is an EndTxnMarker (version + coordinatorEpoch). Control records are never returned to consumers and are not compacted away.

On the receiving partition leader, ProducerAppendInfo.appendEndTxnMarker validates the marker epoch (TV_2: strictly greater) and the coordinator epoch (a marker with an older coordinator epoch is a zombie and throws TransactionCoordinatorFencedException, ProducerAppendInfo.java:246-257). If the producer had an open transaction (currentTxnFirstOffset present), it emits a CompletedTxn recording (producerId, firstOffset, lastOffset=markerOffset, isAborted) (:259-290).

Marker fan-out: TransactionMarkerChannelManager

The channel manager is an InterBrokerSendThread named TxnMarkerSenderThread-<brokerId> (TransactionMarkerChannelManager.scala:169) with its own dedicated NetworkClient using the inter-broker listener. It owns several concurrent structures:

Structure | Type | Purpose
markersQueuePerBroker | ConcurrentHashMap[Int, TxnMarkerQueue] | Markers grouped by destination broker; each TxnMarkerQueue further partitions by txn-topic partition for fast emigration cleanup.
markersQueueForUnknownBroker | TxnMarkerQueue | Holding queue for partitions whose leader is currently unknown; retried once the leader is discovered.
transactionsWithPendingMarkers | ConcurrentHashMap[String, PendingCompleteTxn] | Tracks which transactional ids still have markers outstanding.
txnLogAppendRetryQueue | LinkedBlockingQueue[PendingCompleteTxn] | Completion records (COMPLETE_*) whose append failed transiently and must be retried.

addTxnMarkersToSend records the pending completion, then addTxnMarkersToBrokerQueue groups the transaction's partitions by their leader (via metadataCache.getPartitionLeaderEndpoint) and enqueues a TxnMarkerEntry per destination (:317-458). The send thread's generateRequests drains per-broker queues into WriteTxnMarkersRequests, attaching a TransactionMarkerRequestCompletionHandler (:238-274). As each partition leader acknowledges, the handler removes that partition from the metadata's topicPartitions. When the set empties, maybeWriteTxnCompletion fires writeTxnCompletion → tryAppendToLog to persist the COMPLETE_* record (:339-383). A partition whose leader has been deleted is simply dropped from the set so completion can proceed (:421-445).
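The grouping and "fire once the set empties" logic can be sketched in Python. group_by_leader is a hypothetical helper that models the leader lookup as a function returning None for an unknown leader. PendingCompleteTxn here is a simplified stand-in named after the real class. The network send and the log append are not modelled; a boolean marks where COMPLETE_* would be written.

```python
from collections import defaultdict


def group_by_leader(partitions, leader_of):
    """Split a transaction's partitions into per-broker lists plus an unknown-leader list.

    leader_of(tp) returns a broker id, or None when the leader is currently unknown.
    """
    per_broker = defaultdict(list)
    unknown = []
    for tp in sorted(partitions):
        leader = leader_of(tp)
        if leader is None:
            unknown.append(tp)  # parked until a leader is discovered
        else:
            per_broker[leader].append(tp)
    return dict(per_broker), unknown


class PendingCompleteTxn:
    """Tracks outstanding markers; completion fires exactly once, when the set empties."""

    def __init__(self, txn_id, partitions):
        self.txn_id = txn_id
        self.remaining = set(partitions)
        self.completion_written = False

    def on_partition_done(self, tp):
        # Called for an acked marker, or for a partition whose topic was deleted.
        self.remaining.discard(tp)
        if not self.remaining and not self.completion_written:
            self.completion_written = True  # stands in for appending COMPLETE_*
```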

On the receiving broker, KafkaApis.handleWriteTxnMarkersRequest appends each marker to its partitions with an AtomicInteger numAppends counter, sending the response only after the last partition's append completes (core/src/main/scala/kafka/server/KafkaApis.scala:1729-1871).

Concurrency & threading model

The coordinator's locking discipline is documented at the top of TransactionStateManager (:60-68) and is strict:

  • stateLock (a ReentrantReadWriteLock) guards the metadata cache and the loading/unloading partition sets. Loading and emigration take the write lock; request handling takes the read lock.
  • Each TransactionMetadata has its own ReentrantLock for field mutation.
  • Ordering rule: never acquire stateLock.readLock while holding a txnMetadata lock; never acquire a txnMetadata lock while holding stateLock.writeLock; never call ReplicaManager.appendRecords while holding a txnMetadata lock.
Thread | Responsibility
Request handler (KafkaApis) threads | Run handleInitProducerId/AddPartitions/EndTxn under the metadata lock; issue the __transaction_state append.
TxnMarkerSenderThread-<id> | Single sender thread: drains marker queues, sends WriteTxnMarkers, retries log-append failures, writes COMPLETE_*.
transaction-abort scheduler task | Every transaction.abort.timed.out.transaction.cleanup.interval.ms (default 10 s) scans for ONGOING transactions past their timeout and aborts them (TransactionCoordinator.scala:1047-1083, 1091-1095).
transactionalId-expiration scheduler task | Every transaction.remove.expired.transaction.cleanup.interval.ms (default 1 h) writes tombstones for transactional ids idle past transactional.id.expiration.ms (default 7 d) (TransactionStateManager.scala:300-307, 151-243).
load-txns-for-partition-* scheduler task | One-shot task per onElection to read a __transaction_state partition into the cache.

The append path holds the read lock across appendRecords deliberately (:772-813): otherwise an emigration+immigration between the coordinator-epoch check and the append could let a stale-epoch record replicate, corrupting the log. The post-append updateCacheCallback re-verifies the coordinator epoch before applying completeTransitionTo, mapping mismatches to NOT_COORDINATOR (:666-770).

Note

The background timeout scan reads txnStartTimestamp and state without the metadata lock because those fields are volatile and a stale read is harmless: the actual abort re-acquires the lock and re-validates before transitioning (TransactionStateManager.timedOutTransactions, :124-149). Distributed 2PC transactions (txnTimeoutMs == Integer.MAX_VALUE) are explicitly excluded from the timeout (:137-141).
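The scan's filter is a plain predicate over lock-free snapshots. This Python sketch returns candidates only and leaves re-validation to the abort path. TxnSnapshot and timed_out_candidates are hypothetical names, and the strict "<" comparison against start + timeout is an assumption.

```python
from dataclasses import dataclass

INT_MAX = 2**31 - 1  # Integer.MAX_VALUE marks a distributed 2PC transaction


@dataclass(frozen=True)
class TxnSnapshot:  # hypothetical lock-free view of one TransactionMetadata
    txn_id: str
    state: str
    start_ms: int
    timeout_ms: int


def timed_out_candidates(snapshots, now_ms):
    """Candidates only: the abort path re-checks each one under its metadata lock."""
    return [
        s.txn_id for s in snapshots
        if s.state == "ONGOING"
        and s.timeout_ms != INT_MAX          # 2PC transactions are never auto-aborted
        and s.start_ms + s.timeout_ms < now_ms
    ]
```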

The consumer side: read_committed, LSO & the aborted index

Computing the LSO on the leader

The leader tracks firstUnstableOffsetMetadata, the earliest offset belonging to an undecided or not-yet-replicated transaction. ProducerStateManager.firstUnstableOffset returns the min of the first ongoingTxns entry (undecided) and the first unreplicatedTxns entry (decided but its marker is above the HW) (ProducerStateManager.java:246-258). UnifiedLog.lastStableOffset is then min(highWatermark, firstUnstableOffset) (UnifiedLog.java:678-686). When a transaction completes, completeTxn moves it from ongoingTxns to unreplicatedTxns; it is only dropped (and the LSO allowed past it) once the high watermark advances beyond its marker, via onHighWatermarkUpdated (ProducerStateManager.java:545-554, 264-266).
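The two-map bookkeeping and the LSO formula can be sketched in Python for one partition. TxnTracker and its methods are hypothetical. Unlike the real maps, this sketch keys entries by producer id for brevity, so it assumes a producer does not start a new transaction while its previous one is still unreplicated.

```python
class TxnTracker:
    """Hypothetical, simplified ongoingTxns / unreplicatedTxns bookkeeping."""

    def __init__(self):
        self.ongoing = {}       # producer id -> first offset of its undecided txn
        self.unreplicated = {}  # producer id -> (first offset, marker offset)

    def begin(self, producer_id, first_offset):
        self.ongoing[producer_id] = first_offset

    def complete(self, producer_id, marker_offset):
        # Decided, but still unstable until the marker is below the high watermark.
        first = self.ongoing.pop(producer_id)
        self.unreplicated[producer_id] = (first, marker_offset)

    def on_high_watermark_updated(self, hw):
        # Drop completed txns whose marker has been replicated (marker offset < HW).
        self.unreplicated = {p: (f, m) for p, (f, m) in self.unreplicated.items() if m >= hw}

    def first_unstable_offset(self):
        firsts = list(self.ongoing.values()) + [f for f, _ in self.unreplicated.values()]
        return min(firsts) if firsts else None

    def last_stable_offset(self, hw):
        fuo = self.first_unstable_offset()
        return hw if fuo is None else min(hw, fuo)
```

For example, a transaction starting at offset 100 holds the LSO at 100 until its marker (here at offset 120) falls below the high watermark, even though the transaction has already been decided.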

Invariant

A read_committed consumer never observes an offset ≥ LSO, so it never sees a record whose transaction outcome is still undecided. Aborted records below the LSO are present in the log but filtered client-side.

The aborted-transaction index

When a transaction aborts, the segment's TransactionIndex gets an AbortedTxn entry. The on-disk record is fixed-size (no flexible fields): a 2-byte version prefix followed by 4 × int64 (32 bytes), 34 bytes in total:

Field | Type | Byte offset
--- | --- | ---
version | int16 | 0
ProducerId | int64 | 2
FirstOffset | int64 | 10
LastOffset | int64 | 18
LastStableOffset | int64 | 26

Fixed-size, no flexible or tagged fields: 2-byte version prefix + 4 × int64 = 34 bytes per entry.

Source: clients/.../message/AbortedTxn.json, version 0. TransactionIndex stores, per aborted transaction, the PID and offset span plus the LSO at abort time. Entries are appended in increasing LastOffset order (TransactionIndex.java:83-93).
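Every entry has the same 34-byte shape, so encoding one is plain ByteBuffer arithmetic. The sketch below is hypothetical: AbortedTxnCodecSketch is not a Kafka class. It assumes big-endian encoding, which is ByteBuffer's default, and it only shows how the offsets in the table follow from the field widths.

```java
import java.nio.ByteBuffer;

class AbortedTxnCodecSketch {
    static final int TOTAL_SIZE = Short.BYTES + 4 * Long.BYTES; // 2 + 32 = 34 bytes

    // Encodes one entry in the field order from the table (assumed big-endian).
    static ByteBuffer encode(long producerId, long firstOffset, long lastOffset, long lastStableOffset) {
        ByteBuffer buf = ByteBuffer.allocate(TOTAL_SIZE);
        buf.putShort((short) 0);      // version          @0
        buf.putLong(producerId);      // ProducerId       @2
        buf.putLong(firstOffset);     // FirstOffset      @10
        buf.putLong(lastOffset);      // LastOffset       @18
        buf.putLong(lastStableOffset);// LastStableOffset @26
        buf.flip();
        return buf;
    }

    // Fixed offsets allow absolute reads without parsing the preceding fields.
    static long lastOffset(ByteBuffer entry) { return entry.getLong(18); }

    public static void main(String[] args) {
        ByteBuffer e = encode(42L, 100L, 150L, 90L);
        System.out.println(e.remaining()); // 34
        System.out.println(lastOffset(e)); // 150
    }
}
```

A fixed record size also means entry i of an index file starts at byte i × 34, so the index can be scanned without per-entry length headers.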

On a read_committed fetch the leader returns the relevant AbortedTransaction list (gathered by TransactionIndex.collectAbortedTxns) alongside the records, and the fetch is capped at the LSO (UnifiedLog.read passes isolation == TXN_COMMITTED, UnifiedLog.java:1659).
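The index entries a fetch needs can be picked out with an overlap test between each aborted transaction's offset span and the fetched range. AbortedEntry and collect are hypothetical names. Treating the range as [fetchOffset, upperBoundOffset) is an assumption. The real TransactionIndex.collectAbortedTxns also walks segments and may stop early.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory stand-in for one TransactionIndex entry.
record AbortedEntry(long producerId, long firstOffset, long lastOffset) {}

class CollectAbortedSketch {
    // An aborted txn is relevant to a fetch of [fetchOffset, upperBoundOffset)
    // when its [firstOffset, lastOffset] span overlaps that range.
    static List<AbortedEntry> collect(List<AbortedEntry> index, long fetchOffset, long upperBoundOffset) {
        List<AbortedEntry> out = new ArrayList<>();
        for (AbortedEntry e : index) {
            if (e.lastOffset() >= fetchOffset && e.firstOffset() < upperBoundOffset) {
                out.add(e);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<AbortedEntry> index = List.of(
            new AbortedEntry(1, 0, 9),    // ends before the fetch
            new AbortedEntry(2, 20, 30),  // overlaps the start of the fetch
            new AbortedEntry(3, 40, 45)); // starts inside the fetch
        System.out.println(collect(index, 25, 41).size()); // 2
    }
}
```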

Client-side filtering

CompletedFetch.nextFetchedRecord performs the actual filtering (clients/.../CompletedFetch.java:187-244). It keeps a priority queue of aborted transactions ordered by first offset and a live abortedProducerIds set. For each batch with a producer id:

  1. consumeAbortedTransactionsUpTo(batch.lastOffset()) moves every aborted txn whose first offset is ≤ the batch's last offset into the active set (:356-364).
  2. If the batch is an abort marker, the producer id is removed from the active set (the abort is now consumed).
  3. If the batch's producer id is in the active set, the whole batch is skipped and nextFetchOffset advances past it (:215-223).
  4. Control batches (the markers themselves) are never surfaced to the application (:234-240).
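The four steps fit in a single loop. This sketch follows the logic but is not CompletedFetch itself. Batch, Aborted and deliverable are hypothetical, and records inside a batch are ignored. The transactional-batch check that the real client performs is folded into the producer-id test.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.PriorityQueue;
import java.util.Set;

class ReadCommittedFilterSketch {
    record Aborted(long producerId, long firstOffset) {}
    // Hypothetical simplified batch: control == true marks a transaction marker.
    record Batch(long producerId, long baseOffset, long lastOffset, boolean control, boolean abortMarker) {}

    // Returns base offsets of the batches that would reach the application.
    static List<Long> deliverable(List<Batch> batches, List<Aborted> aborted) {
        Comparator<Aborted> byFirstOffset = Comparator.comparingLong(Aborted::firstOffset);
        PriorityQueue<Aborted> pending = new PriorityQueue<>(byFirstOffset);
        pending.addAll(aborted);
        Set<Long> abortedProducerIds = new HashSet<>();
        List<Long> delivered = new ArrayList<>();
        for (Batch b : batches) {
            // Step 1: activate aborted txns that start at or before this batch's last offset.
            while (!pending.isEmpty() && pending.peek().firstOffset() <= b.lastOffset()) {
                abortedProducerIds.add(pending.poll().producerId());
            }
            if (b.control()) {
                // Step 2: an abort marker closes that producer's aborted txn.
                if (b.abortMarker()) abortedProducerIds.remove(b.producerId());
                continue; // Step 4: markers are never surfaced
            }
            // Step 3: skip whole batches from a producer with an active aborted txn.
            if (abortedProducerIds.contains(b.producerId())) continue;
            delivered.add(b.baseOffset());
        }
        return delivered;
    }

    public static void main(String[] args) {
        List<Batch> batches = List.of(
            new Batch(7, 0, 1, false, false),  // PID 7, aborted txn
            new Batch(8, 2, 3, false, false),  // PID 8, unaffected
            new Batch(7, 4, 4, true, true),    // abort marker for PID 7
            new Batch(7, 5, 6, false, false)); // PID 7, a later txn
        System.out.println(deliverable(batches, List.of(new Aborted(7, 0)))); // [2, 5]
    }
}
```

After the abort marker for PID 7 is seen, that producer's later batches are delivered again. This is why step 2 has to remove the id from the active set.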
Key idea

Aborted data physically remains in the log; "exactly once read" is realized by the consumer filtering batches whose PID appears in an aborted-transaction entry that began at or before that batch. The LSO guarantees the consumer has the complete abort information for everything it is allowed to read.

Producer-client state machine

TransactionManager mirrors the server with a client-side state machine (clients/.../TransactionManager.java:157-195): UNINITIALIZED → INITIALIZING → READY → IN_TRANSACTION → {COMMITTING,ABORTING}_TRANSACTION → READY, plus PREPARED_TRANSACTION (2PC) and the error sinks ABORTABLE_ERROR / FATAL_ERROR. It maintains per-partition sequence counters (sequenceNumber(tp) → nextSequence(), :710-713), tracks in-flight batches for retry and reordering, and serializes coordinator interactions through a priority queue in which the lowest priority value is served first (FIND_COORDINATOR < INIT_PRODUCER_ID < ADD_PARTITIONS_OR_OFFSETS < END_TXN < EPOCH_BUMP, :201-213). Recoverable errors transition to ABORTABLE_ERROR, so the next operation must be an abort; only an abortable retry of a bumped epoch reuses the machine. Under KIP-360, the client can bump its idempotent epoch and reset sequences to recover from UNKNOWN_PRODUCER_ID/OutOfOrderSequence without becoming fatal (requestIdempotentEpochBumpForPartition, bumpIdempotentEpochAndResetIdIfNeeded, :668-708). Details of batching and the sender loop are in The Producer Client.
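A reduced transition table can be expressed as a map from each target state to its permitted source states. This sketch is illustrative only. ClientTxnState and isTransitionValid are hypothetical names, and PREPARED_TRANSACTION is omitted. The edge set is an assumption that loosely follows the prose above, not TransactionManager's actual table.

```java
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

enum ClientTxnState {
    UNINITIALIZED, INITIALIZING, READY, IN_TRANSACTION,
    COMMITTING_TRANSACTION, ABORTING_TRANSACTION, ABORTABLE_ERROR, FATAL_ERROR
}

class ClientTxnStateSketch {
    // Target state -> allowed source states (simplified, assumed edges).
    private static final Map<ClientTxnState, Set<ClientTxnState>> VALID_SOURCES = Map.of(
        ClientTxnState.INITIALIZING, EnumSet.of(ClientTxnState.UNINITIALIZED),
        ClientTxnState.READY, EnumSet.of(ClientTxnState.INITIALIZING,
            ClientTxnState.COMMITTING_TRANSACTION, ClientTxnState.ABORTING_TRANSACTION),
        ClientTxnState.IN_TRANSACTION, EnumSet.of(ClientTxnState.READY),
        ClientTxnState.COMMITTING_TRANSACTION, EnumSet.of(ClientTxnState.IN_TRANSACTION),
        // After an abortable error, the only way forward is an abort.
        ClientTxnState.ABORTING_TRANSACTION, EnumSet.of(ClientTxnState.IN_TRANSACTION,
            ClientTxnState.ABORTABLE_ERROR),
        ClientTxnState.ABORTABLE_ERROR, EnumSet.of(ClientTxnState.IN_TRANSACTION,
            ClientTxnState.COMMITTING_TRANSACTION, ClientTxnState.ABORTABLE_ERROR));

    static boolean isTransitionValid(ClientTxnState from, ClientTxnState to) {
        if (to == ClientTxnState.FATAL_ERROR) return true; // any state may become fatal
        Set<ClientTxnState> sources = VALID_SOURCES.get(to);
        return sources != null && sources.contains(from);
    }

    public static void main(String[] args) {
        System.out.println(isTransitionValid(ClientTxnState.ABORTABLE_ERROR,
            ClientTxnState.COMMITTING_TRANSACTION)); // false: must abort instead
    }
}
```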

Configuration reference

Key | Default | Effect
--- | --- | ---
transaction.state.log.num.partitions | 50 | Partitions of __transaction_state; fixes the coordinator-to-id mapping (partitionFor = abs(hash) % count). Must not change after deployment.
transaction.state.log.replication.factor | 3 | Replication factor of __transaction_state.
transaction.state.log.min.isr | 2 | Min ISR for txn-state writes, which are written with acks=-1.
transaction.state.log.segment.bytes | 104857600 (100 MiB) | Kept small to speed up compaction and coordinator load.
transaction.state.log.load.buffer.size | 5242880 (5 MiB) | Read-batch size when loading a txn-state partition into the cache (soft limit).
transaction.max.timeout.ms | 900000 (15 min) | Upper bound on a client's requested transaction.timeout.ms; exceeding it → INVALID_TRANSACTION_TIMEOUT at InitProducerId.
transactional.id.expiration.ms | 604800000 (7 days) | Idle time after which a transactional id is tombstoned; never expires while a txn is ongoing.
transaction.abort.timed.out.transaction.cleanup.interval.ms | 10000 (10 s) | Sweep interval for aborting timed-out ONGOING transactions.
transaction.remove.expired.transaction.cleanup.interval.ms | 3600000 (1 h) | Sweep interval for transactional-id expiration.
transaction.two.phase.commit.enable | false | Allows external-coordinator 2PC (timeout becomes Integer.MAX_VALUE); when off, a 2PC InitProducerId returns TRANSACTIONAL_ID_AUTHORIZATION_FAILED.
transaction.partition.verification.enable | true | Leader verifies partition-in-txn before appending transactional records (anti-hanging-txn). Broker dynamic.
producer.id.expiration.ms | 86400000 (1 day) | Leader-side idle time before a PID's ProducerStateEntry is dropped; influences UNKNOWN_PRODUCER_ID. Broker dynamic.
producer.id.expiration.check.interval.ms | 600000 (10 min) | Sweep interval for expired PID entries.
transaction.version (feature) | TV_2 (LATEST_PRODUCTION) | Cluster feature flag: TV_2 enables the per-transaction epoch bump (KIP-890); set via the storage/features tools.

Sources: TransactionLogConfig.java, TransactionStateManagerConfig.java, TransactionVersion.java:35.
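Two of the rows above interact when InitProducerId is handled: the timeout ceiling and the 2PC switch. The following is a minimal sketch of that validation. It assumes the 2PC check runs first, which is not confirmed here, and it uses hypothetical names (InitProducerIdConfigSketch, Result) with error codes as plain strings.

```java
class InitProducerIdConfigSketch {
    // Hypothetical result: error name plus the timeout the coordinator would store.
    record Result(String error, int effectiveTimeoutMs) {}

    static Result validate(int requestedTimeoutMs, boolean enable2Pc,
                           int maxTimeoutMs, boolean twoPhaseCommitEnabled) {
        if (enable2Pc) {
            // 2PC requests require transaction.two.phase.commit.enable=true on the broker.
            if (!twoPhaseCommitEnabled) {
                return new Result("TRANSACTIONAL_ID_AUTHORIZATION_FAILED", requestedTimeoutMs);
            }
            return new Result("NONE", Integer.MAX_VALUE); // never reaped by the timeout sweep
        }
        // transaction.max.timeout.ms caps the client's transaction.timeout.ms.
        if (requestedTimeoutMs > maxTimeoutMs) {
            return new Result("INVALID_TRANSACTION_TIMEOUT", requestedTimeoutMs);
        }
        return new Result("NONE", requestedTimeoutMs);
    }

    public static void main(String[] args) {
        System.out.println(validate(1_000_000, false, 900_000, false).error()); // INVALID_TRANSACTION_TIMEOUT
    }
}
```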

Failure modes, edge cases & recovery

Coordinator failover

When a broker becomes leader for a __transaction_state partition, TransactionCoordinator.onElection first clears any stale marker queues, then loadTransactionsForTxnTopicPartition replays the compacted partition into the cache (TransactionCoordinator.scala:477-487, TransactionStateManager.scala:538-597). Crucially, for any transaction loaded in PREPARE_COMMIT or PREPARE_ABORT, the new coordinator immediately re-drives phase 2: it calls prepareComplete and re-sends the markers (:562-588). This is what makes the prepare record the durable commit point: a crash between prepare and complete is transparently finished by the successor. The partition is removed from loadingPartitions before the markers are sent, so the completion append is not blocked by a self-imposed COORDINATOR_LOAD_IN_PROGRESS (:579-582).
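The recovery step after loading can be sketched as a scan of the rebuilt cache. FailoverReplaySketch, its TxnState subset and the string-encoded work items are hypothetical. A real coordinator would call prepareComplete and enqueue marker requests rather than return strings.

```java
import java.util.ArrayList;
import java.util.List;

class FailoverReplaySketch {
    // Subset of coordinator transaction states, for illustration only.
    enum TxnState { EMPTY, ONGOING, PREPARE_COMMIT, PREPARE_ABORT, COMPLETE_COMMIT, COMPLETE_ABORT }

    // Hypothetical loaded cache entry.
    record Loaded(String transactionalId, TxnState state) {}

    // A PREPARE_* record means the outcome is already decided, so only phase 2
    // (writing markers, then the completion record) has to be re-driven.
    static List<String> phaseTwoWork(List<Loaded> cache) {
        List<String> work = new ArrayList<>();
        for (Loaded l : cache) {
            switch (l.state()) {
                case PREPARE_COMMIT -> work.add(l.transactionalId() + ":COMMIT");
                case PREPARE_ABORT -> work.add(l.transactionalId() + ":ABORT");
                default -> { } // ONGOING waits for the client or the timeout sweep
            }
        }
        return work;
    }

    public static void main(String[] args) {
        List<Loaded> cache = List.of(
            new Loaded("a", TxnState.ONGOING),
            new Loaded("b", TxnState.PREPARE_COMMIT),
            new Loaded("c", TxnState.PREPARE_ABORT),
            new Loaded("d", TxnState.COMPLETE_COMMIT));
        System.out.println(phaseTwoWork(cache)); // [b:COMMIT, c:ABORT]
    }
}
```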

Log-append failures

appendTransactionToLog.updateCacheCallback maps storage errors to client-facing coordinator errors: NOT_ENOUGH_REPLICAS/REQUEST_TIMED_OUT → COORDINATOR_NOT_AVAILABLE (retriable); NOT_LEADER_OR_FOLLOWER/KAFKA_STORAGE_ERROR → NOT_COORDINATOR (TransactionStateManager.scala:680-697). On error it resets pendingState so the metadata is not stuck mid-transition (:734-767), unless the caller asked to retain it for retry. Completion-record appends that hit COORDINATOR_NOT_AVAILABLE are re-enqueued on txnLogAppendRetryQueue and retried by the sender thread (TransactionMarkerChannelManager.scala:354-383).
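The mapping fits in a small switch. The error names are the ones listed above. toCoordinatorError is a hypothetical helper, and its default branch, which returns UNKNOWN_SERVER_ERROR for any other error, is an assumption.

```java
class AppendErrorMappingSketch {
    // Storage-level append error -> coordinator error returned to the client.
    static String toCoordinatorError(String storageError) {
        return switch (storageError) {
            case "NONE" -> "NONE";
            case "NOT_ENOUGH_REPLICAS", "REQUEST_TIMED_OUT" -> "COORDINATOR_NOT_AVAILABLE"; // retriable
            case "NOT_LEADER_OR_FOLLOWER", "KAFKA_STORAGE_ERROR" -> "NOT_COORDINATOR";
            default -> "UNKNOWN_SERVER_ERROR"; // assumption for unlisted errors
        };
    }

    public static void main(String[] args) {
        System.out.println(toCoordinatorError("KAFKA_STORAGE_ERROR")); // NOT_COORDINATOR
    }
}
```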

Epoch exhaustion & overflow

The coordinator never lets a client use epoch Short.MAX_VALUE (isEpochExhausted: producerEpoch >= Short.MAX_VALUE - 1, TransactionMetadata.java:64-66) so it always retains the ability to fence. When the epoch is about to overflow, the next commit/abort rotates to a pre-allocated nextProducerId and resets the epoch to 0 while keeping lastProducerEpoch = MAX-1 for client-visible continuity (prepareComplete, :265-288; validProducerEpoch overflow handling, :469-487). TV_2 retry detection (retryOnOverflow) recognizes a client replaying an EndTxn that overflowed: prevProducerId == producerId && producerEpoch == MAX-1 && metadata.producerEpoch == 0 (TransactionCoordinator.scala:791-792).
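Both checks quoted above are simple predicates. The method names follow the text, but the parameter names are hypothetical. In the retry predicate, producerId and producerEpoch are assumed to be the values carried by the client's EndTxn request.

```java
class EpochOverflowSketch {
    static final short MAX_MINUS_ONE = (short) (Short.MAX_VALUE - 1);

    // The coordinator never hands out the top epochs, so it can always bump once more to fence.
    static boolean isEpochExhausted(short producerEpoch) {
        return producerEpoch >= Short.MAX_VALUE - 1;
    }

    // TV_2 retry-on-overflow: the client replays EndTxn with the old producer id at
    // epoch MAX-1, while the coordinator has already rotated (prevProducerId records
    // the old id and the live epoch restarted at 0).
    static boolean retryOnOverflow(long requestProducerId, short requestEpoch,
                                   long metadataPrevProducerId, short metadataEpoch) {
        return metadataPrevProducerId == requestProducerId
            && requestEpoch == MAX_MINUS_ONE
            && metadataEpoch == 0;
    }

    public static void main(String[] args) {
        System.out.println(isEpochExhausted((short) 32766)); // true
        System.out.println(retryOnOverflow(10L, MAX_MINUS_ONE, 10L, (short) 0)); // true
    }
}
```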

Hanging transactions & late writes

ProducerStateManager.hasLateTransaction flags a transaction whose oldest update is older than maxTransactionTimeoutMs + LATE_TRANSACTION_BUFFER_MS (5 min, ProducerStateManager.java:72, 130-133); this surfaces a metric/health signal so operators can detect a partition pinned by a stuck transaction (which would freeze its LSO and stall read_committed consumers). The verification path (above) is the primary prevention.
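The late-transaction test reduces to an age comparison. Below is a sketch with hypothetical parameter names, assuming a strict greater-than at the boundary. With the default transaction.max.timeout.ms of 15 min, a transaction becomes "late" once its oldest update is more than 20 min old.

```java
import java.util.concurrent.TimeUnit;

class LateTxnSketch {
    static final long LATE_TRANSACTION_BUFFER_MS = TimeUnit.MINUTES.toMillis(5);

    // Late if the oldest ongoing txn's last update is older than the max timeout plus the buffer.
    static boolean hasLateTransaction(long oldestTxnLastUpdateMs, long nowMs, long maxTransactionTimeoutMs) {
        return nowMs - oldestTxnLastUpdateMs > maxTransactionTimeoutMs + LATE_TRANSACTION_BUFFER_MS;
    }

    public static void main(String[] args) {
        long max = TimeUnit.MINUTES.toMillis(15);
        System.out.println(hasLateTransaction(0L, TimeUnit.MINUTES.toMillis(25), max)); // true
    }
}
```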

UNKNOWN_PRODUCER_ID

If retention or DeleteRecords removes a PID's last batch, the leader drops the ProducerStateEntry and a subsequent append with a non-zero sequence cannot be validated → UNKNOWN_PRODUCER_ID. The producer recovers by bumping its idempotent epoch and resetting sequences (KIP-360). Setting producer.id.expiration.ms ≥ delivery.timeout.ms avoids spurious expiration during retries.

Invariants & guarantees (summary)

  • PID uniqueness: the controller's nextProducerId is strictly monotonic in the metadata log; blocks are never reissued (ProducerIdControlManager.java:111-120).
  • Per-partition EOS-in-order: persisted batches for a fixed (pid, epoch) have contiguous sequence ranges from 0; retries are deduped (ProducerAppendInfo, ProducerStateEntry.findDuplicateBatch).
  • Atomicity: a transaction's records become visible (pass the LSO) iff a COMMIT marker was written to every involved partition; the prepare record fixes the outcome before any marker (handleEndTransaction + failover replay).
  • Zombie fencing: a write/marker with epoch below the leader's last-seen epoch (TV_2: not strictly greater) is rejected (checkProducerEpoch); a marker with an older coordinator epoch is rejected (checkCoordinatorEpoch).
  • read_committed correctness: consumers never read past the LSO and filter every batch whose PID belongs to an aborted transaction beginning at/before it (CompletedFetch).
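The two fencing rules from the list can be written as predicates. This sketch is hypothetical: FencingSketch is not a Kafka class. It assumes that the TV_2 strictly-greater rule applies to end-transaction markers and that ordinary writes keep the below-last-seen rule.

```java
class FencingSketch {
    // Producer-epoch rule against the leader's last-seen epoch for this PID.
    static boolean producerEpochRejected(short incoming, short lastSeen, boolean isTv2Marker) {
        return isTv2Marker ? incoming <= lastSeen   // TV_2 marker must carry a bumped epoch
                           : incoming < lastSeen;   // otherwise only older epochs are fenced
    }

    // Markers from an older coordinator incarnation are rejected.
    static boolean coordinatorEpochRejected(int markerCoordinatorEpoch, int lastSeenCoordinatorEpoch) {
        return markerCoordinatorEpoch < lastSeenCoordinatorEpoch;
    }

    public static void main(String[] args) {
        System.out.println(producerEpochRejected((short) 5, (short) 5, true)); // true
    }
}
```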

Interactions with other subsystems

Design rationale & evolution

  • KIP-98 introduced idempotent and transactional producers: PID + epoch + sequence, the __transaction_state log, two-phase commit and transaction markers, the foundation of this chapter.
  • KIP-360 made the idempotent producer resilient: the client can bump its epoch and reset sequences to recover from UNKNOWN_PRODUCER_ID instead of failing fatally.
  • KIP-447 changed sendOffsetsToTransaction to take ConsumerGroupMetadata so the group coordinator fences offset commits by generation, enabling one transactional producer per application instance rather than per input partition.
  • KIP-890 (Transaction V2) bumps the producer epoch on every commit/abort so each transaction is uniquely identified by (pid, epoch). EndTxnResponse (v5+) returns the new pid/epoch; the client need not call InitProducerId except at startup. This closes the duplicate-across-transactions and hanging-transaction holes that required the server-side verification step on legacy versions. Enabled by the transaction.version feature flag (TransactionVersion.java:24-100).
  • KIP-915 made the coordinator records flexible/tagged so future fields can be added without breaking downgrade (note in TransactionLogValue.json:21).
Note

In this build TransactionVersion.LATEST_PRODUCTION = TV_2 and TV_2's bootstrap metadata version is IBP_4_0_IV2 (TransactionVersion.java:31, 35), consistent with the KRaft-only, ZooKeeper-removed 4.x line. The legacy TV_1 endTransactionWithTV1 path is retained for clients that have not negotiated TV_2.

Gotchas & operational notes

  • Partition count is permanent. Because partitionFor(transactionalId) = abs(hash) % transactionTopicPartitionCount (TransactionStateManager.scala:439) determines which coordinator owns an id, changing transaction.state.log.num.partitions after deploy reshuffles ownership and breaks fencing; the loader even throws KafkaException if it detects a change mid-load (:641-646).
  • A stuck transaction freezes its partitions' LSO. Any partition with an undecided transaction holds back the LSO, stalling all read_committed consumers of that partition until the txn commits, aborts, or times out; watch the late-transaction metric and transaction.max.timeout.ms.
  • Marker delivery handles missing partitions carefully. Partitions whose leaders vanished are dropped from the set so completion can proceed; the marker for a deleted partition is skipped, not blocked (TransactionMarkerChannelManager.scala:432-445).
  • 2PC transactions never time out. With transaction.two.phase.commit.enable=true a transaction's timeout becomes Integer.MAX_VALUE and the abort reaper skips it (TransactionMetadata.isDistributedTwoPhaseCommitTxn, :308-310); the external coordinator is responsible for resolution.
  • Tune producer.id.expiration.ms vs. retries. If a PID's last record is retention-deleted while the producer is between sends, the next send hits UNKNOWN_PRODUCER_ID; aligning expiration with delivery.timeout.ms avoids surprise duplicates.
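The permanence of the partition count follows directly from the modulo. The sketch below assumes Kafka's abs helper maps Integer.MIN_VALUE to 0, as implemented here, and TxnPartitionSketch is a hypothetical class. For example, "x".hashCode() is 120, so that id lands on partition 20 with 50 partitions but on partition 0 with 60.

```java
class TxnPartitionSketch {
    // Non-negative abs; assumed to mirror Kafka's helper, which maps Integer.MIN_VALUE to 0.
    static int abs(int n) {
        return n == Integer.MIN_VALUE ? 0 : Math.abs(n);
    }

    // partitionFor(transactionalId) = abs(hash) % transactionTopicPartitionCount
    static int partitionFor(String transactionalId, int transactionTopicPartitionCount) {
        return abs(transactionalId.hashCode()) % transactionTopicPartitionCount;
    }

    public static void main(String[] args) {
        // Count how many of 1,000 ids would move to a different coordinator partition
        // if the partition count changed from 50 to 60.
        int moved = 0;
        for (int i = 0; i < 1000; i++) {
            String id = "app-" + i;
            if (partitionFor(id, 50) != partitionFor(id, 60)) moved++;
        }
        System.out.println(moved + " of 1000 transactional ids change owner");
    }
}
```

Any id that moves is now served by a coordinator that has no record of its epoch, which is exactly how fencing breaks.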

krivaltsevich.com · Part of Apache Kafka Internals · derived from Apache Kafka 4.4 source · GitHub · MIT-licensed.

Apache Kafka® is a registered trademark of the Apache Software Foundation. This is an independent, unofficial guide, not affiliated with or endorsed by the ASF.