19 · Quotas, Throttling & Client Metrics
Source: Apache Kafka 4.4.0-SNAPSHOT (git 04bfe7d, 2026-06-15), KRaft mode. Derived from source code, not copied from official documentation.
Kafka protects a shared cluster from being overwhelmed by any single tenant through a layered set of quotas: bandwidth (produce/fetch byte-rate), request-thread time, controller-mutation rate, replication and alter-log-dirs throttles, and connection count / creation rate. The rate-based quotas are all built on the same primitive, a Sensor wrapping a sampled Rate or TokenBucket stat with a Quota bound. Client quotas mostly throttle by holding the channel muted for a computed delay and returning throttle_time_ms, so well-behaved clients back off rather than being disconnected. Connection quotas instead delay or reject new connections in the acceptor, and replication throttles limit how much throttled-replica traffic moves. The same broker also acts as a sink for client telemetry (KIP-714): ClientMetricsManager assigns each client a UUID, hands out metric subscriptions via GetTelemetrySubscriptions, and ingests OpenTelemetry payloads via PushTelemetry, gating each client to its own push interval. This chapter derives the data structures, algorithms, threading, and configuration of all of these from source.
Role & responsibilities
The quota subsystem answers a single operational question, "is this client allowed to do this much, right now?", for several distinct resources, and translates "no" into a backpressure signal rather than an error wherever possible. Its responsibilities:
- Bandwidth quotas (QuotaType.PRODUCE, QuotaType.FETCH): cap bytes/sec per principal/client-id, measured against the request/response size on the produce and fetch hot paths.
- Request quota (QuotaType.REQUEST): cap the fraction of broker request-handler + network-thread CPU time a client may consume (request_percentage), so a client cannot starve others with cheap-but-numerous requests.
- Controller-mutation quota (QuotaType.CONTROLLER_MUTATION, KIP-599): cap the rate of partition creations/deletions from CreateTopics, CreatePartitions and DeleteTopics using a token bucket that tolerates bursts.
- Replication throttles (LEADER_REPLICATION, FOLLOWER_REPLICATION, ALTER_LOG_DIRS_REPLICATION): cap reassignment / log-dir-move traffic so it does not crowd out client traffic.
- Connection quotas: cap concurrent connection count (broker / listener / per-IP) and connection creation rate (broker / listener / per-IP), enforced inside the acceptor.
- Client telemetry (KIP-714): subscribe clients to metric sets, ingest pushed OTLP, and throttle each client to its negotiated push interval.
Quota limits are stored as cluster metadata (config records in the KRaft metadata log) and applied dynamically; see Metadata Propagation & Broker Lifecycle. The throttling mechanism is interleaved with request processing (Request Processing (KafkaApis)) and the network layer's channel mute/unmute machinery (Network Layer & Threading Model).
Where it lives in the code
| Class / file | Role |
|---|---|
| server/src/main/java/org/apache/kafka/server/quota/ClientQuotaManager.java | Base manager for bandwidth/request/mutation quotas; sensor lifecycle, entity resolution, throttle-delay computation, the reaper thread and delay queue. |
| core/src/main/java/kafka/server/ClientRequestQuotaManager.java | REQUEST quota; records request-thread + network-thread time as a percentage; exempt sensor. |
| server/.../quota/ControllerMutationQuotaManager.java (+ StrictControllerMutationQuota, PermissiveControllerMutationQuota, AbstractControllerMutationQuota) | Token-bucket controller-mutation quota and the per-request quota objects. |
| server/.../quota/ReplicationQuotaManager.java | Replication / alter-log-dirs byte-rate throttle; throttled-partition set. |
| core/src/main/java/kafka/server/QuotaFactory.java | Instantiates the seven managers into a QuotaManagers record and wires the quota callback plugin. |
| server/.../quota/ThrottledChannel.java, ThrottleCallback.java, ClientSensors.java | Delay-queue element, the start/end-throttling callback interface, and the per-client sensor tuple. |
| server-common/.../quota/QuotaType.java, QuotaUtils.java, SensorAccess.java | Quota-type enum, throttle-time math, lock-guarded sensor creation. |
| server-common/.../config/QuotaConfig.java | All quota config keys, defaults and ConfigDefs. |
| clients/.../common/metrics/stats/{Rate,TokenBucket}.java, Sensor.java, Quota.java | The metric primitives that do the actual rate accounting and quota check. |
| core/src/main/scala/kafka/network/SocketServer.scala (ConnectionQuotas) | Connection count + creation-rate enforcement in the acceptor. |
| core/src/main/scala/kafka/server/metadata/ClientQuotaMetadataManager.scala | Applies ClientQuotasDelta from the metadata log to the managers and to ConnectionQuotas. |
| server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java | KIP-714 telemetry: subscriptions, client-instance cache, push-interval gating, OTLP export. |
| server/.../metrics/{ClientMetricsInstance,ClientMetricsConfigs}.java | Per-client telemetry state & the metrics/interval.ms/match subscription config. |
Core concepts & terminology
- Quota entity: the subject a quota applies to. For client quotas it is a combination of a user (principal) and/or a client-id; for connection quotas it is broker / listener / IP. Defaults exist at each level (<default>).
- Quota type: one of the nine QuotaType values (server-common/.../quota/QuotaType.java:19): FETCH, PRODUCE, REQUEST, CONTROLLER_MUTATION, LEADER_REPLICATION, FOLLOWER_REPLICATION, ALTER_LOG_DIRS_REPLICATION, RLM_COPY, RLM_FETCH. The first four map to a ClientQuotaType; the last two are tiered-storage throttles (see Tiered Storage).
- Sensor: a named accumulator (org.apache.kafka.common.metrics.Sensor) holding one or more stats. Recording a value updates every stat. If checkQuotas is requested and any stat's measured value crosses its configured Quota, a QuotaViolationException is thrown carrying the observed value, the bound, and the metric.
- Throttle time: the delay (ms) needed to bring the observed rate back to the bound, assuming no further recording. Computed by QuotaUtils.throttleTime for rates and by ControllerMutationQuotaManager.throttleTimeMs for token buckets.
- Muting a channel: suppressing reads on a connection, so the broker reads no further requests from that client (and the client's in-flight window fills) until the throttle expires. Used to enforce the delay without a busy-wait.
- Subscription / client instance: KIP-714 terms. A subscription is a named set of (metric prefixes, push interval, match pattern) stored as broker config. A client instance is one connected client identified by a broker-assigned UUID, carrying its resolved subscription and last-request timestamps.
Every client quota is "one Sensor per (quota-type, resolved entity)". The quota limit lives in the sensor's MetricConfig.quota(), and the quota usage is the sensor's sampled Rate/TokenBucket value. Changing a limit just re-sets MetricConfig on the existing metric without resetting any usage, which is why dynamic quota changes are cheap and non-disruptive.
Data structures
In-memory: the sampled Rate and the quota check
A bandwidth/request quota sensor carries a Rate (clients/.../metrics/stats/Rate.java:33) wrapping a SampledStat (a WindowedSum by default). The sampled stat keeps N windows; the rate is "total observed over the live windows ÷ elapsed time across those windows". Rate.windowSize (line 81) deliberately pads the denominator to at least samples-1 full windows so that a tiny amount of elapsed time in a fresh window does not produce an absurdly high instantaneous rate.
| Field / object | Type | Meaning |
|---|---|---|
| MetricConfig.timeWindowMs | long | Width of one sample window. From quota.window.size.seconds (default 1 s). |
| MetricConfig.samples | int | Number of windows retained. From quota.window.num (default 11 = 10 whole + 1 current; QuotaConfig.java:42). |
| MetricConfig.quota | Quota | The bound (bytes/sec, percentage, or token refill rate) and direction (upper). |
SampledStat.samples | list of windows | Each window has startTimeMs, eventCount, accumulated value. |
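The following minimal Java sketch of a sampled rate shows how much the padding matters. It uses a simple deque of windows in place of SampledStat's internal sample list. The class SampledRateSketch and its methods are hypothetical, and only the padding arithmetic follows the Rate.windowSize behaviour described above.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical sketch: windowed sum over retained samples divided by a padded elapsed time.
// Only the padding rule mirrors the description of Rate.windowSize; this is not Kafka's API.
final class SampledRateSketch {
    private static final class Sample {
        final long startMs;
        double value;
        Sample(long startMs) { this.startMs = startMs; }
    }

    private final long timeWindowMs; // plays the role of MetricConfig.timeWindowMs
    private final int samples;       // plays the role of MetricConfig.samples
    private final Deque<Sample> windows = new ArrayDeque<>();

    SampledRateSketch(long timeWindowMs, int samples) {
        this.timeWindowMs = timeWindowMs;
        this.samples = samples;
    }

    void record(double value, long nowMs) {
        purgeObsolete(nowMs);
        Sample current = windows.peekLast();
        if (current == null || nowMs - current.startMs >= timeWindowMs) {
            current = new Sample(nowMs); // open a new window on demand
            windows.addLast(current);
        }
        current.value += value;
    }

    /** Units per second: total over live windows divided by the padded window size. */
    double ratePerSecond(long nowMs) {
        purgeObsolete(nowMs);
        double total = 0;
        for (Sample s : windows) total += s.value;
        long oldestStart = windows.isEmpty() ? nowMs : windows.peekFirst().startMs;
        return total / (paddedWindowMs(nowMs - oldestStart, timeWindowMs, samples) / 1000.0);
    }

    /** Pads elapsed time up to at least (samples - 1) full windows, and never below 1 ms. */
    static long paddedWindowMs(long elapsedMs, long timeWindowMs, int samples) {
        long fullWindows = elapsedMs / timeWindowMs;
        long minFullWindows = samples - 1;
        if (fullWindows < minFullWindows) {
            elapsedMs += (minFullWindows - fullWindows) * timeWindowMs;
        }
        return Math.max(elapsedMs, 1);
    }

    private void purgeObsolete(long nowMs) {
        // A window is dropped once it is samples * timeWindowMs old.
        while (!windows.isEmpty() && nowMs - windows.peekFirst().startMs >= samples * timeWindowMs) {
            windows.pollFirst();
        }
    }
}
```

With quota.window.num = 11 and 1 s windows, 1000 bytes recorded 100 ms into a fresh sensor measure as 1000 / 10.1 ≈ 99 bytes/s. Without the padding they would measure as 10 000 bytes/s.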
Quota (clients/.../metrics/Quota.java:22) is a (double bound, boolean upper); acceptable(v) returns upper ? v <= bound : v >= bound. The check happens in Sensor.checkQuotas(timeMs) (Sensor.java:253): for each metric with a configured quota it measures the value and, for a TokenBucket, violates iff the value (remaining tokens) is < 0; otherwise it violates iff !quota.acceptable(value).
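The check can be modelled as follows. This is an illustrative sketch. Quota, QuotaViolation and Measured are hypothetical nested classes standing in for Quota, QuotaViolationException and a metric's current measurement, and the boolean token-bucket flag replaces the real code's test on the stat type.

```java
import java.util.List;

// Hypothetical model of the per-metric quota check described for Sensor.checkQuotas.
final class QuotaCheckSketch {
    /** A (bound, upper) pair with the acceptable() test described for Quota. */
    static final class Quota {
        final double bound;
        final boolean upper;
        Quota(double bound, boolean upper) { this.bound = bound; this.upper = upper; }
        boolean acceptable(double value) { return upper ? value <= bound : value >= bound; }
    }

    /** Stand-in for QuotaViolationException: carries the observed value and the bound. */
    static final class QuotaViolation extends RuntimeException {
        final double value;
        final double bound;
        QuotaViolation(String metric, double value, double bound) {
            super(metric + ": " + value + " violates bound " + bound);
            this.value = value;
            this.bound = bound;
        }
    }

    /** One stat's current measurement plus its optional quota. */
    static final class Measured {
        final String metric;
        final double value;
        final Quota quota;         // null means "no quota configured"
        final boolean tokenBucket; // token buckets are checked against zero, not the bound
        Measured(String metric, double value, Quota quota, boolean tokenBucket) {
            this.metric = metric;
            this.value = value;
            this.quota = quota;
            this.tokenBucket = tokenBucket;
        }
    }

    static void checkQuotas(List<Measured> stats) {
        for (Measured m : stats) {
            if (m.quota == null) continue;
            boolean violated = m.tokenBucket ? m.value < 0 : !m.quota.acceptable(m.value);
            if (violated) throw new QuotaViolation(m.metric, m.value, m.quota.bound);
        }
    }
}
```

A token-bucket stat is never compared against its bound. The bound is the refill rate, so only a negative balance counts as a violation.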
In-memory: the token bucket (controller mutations)
TokenBucket (clients/.../metrics/stats/TokenBucket.java:59) holds a running tokens credit and lastUpdateMs. On every record(config, value, timeMs) it first refills, tokens = min(burst, tokens + quota·Δt), then subtracts the recorded permits, tokens = min(burst, tokens − value). The maximum burst is samples · windowMs · quota (line 99). The quota is "exhausted" when tokens < 0.
Figure: TokenBucket (refill-then-spend) and the throttle math in ControllerMutationQuotaManager.throttleTimeMs.
The token bucket exists because a single large sample can pin a sampled Rate above quota until that sample expires. The client then keeps being throttled for up to S·W seconds even when the true backlog is small. The bucket instead tracks continuously refilled credits. With Q = 5, S = 100 and W = 1 s (so B = Q·S·W = 500), a burst of 560 leaves the bucket at −60 tokens, which refills to zero in 60/5 = 12 s rather than 100 s (worked example in the TokenBucket Javadoc). This is the controller-mutation enhancement of KIP-599.
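The refill-then-spend arithmetic, and the −tokens / quota throttle it implies, fit in a few lines of Java. TokenBucketSketch is a hypothetical name. The sketch starts with a full bucket for simplicity and uses millisecond timestamps, and it reproduces the Javadoc example quoted above.

```java
// Hypothetical sketch of the refill-then-spend arithmetic described for TokenBucket.
final class TokenBucketSketch {
    private final double quotaPerSec; // the Quota bound: refill rate in permits per second
    private final double burst;       // samples * window * quota
    private double tokens;
    private long lastUpdateMs;

    TokenBucketSketch(double quotaPerSec, int samples, long windowMs, long nowMs) {
        this.quotaPerSec = quotaPerSec;
        this.burst = samples * (windowMs / 1000.0) * quotaPerSec;
        this.tokens = burst;  // simplification: the sketch starts with a full bucket
        this.lastUpdateMs = nowMs;
    }

    private void refill(long nowMs) {
        tokens = Math.min(burst, tokens + quotaPerSec * (nowMs - lastUpdateMs) / 1000.0);
        lastUpdateMs = nowMs;
    }

    void record(double permits, long nowMs) {
        refill(nowMs);
        // min() only matters for negative permits (un-recording), which must not exceed the burst.
        tokens = Math.min(burst, tokens - permits);
    }

    double tokens(long nowMs) {
        refill(nowMs);
        return tokens;
    }

    /** Time for the bucket to climb back to zero: -tokens / quota, in milliseconds. */
    long throttleTimeMs(long nowMs) {
        double t = tokens(nowMs);
        return t < 0 ? Math.round(-t / quotaPerSec * 1000) : 0;
    }
}
```

Recording 560 permits into `new TokenBucketSketch(5, 100, 1000, 0)` leaves −60 tokens and a 12 000 ms throttle. Six seconds later the balance is −30 and the remaining throttle is 6 000 ms.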
In-memory: per-client sensor tuple & manager state
ClientSensors (server/.../quota/ClientSensors.java:31) is a record of (metricTags, quotaSensor, throttleTimeSensor). Inside ClientQuotaManager the important fields are:
- volatile int quotaTypesEnabled: a bitmask (NO_QUOTAS=0, CLIENT_ID=1, USER=2, USER_CLIENT_ID=4, CUSTOM=8) summarising which entity levels currently have any quota set. It enables a fast path that skips quota work entirely when nothing is configured (ClientQuotaManager.java:188, quotasEnabled() at 306).
- ConcurrentHashMap<Integer,Integer> activeQuotaEntities: reference counts per level, so quotaTypesEnabled can be cleared when the last entity at a level is removed (updateQuotaTypes, line 572).
- DelayQueue<ThrottledChannel> delayQueue: the in-flight throttled channels, ordered by expiry.
- DefaultQuotaCallback.overriddenQuotas: a ConcurrentHashMap<ClientQuotaEntity, Quota> holding the configured limits keyed by entity (the source of truth for limit lookups).
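The reference-counting idea behind quotaTypesEnabled can be sketched as follows. The bit values are the ones listed above. The class QuotaTypesSketch and its update method are hypothetical, and the synchronized method stands in for the manager's write lock.

```java
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch: per-level reference counts folded into a volatile bitmask.
final class QuotaTypesSketch {
    static final int NO_QUOTAS = 0, CLIENT_ID = 1, USER = 2, USER_CLIENT_ID = 4, CUSTOM = 8;

    private final ConcurrentHashMap<Integer, Integer> activeQuotaEntities = new ConcurrentHashMap<>();
    private volatile int quotaTypesEnabled = NO_QUOTAS; // read lock-free on the request path

    /** Called when a quota is set (add = true) or removed (add = false) at one level. */
    synchronized void update(int levelBit, boolean add) {
        if (add) {
            activeQuotaEntities.merge(levelBit, 1, Integer::sum);
        } else {
            // Returning null from the remapping function removes the level once its count hits zero.
            activeQuotaEntities.computeIfPresent(levelBit, (k, n) -> n > 1 ? n - 1 : null);
        }
        int mask = NO_QUOTAS;
        for (int bit : activeQuotaEntities.keySet()) mask |= bit;
        quotaTypesEnabled = mask;
    }

    int quotaTypesEnabled() { return quotaTypesEnabled; }

    boolean quotasEnabled() { return quotaTypesEnabled != NO_QUOTAS; }
}
```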
On-wire: the delayed channel
ThrottledChannel (server/.../quota/ThrottledChannel.java:27) implements java.util.concurrent.Delayed. It records endTimeNanos = now + throttleTimeMs, calls callback.startThrottling() in its constructor, orders itself by endTimeNanos, and on dequeue calls callback.endThrottling() via notifyThrottlingDone().
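Below is a minimal Delayed element with the same shape as ThrottledChannel. The callback fires startThrottling on construction and endThrottling when the consumer dequeues the element. ThrottleCallbackSketch and ThrottledChannelSketch are hypothetical names, and System.nanoTime() replaces whatever clock abstraction the real class uses.

```java
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

/** Hypothetical two-method callback shaped like the start/end interface described above. */
interface ThrottleCallbackSketch {
    void startThrottling();
    void endThrottling();
}

/** Hypothetical Delayed element: mutes on creation, unmutes when notified after dequeue. */
final class ThrottledChannelSketch implements Delayed {
    private final long endTimeNanos;
    private final ThrottleCallbackSketch callback;

    ThrottledChannelSketch(long throttleTimeMs, ThrottleCallbackSketch callback) {
        this.endTimeNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(throttleTimeMs);
        this.callback = callback;
        callback.startThrottling(); // throttling begins as soon as the element exists
    }

    void notifyThrottlingDone() { callback.endThrottling(); }

    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(endTimeNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
    }

    @Override
    public int compareTo(Delayed other) {
        return Long.compare(endTimeNanos, ((ThrottledChannelSketch) other).endTimeNanos);
    }
}
```

A DelayQueue hands out only expired elements, in expiry order, so a consumer never sees a channel before its throttle has elapsed.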
Persistent: quota config records
Client/user/IP quotas are stored as ClientQuotaRecord entries in the KRaft metadata log, managed by metadata/.../controller/ClientQuotaControlManager.java (alterClientQuotas at line 97, replay at 131). Each record names an entity (a list of (entityType, entityName) with null = default) and a single (key, value) such as producer_byte_rate → 1048576. The config key constants are in QuotaConfig.java:89-93:
| Override key | Type | Quota type |
|---|---|---|
| producer_byte_rate | LONG | PRODUCE bandwidth |
| consumer_byte_rate | LONG | FETCH bandwidth |
| request_percentage | DOUBLE | REQUEST CPU % |
| controller_mutation_rate | DOUBLE | CONTROLLER_MUTATION (mutations/sec) |
| connection_creation_rate | INT | per-IP connection rate (default Integer.MAX_VALUE) |
Replication throttles are broker configs (leader.replication.throttled.rate, follower.replication.throttled.rate, replica.alter.log.dirs.io.max.bytes.per.second; QuotaConfig.java:75-86) paired with topic configs {leader,follower}.replication.throttled.replicas that name which partitions are throttled.
Architecture & control/data flow
Detailed mechanics
Entity resolution order
For a given (principal, client-id) the most specific configured quota wins. DefaultQuotaCallback.findQuota (ClientQuotaManager.java:747) walks this precedence, returning the first hit:
1. /config/users/<user>/clients/<client-id>
2. /config/users/<user>/clients/<default>
3. /config/users/<user>
4. /config/users/<default>/clients/<client-id>
5. /config/users/<default>/clients/<default>
6. /config/users/<default>
7. /config/clients/<client-id>
8. /config/clients/<default>
The metric is tagged by the level that matched, not by the raw (user, client). The manager optimizes the common case where only one level is configured: quotaMetricTags (line 821) short-circuits to "(client-id only)", "(user only)" or "(user, client-id)" by inspecting quotaTypesEnabled, and only does the full eight-step probe of overriddenQuotas when multiple levels coexist. This keeps the per-request tag computation O(1) for the vast majority of clusters.
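The eight-step probe can be sketched as an ordered list of candidate paths. This is an illustration only. The real overriddenQuotas map is keyed by ClientQuotaEntity, not by path strings, and QuotaResolutionSketch is a hypothetical helper.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of most-specific-first quota resolution over path-like keys.
final class QuotaResolutionSketch {
    static final String DEFAULT = "<default>";

    /** The eight candidates, in the precedence order listed above. */
    static List<String> candidatePaths(String user, String clientId) {
        List<String> paths = new ArrayList<>();
        paths.add("/config/users/" + user + "/clients/" + clientId);
        paths.add("/config/users/" + user + "/clients/" + DEFAULT);
        paths.add("/config/users/" + user);
        paths.add("/config/users/" + DEFAULT + "/clients/" + clientId);
        paths.add("/config/users/" + DEFAULT + "/clients/" + DEFAULT);
        paths.add("/config/users/" + DEFAULT);
        paths.add("/config/clients/" + clientId);
        paths.add("/config/clients/" + DEFAULT);
        return paths;
    }

    /** Returns the first configured candidate, or null when no level applies. */
    static String findQuotaPath(Map<String, Double> overridden, String user, String clientId) {
        for (String path : candidatePaths(user, clientId)) {
            if (overridden.containsKey(path)) return path;
        }
        return null;
    }
}
```

One consequence is that a <default> user quota outranks a quota set on a specific client-id, because step 6 comes before step 7.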
Recording & the throttle-time formula
The bandwidth/request path is recordAndGetThrottleTimeMs (ClientQuotaManager.java:333): get-or-create the sensor, then quotaSensor().record(value, timeMs, true). The true flag asks the sensor to enforce the quota. On violation the manager computes throttleTime(e, timeMs) and returns it (otherwise 0). The formula (QuotaUtils.throttleTime, line 41) comes from requiring that a delay X dilute the observed rate O back to the target T over window W, i.e. O·W/(W+X) = T, which gives:
X = (O − T) / T · W, where W is the metric's current window size.
The request quota caps this at one window (maxThrottleTimeMs = quotaWindowSizeSeconds·1000) via QuotaUtils.boundedThrottleTime (ClientRequestQuotaManager.java:99); connection-rate throttling is likewise capped at one window (SocketServer.scala:1585).
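The formula and its one-window cap translate directly into Java. The method names below are hypothetical stand-ins for QuotaUtils.throttleTime and boundedThrottleTime, and rounding to whole milliseconds is an assumption of this sketch.

```java
// Hypothetical helpers for the dilution formula X = (O - T) / T * W and its capped variant.
final class ThrottleMathSketch {
    /** X = (O - T) / T * W, rounded to whole milliseconds. */
    static long throttleTimeMs(double observed, double bound, long windowSizeMs) {
        double difference = observed - bound;
        return Math.round(difference / bound * windowSizeMs);
    }

    /** The same delay, capped (for example at one quota window). */
    static long boundedThrottleTimeMs(double observed, double bound, long windowSizeMs, long maxThrottleTimeMs) {
        return Math.min(throttleTimeMs(observed, bound, windowSizeMs), maxThrottleTimeMs);
    }
}
```

For example, 1.5 MiB/s observed against a 1 MiB/s quota over a 10 s window gives X = 0.5 · 10 s = 5 s, and 1.5 MiB/s · 10 s / 15 s is exactly 1 MiB/s again. With the default one-window cap of 1000 ms, the request-quota variant would return 1 s instead.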
The fetch "unrecord" subtlety
Because the Sensor API couples recording and quota-checking, the fetch path must record the response size in order to test it. When the result is a throttle, the broker returns an empty response and so must subtract the value it just recorded. quotas.fetch.unrecordQuotaSensor(...) (ClientQuotaManager.java:367; called from KafkaApis.scala:710) records the negative of the same quantity, which works because the windowed sum is additive. Separately, fetch sizing is pre-capped: maxValueInQuotaWindow (line 377) returns limit · (samples−1) · windowSeconds, the largest payload that could ever be returned without guaranteeing a throttle, and KafkaApis clamps fetchMaxBytes to it (line 742).
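Both halves of this can be sketched in Java. The record-then-unrecord part uses a fixed byte budget as a simplified stand-in for the real rate check. FetchQuotaSketch and its methods are hypothetical, and the cap formula is the one given above.

```java
// Hypothetical sketch of the fetch-side sizing cap and the record-then-unrecord pattern.
final class FetchQuotaSketch {
    private double windowTotal; // stands in for the additive windowed sum behind the fetch rate

    /** limit * (samples - 1) * windowSeconds: on a fresh sensor this payload measures exactly at the limit. */
    static double maxValueInQuotaWindow(double limitBytesPerSec, int samples, int windowSeconds) {
        return limitBytesPerSec * (samples - 1) * windowSeconds;
    }

    /** Clamps a client's requested fetch size to the cap. */
    static int clampFetchMaxBytes(int requestedMaxBytes, double maxValueInWindow) {
        return (int) Math.min(requestedMaxBytes, maxValueInWindow);
    }

    /**
     * Records the response size. If that crosses the simplified fixed budget, the bytes are
     * subtracted again and an empty response is sent instead. Returns the bytes actually sent.
     */
    long recordOrUnrecord(long responseBytes, double budgetBytes) {
        windowTotal += responseBytes;     // record first: the check needs the value recorded
        if (windowTotal > budgetBytes) {
            windowTotal -= responseBytes; // unrecord by adding the negative of the same amount
            return 0;                     // throttled: empty response
        }
        return responseBytes;
    }

    double windowTotal() { return windowTotal; }
}
```

The cap lines up with the Rate padding. A 1 MiB/s quota with 11 one-second windows clamps fetchMaxBytes to 10 MiB, which a fresh sensor measures as exactly 1 MiB/s.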
Request-quota accounting (CPU time)
The request quota measures time, not bytes. ClientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(request, timeMs) (line 78) records nanosToPercentage(request.requestThreadTimeNanos()), request-handler thread time expressed as a percentage of one second (NANOS_TO_PERCENTAGE_PER_SECOND = 100 / 1e9, line 41), and installs a callback so the network thread's send time is added later via recordNoThrottle. Internal/cluster-action traffic is recorded against a separate, never-expiring exempt-Request sensor through maybeRecordExempt (line 89) so it shows up in metrics without counting toward any client's quota.
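The unit conversion is easy to misread, so the following sketch spells it out. RequestTimeSketch is a hypothetical name; only the constant 100 / 1e9 comes from the text.

```java
// Hypothetical helpers for converting thread time into the recorded percentage.
final class RequestTimeSketch {
    static final double NANOS_TO_PERCENTAGE_PER_SECOND = 100.0 / 1e9;

    static double nanosToPercentage(long nanos) {
        return nanos * NANOS_TO_PERCENTAGE_PER_SECOND;
    }

    /** Handler-thread time recorded first, network-thread send time added afterwards. */
    static double recordedPercentage(long requestThreadNanos, long networkThreadNanos) {
        return nanosToPercentage(requestThreadNanos) + nanosToPercentage(networkThreadNanos);
    }
}
```

Ten milliseconds of thread time records as 1. A client whose requests consume one second of thread time per second therefore measures 100, which corresponds to one thread fully busy.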
Strict vs permissive controller-mutation quotas
Topic-mutation handlers obtain a per-request quota object whose strictness depends on the request version. ControllerMutationQuotaManager.newQuotaFor(session, header, strictSinceVersion) (line 169) returns a StrictControllerMutationQuota at or above the version and a PermissiveControllerMutationQuota below it. On the controller these are wired with strictSinceVersion = 5 (DeleteTopics), 6 (CreateTopics), 3 (CreatePartitions), ControllerApis.scala:200,364,801.
| | Strict (StrictControllerMutationQuota) | Permissive (PermissiveControllerMutationQuota) |
|---|---|---|
| Accepts when exhausted? | No, rejects with THROTTLING_QUOTA_EXCEEDED | Yes, always records |
| When is a throttle time produced? | Only on rejection, i.e. when the bucket is already exhausted before the request | As soon as a recording exhausts the bucket |
| record() | checkQuotas then record(v,…,false) inside synchronized(quotaSensor); on violation throws (StrictControllerMutationQuota.java:54) | record(v,…,true); on violation just remembers the delay (PermissiveControllerMutationQuota.java:51) |
Both extend AbstractControllerMutationQuota, whose throttleTime() (line 46) decays the remembered delay by the time already elapsed since recording. This matters because a controller request can sit in purgatory long after the quota was evaluated. The strict path is "pre-check before mutate": the controller invokes controllerMutationQuota.record(permits) as it counts partitions (ControllerApis.scala:387), so an over-quota CreateTopics is rejected before any metadata record is written.
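The behavioural difference, plus the decay applied by throttleTime(), can be sketched against a bare token balance. Everything here is hypothetical. The Bucket class omits refill because the walk-through happens at a single instant, and QuotaExceeded stands in for ThrottlingQuotaExceededException. The sketch is meant to show the ordering of check and record, not the real classes.

```java
// Hypothetical sketch contrasting strict and permissive recording against a token balance.
final class MutationQuotaSketch {
    /** Token balance only; refill is omitted because the usage below happens at one instant. */
    static final class Bucket {
        final double quotaPerSec;
        double tokens;
        Bucket(double quotaPerSec, double burst) { this.quotaPerSec = quotaPerSec; this.tokens = burst; }
        long throttleTimeMs() { return tokens < 0 ? Math.round(-tokens / quotaPerSec * 1000) : 0; }
    }

    /** Stand-in for ThrottlingQuotaExceededException. */
    static final class QuotaExceeded extends RuntimeException {
        final long throttleTimeMs;
        QuotaExceeded(long throttleTimeMs) {
            super("THROTTLING_QUOTA_EXCEEDED, retry after " + throttleTimeMs + " ms");
            this.throttleTimeMs = throttleTimeMs;
        }
    }

    /** Strict: atomic check-then-record; rejects only when the balance is already below zero. */
    static void recordStrict(Bucket b, double permits) {
        synchronized (b) {
            if (b.tokens < 0) throw new QuotaExceeded(b.throttleTimeMs());
            b.tokens -= permits; // may overdraw; the next request is the one rejected
        }
    }

    /** Permissive: always record, then report the delay (0 while tokens remain). */
    static long recordPermissive(Bucket b, double permits) {
        synchronized (b) {
            b.tokens -= permits;
            return b.throttleTimeMs();
        }
    }

    /** Remembered delay minus the time already spent since it was computed, floored at 0. */
    static long remainingThrottleMs(long lastThrottleTimeMs, long recordedAtMs, long nowMs) {
        return Math.max(0, lastThrottleTimeMs - (nowMs - recordedAtMs));
    }
}
```

With Q = 5 and a 10-token burst, strict mode accepts 8 and then 7 permits, overdrawing to −5, and rejects a third request with a 1000 ms throttle. Permissive mode accepts all three and reports 0, 1000 and 1200 ms.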
Holding the channel: mute / unmute
ClientQuotaManager.throttle (ClientQuotaManager.java:392) records the delay into the throttle-time sensor and enqueues a ThrottledChannel. The callback is built in RequestHandlerHelper.throttle (RequestHandlerHelper.scala:39): startThrottling → requestChannel.startThrottling(request) and endThrottling → requestChannel.endThrottling(request). Those send special responses (StartThrottlingResponse / EndThrottlingResponse, RequestChannel.scala:147,151) that the Processor turns into channel mute events THROTTLE_STARTED / THROTTLE_ENDED (SocketServer.scala:961,965), feeding the channel's mute state machine. The actual user response carrying throttle_time_ms is sent immediately (via the various sendResponseMaybeThrottle helpers) so the client learns the delay; the mute then suppresses further reads from that connection until the reaper fires THROTTLE_ENDED.
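The mute state machine is easiest to see as a small model. The state and event names below are assumed to follow KafkaChannel's ChannelMuteState and ChannelMuteEvent and should be checked against the source. The class MuteStateSketch is hypothetical and leaves out selector interest-op handling.

```java
// Hypothetical model of a channel mute state machine driven by request, response and throttle events.
final class MuteStateSketch {
    enum State { NOT_MUTED, MUTED, MUTED_AND_RESPONSE_PENDING, MUTED_AND_THROTTLED, MUTED_AND_THROTTLED_AND_RESPONSE_PENDING }
    enum Event { REQUEST_RECEIVED, RESPONSE_SENT, THROTTLE_STARTED, THROTTLE_ENDED }

    private State state = State.NOT_MUTED;

    State state() { return state; }

    /** Stop reading from the channel while a request is being handled. */
    void mute() {
        if (state == State.NOT_MUTED) state = State.MUTED;
    }

    /** Applies one event; returns true if the state changed. */
    boolean handle(Event event) {
        State before = state;
        switch (event) {
            case REQUEST_RECEIVED:
                if (state == State.MUTED) state = State.MUTED_AND_RESPONSE_PENDING;
                break;
            case RESPONSE_SENT:
                if (state == State.MUTED_AND_RESPONSE_PENDING) state = State.MUTED;
                else if (state == State.MUTED_AND_THROTTLED_AND_RESPONSE_PENDING) state = State.MUTED_AND_THROTTLED;
                break;
            case THROTTLE_STARTED:
                if (state == State.MUTED_AND_RESPONSE_PENDING) state = State.MUTED_AND_THROTTLED_AND_RESPONSE_PENDING;
                break;
            case THROTTLE_ENDED:
                if (state == State.MUTED_AND_THROTTLED) state = State.MUTED;
                else if (state == State.MUTED_AND_THROTTLED_AND_RESPONSE_PENDING) state = State.MUTED_AND_RESPONSE_PENDING;
                break;
        }
        return state != before;
    }

    /** Reads resume only from plain MUTED: no response pending and no throttle in force. */
    boolean maybeUnmute() {
        if (state == State.MUTED) state = State.NOT_MUTED;
        return state == State.NOT_MUTED;
    }
}
```

In this model, reads resume only once the response has been written and THROTTLE_ENDED has arrived, whichever of the two happens last.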
Connection quotas inside the acceptor
ConnectionQuotas.inc (SocketServer.scala:1295) runs under counts.synchronized on every accepted socket and does three things in order: (1) waitForConnectionSlot blocks until a count slot is free and the creation-rate throttle has elapsed; (2) recordIpConnectionMaybeThrottle checks the per-IP creation rate and throws ConnectionThrottledException if exceeded; (3) it bumps the per-IP / listener / total counters and throws TooManyConnectionsException if the per-IP count max is reached.
Figure (caption partially lost): … counts.wait and re-checks the slot on wakeup; the inter-broker listener is "protected" so it is never starved.
Creation-rate limits use the same Rate+Quota primitive (getOrCreateConnectionRateQuotaSensor, line 1597) with broker, per-listener and per-IP sensors. The broker rate metric is named broker-connection-accept-rate; per-IP/listener metrics are connection-accept-rate tagged by ip/listener (ConnectionQuotaEntity.java:29-54). A "protected" listener (protectedListener, line 1487) is the inter-broker listener when more than one listener exists. It bypasses the broker-wide rate and count so replication is never throttled by client connection pressure.
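A simplified model of the count-limit part of ConnectionQuotas.inc shows why the acceptor can block and why a per-IP rejection still leaves counters to undo. ConnectionCountSketch is hypothetical. It uses a single monitor like counts but leaves out creation-rate throttling, listener counts and the protected-listener bypass.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of count enforcement in an acceptor.
final class ConnectionCountSketch {
    static final class TooManyConnections extends RuntimeException {
        TooManyConnections(String ip, int max) { super("too many connections from " + ip + " (max " + max + ")"); }
    }

    private final Object counts = new Object();
    private final Map<String, Integer> perIp = new HashMap<>();
    private final int maxPerIp;
    private int maxConnections;
    private int total;

    ConnectionCountSketch(int maxConnections, int maxPerIp) {
        this.maxConnections = maxConnections;
        this.maxPerIp = maxPerIp;
    }

    void inc(String ip) throws InterruptedException {
        synchronized (counts) {
            while (total >= maxConnections) counts.wait(); // acceptor blocks until a slot frees up
            int previous = perIp.getOrDefault(ip, 0);
            perIp.put(ip, previous + 1);                   // counters are bumped before the check...
            total++;
            if (previous >= maxPerIp) throw new TooManyConnections(ip, maxPerIp); // ...so the caller must dec()
        }
    }

    void dec(String ip) {
        synchronized (counts) {
            perIp.computeIfPresent(ip, (k, n) -> n > 1 ? n - 1 : null);
            total--;
            counts.notifyAll();
        }
    }

    void updateMaxConnections(int newMax) {
        synchronized (counts) {
            maxConnections = newMax;
            counts.notifyAll(); // release acceptors waiting for a slot
        }
    }

    int total() {
        synchronized (counts) { return total; }
    }
}
```

The sketch assumes that whoever closes a rejected socket also calls dec(), which undoes the increment made before the per-IP check.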
Concurrency & threading
| State | Guarded by | Accessed by |
|---|---|---|
| Sensor creation (per-client) | SensorAccess read/write lock (read to look up, write to create), SensorAccess.java:39 | request-handler threads (via getOrCreateQuotaSensors) |
| Sensor recording & checkQuotas | synchronized(this) + metricLock() inside Sensor.recordInternal (Sensor.java:232); mutation quotas additionally wrap the check+record in synchronized(quotaSensor) to make it atomic | request-handler threads |
| Quota limit map & quotaTypesEnabled | ReentrantReadWriteLock write lock in updateQuota (ClientQuotaManager.java:530); quotaTypesEnabled is volatile for lock-free reads | metadata publisher thread writes; request threads read |
| DelayQueue<ThrottledChannel> | the queue's own lock (concurrent) | request threads enqueue; one reaper thread dequeues |
| Connection counts & rate config | counts.synchronized (an intrinsic monitor; counts.notifyAll wakes blocked acceptors) | acceptor threads; config-update thread |
| Telemetry client cache / subscriptions | SynchronizedCache (LRU) + ConcurrentHashMap; double-checked synchronized(this) for instance creation | request-handler threads; expiration-timer thread |
Each ClientQuotaManager starts exactly one daemon thread, the ThrottledChannelReaper (ClientQuotaManager.java:276), named <prefix>ThrottledChannelReaper-<QuotaType> (e.g. ...ThrottledChannelReaper-Produce). Its loop polls the delay queue with a 1-second timeout. When a channel's delay expires, it decrements the queue-size sensor and calls notifyThrottlingDone() to unmute. There is therefore one reaper per managed client quota type (Produce, Fetch, Request, ControllerMutation). For client quotas, these four reaper threads, the request handlers and the metadata publisher are the only actors. Bandwidth and request-quota enforcement runs entirely on the calling request-handler thread, with no extra hop.
A channel is unmuted exactly once per throttle: the reaper is the sole consumer of the delay queue, and the channel mute state machine treats THROTTLE_STARTED/THROTTLE_ENDED as a balanced pair. On shutdown, initiateShutdown enqueues a zero-delay no-op ThrottledChannel (ClientQuotaManager.java:700) to wake the reaper out of its blocking poll.
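The reaper loop and the shutdown wake-up can be sketched with a plain DelayQueue. ReaperSketch is hypothetical. A Runnable replaces the throttle callback, and an AtomicInteger stands in for the queue-size sensor.

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of a single reaper thread draining a delay queue, with a no-op shutdown sentinel.
final class ReaperSketch {
    /** Minimal delayed element carrying the "unmute" action. */
    static final class Throttled implements Delayed {
        final long endNanos;
        final Runnable onDone;
        Throttled(long delayMs, Runnable onDone) {
            this.endNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMs);
            this.onDone = onDone;
        }
        @Override public long getDelay(TimeUnit unit) {
            return unit.convert(endNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
        }
        @Override public int compareTo(Delayed other) {
            return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
        }
    }

    final DelayQueue<Throttled> delayQueue = new DelayQueue<>();
    final AtomicInteger queueSize = new AtomicInteger(); // stands in for the queue-size sensor
    private volatile boolean running = true;
    private final Thread reaper;

    ReaperSketch(String name) {
        reaper = new Thread(this::run, name);
        reaper.setDaemon(true);
        reaper.start();
    }

    void throttle(long delayMs, Runnable onDone) {
        queueSize.incrementAndGet();
        delayQueue.add(new Throttled(delayMs, onDone));
    }

    private void run() {
        while (running) {
            try {
                Throttled t = delayQueue.poll(1, TimeUnit.SECONDS); // 1-second poll timeout
                if (t != null) {
                    queueSize.decrementAndGet();
                    t.onDone.run(); // "unmute"
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    void shutdown() throws InterruptedException {
        running = false;
        queueSize.incrementAndGet();                  // count the sentinel like any other element
        delayQueue.add(new Throttled(0, () -> { })); // zero-delay no-op wakes the blocked poll
        reaper.join();
    }
}
```

Without the zero-delay sentinel, shutdown would still work in this sketch, but it could wait up to the full one-second poll timeout.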
Configuration reference
| Key | Default | Effect |
|---|---|---|
| quota.window.num | 11 | Samples retained for client quotas (10 whole + 1 current). |
| quota.window.size.seconds | 1 | Width of one client-quota window; also caps request-quota and connection-rate throttle to one window. |
| controller.quota.window.num / ...size.seconds | 11 / 1 | Sampling for the controller-mutation token bucket (the bucket's burst = num·size·rate). |
| replication.quota.window.num / ...size.seconds | 11 / 1 | Sampling for leader/follower replication throttle. |
| alter.log.dirs.replication.quota.window.num / ...size.seconds | 11 / 1 | Sampling for the alter-log-dirs throttle. |
| client.quota.callback.class | null | Custom ClientQuotaCallback; when set, quotaTypesEnabled = CUSTOM_QUOTAS and the built-in entity resolution / metric-update optimizations are bypassed. |
| producer_byte_rate / consumer_byte_rate (entity config) | Long.MAX_VALUE | Produce / fetch bandwidth ceiling (bytes/sec) for a user/client entity. |
| request_percentage (entity config) | Integer.MAX_VALUE (as double) | Request-thread + network-thread CPU percentage ceiling. |
| controller_mutation_rate (entity config) | Integer.MAX_VALUE (as double) | Partition mutations/sec ceiling (KIP-599). |
| leader.replication.throttled.rate / follower.replication.throttled.rate | Long.MAX_VALUE | Bytes/sec ceiling for throttled replicas (broker config, dynamic only). |
| replica.alter.log.dirs.io.max.bytes.per.second | Long.MAX_VALUE | Bytes/sec ceiling for inter-log-dir moves (dynamic only). |
| {leader,follower}.replication.throttled.replicas | [] | Per-topic list partitionId:brokerId,… or * marking which replicas are throttled. |
| max.connections | Integer.MAX_VALUE | Broker-wide concurrent connection count limit. |
| max.connections.per.ip / max.connections.per.ip.overrides | Integer.MAX_VALUE / "" | Per-IP connection count limit and host/ip overrides. |
| max.connection.creation.rate | Integer.MAX_VALUE | Broker-wide new-connection rate; per-listener via listener.name.X. prefix. |
| connection_creation_rate (IP entity) | Integer.MAX_VALUE | Per-IP new-connection rate (KIP-612), set dynamically per IP or default. |
| telemetry.max.bytes | 1048576 (1 MiB) | Max compressed size of a single PushTelemetry payload (MetricConfigs.java:66). |
| interval.ms (client-metrics subscription) | 300000 (5 min) | Push interval; clamped 100 ms–1 h (ClientMetricsConfigs.java:89-91). |
Client telemetry (KIP-714)
The same broker that throttles a client also collects metrics from it. ClientMetricsManager handles two RPCs that any broker advertising them can serve (KafkaApis.scala:233-234).
The two RPCs
| RPC | Handler | What it does |
|---|---|---|
| GetTelemetrySubscriptions | processGetTelemetrySubscriptionRequest (line 159) | Resolves/creates the client instance, returns its clientInstanceId (generating a fresh UUID if the client sent ZERO_UUID), the resolved metric set, a subscriptionId, the negotiated pushIntervalMs, accepted compression types, telemetryMaxBytes, and deltaTemporality=true. |
| PushTelemetry | processPushTelemetryRequest (line 188) | Validates the instance & push timing, checks compression and size, then hands the OTLP metrics ByteBuffer to clientTelemetryExporterPlugin.exportMetrics(...). |
Subscriptions & the subscription id
A subscription is broker config (metrics, interval.ms, match; ClientMetricsConfigs.java:73-75) applied via updateSubscription (line 135), which bumps an AtomicInteger subscriptionUpdateVersion on every change. When a client first appears, createClientInstance (line 330) unions all subscriptions whose match pattern matches the client's metadata (client-id, software name/version, source address/port), unions their metric prefixes, and takes the minimum push interval; if any matching subscription requests * the metric set collapses to just *. The subscriptionId is CRC32C(metrics.toString()+pushIntervalMs) XOR clientInstanceId.hashCode() (line 368), so it changes whenever the effective subscription changes, letting the client detect drift by comparing ids.
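The merge rule and the id derivation can be sketched as follows. SubscriptionSketch is hypothetical. java.util.UUID stands in for Kafka's own Uuid type, whose hashCode() differs. A TreeSet makes toString() deterministic, whereas the real set type may order elements differently. fallbackIntervalMs is an invented parameter for the case where nothing matches.

```java
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.Set;
import java.util.TreeSet;
import java.util.UUID;
import java.util.zip.CRC32C;

// Hypothetical sketch of merging matching subscriptions and deriving a subscription id.
final class SubscriptionSketch {
    static final class Subscription {
        final Set<String> metricPrefixes;
        final int intervalMs;
        Subscription(Set<String> metricPrefixes, int intervalMs) {
            this.metricPrefixes = metricPrefixes;
            this.intervalMs = intervalMs;
        }
    }

    static final class Resolved {
        final Set<String> metrics;
        final int pushIntervalMs;
        Resolved(Set<String> metrics, int pushIntervalMs) {
            this.metrics = metrics;
            this.pushIntervalMs = pushIntervalMs;
        }
    }

    /** Union of prefixes and minimum interval; any "*" collapses the set to just "*". */
    static Resolved merge(Collection<Subscription> matching, int fallbackIntervalMs) {
        Set<String> metrics = new TreeSet<>();
        int interval = Integer.MAX_VALUE;
        for (Subscription s : matching) {
            metrics.addAll(s.metricPrefixes);
            interval = Math.min(interval, s.intervalMs);
        }
        if (metrics.contains("*")) {
            metrics.clear();
            metrics.add("*");
        }
        return new Resolved(metrics, matching.isEmpty() ? fallbackIntervalMs : interval);
    }

    /** CRC32C(metrics.toString() + pushIntervalMs) XOR clientInstanceId.hashCode(). */
    static int subscriptionId(Set<String> metrics, int pushIntervalMs, UUID clientInstanceId) {
        byte[] bytes = (metrics.toString() + pushIntervalMs).getBytes(StandardCharsets.UTF_8);
        CRC32C crc = new CRC32C();
        crc.update(bytes, 0, bytes.length);
        return (int) crc.getValue() ^ clientInstanceId.hashCode();
    }
}
```

Because the interval is part of the hashed string, shortening a subscription's interval.ms changes the id even when the metric set stays the same.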
Push-interval gating (a per-client throttle)
Telemetry has its own throttle independent of the quota managers: a client may push only once per pushIntervalMs. ClientMetricsInstance.maybeUpdatePushRequestTimestamp (ClientMetricsInstance.java:111) accepts a push iff (a) it follows a GetTelemetrySubscriptions that has not yet been "consumed" by a push (lastGetRequestTimestamp > lastPushRequestTimestamp), or (b) at least pushIntervalMs has elapsed since the last push. A too-early request raises ThrottlingQuotaExceededException (counted by the throttle sensor) and is reflected back as throttle_time_ms. The Get side is gated identically (maybeUpdateGetRequestTimestamp, line 101) but always lets through clients whose last error was UNKNOWN_SUBSCRIPTION_ID / UNSUPPORTED_COMPRESSION_TYPE so they can recover immediately.
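The two acceptance conditions of maybeUpdatePushRequestTimestamp fit in one method. PushGateSketch is a hypothetical holder for the two timestamps. It leaves out the Get-side gate and the error-based exemptions.

```java
// Hypothetical sketch of the push-interval gate: accept after an unconsumed Get, or after a full interval.
final class PushGateSketch {
    private final long pushIntervalMs;
    private long lastGetRequestTimestamp;
    private long lastPushRequestTimestamp;

    PushGateSketch(long pushIntervalMs) { this.pushIntervalMs = pushIntervalMs; }

    /** Called when a GetTelemetrySubscriptions request has been accepted. */
    synchronized void onGetAccepted(long nowMs) { lastGetRequestTimestamp = nowMs; }

    synchronized boolean maybeUpdatePushRequestTimestamp(long nowMs) {
        // (a) first push after a Get that no push has consumed yet
        boolean canAccept = lastGetRequestTimestamp > lastPushRequestTimestamp;
        // (b) otherwise, at least one full interval since the last accepted push
        if (!canAccept) canAccept = nowMs - lastPushRequestTimestamp >= pushIntervalMs;
        if (canAccept) lastPushRequestTimestamp = nowMs; // only accepted pushes move the clock
        return canAccept;
    }
}
```

A rejected push does not reset the timestamp, so an eager client cannot push its own deadline further out by retrying.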
Instance lifecycle
Instances live in an LRU SynchronizedCache capped at 16 384 entries (CACHE_MAX_SIZE, line 90) keyed by UUID, plus a connectionId → UUID map. Each instance has an expiration TimerTask scheduled max(60s, 3·pushIntervalMs) ahead on a SystemTimerReaper thread named client-metrics-reaper (line 84). Disconnects are observed via a ConnectionDisconnectListener (line 477) that evicts the instance and unregisters its per-instance metrics. A re-evaluation path (line 288) recomputes a client's subscription when subscriptionVersion < subscriptionUpdateVersion so config changes take effect on the client's next request.
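The capacity and expiry rules can be sketched with an access-ordered LinkedHashMap. This is a stand-in for SynchronizedCache, not its implementation. InstanceCacheSketch is hypothetical, and the expiration timer itself is omitted.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch: a size-capped, access-ordered LRU map plus the expiration-delay rule.
final class InstanceCacheSketch<K, V> {
    static final int CACHE_MAX_SIZE = 16_384; // cap quoted in the text

    private final Map<K, V> map;

    InstanceCacheSketch(int maxSize) {
        // accessOrder = true: get() moves an entry to the most-recently-used end
        this.map = new LinkedHashMap<K, V>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
                return size() > maxSize;
            }
        };
    }

    synchronized V get(K key) { return map.get(key); }

    synchronized void put(K key, V value) { map.put(key, value); }

    /** False if the entry is already gone, e.g. evicted before its expiration timer fired. */
    synchronized boolean remove(K key) { return map.remove(key) != null; }

    synchronized int size() { return map.size(); }

    /** Expiration timer delay: max(60 s, 3 * pushIntervalMs). */
    static long expirationDelayMs(long pushIntervalMs) {
        return Math.max(60_000L, 3 * pushIntervalMs);
    }
}
```

A remove() that returns false is the case the capacity warning covers: the entry was evicted before its expiration timer ran.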
Failure modes, edge cases & recovery
- acks=0 produce: request throttling is exempt (a no-op response cannot carry throttle_time_ms), but the channel can still be muted for a bandwidth violation. KafkaApis.scala:523-525 sends a no-op-exempt response, yet the produce-quota throttle still mutes.
- Forwarded requests: when a broker forwards to the controller, request throttling is applied only on the original (non-forwarded) request to avoid double-counting (RequestHandlerHelper.scala:86,96,130). The response also merges the controller's own throttle_time_ms with the broker's via max (line 73).
- Telemetry payload too large / bad compression: validatePushRequest raises TelemetryTooLargeException (> telemetry.max.bytes) or UnsupportedCompressionTypeException. An exporter failure returns INVALID_RECORD and increments plugin-error (line 219).
- Terminating clients: a client that previously set terminating=true is refused further pushes with InvalidRequestException (line 404). The terminating push itself bypasses the interval gate so last-gasp metrics are not lost (line 411).
- Telemetry cache pressure: if an instance is LRU-evicted before its timer fires, the timer's remove returns false and the manager logs a capacity warning at most once per 5 minutes (line 558).
- Connection-rate over-record: throwing ConnectionThrottledException for an IP violation un-records the connection from the IP, listener and broker sensors (updateListenerMetrics, line 1533), so a rejected connection does not inflate the broker rate.
- Acceptor blocking: waitForConnectionSlot blocks the acceptor thread under counts, and an increase to max.connections calls counts.notifyAll (line 1323) to release waiters. A connection that hits the per-IP count max is created-then-rejected (the counter is incremented before the check at line 1307), and the socket is closed by the caller.
Invariants & guarantees
- Changing a quota limit never resets accumulated usage; only the metric's MetricConfig.quota is replaced (updateQuotaMetricConfigs, line 633; ReplicationQuotaManager.updateQuota, line 64).
- A client that obeys the returned throttle_time_ms is never disconnected for a soft quota; throttling is backpressure, not failure. Hard limits (connection count, strict mutation quota) do reject.
- A throttled client's observed rate converges to at most the quota. Each excess is delayed by the time required to dilute it back to the bound (the (O−T)/T·W identity). Where the throttle is capped at one window (request quota, connection rate), a large excess is drained over several throttles.
- The inter-broker listener is never throttled by broker-wide connection count/rate when multiple listeners exist (protectedListener), preserving replication under client overload.
- Apart from the first push after a GetTelemetrySubscriptions and the final terminating push, the broker accepts at most one push per push interval from a telemetry client. The subscription id is a deterministic function of the effective subscription and the client instance id. A changed subscription therefore yields a different id, except in the rare case of a 32-bit CRC collision.
Interactions with other subsystems
- Request Processing (KafkaApis) and RequestHandlerHelper are the call sites for every client quota; the produce/fetch paths combine bandwidth and request throttles with max.
- Network Layer & Threading Model owns the channel mute state machine the reaper drives, and hosts ConnectionQuotas inside the acceptor.
- Replication, ISR & High Watermark and The Fetch Path consult ReplicationQuotaManager (the leader side records throttled response sizes; KafkaApis.scala:689).
- The KRaft Controller enforces the strict controller-mutation quota in ControllerApis; Metadata Propagation delivers ClientQuotasDelta via ClientQuotaMetadataManager.
- Tiered Storage reuses the same machinery for RLM_COPY/RLM_FETCH throttles (RLMQuotaManager).
- The Producer Client / The Consumer Client read throttle_time_ms and the telemetry subscription; client-side telemetry plumbing lives in org.apache.kafka.common.telemetry.
- Security supplies the KafkaPrincipal in the Session that the user-level quota keys on (Session.java:24).
Design rationale & evolution
- KIP-599 added the controller-mutation quota and the TokenBucket stat to cope with bursty topic-admin workloads, plus the THROTTLING_QUOTA_EXCEEDED error for strict rejection. The strict-since-version gating lets old clients keep the permissive behaviour while new clients get hard rejection.
- KIP-714 made the broker a telemetry sink: standardized client metrics serialized as OpenTelemetry, pushed to any broker, with broker-managed subscriptions and a per-client push interval. The deltaTemporality flag and the CRC32C subscription id are direct artifacts of that design.
- Per-IP connection-rate quotas (connection_creation_rate) were introduced to defend against connection storms from a single host independent of the broker-wide rate (KIP-612, inferred from the IP-entity plumbing in ConnectionQuotas).
- The mute-channel mechanism (returning throttle_time_ms and holding the connection rather than erroring) is the long-standing quota-enforcement design (request-quota era, KIP-124/KIP-219 lineage; inferred). It keeps throttling transparent to correct clients.
Gotchas / operational notes
Per-client sensors expire after 1 hour of inactivity (INACTIVE_SENSOR_EXPIRATION_TIME_SECONDS = 3600, ClientQuotaManager.java:62), and so do per-IP connection-rate sensors (ConnectionQuotaEntity.ipQuotaEntity). A client that returns after more than an hour idle therefore starts with a fresh quota window; usage history is not retained indefinitely. The request exempt sensor and the broker connection-rate sensor never expire.
The replication-throttle config docs suggest keeping throttle limits above ~1 MB/s for accurate behaviour. With a 1-second window and very small limits, discrete sampling makes the effective rate noisy. The token bucket is immune to the "stuck sample" pathology, but the plain Rate (bandwidth, replication) is not.
Setting client.quota.callback.class flips the manager to CUSTOM_QUOTAS. This disables the single-level fast paths in quotaMetricTags / updateQuotaMetricConfigs and routes every lookup through your callback. A callback that returns quotaResetRequired=true forces a full metric-config sweep on every sensor creation (getOrCreateQuotaSensors, line 464); that is correct, but more expensive.