krivaltsevich.com · Kafka Internals 4.4

19 · Quotas, Throttling & Client Metrics

Source: Apache Kafka 4.4.0-SNAPSHOT (git 04bfe7d, 2026-06-15), KRaft mode. Derived from source code, not copied from official documentation.

Kafka protects a shared cluster from being overwhelmed by any single tenant through a layered set of quotas: bandwidth (produce/fetch byte-rate), request-thread time, controller-mutation rate, replication and alter-log-dirs throttles, and connection count / creation rate. The rate-based quotas are all built on the same primitive: a Sensor wrapping a sampled Rate or TokenBucket stat with a Quota bound. The client-facing ones (bandwidth, request, controller-mutation) throttle by holding the channel muted for a computed delay and returning throttle_time_ms, so well-behaved clients back off rather than being disconnected; connection quotas instead block or reject inside the acceptor. The same broker also acts as a sink for client telemetry (KIP-714): ClientMetricsManager assigns each client a UUID, hands out metric subscriptions via GetTelemetrySubscriptions, and ingests OpenTelemetry payloads via PushTelemetry, gating each client to its own push interval. This chapter derives the data structures, algorithms, threading, and configuration of all of these from source.

Role & responsibilities

The quota subsystem answers a single operational question, "is this client allowed to do this much, right now?", for several distinct resources, and translates "no" into a backpressure signal rather than an error wherever possible. Its responsibilities:

  • Bandwidth quotas (QuotaType.PRODUCE, QuotaType.FETCH): cap bytes/sec per principal/client-id, measured against the request/response size on the produce and fetch hot paths.
  • Request quota (QuotaType.REQUEST): cap the fraction of broker request-handler + network-thread CPU time a client may consume (request_percentage), so a client cannot starve others with cheap-but-numerous requests.
  • Controller-mutation quota (QuotaType.CONTROLLER_MUTATION, KIP-599): cap the rate of partition creations/deletions from CreateTopics, CreatePartitions and DeleteTopics using a token bucket that tolerates bursts.
  • Replication throttles (LEADER_REPLICATION, FOLLOWER_REPLICATION, ALTER_LOG_DIRS_REPLICATION): cap reassignment / log-dir-move traffic so it does not crowd out client traffic.
  • Connection quotas: cap concurrent connection count (broker / listener / per-IP) and connection creation rate (broker / listener / per-IP), enforced inside the acceptor.
  • Client telemetry (KIP-714): subscribe clients to metric sets, ingest pushed OTLP, and throttle each client to its negotiated push interval.

Quota limits are stored as cluster metadata (config records in the KRaft metadata log) and applied dynamically; see Metadata Propagation & Broker Lifecycle. The throttling mechanism is interleaved with request processing (see Request Processing (KafkaApis)) and with the network layer's channel mute/unmute machinery (see Network Layer & Threading Model).

Where it lives in the code

| Class / file | Role |
|---|---|
| server/src/main/java/org/apache/kafka/server/quota/ClientQuotaManager.java | Base manager for bandwidth/request/mutation quotas; sensor lifecycle, entity resolution, throttle-delay computation, the reaper thread and delay queue. |
| core/src/main/java/kafka/server/ClientRequestQuotaManager.java | REQUEST quota; records request-thread + network-thread time as a percentage; exempt sensor. |
| server/.../quota/ControllerMutationQuotaManager.java (+ StrictControllerMutationQuota, PermissiveControllerMutationQuota, AbstractControllerMutationQuota) | Token-bucket controller-mutation quota and the per-request quota objects. |
| server/.../quota/ReplicationQuotaManager.java | Replication / alter-log-dirs byte-rate throttle; throttled-partition set. |
| core/src/main/java/kafka/server/QuotaFactory.java | Instantiates the seven managers into a QuotaManagers record and wires the quota callback plugin. |
| server/.../quota/ThrottledChannel.java, ThrottleCallback.java, ClientSensors.java | Delay-queue element, the start/end-throttling callback interface, and the per-client sensor tuple. |
| server-common/.../quota/QuotaType.java, QuotaUtils.java, SensorAccess.java | Quota-type enum, throttle-time math, lock-guarded sensor creation. |
| server-common/.../config/QuotaConfig.java | All quota config keys, defaults and ConfigDefs. |
| clients/.../common/metrics/stats/{Rate,TokenBucket}.java, Sensor.java, Quota.java | The metric primitives that do the actual rate accounting and quota check. |
| core/src/main/scala/kafka/network/SocketServer.scala (ConnectionQuotas) | Connection count + creation-rate enforcement in the acceptor. |
| core/src/main/scala/kafka/server/metadata/ClientQuotaMetadataManager.scala | Applies ClientQuotasDelta from the metadata log to the managers and to ConnectionQuotas. |
| server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java | KIP-714 telemetry: subscriptions, client-instance cache, push-interval gating, OTLP export. |
| server/.../metrics/{ClientMetricsInstance,ClientMetricsConfigs}.java | Per-client telemetry state & the metrics/interval.ms/match subscription config. |

Core concepts & terminology

Quota entity
The subject a quota applies to. For client quotas it is a combination of a user (principal) and/or a client-id; for connection quotas it is broker / listener / IP. Defaults exist at each level (<default>).
Quota type
One of the nine QuotaType values (server-common/.../quota/QuotaType.java:19): FETCH, PRODUCE, REQUEST, CONTROLLER_MUTATION, LEADER_REPLICATION, FOLLOWER_REPLICATION, ALTER_LOG_DIRS_REPLICATION, RLM_COPY, RLM_FETCH. The first four map to a ClientQuotaType; the last two are tiered-storage throttles (see Tiered Storage).
Sensor
A named accumulator (org.apache.kafka.common.metrics.Sensor) holding one or more stats. Recording a value updates every stat; if checkQuotas is requested and any stat's measured value crosses its configured Quota, a QuotaViolationException is thrown carrying the observed value, the bound, and the metric.
Throttle time
The delay (ms) needed to bring the observed rate back to the bound, assuming no further recording. Computed by QuotaUtils.throttleTime for rates and by ControllerMutationQuotaManager.throttleTimeMs for token buckets.
Muting a channel
Suppressing reads on a connection, so the broker reads (and therefore answers) no further requests from that client until the throttle expires; meanwhile the client's in-flight window fills. Used to enforce the delay without a busy-wait.
Subscription / client instance
KIP-714 terms: a subscription is a named set of (metric prefixes, push interval, match pattern) stored as broker config; a client instance is one connected client identified by a broker-assigned UUID, carrying its resolved subscription and last-request timestamps.
Key idea

Every client quota is "one Sensor per (quota-type, resolved entity)". The quota limit lives in the sensor's MetricConfig.quota(); the quota usage is the sensor's sampled Rate/TokenBucket value. Changing a limit just re-sets the MetricConfig on an existing metric and resets no usage, which is why dynamic quota changes are cheap and non-disruptive.

Data structures

In-memory: the sampled Rate and the quota check

A bandwidth/request quota sensor carries a Rate (clients/.../metrics/stats/Rate.java:33) wrapping a SampledStat (a WindowedSum by default). The sampled stat keeps N windows; the rate is "total observed over the live windows ÷ elapsed time across those windows". Rate.windowSize (line 81) deliberately pads the denominator to at least samples-1 full windows so that a tiny amount of elapsed time in a fresh window does not produce an absurdly high instantaneous rate.
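To make the padding concrete, here is a minimal Python model of a Rate over a windowed sum. It is a sketch, not Kafka's Rate/SampledStat: windows live in a plain list rather than a reused ring, eventCount is omitted, and the class name SampledRateSketch is hypothetical. Only the padding rule follows the description of Rate.windowSize above.

```python
class SampledRateSketch:
    """Simplified model of a Rate over a WindowedSum (hypothetical names)."""

    def __init__(self, window_ms: int = 1000, samples: int = 11):
        self.window_ms = window_ms    # quota.window.size.seconds * 1000
        self.samples = samples        # quota.window.num
        self.windows = []             # [start_ms, summed value], oldest first

    def _purge(self, now_ms: int) -> None:
        # Drop windows that started samples * window_ms or more ago.
        horizon = self.samples * self.window_ms
        self.windows = [w for w in self.windows if now_ms - w[0] < horizon]

    def record(self, value: float, now_ms: int) -> None:
        self._purge(now_ms)
        # Open a new window when there is none or the current one is complete.
        if not self.windows or now_ms - self.windows[-1][0] >= self.window_ms:
            self.windows.append([now_ms, 0.0])
        self.windows[-1][1] += value

    def window_size_ms(self, now_ms: int) -> int:
        oldest = self.windows[0][0] if self.windows else now_ms
        elapsed = now_ms - oldest
        full_windows = elapsed // self.window_ms
        min_full = self.samples - 1
        # Pad the denominator up to samples-1 full windows.
        if full_windows < min_full:
            elapsed += (min_full - full_windows) * self.window_ms
        return max(elapsed, 1)        # never divide by zero

    def measure(self, now_ms: int) -> float:
        """Units per second over the retained windows."""
        self._purge(now_ms)
        total = sum(value for _, value in self.windows)
        return total / (self.window_size_ms(now_ms) / 1000.0)
```

With one 1000-byte record at t=0 the padded denominator is 10 windows, so the rate reads 100 bytes/sec; without the padding the same record would be divided by the 1 ms floor and read as 1,000,000 bytes/sec.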

| Field / object | Type | Meaning |
|---|---|---|
| MetricConfig.timeWindowMs | long | Width of one sample window. From quota.window.size.seconds (default 1 s). |
| MetricConfig.samples | int | Number of windows retained. From quota.window.num (default 11 = 10 whole + 1 current; QuotaConfig.java:42). |
| MetricConfig.quota | Quota | The bound (bytes/sec, percentage, or token refill rate) and direction (upper). |
| SampledStat.samples | list of windows | Each window has startTimeMs, eventCount, accumulated value. |

Quota (clients/.../metrics/Quota.java:22) is a (double bound, boolean upper); acceptable(v) returns upper ? v <= bound : v >= bound. The check happens in Sensor.checkQuotas(timeMs) (Sensor.java:253): for each metric with a configured quota it measures the value and, for a TokenBucket, violates iff the value (remaining tokens) is < 0; otherwise it violates iff !quota.acceptable(value).
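The acceptance rule and the TokenBucket special case can be sketched as follows. QuotaSketch, check_quota and QuotaViolation are hypothetical stand-ins for Quota, Sensor.checkQuotas and QuotaViolationException; only the comparison logic is taken from the text.

```python
from dataclasses import dataclass


class QuotaViolation(Exception):
    def __init__(self, value: float, bound: float):
        super().__init__(f"value {value} violates bound {bound}")
        self.value, self.bound = value, bound


@dataclass
class QuotaSketch:
    bound: float
    upper: bool = True

    def acceptable(self, value: float) -> bool:
        # Upper bounds accept values at or below the bound, lower bounds at or above.
        return value <= self.bound if self.upper else value >= self.bound


def check_quota(value: float, quota: QuotaSketch, is_token_bucket: bool) -> None:
    """Token buckets violate when remaining tokens < 0; other stats use acceptable()."""
    violated = value < 0 if is_token_bucket else not quota.acceptable(value)
    if violated:
        raise QuotaViolation(value, quota.bound)
```

Because the bound is read at check time, raising it (as a dynamic quota update does) changes the verdict for the same measured usage without touching that usage.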

In-memory: the token bucket (controller mutations)

TokenBucket (clients/.../metrics/stats/TokenBucket.java:59) holds a running tokens credit and lastUpdateMs. On every record(config, value, timeMs) it first refills, tokens = min(burst, tokens + quota·Δt) with Δt in seconds, then subtracts the recorded permits, tokens = min(burst, tokens − value). The maximum burst is samples · window · quota, with the window expressed in seconds (line 99). The quota is "exhausted" when tokens < 0.

record(value, timeMs)value = permits charged
refill · tokens = min(B, tokens + Q·Δt)B = S·W·Q (max burst credits) · Q = controller_mutation_rate (mutations/sec)
spend · tokens = min(B, tokens − value)
tokens < 0 ?
within quotathrottle = 0
exhausted
throttle = round(−tokens / Q · 1000) ms
Token-bucket accounting in TokenBucket (refill-then-spend) and the throttle math in ControllerMutationQuotaManager.throttleTimeMs.
caller (request handler) token bucket · waiting quota check quota exhausted control flow cylinder = bucket credit op rounded = decision
Design rationale

The token bucket exists because a single large sample can pin a sampled Rate above quota until that sample expires, forcing a throttle of up to S·W seconds even when the true backlog is small. The bucket tracks continuously-refilled credits instead, so a burst of 560 against Q=5, B=500 lands at −60 tokens and clears in 12 s rather than 100 s (worked example in the TokenBucket Javadoc). This is the controller-mutation enhancement of KIP-599.
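A Python sketch of the refill-then-spend bucket and the throttle formula from the figure. It assumes S = 100 one-second windows so that B = 500 matches the worked example (the text gives only Q and B), and it assumes the bucket starts full; TokenBucketSketch is a hypothetical name, not Kafka's class.

```python
import math


class TokenBucketSketch:
    """Refill-then-spend token bucket (simplified model, not Kafka's TokenBucket)."""

    def __init__(self, quota_per_s: float, samples: int, window_s: float, now_ms: int = 0):
        self.quota = quota_per_s
        self.burst = samples * window_s * quota_per_s   # B = S * W * Q
        self.tokens = self.burst                        # assumption: start full
        self.last_ms = now_ms

    def _refill(self, now_ms: int) -> None:
        elapsed_s = (now_ms - self.last_ms) / 1000.0
        self.tokens = min(self.burst, self.tokens + self.quota * elapsed_s)
        self.last_ms = now_ms

    def record(self, permits: float, now_ms: int) -> None:
        self._refill(now_ms)
        self.tokens = min(self.burst, self.tokens - permits)

    def throttle_ms(self) -> int:
        # round(-tokens / Q * 1000); floor(x + 0.5) mirrors Java's Math.round for x >= 0.
        if self.tokens >= 0:
            return 0
        return math.floor(-self.tokens / self.quota * 1000 + 0.5)
```

The 560-mutation burst leaves −60 tokens, which at 5 tokens/sec takes 12 s to refill, so the computed throttle is 12 000 ms and a zero-permit record after 12 s finds the bucket back at 0.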

In-memory: per-client sensor tuple & manager state

ClientSensors (server/.../quota/ClientSensors.java:31) is a record of (metricTags, quotaSensor, throttleTimeSensor). Inside ClientQuotaManager the important fields are:

  • volatile int quotaTypesEnabled, a bitmask (NO_QUOTAS=0, CLIENT_ID=1, USER=2, USER_CLIENT_ID=4, CUSTOM=8) summarising which entity levels currently have any quota set; enables a fast path that skips quota work entirely when nothing is configured (ClientQuotaManager.java:188, quotasEnabled() at 306).
  • ConcurrentHashMap<Integer,Integer> activeQuotaEntities, reference counts per level, so quotaTypesEnabled can be cleared when the last entity at a level is removed (updateQuotaTypes, line 572).
  • DelayQueue<ThrottledChannel> delayQueue, the in-flight throttled channels, ordered by expiry.
  • DefaultQuotaCallback.overriddenQuotas, a ConcurrentHashMap<ClientQuotaEntity, Quota> holding the configured limits keyed by entity (the source of truth for limit lookups).
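The bitmask-plus-refcount bookkeeping of quotaTypesEnabled and activeQuotaEntities can be modelled in a few lines. This single-threaded sketch assumes update() is called once when an entity gains its first quota and once when it loses it; Kafka uses a ConcurrentHashMap and a volatile int instead, and the CUSTOM bit is omitted. All names here are hypothetical.

```python
NO_QUOTAS, CLIENT_ID, USER, USER_CLIENT_ID = 0, 1, 2, 4   # bit values from the text


class QuotaTypesSketch:
    """Per-level reference counts driving an 'any quota at this level?' bitmask."""

    def __init__(self):
        self.active = {}                 # level bit -> entities with a quota at that level
        self.enabled = NO_QUOTAS         # OR of levels with at least one entity

    @staticmethod
    def level(user, client_id) -> int:
        if user is not None and client_id is not None:
            return USER_CLIENT_ID
        return USER if user is not None else CLIENT_ID

    def update(self, user, client_id, added: bool) -> None:
        lvl = self.level(user, client_id)
        count = self.active.get(lvl, 0) + (1 if added else -1)
        if count > 0:
            self.active[lvl] = count
        else:
            self.active.pop(lvl, None)   # last entity at this level removed
        self.enabled = NO_QUOTAS
        for bit in self.active:
            self.enabled |= bit

    def quotas_enabled(self) -> bool:
        return self.enabled != NO_QUOTAS  # fast path: skip quota work when False
```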

On-wire: the delayed channel

ThrottledChannel (server/.../quota/ThrottledChannel.java:27) implements java.util.concurrent.Delayed. It records endTimeNanos = now + throttleTimeMs, calls callback.startThrottling() in its constructor, orders itself by endTimeNanos, and on dequeue calls callback.endThrottling() via notifyThrottlingDone().

Persistent: quota config records

Client/user/IP quotas are stored as ClientQuotaRecord entries in the KRaft metadata log, managed by metadata/.../controller/ClientQuotaControlManager.java (alterClientQuotas at line 97, replay at 131). Each record names an entity (a list of (entityType, entityName) with null = default) and a single (key, value) such as producer_byte_rate → 1048576. The config key constants are in QuotaConfig.java:89-93:

| Override key | Type | Quota type |
|---|---|---|
| producer_byte_rate | LONG | PRODUCE bandwidth |
| consumer_byte_rate | LONG | FETCH bandwidth |
| request_percentage | DOUBLE | REQUEST CPU % |
| controller_mutation_rate | DOUBLE | CONTROLLER_MUTATION (mutations/sec) |
| connection_creation_rate | INT | per-IP connection rate (default Integer.MAX_VALUE) |

Replication throttles are broker configs (leader.replication.throttled.rate, follower.replication.throttled.rate, replica.alter.log.dirs.io.max.bytes.per.second; QuotaConfig.java:75-86) paired with topic configs {leader,follower}.replication.throttled.replicas that name which partitions are throttled.

Architecture & control/data flow

Figure: End-to-end quota configuration (top) and enforcement (bottom): metadata-derived limits feed the per-request throttle decision.
  • Config plane (controller ⇒ publisher): Admin AlterClientQuotas / IncrementalAlterConfigs → ClientQuotaControlManager → metadata log (replicated) → ClientQuotaMetadataManager.accept() → manager.updateQuota(user, client, Quota) and connectionQuotas.updateIpConnectionRateQuota().
  • Request hot path (handler thread): KafkaApis.handleProduce / handleFetch → maybeRecordAndGetThrottleTimeMs(session, clientId, v), which reads the limit configured above → if throttleTimeMs > 0, RequestHandlerHelper.throttle builds a ThrottleCallback → ClientQuotaManager.throttle (delayQueue.add, startThrottling) → later, off the hot path, the ThrottledChannelReaper polls, calls endThrottling and unmutes the channel.

Detailed mechanics

Entity resolution order

For a given (principal, client-id) the most specific configured quota wins. DefaultQuotaCallback.findQuota (ClientQuotaManager.java:747) walks this precedence, returning the first hit:

  1. /config/users/<user>/clients/<client-id>
  2. /config/users/<user>/clients/<default>
  3. /config/users/<user>
  4. /config/users/<default>/clients/<client-id>
  5. /config/users/<default>/clients/<default>
  6. /config/users/<default>
  7. /config/clients/<client-id>
  8. /config/clients/<default>

The metric is tagged by the level that matched, not by the raw (user, client) pair. The manager optimizes the common case where only one level is configured: quotaMetricTags (line 821) short-circuits to "(client-id only)", "(user only)" or "(user, client-id)" by inspecting quotaTypesEnabled, and only does the full eight-step probe of overriddenQuotas when multiple levels coexist. In the common single-level case this saves a probe of up to eight map lookups per request.
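The eight-step precedence walk is a first-hit lookup over a fixed list. In this sketch the (user, client_id) tuple keys, the None-means-unset encoding and the DEFAULT sentinel are hypothetical; returning the matched entity mirrors the point that the metric is tagged by the level that matched.

```python
DEFAULT = "<default>"   # hypothetical sentinel for the <default> entity


def find_quota(overrides: dict, user: str, client_id: str):
    """Return (matched_entity, limit) for the most specific configured quota, or None.

    overrides maps (user, client_id) tuples to limits; None means the level is not set.
    """
    precedence = [
        (user, client_id),        # 1. /config/users/<user>/clients/<client-id>
        (user, DEFAULT),          # 2. /config/users/<user>/clients/<default>
        (user, None),             # 3. /config/users/<user>
        (DEFAULT, client_id),     # 4. /config/users/<default>/clients/<client-id>
        (DEFAULT, DEFAULT),       # 5. /config/users/<default>/clients/<default>
        (DEFAULT, None),          # 6. /config/users/<default>
        (None, client_id),        # 7. /config/clients/<client-id>
        (None, DEFAULT),          # 8. /config/clients/<default>
    ]
    for entity in precedence:
        if entity in overrides:
            return entity, overrides[entity]
    return None
```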

Recording & the throttle-time formula

The bandwidth/request path is recordAndGetThrottleTimeMs (ClientQuotaManager.java:333): get-or-create the sensor, then quotaSensor().record(value, timeMs, true). The true flag asks the sensor to enforce the quota; on violation the manager computes throttleTime(e, timeMs) and returns it (otherwise 0). The formula (QuotaUtils.throttleTime, line 41) is derived from "to bring observed rate O down to target T over window W, add delay X such that O·W/(W+X)=T", giving:

X = (O − T) / T · W    (W = the metric's current window size)

The request quota caps this at one window (maxThrottleTimeMs = quotaWindowSizeSeconds·1000) via QuotaUtils.boundedThrottleTime (ClientRequestQuotaManager.java:99); connection-rate throttling is likewise capped at one window (SocketServer.scala:1585).
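The formula and its one-window cap, written as plain functions. The sketch takes W as a parameter, whereas QuotaUtils uses the window size measured by the Rate; rounding via floor(x + 0.5) is our stand-in for Java's Math.round on non-negative values. Function names are hypothetical.

```python
import math


def throttle_time_ms(observed: float, bound: float, window_ms: float) -> int:
    """X = (O - T) / T * W, rounded like Math.round for X >= 0."""
    return math.floor((observed - bound) / bound * window_ms + 0.5)


def bounded_throttle_time_ms(observed: float, bound: float, window_ms: float,
                             max_throttle_ms: int) -> int:
    """Cap the delay, e.g. at one quota window for the request quota."""
    return min(throttle_time_ms(observed, bound, window_ms), max_throttle_ms)
```

Running at twice the quota (O = 2T) over a 10-second window yields X = W, and O·W/(W+X) then equals T exactly, which is the dilution identity the formula is built on.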

The fetch "unrecord" subtlety

Because the Sensor API couples recording and quota-checking, the fetch path must record the response size to test it. When the check results in a throttle, the broker returns an empty response and so must subtract the value it just recorded: quotas.fetch.unrecordQuotaSensor(...) (ClientQuotaManager.java:367; called from KafkaApis.scala:710) records the negative of the same quantity, which cancels out because the windowed sum is additive. Separately, fetch sizing is pre-capped: maxValueInQuotaWindow (line 377) returns limit · (samples−1) · windowSeconds, the largest payload that could ever be returned without guaranteeing a throttle, and KafkaApis clamps fetchMaxBytes to it (line 742).
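A sketch of the fetch-side cap and the record/unrecord pair. The byte-budget check in fetch_response_size is a hypothetical stand-in for the real Rate-based quota check; it only shows that recording −v exactly cancels v in an additive sum. The samples = 11 and 1-second window defaults match quota.window.num and quota.window.size.seconds; all function and class names are hypothetical.

```python
def max_value_in_quota_window(limit_bytes_per_s: float, samples: int = 11,
                              window_s: float = 1.0) -> float:
    """limit * (samples - 1) * windowSeconds."""
    return limit_bytes_per_s * (samples - 1) * window_s


def clamp_fetch_max_bytes(requested_max_bytes: int, limit_bytes_per_s: float) -> int:
    return int(min(requested_max_bytes, max_value_in_quota_window(limit_bytes_per_s)))


class WindowedSumSketch:
    """Additive accumulator: recording -v exactly undoes recording v."""

    def __init__(self):
        self.total = 0.0

    def record(self, value: float) -> None:
        self.total += value


def fetch_response_size(sensor: WindowedSumSketch, response_bytes: int,
                        budget_bytes: float) -> int:
    sensor.record(response_bytes)          # must record to be able to check
    if sensor.total > budget_bytes:        # hypothetical stand-in for the Rate check
        sensor.record(-response_bytes)     # "unrecord": the client gets an empty response
        return 0
    return response_bytes
```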

Request-quota accounting (CPU time)

The request quota measures time, not bytes. ClientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(request, timeMs) (line 78) records nanosToPercentage(request.requestThreadTimeNanos()), request-handler thread time expressed as a percentage of one second (NANOS_TO_PERCENTAGE_PER_SECOND = 100 / 1e9, line 41), and installs a callback so the network thread's send time is added later via recordNoThrottle. Internal/cluster-action traffic is recorded against a separate, never-expiring exempt-Request sensor through maybeRecordExempt (line 89) so it shows up in metrics without counting toward any client's quota.
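The time-to-percentage conversion, sketched with the constant quoted above. request_usage_percent is a hypothetical helper that adds the handler time (recorded with the quota check) and the network-thread time (added later without one, as recordNoThrottle does).

```python
NANOS_TO_PERCENTAGE_PER_SECOND = 100 / 1e9   # constant quoted in the text


def nanos_to_percentage(nanos: int) -> float:
    """Thread time expressed as a percentage of one second."""
    return nanos * NANOS_TO_PERCENTAGE_PER_SECOND


def request_usage_percent(handler_nanos: int, network_nanos: int) -> float:
    # Handler time is recorded with the quota check; network-thread send time is
    # added later without one. This helper simply sums the two contributions.
    return nanos_to_percentage(handler_nanos) + nanos_to_percentage(network_nanos)
```

Recorded into a per-second Rate, 200 ms of thread time each second reads as 20, so a request_percentage of 100 corresponds to one thread's worth of time (our reading of the unit).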

Strict vs permissive controller-mutation quotas

Topic-mutation handlers obtain a per-request quota object whose strictness depends on the request version. ControllerMutationQuotaManager.newQuotaFor(session, header, strictSinceVersion) (line 169) returns a StrictControllerMutationQuota at or above the version and a PermissiveControllerMutationQuota below it. On the controller these are wired with strictSinceVersion = 5 (DeleteTopics), 6 (CreateTopics) and 3 (CreatePartitions) (ControllerApis.scala:200,364,801).

| | Strict (StrictControllerMutationQuota) | Permissive (PermissiveControllerMutationQuota) |
|---|---|---|
| Accepts when exhausted? | No, rejects with THROTTLING_QUOTA_EXCEEDED | Yes, always records |
| Which mutation is throttled? | The first one that finds the bucket already exhausted (it is rejected); the mutation that drove the bucket negative was still admitted | The one whose own recording exhausts the bucket (it is admitted and throttled) |
| record() | checkQuotas then record(v,…,false) inside synchronized(quotaSensor); on violation throws (StrictControllerMutationQuota.java:54) | record(v,…,true); on violation just remembers the delay (PermissiveControllerMutationQuota.java:51) |

Both extend AbstractControllerMutationQuota, whose throttleTime() (line 46) decays the remembered delay by the time already elapsed since recording. This matters because a controller request can sit in purgatory long after the quota was evaluated. The strict path is "pre-check before mutate": the controller invokes controllerMutationQuota.record(permits) as it counts partitions (ControllerApis.scala:387), so an over-quota CreateTopics is rejected before any metadata record is written.
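The difference between the two quota objects is only where the check sits relative to the record. This single-threaded Python sketch (no synchronized(quotaSensor) equivalent) uses a simplified bucket that starts full, and new_quota_for, StrictQuotaSketch and PermissiveQuotaSketch are hypothetical names modelled on the behaviour described above, not Kafka's API.

```python
import math


class ThrottlingQuotaExceeded(Exception):
    def __init__(self, throttle_ms: int):
        super().__init__(f"THROTTLING_QUOTA_EXCEEDED, throttle {throttle_ms} ms")
        self.throttle_ms = throttle_ms


class BucketSketch:
    """Minimal token bucket: refills at quota/sec, capped at burst; starts full."""

    def __init__(self, quota: float, burst: float, now_ms: int = 0):
        self.quota, self.burst = quota, burst
        self.tokens, self.last_ms = burst, now_ms

    def refill(self, now_ms: int) -> None:
        self.tokens = min(self.burst, self.tokens + self.quota * (now_ms - self.last_ms) / 1000)
        self.last_ms = now_ms

    def throttle_ms(self) -> int:
        return math.floor(-self.tokens / self.quota * 1000 + 0.5) if self.tokens < 0 else 0


class MutationQuotaSketch:
    def __init__(self, bucket: BucketSketch):
        self.bucket = bucket
        self.last_throttle_ms = 0
        self.last_recorded_ms = 0

    def remember(self, now_ms: int) -> None:
        self.last_throttle_ms, self.last_recorded_ms = self.bucket.throttle_ms(), now_ms

    def throttle_time(self, now_ms: int) -> int:
        # Decay the remembered delay by the time elapsed since it was recorded.
        return max(0, self.last_throttle_ms - (now_ms - self.last_recorded_ms))


class StrictQuotaSketch(MutationQuotaSketch):
    def record(self, permits: float, now_ms: int) -> None:
        self.bucket.refill(now_ms)
        if self.bucket.tokens < 0:          # check first: reject if already exhausted
            self.remember(now_ms)
            raise ThrottlingQuotaExceeded(self.throttle_time(now_ms))
        self.bucket.tokens = min(self.bucket.burst, self.bucket.tokens - permits)


class PermissiveQuotaSketch(MutationQuotaSketch):
    def record(self, permits: float, now_ms: int) -> None:
        self.bucket.refill(now_ms)
        self.bucket.tokens = min(self.bucket.burst, self.bucket.tokens - permits)
        if self.bucket.tokens < 0:          # check after recording; never rejects
            self.remember(now_ms)


def new_quota_for(request_version: int, strict_since_version: int, bucket: BucketSketch):
    cls = StrictQuotaSketch if request_version >= strict_since_version else PermissiveQuotaSketch
    return cls(bucket)
```

With Q = 5 and B = 10, the strict version admits the mutation that drives the bucket to −5 and rejects the next one with a 1000 ms throttle; the permissive version admits everything and reports 1000 ms, decayed to 600 ms if the response is built 400 ms later.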

Holding the channel: mute / unmute

ClientQuotaManager.throttle (ClientQuotaManager.java:392) records the delay into the throttle-time sensor and enqueues a ThrottledChannel. The callback is built in RequestHandlerHelper.throttle (RequestHandlerHelper.scala:39): startThrottling → requestChannel.startThrottling(request) and endThrottling → requestChannel.endThrottling(request). Those send special responses (StartThrottlingResponse / EndThrottlingResponse, RequestChannel.scala:147,151) that the Processor turns into channel mute events THROTTLE_STARTED / THROTTLE_ENDED (SocketServer.scala:961,965), feeding the channel's mute state machine. The actual user response carrying throttle_time_ms is sent immediately (via the various sendResponseMaybeThrottle helpers) so the client learns the delay; the mute then suppresses further reads from that connection until the reaper fires THROTTLE_ENDED.
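The start/end pairing and expiry ordering can be modelled with a heap keyed by end time. This is a deterministic, clock-driven sketch: ThrottledChannelSketch, ThrottleQueueSketch and reaper_tick are hypothetical, and Kafka's real DelayQueue blocks a thread rather than being polled with an explicit time.

```python
import heapq
import itertools


class ThrottledChannelSketch:
    """Delayed element: mutes on construction, unmutes when dequeued."""

    def __init__(self, now_ms: int, throttle_ms: int, start_throttling, end_throttling):
        self.end_ms = now_ms + throttle_ms
        self._end_throttling = end_throttling
        start_throttling()                  # like callback.startThrottling() in the constructor

    def notify_throttling_done(self) -> None:
        self._end_throttling()              # like callback.endThrottling(): unmute


class ThrottleQueueSketch:
    """Orders channels by end time; explicit-clock stand-in for a DelayQueue."""

    def __init__(self):
        self._heap = []
        self._seq = itertools.count()       # tie-breaker so channels are never compared

    def add(self, channel: ThrottledChannelSketch) -> None:
        heapq.heappush(self._heap, (channel.end_ms, next(self._seq), channel))

    def poll_expired(self, now_ms: int):
        while self._heap and self._heap[0][0] <= now_ms:
            yield heapq.heappop(self._heap)[2]


def reaper_tick(queue: ThrottleQueueSketch, now_ms: int) -> None:
    for channel in queue.poll_expired(now_ms):
        channel.notify_throttling_done()
```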

Connection quotas inside the acceptor

ConnectionQuotas.inc (SocketServer.scala:1295) runs under counts.synchronized on every accepted socket and does three things in order: (1) waitForConnectionSlot blocks until a count slot is free and the creation-rate throttle has elapsed; (2) recordIpConnectionMaybeThrottle checks the per-IP creation rate and throws ConnectionThrottledException if exceeded; (3) it bumps the per-IP / listener / total counters and throws TooManyConnectionsException if the per-IP count max is reached.

Figure: Connection admission control inside the acceptor.
  1. accept(socket) → ConnectionQuotas.inc(listener, ip), under counts.synchronized.
  2. waitForConnectionSlot: throttleMs = recordConnectionAndGetThrottleTimeMs(listener); while throttleMs > 0 or no slot is free, counts.wait(remaining) blocks the acceptor, which re-checks the slot on wakeup.
  3. If the per-IP rate is exceeded: unrecord, updateListenerMetrics, throw ConnectionThrottledException.
  4. counts[ip]++, listenerCounts++, totalCount++.
  5. If counts[ip] ≥ maxPerIp: throw TooManyConnectionsException; otherwise admit the connection.
Note the acceptor thread itself blocks in counts.wait and re-checks the slot on wakeup; the inter-broker listener is "protected" so it is never starved.

Creation-rate limits use the same Rate+Quota primitive (getOrCreateConnectionRateQuotaSensor, line 1597) with broker, per-listener and per-IP sensors. The broker rate metric is named broker-connection-accept-rate; per-IP/listener metrics are connection-accept-rate tagged by ip/listener (ConnectionQuotaEntity.java:29-54). A "protected" listener (protectedListener, line 1487) is the inter-broker listener when more than one listener exists; it bypasses the broker-wide rate and count so replication is never throttled by client connection pressure.
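A simplified model of the three steps in ConnectionQuotas.inc. Assumptions: the per-IP creation rate is approximated by counting accepts in the last second (Kafka uses a Rate sensor and un-records on violation), the listener-level limits, the protected listener and the throttle-delay wait are omitted, and the per-IP count check compares the value from before the increment. All names are hypothetical.

```python
import threading
from collections import defaultdict, deque


class TooManyConnections(Exception):
    pass


class ConnectionThrottled(Exception):
    pass


class ConnectionQuotasSketch:
    def __init__(self, max_connections: int, max_per_ip: int, max_ip_rate_per_s: int):
        self.max_connections = max_connections
        self.max_per_ip = max_per_ip
        self.max_ip_rate = max_ip_rate_per_s
        self.counts = defaultdict(int)      # ip -> open connections
        self.total = 0
        self.recent = defaultdict(deque)    # ip -> accept times (ms) within the last second
        self.lock = threading.Condition()   # plays the role of the counts monitor

    def inc(self, ip: str, now_ms: int) -> None:
        with self.lock:
            # (1) block the acceptor until a broker-wide slot is free
            while self.total >= self.max_connections:
                self.lock.wait()
            # (2) per-IP creation rate; a rejected attempt is not counted
            times = self.recent[ip]
            while times and now_ms - times[0] >= 1000:
                times.popleft()
            if len(times) >= self.max_ip_rate:
                raise ConnectionThrottled(ip)
            times.append(now_ms)
            # (3) increment first, then compare the previous count with the max
            count = self.counts[ip]
            self.counts[ip] = count + 1
            self.total += 1
            if count >= self.max_per_ip:
                raise TooManyConnections(ip)   # caller closes the socket and calls dec()

    def dec(self, ip: str) -> None:
        with self.lock:
            self.counts[ip] -= 1
            self.total -= 1
            self.lock.notify_all()             # wake an acceptor blocked in inc()
```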

Concurrency & threading

| State | Guarded by | Accessed by |
|---|---|---|
| Sensor creation (per-client) | SensorAccess read/write lock (read to look up, write to create), SensorAccess.java:39 | request-handler threads (via getOrCreateQuotaSensors) |
| Sensor recording & checkQuotas | synchronized(this) + metricLock() inside Sensor.recordInternal (Sensor.java:232); mutation quotas additionally wrap the check+record in synchronized(quotaSensor) to make it atomic | request-handler threads |
| Quota limit map & quotaTypesEnabled | ReentrantReadWriteLock write lock in updateQuota (ClientQuotaManager.java:530); quotaTypesEnabled is volatile for lock-free reads | metadata publisher thread writes; request threads read |
| DelayQueue<ThrottledChannel> | the queue's own lock (concurrent) | request threads enqueue; one reaper thread dequeues |
| Connection counts & rate config | counts.synchronized (an intrinsic monitor; counts.notifyAll wakes blocked acceptors) | acceptor threads; config-update thread |
| Telemetry client cache / subscriptions | SynchronizedCache (LRU) + ConcurrentHashMap; double-checked synchronized(this) for instance creation | request-handler threads; expiration-timer thread |

Each ClientQuotaManager starts exactly one daemon thread, the ThrottledChannelReaper (ClientQuotaManager.java:276), named <prefix>ThrottledChannelReaper-<QuotaType> (e.g. ...ThrottledChannelReaper-Produce). Its loop polls the delay queue with a 1-second timeout; when a channel's delay expires it decrements the queue-size sensor and calls notifyThrottlingDone() to unmute. So there is one reaper per managed quota type (Produce, Fetch, Request, ControllerMutation). The four reaper threads, the request handlers, and the metadata publisher are the only actors here; bandwidth/fetch/request enforcement is entirely on the calling request-handler thread, with no extra hop.

Invariant

A channel is unmuted exactly once per throttle: the reaper is the sole consumer of the delay queue, and the channel mute state machine treats THROTTLE_STARTED/THROTTLE_ENDED as a balanced pair. On shutdown, initiateShutdown enqueues a zero-delay no-op ThrottledChannel (ClientQuotaManager.java:700) to wake the reaper out of its blocking poll.
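A threaded sketch of the reaper loop and its shutdown trick. Python has no DelayQueue, so BlockingDelayQueueSketch is a hypothetical heap-plus-Condition stand-in; the thread name mimics the <prefix>ThrottledChannelReaper-<QuotaType> pattern without the prefix.

```python
import heapq
import itertools
import threading
import time


class BlockingDelayQueueSketch:
    """Heap + Condition stand-in for java.util.concurrent.DelayQueue."""

    def __init__(self):
        self._heap = []
        self._seq = itertools.count()        # tie-breaker for equal deadlines
        self._cv = threading.Condition()

    def add(self, delay_s: float, action) -> None:
        with self._cv:
            heapq.heappush(self._heap, (time.monotonic() + delay_s, next(self._seq), action))
            self._cv.notify_all()            # wake a poller waiting on a later deadline

    def poll(self, timeout_s: float):
        """Return the next expired action, or None if none expires within timeout_s."""
        deadline = time.monotonic() + timeout_s
        with self._cv:
            while True:
                now = time.monotonic()
                if self._heap and self._heap[0][0] <= now:
                    return heapq.heappop(self._heap)[2]
                if now >= deadline:
                    return None
                wake_at = min(deadline, self._heap[0][0]) if self._heap else deadline
                self._cv.wait(wake_at - now)


class ReaperSketch(threading.Thread):
    def __init__(self, queue: BlockingDelayQueueSketch, quota_type: str = "Produce"):
        super().__init__(name=f"ThrottledChannelReaper-{quota_type}", daemon=True)
        self.queue = queue
        self.shutdown_requested = threading.Event()

    def run(self) -> None:
        while not self.shutdown_requested.is_set():
            action = self.queue.poll(timeout_s=1.0)   # 1-second poll, as in the text
            if action is not None:
                action()                               # e.g. notifyThrottlingDone()

    def initiate_shutdown(self) -> None:
        self.shutdown_requested.set()
        self.queue.add(0, lambda: None)                # zero-delay no-op wakes the poll
```

Without the zero-delay no-op the reaper would only notice the shutdown flag when its 1-second poll timed out; enqueuing it makes the exit prompt.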

Configuration reference

| Key | Default | Effect |
|---|---|---|
| quota.window.num | 11 | Samples retained for client quotas (10 whole + 1 current). |
| quota.window.size.seconds | 1 | Width of one client-quota window; also caps request-quota and connection-rate throttle to one window. |
| controller.quota.window.num / ...size.seconds | 11 / 1 | Sampling for the controller-mutation token bucket (the bucket's burst = num·size·rate). |
| replication.quota.window.num / ...size.seconds | 11 / 1 | Sampling for leader/follower replication throttle. |
| alter.log.dirs.replication.quota.window.num / ...size.seconds | 11 / 1 | Sampling for the alter-log-dirs throttle. |
| client.quota.callback.class | null | Custom ClientQuotaCallback; when set, quotaTypesEnabled = CUSTOM_QUOTAS and the built-in entity resolution / metric-update optimizations are bypassed. |
| producer_byte_rate / consumer_byte_rate (entity config) | Long.MAX_VALUE | Produce / fetch bandwidth ceiling (bytes/sec) for a user/client entity. |
| request_percentage (entity config) | Integer.MAX_VALUE (as double) | Request-thread + network-thread CPU percentage ceiling. |
| controller_mutation_rate (entity config) | Integer.MAX_VALUE (as double) | Partition mutations/sec ceiling (KIP-599). |
| leader.replication.throttled.rate / follower.replication.throttled.rate | Long.MAX_VALUE | Bytes/sec ceiling for throttled replicas (broker config, dynamic only). |
| replica.alter.log.dirs.io.max.bytes.per.second | Long.MAX_VALUE | Bytes/sec ceiling for inter-log-dir moves (dynamic only). |
| {leader,follower}.replication.throttled.replicas | [] | Per-topic list partitionId:brokerId,… or * marking which replicas are throttled. |
| max.connections | Integer.MAX_VALUE | Broker-wide concurrent connection count limit. |
| max.connections.per.ip / max.connections.per.ip.overrides | Integer.MAX_VALUE / "" | Per-IP connection count limit and host/ip overrides. |
| max.connection.creation.rate | Integer.MAX_VALUE | Broker-wide new-connection rate; per-listener via listener.name.X. prefix. |
| connection_creation_rate (IP entity) | Integer.MAX_VALUE | Per-IP new-connection rate (KIP-612), set dynamically per IP or default. |
| telemetry.max.bytes | 1048576 (1 MiB) | Max compressed size of a single PushTelemetry payload (MetricConfigs.java:66). |
| interval.ms (client-metrics subscription) | 300000 (5 min) | Push interval; clamped 100 ms–1 h (ClientMetricsConfigs.java:89-91). |

Client telemetry (KIP-714)

The same broker that throttles a client also collects metrics from it. ClientMetricsManager handles two RPCs that any broker advertising them can serve (KafkaApis.scala:233-234).

The two RPCs

| RPC | Handler | What it does |
|---|---|---|
| GetTelemetrySubscriptions | processGetTelemetrySubscriptionRequest (line 159) | Resolves/creates the client instance, returns its clientInstanceId (generating a fresh UUID if the client sent ZERO_UUID), the resolved metric set, a subscriptionId, the negotiated pushIntervalMs, accepted compression types, telemetryMaxBytes, and deltaTemporality=true. |
| PushTelemetry | processPushTelemetryRequest (line 188) | Validates the instance & push timing, checks compression and size, then hands the OTLP metrics ByteBuffer to clientTelemetryExporterPlugin.exportMetrics(...). |

Subscriptions & the subscription id

A subscription is broker config (metrics, interval.ms, match; ClientMetricsConfigs.java:73-75) applied via updateSubscription (line 135), which bumps an AtomicInteger subscriptionUpdateVersion on every change. When a client first appears, createClientInstance (line 330) unions all subscriptions whose match pattern matches the client's metadata (client-id, software name/version, source address/port), unions their metric prefixes, and takes the minimum push interval; if any matching subscription requests * the metric set collapses to just *. The subscriptionId is CRC32C(metrics.toString()+pushIntervalMs) XOR clientInstanceId.hashCode() (line 368), so it changes whenever the effective subscription changes, letting the client detect drift by comparing ids.
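The union/minimum resolution and the id formula can be sketched as below. Assumptions: each match selector is a regex that must fully match the named client attribute (the attribute keys are hypothetical), clients matching no subscription fall back to a caller-supplied default interval, and the metric set is rendered as a sorted Python list rather than Java's Set.toString(), so the ids will not equal Kafka's. The CRC-32C routine is the standard bitwise Castagnoli algorithm.

```python
import re


def crc32c(data: bytes) -> int:
    """Bitwise CRC-32C (Castagnoli polynomial, reflected form 0x82F63B78)."""
    crc = 0xFFFFFFFF
    for byte in data:
        crc ^= byte
        for _ in range(8):
            crc = (crc >> 1) ^ (0x82F63B78 if crc & 1 else 0)
    return crc ^ 0xFFFFFFFF


def to_int32(value: int) -> int:
    """Reinterpret the low 32 bits as a signed Java int."""
    value &= 0xFFFFFFFF
    return value - (1 << 32) if value >= (1 << 31) else value


def matches(selectors: dict, client: dict) -> bool:
    # Assumption: every selector regex must fully match; no selectors matches everyone.
    return all(re.fullmatch(pattern, client.get(key, "")) for key, pattern in selectors.items())


def resolve(subscriptions: list, client: dict, default_interval_ms: int = 300_000):
    """Union the metric prefixes and take the minimum interval of matching subscriptions."""
    metrics, interval = set(), None
    for sub in subscriptions:
        if matches(sub["match"], client):
            metrics |= set(sub["metrics"])
            interval = sub["interval_ms"] if interval is None else min(interval, sub["interval_ms"])
    if "*" in metrics:
        metrics = {"*"}                     # "*" subsumes every other prefix
    return metrics, (default_interval_ms if interval is None else interval)


def subscription_id(metrics: set, push_interval_ms: int, instance_hash: int) -> int:
    # Hypothetical rendering; Kafka hashes metrics.toString() + pushIntervalMs.
    payload = (str(sorted(metrics)) + str(push_interval_ms)).encode("utf-8")
    return to_int32(crc32c(payload) ^ instance_hash)
```

Any change to the metric set or interval changes the CRC input and hence the id; a client still holding the old id gets UNKNOWN_SUBSCRIPTION_ID and re-Gets.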

Push-interval gating (a per-client throttle)

Telemetry has its own throttle independent of the quota managers: a client may push only once per pushIntervalMs. ClientMetricsInstance.maybeUpdatePushRequestTimestamp (ClientMetricsInstance.java:111) accepts a push iff (a) it follows a GetTelemetrySubscriptions that has not yet been "consumed" by a push (lastGetRequestTimestamp > lastPushRequestTimestamp), or (b) at least pushIntervalMs has elapsed since the last push. A too-early request raises ThrottlingQuotaExceededException (counted by the throttle sensor) and is reflected back as throttle_time_ms. The Get side is gated identically (maybeUpdateGetRequestTimestamp, line 101) but always lets through clients whose last error was UNKNOWN_SUBSCRIPTION_ID / UNSUPPORTED_COMPRESSION_TYPE so they can recover immediately.
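The push gate reduces to two timestamps. This sketch follows the rule stated for maybeUpdatePushRequestTimestamp; ClientInstanceSketch and on_get are hypothetical, on_get assumes the Get was already accepted by its own gate, and a False result is where the broker would raise ThrottlingQuotaExceededException.

```python
class ClientInstanceSketch:
    """Push-interval gate modelled on the rule described above (simplified)."""

    def __init__(self, push_interval_ms: int):
        self.push_interval_ms = push_interval_ms
        self.last_get_ms = 0
        self.last_push_ms = 0

    def on_get(self, now_ms: int) -> None:
        # Assumption: the Get itself was accepted by its own gate.
        self.last_get_ms = now_ms

    def maybe_update_push(self, now_ms: int) -> bool:
        # (a) first push after a Get that no push has consumed yet, or
        # (b) at least push_interval_ms since the last accepted push
        can_accept = (self.last_get_ms > self.last_push_ms
                      or now_ms - self.last_push_ms >= self.push_interval_ms)
        if can_accept:
            self.last_push_ms = now_ms
        return can_accept
```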

Figure: KIP-714 client-telemetry handshake and push loop (client ↔ broker ClientMetricsManager).
  1. Client → broker: GetTelemetrySubscriptions(ZERO_UUID).
  2. Broker → client: id=U, subId, metrics=[…], pushIntervalMs=P, maxBytes.
  3. Client waits ~P (0.5–1.5× jitter) before pushing.
  4. Client → broker: PushTelemetry(U, subId, OTLP bytes).
  5. Broker: validate(size ≤ maxBytes, subId, interval) ⇒ exporter.exportMetrics().
  6. Broker → client: PushTelemetryResponse(NONE).
On a subId mismatch the broker answers UNKNOWN_SUBSCRIPTION_ID and the client re-Gets to refresh.

Instance lifecycle

Instances live in an LRU SynchronizedCache capped at 16 384 entries (CACHE_MAX_SIZE, line 90) keyed by UUID, plus a connectionId → UUID map. Each instance has an expiration TimerTask scheduled max(60s, 3·pushIntervalMs) ahead on a SystemTimerReaper thread named client-metrics-reaper (line 84). Disconnects are observed via a ConnectionDisconnectListener (line 477) that evicts the instance and unregisters its per-instance metrics. A re-evaluation path (line 288) recomputes a client's subscription when subscriptionVersion < subscriptionUpdateVersion so config changes take effect on the client's next request.
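The instance cache and expiry delay, sketched with Python's OrderedDict as the LRU. LruCacheSketch and expiration_delay_ms are hypothetical; the sketch is not thread-safe (Kafka's SynchronizedCache is), and remove() returning False models the timer finding an already-evicted instance.

```python
from collections import OrderedDict

CACHE_MAX_SIZE = 16_384                      # capacity quoted in the text


def expiration_delay_ms(push_interval_ms: int) -> int:
    """Instance expiry is scheduled max(60 s, 3 * pushIntervalMs) ahead."""
    return max(60_000, 3 * push_interval_ms)


class LruCacheSketch:
    """Least-recently-used map; not thread-safe."""

    def __init__(self, max_size: int = CACHE_MAX_SIZE):
        self.max_size = max_size
        self._data = OrderedDict()

    def get(self, key):
        if key not in self._data:
            return None
        self._data.move_to_end(key)          # mark as most recently used
        return self._data[key]

    def put(self, key, value) -> None:
        self._data[key] = value
        self._data.move_to_end(key)
        if len(self._data) > self.max_size:
            self._data.popitem(last=False)   # evict the least recently used entry

    def remove(self, key) -> bool:
        """False means the entry was already evicted (the capacity-warning case)."""
        if key in self._data:
            del self._data[key]
            return True
        return False
```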

Failure modes, edge cases & recovery

  • acks=0 produce: such a request gets only a no-op response, which cannot carry throttle_time_ms, so it is exempt from request throttling; the channel can still be muted for a bandwidth violation. KafkaApis.scala:523-525 sends a throttling-exempt no-op response, yet the produce-quota throttle still mutes the channel.
  • Forwarded requests: when a broker forwards to the controller, request throttling is applied only on the original (non-forwarded) request to avoid double-counting (RequestHandlerHelper.scala:86,96,130); the response also merges the controller's own throttle_time_ms with the broker's via max (line 73).
  • Telemetry payload too large / bad compression: validatePushRequest raises TelemetryTooLargeException (> telemetry.max.bytes) or UnsupportedCompressionTypeException; an exporter failure returns INVALID_RECORD and increments plugin-error (line 219).
  • Terminating clients: a client that previously set terminating=true is refused further pushes with InvalidRequestException (line 404); the terminating push itself bypasses the interval gate so last-gasp metrics are not lost (line 411).
  • Telemetry cache pressure: if an instance is LRU-evicted before its timer fires, the timer's remove returns false and the manager logs a capacity warning at most once per 5 minutes (line 558).
  • Connection-rate over-record: throwing ConnectionThrottledException for an IP violation un-records the connection from the IP, listener and broker sensors (updateListenerMetrics, line 1533) so a rejected connection does not inflate the broker rate.
  • Acceptor blocking: waitForConnectionSlot blocks the acceptor thread under counts; an increase to max.connections calls counts.notifyAll (line 1323) to release waiters. A connection that hit the per-IP count max is created-then-rejected (the counter is incremented before the check at line 1307), and the socket is closed by the caller.
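The push-side rules from this list and from the telemetry section can be combined into one validation function. The order of checks is an assumption, the instance and request are plain dicts with hypothetical keys, and the returned strings are Kafka error names used as labels; the interval test repeats the push gate so the block stands alone.

```python
def validate_push(inst: dict, req: dict, now_ms: int, max_bytes: int,
                  supported_compression: set) -> str:
    """Return the error name for a PushTelemetry request, or "NONE" (check order assumed)."""
    if inst["terminating"]:
        return "INVALID_REQUEST"                 # no pushes after a terminating push
    if req["subscription_id"] != inst["subscription_id"]:
        return "UNKNOWN_SUBSCRIPTION_ID"         # client should re-Get its subscription
    if req["compression"] not in supported_compression:
        return "UNSUPPORTED_COMPRESSION_TYPE"
    interval_ok = (inst["last_get_ms"] > inst["last_push_ms"]
                   or now_ms - inst["last_push_ms"] >= inst["push_interval_ms"])
    if not interval_ok and not req["terminating"]:
        return "THROTTLING_QUOTA_EXCEEDED"       # a terminating push bypasses the gate
    if len(req["metrics"]) > max_bytes:
        return "TELEMETRY_TOO_LARGE"             # compared against telemetry.max.bytes
    inst["last_push_ms"] = now_ms
    inst["terminating"] = req["terminating"]
    return "NONE"                                # payload would go to the exporter plugin
```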

Invariants & guarantees

  • Changing a quota limit never resets accumulated usage; only the metric's MetricConfig.quota is replaced (updateQuotaMetricConfigs, line 633; ReplicationQuotaManager.updateQuota, line 64).
  • A client that obeys returned throttle_time_ms is never disconnected for a soft quota; throttling is backpressure, not failure. Hard limits (connection count, strict mutation quota) do reject.
  • The observed rate over any contiguous span converges to ≤ the quota: any excess is delayed by exactly the time required to dilute it back to the bound (the (O−T)/T·W identity).
  • The inter-broker listener is never throttled by broker-wide connection count/rate when multiple listeners exist (protectedListener), preserving replication under client overload.
  • A telemetry client accepts at most one push per push interval; the subscription id is a pure function of the effective subscription, so id-equality implies subscription-equality.

Interactions with other subsystems

Design rationale & evolution

  • KIP-599 added the controller-mutation quota and the TokenBucket stat to cope with bursty topic-admin workloads, plus the THROTTLING_QUOTA_EXCEEDED error for strict rejection. The strict-since-version gating lets old clients keep the permissive behaviour while new clients get hard rejection.
  • KIP-714 made the broker a telemetry sink: standardized client metrics serialized as OpenTelemetry, pushed to any broker, with broker-managed subscriptions and a per-client push-interval. The deltaTemporality flag and the CRC32C subscription id are direct artifacts of that design.
  • Per-IP connection-rate quotas (connection_creation_rate) were introduced to defend against connection storms from a single host independent of the broker-wide rate (KIP-612, inferred from the IP-entity plumbing in ConnectionQuotas).
  • The mute-channel mechanism (returning throttle_time_ms and holding the connection rather than erroring) is the long-standing quota-enforcement design (request-quota era, KIP-124/KIP-219 lineage; inferred). It keeps throttling transparent to correct clients.

Gotchas / operational notes

Gotcha

Per-client sensors expire after 1 hour of inactivity (INACTIVE_SENSOR_EXPIRATION_TIME_SECONDS = 3600, ClientQuotaManager.java:62), and so do per-IP connection-rate sensors (ConnectionQuotaEntity.ipQuotaEntity). A client that reconnects after being idle that long starts a fresh quota window: usage history is not retained indefinitely. The request exempt sensor and the broker connection-rate sensor never expire.

Caution

The replication-throttle config docs recommend keeping throttle limits above ~1 MB/s for accurate behaviour: with a 1-second window and very small limits, the discrete sampling and the one-window throttle cap make the effective rate noisy. The token bucket is immune to the "stuck sample" pathology, but the plain Rate (bandwidth, replication) is not.

Note

Setting client.quota.callback.class flips the manager to CUSTOM_QUOTAS, disabling the single-level fast paths in quotaMetricTags / updateQuotaMetricConfigs and routing every lookup through your callback. A callback whose quotaResetRequired returns true forces a full metric-config sweep on every sensor creation (getOrCreateQuotaSensors, line 464); that is correct, but more expensive.

krivaltsevich.com · Part of Apache Kafka Internals · derived from Apache Kafka 4.4 source · GitHub · MIT-licensed.

Apache Kafka® is a registered trademark of the Apache Software Foundation. This is an independent, unofficial guide, not affiliated with or endorsed by the ASF.