krivaltsevich.com Kafka Internals 4.4

04 · Log Management, Retention & Compaction

Source: Apache Kafka 4.4.0-SNAPSHOT (git 04bfe7d, 2026-06-15), KRaft mode. Derived from source code, not copied from official documentation.

Chapter 03 described a single UnifiedLog, an ordered sequence of immutable segments. This chapter zooms out to the broker-wide lifecycle that surrounds those logs: LogManager, the registry of every UnifiedLog across all log.dirs (JBOD), which creates and deletes logs, recovers them in parallel on startup, and drives the background schedulers for flushing, checkpointing, and retention. It then covers the two reclamation strategies, size/time retention (cleanup.policy=delete, drop whole old segments) and log compaction (cleanup.policy=compact, keep only the latest record per key), and the asynchronous, crash-safe machinery (.deleted/.cleaned/.swap renames, the LogCleaner thread pool, the SkimpyOffsetMap, and the cleaner-offset checkpoint) that makes both safe in the presence of disk failure, truncation, and exactly-once transactions.

Role & responsibilities

LogManager is documented in its own Javadoc as "the entry point to the kafka log management subsystem ... responsible for log creation, retrieval, and cleaning" (storage/src/main/java/org/apache/kafka/storage/internals/log/LogManager.java:178). All read/write I/O is delegated down to the individual UnifiedLog instances; LogManager owns the cross-cutting concerns:

  • Directory ownership & JBOD placement. It validates and locks each directory in log.dirs, tracks which are live vs. offline vs. cordoned, and places each new partition in the directory holding the fewest partitions.
  • Startup recovery. A per-directory thread pool loads every partition log in parallel, recovering unflushed segments when the prior shutdown was unclean.
  • Background schedulers. Five recurring tasks: retention cleanup, flush of dirty logs, recovery-point checkpointing, log-start-offset checkpointing, and async deletion of whole partition directories.
  • Disk-failure handling. Reacting to a LogDirFailureChannel event by taking a whole directory offline without crashing the broker (unless it was the last one).
  • Compaction orchestration. Owning the LogCleaner pool and mediating the race between retention and compaction (pausing the cleaner on a partition while retention or truncation runs).
Key idea

Retention and compaction are two mutually-coordinated reclamation paths over the same segment files. Retention (run by the kafka-log-retention scheduler thread) deletes whole oldest-first segments; compaction (run by the kafka-log-cleaner-thread-N pool) rewrites segments to drop superseded keys. A per-partition state machine in LogCleanerManager guarantees they never touch the same partition concurrently.

Where it lives in the code

| Class / interface | File | Responsibility |
|---|---|---|
| LogManager | storage/.../internals/log/LogManager.java | Registry of all logs; dir validation/locking; startup recovery; schedulers; create/delete; disk-failure handling. |
| LogCleaner | storage/.../internals/log/LogCleaner.java | The pool of CleanerThreads; reconfiguration; throttling; metrics. Implements BrokerReconfigurable. |
| LogCleaner.CleanerThread | storage/.../internals/log/LogCleaner.java:467 | One background thread; loop = grab filthiest log, clean it, delete eligible segments. |
| Cleaner | storage/.../internals/log/Cleaner.java | The actual two-pass compaction logic for one log: build offset map, then recopy segments. |
| LogCleanerManager | storage/.../internals/log/LogCleanerManager.java | Per-partition cleaning state machine; dirtiest-log selection; cleaner-offset checkpoints; uncleanable set. |
| LogCleaningState | storage/.../internals/log/LogCleaningState.java | Sealed interface: InProgress / Aborted / Paused(n). |
| LogToClean | storage/.../internals/log/LogToClean.java | A cleanable log + its clean/dirty byte split and cleanable ratio (the comparator key). |
| OffsetMap / SkimpyOffsetMap | storage/.../internals/log/{OffsetMap,SkimpyOffsetMap}.java | Hash table mapping key-hash → latest offset, the dedupe core of compaction. |
| CleanerConfig | storage/.../internals/log/CleanerConfig.java | Cleaner tunables (threads, dedupe buffer, I/O buffer/throttle, backoff). |
| LogDirFailureChannel | storage/.../internals/log/LogDirFailureChannel.java | Bounded queue from any I/O thread to the failure-handler thread; each dir enqueued once. |
| OffsetCheckpointFile | storage/.../internals/checkpoint/OffsetCheckpointFile.java | Text-format (topic partition offset) checkpoint used for recovery points, log-start offsets, and cleaner offsets. |
| LogDirFailureHandler | core/.../server/ReplicaManager.scala:227 | The broker thread that drains the failure channel and invokes handleLogDirFailure. |

Core concepts & terminology

Live / offline / cordoned dir
A directory is live if validated and lockable; offline once an IOException takes it out of service (until restart); cordoned if administratively excluded from new-partition placement (cordonedLogDirs, LogManager.java:138).
Current vs. future log
currentLogs holds the serving replica; futureLogs holds a copy being built in a different dir during an intra-broker move (a -future directory) that will atomically replace the current log once it catches up (LogManager.java:92-96).
Recovery point
The offset up to which a log is known durably flushed; segments above it are re-validated on unclean restart. Checkpointed to recovery-point-offset-checkpoint.
Log start offset
The lowest offset still readable (raised by DeleteRecords or segment deletion). Checkpointed to log-start-offset-checkpoint so deleted data is not re-exposed after restart.
Clean / dirty / cleanable section
For a compacted log, segments below the cleaner checkpoint are clean; above it is dirty, split into a cleanable prefix and an uncleanable tail (active segment, anything past the last stable offset, anything younger than min.compaction.lag.ms) (LogCleaner.java:53-56).
Tombstone
A record with a non-null key and null value; treated as a delete during compaction. Retained for delete.retention.ms after the batch enters the clean section, then dropped.
Cleanable ratio
cleanableBytes / (cleanBytes + cleanableBytes), the cleaner's "dirtiness" estimate and the selection key for the filthiest log (LogToClean.java:45-54).

Data structures

In-memory state in LogManager

The registry is two concurrent maps plus several coordination structures (LogManager.java:91-142):

| Field | Type | Guards / purpose |
|---|---|---|
| currentLogs | ConcurrentHashMap<TopicPartition, UnifiedLog> | The serving logs. |
| futureLogs | ConcurrentHashMap<TopicPartition, UnifiedLog> | In-progress intra-broker moves. |
| logCreationOrDeletionLock | Object monitor | Serializes create/delete/rename so the two maps stay consistent. |
| logsToBeDeleted | LinkedBlockingQueue<Map.Entry<UnifiedLog, Long>> | Logs whose dir was renamed …-delete, paired with the millis they were enqueued; drained by kafka-delete-logs. |
| liveLogDirs | ConcurrentLinkedQueue<File> | The currently-serving directories. |
| directoryIds | ConcurrentHashMap<String, Uuid> | Each dir's UUID from its meta.properties (for JBOD directory assignment). |
| dirLocks | List<FileLock> | One .lock per dir, preventing a second broker process from opening it. |
| recoveryPointCheckpoints / logStartOffsetCheckpoints | Map<File, OffsetCheckpointFile> | One checkpoint file per dir for each of the two offset kinds. |
| partitionsInitializing | ConcurrentHashMap<TopicPartition, Boolean> | Detects a config update racing with log load (KAFKA-8813); the flag forces a config reload after init. |

Cleaning state in LogCleanerManager

Three maps under one ReentrantLock (LogCleanerManager.java:84-100):

  • inProgress: Map<TopicPartition, LogCleaningState>, the per-partition state machine.
  • uncleanablePartitions: Map<String, Set<TopicPartition>>, partitions that threw during cleaning, keyed by log dir; excluded until restart or recovery.
  • checkpoints: Map<File, OffsetCheckpointFile>, the cleaner-offset-checkpoint per dir, recording the first dirty offset (cleaner point) of each compacted partition.

On-disk: the offset checkpoint file

All three offset checkpoints (recovery point, log-start, cleaner) share one plain-text format (OffsetCheckpointFile.java:40-50): a version line (CURRENT_VERSION = 0), an entry-count line, then one topic partition offset triple per line:

| Line | recovery-point-offset-checkpoint | cleaner-offset-checkpoint |
|---|---|---|
| version | 0 | 0 |
| #entries | 2 | 1 |
| entry 1 | orders 0 105823 | orders 0 98112 (first dirty offset) |
| entry 2 | orders 1 105640 | |

Three offset-checkpoint kinds, one text format. There is one such file per log.dir per checkpoint kind; the recovery-point file records the durably flushed offset and the cleaner file records the first dirty offset. Writes go through CheckpointFileWithFailureHandler, which on IOException routes the dir to the LogDirFailureChannel and throws KafkaStorageException.

Writes are crash-consistent: checkpoint.write ultimately calls Utils.atomicMoveWithFallback, which writes a temp file, fsyncs, renames, and flushes the parent directory (noted at LogManager.java:1054). The three checkpoint file names are constants: RECOVERY_POINT_CHECKPOINT_FILE, LOG_START_OFFSET_CHECKPOINT_FILE (LogManager.java:85-86), and LogCleanerManager.OFFSET_CHECKPOINT_FILE = "cleaner-offset-checkpoint" (LogCleanerManager.java:66).
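
The format and the write-then-rename pattern can be sketched in a few lines of Java. This is a minimal illustration under stated assumptions, not Kafka's OffsetCheckpointFile: the class name CheckpointSketch, the ".tmp" suffix, and the use of "topic partition" strings as map keys are all hypothetical, and the fsync of the temp file and parent directory is omitted.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Illustrative only: mimics the text layout described above. */
final class CheckpointSketch {
    static final int VERSION = 0;

    /** Writes version, entry count, then one "topic partition offset" line per entry. */
    static void write(Path file, Map<String, Long> offsets) throws IOException {
        List<String> lines = new ArrayList<>();
        lines.add(Integer.toString(VERSION));
        lines.add(Integer.toString(offsets.size()));
        // Keys are "topic partition" strings such as "orders 0" (a simplification).
        offsets.forEach((tp, offset) -> lines.add(tp + " " + offset));
        Path tmp = file.resolveSibling(file.getFileName() + ".tmp");
        Files.write(tmp, lines, StandardCharsets.UTF_8);
        // A real implementation would fsync the temp file and the parent dir as well.
        Files.move(tmp, file, StandardCopyOption.ATOMIC_MOVE);
    }

    /** Parses the file back, checking the version and the declared entry count. */
    static Map<String, Long> read(Path file) throws IOException {
        List<String> lines = Files.readAllLines(file, StandardCharsets.UTF_8);
        if (Integer.parseInt(lines.get(0)) != VERSION) throw new IOException("unknown version");
        int expected = Integer.parseInt(lines.get(1));
        Map<String, Long> result = new LinkedHashMap<>();
        for (String line : lines.subList(2, lines.size())) {
            int lastSpace = line.lastIndexOf(' ');
            result.put(line.substring(0, lastSpace),
                       Long.parseLong(line.substring(lastSpace + 1)));
        }
        if (result.size() != expected) throw new IOException("entry count mismatch");
        return result;
    }
}
```

Because readers only ever see the old file or the fully written new one, a crash mid-write leaves at worst a stray temp file.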

The dedupe hash table, SkimpyOffsetMap

Compaction's working memory is a fixed-size open-addressing hash table that stores a cryptographic hash of each key (not the key itself) alongside the latest offset for that key (SkimpyOffsetMap.java:28-88). Each slot is bytesPerEntry = hashSize + 8 bytes; with the default MD5 (16-byte digest) that is 24 bytes per entry, so a per-thread buffer holds memory / 24 slots.

Figure: layout of one open-addressing slot.
  • MD5(key): 16 bytes at offset 0
  • offset: int64 (8 bytes) at offset 16
One slot = bytesPerEntry = hashSize + 8 = 24 bytes (MD5). Empty-slot test: all 24 bytes are zero (isEmpty). The probe sequence reads successive 4-byte ints from the hash itself, then degrades to linear probing once they are exhausted.
The map "does not support deletes"; clear() zeroes the buffer between logs. A full map throws on put; the load factor (default 0.9) caps how full it is allowed to get.

This design trades a tiny false-collision probability (two distinct keys with the same MD5 → one wrongly dropped) for very low memory per key. The Javadoc on CleanerConfig spells out the tradeoff: a higher load factor "will allow more log to be cleaned at once but will lead to more hash collisions" (CleanerConfig.java:69). The buffer is allocated once per Cleaner at config.dedupeBufferSize / config.numThreads bytes (LogCleaner.java:480), capped at 2 GiB.
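
To make the slot layout concrete, here is a minimal Java sketch of a hash-of-key → offset table. It is not the SkimpyOffsetMap source: the class name TinyOffsetMap is hypothetical, keys are plain strings for brevity, and the probe sequence is simplified to pure linear probing starting from the first four bytes of the digest.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

/** Illustrative hash-of-key -> offset table; simplified to linear probing. */
final class TinyOffsetMap {
    private static final int HASH_SIZE = 16;                  // MD5 digest length
    private static final int BYTES_PER_ENTRY = HASH_SIZE + 8; // digest + int64 offset
    private final ByteBuffer buffer;
    private final int slots;

    TinyOffsetMap(int memoryBytes) {
        this.slots = memoryBytes / BYTES_PER_ENTRY;
        this.buffer = ByteBuffer.allocate(slots * BYTES_PER_ENTRY);
    }

    int slots() { return slots; }

    /** Stores or overwrites the offset for the key's digest. */
    void put(String key, long offset) {
        byte[] hash = md5(key);
        int slot = findSlot(hash);
        if (slot < 0) throw new IllegalStateException("offset map is full");
        int pos = slot * BYTES_PER_ENTRY;
        for (int i = 0; i < HASH_SIZE; i++) buffer.put(pos + i, hash[i]);
        buffer.putLong(pos + HASH_SIZE, offset);
    }

    /** Returns the stored offset, or -1 if the key's digest is absent. */
    long get(String key) {
        int slot = findSlot(md5(key));
        if (slot < 0) return -1L;
        int pos = slot * BYTES_PER_ENTRY;
        return isEmpty(pos) ? -1L : buffer.getLong(pos + HASH_SIZE);
    }

    /** First slot that is empty or already holds this digest; -1 if none. */
    private int findSlot(byte[] hash) {
        int start = (ByteBuffer.wrap(hash).getInt() & 0x7fffffff) % slots;
        for (int i = 0; i < slots; i++) {
            int slot = (start + i) % slots;
            int pos = slot * BYTES_PER_ENTRY;
            if (isEmpty(pos) || matches(pos, hash)) return slot;
        }
        return -1;
    }

    private boolean isEmpty(int pos) {
        for (int i = 0; i < BYTES_PER_ENTRY; i++) if (buffer.get(pos + i) != 0) return false;
        return true;
    }

    private boolean matches(int pos, byte[] hash) {
        for (int i = 0; i < HASH_SIZE; i++) if (buffer.get(pos + i) != hash[i]) return false;
        return true;
    }

    private static byte[] md5(String key) {
        try {
            return MessageDigest.getInstance("MD5").digest(key.getBytes(StandardCharsets.UTF_8));
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

As a sizing example, a single thread with the default 128 MiB dedupe buffer gets 134217728 / 24 ≈ 5.59 million slots, so at a 0.9 load factor one pass can deduplicate roughly 5 million distinct keys.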

Architecture & control flow

Figure: the broker-wide log subsystem.
  • LogManager (one per broker): registry of currentLogs / futureLogs, liveLogDirs / dirLocks.
  • Background schedulers (KafkaScheduler pool): kafka-log-retention, kafka-log-flusher, the recovery-point and log-start checkpoint tasks, and the self-rescheduling kafka-delete-logs, all acting on the UnifiedLog instances (the partition logs and their segments).
  • LogCleaner pool (owned by LogManager): CleanerThread 0..N−1, each looping grabFilthiest → Cleaner.clean → deleteOld, coordinated by LogCleanerManager (inProgress state machine, cleaner-offset checkpoint).
  • Failure path: LogDirFailureChannel (bounded queue) → LogDirFailureHandler (ReplicaManager thread) → take dir offline (remove from live set, stop cleaning it, mark partitions offline).
Retention runs on the generic scheduler; compaction runs on a dedicated, throttled thread pool driven by the LogCleanerManager state machine; disk failures flow through a bounded channel to a single handler thread.

Startup: validate, lock, recover, schedule

Construction calls createAndValidateLogDirs then lockLogDirs then loadDirectoryIds (LogManager.java:229-232). Validation creates missing dirs, rejects duplicates by canonical path, and routes any failing dir to the failure channel rather than aborting, unless every dir fails, in which case the broker halts (LogManager.java:376-379). Locking acquires an exclusive FileLock on each dir's .lock file (LogManager.java:449-462).

loadLogs (LogManager.java:608) recovers logs per directory, in parallel:

  1. For each live dir it spins up a fixed pool of num.recovery.threads.per.data.dir threads named log-recovery-<dirPath>-<n> (LogManager.java:590-591,625).
  2. It reads the dir's CleanShutdownFileHandler. If the clean-shutdown marker exists, it is deleted immediately and hadCleanShutdown is cached, so a crash mid-load is correctly treated as unclean next boot (KAFKA-10471, LogManager.java:629-636).
  3. It reads the recovery-point and log-start checkpoints (tolerating a corrupt checkpoint by resetting to 0 / first segment).
  4. It lists topic-partition subdirectories, skipping the remote-log-index-cache dir and the cluster-metadata topic (LogManager.java:653-660), and submits one loadLog job per partition. With a clean shutdown, segment recovery is skipped entirely; otherwise every segment above the recovery point is re-validated.
  5. loadLog (LogManager.java:511) opens each UnifiedLog and routes it: a -delete-suffixed dir → logsToBeDeleted; a -stray dir → logged; a still-present-but-orphaned replica → renamed -stray; otherwise inserted into currentLogs/futureLogs (a duplicate is a fatal IllegalStateException).

Two gauges, remainingLogsToRecover (per dir) and remainingSegmentsToRecover (per recovery thread), let operators watch progress (LogManager.java:741-753). After loading, startupWithConfigOverrides schedules the five tasks and, if cleanerConfig.enableCleaner, builds and starts the LogCleaner (LogManager.java:811-841).

Note

All five schedulers fire after an initial delay of log.initial.task.delay.ms (default 30 000 ms, ServerLogConfigs.LOG_INITIAL_TASK_DELAY_MS_DEFAULT), staggering background work away from the startup spike (LogManager.java:814-833).

Detailed mechanics, retention (cleanup.policy=delete)

The kafka-log-retention task runs cleanupLogs every log.retention.check.interval.ms (default 5 minutes, ServerLogConfigs.LOG_CLEANUP_INTERVAL_MS_DEFAULT). It first pauses the cleaner on all non-compacted partitions to avoid a race, iterates them calling UnifiedLog.deleteOldSegments() (and the same on any future log), then resumes (LogManager.java:1705-1744).

cleanupLogs():
  deletable = cleaner.pauseCleaningForNonCompactedPartitions()   // non-compacted logs, now Paused(1)
  try:
      for (tp, log) in deletable:
          total += log.deleteOldSegments()
          futureLogs[tp]?.deleteOldSegments()
  finally:
      cleaner.resumeCleaning(deletable.keySet())                 // always resume, even on failure

UnifiedLog.deleteOldSegments() (UnifiedLog.java:1967) dispatches on policy. For cleanup.policy=delete it runs three predicates in order, log-start-offset breach, then size breach, then time breach:

| Predicate | Condition (per candidate segment) | Code |
|---|---|---|
| logStartOffset breach | nextSegment.baseOffset() ≤ logStartOffset, the whole segment is below the start offset. | UnifiedLog.java:2071 |
| retention.bytes breach | Walk oldest-first while running total of bytes-to-remove (logSize − retentionSize) still covers the segment; delete those. | UnifiedLog.java:2040 |
| retention.ms breach | now − segment.largestTimestamp() > retentionMs. | UnifiedLog.java:2002 |

A segment's "age" is largestTimestamp(): the segment's max record timestamp if one exists, else its file lastModified time (LogSegment.java:861-866). Future-dated records are detected and logged but still block deletion (a segment with a future timestamp is never "old enough", UnifiedLog.java:2008-2009).

deletableSegments walks segments oldest-first and stops at the first that fails the predicate (UnifiedLog.java:1879-1914). Two crucial guards:

  • High-watermark guard. A segment is deletable only if highWatermark() ≥ upperBoundOffset: data at or above the HW is never deleted, so the log start offset can never exceed the HW (UnifiedLog.java:1892-1894).
  • Never-empty guard. The final segment, if empty, is never returned; and if deletion would remove all segments, deleteSegments first rolls a fresh active segment so the log always has at least one (UnifiedLog.java:1933-1942).
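
The oldest-first walk with its guards can be sketched as follows. This is a simplified model rather than the UnifiedLog code: Segment is a hypothetical stand-in for LogSegment, the last list element is treated as the active segment, and only the time predicate is shown.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

final class RetentionSketch {
    /** Hypothetical stand-in for LogSegment. */
    record Segment(long baseOffset, long sizeBytes, long largestTimestampMs) {}

    /**
     * Walks segments oldest-first and stops at the first one that fails the
     * predicate or the high-watermark guard. The list must be sorted by base
     * offset; the last element plays the role of the active segment.
     */
    static List<Segment> deletable(List<Segment> segments, long logEndOffset,
                                   long highWatermark, Predicate<Segment> shouldDelete) {
        List<Segment> result = new ArrayList<>();
        for (int i = 0; i < segments.size(); i++) {
            Segment s = segments.get(i);
            boolean isLast = i == segments.size() - 1;
            // Upper bound = base offset of the next segment, or the log end for the last.
            long upperBound = isLast ? logEndOffset : segments.get(i + 1).baseOffset();
            boolean emptyLast = isLast && s.sizeBytes() == 0;   // never-empty guard
            if (highWatermark >= upperBound && shouldDelete.test(s) && !emptyLast) {
                result.add(s);
            } else {
                break;                                          // stop at first failure
            }
        }
        return result;
    }

    /** The retention.ms predicate: now - largestTimestamp > retentionMs. */
    static Predicate<Segment> olderThan(long nowMs, long retentionMs) {
        return s -> nowMs - s.largestTimestampMs() > retentionMs;
    }
}
```

Stopping at the first failing segment is what keeps deletion a contiguous prefix: a younger segment sitting behind an old one shields everything after it.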

On deletion, deleteSegments advances the (local) log-start offset to the base of the next surviving segment before removing the segment from the index, then calls localLog.removeAndDeleteSegments and prunes producer snapshots (UnifiedLog.java:1944-1955).

The asynchronous .deleted rename

Physical deletion is deferred and crash-safe (LocalLog.deleteSegmentFiles, LocalLog.java:846-877):

Figure: two-phase delete.
  1. At t0, rename the segment files 00098112.{log,index,timeindex,txnindex} to *.deleted.
  2. Remove the segment from the in-memory index (immediate; it is no longer served).
  3. Await the delay, which lets in-flight reads release their file handles.
  4. The scheduler task ‘delete-file’ calls Files.deleteIfExists(…) for each .deleted file.
The rename is instant and removes the segment from the live index; the unlink is scheduled file.delete.delay.ms later so in-flight reads (which captured a file handle) can finish.

The default delay is file.delete.delay.ms = 60000 (ServerLogConfigs.LOG_DELETE_DELAY_MS_DEFAULT, wired as the default for TopicConfig.FILE_DELETE_DELAY_MS_CONFIG at LogConfig.java:224). If asyncDelete is false the unlink runs inline (LocalLog.java:872-876). Any leftover .deleted files after a crash are removed during recovery in LogLoader (LocalLog.java:982-983).
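
A minimal sketch of the rename-now, unlink-later pattern, using a plain ScheduledExecutorService in place of Kafka's scheduler. The class and method names here are hypothetical, and the in-memory index update between the two phases is left out.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

final class TwoPhaseDeleteSketch {
    /**
     * Phase 1: rename the file to *.deleted immediately.
     * Phase 2: unlink it after delayMs on the given scheduler.
     */
    static ScheduledFuture<?> deleteAsync(Path file, long delayMs,
                                          ScheduledExecutorService scheduler) throws IOException {
        Path renamed = file.resolveSibling(file.getFileName() + ".deleted");
        Files.move(file, renamed);
        return scheduler.schedule(() -> {
            try {
                Files.deleteIfExists(renamed);   // tolerate a file already removed
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }, delayMs, TimeUnit.MILLISECONDS);
    }
}
```

If the process dies between the two phases, the only residue is a file with a .deleted suffix, which is exactly what recovery looks for.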

Deleting a whole partition

When a partition is removed from the broker, asyncDelete renames its directory to <topic>-<p>.<uuid>-delete and enqueues it (LogManager.java:1589-1625). The kafka-delete-logs task (deleteLogs, LogManager.java:1415) drains logsToBeDeleted, deleting each only after file.delete.delay.ms has elapsed since it was enqueued, and self-reschedules for the remaining time of the next-due entry (nextDeleteDelayMs, LogManager.java:1397-1406,1436). Before enqueueing it aborts any cleaning of that partition and updates checkpoints (LogManager.java:1597-1617).

Invariant

A log always has at least one segment, and the active segment is never deleted by retention. The log start offset never exceeds the high watermark. These two together guarantee a consumer can always read from the current start offset up to the HW.

Detailed mechanics, compaction (cleanup.policy=compact)

Compaction's contract (LogCleaner.java:50-51): "A message with key K and offset O is obsolete if there exists a message with key K and offset O' such that O < O'." The cleaner keeps, for every key, only the record at its highest offset (plus tombstones within their retention window).

Choosing the filthiest log

Each CleanerThread.doWork() calls tryCleanFilthiestLog; if nothing was cleaned it sleeps log.cleaner.backoff.ms (default 15 s) (LogCleaner.java:512-523). Selection happens in LogCleanerManager.grabFilthiestCompactedLog under the manager lock (LogCleanerManager.java:239):

  1. Read all cleaner checkpoints (allCleanerCheckpoints), the per-partition first-dirty offset.
  2. For every compacted log not already in-progress and not uncleanable, compute cleanableOffsets(firstDirtyOffset, firstUncleanableDirtyOffset), and build a LogToClean (which computes cleanBytes, cleanableBytes, and the cleanable ratio).
  3. Keep only logs that either need compaction now (forced by max.compaction.lag.ms) with non-zero cleanable bytes, or whose cleanable ratio exceeds the per-topic min.cleanable.dirty.ratio (LogCleanerManager.java:281-283).
  4. Pick the maximum cleanable ratio; mark it LOG_CLEANING_IN_PROGRESS and return it.
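
The eligibility filter and the max-ratio pick can be sketched like this. Candidate is a hypothetical summary record rather than Kafka's LogToClean, and the locking and checkpoint reads are omitted.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

final class FilthiestLogSketch {
    /** Hypothetical summary of one compacted log; field names are illustrative. */
    record Candidate(String topicPartition, long cleanBytes, long cleanableBytes,
                     double minCleanableDirtyRatio, boolean needCompactionNow) {

        /** cleanableBytes / (cleanBytes + cleanableBytes), or 0 for an empty log. */
        double cleanableRatio() {
            long total = cleanBytes + cleanableBytes;
            return total == 0 ? 0.0 : (double) cleanableBytes / total;
        }

        /** Forced by max lag (with something to clean), or dirty enough. */
        boolean eligible() {
            return (needCompactionNow && cleanableBytes > 0)
                || cleanableRatio() > minCleanableDirtyRatio;
        }
    }

    /** Returns the eligible candidate with the highest cleanable ratio, if any. */
    static Optional<Candidate> pickFilthiest(List<Candidate> candidates) {
        return candidates.stream()
                .filter(Candidate::eligible)
                .max(Comparator.comparingDouble(Candidate::cleanableRatio));
    }
}
```

For example, a log with 100 clean and 300 cleanable bytes has ratio 0.75 and beats a forced log at ratio 0.1, while a log at 0.25 with the default 0.5 threshold is not considered at all.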

cleanableOffsets computes the first uncleanable offset as the minimum of three bounds (LogCleanerManager.java:730-742):

  • the last stable offset (never clean past the LSO, see transactions below);
  • the active segment's base offset (the active segment is always uncleanable);
  • if min.compaction.lag.ms > 0, the base offset of the first segment whose largestTimestamp > now − minCompactionLagMs (findFirstUncleanableSegment, LogCleanerManager.java:767-783).

It also self-heals a bad checkpoint: if the checkpointed dirty offset is below the log start or above the log end, it resets firstDirtyOffset to the log start and flags forceUpdateCheckpoint (LogCleanerManager.java:702-722). max.compaction.lag.ms forces cleaning even of a barely-dirty log: maxCompactionDelay measures how long the earliest dirty segment has waited beyond the max-lag and sets needCompactionNow when positive (LogCleanerManager.java:669-680).
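
The bound computation and the checkpoint self-heal reduce to a few comparisons. The sketch below is a simplification under stated assumptions: all inputs are passed in as plain longs, the parameter firstTooYoungSegmentBaseOffset is a hypothetical name for the min.compaction.lag.ms bound (pass Long.MAX_VALUE when the lag is 0), and the real method handles additional cases.

```java
final class CleanableOffsetsSketch {
    /** Result: the range to clean, plus whether the checkpoint must be rewritten. */
    record Range(long firstDirtyOffset, long firstUncleanableOffset,
                 boolean forceUpdateCheckpoint) {}

    static Range cleanableOffsets(long checkpointedDirtyOffset,
                                  long logStartOffset,
                                  long logEndOffset,
                                  long lastStableOffset,
                                  long activeSegmentBaseOffset,
                                  long firstTooYoungSegmentBaseOffset) {
        // Self-heal: a checkpoint outside [logStart, logEnd] is reset to the log start.
        boolean reset = checkpointedDirtyOffset < logStartOffset
                     || checkpointedDirtyOffset > logEndOffset;
        long firstDirty = reset ? logStartOffset : checkpointedDirtyOffset;

        // First uncleanable offset = minimum of the three bounds.
        long firstUncleanable = Math.min(lastStableOffset,
                Math.min(activeSegmentBaseOffset, firstTooYoungSegmentBaseOffset));

        return new Range(firstDirty, firstUncleanable, reset);
    }
}
```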

Figure: the contiguous offset regions of one compacted log, left to right in increasing offset.
  • clean (already compacted): offset 0 … firstDirtyOffset
  • cleanable (dirty): … firstUncleanableDirtyOffset
  • uncleanable: LSO / active segment / min.compaction.lag
The cleaner point is exactly the checkpointed first-dirty offset; cleaning recopies [firstDirtyOffset, firstUncleanableDirtyOffset). cleanableRatio = cleanableBytes / (cleanBytes + cleanableBytes); a log is eligible if cleanableRatio > min.cleanable.dirty.ratio OR needCompactionNow (max.compaction.lag.ms exceeded).

Pass 1, build the offset map

Cleaner.doClean (Cleaner.java:159) first calls buildOffsetMap over the dirty section [firstDirtyOffset, firstUncleanableOffset) (Cleaner.java:710). For each dirty segment it iterates batches, and for each non-control, non-aborted record with a key at or above the start offset, it puts key → offset into the SkimpyOffsetMap (Cleaner.java:807-817). Because later offsets overwrite earlier ones, the map ends holding each key's highest offset in the dirty range. It stops adding once the map reaches slots × loadFactor entries and returns "full", so a single clean may cover only part of the dirty range; endOffset = map.latestOffset() + 1 becomes the new cleaner point (Cleaner.java:178,810-814).

Pass 2, recopy segments keeping only survivors

Segments from offset 0 up to endOffset are grouped by groupSegmentsBySize so a group's combined log and index bytes stay under segment.bytes / segment.index.bytes and the offset span stays under Integer.MAX_VALUE; this merges tiny segments and "prevents segment sizes from shrinking too much" with repeated cleanings (Cleaner.java:641-676). Each group is rewritten by cleanSegments into one or more .cleaned segments (Cleaner.java:228), and finally swapped in via UnifiedLog.replaceSegments (Cleaner.java:328).

The retain/discard decision for each record is shouldRetainRecord (Cleaner.java:567-602):

  • If record.offset() > map.latestOffset() the record is past the mapped range → always retain (it is beyond what this pass deduplicated).
  • Else look up the key. Retain only if this is the latest offset for the key (record.offset() ≥ foundOffset) and the record either has a value or is a tombstone still within its delete-retention window.
  • A record with no key is invalid in a compacted log; it is dropped and counted in stats.invalidMessagesRead (Cleaner.java:598-601).
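
The two passes and the retain rule can be modelled on an in-memory list of records. This is a teaching sketch, not the Cleaner code: Rec is a hypothetical record type, a HashMap of full keys stands in for the SkimpyOffsetMap, batches and transactions are ignored, and a tombstone with no horizon stamped yet (deleteHorizonMs < 0) is assumed to be retained.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class CompactionSketch {
    /** Hypothetical record: null value = tombstone; deleteHorizonMs < 0 = not stamped. */
    record Rec(long offset, String key, String value, long deleteHorizonMs) {}

    /** Pass 1: key -> highest offset within [firstDirty, firstUncleanable). */
    static Map<String, Long> buildOffsetMap(List<Rec> log, long firstDirty, long firstUncleanable) {
        Map<String, Long> map = new HashMap<>();
        for (Rec r : log) {   // log is assumed to be in offset order
            if (r.key() != null && r.offset() >= firstDirty && r.offset() < firstUncleanable) {
                map.put(r.key(), r.offset());   // later offsets overwrite earlier ones
            }
        }
        return map;
    }

    /** Pass 2: recopy the log, keeping only the survivors. */
    static List<Rec> recopy(List<Rec> log, Map<String, Long> map,
                            long latestMappedOffset, long nowMs) {
        List<Rec> out = new ArrayList<>();
        for (Rec r : log) {
            if (shouldRetain(r, map, latestMappedOffset, nowMs)) out.add(r);
        }
        return out;
    }

    static boolean shouldRetain(Rec r, Map<String, Long> map,
                                long latestMappedOffset, long nowMs) {
        if (r.offset() > latestMappedOffset) return true;   // beyond the deduplicated range
        if (r.key() == null) return false;                  // invalid in a compacted log
        Long found = map.get(r.key());
        boolean latestForKey = found == null || r.offset() >= found;
        boolean liveTombstone = r.value() == null
                && (r.deleteHorizonMs() < 0 || nowMs < r.deleteHorizonMs());
        return latestForKey && (r.value() != null || liveTombstone);
    }
}
```

Running it on six records with keys a, b, a, b (tombstone), c, a and a mapped range ending at offset 4 keeps offsets 2, 4 and 5 once the tombstone's horizon has passed, and additionally offset 3 while the tombstone is still within its window.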

The atomic, crash-safe swap

UnifiedLog.createNewCleanedSegment writes to *.cleaned files. replaceSegments (LocalLog.java:1004-1062) then performs a two-phase swap designed to be recoverable at any crash point (the ordering described at LocalLog.java:974-990):

Figure: the ordered swap steps.
  1. Write cleaned output to XXXX.cleaned (.log/.index/.timeindex/.txnindex).
  2. Rename new ⇒ .swap, in DESCENDING base-offset order, for crash recovery.
  3. Add the swap segment(s) to the index.
  4. Retire each old segment: remove from index, rename ⇒ .deleted, schedule async unlink.
  5. Rename .swap ⇒ ‘’ (drop the suffix; the segment is now live).
  6. flushDir(dir): fsync the directory entry.
Recovery rule: on restart, any .swap file whose offset exceeds the minimum-offset .cleaned file is deleted, and the swap is replayed; any .deleted files are removed (LocalLog.java:974-990). The descending-rename order makes this deterministic.

Tombstones and delete.retention.ms

A tombstone (null value) must survive long enough that a consumer reading from offset 0 still observes the delete before it disappears, otherwise it could see the pre-tombstone value and never learn the key was removed. The cleaner therefore retains a tombstone for delete.retention.ms (default 86 400 000 ms = 24 h, LogConfig.DEFAULT_DELETE_RETENTION_MS) measured from when its batch enters the clean section. For the modern v2 record format this is tracked by stamping a delete horizon into the batch header on the first clean that encounters it; the record is dropped only once currentTime ≥ batch.deleteHorizonMs() (Cleaner.java:589-596). For legacy (< v2) formats a coarser horizon based on the last clean segment's lastModified − deleteRetentionMs is used (Cleaner.java:166-170,260).

Interaction with transactions / markers

The cleaner's transaction handling is summarized at LogCleaner.java:81-96 and enforced in cleanInto (Cleaner.java:366):

  • Never clean past the LSO. Because firstUncleanableOffset is bounded by lastStableOffset(), every record the cleaner sees is already committed or aborted, so the transaction index can be consulted up front (LogCleanerManager.java:730-733).
  • Aborted-transaction records are dropped unconditionally, ignoring keys (CleanedTransactionMetadata drives discardBatchRecords, Cleaner.java:386-391).
  • The last batch of each active producer is retained even if empty (RETAIN_EMPTY) to preserve sequence-number continuity; likewise the batch whose next offset equals the high watermark is retained so the last-offset information survives (Cleaner.java:394-420).
  • Transaction markers (control batches) are retained until all records from their transaction are gone and a delete horizon has passed: the same tombstone-retention logic, applied to markers (Cleaner.java:384-389).

Concurrency & threading

| Thread(s) | Count / name | Work |
|---|---|---|
| Recovery pool | num.recovery.threads.per.data.dir per dir; log-recovery-<dir>-<n> (default 2) | Parallel log load/recovery at startup; non-daemon (LogManager.java:585,625). |
| Generic scheduler | shared KafkaScheduler pool | Runs kafka-log-retention, kafka-log-flusher, the two checkpoint tasks, kafka-delete-logs, and per-segment delete-file tasks. |
| Cleaner pool | log.cleaner.threads; kafka-log-cleaner-thread-<id> (default 1) | ShutdownableThreads; each owns a Cleaner with its own offset map and I/O buffers (LogCleaner.java:467-492). |
| Dir-failure handler | 1; LogDirFailureHandler | Blocks on takeNextOfflineLogDir(), calls handleLogDirFailure (ReplicaManager.scala:227-232). |
| Close pool | per-dir at shutdown; log-closing-<dir> | Flush + close every log, then write checkpoints and the clean-shutdown marker (LogManager.java:874-916). |

Locks. LogManager.logCreationOrDeletionLock (a plain monitor) serializes create/delete/rename and disk-failure handling. LogCleanerManager.lock (a ReentrantLock with a pausedCleaningCond condition) guards all access to inProgress, uncleanablePartitions, and the checkpoint map (LogCleanerManager.java:93-100). Retention runs under UnifiedLog's own lock while deleting segments (UnifiedLog.java:1840). I/O throttling across all cleaner threads is a single shared Throttler sized to log.cleaner.io.max.bytes.per.second (LogCleaner.java:168).

The cleaning state machine

LogCleaningState is a sealed interface with three cases (LogCleaningState.java:24-72). Transitions are all guarded by the manager lock:

Figure: the per-partition cleaning lifecycle (all transitions guarded by the manager lock).
  • None → InProgress: grabFilthiestCompactedLog
  • InProgress → None: doneCleaning (clean path)
  • InProgress → Aborted: abortAndPauseCleaning
  • Aborted → Paused(n): checkDone() throws LogCleaningAbortedException (unwind, delete .cleaned), then doneCleaning / doneDeleting
  • Paused(n) → None: resumeCleaning (n reaches 0)
From None (not being cleaned), grabFilthiestCompactedLog moves a partition to InProgress; a normal doneCleaning returns it to None. If interrupted, abortAndPauseCleaning flags it Aborted; the cleaner thread notices that flag at the next checkDone (which throws LogCleaningAbortedException, unwinding and deleting .cleaned output), then transitions to Paused(n). abortAndPauseCleaning blocks on pausedCleaningCond until the partition is Paused (LogCleanerManager.java:367-392,452-459). Pauses are reentrant: Paused(n) requires n matching resumeCleaning calls before returning to None.

This is how LogManager.truncateTo, truncateFullyAndStartAt, intra-broker moves, and partition deletion safely interleave with compaction: each calls abortAndPauseCleaning(tp), does its work, then resumeCleaning(tp) (LogManager.java:944-1003,1453-1455). If a clean threw an unexpected exception, tryCleanFilthiestLog marks the partition uncleanable so it is skipped until maintainUncleanablePartitions or a restart clears it (LogCleaner.java:542-551; LogCleanerManager.java:640-652).
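
The transitions themselves can be modelled with a small sealed hierarchy. This sketch assumes Java 17, uses a synchronized map instead of the ReentrantLock and condition variable, and does not block in abortAndPause the way the real method does; all names other than the three state names are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

final class CleaningStateSketch {
    sealed interface State {}
    record InProgress() implements State {}
    record Aborted() implements State {}
    record Paused(int count) implements State {}

    // Absence from the map means "None": the partition is not being cleaned.
    private final Map<String, State> inProgress = new HashMap<>();

    /** None -> InProgress; returns false if the partition is already tracked. */
    synchronized boolean grab(String tp) {
        if (inProgress.containsKey(tp)) return false;
        inProgress.put(tp, new InProgress());
        return true;
    }

    /** None -> Paused(1); Paused(n) -> Paused(n+1); InProgress -> Aborted. */
    synchronized void abortAndPause(String tp) {
        State s = inProgress.get(tp);
        if (s == null) inProgress.put(tp, new Paused(1));
        else if (s instanceof Paused p) inProgress.put(tp, new Paused(p.count() + 1));
        else if (s instanceof InProgress) inProgress.put(tp, new Aborted());
        // The real code then waits until the cleaner thread reports Paused.
    }

    /** InProgress -> None (clean path); Aborted -> Paused(1) (abort path). */
    synchronized void doneCleaning(String tp) {
        State s = inProgress.get(tp);
        if (s instanceof InProgress) inProgress.remove(tp);
        else if (s instanceof Aborted) inProgress.put(tp, new Paused(1));
    }

    /** Paused(n) -> Paused(n-1), or back to None when n reaches 0. */
    synchronized void resume(String tp) {
        if (inProgress.get(tp) instanceof Paused p) {
            if (p.count() == 1) inProgress.remove(tp);
            else inProgress.put(tp, new Paused(p.count() - 1));
        }
    }

    synchronized State state(String tp) { return inProgress.get(tp); }
}
```

The counter in Paused is what lets two independent callers, say retention and a truncation, pause the same partition without one resume undoing the other's pause.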

Configuration reference

Retention / segment-lifecycle (topic-level keys; broker defaults via ServerLogConfigs):

| Key | Default | Effect |
|---|---|---|
| cleanup.policy | delete | delete, compact, or both; selects the reclamation path (TopicConfig.java:169-171). |
| retention.ms | 604800000 (7 d) | Delete segments older than this by largest timestamp (DEFAULT_RETENTION_MS, LogConfig.java:134). |
| retention.bytes | -1 (unbounded) | Delete oldest segments once the log exceeds this size (LOG_RETENTION_BYTES_DEFAULT = -1). |
| file.delete.delay.ms | 60000 | Delay between .deleted rename and physical unlink (LOG_DELETE_DELAY_MS_DEFAULT). |
| segment.bytes | 1073741824 (1 GiB) | Max bytes per group when grouping segments for compaction; also the roll size (DEFAULT_SEGMENT_BYTES). |
| flush.ms | Long.MAX_VALUE | Max time before kafka-log-flusher fsyncs a dirty log (LOG_FLUSH_SCHEDULER_INTERVAL_MS_DEFAULT; LogManager.java:1812). |

Compaction (topic-level; cleaner defaults in CleanerConfig / LogConfig):

| Key | Default | Effect |
|---|---|---|
| min.cleanable.dirty.ratio | 0.5 | Cleanable-ratio threshold to make a log eligible (DEFAULT_MIN_CLEANABLE_DIRTY_RATIO, LogConfig.java:138). |
| min.compaction.lag.ms | 0 | Minimum age before a record may be compacted; segments younger than this are uncleanable (DEFAULT_MIN_COMPACTION_LAG_MS). |
| max.compaction.lag.ms | Long.MAX_VALUE | Maximum age a dirty record may remain uncompacted; forces cleaning when exceeded (DEFAULT_MAX_COMPACTION_LAG_MS; range atLeast(1)). |
| delete.retention.ms | 86400000 (24 h) | How long tombstones/markers are retained after entering the clean section (DEFAULT_DELETE_RETENTION_MS). |

Broker-level cleaner & manager knobs (CleanerConfig.java, reconfigurable set at LogCleaner.java:101-109):

| Key | Default | Effect |
| --- | --- | --- |
| log.cleaner.threads | 1 | Number of CleanerThreads (LOG_CLEANER_THREADS). Reconfigurable, but each change must stay between half and double the current count (LogCleaner.java:266-275). |
| log.cleaner.dedupe.buffer.size | 134217728 (128 MiB) | Total dedupe memory across threads; per-thread buffer = size/threads, capped at 2 GiB (LOG_CLEANER_DEDUPE_BUFFER_SIZE). |
| log.cleaner.io.buffer.load.factor | 0.9 | Max fullness of the dedupe buffer; higher = more cleaning per pass but more collisions (LOG_CLEANER_DEDUPE_BUFFER_LOAD_FACTOR). |
| log.cleaner.io.buffer.size | 524288 (512 KiB) | Total read+write I/O buffer across threads (LOG_CLEANER_IO_BUFFER_SIZE). |
| log.cleaner.io.max.bytes.per.second | Double.MAX_VALUE | Throttle on total cleaner read+write I/O (LOG_CLEANER_IO_MAX_BYTES_PER_SECOND). |
| log.cleaner.backoff.ms | 15000 | Sleep when no log is eligible (LOG_CLEANER_BACKOFF_MS). |
| log.cleaner.enable | true | Deprecated since 4.1 and slated for removal in 5.0; the cleaner will always run (CleanerConfig.java:53-54,72-74). |
| log.retention.check.interval.ms | 300000 (5 min) | Period of the retention task (LOG_CLEANUP_INTERVAL_MS_DEFAULT). |
| num.recovery.threads.per.data.dir | 2 | Parallelism of startup recovery and shutdown close per dir (NUM_RECOVERY_THREADS_PER_DATA_DIR_DEFAULT). |
| log.flush.offset.checkpoint.interval.ms | 60000 | Period of the recovery-point checkpoint task (LOG_FLUSH_OFFSET_CHECKPOINT_INTERVAL_MS_DEFAULT). |
| log.initial.task.delay.ms | 30000 | Initial delay before background tasks start (LOG_INITIAL_TASK_DELAY_MS_DEFAULT). |
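
The half-to-double limit on log.cleaner.threads means a large resize has to be applied in several steps. The following is a minimal Python sketch of that rule, not the broker's Java code: the names validate_cleaner_threads and steps_to_reach are hypothetical, and using integer division for the "half" bound is an assumption.

```python
def validate_cleaner_threads(current: int, requested: int) -> None:
    """Reject a thread-count change outside [current // 2, current * 2].

    Hypothetical helper; the integer-division lower bound is an assumption.
    """
    if requested < 1:
        raise ValueError("log.cleaner.threads must be at least 1")
    if requested < current // 2:
        raise ValueError(f"cannot shrink below half of {current} in one step")
    if requested > current * 2:
        raise ValueError(f"cannot grow beyond double of {current} in one step")


def steps_to_reach(current: int, target: int) -> list[int]:
    """Return the intermediate values needed to move from current to target."""
    if target < 1:
        raise ValueError("target must be at least 1")
    path = []
    while current != target:
        if target > current:
            nxt = min(target, current * 2)       # at most double per step
        else:
            nxt = max(target, current // 2, 1)   # at most halve per step
        validate_cleaner_threads(current, nxt)
        path.append(nxt)
        current = nxt
    return path
```

Under these assumptions, going from 1 thread to 8 takes three reconfigurations (2, 4, 8).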

Failure modes, edge cases & recovery

Disk failure

Any class doing log I/O routes an IOException to LogDirFailureChannel.maybeAddOfflineLogDir, which records the dir once and enqueues it onto a bounded queue (LogDirFailureChannel.java:60-65). The single LogDirFailureHandler thread drains the queue and calls ReplicaManager.handleLogDirFailure → LogManager.handleLogDirFailure (LogManager.java:411), which, under logCreationOrDeletionLock:

  1. removes the dir from liveLogDirs and directoryIds, and halts the broker if it was the last live dir (LogManager.java:416-419);
  2. drops the dir's recovery-point and log-start checkpoint maps;
  3. tells the cleaner to stop cleaning that dir (drops its cleaner-offset checkpoint, LogCleanerManager.handleLogDirFailure);
  4. removes and closes all current and future logs under that dir, marking those partitions offline;
  5. destroys the dir's .lock.

An offline dir stays offline until broker restart (LogDirFailureChannel.java:36).

Caution

The failure channel's queue is sized to the number of log dirs (new ArrayBlockingQueue<>(logDirNum), LogDirFailureChannel.java:45). Because each dir is enqueued at most once (the putIfAbsent guard), the queue can never overflow.
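
The once-only enqueue described above can be illustrated with a small Python sketch. It is a simplified model under stated assumptions, not the Kafka class: LogDirFailureChannelSketch and its method names are hypothetical, a dict plus a lock stands in for the concurrent map's putIfAbsent, and queue.Queue stands in for the ArrayBlockingQueue.

```python
import queue
import threading


class LogDirFailureChannelSketch:
    """Hypothetical model of a failure channel with one queue slot per log dir."""

    def __init__(self, log_dir_num: int):
        self._offline = {}                       # dir -> first failure message
        self._lock = threading.Lock()
        self._queue = queue.Queue(maxsize=log_dir_num)

    def maybe_add_offline_log_dir(self, log_dir: str, message: str) -> bool:
        """Record the dir the first time it fails; later reports are ignored."""
        with self._lock:
            first = log_dir not in self._offline
            if first:
                self._offline[log_dir] = message
        if first:
            # At most one entry per dir and one slot per dir, so the queue
            # has room as long as only configured dirs are reported.
            self._queue.put_nowait(log_dir)
        return first

    def take_next_offline_log_dir(self) -> str:
        """Block until a failed dir is available (what the handler thread does)."""
        return self._queue.get()

    def pending(self) -> int:
        return self._queue.qsize()
```

Repeated I/O errors on the same directory therefore cost one queue slot, not one per exception.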

Truncation during compaction

If a log is truncated below the active segment's base offset while it is being cleaned, LogManager.truncateTo first aborts and pauses cleaning, then truncates, then calls maybeTruncateCleanerCheckpointToActiveSegmentBaseOffset to rewind the cleaner checkpoint if it now points past the surviving data, and only then resumes cleaning (LogManager.java:944-967,1143-1147; LogCleanerManager.maybeTruncateCheckpoint:552-567). Mid-clean, checkDone raises LogCleaningAbortedException and cleanSegments deletes its partial .cleaned output (Cleaner.java:330-341).
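
The checkpoint rewind amounts to "lower the stored offset if it is now too high, otherwise leave it alone". A minimal Python sketch, assuming the checkpoint is a plain mapping from partition to offset (the function name and the dict representation are illustrative, not the Kafka API):

```python
def maybe_truncate_checkpoint(checkpoints: dict, partition: str, offset: int) -> bool:
    """Rewind the cleaner checkpoint for `partition` to `offset` if it points past it.

    Returns True when the checkpoint was lowered. Partitions without a
    checkpoint are left untouched.
    """
    current = checkpoints.get(partition)
    if current is not None and current > offset:
        checkpoints[partition] = offset
        return True
    return False
```

A checkpoint at or below the new offset is already valid, so the function never moves it forward.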

Buffer too small for a batch

If a chunk read yields zero complete batches, the cleaner doubles its I/O buffers, up to max(maxLogMessageSize, maxIoBufferSize). A single oversized batch is therefore still made to fit, for example when compression pushes a set just over max.message.bytes or when that config was later reduced; cleaning fails only if the message genuinely exceeds the allowed maximum (Cleaner.java:508-617). A first write to an empty cleaned segment is always allowed, to avoid an infinite overflow loop (Cleaner.java:469-472).
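
The doubling rule can be sketched in a few lines of Python. This is an illustration under assumptions, not the cleaner's code: grow_buffer is a hypothetical name, a single capacity stands in for the separate read and write buffers, and the sizes in the usage note are example values.

```python
def grow_buffer(capacity: int, max_log_message_size: int, max_io_buffer_size: int) -> int:
    """Return the next buffer capacity: double it, capped at the larger limit.

    Raises once the buffer is already at the cap, i.e. the batch cannot fit
    even in the largest buffer the cleaner is allowed to allocate.
    """
    max_size = max(max_log_message_size, max_io_buffer_size)
    if capacity >= max_size:
        raise RuntimeError("batch exceeds the maximum allowed buffer size")
    return min(capacity * 2, max_size)
```

Starting from a 524288-byte buffer with a 1048588-byte message limit, the capacity grows to 1048576, then to 1048588, and a third attempt fails.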

Unclean shutdown

If the clean-shutdown marker is absent (or was deleted mid-load), every segment above the recovery point is re-validated on startup. The marker is written at shutdown only for dirs that either had it before or whose logs were all recovered this run, and it carries the broker epoch for KRaft fencing (LogManager.java:905-914).

Invariants & guarantees

  • No concurrent retention + compaction on a partition. Enforced by the inProgress state machine; retention pauses non-compacted partitions, and compaction marks its target InProgress before touching it.
  • Compaction never crosses the LSO. firstUncleanableOffset ≤ lastStableOffset, so the cleaner only ever sees decided (committed/aborted) records.
  • Latest value per key survives. The offset map keeps each key's highest offset; shouldRetainRecord retains only that record (or a within-window tombstone).
  • Crash-atomic swap. The .cleaned → .swap → live sequence plus descending-offset renames make replaceSegments recoverable at any crash point.
  • Deleted data is not re-exposed. The log-start-offset checkpoint is written before/at deletion so a restart does not reveal segments below the start offset.
  • Producer sequence continuity. The last batch of each active producer (and the marker at the HW) is retained even when emptied by compaction.
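
The "latest value per key survives" invariant can be illustrated with a toy compactor. This is a simplified Python sketch: a plain dict replaces the hashed SkimpyOffsetMap, the Record type and function names are hypothetical, and transactions, batches, and the delete-horizon timestamp are reduced to a single tombstone_expired flag.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class Record:
    offset: int
    key: str
    value: Optional[str]  # None models a tombstone


def build_offset_map(records):
    """Map each key to the highest offset at which it appears."""
    latest = {}
    for r in records:
        latest[r.key] = max(r.offset, latest.get(r.key, -1))
    return latest


def should_retain(record, latest, tombstone_expired):
    """Keep only the newest record per key; drop tombstones past their window."""
    if record.offset < latest.get(record.key, -1):
        return False  # superseded by a later record with the same key
    if record.value is None and tombstone_expired:
        return False  # tombstone has outlived its retention window
    return True


def compact(records, tombstone_expired=False):
    latest = build_offset_map(records)
    return [r for r in records if should_retain(r, latest, tombstone_expired)]
```

For a log holding a=1, b=1, a=2 and a tombstone for b, compaction keeps a=2 and the tombstone; once the tombstone's window has passed, only a=2 remains.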

Interactions with other subsystems

  • 03 · The Log Storage Engine: UnifiedLog/LocalLog/LogSegment provide the segments this chapter creates, deletes, recopies, and swaps; replaceSegments and deleteSegmentFiles live there.
  • 05 · Tiered Storage: when remote copy is enabled, retention switches to local.retention.ms/local.retention.bytes and only deletes local segments already uploaded (UnifiedLog.java:1857-1869,1979-1982).
  • 08 · Replication, ISR & High Watermark: the high-watermark guard in deletableSegments and the LSO bound on compaction tie reclamation to replication progress.
  • 14 · Transactions & Exactly-Once: the cleaner consults the transaction index, drops aborted records, and retains markers/last-producer batches.
  • 12 · Metadata Propagation & Broker Lifecycle: stray-replica detection (isStrayReplica) and intra-broker moves (future logs) are driven by the metadata image and the JBOD directory assignment.
  • 13 · Group Coordination: the internal __consumer_offsets topic is compacted; the cleaner must run for offset/group state not to grow unbounded (the deprecation note warns about exactly this, CleanerConfig.java:73-74).
  • 07 · Request Processing: DeleteRecords raises the log start offset, which then drives deleteLogStartOffsetBreachedSegments.

Design rationale & evolution

Design rationale

Log compaction itself predates the KIP process. The min.compaction.lag.ms bound came with KIP-58 and max.compaction.lag.ms with KIP-354; together they let operators trade compaction promptness against rewrite amplification. The cleaner's transaction-aware behaviour (never cleaning past the LSO, dropping aborted records, retaining markers and the last producer batch) follows from the exactly-once design of KIP-98, and the delete-horizon batch stamping was refined under KIP-534 (record v2 baseTimestamp/delete-horizon). The MD5-based SkimpyOffsetMap deliberately stores hashes rather than keys, which bounds memory to ~24 bytes per dirty key (a 16-byte MD5 digest plus an 8-byte offset) regardless of key size.

The cleaner is being simplified for Kafka 5.0: log.cleaner.enable is deprecated and the cleaner will always run (LogManager.java:838-841, CleanerConfig.java:53). Reconfiguration of cleaner threads (LogCleaner implements BrokerReconfigurable) shuts down and recreates the pool, which also resurrects any threads that had died (LogCleaner.java:289-301).

Gotchas & operational notes

Gotcha

A compacted topic whose dirty ratio stays below min.cleanable.dirty.ratio (default 0.5) may never compact unless max.compaction.lag.ms is set: half the log must be dirty before the ratio rule fires. For __consumer_offsets and similar topics, set max.compaction.lag.ms to bound staleness.
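
The interaction between the ratio rule and the lag bound can be sketched as a single predicate. This is a simplified Python model under assumptions: is_cleanable and its parameters are hypothetical, the dirty ratio is taken as dirty bytes over total bytes, and the strict "greater than" comparison is an assumption about the boundary case.

```python
def is_cleanable(clean_bytes, dirty_bytes, oldest_dirty_age_ms,
                 min_dirty_ratio=0.5, max_compaction_lag_ms=None):
    """Return True if a compacted log would be picked for cleaning.

    Eligible when the dirty ratio exceeds the threshold, or when the oldest
    dirty data has waited longer than max_compaction_lag_ms (if set).
    """
    total = clean_bytes + dirty_bytes
    ratio = dirty_bytes / total if total else 0.0
    if ratio > min_dirty_ratio:          # strictness is an assumption
        return True
    lag_exceeded = (max_compaction_lag_ms is not None
                    and oldest_dirty_age_ms > max_compaction_lag_ms)
    return lag_exceeded and dirty_bytes > 0
```

A log that is 10% dirty is skipped indefinitely under the ratio rule alone, but becomes eligible as soon as its oldest dirty data exceeds a configured lag.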

Gotcha

Tombstones are not gone the instant a key is superseded; they linger for delete.retention.ms (24 h by default) after entering the clean section so that slow full-scan consumers still observe the delete. Shrinking this value risks consumers missing deletes; it is also a soft bound on how long a from-zero scan may take (CleanerConfig.java:75-77).

Gotcha

A wall-clock-future record timestamp makes a segment ineligible for retention.ms deletion (it is never "old enough"); the broker logs a warning but cannot reclaim it until time catches up (UnifiedLog.java:2008-2009). Misconfigured producer clocks can therefore stall time-based retention.
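
Why a future timestamp blocks deletion follows directly from the age comparison. A minimal Python sketch, assuming the check compares the segment's largest record timestamp against the current time (the function name is hypothetical, and the broker's additional guards are omitted):

```python
def is_time_expired(largest_timestamp_ms: int, now_ms: int, retention_ms: int) -> bool:
    """Return True if a segment is old enough for time-based deletion.

    A negative retention_ms is treated as "no time limit".
    """
    if retention_ms < 0:
        return False
    return now_ms - largest_timestamp_ms > retention_ms
```

With a record stamped one year in the future, the computed age is negative, so the segment cannot expire until the wall clock passes that timestamp plus the retention period.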

Gotcha

Watch uncleanable-partitions-count / uncleanable-bytes (per dir) and DeadThreadCount: a partition that throws during cleaning is marked uncleanable and silently skipped until restart, so its compacted topic grows unbounded meanwhile (LogCleaner.java:542-551, LogCleanerManager.java:70-71).

Note

Per-partition compaction throughput is gated by one offset map per thread: a partition with more distinct dirty keys than fit in slots × loadFactor is cleaned in multiple passes, advancing the cleaner point a chunk at a time. Increasing log.cleaner.dedupe.buffer.size raises the per-pass key budget; adding threads alone does not, because the buffer is divided across threads (Cleaner.java:781,810-813,178).
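
The per-pass key budget can be estimated from the configuration values. The following Python sketch is a back-of-the-envelope calculation, not the cleaner's code: it assumes 24 bytes per slot (a 16-byte MD5 digest plus an 8-byte offset), a per-thread buffer of size/threads capped just under 2 GiB, and truncation of the load-factor product to an integer. The function names are hypothetical.

```python
import math

BYTES_PER_SLOT = 16 + 8        # MD5 digest + offset (assumed slot layout)
MAX_PER_THREAD = 2**31 - 1     # assumed cap on one thread's buffer


def keys_per_pass(dedupe_buffer_bytes: int, threads: int, load_factor: float = 0.9) -> int:
    """Estimate how many distinct dirty keys one cleaner pass can hold."""
    per_thread = min(dedupe_buffer_bytes // threads, MAX_PER_THREAD)
    slots = per_thread // BYTES_PER_SLOT
    return int(slots * load_factor)


def passes_needed(distinct_dirty_keys: int, budget: int) -> int:
    """Number of passes required to cover all dirty keys at a given budget."""
    return math.ceil(distinct_dirty_keys / budget)
```

Under these assumptions the defaults (128 MiB, one thread, load factor 0.9) give a budget of roughly five million keys per pass, so a partition with 12 million distinct dirty keys would need three passes.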
