krivaltsevich.com · Kafka Internals 4.4

05 · Tiered Storage (Remote Log)

Source: Apache Kafka 4.4.0-SNAPSHOT (git 04bfe7d, 2026-06-15), KRaft mode. Derived from source code, not copied from official documentation.

Tiered storage (KIP-405) lets a Kafka topic keep a small, hot tail of log segments on local disk while offloading older, immutable segments to an external object store. The broker-side orchestrator is RemoteLogManager. It drives two pluggable SPIs: RemoteStorageManager (RSM), which moves segment data and indexes, and RemoteLogMetadataManager (RLMM), which tracks RemoteLogSegmentMetadata with strongly consistent semantics. Leaders run two tasks: a copy task that uploads eligible sealed segments, and an expiration task that enforces remote retention. Followers run a follower task that tracks the highest-copied offset. When a consumer fetches below the local log start, the broker serves it by streaming the segment back from remote storage through a thread-pooled read path, and RemoteIndexCache caches the remote index files locally. The default RLMM, TopicBasedRemoteLogMetadataManager, persists metadata as records in the internal __remote_log_metadata topic.

Role & responsibilities

Without tiered storage, a topic's retention is bounded by the aggregate local disk of the brokers that host its replicas. Tiered storage decouples retention from local disk: the broker copies sealed (non-active) segments to a remote object store, then deletes the corresponding local copies once they fall outside the (smaller) local retention. Reads for offsets that exist only in remote storage are transparently served by fetching from the object store. The RemoteLogManager Javadoc enumerates its duties at storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java:144: initialize the RSM and RLMM instances; react to leader/follower/stop-partition events; expose APIs to fetch indexes and metadata; copy log segments to remote; and clean up segments expired by remote retention.

Key idea

The contract splits cleanly: the RSM owns data (segment + index bytes) with only eventual-consistency requirements, while the RLMM owns metadata (which segment covers which offsets/epochs) with strongly consistent semantics. The broker always writes/updates RLMM metadata around every RSM data operation, so the durable source of truth about what exists in remote is the metadata, not the object store listing (RemoteStorageManager.java:33).

Tiered storage
System feature enabled cluster-wide by remote.log.storage.system.enable (default false) and per-topic by remote.storage.enable.
Local retention
How long/large the locally-kept tail may be (log.local.retention.ms / .bytes). Always ≤ the topic's total retention.
Remote retention
Effectively the topic's total retention.ms / retention.bytes, applied to remote segments by the expiration task.
Copied offset
Highest offset known to be durably in remote storage for the partition; gates local deletion.
RSM / RLMM
The two plugin SPIs: RemoteStorageManager and RemoteLogMetadataManager.

Where it lives in the code

Class / interface | File | Responsibility
RemoteLogManager | storage/.../remote/storage/RemoteLogManager.java | Broker orchestrator: tasks, read path, quotas, plugin wiring (~2435 lines).
RemoteStorageManager (SPI) | storage/api/.../remote/storage/RemoteStorageManager.java | Copy / fetch / delete segment data & indexes to/from object store.
RemoteLogMetadataManager (SPI) | storage/api/.../remote/storage/RemoteLogMetadataManager.java | Store / fetch RemoteLogSegmentMetadata, strongly consistent.
RemoteLogSegmentMetadata | storage/api/.../remote/storage/RemoteLogSegmentMetadata.java | Per-segment metadata record (offsets, epochs, size, state).
RemoteLogSegmentId | storage/api/.../remote/storage/RemoteLogSegmentId.java | Universally unique (TopicIdPartition, Uuid) identifier.
LogSegmentData | storage/api/.../remote/storage/LogSegmentData.java | Paths/buffers for the segment and its five indexes passed to copyLogSegmentData.
RemoteIndexCache | storage/.../internals/log/RemoteIndexCache.java | On-disk LFU cache of remote offset/time/txn indexes.
RemoteLogReader / RemoteStorageThreadPool | storage/.../remote/storage/RemoteLogReader.java, internals/log/RemoteStorageThreadPool.java | Async remote read task + its bounded thread pool.
TopicBasedRemoteLogMetadataManager | storage/.../remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java | Default RLMM backed by __remote_log_metadata.
ConsumerTask / ProducerManager / RemoteLogMetadataCache | storage/.../remote/metadata/storage/{ConsumerTask,ProducerManager,RemoteLogMetadataCache}.java | Consume/produce metadata records; in-memory per-partition cache.
RLMQuotaManager | storage/.../remote/quota/RLMQuotaManager.java | Sliding-window (sample-based) byte-rate throttling for copy & fetch.
RemoteLogManagerConfig | storage/.../remote/storage/RemoteLogManagerConfig.java | All broker-level RLM config keys & defaults.

Core concepts & terminology

The two SPIs

RemoteStorageManager (RemoteStorageManager.java:52) is Configurable + Closeable and exposes exactly five operations: copyLogSegmentData(metadata, LogSegmentData) returning an optional CustomMetadata; two overloads of fetchLogSegment(metadata, startPosition[, endPosition]) returning an InputStream; fetchIndex(metadata, IndexType) for one of the five IndexType values OFFSET, TIMESTAMP, PRODUCER_SNAPSHOT, TRANSACTION, LEADER_EPOCH (RemoteStorageManager.java:57); and deleteLogSegmentData(metadata). Copy and delete must be idempotent (re-copy overwrites; delete of a missing object must not throw). Plugins signal transient failures with RetriableRemoteStorageException so the RLM can retry without cancelling the task (RemoteStorageManager.java:47).
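A toy store makes the idempotency rules easier to see. The sketch below is not the RemoteStorageManager SPI. InMemorySegmentStore, its String ids and its raw byte[] payloads are hypothetical stand-ins for RemoteLogSegmentMetadata and LogSegmentData, and treating endPosition as inclusive is an assumption of this sketch. It shows only the contract described above: a re-copy overwrites, deleting a missing object is a no-op, and fetches return an InputStream over a byte range.

```java
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-memory stand-in for an RSM plugin. Segment ids are plain Strings here;
// the real SPI identifies objects through RemoteLogSegmentMetadata / RemoteLogSegmentId.
class InMemorySegmentStore {
    private final Map<String, byte[]> objects = new ConcurrentHashMap<>();

    // Idempotent copy: copying the same id again simply overwrites the stored bytes.
    void copyLogSegmentData(String segmentId, byte[] segmentBytes) {
        objects.put(segmentId, segmentBytes.clone());
    }

    // Open-ended fetch: from startPosition to the end of the object.
    InputStream fetchLogSegment(String segmentId, int startPosition) {
        byte[] data = require(segmentId);
        return fetchLogSegment(segmentId, startPosition, data.length - 1);
    }

    // Ranged fetch: endPosition is treated as inclusive and clamped to the object size.
    InputStream fetchLogSegment(String segmentId, int startPosition, int endPosition) {
        byte[] data = require(segmentId);
        if (startPosition < 0 || startPosition > data.length) {
            throw new IllegalArgumentException("startPosition out of range: " + startPosition);
        }
        int end = Math.min(endPosition, data.length - 1);
        int length = Math.max(0, end - startPosition + 1);
        return new ByteArrayInputStream(data, startPosition, length);
    }

    // Idempotent delete: removing an object that does not exist is a no-op, not an error.
    void deleteLogSegmentData(String segmentId) {
        objects.remove(segmentId);
    }

    boolean contains(String segmentId) {
        return objects.containsKey(segmentId);
    }

    private byte[] require(String segmentId) {
        byte[] data = objects.get(segmentId);
        if (data == null) {
            throw new IllegalStateException("no such remote segment: " + segmentId);
        }
        return data;
    }
}
```

A real plugin would also raise RetriableRemoteStorageException for transient object-store errors. This toy has no such failure mode.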

RemoteLogMetadataManager (RemoteLogMetadataManager.java:55) provides three async, future-returning mutators:

- addRemoteLogSegmentMetadata (the metadata must be in state COPY_SEGMENT_STARTED).
- updateRemoteLogSegmentMetadata (a state transition).
- putRemotePartitionDeleteMetadata.

It also offers synchronous lookups:

- remoteLogSegmentMetadata(tpId, epoch, offset).
- highestOffsetForEpoch.
- listRemoteLogSegments(tpId[, epoch]); the epoch variant is sorted by start offset.
- remoteLogSize(tpId, epoch).
- the default method nextSegmentWithTxnIndex.

The lifecycle callbacks onPartitionLeadershipChanges and onStopPartitions tell the RLMM which partitions this broker now serves. isReady(tpId) gates task execution until the partition's metadata is loaded.

Note

Both plugins can be loaded by a dedicated ChildFirstClassLoader when remote.log.storage.manager.class.path / remote.log.metadata.manager.class.path is set, isolating plugin dependencies from the broker classpath (RemoteLogManager.java:389, :409). Each plugin is wrapped via Plugin.wrapInstance(...) so a Monitorable plugin can register metrics tagged with its class name (RemoteLogManager.java:402, :422). RSM gets props prefixed rsm.config.; RLMM gets rlmm.config. plus broker.id, log.dir, cluster.id, and the bootstrap endpoint.

Segment state machine

Every remote segment moves through four states defined in RemoteLogSegmentState.java:51, each with a 1-byte id used on the wire:

[Figure: RemoteLogSegmentState transitions] The chain runs COPY_SEGMENT_STARTED (id 0; initial, created by add) → copy ok → COPY_SEGMENT_FINISHED (id 1) → retention breach → DELETE_SEGMENT_STARTED (id 2) → RSM delete ok → DELETE_SEGMENT_FINISHED (id 3; terminal, purged from cache). COPY_SEGMENT_STARTED can also go straight to DELETE_SEGMENT_STARTED when a copy is aborted or cleaned up. Only COPY_SEGMENT_FINISHED is readable; the delete states are invisible to reads.
RemoteLogSegmentState transitions. From null, only COPY_SEGMENT_STARTED is legal. COPY_SEGMENT_STARTED may also jump straight to DELETE_SEGMENT_STARTED on an aborted copy. Every self-transition is valid, to allow idempotent retry and failover.

isValidTransition (RemoteLogSegmentState.java:90) enforces the allowed transitions:

- from null, only COPY_SEGMENT_STARTED;
- a self-transition is always valid (idempotency under retry/failover);
- COPY_SEGMENT_STARTED → {COPY_SEGMENT_FINISHED, DELETE_SEGMENT_STARTED};
- COPY_SEGMENT_FINISHED → DELETE_SEGMENT_STARTED;
- DELETE_SEGMENT_STARTED → DELETE_SEGMENT_FINISHED.

The cache keeps every segment that has not reached DELETE_SEGMENT_FINISHED. Only COPY_SEGMENT_FINISHED segments are visible to reads (see the table at RemoteLogMetadataCache.java:78).
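The transition rules are simple enough to encode directly. This minimal sketch uses a hypothetical enum, SegmentState, that mirrors the four states and their 1-byte ids. It re-implements the rules listed above; it does not reproduce RemoteLogSegmentState itself.

```java
// Hypothetical mirror of the four remote segment states and their 1-byte wire ids.
enum SegmentState {
    COPY_SEGMENT_STARTED((byte) 0),
    COPY_SEGMENT_FINISHED((byte) 1),
    DELETE_SEGMENT_STARTED((byte) 2),
    DELETE_SEGMENT_FINISHED((byte) 3);

    private final byte id;

    SegmentState(byte id) { this.id = id; }

    byte id() { return id; }

    static SegmentState forId(byte id) {
        for (SegmentState s : values()) {
            if (s.id == id) return s;
        }
        throw new IllegalArgumentException("unknown state id " + id);
    }

    // 'from' is null for a segment that has no metadata yet.
    static boolean isValidTransition(SegmentState from, SegmentState to) {
        if (to == null) return false;
        if (from == null) return to == COPY_SEGMENT_STARTED;
        if (from == to) return true;                       // idempotent retry / failover
        switch (from) {
            case COPY_SEGMENT_STARTED:
                return to == COPY_SEGMENT_FINISHED || to == DELETE_SEGMENT_STARTED;
            case COPY_SEGMENT_FINISHED:
                return to == DELETE_SEGMENT_STARTED;
            case DELETE_SEGMENT_STARTED:
                return to == DELETE_SEGMENT_FINISHED;
            default:
                return false;                              // DELETE_SEGMENT_FINISHED is terminal
        }
    }
}
```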

Data structures

RemoteLogSegmentId & RemoteLogSegmentMetadata

RemoteLogSegmentId is (TopicIdPartition topicIdPartition, Uuid id) with a fresh random Uuid generated for each copy attempt via generateNew (RemoteLogSegmentId.java:40). Re-uploading the same segment after a failure produces a new id, which is how copy stays idempotent without coordinating on object names.

RemoteLogSegmentMetadata (RemoteLogSegmentMetadata.java:37) carries the field set below. Note the constructor invariants: startOffset ≥ 0, endOffset ≥ startOffset, and segmentLeaderEpochs must be non-empty (a single-epoch segment still maps that epoch → its start offset). It extends RemoteLogMetadata, which adds brokerId and eventTimestampMs.

Field | Type | Meaning
remoteLogSegmentId | RemoteLogSegmentId | Unique id (topic-id-partition + uuid).
startOffset / endOffset | long | Inclusive offset range of the segment.
maxTimestampMs | long | Largest record timestamp; drives time-based remote retention.
segmentLeaderEpochs | NavigableMap<Integer,Long> | Leader-epoch → start-offset lineage within the segment (immutable TreeMap).
segmentSizeInBytes | int | Size of the segment data; drives size-based retention & lag.
customMetadata | Optional<CustomMetadata> | Opaque RSM-returned bytes (e.g. object path/size). Size-capped.
state | RemoteLogSegmentState | Lifecycle state (see above).
txnIdxEmpty | boolean | True if the segment has no transaction index (skip txn fetch).
brokerId / eventTimestampMs | int / long | Origin broker & event time (from RemoteLogMetadata base).

createWithUpdates(RemoteLogSegmentMetadataUpdate) (RemoteLogSegmentMetadata.java:310) produces a new immutable instance. It applies the update's brokerId, eventTimestampMs, customMetadata, and state, and carries over offsets, epochs, and size unchanged. CustomMetadata (RemoteLogSegmentMetadata.java:384) wraps a byte[] whose maximum accepted size is set by remote.log.metadata.custom.metadata.max.bytes (default 128 bytes; RemoteLogManagerConfig.java:88).
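An immutable Java record can model the constructor invariants and the createWithUpdates semantics. This sketch makes several simplifying assumptions:

- SegmentMeta, withUpdate and fitsCustomMetadataCap are hypothetical names.
- The partition is a String.
- java.util.UUID stands in for Kafka's Uuid.
- The state is a plain String.
- Custom metadata is a raw byte[].

```java
import java.util.Collections;
import java.util.NavigableMap;
import java.util.Optional;
import java.util.TreeMap;
import java.util.UUID;

// Hypothetical, trimmed-down model of RemoteLogSegmentMetadata for illustration only.
record SegmentMeta(String topicIdPartition, UUID id, long startOffset, long endOffset,
                   long maxTimestampMs, NavigableMap<Integer, Long> segmentLeaderEpochs,
                   int segmentSizeInBytes, Optional<byte[]> customMetadata, String state,
                   int brokerId, long eventTimestampMs) {

    SegmentMeta {
        // Constructor invariants from the text.
        if (startOffset < 0) throw new IllegalArgumentException("startOffset must be >= 0");
        if (endOffset < startOffset) throw new IllegalArgumentException("endOffset must be >= startOffset");
        if (segmentLeaderEpochs == null || segmentLeaderEpochs.isEmpty())
            throw new IllegalArgumentException("segmentLeaderEpochs must not be empty");
        // Defensive, unmodifiable copy of the epoch lineage.
        segmentLeaderEpochs = Collections.unmodifiableNavigableMap(new TreeMap<>(segmentLeaderEpochs));
    }

    // Each copy attempt gets a fresh random id, in the spirit of RemoteLogSegmentId.generateNew.
    static UUID generateNewId() {
        return UUID.randomUUID();
    }

    // Size check against a configured cap (128 bytes by default in the text).
    static boolean fitsCustomMetadataCap(byte[] customMetadata, int maxBytes) {
        return customMetadata.length <= maxBytes;
    }

    // Analogue of createWithUpdates: only broker id, event time, custom metadata and state change.
    SegmentMeta withUpdate(int newBrokerId, long newEventTimestampMs,
                           Optional<byte[]> newCustomMetadata, String newState) {
        return new SegmentMeta(topicIdPartition, id, startOffset, endOffset, maxTimestampMs,
                segmentLeaderEpochs, segmentSizeInBytes, newCustomMetadata, newState,
                newBrokerId, newEventTimestampMs);
    }
}
```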

LogSegmentData: the six artefacts

copyLogSegmentData receives a LogSegmentData bundling the segment and its auxiliary indexes (LogSegmentData.java:29): logSegment (the .log file), offsetIndex (.index), timeIndex (.timeindex), transactionIndex (Optional<Path>, may be absent), producerSnapshotIndex (the producer-state snapshot at the next segment's base offset), and leaderEpochIndex, a ByteBuffer rather than a path. The RLM builds that buffer by serializing the leader-epoch checkpoint entries up to the next base offset via epochEntriesAsByteBuffer using the LeaderEpochCheckpointFile.FORMATTER (RemoteLogManager.java:2256).
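The leader-epoch artefact is a buffer rather than a file, so it can be built in memory. The sketch below assumes the plain-text checkpoint layout: a version line, an entry-count line, then one "epoch startOffset" line per entry. It keeps only epochs that start before the next segment's base offset. LeaderEpochSnapshot and entriesUpTo are hypothetical names; this is not the RLM's epochEntriesAsByteBuffer.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.NavigableMap;

// Hypothetical helper modelled on the description of epochEntriesAsByteBuffer.
final class LeaderEpochSnapshot {
    private LeaderEpochSnapshot() {}

    static ByteBuffer entriesUpTo(NavigableMap<Integer, Long> epochToStartOffset, long nextSegmentBaseOffset) {
        StringBuilder body = new StringBuilder();
        int count = 0;
        for (Map.Entry<Integer, Long> e : epochToStartOffset.entrySet()) {
            // Drop epochs that begin at or after the next segment: they are not part of this segment's history.
            if (e.getValue() < nextSegmentBaseOffset) {
                body.append(e.getKey()).append(' ').append(e.getValue()).append('\n');
                count++;
            }
        }
        // Assumed layout: version 0, entry count, then the entries.
        String text = "0\n" + count + "\n" + body;
        return ByteBuffer.wrap(text.getBytes(StandardCharsets.UTF_8));
    }
}
```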

On-wire metadata record & on-disk index files

The default RLMM serializes each RemoteLogMetadata event to bytes with RemoteLogMetadataSerde and produces it as the value of a record on __remote_log_metadata (key is null; partition is chosen explicitly, see partitioner below) (ProducerManager.java:83). On the read path, the local index files cached by RemoteIndexCache are named {startOffset}_{uuid}{suffix}, e.g. 45_<uuid>.index / .timeindex / .txnindex, under $logdir/remote-log-index-cache (RemoteIndexCache.java:713, :83). A remote segment's data stream is parsed batch-by-batch by RemoteLogInputStream, which reads the fixed log header up to the magic byte, derives the batch size from the SIZE_OFFSET field, allocates LOG_OVERHEAD + size bytes, and constructs a DefaultRecordBatch (magic > v1) over that buffer (clients/.../record/internal/RemoteLogInputStream.java:41).
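The batch-framing step can be illustrated with plain java.io. This sketch assumes the standard v2 batch header layout:

- an 8-byte base offset;
- a 4-byte batch length at SIZE_OFFSET = 8;
- a 4-byte partition leader epoch;
- the magic byte, so the header is 17 bytes up to and including magic.

It returns raw ByteBuffers where the real RemoteLogInputStream would construct DefaultRecordBatch instances. BatchFramer is a hypothetical name.

```java
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Simplified framing loop in the spirit of RemoteLogInputStream (hypothetical class).
final class BatchFramer {
    static final int SIZE_OFFSET = 8;
    static final int LOG_OVERHEAD = 12;              // baseOffset (8) + batchLength (4)
    static final int HEADER_SIZE_UP_TO_MAGIC = 17;   // + partitionLeaderEpoch (4) + magic (1)

    private BatchFramer() {}

    // Splits a stream into raw batch buffers; throws if the stream ends mid-batch.
    static List<ByteBuffer> readBatches(InputStream in) {
        List<ByteBuffer> batches = new ArrayList<>();
        DataInputStream data = new DataInputStream(in);
        byte[] header = new byte[HEADER_SIZE_UP_TO_MAGIC];
        try {
            while (true) {
                int first = data.read();
                if (first == -1) break;                                  // clean end of stream
                header[0] = (byte) first;
                data.readFully(header, 1, header.length - 1);
                int size = ByteBuffer.wrap(header).getInt(SIZE_OFFSET);  // bytes after LOG_OVERHEAD
                byte magic = header[HEADER_SIZE_UP_TO_MAGIC - 1];
                if (magic < 2) throw new IllegalStateException("only magic v2 batches are handled here");
                if (size < HEADER_SIZE_UP_TO_MAGIC - LOG_OVERHEAD) throw new IllegalStateException("corrupt size " + size);
                byte[] batch = new byte[LOG_OVERHEAD + size];
                System.arraycopy(header, 0, batch, 0, header.length);
                data.readFully(batch, header.length, batch.length - header.length);
                batches.add(ByteBuffer.wrap(batch));
            }
        } catch (EOFException e) {
            throw new IllegalStateException("truncated batch", e);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return batches;
    }
}
```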

Architecture & control/data flow

[Figure: tiered-storage architecture]

- Produce (client append) → UnifiedLog (local log: active and sealed segments, local-retention boundary) → RemoteLogManager (broker orchestrator on the leader). RemoteLogManager fans out to two paths.
- Copy path: the RLMM (SPI) adds COPY_SEGMENT_STARTED, then updates it to COPY_SEGMENT_FINISHED in __remote_log_metadata. The RSM (SPI) PUTs the segment and its indexes.
- Read path: RemoteStorageThreadPool runs RemoteLogReader.call ⇒ rlm.read, and the RSM (SPI) GETs the bytes.
- Both paths reach the object store (S3, GCS, ABS, HDFS).

The copy path writes RLMM metadata, then PUTs bytes via the RSM. The read path streams a remote segment back through the thread pool on a fetch below the local log start. Local deletion is gated by the highest remotely-stored offset.

Task model on leadership change

onLeadershipChange(becomeLeader, becomeFollower, topicIds) (RemoteLogManager.java:469) runs four steps in order:

1. Filters to partitions whose UnifiedLog.remoteLogEnabled() is true.
2. Caches topic-id ↔ partition mappings.
3. Notifies the RLMM via onPartitionLeadershipChanges.
4. Installs tasks.

Leaders get an RLMCopyTask (only if remote.log.copy.disable=false) and an RLMExpirationTask (doHandleLeaderPartition at :2150). Followers get an RLMFollowerTask (doHandleFollowerPartition at :2176). Each task is scheduled on its pool with scheduleWithFixedDelay(task, 0, remote.log.manager.task.interval.ms, MS). When a partition changes role, the opposing tasks are cancelled first via RLMTaskWithFuture.cancel(), which sets a volatile cancelled flag and calls future.cancel(true) (:2207).
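The install/cancel pattern can be sketched with java.util.concurrent alone. TaskHandle and TaskRegistry below are hypothetical stand-ins for RLMTaskWithFuture and the per-role task maps. They show three things: a fixed-delay schedule, an atomic install via computeIfAbsent, and a cancel that both sets a volatile flag and calls future.cancel(true).

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical handle: a volatile flag checked by the task plus the scheduled future.
final class TaskHandle {
    private volatile boolean cancelled = false;
    private volatile Future<?> future;

    boolean isCancelled() { return cancelled; }

    void cancel() {
        cancelled = true;                 // seen by the task's fast guard on its next run
        Future<?> f = future;
        if (f != null) f.cancel(true);    // interrupts a run that is in progress
    }

    void setFuture(Future<?> f) { this.future = f; }
}

// Hypothetical registry holding at most one periodic task per partition.
final class TaskRegistry {
    private final ScheduledThreadPoolExecutor pool;
    private final Map<String, TaskHandle> tasks = new ConcurrentHashMap<>();
    private final long intervalMs;

    TaskRegistry(int threads, long intervalMs) {
        this.pool = new ScheduledThreadPoolExecutor(threads);
        this.pool.setRemoveOnCancelPolicy(true);
        this.pool.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
        this.pool.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
        this.intervalMs = intervalMs;
    }

    // computeIfAbsent makes "install if missing" atomic per partition.
    TaskHandle install(String partition, Runnable body) {
        return tasks.computeIfAbsent(partition, p -> {
            TaskHandle handle = new TaskHandle();
            Runnable guarded = () -> {
                if (handle.isCancelled()) return;   // cheap exit, like the isCancelled() guard
                body.run();
            };
            handle.setFuture(pool.scheduleWithFixedDelay(guarded, 0, intervalMs, TimeUnit.MILLISECONDS));
            return handle;
        });
    }

    // Removes and cancels the partition's task, e.g. on a leader -> follower transition.
    void cancel(String partition) {
        TaskHandle handle = tasks.remove(partition);
        if (handle != null) handle.cancel();
    }

    void shutdown() { pool.shutdownNow(); }
}
```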

Detailed mechanics

The copy task

RLMCopyTask.execute (RemoteLogManager.java:880) first resets per-task state if the log directory changed (an alter-logdir guard, KAFKA-16711), then calls copyLogSegmentsToRemote(log). The algorithm (:996):

  1. Establish offsets. This step has two parts.
     a. Log start. On the first run after becoming leader, maybeUpdateLogStartOffsetOnBecomingLeader computes the remote log start via findLogStartOffset and pushes it through updateRemoteLogStartOffset (:892, :2121). findLogStartOffset returns the start offset of the earliest segment found by walking leader epochs from the earliest entry forward, and falls back to localLogStartOffset.
     b. Copied offset. maybeUpdateCopiedOffset computes the highest already-copied offset via findHighestRemoteOffset (:902, :2091). findHighestRemoteOffset walks leader epochs from the latest entry backward, asks the RLMM for highestOffsetForEpoch, and reconciles the answer against the epoch's end offset. The result seeds the log through log.updateHighestOffsetInRemoteStorage(...).
  2. Select candidates. With copiedOffset and the partition's last-stable-offset (LSO), candidateLogSegments(log, fromOffset, lso) returns all sealed segments (never the active segment) whose base offset ≤ LSO, in order, stopping at the first segment that is not yet eligible by copy-lag (:925). fromOffset = max(copiedOffset+1, logStartOffset).
  3. Copy-lag gate. isEligibleForUpload (:949) returns true immediately if either remote.copy.lag.ms or remote.copy.lag.bytes is 0 (the short-circuit at :957 is an ||); otherwise a segment is eligible if its age (now − largestTimestamp) ≥ copy-lag-ms, or the bytes of log after the segment ≥ copy-lag-bytes.
  4. Quota wait. Before each segment, under copyQuotaManagerLock, the task polls rlmCopyQuotaManager.getThrottleTimeMs() and, while > 0, records the throttle and awaits on copyQuotaManagerLockCondition for up to quotaTimeout() = 1 s, re-checking each cycle; on clearance it records the segment's byte size against the quota and signals other waiters (:1028).
  5. Per-segment copy.
     a. copyLogSegmentsToRemote generates a fresh RemoteLogSegmentId via generateNew (:1049), registers it in segmentIdsBeingCopied (:1050), and passes it to copyLogSegment (:1079).
     b. copyLogSegment builds the COPY_SEGMENT_STARTED metadata (offsets, largestTimestamp, size, segment leader epochs, isTxnIdxEmpty). It calls addRemoteLogSegmentMetadata(...).get(), blocking until the RLMM persists it.
     c. It then assembles LogSegmentData and calls RSM.copyLogSegmentData.
     d. On success it writes COPY_SEGMENT_FINISHED via updateRemoteLogSegmentMetadata(...).get(), advances the in-memory copiedOffsetOption to (endOffset, lastEpochInSegment), and calls log.updateHighestOffsetInRemoteStorage(endOffset).
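Steps 2 and 3 can be sketched as a pure function over an ordered segment list. The Segment record, CopyCandidates and fromOffset are hypothetical. The sketch does not model the -1 ("use local retention") copy-lag setting. It also assumes that "bytes of log after the segment" means the sum of all later segment sizes, including the active one.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of candidate selection plus the copy-lag gate.
final class CopyCandidates {
    record Segment(long baseOffset, long sizeInBytes, long largestTimestampMs) {}

    private CopyCandidates() {}

    // fromOffset = max(copiedOffset + 1, logStartOffset)
    static long fromOffset(long copiedOffset, long logStartOffset) {
        return Math.max(copiedOffset + 1, logStartOffset);
    }

    static boolean isEligibleForUpload(List<Segment> log, int index, long nowMs,
                                       long copyLagMs, long copyLagBytes) {
        if (copyLagMs == 0 || copyLagBytes == 0) return true;   // either lag at 0 => eligible now
        Segment seg = log.get(index);
        long bytesAfter = 0;
        for (int i = index + 1; i < log.size(); i++) bytesAfter += log.get(i).sizeInBytes();
        return nowMs - seg.largestTimestampMs() >= copyLagMs || bytesAfter >= copyLagBytes;
    }

    // 'log' is ordered by base offset and its last element is the active segment.
    static List<Segment> candidates(List<Segment> log, long fromOffset, long lso, long nowMs,
                                    long copyLagMs, long copyLagBytes) {
        List<Segment> out = new ArrayList<>();
        for (int i = 0; i < log.size() - 1; i++) {               // never the active segment
            Segment seg = log.get(i);
            if (log.get(i + 1).baseOffset() <= fromOffset) continue;  // wholly below fromOffset
            if (seg.baseOffset() > lso) break;
            if (!isEligibleForUpload(log, i, nowMs, copyLagMs, copyLagBytes)) break;
            out.add(seg);
        }
        return out;
    }
}
```

Stopping at the first ineligible segment keeps uploads in offset order, which the copied-offset bookkeeping relies on.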
[Figure: copyLogSegment flow]

1. Generate a uuid (RemoteLogSegmentId.generateNew).
2. Add COPY_SEGMENT_STARTED (RLMM, blocking add(…).get()).
3. Call RSM.copyLogSegmentData(meta, LogSegmentData).
4. Error exits: either rethrow, so the task retries on its next tick, or clean up with deleteRemoteLogSegment and then rethrow.
5. Decision: is the returned custom metadata larger than custom.metadata.max.bytes? If yes, clean up and throw CustomMetadataSizeLimitExceeded, which cancels the task.
6. If not, update COPY_SEGMENT_FINISHED (RLMM, blocking update(…).get()).
7. Set copiedOffset := endOffset and call log.updateHighestOffsetInRemoteStorage(endOffset).

copyLogSegment failure handling (RemoteLogManager.java:1079). The highest remote offset advances only after COPY_SEGMENT_FINISHED is durable, so a mid-copy crash can never orphan local data.
Invariant

A local segment is only eligible for deletion once it is fully in remote. UnifiedLog.isSegmentEligibleForDeletion requires upperBoundOffset − 1 ≤ highestOffsetInRemoteStorage() (or that the log-start-offset was incremented past it) when remote copy is enabled (storage/.../internals/log/UnifiedLog.java:1857). Because copyLogSegment only advances highestOffsetInRemoteStorage after the COPY_SEGMENT_FINISHED metadata is durable, a crash mid-copy can never orphan data: the local copy survives and the next leader re-copies.
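A small model can demonstrate the invariant: an RSM failure between the STARTED and FINISHED writes must not unblock local deletion. RemoteDeletionGuard is hypothetical, and it reduces the log-start condition to a boolean flag. The point is only that the deletion check reads a value that advances after the FINISHED write, never before it.

```java
// Hypothetical model of the local-deletion guard and the copy ordering that feeds it.
final class RemoteDeletionGuard {
    private volatile long highestOffsetInRemoteStorage = -1L;

    long highestOffsetInRemoteStorage() {
        return highestOffsetInRemoteStorage;
    }

    // upperBoundOffset is the next segment's base offset (exclusive end of this segment).
    boolean isSegmentEligibleForDeletion(long upperBoundOffset, boolean logStartOffsetPastSegment) {
        return logStartOffsetPastSegment || upperBoundOffset - 1 <= highestOffsetInRemoteStorage;
    }

    // Ordering from copyLogSegment: STARTED, upload, FINISHED, and only then advance the guard.
    void copySegment(long endOffset, Runnable writeStarted, Runnable uploadBytes, Runnable writeFinished) {
        writeStarted.run();
        uploadBytes.run();      // if this throws, the guard below is never advanced
        writeFinished.run();
        highestOffsetInRemoteStorage = Math.max(highestOffsetInRemoteStorage, endOffset);
    }
}
```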

Custom-metadata overflow

If the RSM returns CustomMetadata larger than the configured cap, the task does not store the finished update; instead it reconstructs the finished metadata only to delete the just-copied data, throws CustomMetadataSizeLimitExceededException, and cancel()s the copy task for that partition (RemoteLogManager.java:1128, caught at :1061). This stops further copying for the partition until intervention, surfacing as failedRemoteCopyRequestRate.

The follower task

Followers do not copy; RLMFollowerTask.execute simply calls findHighestRemoteOffset and log.updateHighestOffsetInRemoteStorage(...) (RemoteLogManager.java:1662). This keeps a follower's local deletion guard in sync so that when it later becomes leader (or while it serves follower fetches) it will not delete local segments that the cluster believes are not yet remote.
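Both the copy task and the follower task rely on findHighestRemoteOffset. The sketch below walks epochs from the latest backward and stops at the first epoch for which the RLMM reports an offset. It then caps that offset at the epoch's last offset: the next epoch's start minus one, or the log end minus one for the latest epoch. That cap is this sketch's reading of "reconciling against the epoch's end offset". HighestRemoteOffset.find and the Function standing in for highestOffsetForEpoch are hypothetical.

```java
import java.util.Map;
import java.util.NavigableMap;
import java.util.Optional;
import java.util.function.Function;

// Hypothetical sketch of the backward epoch walk used to find the highest copied offset.
final class HighestRemoteOffset {
    private HighestRemoteOffset() {}

    // epochStartOffsets: leader epoch -> start offset; rlmmHighestForEpoch stands in for the RLMM.
    static long find(NavigableMap<Integer, Long> epochStartOffsets, long logEndOffset,
                     Function<Integer, Optional<Long>> rlmmHighestForEpoch) {
        for (Map.Entry<Integer, Long> e : epochStartOffsets.descendingMap().entrySet()) {
            Optional<Long> remote = rlmmHighestForEpoch.apply(e.getKey());
            if (remote.isEmpty()) continue;                      // nothing copied under this epoch
            Map.Entry<Integer, Long> next = epochStartOffsets.higherEntry(e.getKey());
            long epochEndExclusive = next != null ? next.getValue() : logEndOffset;
            return Math.min(remote.get(), epochEndExclusive - 1); // never past this epoch's range
        }
        return -1L;                                              // nothing in remote storage
    }
}
```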

The expiration task, remote retention

RLMExpirationTask.cleanupExpiredRemoteLogSegments (RemoteLogManager.java:1410) enforces remote retention and removes dangling/unreferenced segments. Step by step:

  1. Tally. calculateMetadataAndSize iterates all segments to compute metadata count, total tracked size, and the size of only COPY_SEGMENT_FINISHED segments; these feed gauges RemoteLogMetadataCount and RemoteLogSizeBytes (:1383).
  2. Build retention contexts. buildRetentionTimeData(retention.ms) yields a cleanupUntilMs = now − retentionMs threshold (only if retentionMs > -1 and the threshold is ≥ 0) (:1575). buildRetentionSizeData(...) computes how many remote bytes breach retention.bytes, validating each finished segment against the current leader-epoch lineage (:1594). The first time, it iterates and sums valid segments; once it proves all segments are valid (isAllSegmentsValid) it short-circuits to the cheaper cached size.
  3. Iterate epochs in order. For each leader epoch, the task walks that epoch's segments sorted by start offset:
     a. It skips segments currently being copied; this sets canProcess=false (:1465).
     b. It re-publishes a delete for any segment stuck in DELETE_SEGMENT_STARTED (dangling-delete retry, :1473).
     c. It decides deletion by checking, in priority order, a log-start-offset breach (isSegmentBreachByLogStartOffset) and then, if the segment is within the leader-epoch lineage, a retention-time breach followed by a retention-size breach (:1488).
  4. Advance remote log-start. Each deletion reason records the candidate's endOffset + 1; the maximum is pushed via handleLogStartOffsetUpdate before the actual deletes, so followers can pick up the new log-start even if this leader later fails the deletes (:1509; the code's comment at :1511 reasons through leader-change races).
  5. Delete. Each candidate goes through deleteRemoteLogSegment (:1677), which runs three blocking steps: publish DELETE_SEGMENT_STARTED → RSM.deleteLogSegmentData → publish DELETE_SEGMENT_FINISHED. The remote-delete request and failure rates are marked along the way.
  6. Unreferenced cleanup. Finally it deletes segments whose leader epochs all fall below the leader's earliest known epoch, i.e. segments left unreferenced after an unclean leader election (deleteLogSegmentsDueToLeaderEpochCacheTruncation, :1340, driven from :1539).
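The breach checks in steps 2 to 4 can be sketched over segments ordered oldest first. Every assumption here is illustrative:

- RemoteRetention and its records are hypothetical.
- totalBytes is whatever size the size check compares against retention.bytes.
- A time-breached segment also counts toward the size breach.
- The lineage check is skipped.
- The loop stops at the first retained segment.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of remote retention: log-start, time and size breaches, plus the new log start.
final class RemoteRetention {
    record Segment(long startOffset, long endOffset, long maxTimestampMs, long sizeInBytes) {}

    record Plan(List<Segment> toDelete, long newLogStartOffset) {}

    private RemoteRetention() {}

    static Plan plan(List<Segment> oldestFirst, long nowMs, long retentionMs,
                     long totalBytes, long retentionBytes, long logStartOffset) {
        // Time retention applies only if enabled and the threshold is non-negative.
        boolean timeEnabled = retentionMs > -1 && nowMs - retentionMs >= 0;
        long cleanupUntilMs = nowMs - retentionMs;
        // Size retention: bytes that must go to get back under retention.bytes.
        long remainingBreach = (retentionBytes > -1 && totalBytes > retentionBytes)
                ? totalBytes - retentionBytes : 0;

        List<Segment> toDelete = new ArrayList<>();
        long newLogStart = logStartOffset;
        for (Segment seg : oldestFirst) {
            boolean delete;
            if (seg.endOffset() < logStartOffset) {
                delete = true;                                     // below the log start offset
            } else if (timeEnabled && seg.maxTimestampMs() <= cleanupUntilMs) {
                delete = true;                                     // older than retention.ms
                remainingBreach = Math.max(0, remainingBreach - seg.sizeInBytes());
            } else if (remainingBreach > 0 && remainingBreach - seg.sizeInBytes() >= 0) {
                delete = true;                                     // needed to satisfy retention.bytes
                remainingBreach -= seg.sizeInBytes();
            } else {
                delete = false;
            }
            if (!delete) break;                                    // this sketch keeps all newer segments
            toDelete.add(seg);
            newLogStart = Math.max(newLogStart, seg.endOffset() + 1); // endOffset + 1 of each deletion
        }
        return new Plan(toDelete, newLogStart);
    }
}
```

As in step 4, the new log start would be published before the deletes run, so followers can advance even if a later delete fails.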
Key idea

Remote retention is computed against the partition's leader-epoch lineage, not a flat segment list. isRemoteSegmentWithinLeaderEpochs (RemoteLogManager.java:1728) validates that a remote segment's epoch/offset boundaries are consistent with the current leader's filtered epoch map, handling overlapping segments produced across leader elections. Segments outside the lineage are treated as unreferenced and removed separately. This is what makes tiered storage correct across unclean failovers where the remote store can hold segments from divergent histories.

The remote read path

When a consumer fetch targets an offset below the local log start, ReplicaManager.handleOffsetOutOfRangeError detects logStartOffset ≤ offset < localLogStartOffset and routes to remote (core/.../server/ReplicaManager.scala:1904). It first consults the fetch quota: if remoteLogManager.getFetchThrottleTimeMs > 0, it returns an empty FetchDataInfo with UNKNOWN_OFFSET_METADATA (so other partitions in the request can still be served via delayed fetch) instead of scheduling a remote read (ReplicaManager.scala:1917). Otherwise it builds a RemoteStorageFetchInfo and the fetch becomes a DelayedRemoteFetch; processRemoteFetch calls rlm.asyncRead(fetchInfo, callback) (ReplicaManager.scala:1610).

asyncRead submits a RemoteLogReader to the bounded remoteStorageReaderThreadPool (RemoteLogManager.java:2145). RemoteLogReader.call() times rlm.read(fetchInfo) against the remoteReadTimer, records bytes against the fetch quota, and invokes the callback with a RemoteLogReadResult (RemoteLogReader.java:61). The heavy lifting is RemoteLogManager.read (:1834):
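The bounded pool behaves like a ThreadPoolExecutor with a fixed queue. Once all threads are busy and the queue is full, new reads are rejected instead of piling up. BoundedReadPool, ReadResult and this asyncRead signature are hypothetical simplifications of RemoteStorageThreadPool and RemoteLogManager.asyncRead. The real reader also records timing and quota usage.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical bounded remote-read pool: fixed threads, bounded queue, callback on completion.
final class BoundedReadPool {
    record ReadResult(byte[] data, Throwable error) {}

    private final ThreadPoolExecutor executor;

    BoundedReadPool(int threads, int maxPendingTasks) {
        // With the default AbortPolicy, execute() throws RejectedExecutionException when saturated.
        this.executor = new ThreadPoolExecutor(threads, threads, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(maxPendingTasks));
    }

    // Runs the read on a pool thread and always completes the callback, with data or an error.
    void asyncRead(Callable<byte[]> read, Consumer<ReadResult> callback) {
        executor.execute(() -> {
            byte[] data = null;
            Throwable error = null;
            try {
                data = read.call();
            } catch (Exception e) {
                error = e;
            }
            callback.accept(new ReadResult(data, error));
        });
    }

    void shutdown() { executor.shutdownNow(); }
}
```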

  1. Resolve the leader epoch for the fetch offset from the leader-epoch cache and look up the covering RemoteLogSegmentMetadata via the RLMM; if none, throw OffsetOutOfRangeException (:1852).
  2. lookupPositionForOffset uses RemoteIndexCache.lookupOffset on the cached offset index to get a byte startPos (:1933); open RSM.fetchLogSegment(meta, startPos) and wrap it in a RemoteLogInputStream.
  3. findFirstBatch scans forward, skipping batches whose lastOffset < offset, summing skippedBytes (:2077). If a segment is exhausted (e.g. the offset was compacted away), findNextSegmentMetadata advances to the next segment and repeats (:1870).
  4. Copy the first batch plus as many subsequent bytes as fit into a min(fetchMaxBytes, partition.maxBytes) buffer. If the first batch alone exceeds that cap, it is still returned when minOneMessage is set; otherwise the result is empty (:1887).
  5. If isolation is TXN_COMMITTED, addAbortedTransactions walks the txn indexes of this and following segments (then local segments) up to the fetch upper bound, accumulating aborted-transaction markers for the client to filter (:1937, :1979).
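Steps 3 and 4 reduce to skipping and packing. This sketch works on batch descriptors rather than bytes, and it packs whole batches only, whereas the real read copies raw bytes into the buffer. Batch, Selection and RemoteFetchPacker are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of findFirstBatch plus the byte-budget fill with minOneMessage.
final class RemoteFetchPacker {
    record Batch(long baseOffset, long lastOffset, int sizeInBytes) {}
    record Selection(List<Batch> batches, long skippedBytes) {}

    private RemoteFetchPacker() {}

    static Selection select(List<Batch> segmentBatches, long fetchOffset, int fetchMaxBytes,
                            int partitionMaxBytes, boolean minOneMessage) {
        int budget = Math.min(fetchMaxBytes, partitionMaxBytes);
        long skipped = 0;
        int i = 0;
        // Skip batches that end before the fetch offset, counting their bytes.
        while (i < segmentBatches.size() && segmentBatches.get(i).lastOffset() < fetchOffset) {
            skipped += segmentBatches.get(i).sizeInBytes();
            i++;
        }
        List<Batch> out = new ArrayList<>();
        if (i == segmentBatches.size()) {
            return new Selection(out, skipped);      // exhausted: caller moves to the next segment
        }
        Batch first = segmentBatches.get(i);
        if (first.sizeInBytes() > budget) {
            if (minOneMessage) out.add(first);       // oversize first batch only with minOneMessage
            return new Selection(out, skipped);
        }
        int used = 0;
        for (; i < segmentBatches.size(); i++) {
            Batch b = segmentBatches.get(i);
            if (used + b.sizeInBytes() > budget) break;
            out.add(b);
            used += b.sizeInBytes();
        }
        return new Selection(out, skipped);
    }
}
```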
[Figure: remote fetch lifecycle]

1. Consumer fetch with an offset below localLogStartOffset.
2. ReplicaManager.readFromLog ⇒ OffsetOutOfRange → handleOffsetOutOfRangeError.
3. Throttle check: getFetchThrottleTimeMs > 0? If yes, return an empty FetchDataInfo with no remote read. The request is shed and the consumer retries later.
4. If not throttled: RemoteStorageFetchInfo ⇒ DelayedRemoteFetch, then rlm.asyncRead(fetchInfo, callback).
5. On a RemoteStorageThreadPool thread, RemoteLogReader.call ⇒ rlm.read:
   a. RLMM.remoteLogSegmentMetadata(epoch, offset) resolves the covering segment.
   b. RemoteIndexCache.lookupOffset ⇒ startPos, from the cached offset index.
   c. RSM.fetchLogSegment(meta, startPos) ⇒ InputStream.
   d. RemoteLogInputStream.findFirstBatch fills the buffer (plus aborted txns).
6. The callback completes the DelayedRemoteFetch and the response goes to the client.

Remote fetch lifecycle. A fetch below the local log start takes one of two routes. If the fetch quota is exceeded, the request is shed. Otherwise rlm.read runs on the bounded remote-storage thread pool (metadata lookup → index → segment stream → first batch) before the delayed fetch completes.

RemoteIndexCache

Re-downloading index files for every remote fetch would be ruinous, so RemoteIndexCache keeps offset/time/txn indexes on local disk under $logdir/remote-log-index-cache and memory-maps them (RemoteIndexCache.java:78). It is a Caffeine cache weighted by entry byte size with maximum weight remote.log.index.file.cache.total.size.bytes (default 1 GiB) and optional expireAfterAccess = remote.log.index.file.cache.ttl.ms (default 900 000 ms; -1 disables) (:171). On a miss, loadIndexFile downloads via RSM.fetchIndex into a .tmp file and atomically renames it into place, then opens and sanity-checks the index; a missing transaction index (RemoteResourceNotFoundException) is tolerated by writing an empty file (:362, :441). On startup, init() deletes leftover .tmp/.deleted files and re-hydrates entries from any complete triplet on disk (:275).
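Downloading to a .tmp file and then renaming is the standard way to make a file appear atomically. In this sketch, IndexFileLoader is hypothetical, and java.util.UUID stands in for Kafka's Uuid in the {startOffset}_{uuid}{suffix} name. The sketch omits the open and sanity check that follow the rename.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.UUID;

// Hypothetical loader: write to <name>.tmp, then atomically rename to <name>.
final class IndexFileLoader {
    private IndexFileLoader() {}

    static String fileName(long startOffset, UUID segmentUuid, String suffix) {
        return startOffset + "_" + segmentUuid + suffix;
    }

    static Path load(Path cacheDir, long startOffset, UUID segmentUuid, String suffix, InputStream remote) {
        Path target = cacheDir.resolve(fileName(startOffset, segmentUuid, suffix));
        if (Files.exists(target)) return target;                     // already cached on disk
        Path tmp = cacheDir.resolve(target.getFileName() + ".tmp");
        try {
            Files.copy(remote, tmp, StandardCopyOption.REPLACE_EXISTING);
            // Readers see either no file or the complete file, never a partial download.
            Files.move(tmp, target, StandardCopyOption.ATOMIC_MOVE);
            return target;
        } catch (IOException e) {
            try {
                Files.deleteIfExists(tmp);
            } catch (IOException ignored) {
                // best-effort cleanup; startup would remove leftover .tmp files anyway
            }
            throw new UncheckedIOException(e);
        }
    }
}
```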

Eviction is two-phase to avoid mutating files under in-flight reads. The Caffeine evictionListener only enqueues the entry; the dedicated remote-log-index-cleaner KafkaScheduler thread runs markForCleanup (rename to .deleted) then cleanup (delete) after a fileDeleteDelayMs (10 s) delay (:195, :248). Concurrency uses a class-level ReadWriteLock (reads take the read lock; close/resize take the write lock) plus a per-Entry ReentrantReadWriteLock so cleanup never races a lookup (:99, :518). The class doc notes the policy is Caffeine's Window-TinyLfu.

The default RLMM: TopicBasedRemoteLogMetadataManager

The default metadata store persists every event as a record in the internal compaction-disabled topic __remote_log_metadata (TopicBasedRemoteLogMetadataManagerConfig.java:44). The topic defaults: 50 partitions, replication factor 3, min.insync.replicas 2, cleanup policy delete, retention -1 (unlimited) (:54, newRemoteLogMetadataTopic at TopicBasedRemoteLogMetadataManager.java:432). Because retention is unlimited by default and the topic is delete-not-compact, the full event history is replayable.

Producing & the read-your-writes guarantee

storeRemoteLogMetadata publishes the event through ProducerManager, which wraps an idempotent, acks=all KafkaProducer (ProducerManager.java:48, TopicBasedRemoteLogMetadataManagerConfig.java:238). It then chains a stage that blocks until the local ConsumerManager has consumed up to that record's offset (TopicBasedRemoteLogMetadataManager.java:153). This is how the RLM's blocking add(...).get() and update(...).get() calls get strong consistency: the future does not complete until the in-memory cache reflects the write. RemoteLogMetadataTopicPartitioner.metadataPartition computes the deterministic metadata partition for a user partition as murmur2(hash(topicId, partition)) mod numMetadataPartitions (RemoteLogMetadataTopicPartitioner.java:35). Both producer and consumer use this partitioner, so all events for a user partition land on one metadata partition.
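A sketch of the partitioner follows. murmur2 is the 32-bit MurmurHash2 variant with seed 0x9747b28c that Kafka also uses for default key partitioning; the result is masked to a positive int and taken modulo the partition count. The byte encoding of (topicId, partition) is an assumption of this sketch: four big-endian bytes of Objects.hash over the UUID's bits and the partition. The concrete partition numbers therefore need not match a real broker.

```java
import java.nio.ByteBuffer;
import java.util.Objects;
import java.util.UUID;

// Hypothetical metadata-partition selector built on a 32-bit MurmurHash2.
final class MetadataPartitioner {
    private MetadataPartitioner() {}

    static int murmur2(byte[] data) {
        final int m = 0x5bd1e995;
        final int r = 24;
        int length = data.length;
        int h = 0x9747b28c ^ length;
        for (int i = 0; i < length / 4; i++) {
            int i4 = i * 4;
            int k = (data[i4] & 0xff) | ((data[i4 + 1] & 0xff) << 8)
                    | ((data[i4 + 2] & 0xff) << 16) | ((data[i4 + 3] & 0xff) << 24);
            k *= m;
            k ^= k >>> r;
            k *= m;
            h *= m;
            h ^= k;
        }
        int tail = length & ~3;
        switch (length % 4) {
            case 3: h ^= (data[tail + 2] & 0xff) << 16; // fall through
            case 2: h ^= (data[tail + 1] & 0xff) << 8;  // fall through
            case 1: h ^= data[tail] & 0xff;
                    h *= m;
            default: break;
        }
        h ^= h >>> 13;
        h *= m;
        h ^= h >>> 15;
        return h;
    }

    // Same user partition -> same metadata partition, for producer and consumer alike.
    static int metadataPartition(UUID topicId, int partition, int numMetadataPartitions) {
        int hash = Objects.hash(topicId.getLeastSignificantBits(), topicId.getMostSignificantBits(), partition);
        byte[] bytes = ByteBuffer.allocate(4).putInt(hash).array();   // assumed encoding
        return (murmur2(bytes) & 0x7fffffff) % numMetadataPartitions; // toPositive, then mod
    }
}
```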

Consuming & the in-memory cache

A single ConsumerTask thread polls the assigned metadata partitions and applies each deserialized event to a RemotePartitionMetadataEventHandler (ConsumerTask.java:65, :166). Assignment is keyed by metadata partition, not user partition: when partitions are assigned, the task computes the set of metadata partitions, consumer.assign(...)s them, seeks newly-assigned ones to the beginning (full replay) and others to the last processed offset (:224). A user partition is marked ready only once the consumer has read up to the metadata partition's end offset at assignment time, or the metadata partition is empty (maybeMarkUserPartitionsAsReady, :185); until then isReady(tpId) is false and the RLM skips that partition's tasks and read requests.

The handler delegates to a per-user-partition RemoteLogMetadataCache (RemoteLogMetadataCache.java:96). It holds two maps: idToSegmentMetadata (segment-id → metadata for all non-terminal segments) and leaderEpochEntries (leader epoch → RemoteLogLeaderEpochState tracking the floor structure and highest offset per epoch). Lookups respect state: remoteLogSegmentMetadata(epoch, offset) returns only COPY_SEGMENT_FINISHED segments and verifies the offset lies within the epoch's boundary inside that segment (:132); listRemoteLogSegments(epoch) returns started, finished, and delete-started (for cleanup) but never delete-finished. State updates run through checkStateTransition and drop invalid transitions rather than corrupting the cache (:344); a DELETE_SEGMENT_FINISHED event removes the segment entirely (:240).
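A toy cache illustrates the lookup rule. A floor search on a per-epoch map finds the candidate segment, and the epoch's boundary inside that segment is checked afterwards. ToyRemoteMetadataCache is hypothetical and simplified. In particular, it indexes only finished segments for reads, so an in-progress re-copy cannot shadow a finished one.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Optional;
import java.util.TreeMap;

// Hypothetical per-partition cache: id -> segment, and epoch -> (start offset -> segment id).
final class ToyRemoteMetadataCache {
    record Seg(String id, long startOffset, long endOffset, NavigableMap<Integer, Long> epochs, boolean finished) {}

    private final Map<String, Seg> idToSegment = new HashMap<>();
    private final Map<Integer, TreeMap<Long, String>> epochToSegments = new HashMap<>();

    // Adds or updates a segment; only finished segments are indexed for reads.
    void put(Seg seg) {
        idToSegment.put(seg.id(), seg);
        if (!seg.finished()) return;
        for (Map.Entry<Integer, Long> e : seg.epochs().entrySet()) {
            epochToSegments.computeIfAbsent(e.getKey(), k -> new TreeMap<>()).put(e.getValue(), seg.id());
        }
    }

    // Mirrors DELETE_SEGMENT_FINISHED: the segment disappears entirely.
    void remove(String id) {
        Seg seg = idToSegment.remove(id);
        if (seg == null) return;
        for (Map.Entry<Integer, Long> e : seg.epochs().entrySet()) {
            TreeMap<Long, String> m = epochToSegments.get(e.getKey());
            if (m != null) m.remove(e.getValue(), id);
        }
    }

    Optional<Seg> remoteLogSegmentMetadata(int epoch, long offset) {
        TreeMap<Long, String> floor = epochToSegments.get(epoch);
        if (floor == null) return Optional.empty();
        Map.Entry<Long, String> entry = floor.floorEntry(offset);
        if (entry == null) return Optional.empty();
        Seg seg = idToSegment.get(entry.getValue());
        if (seg == null || !seg.finished()) return Optional.empty();
        // Within the segment, this epoch ends where the next epoch starts, else at endOffset.
        Map.Entry<Integer, Long> next = seg.epochs().higherEntry(epoch);
        long epochEnd = next != null ? next.getValue() - 1 : seg.endOffset();
        return offset <= epochEnd ? Optional.of(seg) : Optional.empty();
    }
}
```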

Caution

The default RLMM is fail-fast on initialization. initializeResources creates or validates the metadata topic and starts the producer and consumer. If it cannot succeed within remote.log.metadata.initialization.retry.max.timeout.ms (default 120 s), it calls Exit.exit(1) and the broker process terminates (TopicBasedRemoteLogMetadataManager.java:316, :370). Likewise, initialization fails if the metadata topic's partition count drifts from the configured count. Initialization itself is deferred until onBrokerReady(), so the local cluster is up before the broker tries to use it as a client (:391).

Concurrency & threading

The RLM runs several distinct pools, all created in the constructor (RemoteLogManager.java:256):

Pool / thread | Type | Threads (config) | Work
RLMCopyThreadPool (kafka-rlm-copy-thread-pool-%d) | ScheduledThreadPoolExecutor | remote.log.manager.copier.thread.pool.size = 10 | Runs RLMCopyTask per leader partition at fixed delay.
RLMExpirationThreadPool (kafka-rlm-expiration-thread-pool-%d) | ScheduledThreadPoolExecutor | remote.log.manager.expiration.thread.pool.size = 10 | Runs RLMExpirationTask per leader partition.
RLMFollowerScheduledThreadPool (kafka-rlm-follower-thread-pool-%d) | ScheduledThreadPoolExecutor | remote.log.manager.follower.thread.pool.size = 2 | Runs RLMFollowerTask to track highest-copied offset.
remote-log-reader-%d | RemoteStorageThreadPool (ThreadPoolExecutor) | remote.log.reader.threads = 10, queue remote.log.reader.max.pending.tasks = 100 | Executes RemoteLogReader / RemoteLogOffsetReader for remote fetch & list-offsets.
remote-log-index-cleaner | KafkaScheduler (1 thread) | fixed | Renames & deletes evicted index files after a delay.
RLMMInitializationThread + ConsumerTask thread | plain threads | 1 each | RLMM bootstrap; consume __remote_log_metadata.

The scheduled pools set removeOnCancelPolicy(true) and disable running delayed/periodic tasks after shutdown (RemoteLogManager.java:2324). Each scheduled task starts with a fast isCancelled() / isReady() guard (:826) so a cancelled-then-rescheduled task exits cheaply. State that crosses threads is protected as follows:

  • Task registries: leaderCopyRLMTasks, leaderExpirationRLMTasks, and followerRLMTasks are ConcurrentHashMaps. They are mutated with computeIfAbsent/computeIfPresent, so install and cancel are atomic (:184).
  • In-flight copies: segmentIdsBeingCopied is a concurrent set. The expiration task consults it to avoid deleting a segment mid-copy (:187, :1465).
  • Copy quota: a fair ReentrantLock, copyQuotaManagerLock, with a Condition serializes copy threads, so they wait and re-check the byte rate together (:169, :1028).
  • Per-task offsets: copiedOffsetOption, isLogStartOffsetUpdated, and logDirectory are volatile fields of each RLMCopyTask. A single executor thread mutates them, but reads may come from cancel paths (:870).
  • Index cache: a class-level ReadWriteLock plus a per-entry lock, as described above.

Quotas & throttling

Two independent RLMQuotaManagers bound aggregate (broker-global, all-partition) byte rates: copy (local→remote) and fetch (remote→local), both defaulting to Long.MAX_VALUE (unbounded) (RemoteLogManager.java:366, :372). Each uses a SimpleRate sensor over num windows of window.size.seconds (defaults 11 windows × 1 s) and a Quota bound; getThrottleTimeMs calls sensor.checkQuotas() and, on QuotaViolationException, returns QuotaUtils.throttleTime(...) (RLMQuotaManager.java:80). The two managers differ in where they apply: the copy quota makes the copy task wait before uploading a segment (back-pressure), whereas the fetch quota makes the read path shed the request, returning empty so the consumer retries later rather than queueing remote reads (ReplicaManager.scala:1917). Throttle times are surfaced via remote-copy-throttle-time / remote-fetch-throttle-time sensors (RemoteLogManager.java:244).
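The following is a rough model of a windowed byte-rate quota. WindowedByteQuota is hypothetical. Its fixed-slot windows, its rate computed over the full window span, and its throttle formula ((observed - bound) / bound * span) are assumptions made for illustration. They are not taken from RLMQuotaManager or SimpleRate.

```java
import java.util.Arrays;

// Hypothetical broker-wide byte-rate quota with fixed-width sample windows.
final class WindowedByteQuota {
    private final long quotaBytesPerSec;
    private final int numWindows;
    private final long windowMs;
    private final long[] windowStart;   // start time of the window each slot currently holds
    private final long[] windowBytes;   // bytes recorded in that window

    WindowedByteQuota(long quotaBytesPerSec, int numWindows, long windowMs) {
        this.quotaBytesPerSec = quotaBytesPerSec;
        this.numWindows = numWindows;
        this.windowMs = windowMs;
        this.windowStart = new long[numWindows];
        this.windowBytes = new long[numWindows];
        Arrays.fill(windowStart, Long.MIN_VALUE);   // no slot holds a window yet
    }

    // Adds bytes to the window containing nowMs, recycling the slot if it held an older window.
    synchronized void record(long bytes, long nowMs) {
        long start = (nowMs / windowMs) * windowMs;
        int slot = (int) ((nowMs / windowMs) % numWindows);
        if (windowStart[slot] != start) {
            windowStart[slot] = start;
            windowBytes[slot] = 0;
        }
        windowBytes[slot] += bytes;
    }

    // Bytes per second over the full span of retained windows.
    synchronized double rate(long nowMs) {
        long oldestAllowed = nowMs - (long) numWindows * windowMs;
        long total = 0;
        for (int i = 0; i < numWindows; i++) {
            if (windowStart[i] > oldestAllowed) total += windowBytes[i];
        }
        return total * 1000.0 / ((double) numWindows * windowMs);
    }

    // 0 when within quota; otherwise a delay proportional to the overshoot.
    synchronized long throttleTimeMs(long nowMs) {
        double observed = rate(nowMs);
        if (observed <= quotaBytesPerSec) return 0;
        double spanMs = (double) numWindows * windowMs;
        return Math.round((observed - quotaBytesPerSec) / quotaBytesPerSec * spanMs);
    }
}
```

The copy and fetch paths use a throttle value like this differently. The copy task waits and re-checks, while the fetch path returns an empty result and lets the consumer retry.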

Design rationale

Copy is back-pressured but fetch is shed because a queued remote read holds a slot in a bounded pool and would extend client latency unboundedly; shedding lets the broker keep serving local partitions and bounds tail latency. Tiered-storage quotas were added by KIP-956 on top of the core tiered-storage design KIP-405. The copy/fetch rate limits are global per broker, which protects the object store and the broker NIC from saturation during catch-up.
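The contrast between back-pressure and shedding can be sketched as two call sites that consume a throttle time. Everything here is hypothetical: QuotaPolicySketch, the LongSupplier throttle sources, and the Optional result that stands in for an empty fetch response. Only the fair ReentrantLock with a Condition echoes the copyQuotaManagerLock described earlier.

```java
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

/** Hypothetical sketch contrasting back-pressure (copy) with load shedding (fetch). */
class QuotaPolicySketch {
    private final ReentrantLock copyLock = new ReentrantLock(true); // fair, like copyQuotaManagerLock
    private final Condition quotaAvailable = copyLock.newCondition();

    /** Copy path: block until the throttle time reaches zero, re-checking after each timed wait. */
    void awaitCopyQuota(LongSupplier copyThrottleMs) throws InterruptedException {
        copyLock.lock();
        try {
            long waitMs;
            while ((waitMs = copyThrottleMs.getAsLong()) > 0) {
                quotaAvailable.await(waitMs, TimeUnit.MILLISECONDS);
            }
        } finally {
            copyLock.unlock();
        }
    }

    /** Fetch path: if throttled, return empty immediately so the client retries later. */
    <T> Optional<T> tryRemoteRead(LongSupplier fetchThrottleMs, Supplier<T> read) {
        if (fetchThrottleMs.getAsLong() > 0) return Optional.empty();
        return Optional.of(read.get());
    }
}
```

In this sketch the copy thread re-checks the throttle after every timed wait, so it proceeds as soon as the window has drained. The fetch path never waits, which keeps the request thread free for local partitions.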

Configuration reference

Key | Default | Effect
remote.log.storage.system.enable | false | Cluster-wide master switch; the broker starts RLM services only when true.
remote.log.storage.manager.class.name | (none) | FQCN of the RSM plugin (required to use tiered storage).
remote.log.metadata.manager.class.name | TopicBasedRemoteLogMetadataManager | FQCN of the RLMM plugin.
remote.log.storage.manager.impl.prefix / ...metadata.manager.impl.prefix | rsm.config. / rlmm.config. | Prefixes whose stripped props are passed to the plugin's configure.
log.local.retention.ms | -2 (→ log.retention.ms) | Local tail time retention; must be ≤ total retention.
log.local.retention.bytes | -2 (→ log.retention.bytes) | Local tail size retention; must be ≤ total retention.
log.remote.copy.lag.ms / .bytes | 0 (immediate) | Delay before a sealed segment becomes upload-eligible (time/size). -1 ⇒ use real local retention as max delay.
remote.log.manager.task.interval.ms | 30000 | Fixed delay between copy / expiration / follower task runs.
remote.log.manager.copier.thread.pool.size | 10 | Copy task pool size (dynamic via resizeCopierThreadPool).
remote.log.manager.expiration.thread.pool.size | 10 | Expiration task pool size.
remote.log.manager.follower.thread.pool.size | 2 | Follower task pool size (replaces deprecated remote.log.manager.thread.pool.size since 4.2).
remote.log.reader.threads | 10 | Remote read pool size.
remote.log.reader.max.pending.tasks | 100 | Read queue bound; full queue ⇒ fetch served with an error.
remote.log.index.file.cache.total.size.bytes | 1073741824 (1 GiB) | Max bytes of cached remote index files (dynamic via resizeCacheSize).
remote.log.index.file.cache.ttl.ms (internal) | 900000 | Idle TTL for cache entries; -1 disables time-based eviction.
remote.log.metadata.custom.metadata.max.bytes | 128 | Cap on RSM custom metadata; exceeding it cancels copy for the partition.
remote.log.manager.copy.max.bytes.per.second | Long.MAX_VALUE | Global upload byte-rate cap (copy quota).
remote.log.manager.fetch.max.bytes.per.second | Long.MAX_VALUE | Global remote-read byte-rate cap (fetch quota).
remote.log.manager.{copy,fetch}.quota.window.num | 11 | Sample windows retained for the quota sensor.
remote.log.manager.{copy,fetch}.quota.window.size.seconds | 1 | Width of each quota window.
remote.fetch.max.wait.ms | 500 | Max wait for remote partitions in a fetch before responding.
remote.list.offsets.request.timeout.ms | 30000 | Timeout for remote list-offsets resolution.
remote.log.metadata.topic.num.partitions | 50 | Partitions of __remote_log_metadata (immutable after creation).
remote.log.metadata.topic.replication.factor | 3 | Replication factor of the metadata topic.
remote.log.metadata.topic.retention.ms | -1 (unlimited) | Should exceed the longest tiered topic retention to avoid metadata loss.
remote.log.metadata.consume.wait.ms | 120000 | Max wait for the local consumer to observe a just-produced event.
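The -2 sentinel for log.local.retention.{ms,bytes} resolves roughly as sketched here. LocalRetention is a hypothetical helper. Its rejection rules are our reading of the "must be ≤ total retention" constraint, not a copy of Kafka's validation code. Under that reading, a local value of -1 or a value larger than a finite total is rejected.

```java
/** Hypothetical helper mirroring the "-2 means inherit total retention" rule from the table. */
final class LocalRetention {
    static final long INHERIT = -2L;   // default of log.local.retention.{ms,bytes}
    static final long UNLIMITED = -1L; // "retain forever"

    /** Returns the effective local retention, rejecting values that would exceed total retention. */
    static long effective(long local, long total) {
        if (local == INHERIT) return total;   // fall back to log.retention.{ms,bytes}
        if (total == UNLIMITED) return local; // any local value fits under an unlimited total
        if (local == UNLIMITED || local > total) {
            throw new IllegalArgumentException(
                "local retention " + local + " must be <= total retention " + total);
        }
        return local;
    }
}
```

For example, a topic with log.retention.ms of 7 days and the default log.local.retention.ms of -2 keeps 7 days locally. Setting the local value to one hour keeps a one-hour hot tail and leaves the rest to remote storage.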

Failure modes, edge cases & recovery

  • Crash mid-copy. Suppose the broker dies after COPY_SEGMENT_STARTED but before COPY_SEGMENT_FINISHED. The local segment is retained, because the local-deletion guard (highestOffsetInRemoteStorage) never advanced, and the started metadata is left dangling. The new leader simply re-copies with a fresh UUID. The dangling started-record is harmless and is excluded from size and visibility, and the expiration task can clean up segments stuck in DELETE_SEGMENT_STARTED.
  • RSM retriable error. RetriableRemoteStorageException propagates up and the scheduled task retries on its next tick; the run() wrapper logs it at debug and does not cancel (RemoteLogManager.java:848).
  • RSM hard error during copy. The code best-effort deletes the partially-copied segment, marks failedRemoteCopyRequestRate, and rethrows; the started-metadata remains until the next cleanup pass (:1114).
  • Read for a compacted/missing offset. If no batch for the requested offset is found in the first remote segment, read moves on to the next segment. If the offset truly is not in remote storage, it throws OffsetOutOfRangeException, which is returned to the client (:1856, :1870).
  • Read queue full. asyncRead throws RejectedExecutionException when the 100-slot queue is full; the fetch is served with an error rather than blocking (:2143).
  • Missing transaction index. Old metadata may report txnIdxEmpty=false yet have no remote txn index; getTransactionIndex uses ofNullable for backward compatibility, and the index cache substitutes an empty index on RemoteResourceNotFoundException (:2007, RemoteIndexCache.java:444).
  • Partition deletion. stopPartitions cancels all three tasks, and when deleteRemoteLog is set, deleteRemoteLogPartition publishes DELETE_STARTED for every segment, deletes data via the RSM, clears the index cache, then publishes DELETE_FINISHED (:524, :576).
  • Metadata topic unavailable at boot. The default RLMM exits the JVM after the init timeout (see Caution above); operationally this means the metadata topic must be healthy for tiered brokers to start.
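The read-queue rejection can be reproduced with a JDK ThreadPoolExecutor that has a bounded queue and the default AbortPolicy. BoundedReaderPool and tryAsyncRead are hypothetical. The only assumption is that the real reader pool behaves like a fixed pool of remote.log.reader.threads threads whose queue capacity is remote.log.reader.max.pending.tasks.

```java
import java.util.Optional;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch: a fixed reader pool with a bounded queue that rejects instead of blocking. */
class BoundedReaderPool {
    final ThreadPoolExecutor pool;

    BoundedReaderPool(int threads, int maxPendingTasks) {
        // The default AbortPolicy makes submit() throw RejectedExecutionException when the queue is full.
        pool = new ThreadPoolExecutor(threads, threads, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(maxPendingTasks));
    }

    /** Returns empty when the read is rejected, standing in for "serve the fetch with an error". */
    Optional<Future<?>> tryAsyncRead(Runnable read) {
        try {
            Future<?> f = pool.submit(read);
            return Optional.of(f);
        } catch (RejectedExecutionException e) {
            return Optional.empty();
        }
    }
}
```

Consider a pool with one thread and one queue slot. The first read occupies the worker and the second waits in the queue. The third is rejected immediately rather than blocking the fetch request.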

Invariants & guarantees

  • A local segment is deleted only after its data is durably in remote storage: highestOffsetInRemoteStorage advances only after COPY_SEGMENT_FINISHED, so there is no data-loss window (UnifiedLog.java:1857, RemoteLogManager.java:1160).
  • Metadata is read-your-writes consistent: the RLM's blocking metadata calls return only after the local cache reflects the write (TopicBasedRemoteLogMetadataManager.java:158).
  • Only COPY_SEGMENT_FINISHED segments are readable; started/deleting segments are invisible to fetch but available for cleanup (RemoteLogMetadataCache.java:132).
  • Each copy attempt has a globally-unique id, so the copy/delete RSM operations are idempotent and overwriting (RemoteLogSegmentId.java:24).
  • Retention & size accounting are evaluated within the current leader-epoch lineage; segments outside it are treated as unreferenced (RemoteLogManager.java:1728).
  • Local effective retention is always ≤ total retention: log.local.retention.{ms,bytes} defaults of -2 resolve to the corresponding total-retention value (RemoteLogManagerConfig.java:159).
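Two of these invariants, fetch visibility and the local-deletion guard, reduce to simple predicates. The sketch is simplified and hypothetical: TieringInvariantsSketch, the Segment record, and its endOffset field are illustrative only. The enum constants mirror the RemoteLogSegmentState names used in this chapter, and the deletion rule compares a segment's last offset with highestOffsetInRemoteStorage.

```java
import java.util.List;
import java.util.UUID;
import java.util.stream.Collectors;

/** Hypothetical sketch of the visibility and local-deletion invariants (simplified). */
class TieringInvariantsSketch {
    enum State { COPY_SEGMENT_STARTED, COPY_SEGMENT_FINISHED, DELETE_SEGMENT_STARTED, DELETE_SEGMENT_FINISHED }

    /** Each copy attempt carries a fresh UUID, so a retry never collides with a dangling attempt. */
    record Segment(UUID id, long endOffset, State state) {}

    /** Fetch sees only fully copied segments; STARTED and DELETE_* entries are invisible. */
    static List<Segment> readable(List<Segment> all) {
        return all.stream()
                .filter(s -> s.state() == State.COPY_SEGMENT_FINISHED)
                .collect(Collectors.toList());
    }

    /** A local segment may be deleted only once its last offset is covered by remote storage. */
    static boolean localDeletable(long localSegmentEndOffset, long highestOffsetInRemoteStorage) {
        return localSegmentEndOffset <= highestOffsetInRemoteStorage;
    }
}
```

A crash-then-retry scenario shows both rules at work. The dangling STARTED entry and the retried FINISHED entry have different ids. Only the FINISHED entry is readable, and the local copy becomes deletable only once the remote high offset covers it.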

Interactions with other subsystems

  • The Log Storage Engine: UnifiedLog supplies sealed LogSegments, the leader-epoch cache, producer-state snapshots, and highestOffsetInRemoteStorage. Tiered storage reuses the same on-disk index/segment formats (Record Format & Batches) when streaming remote data.
  • Log Management, Retention & Compaction: local deletion is performed by UnifiedLog.deleteOldSegments(), driven by local retention and gated by the remote copy state. Tiered topics use cleanup.policy=delete only.
  • The Fetch Path & Replica Fetchers: ReplicaManager translates an out-of-range fetch into a DelayedRemoteFetch and consults the fetch quota. Followers fetching a remote-only offset receive OffsetMovedToTieredStorageException (ReplicaManager.scala:1914).
  • Replication, ISR & High Watermark: copy eligibility is bounded by the last-stable-offset, so only committed data is tiered. Leader/follower transitions install and cancel RLM tasks.
  • Metadata Propagation & Broker Lifecycle: leadership and stop-partition events from the controller drive onLeadershipChange/stopPartitions. The default RLMM defers init until the broker is ready.
  • Transactions & Exactly-Once Semantics: the remote read path reconstructs aborted-transaction lists from remote (then local) txn indexes for read_committed consumers.
  • Quotas, Throttling & Client Metrics: RLM copy/fetch quotas are a separate QuotaType (RLM_COPY, RLM_FETCH) from client request/bandwidth quotas, but they share the metrics/sensor machinery.

Design rationale & evolution

Tiered storage was introduced by KIP-405. It was deliberately structured around two narrow SPIs so that cloud object stores (S3/GCS/ABS) and HDFS-like systems can be plugged in without touching the broker. The design splits metadata from data: metadata is strongly consistent and data is eventually consistent. This split lets the broker reason about correctness from a replicated log of events (the default RLMM) while treating the object store as a dumb blob sink.

Tiered-storage quotas (KIP-956) added the global copy/fetch byte-rate limiters to keep catch-up traffic from overwhelming the object store or the broker. The follower thread pool was later split out from the legacy single thread pool, and the old remote.log.manager.thread.pool.size was deprecated in 4.2 (RemoteLogManagerConfig.java:109). Separating the copier and expiration pools lets uploads and deletes scale independently.

Gotchas / operational notes

Gotcha

The number of partitions of __remote_log_metadata is fixed at first creation. If the configured remote.log.metadata.topic.num.partitions later differs from the existing topic, RLMM initialization fails and the broker exits (TopicBasedRemoteLogMetadataManager.java:414). Pick this value carefully up front.

Gotcha

Set remote.log.metadata.topic.retention.ms longer than the longest retention of any tiered topic. The metadata topic uses delete retention, not compaction. If its retention expires, the events describing live remote segments are lost and those segments become unreferenced (TopicBasedRemoteLogMetadataManagerConfig.java:64).

Note

Copy lag and remote-delete lag are exported per partition/topic (recordRemoteCopyLagBytes/...Segments, recordRemoteDeleteLag...; RemoteLogManager.java:1166, :1362). The copy task records lag as onlyLocalLogSegmentsSize − activeSegmentSize and onlyLocalLogSegmentsCount − 1, i.e. how much sealed local data still awaits upload, which is the primary health signal for whether tiering is keeping up (:1183).
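The copy-lag arithmetic in this note is easy to reproduce. CopyLagSketch is hypothetical. Its input is assumed to list the sizes of segments not yet in remote storage, oldest first, with the active segment last.

```java
import java.util.List;

/** Hypothetical sketch of the copy-lag arithmetic: only-local data minus the active segment. */
class CopyLagSketch {
    /** Bytes of sealed local data still awaiting upload (the active segment cannot be uploaded yet). */
    static long lagBytes(List<Long> onlyLocalSegmentSizes) {
        long total = onlyLocalSegmentSizes.stream().mapToLong(Long::longValue).sum();
        long active = onlyLocalSegmentSizes.isEmpty()
                ? 0
                : onlyLocalSegmentSizes.get(onlyLocalSegmentSizes.size() - 1);
        return total - active;
    }

    /** Number of sealed local segments still awaiting upload. */
    static long lagSegments(List<Long> onlyLocalSegmentSizes) {
        return Math.max(0, onlyLocalSegmentSizes.size() - 1);
    }
}
```

Suppose two sealed 1000-byte segments have not yet been copied and there is a 250-byte active segment. The lag is then 2000 bytes and 2 segments. A lag that grows steadily across task runs suggests tiering is not keeping up.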

Note

Disabling further uploads for a topic without losing already-tiered data is done with the per-topic remote.log.copy.disable flag: when set, no RLMCopyTask is created and local deletion falls back to ordinary retention.ms/.bytes instead of the remote-aware guard (RemoteLogManager.java:2157, UnifiedLog.java:1853).

krivaltsevich.com · Part of Apache Kafka Internals · derived from Apache Kafka 4.4 source · GitHub · MIT-licensed.

Apache Kafka® is a registered trademark of the Apache Software Foundation. This is an independent, unofficial guide, not affiliated with or endorsed by the ASF.