03 · The Log Storage Engine
Source: Apache Kafka 4.4.0-SNAPSHOT (git 04bfe7d, 2026-06-15), KRaft mode. Derived from source code, not copied from official documentation.
Every Kafka partition is an append-only log on disk: an ordered set of segments (immutable once rolled), each a .log data file paired with two sparse memory-mapped indexes (.index, .timeindex) and an append-only transaction index (.txnindex), fronted by per-partition producer-idempotency state and a leader-epoch checkpoint. This chapter dissects the layered abstraction UnifiedLog → LocalLog → LogSegments → LogSegment, the byte layouts of the index and snapshot files, the exact append and read-by-offset algorithms, segment rolling, crash recovery in LogLoader, and the concurrency model that lets a single writer coexist with lock-free readers. It is the physical substrate on which replication, the fetch path, retention/compaction, and transactions all stand.
Role & responsibilities
The log storage engine is the broker subsystem that turns a stream of record batches into durable, offset-addressable bytes and back again. Its concrete duties:
- Durable ordered storage. Assign a monotonically increasing 64-bit offset to every record and persist batches in offset order in segment files (pre-allocated only when preallocate is enabled).
- Offset and timestamp lookup. Translate a logical offset (or a timestamp) into a physical file position for reads, using sparse indexes that fit in page cache.
- Boundary bookkeeping. Track the log start offset (LSO), recovery point, log end offset (LEO) and high watermark (HW), and expose them to replication and consumers.
- Idempotency & transaction metadata. Maintain per-producer sequence state (dedup), snapshot it to .snapshot files, and record aborted transactions for READ_COMMITTED reads.
- Leader-epoch lineage. Persist the (epoch → start offset) map that drives truncation-safe follower replication.
- Crash recovery. On startup, validate segments, rebuild indexes, truncate corrupt tails, and reconstruct producer state.
Where it lives in the code
The engine was migrated from Scala (kafka.log) into a self-contained Java module, org.apache.kafka.storage.internals.log, so it can be reused by KRaft's metadata log and tiered storage.
| Class / file | Path | Responsibility |
|---|---|---|
| UnifiedLog | storage/.../internals/log/UnifiedLog.java | Public façade for a partition log; offsets/HW/LSO, append & read entry points, producer-state & epoch-cache orchestration. |
| LocalLog | storage/.../internals/log/LocalLog.java | The local on-disk log: holds LogSegments, the LEO (nextOffsetMetadata) and recovery point; does append-to-active-segment, multi-segment read, roll, truncate, flush. |
| LogSegments | storage/.../internals/log/LogSegments.java | Thread-safe ConcurrentSkipListMap<baseOffset, LogSegment> with floor/higher/last navigation. |
| LogSegment | storage/.../internals/log/LogSegment.java | One segment: a FileRecords log + lazy offset/time indexes + txn index; append, translate-offset, read, recover, roll-out. |
| AbstractIndex | storage/.../internals/log/AbstractIndex.java | mmap'd sparse index base class; binary search with a "warm section" optimisation, resize/truncate/flush. |
| OffsetIndex / TimeIndex | storage/.../internals/log/OffsetIndex.java, TimeIndex.java | 8-byte (relOffset,position) and 12-byte (timestamp,relOffset) entries. |
| TransactionIndex | storage/.../internals/log/TransactionIndex.java | Append-only list of aborted-txn records (no mmap). |
| LazyIndex | storage/.../internals/log/LazyIndex.java | Defers mmap of an index until first access (startup-time optimisation). |
| LogLoader | storage/.../internals/log/LogLoader.java | Loads & recovers segments from disk; returns LSO, recovery point, LEO. |
| LogValidator | storage/.../internals/log/LogValidator.java | Validates batches, assigns offsets/timestamps, stamps leader epoch, (de)compresses. |
| ProducerStateManager | storage/.../internals/log/ProducerStateManager.java | Per-producer dedup state, snapshots, LSO/first-unstable-offset, aborted-txn tracking. |
| LeaderEpochFileCache | storage/.../internals/epoch/LeaderEpochFileCache.java | (epoch → start offset) map + leader-epoch-checkpoint file. |
| LogFileUtils | storage/.../internals/log/LogFileUtils.java | Segment file naming, suffix constants, offset-from-filename. |
Core concepts & terminology
- Offset: Monotonic 64-bit position of a record within a partition. Assigned by the leader at append time.
- Base offset: The lowest possible offset a segment can hold; also its filename prefix and the origin for the index's relative offsets.
- Active segment: The last (highest-base-offset) segment, the only one currently taking appends. LogSegments.activeSegment() = lastSegment().get() (LogSegments.java:322).
- LEO, log end offset: Offset of the next record to be written; LocalLog.nextOffsetMetadata.messageOffset (LocalLog.java:296).
- HW, high watermark: Highest offset replicated to all in-sync replicas; consumer-visible upper bound. Stored as UnifiedLog.highWatermarkMetadata (UnifiedLog.java:160). See Replication.
- LSO, log start offset: Lowest retained offset; advanced by retention / deleteRecords / tiering. Field logStartOffset (UnifiedLog.java:162).
- LSO, last stable offset: (Same acronym, different concept.) Upper bound of committed transactional data for READ_COMMITTED; min(firstUnstableOffset, HW).
- Recovery point: First offset not yet guaranteed fsync'd; recovery on restart starts here. LocalLog.recoveryPoint (LocalLog.java:95).
- Sparse index: An index that holds an entry only every index.interval.bytes (default 4096) of log, not per-record.
On-disk directory layout of a partition
A partition orders-2 in a configured log directory is a sub-directory orders-2/. Segment files are named by their zero-padded 20-digit base offset (LogFileUtils.filenamePrefixFromOffset, LogFileUtils.java:102), which makes ls sort them numerically.
<log.dir>/orders-2/
├─ 00000000000000000000.log        ← segment data (FileRecords), baseOffset 0
├─ 00000000000000000000.index      ← offset index (8-byte entries, mmap)
├─ 00000000000000000000.timeindex  ← time index (12-byte entries, mmap)
├─ 00000000000000000000.txnindex   ← aborted-txn index (created only if needed)
├─ 00000000000000147001.log        ← next segment, baseOffset 147001
├─ 00000000000000147001.index
├─ 00000000000000147001.timeindex
├─ 00000000000000146500.snapshot   ← producer-state snapshot (per roll)
├─ 00000000000000147001.snapshot
├─ leader-epoch-checkpoint         ← (epoch → startOffset) text file
└─ partition.metadata              ← topic-id binding (version + UUID)
Suffix constants are defined in LogFileUtils; transient files carry .deleted, .cleaned, .swap suffixes during deletion/compaction/split.
Per directory (not per partition), the engine also keeps recovery-point-offset-checkpoint, log-start-offset-checkpoint, and replication-offset-checkpoint (high-watermark) files managed by LogManager; see Log Management.
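The naming rule above is easy to reproduce. The following is a minimal sketch, not the LogFileUtils implementation: the class SegmentFileNames and the helper offsetFromFileName are hypothetical names chosen for illustration, and only the 20-digit zero-padding rule is taken from the text.

```java
import java.util.Locale;

final class SegmentFileNames {
    /** Zero-pads the base offset to 20 digits so lexical order equals numeric order. */
    static String filenamePrefixFromOffset(long baseOffset) {
        if (baseOffset < 0) throw new IllegalArgumentException("negative offset: " + baseOffset);
        // Locale.ROOT keeps the digits ASCII regardless of the JVM's default locale.
        return String.format(Locale.ROOT, "%020d", baseOffset);
    }

    /** Hypothetical helper: parses the base offset out of a name such as 00000000000000147001.log. */
    static long offsetFromFileName(String fileName) {
        int dot = fileName.indexOf('.');
        if (dot < 0) throw new IllegalArgumentException("no suffix: " + fileName);
        return Long.parseLong(fileName.substring(0, dot));
    }

    public static void main(String[] args) {
        String name = filenamePrefixFromOffset(147001L) + ".log";
        System.out.println(name + " has base offset " + offsetFromFileName(name));
    }
}
```

Twenty digits are enough for any non-negative 64-bit offset, since Long.MAX_VALUE has 19 decimal digits.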
The layered abstraction
Figure: the layered abstraction, top to bottom. Callers: Partition · KafkaApis. UnifiedLog: appendAsLeader/Follower · read · HW · LSO · leaderEpochCache · producerStateManager. LocalLog: nextOffsetMetadata (LEO) · recoveryPoint · append · read · roll · truncateTo. LogSegments: ConcurrentSkipListMap<baseOffset, LogSegment>. LogSegment: FileRecords .log · LazyIndex<OffsetIndex> · LazyIndex<TimeIndex> · TransactionIndex. UnifiedLog = semantics & metadata; LocalLog = the local segment set & offsets; LogSegment = one file pair. Package: org.apache.kafka.storage.internals.log.
UnifiedLog owns all semantics (offset assignment, HW/LSO, producer state, epochs) and is the only class that synchronizes on the partition lock; LocalLog owns the physical segments and the LEO. A "unified" log can additionally front tiered (remote) segments, which is why local boundaries (localLogStartOffset) are tracked separately from logStartOffset; see Tiered Storage.
LogSegment: a file pair plus indexes
A LogSegment (LogSegment.java:80–103) bundles: a FileRecords log (the .log); LazyIndex<OffsetIndex> lazyOffsetIndex; LazyIndex<TimeIndex> lazyTimeIndex; a TransactionIndex txnIndex; the immutable baseOffset; indexIntervalBytes; and a per-roll random rollJitterMs. Two pieces of mutable bookkeeping drive indexing and rolling:
- bytesSinceLastIndexEntry: bytes appended since the last sparse index entry; when it exceeds indexIntervalBytes, the next batch triggers an index append (LogSegment.java:271).
- maxTimestampAndOffsetSoFar (volatile TimestampOffset) and rollingBasedTimestamp: the largest timestamp seen and the first-batch timestamp, used for the time index and time-based rolling.
LogSegment is explicitly not thread-safe for mutation (LogSegment.java:64): append/recover/truncateTo assume the caller holds UnifiedLog#lock. Only the read-style methods (translateOffset, read, readNextOffset) are documented thread-safe, relying on the index's mmap and FileRecords positional reads.
Data structures: the index files, field by field
Both mmap'd index types (OffsetIndex and TimeIndex) share AbstractIndex; the transaction index is a plain append-only channel. Each mmap'd index stores offsets relative to the segment base offset so a 64-bit offset fits in 4 bytes (OffsetIndex.java:44–47). AbstractIndex.toRelative rejects any offset whose delta exceeds Integer.MAX_VALUE (AbstractIndex.java:555), which is the root cause of segment offset overflow.
OffsetIndex, relative offset → physical position
Entry size 8 bytes (OffsetIndex.java:56); the file is pre-allocated to a fixed maximum (segment.index.bytes, default 10 MiB) and trimmed when the segment goes inactive.
Entry layout (OffsetIndex.parseEntry / append, OffsetIndex.java:143,199): a 4-byte relative offset followed by a 4-byte physical position, where absOffset = baseOffset + relativeOffset. append rejects a non-increasing offset and a full index.
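The 8-byte entry can be illustrated with a plain ByteBuffer. This is a sketch under stated assumptions rather than the OffsetIndex code: the class OffsetIndexEntrySketch and its method bodies are hypothetical, a heap buffer stands in for the mmap, and only the (relative offset, position) layout and the Integer.MAX_VALUE bound come from the text.

```java
import java.nio.ByteBuffer;

final class OffsetIndexEntrySketch {
    static final int ENTRY_SIZE = 8; // 4-byte relative offset + 4-byte file position

    /** Converts an absolute offset to a 4-byte relative offset, rejecting overflow. */
    static int toRelative(long baseOffset, long offset) {
        long delta = offset - baseOffset;
        if (delta < 0 || delta > Integer.MAX_VALUE)
            throw new IllegalArgumentException(
                "offset " + offset + " is not representable relative to base " + baseOffset);
        return (int) delta;
    }

    /** Appends one entry at the buffer's current position. */
    static void append(ByteBuffer index, long baseOffset, long offset, int position) {
        index.putInt(toRelative(baseOffset, offset));
        index.putInt(position);
    }

    /** Reads entry n and returns {absoluteOffset, position}. */
    static long[] parseEntry(ByteBuffer index, long baseOffset, int n) {
        int relativeOffset = index.getInt(n * ENTRY_SIZE);
        int position = index.getInt(n * ENTRY_SIZE + 4);
        return new long[] {baseOffset + relativeOffset, position};
    }

    public static void main(String[] args) {
        ByteBuffer index = ByteBuffer.allocate(2 * ENTRY_SIZE);
        append(index, 147001L, 147050L, 4096);
        long[] entry = parseEntry(index, 147001L, 0);
        System.out.println("offset=" + entry[0] + " position=" + entry[1]);
    }
}
```

Storing the delta rather than the absolute offset halves the entry size, at the cost of capping a segment's offset span at Integer.MAX_VALUE.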
TimeIndex, max timestamp → relative offset (KIP-33)
Entry size 12 bytes (TimeIndex.java:56). An entry (T, O) asserts "the largest timestamp at or before offset O is T"; timestamps are monotonically increasing within the file. To find an offset by timestamp, LogSegment.findOffsetByTimestamp (LogSegment.java:755) binary-searches the time index for the largest T ≤ target, then resolves that offset through the offset index, then linearly scans the log from there.
Each entry is an 8-byte timestamp followed by a 4-byte relative offset. TimeIndex.isFull() reserves one slot: it returns true at entries ≥ maxEntries − 1 (TimeIndex.java:122). That last slot is filled when the segment rolls/closes via maybeAppend(maxTs, offset, skipFullCheck=true) so the segment's true max timestamp is always recorded (LogSegment.onBecomeInactiveSegment, LogSegment.java:686).
TransactionIndex, aborted transactions
Unlike the others this is not mmap'd, is created lazily (only when an abort is recorded), and is appended through a FileChannel (TransactionIndex.java:64,202). Each record is a version-prefixed AbortedTxn (schema clients/.../message/AbortedTxn.json, validVersions 0) with the fields ProducerId, FirstOffset, LastOffset, LastStableOffset: four int64s after a 2-byte version, so 34 bytes. collectAbortedTxns(fetchOffset, upperBoundOffset) (TransactionIndex.java:167) returns every aborted txn overlapping the fetch range and signals whether the scan is complete (an entry's lastStableOffset ≥ upperBoundOffset was reached) or must continue into the next segment. Used by the fetch path for READ_COMMITTED.
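The overlap test and the completion signal can be sketched as follows. This is an illustrative model, not the TransactionIndex code: the classes AbortedTxnScanSketch, AbortedTxn and SearchResult are hypothetical stand-ins, an in-memory list replaces the file scan, and the overlap condition (lastOffset ≥ fetchOffset and firstOffset < upperBoundOffset) is an assumption about what "overlapping the fetch range" means.

```java
import java.util.ArrayList;
import java.util.List;

final class AbortedTxnScanSketch {
    /** Hypothetical in-memory form of one 34-byte record (version omitted). */
    static final class AbortedTxn {
        final long producerId, firstOffset, lastOffset, lastStableOffset;
        AbortedTxn(long producerId, long firstOffset, long lastOffset, long lastStableOffset) {
            this.producerId = producerId;
            this.firstOffset = firstOffset;
            this.lastOffset = lastOffset;
            this.lastStableOffset = lastStableOffset;
        }
    }

    static final class SearchResult {
        final List<AbortedTxn> abortedTxns;
        final boolean isComplete; // false means the caller should continue into the next segment
        SearchResult(List<AbortedTxn> abortedTxns, boolean isComplete) {
            this.abortedTxns = abortedTxns;
            this.isComplete = isComplete;
        }
    }

    static SearchResult collect(List<AbortedTxn> index, long fetchOffset, long upperBoundOffset) {
        List<AbortedTxn> overlapping = new ArrayList<>();
        for (AbortedTxn txn : index) {
            // Assumed overlap test against the half-open range [fetchOffset, upperBoundOffset).
            if (txn.lastOffset >= fetchOffset && txn.firstOffset < upperBoundOffset)
                overlapping.add(txn);
            // Once the stable offset passes the bound, later entries cannot matter.
            if (txn.lastStableOffset >= upperBoundOffset)
                return new SearchResult(overlapping, true);
        }
        return new SearchResult(overlapping, false);
    }
}
```

The early return is what keeps the scan cheap: entries are appended in order, so the first entry whose lastStableOffset reaches the bound ends the search.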
AbstractIndex: mmap, the warm-section search, and locking
On construction the file is created (if new), pre-allocated to roundDownToExactMultiple(maxIndexSize, entrySize), mapped READ_WRITE (or READ_ONLY), and the buffer position set to the last entry (AbstractIndex.java:100–124,474–490). maxEntries = mmap.limit()/entrySize; isFull() = entries ≥ maxEntries.
Lookups use a custom, cache-friendly binary search. A plain binary search touches pages near the file start that go cold for low-traffic partitions, causing multi-hundred-millisecond page faults on the produce hot path. AbstractIndex instead keeps a warm section of the last warmEntries() = 8192/entrySize entries (AbstractIndex.java:387): if the target falls within it, only that range is searched; otherwise the cold prefix is searched (indexSlotRangeFor, AbstractIndex.java:495). In-sync followers and tailing consumers, which always read near the end, stay entirely in warm pages.
8192 bytes is chosen so the three entries every warm lookup touches (end, end−N, (2·end−N)/2) always fall within ≤3 pages on a 4 KiB-page host, guaranteeing the warm section is genuinely resident. The 57-line comment at AbstractIndex.java:330–386 records the original ~1 s produce-latency spike that motivated this.
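A minimal sketch of the warm-section lookup, assuming a sorted long[] of keys in place of the mmap'd buffer. The class WarmSectionSearchSketch and its method names are hypothetical; the structure (try the last warmEntries slots first, fall back to the cold prefix, return the greatest slot whose key is ≤ the target) follows the description above and is not copied from AbstractIndex.

```java
final class WarmSectionSearchSketch {
    /**
     * Greatest slot in [begin, end] whose key is <= target.
     * Assumes keys[begin] <= target, which the caller establishes.
     */
    static int binarySearch(long[] keys, long target, int begin, int end) {
        int lo = begin, hi = end;
        while (lo < hi) {
            int mid = (lo + hi + 1) >>> 1; // round up so the loop always makes progress
            if (keys[mid] > target) hi = mid - 1;
            else if (keys[mid] < target) lo = mid;
            else return mid;
        }
        return lo;
    }

    /** Returns the slot of the largest key <= target, or -1 if every key is larger. */
    static int largestLowerBoundSlot(long[] keys, int entries, long target, int warmEntries) {
        if (entries == 0) return -1;
        int firstHotEntry = Math.max(0, entries - 1 - warmEntries);
        // Warm path: the answer lies in the tail, so only recently touched pages are read.
        if (keys[firstHotEntry] < target)
            return binarySearch(keys, target, firstHotEntry, entries - 1);
        // Target precedes the first entry: nothing to return.
        if (keys[0] > target) return -1;
        // Cold path: search the prefix up to and including the first hot entry.
        return binarySearch(keys, target, 0, firstHotEntry);
    }

    public static void main(String[] args) {
        long[] keys = {0, 10, 20, 30, 40, 50, 60, 70, 80, 90};
        System.out.println(largestLowerBoundSlot(keys, keys.length, 75, 3));
    }
}
```

With the 8192-byte constant, warmEntries would be 8192/8 = 1024 for the offset index and 8192/12 = 682 for the time index; the small value used in main is only to make both paths easy to exercise.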
Concurrency & the index locks
AbstractIndex holds two locks (AbstractIndex.java:59–61):
- ReentrantLock lock: serializes all mutations (append, truncate, resize, flush). Readers do not take it.
- ReentrantReadWriteLock remapLock: readers take the read lock (inRemapReadLock) while reading the buffer; resize/closeHandler take the write lock so the mmap can be safely unmapped and re-mapped (the buffer reference is swapped under exclusivity).
Lock-free reads against a MappedByteBuffer are safe because (a) readers always mmap().duplicate() to get a private position, and (b) clients only ever read committed data (below the HW/LSO). A follower that races an in-progress truncate may read stale bytes, but CRC and leader-epoch checks downstream reject it (AbstractIndex.java:50–58).
Index files are force-unmapped on close rather than waiting for GC, because mmap cleanup can stall application threads reading metadata from a slow disk (KAFKA-4614, AbstractIndex.closeHandler, AbstractIndex.java:287–293). LazyIndex (LazyIndex.java) wraps the index so the file is only mmap'd on first get(), a startup-time win for brokers with very many segments (KIP-263).
The append path (leader)
Producer writes enter UnifiedLog.appendAsLeader → append(...) (UnifiedLog.java:1020,1115). The whole body runs inside synchronized(lock), a single writer per partition.
Figure: the leader append flow in UnifiedLog.append (UnifiedLog.java:1115–1292). Steps recoverable from the diagram: read LocalLog.logEndOffset(); producerStateManager.update · segment.updateTxnIndex · maybeIncrementFirstUnstableOffset · flush if unflushed ≥ flush.messages. Everything below synchronized(lock) runs single-writer per partition.
Offset, timestamp & epoch assignment (LogValidator)
LogValidator.validateMessagesAndAssignOffsets (LogValidator.java:123) picks the cheapest viable strategy:
- In-place (assignOffsetsNonCompressed, LogValidator.java:209) when source and target are uncompressed and the magic already matches: it mutates each batch header with setLastOffset(offsetCounter−1), setPartitionLeaderEpoch(...), and setMaxTimestamp(...), advancing a shared offsetCounter by one per record.
- Re-build (convertAndAssignOffsetsNonCompressed) when a format conversion is needed.
- Decompress/recompress (validateMessagesAndAssignOffsetsCompressed) for compressed data.
If message.timestamp.type=LogAppendTime, all records take the broker's wall-clock time; otherwise CreateTime values are validated against message.timestamp.before.max.ms and message.timestamp.after.max.ms. The leader epoch is stamped into every v2 batch header here, then re-asserted into the epoch cache in UnifiedLog.append (step 7 of the append flow). A regression to a pre-v2 magic causes the epoch cache to be cleared so the broker falls back to HW-based truncation (UnifiedLog.java:1216–1224).
Follower append
appendAsFollower (UnifiedLog.java:1080) passes validateAndAssignOffsets=false: offsets are taken verbatim from the leader. The path still rebuilds producer state, updates the epoch cache from batch headers, and writes the txn index, but skips offset assignment and most validation. See Replica Fetchers.
Writing into the segment & the sparse index
LocalLog.append (LocalLog.java:527) calls activeSegment().append(lastOffset, records) then bumps the LEO to lastOffset+1 via updateLogEndOffset (which rebuilds nextOffsetMetadata from the active segment's base offset and current size, LocalLog.java:304). Inside LogSegment.append (LogSegment.java:251):
- Record physicalPosition = log.sizeInBytes(); verify the largest offset is convertible to a relative offset (else LogSegmentOffsetOverflowException).
- log.append(records) writes the bytes to the channel.
- For each batch: update maxTimestampAndOffsetSoFar; if bytesSinceLastIndexEntry > indexIntervalBytes, append (lastOffset → physicalPosition) to the offset index and (maxTs → offset) to the time index, then reset the byte counter.
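The byte-counter rule in the last step can be simulated in a few lines. This is a sketch under stated assumptions: the class SparseIndexAppendSketch is hypothetical, each call models a single batch, an in-memory list stands in for the offset index, and the time index is omitted. The assumption that the counter is incremented after the threshold check (so the entry points at the start of the batch that crossed it) is an interpretation of the step above.

```java
import java.util.ArrayList;
import java.util.List;

final class SparseIndexAppendSketch {
    final int indexIntervalBytes;
    final List<long[]> offsetIndex = new ArrayList<>(); // each entry is {lastOffset, physicalPosition}
    int sizeInBytes = 0;              // stands in for log.sizeInBytes()
    int bytesSinceLastIndexEntry = 0;

    SparseIndexAppendSketch(int indexIntervalBytes) {
        this.indexIntervalBytes = indexIntervalBytes;
    }

    /** Appends one batch of the given size whose last record has offset lastOffset. */
    void appendBatch(long lastOffset, int batchSizeInBytes) {
        int physicalPosition = sizeInBytes; // file position where this batch starts
        sizeInBytes += batchSizeInBytes;    // stands in for log.append(records)
        if (bytesSinceLastIndexEntry > indexIntervalBytes) {
            offsetIndex.add(new long[] {lastOffset, physicalPosition});
            bytesSinceLastIndexEntry = 0;
        }
        bytesSinceLastIndexEntry += batchSizeInBytes;
    }

    public static void main(String[] args) {
        SparseIndexAppendSketch segment = new SparseIndexAppendSketch(4096);
        for (int i = 1; i <= 7; i++) segment.appendBatch(i * 10L - 1, 1500);
        for (long[] e : segment.offsetIndex)
            System.out.println("offset " + e[0] + " -> position " + e[1]);
    }
}
```

Because the check is strictly greater-than and happens per batch, entries land at least index.interval.bytes apart, and further apart when batches are large.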
Offsets in a segment's data file and offset-index are strictly increasing; the index is sparse (one entry per ≥index.interval.bytes). OffsetIndex.append throws InvalidOffsetException if asked to append a non-increasing offset (OffsetIndex.java:156), and the position must equal entries·8 (OffsetIndex.java:154).
Segment rolling
maybeRoll (UnifiedLog.java:2146) asks the active segment shouldRoll(RollParams) (LogSegment.java:168), which rolls when any of:
- adding this batch would exceed segment.bytes (size > maxSegmentBytes − messagesSize), or
- the segment is non-empty and has waited longer than segment.ms − rollJitterMs (time measured from the first batch's timestamp if present, else create time; timeWaitedForRoll, LogSegment.java:714), or
- the offset index or time index isFull(), or
- the next offset cannot be expressed relative to the base offset (canConvertToRelativeOffset).
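The four conditions combine into one predicate. The sketch below is illustrative only: ShouldRollSketch and its flat parameter list are hypothetical (the real method takes a RollParams object), and the index-full flags are passed in rather than computed.

```java
final class ShouldRollSketch {
    static boolean shouldRoll(long segmentSize, int messagesSize, long maxSegmentBytes,
                              long timeWaitedMs, long maxSegmentMs, long rollJitterMs,
                              boolean offsetIndexFull, boolean timeIndexFull,
                              long baseOffset, long maxOffsetInMessages) {
        // 1. Size: the incoming batch would push the segment past segment.bytes.
        boolean sizeExceeded = segmentSize > maxSegmentBytes - messagesSize;
        // 2. Time: only a non-empty segment can age out.
        boolean timeExceeded = segmentSize > 0 && timeWaitedMs > maxSegmentMs - rollJitterMs;
        // 3. Either sparse index has no room left.
        boolean indexFull = offsetIndexFull || timeIndexFull;
        // 4. The largest incoming offset does not fit in a 4-byte relative offset.
        long delta = maxOffsetInMessages - baseOffset;
        boolean offsetOverflow = delta < 0 || delta > Integer.MAX_VALUE;
        return sizeExceeded || timeExceeded || indexFull || offsetOverflow;
    }

    public static void main(String[] args) {
        long oneGiB = 1L << 30;
        long sevenDaysMs = 7L * 24 * 60 * 60 * 1000;
        System.out.println(shouldRoll(oneGiB - 100, 200, oneGiB, 0, sevenDaysMs, 0,
                                      false, false, 0L, 1_000L));
    }
}
```

Writing the size check as size > max − messagesSize rather than size + messagesSize > max keeps the comparison in the same form the text gives.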
The new base offset is normally appendInfo.firstOffset(); for pre-v2 formats where the first offset is unknown it is conservatively maxOffsetInMessages − Integer.MAX_VALUE (UnifiedLog.java:2174). roll(expectedNextOffset) (UnifiedLog.java:2201) then: snapshots producer state aligned to the new base offset, opens the new segment via LocalLog.roll (which calls onBecomeInactiveSegment() on the old segment to write the final time-index entry and trim indexes), updates the HW, and asynchronously flushes the old segment and snapshot on the scheduler thread to avoid holding the partition lock across fsync.
If segment.index.bytes is set so small that maxEntries rounds to 0, an empty active segment can report isFull()==true and try to roll onto its own base offset. LocalLog.roll has an explicit KAFKA-6388 branch that recreates the zero-size segment in place instead of throwing (LocalLog.java:595–610).
The read / fetch-by-offset path
Reads enter UnifiedLog.read(startOffset, maxLength, isolation, minOneMessage) (UnifiedLog.java:1649). The isolation level selects the upper bound: LOG_END → LEO, HIGH_WATERMARK → HW, TXN_COMMITTED → last stable offset. It then delegates to LocalLog.read (LocalLog.java:463):
- Bound-check: throw OffsetOutOfRangeException if startOffset > LEO or no floor segment exists.
- If startOffset == maxOffset (e.g. caught up to HW) return an empty FetchDataInfo immediately.
- segments.floorSegment(startOffset) finds the segment whose base offset is ≤ start; iterate forward until a segment yields data.
- Compute maxPosition: full segment size if the bound is in a later segment, else the bound's relative position, else empty.
- segment.read(...) → translateOffset → offsetIndex().lookup(offset) gives a greatest-lower-bound position, then FileRecords.searchForOffsetFromPosition scans forward to the exact batch (LogSegment.java:399–464). The result is a zero-copy FileRecords.slice.
- For TXN_COMMITTED, addAbortedTransactions attaches the overlapping aborted-txn list (LocalLog.java:532).
Figure: the read-by-offset path, from the UnifiedLog.read entry point through the segment lookup and the mmap'd index lookup, then a scan from the indexed position forward to the exact batch containing the target offset, ending in the FileRecords view returned to the fetch path. The forward scan is bounded by index.interval.bytes.
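The three lookups (floor segment, sparse index, forward scan) can be modelled in memory. This is a sketch, not the LocalLog/LogSegment code: ReadByOffsetSketch, Segment and Batch are hypothetical types, a list of batches stands in for the .log file, and a TreeMap stands in for the mmap'd offset index.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentSkipListMap;

final class ReadByOffsetSketch {
    static final class Batch {
        final long lastOffset;
        final int position; // byte position of the batch within the segment file
        Batch(long lastOffset, int position) { this.lastOffset = lastOffset; this.position = position; }
    }

    static final class Segment {
        final List<Batch> batches = new ArrayList<>();              // stands in for the .log file
        final TreeMap<Long, Integer> sparseIndex = new TreeMap<>(); // offset -> position, sparse
    }

    /** Returns the file position of the batch containing targetOffset, or -1 if past the segment end. */
    static int translateOffset(ConcurrentSkipListMap<Long, Segment> segments, long targetOffset) {
        // 1. Segment whose base offset is the greatest one <= targetOffset.
        Map.Entry<Long, Segment> floor = segments.floorEntry(targetOffset);
        if (floor == null) throw new IllegalArgumentException("offset below log start: " + targetOffset);
        Segment segment = floor.getValue();
        // 2. Greatest-lower-bound index entry; fall back to position 0 if none.
        Map.Entry<Long, Integer> indexed = segment.sparseIndex.floorEntry(targetOffset);
        int startPosition = indexed == null ? 0 : indexed.getValue();
        // 3. Forward scan to the first batch whose last offset reaches the target.
        for (Batch batch : segment.batches) {
            if (batch.position < startPosition) continue; // skipped thanks to the index
            if (batch.lastOffset >= targetOffset) return batch.position;
        }
        return -1;
    }
}
```

A denser index shortens step 3 at the cost of a larger index file, which is the trade-off index.interval.bytes controls.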
Offset boundaries: LSO, LEO, HW, recovery point
UnifiedLog maintains the offset invariants. The HW can never exceed the LEO (maybeIncrementHighWatermark guards newHW.messageOffset > logEndOffset(), UnifiedLog.java:564) and increases monotonically (maybeIncrementHighWatermark only moves it forward). Advancing the LSO requires newLogStartOffset ≤ HW (maybeIncrementLogStartOffset throws OffsetOutOfRangeException otherwise, UnifiedLog.java:1358), and cascades into leaderEpochCache.truncateFromStartAsyncFlush and producerStateManager.onLogStartOffsetIncremented.
Invariant: logStartOffset ≤ lastStableOffset ≤ highWatermark ≤ logEndOffset, and localLogStartOffset ≥ logStartOffset. The HW and LSO are volatile so readers see updates without the lock; all writes happen under synchronized(lock).
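The guard-then-advance pattern behind these invariants can be sketched as below. This is a simplified model, not UnifiedLog: OffsetBoundsSketch is a hypothetical class, plain long fields replace the offset-metadata objects, IllegalArgumentException stands in for the exceptions named above, and the last stable offset is left out.

```java
final class OffsetBoundsSketch {
    private final Object lock = new Object();
    private volatile long logStartOffset; // volatile: readers do not take the lock
    private volatile long highWatermark;
    private long logEndOffset;            // only read and written under the lock here

    OffsetBoundsSketch(long logStartOffset, long highWatermark, long logEndOffset) {
        this.logStartOffset = logStartOffset;
        this.highWatermark = highWatermark;
        this.logEndOffset = logEndOffset;
    }

    long logStartOffset() { return logStartOffset; }
    long highWatermark() { return highWatermark; }

    /** Moves the HW forward only; rejects values beyond the LEO. Returns true if it advanced. */
    boolean maybeIncrementHighWatermark(long newHw) {
        synchronized (lock) {
            if (newHw > logEndOffset)
                throw new IllegalArgumentException("HW " + newHw + " exceeds LEO " + logEndOffset);
            if (newHw <= highWatermark) return false;
            highWatermark = newHw;
            return true;
        }
    }

    /** Moves the log start offset forward only; rejects values beyond the HW. */
    boolean maybeIncrementLogStartOffset(long newStart) {
        synchronized (lock) {
            if (newStart > highWatermark)
                throw new IllegalArgumentException("start " + newStart + " exceeds HW " + highWatermark);
            if (newStart <= logStartOffset) return false;
            logStartOffset = newStart;
            return true;
        }
    }
}
```

Single-writer updates under one lock plus volatile fields give readers a consistent, if slightly stale, view without blocking the append path.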
The recovery point is decoupled from durability of bytes: flush(offset, includingOffset) (UnifiedLog.java:2254) only fsyncs and advances the recovery point when flushOffset > recoveryPoint; an in-band flush fires when unflushedMessages ≥ flush.messages (default Long.MAX_VALUE, i.e. rely on OS), and LogManager additionally flushes on flush.ms. Crucially, the recovery point is advanced only at an actual flush, never speculatively, so a crash after a checkpoint but before fsync still re-recovers the unflushed tail (LogLoader.java:500–511).
Crash recovery & startup: LogLoader
LogLoader.load() (LogLoader.java:128) reconstructs a UnifiedLog from disk in a fixed sequence of passes:
- Clean temp files / resolve swaps. Delete stray .deleted files; collect .swap files; delete .cleaned files. A .swap whose base offset ≥ the smallest .cleaned offset is from an incomplete compaction split and is removed (KAFKA-6264, LogLoader.java:261–314).
- Delete superseded segments between min/max swap offsets, then rename surviving .swap files to live segments.
- Load all segments (loadSegmentFiles, LogLoader.java:357) in ascending order. For each .log: open the segment, run sanityCheck; an orphan index without a .log is deleted. A missing offset index or a CorruptIndexException triggers recoverSegment.
- recoverLog (LogLoader.java:456): skipped entirely if a clean-shutdown marker existed (hadCleanShutdown). Otherwise every segment from the recovery point up is recovered: LogSegment.recover replays batches, validates each (ensureValid()), rebuilds the offset/time indexes, re-derives the leader-epoch cache and producer state, and truncates the first invalid byte onward (LogSegment.java:483–529). If a segment is truncated, all later segments are deleted.
- Reconcile offsets & producer state. Compute the new LSO; truncate the epoch cache to the recovered next offset and to the LSO; require producer state to be empty; removeStraySnapshots; then rebuildProducerState from the most recent snapshot up to the LEO (LogLoader.java:220–250).
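The "replay, validate, truncate at the first invalid byte" step can be sketched with a toy framing. Everything about the format here is an assumption made for illustration: LogRecoverySketch is hypothetical, and the frame layout (int32 payload length, int32 CRC32C of the payload, payload) is not Kafka's record-batch format; only the stop-at-first-failure rule is taken from the text.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.zip.CRC32C;

final class LogRecoverySketch {
    static final int HEADER_BYTES = 8; // hypothetical frame: int32 length + int32 CRC32C

    /** Wraps a payload in the hypothetical frame. */
    static byte[] frame(byte[] payload) {
        CRC32C crc = new CRC32C();
        crc.update(payload, 0, payload.length);
        return ByteBuffer.allocate(HEADER_BYTES + payload.length)
                .putInt(payload.length)
                .putInt((int) crc.getValue())
                .put(payload)
                .array();
    }

    /** Number of leading bytes that form complete frames with matching checksums. */
    static int validBytes(byte[] log) {
        ByteBuffer buf = ByteBuffer.wrap(log);
        int valid = 0;
        while (buf.remaining() >= HEADER_BYTES) {
            int length = buf.getInt();
            int storedCrc = buf.getInt();
            if (length < 0 || length > buf.remaining()) break; // torn tail
            CRC32C crc = new CRC32C();
            crc.update(log, buf.position(), length);
            if ((int) crc.getValue() != storedCrc) break;      // corrupt payload
            buf.position(buf.position() + length);
            valid = buf.position();
        }
        return valid;
    }

    /** Drops everything from the first invalid byte onward. */
    static byte[] truncateToValid(byte[] log) {
        return Arrays.copyOf(log, validBytes(log));
    }
}
```

Nothing after the first bad frame is trusted, even if later frames happen to checksum correctly, which mirrors why a truncated segment causes all later segments to be deleted.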
LogSegment.sanityCheck deliberately validates only the txn index and (for a newly created time index) resizes it to 0; it skips offset/time-index sanity because any segment above the recovery point is going to be re-recovered anyway (LogSegment.java:181–192). Combined with LazyIndex, this is KIP-263's startup-time optimisation; KAFKA-19200 later revisited reintroducing some index sanity checking.
If the on-disk log-start-offset-checkpoint exceeds the actual log end offset (e.g. segment files were deleted out from under the broker), deleteSegmentsIfLogStartGreaterThanLogEnd wipes all segments, clears the epoch cache, and restarts the log empty at the checkpoint (LogLoader.java:423–439). Hand-deleting segment files is therefore data loss, not a recovery tactic.
ProducerStateManager: idempotency, snapshots & transactions
ProducerStateManager (ProducerStateManager.java:70) keeps the per-partition state that makes the idempotent producer and transactions work. Core in-memory structures:
- Map<Long, ProducerStateEntry> producers: keyed by producer id. Each ProducerStateEntry retains a deque of up to NUM_BATCHES_TO_RETAIN = 5 BatchMetadata records (ProducerStateEntry.java:35), enabling duplicate detection across the producer's last five in-flight batches.
- TreeMap<Long, TxnMetadata> ongoingTxns and unreplicatedTxns: open transactions keyed by first offset, used to compute the first unstable offset and the last stable offset.
- Map<Long, VerificationStateEntry> verificationStates: transaction-verification guards (KIP-890). See Transactions.
On append, analyzeAndValidateProducerState (UnifiedLog.java:1388) checks each producer batch against lastEntry.findDuplicateBatch; a hit short-circuits the write and returns the original batch's offsets to the client. lastStableOffset(completedTxn) returns the first offset of the next still-open txn, or completedTxn.lastOffset+1 if none (ProducerStateManager.java:526).
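The five-batch window can be sketched with a bounded deque. This is a simplified model, not ProducerStateEntry: DedupSketch and its fields are hypothetical, matching is by exact sequence range only, and the producer-epoch comparison is deliberately left out.

```java
import java.util.ArrayDeque;
import java.util.Deque;

final class DedupSketch {
    static final int NUM_BATCHES_TO_RETAIN = 5;

    static final class BatchMetadata {
        final int firstSeq, lastSeq;
        final long lastOffset;
        BatchMetadata(int firstSeq, int lastSeq, long lastOffset) {
            this.firstSeq = firstSeq;
            this.lastSeq = lastSeq;
            this.lastOffset = lastOffset;
        }
    }

    private final Deque<BatchMetadata> batches = new ArrayDeque<>();

    /** Remembers an appended batch, evicting the oldest once five are retained. */
    void add(int firstSeq, int lastSeq, long lastOffset) {
        if (batches.size() == NUM_BATCHES_TO_RETAIN) batches.removeFirst();
        batches.addLast(new BatchMetadata(firstSeq, lastSeq, lastOffset));
    }

    /** Returns the retained batch with the same sequence range, or null if none is retained. */
    BatchMetadata findDuplicate(int firstSeq, int lastSeq) {
        for (BatchMetadata b : batches)
            if (b.firstSeq == firstSeq && b.lastSeq == lastSeq) return b;
        return null;
    }

    int size() { return batches.size(); }
}
```

On a hit, the caller can answer the retry with the stored lastOffset instead of writing the batch again; a retry of a batch that has already aged out of the window is not recognised as a duplicate by this sketch.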
Snapshot file format
A snapshot named <lastOffset>.snapshot captures the producer table at a given offset (taken on every roll, UnifiedLog.roll). The layout (schema ProducerSnapshot.json, validVersions 1; offsets ProducerStateManager.java:74–76):
Figure: snapshot file layout, header fields followed by ProducerEntry records of 46 bytes each; field types are int16, int32, int64, and uint32.
The file is written and read by writeSnapshot/readSnapshot (ProducerStateManager.java:618,664). A CRC mismatch throws CorruptSnapshotException and the snapshot is discarded.
Recovery reads the most recent snapshot at or below the target offset and replays only the segments after it (rebuildProducerState, UnifiedLog.java:2534), avoiding a full-log rescan. After a clean shutdown with no snapshots (an upgrade), it skips the scan and writes a fresh snapshot at the LEO.
LeaderEpochFileCache
This cache (LeaderEpochFileCache.java:53) maps each leader epoch to the first offset written in that epoch (TreeMap<Integer, EpochEntry> epochs, guarded by a ReentrantReadWriteLock). Its checkpoint is the plain-text leader-epoch-checkpoint file: a version line, a count line, then epoch startOffset per line (LeaderEpochCheckpointFile.java:33–39,74).
The cache is the backbone of truncation-safe replication. endOffsetFor(requestedEpoch, logEndOffset) (LeaderEpochFileCache.java:284) answers an OffsetsForLeaderEpoch query: for the current epoch it returns the LEO; otherwise it returns the start offset of the next epoch (the requested epoch's exclusive end). A follower uses this to find exactly where its log diverged from the leader and truncate there; see Replication & ISR. Truncations flush asynchronously (truncateFromEndAsyncFlush, truncateFromStartAsyncFlush) because they are called by replica-fetcher threads where fsync latency would stall fetching.
Epochs are append-only and strictly increasing; once assigned, an epoch's start offset is never reassigned (LeaderEpochFileCache.assign, LeaderEpochFileCache.java:103). The earliest cached epoch's start offset tracks the log start offset; the latest is the current leader epoch still being written.
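The endOffsetFor rule maps naturally onto TreeMap navigation. This is a sketch under stated assumptions, not LeaderEpochFileCache: EpochCacheSketch is hypothetical, the map stores only start offsets, locking and checkpointing are omitted, the sentinel value -1 for "undefined" is an assumption, and assign simply ignores non-increasing epochs.

```java
import java.util.Map;
import java.util.TreeMap;

final class EpochCacheSketch {
    static final int UNDEFINED_EPOCH = -1;    // assumed sentinel
    static final long UNDEFINED_OFFSET = -1L; // assumed sentinel

    private final TreeMap<Integer, Long> epochs = new TreeMap<>(); // epoch -> start offset

    /** Records the first offset of a new epoch; re-asserting a known or older epoch is a no-op here. */
    void assign(int epoch, long startOffset) {
        if (!epochs.isEmpty() && epoch <= epochs.lastKey()) return;
        epochs.put(epoch, startOffset);
    }

    /** Returns {epoch, endOffset}: the exclusive end of requestedEpoch as known to this log. */
    long[] endOffsetFor(int requestedEpoch, long logEndOffset) {
        if (requestedEpoch == UNDEFINED_EPOCH || epochs.isEmpty())
            return new long[] {UNDEFINED_EPOCH, UNDEFINED_OFFSET};
        // Current epoch: it is still being written, so it ends at the LEO.
        if (requestedEpoch == epochs.lastKey())
            return new long[] {requestedEpoch, logEndOffset};
        // Otherwise the end is the start offset of the next higher epoch.
        Map.Entry<Integer, Long> higher = epochs.higherEntry(requestedEpoch);
        if (higher == null) // newer than anything this log has seen
            return new long[] {UNDEFINED_EPOCH, UNDEFINED_OFFSET};
        Map.Entry<Integer, Long> floor = epochs.floorEntry(requestedEpoch);
        int epoch = floor == null ? requestedEpoch : floor.getKey();
        return new long[] {epoch, higher.getValue()};
    }
}
```

A follower that asks about epoch 3 when the leader only knows epochs 2 and 5 gets back epoch 2 and the start of epoch 5, which tells it where the two logs can still agree.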
Configuration reference
| Config key | Default | Effect |
|---|---|---|
| segment.bytes | 1073741824 (1 GiB) | Roll the active segment when it would exceed this size. Min 1 MiB. (LogConfig.java:131) |
| segment.ms | 604800000 (7 days) | Roll a non-empty segment after this much time (measured from first-batch timestamp). (LogConfig.java:132) |
| segment.jitter.ms | 0 | Random jitter subtracted from segment.ms to de-synchronize rolling across partitions (randomSegmentJitter, LogConfig.java:384). |
| segment.index.bytes | 10485760 (10 MiB) | Pre-allocated max size of each offset/time index file; also bounds entries per segment (ServerLogConfigs.LOG_INDEX_SIZE_MAX_BYTES_DEFAULT). |
| index.interval.bytes | 4096 | Approx. bytes of log between sparse index entries; smaller = denser index, faster lookup, more memory (ServerLogConfigs.LOG_INDEX_INTERVAL_BYTES_DEFAULT). |
| flush.messages | Long.MAX_VALUE | Force an fsync after this many appended messages; default defers to the OS page cache (LOG_FLUSH_INTERVAL_MESSAGES_DEFAULT). |
| flush.ms | Long.MAX_VALUE | Max time before forcing an fsync (driven by LogManager's flush scheduler). |
| preallocate | false | Pre-allocate the segment file to segment.bytes on creation (initFileSize, LogConfig.java:398); useful on Windows. |
These are topic-level keys; the broker defaults come from log.* server configs of the same meaning. The warmEntries() size (8 KiB) is a hard-coded constant, not configurable.
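How segment.index.bytes and index.interval.bytes interact is simple arithmetic. The sketch below uses a hypothetical class IndexCapacitySketch; roundDownToExactMultiple mirrors the name used earlier for index pre-allocation, while approxLogBytesCovered is an illustrative helper that treats index.interval.bytes as the exact spacing between entries (in practice it is a lower bound).

```java
final class IndexCapacitySketch {
    /** Largest multiple of factor that is <= number. */
    static long roundDownToExactMultiple(long number, long factor) {
        return factor * (number / factor);
    }

    /** Entries that fit in an index file pre-allocated to maxIndexSize bytes. */
    static long maxEntries(long maxIndexSize, int entrySize) {
        return roundDownToExactMultiple(maxIndexSize, entrySize) / entrySize;
    }

    /** Lower bound on the log bytes a full index spans, one entry per indexIntervalBytes. */
    static long approxLogBytesCovered(long maxEntries, long indexIntervalBytes) {
        return maxEntries * indexIntervalBytes;
    }

    public static void main(String[] args) {
        long offsetEntries = maxEntries(10_485_760L, 8); // offset index, 8-byte entries
        long timeEntries = maxEntries(10_485_760L, 12);  // time index, 12-byte entries
        System.out.println("offset index entries: " + offsetEntries);
        System.out.println("time index entries:   " + timeEntries);
        System.out.println("log bytes covered by a full offset index: "
                + approxLogBytesCovered(offsetEntries, 4096L));
    }
}
```

With the defaults, a full offset index spans at least 5 GiB of log, well beyond the 1 GiB segment.bytes, so size-based rolling would normally fire long before the index fills; shrinking segment.index.bytes or index.interval.bytes changes that balance.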
Failure modes, edge cases & recovery
- Offset overflow. A batch whose last offset cannot fit in 4 bytes relative to the base offset throws LogSegmentOffsetOverflowException; LogLoader.retryOnOffsetOverflow splits the legacy segment into compliant ones and retries (KAFKA-6264, LogLoader.java:326).
- Corrupt index. Indexes carry no checksum; on a CorruptIndexException or a missing index the segment is recovered and the index rebuilt from the log (LogLoader.java:384).
- Truncated/torn tail. LogSegment.recover stops at the first batch that fails ensureValid() and truncates the log + indexes to the last valid byte (LogSegment.java:515–528).
- Txn-index inconsistency after partial append. The data append and LEO bump happen before the txn-index write, so a failure between them can leave the txn index behind; the comment at UnifiedLog.java:1256–1261 notes this is reconciled during recovery and the LSO simply does not advance.
- Disk failure. Mutating ops are wrapped by maybeHandleIOException, which signals the LogDirFailureChannel and converts to KafkaStorageException, taking the log dir offline rather than corrupting state.
- Unclean shutdown. Absence of the clean-shutdown marker forces full recovery from the recovery point; a clean marker skips it entirely.
Interactions with other subsystems
- Record format: The bytes a segment stores are v2 record batches; the engine reads batch headers (offsets, epoch, producer id, max timestamp) but never decompresses on the read hot path.
- Replication & ISR: The leader assigns offsets and the HW; followers append verbatim and use LeaderEpochFileCache.endOffsetFor to truncate. The HW gates consumer visibility.
- Fetch path: UnifiedLog.read produces the zero-copy FileRecords.slice the fetch session returns; collectAbortedTransactions feeds READ_COMMITTED.
- Log management: LogManager owns the UnifiedLog instances, the recovery/HW/LSO checkpoint files, the flush scheduler, and retention/compaction (deleteOldSegments, the LogCleaner).
- Tiered storage: localLogStartOffset / highestOffsetInRemoteStorage let the local log be the hot tail while older segments live remotely.
- Transactions / EOS: ProducerStateManager + TransactionIndex implement idempotent dedup, the last stable offset, and aborted-txn reporting.
- KRaft: The controller quorum's metadata log is a specialised consumer of the same segment/index machinery (a Raft log over FileRecords).
Design rationale & evolution
- KIP-33: Added the time index (.timeindex), enabling offset-by-timestamp lookups, with the structure and "max timestamp before offset" semantics described above.
- KIP-263: Lazy index mmap (LazyIndex) + skipping sanity checks of inactive segments on startup, slashing broker restart time for log directories with many segments.
- KIP-101/KIP-279: Introduced leader-epoch-based truncation, replacing HW-based truncation; the persisted leader-epoch-checkpoint and endOffsetFor logic are its on-disk footprint.
- KIP-890: Transaction-verification guards (the VerificationStateEntry map) hardening hanging-transaction handling.
- The Scala→Java migration of kafka.log into org.apache.kafka.storage.internals.log made the engine reusable by KRaft and tiered storage and is why UnifiedLog (not the old kafka.log.Log) is the entry point in 4.x.
Gotchas & operational notes
- Two different "LSO"s. "Log start offset" and "last stable offset" share the acronym; in code they are logStartOffset vs. the value from lastStableOffset(). Read the surrounding context.
- Indexes are disposable. Deleting .index/.timeindex files is safe; they are rebuilt on next recovery. Deleting .log files is data loss and can trip the "log start > log end" wipe path.
- Snapshots gate fast recovery. Missing .snapshot files force a producer-state rescan from earlier in the log; routine roll-time snapshots keep recovery cheap.
- Default durability is OS-driven. With flush.messages/flush.ms at their maxima, Kafka relies on replication, not fsync, for durability; the recovery point stays well behind the LEO by design.
- Index density is a memory/latency trade. Lowering index.interval.bytes shortens the forward scan after a lookup but enlarges mmap'd index files and page-cache pressure.
For the per-partition lifecycle that wraps this engine (assignment, leadership, and HW propagation), see Replication, ISR & High Watermark and Metadata Propagation; for retention, compaction, and the checkpoint files, see Log Management, Retention & Compaction.