krivaltsevich.com · Kafka Internals 4.4

12 · Metadata Propagation & Broker Lifecycle

Source: Apache Kafka 4.4.0-SNAPSHOT (git 04bfe7d, 2026-06-15), KRaft mode. Derived from source code, not copied from official documentation.

In a KRaft cluster the single source of truth for cluster state is an ordered, Raft-replicated record stream stored in the __cluster_metadata topic. This chapter follows committed records from the local RaftClient through the MetadataLoader, which folds them into immutable MetadataImage snapshots via incremental MetadataDelta objects and fans each new image out to a chain of MetadataPublishers. On the broker side, BrokerMetadataPublisher turns image deltas into concrete actions on ReplicaManager, the coordinators, and the KRaftMetadataCache that answers Metadata requests. In parallel, BrokerLifecycleManager registers the broker and drives the fenced → active → controlled-shutdown state machine via periodic heartbeats, while AssignmentsManager reports JBOD directory placement back to the controller. We trace every data structure, algorithm, thread, and failure mode involved.

Role & responsibilities

The metadata-propagation subsystem is the read side of the controller's write side. The KRaft controller appends records to the metadata log; KRaft consensus replicates and commits them. Everything in this chapter is about how a node, broker or controller, turns that committed log into usable in-memory state and reacts to it. Concretely the subsystem must:

  • Consume committed records and snapshots from the RaftClient in offset order, on a single dedicated thread, and never expose partial state.
  • Maintain a complete, immutable MetadataImage, the materialized view of all topics, partitions, brokers, configs, quotas, ACLs, SCRAM credentials, delegation tokens, producer-id blocks, and feature levels.
  • Publish each new image plus the delta describing what changed to an ordered list of publishers, so that subsystems can apply only the differences.
  • Gate record formats and behavior on metadata.version and other feature levels (KIP-584), and react when they change.
  • Register the broker and keep it alive through BrokerRegistration + periodic BrokerHeartbeat, transitioning the broker through its lifecycle states and reporting offline/cordoned directories.
  • Block serving until caught up: a starting broker must replay metadata up to the cluster's high-water mark before it answers clients or is unfenced.

Key idea

Metadata is propagated as an ordered log of records, not as point-to-point RPC pushes. Each broker independently replays the same log to build byte-for-byte identical images. The controller never sends a broker its leadership assignments directly; it writes PartitionChangeRecords to the log, and the broker discovers the change when it replays them. This is the fundamental shift from the ZooKeeper era's LeaderAndIsrRequest/UpdateMetadataRequest push model.

Where it lives in the code

| Concern | Principal class | File |
| --- | --- | --- |
| Complete immutable snapshot | MetadataImage | metadata/src/main/java/org/apache/kafka/image/MetadataImage.java |
| Incremental change set | MetadataDelta | metadata/src/main/java/org/apache/kafka/image/MetadataDelta.java |
| Image source descriptor | MetadataProvenance | metadata/.../image/MetadataProvenance.java |
| Topic / partition sub-images | TopicsImage, TopicImage, TopicDelta | metadata/.../image/TopicsImage.java, TopicDelta.java |
| Broker / controller sub-image | ClusterImage, ClusterDelta | metadata/.../image/ClusterImage.java, ClusterDelta.java |
| Feature levels | FeaturesImage, FeaturesDelta | metadata/.../image/FeaturesImage.java, FeaturesDelta.java |
| Log consumer / image builder | MetadataLoader | metadata/.../image/loader/MetadataLoader.java |
| Batch & transaction folding | MetadataBatchLoader | metadata/.../image/loader/MetadataBatchLoader.java |
| Publisher SPI | MetadataPublisher | metadata/.../image/publisher/MetadataPublisher.java |
| Broker-side applier | BrokerMetadataPublisher | core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala |
| Re-registration watcher | BrokerRegistrationTracker | metadata/.../image/publisher/BrokerRegistrationTracker.java |
| Read-path metadata cache | KRaftMetadataCache | metadata/src/main/java/org/apache/kafka/metadata/KRaftMetadataCache.java |
| Broker registration record | BrokerRegistration | metadata/.../metadata/BrokerRegistration.java |
| Partition registration record | PartitionRegistration | metadata/.../metadata/PartitionRegistration.java |
| Lifecycle / heartbeat state machine | BrokerLifecycleManager | server/src/main/java/org/apache/kafka/server/BrokerLifecycleManager.java |
| JBOD directory-assignment reporting | AssignmentsManager | server/src/main/java/org/apache/kafka/server/AssignmentsManager.java |
| Wiring (loader, publishers) | SharedServer, BrokerServer | core/src/main/scala/kafka/server/SharedServer.scala, BrokerServer.scala |

Core concepts & terminology

__cluster_metadata
The single-partition internal topic holding the metadata log. The name is Topic.CLUSTER_METADATA_TOPIC_NAME = "__cluster_metadata" and the partition is CLUSTER_METADATA_TOPIC_PARTITION (partition 0) clients/.../common/internals/Topic.java:30. KafkaRaftServer aliases these as MetadataPartition and MetadataTopicId = Uuid.METADATA_TOPIC_ID core/.../KafkaRaftServer.scala:119.
MetadataImage
An immutable Java record bundling one sub-image per metadata domain plus a MetadataProvenance. Reads are lock-free because each image is fully published before it becomes visible.
MetadataDelta
A mutable builder accumulating per-domain sub-deltas (topicsDelta, clusterDelta, …); calling apply(provenance) produces the next image, reusing unchanged sub-images by reference.
Provenance
The tuple (lastContainedOffset, lastContainedEpoch, lastContainedLogTimeMs, isOffsetBatchAligned), which records where in the log this image ends.
Publisher
An implementation of MetadataPublisher registered with the loader; receives onMetadataUpdate(delta, image, manifest) callbacks in install order.
Manifest
Describes what was published: a LogDeltaManifest (loaded from the log) or a SnapshotManifest (loaded from a snapshot).
Broker epoch
A monotonically increasing per-incarnation lease handle returned by the controller at registration; every heartbeat and most controller RPCs carry it for fencing.
Incarnation ID
A random Uuid generated once per broker process start server/.../BrokerLifecycleManager.java:103, letting the controller distinguish a restarted broker from the old one.
Fenced
A registered-but-not-serving state: the broker is in the metadata but excluded from leadership/ISR and from Metadata responses to clients.

Data structures

MetadataImage, the materialized view

MetadataImage is a record with ten components metadata/.../image/MetadataImage.java:31:

record MetadataImage(
    MetadataProvenance provenance,   // where in the log this image ends
    FeaturesImage      features,     // metadata.version + finalized feature levels
    ClusterImage       cluster,      // BrokerRegistration + ControllerRegistration maps
    TopicsImage        topics,       // topicsById + topicsByName (immutable persistent maps)
    ConfigurationsImage configs,     // dynamic configs by ConfigResource
    ClientQuotasImage  clientQuotas,
    ProducerIdsImage   producerIds,  // next producer-id block
    AclsImage          acls,
    ScramImage         scram,
    DelegationTokenImage delegationTokens)

The class documents itself as thread-safe; MetadataImage.EMPTY composes the EMPTY of every sub-image metadata/.../image/MetadataImage.java:35. isEmpty() is the logical AND of every sub-image being empty, used by the loader to decide whether it has ever seen a record. highestOffsetAndEpoch() and offset() derive from the provenance. write(writer, options) serializes the whole image back to records (used to produce snapshots and to "catch up" a new publisher), and crucially writes features first so the metadata.version record precedes every record whose decoding depends on it metadata/.../image/MetadataImage.java:67.

TopicImage / PartitionRegistration, partition state

TopicsImage holds two ImmutableMaps, topicsById (by Uuid) and topicsByName metadata/.../image/TopicsImage.java:37. Each TopicImage maps partition index → PartitionRegistration. PartitionRegistration is the on-image form of a partition and exposes its fields as public finals:

| Field | Type | Meaning |
| --- | --- | --- |
| replicas | int[] | Replica broker IDs, in assignment order. |
| directories | Uuid[] | JBOD log-directory ID per replica (parallel to replicas); DirectoryId.MIGRATING until known. |
| isr | int[] | In-sync replica set. |
| removingReplicas / addingReplicas | int[] | Reassignment deltas. |
| elr / lastKnownElr | int[] | Eligible-leader-replica set and last-known ELR (KIP-966). |
| leader | int | Current leader broker ID (NO_LEADER_CHANGE sentinel handling on merge). |
| leaderRecoveryState | LeaderRecoveryState | RECOVERED or RECOVERING (after unclean election). |
| leaderEpoch | int | Bumped on every leader change. |
| partitionEpoch | int | Bumped on every change to the partition (leader, ISR, replicas…). |

The fields are declared at metadata/.../metadata/PartitionRegistration.java:153. A PartitionRegistration is built fresh from a PartitionRecord or evolved by merge(PartitionChangeRecord), which copies forward unset fields (a null array in the record means "unchanged") metadata/.../metadata/PartitionRegistration.java:238. The split between leaderEpoch and partitionEpoch is what lets the broker tell "I became leader" (leader epoch bump) from "my ISR changed while I am leader" (partition epoch bump only); see the local-change classification below.
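
The copy-forward merge can be sketched as follows. This is a minimal illustration, not Kafka's code: Partition and Change are hypothetical, trimmed-down stand-ins for PartitionRegistration and PartitionChangeRecord, and both the sentinel value and the rule that every merge bumps partitionEpoch by exactly one are assumptions made for the example.

```java
final class PartitionMergeSketch {
    // Hypothetical sentinel meaning "this change record does not touch the leader".
    static final int NO_LEADER_CHANGE = -2;

    // Simplified stand-in for the on-image partition state.
    record Partition(int[] replicas, int[] isr, int leader, int leaderEpoch, int partitionEpoch) {}

    // Simplified stand-in for a change record: a null array means "unchanged".
    record Change(int[] replicas, int[] isr, int leader) {}

    static Partition merge(Partition prev, Change change) {
        // Copy unset fields forward by reference; take set fields from the record.
        int[] replicas = change.replicas() == null ? prev.replicas() : change.replicas();
        int[] isr = change.isr() == null ? prev.isr() : change.isr();

        // Only a real leader change bumps the leader epoch.
        boolean leaderChanged = change.leader() != NO_LEADER_CHANGE;
        int leader = leaderChanged ? change.leader() : prev.leader();
        int leaderEpoch = leaderChanged ? prev.leaderEpoch() + 1 : prev.leaderEpoch();

        // Any change at all bumps the partition epoch (assumed: by one).
        return new Partition(replicas, isr, leader, leaderEpoch, prev.partitionEpoch() + 1);
    }
}
```

An ISR shrink therefore leaves leaderEpoch alone while partitionEpoch advances, which is the signal the broker later uses to separate "ISR changed" from "newly elected".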

BrokerRegistration, liveness & placement

BrokerRegistration is immutable metadata/.../metadata/BrokerRegistration.java:45 and built from a RegisterBrokerRecord via fromRecord metadata/.../metadata/BrokerRegistration.java:204. Its fields:

| Field | Meaning |
| --- | --- |
| id, epoch | Broker ID and the controller-assigned broker epoch (lease). |
| incarnationId | Random per-process UUID. |
| listeners | Map of listener name → Endpoint (host/port/security protocol). Listeners must be named or construction throws. |
| supportedFeatures | Map of feature name → VersionRange the broker supports. |
| rack | Optional rack string. |
| fenced | Whether the broker is currently fenced. |
| inControlledShutdown | Whether the broker is performing controlled shutdown (KIP-841). |
| directories | Sorted list of online JBOD log-directory UUIDs. |
| cordonedDirectories | Cordoned directories (KIP for cordoned log dirs, gated by MV ≥ 4.3-IV0); null until the broker first reports them. |

Helper predicates used elsewhere: hasOnlineDir(dir), hasUncordonedDirs() (true if some directory is not cordoned), and node(listenerName), which constructs a Node carrying the fenced flag metadata/.../metadata/BrokerRegistration.java:246. The fenced/inControlledShutdown booleans are never mutated in place: a fencing change clones a new registration via cloneWith(...) in ClusterDelta.
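
A minimal sketch of that clone-on-change style, assuming a hypothetical four-field Broker record in place of the real BrokerRegistration. The two-argument cloneWith shown here is a simplification for illustration and is not the real method signature.

```java
import java.util.Optional;

final class BrokerCloneSketch {
    // Hypothetical, trimmed-down stand-in for BrokerRegistration.
    record Broker(int id, long epoch, boolean fenced, boolean inControlledShutdown) {

        // Returns this same instance when nothing changes, otherwise a new copy.
        // An empty Optional means "leave this flag as it is".
        Broker cloneWith(Optional<Boolean> fencingChange, Optional<Boolean> shutdownChange) {
            boolean newFenced = fencingChange.orElse(fenced);
            boolean newShutdown = shutdownChange.orElse(inControlledShutdown);
            if (newFenced == fenced && newShutdown == inControlledShutdown) {
                return this;
            }
            return new Broker(id, epoch, newFenced, newShutdown);
        }
    }
}
```

Because the old registration is never modified, a reader holding the previous image keeps seeing the broker as fenced until it picks up the next image.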

FeaturesImage, metadata.version & feature gating

FeaturesImage holds Map<String,Short> finalizedVersions and an Optional<MetadataVersion> metadataVersion metadata/.../image/FeaturesImage.java:45. metadata.version is stored as a FeatureLevelRecord like any other feature but is materialized into a typed MetadataVersion; metadataVersionOrThrow() fails loudly if it is somehow absent. isElrEnabled() reads the eligible.leader.replicas.version finalized level metadata/.../image/FeaturesImage.java:80. When the image is written out, the metadata.version FeatureLevelRecord is emitted first and explicitly at record version 0 so that any older reader can still parse it metadata/.../image/FeaturesImage.java:89.

Architecture & control flow

Controller · Raft layer
  The active controller writes records; the local RaftClient commits batches of __cluster_metadata-0 up to the high-water mark and serves snapshots.
  Components: QuorumController, RaftClient, __cluster_metadata-0
    ↓ handleCommit / handleLoadSnapshot / handleLeaderChange (RaftClient.Listener)
MetadataLoader · single metadata-loader- event-queue thread
  image : MetadataImage is mutated only here; MetadataBatchLoader folds N committed batches into one MetadataDelta (KIP-868 transactions); the catchingUp gate stays closed until offset ≥ highWaterMark − 1.
    ↓ onMetadataUpdate(delta, image, manifest), in install order
Publisher chain
  Each new image + delta is fanned out to an ordered list of MetadataPublishers.
  Components: SnapshotGenerator, BrokerMetadataPublisher, BrokerRegistrationTracker, AclPublisher, ScramPublisher, …
    ↓ setImage · applyDelta · updateCoordinator · startup
Broker subsystems
  Publishers turn image deltas into concrete actions on the read path, replication, and the coordinators.
  Components: KRaftMetadataCache (reads), ReplicaManager.applyDelta(…), Group / Txn / Share coordinators, LogManager · DynamicConfig, …

Figure: the metadata read path, top to bottom: from committed Raft records, through the single-threaded MetadataLoader, out to the ordered publisher chain, and into materialized broker state. Each arrow names the callback that crosses the boundary.

Separately, the lifecycle path runs over a NodeToControllerChannelManager directly to the active controller:

Participants: BrokerLifecycleManager (lifecycle-manager- thread), Active Controller, Metadata log

  1. Broker → Controller: BrokerRegistrationRequest(incarnationId, listeners, features, logDirs)
  2. Controller → Metadata log: write RegisterBrokerRecord (asynchronous; it appears in the log later)
  3. Controller → Broker: BrokerRegistrationResponse(brokerEpoch)
  4. Loop, every broker.heartbeat.interval.ms (2000 ms):
       Broker → Controller: BrokerHeartbeatRequest(brokerEpoch, currentMetadataOffset, wantFence, wantShutDown, offlineLogDirs)
       Controller → Broker: BrokerHeartbeatResponse(isCaughtUp, isFenced, shouldShutDown)

currentMetadataOffset comes from highestMetadataOffsetProvider = loader.lastAppliedOffset().

Figure: the lifecycle path: registration, then periodic heartbeats, sent over a dedicated NodeToControllerChannelManager straight to the active controller (not via the metadata log). The controller persists the registration as a record that the broker only later replays.

Detailed mechanics

From RaftClient callbacks to a published image

MetadataLoader implements RaftClient.Listener<ApiMessageAndVersion> and is registered with client.register(loader) at core/.../SharedServer.scala:358. All three callbacks immediately append work to the loader's own event queue so that processing is serialized on one thread metadata/.../image/loader/MetadataLoader.java:396:

  1. handleCommit(reader) drains every committed Batch. For each batch it calls loadControlRecords (which records the finalized kraft.version from a KRaftVersionRecord control record into metrics) and then batchLoader.loadBatch(batch, currentLeaderAndEpoch). After the reader is exhausted it calls batchLoader.maybeFlushBatches(currentLeaderAndEpoch, true) to emit whatever was accumulated metadata/.../image/loader/MetadataLoader.java:399.
  2. handleLoadSnapshot(reader) builds a delta on top of the current image, replays every snapshot record into it, calls delta.finishSnapshot(), computes a SnapshotManifest, applies the delta to get a new image, then batchLoader.resetToImage(image) and maybePublishMetadata(...) metadata/.../image/loader/MetadataLoader.java:417.
  3. handleLeaderChange(leaderAndEpoch) updates currentLeaderAndEpoch and calls publisher.onControllerChange(...) on every active publisher metadata/.../image/loader/MetadataLoader.java:498.

Batch folding & metadata transactions

MetadataBatchLoader exists so that many Raft batches committed together produce a single publisher notification rather than one per batch metadata/.../image/loader/MetadataBatchLoader.java:43. It accumulates records into one MetadataDelta and tracks lastOffset, lastEpoch, numBytes, numBatches, and a TransactionState. It implements KIP-868 metadata transactions with a small state machine driven by the transaction marker records and the data records between them:

| Record | From state | To state |
| --- | --- | --- |
| BeginTransactionRecord | NO_TRANSACTION | STARTED_TRANSACTION (throws if already in one) |
| any data record | STARTED_TRANSACTION | CONTINUED_TRANSACTION |
| EndTransactionRecord | STARTED/CONTINUED | ENDED_TRANSACTION |
| AbortTransactionRecord | STARTED/CONTINUED | ABORTED_TRANSACTION |
| any data record | ENDED/ABORTED | NO_TRANSACTION |

The transition table lives in replay metadata/.../image/loader/MetadataBatchLoader.java:226. On maybeFlushBatches the loader publishes only when not inside a transaction: STARTED/CONTINUED suppress publishing; ABORTED publishes an empty delta built on the unchanged image (discarding the partial transaction); ENDED and NO_TRANSACTION publish the accumulated delta metadata/.../image/loader/MetadataBatchLoader.java:190. applyDeltaAndUpdate calls delta.apply(provenance), invokes the loader's callback, and then resetToImage(image) so the next delta starts clean metadata/.../image/loader/MetadataBatchLoader.java:274.
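
The transition table and the flush rule can be sketched as two small pure functions. The enum and method names below are hypothetical and chosen for the example; throwing on an End or Abort record outside a transaction is an assumption, since the table above only states the throw for a nested Begin.

```java
final class TxnFoldSketch {
    enum State { NO_TRANSACTION, STARTED_TRANSACTION, CONTINUED_TRANSACTION, ENDED_TRANSACTION, ABORTED_TRANSACTION }
    enum Kind { BEGIN, END, ABORT, DATA }
    enum Flush { SUPPRESS, PUBLISH_EMPTY_DELTA, PUBLISH_ACCUMULATED_DELTA }

    static boolean inTransaction(State s) {
        return s == State.STARTED_TRANSACTION || s == State.CONTINUED_TRANSACTION;
    }

    // One step of the state machine for a replayed record of the given kind.
    static State next(State s, Kind k) {
        switch (k) {
            case BEGIN:
                if (inTransaction(s)) throw new IllegalStateException("already in a transaction");
                return State.STARTED_TRANSACTION;
            case END:
                // Assumption: an End outside a transaction is treated as an error.
                if (!inTransaction(s)) throw new IllegalStateException("no transaction to end");
                return State.ENDED_TRANSACTION;
            case ABORT:
                // Assumption: an Abort outside a transaction is treated as an error.
                if (!inTransaction(s)) throw new IllegalStateException("no transaction to abort");
                return State.ABORTED_TRANSACTION;
            default:
                // Data record: continues an open transaction, otherwise resets to NO_TRANSACTION.
                return inTransaction(s) ? State.CONTINUED_TRANSACTION : State.NO_TRANSACTION;
        }
    }

    // What a flush does in each state, following the rule described above.
    static Flush onFlush(State s) {
        if (inTransaction(s)) return Flush.SUPPRESS;
        if (s == State.ABORTED_TRANSACTION) return Flush.PUBLISH_EMPTY_DELTA;
        return Flush.PUBLISH_ACCUMULATED_DELTA;
    }
}
```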

Note

MetadataProvenance.isOffsetBatchAligned records whether the image ends exactly on a Raft batch boundary. Snapshots may only be taken at batch-aligned offsets, so the snapshot generator must respect this flag. A delta emitted mid-batch (because a transaction started part-way through a batch) is marked not-aligned metadata/.../image/loader/MetadataBatchLoader.java:154.

Applying a delta to produce the next image

MetadataDelta.replay(ApiMessage) is a big switch over MetadataRecordType that routes each record to the relevant sub-delta's getOrCreateXDelta() metadata/.../image/MetadataDelta.java:194. For example a PartitionChangeRecord goes to the topics delta; a FenceBrokerRecord to the cluster delta. NoOpRecord and the retained-but-inert ZkMigrationStateRecord are deliberate no-ops (the latter only matters for clusters that migrated from ZooKeeper under 3.x before upgrading to 4.x). apply(provenance) then walks each sub-delta: if a sub-delta is null the corresponding sub-image is carried forward by reference; otherwise its apply() produces a new sub-image. The result is a brand-new MetadataImage sharing all unchanged structure with its predecessor metadata/.../image/MetadataDelta.java:387.

Key idea

Image construction is structurally shared: applying a delta that touches only one topic allocates a new TopicsImage and one new TopicImage, but every other topic, the entire cluster map, configs, ACLs, etc. are reused by reference. This is what makes per-record metadata propagation cheap enough to run on every broker.
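
A toy illustration of the carry-forward-by-reference rule, assuming a hypothetical two-domain Image and Delta. Unlike Kafka's images, this sketch copies the whole changed map rather than using persistent maps, so it demonstrates sharing across domains only, not sharing inside the topics map.

```java
import java.util.HashMap;
import java.util.Map;

final class SharingSketch {
    // Hypothetical image with just two domains.
    record Image(Map<String, String> topics, Map<Integer, String> brokers) {}

    static final class Delta {
        private final Image base;
        private Map<String, String> topicChanges;   // null until a topic record is replayed
        private Map<Integer, String> brokerChanges; // null until a broker record is replayed

        Delta(Image base) { this.base = base; }

        void replayTopic(String name, String state) {
            if (topicChanges == null) topicChanges = new HashMap<>();
            topicChanges.put(name, state);
        }

        void replayBroker(int id, String state) {
            if (brokerChanges == null) brokerChanges = new HashMap<>();
            brokerChanges.put(id, state);
        }

        // A null sub-delta carries the old sub-image forward by reference.
        Image apply() {
            Map<String, String> topics = base.topics();
            if (topicChanges != null) {
                Map<String, String> merged = new HashMap<>(base.topics());
                merged.putAll(topicChanges);
                topics = Map.copyOf(merged);
            }
            Map<Integer, String> brokers = base.brokers();
            if (brokerChanges != null) {
                Map<Integer, String> merged = new HashMap<>(base.brokers());
                merged.putAll(brokerChanges);
                brokers = Map.copyOf(merged);
            }
            return new Image(topics, brokers);
        }
    }
}
```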

Feature-level changes ripple through every sub-delta

FeaturesDelta.replay(FeatureLevelRecord) decodes a metadata.version record into a typed MetadataVersion (throwing a helpful upgrade message if the level is unknown), records kraft.version changes as inert (KAFKA-18979), and otherwise stores/clears the feature level metadata/.../image/FeaturesDelta.java:60. When metadata.version itself changes, MetadataDelta.replay(FeatureLevelRecord) proactively calls handleMetadataVersionChange(newVersion) on every sub-delta so that any state that must be downgraded for an older MV is fixed immediately in the same delta metadata/.../image/MetadataDelta.java:328. MetadataDelta.metadataVersionChanged() surfaces this to publishers such as BrokerRegistrationTracker.

The catch-up gate

MetadataLoader starts with catchingUp = true and refuses to publish to broker subsystems until it has reached the cluster high-water mark metadata/.../image/loader/MetadataLoader.java:188. stillNeedToCatchUp(where, offset) returns true while any of these hold: the high-water mark is unknown; the loaded offset is still behind highWaterMark − 1; or no controller record has been seen yet (batchLoader.hasSeenRecord() false). Once it crosses, it logs "finished catching up" and flips the flag permanently metadata/.../image/loader/MetadataLoader.java:249. maybePublishMetadata always stores the new image into this.image (so the loader's notion of "current" advances even while catching up) but returns early without notifying publishers while the gate is closed metadata/.../image/loader/MetadataLoader.java:349.
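
The gate's predicate can be sketched as below. The class is hypothetical and single-threaded by assumption (as the loader's event queue is); the parameter list is invented for the example, and only the three conditions and the one-way flip come from the description above.

```java
import java.util.OptionalLong;

final class CatchUpGateSketch {
    private boolean catchingUp = true;

    // Returns true while publishing must still be suppressed.
    boolean stillNeedToCatchUp(OptionalLong highWaterMark, long loadedOffset, boolean hasSeenRecord) {
        if (!catchingUp) {
            return false; // the gate never closes again once opened
        }
        if (highWaterMark.isEmpty()) {
            return true;  // high-water mark not known yet
        }
        if (loadedOffset < highWaterMark.getAsLong() - 1) {
            return true;  // still behind highWaterMark - 1
        }
        if (!hasSeenRecord) {
            return true;  // nothing has been loaded yet
        }
        catchingUp = false; // "finished catching up": flip permanently
        return false;
    }
}
```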

Installing publishers & the catch-up snapshot

Publishers are installed via loader.installPublishers(list), which enqueues them into uninitializedPublishers and schedules initializeNewPublishers metadata/.../image/loader/MetadataLoader.java:523. Initialization waits (rescheduling every 100 ms) until the loader is caught up, then synthesizes a delta from the EMPTY image by writing the entire current image into an ImageReWriter; that gives each new publisher a single "everything changed" delta and the full image in one shot, followed by an onControllerChange metadata/.../image/loader/MetadataLoader.java:295. Install order is preserved (LinkedHashMap): a publisher installed earlier always receives a given update before one installed later, which is the contract the loader's Javadoc promises.

Invariant

A newly installed publisher never sees a partial view: it is either given the complete catch-up snapshot (after the loader is caught up) or nothing yet. Existing publishers continue to receive incremental deltas in strict offset order on the single loader thread, so no publisher can observe records out of order or skip an image.
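
A sketch of the install-order and all-or-nothing guarantees, using hypothetical types: the "image" is just a string of concatenated deltas, the full-image catch-up delta is modeled by passing the image as its own delta, and the real loader's 100 ms rescheduling is replaced by an explicit markCaughtUp() call.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class PublisherChainSketch {
    interface Publisher {
        String name();
        void onMetadataUpdate(String delta, String image);
    }

    // LinkedHashMap iteration order is insertion order, which preserves install order.
    private final Map<String, Publisher> uninitialized = new LinkedHashMap<>();
    private final Map<String, Publisher> active = new LinkedHashMap<>();
    private String image = "";
    private boolean caughtUp = false;

    void install(Publisher p) {
        uninitialized.put(p.name(), p);
        initializeNewPublishers();
    }

    void markCaughtUp() {
        caughtUp = true;
        initializeNewPublishers();
    }

    private void initializeNewPublishers() {
        if (!caughtUp) return; // new publishers get nothing until the loader is caught up
        for (Publisher p : uninitialized.values()) {
            p.onMetadataUpdate(image, image); // one "everything changed" delta plus the full image
            active.put(p.name(), p);
        }
        uninitialized.clear();
    }

    void publish(String delta) {
        image = image + delta; // the current image always advances
        if (!caughtUp) return; // but nobody is notified while the gate is closed
        for (Publisher p : active.values()) p.onMetadataUpdate(delta, image);
    }

    // Test helper: a publisher that appends "name:delta" to a shared log.
    static Publisher recording(String name, List<String> log) {
        return new Publisher() {
            public String name() { return name; }
            public void onMetadataUpdate(String delta, String image) { log.add(name + ":" + delta); }
        };
    }
}
```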

BrokerMetadataPublisher, turning deltas into broker actions

BrokerMetadataPublisher.onMetadataUpdate is the broker's central applier core/.../metadata/BrokerMetadataPublisher.scala:111. Per update it, in order:

  1. Publishes the image to the cache: metadataCache.setImage(newImage), a single volatile store that atomically makes the new image visible to all readers metadata/.../metadata/KRaftMetadataCache.java:485.
  2. On the very first publish, calls initializeManagers(newImage) which starts LogManager (running unclean-shutdown recovery and identifying stray logs via isStrayReplica), recovers abandoned future logs (KAFKA-16082), then starts ReplicaManager, the group, transaction, and share coordinators core/.../metadata/BrokerMetadataPublisher.scala:309.
  3. Applies topic deltas to replication: if delta.topicsDelta() is non-null, replicaManager.applyDelta(topicsDelta, newImage) creates/deletes/promotes partitions core/.../server/ReplicaManager.scala:2360.
  4. Drives coordinator election/resignation for the __consumer_offsets, __transaction_state, and __share_group_state internal topics via updateCoordinator (detailed below).
  5. Fans out to feature-specific publishers: dynamic configs, client quotas, topic/cluster quotas, SCRAM, delegation tokens, ACLs, then propagates the whole image to the group and share coordinators.
  6. On first publish, calls finishInitializingReplicaManager() to start the high-water-mark checkpoint thread core/.../metadata/BrokerMetadataPublisher.scala:378.
  7. Reacts to share-version toggles if the features delta changed the finalized share.version core/.../metadata/BrokerMetadataPublisher.scala:226.
  8. In a finally block, sets _firstPublish = false and completes firstPublishFuture, the signal BrokerServer waits on during startup core/.../metadata/BrokerMetadataPublisher.scala:246.

Classifying local replica changes

updateCoordinator and ReplicaManager.applyDelta both rely on TopicDelta.localChanges(brokerId) to discover what changed for this broker metadata/.../image/TopicDelta.java:176. For each changed partition it files the partition under the buckets that apply to this broker:

  • deletes: the broker was a replica before but is not in the new replica set.
  • leaders: the broker is the new leader and the partitionEpoch changed.
  • electedLeaders (a subset of leaders): additionally the leaderEpoch changed, i.e. it newly became leader, not just had its ISR change.
  • followers: the broker is a non-leader replica and the partitionEpoch changed.
  • directoryIds: the partition's assigned directory for this broker changed.

updateCoordinator maps these to coordinator callbacks: deletions and follower transitions call onResignation, elected-leader transitions call onElection(partition, leaderEpoch). Topic deletion is special-cased: it calls onResignation(partition, None) with no epoch bump because deletion does not increment the leader epoch core/.../metadata/BrokerMetadataPublisher.scala:275.
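
The classification rules listed above can be sketched as a single function. Part and Bucket are hypothetical types; the sketch folds leaders and electedLeaders into one return value (ELECTED_LEADER implies membership in leaders), omits directoryIds, and treats a missing previous partition as "changed", which is an assumption.

```java
import java.util.Arrays;

final class LocalChangesSketch {
    // Hypothetical, trimmed-down partition state.
    record Part(int[] replicas, int leader, int leaderEpoch, int partitionEpoch) {
        boolean hasReplica(int brokerId) {
            return Arrays.stream(replicas).anyMatch(r -> r == brokerId);
        }
    }

    enum Bucket { DELETE, ELECTED_LEADER, LEADER, FOLLOWER, NONE }

    // prev may be null when the partition is new in this delta.
    static Bucket classify(Part prev, Part next, int brokerId) {
        if (!next.hasReplica(brokerId)) {
            // Was a replica before, is not one now.
            return (prev != null && prev.hasReplica(brokerId)) ? Bucket.DELETE : Bucket.NONE;
        }
        boolean partitionChanged = prev == null || prev.partitionEpoch() != next.partitionEpoch();
        if (!partitionChanged) {
            return Bucket.NONE;
        }
        if (next.leader() != brokerId) {
            return Bucket.FOLLOWER;
        }
        // Leader with a leader-epoch change: newly elected, not just an ISR change.
        boolean newlyElected = prev == null || prev.leaderEpoch() != next.leaderEpoch();
        return newlyElected ? Bucket.ELECTED_LEADER : Bucket.LEADER;
    }
}
```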

KRaftMetadataCache, the read path

KRaftMetadataCache wraps a single volatile MetadataImage currentImage and replaces it wholesale in setImage metadata/.../metadata/KRaftMetadataCache.java:74. Its contract for readers is explicit in the source: because each image is immutable, a reader must grab currentImage once and operate on that snapshot for the duration of its work; re-reading the field risks mixing two image versions. The hot path is partitionMetadata(...) serving Metadata responses, and maybeFilterAliveReplicas, which drops brokers that are fenced() or that lack the requested listener metadata/.../metadata/KRaftMetadataCache.java:92. This is precisely where fencing becomes visible to clients: a fenced broker is filtered out of replica/ISR/leader lists, yielding LEADER_NOT_AVAILABLE / REPLICA_NOT_AVAILABLE rather than routing traffic to it.
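
The read-once contract can be sketched as below, assuming a hypothetical two-field Image. The point is only the pattern: one volatile read into a local, then every lookup goes through that local.

```java
import java.util.Map;

final class ImageCacheSketch {
    // Hypothetical immutable image: an offset plus a topic -> leader map.
    record Image(long offset, Map<String, Integer> leaders) {}

    private volatile Image currentImage = new Image(-1L, Map.of());

    // Writer side: a single volatile store publishes the whole new image.
    void setImage(Image next) {
        currentImage = next;
    }

    // Reader side: read the field once so offset and leader come from the same image.
    String describe(String topic) {
        Image image = currentImage;
        Integer leader = image.leaders().get(topic);
        return topic + "@" + image.offset() + " leader=" + leader;
    }
}
```

Reading currentImage twice inside describe could pair the offset of one image with the leader of the next if setImage ran in between, which is exactly the mixing the contract forbids.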

Cluster bootstrap & storage formatting

Before any broker can register or replay metadata, the cluster must be brought into existence. Unlike the ZooKeeper era, KRaft makes formatting storage a mandatory first step: a node refuses to start against an unformatted (or mis-formatted) log directory. Formatting is performed by the kafka-storage tool (StorageTool), and it is what writes the cluster.id, the per-directory identity, and the seed metadata that pins the cluster's initial metadata.version.

  1. Generate a cluster ID. kafka-storage random-uuid prints a fresh Uuid (it simply emits Uuid.randomUuid) core/.../tools/StorageTool.scala:99. The same cluster ID must be supplied to every node when formatting.
  2. Format each node's log dirs. Run kafka-storage format --cluster-id <ID> --release-version <MV>. --cluster-id is mandatory, and the formatter throws "You must specify the cluster id." without it metadata/.../storage/Formatter.java:250. --release-version selects the initial feature settings; its minimum is MINIMUM_VERSION and its default is LATEST_PRODUCTION core/.../tools/StorageTool.scala:336. Formatting writes one meta.properties per directory and (for metadata dirs) a bootstrap.checkpoint snapshot.
  3. Pick a quorum-formation mode (KIP-853 dynamic quorums). The format command exposes a mutually-exclusive group of controller-quorum options core/.../tools/StorageTool.scala:347.

What format writes per directory

For every empty log directory the formatter builds a MetaProperties (version V1) carrying the cluster ID, this node's ID, and a freshly generated directory.id UUID, then writes it as meta.properties metadata/.../storage/Formatter.java:401:

| Property | Source | Meaning |
| --- | --- | --- |
| version | MetaPropertiesVersion.V1 | meta.properties schema version. |
| cluster.id | --cluster-id | Cluster-wide identity; verified against every other directory and every controller RPC. |
| node.id | node.id config | This node's broker/controller ID. |
| directory.id | copier.generateValidDirectoryId() | Per-log-dir JBOD UUID; for a dynamic-metadata-voter dir it is taken from the --initial-controllers entry instead metadata/.../storage/Formatter.java:438. |
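
As a rough illustration of the four properties, the sketch below builds them with java.util.Properties and checks that two directories agree on cluster identity. The class and method names are hypothetical, the assumption that V1 serializes as the string "1" is not taken from the source, and the IDs a caller passes in are placeholders rather than real Kafka Uuids.

```java
import java.util.Properties;

final class MetaPropertiesSketch {
    // Builds the per-directory properties described in the table above.
    static Properties build(String clusterId, int nodeId, String directoryId) {
        Properties props = new Properties();
        props.setProperty("version", "1"); // assumption: V1 is written as "1"
        props.setProperty("cluster.id", clusterId);
        props.setProperty("node.id", Integer.toString(nodeId));
        props.setProperty("directory.id", directoryId);
        return props;
    }

    // Hypothetical check: every directory on a node must carry the same cluster.id.
    static boolean sameCluster(Properties a, Properties b) {
        return a.getProperty("cluster.id").equals(b.getProperty("cluster.id"));
    }
}
```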

For metadata directories the formatter also writes the binary bootstrap snapshot bootstrap.checkpoint (BootstrapMetadata.BINARY_BOOTSTRAP_FILENAME) metadata/.../bootstrap/BootstrapMetadata.java:46. Its records are computed by BootstrapMetadata.fromVersions(releaseVersion, featureLevels, "format command") and then any --add-scram credentials or extra records are appended via fromRecords metadata/.../storage/Formatter.java:384. The crucial payload is a FeatureLevelRecord for metadata.version at the chosen release version: on first start, "if the metadata log is empty, we will populate the log with these records" so that metadata.version is set before any other record is ever appended metadata/.../bootstrap/BootstrapMetadata.java:41. This is what makes the very first record in __cluster_metadata a metadata.version level, the same ordering invariant the loader relies on when it writes features first.

KIP-853 quorum-formation options

| Option | Effect |
| --- | --- |
| --standalone / -s | Initialize a controller as a single-node dynamic quorum. controller.quorum.voters must be unset; controller.quorum.bootstrap.servers is used instead core/.../tools/StorageTool.scala:348. |
| --initial-controllers / -I | Initialize with a specified dynamic quorum: a comma-separated list of id@host:port:directory. The same values must be used to format all nodes core/.../tools/StorageTool.scala:357. |
| --no-initial-controllers / -N | Initialize a server without committing to a dynamic quorum (joins later via controller.quorum.bootstrap.servers) core/.../tools/StorageTool.scala:352. |

Note

These three are mutually exclusive (an addMutuallyExclusiveGroup). The classic alternative is the static controller.quorum.voters config, in which case none of these flags is passed. The directory UUID embedded in an --initial-controllers entry is exactly the directory.id that the matching node will write into its metadata meta.properties, tying voter identity to on-disk identity.

Broker lifecycle & heartbeats

The state machine

BrokerLifecycleManager owns the volatile BrokerState state and the volatile long brokerEpoch, both written only from its event-queue thread server/.../BrokerLifecycleManager.java:125. The state machine, implemented across StartupEvent and BrokerHeartbeatResponseEvent:

  • (constructed) → NOT_RUNNING: initial state.
  • NOT_RUNNING → STARTING: start().
  • STARTING → STARTING: heartbeat response with isCaughtUp false; retry after ~10 ms.
  • STARTING → RECOVERY: heartbeat response with isCaughtUp true; completes initialCatchUpFuture.
  • RECOVERY → RUNNING: heartbeat response with isFenced false; completes initialUnfenceFuture.
  • RUNNING → PENDING_CONTROLLED_SHUTDOWN: beginControlledShutdown().
  • PENDING_CONTROLLED_SHUTDOWN → PENDING_CONTROLLED_SHUTDOWN: heartbeats with wantShutDown true while awaiting the controller.
  • PENDING_CONTROLLED_SHUTDOWN → SHUTTING_DOWN: heartbeat response with shouldShutDown true.
  • SHUTTING_DOWN → (process exit): ShutdownEvent completes controlledShutdownFuture.

RECOVERY is where local log recovery runs; the broker stays fenced (wantFence=true) until setReadyToUnfence(), so the controller only reports isFenced=false, promoting it to RUNNING, once metadata is current.

Figure: BrokerState transitions, driven by heartbeat-response booleans (isCaughtUp, isFenced, shouldShutDown) and implemented across StartupEvent and BrokerHeartbeatResponseEvent server/.../BrokerLifecycleManager.java:645.

Registration

start(...) enqueues a StartupEvent that starts the channel manager, moves to STARTING, schedules a registration-timeout deadline, and calls sendBrokerRegistration() server/.../BrokerLifecycleManager.java:458. The BrokerRegistrationRequest carries the broker ID, cluster ID, incarnation ID, advertised listeners, supported feature ranges, sorted log-directory UUIDs, and previousBrokerEpoch (read from clean-shutdown files, to speed re-registration after a clean restart) server/.../BrokerLifecycleManager.java:475. The response handler runs on a network thread, so it merely prepends a BrokerRegistrationResponseEvent onto the event queue rather than touching state directly. On Errors.NONE it stores the assigned brokerEpoch, marks registered = true and initialRegistrationSucceeded = true, and immediately schedules the first heartbeat server/.../BrokerLifecycleManager.java:548.

Gotcha

If registration does not succeed within initial.broker.registration.timeout.ms (default 60000), the RegistrationTimeoutEvent fires and the broker shuts itself down with the message "Shutting down because we were unable to register with the controller quorum" server/.../BrokerLifecycleManager.java:732. A misconfigured controller listener or wrong cluster ID therefore manifests as a broker that exits a minute after start, not one that hangs forever.

Heartbeats & fencing

sendBrokerHeartbeat() builds a BrokerHeartbeatRequest with the current brokerEpoch, currentMetadataOffset = highestMetadataOffsetProvider.get() (wired to loader.lastAppliedOffset() at core/.../BrokerServer.scala:451), wantFence = !readyToUnfence, wantShutDown = (state == PENDING_CONTROLLED_SHUTDOWN), and the set of offline log directories server/.../BrokerLifecycleManager.java:564. Once the broker is caught up and cordoned dirs are supported, it also includes cordonedLogDirs.

The controller's response carries three booleans that drive transitions:

  • isCaughtUp: in STARTING, completes initialCatchUpFuture, moves to RECOVERY, and computes the cordoned dirs from config. The controller only sets this once the broker's reported metadata offset is sufficiently current.
  • isFenced: in RECOVERY, when it goes false, completes initialUnfenceFuture and moves to RUNNING.
  • shouldShutDown: in PENDING_CONTROLLED_SHUTDOWN, when true, calls beginShutdown().

The broker controls its own fencing through wantFence: it keeps wantFence=true until setReadyToUnfence() is called (which sets readyToUnfence=true and triggers an immediate heartbeat) server/.../BrokerLifecycleManager.java:379. Thus a broker is fenced during startup/recovery by design, so clients do not contact it while it is still loading state.
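The way the three response flags and wantFence interact can be sketched as a small state machine. This is a simplified illustration under assumptions: LifecycleSketch is a hypothetical class, it ignores futures, timers and error handling, and it only encodes the transitions described in the text above.

```java
/** Hypothetical, heavily simplified model of the heartbeat-driven transitions. */
final class LifecycleSketch {
    enum State { NOT_RUNNING, STARTING, RECOVERY, RUNNING, PENDING_CONTROLLED_SHUTDOWN, SHUTTING_DOWN }

    private State state;
    private boolean readyToUnfence = false;

    LifecycleSketch(State initial) { this.state = initial; }

    State state() { return state; }

    // Fields the broker would put into its next heartbeat request.
    boolean wantFence()    { return !readyToUnfence; }
    boolean wantShutDown() { return state == State.PENDING_CONTROLLED_SHUTDOWN; }

    /** The broker decides when it is ready; until then it asks to stay fenced. */
    void setReadyToUnfence() { readyToUnfence = true; }

    /** Applies the three booleans carried by a heartbeat response. */
    void onHeartbeatResponse(boolean isCaughtUp, boolean isFenced, boolean shouldShutDown) {
        switch (state) {
            case STARTING:
                if (isCaughtUp) state = State.RECOVERY;
                break;
            case RECOVERY:
                if (!isFenced) state = State.RUNNING;
                break;
            case PENDING_CONTROLLED_SHUTDOWN:
                if (shouldShutDown) state = State.SHUTTING_DOWN;
                break;
            default:
                break; // other states ignore these flags in this sketch
        }
    }

    public static void main(String[] args) {
        LifecycleSketch s = new LifecycleSketch(State.STARTING);
        s.onHeartbeatResponse(true, true, false);   // caught up, still fenced
        s.setReadyToUnfence();
        s.onHeartbeatResponse(true, false, false);  // controller unfenced us
        System.out.println(s.state());
    }
}
```

Each flag only matters in one state, which is why a response that arrives "too early" (for example isFenced=false while still STARTING) is simply ignored in this model.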

Design rationale

The pull-based registration + heartbeat lease model, with the broker self-fencing until ready and the controller unfencing only once metadata is current, comes from KIP-631 (the quorum-based controller). Separately, the inControlledShutdown flag in BrokerRegistrationChangeRecord and the rule that fenced replicas may not (re)join the ISR come from KIP-841. Feature levels gating record formats (including metadata.version) are KIP-584. Metadata transactions in the batch loader are KIP-868.

Controlled shutdown

beginControlledShutdown() enqueues a BeginControlledShutdownEvent. From RUNNING it moves to PENDING_CONTROLLED_SHUTDOWN and sends an immediate heartbeat so the controller can start migrating leadership off this broker as soon as possible; from any other state it shuts down directly server/.../BrokerLifecycleManager.java:335. The broker stays in pending-shutdown, heartbeating with wantShutDown=true, until a response sets shouldShutDown=true, meaning the controller has moved this broker out of leadership for all its partitions. The ShutdownEvent then sets SHUTTING_DOWN, completes controlledShutdownFuture, cancels the catch-up/unfence futures, and tears down the channel manager server/.../BrokerLifecycleManager.java:756.

Communication scheduling

All communication is funnelled through a single CommunicationEvent: if a request is already in flight it sets nextSchedulingShouldBeImmediate and defers; otherwise it heartbeats if registered, or re-registers if not server/.../BrokerLifecycleManager.java:742. Steady-state heartbeats are scheduled broker.heartbeat.interval.ms (default 2000 ms) apart, both after success and after failure. Together, the communicationInFlight flag and this in-flight short-circuit guarantee at most one outstanding controller request at a time.
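A minimal sketch of that single-request discipline follows. CommunicationSketch is hypothetical, and how the deferred flag is consumed when the response arrives (a zero delay for the next event) is an assumption of this sketch rather than something stated above.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of "at most one controller request in flight". */
final class CommunicationSketch {
    static final long HEARTBEAT_INTERVAL_MS = 2000L; // default quoted in the text

    private boolean communicationInFlight = false;
    private boolean nextSchedulingShouldBeImmediate = false;
    private boolean registered = false;
    final List<String> sent = new ArrayList<>();     // requests "sent" so far

    /** Body of the single communication event. */
    void runCommunicationEvent() {
        if (communicationInFlight) {
            nextSchedulingShouldBeImmediate = true;  // defer instead of sending
            return;
        }
        communicationInFlight = true;
        sent.add(registered ? "heartbeat" : "registration");
    }

    /**
     * Handles a response and returns the delay before the next communication
     * event. Assumption: a deferred request or a fresh registration means "now".
     */
    long onResponse(boolean justRegistered) {
        communicationInFlight = false;
        if (justRegistered) registered = true;
        long delayMs = (justRegistered || nextSchedulingShouldBeImmediate) ? 0L : HEARTBEAT_INTERVAL_MS;
        nextSchedulingShouldBeImmediate = false;
        return delayMs;
    }

    public static void main(String[] args) {
        CommunicationSketch c = new CommunicationSketch();
        c.runCommunicationEvent();            // sends a registration
        c.runCommunicationEvent();            // deferred: one is already in flight
        System.out.println(c.sent + " next in " + c.onResponse(true) + " ms");
    }
}
```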

Offline & cordoned directories

If a log directory fails, propagateDirectoryFailure(dir, timeout) records it in offlineDirs (value=false until acknowledged), schedules an immediate heartbeat, and arms a deadline: if the offline dir is not acknowledged by the controller before the timeout, OfflineDirBrokerFailureEvent runs the shutdown hook server/.../BrokerLifecycleManager.java:281. Acknowledgement is detected in the heartbeat response handler, which marks each currently-reported offline dir as acknowledged server/.../BrokerLifecycleManager.java:644. Cordoned directories (MV ≥ 4.3-IV0) are reported once the broker is caught up.
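The acknowledgement bookkeeping can be sketched as a map from directory to an "acknowledged" flag. OfflineDirsSketch is hypothetical: it uses plain strings where the real code uses directory UUIDs, and it models the deadline as a method the caller invokes when the timer fires.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical model of offline-directory reporting and acknowledgement. */
final class OfflineDirsSketch {
    // directory id -> has the controller acknowledged it?
    private final Map<String, Boolean> offlineDirs = new HashMap<>();

    /** Records a failed directory as not yet acknowledged. */
    void propagateDirectoryFailure(String dirId) {
        offlineDirs.putIfAbsent(dirId, false);
    }

    /** Directories to include in the next heartbeat request. */
    Set<String> dirsToReport() {
        return new TreeSet<>(offlineDirs.keySet());
    }

    /** A successful heartbeat response acknowledges every directory it reported. */
    void onHeartbeatSuccess(Set<String> reported) {
        for (String dir : reported) {
            offlineDirs.computeIfPresent(dir, (k, v) -> true);
        }
    }

    /** Deadline check: true means the failure was never acknowledged. */
    boolean shouldFailBroker(String dirId) {
        return Boolean.FALSE.equals(offlineDirs.get(dirId));
    }

    public static void main(String[] args) {
        OfflineDirsSketch dirs = new OfflineDirsSketch();
        dirs.propagateDirectoryFailure("dir-A");
        System.out.println("before ack: " + dirs.shouldFailBroker("dir-A"));
        dirs.onHeartbeatSuccess(dirs.dirsToReport());
        System.out.println("after ack: " + dirs.shouldFailBroker("dir-A"));
    }
}
```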

BrokerRegistrationTracker, self-healing registration

BrokerRegistrationTracker is itself a publisher. On each update, if either the metadata version changed or this broker's own registration changed, it checks whether the registration needs refreshing metadata/.../image/publisher/BrokerRegistrationTracker.java:67. It handles exactly one case: the cluster has upgraded to an MV that supports JBOD (≥ 3.7-IV2) but this broker's registration still lists no directories. In that case it calls the refresh callback, which is wired to lifecycleManager.resendBrokerRegistration() core/.../BrokerServer.scala:582. resendBrokerRegistration simply sets registered=false and triggers an immediate communication, causing a fresh BrokerRegistrationRequest server/.../BrokerLifecycleManager.java:303.
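The refresh decision reduces to a small predicate. The sketch below is an illustration only: RegistrationRefreshSketch, its boolean parameters and the string directory IDs are hypothetical simplifications of the image and delta objects the real tracker inspects.

```java
import java.util.List;

/** Hypothetical reduction of the tracker's "does registration need a refresh?" check. */
final class RegistrationRefreshSketch {
    boolean registered = true;

    static boolean refreshNeeded(boolean mvChanged,
                                 boolean ownRegistrationChanged,
                                 boolean mvSupportsDirectories,
                                 List<String> registeredDirs) {
        // Only look at all when something relevant changed in this update.
        if (!mvChanged && !ownRegistrationChanged) return false;
        // The single handled case: MV supports JBOD but no directories are registered.
        return mvSupportsDirectories && registeredDirs.isEmpty();
    }

    /** Stand-in for the refresh callback: forces the next communication to re-register. */
    void resendBrokerRegistration() { registered = false; }

    public static void main(String[] args) {
        RegistrationRefreshSketch broker = new RegistrationRefreshSketch();
        if (refreshNeeded(true, false, true, List.of())) {
            broker.resendBrokerRegistration();
        }
        System.out.println("registered=" + broker.registered);
    }
}
```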

AssignmentsManager, reporting JBOD placement

In JBOD mode the controller records which directory each replica lives in, but the broker is the authority on actual placement. AssignmentsManager batches "this partition now lives in directory X" intents up to the controller via the AssignReplicasToDirs RPC server/.../AssignmentsManager.java:62. The DirectoryEventHandler on the broker forwards handleAssignment(partition, dir, reason, callback) into onAssignment core/.../BrokerServer.scala:345.

  • Queues: a ConcurrentHashMap ready holds assignments waiting to be sent; a volatile Map inflight holds the in-flight batch. onAssignment puts into ready and reschedules MaybeSendAssignmentsEvent with an exponential backoff (100 ms base, x2, 10 s cap) server/.../AssignmentsManager.java:213.
  • Sending: maybeSendAssignments sends nothing while a batch is in flight; otherwise it drains up to MAX_ASSIGNMENTS_PER_REQUEST assignments, validating each against the current image (assignment.valid(nodeId, image)) and discarding stale ones, then sends with the current broker epoch from image.cluster().brokerEpoch(nodeId) server/.../AssignmentsManager.java:329.
  • Response: per-partition NONE runs the success callback; any per-partition error or a global failure puts the assignment back into ready via putIfAbsent (so a newer intent for the same partition wins) and the request is retried with backoff server/.../AssignmentsManager.java:368.

Note

Like the loader and the lifecycle manager, AssignmentsManager is built on a single KafkaEventQueue thread (broker-N-directory-assignments-manager-). The ready map is a ConcurrentHashMap because onAssignment can be invoked from arbitrary threads, but all the send/response logic runs serialized on the event-queue thread.
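The ready/inflight queues, the putIfAbsent retry rule and the backoff schedule can be sketched together. AssignmentsSketch is hypothetical: partitions and directories are plain strings, validation against the image is omitted, and the backoff has no jitter, which is an assumption of this sketch.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical model of the ready/inflight queues and retry backoff. */
final class AssignmentsSketch {
    static final long BASE_MS = 100L;
    static final long CAP_MS = 10_000L;

    final ConcurrentHashMap<String, String> ready = new ConcurrentHashMap<>(); // partition -> dir
    volatile Map<String, String> inflight = Map.of();

    /** 100 ms doubled per failure, capped at 10 s (no jitter in this sketch). */
    static long backoffMs(int failures) {
        long delay = BASE_MS << Math.min(failures, 20); // bounded shift avoids overflow
        return Math.min(delay, CAP_MS);
    }

    /** May be called from any thread; a later intent overwrites an earlier one. */
    void onAssignment(String partition, String dir) { ready.put(partition, dir); }

    /** Sends nothing while a batch is in flight; otherwise drains up to the limit. */
    Map<String, String> maybeSend(int maxPerRequest) {
        if (!inflight.isEmpty()) return Map.of();
        Map<String, String> batch = new HashMap<>();
        Iterator<Map.Entry<String, String>> it = ready.entrySet().iterator();
        while (it.hasNext() && batch.size() < maxPerRequest) {
            Map.Entry<String, String> e = it.next();
            batch.put(e.getKey(), e.getValue());
            it.remove();
        }
        inflight = batch;
        return batch;
    }

    /** Failed entries return to ready unless a newer intent already arrived. */
    void onResponse(Set<String> failedPartitions) {
        for (Map.Entry<String, String> e : inflight.entrySet()) {
            if (failedPartitions.contains(e.getKey())) {
                ready.putIfAbsent(e.getKey(), e.getValue());
            }
        }
        inflight = Map.of();
    }

    public static void main(String[] args) {
        AssignmentsSketch m = new AssignmentsSketch();
        m.onAssignment("orders-0", "dir-A");
        m.maybeSend(10);
        m.onAssignment("orders-0", "dir-B");   // newer intent while the old one is in flight
        m.onResponse(Set.of("orders-0"));      // old one failed
        System.out.println(m.ready + " retry in " + backoffMs(1) + " ms");
    }
}
```

Using putIfAbsent on the retry path is what lets the newer dir-B intent survive the failure of the older dir-A request.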

Concurrency & threading

Component | Thread | What it guards
MetadataLoader + MetadataBatchLoader | one KafkaEventQueue thread kafka-N-metadata-loader- | The current image, the publisher lists, catchingUp, transaction state. All RaftClient.Listener callbacks append here, so log/snapshot/leader events are serialized.
BrokerMetadataPublisher (and the other publishers) | runs on the loader thread (invoked from maybePublishMetadata) | Publishers must not block; a single slow publisher stalls the whole metadata pipeline. They mutate their own subsystems' state, not loader state.
KRaftMetadataCache | writes on the loader thread (setImage); reads on any request thread | One volatile reference; readers are lock-free and snapshot-consistent by grabbing it once.
BrokerLifecycleManager | one KafkaEventQueue thread broker-N-lifecycle-manager- | All mutable state; state and brokerEpoch are volatile for cross-thread reads but written only here. Controller responses arrive on network threads and are prepended as events.
AssignmentsManager | one KafkaEventQueue thread broker-N-directory-assignments-manager- | inflight and send logic; ready is a ConcurrentHashMap for external producers.

Invariant

There is exactly one writer of the metadata image (the loader thread) and exactly one writer of broker lifecycle state (the lifecycle thread). Cross-thread visibility is achieved either by a single volatile store of an immutable object (the image, the broker state) or by bouncing the work onto the owning event queue (controller responses, assignment intents). No locks are taken on the metadata read path.
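The "single volatile store of an immutable object" pattern looks roughly like this. ImageCacheSketch and its tiny Image class are hypothetical and far smaller than MetadataImage or KRaftMetadataCache; the sketch only shows why a reader that grabs the reference once sees a consistent snapshot.

```java
import java.util.Map;

/** Hypothetical single-writer / multi-reader cache over an immutable image. */
final class ImageCacheSketch {
    /** Immutable snapshot: all fields final, map defensively copied. */
    static final class Image {
        final long offset;
        final Map<String, Integer> partitionCounts;

        Image(long offset, Map<String, Integer> partitionCounts) {
            this.offset = offset;
            this.partitionCounts = Map.copyOf(partitionCounts);
        }
    }

    private volatile Image current = new Image(-1L, Map.of());

    /** Writer side: called by exactly one thread (the loader, in the text's terms). */
    void setImage(Image next) { current = next; }

    /** Reader side: read the volatile once, then use only the local reference. */
    String describe(String topic) {
        Image image = current;
        Integer partitions = image.partitionCounts.get(topic);
        return topic + "@" + image.offset + ":"
            + (partitions == null ? "unknown" : String.valueOf(partitions));
    }

    public static void main(String[] args) {
        ImageCacheSketch cache = new ImageCacheSketch();
        cache.setImage(new Image(10L, Map.of("orders", 3)));
        System.out.println(cache.describe("orders"));
    }
}
```

Reading `current` twice inside describe would reintroduce the risk of mixing the offset of one image with the contents of another, which is exactly what the grab-once rule avoids.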

Configuration reference

Key | Default | Effect
broker.heartbeat.interval.ms | 2000 | Interval between broker heartbeats to the controller raft/.../KRaftConfigs.java:39.
broker.session.timeout.ms | 9000 | How long a broker lease lasts without heartbeats before the controller fences it (controller-side) raft/.../KRaftConfigs.java:43.
initial.broker.registration.timeout.ms | 60000 | Time to wait for initial registration before the broker process exits raft/.../KRaftConfigs.java:35.
metadata.log.dir / log.dirs | (first log dir) | Where the __cluster_metadata log lives; its directory ID seeds the RaftManager core/.../SharedServer.scala:296.
metadata.log.max.snapshot.interval.ms | (see metadataSnapshotMaxIntervalMs) | Max time between metadata snapshots, passed to the SnapshotGenerator core/.../SharedServer.scala:346.
metadata.log.max.record.bytes.between.snapshots | (see metadataSnapshotMaxNewRecordBytes) | Byte threshold that triggers a new snapshot.
unstable.feature.versions.enable | false | Allows selecting metadata.version values past LATEST_PRODUCTION (currently IBP_4_3_IV0) server-common/.../MetadataVersion.java:157.
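To make the relationship between the three timing defaults concrete, the following sketch loads them into a java.util.Properties object and computes how many full heartbeat intervals fit into one session timeout. LifecycleConfigSketch is a hypothetical helper, not a Kafka class; only the keys and default values come from the table above.

```java
import java.util.Properties;

/** Hypothetical helper that reasons about the lifecycle timing defaults. */
final class LifecycleConfigSketch {
    static Properties defaults() {
        Properties p = new Properties();
        p.setProperty("broker.heartbeat.interval.ms", "2000");
        p.setProperty("broker.session.timeout.ms", "9000");
        p.setProperty("initial.broker.registration.timeout.ms", "60000");
        return p;
    }

    /** Full heartbeat intervals that fit inside one session timeout. */
    static long heartbeatsPerSession(Properties p) {
        long intervalMs = Long.parseLong(p.getProperty("broker.heartbeat.interval.ms"));
        long sessionMs = Long.parseLong(p.getProperty("broker.session.timeout.ms"));
        return sessionMs / intervalMs;
    }

    public static void main(String[] args) {
        System.out.println(heartbeatsPerSession(defaults()));
    }
}
```

With the defaults, four heartbeat intervals fit into a session, so a handful of consecutive heartbeats has to be lost before the lease can expire.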

metadata.version itself is not a normal config in KRaft: it is a finalized feature level set at kafka-storage format time and changed via the kafka-features tool, then propagated as a FeatureLevelRecord. The minimum is IBP_3_3_IV3 (feature level 7) and the latest production version is IBP_4_3_IV0 (level 30) server-common/.../MetadataVersion.java:147,157; unstable versions up to IBP_4_4_IV0 (level 31, DLQ for share groups, KIP-1191) require the unstable flag server-common/.../MetadataVersion.java:136.

Failure modes, edge cases & recovery

  • Loader fault during replay. Per-record errors in loadBatch/loadSnapshot are routed to a FaultHandler with the offending offset, not silently swallowed metadata/.../image/loader/MetadataBatchLoader.java:144. On a broker the metadata-loading fault handler is typically fatal (a broker that cannot understand committed metadata is not safe to run).
  • Snapshot load. If the local log is far behind or has been truncated by retention, the RaftClient delivers a snapshot instead of a tail of records. handleLoadSnapshot rebuilds the delta and crucially calls delta.finishSnapshot(), which (via each sub-delta) generates removal entries for anything in the old image not present in the snapshot, so loading a snapshot correctly drops deleted topics/brokers metadata/.../image/MetadataDelta.java:375.
  • Aborted metadata transaction. A transaction that is opened by a BeginTransactionRecord but aborted instead of being closed by an EndTransactionRecord causes the batch loader to publish an empty delta on the pre-transaction image, discarding the partial work metadata/.../image/loader/MetadataBatchLoader.java:205.
  • Registration rejected. A non-NONE registration error (e.g. duplicate incarnation, stale epoch) triggers scheduleNextCommunicationAfterFailure() and a retry. Retries are bounded by the initial-registration timeout: if none succeeds before it expires, the broker shuts down.
  • Unknown metadata.version. If a FeatureLevelRecord names a level this binary does not know, FeaturesDelta.replay throws with a directive to set MV to at least MINIMUM_VERSION before upgrading the software metadata/.../image/FeaturesDelta.java:64. This is the guard against rolling a too-old binary into a too-new cluster.
  • Stray logs / abandoned future replicas. On first publish, initializeManagers uses the new image to decide which on-disk logs are strays (their replica set no longer includes this broker, or the topic ID is missing) and to repair future replicas left behind by a disk failure during reassignment (KAFKA-16082) core/.../metadata/BrokerMetadataPublisher.scala:309.
  • Publisher close / shutdown. On loader shutdown every publisher is closed; BrokerMetadataPublisher.close() completes firstPublishFuture exceptionally with a TimeoutException so a startup still blocked waiting for the first image fails fast rather than hanging core/.../metadata/BrokerMetadataPublisher.scala:389.
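The snapshot-load bullet above relies on finishSnapshot turning "absent from the snapshot" into explicit removals. A minimal sketch of that idea, with a hypothetical SnapshotReplaySketch that models an image as a map from topic name to partition count:

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

/** Hypothetical model of snapshot loading with a finishSnapshot-style cleanup. */
final class SnapshotReplaySketch {
    /** Keys present in the old image but missing from the snapshot must be removed. */
    static Set<String> removals(Set<String> oldKeys, Set<String> snapshotKeys) {
        Set<String> removed = new TreeSet<>(oldKeys);
        removed.removeAll(snapshotKeys);
        return removed;
    }

    static Map<String, Integer> loadSnapshot(Map<String, Integer> oldImage,
                                             Map<String, Integer> snapshot) {
        Map<String, Integer> next = new TreeMap<>(oldImage);
        next.putAll(snapshot);  // replay snapshot records as upserts
        // Without this step, entries deleted while the node was away would linger.
        next.keySet().removeAll(removals(oldImage.keySet(), snapshot.keySet()));
        return next;
    }

    public static void main(String[] args) {
        Map<String, Integer> old = Map.of("a", 1, "b", 2, "c", 3);
        Map<String, Integer> snap = Map.of("b", 5, "d", 1);
        System.out.println(removals(old.keySet(), snap.keySet()));
        System.out.println(loadSnapshot(old, snap));
    }
}
```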

Invariants & guarantees

Invariant

Total order. Every node applies metadata records in the exact offset order they were committed in __cluster_metadata. Two brokers that have replayed up to the same offset hold identical images (modulo node-local fields). This is the bedrock guarantee that replaces ZooKeeper's per-znode watches.

Invariant

Atomic visibility. Readers never see a half-applied update: the image is swapped by one volatile store of an immutable object, and a publisher always receives a complete image alongside the delta.

Invariant

Caught-up-before-serve. A broker does not answer client requests as a healthy node until three conditions hold: the loader has reached the high-water mark (initialCatchUpFuture), the controller has unfenced it (initialUnfenceFuture), and the first image has been published (firstPublishFuture) core/.../BrokerServer.scala:594.
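A gate over the three futures can be sketched with CompletableFuture.allOf. This is an illustration under assumptions: StartupGateSketch is hypothetical, and the real startup code may wait on the futures one by one with timeouts rather than combining them.

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical startup gate: serve only when all three conditions have completed. */
final class StartupGateSketch {
    final CompletableFuture<Void> initialCatchUpFuture = new CompletableFuture<>();
    final CompletableFuture<Void> initialUnfenceFuture = new CompletableFuture<>();
    final CompletableFuture<Void> firstPublishFuture = new CompletableFuture<>();

    /** Completes once every gate future has completed. */
    CompletableFuture<Void> readyToServe() {
        return CompletableFuture.allOf(
            initialCatchUpFuture, initialUnfenceFuture, firstPublishFuture);
    }

    public static void main(String[] args) {
        StartupGateSketch gate = new StartupGateSketch();
        CompletableFuture<Void> ready = gate.readyToServe();
        gate.initialCatchUpFuture.complete(null);
        gate.firstPublishFuture.complete(null);
        System.out.println("ready before unfence: " + ready.isDone());
        gate.initialUnfenceFuture.complete(null);
        System.out.println("ready after unfence: " + ready.isDone());
    }
}
```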

Invariant

Epoch fencing. A broker carries the brokerEpoch assigned at registration in every heartbeat and controller RPC; a stale incarnation cannot impersonate the current one because its epoch will not match the controller's record.
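The epoch check can be illustrated from the controller's point of view. EpochFenceSketch is hypothetical, and using a simple counter to hand out epochs is a simplification made for this sketch; the point is only that a request carrying an older epoch for the same broker ID is rejected.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical controller-side view of broker epochs. */
final class EpochFenceSketch {
    private final Map<Integer, Long> currentEpochs = new HashMap<>();
    private long nextEpoch = 0L; // simplification: a counter stands in for the real epoch source

    /** A (re)registration replaces the broker's epoch with a fresh, larger one. */
    long register(int brokerId) {
        long epoch = nextEpoch++;
        currentEpochs.put(brokerId, epoch);
        return epoch;
    }

    /** Requests are accepted only if they carry the broker's current epoch. */
    boolean accept(int brokerId, long claimedEpoch) {
        Long current = currentEpochs.get(brokerId);
        return current != null && current == claimedEpoch;
    }

    public static void main(String[] args) {
        EpochFenceSketch controller = new EpochFenceSketch();
        long oldEpoch = controller.register(1);
        long newEpoch = controller.register(1);  // same broker ID, new incarnation
        System.out.println("old accepted: " + controller.accept(1, oldEpoch));
        System.out.println("new accepted: " + controller.accept(1, newEpoch));
    }
}
```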

Interactions with other subsystems

  • KRaft Consensus: the loader is a RaftClient.Listener; highWaterMark() drives the catch-up gate and snapshots come from the same client.
  • The KRaft Controller: produces every record the loader consumes, assigns broker epochs, and decides fencing/unfencing and controlled-shutdown completion based on heartbeat contents.
  • Replication, ISR & High Watermark: ReplicaManager.applyDelta consumes TopicDelta.localChanges to make this broker a leader or follower; fencing in the image removes a broker from ISR eligibility (KIP-841).
  • The Fetch Path: leadership/epoch changes published here are what cause replica fetchers to be (re)started against the right leader.
  • Group Coordination, Transactions, Share Groups: their coordinators are elected/resigned by updateCoordinator as the internal-topic partitions move.
  • Request Processing: KafkaApis answers Metadata, DescribeTopicPartitions, and feature queries straight from KRaftMetadataCache's current image.
  • Tiered Storage & JBOD: AssignmentsManager reports directory placement; offline-directory propagation feeds the controller's view of broker storage health.
  • Quotas & Security: the dynamic-config, client-quota, SCRAM, delegation-token, and ACL publishers are chained behind BrokerMetadataPublisher in the same update.

Design rationale & evolution

The image/delta architecture is a deliberate generalization of "apply a log to build a state machine," tuned for the reality that broker metadata is large and changes incrementally. Three design choices stand out in the code:

  • Immutable, structurally-shared images let any number of request threads read consistent metadata with zero locking, while the single loader thread is the only writer, a classic single-writer/multi-reader pattern made cheap by persistent data structures.
  • Deltas plus a manifest mean each subsystem applies only what changed (one topic, one broker) instead of diffing whole images, and the manifest tells a publisher whether it is seeing a log tail or a snapshot so it can behave correctly in both cases.
  • Self-fencing brokers (KIP-631) invert the ZooKeeper model: instead of the controller pushing LeaderAndIsr and hoping the broker is ready, the broker pulls metadata, tells the controller via wantFence when it is ready, and the controller unfences only once the broker's reported offset proves it is current.

Relevant KIPs: KIP-631 quorum controller & broker registration/heartbeat; KIP-584 feature versioning / metadata.version; KIP-841 fenced replicas & controlled-shutdown state; KIP-868 metadata transactions; KIP-858 JBOD disk-failure handling (directory assignments); KIP-966 eligible leader replicas (ELR fields in PartitionRegistration); KIP-919 controller registration records.

Gotchas / operational notes

  • Publishers run on the loader thread. Any blocking work inside onMetadataUpdate (or a coordinator onElection it triggers) delays all metadata propagation, including the cache update. This is why heavy startup work (log recovery) is gated to the first publish and why faults are caught per-section rather than allowed to escape.
  • "Caught up" is the controller's call, not the broker's. The broker reports its offset; the controller decides isCaughtUp/isFenced. A broker that cannot reach the active controller will heartbeat-fail and stay fenced even if its loader is fully caught up.
  • Re-registration is cheap and idempotent. BrokerRegistrationTracker may trigger a re-registration spuriously while replaying old log entries that change the directory list; the code accepts this because resending BrokerRegistrationRequest is harmless and a snapshot soon collapses the old entries metadata/.../image/publisher/BrokerRegistrationTracker.java:116.
  • Snapshots truncate replay cost. If a broker has been down a long time, it loads a snapshot rather than replaying the entire log; finishSnapshot guarantees deletions are honored, so a long-absent broker still converges to exactly the current state.
  • The metadata log is one partition. All cluster metadata flows through __cluster_metadata-0; there is no sharding. Throughput of metadata change is therefore bounded by a single Raft partition, a deliberate trade for total order and simplicity.

krivaltsevich.com · Part of Apache Kafka Internals · derived from Apache Kafka 4.4 source · GitHub · MIT-licensed.

Apache Kafka® is a registered trademark of the Apache Software Foundation. This is an independent, unofficial guide, not affiliated with or endorsed by the ASF.