04 · Log Management, Retention & Compaction
Source: Apache Kafka 4.4.0-SNAPSHOT (git 04bfe7d, 2026-06-15), KRaft mode. Derived from source code, not copied from official documentation.
Chapter 03 described a single UnifiedLog, an ordered sequence of immutable segments. This chapter zooms out to the broker-wide lifecycle that surrounds those logs: LogManager, the registry of every UnifiedLog across all log.dirs (JBOD), which creates and deletes logs, recovers them in parallel on startup, and drives the background schedulers for flushing, checkpointing, and retention. It then covers the two reclamation strategies: size/time retention (cleanup.policy=delete, which drops whole old segments) and log compaction (cleanup.policy=compact, which keeps only the latest record per key). Finally it describes the asynchronous, crash-safe machinery (.deleted/.cleaned/.swap renames, the LogCleaner thread pool, the SkimpyOffsetMap, and the cleaner-offset checkpoint) that keeps both safe in the presence of disk failure, truncation, and exactly-once transactions.
Role & responsibilities
LogManager is documented in its own Javadoc as "the entry point to the kafka log management subsystem ... responsible for log creation, retrieval, and cleaning" (storage/src/main/java/org/apache/kafka/storage/internals/log/LogManager.java:178). All read/write I/O is delegated down to the individual UnifiedLog instances; LogManager owns the cross-cutting concerns:
- Directory ownership & JBOD placement. It validates and locks each directory in log.dirs, tracks which are live vs. offline vs. cordoned, and places each new partition in the directory holding the fewest partitions.
- Startup recovery. A per-directory thread pool loads every partition log in parallel, recovering unflushed segments when the prior shutdown was unclean.
- Background schedulers. Five recurring tasks: retention cleanup, flush of dirty logs, recovery-point checkpointing, log-start-offset checkpointing, and async deletion of whole partition directories.
- Disk-failure handling. It reacts to a LogDirFailureChannel event by taking a whole directory offline without crashing the broker (unless it was the last one).
- Compaction orchestration. It owns the LogCleaner pool and mediates the race between retention and compaction (pausing the cleaner on a partition while retention or truncation runs).
Retention and compaction are two mutually coordinated reclamation paths over the same segment files. Retention (run by the kafka-log-retention scheduler task) deletes whole segments, oldest first; compaction (run by the kafka-log-cleaner-thread-N pool) rewrites segments to drop superseded keys. A per-partition state machine in LogCleanerManager guarantees they never touch the same partition concurrently.
Where it lives in the code
| Class / interface | File | Responsibility |
|---|---|---|
LogManager | storage/.../internals/log/LogManager.java | Registry of all logs; dir validation/locking; startup recovery; schedulers; create/delete; disk-failure handling. |
LogCleaner | storage/.../internals/log/LogCleaner.java | The pool of CleanerThreads; reconfiguration; throttling; metrics. Implements BrokerReconfigurable. |
LogCleaner.CleanerThread | storage/.../internals/log/LogCleaner.java:467 | One background thread; loop = grab filthiest log, clean it, delete eligible segments. |
Cleaner | storage/.../internals/log/Cleaner.java | The actual two-pass compaction logic for one log: build offset map, then recopy segments. |
LogCleanerManager | storage/.../internals/log/LogCleanerManager.java | Per-partition cleaning state machine; dirtiest-log selection; cleaner-offset checkpoints; uncleanable set. |
LogCleaningState | storage/.../internals/log/LogCleaningState.java | Sealed interface: InProgress / Aborted / Paused(n). |
LogToClean | storage/.../internals/log/LogToClean.java | A cleanable log + its clean/dirty byte split and cleanable ratio (the comparator key). |
OffsetMap / SkimpyOffsetMap | storage/.../internals/log/{OffsetMap,SkimpyOffsetMap}.java | Hash table mapping key-hash → latest offset, the dedupe core of compaction. |
CleanerConfig | storage/.../internals/log/CleanerConfig.java | Cleaner tunables (threads, dedupe buffer, I/O buffer/throttle, backoff). |
LogDirFailureChannel | storage/.../internals/log/LogDirFailureChannel.java | Bounded queue from any I/O thread to the failure-handler thread; each dir enqueued once. |
OffsetCheckpointFile | storage/.../internals/checkpoint/OffsetCheckpointFile.java | Text-format (topic partition offset) checkpoint used for recovery points, log-start offsets, and cleaner offsets. |
LogDirFailureHandler | core/.../server/ReplicaManager.scala:227 | The broker thread that drains the failure channel and invokes handleLogDirFailure. |
Core concepts & terminology
- Live / offline / cordoned dir. A directory is live if validated and lockable; offline once an IOException takes it out of service (until restart); cordoned if administratively excluded from new-partition placement (cordonedLogDirs, LogManager.java:138).
- Current vs. future log. currentLogs holds the serving replica; futureLogs holds a copy being built in a different dir during an intra-broker move (a -future directory) that will atomically replace the current log once it catches up (LogManager.java:92-96).
- Recovery point. The offset up to which a log is known durably flushed; segments above it are re-validated on unclean restart. Checkpointed to recovery-point-offset-checkpoint.
- Log start offset. The lowest offset still readable (raised by DeleteRecords or segment deletion). Checkpointed to log-start-offset-checkpoint so deleted data is not re-exposed after restart.
- Clean / dirty / cleanable section. For a compacted log, segments below the cleaner checkpoint are clean; everything above it is dirty, split into a cleanable prefix and an uncleanable tail (the active segment, anything past the last stable offset, anything younger than min.compaction.lag.ms) (LogCleaner.java:53-56).
- Tombstone. A record with a non-null key and a null value; treated as a delete during compaction. Retained for delete.retention.ms after its batch enters the clean section, then dropped.
- Cleanable ratio. cleanableBytes / (cleanBytes + cleanableBytes), the cleaner's "dirtiness" estimate and the selection key for the filthiest log (LogToClean.java:45-54).
Data structures
In-memory state in LogManager
The registry is two concurrent maps plus several coordination structures (LogManager.java:91-142):
| Field | Type | Guards / purpose |
|---|---|---|
currentLogs | ConcurrentHashMap<TopicPartition, UnifiedLog> | The serving logs. |
futureLogs | ConcurrentHashMap<TopicPartition, UnifiedLog> | In-progress intra-broker moves. |
logCreationOrDeletionLock | Object monitor | Serializes create/delete/rename so the two maps stay consistent. |
logsToBeDeleted | LinkedBlockingQueue<Map.Entry<UnifiedLog, Long>> | Logs whose dir was renamed …-delete, paired with the millis they were enqueued; drained by kafka-delete-logs. |
liveLogDirs | ConcurrentLinkedQueue<File> | The currently-serving directories. |
directoryIds | ConcurrentHashMap<String, Uuid> | Each dir's UUID from its meta.properties (for JBOD directory assignment). |
dirLocks | List<FileLock> | One .lock per dir, preventing a second broker process from opening it. |
recoveryPointCheckpoints / logStartOffsetCheckpoints | Map<File, OffsetCheckpointFile> | One checkpoint file per dir for each of the two offset kinds. |
partitionsInitializing | ConcurrentHashMap<TopicPartition, Boolean> | Detects a config update racing with log load (KAFKA-8813); the flag forces a config reload after init. |
Cleaning state in LogCleanerManager
Three maps, all accessed under one ReentrantLock (LogCleanerManager.java:84-100):
- inProgress: Map<TopicPartition, LogCleaningState>, the per-partition state machine.
- uncleanablePartitions: Map<String, Set<TopicPartition>>, partitions that threw during cleaning, keyed by log dir; excluded until restart or recovery.
- checkpoints: Map<File, OffsetCheckpointFile>, the cleaner-offset-checkpoint per dir, recording the first dirty offset (cleaner point) of each compacted partition.
On-disk: the offset checkpoint file
All three offset checkpoints (recovery point, log-start, cleaner) share one plain-text format (OffsetCheckpointFile.java:40-50): a version line (CURRENT_VERSION = 0), an entry-count line, then one topic partition offset triple per line:
| Line | recovery-point-offset-checkpoint | cleaner-offset-checkpoint |
|---|---|---|
| version | 0 | 0 |
| #entries | 2 | 1 |
| entry 1 | orders 0 105823 | orders 0 98112 (first dirty offset) |
| entry 2 | orders 1 105640 | — |
Writes go through CheckpointFileWithFailureHandler, which on IOException routes the dir to the LogDirFailureChannel and throws KafkaStorageException. There is one such file per log dir for each checkpoint kind; the offset column holds the durably-flushed offset in the recovery-point file and the first dirty offset in the cleaner file.
Writes are crash-consistent: checkpoint.write ultimately calls Utils.atomicMoveWithFallback, which writes a temp file, fsyncs, renames, and flushes the parent directory (noted at LogManager.java:1054). The three checkpoint file names are constants: RECOVERY_POINT_CHECKPOINT_FILE, LOG_START_OFFSET_CHECKPOINT_FILE (LogManager.java:85-86), and LogCleanerManager.OFFSET_CHECKPOINT_FILE = "cleaner-offset-checkpoint" (LogCleanerManager.java:66).
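The file layout and the temp-file-then-rename write path can be sketched in a few lines of Java. This is a minimal sketch, assuming map keys of the form "topic partition"; CheckpointSketch and its methods are hypothetical names, not Kafka's OffsetCheckpointFile API, and the sketch skips the parent-directory flush and the non-atomic fallback that Utils.atomicMoveWithFallback performs.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of a version-0 offset checkpoint file (not Kafka's OffsetCheckpointFile).
final class CheckpointSketch {
    static final int CURRENT_VERSION = 0;

    // Version line, entry-count line, then one "topic partition offset" line per entry.
    static List<String> format(Map<String, Long> offsets) {
        List<String> lines = new ArrayList<>();
        lines.add(Integer.toString(CURRENT_VERSION));
        lines.add(Integer.toString(offsets.size()));
        offsets.forEach((tp, offset) -> lines.add(tp + " " + offset));
        return lines;
    }

    // Rejects unknown versions, malformed entries, and a wrong entry count.
    static Map<String, Long> parse(List<String> lines) {
        if (lines.size() < 2 || Integer.parseInt(lines.get(0).trim()) != CURRENT_VERSION)
            throw new IllegalArgumentException("unsupported checkpoint version");
        int expected = Integer.parseInt(lines.get(1).trim());
        Map<String, Long> result = new LinkedHashMap<>();
        for (String line : lines.subList(2, lines.size())) {
            String[] parts = line.trim().split("\\s+");
            if (parts.length != 3) throw new IllegalArgumentException("malformed entry: " + line);
            result.put(parts[0] + " " + parts[1], Long.parseLong(parts[2]));
        }
        if (result.size() != expected) throw new IllegalArgumentException("entry count mismatch");
        return result;
    }

    // Crash-consistent write: temp file, fsync, then atomic rename over the target.
    static void write(Path target, Map<String, Long> offsets) {
        Path tmp = target.resolveSibling(target.getFileName() + ".tmp");
        try {
            Files.write(tmp, format(offsets), StandardCharsets.UTF_8);
            try (FileChannel ch = FileChannel.open(tmp, StandardOpenOption.WRITE)) {
                ch.force(true); // make the contents durable before they become visible
            }
            Files.move(tmp, target, StandardCopyOption.ATOMIC_MOVE);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    static Map<String, Long> read(Path file) {
        try {
            return parse(Files.readAllLines(file, StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Because the rename is atomic, a reader sees either the previous complete file or the new complete one; a crash mid-write leaves at worst a stray .tmp file.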
The dedupe hash table, SkimpyOffsetMap
Compaction's working memory is a fixed-size open-addressing hash table that stores a cryptographic hash of each key (not the key itself) alongside the latest offset for that key (SkimpyOffsetMap.java:28-88). Each slot is bytesPerEntry = hashSize + 8 bytes; with the default MD5 (16-byte digest) that is 24 bytes per entry, so a per-thread buffer holds memory / 24 slots.
- bytesPerEntry = hashSize + 8 = 24 bytes with MD5.
- A slot is empty when all 24 of its bytes are zero (isEmpty).
- The probe sequence reads successive 4-byte ints from the hash itself, then degrades to linear probing once those are exhausted.
- clear() zeroes the buffer between logs.
- A full map throws on put; the load factor (default 0.9) caps how full it is allowed to get.
This design trades a tiny false-collision probability (two distinct keys with the same MD5 → one wrongly dropped) for very low memory per key. The Javadoc on CleanerConfig spells out the tradeoff: a higher load factor "will allow more log to be cleaned at once but will lead to more hash collisions" (CleanerConfig.java:69). The buffer is allocated once per Cleaner at config.dedupeBufferSize / config.numThreads bytes (LogCleaner.java:480), capped at 2 GiB.
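The slot layout, probe sequence, and capacity arithmetic above can be illustrated with a simplified map. DigestOffsetMap and maxEntries are hypothetical names; the sketch hashes String keys with MD5 via java.security.MessageDigest, rejects any put once every slot is used (as described above), and leaves the load-factor cap to the caller, which maxEntries computes. It is not Kafka's SkimpyOffsetMap.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;

// Hypothetical, simplified digest-keyed offset map in the spirit of SkimpyOffsetMap.
final class DigestOffsetMap {
    private static final int HASH_SIZE = 16;                  // MD5 digest length
    private static final int BYTES_PER_ENTRY = HASH_SIZE + 8; // digest + 8-byte offset = 24

    private final ByteBuffer bytes;
    private final int slots;
    private final MessageDigest md5;
    private int entries = 0;
    private long latestOffset = -1L;

    DigestOffsetMap(int memoryBytes) {
        this.slots = memoryBytes / BYTES_PER_ENTRY;
        this.bytes = ByteBuffer.allocate(slots * BYTES_PER_ENTRY);
        try {
            this.md5 = MessageDigest.getInstance("MD5");
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }

    int slots() { return slots; }
    int size() { return entries; }
    long latestOffset() { return latestOffset; }

    // Attempts 0..12 read an int starting at byte `attempt` of the digest; later attempts step linearly.
    private int positionOf(byte[] hash, int attempt) {
        int start = Math.min(attempt, HASH_SIZE - 4);
        long probe = (ByteBuffer.wrap(hash, start, 4).getInt() & 0x7fffffffL) + Math.max(0, attempt - HASH_SIZE + 4);
        return (int) (probe % slots) * BYTES_PER_ENTRY;
    }

    // A slot is empty when all 24 bytes are zero.
    private boolean isEmpty(int pos) {
        return bytes.getLong(pos) == 0 && bytes.getLong(pos + 8) == 0 && bytes.getLong(pos + 16) == 0;
    }

    private boolean matches(byte[] hash, int pos) {
        for (int i = 0; i < HASH_SIZE; i++) if (bytes.get(pos + i) != hash[i]) return false;
        return true;
    }

    void put(String key, long offset) {
        if (entries >= slots) throw new IllegalStateException("Attempt to add a new entry to a full offset map.");
        byte[] hash = md5.digest(key.getBytes(StandardCharsets.UTF_8));
        for (int attempt = 0; ; attempt++) {
            int pos = positionOf(hash, attempt);
            if (isEmpty(pos)) { // new key: store the digest, then the offset
                for (int i = 0; i < HASH_SIZE; i++) bytes.put(pos + i, hash[i]);
                bytes.putLong(pos + HASH_SIZE, offset);
                entries++;
                break;
            }
            if (matches(hash, pos)) { // known key: a later offset overwrites the earlier one
                bytes.putLong(pos + HASH_SIZE, offset);
                break;
            }
        }
        latestOffset = offset;
    }

    // Latest offset for the key, or -1 if it was never added.
    long get(String key) {
        byte[] hash = md5.digest(key.getBytes(StandardCharsets.UTF_8));
        for (int attempt = 0; attempt < slots + HASH_SIZE; attempt++) {
            int pos = positionOf(hash, attempt);
            if (isEmpty(pos)) return -1L;
            if (matches(hash, pos)) return bytes.getLong(pos + HASH_SIZE);
        }
        return -1L;
    }

    void clear() {
        Arrays.fill(bytes.array(), (byte) 0);
        entries = 0;
        latestOffset = -1L;
    }

    // Usable entries per cleaner thread: per-thread share of the dedupe buffer
    // (capped at Integer.MAX_VALUE bytes), 24 bytes per slot, times the load factor.
    static int maxEntries(long dedupeBufferSize, int numThreads, double loadFactor) {
        long perThread = Math.min(dedupeBufferSize / numThreads, Integer.MAX_VALUE);
        long slotCount = perThread / BYTES_PER_ENTRY;
        return (int) (slotCount * loadFactor);
    }
}
```

With the defaults (128 MiB, one thread, load factor 0.9), maxEntries works out to 134217728 / 24 = 5592405 slots and 5033164 usable entries per cleaning pass.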
Architecture & control flow
[Figure not reproduced; surviving caption fragment: "…LogCleanerManager state machine; disk failures flow through a bounded channel to a single handler thread."]
Startup: validate, lock, recover, schedule
Construction calls createAndValidateLogDirs then lockLogDirs then loadDirectoryIds (LogManager.java:229-232). Validation creates missing dirs, rejects duplicates by canonical path, and routes any failing dir to the failure channel rather than aborting, unless every dir fails, in which case the broker halts (LogManager.java:376-379). Locking acquires an exclusive FileLock on each dir's .lock file (LogManager.java:449-462).
loadLogs (LogManager.java:608) recovers logs per directory, in parallel:
- For each live dir it spins up a fixed pool of num.recovery.threads.per.data.dir threads named log-recovery-<dirPath>-<n> (LogManager.java:590-591,625).
- It reads the dir's CleanShutdownFileHandler. If the clean-shutdown marker exists, it is deleted immediately and hadCleanShutdown is cached, so a crash mid-load is correctly treated as unclean on the next boot (KAFKA-10471, LogManager.java:629-636).
- It reads the recovery-point and log-start checkpoints (tolerating a corrupt checkpoint by resetting to 0 / first segment).
- It lists topic-partition subdirectories, skipping the remote-log-index-cache dir and the cluster-metadata topic (LogManager.java:653-660), and submits one loadLog job per partition. With a clean shutdown, segment recovery is skipped entirely; otherwise every segment above the recovery point is re-validated.
loadLog (LogManager.java:511) opens each UnifiedLog and routes it: a -delete-suffixed dir → logsToBeDeleted; a -stray dir → logged; a still-present-but-orphaned replica → renamed -stray; otherwise inserted into currentLogs / futureLogs (a duplicate is a fatal IllegalStateException).
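The per-directory parallelism can be pictured as one fixed-size pool per directory, with threads named after the directory and one job per partition. RecoverySketch, loadAll, and the loader callback are hypothetical; the callback stands in for loadLog, and the sketch ignores the clean-shutdown marker, checkpoints, and per-directory failure handling.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;

// Hypothetical sketch of per-directory parallel log loading.
final class RecoverySketch {
    // Loads every partition of every dir; returns partition -> loader result.
    static Map<String, Integer> loadAll(Map<String, List<String>> partitionsByDir, int threadsPerDir,
                                        BiFunction<String, String, Integer> loader) {
        Map<String, Integer> loaded = new ConcurrentHashMap<>();
        List<ExecutorService> pools = new ArrayList<>();
        List<Future<?>> jobs = new ArrayList<>();
        for (Map.Entry<String, List<String>> e : partitionsByDir.entrySet()) {
            String dir = e.getKey();
            AtomicInteger n = new AtomicInteger();
            // One pool per directory, threads named log-recovery-<dir>-<n>.
            ExecutorService pool = Executors.newFixedThreadPool(threadsPerDir,
                    r -> new Thread(r, "log-recovery-" + dir + "-" + n.getAndIncrement()));
            pools.add(pool);
            for (String tp : e.getValue()) {
                jobs.add(pool.submit(() -> { loaded.put(tp, loader.apply(dir, tp)); }));
            }
        }
        try {
            for (Future<?> job : jobs) job.get(); // wait for all jobs; surfaces the first failure
        } catch (Exception ex) {
            throw new IllegalStateException(ex);
        } finally {
            pools.forEach(ExecutorService::shutdown);
        }
        return loaded;
    }
}
```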
Two gauges, remainingLogsToRecover (per dir) and remainingSegmentsToRecover (per recovery thread), let operators watch progress (LogManager.java:741-753). After loading, startupWithConfigOverrides schedules the five tasks and, if cleanerConfig.enableCleaner, builds and starts the LogCleaner (LogManager.java:811-841).
All five schedulers fire after an initial delay of log.initial.task.delay.ms (default 30 000 ms, ServerLogConfigs.LOG_INITIAL_TASK_DELAY_MS_DEFAULT), staggering background work away from the startup spike (LogManager.java:814-833).
Detailed mechanics, retention (cleanup.policy=delete)
The kafka-log-retention task runs cleanupLogs every log.retention.check.interval.ms (default 5 minutes, ServerLogConfigs.LOG_CLEANUP_INTERVAL_MS_DEFAULT). It first pauses the cleaner on all non-compacted partitions to avoid a race, iterates them calling UnifiedLog.deleteOldSegments() (and the same on any future log), then resumes (LogManager.java:1705-1744).
cleanupLogs():
  total = 0
  deletable = cleaner.pauseCleaningForNonCompactedPartitions()   // non-compacted logs, now Paused(1)
  try:
    for (tp, log) in deletable:
      total += log.deleteOldSegments()
      futureLogs[tp]?.deleteOldSegments()                         // same pass over any in-flight move
  finally:
    cleaner.resumeCleaning(deletable.keySet())                    // always undo the pause
UnifiedLog.deleteOldSegments() (UnifiedLog.java:1967) dispatches on policy. For cleanup.policy=delete it runs three predicates in order (log-start-offset breach, then size breach, then time breach):
| Predicate | Condition (per candidate segment) | Code |
|---|---|---|
| logStartOffset breach | nextSegment.baseOffset() ≤ logStartOffset, the whole segment is below the start offset. | UnifiedLog.java:2071 |
| retention.bytes breach | With excess = logSize − retentionSize, walk oldest-first and delete each segment while excess − segmentSize ≥ 0, subtracting its size from excess; stop at the first segment that no longer fits. | UnifiedLog.java:2040 |
| retention.ms breach | now − segment.largestTimestamp() > retentionMs. | UnifiedLog.java:2002 |
A segment's "age" is largestTimestamp(): the segment's max record timestamp if one exists, else its file lastModified time (LogSegment.java:861-866). Future-dated records are detected and logged but still block deletion (a segment with a future timestamp is never "old enough", UnifiedLog.java:2008-2009).
deletableSegments walks segments oldest-first and stops at the first that fails the predicate (UnifiedLog.java:1879-1914). Two crucial guards:
- High-watermark guard. A segment is deletable only if highWatermark() ≥ upperBoundOffset: data at or above the HW is never deleted, so the log start offset can never exceed the HW (UnifiedLog.java:1892-1894).
- Never-empty guard. The final segment, if empty, is never returned; and if deletion would remove all segments, deleteSegments first rolls a fresh active segment so the log always has at least one (UnifiedLog.java:1933-1942).
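The oldest-first walk with both guards, plus the three predicates from the table above, can be sketched as follows. Seg, SegPredicate, and RetentionSketch are hypothetical; a segment is modelled as (baseOffset, sizeBytes, largestTimestamp), the last list element is the active segment, and the roll that deleteSegments performs before removing every segment is left out.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical segment model: base offset, size in bytes, largest record timestamp.
final class Seg {
    final long baseOffset;
    final long sizeBytes;
    final long largestTimestamp;
    Seg(long baseOffset, long sizeBytes, long largestTimestamp) {
        this.baseOffset = baseOffset;
        this.sizeBytes = sizeBytes;
        this.largestTimestamp = largestTimestamp;
    }
}

interface SegPredicate { boolean test(Seg segment, Seg nextOrNull); }

final class RetentionSketch {
    // Walk oldest-first; stop at the first segment that fails a guard or the predicate.
    static List<Seg> deletable(List<Seg> segs, long logEndOffset, long highWatermark, SegPredicate pred) {
        List<Seg> out = new ArrayList<>();
        for (int i = 0; i < segs.size(); i++) {
            Seg seg = segs.get(i);
            Seg next = i + 1 < segs.size() ? segs.get(i + 1) : null;
            long upperBound = next != null ? next.baseOffset : logEndOffset;
            boolean lastAndEmpty = next == null && seg.sizeBytes == 0;
            // HW guard and never-empty guard are checked before the (possibly stateful) predicate.
            if (highWatermark < upperBound || lastAndEmpty || !pred.test(seg, next)) break;
            out.add(seg);
        }
        return out;
    }

    // retention.ms: a future-dated segment has a negative age and is never "old enough".
    static SegPredicate timeBreach(long nowMs, long retentionMs) {
        return (seg, next) -> nowMs - seg.largestTimestamp > retentionMs;
    }

    // retention.bytes: consume the excess oldest-first while whole segments still fit in it.
    static SegPredicate sizeBreach(List<Seg> segs, long retentionBytes) {
        if (retentionBytes < 0) return (seg, next) -> false; // -1 means unbounded
        long total = 0;
        for (Seg s : segs) total += s.sizeBytes;
        long[] excess = { total - retentionBytes };
        return (seg, next) -> {
            if (excess[0] - seg.sizeBytes < 0) return false;
            excess[0] -= seg.sizeBytes;
            return true;
        };
    }

    // Log-start breach: the whole segment lies below the log start offset.
    static SegPredicate logStartBreach(long logStartOffset) {
        return (seg, next) -> next != null && next.baseOffset <= logStartOffset;
    }
}
```

The size predicate consumes the excess as it goes, which is why it is consulted only after both guards pass and is rebuilt for each walk.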
On deletion, deleteSegments advances the (local) log-start offset to the base of the next surviving segment before removing the segment from the index, then calls localLog.removeAndDeleteSegments and prunes producer snapshots (UnifiedLog.java:1944-1955).
The asynchronous .deleted rename
Physical deletion is deferred and crash-safe (LocalLog.deleteSegmentFiles, LocalLog.java:846-877):
- Immediately: rename 00098112.{log,index,timeindex,txnindex} ⇒ *.deleted.
- file.delete.delay.ms later: Files.deleteIfExists(…) for each .deleted file, so in-flight reads (which captured a file handle) can finish.
The default delay is file.delete.delay.ms = 60000 (ServerLogConfigs.LOG_DELETE_DELAY_MS_DEFAULT, wired as the default for TopicConfig.FILE_DELETE_DELAY_MS_CONFIG at LogConfig.java:224). If asyncDelete is false the unlink runs inline (LocalLog.java:872-876). Any leftover .deleted files after a crash are removed during recovery in LogLoader (LocalLog.java:982-983).
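A sketch of the rename-now, unlink-later pattern using java.nio.file.Files and a ScheduledExecutorService. AsyncDeleteSketch and deleteSegmentFiles are hypothetical stand-ins for LocalLog.deleteSegmentFiles; Kafka uses its own scheduler and suffix helpers rather than this code.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of two-step segment deletion.
final class AsyncDeleteSketch {
    // Step 1 runs inline: rename every file to *.deleted so the segment leaves the log.
    // Step 2 is deferred: unlink the renamed files after fileDeleteDelayMs.
    static ScheduledFuture<?> deleteSegmentFiles(List<Path> files, ScheduledExecutorService scheduler,
                                                 long fileDeleteDelayMs) {
        List<Path> renamed = new ArrayList<>();
        try {
            for (Path f : files) {
                renamed.add(Files.move(f, f.resolveSibling(f.getFileName() + ".deleted")));
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return scheduler.schedule(() -> {
            for (Path p : renamed) {
                try {
                    Files.deleteIfExists(p); // readers holding an open handle were given the delay to finish
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            }
        }, fileDeleteDelayMs, TimeUnit.MILLISECONDS);
    }
}
```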
Deleting a whole partition
When a partition is removed from the broker, asyncDelete renames its directory to <topic>-<p>.<uuid>-delete and enqueues it (LogManager.java:1589-1625). The kafka-delete-logs task (deleteLogs, LogManager.java:1415) drains logsToBeDeleted, deleting each only after file.delete.delay.ms has elapsed since it was enqueued, and self-reschedules for the remaining time of the next-due entry (nextDeleteDelayMs, LogManager.java:1397-1406,1436). Before enqueueing it aborts any cleaning of that partition and updates checkpoints (LogManager.java:1597-1617).
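Because logsToBeDeleted is FIFO, its head is always the next entry to come due, so the kafka-delete-logs task reduces to a drain loop plus a delay computation. In this sketch DeleteLogsSketch and Pending are hypothetical, time is passed in explicitly, and the actual directory deletion is replaced by recording the name.

```java
import java.util.Deque;
import java.util.List;

// Hypothetical sketch of the delete-logs drain-and-reschedule logic.
final class DeleteLogsSketch {
    // Queue entry: a renamed "-delete" directory and when it was enqueued.
    static final class Pending {
        final String dirName;
        final long enqueuedMs;
        Pending(String dirName, long enqueuedMs) { this.dirName = dirName; this.enqueuedMs = enqueuedMs; }
    }

    // Time until the oldest entry is due; a full delay if nothing is queued.
    static long nextDeleteDelayMs(Deque<Pending> queue, long nowMs, long fileDeleteDelayMs) {
        Pending head = queue.peekFirst();
        return head == null ? fileDeleteDelayMs : head.enqueuedMs + fileDeleteDelayMs - nowMs;
    }

    // Deletes every due entry in FIFO order; returns the delay before the task should run again.
    static long drain(Deque<Pending> queue, long nowMs, long fileDeleteDelayMs, List<String> deleted) {
        long next;
        while ((next = nextDeleteDelayMs(queue, nowMs, fileDeleteDelayMs)) <= 0) {
            deleted.add(queue.pollFirst().dirName); // real code would delete the directory here
        }
        return next;
    }
}
```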
A log always has at least one segment, and the active segment is never deleted by retention. The log start offset never exceeds the high watermark. These two together guarantee a consumer can always read from the current start offset up to the HW.
Detailed mechanics, compaction (cleanup.policy=compact)
Compaction's contract (LogCleaner.java:50-51): "A message with key K and offset O is obsolete if there exists a message with key K and offset O' such that O < O'." The cleaner keeps, for every key, only the record at its highest offset (plus tombstones within their retention window).
Choosing the filthiest log
Each CleanerThread.doWork() calls tryCleanFilthiestLog; if nothing was cleaned it sleeps log.cleaner.backoff.ms (default 15 s) (LogCleaner.java:512-523). Selection happens in LogCleanerManager.grabFilthiestCompactedLog under the manager lock (LogCleanerManager.java:239):
- Read all cleaner checkpoints (allCleanerCheckpoints), the per-partition first-dirty offset.
- For every compacted log not already in progress and not uncleanable, compute cleanableOffsets → (firstDirtyOffset, firstUncleanableDirtyOffset), and build a LogToClean (which computes cleanBytes, cleanableBytes, and the cleanable ratio).
- Keep only logs that either need compaction now (forced by max.compaction.lag.ms) with non-zero cleanable bytes, or whose cleanable ratio exceeds the per-topic min.cleanable.dirty.ratio (LogCleanerManager.java:281-283).
- Pick the log with the maximum cleanable ratio; mark it LOG_CLEANING_IN_PROGRESS and return it.
cleanableOffsets computes the first uncleanable offset as the minimum of three bounds (LogCleanerManager.java:730-742):
- the last stable offset (never clean past the LSO; see transactions below);
- the active segment's base offset (the active segment is always uncleanable);
- if min.compaction.lag.ms > 0, the base offset of the first segment whose largestTimestamp > now − minCompactionLagMs (findFirstUncleanableSegment, LogCleanerManager.java:767-783).
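The three bounds combine with a plain minimum. A sketch, assuming segments arrive oldest-first as (baseOffset, largestTimestamp) with the last one active; UncleanableBoundSketch and SegInfo are hypothetical, and restricting the lag scan to non-active segments that extend past firstDirtyOffset is an assumption about the real scan range.

```java
import java.util.List;

// Hypothetical sketch of the first-uncleanable-offset computation.
final class UncleanableBoundSketch {
    static final class SegInfo {
        final long baseOffset;
        final long largestTimestamp;
        SegInfo(long baseOffset, long largestTimestamp) { this.baseOffset = baseOffset; this.largestTimestamp = largestTimestamp; }
    }

    // segments are oldest-first; the last element is the active segment.
    static long firstUncleanableOffset(List<SegInfo> segments, long firstDirtyOffset, long lastStableOffset,
                                       long minCompactionLagMs, long nowMs) {
        SegInfo active = segments.get(segments.size() - 1);
        long bound = Math.min(lastStableOffset, active.baseOffset); // LSO bound and active-segment bound
        if (minCompactionLagMs > 0) {
            for (int i = 0; i < segments.size() - 1; i++) {
                SegInfo s = segments.get(i);
                if (segments.get(i + 1).baseOffset <= firstDirtyOffset) continue; // entirely clean
                if (s.largestTimestamp > nowMs - minCompactionLagMs) {             // too young to compact
                    bound = Math.min(bound, s.baseOffset);
                    break;
                }
            }
        }
        return bound;
    }
}
```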
It also self-heals a bad checkpoint: if the checkpointed dirty offset is below the log start or above the log end, it resets firstDirtyOffset to the log start and flags forceUpdateCheckpoint (LogCleanerManager.java:702-722). max.compaction.lag.ms forces cleaning even of a barely-dirty log: maxCompactionDelay measures how long the earliest dirty segment has waited beyond the max-lag and sets needCompactionNow when positive (LogCleanerManager.java:669-680).
In summary: firstDirtyOffset is the cleaner checkpoint, and the range the cleaner deduplicates is [firstDirtyOffset, firstUncleanableDirtyOffset). Offsets below firstDirtyOffset are clean, offsets in [firstDirtyOffset, firstUncleanableDirtyOffset) are cleanable, and offsets at or above firstUncleanableDirtyOffset are uncleanable. cleanableRatio = cleanableBytes / (cleanBytes + cleanableBytes), and a log is eligible if cleanableRatio > min.cleanable.dirty.ratio OR needCompactionNow (max.compaction.lag.ms exceeded, with non-zero cleanable bytes).
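The eligibility rule and the filthiest-log choice amount to a filter followed by a max over the cleanable ratio. Candidate is a hypothetical stand-in for LogToClean with byte counts supplied directly, a single minCleanableDirtyRatio replaces the per-topic setting, and the sketch returns 0.0 for an empty log to avoid dividing by zero.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

// Hypothetical sketch of eligibility filtering and filthiest-log selection.
final class FilthiestSketch {
    static final class Candidate {
        final String partition;
        final long cleanBytes;
        final long cleanableBytes;
        final boolean needCompactionNow;
        Candidate(String partition, long cleanBytes, long cleanableBytes, boolean needCompactionNow) {
            this.partition = partition;
            this.cleanBytes = cleanBytes;
            this.cleanableBytes = cleanableBytes;
            this.needCompactionNow = needCompactionNow;
        }

        // cleanableBytes / (cleanBytes + cleanableBytes); 0.0 for an empty log in this sketch.
        double cleanableRatio() {
            long total = cleanBytes + cleanableBytes;
            return total == 0 ? 0.0 : (double) cleanableBytes / total;
        }
    }

    // Forced by max.compaction.lag.ms (with something to clean) or dirty enough.
    static boolean eligible(Candidate c, double minCleanableDirtyRatio) {
        return (c.needCompactionNow && c.cleanableBytes > 0) || c.cleanableRatio() > minCleanableDirtyRatio;
    }

    static Optional<Candidate> grabFilthiest(List<Candidate> logs, double minCleanableDirtyRatio) {
        return logs.stream()
                .filter(c -> eligible(c, minCleanableDirtyRatio))
                .max(Comparator.comparingDouble(Candidate::cleanableRatio));
    }
}
```

Since this sketch ranks on ratio alone, a log forced by max.compaction.lag.ms becomes eligible but can still be outranked by a dirtier log in the same round.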
Pass 1, build the offset map
Cleaner.doClean (Cleaner.java:159) first calls buildOffsetMap over the dirty section [firstDirtyOffset, firstUncleanableOffset) (Cleaner.java:710). For each dirty segment it iterates batches, and for each non-control, non-aborted record with a key at or above the start offset, it puts key → offset into the SkimpyOffsetMap (Cleaner.java:807-817). Because later offsets overwrite earlier ones, the map ends holding each key's highest offset in the dirty range. It stops adding once the map reaches slots × loadFactor entries and returns "full", so a single clean may cover only part of the dirty range; endOffset = map.latestOffset() + 1 becomes the new cleaner point (Cleaner.java:178,810-814).
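Pass 1 amounts to "later offsets overwrite earlier ones; stop at the cap". The sketch below uses a java.util.HashMap in place of SkimpyOffsetMap; OffsetMapBuildSketch, Rec, and Result are hypothetical, and a single skip flag stands in for the control-batch and aborted-transaction filtering.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of building the dedupe map over the dirty range.
final class OffsetMapBuildSketch {
    // Record view; skip marks control batches and aborted-transaction records.
    static final class Rec {
        final long offset;
        final String key;
        final boolean skip;
        Rec(long offset, String key, boolean skip) { this.offset = offset; this.key = key; this.skip = skip; }
    }

    static final class Result {
        final Map<String, Long> map = new HashMap<>();
        long latestOffset = -1L;
        boolean full = false;
        long endOffset() { return latestOffset + 1; } // becomes the new cleaner point
    }

    // dirty holds the records of [firstDirtyOffset, firstUncleanableOffset) in offset order.
    static Result build(List<Rec> dirty, long logStartOffset, int maxEntries) {
        Result r = new Result();
        for (Rec rec : dirty) {
            boolean mappable = !rec.skip && rec.key != null && rec.offset >= logStartOffset;
            if (mappable) {
                if (r.map.size() >= maxEntries) { // at the load-factor cap: report "full"
                    r.full = true;
                    break;
                }
                r.map.put(rec.key, rec.offset); // a later offset overwrites an earlier one
            }
            r.latestOffset = rec.offset; // everything up to here has been processed
        }
        return r;
    }
}
```

When the cap is hit, endOffset is one past the last record processed, so the next cleaning round resumes there rather than at firstUncleanableOffset.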
Pass 2, recopy segments keeping only survivors
Segments from offset 0 up to endOffset are grouped by groupSegmentsBySize so that a group's combined log and index bytes stay under segment.bytes / segment.index.bytes and its offset span stays under Integer.MAX_VALUE; this merges tiny segments and "prevents segment sizes from shrinking too much" with repeated cleanings (Cleaner.java:641-676). Each group is rewritten by cleanSegments into one or more .cleaned segments (Cleaner.java:228), and finally swapped in via UnifiedLog.replaceSegments (Cleaner.java:328).
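The greedy grouping can be sketched as: start a group at the next segment and keep appending while the combined log bytes, combined index bytes, and offset span all stay within bounds. GroupBySizeSketch and S are hypothetical; a single index-size total stands in for the separate offset-index and time-index checks.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of size-bounded segment grouping for compaction.
final class GroupBySizeSketch {
    // Segment view: offset range plus log and index sizes in bytes.
    static final class S {
        final long baseOffset, lastOffset, logBytes, indexBytes;
        S(long baseOffset, long lastOffset, long logBytes, long indexBytes) {
            this.baseOffset = baseOffset;
            this.lastOffset = lastOffset;
            this.logBytes = logBytes;
            this.indexBytes = indexBytes;
        }
    }

    static List<List<S>> group(List<S> segments, long maxLogBytes, long maxIndexBytes) {
        List<List<S>> groups = new ArrayList<>();
        int i = 0;
        while (i < segments.size()) {
            S first = segments.get(i++);
            List<S> group = new ArrayList<>();
            group.add(first);
            long logBytes = first.logBytes;
            long indexBytes = first.indexBytes;
            while (i < segments.size()) {
                S next = segments.get(i);
                boolean fits = logBytes + next.logBytes <= maxLogBytes
                        && indexBytes + next.indexBytes <= maxIndexBytes
                        // index entries store offsets relative to the segment base, so the span must fit in an int
                        && next.lastOffset - first.baseOffset <= Integer.MAX_VALUE;
                if (!fits) break;
                group.add(next);
                logBytes += next.logBytes;
                indexBytes += next.indexBytes;
                i++;
            }
            groups.add(group);
        }
        return groups;
    }
}
```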
The retain/discard decision for each record is shouldRetainRecord (Cleaner.java:567-602):
- If record.offset() > map.latestOffset(), the record is past the mapped range → always retain (it is beyond what this pass deduplicated).
- Otherwise look up the key. Retain only if this is the latest offset for the key (record.offset() ≥ foundOffset) and the record either has a value or is a tombstone still within its delete-retention window.
- A record with no key is invalid in a compacted log; it is dropped and counted in stats.invalidMessagesRead (Cleaner.java:598-601).
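The retain/discard rule as a pure function. RetainSketch is hypothetical: the offset map is a java.util.Map (an absent key is treated as "latest", mirroring a -1 lookup result), a null key means "no key", a null value means tombstone, and tombstoneRetained stands in for the delete-horizon check.

```java
import java.util.Map;

// Hypothetical sketch of the per-record retain decision.
final class RetainSketch {
    int invalidMessagesRead = 0; // stands in for stats.invalidMessagesRead

    boolean shouldRetainRecord(Map<String, Long> map, long mapLatestOffset, long offset,
                               String key, String value, boolean tombstoneRetained) {
        if (offset > mapLatestOffset) return true; // beyond what this pass deduplicated
        if (key == null) {                          // keyless records are invalid in a compacted log
            invalidMessagesRead++;
            return false;
        }
        Long found = map.get(key);
        boolean latestForKey = found == null || offset >= found;
        boolean retainedValue = value != null || tombstoneRetained;
        return latestForKey && retainedValue;
    }
}
```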
The atomic, crash-safe swap
UnifiedLog.createNewCleanedSegment writes to *.cleaned files. replaceSegments (LocalLog.java:1004-1062) then performs a two-phase swap designed to be recoverable at any crash point (the ordering described at LocalLog.java:974-990):
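The rename protocol can be sketched with java.nio.file.Files.move on single-file segments. SwapSketch and its replaceSegments are hypothetical and simplified: each segment is one .log file, the old files are unlinked inline instead of after a delay, and the final directory fsync is omitted. Zero-padded file names make a reverse lexicographic sort equal to descending base-offset order.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical sketch of the .cleaned -> .swap -> live rename sequence.
final class SwapSketch {
    static Path withSuffix(Path p, String from, String to) {
        String name = p.getFileName().toString();
        return p.resolveSibling(name.substring(0, name.length() - from.length()) + to);
    }

    // cleaned: new "*.log.cleaned" files; old: the "*.log" segments they replace.
    static void replaceSegments(List<Path> cleaned, List<Path> old) {
        try {
            List<Path> sorted = new ArrayList<>(cleaned);
            sorted.sort(Comparator.comparing((Path p) -> p.getFileName().toString()).reversed());
            // Phase 1: .cleaned -> .swap, descending base offset.
            List<Path> swaps = new ArrayList<>();
            for (Path p : sorted) swaps.add(Files.move(p, withSuffix(p, ".cleaned", ".swap")));
            // Phase 2: retire the old segments via .deleted (Kafka unlinks these asynchronously).
            for (Path p : old) {
                Path deleted = Files.move(p, p.resolveSibling(p.getFileName() + ".deleted"));
                Files.deleteIfExists(deleted);
            }
            // Phase 3: drop the .swap suffix; the new segments are now live.
            for (Path p : swaps) Files.move(p, withSuffix(p, ".swap", ""));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Phase 2 must finish before phase 3 because the cleaned segment usually reuses the base offset, and therefore the file name, of the first segment it replaces.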
- Rename XXXX.cleaned (.log/.index/.timeindex/.txnindex) ⇒ .swap, in DESCENDING base-offset order, for crash recovery.
- Rename the replaced segments ⇒ .deleted and schedule their async unlink.
- Rename .swap ⇒ '' (drop the suffix); the segment is now live.
- flushDir(dir): fsync the directory entry.
On recovery, any .swap file whose offset exceeds the minimum-offset .cleaned file is deleted, and the swap is replayed; any .deleted files are removed (LocalLog.java:974-990). The descending-rename order makes this deterministic.
Tombstones and delete.retention.ms
A tombstone (null value) must survive long enough that a consumer reading from offset 0 still observes the delete before it disappears, otherwise it could see the pre-tombstone value and never learn the key was removed. The cleaner therefore retains a tombstone for delete.retention.ms (default 86 400 000 ms = 24 h, LogConfig.DEFAULT_DELETE_RETENTION_MS) measured from when its batch enters the clean section. For the modern v2 record format this is tracked by stamping a delete horizon into the batch header on the first clean that encounters it; the record is dropped only once currentTime ≥ batch.deleteHorizonMs() (Cleaner.java:589-596). For legacy (< v2) formats a coarser horizon based on the last clean segment's lastModified − deleteRetentionMs is used (Cleaner.java:166-170,260).
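The v2 delete-horizon rule reduces to two small functions. DeleteHorizonSketch is hypothetical; it models only the decision, not how the horizon is stamped into the batch header.

```java
import java.util.OptionalLong;

// Hypothetical sketch of the delete-horizon decision for tombstones (and markers).
final class DeleteHorizonSketch {
    // The first clean that encounters the batch stamps horizon = now + delete.retention.ms;
    // an existing horizon is never moved.
    static long horizonAfterClean(OptionalLong existingHorizonMs, long nowMs, long deleteRetentionMs) {
        return existingHorizonMs.isPresent() ? existingHorizonMs.getAsLong() : nowMs + deleteRetentionMs;
    }

    // A tombstone survives while no horizon is set or the horizon is still in the future.
    static boolean retainTombstone(OptionalLong deleteHorizonMs, long nowMs) {
        return !deleteHorizonMs.isPresent() || nowMs < deleteHorizonMs.getAsLong();
    }
}
```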
Interaction with transactions / markers
The cleaner's transaction handling is summarized at LogCleaner.java:81-96 and enforced in cleanInto (Cleaner.java:366):
- Never clean past the LSO. Because firstUncleanableOffset is bounded by lastStableOffset(), every record the cleaner sees is already committed or aborted, so the transaction index can be consulted up front (LogCleanerManager.java:730-733).
- Aborted-transaction records are dropped unconditionally, ignoring keys (CleanedTransactionMetadata drives discardBatchRecords, Cleaner.java:386-391).
- The last batch of each active producer is retained even if empty (RETAIN_EMPTY) to preserve sequence-number continuity; likewise the batch whose next offset equals the high watermark is retained so the last-offset information survives (Cleaner.java:394-420).
- Transaction markers (control batches) are retained until all records from their transaction are gone and a delete horizon has passed: the same tombstone-retention logic, applied to markers (Cleaner.java:384-389).
Concurrency & threading
| Thread(s) | Count / name | Work |
|---|---|---|
| Recovery pool | num.recovery.threads.per.data.dir per dir; log-recovery-<dir>-<n> (default 2) | Parallel log load/recovery at startup; non-daemon (LogManager.java:585,625). |
| Generic scheduler | shared KafkaScheduler pool | Runs kafka-log-retention, kafka-log-flusher, the two checkpoint tasks, kafka-delete-logs, and per-segment delete-file tasks. |
| Cleaner pool | log.cleaner.threads; kafka-log-cleaner-thread-<id> (default 1) | ShutdownableThreads; each owns a Cleaner with its own offset map and I/O buffers (LogCleaner.java:467-492). |
| Dir-failure handler | 1; LogDirFailureHandler | Blocks on takeNextOfflineLogDir(), calls handleLogDirFailure (ReplicaManager.scala:227-232). |
| Close pool | per-dir at shutdown; log-closing-<dir> | Flush + close every log, then write checkpoints and the clean-shutdown marker (LogManager.java:874-916). |
Locks. LogManager.logCreationOrDeletionLock (a plain monitor) serializes create/delete/rename and disk-failure handling. LogCleanerManager.lock (a ReentrantLock with a pausedCleaningCond condition) guards all access to inProgress, uncleanablePartitions, and the checkpoint map (LogCleanerManager.java:93-100). Retention runs under UnifiedLog's own lock while deleting segments (UnifiedLog.java:1840). I/O throttling across all cleaner threads is a single shared Throttler sized to log.cleaner.io.max.bytes.per.second (LogCleaner.java:168).
The cleaning state machine
LogCleaningState is a sealed interface with three cases (LogCleaningState.java:24-72). Transitions are all guarded by the manager lock:
grabFilthiestCompactedLog moves a partition to InProgress; a normal doneCleaning returns it to None (no entry in inProgress). If another component must stop an in-flight clean, abortAndPauseCleaning flags it Aborted; the cleaner thread notices that flag at the next checkDone (which throws LogCleaningAbortedException, unwinding and deleting the .cleaned output), and doneCleaning then moves the partition to Paused(1). abortAndPauseCleaning blocks on pausedCleaningCond until the partition is Paused (LogCleanerManager.java:367-392,452-459). Pauses are reentrant: Paused(n) requires n matching resumeCleaning calls before returning to None.
This is how LogManager.truncateTo, truncateFullyAndStartAt, intra-broker moves, and partition deletion safely interleave with compaction: each calls abortAndPauseCleaning(tp), does its work, then resumeCleaning(tp) (LogManager.java:944-1003,1453-1455). If a clean threw an unexpected exception, tryCleanFilthiestLog marks the partition uncleanable so it is skipped until maintainUncleanablePartitions or a restart clears it (LogCleaner.java:542-551; LogCleanerManager.java:640-652).
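The transitions above can be sketched as a ReentrantLock-guarded map in which a missing entry means None. CleaningStateSketch is hypothetical: it omits the pausedCleaningCond wait (a real abortAndPauseCleaning caller blocks until the partition reaches Paused), and it simply rejects an abort of a partition that is already Aborted.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical sketch of the per-partition cleaning state machine.
final class CleaningStateSketch {
    enum Kind { IN_PROGRESS, ABORTED, PAUSED }

    static final class State {
        final Kind kind;
        final int pausedCount;
        State(Kind kind, int pausedCount) { this.kind = kind; this.pausedCount = pausedCount; }
    }

    private final ReentrantLock lock = new ReentrantLock();
    private final Map<String, State> inProgress = new HashMap<>(); // absent = None

    // Selection: only a partition in None may start cleaning.
    boolean tryStartCleaning(String tp) {
        lock.lock();
        try {
            if (inProgress.containsKey(tp)) return false;
            inProgress.put(tp, new State(Kind.IN_PROGRESS, 0));
            return true;
        } finally { lock.unlock(); }
    }

    // InProgress -> None on success; Aborted -> Paused(1) once the cleaner has unwound.
    void doneCleaning(String tp) {
        lock.lock();
        try {
            State s = inProgress.get(tp);
            if (s == null) throw new IllegalStateException(tp + " has no cleaning state");
            if (s.kind == Kind.IN_PROGRESS) inProgress.remove(tp);
            else if (s.kind == Kind.ABORTED) inProgress.put(tp, new State(Kind.PAUSED, 1));
            else throw new IllegalStateException(tp + " is paused, not being cleaned");
        } finally { lock.unlock(); }
    }

    // None -> Paused(1); InProgress -> Aborted; Paused(n) -> Paused(n + 1).
    void abortAndPause(String tp) {
        lock.lock();
        try {
            State s = inProgress.get(tp);
            if (s == null) inProgress.put(tp, new State(Kind.PAUSED, 1));
            else if (s.kind == Kind.IN_PROGRESS) inProgress.put(tp, new State(Kind.ABORTED, 0));
            else if (s.kind == Kind.PAUSED) inProgress.put(tp, new State(Kind.PAUSED, s.pausedCount + 1));
            else throw new IllegalStateException(tp + " is already being aborted");
        } finally { lock.unlock(); }
    }

    // Paused(1) -> None; Paused(n) -> Paused(n - 1).
    void resumeCleaning(String tp) {
        lock.lock();
        try {
            State s = inProgress.get(tp);
            if (s == null || s.kind != Kind.PAUSED) throw new IllegalStateException(tp + " is not paused");
            if (s.pausedCount == 1) inProgress.remove(tp);
            else inProgress.put(tp, new State(Kind.PAUSED, s.pausedCount - 1));
        } finally { lock.unlock(); }
    }

    String describe(String tp) {
        lock.lock();
        try {
            State s = inProgress.get(tp);
            if (s == null) return "None";
            return s.kind == Kind.PAUSED ? "Paused(" + s.pausedCount + ")" : s.kind.name();
        } finally { lock.unlock(); }
    }
}
```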
Configuration reference
Retention / segment-lifecycle (topic-level keys; broker defaults via ServerLogConfigs):
| Key | Default | Effect |
|---|---|---|
cleanup.policy | delete | delete, compact, or both; selects the reclamation path (TopicConfig.java:169-171). |
retention.ms | 604800000 (7 d) | Delete segments older than this by largest timestamp (DEFAULT_RETENTION_MS, LogConfig.java:134). |
retention.bytes | -1 (unbounded) | Delete oldest segments once the log exceeds this size (LOG_RETENTION_BYTES_DEFAULT = -1). |
file.delete.delay.ms | 60000 | Delay between .deleted rename and physical unlink (LOG_DELETE_DELAY_MS_DEFAULT). |
segment.bytes | 1073741824 (1 GiB) | Max bytes per group when grouping segments for compaction; also the roll size (DEFAULT_SEGMENT_BYTES). |
flush.ms | Long.MAX_VALUE | Max time before kafka-log-flusher fsyncs a dirty log (LOG_FLUSH_SCHEDULER_INTERVAL_MS_DEFAULT; LogManager.java:1812). |
Compaction (topic-level; cleaner defaults in CleanerConfig / LogConfig):
| Key | Default | Effect |
|---|---|---|
min.cleanable.dirty.ratio | 0.5 | Cleanable-ratio threshold to make a log eligible (DEFAULT_MIN_CLEANABLE_DIRTY_RATIO, LogConfig.java:138). |
min.compaction.lag.ms | 0 | Minimum age before a record may be compacted; segments younger than this are uncleanable (DEFAULT_MIN_COMPACTION_LAG_MS). |
max.compaction.lag.ms | Long.MAX_VALUE | Maximum age a dirty record may remain uncompacted; forces cleaning when exceeded (DEFAULT_MAX_COMPACTION_LAG_MS; range atLeast(1)). |
delete.retention.ms | 86400000 (24 h) | How long tombstones/markers are retained after entering the clean section (DEFAULT_DELETE_RETENTION_MS). |
Broker-level cleaner & manager knobs (CleanerConfig.java, reconfigurable set at LogCleaner.java:101-109):
| Key | Default | Effect |
|---|---|---|
log.cleaner.threads | 1 | Number of CleanerThreads (LOG_CLEANER_THREADS). Reconfigurable, but each change may at most double or halve the current count (LogCleaner.java:266-275). |
log.cleaner.dedupe.buffer.size | 134217728 (128 MiB) | Total dedupe memory across threads; per-thread buffer = size/threads, capped 2 GiB (LOG_CLEANER_DEDUPE_BUFFER_SIZE). |
log.cleaner.io.buffer.load.factor | 0.9 | Max fullness of the dedupe buffer; higher = more cleaning per pass but more collisions (LOG_CLEANER_DEDUPE_BUFFER_LOAD_FACTOR). |
log.cleaner.io.buffer.size | 524288 (512 KiB) | Total read+write I/O buffer across threads (LOG_CLEANER_IO_BUFFER_SIZE). |
log.cleaner.io.max.bytes.per.second | Double.MAX_VALUE | Throttle on total cleaner read+write I/O (LOG_CLEANER_IO_MAX_BYTES_PER_SECOND). |
log.cleaner.backoff.ms | 15000 | Sleep when no log is eligible (LOG_CLEANER_BACKOFF_MS). |
log.cleaner.enable | true | Deprecated (since 4.1, removed in 5.0); the cleaner will always run (CleanerConfig.java:53-54,72-74). |
log.retention.check.interval.ms | 300000 (5 min) | Period of the retention task (LOG_CLEANUP_INTERVAL_MS_DEFAULT). |
num.recovery.threads.per.data.dir | 2 | Parallelism of startup recovery and shutdown close per dir (NUM_RECOVERY_THREADS_PER_DATA_DIR_DEFAULT). |
log.flush.offset.checkpoint.interval.ms | 60000 | Period of the recovery-point checkpoint task (LOG_FLUSH_OFFSET_CHECKPOINT_INTERVAL_MS_DEFAULT). |
log.initial.task.delay.ms | 30000 | Initial delay before background tasks start (LOG_INITIAL_TASK_DELAY_MS_DEFAULT). |
Failure modes, edge cases & recovery
Disk failure
Any class doing log I/O routes an IOException to LogDirFailureChannel.maybeAddOfflineLogDir, which records the dir once and enqueues it onto a bounded queue (LogDirFailureChannel.java:60-65). The single LogDirFailureHandler thread drains it and calls ReplicaManager.handleLogDirFailure → LogManager.handleLogDirFailure (LogManager.java:411), which under logCreationOrDeletionLock:
- removes the dir from liveLogDirs and directoryIds, and halts the broker if it was the last live dir (LogManager.java:416-419);
- drops the dir's recovery-point and log-start checkpoint maps;
- tells the cleaner to stop cleaning that dir (drops its cleaner-offset checkpoint, LogCleanerManager.handleLogDirFailure);
- removes and closes all current and future logs under that dir, marking those partitions offline;
- destroys the dir's .lock.
An offline dir stays offline until broker restart (LogDirFailureChannel.java:36).
The failure channel's queue is sized to the number of log dirs (new ArrayBlockingQueue<>(logDirNum), LogDirFailureChannel.java:45). Because each dir is enqueued at most once (the putIfAbsent guard), the queue can never overflow.
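The at-most-once enqueue that keeps the bounded queue safe can be sketched with a concurrent set guarding an ArrayBlockingQueue. FailureChannelSketch is hypothetical; its method names echo maybeAddOfflineLogDir and takeNextOfflineLogDir from the text, but it is not Kafka's class, and it assumes only configured log dirs are ever reported.

```java
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of a bounded, enqueue-once failure channel.
final class FailureChannelSketch {
    private final Set<String> offlineDirs = ConcurrentHashMap.newKeySet();
    private final BlockingQueue<String> offlineDirQueue;

    FailureChannelSketch(int logDirNum) {
        this.offlineDirQueue = new ArrayBlockingQueue<>(logDirNum); // one slot per configured dir
    }

    // Any I/O thread may call this; only the first report of a dir is enqueued,
    // so at most logDirNum elements are ever added and add() cannot overflow.
    void maybeAddOfflineLogDir(String dir) {
        if (offlineDirs.add(dir)) offlineDirQueue.add(dir);
    }

    // The single handler thread blocks here until a dir is reported.
    String takeNextOfflineLogDir() throws InterruptedException {
        return offlineDirQueue.take();
    }

    boolean hasOfflineLogDir(String dir) { return offlineDirs.contains(dir); }

    int pending() { return offlineDirQueue.size(); }
}
```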
Truncation during compaction
If a log is truncated below the active segment's base while it is being cleaned, LogManager.truncateTo aborts and pauses cleaning, truncates, then maybeTruncateCleanerCheckpointToActiveSegmentBaseOffset rewinds the cleaner checkpoint if it now points past the data, before resuming (LogManager.java:944-967,1143-1147; LogCleanerManager.maybeTruncateCheckpoint:552-567). Mid-clean, checkDone raises LogCleaningAbortedException and cleanSegments deletes its partial .cleaned output (Cleaner.java:330-341).
Buffer too small for a batch
If a chunk read yields zero complete batches, the cleaner doubles its I/O buffers up to max(maxLogMessageSize, maxIoBufferSize), failing only if the message genuinely exceeds that maximum (Cleaner.java:508-617). This lets a single oversized batch still fit, for example one that compression pushed just over max.message.bytes, or one written before that config was lowered. A first write to an empty cleaned segment is always allowed, to avoid an infinite overflow loop (Cleaner.java:469-472).
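The growth rule can be sketched on sizes alone: refuse to grow once at the ceiling, otherwise double but clamp to max(maxLogMessageSize, maxIoBufferSize). BufferGrowthSketch.grow is hypothetical and omits reallocating the actual read and write buffers.

```java
// Hypothetical sketch of the cleaner's I/O-buffer growth rule.
final class BufferGrowthSketch {
    // Returns the next buffer capacity, or throws if the buffer is already at the ceiling.
    static int grow(int currentCapacity, int maxLogMessageSize, int maxIoBufferSize) {
        int maxBufferSize = Math.max(maxLogMessageSize, maxIoBufferSize);
        if (currentCapacity >= maxBufferSize)
            throw new IllegalStateException("message larger than maximum allowable size of " + maxBufferSize);
        long doubled = 2L * currentCapacity; // long arithmetic avoids int overflow for large buffers
        return (int) Math.min(doubled, maxBufferSize);
    }
}
```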
Unclean shutdown
If the clean-shutdown marker is absent (or was deleted mid-load), every segment above the recovery point is re-validated on startup. The marker is written at shutdown only for dirs that either had it before or whose logs were all recovered this run, and it carries the broker epoch for KRaft fencing (LogManager.java:905-914).
Invariants & guarantees
- No concurrent retention + compaction on a partition. This is enforced by the inProgress state machine: retention pauses non-compacted partitions, and compaction marks its target InProgress before touching it.
- Compaction never crosses the LSO. firstUncleanableOffset ≤ lastStableOffset, so the cleaner only ever sees decided (committed or aborted) records.
- Latest value per key survives. The offset map keeps each key's highest offset; shouldRetainRecord retains only that record (or a tombstone still within its retention window).
- Crash-atomic swap. The .cleaned → .swap → live rename sequence, plus renames in descending-offset order, makes replaceSegments recoverable at any crash point.
- Deleted data is not re-exposed. The log-start-offset checkpoint is written before or at deletion, so a restart does not reveal segments below the start offset.
- Producer sequence continuity. The last batch of each active producer (and the marker at the HW) is retained even when compaction empties it.
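The following two-pass Java sketch illustrates the "latest value per key" rule. The first pass records each key's highest offset under its MD5 digest. The second pass keeps only the latest record per key, and keeps tombstones only before a delete horizon.

All names are hypothetical, and the sketch makes several simplifications:
- A HashMap stands in for SkimpyOffsetMap's fixed-size array.
- The delete horizon is passed in as a parameter, whereas Kafka stamps it into the batch.
- Transactions and producer-state retention are ignored.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical two-pass model of key-based compaction; not Kafka's Cleaner or SkimpyOffsetMap.
final class CompactionSketch {
    static final class Rec {
        final long offset;
        final String key;
        final String value;                       // null marks a tombstone
        Rec(long offset, String key, String value) {
            this.offset = offset; this.key = key; this.value = value;
        }
    }

    // MD5 digest of key -> highest offset seen (a HashMap replaces the real open-addressing array).
    private final Map<ByteBuffer, Long> offsetMap = new HashMap<>();
    private final MessageDigest md5;

    CompactionSketch() {
        try {
            md5 = MessageDigest.getInstance("MD5");
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);   // every Java platform must provide MD5
        }
    }

    private ByteBuffer hash(String key) {
        // digest() resets the MessageDigest, so it can be reused across calls.
        return ByteBuffer.wrap(md5.digest(key.getBytes(StandardCharsets.UTF_8)));
    }

    /** Pass 1: remember only each key's highest offset in the dirty section. */
    void buildOffsetMap(List<Rec> dirty) {
        for (Rec r : dirty) offsetMap.merge(hash(r.key), r.offset, Long::max);
    }

    /** Pass 2 predicate: latest record per key; tombstones only before the delete horizon. */
    boolean shouldRetain(Rec r, long nowMs, long deleteHorizonMs) {
        Long latest = offsetMap.get(hash(r.key));
        boolean isLatest = latest == null || r.offset >= latest;   // unmapped keys are kept
        if (!isLatest) return false;
        return r.value != null || nowMs < deleteHorizonMs;
    }

    List<Rec> clean(List<Rec> log, long nowMs, long deleteHorizonMs) {
        List<Rec> out = new ArrayList<>();
        for (Rec r : log) if (shouldRetain(r, nowMs, deleteHorizonMs)) out.add(r);
        return out;
    }
}
```

Consider the records a=v1@0, b=v1@1, a=v2@2 and b=tombstone@3. Cleaning before the horizon keeps offsets 2 and 3. Cleaning after the horizon keeps only offset 2.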
Interactions with other subsystems
- 03 · The Log Storage Engine: UnifiedLog/LocalLog/LogSegment provide the segments this chapter creates, deletes, recopies, and swaps; replaceSegments and deleteSegmentFiles live there.
- 05 · Tiered Storage: when remote copy is enabled, retention switches to local.retention.ms/local.retention.bytes and deletes only local segments that have already been uploaded (UnifiedLog.java:1857-1869,1979-1982).
- 08 · Replication, ISR & High Watermark: the high-watermark guard in deletableSegments and the LSO bound on compaction tie reclamation to replication progress.
- 14 · Transactions & Exactly-Once: the cleaner consults the transaction index, drops aborted records, and retains markers and last-producer batches.
- 12 · Metadata Propagation & Broker Lifecycle: stray-replica detection (isStrayReplica) and intra-broker moves (future logs) are driven by the metadata image and the JBOD directory assignment.
- 13 · Group Coordination: the internal __consumer_offsets topic is compacted, so the cleaner must run or offset/group state grows without bound (the deprecation note warns about exactly this, CleanerConfig.java:73-74).
- 07 · Request Processing: DeleteRecords raises the log start offset, which then drives deleteLogStartOffsetBreachedSegments.
Design rationale & evolution
Log compaction itself predates the KIP process. KIP-58 added min.compaction.lag.ms and KIP-354 added max.compaction.lag.ms, the two bounds that let operators trade compaction promptness against rewrite amplification. The cleaner's transaction-aware behaviour (never cleaning past the LSO, dropping aborted records, retaining markers and the last producer batch) belongs to the exactly-once design line of KIP-98. The delete-horizon batch stamping was refined under KIP-534 (record v2 baseTimestamp/delete-horizon). The MD5 SkimpyOffsetMap deliberately stores hashes rather than keys. This bounds memory to ~24 bytes per dirty key (a 16-byte MD5 digest plus an 8-byte offset), regardless of key size.
The cleaner is being simplified for Kafka 5.0: log.cleaner.enable is deprecated and the cleaner will always run (LogManager.java:838-841, CleanerConfig.java:53). Reconfiguration of cleaner threads (LogCleaner implements BrokerReconfigurable) shuts down and recreates the pool, which also resurrects any threads that had died (LogCleaner.java:289-301).
Gotchas & operational notes
A compacted topic whose dirty ratio stays below min.cleanable.dirty.ratio (default 0.5) may never compact unless max.compaction.lag.ms is set. At the default ratio, half the log must be dirty before the ratio rule fires. For __consumer_offsets and similar topics, set max.compaction.lag.ms to bound staleness.
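The following Java sketch models the two triggers: the dirty-ratio rule and the max-lag rule. CleanableCheckSketch and its parameters are hypothetical. Two details are assumptions about the real code: the strict ">" comparison against the ratio, and an unbounded max.compaction.lag.ms being represented as Long.MAX_VALUE.

```java
// Hypothetical sketch of the two cleaning triggers; not LogCleanerManager's code.
final class CleanableCheckSketch {
    /**
     * cleanBytes: size of the already-compacted prefix.
     * dirtyBytes: size of the cleanable dirty section (excludes the active segment).
     * oldestDirtyTimestampMs: timestamp of the oldest dirty record.
     */
    static boolean needsCleaning(long cleanBytes, long dirtyBytes, double minCleanableRatio,
                                 long oldestDirtyTimestampMs, long nowMs, long maxCompactionLagMs) {
        long totalBytes = cleanBytes + dirtyBytes;
        double dirtyRatio = totalBytes == 0 ? 0.0 : (double) dirtyBytes / totalBytes;
        boolean ratioRule = dirtyRatio > minCleanableRatio;
        // Subtract from nowMs so an unbounded lag (Long.MAX_VALUE) cannot overflow.
        boolean lagRule = dirtyBytes > 0 && oldestDirtyTimestampMs < nowMs - maxCompactionLagMs;
        return ratioRule || lagRule;
    }
}
```

In this sketch, a log that is 40% dirty never qualifies through the ratio rule alone. Setting a finite max lag makes the same log eligible once its oldest dirty record is old enough.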
Tombstones are not removed the instant they supersede a key. They linger for delete.retention.ms (24 h by default) after entering the clean section, so slow full-scan consumers still observe the delete. Shrinking this value risks consumers missing deletes. The value is also a soft bound on how long a from-zero scan may take (CleanerConfig.java:75-77).
A record timestamp in the wall-clock future makes its segment ineligible for retention.ms deletion: the segment is not "old enough" until the clock catches up. The broker logs a warning but cannot reclaim the segment until then (UnifiedLog.java:2008-2009). Misconfigured producer clocks can therefore stall time-based retention.
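The following Java sketch shows why a future timestamp stalls time-based retention. TimeRetentionSketch is hypothetical, and it ignores the active-segment and high-watermark checks. The prefix scan rests on an assumption: that retention walks segments oldest-first and stops at the first segment that is not deletable, so one future-stamped segment also shields every newer segment behind it.

```java
// Hypothetical sketch of the time-retention check; not UnifiedLog's code.
final class TimeRetentionSketch {
    /** A segment is old enough once its newest record is more than retentionMs in the past. */
    static boolean isExpired(long largestTimestampMs, long nowMs, long retentionMs) {
        if (retentionMs < 0) return false;                 // negative retention.ms: no time limit
        // A future timestamp makes the age negative, so this stays false until the clock catches up.
        return nowMs - largestTimestampMs > retentionMs;
    }

    /** Count of oldest segments deletable by time, assuming the scan stops at the first survivor. */
    static int deletablePrefix(long[] largestTimestampsOldestFirst, long nowMs, long retentionMs) {
        int n = 0;
        for (long ts : largestTimestampsOldestFirst) {
            if (!isExpired(ts, nowMs, retentionMs)) break;
            n++;
        }
        return n;
    }
}
```

With now = 10 000 ms, retention = 1 000 ms and segment timestamps {1 000, 20 000, 2 000}, only the first segment is deletable. The future-stamped middle segment blocks the third one.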
Watch uncleanable-partitions-count / uncleanable-bytes (per dir) and DeadThreadCount: a partition that throws during cleaning is marked uncleanable and silently skipped until restart, so its compacted topic grows unbounded meanwhile (LogCleaner.java:542-551, LogCleanerManager.java:70-71).
Per-partition compaction throughput is gated by one offset map per thread. A partition with more distinct dirty keys than fit in slots × loadFactor is cleaned in multiple passes, advancing the cleaner point a chunk at a time. Increasing log.cleaner.dedupe.buffer.size raises the per-pass key budget. That buffer is divided among the cleaner threads, so adding threads without enlarging it adds parallelism across partitions but shrinks each thread's map (Cleaner.java:781,810-813,178).
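The following Java sketch works through the per-pass key-budget arithmetic. DedupeCapacitySketch is a hypothetical name. It assumes the 24-byte entry (16-byte MD5 digest plus 8-byte offset) and an even split of the dedupe buffer across threads. The example values (128 MiB buffer, load factor 0.9) are assumed defaults for log.cleaner.dedupe.buffer.size and log.cleaner.io.buffer.load.factor.

```java
// Hypothetical arithmetic; the per-thread split and entry layout are assumptions.
final class DedupeCapacitySketch {
    static final int MD5_BYTES = 16;
    static final int OFFSET_BYTES = 8;

    /** Distinct dirty keys one cleaner thread can map in a single pass. */
    static long keysPerPass(long dedupeBufferSize, int cleanerThreads, double loadFactor) {
        long perThreadBytes = Math.min(dedupeBufferSize / cleanerThreads, Integer.MAX_VALUE);
        long slots = perThreadBytes / (MD5_BYTES + OFFSET_BYTES);  // 24 bytes per slot
        return (long) (slots * loadFactor);                         // fill only up to the load factor
    }
}
```

Under these assumptions, one thread with 128 MiB can map about 5.03 million dirty keys per pass. Splitting the same buffer across two threads cuts each thread's budget to about 2.52 million keys.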