krivaltsevich.com · Kafka Internals 4.4

03 · The Log Storage Engine

Source: Apache Kafka 4.4.0-SNAPSHOT (git 04bfe7d, 2026-06-15), KRaft mode. Derived from source code, not copied from official documentation.

Every Kafka partition is an append-only log on disk: an ordered set of segments, of which only the newest (active) one accepts appends. Each segment is a .log data file plus two sparse memory-mapped indexes (.index, .timeindex) and, when aborted transactions occur, a transaction index (.txnindex); the whole log is fronted by per-partition producer-idempotency state and a leader-epoch checkpoint. This chapter dissects the layered abstraction UnifiedLog → LocalLog → LogSegments → LogSegment, the byte layouts of the index and snapshot files, the exact append and read-by-offset algorithms, segment rolling, crash recovery in LogLoader, and the concurrency model that lets a single writer coexist with readers that never take the partition lock. It is the physical substrate on which replication, the fetch path, retention/compaction, and transactions all stand.

Role & responsibilities

The log storage engine is the broker subsystem that turns a stream of record batches into durable, offset-addressable bytes and back again. Its concrete duties:

  • Durable ordered storage. Assign a monotonically increasing 64-bit offset to every record and persist batches in offset order in segment files (pre-allocated to segment.bytes only when preallocate=true).
  • Offset and timestamp lookup. Translate a logical offset (or a timestamp) into a physical file position for reads, using sparse indexes that fit in page cache.
  • Boundary bookkeeping. Track the log start offset (LSO), recovery point, log end offset (LEO) and high watermark (HW), and expose them to replication and consumers.
  • Idempotency & transaction metadata. Maintain per-producer sequence state (dedup), snapshot it to .snapshot files, and record aborted transactions for READ_COMMITTED reads.
  • Leader-epoch lineage. Persist the (epoch → start offset) map that drives truncation-safe follower replication.
  • Crash recovery. On startup, validate segments, rebuild indexes, truncate corrupt tails, and reconstruct producer state.

Where it lives in the code

The engine was migrated from Scala (kafka.log) into a self-contained Java module, org.apache.kafka.storage.internals.log, so it can be reused by KRaft's metadata log and tiered storage.

Class / file | Path | Responsibility
UnifiedLog | storage/.../internals/log/UnifiedLog.java | Public façade for a partition log: offsets/HW/LSO, append & read entry points, producer-state & epoch-cache orchestration.
LocalLog | storage/.../internals/log/LocalLog.java | The local on-disk log: holds LogSegments, the LEO (nextOffsetMetadata) and recovery point; does append-to-active-segment, multi-segment read, roll, truncate, flush.
LogSegments | storage/.../internals/log/LogSegments.java | Thread-safe ConcurrentSkipListMap<baseOffset, LogSegment> with floor/higher/last navigation.
LogSegment | storage/.../internals/log/LogSegment.java | One segment: a FileRecords log + lazy offset/time indexes + txn index; append, translate-offset, read, recover, roll-out.
AbstractIndex | storage/.../internals/log/AbstractIndex.java | mmap'd sparse index base class; binary search with a "warm section" optimisation, resize/truncate/flush.
OffsetIndex / TimeIndex | storage/.../internals/log/OffsetIndex.java, TimeIndex.java | 8-byte (relOffset, position) and 12-byte (timestamp, relOffset) entries.
TransactionIndex | storage/.../internals/log/TransactionIndex.java | Append-only list of aborted-txn records (no mmap).
LazyIndex | storage/.../internals/log/LazyIndex.java | Defers mmap of an index until first access (startup-time optimisation).
LogLoader | storage/.../internals/log/LogLoader.java | Loads & recovers segments from disk; returns LSO, recovery point, LEO.
LogValidator | storage/.../internals/log/LogValidator.java | Validates batches, assigns offsets/timestamps, stamps leader epoch, (de)compresses.
ProducerStateManager | storage/.../internals/log/ProducerStateManager.java | Per-producer dedup state, snapshots, LSO/first-unstable-offset, aborted-txn tracking.
LeaderEpochFileCache | storage/.../internals/epoch/LeaderEpochFileCache.java | (epoch → start offset) map + leader-epoch-checkpoint file.
LogFileUtils | storage/.../internals/log/LogFileUtils.java | Segment file naming, suffix constants, offset-from-filename.

Core concepts & terminology

Offset
Monotonic 64-bit position of a record within a partition. Assigned by the leader at append time.
Base offset
The lowest possible offset a segment can hold; also its filename prefix and the origin for the index's relative offsets.
Active segment
The last (highest-base-offset) segment, the only one currently taking appends. LogSegments.activeSegment() = lastSegment().get() (LogSegments.java:322).
LEO, log end offset
Offset of the next record to be written; LocalLog.nextOffsetMetadata.messageOffset (LocalLog.java:296).
HW, high watermark
The offset up to which (exclusive) every in-sync replica has replicated the log; the consumer-visible upper bound. Stored as UnifiedLog.highWatermarkMetadata (UnifiedLog.java:160). See Replication.
LSO, log start offset
Lowest retained offset; advanced by retention / deleteRecords / tiering. Field logStartOffset (UnifiedLog.java:162).
LSO, last stable offset
(Same acronym, different concept.) Upper bound of committed transactional data for READ_COMMITTED; min(firstUnstableOffset, HW).
Recovery point
First offset not yet guaranteed fsync'd; recovery on restart starts here. LocalLog.recoveryPoint (LocalLog.java:95).
Sparse index
An index that holds one entry per roughly index.interval.bytes (default 4096) of log data rather than one per record.

On-disk directory layout of a partition

Partition 2 of topic orders lives in the sub-directory orders-2/ of a configured log directory. Segment files are named by their 20-digit zero-padded base offset (LogFileUtils.filenamePrefixFromOffset, LogFileUtils.java:102), so a plain lexical sort such as ls also orders them numerically.

<log.dir>/orders-2/
├─ 00000000000000000000.log         ← segment data (FileRecords), baseOffset 0
├─ 00000000000000000000.index       ← offset index  (8-byte entries, mmap)
├─ 00000000000000000000.timeindex   ← time index    (12-byte entries, mmap)
├─ 00000000000000000000.txnindex    ← aborted-txn index (created only if needed)
├─ 00000000000000147001.log         ← next segment, baseOffset 147001
├─ 00000000000000147001.index
├─ 00000000000000147001.timeindex
├─ 00000000000000146500.snapshot    ← producer-state snapshot (per roll)
├─ 00000000000000147001.snapshot
├─ leader-epoch-checkpoint          ← (epoch → startOffset) text file
└─ partition.metadata               ← topic-id binding (version + UUID)
Files of a single partition (the ← annotations are explanatory, not part of the file names). Suffix constants live in LogFileUtils; transient files carry .deleted, .cleaned, and .swap suffixes during deletion, compaction, and segment splitting.
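The naming rule can be sketched in a few lines. This is a minimal illustration under stated assumptions, not LogFileUtils itself: SegmentNames, filenamePrefix, and offsetFromFileName are hypothetical names, and the sketch assumes only what the text states (a 20-digit zero-padded base offset followed by a suffix).

```java
import java.util.Locale;

// Hypothetical helper illustrating the naming rule; not Kafka's LogFileUtils API.
final class SegmentNames {
    private SegmentNames() {}

    // Zero-pad the base offset to 20 digits, enough for any non-negative long.
    // Locale.ROOT keeps the digits ASCII regardless of the JVM's default locale.
    static String filenamePrefix(long baseOffset) {
        return String.format(Locale.ROOT, "%020d", baseOffset);
    }

    // Parse the base offset back out of a name such as "00000000000000147001.log".
    static long offsetFromFileName(String fileName) {
        int dot = fileName.indexOf('.');
        String prefix = dot < 0 ? fileName : fileName.substring(0, dot);
        return Long.parseLong(prefix);
    }
}
```

Because every prefix has the same width, comparing names character by character gives the same order as comparing the offsets numerically.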

Per log directory (not per partition), the broker also keeps the recovery-point-offset-checkpoint and log-start-offset-checkpoint files, managed by LogManager, and the replication-offset-checkpoint file of high watermarks, written by ReplicaManager; see Log Management.

The layered abstraction

callers: Partition · KafkaApis
  ↓
UnifiedLog: semantics, appendAsLeader/Follower · read · HW · LSO · leaderEpochCache · producerStateManager
  ↓
LocalLog: local segment set + offsets, nextOffsetMetadata (LEO) · recoveryPoint · append · read · roll · truncateTo
  ↓
LogSegments: ConcurrentSkipListMap<baseOffset, LogSegment>
  ↓
LogSegment: one file pair, FileRecords .log · LazyIndex<OffsetIndex> · LazyIndex<TimeIndex> · TransactionIndex
Each layer narrows scope: UnifiedLog = semantics & metadata; LocalLog = the local segment set & offsets; LogSegment = one file pair. Each arrow means "contains / delegates to"; class names are Java types in org.apache.kafka.storage.internals.log.
Key idea

UnifiedLog owns all semantics (offset assignment, HW/LSO, producer state, epochs) and is the only class that synchronizes on the partition lock; LocalLog owns the physical segments and the LEO. A "unified" log can additionally front tiered (remote) segments, which is why local boundaries (localLogStartOffset) are tracked separately from logStartOffset, see Tiered Storage.

LogSegment: a file pair plus indexes

A LogSegment (LogSegment.java:80–103) bundles: a FileRecords log (the .log); LazyIndex<OffsetIndex> lazyOffsetIndex; LazyIndex<TimeIndex> lazyTimeIndex; a TransactionIndex txnIndex; the immutable baseOffset; indexIntervalBytes; and a per-roll random rollJitterMs. Two pieces of mutable bookkeeping drive indexing and rolling:

  • bytesSinceLastIndexEntry, bytes appended since the last sparse index entry; when it exceeds indexIntervalBytes, the next batch triggers an index append (LogSegment.java:271).
  • maxTimestampAndOffsetSoFar (volatile TimestampOffset) and rollingBasedTimestamp, the largest timestamp seen and the first-batch timestamp, used for the time index and time-based rolling.
Caution

LogSegment is explicitly not thread-safe for mutation (LogSegment.java:64): append/recover/truncateTo assume the caller holds UnifiedLog#lock. Only the read-style methods (translateOffset, read, readNextOffset) are documented thread-safe, relying on the index's mmap and FileRecords positional reads.

Data structures: the index files, field by field

The two mmap'd index types (offset and time) share AbstractIndex; the transaction index is a plain append-only file channel. Each mmap'd index stores offsets relative to the segment base offset so a 64-bit offset fits in 4 bytes (OffsetIndex.java:44–47). AbstractIndex.toRelative rejects any offset whose delta from the base offset falls outside [0, Integer.MAX_VALUE] (AbstractIndex.java:555), which is the root cause of segment offset overflow.

OffsetIndex, relative offset → physical position

Entry size 8 bytes (OffsetIndex.java:56); the file is pre-allocated to a fixed maximum (segment.index.bytes, default 10 MiB) and trimmed when the segment goes inactive.

Offset index entry (8 bytes, repeated, sorted ascending by offset):
  @0  relativeOffset    int32  = absOffset − baseOffset
  @4  physicalPosition  int32  byte offset in .log
OffsetIndex.parseEntry / append (OffsetIndex.java:143,199). append rejects a non-increasing offset and a full index. lookup(target) binary-searches for the greatest entry ≤ target; absOffset = baseOffset + relativeOffset.
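The 8-byte entry format and its greatest-lower-bound lookup can be sketched with a heap buffer. Assumptions: OffsetIndexSketch and its methods are hypothetical, a ByteBuffer stands in for the MappedByteBuffer, and a plain binary search replaces the warm-section search described later. The fallback to (baseOffset, 0) when every entry is above the target simply means "scan from the start of the segment".

```java
import java.nio.ByteBuffer;

// Sketch of an 8-byte-entry sparse offset index in a heap ByteBuffer
// (Kafka maps the file instead); names are hypothetical.
final class OffsetIndexSketch {
    static final int ENTRY_SIZE = 8;
    private final long baseOffset;
    private final ByteBuffer buf;
    private int entries = 0;

    OffsetIndexSketch(long baseOffset, int maxIndexBytes) {
        this.baseOffset = baseOffset;
        // Round down to a whole number of entries, as the pre-allocated file is.
        this.buf = ByteBuffer.allocate(maxIndexBytes - maxIndexBytes % ENTRY_SIZE);
    }

    int maxEntries() { return buf.capacity() / ENTRY_SIZE; }
    boolean isFull() { return entries >= maxEntries(); }

    // Offsets are stored relative to the base offset and must fit in an int32.
    int toRelative(long offset) {
        long delta = offset - baseOffset;
        if (delta < 0 || delta > Integer.MAX_VALUE)
            throw new IllegalArgumentException("offset " + offset + " overflows base " + baseOffset);
        return (int) delta;
    }

    void append(long offset, int position) {
        if (isFull()) throw new IllegalStateException("index is full");
        if (entries > 0 && offset <= lastOffset())
            throw new IllegalArgumentException("non-increasing offset " + offset);
        buf.putInt(entries * ENTRY_SIZE, toRelative(offset)); // @0 relativeOffset
        buf.putInt(entries * ENTRY_SIZE + 4, position);       // @4 physicalPosition
        entries++;
    }

    private long offsetAt(int slot) { return baseOffset + buf.getInt(slot * ENTRY_SIZE); }
    private int positionAt(int slot) { return buf.getInt(slot * ENTRY_SIZE + 4); }
    long lastOffset() { return offsetAt(entries - 1); }

    // Greatest entry with offset <= target as {offset, position}; {baseOffset, 0} if none.
    long[] lookup(long target) {
        int lo = 0, hi = entries - 1, found = -1;
        while (lo <= hi) {
            int mid = (lo + hi) >>> 1;
            if (offsetAt(mid) <= target) { found = mid; lo = mid + 1; } else hi = mid - 1;
        }
        return found < 0 ? new long[] {baseOffset, 0} : new long[] {offsetAt(found), positionAt(found)};
    }
}
```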

TimeIndex, max timestamp → relative offset (KIP-33)

Entry size 12 bytes (TimeIndex.java:56). An entry (T, O) asserts "the largest timestamp at or before offset O is T"; timestamps increase monotonically within the file. To find an offset by timestamp, LogSegment.findOffsetByTimestamp (LogSegment.java:755) binary-searches the time index for the largest T ≤ target, resolves that entry's offset to a file position through the offset index, then scans the log forward from there for the first record whose timestamp is ≥ target.

Time index entry (12 bytes, repeated, monotonic in both timestamp and offset):
  @0  timestamp       int64
  @8  relativeOffset  int32
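The three-step timestamp lookup can be sketched over in-memory arrays. Assumptions: TimestampLookupSketch is hypothetical, records are parallel arrays rather than FileRecords, and the offset-index step that converts the hint into a file position is replaced by a search over the record offsets. It shows the shape of the algorithm (index hint, then forward scan), not Kafka's exact code.

```java
import java.util.Arrays;
import java.util.OptionalLong;

// Sketch of offset-by-timestamp lookup: time-index hint, then forward scan.
final class TimestampLookupSketch {
    static OptionalLong findOffsetByTimestamp(long baseOffset, long[] tiTimestamps, long[] tiOffsets,
                                              long[] recOffsets, long[] recTimestamps, long targetMs) {
        // 1. Time index: slot of the greatest timestamp <= target (entries strictly increase).
        int slot = Arrays.binarySearch(tiTimestamps, targetMs);
        if (slot < 0) slot = -slot - 2;                  // insertion point minus one
        long hint = slot >= 0 ? tiOffsets[slot] : baseOffset;

        // 2. Stand-in for the offset-index step: first record at or after the hint.
        int start = Arrays.binarySearch(recOffsets, hint);
        if (start < 0) start = -start - 1;

        // 3. Forward scan for the first record whose timestamp reaches the target.
        for (int i = start; i < recOffsets.length; i++) {
            if (recTimestamps[i] >= targetMs) return OptionalLong.of(recOffsets[i]);
        }
        return OptionalLong.empty();
    }
}
```

For records at offsets 100..105 with timestamps 10, 20, 15, 30, 40, 35 and time-index entries (20, 101) and (40, 104), a target of 25 starts scanning at offset 101 and returns 103.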
Note

TimeIndex.isFull() reserves one slot: it returns true once entries ≥ maxEntries − 1 (TimeIndex.java:122). That last slot is filled when the segment rolls or closes, via maybeAppend(maxTs, offset, skipFullCheck=true), so the segment's true max timestamp is always recorded (LogSegment.onBecomeInactiveSegment, LogSegment.java:686).

TransactionIndex, aborted transactions

Unlike the others, this index is not mmap'd, is created lazily (only when an abort is recorded), and is appended through a FileChannel (TransactionIndex.java:64,202). Each record is a version-prefixed AbortedTxn (schema clients/.../message/AbortedTxn.json, validVersions 0): ProducerId, FirstOffset, LastOffset, LastStableOffset, i.e. four int64 fields after a 2-byte version, 34 bytes in total. collectAbortedTxns(fetchOffset, upperBoundOffset) (TransactionIndex.java:167) returns every aborted txn overlapping the fetch range and reports whether the search is complete: it stops as soon as an entry's lastStableOffset ≥ upperBoundOffset, and otherwise signals that the scan must continue into the next segment. Used by the fetch path for READ_COMMITTED.
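The collection rule can be sketched over an in-memory list. Assumptions: TxnIndexSketch, its AbortedTxn record, and SearchResult are hypothetical stand-ins for Kafka's AbortedTxn and search-result types, and the on-disk channel is replaced by a List in append order.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of collectAbortedTxns semantics over an in-memory list (hypothetical types).
final class TxnIndexSketch {
    record AbortedTxn(long producerId, long firstOffset, long lastOffset, long lastStableOffset) {
        // 2-byte version + four int64 fields, as described for AbortedTxn v0.
        static final int SIZE_IN_BYTES = 2 + 4 * Long.BYTES;
    }

    record SearchResult(List<AbortedTxn> abortedTxns, boolean isComplete) {}

    static SearchResult collectAbortedTxns(List<AbortedTxn> index, long fetchOffset, long upperBoundOffset) {
        List<AbortedTxn> out = new ArrayList<>();
        for (AbortedTxn txn : index) {
            // Keep every aborted txn whose range overlaps [fetchOffset, upperBoundOffset).
            if (txn.lastOffset() >= fetchOffset && txn.firstOffset() < upperBoundOffset) out.add(txn);
            // Once an entry's lastStableOffset reaches the bound, the search is complete.
            if (txn.lastStableOffset() >= upperBoundOffset) return new SearchResult(out, true);
        }
        return new SearchResult(out, false); // caller continues with the next segment
    }
}
```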

AbstractIndex: mmap, the warm-section search, and locking

On construction the file is created (if new), pre-allocated to roundDownToExactMultiple(maxIndexSize, entrySize), mapped READ_WRITE (or READ_ONLY), and the buffer position set to the last entry (AbstractIndex.java:100–124,474–490). maxEntries = mmap.limit()/entrySize; isFull() = entries ≥ maxEntries.

Lookups use a custom, cache-friendly binary search. A plain binary search touches pages near the start of the file, which go cold on low-traffic partitions and can cause page faults costing hundreds of milliseconds on the produce hot path. AbstractIndex instead treats the last warmEntries() = 8192/entrySize entries as a warm section (AbstractIndex.java:387): if the target falls within it, only that range is searched; otherwise the cold prefix is searched (indexSlotRangeFor, AbstractIndex.java:495). In-sync followers and tailing consumers, which always read near the end, stay entirely in warm pages.
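The warm-section split can be sketched over a sorted array of keys (offsets or timestamps). Assumptions: WarmSearchSketch and its methods are hypothetical and search a long[] instead of parsing entries from the mmap; the structure (pick the warm or cold range first, then an upper-biased binary search for the greatest key ≤ target) follows the description above.

```java
// Sketch of a warm-section lookup over sorted keys; names are hypothetical.
final class WarmSearchSketch {
    // Entries in the last 8 KiB of the index form the warm section.
    static int warmEntries(int entrySize) { return 8192 / entrySize; }

    // Slot of the greatest key <= target, or -1 if every key is larger.
    static int largestLowerBoundSlot(long[] keys, int entries, long target, int entrySize) {
        if (entries == 0 || keys[0] > target) return -1;
        int firstHotEntry = Math.max(0, entries - 1 - warmEntries(entrySize));
        // Targets beyond the first warm entry are resolved using warm pages only.
        if (keys[firstHotEntry] < target) return binarySearch(keys, target, firstHotEntry, entries - 1);
        return binarySearch(keys, target, 0, firstHotEntry);
    }

    // Requires keys[begin] <= target; returns the last slot in [begin, end] with key <= target.
    private static int binarySearch(long[] keys, long target, int begin, int end) {
        int lo = begin, hi = end;
        while (lo < hi) {
            int mid = (lo + hi + 1) >>> 1; // round up so lo = mid always makes progress
            if (keys[mid] > target) hi = mid - 1;
            else if (keys[mid] < target) lo = mid;
            else return mid;
        }
        return lo;
    }
}
```

With 8-byte offset-index entries the warm section is 1024 entries, so for a 5000-entry index any target above the key in slot 3975 never touches the first 3975 slots.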

Design rationale

8192 bytes is chosen so that the entries a warm-section lookup probes (end, end−N, and the midpoint (2·end−N)/2) span at most 3 pages on a host with 4 KiB pages; since every such lookup touches those same pages, they stay resident in the page cache. The 57-line comment at AbstractIndex.java:330–386 records the original ~1 s produce-latency spike that motivated this.

Concurrency & the index locks

AbstractIndex holds two locks (AbstractIndex.java:59–61):

  • ReentrantLock lock, serializes all mutations (append, truncate, resize, flush). Readers do not take it.
  • ReentrantReadWriteLock remapLock, readers take the read lock (inRemapReadLock) while reading the buffer; resize/closeHandler take the write lock so the mmap can be safely unmapped and re-mapped (the buffer reference is swapped under exclusivity).
Key idea

Reading a MappedByteBuffer without the mutation lock is safe because (a) readers always call mmap().duplicate() to get a private position, and (b) clients only ever read committed data (below the HW/LSO). A follower that races an in-progress truncate may read stale bytes, but CRC and leader-epoch checks downstream reject them (AbstractIndex.java:50–58).
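The two-lock scheme can be sketched with a heap buffer standing in for the mmap. Assumptions: RemapLockSketch and its methods are hypothetical; the sketch only shows the locking discipline (mutations under lock, buffer swaps under the remap write lock, readers under the remap read lock on a duplicate), not Kafka's actual resize or unmap code.

```java
import java.nio.ByteBuffer;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Sketch of a mutation lock plus a remap read/write lock; names are hypothetical.
final class RemapLockSketch {
    private final ReentrantLock lock = new ReentrantLock();                    // serializes mutations
    private final ReentrantReadWriteLock remapLock = new ReentrantReadWriteLock(); // guards buffer swaps
    private volatile ByteBuffer mmap = ByteBuffer.allocate(16);

    // Reader: no mutation lock, only the remap read lock, and a private duplicate view.
    int readInt(int index) {
        remapLock.readLock().lock();
        try {
            ByteBuffer view = mmap.duplicate(); // own position/limit, shared content
            return view.getInt(index);
        } finally {
            remapLock.readLock().unlock();
        }
    }

    void writeInt(int index, int value) {
        lock.lock();
        try {
            mmap.putInt(index, value);
        } finally {
            lock.unlock();
        }
    }

    // Resize: copy into a new buffer and swap the reference while readers are excluded.
    void resize(int newSize) {
        lock.lock();
        try {
            remapLock.writeLock().lock();
            try {
                ByteBuffer next = ByteBuffer.allocate(newSize);
                ByteBuffer src = mmap.duplicate();
                src.clear();
                src.limit(Math.min(src.capacity(), newSize));
                next.put(src);
                mmap = next;
            } finally {
                remapLock.writeLock().unlock();
            }
        } finally {
            lock.unlock();
        }
    }

    int capacity() { return mmap.capacity(); }
}
```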

Index files are force-unmapped on close rather than left for the GC, because an unmap performed during garbage collection can block on reading file metadata from a slow disk and stall application threads (KAFKA-4614, AbstractIndex.closeHandler, AbstractIndex.java:287–293). LazyIndex (LazyIndex.java) wraps each index so the file is only mmap'd on the first get(), a startup-time win for brokers with very many segments (KIP-263).

The append path (leader)

Producer writes enter UnifiedLog.appendAsLeader → append(...) (UnifiedLog.java:1020,1115). After batch-level validation, the core of the append runs inside synchronized(lock), giving a single writer per partition.

  1. maybeFlushMetadataFile(): persist the topic id before any data.
  2. analyzeAndValidateRecords(): per-batch CRC, size, and monotonicity checks. If validBytes ≤ 0, return early (empty or duplicate).
  3. trimInvalidBytes().
  4. Enter synchronized(lock): from here on, a single writer per partition.
  5. firstOffset = LEO (LocalLog.logEndOffset()).
  6. LogValidator: assign offsets and timestamps and stamp the leader epoch (in place if uncompressed).
  7. leaderEpochCache.assign(epoch, baseOffset), per batch.
  8. If the records exceed the segment size (segment.bytes), throw RecordBatchTooLargeException.
  9. maybeRoll(): roll if the active segment is full (size, time, or index).
  10. analyzeAndValidateProducerState(): dedup and transaction bookkeeping. On a duplicate, return the original batch's offsets.
  11. LocalLog.append(): write the bytes and advance the LEO by n; then updateHW, producerStateManager.update, segment.updateTxnIndex, maybeIncrementFirstUnstableOffset, and flush if unflushed messages ≥ flush.messages.
Leader append, condensed from UnifiedLog.append (UnifiedLog.java:1115–1292). Steps 4 to 11 run single-writer per partition.

Offset, timestamp & epoch assignment (LogValidator)

LogValidator.validateMessagesAndAssignOffsets (LogValidator.java:123) picks the cheapest viable strategy:

  • In-place (assignOffsetsNonCompressed, LogValidator.java:209): when source and target are uncompressed and the magic already matches, it mutates each batch header with setLastOffset(offsetCounter−1), setPartitionLeaderEpoch(...), and setMaxTimestamp(...), advancing a shared offsetCounter by one per record.
  • Re-build (convertAndAssignOffsetsNonCompressed) when a format conversion is needed.
  • Decompress/recompress (validateMessagesAndAssignOffsetsCompressed) for compressed data.

If message.timestamp.type=LogAppendTime, every record takes the broker's current wall-clock time; otherwise CreateTime values are validated against message.timestamp.before.max.ms and message.timestamp.after.max.ms. The leader epoch is stamped into every v2 batch header here, then re-asserted into the epoch cache in UnifiedLog at step 7. A downgrade to a pre-v2 record format causes the epoch cache to be cleared so the broker falls back to HW-based truncation (UnifiedLog.java:1216–1224).
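The in-place strategy's counter arithmetic can be sketched as follows. Assumptions: OffsetAssignmentSketch and BatchHeader are hypothetical stand-ins for a mutable v2 batch header; the sketch sets baseOffset explicitly for readability, whereas the text describes Kafka setting the last offset and deriving the rest from the header.

```java
import java.util.List;

// Sketch of in-place offset assignment for uncompressed batches (hypothetical types).
final class OffsetAssignmentSketch {
    static final class BatchHeader {
        final int recordCount;
        long baseOffset;
        long lastOffset;
        int partitionLeaderEpoch;
        BatchHeader(int recordCount) { this.recordCount = recordCount; }
    }

    // Assigns offsets starting at firstOffset and returns the next free offset (the new LEO).
    static long assignOffsets(List<BatchHeader> batches, long firstOffset, int leaderEpoch) {
        long offsetCounter = firstOffset;
        for (BatchHeader b : batches) {
            b.baseOffset = offsetCounter;
            offsetCounter += b.recordCount;        // one offset per record
            b.lastOffset = offsetCounter - 1;      // mirrors setLastOffset(offsetCounter - 1)
            b.partitionLeaderEpoch = leaderEpoch;  // mirrors setPartitionLeaderEpoch(...)
        }
        return offsetCounter;
    }
}
```

Three batches of 3, 1, and 5 records appended at LEO 100 receive offsets 100..102, 103, and 104..108, leaving the LEO at 109.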

Follower append

appendAsFollower (UnifiedLog.java:1080) passes validateAndAssignOffsets=false: offsets are taken verbatim from the leader. The path still rebuilds producer state, updates the epoch cache from batch headers, and writes the txn index, but skips offset assignment and most validation. See Replica Fetchers.

Writing into the segment & the sparse index

LocalLog.append (LocalLog.java:527) calls activeSegment().append(lastOffset, records) then bumps the LEO to lastOffset+1 via updateLogEndOffset (which rebuilds nextOffsetMetadata from the active segment's base offset and current size, LocalLog.java:304). Inside LogSegment.append (LogSegment.java:251):

  1. Record physicalPosition = log.sizeInBytes(); verify the largest offset is convertible to a relative offset (else LogSegmentOffsetOverflowException).
  2. log.append(records) writes the bytes to the channel.
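  3. For each batch: update maxTimestampAndOffsetSoFar; if bytesSinceLastIndexEntry > indexIntervalBytes, append (the batch's lastOffset → the batch's starting physicalPosition) to the offset index and (maxTs → offset) to the time index, then reset the byte counter; finally advance physicalPosition and bytesSinceLastIndexEntry by the batch's size.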
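The indexing decision in step 3 can be sketched for a single append. Assumptions: SparseIndexingSketch, Batch, and IndexEntry are hypothetical; batches are (lastOffset, sizeInBytes) pairs, the time index is omitted, and the byte counter starts at zero, whereas in a real segment it carries over between appends.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the sparse-index decision made per batch during a segment append (hypothetical types).
final class SparseIndexingSketch {
    record Batch(long lastOffset, int sizeInBytes) {}
    record IndexEntry(long offset, int position) {}

    static List<IndexEntry> append(List<Batch> batches, int indexIntervalBytes) {
        List<IndexEntry> index = new ArrayList<>();
        int physicalPosition = 0;         // start of the current batch in the .log file
        int bytesSinceLastIndexEntry = 0;
        for (Batch batch : batches) {
            // Checked before this batch's bytes are counted, so the entry points at its start.
            if (bytesSinceLastIndexEntry > indexIntervalBytes) {
                index.add(new IndexEntry(batch.lastOffset(), physicalPosition));
                bytesSinceLastIndexEntry = 0;
            }
            physicalPosition += batch.sizeInBytes();
            bytesSinceLastIndexEntry += batch.sizeInBytes();
        }
        return index;
    }
}
```

With eleven 1000-byte batches and the default 4096-byte interval, entries land at positions 5000 and 10000: the counter must strictly exceed the interval, so the gap between entries is the interval plus up to one batch.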
Invariant

Offsets in a segment's data file and offset index are strictly increasing; the index is sparse (about one entry per index.interval.bytes of log). OffsetIndex.append throws InvalidOffsetException if asked to append a non-increasing offset (OffsetIndex.java:156), and the index buffer's write position must equal entries·8 (OffsetIndex.java:154).

Segment rolling

maybeRoll (UnifiedLog.java:2146) asks the active segment shouldRoll(RollParams) (LogSegment.java:168), which rolls when any of:

  • adding this batch would exceed segment.bytes (size > maxSegmentBytes − messagesSize), or
  • the segment is non-empty and has waited longer than segment.ms − rollJitterMs (measured from the first batch's timestamp if present, else from the segment's creation time; timeWaitedForRoll, LogSegment.java:714), or
  • the offset index or time index isFull(), or
  • the next offset cannot be expressed relative to the base offset (canConvertToRelativeOffset).
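The four roll conditions combine as a simple disjunction. Assumptions: RollDecisionSketch and its RollInput record are hypothetical stand-ins for RollParams plus segment state, with index fullness passed in as flags rather than computed.

```java
// Sketch of the roll decision described above (hypothetical types).
final class RollDecisionSketch {
    record RollInput(int segmentSize, int messagesSize, int maxSegmentBytes,
                     long timeWaitedMs, long maxSegmentMs, long rollJitterMs,
                     boolean offsetIndexFull, boolean timeIndexFull,
                     long baseOffset, long maxOffsetInMessages) {}

    static boolean shouldRoll(RollInput in) {
        boolean reachedRollMs = in.timeWaitedMs() > in.maxSegmentMs() - in.rollJitterMs();
        long relative = in.maxOffsetInMessages() - in.baseOffset();
        boolean canConvertToRelativeOffset = relative >= 0 && relative <= Integer.MAX_VALUE;
        return in.segmentSize() > in.maxSegmentBytes() - in.messagesSize() // would exceed segment.bytes
            || (in.segmentSize() > 0 && reachedRollMs)                     // non-empty and too old
            || in.offsetIndexFull() || in.timeIndexFull()                  // an index is full
            || !canConvertToRelativeOffset;                                // offset would overflow int32
    }
}
```

Writing the size test as size > max − messagesSize rather than size + messagesSize > max avoids int overflow near the 2 GiB limit of a segment.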

The new base offset is normally appendInfo.firstOffset(); for pre-v2 formats where the first offset is unknown it is conservatively maxOffsetInMessages − Integer.MAX_VALUE (UnifiedLog.java:2174). roll(expectedNextOffset) (UnifiedLog.java:2201) then: snapshots producer state aligned to the new base offset, opens the new segment via LocalLog.roll (which calls onBecomeInactiveSegment() on the old segment to write the final time-index entry and trim indexes), updates the HW, and asynchronously flushes the old segment and snapshot on the scheduler thread to avoid holding the partition lock across fsync.

Gotcha

If segment.index.bytes is set so small that maxEntries rounds to 0, an empty active segment can report isFull()==true and try to roll onto its own base offset. LocalLog.roll has an explicit KAFKA-6388 branch that recreates the zero-size segment in place instead of throwing (LocalLog.java:595–610).

The read / fetch-by-offset path

Reads enter UnifiedLog.read(startOffset, maxLength, isolation, minOneMessage) (UnifiedLog.java:1649). The isolation level selects the upper bound: LOG_END → LEO, HIGH_WATERMARK → HW, TXN_COMMITTED → last stable offset. It then delegates to LocalLog.read (LocalLog.java:463):

  1. Bound-check: throw OffsetOutOfRangeException if startOffset > LEO or no floor segment exists.
  2. If startOffset == maxOffset (e.g. caught up to HW) return an empty FetchDataInfo immediately.
  3. segments.floorSegment(startOffset) finds the segment whose base offset is ≤ start; iterate forward until a segment yields data.
  4. Compute maxPosition: the full segment size if the upper bound lies in a later segment; the bound's relative position if it lies in this segment; otherwise empty.
  5. segment.read(...) → translateOffset → offsetIndex().lookup(offset) gives a greatest-lower-bound position, then FileRecords.searchForOffsetFromPosition scans forward to the exact batch (LogSegment.java:399–464). The result is a zero-copy FileRecords.slice.
  6. For TXN_COMMITTED, addAbortedTransactions attaches the overlapping aborted-txn list (LocalLog.java:532).
read(offset O): the isolation level bounds the upper offset (LEO, HW, or last stable offset)
  ↓ segments.floorSegment(O) ⇒ segment S with baseOffset ≤ O
  ↓ OffsetIndex.lookup(O) ⇒ (O′, pos), the greatest indexed offset ≤ O (the index is sparse, so O′ may be below O)
  ↓ FileRecords.searchForOffsetFromPosition(O, pos): scan batches forward from pos to the exact batch containing O
  ↓ FileRecords.slice(startPosition, fetchSize): zero-copy FileRecords view returned to the fetch path
Offset translation = sparse index lookup + short forward scan; the scan length is roughly bounded by index.interval.bytes (plus one batch).
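The first two steps of the read path (bound check, then floor navigation in a skip list keyed by base offset) can be sketched as below. Assumptions: SegmentMapSketch and its Segment record are hypothetical, a segment is reduced to a base offset and record count, and IllegalArgumentException stands in for OffsetOutOfRangeException.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentSkipListMap;

// Sketch of LogSegments-style navigation with a floor lookup (hypothetical types).
final class SegmentMapSketch {
    record Segment(long baseOffset, long recordCount) {
        long nextOffset() { return baseOffset + recordCount; }
    }

    private final ConcurrentSkipListMap<Long, Segment> segments = new ConcurrentSkipListMap<>();

    void add(Segment s) { segments.put(s.baseOffset(), s); }

    // The highest-base-offset segment is the active one.
    Segment activeSegment() { return segments.lastEntry().getValue(); }

    // Offsets past the LEO, or below the first segment, are out of range.
    Segment floorSegment(long offset) {
        long logEndOffset = activeSegment().nextOffset();
        Map.Entry<Long, Segment> e = segments.floorEntry(offset);
        if (offset > logEndOffset || e == null)
            throw new IllegalArgumentException("offset " + offset + " out of range");
        return e.getValue();
    }
}
```

An offset equal to the LEO is still in range and maps to the active segment, which matches step 2's "return an empty result" case rather than an error.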

Offset boundaries: LSO, LEO, HW, recovery point

UnifiedLog maintains the offset invariants. The HW can never exceed the LEO (maybeIncrementHighWatermark rejects a newHW.messageOffset > logEndOffset(), UnifiedLog.java:564), and maybeIncrementHighWatermark only ever moves it forward; truncation is the path that can lower it. Advancing the log start offset requires newLogStartOffset ≤ HW (maybeIncrementLogStartOffset throws OffsetOutOfRangeException otherwise, UnifiedLog.java:1358), and cascades into leaderEpochCache.truncateFromStartAsyncFlush and producerStateManager.onLogStartOffsetIncremented.

Invariant

logStartOffset ≤ lastStableOffset ≤ highWatermark ≤ logEndOffset, and localLogStartOffset ≥ logStartOffset. The HW and LSO are volatile so readers see updates without the lock; all writes happen under synchronized(lock).

The recovery point tracks durability, not appends: flush(offset, includingOffset) (UnifiedLog.java:2254) only fsyncs and advances the recovery point when flushOffset > recoveryPoint. An in-band flush fires when unflushedMessages ≥ flush.messages (default Long.MAX_VALUE, i.e. flushing is left to the OS), and LogManager additionally flushes on flush.ms. Crucially, the recovery point is advanced only at an actual flush, never speculatively, so a crash after a checkpoint but before fsync still re-recovers the unflushed tail (LogLoader.java:500–511).

Crash recovery & startup: LogLoader

LogLoader.load() (LogLoader.java:128) reconstructs a UnifiedLog from disk in a fixed sequence of passes:

  1. Clean temp files / resolve swaps. Delete stray .deleted files; collect .swap files; delete .cleaned files. A .swap whose base offset ≥ the smallest .cleaned offset is from an incomplete compaction split and is removed (KAFKA-6264, LogLoader.java:261–314).
  2. Delete superseded segments between min/max swap offsets, then rename surviving .swap files to live segments.
  3. Load all segments (loadSegmentFiles, LogLoader.java:357) in ascending order. For each .log: open the segment, run sanityCheck; an orphan index without a .log is deleted. A missing offset index or a CorruptIndexException triggers recoverSegment.
  4. recoverLog (LogLoader.java:456): skipped entirely if a clean-shutdown marker existed (hadCleanShutdown). Otherwise every segment from the recovery point up is recovered: LogSegment.recover replays batches, validates each (ensureValid()), rebuilds the offset/time indexes, re-derives the leader-epoch cache and producer state, and truncates the first invalid byte onward (LogSegment.java:483–529). If a segment is truncated, all later segments are deleted.
  5. Reconcile offsets & producer state. Compute the new LSO; truncate the epoch cache to the recovered next offset and to the LSO; require producer state to be empty; removeStraySnapshots; then rebuildProducerState from the most recent snapshot up to the LEO (LogLoader.java:220–250).
Note

LogSegment.sanityCheck deliberately validates only the txn index and, for a newly created time index, resizes that index to 0; it skips offset/time-index sanity checks because any segment above the recovery point is going to be re-recovered anyway (LogSegment.java:181–192). Combined with LazyIndex, this is KIP-263's startup-time optimisation; KAFKA-19200 later revisited reintroducing some index sanity checking.

Gotcha

If the on-disk log-start-offset-checkpoint exceeds the actual log end offset (e.g. segment files were deleted out from under the broker), deleteSegmentsIfLogStartGreaterThanLogEnd wipes all segments, clears the epoch cache, and restarts the log empty at the checkpoint (LogLoader.java:423–439). Hand-deleting segment files is therefore data loss, not a recovery tactic.

ProducerStateManager: idempotency, snapshots & transactions

ProducerStateManager (ProducerStateManager.java:70) keeps the per-partition state that makes the idempotent producer and transactions work. Core in-memory structures:

  • Map<Long, ProducerStateEntry> producers, keyed by producer id. Each ProducerStateEntry retains a deque of up to NUM_BATCHES_TO_RETAIN = 5 BatchMetadata records (ProducerStateEntry.java:35), enabling duplicate detection across the producer's last five in-flight batches.
  • TreeMap<Long, TxnMetadata> ongoingTxns and unreplicatedTxns, open transactions keyed by first offset, used to compute the first unstable offset and the last stable offset.
  • Map<Long, VerificationStateEntry> verificationStates, transaction-verification guards (KIP-890). See Transactions.

On append, analyzeAndValidateProducerState (UnifiedLog.java:1388) checks each producer batch against lastEntry.findDuplicateBatch; a hit short-circuits the write and returns the original batch's offsets to the client. lastStableOffset(completedTxn) returns the first offset of the next still-open txn, or completedTxn.lastOffset+1 if none (ProducerStateManager.java:526).
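Duplicate detection against the retained batches can be sketched with a bounded deque. Assumptions: ProducerEntrySketch and its BatchMetadata record are hypothetical simplifications; in particular, the sketch matches on the sequence range only, whereas a real check would also need the producer epoch to match.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Optional;

// Sketch of a per-producer entry that retains the last five batches (hypothetical types).
final class ProducerEntrySketch {
    static final int NUM_BATCHES_TO_RETAIN = 5;

    record BatchMetadata(int firstSeq, int lastSeq, long firstOffset, long lastOffset) {}

    private final Deque<BatchMetadata> batches = new ArrayDeque<>();

    void addBatch(BatchMetadata b) {
        if (batches.size() == NUM_BATCHES_TO_RETAIN) batches.removeFirst(); // evict the oldest
        batches.addLast(b);
    }

    // A retried batch carries the same sequence range; return the offsets it got originally.
    Optional<BatchMetadata> findDuplicateBatch(int firstSeq, int lastSeq) {
        for (BatchMetadata b : batches)
            if (b.firstSeq() == firstSeq && b.lastSeq() == lastSeq) return Optional.of(b);
        return Optional.empty();
    }
}
```

Retaining five batches lines up with the producer's limit of five in-flight requests when idempotence is enabled, so any retry the producer can send is still in the window.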

Snapshot file format

A snapshot named <lastOffset>.snapshot captures the producer table at a given offset (taken on every roll, UnifiedLog.roll). The layout (schema ProducerSnapshot.json, validVersions 1; offsets ProducerStateManager.java:74–76):

File header:
  @0   Version            int16
  @2   CRC                uint32  Crc32C over bytes from @6 (PRODUCER_ENTRIES_OFFSET) to EOF
  @6   ProducerEntries[]  length-prefixed array of ProducerEntry records

Each ProducerEntry (46 bytes):
  @0   ProducerId             int64
  @8   Epoch                  int16
  @10  LastSequence           int32
  @14  LastOffset             int64
  @22  OffsetDelta            int32
  @26  Timestamp              int64
  @34  CoordinatorEpoch       int32
  @38  CurrentTxnFirstOffset  int64   −1 = none
writeSnapshot/readSnapshot (ProducerStateManager.java:618,664). A CRC mismatch throws CorruptSnapshotException and the snapshot is discarded.
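A byte-level sketch of the layout helps check the offsets. Assumptions not confirmed by the text: fields are big-endian, as Java's ByteBuffer writes them by default, and the array prefix is an int32 element count, as in Kafka's non-flexible schemas. ProducerSnapshotSketch and its methods are hypothetical and do not reproduce Kafka's generated serializer.

```java
import java.nio.ByteBuffer;
import java.util.zip.CRC32C;

// Sketch of the producer-snapshot layout described above (hypothetical names; see assumptions).
final class ProducerSnapshotSketch {
    static final int CRC_OFFSET = 2;
    static final int PRODUCER_ENTRIES_OFFSET = 6;
    static final int ENTRY_SIZE = 46;

    record Entry(long producerId, short epoch, int lastSequence, long lastOffset, int offsetDelta,
                 long timestamp, int coordinatorEpoch, long currentTxnFirstOffset) {}

    static byte[] write(Entry... entries) {
        ByteBuffer buf = ByteBuffer.allocate(PRODUCER_ENTRIES_OFFSET + 4 + entries.length * ENTRY_SIZE);
        buf.putShort((short) 1);      // Version
        buf.putInt(0);                // CRC placeholder, patched below
        buf.putInt(entries.length);   // assumed int32 array-length prefix
        for (Entry e : entries) {
            buf.putLong(e.producerId()).putShort(e.epoch()).putInt(e.lastSequence())
               .putLong(e.lastOffset()).putInt(e.offsetDelta()).putLong(e.timestamp())
               .putInt(e.coordinatorEpoch()).putLong(e.currentTxnFirstOffset());
        }
        byte[] bytes = buf.array();
        CRC32C crc = new CRC32C();
        crc.update(bytes, PRODUCER_ENTRIES_OFFSET, bytes.length - PRODUCER_ENTRIES_OFFSET);
        buf.putInt(CRC_OFFSET, (int) crc.getValue()); // low 32 bits hold the unsigned CRC
        return bytes;
    }

    // True if the stored CRC matches the bytes from PRODUCER_ENTRIES_OFFSET to the end.
    static boolean crcMatches(byte[] bytes) {
        long stored = ByteBuffer.wrap(bytes).getInt(CRC_OFFSET) & 0xFFFFFFFFL;
        CRC32C crc = new CRC32C();
        crc.update(bytes, PRODUCER_ENTRIES_OFFSET, bytes.length - PRODUCER_ENTRIES_OFFSET);
        return stored == crc.getValue();
    }
}
```

Under these assumptions a one-entry snapshot is 6 + 4 + 46 = 56 bytes, and flipping any bit after the header makes the CRC check fail, which is the condition that leads to a discarded snapshot.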

Recovery reads the most recent snapshot at or below the target offset and replays only the segments after it (rebuildProducerState, UnifiedLog.java:2534), avoiding a full-log rescan. After a clean shutdown with no snapshots (an upgrade), it skips the scan and writes a fresh snapshot at the LEO.

LeaderEpochFileCache

This cache (LeaderEpochFileCache.java:53) maps each leader epoch to the first offset written in that epoch (TreeMap<Integer, EpochEntry> epochs, guarded by a ReentrantReadWriteLock). Its checkpoint is the plain-text leader-epoch-checkpoint file: a version line, a count line, then one "epoch startOffset" pair per line (LeaderEpochCheckpointFile.java:33–39,74).

The cache is the backbone of truncation-safe replication. endOffsetFor(requestedEpoch, logEndOffset) (LeaderEpochFileCache.java:284) answers an OffsetsForLeaderEpoch query: for the current (latest) epoch it returns the LEO; otherwise it returns the largest cached epoch ≤ the requested one together with the start offset of the next higher epoch, which is the requested epoch's exclusive end. A follower uses this to find exactly where its log diverged from the leader and truncates there; see Replication & ISR. Truncations flush asynchronously (truncateFromEndAsyncFlush, truncateFromStartAsyncFlush) because they are called by replica-fetcher threads, where fsync latency would stall fetching.
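The end-offset query can be sketched over a TreeMap of epoch → start offset. Assumptions: EpochCacheSketch is hypothetical, the (−1, −1) "undefined" sentinel is a convention chosen for the sketch, and assign is simplified to ignore any epoch that does not extend the lineage (the real cache handles more cases).

```java
import java.util.Map;
import java.util.TreeMap;

// Sketch of an epoch -> start-offset cache answering end-offset queries (hypothetical names).
final class EpochCacheSketch {
    static final int UNDEFINED_EPOCH = -1;
    static final long UNDEFINED_EPOCH_OFFSET = -1L;

    private final TreeMap<Integer, Long> epochs = new TreeMap<>();

    // Simplified: only accept strictly newer epochs, so start offsets are never reassigned.
    void assign(int epoch, long startOffset) {
        if (epochs.isEmpty() || epoch > epochs.lastKey()) epochs.put(epoch, startOffset);
    }

    // Returns {epoch, endOffset}, where endOffset is exclusive.
    long[] endOffsetFor(int requestedEpoch, long logEndOffset) {
        if (requestedEpoch == UNDEFINED_EPOCH || epochs.isEmpty())
            return new long[] {UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET};
        if (requestedEpoch == epochs.lastKey())
            return new long[] {requestedEpoch, logEndOffset};           // latest epoch: still being written
        Map.Entry<Integer, Long> higher = epochs.higherEntry(requestedEpoch);
        if (higher == null)
            return new long[] {UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET}; // newer than anything cached
        Map.Entry<Integer, Long> floor = epochs.floorEntry(requestedEpoch);
        int epoch = floor == null ? requestedEpoch : floor.getKey();
        return new long[] {epoch, higher.getValue()};                    // next epoch's start offset
    }
}
```

With epochs {1 → 0, 3 → 100, 5 → 250} and LEO 400, asking for epoch 4 (never led by this broker) returns (3, 250): the follower learns that its epoch-4 data, if any, must be truncated at offset 250.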

Invariant

Epochs are append-only and strictly increasing; once assigned, an epoch's start offset is never reassigned (LeaderEpochFileCache.assign, LeaderEpochFileCache.java:103). The earliest cached epoch's start offset tracks the log start offset; the latest is the current leader epoch still being written.

Configuration reference

Config key | Default | Effect
segment.bytes | 1073741824 (1 GiB) | Roll the active segment when it would exceed this size. Min 1 MiB. (LogConfig.java:131)
segment.ms | 604800000 (7 days) | Roll a non-empty segment after this much time (measured from the first batch's timestamp). (LogConfig.java:132)
segment.jitter.ms | 0 | Upper bound of a random jitter subtracted from segment.ms to de-synchronize rolling across partitions (randomSegmentJitter, LogConfig.java:384).
segment.index.bytes | 10485760 (10 MiB) | Pre-allocated max size of each offset/time index file; also bounds entries per segment (ServerLogConfigs.LOG_INDEX_SIZE_MAX_BYTES_DEFAULT).
index.interval.bytes | 4096 | Approx. bytes of log between sparse index entries; smaller = denser index, faster lookup, more memory (ServerLogConfigs.LOG_INDEX_INTERVAL_BYTES_DEFAULT).
flush.messages | Long.MAX_VALUE | Force an fsync after this many appended messages; the default defers to the OS page cache (LOG_FLUSH_INTERVAL_MESSAGES_DEFAULT).
flush.ms | Long.MAX_VALUE | Max time before forcing an fsync (driven by LogManager's flush scheduler).
preallocate | false | Pre-allocate the segment file to segment.bytes on creation (initFileSize, LogConfig.java:398); useful on Windows.

These are topic-level keys; the broker defaults come from log.* server configs of the same meaning. The warmEntries() size (8 KiB) is a hard-coded constant, not configurable.

Failure modes, edge cases & recovery

  • Offset overflow. A batch whose last offset cannot fit in 4 bytes relative to the base offset throws LogSegmentOffsetOverflowException; LogLoader.retryOnOffsetOverflow splits the legacy segment into compliant ones and retries (KAFKA-6264, LogLoader.java:326).
  • Corrupt index. Indexes carry no checksum; on a CorruptIndexException or a missing index, the segment is recovered and the index rebuilt from the log (LogLoader.java:384).
  • Truncated/torn tail. LogSegment.recover stops at the first batch that fails ensureValid() and truncates the log + indexes to the last valid byte (LogSegment.java:515–528).
  • Txn-index inconsistency after partial append. The data append and LEO bump happen before the txn-index write, so a failure between them can leave the txn index behind; the comment at UnifiedLog.java:1256–1261 notes this is reconciled during recovery and the LSO simply does not advance.
  • Disk failure. Mutating ops are wrapped by maybeHandleIOException, which signals the LogDirFailureChannel and converts to KafkaStorageException, taking the log dir offline rather than corrupting state.
  • Unclean shutdown. Absence of the clean-shutdown marker forces full recovery from the recovery point; a clean marker skips it entirely.

Interactions with other subsystems

Record format
The bytes a segment stores are v2 record batches; the engine reads batch headers (offsets, epoch, producer id, max timestamp) but never decompresses on the read hot path.
Replication & ISR
The leader assigns offsets and the HW; followers append verbatim and use LeaderEpochFileCache.endOffsetFor to truncate. The HW gates consumer visibility.
Fetch path
UnifiedLog.read produces the zero-copy FileRecords.slice the fetch session returns; collectAbortedTransactions feeds READ_COMMITTED.
Log management
LogManager owns the UnifiedLog instances, the recovery-point and log-start-offset checkpoint files, the flush scheduler, and retention/compaction (deleteOldSegments, the LogCleaner).
Tiered storage
localLogStartOffset/highestOffsetInRemoteStorage let the local log be the hot tail while older segments live remotely.
Transactions / EOS
ProducerStateManager + TransactionIndex implement idempotent dedup, the last stable offset, and aborted-txn reporting.
KRaft
The controller quorum's metadata log is a specialised consumer of the same segment/index machinery (a Raft log over FileRecords).

Design rationale & evolution

  • KIP-33 Added the time index (.timeindex) enabling offset-by-timestamp lookups, the structure and "max timestamp before offset" semantics described above.
  • KIP-263 Lazy index mmap (LazyIndex) + skipping sanity checks of inactive segments on startup, slashing broker restart time for log directories with many segments.
  • KIP-101/KIP-279 Introduced leader-epoch-based truncation, replacing HW-based truncation; the persisted leader-epoch-checkpoint and endOffsetFor logic are its on-disk footprint.
  • KIP-890 Transaction-verification guards (the VerificationStateEntry map) hardening hanging-transaction handling.
  • The Scala→Java migration of kafka.log into org.apache.kafka.storage.internals.log made the engine reusable by KRaft and tiered storage and is why UnifiedLog (not the old kafka.log.Log) is the entry point in 4.x.

Gotchas & operational notes

  • Two different "LSO"s. "Log start offset" and "last stable offset" share the acronym; in code they are logStartOffset vs. the value from lastStableOffset(). Read the surrounding context.
  • Indexes are disposable. Deleting .index/.timeindex files is safe: they are rebuilt on next recovery. Deleting .log files is data loss and can trip the "log start > log end" wipe path.
  • Snapshots gate fast recovery. Missing .snapshot files force a producer-state rescan from earlier in the log; routine roll-time snapshots keep recovery cheap.
  • Default durability is OS-driven. With flush.messages/flush.ms at their maxima, Kafka relies on replication, not fsync, for durability; the recovery point stays well behind the LEO by design.
  • Index density is a memory/latency trade. Lowering index.interval.bytes shortens the forward scan after a lookup but enlarges mmap'd index files and page-cache pressure.

For the per-partition lifecycle that wraps this engine (assignment, leadership, and HW propagation), see Replication, ISR & High Watermark and Metadata Propagation; for retention, compaction, and the checkpoint files, see Log Management, Retention & Compaction.

krivaltsevich.com · Part of Apache Kafka Internals · derived from Apache Kafka 4.4 source · GitHub · MIT-licensed.

Apache Kafka® is a registered trademark of the Apache Software Foundation. This is an independent, unofficial guide, not affiliated with or endorsed by the ASF.