krivaltsevich.com · Kafka Internals 4.4

07 · Request Processing (KafkaApis)

Source: Apache Kafka 4.4.0-SNAPSHOT (git 04bfe7d, 2026-06-15), KRaft mode. Derived from source code, not copied from official documentation.

Every request a client sends to a Kafka node eventually arrives at a single Scala method: a giant match on the request's API key. This chapter follows that journey: how a parsed request is dequeued by a request-handler thread, dispatched to one of roughly eighty cases, authorized, validated, acted upon (often via a coordinator or the replica manager, and for produce/fetch frequently parked in a purgatory), turned into a response, throttled by the quota managers, and finally handed back to the network layer for transmission. We dissect the dispatcher (KafkaApis on brokers, ControllerApis on the KRaft controller), the request-handler thread pool, the RequestChannel round-trip, the ApiVersionManager that decides which APIs a node exposes at all, and the authorization, throttling and error-mapping hooks woven through every handler.

Role & responsibilities

The request-processing layer is the broker's application-logic dispatcher. The network layer (see Network Layer & Threading Model) accepts connections, authenticates them, reads bytes off sockets, parses them into typed request objects, and enqueues them. From that point on, the request-processing layer owns the request. Its responsibilities are:

  • Dispatch: route each request to a type-specific handler based on its API key.
  • Authorize: consult the pluggable Authorizer for every resource the request touches, mapping denials to the appropriate *_AUTHORIZATION_FAILED error.
  • Validate: reject malformed input, unknown topics/partitions, unsupported versions, and invalid record batches.
  • Act: invoke the right subsystem (ReplicaManager for produce/fetch/delete-records; the GroupCoordinator, TransactionCoordinator or ShareCoordinator for group, transaction and share-group APIs; the MetadataCache for read-only metadata), or forward administrative requests to the controller.
  • Respond: build the response message, fold in quota-driven throttle time, and hand it to the RequestChannel.
  • Account: record per-request timing, byte, and error metrics.

A second, structurally similar dispatcher, ControllerApis, runs only on KRaft controller nodes and handles the disjoint set of APIs that the controller exposes (Raft RPCs, broker registration/heartbeat, and the write side of admin operations forwarded by brokers).

Key idea

There is no per-API thread or actor. A small pool of request-handler threads (sized by num.io.threads) pulls requests off one shared queue and runs the synchronous portion of each handler to completion. Operations that must wait for other events (replication acks, accumulating fetch bytes, a coordinator commit) do not block the thread; instead they register a delayed operation in a purgatory, and the thread moves on. Completion happens later, on another thread, which invokes a captured callback to finish building and sending the response.

Where it lives in the code

Class / Interface | File | Role
ApiRequestHandler | core/src/main/scala/kafka/server/KafkaRequestHandler.scala:37 | The contract: handle(request, requestLocal) + tryCompleteActions().
KafkaApis | core/src/main/scala/kafka/server/KafkaApis.scala:93 | Broker-side dispatcher; ~80 dispatch cases.
ControllerApis | core/src/main/scala/kafka/server/ControllerApis.scala:69 | KRaft-controller-side dispatcher.
KafkaRequestHandler | core/src/main/scala/kafka/server/KafkaRequestHandler.scala:91 | One request-handler thread (the Runnable).
KafkaRequestHandlerPool | core/src/main/scala/kafka/server/KafkaRequestHandler.scala:227 | Pool of handler threads; dynamically resizable.
RequestChannel | core/src/main/scala/kafka/network/RequestChannel.scala:80 | The queue between network and handler threads, and the response path back.
Request | server/src/main/java/org/apache/kafka/network/Request.java:54 | The in-flight request object: context, body, timing fields.
RequestHandlerHelper | core/src/main/scala/kafka/server/RequestHandlerHelper.scala:28 | Throttling + response-sending helpers shared by both dispatchers.
AuthHelper | core/src/main/scala/kafka/server/AuthHelper.scala:42 | Authorization helpers (single, batch, by-resource-type).
ApiVersionManager | server/src/main/java/org/apache/kafka/server/ApiVersionManager.java:28 | Decides which APIs/versions this node exposes.
DefaultApiVersionManager / SimpleApiVersionManager | server/src/main/java/org/apache/kafka/server/DefaultApiVersionManager.java:33 / SimpleApiVersionManager.java:33 | Broker / controller implementations.
DelayedProduce | server/src/main/java/org/apache/kafka/server/purgatory/DelayedProduce.java:41 | Parked acks=all produce.
DelayedFetch | core/src/main/scala/kafka/server/DelayedFetch.scala:43 | Parked long-poll fetch.

Core concepts & terminology

API key
A short (16-bit integer) identifying the request type (PRODUCE=0, FETCH=1, …), enumerated in ApiKeys. The dispatcher's giant match keys on it.
Request handler thread
A daemon thread named data-plane-kafka-request-handler-N that loops, pulling requests and running handlers. There are num.io.threads of them per node.
RequestChannel
The bridge: a bounded request queue (network → handlers) plus the per-processor response queues (handlers → network).
RequestLocal
A thread-confined scratch area (notably a buffer-supplier cache) handed to the handler; one per request-handler thread, reused across requests on that thread.
Purgatory
A timeout-and-key-indexed holding area for operations that cannot complete immediately. See Replication, ISR & High Watermark and The Fetch Path.
Forwarding
Brokers do not mutate cluster metadata directly; admin requests are wrapped in an Envelope and sent to the active controller via the ForwardingManager.
Throttle time
An integer field in most responses telling the client how long it was (or should consider itself) throttled by quotas. Computed by the quota managers and folded in just before the response is built.

The request-handler thread & the RequestChannel round-trip

The handler loop

Each KafkaRequestHandler is a Runnable. Its run() loop (KafkaRequestHandler.scala:108) is deceptively small:

while (!stopped) {
  val req = requestChannel.receiveRequest(300)          // blocks up to 300 ms
  // … account idle time …
  req match {
    case _: ShutdownRequest   => completeShutdown(); return
    case callback: CallbackRequest => threadCurrentRequest.set(callback.originalRequest)
                                      callback.fun().accept(requestLocal)   // rescheduled callback
    case request: Request     => request.requestDequeueTimeNanos(endTime)
                                 threadCurrentRequest.set(request)
                                 apis.handle(request, requestLocal)         // the dispatch
    case _: WakeupRequest     => // handled inside receiveRequest
    case null                 => // poll timed out, loop
  }
}

On entry the thread stashes the RequestChannel in a ThreadLocal (threadRequestChannel, KafkaRequestHandler.scala:44); while handling a request, it stashes the current Request in threadCurrentRequest. These two thread-locals are what let an asynchronous completion callback reschedule itself back onto a request-handler thread (described below). Every branch wraps its work in try/catch: a FatalExitError triggers process exit (Exit.exit), while any other Throwable is logged and swallowed, so a single bad request never kills the thread. The finally block clears threadCurrentRequest and calls request.releaseBuffer().
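The property that matters most here, that one failing request never takes its handler thread down, can be reproduced outside Kafka. Below is a minimal Java sketch under simplifying assumptions: a plain BlockingQueue stands in for RequestChannel, Runnable objects stand in for Request, and a SHUTDOWN sentinel stands in for ShutdownRequest. HandlerLoopSketch and all of its members are hypothetical names; only the shape (timed poll, thread-local current request, per-request try/catch/finally) follows the loop described above.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, simplified request-handler loop; not Kafka's KafkaRequestHandler.
class HandlerLoopSketch implements Runnable {
    static final Runnable SHUTDOWN = () -> { };                        // stands in for ShutdownRequest
    static final ThreadLocal<Runnable> CURRENT = new ThreadLocal<>();  // stands in for threadCurrentRequest

    final BlockingQueue<Runnable> queue;
    final AtomicInteger handled = new AtomicInteger();
    final AtomicInteger failed = new AtomicInteger();

    HandlerLoopSketch(BlockingQueue<Runnable> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        while (true) {
            Runnable req;
            try {
                req = queue.poll(300, TimeUnit.MILLISECONDS);           // like receiveRequest(300)
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            if (req == null) continue;                                  // poll timed out: loop again
            if (req == SHUTDOWN) return;                                // like completeShutdown(); return
            try {
                CURRENT.set(req);
                req.run();                                              // like apis.handle(request, requestLocal)
                handled.incrementAndGet();
            } catch (Throwable t) {
                // Log-and-swallow keeps the thread alive. Kafka instead exits on FatalExitError.
                failed.incrementAndGet();
            } finally {
                CURRENT.remove();                                       // like clearing threadCurrentRequest
            }
        }
    }

    // Feeds three requests (the second one throws), then SHUTDOWN; returns {handled, failed}.
    static int[] demo() {
        BlockingQueue<Runnable> q = new ArrayBlockingQueue<>(10);
        HandlerLoopSketch loop = new HandlerLoopSketch(q);
        Thread t = new Thread(loop, "request-handler-0");
        t.start();
        q.add(() -> { });
        q.add(() -> { throw new IllegalStateException("bad request"); });
        q.add(() -> { });
        q.add(SHUTDOWN);
        try {
            t.join(5_000);
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return new int[] { loop.handled.get(), loop.failed.get() };
    }

    public static void main(String[] args) {
        int[] r = demo();
        System.out.println(r[0] + " handled, " + r[1] + " failed");   // 2 handled, 1 failed
    }
}
```

The thread keeps serving requests after the second one throws and stops only when it dequeues the sentinel.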

Figure (lanes: Client socket · Processor (network thread) · RequestChannel · Request-handler thread): the client writes bytes; the Processor parses them into a new Request and calls sendRequest(request), which does requestQueue.put(request); the handler thread's receiveRequest(300) poll returns the request; the handler runs apis.handle(request, requestLocal) (authorize · validate · act), then sendResponse(request, resp), which does buildResponseSend ⇒ SendResponse and enqueueResponse(resp) on the Processor; the Processor writes the Send bytes back to the socket.
Request lifecycle: the network thread parses and enqueues; a handler thread dequeues, runs the handler, and pushes a response back through the originating processor.

RequestChannel internals

RequestChannel (RequestChannel.scala:80) holds three structures:

  • requestQueue, an ArrayBlockingQueue[BaseRequest] of capacity queueSize (= queued.max.requests, default 500). The network threads put; handler threads poll. Because it is bounded and blocking, a slow handler pool exerts back-pressure all the way to the sockets.
  • processors, a ConcurrentHashMap[Int, Processor] keyed by processor id, so a response can be routed back to the exact network thread (processor) that owns the client's connection. request.processor records that id.
  • callbackQueue, a second ArrayBlockingQueue for CallbackRequests (rescheduled completions), which jump ahead of normal requests in receiveRequest (RequestChannel.scala:200) since they have "already waited in line".

The response path is sendResponse(request, response) (RequestChannel.scala:131). It updates per-error metrics, builds a Send via request.buildResponseSend(response), computes the optional JSON request-log node, wraps everything in a SendResponse, sets responseCompleteTimeNanos, looks up the originating Processor in processors, and calls processor.enqueueResponse. If the processor has been shut down (the lookup returns null), the response is silently dropped; the connection is already gone. Besides SendResponse, the channel models NoOpResponse (e.g. acks=0 produce: nothing to send), CloseConnectionResponse (an error on a request that expects no reply), and the throttling sentinels StartThrottlingResponse/EndThrottlingResponse, which mute/unmute a channel without consuming the single real response (RequestChannel.scala:42-77).
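The queueing rules in the last two passages (a bounded request queue, callbacks served first, a wake-up sentinel, drop-if-processor-gone) fit in a few lines. This is a sketch under assumptions: ChannelSketch, WAKEUP and the string-valued responses are hypothetical, and each "processor" is just a BlockingQueue of strings rather than a network thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

// Hypothetical, simplified request channel; not Kafka's RequestChannel.
class ChannelSketch {
    static final Object WAKEUP = new Object();                           // stands in for WakeupRequest

    final BlockingQueue<Object> requestQueue;                            // bounded: the back-pressure point
    final BlockingQueue<Object> callbackQueue;                           // rescheduled completions
    final ConcurrentHashMap<Integer, BlockingQueue<String>> processors = new ConcurrentHashMap<>();

    ChannelSketch(int queueSize) {
        requestQueue = new ArrayBlockingQueue<>(queueSize);
        callbackQueue = new ArrayBlockingQueue<>(queueSize);
    }

    // Network side: put() blocks when the queue is full, stalling the calling network thread.
    void sendRequest(Object request) throws InterruptedException {
        requestQueue.put(request);
    }

    // Completion side: queue the callback, then best-effort nudge a handler blocked in poll().
    void sendCallback(Object callback) throws InterruptedException {
        callbackQueue.put(callback);
        requestQueue.offer(WAKEUP);                                      // dropped if the queue is full
    }

    // Handler side: callbacks first, since they have already waited in line once.
    Object receiveRequest(long timeoutMs) throws InterruptedException {
        Object callback = callbackQueue.poll();
        if (callback != null) return callback;
        Object req = requestQueue.poll(timeoutMs, TimeUnit.MILLISECONDS);
        // A wake-up carries no work itself; another handler may already have taken the callback.
        return req == WAKEUP ? callbackQueue.poll() : req;
    }

    // Response path: an unknown processor means the connection is gone, so the response is dropped.
    boolean sendResponse(int processorId, String response) {
        BlockingQueue<String> processor = processors.get(processorId);
        return processor != null && processor.offer(response);
    }

    // One queued request and one callback; returns what three successive receives yield.
    static List<Object> demo() {
        try {
            ChannelSketch ch = new ChannelSketch(4);
            ch.sendRequest("produce-1");
            ch.sendCallback("callback-A");
            List<Object> seen = new ArrayList<>();
            seen.add(ch.receiveRequest(10));   // "callback-A": callbacks jump the queue
            seen.add(ch.receiveRequest(10));   // "produce-1"
            seen.add(ch.receiveRequest(10));   // WAKEUP with an empty callback queue: null
            return seen;
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());            // [callback-A, produce-1, null]
    }
}
```

The null from the third receive is harmless: the caller simply loops, exactly as it does when a poll times out.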

Invariant

Exactly one of SendResponse, NoOpResponse, or CloseConnectionResponse is produced per request; StartThrottlingResponse/EndThrottlingResponse may occur in addition and deliberately skip the response-timing bookkeeping (RequestChannel.scala:174-187).

The thread pool

KafkaRequestHandlerPool (KafkaRequestHandler.scala:227) creates num.io.threads handler threads via KafkaThread.daemon("data-plane-kafka-request-handler-" + id, …). The pool is built from a shared KafkaRequestHandlerPoolFactory so that the broker and controller (when co-located) share one aggregate idle-percent meter (RequestHandlerAvgIdlePercent) while each keeps its own per-pool meter (BrokerRequestHandlerAvgIdlePercent / ControllerRequestHandlerAvgIdlePercent, KafkaRequestHandler.scala:243-251). The pool is dynamically resizable: resizeThreadPool(newSize) (KafkaRequestHandler.scala:283) spins up or stops threads under a monitor, wired to the dynamic config num.io.threads through DynamicThreadPool.

Config | Default | Effect
num.io.threads | 8 | Size of the request-handler pool per node (ServerConfigs.NUM_IO_THREADS_DEFAULT). Dynamically reconfigurable.
queued.max.requests | 500 | Capacity of the shared request queue (SocketServerConfigs.QUEUED_MAX_REQUESTS_DEFAULT); the back-pressure point.
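Resizing can be sketched as "grow by starting threads, shrink by stopping some, all under one monitor". ResizablePoolSketch and Worker are hypothetical; each Worker sleeps instead of polling a request channel, and stopping one only flips a flag that it checks on its next iteration. Stopping the most recently created threads first is an assumption of this sketch; the text above only says that threads are spun up or stopped under a monitor.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical resizable handler pool; not Kafka's KafkaRequestHandlerPool.
class ResizablePoolSketch {
    static final class Worker implements Runnable {
        final AtomicBoolean stopped = new AtomicBoolean(false);

        @Override
        public void run() {
            while (!stopped.get()) {
                try {
                    Thread.sleep(5);                 // placeholder for receiveRequest + handle
                } catch (InterruptedException e) {
                    return;
                }
            }
        }
    }

    private final List<Worker> workers = new ArrayList<>();

    ResizablePoolSketch(int numIoThreads) {
        resize(numIoThreads);
    }

    // All changes happen under this object's monitor, so concurrent resizes cannot interleave.
    synchronized void resize(int newSize) {
        while (workers.size() < newSize) {           // grow: start daemon threads with sequential ids
            Worker w = new Worker();
            Thread t = new Thread(w, "data-plane-kafka-request-handler-" + workers.size());
            t.setDaemon(true);
            workers.add(w);
            t.start();
        }
        while (workers.size() > newSize) {           // shrink: stop the most recently added workers
            workers.remove(workers.size() - 1).stopped.set(true);
        }
    }

    synchronized int size() {
        return workers.size();
    }

    public static void main(String[] args) {
        ResizablePoolSketch pool = new ResizablePoolSketch(8);   // num.io.threads default
        pool.resize(12);
        pool.resize(4);
        System.out.println(pool.size());                        // 4
        pool.resize(0);
    }
}
```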

The dispatch contract

Both dispatchers implement ApiRequestHandler.handle. On the broker, KafkaApis.handle (KafkaApis.scala:155) does:

  1. Version gate. If apiVersionManager.isApiEnabled(apiKey, apiVersion) is false, it throws IllegalStateException. In practice the socket layer has already rejected such requests before enqueuing them, so this is a defensive backstop (KafkaApis.scala:166-170).
  2. The giant match. A request.header.apiKey match with a case per API (KafkaApis.scala:172-254). Each case calls a handleXxxRequest method. A missing key throws IllegalStateException("No handler …").
  3. Error funnel. The whole body is wrapped in try/catch: FatalExitError is re-thrown; any other Throwable goes to the local handleError, which logs it and calls requestHelper.handleError(request, e) to send an error response (or, if no response is expected, to close the connection) (KafkaApis.scala:255-257).
  4. finally. Calls replicaManager.tryCompleteActions(), draining the deferred action queue so any delayed operations made completable by this request fire promptly, and records apiLocalCompleteTimeNanos if still unset (KafkaApis.scala:258-267).
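The four steps can be mirrored in a small Java sketch. DispatchSketch, its five-value ApiKey enum and the string log are hypothetical; VOTE is treated as disabled to stand in for a controller-only API, and a switch plays the role of the Scala match. One deliberate simplification: only RuntimeExceptions reach the error funnel here, whereas KafkaApis funnels every Throwable except FatalExitError.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical dispatcher mirroring the four steps above; not KafkaApis.
class DispatchSketch {
    enum ApiKey { PRODUCE, FETCH, METADATA, CREATE_TOPICS, VOTE }

    final StringBuilder log = new StringBuilder();

    // Step 1 stand-in: VOTE plays the part of an API that is out of scope on a broker listener.
    boolean isApiEnabled(ApiKey key) {
        return key != ApiKey.VOTE;
    }

    void handle(ApiKey key, Consumer<String> sendError) {
        try {
            if (!isApiEnabled(key))
                throw new IllegalStateException("API not enabled: " + key);
            switch (key) {                                            // step 2: the "giant match"
                case PRODUCE:       log.append("produce;"); break;
                case FETCH:         log.append("fetch;"); break;
                case METADATA:      throw new RuntimeException("simulated handler bug");
                case CREATE_TOPICS: log.append("forwardToController;"); break;
                default:            throw new IllegalStateException("No handler for " + key);
            }
        } catch (RuntimeException e) {                                // step 3: the error funnel
            sendError.accept(key + ": " + e.getMessage());
        } finally {
            log.append("tryCompleteActions;");                        // step 4: runs on every path
        }
    }

    public static void main(String[] args) {
        DispatchSketch d = new DispatchSketch();
        List<String> errors = new ArrayList<>();
        for (ApiKey k : ApiKey.values()) d.handle(k, errors::add);
        System.out.println(d.log);
        System.out.println(errors);
    }
}
```

Putting tryCompleteActions in finally means deferred completions are drained even when the handler itself failed.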

Two handler styles coexist:

  • Imperative (Unit-returning). e.g. produce, fetch, metadata. These send their own response inside callbacks and return nothing.
  • Future-returning (CompletableFuture[Unit]). e.g. all the consumer-group, share-group and streams APIs that delegate to the GroupCoordinator (which is asynchronous). The dispatch attaches .exceptionally(handleError) so a failed future is funneled to the same error path (e.g. KafkaApis.scala:177, :230).
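The practical difference between the two styles is who reports failure. A minimal sketch, assuming a hypothetical HandlerStyleSketch whose handleAsync stands in for an asynchronous coordinator call: the future-returning path relies on exceptionally(...) to reach the same handleError as everything else. In the demo the futures are already complete, so the error callback runs immediately; in a real coordinator it would run whenever the future fails.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical illustration of the two handler styles; not Kafka code.
class HandlerStyleSketch {
    final List<String> sent = new ArrayList<>();

    // Imperative style: the handler sends its own response and returns nothing.
    void handleImperative(String request) {
        sent.add("response:" + request);
    }

    // Future style: stands in for a call into an asynchronous coordinator.
    CompletableFuture<Void> handleAsync(String request, boolean fail) {
        CompletableFuture<Void> result = new CompletableFuture<>();
        if (fail) {
            result.completeExceptionally(new IllegalStateException("coordinator failure"));
        } else {
            sent.add("response:" + request);
            result.complete(null);
        }
        return result;
    }

    void handleError(String request, Throwable t) {
        sent.add("error:" + request + ":" + t.getMessage());
    }

    // The dispatcher attaches exceptionally(...) so a failed future reaches the shared error path.
    void dispatchAsync(String request, boolean fail) {
        handleAsync(request, fail).exceptionally(t -> {
            handleError(request, t);
            return null;
        });
    }

    public static void main(String[] args) {
        HandlerStyleSketch h = new HandlerStyleSketch();
        h.handleImperative("metadata");
        h.dispatchAsync("join-group", false);
        h.dispatchAsync("heartbeat", true);
        System.out.println(h.sent);   // [response:metadata, response:join-group, error:heartbeat:coordinator failure]
    }
}
```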
Note

A large family of admin APIs is not handled locally on the broker at all: the case body is simply forwardToController(request). This covers CREATE_TOPICS, DELETE_TOPICS, CREATE_PARTITIONS, CREATE_ACLS/DELETE_ACLS, ALTER_CLIENT_QUOTAS, ALTER_USER_SCRAM_CREDENTIALS, ELECT_LEADERS, ALTER_PARTITION_REASSIGNMENTS/LIST_PARTITION_REASSIGNMENTS, UPDATE_FEATURES, UNREGISTER_BROKER, the Raft-voter APIs, and DESCRIBE_QUORUM (KafkaApis.scala:188-237). (The ALTER_CONFIGS family is not bare-forwarded: handleAlterConfigsRequest/handleIncrementalAlterConfigsRequest preprocess some resources locally and forward only the remainder.) These all land in ControllerApis after forwarding.

Walkthrough 1, Produce

handleProduceRequest (KafkaApis.scala:397) is the canonical "act, then maybe park in purgatory" handler.

  1. Transactional authorize. If the batch carries transactional records, the principal must have WRITE on the TRANSACTIONAL_ID; otherwise the request is failed with TRANSACTIONAL_ID_AUTHORIZATION_FAILED (KafkaApis.scala:400-407).
  2. Resolve & bucket partitions. Each partition's topic id/name is resolved against the MetadataCache. Partitions are sorted into unauthorizedTopicResponses, nonExistingTopicResponses, invalidRequestResponses, and authorizedRequestInfo. Authorization is done in one batch call, authHelper.filterByAuthorized(ctx, WRITE, TOPIC, …), to avoid one authorizer round-trip per partition (KafkaApis.scala:415-451).
  3. Record validation. Each authorized batch is run through ProduceRequest.validateRecords; an ApiException there maps the partition to the corresponding error (KafkaApis.scala:444-450).
  4. Define the response callback. sendResponseCallback(responseStatus) merges the per-partition results, attaches current-leader hints (for v10+, on NOT_LEADER_OR_FOLLOWER), computes throttle time, and sends (KafkaApis.scala:458-530).
  5. Act. If anything is left to write, call replicaManager.handleProduceAppend(timeout, requiredAcks, …, responseCallback = sendResponseCallback, …), then immediately produceRequest.clearPartitionRecords() to drop the request's reference to the record bytes so GC can reclaim them while the operation sits in purgatory (KafkaApis.scala:538-558).
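Step 2's single batched authorization, followed by per-partition bucketing, can be sketched in Java. Partitions are modeled as "topic-partition" strings, knownTopics stands in for the MetadataCache, and batchAuthorize stands in for authHelper.filterByAuthorized; all three, and ProduceBucketingSketch itself, are hypothetical, and record validation (step 3) is left out. Checking authorization before existence follows the bucket order listed in step 2; it also means an unauthorized client learns nothing about whether a topic exists.

```java
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

// Hypothetical sketch of produce-side bucketing; not KafkaApis.handleProduceRequest.
class ProduceBucketingSketch {
    // Returns partition -> error name; "NONE" means the partition goes on to the append.
    static Map<String, String> bucket(List<String> partitions,
                                      Set<String> knownTopics,
                                      Function<Set<String>, Set<String>> batchAuthorize) {
        Set<String> topics = new HashSet<>();
        for (String p : partitions) topics.add(topicOf(p));
        Set<String> allowed = batchAuthorize.apply(topics);           // one authorizer call in total

        Map<String, String> result = new LinkedHashMap<>();
        for (String p : partitions) {
            String topic = topicOf(p);
            if (!allowed.contains(topic)) result.put(p, "TOPIC_AUTHORIZATION_FAILED");
            else if (!knownTopics.contains(topic)) result.put(p, "UNKNOWN_TOPIC_OR_PARTITION");
            else result.put(p, "NONE");
        }
        return result;
    }

    // "my-topic-3" -> "my-topic": the partition number follows the last '-'.
    static String topicOf(String partition) {
        return partition.substring(0, partition.lastIndexOf('-'));
    }

    public static void main(String[] args) {
        Map<String, String> r = bucket(
            List.of("orders-0", "orders-1", "secret-0", "ghost-0"),
            Set.of("orders", "secret"),
            topics -> {
                Set<String> ok = new HashSet<>(topics);
                ok.remove("secret");                                  // pretend WRITE on "secret" is denied
                return ok;
            });
        System.out.println(r);
    }
}
```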

Inside ReplicaManager, appendRecords writes to the leader log, builds a per-partition ProducePartitionStatus, and calls maybeAddDelayedProduce (ReplicaManager.scala:878). If requiredAcks == -1 (acks=all) and the local writes succeeded, it constructs a DelayedProduce and registers it: delayedProducePurgatory.tryCompleteElseWatch(delayedProduce, producerRequestKeys) keyed by each TopicPartitionOperationKey (ReplicaManager.scala:903-911). Otherwise (acks=0 or acks=1) the callback fires synchronously and the response is sent at once.
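The register-then-wake pattern behind tryCompleteElseWatch can be reduced to a toy purgatory. This is not Kafka's DelayedOperationPurgatory: PurgatorySketch is hypothetical, has no timer wheel, no expiration reaper and a single coarse lock, and uses plain strings as keys in place of TopicPartitionOperationKey. What it does preserve is the guarantee that matters for responses: forceComplete lets exactly one caller run the completion callback.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BooleanSupplier;

// Hypothetical toy purgatory: no timeouts, no reaper, one coarse lock.
class PurgatorySketch {
    static final class DelayedOp {
        final AtomicBoolean completed = new AtomicBoolean(false);
        final BooleanSupplier canComplete;     // e.g. "enough replicas reached the required offset"
        final Runnable onComplete;             // e.g. the handler's response callback

        DelayedOp(BooleanSupplier canComplete, Runnable onComplete) {
            this.canComplete = canComplete;
            this.onComplete = onComplete;
        }

        // Only the first caller wins, so the response callback runs at most once.
        boolean forceComplete() {
            if (!completed.compareAndSet(false, true)) return false;
            onComplete.run();
            return true;
        }

        boolean tryComplete() {
            return canComplete.getAsBoolean() && forceComplete();
        }
    }

    private final Map<String, List<DelayedOp>> watchers = new HashMap<>();

    // Try once; if the op cannot finish yet, watch it under every key (e.g. each partition).
    // Both methods share one lock and callers invoke checkAndComplete after changing state,
    // so no wake-up is missed here; the finer-grained real purgatory must guard that race itself.
    synchronized boolean tryCompleteElseWatch(DelayedOp op, List<String> keys) {
        if (op.tryComplete()) return true;
        for (String key : keys) watchers.computeIfAbsent(key, k -> new ArrayList<>()).add(op);
        return false;
    }

    // Called when state behind a key changes (e.g. a follower fetch moved the high watermark).
    synchronized int checkAndComplete(String key) {
        List<DelayedOp> ops = watchers.get(key);
        if (ops == null) return 0;
        int completedNow = 0;
        for (Iterator<DelayedOp> it = ops.iterator(); it.hasNext(); ) {
            DelayedOp op = it.next();
            if (op.completed.get()) {
                it.remove();                   // already finished via another key
            } else if (op.tryComplete()) {
                completedNow++;
                it.remove();
            }
        }
        return completedNow;
    }

    public static void main(String[] args) {
        long[] highWatermark = { 5 };
        PurgatorySketch purgatory = new PurgatorySketch();
        DelayedOp produce = new DelayedOp(() -> highWatermark[0] >= 10,
                                          () -> System.out.println("acks=all response sent"));
        System.out.println(purgatory.tryCompleteElseWatch(produce, List.of("orders-0")));  // false: parked
        highWatermark[0] = 10;                                       // a follower fetch advances the HW
        System.out.println(purgatory.checkAndComplete("orders-0"));  // prints the response line, then 1
    }
}
```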

DelayedProduce mechanics

DelayedProduce (DelayedProduce.java:41) holds a map of ProducePartitionStatus. On construction, partitions that succeeded locally get acksPending=true and have their response error pre-set to REQUEST_TIMED_OUT, which is cleared only when enough replicas catch up (DelayedProduce.java:111-121). For each still-pending partition, tryComplete() (DelayedProduce.java:140) consults a PartitionStatusValidator: a callback that delegates back into ReplicaManager#getPartitionOrError(...).checkEnoughReplicasReachOffset(requiredOffset) (wired in ReplicaManager.scala:891-901). It distinguishes three cases:

  • A: replica not assigned to this broker; B: broker no longer leader; C: broker is leader → C.1 a local error, or C.2 enough replicas have reached the required offset.

If any case resolves a partition, its acksPending is cleared and its error set accordingly. When no partition is still pending, forceComplete() fires onComplete(), which assembles the final per-partition response map and invokes the captured responseCallback (= the handler's sendResponseCallback) (DelayedProduce.java:159-196). On timeout, onExpiration() just records the expiration meter; the pre-set REQUEST_TIMED_OUT errors then surface to the client.
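The per-partition bookkeeping just described (a pessimistic REQUEST_TIMED_OUT, acksPending, completion only when nothing is pending) can be modeled in isolation. DelayedProduceSketch, Status and Validator are hypothetical; the Validator returns null while a partition should keep waiting and an error name once it is resolved, collapsing cases A, B and C into one callback that stands in for checkEnoughReplicasReachOffset.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical model of DelayedProduce's per-partition state; not Kafka's classes.
class DelayedProduceSketch {
    static final class Status {
        final long requiredOffset;
        boolean acksPending;
        String error;

        Status(long requiredOffset, String localWriteError) {
            this.requiredOffset = requiredOffset;
            // Successful local writes wait for acks and start with a pessimistic REQUEST_TIMED_OUT.
            this.acksPending = localWriteError.equals("NONE");
            this.error = acksPending ? "REQUEST_TIMED_OUT" : localWriteError;
        }
    }

    // Stand-in for PartitionStatusValidator: null means "keep waiting", otherwise the final error.
    interface Validator {
        String check(String partition, long requiredOffset);
    }

    final Map<String, Status> status = new LinkedHashMap<>();

    // Resolve whatever can be resolved; report true only when no partition is still pending.
    boolean tryComplete(Validator validator) {
        boolean anyPending = false;
        for (Map.Entry<String, Status> e : status.entrySet()) {
            Status s = e.getValue();
            if (!s.acksPending) continue;
            String result = validator.check(e.getKey(), s.requiredOffset);
            if (result != null) {
                s.acksPending = false;
                s.error = result;
            } else {
                anyPending = true;
            }
        }
        return !anyPending;
    }

    // What onComplete would hand to the response callback; pending partitions keep REQUEST_TIMED_OUT.
    Map<String, String> response() {
        Map<String, String> out = new LinkedHashMap<>();
        status.forEach((partition, s) -> out.put(partition, s.error));
        return out;
    }

    public static void main(String[] args) {
        DelayedProduceSketch d = new DelayedProduceSketch();
        d.status.put("orders-0", new Status(100, "NONE"));
        d.status.put("orders-1", new Status(50, "CORRUPT_MESSAGE"));  // failed locally: never pending
        Map<String, Long> replicatedUpTo = Map.of("orders-0", 40L);
        boolean done = d.tryComplete((p, offset) -> replicatedUpTo.get(p) >= offset ? "NONE" : null);
        System.out.println(done + " " + d.response());  // false {orders-0=REQUEST_TIMED_OUT, orders-1=CORRUPT_MESSAGE}
    }
}
```

Because the timeout error is written up front, expiration needs no special response logic: whatever is still pending already says REQUEST_TIMED_OUT.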

Figure (broker dispatch · replica manager · log · purgatory): handleProduceRequest (KafkaApis) → ReplicaManager.handleProduceAppend → appendRecordsToLeader (local write) → acks = all? If not, responseCallback fires now. If so, delayedProducePurgatory.tryCompleteElseWatch(DelayedProduce, keys); a follower fetch that advances the HW calls checkAndComplete(key) → tryComplete → forceComplete → onComplete → responseCallback → sendResponse. On timeout: onExpiration (REQUEST_TIMED_OUT).
acks=all produce: written locally, then parked in purgatory until follower fetches advance the high watermark or the timeout expires; either way a single response callback fires.

Walkthrough 2, Fetch

handleFetchRequest (KafkaApis.scala:564) mirrors produce but for reads, and additionally manages incremental fetch sessions.

  1. Build a fetch context. fetchManager.newContext(version, metadata, isFromFollower, fetchData, forgottenTopics, topicNames) resolves the incremental-fetch session (full vs. incremental) and yields a FetchContext (KafkaApis.scala:577-583). See The Fetch Path & Replica Fetchers.
  2. Authorize by caller type. A follower fetch (replicaId ≥ 0) requires CLUSTER_ACTION on CLUSTER; a consumer fetch requires READ on each topic (batched). Unauthorized / unknown partitions go into erroneous; the rest into interesting (KafkaApis.scala:587-621).
  3. Define the response callback. processResponseCallback(responsePartitionData) builds each FetchResponseData.PartitionData (high watermark, last-stable-offset, log-start, aborted txns, records, preferred read replica, diverging epoch, and current-leader hints for v16+), then either records leader-quota usage (follower path, exempt from client throttle) or applies fetch+request quotas and possibly returns an empty throttled response (consumer path) (KafkaApis.scala:636-731).
  4. Act. If interesting is non-empty, cap fetchMaxBytes against fetch.max.bytes and the quota window, build FetchParams, and call replicaManager.fetchMessages(params, interesting, quota, responseCallback) (KafkaApis.scala:733-776).
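Two of the decisions in steps 2 and 4 are simple enough to write down directly. FetchLimitsSketch is hypothetical. The min-of-three cap follows step 4's description; exempting followers from the quota-window cap is an assumption of this sketch, consistent with their exemption from client throttling. The 57,671,680-byte figure used in main is the broker's fetch.max.bytes default (55 MiB).

```java
// Hypothetical helpers for two fetch-handler decisions; not KafkaApis code.
class FetchLimitsSketch {
    // Step 2: followers (replicaId >= 0) need CLUSTER_ACTION on CLUSTER; consumers need READ on the topic.
    static boolean authorized(int replicaId, boolean hasClusterAction, boolean hasTopicRead) {
        return replicaId >= 0 ? hasClusterAction : hasTopicRead;
    }

    // Step 4: cap the request's max bytes by fetch.max.bytes and, for consumers only (assumed),
    // by how many bytes the client's fetch quota allows in one window.
    static int effectiveMaxBytes(int requestMaxBytes, int configFetchMaxBytes,
                                 boolean isFromFollower, long quotaWindowBytes) {
        long quotaCap = isFromFollower ? Integer.MAX_VALUE : quotaWindowBytes;
        return (int) Math.min(Math.min(requestMaxBytes, configFetchMaxBytes), quotaCap);
    }

    public static void main(String[] args) {
        int fetchMaxBytes = 57_671_680;   // 55 MiB, the fetch.max.bytes default
        System.out.println(effectiveMaxBytes(100 * 1024 * 1024, fetchMaxBytes, false, 10_000_000L)); // 10000000
        System.out.println(effectiveMaxBytes(100 * 1024 * 1024, fetchMaxBytes, true, 10_000_000L));  // 57671680
    }
}
```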

When the leader log has fewer than minBytes immediately available, ReplicaManager creates a DelayedFetch and watches it. DelayedFetch.tryComplete (DelayedFetch.scala:70) re-evaluates eight cases on each trigger: leadership loss (A/B), unknown partition (C), offline dir (D), fenced epoch (E), fetch offset on an older/truncated segment (F), accumulated bytes ≥ minBytes (G), or a newly-discovered diverging epoch requiring truncation (H). On completion it re-reads from the log (readFromLog(..., readFromPurgatory = true)) and invokes the callback (DelayedFetch.scala:158-178).
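Case G, the one that turns a long poll into a response, is an accumulation over partitions. The sketch below keeps only that case plus a single flag standing in for cases A to F and H. DelayedFetchSketch and PartitionView are hypothetical, and measuring "available bytes" as the distance between two log positions, capped by the partition's max bytes, is an assumption of the sketch.

```java
import java.util.List;

// Hypothetical reduction of DelayedFetch.tryComplete; not Kafka's class.
class DelayedFetchSketch {
    static final class PartitionView {
        final boolean mustCompleteNow;   // collapses cases A-F and H (errors, leadership or epoch changes)
        final long fetchPosition;        // where this fetch would start reading, in bytes
        final long endPosition;          // how far it may read now (e.g. up to the high watermark), in bytes
        final int partitionMaxBytes;     // per-partition cap from the request

        PartitionView(boolean mustCompleteNow, long fetchPosition, long endPosition, int partitionMaxBytes) {
            this.mustCompleteNow = mustCompleteNow;
            this.fetchPosition = fetchPosition;
            this.endPosition = endPosition;
            this.partitionMaxBytes = partitionMaxBytes;
        }
    }

    // Returns true when the parked fetch should be completed now.
    static boolean shouldComplete(List<PartitionView> partitions, int minBytes) {
        long accumulated = 0;
        for (PartitionView p : partitions) {
            if (p.mustCompleteNow) return true;                  // complete early so the client sees it
            if (p.endPosition > p.fetchPosition)
                accumulated += Math.min(p.endPosition - p.fetchPosition, p.partitionMaxBytes);
        }
        return accumulated >= minBytes;                          // case G
    }

    public static void main(String[] args) {
        List<PartitionView> quiet = List.of(
            new PartitionView(false, 0, 300, 1_048_576),
            new PartitionView(false, 1_000, 1_200, 1_048_576));
        System.out.println(shouldComplete(quiet, 1_024));   // false: 500 bytes < minBytes
        System.out.println(shouldComplete(quiet, 500));     // true
    }
}
```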

Note

Follower fetches are exempt from client throttling (sendResponseExemptThrottle) but are recorded against the leader replication quota; consumer fetches are subject to both the per-client fetch-bandwidth quota and the request quota (KafkaApis.scala:686-730).

Walkthrough 3, Metadata (read from the cache)

handleTopicMetadataRequest (KafkaApis.scala:885) is purely read-only and never touches a log or coordinator. It authorizes DESCRIBE on the requested topics, then delegates to the private getTopicMetadata helper (KafkaApis.scala:838-883, invoked at :962), which resolves them through metadataCache.getTopicMetadata(topics, listenerName, …) (KafkaApis.scala:847). For topics the cache does not know, the helper either (a) asks the AutoTopicCreationManager to create them when auto-creation is allowed (itself a forward-to-controller path gated by the controller-mutation quota), or (b) returns UNKNOWN_TOPIC_OR_PARTITION / INVALID_TOPIC_EXCEPTION (KafkaApis.scala:856-882). The MetadataCache is the broker's read-optimized projection of the KRaft metadata log; see Metadata Propagation & Broker Lifecycle.
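The branch for unknown topics can be sketched as follows. UnknownTopicSketch is hypothetical, AUTO_CREATE_REQUESTED is a made-up marker for handing the topic to the AutoTopicCreationManager path, and isValidTopicName approximates Kafka's topic-name rules (at most 249 characters drawn from [a-zA-Z0-9._-], and not "." or ".."); treat it as an approximation rather than a copy of the real validator.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;

// Hypothetical handling of topics the metadata cache does not know; not KafkaApis code.
class UnknownTopicSketch {
    private static final Pattern LEGAL = Pattern.compile("[a-zA-Z0-9._-]+");

    // Approximates Kafka's topic-name rules.
    static boolean isValidTopicName(String topic) {
        return !topic.isEmpty()
            && !topic.equals(".") && !topic.equals("..")
            && topic.length() <= 249
            && LEGAL.matcher(topic).matches();
    }

    // Returns topic -> outcome for each topic missing from the cache.
    static Map<String, String> resolveUnknown(List<String> unknownTopics, boolean allowAutoCreation) {
        Map<String, String> out = new LinkedHashMap<>();
        for (String topic : unknownTopics) {
            if (allowAutoCreation) {
                out.put(topic, "AUTO_CREATE_REQUESTED");   // hypothetical marker for path (a)
            } else {
                out.put(topic, isValidTopicName(topic) ? "UNKNOWN_TOPIC_OR_PARTITION" : "INVALID_TOPIC_EXCEPTION");
            }
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(resolveUnknown(List.of("new-topic", "bad topic!"), false));
        // {new-topic=UNKNOWN_TOPIC_OR_PARTITION, bad topic!=INVALID_TOPIC_EXCEPTION}
    }
}
```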

Walkthrough 4, Coordinator-delegating APIs

Group, transaction and share-group APIs are thin: authorize, then hand off to a coordinator and let its (asynchronous) result drive the response.

  • InitProducerId (KafkaApis.scala:1621): authorizes WRITE on the transactional id (or IDEMPOTENT_WRITE/WRITE-by-resource-type for idempotent-only producers; plus TWO_PHASE_COMMIT when enable2Pc), validates the producer-id/epoch pair, then calls txnCoordinator.handleInitProducerId(... sendResponseCallback, requestLocal). The callback down-converts PRODUCER_FENCED to INVALID_PRODUCER_EPOCH for v<4 clients (KafkaApis.scala:1640-1681).
  • AddPartitionsToTxn (KafkaApis.scala:1874): v4+ requests come only from other brokers and require CLUSTER_ACTION; v<4 come from clients and require WRITE on the transactional id and topics. Each transaction is dispatched to txnCoordinator.handleAddPartitionsToTransaction (or handleVerifyPartitionsInTransaction for verify-only), and a synchronized counter (responses.synchronized) sends the batched response only once all transactions have reported back (KafkaApis.scala:1904-1980).
  • FindCoordinator (KafkaApis.scala:1192): resolves the coordinator node for a group/transaction/share key by hashing into the internal topic and reading the partition leader from the MetadataCache.
  • Offset & group APIs (OFFSET_COMMIT, OFFSET_FETCH, JOIN_GROUP, SYNC_GROUP, HEARTBEAT, CONSUMER_GROUP_HEARTBEAT, …) all return CompletableFuture[Unit] from the new GroupCoordinator interface; see Group Coordination & Rebalance Protocols.
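FindCoordinator's "hash into the internal topic" step is sketched below, assuming an abs(hashCode) % partitionCount scheme (with Integer.MIN_VALUE mapped to 0, since Math.abs(Integer.MIN_VALUE) is still negative) and a map from partition to leader broker id standing in for the MetadataCache. FindCoordinatorSketch is hypothetical. For consumer groups the partition count would be that of __consumer_offsets (offsets.topic.num.partitions, default 50); an empty result is where a real handler would report an error such as COORDINATOR_NOT_AVAILABLE.

```java
import java.util.Map;
import java.util.Optional;

// Hypothetical coordinator lookup; not KafkaApis.handleFindCoordinatorRequest.
class FindCoordinatorSketch {
    // abs() that maps Integer.MIN_VALUE to 0, so the modulo below is never negative.
    static int safeAbs(int n) {
        return n == Integer.MIN_VALUE ? 0 : Math.abs(n);
    }

    // Which partition of the internal topic owns this group / transactional id.
    static int partitionFor(String key, int partitionCount) {
        return safeAbs(key.hashCode()) % partitionCount;
    }

    // The coordinator is the leader of that partition, if the metadata view knows one.
    static Optional<Integer> coordinatorFor(String key, int partitionCount, Map<Integer, Integer> leaderByPartition) {
        return Optional.ofNullable(leaderByPartition.get(partitionFor(key, partitionCount)));
    }

    public static void main(String[] args) {
        int p = partitionFor("payments-consumers", 50);
        System.out.println("partition " + p + ", coordinator " + coordinatorFor("payments-consumers", 50, Map.of(p, 2)));
    }
}
```

Because the mapping depends only on the key and the partition count, every broker computes the same coordinator without coordination, which is also why the internal topic's partition count should not change once chosen.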

Transaction-specific handling is covered in Transactions & Exactly-Once Semantics; share-group APIs (the SHARE_* and *_SHARE_GROUP_STATE families dispatched at KafkaApis.scala:238-249) in Share Groups.

ApiVersions & the ApiVersionManager

ApiVersionManager (ApiVersionManager.java:28) is the authority on which API/version combinations this node accepts. Its default isApiEnabled(apiKey, version) is a conjunction of three predicates (ApiVersionManager.java:62-64):

apiKey != null
  && apiKey.inScope(listenerType())                       // listener-type scope
  && apiKey.isVersionEnabled(version, enableUnstableLastVersion())

inScope (ApiKeys.java:312) tests whether the API is exposed on this node's listener type (BROKER vs. CONTROLLER); isVersionEnabled (ApiKeys.java:245) tests the version range. ApiVersions itself is accepted at any version: a client sends its highest ApiVersions version first, and a broker that does not know that version must still answer (falling back to v0) so the two sides can negotiate. The enableUnstableLastVersion flag, backing the config unstable.api.versions.enable, decides whether the newest, not-yet-stable version of each API is accepted and advertised.
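The three predicates can be modeled with a hypothetical ApiGateSketch. The version numbers below are invented, and representing the unstable flag as "exactly one extra version beyond the latest stable one" is a simplification of how ApiKeys tracks its latest and latest-stable versions.

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical model of ApiVersionManager.isApiEnabled; not Kafka's ApiKeys.
class ApiGateSketch {
    enum ListenerType { BROKER, CONTROLLER }

    static final class Api {
        final String name;
        final Set<ListenerType> scope;
        final short oldest;
        final short latestStable;
        final boolean hasUnstableLatest;   // simplification: at most one version past latestStable

        Api(String name, Set<ListenerType> scope, short oldest, short latestStable, boolean hasUnstableLatest) {
            this.name = name;
            this.scope = scope;
            this.oldest = oldest;
            this.latestStable = latestStable;
            this.hasUnstableLatest = hasUnstableLatest;
        }

        short latest(boolean enableUnstableLastVersion) {
            return (short) (enableUnstableLastVersion && hasUnstableLatest ? latestStable + 1 : latestStable);
        }
    }

    static boolean isApiEnabled(Api api, short version, ListenerType listener, boolean enableUnstable) {
        return api != null
            && api.scope.contains(listener)                     // like inScope(listenerType())
            && isVersionEnabled(api, version, enableUnstable);  // like isVersionEnabled(...)
    }

    static boolean isVersionEnabled(Api api, short version, boolean enableUnstable) {
        if (api.name.equals("API_VERSIONS")) return true;     // accepted at any version
        return version >= api.oldest && version <= api.latest(enableUnstable);
    }

    public static void main(String[] args) {
        Api produce = new Api("PRODUCE", EnumSet.of(ListenerType.BROKER), (short) 3, (short) 12, true);
        System.out.println(isApiEnabled(produce, (short) 13, ListenerType.BROKER, false));  // false
        System.out.println(isApiEnabled(produce, (short) 13, ListenerType.BROKER, true));   // true
    }
}
```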

Two implementations exist:

Aspect | DefaultApiVersionManager (broker) | SimpleApiVersionManager (controller)
Listener type | BROKER | CONTROLLER
Features source | metadataCache.features() | controller-supplied featuresProvider
Advertised versions | Intersection of broker APIs with the active controller's NodeApiVersions (so a client only sees versions the controller can actually service for forwarded APIs) | A fixed collection computed once from the listener type
Forwarding-aware? | Yes, takes () => forwardingManager.controllerApiVersions | No
Built at | BrokerServer.scala:267 | ControllerServer.scala:162

handleApiVersionsRequest (KafkaApis.scala:1524) is special-cased: it returns the full supported-API list regardless of authentication state, because clients must negotiate versions before they can SASL-authenticate. The handler builds the response via apiVersionManager.apiVersionResponse(throttleMs, alterFeatureLevel0), where alterFeatureLevel0 = (apiVersion < 4). It also validates the request: UNSUPPORTED_VERSION for an unknown request version, INVALID_REQUEST if the request is malformed, and, for v5+ requests that carry a cluster id / node id (KIP-1242), REBOOTSTRAP_REQUIRED if they don't match this broker (KafkaApis.scala:1531-1551). The response also carries the cluster's finalized and supported features, which the broker uses for feature-gating (DefaultApiVersionManager.java:78-101).

Design rationale

Folding the controller's API versions into the broker's ApiVersionsResponse means a client never attempts a forwardable admin request at a version the controller cannot handle: the broker advertises only the intersection. Forwarding of administrative requests through brokers was introduced by KIP-590; in the post-ZooKeeper, KRaft-only world the broker is a pure proxy for these mutations.
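The intersection itself is range arithmetic. In this hypothetical VersionIntersectSketch, a version range is a two-element short[] {min, max}: non-forwardable APIs keep the broker's range, forwardable ones get the overlap with the controller's range, and (an assumption of this sketch, not a statement about Kafka) a forwardable API with no overlap is simply not advertised. All API ranges in the example are invented.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical sketch of intersecting broker and controller version ranges; not Kafka code.
class VersionIntersectSketch {
    // Overlap of two {min, max} ranges, or null when they do not overlap.
    static short[] intersect(short[] a, short[] b) {
        short min = (short) Math.max(a[0], b[0]);
        short max = (short) Math.min(a[1], b[1]);
        return min <= max ? new short[] { min, max } : null;
    }

    static Map<String, short[]> advertised(Map<String, short[]> broker,
                                           Map<String, short[]> controller,
                                           Set<String> forwardable) {
        Map<String, short[]> out = new TreeMap<>();
        for (Map.Entry<String, short[]> e : broker.entrySet()) {
            String api = e.getKey();
            if (!forwardable.contains(api)) {
                out.put(api, e.getValue());                    // served locally: broker range as-is
                continue;
            }
            short[] ctrl = controller.get(api);
            short[] both = ctrl == null ? null : intersect(e.getValue(), ctrl);
            if (both != null) out.put(api, both);              // assumption: no overlap => not advertised
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, short[]> adv = advertised(
            Map.of("CREATE_TOPICS", new short[] { 2, 7 }, "FETCH", new short[] { 4, 17 }),
            Map.of("CREATE_TOPICS", new short[] { 0, 6 }),
            Set.of("CREATE_TOPICS"));
        adv.forEach((api, r) -> System.out.println(api + " v" + r[0] + "-v" + r[1]));
        // CREATE_TOPICS v2-v6
        // FETCH v4-v17
    }
}
```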

Broker APIs vs. controller APIs (the split)

The two dispatchers are deliberately disjoint in scope. The split is enforced structurally by ApiKeys.inScope(listenerType): a broker listener never even surfaces, say, VOTE or BROKER_HEARTBEAT to KafkaApis, and a controller listener never surfaces PRODUCE.

Aspect | KafkaApis (broker) | ControllerApis (KRaft controller)
Handles | Data plane (produce/fetch/list-offsets), group/txn/share coordination, metadata reads, client telemetry, SASL, plus forwarding of admin writes. | Raft RPCs (VOTE, BEGIN/END_QUORUM_EPOCH, FETCH/FETCH_SNAPSHOT for the metadata log), broker registration/heartbeat, ALTER_PARTITION, and the write side of admin operations (create/delete topics, ACLs, configs, quotas, reassignments, feature updates).
Return style | Mixed: Unit + CompletableFuture[Unit] | Uniformly CompletableFuture[Unit] per case (ControllerApis.scala:96-137)
Backing engine | ReplicaManager, coordinators, MetadataCache | The Controller (QuorumController) and RaftManager
ApiVersionManager | DefaultApiVersionManager | SimpleApiVersionManager

On the controller, Raft RPCs are answered by handleRaftRequest (ControllerApis.scala:685), which forwards the request straight to raftManager.handleRequest(...) and sends the result exempt from throttling. Admin writes (e.g. handleDeleteTopics, ControllerApis.scala:198) wrap the call to controller.* in a ControllerRequestContext carrying a deadline and a controller-mutation quota recorder, and respond via sendResponseMaybeThrottleWithControllerQuota. See KRaft Consensus and The KRaft Controller.

Envelopes & forwarding

When a broker forwards a request, it wraps the original (with its original principal) in an EnvelopeRequest and sends it to the controller via ForwardingManager.forwardRequest (ForwardingManager.java:46). The controller's handleEnvelopeRequest (ControllerApis.scala:164) requires CLUSTER_ACTION (only brokers may envelope), unwraps it via EnvelopeUtils.handleEnvelopeRequest, and re-dispatches the inner request through its own handle. The inner response is re-wrapped; crucially, if the inner response indicates NOT_CONTROLLER, Request.buildResponseSend rewrites the envelope to carry NOT_CONTROLLER so the broker knows to re-resolve the active controller and retry (Request.java:236-256). A forwarded request is marked by request.envelope().isPresent(); request.isForwarded gates throttling so forwarded requests are not double-throttled (RequestHandlerHelper.scala:87, 97, 105).
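Three forwarding rules from this paragraph, reduced to pure functions. EnvelopeSketch and its string error names are hypothetical. The NOT_CONTROLLER rule is modeled on an error-count map because it depends only on whether the inner response reports that error anywhere; the "mute here" rule reflects that the broker relaying the response applies the throttle, and it ignores the cluster-authorization-failure exception listed among the throttling helpers.

```java
import java.util.Map;

// Hypothetical pure-function view of three forwarding rules; not Kafka code.
class EnvelopeSketch {
    // Only brokers may send envelopes, so the controller demands CLUSTER_ACTION first.
    static String acceptEnvelope(boolean principalHasClusterAction) {
        return principalHasClusterAction ? "UNWRAP_AND_DISPATCH" : "CLUSTER_AUTHORIZATION_FAILED";
    }

    // If the inner response reports NOT_CONTROLLER, put it on the envelope itself so the
    // forwarding broker re-resolves the active controller and retries.
    static String envelopeError(Map<String, Integer> innerErrorCounts) {
        return innerErrorCounts.getOrDefault("NOT_CONTROLLER", 0) > 0 ? "NOT_CONTROLLER" : "NONE";
    }

    // Forwarded requests are not muted where they are processed, which avoids throttling twice.
    static boolean muteHere(boolean isForwarded, int throttleMs) {
        return !isForwarded && throttleMs > 0;
    }

    public static void main(String[] args) {
        System.out.println(envelopeError(Map.of("NOT_CONTROLLER", 1)));   // NOT_CONTROLLER
        System.out.println(envelopeError(Map.of("NONE", 3)));             // NONE
    }
}
```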

Concurrency & threading model

The model is "synchronous handler, asynchronous completion":

  • Who runs handlers: the data-plane-kafka-request-handler-N threads. One handler runs start-to-finish on one such thread for its synchronous portion. State internal to a single handle call is therefore single-threaded and lock-free.
  • RequestLocal: created per thread (RequestLocal.withThreadConfinedCaching, KafkaRequestHandler.scala:105) and reused; it holds a buffer-supplier cache that must only be touched by its owning thread.
  • Cross-thread completion: a DelayedProduce/DelayedFetch may be completed by a different thread (a replica-fetcher thread advancing the high watermark, the purgatory's expiration-reaper thread, or another request-handler thread processing a follower fetch), and its onComplete then invokes the response callback on that thread.
  • Re-scheduling onto a handler thread: when a callback must run with the originating thread's RequestLocal (e.g. coordinator append callbacks), KafkaRequestHandler.wrapAsyncCallback (KafkaRequestHandler.scala:65) checks whether it is already on the right request thread; if not, it enqueues a CallbackRequest on the channel's callbackQueue and pushes a WakeupRequest to nudge a waiting handler. The callback then runs on a handler thread with that thread's RequestLocal (KafkaRequestHandler.scala:131-160).
  • Shared mutable state in handlers: deliberately minimal. The AddPartitionsToTxn handler is the notable exception: it guards its result accumulator with responses.synchronized because per-transaction callbacks may fire concurrently (KafkaApis.scala:1905).
  • Memory visibility on Request: the timing fields are volatile precisely because they are written by request/purgatory threads and read by network threads (Request.java:70-82).
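The rescheduling decision in wrapAsyncCallback comes down to comparing the request captured at wrap time with whatever the invoking thread is currently handling. A sketch under assumptions: RescheduleSketch, CURRENT_REQUEST and callbackQueue are hypothetical stand-ins for threadCurrentRequest and the channel's callbackQueue, the WakeupRequest push is omitted, and RequestLocal is not modeled.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.function.Consumer;

// Hypothetical sketch of the wrapAsyncCallback decision; not Kafka's KafkaRequestHandler.
class RescheduleSketch {
    static final ThreadLocal<Object> CURRENT_REQUEST = new ThreadLocal<>();  // like threadCurrentRequest
    final BlockingQueue<Runnable> callbackQueue = new ArrayBlockingQueue<>(16);

    // Capture the request being handled now; decide where to run only when the callback fires.
    <T> Consumer<T> wrap(Consumer<T> callback) {
        Object captured = CURRENT_REQUEST.get();
        if (captured == null) throw new IllegalStateException("not on a request-handler thread");
        return value -> {
            if (CURRENT_REQUEST.get() == captured) {
                callback.accept(value);                              // same request, same thread: run inline
            } else {
                callbackQueue.add(() -> callback.accept(value));      // elsewhere: hand back to a handler
            }
        };
    }

    // Fires the same wrapped callback once inline and once from a foreign thread.
    static List<String> demo() {
        RescheduleSketch s = new RescheduleSketch();
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        CURRENT_REQUEST.set("request-1");
        try {
            Consumer<String> cb = s.wrap(log::add);
            cb.accept("inline");
            Thread fetcher = new Thread(() -> cb.accept("rescheduled"), "replica-fetcher");
            fetcher.start();
            fetcher.join();
            log.add("queued=" + s.callbackQueue.size());
            s.callbackQueue.poll().run();                            // a handler thread would do this
            return new ArrayList<>(log);
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        } finally {
            CURRENT_REQUEST.remove();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());   // [inline, queued=1, rescheduled]
    }
}
```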
Caution

A handler must never perform a long blocking call (a disk-bound replication wait, a coordinator commit) on the request-handler thread: with only num.io.threads (default 8) threads, a few stuck handlers stall the whole node. The purgatory and callback-rescheduling machinery exists precisely to keep handler threads non-blocking.

Authorization, request context & principal

Every Request carries a RequestContext (Request.java:59) with the authenticated KafkaPrincipal, the client address, listener name, security protocol, and the parsed header. A Session (principal + address) is derived for quota keying (Request.java:100).

Handlers authorize through AuthHelper (AuthHelper.scala:42), which wraps an optional Plugin[Authorizer]; when no authorizer is configured, everything is allowed (the authorizer.forall / "None ⇒ all" branches). Three shapes are used:

  • authorize(ctx, op, resourceType, name), a single allow/deny (AuthHelper.scala:43).
  • filterByAuthorized(ctx, op, type, resources)(nameOf), one batched authorizer call for many resources, returning the allowed names; this is why produce/fetch authorize all partitions in a single round-trip (AuthHelper.scala:100). It deduplicates and counts references per resource name so the audit log sees the true access count.
  • authorizeByResourceType / authorizeClusterOperation, for cluster-wide and idempotent-write checks (AuthHelper.scala:57, 78).
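The de-duplicate, count, authorize-once pattern of filterByAuthorized can be sketched without Kafka's types. BatchAuthSketch and its Action class are hypothetical stand-ins; Kafka's Authorizer.authorize similarly takes a list of actions, each carrying a resource-reference count, and returns one result per action, but this sketch does not use that interface.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

// Hypothetical sketch of filterByAuthorized's batching; not Kafka's AuthHelper or Authorizer.
class BatchAuthSketch {
    // One action per distinct resource name, with how many times the request referenced it.
    static final class Action {
        final String resource;
        final int referenceCount;

        Action(String resource, int referenceCount) {
            this.resource = resource;
            this.referenceCount = referenceCount;
        }
    }

    static Set<String> filterByAuthorized(List<String> referencedNames,
                                          Function<List<Action>, List<Boolean>> authorizeBatch) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String name : referencedNames) counts.merge(name, 1, Integer::sum);   // de-duplicate + count

        List<Action> actions = new ArrayList<>();
        counts.forEach((name, count) -> actions.add(new Action(name, count)));
        List<Boolean> results = authorizeBatch.apply(actions);                    // the single call

        Set<String> allowed = new HashSet<>();
        for (int i = 0; i < actions.size(); i++) {
            if (results.get(i)) allowed.add(actions.get(i).resource);
        }
        return allowed;
    }

    public static void main(String[] args) {
        Set<String> allowed = filterByAuthorized(
            List.of("orders", "orders", "orders", "audit"),               // e.g. three partitions of "orders"
            actions -> {
                List<Boolean> r = new ArrayList<>();
                for (Action a : actions) r.add(!a.resource.equals("audit"));
                return r;
            });
        System.out.println(allowed);   // [orders]
    }
}
```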

Denials never throw at partition granularity; they translate to a per-resource error code (TOPIC_AUTHORIZATION_FAILED, GROUP_AUTHORIZATION_FAILED, …) folded into the response, so a partially authorized request still succeeds for the parts it may touch. Cluster-level denials throw ClusterAuthorizationException, which the error funnel converts to CLUSTER_AUTHORIZATION_FAILED. Full details in Security: Authentication & Authorization.

Throttling integration

Throttling is computed inside the handler (or in RequestHandlerHelper) right before the response is built, and is expressed two ways at once: (1) the integer throttleTimeMs in the response body that tells the client to back off, and (2) actual server-side channel muting for that duration. The quota managers live in QuotaManagers (quotas.produce, quotas.fetch, quotas.request, quotas.controllerMutation, quotas.leader).

The common helpers (RequestHandlerHelper.scala):

Helper | Behaviour
--- | ---
sendResponseMaybeThrottle | Record request-quota usage; if the quota is violated and the request was not forwarded, throttle (mute); then send (:93).
sendErrorResponseMaybeThrottle | As above, for error responses; also throttles cluster-authorization failures even when the request was forwarded (:102).
sendResponseExemptThrottle | Record the request as exempt and send without muting; used for follower fetch and Raft RPCs (:143).
sendForwardedResponse | Take max(controllerThrottle, localRequestThrottle), so a forwarded request is throttled by whichever is longer: the controller's throttle or the local request throttle (:67).
sendResponseMaybeThrottleWithControllerQuota | Compare the controller-mutation throttle with the request throttle and mute on whichever is larger (:120).
throttle | Mute the channel by sending StartThrottlingResponse/EndThrottlingResponse around the delay (:34).
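Muting the channel for the throttle duration reduces to "signal start now, signal end later". In this hypothetical Java sketch, strings stand in for StartThrottlingResponse and EndThrottlingResponse. A ScheduledExecutorService stands in for whatever delay mechanism the quota managers actually use. Only the ordering is meant to match: mute immediately, unmute after throttleMs.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of "mute now, unmute after the delay"; not Kafka's implementation.
final class ThrottleMuteSketch {

    /** What the owning network processor would observe for a connection. */
    final List<String> events = new CopyOnWriteArrayList<>();

    /** Daemon timer standing in for the quota manager's delay mechanism. */
    private final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor(r -> {
        Thread t = new Thread(r, "throttle-timer");
        t.setDaemon(true);
        return t;
    });

    /** Mutes connectionId immediately and schedules the unmute; the latch opens on unmute. */
    CountDownLatch throttle(String connectionId, long throttleMs) {
        CountDownLatch unmuted = new CountDownLatch(1);
        events.add("start:" + connectionId);       // stand-in for StartThrottlingResponse
        timer.schedule(() -> {
            events.add("end:" + connectionId);     // stand-in for EndThrottlingResponse
            unmuted.countDown();
        }, throttleMs, TimeUnit.MILLISECONDS);
        return unmuted;
    }

    void shutdown() {
        timer.shutdownNow();
    }
}
```

The client-visible response travels separately from these two signals. That is how muting can be layered on top of the request's single terminal response without consuming it.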

The produce and fetch handlers compute the throttle inline because they track two quotas (bandwidth and request) and pick the larger. The result, maxThrottleTimeMs = max(bandwidthThrottleTimeMs, requestThrottleTimeMs), is recorded onto request.apiThrottleTimeMs and used as the muting duration (KafkaApis.scala:491-505 for produce, :700-717 for fetch). Note that for acks=0 the request quota is not enforced (KafkaApis.scala:494-496). Quotas, throttling, and client metrics are detailed in Quotas, Throttling & Client Metrics.
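The inline produce decision reduces to a few lines. This is a hedged sketch with a hypothetical class and method name. It captures only the two rules stated here: the larger of the bandwidth and request throttles wins, and acks=0 skips the request quota.

```java
// Hypothetical reduction of the inline produce-throttle decision; names are illustrative.
final class ProduceThrottleSketch {

    /** Returns the throttle reported to the client and used as the muting duration. */
    static int maxThrottleTimeMs(int bandwidthThrottleMs, int requestThrottleMs, short acks) {
        // acks=0: the request quota is not enforced, so only bandwidth can throttle.
        int requestMs = (acks == 0) ? 0 : requestThrottleMs;
        // Two independent quota verdicts; the larger one wins.
        return Math.max(bandwidthThrottleMs, requestMs);
    }
}
```

For example, a 100 ms bandwidth throttle and a 250 ms request throttle yield 250 ms with acks=1, but only 100 ms with acks=0.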

Error mapping

Kafka has a fixed catalog of error codes (the Errors enum), and handlers map exceptions and conditions onto them. There are two layers:

  • Per-resource: a handler explicitly sets a partition/topic/group error code (e.g. UNKNOWN_TOPIC_OR_PARTITION, NOT_LEADER_OR_FOLLOWER). Some are version-aware: a KafkaStorageException is down-converted to NOT_LEADER_OR_FOLLOWER for fetch ≤ v5 (KafkaApis.scala:623-633); PRODUCER_FENCED becomes INVALID_PRODUCER_EPOCH for old InitProducerId/AddPartitionsToTxn clients (KafkaApis.scala:1643-1646, :1955-1958).
  • Whole-request: an uncaught exception hits the funnel. RequestHandlerHelper.handleError (RequestHandlerHelper.scala:46) decides between sendErrorResponseMaybeThrottle and sendErrorResponseExemptThrottle (cluster-action requests are exempt unless they failed with ClusterAuthorizationException). Building the error response calls request.body(...).getErrorResponse(throttleMs, error); if that returns null (the API expects no response, e.g. acks=0 produce), the connection is closed via closeConnection instead (RequestHandlerHelper.scala:54-65).
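Both error-mapping layers can be modelled in a few lines. The enum below is a local stand-in for a handful of Kafka's error codes, and the method names are hypothetical. The sketch covers only two rules: the version-gated down-conversion for old fetch clients, and the funnel's "no error response means close the connection" rule.

```java
// Hypothetical sketch of the two error-mapping layers; enum values are local stand-ins.
final class ErrorFunnelSketch {

    enum ErrorCode { NONE, NOT_LEADER_OR_FOLLOWER, KAFKA_STORAGE_ERROR, UNKNOWN_SERVER_ERROR }

    /** Per-resource layer: version-aware down-conversion for old fetch clients. */
    static ErrorCode fetchPartitionError(ErrorCode error, short fetchVersion) {
        if (error == ErrorCode.KAFKA_STORAGE_ERROR && fetchVersion <= 5) {
            return ErrorCode.NOT_LEADER_OR_FOLLOWER;   // what fetch v5 and older clients receive
        }
        return error;
    }

    /** Whole-request layer outcome: either an error response or a closed connection. */
    interface Outcome {}
    record SendError(ErrorCode error, int throttleMs) implements Outcome {}
    record CloseConnection() implements Outcome {}

    /**
     * apiExpectsResponse models whether getErrorResponse(throttleMs, error) returns
     * a body; false corresponds to the null case (e.g. acks=0 produce).
     */
    static Outcome handleError(boolean apiExpectsResponse, ErrorCode error, int throttleMs) {
        return apiExpectsResponse
                ? new SendError(error, throttleMs)
                : new CloseConnection();
    }
}
```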

Failure modes, edge cases & recovery

  • Handler exception. Caught by the dispatcher and translated to an error response (or connection close); the handler thread survives.
  • FatalExitError. Propagated out of the loop, so the thread shuts down and the JVM exits; reserved for truly unrecoverable states.
  • Queue full. requestQueue.put blocks the network thread, so it stops reading from its sockets until a slot frees up. This is intentional back-pressure rather than dropping requests.
  • Processor gone. If the owning network thread shut down, sendResponse finds null in processors and drops the response (connection already closed) (RequestChannel.scala:189-194).
  • acks=0 with errors. No response is expected, so on any error the broker closes the connection to force the client to refresh metadata (KafkaApis.scala:508-521).
  • Forwarding to a stale controller. If the controller replies with an unsupported version during forwarding, the broker closes the client connection (it may have failed over) (KafkaApis.scala:145-150); a NOT_CONTROLLER envelope reply makes the broker re-resolve the controller.
  • Delayed-op timeout. A DelayedProduce/DelayedFetch that never becomes completable fires onExpiration at its deadline, returning whatever is available (with REQUEST_TIMED_OUT for produce partitions that never got enough acks).
  • Buffer lifecycle. Most parsed requests release their network buffer immediately; produce (and any schema with BYTES/NULLABLE_BYTES fields, flagged requiresDelayedAllocation) keeps it until processing finishes, then the loop's finally calls releaseBuffer() (Request.java:113-115, ApiKeys.java:194, KafkaRequestHandler.scala:175).
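Several of these failure modes meet in one loop shape: handler exceptions, FatalExitError, the bounded queue, and buffer release. The Java sketch below is a simplified model under assumed names. FatalError plays the role of FatalExitError, and Req.releaseBuffer() stands in for the real buffer release. An ArrayBlockingQueue of capacity 2 stands in for the request queue whose put() blocks network threads.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.function.Consumer;

// Hypothetical model of a request-handler loop; not KafkaRequestHandler itself.
final class HandlerLoopSketch {

    /** Stand-in for FatalExitError: the only throwable allowed to escape the loop. */
    static final class FatalError extends Error {
        FatalError(String msg) { super(msg); }
    }

    /** A parsed request that may still pin its network buffer (hypothetical shape). */
    static final class Req {
        final String api;
        boolean bufferReleased;
        Req(String api) { this.api = api; }
        void releaseBuffer() { bufferReleased = true; }
    }

    final BlockingQueue<Req> queue = new ArrayBlockingQueue<>(2); // bounded: put() blocks when full
    final List<String> responses = new ArrayList<>();

    /** Processes n requests: catch everything except fatal errors, always release the buffer. */
    void runOnce(int n, Consumer<Req> handler) throws InterruptedException {
        for (int i = 0; i < n; i++) {
            Req req = queue.take();
            try {
                handler.accept(req);
                responses.add(req.api + ":ok");
            } catch (FatalError e) {
                throw e;                                // thread (and, in Kafka, the JVM) goes down
            } catch (Throwable t) {
                responses.add(req.api + ":error");      // error response; the thread survives
            } finally {
                req.releaseBuffer();                    // runs on every path, fatal included
            }
        }
    }
}
```

With two requests queued, a third offer() returns false. A network thread calling put() instead would block until a handler takes one, which is the back-pressure described above.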

Invariants & guarantees

Invariant

A request is processed by at most one handler thread's synchronous pass; any continuation that needs that thread's RequestLocal is rescheduled back onto a handler thread rather than run on a foreign (purgatory/fetcher) thread.
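The rescheduling rule in this invariant can be sketched as a wrapper around a continuation. This is a hypothetical model, not Kafka's code. A ThreadLocal flag marks handler threads. A continuation invoked from any other thread (a purgatory reaper, a fetcher) is queued for a handler thread instead of running in place.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch: continuations always run on a handler thread.
final class CallbackRescheduleSketch {

    /** Work items the handler threads drain; a continuation is just another item. */
    final BlockingQueue<Runnable> handlerQueue = new LinkedBlockingQueue<>();

    /** Marks threads that belong to the handler pool (assumed mechanism). */
    static final ThreadLocal<Boolean> IS_HANDLER = ThreadLocal.withInitial(() -> false);

    /** Wraps a continuation so that foreign threads hand it back instead of running it. */
    Runnable wrap(Runnable continuation) {
        return () -> {
            if (IS_HANDLER.get()) {
                continuation.run();              // already on a handler thread: run inline
            } else {
                handlerQueue.add(continuation);  // foreign thread: reschedule onto the pool
            }
        };
    }

    /** One handler-thread step: drain and run a single queued item. */
    void handlerStep() throws InterruptedException {
        IS_HANDLER.set(true);
        try {
            handlerQueue.take().run();
        } finally {
            IS_HANDLER.set(false);
        }
    }
}
```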

Invariant

Per-partition ordering for a producer is preserved because each connection's requests are read in order by one network thread, the request queue is FIFO, and (with idempotence) the log append enforces sequence ordering. The dispatcher itself adds no reordering.

Invariant

Exactly one terminal response (or connection-close) is emitted per request; throttle muting is layered on top without consuming that response.

Interactions with other subsystems

Design rationale & evolution

Design rationale

The single shared queue + small fixed thread pool keeps the broker's concurrency model trivially reasonable and bounded, while the purgatory pattern decouples a request's arrival from its completion so long-poll fetches and quorum-acked produces never tie up a scarce I/O thread. The cost is that all latency-sensitive work must be expressible as a delayed operation with a tryComplete predicate.
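A delayed operation with a tryComplete predicate can be sketched in Java as follows. DelayedOp, expire() and the string results are hypothetical names, not Kafka's DelayedOperation API. The essential property is that a compare-and-set lets exactly one of two paths produce the response: completing early or expiring at the deadline.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BooleanSupplier;

// Hypothetical sketch of the purgatory pattern; not Kafka's implementation.
final class DelayedOpSketch {

    /** Minimal delayed operation: completes exactly once, either early or on expiry. */
    static final class DelayedOp {
        private final AtomicBoolean completed = new AtomicBoolean(false);
        private final BooleanSupplier canComplete;  // e.g. "enough bytes" or "enough acks"
        volatile String result;                     // what the response would carry

        DelayedOp(BooleanSupplier canComplete) {
            this.canComplete = canComplete;
        }

        /** Called whenever something relevant changes (new data, a follower ack). */
        boolean tryComplete() {
            return canComplete.getAsBoolean() && forceComplete("satisfied");
        }

        /** Called by the timer at the deadline if the operation is still pending. */
        boolean expire() {
            return forceComplete("expired");
        }

        /** The CAS guarantees that only the first caller produces the response. */
        private boolean forceComplete(String how) {
            if (!completed.compareAndSet(false, true)) {
                return false;
            }
            result = how;
            return true;
        }
    }
}
```

The handler thread only registers the operation and returns. Whichever thread later observes new data, or the timer at the deadline, does the completing, which is why a scarce I/O thread never waits on a long poll.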

Major evolutionary points visible in the code: request forwarding through brokers (KIP-590) which makes KafkaApis a proxy for admin mutations; incremental fetch sessions (KIP-227) behind FetchManager.newContext; the new asynchronous GroupCoordinator consumer-group protocol (KIP-848) that turned many group handlers into CompletableFuture-returning methods; share groups / queues (KIP-932) adding the whole SHARE_* handler family; and the cluster-id/node-id bootstrap check in ApiVersions (KIP-1242, KafkaApis.scala:1544). With ZooKeeper removed in 4.0, the broker/controller dispatcher split is now the clean boundary between the data plane (KafkaApis) and the metadata control plane (ControllerApis + the KRaft controller).

Gotchas & operational notes

Gotcha

The version check in KafkaApis.handle (:166) is a defensive backstop: the socket server normally rejects out-of-scope or unsupported APIs and closes the connection before the request reaches the handler. If you ever see that IllegalStateException, the network-layer gate was bypassed (typically only in tests).

Gotcha

acks=0 produce is the one path that closes the client connection on error rather than replying. Spurious connection resets under load on producers configured with acks=0 usually mean broker-side produce errors, not network faults.

Gotcha

Per-request metrics distinguish sub-categories under the same API key: follower vs. consumer fetch (FetchFollower/FetchConsumer), verify-only AddPartitionsToTxn, and the legacy client-metrics resource listing. See the metric-name override switch in Request.updateRequestMetrics (Request.java:339-356), and make sure you monitor the right one.

Note

The RequestHandlerAvgIdlePercent meter is the canonical "is my broker CPU/IO-thread saturated?" signal: values near 0 mean the handler pool is fully busy and you may need more num.io.threads (or to relieve the underlying bottleneck). It is computed by discounting each handler's idle time by the thread count so it reads as a true fraction (KafkaRequestHandler.scala:111-123).
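The "discount by thread count" arithmetic is easiest to see with numbers. The sketch below is a hypothetical simplification that replaces the real meter with a plain sum over one measurement window.

```java
// Hypothetical arithmetic sketch of the "discount idle time by thread count" idea.
final class IdlePercentSketch {

    /**
     * idleNanosPerThread: idle time each handler thread observed during the window.
     * Each thread marks idle / threadCount into one shared total; dividing that total
     * by the window length gives a pool-wide idle fraction in [0, 1].
     */
    static double idleFraction(long[] idleNanosPerThread, long windowNanos) {
        int threads = idleNanosPerThread.length;
        double meterTotal = 0.0;
        for (long idleNanos : idleNanosPerThread) {
            meterTotal += (double) idleNanos / threads;
        }
        return meterTotal / windowNanos;
    }
}
```

Take four threads, two of which were idle for the whole one-second window. Each idle thread marks 0.25 s, so the reading is 0.5, i.e. 50% idle. Without the division, the same two threads would mark 2 s per second and the reading would exceed 100%.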

krivaltsevich.com · Part of Apache Kafka Internals · derived from Apache Kafka 4.4 source · GitHub · MIT-licensed.

Apache Kafka® is a registered trademark of the Apache Software Foundation. This is an independent, unofficial guide, not affiliated with or endorsed by the ASF.