14 · Transactions & Exactly-Once Semantics
Source: Apache Kafka 4.4.0-SNAPSHOT (git 04bfe7d, 2026-06-15), KRaft mode. Derived from source code, not copied from official documentation.
Kafka builds exactly-once semantics (EOS) in two layers that compose. The lower layer is the idempotent producer: every record batch carries a (producerId, producerEpoch, baseSequence) triple, and the partition leader's ProducerStateManager deduplicates retries and enforces a strictly increasing per-partition sequence, giving exactly-once-in-order delivery to a single partition. The upper layer is transactions (KIP-98): a stable transactional.id is mapped to a producer id with epoch fencing of zombies; a TransactionCoordinator drives a two-phase commit over an internal __transaction_state log and writes transaction markers (control records) into every partition the transaction touched. Consumers reading at read_committed use the Last Stable Offset and a per-segment aborted-transaction index to filter out aborted data. This chapter traces the whole path end to end, grounded in the source.
Role & responsibilities
The transaction/EOS subsystem must deliver three guarantees:
- Idempotence: a producer retrying a batch (e.g. after a network hiccup) must not create a duplicate. This is enforced per (producerId, epoch) per partition by the leader's sequence-number check and a 5-batch dedup cache.
- Atomic multi-partition writes: a set of writes across many partitions (and, for consume-transform-produce pipelines, the consumer offset commit) either all become visible or none do. This is the job of the TransactionCoordinator + __transaction_state log + transaction markers.
- Zombie fencing: a stale producer instance (one that crashed and was replaced) must be prevented from writing into a transaction owned by its successor. This is achieved with the producer epoch: InitProducerId bumps the epoch, and the leader rejects any batch/marker with a lower epoch.
Together these implement consume-transform-produce EOS: a Kafka Streams (see Kafka Streams Architecture) or hand-written application reads from input partitions, produces derived records, and commits its input offsets, all inside one transaction. The offsets are written to __consumer_offsets by the group coordinator but only become visible to other consumers when the transaction commits.
Where it lives in the code
| Concern | Principal class | File |
|---|---|---|
| Coordinator request handling (state machine driver) | TransactionCoordinator | core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala |
| Txn metadata cache + __transaction_state log read/write/load | TransactionStateManager | core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala |
| Marker fan-out to partition leaders (sender thread) | TransactionMarkerChannelManager | core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala |
| Per-txn metadata + transition state machine | TransactionMetadata | transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionMetadata.java |
| States enum & valid transitions | TransactionState | transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionState.java |
| Txn log key/value (de)serialization | TransactionLog | transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/TransactionLog.java |
| Broker-side PID block allocation | RPCProducerIdManager | transaction-coordinator/src/main/java/org/apache/kafka/coordinator/transaction/RPCProducerIdManager.java |
| Controller-side PID block generator | ProducerIdControlManager | metadata/src/main/java/org/apache/kafka/controller/ProducerIdControlManager.java |
| Idempotence dedup + sequence/epoch validation | ProducerStateManager, ProducerAppendInfo, ProducerStateEntry | storage/src/main/java/org/apache/kafka/storage/internals/log/ |
| Per-segment aborted-txn index (LSO support) | TransactionIndex | storage/src/main/java/org/apache/kafka/storage/internals/log/TransactionIndex.java |
| Producer client state machine + sequence assignment | TransactionManager | clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java |
| Consumer-side aborted-record filtering | CompletedFetch | clients/src/main/java/org/apache/kafka/clients/consumer/internals/CompletedFetch.java |
Core concepts & terminology
- Producer ID (PID): a cluster-unique int64 stamped in every batch produced by an idempotent/transactional producer. Allocated in blocks of 1000 (ProducerIdsBlock.PRODUCER_ID_BLOCK_SIZE = 1000, server-common/.../ProducerIdsBlock.java:30).
- Producer epoch: an int16 generation counter for a PID, used to fence zombies. Bumped on InitProducerId and (under Transaction V2) on every commit/abort.
- Sequence number: a per-partition int32 that starts at 0 and increments by the number of records in each batch. The leader requires each batch's first sequence to equal the previous batch's last sequence + 1.
- transactional.id: a stable, user-supplied string identifying a logical producer across restarts. It maps deterministically to one __transaction_state partition (and thus one coordinator).
- Transaction marker: a control record (COMMIT or ABORT) the coordinator writes into each data partition to atomically end a transaction. Carries the coordinator epoch for fencing.
- Last Stable Offset (LSO): min(highWatermark, firstUnstableOffset), the highest offset below which there are no in-flight (undecided) transactions. read_committed consumers may not read past it.
- Coordinator epoch: the leader epoch of the __transaction_state partition. It rises on every coordinator failover and fences stale marker writes / log appends.
- Transaction Version (TV): a cluster feature flag (transaction.version). TV_0/TV_1 are the legacy protocol. TV_2 (KIP-890) bumps the epoch per transaction, so each transaction is uniquely identified by (pid, epoch) (server-common/.../TransactionVersion.java:24-31).
The idempotent producer
PID allocation: controller block-allocation
PIDs are allocated by the active KRaft controller in monotonic blocks, never one at a time. This keeps allocation cheap and lets it survive controller failover via the metadata log. ProducerIdControlManager.generateNextProducerId takes the next start id, builds a ProducerIdsBlock of size 1000, and emits a ProducerIdsRecord whose nextProducerId field advances past the block. The new value is committed to the metadata log before the block is handed out (metadata/.../ProducerIdControlManager.java:84-101). On replay, the manager asserts strict monotonicity: a replayed nextProducerId must exceed the current block's first id (:111-120). This is the persistent invariant that guarantees no PID is ever reused.
On the broker side, RPCProducerIdManager caches the current block and serves generateProducerId() lock-free via currentProducerIdBlock.get().claimNextId(), an atomic getAndIncrement on the block's counter (server-common/.../ProducerIdsBlock.java:50-56). When 90% of the block is consumed (PID_PREFETCH_THRESHOLD = 0.90), it asynchronously requests the next block from the controller via AllocateProducerIds, gated by a single-flight requestInFlight CAS. If the block runs dry before the next one arrives, it throws a retriable COORDINATOR_LOAD_IN_PROGRESS rather than blocking (transaction-coordinator/.../RPCProducerIdManager.java:80-109).
generateProducerId() deliberately returns COORDINATOR_LOAD_IN_PROGRESS when the block is exhausted, not REQUEST_TIMED_OUT, because older clients treat the latter as fatal when it should be retriable (RPCProducerIdManager.java:97-100).
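The block scheme can be modeled in a few lines. What follows is a minimal, single-threaded Python sketch, not Kafka's code, and it makes several simplifications:

- The class names are hypothetical.
- A plain counter stands in for the block's atomic getAndIncrement.
- The asynchronous AllocateProducerIds round trip is simulated by an explicit on_allocate_response() call.
- The constants mirror PRODUCER_ID_BLOCK_SIZE and PID_PREFETCH_THRESHOLD.

```python
PRODUCER_ID_BLOCK_SIZE = 1000  # mirrors ProducerIdsBlock.PRODUCER_ID_BLOCK_SIZE
PID_PREFETCH_THRESHOLD = 0.90  # mirrors RPCProducerIdManager.PID_PREFETCH_THRESHOLD


class CoordinatorLoadInProgress(Exception):
    """Retriable error: the current block is used up and the next one has not arrived."""


class ProducerIdsBlock:
    def __init__(self, first_id, size=PRODUCER_ID_BLOCK_SIZE):
        self.first_id = first_id
        self.size = size
        self.claimed = 0  # the real block uses an atomic counter (getAndIncrement)

    def claim_next_id(self):
        if self.claimed >= self.size:
            return None
        pid = self.first_id + self.claimed
        self.claimed += 1
        return pid


class ControllerPidAllocator:
    """Controller side: hands out contiguous blocks; next_producer_id only moves forward."""

    def __init__(self):
        self.next_producer_id = 0  # in KRaft this is persisted via a ProducerIdsRecord

    def allocate_block(self):
        block = ProducerIdsBlock(self.next_producer_id)
        self.next_producer_id += PRODUCER_ID_BLOCK_SIZE  # advance past the block
        return block


class BrokerPidManager:
    """Broker side: serves ids from the cached block and prefetches the next one."""

    def __init__(self, controller):
        self.controller = controller
        self.current = controller.allocate_block()
        self.next_block = None
        self.request_in_flight = False

    def generate_producer_id(self):
        pid = self.current.claim_next_id()
        if pid is None and self.next_block is not None:
            self.current, self.next_block = self.next_block, None  # switch to the prefetched block
            pid = self.current.claim_next_id()
        if pid is None:
            # Fail fast with a retriable error instead of blocking the request thread.
            raise CoordinatorLoadInProgress("producer id block exhausted")
        if self.current.claimed >= self.current.size * PID_PREFETCH_THRESHOLD:
            self._maybe_request_next_block()
        return pid

    def _maybe_request_next_block(self):
        # Single-flight: at most one outstanding request (the broker uses a CAS on requestInFlight).
        if not self.request_in_flight and self.next_block is None:
            self.request_in_flight = True

    def on_allocate_response(self):
        """Simulates the AllocateProducerIds response arriving from the controller."""
        if self.request_in_flight:
            self.next_block = self.controller.allocate_block()
            self.request_in_flight = False
```

Raising instead of waiting matches the behavior described above: the client simply retries InitProducerId until the next block has arrived.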
Sequence validation & deduplication on the leader
The partition leader's per-PID state lives in ProducerStateEntry, which keeps a bounded deque of the last 5 batches (ProducerStateEntry.NUM_BATCHES_TO_RETAIN = 5, storage/.../ProducerStateEntry.java:35). Each entry records producerEpoch, coordinatorEpoch, lastTimestamp, an optional currentTxnFirstOffset, and for each retained batch a BatchMetadata(lastSeq, lastOffset, offsetDelta, timestamp) (storage/.../BatchMetadata.java:21).
On a client produce, UnifiedLog.analyzeAndValidateProducerState first checks for an exact duplicate: for each batch with a producer id it asks ProducerStateEntry.findDuplicateBatch, which (for the matching epoch) looks for a cached batch with the identical [baseSequence, lastSequence] range (storage/.../ProducerStateEntry.java:128-137). If found, the append short-circuits, no data is written, and the leader returns the original offset/timestamp of the already-stored batch to the client, so a retry looks like a success (storage/.../UnifiedLog.java:1247-1254).
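The duplicate check can be sketched as a bounded deque per producer. This Python model is illustrative only:

- ProducerStateEntrySketch and find_duplicate_batch are hypothetical stand-ins for ProducerStateEntry and findDuplicateBatch.
- It stores each batch's first sequence directly. The real BatchMetadata keeps lastSeq and offsetDelta and derives the first sequence from them.

```python
from collections import deque
from dataclasses import dataclass
from typing import Optional

NUM_BATCHES_TO_RETAIN = 5  # mirrors ProducerStateEntry.NUM_BATCHES_TO_RETAIN


@dataclass(frozen=True)
class BatchMetadata:
    first_seq: int
    last_seq: int
    last_offset: int
    timestamp: int


class ProducerStateEntrySketch:
    def __init__(self, producer_epoch: int):
        self.producer_epoch = producer_epoch
        # deque(maxlen=...) silently evicts the oldest batch once 5 are cached.
        self.batches = deque(maxlen=NUM_BATCHES_TO_RETAIN)

    def add_batch(self, first_seq: int, last_seq: int, last_offset: int, timestamp: int) -> None:
        self.batches.append(BatchMetadata(first_seq, last_seq, last_offset, timestamp))

    def find_duplicate_batch(self, epoch: int, first_seq: int, last_seq: int) -> Optional[BatchMetadata]:
        if epoch != self.producer_epoch:
            return None  # only batches from the same epoch can be duplicates
        for cached in self.batches:
            if cached.first_seq == first_seq and cached.last_seq == last_seq:
                return cached  # exact range match: answer the retry with this batch's offsets
        return None
```

A retry of a batch that has already been evicted from the deque is no longer recognized as a duplicate. That is the window-depth limit discussed below.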
If it is not a duplicate, ProducerAppendInfo.checkSequence enforces ordering (storage/.../ProducerAppendInfo.java:156-194):
- Same epoch: require inSequence(currentLastSeq, appendFirstSeq), i.e. appendFirstSeq == currentLastSeq + 1 (with wraparound at Integer.MAX_VALUE → 0). A gap throws OutOfOrderSequenceException (:196-198, :188-192).
- New epoch: a non-zero first sequence with a known prior epoch throws OutOfOrderSequenceException, because a fresh epoch must restart at sequence 0 (:169-176).
- If all prior records for the PID were deleted (epoch reset to NO_PRODUCER_EPOCH), the leader accepts any sequence (:188). This is why a long-idle producer can hit UNKNOWN_PRODUCER_ID and recover.
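Below is a compact Python rendering of the three rules above. It assumes the leader's state for the PID can be reduced to its last epoch and last sequence. It follows inSequence and checkSequence in spirit, but the function names and the exception class are hypothetical.

```python
INT32_MAX = 2**31 - 1   # Integer.MAX_VALUE
NO_SEQUENCE = -1        # RecordBatch.NO_SEQUENCE: no batch seen yet for this epoch
NO_PRODUCER_EPOCH = -1  # RecordBatch.NO_PRODUCER_EPOCH: the leader has no state for the PID


class OutOfOrderSequenceError(Exception):
    """Stand-in for OutOfOrderSequenceException."""


def in_sequence(last_seq: int, next_seq: int) -> bool:
    # Contiguous, with wraparound from Integer.MAX_VALUE back to 0.
    return next_seq == last_seq + 1 or (next_seq == 0 and last_seq == INT32_MAX)


def check_sequence(entry_epoch: int, entry_last_seq: int,
                   append_epoch: int, append_first_seq: int) -> None:
    if append_epoch != entry_epoch:
        # A new epoch must restart at 0, unless the leader knows no prior epoch at all
        # (state deleted), in which case any sequence is accepted.
        if append_first_seq != 0 and entry_epoch != NO_PRODUCER_EPOCH:
            raise OutOfOrderSequenceError(
                f"new epoch {append_epoch} must start at sequence 0, got {append_first_seq}")
        return
    if not in_sequence(entry_last_seq, append_first_seq):
        raise OutOfOrderSequenceError(
            f"expected sequence {entry_last_seq + 1}, got {append_first_seq}")
```

The first batch of an epoch passes through the same path: in_sequence(NO_SEQUENCE, 0) holds, because -1 + 1 == 0.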
For a fixed (producerId, epoch) on a single partition, the sequence of persisted batches has strictly contiguous sequence ranges starting at 0. Retries are absorbed by the 5-batch dedup window; gaps are rejected. This is exactly-once-in-order per partition.
The dedup window is only 5 batches deep. If a producer has more than 5 in-flight batches to a partition and a low-sequence batch needs to retry after higher ones committed, the low batch can fall outside the window and loop on OutOfOrderSequenceException. This is why the idempotent producer caps max.in.flight.requests.per.connection at 5 (see The Producer Client).
Epoch fencing at the leader
ProducerAppendInfo.checkProducerEpoch rejects stale writers. The comparison depends on the transaction version of the incoming marker:

- TV_2: a marker is valid only if markerEpoch > currentEpoch. The inequality is strict because TV_2 bumps the epoch before writing the marker.
- Legacy TV_0/TV_1: the leader accepts markerEpoch >= currentEpoch (storage/.../ProducerAppendInfo.java:117-153).

A client data batch whose epoch is below the leader's last-seen epoch throws InvalidProducerEpochException (:145-152). This is a recoverable error: the client aborts and retries. It replaces the older, fatal ProducerFencedException.
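The epoch rule reduces to one comparison whose strictness depends on what is being appended. Here is a Python sketch under that reading. The names are hypothetical. For brevity it raises the same error for data batches and markers, whereas the text above only states that InvalidProducerEpochException is used for client data batches.

```python
class InvalidProducerEpochError(Exception):
    """Stand-in for InvalidProducerEpochException."""


def check_producer_epoch(incoming_epoch: int, current_epoch: int,
                         is_marker: bool, transaction_version: int) -> None:
    # TV_2 bumps the epoch before writing the marker, so a TV_2 marker must be strictly newer.
    strict = is_marker and transaction_version >= 2
    stale = incoming_epoch <= current_epoch if strict else incoming_epoch < current_epoch
    if stale:
        relation = "<=" if strict else "<"
        raise InvalidProducerEpochError(
            f"epoch {incoming_epoch} {relation} last seen epoch {current_epoch} "
            f"(TV{transaction_version})")
```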
Transactions: data structures
In-memory: TransactionMetadata
Each transactional.id the coordinator owns has one TransactionMetadata, guarded by its own ReentrantLock exposed via inLock(...) (transaction-coordinator/.../TransactionMetadata.java:62, 111-118). Key fields:
| Field | Type | Meaning |
|---|---|---|
| producerId | long | Current PID bound to this transactional id. |
| prevProducerId | long | PID of the last committed transaction (for retry-after-overflow detection). |
| nextProducerId | long | Pre-allocated PID to switch to when the epoch overflows. |
| producerEpoch | short | Current epoch; fences zombies. |
| lastProducerEpoch | short | Previous epoch (retry detection). |
| txnTimeoutMs | int | Per-txn timeout; Integer.MAX_VALUE marks a distributed 2PC txn (never auto-aborted). |
| state | TransactionState | Current state (see the state machine below). |
| pendingState | Optional<TransactionState> | The target state of an in-flight transition; blocks concurrent transitions. |
| topicPartitions | HashSet<TopicPartition> | Partitions registered in the current transaction. |
| txnStartTimestamp / txnLastUpdateTimestamp | volatile long | Start time (for timeout) and last-update time (for transactional-id expiration). Volatile so the background timeout scan reads them lock-free. |
| hasFailedEpochFence | boolean | Set if a fence-epoch log write failed; prevents bumping the epoch twice (TV_0/TV_1). |
| clientTransactionVersion | TransactionVersion | TV the client used when the state was transitioned. |
A prospective transition is captured in an immutable TxnTransitMetadata record (transaction-coordinator/.../TxnTransitMetadata.java:27-41). The protocol has two steps:

1. prepareXxx() sets pendingState and returns a TxnTransitMetadata.
2. After the log write succeeds, completeTransitionTo(transit) validates the transit and atomically overwrites the live fields (TransactionMetadata.java:318-446).

completeTransitionTo re-checks state validity, epoch progression, timestamp monotonicity, and that the old partition set is a subset of the new one. A corrupt transition throws IllegalStateException marked FATAL.
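The prepare/complete split can be illustrated with a small Python model of the AddPartitions path (state → ONGOING). It is a sketch, not the coordinator's code:

- TxnTransit and TxnMetadataSketch are hypothetical stand-ins.
- Only the checks named above are performed.
- The __transaction_state append that sits between the two steps is left to the caller.

```python
import threading
from dataclasses import dataclass
from typing import FrozenSet, Optional


class IllegalTransitionError(Exception):
    """Stand-in for the IllegalStateException thrown on corrupt transitions."""


@dataclass(frozen=True)
class TxnTransit:  # immutable, like TxnTransitMetadata
    producer_id: int
    producer_epoch: int
    state: str
    topic_partitions: FrozenSet[str]
    txn_last_update_timestamp: int


class TxnMetadataSketch:
    def __init__(self, producer_id: int, producer_epoch: int):
        self.lock = threading.RLock()  # per-id lock, like TransactionMetadata's ReentrantLock
        self.producer_id = producer_id
        self.producer_epoch = producer_epoch
        self.state = "EMPTY"
        self.pending_state: Optional[str] = None
        self.topic_partitions: set = set()
        self.txn_last_update_timestamp = 0

    def prepare_add_partitions(self, partitions, now_ms: int) -> TxnTransit:
        with self.lock:
            if self.pending_state is not None:
                raise IllegalTransitionError(f"transition to {self.pending_state} already in flight")
            self.pending_state = "ONGOING"  # blocks concurrent transitions
            return TxnTransit(self.producer_id, self.producer_epoch, "ONGOING",
                              frozenset(self.topic_partitions | set(partitions)), now_ms)

    def complete_transition_to(self, transit: TxnTransit) -> None:
        # Called only after the __transaction_state append has succeeded.
        with self.lock:
            if self.pending_state != transit.state:
                raise IllegalTransitionError("transit does not match the pending state")
            if transit.producer_epoch < self.producer_epoch:
                raise IllegalTransitionError("epoch went backwards")
            if transit.txn_last_update_timestamp < self.txn_last_update_timestamp:
                raise IllegalTransitionError("timestamp went backwards")
            if not self.topic_partitions <= transit.topic_partitions:
                raise IllegalTransitionError("partitions were dropped")
            self.state = transit.state
            self.producer_epoch = transit.producer_epoch
            self.topic_partitions = set(transit.topic_partitions)
            self.txn_last_update_timestamp = transit.txn_last_update_timestamp
            self.pending_state = None
```

The failure path of the log append is not modeled here. In this sketch, a failed append would leave pending_state set.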
Persistent: the __transaction_state log
Transaction metadata is durably stored in the internal compacted topic __transaction_state. Its topic configuration is set in TransactionCoordinator.transactionStateTopicConfigs (TransactionCoordinator.scala:1014-1022):

- min.insync.replicas=2 (from transaction.state.log.min.isr)
- no compression
- cleanup.policy=compact
- unclean.leader.election.enable=false
- the segment size

The partition count (50 by default) and replication factor (3) are not set there. They come from transaction.state.log.num.partitions and transaction.state.log.replication.factor (TransactionLogConfig.java:32, 40) and are applied when the internal topic is created.

The record key is the transactional id, prefixed with a coordinator record type id. The value is a versioned TransactionLogValue (transaction-coordinator/.../TransactionLog.java:49-95).
Figure: TransactionLogValue layout (sizes: int64 = 8 B, int16 = 2 B, int8 = 1 B; source: transaction-coordinator/src/main/resources/common/message/TransactionLogValue.json). The value version is 1 when TV ≥ 1, else 0. Fields added in v1 are tagged for backward compatibility: they are present only in the flexible v1 encoding and may be absent. A null value is a tombstone, meaning the transactional id has expired or been removed.
Because the topic is compacted, only the latest record per transactional id is retained, which is sufficient to rebuild the coordinator's in-memory state after failover. TransactionLog.read decodes a record into a TxnRecord (id + metadata), a TxnTombstone, or an unknown-version marker that is logged and skipped (TransactionLog.java:118-164).
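The key determines which __transaction_state partition, and therefore which coordinator, owns a transactional id. It does so through the formula partitionFor = abs(hash) % count given in the configuration reference. Below is a Python sketch that assumes:

- hash is Java's String.hashCode().
- abs is Kafka's Utils.abs, assumed here to map Integer.MIN_VALUE to 0 rather than leaving it negative.

Verify both assumptions against TransactionStateManager before relying on exact partition numbers.

```python
INT32_MIN = -(1 << 31)


def java_string_hash(s: str) -> int:
    """Java String.hashCode(): h = 31*h + c over UTF-16 code units, with signed 32-bit overflow."""
    data = s.encode("utf-16-be")
    h = 0
    for i in range(0, len(data), 2):
        h = (31 * h + int.from_bytes(data[i:i + 2], "big")) & 0xFFFFFFFF
    return h - (1 << 32) if h >= (1 << 31) else h


def kafka_abs(n: int) -> int:
    # Assumption: mirrors Utils.abs, which avoids Math.abs(Integer.MIN_VALUE) staying negative.
    return 0 if n == INT32_MIN else abs(n)


def partition_for(transactional_id: str, num_partitions: int = 50) -> int:
    return kafka_abs(java_string_hash(transactional_id)) % num_partitions
```

The mapping depends on the partition count. Changing transaction.state.log.num.partitions after deployment would therefore move ids to different coordinators, which is why the configuration table warns against it.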
The transaction state machine
TransactionState defines 8 states and a VALID_PREVIOUS_STATES map that prepareTransitionTo enforces (transaction-coordinator/.../TransactionState.java:31-103):
- EMPTY / COMPLETE_COMMIT / COMPLETE_ABORT → ONGOING: a new transaction reuses the metadata
- ONGOING → PREPARE_COMMIT: EndTxn(COMMIT)
- ONGOING → PREPARE_ABORT: EndTxn(ABORT)
- PREPARE_COMMIT → COMPLETE_COMMIT: all markers acked
- PREPARE_ABORT → COMPLETE_ABORT: all markers acked
- EMPTY → PREPARE_ABORT: TV_2 direct abort (client unsure of server state, no partitions added)
- ONGOING → PREPARE_EPOCH_FENCE: InitProducerId arrives mid-transaction, or the timeout reaper fires
- PREPARE_EPOCH_FENCE → PREPARE_ABORT: internal step that bumps the epoch, then aborts the dangling transaction
- COMPLETE_COMMIT / COMPLETE_ABORT → DEAD: transactional.id expiry
PREPARE_* states are persisted before markers are sent. COMPLETE_* states are persisted after all markers are acknowledged. PREPARE_EPOCH_FENCE is an internal step reachable only from ONGOING, and it always proceeds to PREPARE_ABORT. DEAD is transient during transactional.id expiry and is never persisted. Source: transaction-coordinator/.../TransactionState.java:31-103.
Notable edges from the code:
- EMPTY, COMPLETE_COMMIT and COMPLETE_ABORT can all transition to ONGOING, because a new transaction reuses the metadata. Under TV_2 they can also go directly to PREPARE_ABORT, because a client uncertain of server state may send EndTxn(abort) with no partitions added (TransactionState.java:94-103; logic in TransactionCoordinator.endTransaction, :894-899).
- PREPARE_EPOCH_FENCE is reachable only from ONGOING. It is used when a new InitProducerId arrives while a transaction is live, or when the background timeout reaper fires. In both cases the coordinator bumps the epoch and aborts the dangling transaction. On the InitProducerId path it then returns CONCURRENT_TRANSACTIONS to force the client to retry (TransactionCoordinator.scala:281-287, 190-207).
- DEAD is a transient state during transactional-id expiration. On success the metadata is removed from the cache; it is never persisted as DEAD (TransactionMetadata.java:421-426).
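The edges above can be encoded as a lookup table in the spirit of VALID_PREVIOUS_STATES. This Python sketch has deliberate limits:

- It is built only from the transitions this chapter names, plus ONGOING → ONGOING for a repeated AddPartitionsToTxn.
- Other edges, such as the epoch bump InitProducerId performs, are left out, so TransactionState.java remains the authoritative map.
- The TV_2 gate on direct aborts is folded in here for illustration. The real check lives in TransactionCoordinator.endTransaction.

```python
VALID_PREVIOUS_STATES = {
    "ONGOING": {"EMPTY", "ONGOING", "COMPLETE_COMMIT", "COMPLETE_ABORT"},
    "PREPARE_COMMIT": {"ONGOING"},
    "PREPARE_ABORT": {"ONGOING", "PREPARE_EPOCH_FENCE", "EMPTY", "COMPLETE_COMMIT", "COMPLETE_ABORT"},
    "COMPLETE_COMMIT": {"PREPARE_COMMIT"},
    "COMPLETE_ABORT": {"PREPARE_ABORT"},
    "PREPARE_EPOCH_FENCE": {"ONGOING"},
    "DEAD": {"COMPLETE_COMMIT", "COMPLETE_ABORT"},
}

# Direct aborts without an ongoing transaction are only legal for TV_2 clients.
TV2_ONLY = {("EMPTY", "PREPARE_ABORT"), ("COMPLETE_COMMIT", "PREPARE_ABORT"),
            ("COMPLETE_ABORT", "PREPARE_ABORT")}


def can_transition(current: str, target: str, transaction_version: int = 2) -> bool:
    if current not in VALID_PREVIOUS_STATES.get(target, set()):
        return False
    if (current, target) in TV2_ONLY and transaction_version < 2:
        return False
    return True
```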
The protocol flow, step by step
Figure: sequence diagram of the transactional protocol between the client, the coordinator, the __transaction_state log and the partition leaders. It covers handleInitProducerId, handleAddPartitionsToTransaction, the produce path, and handleEndTransaction. The PREPARE_COMMIT append is the durable commit point: the coordinator responds to the client immediately after it and finishes the markers asynchronously.
1 · InitProducerId: bind, fence, abort dangling transactions
TransactionCoordinator.handleInitProducerId (TransactionCoordinator.scala:117-226) routes on the transactional id:
- Null id → a pure idempotent (non-transactional) producer: blindly allocate a fresh PID with epoch 0 and return (:125-132).
- Real id → look up or create the metadata (a new metadata gets a freshly generated PID, epoch -1, state EMPTY, :154-166). Then prepareInitProducerIdTransit decides the transition under the metadata lock.
prepareInitProducerIdTransit (:228-296) is where fencing and recovery live:

- If the state is EMPTY/COMPLETE_*, it bumps the epoch via prepareIncrementProducerEpoch. If the epoch is exhausted, it instead rotates to a new PID via prepareProducerIdRotation (:264-271).
- If a transaction is ONGOING, it returns prepareFenceProducerEpoch(). The coordinator aborts that transaction first and replies CONCURRENT_TRANSACTIONS so the client retries (:281-287, 190-207).
- An expectedProducerIdAndEpoch that does not match, and is not a valid epoch-exhaustion retry, yields PRODUCER_FENCED (:252-253, isValidProducerId at :234-246).
Epoch bumping on InitProducerId is the zombie fence: when a crashed producer restarts and calls initTransactions(), the epoch jumps, so any in-flight write from the dead instance (carrying the old epoch) is now rejected by every partition leader. The coordinator also rolls back whatever transaction the zombie left open.
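The branching in prepareInitProducerIdTransit can be summarized as a pure function. This Python sketch simplifies under stated assumptions:

- It skips the expectedProducerIdAndEpoch check and the hasFailedEpochFence case.
- It assumes the epoch counts as exhausted at Short.MAX_VALUE - 1 (check TransactionMetadata for the exact rule).
- It assumes a rotated PID starts at epoch 0.
- It assumes any other state (PREPARE_*, markers still in flight) is answered with the retriable CONCURRENT_TRANSACTIONS.
- All names are hypothetical.

```python
from dataclasses import dataclass
from typing import Callable

SHORT_MAX = 32767
EPOCH_EXHAUSTED_AT = SHORT_MAX - 1  # assumption; see the lead-in


@dataclass(frozen=True)
class InitDecision:
    action: str          # BUMP_EPOCH, ROTATE_PID, FENCE_AND_ABORT or RETRY
    producer_id: int
    producer_epoch: int
    error: str           # NONE or CONCURRENT_TRANSACTIONS


def decide_init_producer_id(state: str, producer_id: int, epoch: int,
                            allocate_pid: Callable[[], int]) -> InitDecision:
    if state in ("EMPTY", "COMPLETE_COMMIT", "COMPLETE_ABORT"):
        if epoch >= EPOCH_EXHAUSTED_AT:
            # Epoch space used up: move to a fresh PID instead of overflowing int16.
            return InitDecision("ROTATE_PID", allocate_pid(), 0, "NONE")
        # A brand-new id starts at epoch -1, so its first bump yields epoch 0.
        return InitDecision("BUMP_EPOCH", producer_id, epoch + 1, "NONE")
    if state == "ONGOING":
        # A zombie left a transaction open: fence with a higher epoch, abort it, make the client retry.
        return InitDecision("FENCE_AND_ABORT", producer_id, epoch + 1, "CONCURRENT_TRANSACTIONS")
    return InitDecision("RETRY", producer_id, epoch, "CONCURRENT_TRANSACTIONS")
```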
2 · AddPartitionsToTxn: register partitions
Before producing to a partition, the client tells the coordinator about it. handleAddPartitionsToTransaction validates producerId/producerEpoch against the metadata (mismatches → INVALID_PRODUCER_ID_MAPPING / PRODUCER_FENCED), short-circuits with NONE if the partitions are already present, otherwise calls prepareAddPartitions and appends an ONGOING record (TransactionCoordinator.scala:416-468). The first AddPartitions sets txnStartTimestamp, which arms the timeout clock (TransactionMetadata.java:214-233).
There is also a server-internal verification path (handleVerifyPartitionsInTransaction, :361-414): when a partition leader receives a transactional produce it can ask the coordinator "is this partition really in this producer's open transaction?". This closes the hanging transaction hole where a delayed produce could land after a marker. The leader's side uses a VerificationGuard, a unique token created in maybeStartTransactionVerification and checked in analyzeAndValidateProducerState, to defeat the ABA race between a verified response and an abort marker (storage/.../UnifiedLog.java:1410-1440, see the long comment at :1414-1426). Verification is on by default (transaction.partition.verification.enable=true, TransactionLogConfig.java:52-54).
3 · Produce: transactional data batches
Produced batches carry the PID, epoch, sequence and the transactional flag. The leader runs the idempotence checks above and the verification check. The first transactional batch on a partition sets currentTxnFirstOffset in the producer state and starts a TxnMetadata in the ProducerStateManager.ongoingTxns map (storage/.../ProducerAppendInfo.java:234-243, ProducerStateManager.java:398). A non-transactional write while a transaction is open throws InvalidTxnStateException. The converse, a transactional write with no open transaction, is not an error: it is exactly how a transaction begins, by setting currentTxnFirstOffset and adding a TxnMetadata (ProducerAppendInfo.java:235-243).
4 · Consume-transform-produce: binding offsets (KIP-447)
For EOS pipelines the application commits its input consumer offsets inside the transaction. The producer client method sendOffsetsToTransaction(offsets, groupMetadata) (clients/.../TransactionManager.java:432-463) takes the full ConsumerGroupMetadata (group id, generation, member id). This is the KIP-447 improvement that lets the group coordinator fence offset commits by generation. As a result, a single transactional producer can serve many input partitions instead of requiring one producer per input partition.

The request sequence depends on the transaction version (:443-459):

- Under TV_1, the client sends AddOffsetsToTxn and then TxnOffsetCommit. AddOffsetsToTxn registers the __consumer_offsets partition with the transaction coordinator, exactly like AddPartitionsToTxn.
- Under TV_2, the client skips AddOffsetsToTxn and sends TxnOffsetCommit directly.

The committed offsets are written to __consumer_offsets by the group coordinator but stay invisible (LSO-gated) until the transaction's COMMIT marker lands on that partition.
5 · EndTxn: two-phase commit
handleEndTransaction → endTransaction implements the 2PC. There are two implementations selected by the client's TV: endTransactionWithTV1 (TransactionCoordinator.scala:543-710) and the TV_2 endTransaction (:758-1007). The shape is the same:
- Phase 1 (prepare). Under the metadata lock, validate pid/epoch and the current state, compute the target (PREPARE_COMMIT or PREPARE_ABORT) via prepareAbortOrCommit, and append that record to __transaction_state (:854-860, 1003-1004). Under TV_2, prepareAbortOrCommit bumps the epoch and, if the epoch is exhausted, rotates to nextProducerId (:818-841, TransactionMetadata.java:235-263).
- Respond early. Once the prepare record is durable, the coordinator replies to the client immediately (returning the post-bump pid/epoch under TV_2, :975) and continues asynchronously. The client does not wait for markers.
- Phase 2 (commit). addTxnMarkersToSend hands the transaction to the TransactionMarkerChannelManager, which writes COMMIT/ABORT markers to every involved partition leader. When all markers are acknowledged, the coordinator calls prepareComplete and appends COMPLETE_COMMIT/COMPLETE_ABORT (TransactionMarkerChannelManager.scala:276-315, TransactionMetadata.java:265-288).
The prepare record is the commit point of the 2PC. Once PREPARE_COMMIT is durable, the outcome is decided even if the coordinator crashes, because a new coordinator that loads PREPARE_COMMIT will re-drive the markers and write COMPLETE_COMMIT (see recovery). Responding to the client before the markers finish keeps commit latency low while preserving atomicity, because the markers are guaranteed to be (re)sent.

The two-phase, marker-based design is KIP-98. KIP-890 (TV_2) adds the per-transaction epoch bump, which makes each transaction uniquely identifiable by (pid, epoch) and closes the hanging-transaction and duplicate-across-transactions holes.
The transaction marker (control record)
Figure: transaction marker (control record) layout. The key is a ControlRecordType (4 B) whose type field is 0 = ABORT or 1 = COMMIT. The value is an EndTxnMarker (sizes: int16 = 2 B, int32 = 4 B). Source: clients/.../record/internal/EndTransactionMarker.java:34-110, ControlRecordType.java:43-83. Control records are never returned to consumers and are not compacted away.
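Here is a byte-level sketch of the marker's key and value. It assumes the version-0 layouts, big-endian as in Kafka's wire format:

- key = version (int16) + type (int16)
- value = version (int16) + coordinatorEpoch (int32)

Newer value versions, if any, are not covered, and the helper names are hypothetical.

```python
import struct

ABORT, COMMIT = 0, 1           # ControlRecordType key.type values
CONTROL_KEY_VERSION = 0        # assumption: version-0 key
END_TXN_MARKER_VERSION = 0     # assumption: version-0 value


def encode_end_txn_marker(is_commit: bool, coordinator_epoch: int):
    key = struct.pack(">hh", CONTROL_KEY_VERSION, COMMIT if is_commit else ABORT)  # 4 bytes
    value = struct.pack(">hi", END_TXN_MARKER_VERSION, coordinator_epoch)          # 6 bytes
    return key, value


def decode_end_txn_marker(key: bytes, value: bytes):
    _, record_type = struct.unpack(">hh", key)
    _, coordinator_epoch = struct.unpack(">hi", value)
    if record_type not in (ABORT, COMMIT):
        raise ValueError(f"not an end-transaction marker: type {record_type}")
    return record_type == COMMIT, coordinator_epoch
```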
On the receiving partition leader, ProducerAppendInfo.appendEndTxnMarker validates the marker epoch (TV_2: strictly greater) and the coordinator epoch (a marker with an older coordinator epoch is a zombie and throws TransactionCoordinatorFencedException, ProducerAppendInfo.java:246-257). If the producer had an open transaction (currentTxnFirstOffset present), it emits a CompletedTxn recording (producerId, firstOffset, lastOffset=markerOffset, isAborted) (:259-290).
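The per-producer bookkeeping from step 3 and this marker path can be modeled together. This is a Python sketch only:

- Producer-epoch checks are omitted, since they are covered by the checkProducerEpoch discussion above.
- ProducerTxnState and the exception classes are hypothetical stand-ins for ProducerAppendInfo's behavior.

```python
from dataclasses import dataclass
from typing import Optional


class InvalidTxnStateError(Exception):
    """Stand-in for InvalidTxnStateException."""


class TransactionCoordinatorFencedError(Exception):
    """Stand-in for TransactionCoordinatorFencedException."""


@dataclass(frozen=True)
class CompletedTxn:
    producer_id: int
    first_offset: int
    last_offset: int   # the marker's own offset
    is_aborted: bool


class ProducerTxnState:
    def __init__(self, producer_id: int) -> None:
        self.producer_id = producer_id
        self.current_txn_first_offset: Optional[int] = None
        self.coordinator_epoch = -1

    def append_data(self, offset: int, is_transactional: bool) -> None:
        if is_transactional:
            if self.current_txn_first_offset is None:
                self.current_txn_first_offset = offset  # the first transactional batch opens the txn
        elif self.current_txn_first_offset is not None:
            raise InvalidTxnStateError("non-transactional write while a transaction is open")

    def append_end_txn_marker(self, marker_offset: int, coordinator_epoch: int,
                              is_commit: bool) -> Optional[CompletedTxn]:
        if coordinator_epoch < self.coordinator_epoch:
            raise TransactionCoordinatorFencedError(
                f"coordinator epoch {coordinator_epoch} < {self.coordinator_epoch}")
        self.coordinator_epoch = coordinator_epoch
        completed = None
        if self.current_txn_first_offset is not None:
            completed = CompletedTxn(self.producer_id, self.current_txn_first_offset,
                                     marker_offset, not is_commit)
        self.current_txn_first_offset = None
        return completed
```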
Marker fan-out: TransactionMarkerChannelManager
The channel manager is an InterBrokerSendThread named TxnMarkerSenderThread-<brokerId> (TransactionMarkerChannelManager.scala:169) with its own dedicated NetworkClient using the inter-broker listener. It owns several concurrent structures:
| Structure | Type | Purpose |
|---|---|---|
| markersQueuePerBroker | ConcurrentHashMap[Int, TxnMarkerQueue] | Markers grouped by destination broker; each TxnMarkerQueue further partitions by txn-topic partition for fast emigration cleanup. |
| markersQueueForUnknownBroker | TxnMarkerQueue | Holding queue for partitions whose leader is currently unknown; retried once the leader is discovered. |
| transactionsWithPendingMarkers | ConcurrentHashMap[String, PendingCompleteTxn] | Tracks which transactional ids still have markers outstanding. |
| txnLogAppendRetryQueue | LinkedBlockingQueue[PendingCompleteTxn] | Completion records (COMPLETE_*) whose append failed transiently and must be retried. |
addTxnMarkersToSend records the pending completion, then addTxnMarkersToBrokerQueue groups the transaction's partitions by their leader (via metadataCache.getPartitionLeaderEndpoint) and enqueues a TxnMarkerEntry per destination (:317-458). The send thread's generateRequests drains per-broker queues into WriteTxnMarkersRequests, attaching a TransactionMarkerRequestCompletionHandler (:238-274). As each partition leader acknowledges, the handler removes that partition from the metadata's topicPartitions; when the set empties, maybeWriteTxnCompletion fires writeTxnCompletion → tryAppendToLog to persist the COMPLETE_* record (:339-383). A partition whose leader has been deleted is simply dropped from the set so completion can proceed (:421-445).
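Below is a single-threaded Python sketch of the fan-out bookkeeping. It groups partitions by leader, parks those with an unknown leader, drops deleted partitions, and records the COMPLETE_* entry once the pending set is empty. The class and method names are hypothetical. The real manager builds WriteTxnMarkersRequests on its sender thread and keeps its state in the concurrent structures listed in the table.

```python
from collections import defaultdict


class MarkerFanOutSketch:
    def __init__(self, leader_of):
        self.leader_of = leader_of                  # partition -> broker id, or None if unknown
        self.queue_per_broker = defaultdict(list)   # like markersQueuePerBroker
        self.queue_for_unknown_broker = []          # like markersQueueForUnknownBroker
        self.pending = {}                           # txn id -> (partitions awaiting ack, is_commit)
        self.completed = []                         # COMPLETE_* records "appended" to the txn log

    def add_txn_markers_to_send(self, txn_id, partitions, is_commit, deleted=frozenset()):
        self.pending[txn_id] = (set(partitions), is_commit)
        for tp in partitions:
            if tp in deleted:
                self.on_partition_done(txn_id, tp)  # nothing to write; just drop it from the set
            elif self.leader_of.get(tp) is None:
                self.queue_for_unknown_broker.append((txn_id, tp, is_commit))
            else:
                self.queue_per_broker[self.leader_of[tp]].append((txn_id, tp, is_commit))

    def drain(self, broker_id):
        """One WriteTxnMarkers request's worth of markers for a broker."""
        return self.queue_per_broker.pop(broker_id, [])

    def on_partition_done(self, txn_id, tp):
        remaining, is_commit = self.pending[txn_id]
        remaining.discard(tp)
        if not remaining:  # every leader has acknowledged: persist the completion record
            del self.pending[txn_id]
            self.completed.append((txn_id, "COMPLETE_COMMIT" if is_commit else "COMPLETE_ABORT"))
```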
On the receiving broker, KafkaApis.handleWriteTxnMarkersRequest appends each marker to its partitions with an AtomicInteger numAppends counter, sending the response only after the last partition's append completes (core/src/main/scala/kafka/server/KafkaApis.scala:1729-1871).
Concurrency & threading model
The coordinator's locking discipline is documented at the top of TransactionStateManager (:60-68) and is strict:
- stateLock (a ReentrantReadWriteLock) guards the metadata cache and the loading/unloading partition sets. Loading and emigration take the write lock; request handling takes the read lock.
- Each TransactionMetadata has its own ReentrantLock for field mutation.
- Ordering rule:
  - never acquire stateLock.readLock while holding a txnMetadata lock;
  - never acquire a txnMetadata lock while holding stateLock.writeLock;
  - never call ReplicaManager.appendRecords while holding a txnMetadata lock.
| Thread | Responsibility |
|---|---|
| Request handler (KafkaApis) threads | Run handleInitProducerId/AddPartitions/EndTxn under the metadata lock; issue the __transaction_state append. |
| TxnMarkerSenderThread-<id> | Single sender thread: drains marker queues, sends WriteTxnMarkers, retries log-append failures, writes COMPLETE_*. |
| transaction-abort scheduler task | Every transaction.abort.timed.out.transaction.cleanup.interval.ms (default 10 s) scans for ONGOING transactions past their timeout and aborts them (TransactionCoordinator.scala:1047-1083, 1091-1095). |
| transactionalId-expiration scheduler task | Every transaction.remove.expired.transaction.cleanup.interval.ms (default 1 h) writes tombstones for transactional ids idle past transactional.id.expiration.ms (default 7 d) (TransactionStateManager.scala:300-307, 151-243). |
| load-txns-for-partition-* scheduler task | One-shot task per onElection to read a __transaction_state partition into the cache. |
The append path holds the read lock across appendRecords deliberately (:772-813): otherwise an emigration+immigration between the coordinator-epoch check and the append could let a stale-epoch record replicate, corrupting the log. The post-append updateCacheCallback re-verifies the coordinator epoch before applying completeTransitionTo, mapping mismatches to NOT_COORDINATOR (:666-770).
The background timeout scan reads txnStartTimestamp/state without the metadata lock. This is safe because those fields are volatile and a stale read is harmless: the actual abort re-acquires the lock and re-validates before transitioning (TransactionStateManager.timedOutTransactions, :124-149). Distributed 2PC transactions (txnTimeoutMs == Integer.MAX_VALUE) are explicitly excluded from the timeout (:137-141).
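The reaper's selection can be sketched as a filter over a snapshot of the cache. This Python code is illustrative only:

- TxnView and timed_out_transactions are hypothetical names.
- The strict < against start + timeout is an assumption about the boundary.
- The subsequent lock-and-revalidate abort is not shown.

```python
from dataclasses import dataclass
from typing import List

INT_MAX = 2**31 - 1  # Integer.MAX_VALUE marks a distributed 2PC transaction


@dataclass(frozen=True)
class TxnView:
    transactional_id: str
    state: str
    txn_start_timestamp: int
    txn_timeout_ms: int


def timed_out_transactions(txns: List[TxnView], now_ms: int) -> List[str]:
    return [t.transactional_id for t in txns
            if t.state == "ONGOING"
            and t.txn_timeout_ms != INT_MAX  # 2PC transactions are never auto-aborted
            and t.txn_start_timestamp + t.txn_timeout_ms < now_ms]
```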
The consumer side: read_committed, LSO & the aborted index
Computing the LSO on the leader
The leader tracks firstUnstableOffsetMetadata, the earliest offset belonging to an undecided or not-yet-replicated transaction. ProducerStateManager.firstUnstableOffset returns the min of the first ongoingTxns entry (undecided) and the first unreplicatedTxns entry (decided but its marker is above the HW) (ProducerStateManager.java:246-258). UnifiedLog.lastStableOffset is then min(highWatermark, firstUnstableOffset) (UnifiedLog.java:678-686). When a transaction completes, completeTxn moves it from ongoingTxns to unreplicatedTxns; it is only dropped (and the LSO allowed past it) once the high watermark advances beyond its marker, via onHighWatermarkUpdated (ProducerStateManager.java:545-554, 264-266).
A read_committed consumer never observes an offset ≥ LSO, so it never sees a record whose transaction outcome is still undecided. Aborted records below the LSO are present in the log but filtered client-side.
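The LSO bookkeeping can be captured in a small Python model. It encodes the rule above: LSO = min(highWatermark, firstUnstableOffset), and a decided transaction stays unstable until its marker offset falls below the high watermark. Simplifying assumptions:

- Open transactions are keyed by producer id in a dict. The real ongoingTxns map is ordered by first offset.
- LsoTracker and its methods are hypothetical names.

```python
from typing import Dict, List, Optional, Tuple


class LsoTracker:
    def __init__(self) -> None:
        self.ongoing: Dict[int, int] = {}              # producer id -> first offset of the open txn
        self.unreplicated: List[Tuple[int, int]] = []  # (first offset, marker offset) of decided txns
        self.high_watermark = 0

    def begin(self, producer_id: int, first_offset: int) -> None:
        self.ongoing[producer_id] = first_offset

    def complete(self, producer_id: int, marker_offset: int) -> None:
        # Decided, but still unstable until the marker is replicated (below the HW).
        self.unreplicated.append((self.ongoing.pop(producer_id), marker_offset))

    def on_high_watermark_updated(self, hw: int) -> None:
        self.high_watermark = hw
        self.unreplicated = [(f, m) for f, m in self.unreplicated if m >= hw]

    def first_unstable_offset(self) -> Optional[int]:
        candidates = list(self.ongoing.values()) + [f for f, _ in self.unreplicated]
        return min(candidates) if candidates else None

    def last_stable_offset(self) -> int:
        fuo = self.first_unstable_offset()
        return self.high_watermark if fuo is None else min(self.high_watermark, fuo)
```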
The aborted-transaction index
When a transaction aborts, the segment's TransactionIndex gets an AbortedTxn entry. The on-disk record is fixed-size (no flexible fields), 4 × int64 = 32 bytes plus a 2-byte version prefix:
Figure: AbortedTxn layout (source: clients/.../message/AbortedTxn.json, version 0; sizes: int16 = 2 B, int64 = 8 B). It is a fixed-size record with no flexible or tagged fields, 34 B in total. TransactionIndex stores, per aborted transaction, the PID and offset span plus the LSO at abort time. Entries are appended in increasing LastOffset order (TransactionIndex.java:83-93).
On a read_committed fetch the leader returns the relevant AbortedTransaction list (gathered by TransactionIndex.collectAbortedTxns) alongside the records, and the fetch is capped at the LSO (UnifiedLog.read passes isolation == TXN_COMMITTED, UnifiedLog.java:1659).
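The 34-byte entry and the fetch-time selection can be sketched in Python with struct. Assumptions:

- The field order is version, producerId, firstOffset, lastOffset, lastStableOffset.
- Encoding is big-endian, as elsewhere in Kafka's formats.
- The selection predicate (lastOffset >= fetch offset and firstOffset < upper bound) is a plausible reading of collectAbortedTxns, not a quotation of it.

```python
import struct
from dataclasses import dataclass
from typing import Iterable, List

ABORTED_TXN_FORMAT = ">hqqqq"  # version, producerId, firstOffset, lastOffset, lastStableOffset
ABORTED_TXN_SIZE = struct.calcsize(ABORTED_TXN_FORMAT)  # 2 + 4 * 8 = 34 bytes


@dataclass(frozen=True)
class AbortedTxn:
    producer_id: int
    first_offset: int
    last_offset: int
    last_stable_offset: int

    def encode(self, version: int = 0) -> bytes:
        return struct.pack(ABORTED_TXN_FORMAT, version, self.producer_id,
                           self.first_offset, self.last_offset, self.last_stable_offset)

    @staticmethod
    def decode(buf: bytes) -> "AbortedTxn":
        version, pid, first, last, lso = struct.unpack(ABORTED_TXN_FORMAT, buf)
        if version != 0:
            raise ValueError(f"unsupported AbortedTxn version {version}")
        return AbortedTxn(pid, first, last, lso)


def collect_aborted_txns(index: Iterable[AbortedTxn], fetch_offset: int,
                         upper_bound_offset: int) -> List[AbortedTxn]:
    # Keep aborted transactions that overlap [fetch_offset, upper_bound_offset).
    return [t for t in index
            if t.last_offset >= fetch_offset and t.first_offset < upper_bound_offset]
```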
Client-side filtering
CompletedFetch.nextFetchedRecord performs the actual filtering (clients/.../CompletedFetch.java:187-244). It keeps a priority queue of aborted transactions ordered by first offset and a live abortedProducerIds set. For each batch with a producer id:
- consumeAbortedTransactionsUpTo(batch.lastOffset()) moves every aborted transaction whose first offset is ≤ the batch's last offset into the active set (:356-364).
- If the batch is an abort marker, the producer id is removed from the active set (the abort is now consumed).
- If the batch's producer id is in the active set, the whole batch is skipped and nextFetchOffset advances past it (:215-223).
- Control batches (the markers themselves) are never surfaced to the application (:234-240).
Aborted data physically remains in the log; "exactly once read" is realized by the consumer filtering batches whose PID appears in an aborted-transaction entry that began at or before that batch. The LSO guarantees the consumer has the complete abort information for everything it is allowed to read.
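The filtering loop can be sketched in Python, with heapq standing in for the priority queue. Notes on the sketch:

- Batch and filter_read_committed are hypothetical names.
- Aborted transactions arrive as (producer_id, first_offset) pairs, as in the fetch response.
- Batches are reduced to the fields the algorithm reads.

```python
import heapq
from dataclasses import dataclass
from typing import Iterable, List, Tuple


@dataclass(frozen=True)
class Batch:
    producer_id: int          # -1 when the batch has no producer id
    base_offset: int
    last_offset: int
    is_transactional: bool = False
    is_control: bool = False
    is_abort_marker: bool = False


def filter_read_committed(batches: Iterable[Batch],
                          aborted: Iterable[Tuple[int, int]]) -> List[Batch]:
    pending = [(first_offset, pid) for pid, first_offset in aborted]
    heapq.heapify(pending)        # ordered by first offset
    aborted_pids = set()          # like abortedProducerIds
    delivered = []
    for b in batches:
        if b.producer_id >= 0:
            # consumeAbortedTransactionsUpTo(batch.lastOffset())
            while pending and pending[0][0] <= b.last_offset:
                aborted_pids.add(heapq.heappop(pending)[1])
            if b.is_abort_marker:
                aborted_pids.discard(b.producer_id)  # this aborted txn is now fully consumed
            elif b.is_transactional and b.producer_id in aborted_pids:
                continue                             # skip the whole aborted batch
        if b.is_control:
            continue                                 # markers never reach the application
        delivered.append(b)
    return delivered
```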
Producer-client state machine
TransactionManager mirrors the server with a client-side machine (clients/.../TransactionManager.java:157-195): UNINITIALIZED → INITIALIZING → READY → IN_TRANSACTION → {COMMITTING,ABORTING}_TRANSACTION → READY, plus PREPARED_TRANSACTION (2PC) and the error sinks ABORTABLE_ERROR / FATAL_ERROR. It maintains per-partition sequence counters (sequenceNumber(tp) → nextSequence(), :710-713), tracks in-flight batches for retry/reordering, and serializes coordinator interactions through a priority queue (FIND_COORDINATOR < INIT_PRODUCER_ID < ADD_PARTITIONS_OR_OFFSETS < END_TXN < EPOCH_BUMP, :201-213). Recoverable errors transition to ABORTABLE_ERROR so the next operation must be an abort; only an abortable retry of a bumped epoch reuses the machine. Under KIP-360, the client can bump its idempotent epoch and reset sequences to recover from UNKNOWN_PRODUCER_ID/OutOfOrderSequence without becoming fatal (requestIdempotentEpochBumpForPartition, bumpIdempotentEpochAndResetIdIfNeeded, :668-708). Details of batching and the sender loop are in The Producer Client.
Configuration reference
| Key | Default | Effect |
|---|---|---|
| transaction.state.log.num.partitions | 50 | Partitions of __transaction_state; fixes the coordinator-to-id mapping (partitionFor = abs(hash) % count). Must not change post-deploy. |
| transaction.state.log.replication.factor | 3 | RF of __transaction_state. |
| transaction.state.log.min.isr | 2 | Min ISR for txn-state writes; written with acks=-1. |
| transaction.state.log.segment.bytes | 104857600 (100 MiB) | Kept small to speed compaction and coordinator load. |
| transaction.state.log.load.buffer.size | 5242880 (5 MiB) | Read-batch size when loading a txn-state partition into cache (soft limit). |
| transaction.max.timeout.ms | 900000 (15 min) | Upper bound on a client's requested transaction.timeout.ms; exceed → INVALID_TRANSACTION_TIMEOUT at InitProducerId. |
| transactional.id.expiration.ms | 604800000 (7 days) | Idle time after which a transactional id is tombstoned; never expires while a txn is ongoing. |
| transaction.abort.timed.out.transaction.cleanup.interval.ms | 10000 (10 s) | Sweep interval for aborting timed-out ONGOING transactions. |
| transaction.remove.expired.transaction.cleanup.interval.ms | 3600000 (1 h) | Sweep interval for transactional-id expiration. |
| transaction.two.phase.commit.enable | false | Allow external-coordinator 2PC (timeout becomes Integer.MAX_VALUE); off → 2PC InitProducerId returns TRANSACTIONAL_ID_AUTHORIZATION_FAILED. |
| transaction.partition.verification.enable | true | Leader verifies partition-in-txn before appending transactional records (anti-hanging-txn). Broker dynamic. |
| producer.id.expiration.ms | 86400000 (1 day) | Leader-side idle time before a PID's ProducerStateEntry is dropped; influences UNKNOWN_PRODUCER_ID. Broker dynamic. |
| producer.id.expiration.check.interval.ms | 600000 (10 min) | Sweep interval for expired PID entries. |
| transaction.version (feature) | TV_2 (LATEST_PRODUCTION) | Cluster feature flag: TV_2 enables per-transaction epoch bump (KIP-890); set via storage/features tools. |
Sources: TransactionLogConfig.java, TransactionStateManagerConfig.java, TransactionVersion.java:35.
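The following hedged sketch shows how the timeout-related settings interact at InitProducerId time. The class and method names are hypothetical, and errors are plain strings rather than Kafka's Errors enum. Rejecting non-positive timeouts is an assumption of this sketch. The last method encodes the operator rule of thumb that producer.id.expiration.ms should be at least delivery.timeout.ms.

```java
// Hypothetical model of the InitProducerId checks driven by the transaction settings.
final class TxnConfigChecks {
    /** Returns the error name InitProducerId would produce in this model, or "NONE". */
    static String initProducerIdError(int requestedTimeoutMs, boolean twoPcRequested,
                                      boolean twoPcEnabled, int maxTimeoutMs) {
        if (twoPcRequested) {
            // 2PC with transaction.two.phase.commit.enable=false is rejected as an authorization failure.
            return twoPcEnabled ? "NONE" : "TRANSACTIONAL_ID_AUTHORIZATION_FAILED";
        }
        // Assumption: non-positive timeouts are rejected alongside ones above transaction.max.timeout.ms.
        if (requestedTimeoutMs <= 0 || requestedTimeoutMs > maxTimeoutMs) {
            return "INVALID_TRANSACTION_TIMEOUT";
        }
        return "NONE";
    }

    /** 2PC transactions get an effectively infinite timeout, so the abort reaper never fires. */
    static int effectiveTimeoutMs(int requestedTimeoutMs, boolean twoPc) {
        return twoPc ? Integer.MAX_VALUE : requestedTimeoutMs;
    }

    /** Rule of thumb: leader-side PID state should outlive the producer's retry window. */
    static boolean pidExpirationCoversRetries(long producerIdExpirationMs, long deliveryTimeoutMs) {
        return producerIdExpirationMs >= deliveryTimeoutMs;
    }

    public static void main(String[] args) {
        System.out.println(initProducerIdError(1_000_000, false, false, 900_000)); // INVALID_TRANSACTION_TIMEOUT
        System.out.println(pidExpirationCoversRetries(86_400_000L, 120_000L));      // true
    }
}
```

The producer's delivery.timeout.ms defaults to 120000 (2 min), so the 1-day producer.id.expiration.ms default covers it comfortably. The check matters only if expiration is tuned down.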
Failure modes, edge cases & recovery
Coordinator failover
When a broker becomes leader for a __transaction_state partition, TransactionCoordinator.onElection first clears any stale marker queues, then loadTransactionsForTxnTopicPartition replays the compacted partition into the cache (TransactionCoordinator.scala:477-487, TransactionStateManager.scala:538-597). Crucially, for any transaction loaded in PREPARE_COMMIT or PREPARE_ABORT, the new coordinator immediately re-drives phase 2 by calling prepareComplete and re-sending the markers (:562-588). This is what makes the prepare record the durable commit point: the successor transparently finishes any transaction whose coordinator crashed between prepare and complete. The partition is removed from loadingPartitions before the markers are sent, so the completion append is not blocked by a self-imposed COORDINATOR_LOAD_IN_PROGRESS (:579-582).
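The ordering that matters on failover can be modeled as three steps: load, unmark the partition, then re-drive phase 2 for prepared transactions. This is a hypothetical single-threaded sketch. Real loading is asynchronous and marker sending goes through TransactionMarkerChannelManager. The FailoverReplay class, the TxnState subset and the Resend record are illustrative only.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical, single-threaded model of coordinator failover for one __transaction_state partition.
final class FailoverReplay {
    enum TxnState { EMPTY, ONGOING, PREPARE_COMMIT, PREPARE_ABORT, COMPLETE_COMMIT, COMPLETE_ABORT }

    /** A phase-2 marker the new coordinator must (re)send for a transactional id. */
    record Resend(String transactionalId, String marker) {}

    // Partitions in this set would answer requests with COORDINATOR_LOAD_IN_PROGRESS.
    final Set<Integer> loadingPartitions = new HashSet<>();

    /** Loads the replayed states, unmarks the partition, then returns the markers to re-drive. */
    List<Resend> onElection(int partition, Map<String, TxnState> replayed) {
        loadingPartitions.add(partition);
        List<Resend> toResend = new ArrayList<>();
        // TreeMap only for deterministic iteration order in this sketch.
        for (Map.Entry<String, TxnState> e : new TreeMap<>(replayed).entrySet()) {
            switch (e.getValue()) {
                // The prepare record is the commit point: the outcome is already decided.
                case PREPARE_COMMIT -> toResend.add(new Resend(e.getKey(), "COMMIT"));
                case PREPARE_ABORT -> toResend.add(new Resend(e.getKey(), "ABORT"));
                default -> { } // no phase-2 work for other states in this sketch
            }
        }
        // Unmark before sending markers so the completion append is not rejected by our own load guard.
        loadingPartitions.remove(partition);
        return toResend;
    }

    public static void main(String[] args) {
        FailoverReplay coordinator = new FailoverReplay();
        System.out.println(coordinator.onElection(7, Map.of(
            "orders-1", TxnState.PREPARE_COMMIT,
            "orders-2", TxnState.ONGOING,
            "orders-3", TxnState.PREPARE_ABORT)));
    }
}
```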
Log-append failures
appendTransactionToLog.updateCacheCallback maps storage errors to client-facing coordinator errors: NOT_ENOUGH_REPLICAS/REQUEST_TIMED_OUT → COORDINATOR_NOT_AVAILABLE (retriable); NOT_LEADER_OR_FOLLOWER/KAFKA_STORAGE_ERROR → NOT_COORDINATOR (TransactionStateManager.scala:680-697). On error it resets pendingState so the metadata is not stuck mid-transition (:734-767), unless the caller asked to retain it for retry. Completion-record appends that hit COORDINATOR_NOT_AVAILABLE are re-enqueued on txnLogAppendRetryQueue and retried by the sender thread (TransactionMarkerChannelManager.scala:354-383).
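The error translation above amounts to a small lookup. In this sketch, errors are plain strings and only the four storage errors named in the text are mapped. The fallback to UNKNOWN_SERVER_ERROR for any other error is an assumption; it is not the coordinator's full table.

```java
// Hypothetical model of the coordinator's log-append error translation.
final class TxnLogErrorMapping {
    /** Maps a storage error (by name) to the error the coordinator returns to the client. */
    static String toCoordinatorError(String storageError) {
        return switch (storageError) {
            case "NONE" -> "NONE";
            // Transient replication problems: the client retries against the same coordinator.
            case "NOT_ENOUGH_REPLICAS", "REQUEST_TIMED_OUT" -> "COORDINATOR_NOT_AVAILABLE";
            // This broker no longer owns the partition: the client must rediscover the coordinator.
            case "NOT_LEADER_OR_FOLLOWER", "KAFKA_STORAGE_ERROR" -> "NOT_COORDINATOR";
            // Assumption: everything else collapses to a generic error in this sketch.
            default -> "UNKNOWN_SERVER_ERROR";
        };
    }

    /** Completion-record appends that map to COORDINATOR_NOT_AVAILABLE are re-enqueued for retry. */
    static boolean shouldRequeueCompletion(String storageError) {
        return toCoordinatorError(storageError).equals("COORDINATOR_NOT_AVAILABLE");
    }

    public static void main(String[] args) {
        System.out.println(toCoordinatorError("REQUEST_TIMED_OUT"));      // COORDINATOR_NOT_AVAILABLE
        System.out.println(shouldRequeueCompletion("KAFKA_STORAGE_ERROR")); // false
    }
}
```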
Epoch exhaustion & overflow
The coordinator never lets a client use epoch Short.MAX_VALUE (isEpochExhausted: producerEpoch >= Short.MAX_VALUE - 1, TransactionMetadata.java:64-66) so it always retains the ability to fence. When the epoch is about to overflow, the next commit/abort rotates to a pre-allocated nextProducerId and resets the epoch to 0 while keeping lastProducerEpoch = MAX-1 for client-visible continuity (prepareComplete, :265-288; validProducerEpoch overflow handling, :469-487). TV_2 retry detection (retryOnOverflow) recognizes a client replaying an EndTxn that overflowed: prevProducerId == producerId && producerEpoch == MAX-1 && metadata.producerEpoch == 0 (TransactionCoordinator.scala:791-792).
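The exhaustion threshold, the rotation to a fresh producer id and the retryOnOverflow condition can each be written as a small predicate or helper. The EpochOverflow class and the afterComplete helper are hypothetical. afterComplete assumes TV_2-style per-transaction bumps and leaves out the coordinator's lastProducerEpoch bookkeeping. The two predicates follow the expressions quoted above.

```java
// Hypothetical model of producer-epoch exhaustion handling on the coordinator (TV_2-style bumps).
final class EpochOverflow {
    record ProducerIdAndEpoch(long producerId, short epoch) {}

    /** Matches the quoted check: epochs at or above Short.MAX_VALUE - 1 are exhausted. */
    static boolean isEpochExhausted(short epoch) {
        return epoch >= Short.MAX_VALUE - 1;
    }

    /** Identity after a commit/abort: bump the epoch, or rotate to nextProducerId at epoch 0. */
    static ProducerIdAndEpoch afterComplete(long producerId, short epoch, long nextProducerId) {
        if (isEpochExhausted(epoch)) {
            // Rotating keeps Short.MAX_VALUE unused, so a higher fencing epoch always exists.
            return new ProducerIdAndEpoch(nextProducerId, (short) 0);
        }
        return new ProducerIdAndEpoch(producerId, (short) (epoch + 1));
    }

    /** The retryOnOverflow condition: a client replays an EndTxn that already caused a rotation. */
    static boolean isRetryOnOverflow(long requestProducerId, short requestEpoch,
                                     long prevProducerId, short metadataEpoch) {
        return prevProducerId == requestProducerId
            && requestEpoch == Short.MAX_VALUE - 1
            && metadataEpoch == 0;
    }

    public static void main(String[] args) {
        System.out.println(afterComplete(10L, (short) 32766, 11L));
        System.out.println(isRetryOnOverflow(10L, (short) 32766, 10L, (short) 0));
    }
}
```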
Hanging transactions & late writes
ProducerStateManager.hasLateTransaction flags a transaction whose oldest update is older than maxTransactionTimeoutMs + LATE_TRANSACTION_BUFFER_MS (5 min, ProducerStateManager.java:72, 130-133); this surfaces a metric/health signal so operators can detect a partition pinned by a stuck transaction (which would freeze its LSO and stall read_committed consumers). The verification path (above) is the primary prevention.
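The late-transaction test reduces to a single comparison. This minimal sketch takes the oldest ongoing transaction's last-update timestamp as input, empty when nothing is ongoing. The class and parameter names are hypothetical. With the default transaction.max.timeout.ms of 15 minutes, a transaction is flagged once its last update is more than 20 minutes old.

```java
import java.util.OptionalLong;

// Hypothetical model of the late-transaction health check.
final class LateTxnCheck {
    /** LATE_TRANSACTION_BUFFER_MS per the text: 5 minutes. */
    static final long LATE_TRANSACTION_BUFFER_MS = 5 * 60 * 1000L;

    /** True when the oldest ongoing transaction's last update is older than max timeout + buffer. */
    static boolean hasLateTransaction(OptionalLong oldestUpdateMs, long nowMs, long maxTransactionTimeoutMs) {
        return oldestUpdateMs.isPresent()
            && nowMs - oldestUpdateMs.getAsLong() > maxTransactionTimeoutMs + LATE_TRANSACTION_BUFFER_MS;
    }

    public static void main(String[] args) {
        long maxTimeoutMs = 900_000L; // transaction.max.timeout.ms default (15 min)
        // 21 minutes after the last update: the 15 min timeout + 5 min buffer has been exceeded.
        System.out.println(hasLateTransaction(OptionalLong.of(0L), 21 * 60 * 1000L, maxTimeoutMs)); // true
    }
}
```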
UNKNOWN_PRODUCER_ID
If retention or DeleteRecords removes a PID's last batch, the leader drops the ProducerStateEntry and a subsequent append with a non-zero sequence cannot be validated → UNKNOWN_PRODUCER_ID. The producer recovers by bumping its idempotent epoch and resetting sequences (KIP-360). Setting producer.id.expiration.ms ≥ delivery.timeout.ms avoids spurious expiration during retries.
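Client-side KIP-360 recovery for an idempotent (non-transactional) producer can be sketched as a per-partition sequence map that is cleared when the epoch is bumped. This is an illustrative model, not the TransactionManager code. The class, its fields and the needsNewProducerId flag are hypothetical. Requesting a fresh producer id when the epoch is already Short.MAX_VALUE is an assumption of the sketch, and sequence wrap-around at Integer.MAX_VALUE is omitted.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical client-side model of KIP-360 recovery for an idempotent producer.
final class IdempotentRecovery {
    final long producerId;
    short epoch;
    boolean needsNewProducerId = false; // hypothetical flag: fetch a fresh PID via InitProducerId
    private final Map<String, Integer> sequences = new HashMap<>();

    IdempotentRecovery(long producerId, short epoch) {
        this.producerId = producerId;
        this.epoch = epoch;
    }

    /** Returns the base sequence for the next batch to a partition and advances the counter. */
    int nextSequence(String partition, int recordCount) {
        int base = sequences.getOrDefault(partition, 0);
        sequences.put(partition, base + recordCount); // wrap-around at Integer.MAX_VALUE omitted
        return base;
    }

    /** On UNKNOWN_PRODUCER_ID: bump the epoch (or request a new PID at the limit) and restart sequences at 0. */
    void bumpEpochAndResetSequences() {
        if (epoch == Short.MAX_VALUE) {
            needsNewProducerId = true;
        } else {
            epoch++;
        }
        sequences.clear();
    }

    public static void main(String[] args) {
        IdempotentRecovery p = new IdempotentRecovery(42L, (short) 3);
        p.nextSequence("orders-0", 5);
        p.nextSequence("orders-0", 5);
        p.bumpEpochAndResetSequences(); // e.g. after UNKNOWN_PRODUCER_ID
        System.out.println(p.epoch + " " + p.nextSequence("orders-0", 1)); // prints "4 0"
    }
}
```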
Invariants & guarantees (summary)
- PID uniqueness: the controller's nextProducerId is strictly monotonic in the metadata log; blocks are never reissued (ProducerIdControlManager.java:111-120).
- Per-partition EOS-in-order: persisted batches for a fixed (pid, epoch) have contiguous sequence ranges from 0; retries are deduped (ProducerAppendInfo, ProducerStateEntry.findDuplicateBatch).
- Atomicity: a transaction's records become visible (pass the LSO) iff a COMMIT marker was written to every involved partition; the prepare record fixes the outcome before any marker (handleEndTransaction + failover replay).
- Zombie fencing: a write/marker with epoch below the leader's last-seen epoch (TV_2: not strictly greater) is rejected (checkProducerEpoch); a marker with an older coordinator epoch is rejected (checkCoordinatorEpoch).
- read_committed correctness: consumers never read past the LSO and filter every batch whose PID belongs to an aborted transaction beginning at/before it (CompletedFetch).
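One small leader-side model can combine the per-partition sequence invariant, the 5-batch dedup cache and the epoch check. ProducerSeqModel, its Result enum and the -1 "no state yet" sentinel are hypothetical. The model requires sequence 0 whenever the epoch changes, including for a PID with no retained state; that is simpler than the leader's real handling of that case. inSequence follows the wrap-around rule, where Integer.MAX_VALUE is followed by 0. isFenced shows both the ordinary check and the strict TV_2-style variant.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical leader-side model of one producer id's state on a single partition.
final class ProducerSeqModel {
    static final int NUM_BATCHES_TO_RETAIN = 5; // size of the per-PID dedup cache

    record BatchMeta(int firstSeq, int lastSeq) {}

    enum Result { APPENDED, DUPLICATE, OUT_OF_ORDER, FENCED }

    private short epoch = -1; // -1: no state retained for this PID yet (sketch-only sentinel)
    private final Deque<BatchMeta> recent = new ArrayDeque<>();

    /** True if nextSeq directly follows lastSeq, wrapping from Integer.MAX_VALUE to 0. */
    static boolean inSequence(int lastSeq, int nextSeq) {
        return nextSeq == lastSeq + 1L || (nextSeq == 0 && lastSeq == Integer.MAX_VALUE);
    }

    /** Rejects lower epochs; in strict (TV_2-style) mode an equal epoch is rejected too. */
    static boolean isFenced(short incoming, short current, boolean strict) {
        return strict ? incoming <= current : incoming < current;
    }

    Result append(short batchEpoch, int firstSeq, int lastSeq) {
        if (epoch != -1 && isFenced(batchEpoch, epoch, false)) {
            return Result.FENCED; // zombie: older epoch than the leader has seen
        }
        if (batchEpoch != epoch) {
            // New epoch: sequences restart at 0 and the dedup cache is reset.
            if (firstSeq != 0) {
                return Result.OUT_OF_ORDER;
            }
            epoch = batchEpoch;
            recent.clear();
        } else {
            // A retry of one of the last five batches is acknowledged, not re-appended.
            for (BatchMeta b : recent) {
                if (b.firstSeq() == firstSeq && b.lastSeq() == lastSeq) {
                    return Result.DUPLICATE;
                }
            }
            if (!inSequence(recent.peekLast().lastSeq(), firstSeq)) {
                return Result.OUT_OF_ORDER;
            }
        }
        recent.addLast(new BatchMeta(firstSeq, lastSeq));
        if (recent.size() > NUM_BATCHES_TO_RETAIN) {
            recent.removeFirst();
        }
        return Result.APPENDED;
    }

    public static void main(String[] args) {
        ProducerSeqModel p = new ProducerSeqModel();
        System.out.println(p.append((short) 0, 0, 4)); // APPENDED
        System.out.println(p.append((short) 0, 0, 4)); // DUPLICATE (retry)
        System.out.println(p.append((short) 0, 7, 9)); // OUT_OF_ORDER (gap after 4)
    }
}
```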
Interactions with other subsystems
- Record Format & Batches: the PID/epoch/baseSequence and transactional/control flags live in the v2 batch header; markers are control batches.
- The Log Storage Engine: ProducerStateManager, producer-state snapshots, the transaction index and the LSO are part of UnifiedLog.
- Replication, ISR & High Watermark: the LSO can never exceed the HW; markers replicate like any record before influencing visibility.
- The Fetch Path & Replica Fetchers: read_committed fetches are capped at the LSO and carry the aborted-transaction list.
- The KRaft Controller: allocates PID blocks via AllocateProducerIds / ProducerIdsRecord.
- Group Coordination: TxnOffsetCommit writes transactional offsets to __consumer_offsets; KIP-447 binds them to the consumer group generation.
- The Producer Client / The Consumer Client: the client-side state machines that drive and observe transactions.
- Share Groups: share-group acknowledgements reuse the same PID/epoch machinery (KIP-932) for their internal state.
Design rationale & evolution
- KIP-98 introduced idempotent and transactional producers: PID + epoch + sequence, the __transaction_state log, two-phase commit and transaction markers, the foundation of this chapter.
- KIP-360 made the idempotent producer resilient: the client can bump its epoch and reset sequences to recover from UNKNOWN_PRODUCER_ID instead of failing fatally.
- KIP-447 changed sendOffsetsToTransaction to take ConsumerGroupMetadata so the group coordinator fences offset commits by generation, enabling one transactional producer per application instance rather than per input partition.
- KIP-890 (Transaction V2) bumps the producer epoch on every commit/abort so each transaction is uniquely identified by (pid, epoch). EndTxnResponse (v5+) returns the new pid/epoch; the client need not call InitProducerId except at startup. This closes the duplicate-across-transactions and hanging-transaction holes that required the server-side verification step on legacy versions. Enabled by the transaction.version feature flag (TransactionVersion.java:24-100).
- KIP-915 made the coordinator records flexible/tagged so future fields can be added without breaking downgrade (note in TransactionLogValue.json:21).
In this build TransactionVersion.LATEST_PRODUCTION = TV_2 and TV_2's bootstrap metadata version is IBP_4_0_IV2 (TransactionVersion.java:31, 35), consistent with the KRaft-only, ZooKeeper-removed 4.x line. The legacy TV_1 endTransactionWithTV1 path is retained for clients that have not negotiated TV_2.
Gotchas & operational notes
- Partition count is permanent. Because partitionFor(transactionalId) = abs(hash) % transactionTopicPartitionCount (TransactionStateManager.scala:439) determines which coordinator owns an id, changing transaction.state.log.num.partitions after deployment reshuffles ownership and breaks fencing; the loader even throws KafkaException if it detects a change mid-load (:641-646).
- A stuck transaction freezes its partitions' LSO. Any partition with an undecided transaction holds back the LSO, stalling all read_committed consumers of that partition until the transaction commits, aborts, or times out. Watch the late-transaction metric and transaction.max.timeout.ms.
- Vanished partitions do not block completion. Partitions whose leaders have vanished are dropped from the marker set so completion can proceed; the marker for a deleted partition is skipped, so it cannot hold up the transaction (TransactionMarkerChannelManager.scala:432-445).
- 2PC transactions never time out. With transaction.two.phase.commit.enable=true, a transaction's timeout becomes Integer.MAX_VALUE and the abort reaper skips it (TransactionMetadata.isDistributedTwoPhaseCommitTxn, :308-310); the external coordinator is responsible for resolution.
- Tune producer.id.expiration.ms vs. retries. If a PID's last record is retention-deleted while the producer is between sends, the next send hits UNKNOWN_PRODUCER_ID; aligning expiration with delivery.timeout.ms avoids surprise duplicates.
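The ownership reshuffle behind the first gotcha is easy to demonstrate. This sketch assumes the hash is Java's String.hashCode. It uses an abs that maps Integer.MIN_VALUE to 0, in the style of Kafka's Utils.abs. The TxnPartitioning class and the movedIds helper are hypothetical.

```java
import java.util.List;
import java.util.stream.IntStream;

// Demonstrates why transaction.state.log.num.partitions must not change after deployment.
final class TxnPartitioning {
    /** Absolute value that maps Integer.MIN_VALUE to 0 instead of overflowing. */
    static int abs(int n) {
        return n == Integer.MIN_VALUE ? 0 : Math.abs(n);
    }

    /** Coordinator partition for a transactional id (assumes String.hashCode as the hash). */
    static int partitionFor(String transactionalId, int partitionCount) {
        return abs(transactionalId.hashCode()) % partitionCount;
    }

    /** Number of ids whose owning partition differs between the two partition counts. */
    static long movedIds(List<String> ids, int oldCount, int newCount) {
        return ids.stream()
            .filter(id -> partitionFor(id, oldCount) != partitionFor(id, newCount))
            .count();
    }

    public static void main(String[] args) {
        List<String> ids = IntStream.range(0, 1000).mapToObj(i -> "app-txn-" + i).toList();
        System.out.println(movedIds(ids, 50, 60) + " of 1000 ids change coordinator partition");
    }
}
```

With evenly spread hashes, moving from 50 to 60 partitions keeps an id in place only when hash % 50 equals hash % 60. That holds for roughly one id in six, so the large majority of ids typically land on a different coordinator.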