07 · Request Processing (KafkaApis)
Source: Apache Kafka 4.4.0-SNAPSHOT (git 04bfe7d, 2026-06-15), KRaft mode. Derived from source code, not copied from official documentation.
Every request a client sends to a Kafka node eventually arrives at a single Scala method on that node: a giant match on the request's API key. This chapter follows that journey: how a parsed request is dequeued by a request-handler thread, dispatched to one of roughly eighty cases, authorized, validated, acted upon (often via a coordinator or the replica manager, and for produce/fetch frequently parked in a purgatory), turned into a response, throttled by the quota managers, and finally handed back to the network layer for transmission. We dissect the dispatcher (KafkaApis on brokers, ControllerApis on the KRaft controller), the request-handler thread pool, the RequestChannel round-trip, the ApiVersionManager that decides which APIs a node even exposes, and the authorization/throttling/error-mapping hooks woven through every handler.
Role & responsibilities
The request-processing layer is the broker's application-logic dispatcher. The network layer (see Network Layer & Threading Model) accepts connections, authenticates them, reads bytes off sockets, parses them into typed request objects, and enqueues them. From that point on, the request-processing layer owns the request. Its responsibilities are:
- Dispatch: route each request to a type-specific handler based on its API key.
- Authorize: consult the pluggable Authorizer for every resource the request touches, mapping denials to the appropriate *_AUTHORIZATION_FAILED error.
- Validate: reject malformed input, unknown topics/partitions, unsupported versions, and invalid record batches.
- Act: invoke the right subsystem (ReplicaManager for produce/fetch/delete-records, the GroupCoordinator/TransactionCoordinator/ShareCoordinator for group, transaction and share-group APIs, the MetadataCache for read-only metadata), or forward administrative requests to the controller.
- Respond: build the response message, fold in quota-driven throttle time, and hand it to the RequestChannel.
- Account: record per-request timing, byte, and error metrics.
A second, structurally similar dispatcher, ControllerApis, runs only on KRaft controller nodes and handles the disjoint set of APIs that the controller exposes (Raft RPCs, broker registration/heartbeat, and the write side of admin operations forwarded by brokers).
There is no per-API thread or actor. A small pool of request-handler threads (num.io.threads, resizable at runtime) pulls requests off one shared queue and runs the entire synchronous portion of a handler to completion. Operations that must wait for other events (replication acks, accumulating fetch bytes, a coordinator commit) do not block the thread; instead they register a delayed operation in a purgatory and the thread moves on. Completion happens later on another thread, which invokes a captured callback to finish building and sending the response.
Where it lives in the code
| Class / Interface | File | Role |
|---|---|---|
| ApiRequestHandler | core/src/main/scala/kafka/server/KafkaRequestHandler.scala:37 | The contract: handle(request, requestLocal) + tryCompleteActions(). |
| KafkaApis | core/src/main/scala/kafka/server/KafkaApis.scala:93 | Broker-side dispatcher; ~80 dispatch cases. |
| ControllerApis | core/src/main/scala/kafka/server/ControllerApis.scala:69 | KRaft-controller-side dispatcher. |
| KafkaRequestHandler | core/src/main/scala/kafka/server/KafkaRequestHandler.scala:91 | One request-handler thread (the Runnable). |
| KafkaRequestHandlerPool | core/src/main/scala/kafka/server/KafkaRequestHandler.scala:227 | Pool of handler threads; dynamically resizable. |
| RequestChannel | core/src/main/scala/kafka/network/RequestChannel.scala:80 | The queue between network and handler threads, and the response path back. |
| Request | server/src/main/java/org/apache/kafka/network/Request.java:54 | The in-flight request object: context, body, timing fields. |
| RequestHandlerHelper | core/src/main/scala/kafka/server/RequestHandlerHelper.scala:28 | Throttling + response-sending helpers shared by both dispatchers. |
| AuthHelper | core/src/main/scala/kafka/server/AuthHelper.scala:42 | Authorization helpers (single, batch, by-resource-type). |
| ApiVersionManager | server/src/main/java/org/apache/kafka/server/ApiVersionManager.java:28 | Decides which APIs/versions this node exposes. |
| DefaultApiVersionManager / SimpleApiVersionManager | server/src/main/java/org/apache/kafka/server/DefaultApiVersionManager.java:33 / SimpleApiVersionManager.java:33 | Broker / controller implementations. |
| DelayedProduce | server/src/main/java/org/apache/kafka/server/purgatory/DelayedProduce.java:41 | Parked acks=all produce. |
| DelayedFetch | core/src/main/scala/kafka/server/DelayedFetch.scala:43 | Parked long-poll fetch. |
Core concepts & terminology
- API key: a short (16-bit) value identifying the request type (PRODUCE=0, FETCH=1, …), enumerated in ApiKeys. The dispatcher's giant match keys on it.
- Request handler thread: a daemon thread named data-plane-kafka-request-handler-N that loops, pulling requests and running handlers. There are num.io.threads of them per node.
- RequestChannel: the bridge, consisting of a bounded request queue (network → handlers) plus the per-processor response queues (handlers → network).
- RequestLocal: a thread-confined scratch area (notably a buffer-supplier cache) handed to the handler; one per request-handler thread, reused across requests on that thread.
- Purgatory: a timeout-and-key-indexed holding area for operations that cannot complete immediately. See Replication, ISR & High Watermark and The Fetch Path.
- Forwarding: brokers do not mutate cluster metadata directly; admin requests are wrapped in an Envelope and sent to the active controller via the ForwardingManager.
- Throttle time: an integer field in most responses telling the client how long it was (or should consider itself) throttled by quotas. It is computed by the quota managers and folded in just before the response is built.
The request-handler thread & the RequestChannel round-trip
The handler loop
Each KafkaRequestHandler is a Runnable. Its run() loop (KafkaRequestHandler.scala:108) is deceptively small; simplified, with error handling and metrics omitted:
```scala
while (!stopped) {
  val req = requestChannel.receiveRequest(300) // blocks up to 300 ms
  // … account idle time …
  req match {
    case _: ShutdownRequest =>
      completeShutdown()
      return
    case callback: CallbackRequest =>
      threadCurrentRequest.set(callback.originalRequest)
      callback.fun().accept(requestLocal)     // rescheduled callback
    case request: Request =>
      request.requestDequeueTimeNanos(endTime)
      threadCurrentRequest.set(request)
      apis.handle(request, requestLocal)      // the dispatch
    case _: WakeupRequest => // handled inside receiveRequest
    case null => // poll timed out, loop
  }
}
```
On entry the thread stashes the RequestChannel in a ThreadLocal (threadRequestChannel, KafkaRequestHandler.scala:44); while handling, it stashes the current Request in threadCurrentRequest. These two thread-locals are the mechanism that lets an asynchronous completion callback reschedule itself back onto a request-handler thread (described below). Every branch wraps the work in try/catch: a FatalExitError triggers process exit (Exit.exit), while any other Throwable is logged and swallowed, so a single bad request never kills the thread. The finally block clears threadCurrentRequest and calls request.releaseBuffer().
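To make the control flow concrete, here is a minimal, self-contained Java model of such a loop: a poll with a timeout, a shutdown sentinel, and a catch-all that keeps the thread alive when one request fails. It is a sketch under stated assumptions, not Kafka's implementation. BaseReq, Req, Shutdown, Handler and runOnce are hypothetical names standing in for BaseRequest, Request, ShutdownRequest, KafkaRequestHandler and a small test harness; callbacks, wakeups, FatalExitError and buffer release are omitted.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Simplified model of a request-handler loop; every name here is a hypothetical stand-in.
final class HandlerLoopSketch {
    interface BaseReq {}
    record Req(int id) implements BaseReq {}
    enum Shutdown implements BaseReq { INSTANCE }

    static final class Handler implements Runnable {
        private final BlockingQueue<BaseReq> queue;
        private final Consumer<Req> apis;                 // stands in for apis.handle(...)
        final AtomicInteger handled = new AtomicInteger();
        final AtomicInteger failed = new AtomicInteger();

        Handler(BlockingQueue<BaseReq> queue, Consumer<Req> apis) {
            this.queue = queue;
            this.apis = apis;
        }

        @Override
        public void run() {
            while (true) {
                BaseReq req;
                try {
                    req = queue.poll(300, TimeUnit.MILLISECONDS); // like receiveRequest(300)
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                if (req == null) continue;                  // poll timed out: loop again
                if (req instanceof Shutdown) return;        // like completeShutdown()
                try {
                    apis.accept((Req) req);
                    handled.incrementAndGet();
                } catch (RuntimeException e) {
                    failed.incrementAndGet();               // log-and-swallow: the thread survives
                }
            }
        }
    }

    /** Runs one handler thread over the given ids; ids divisible by 3 make the handler throw. */
    static int[] runOnce(int... ids) {
        BlockingQueue<BaseReq> q = new ArrayBlockingQueue<>(ids.length + 1);
        Handler h = new Handler(q, r -> {
            if (r.id() % 3 == 0) throw new IllegalArgumentException("bad request " + r.id());
        });
        Thread t = new Thread(h, "request-handler-sketch-0");
        t.start();
        try {
            for (int id : ids) q.put(new Req(id));
            q.put(Shutdown.INSTANCE);
            t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new int[] {h.handled.get(), h.failed.get()};
    }
}
```

For example, runOnce(1, 2, 3, 4, 5, 6) handles four requests and counts two failures (ids 3 and 6), and the single handler thread still reaches the shutdown sentinel.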
[Figure: network thread → RequestChannel → request-handler thread (authorize · validate · act)]
RequestChannel internals
RequestChannel (RequestChannel.scala:80) holds three structures:
- requestQueue: an ArrayBlockingQueue[BaseRequest] of capacity queueSize (= queued.max.requests, default 500). The network threads put; handler threads poll. Because it is bounded and blocking, a slow handler pool exerts back-pressure all the way to the sockets.
- processors: a ConcurrentHashMap[Int, Processor] keyed by processor id, so a response can be routed back to the exact network thread (processor) that owns the client's connection. request.processor records that id.
- callbackQueue: a second ArrayBlockingQueue for CallbackRequests (rescheduled completions), which jump ahead of normal requests in receiveRequest (RequestChannel.scala:200) since they have "already waited in line".
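The two inbound queues can be modeled with plain ArrayBlockingQueues. This is a hypothetical sketch: the class and method names are invented, a non-blocking offer replaces the network thread's blocking put so the bound is easy to observe, and the WakeupRequest that rouses a handler already blocked on the request queue when a callback arrives is not modeled.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical, simplified model of RequestChannel's two inbound queues.
final class RequestChannelSketch {
    private final ArrayBlockingQueue<String> requestQueue;   // bounded: queued.max.requests
    private final ArrayBlockingQueue<String> callbackQueue;  // rescheduled completions

    RequestChannelSketch(int queueSize) {
        this.requestQueue = new ArrayBlockingQueue<>(queueSize);
        this.callbackQueue = new ArrayBlockingQueue<>(queueSize);
    }

    /** Non-blocking stand-in for the network thread's put(); false means "queue full". */
    boolean trySendRequest(String request) {
        return requestQueue.offer(request);
    }

    void sendCallback(String callback) {
        callbackQueue.add(callback);   // throws IllegalStateException if full
    }

    /** Callbacks have "already waited in line", so they are served before new requests. */
    String receiveRequest(long timeoutMs) {
        String callback = callbackQueue.poll();
        if (callback != null) return callback;
        try {
            return requestQueue.poll(timeoutMs, TimeUnit.MILLISECONDS); // null on timeout
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }
}
```

With a capacity of 2, a third trySendRequest fails, which is the point at which the real network thread would instead block in put().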
The response path is sendResponse(request, response) (RequestChannel.scala:131): it updates per-error metrics, builds a Send via request.buildResponseSend(response), computes the optional JSON request-log node, wraps it in a SendResponse, sets responseCompleteTimeNanos, looks up the originating Processor in processors, and calls processor.enqueueResponse. If the processor was shut down (lookup returns null), the response is silently dropped because the connection is already gone. Beyond SendResponse, the channel models NoOpResponse (e.g. acks=0 produce: nothing to send), CloseConnectionResponse (error on a no-reply request), and the throttling sentinels StartThrottlingResponse/EndThrottlingResponse that mute/unmute a channel without consuming the single real response (RequestChannel.scala:42-77).
Exactly one of SendResponse, NoOpResponse, or CloseConnectionResponse is produced per request; StartThrottlingResponse/EndThrottlingResponse may occur in addition and deliberately skip the response-timing bookkeeping (RequestChannel.scala:174-187).
The thread pool
KafkaRequestHandlerPool (KafkaRequestHandler.scala:227) creates num.io.threads handler threads via KafkaThread.daemon("data-plane-kafka-request-handler-" + id, …). The pool is built from a shared KafkaRequestHandlerPoolFactory so that the broker and controller (when co-located) share one aggregate idle-percent meter (RequestHandlerAvgIdlePercent) while each keeps its own per-pool meter (BrokerRequestHandlerAvgIdlePercent / ControllerRequestHandlerAvgIdlePercent, KafkaRequestHandler.scala:243-251). The pool is dynamically resizable: resizeThreadPool(newSize) (KafkaRequestHandler.scala:283) spins up or stops threads under a monitor, wired to the dynamic config num.io.threads through DynamicThreadPool.
| Config | Default | Effect |
|---|---|---|
| num.io.threads | 8 | Size of the request-handler pool per node (ServerConfigs.NUM_IO_THREADS_DEFAULT). Dynamically reconfigurable. |
| queued.max.requests | 500 | Capacity of the shared request queue (SocketServerConfigs.QUEUED_MAX_REQUESTS_DEFAULT); the back-pressure point. |
The dispatch contract
Both dispatchers implement ApiRequestHandler.handle. On the broker, KafkaApis.handle (KafkaApis.scala:155) does:
- Version gate. If apiVersionManager.isApiEnabled(apiKey, apiVersion) is false it throws IllegalStateException, though in practice the socket layer has already rejected such requests before enqueuing them, so this is a defensive backstop (KafkaApis.scala:166-170).
- The giant match. A request.header.apiKey match with one case per API (KafkaApis.scala:172-254). Each case calls a handleXxxRequest method. A missing key throws IllegalStateException("No handler …").
- Error funnel. The whole body is wrapped in try/catch: FatalExitError is rethrown; any other Throwable goes to the local handleError, which logs and calls requestHelper.handleError(request, e) to send an error response (or, if no response is expected, to close the connection) (KafkaApis.scala:255-257).
- finally. Calls replicaManager.tryCompleteActions(), draining the deferred action queue so any delayed operations made completable by this request fire promptly, and records apiLocalCompleteTimeNanos if still unset (KafkaApis.scala:258-267).
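The shape of that contract can be sketched in Java. This is a hypothetical model, not KafkaApis: the four-value ApiKey enum, the set-based version gate, the string log, and the actionDrains counter are assumptions made for illustration, and FatalExitError handling is omitted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical sketch of the handle() contract: version gate, dispatch, error funnel, finally.
final class DispatchSketch {
    enum ApiKey { PRODUCE, FETCH, METADATA, CREATE_TOPICS }
    record Request(ApiKey apiKey) {}

    private final Set<ApiKey> enabled;                 // stands in for apiVersionManager.isApiEnabled
    final List<String> log = new ArrayList<>();        // what each call did, for inspection
    int actionDrains = 0;                              // stands in for replicaManager.tryCompleteActions()

    DispatchSketch(Set<ApiKey> enabled) {
        this.enabled = enabled;
    }

    void handle(Request request) {
        try {
            if (!enabled.contains(request.apiKey()))
                throw new IllegalStateException("API " + request.apiKey() + " is not enabled");
            switch (request.apiKey()) {
                case PRODUCE -> log.add("handleProduceRequest");
                case FETCH -> log.add("handleFetchRequest");
                case METADATA -> log.add("handleTopicMetadataRequest");
                case CREATE_TOPICS -> log.add("forwardToController");
            }
        } catch (RuntimeException e) {
            log.add("error:" + e.getClass().getSimpleName());   // requestHelper.handleError(...)
        } finally {
            actionDrains++;                                      // runs on every path
        }
    }
}
```

The finally block runs even for the rejected request, which is why deferred actions are drained whether or not the handler succeeded.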
Two handler styles coexist:
- Imperative (Unit-returning), e.g. produce, fetch, metadata. These send their own response inside callbacks and return nothing.
- Future-returning (CompletableFuture[Unit]), e.g. all the consumer-group, share-group and streams APIs that delegate to the GroupCoordinator (which is asynchronous). The dispatch attaches .exceptionally(handleError) so a failed future is funneled to the same error path (e.g. KafkaApis.scala:177, :230).
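The future-returning style can be sketched with plain CompletableFuture. The names coordinatorHeartbeat, handleHeartbeat and dispatch, and the string-valued responses, are hypothetical. The sketch shows one detail worth knowing: a dependent stage such as thenAccept wraps an upstream failure in a CompletionException, so the shared error path unwraps it before mapping it.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch: a future-returning handler whose failures reach one error path.
final class FutureHandlerSketch {
    final AtomicReference<String> sent = new AtomicReference<>();

    /** Stands in for an asynchronous coordinator call. */
    static CompletableFuture<String> coordinatorHeartbeat(String groupId) {
        if (groupId.isEmpty())
            return CompletableFuture.failedFuture(new IllegalArgumentException("empty group id"));
        return CompletableFuture.completedFuture("OK");
    }

    /** Java analogue of a handler returning CompletableFuture[Unit]. */
    CompletableFuture<Void> handleHeartbeat(String groupId) {
        return coordinatorHeartbeat(groupId).thenAccept(result -> sent.set("response:" + result));
    }

    void handleError(Throwable t) {
        // Dependent stages wrap failures in CompletionException; unwrap to the real cause.
        Throwable cause = (t instanceof CompletionException && t.getCause() != null) ? t.getCause() : t;
        sent.set("error:" + cause.getClass().getSimpleName());
    }

    /** The dispatcher attaches exceptionally(...) so every failure is funneled to handleError. */
    void dispatch(String groupId) {
        handleHeartbeat(groupId).exceptionally(t -> {
            handleError(t);
            return null;
        });
    }
}
```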
A large family of admin APIs is not handled locally at all on the broker: the case body is simply forwardToController(request). This covers CREATE_TOPICS, DELETE_TOPICS, CREATE_PARTITIONS, CREATE_ACLS/DELETE_ACLS, ALTER_CLIENT_QUOTAS, ALTER_USER_SCRAM_CREDENTIALS, ELECT_LEADERS, ALTER_PARTITION_REASSIGNMENTS/LIST_PARTITION_REASSIGNMENTS, UPDATE_FEATURES, UNREGISTER_BROKER, the Raft-voter APIs, and DESCRIBE_QUORUM (KafkaApis.scala:188-237). (The ALTER_CONFIGS family is not bare-forwarded: handleAlterConfigsRequest/handleIncrementalAlterConfigsRequest preprocess some resources locally and forward only the remainder.) These all land in ControllerApis after forwarding.
Walkthrough 1: Produce
handleProduceRequest (KafkaApis.scala:397) is the canonical "act, then maybe park in purgatory" handler.
- Transactional authorize. If the batch carries transactional records, the principal must have WRITE on the TRANSACTIONAL_ID; otherwise the request is failed with TRANSACTIONAL_ID_AUTHORIZATION_FAILED (KafkaApis.scala:400-407).
- Resolve & bucket partitions. Each partition's topic id/name is resolved against the MetadataCache. Partitions are sorted into unauthorizedTopicResponses, nonExistingTopicResponses, invalidRequestResponses, and authorizedRequestInfo. Authorization is done in one batch call, authHelper.filterByAuthorized(ctx, WRITE, TOPIC, …), to avoid one authorizer round-trip per partition (KafkaApis.scala:415-451).
- Record validation. Each authorized batch is run through ProduceRequest.validateRecords; an ApiException there maps the partition to the corresponding error (KafkaApis.scala:444-450).
- Define the response callback. sendResponseCallback(responseStatus) merges the per-partition results, attaches current-leader hints (for v10+, on NOT_LEADER_OR_FOLLOWER), computes throttle time, and sends (KafkaApis.scala:458-530).
- Act. If anything is left to write, call replicaManager.handleProduceAppend(timeout, requiredAcks, …, responseCallback = sendResponseCallback, …), then immediately call produceRequest.clearPartitionRecords() to drop the request's reference to the record bytes so GC can reclaim them while the operation sits in purgatory (KafkaApis.scala:538-558).
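The bucketing step can be sketched as follows, assuming topic-level existence checks (the handler checks the partition against the MetadataCache) and omitting the record-validation bucket. All names are hypothetical. Note that the sketch tests authorization before existence, so a topic the principal may not write to is reported as unauthorized whether or not it exists, and the batched authorization function is invoked exactly once.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

// Hypothetical sketch: sort produce partitions into buckets using ONE batched authorization call.
final class ProduceBucketsSketch {
    record TopicPartition(String topic, int partition) {}

    static Map<String, List<TopicPartition>> bucket(
            List<TopicPartition> partitions,
            Set<String> knownTopics,                                  // MetadataCache stand-in
            Function<Set<String>, Set<String>> filterByAuthorized) {  // returns allowed topics
        Set<String> topics = new HashSet<>();
        for (TopicPartition tp : partitions) topics.add(tp.topic());
        Set<String> authorized = filterByAuthorized.apply(topics);    // called exactly once

        Map<String, List<TopicPartition>> buckets = new LinkedHashMap<>();
        for (String name : List.of("unauthorized", "nonExisting", "authorized"))
            buckets.put(name, new ArrayList<>());
        for (TopicPartition tp : partitions) {
            if (!authorized.contains(tp.topic())) buckets.get("unauthorized").add(tp);
            else if (!knownTopics.contains(tp.topic())) buckets.get("nonExisting").add(tp);
            else buckets.get("authorized").add(tp);
        }
        return buckets;
    }
}
```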
Inside ReplicaManager, appendRecords writes to the leader log, builds a per-partition ProducePartitionStatus, and calls maybeAddDelayedProduce (ReplicaManager.scala:878). If requiredAcks == -1 (acks=all) and at least one local write succeeded, it constructs a DelayedProduce and registers it with delayedProducePurgatory.tryCompleteElseWatch(delayedProduce, producerRequestKeys), keyed by each TopicPartitionOperationKey (ReplicaManager.scala:903-911). Otherwise (acks=0, acks=1, or no local write succeeded) the callback fires synchronously, without entering the purgatory.
DelayedProduce mechanics
DelayedProduce (DelayedProduce.java:41) holds a map of ProducePartitionStatus. On construction, partitions that succeeded locally have acksPending=true and their response error pre-set to REQUEST_TIMED_OUT, cleared only when enough replicas catch up (DelayedProduce.java:111-121). tryComplete() (DelayedProduce.java:140) checks each still-pending partition with a PartitionStatusValidator, a callback that delegates back into ReplicaManager#getPartitionOrError(...).checkEnoughReplicasReachOffset(requiredOffset) (wired in ReplicaManager.scala:891-901). It distinguishes three cases:
- A: replica not assigned to this broker; B: broker no longer leader; C: broker is leader → C.1 a local error, or C.2 enough replicas have reached the required offset.
If any case resolves a partition, its acksPending is cleared and its error set accordingly. When no partition is still pending, forceComplete() fires onComplete(), which assembles the final per-partition response map and invokes the captured responseCallback (= the handler's sendResponseCallback) (DelayedProduce.java:159-196). On timeout, onExpiration() just records the expiration meter; the pre-set REQUEST_TIMED_OUT errors then surface to the client.
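A minimal Java model of these mechanics follows. It assumes a single-threaded caller (the real operation relies on purgatory locking) and collapses cases A to C into one hypothetical predicate answering "have enough replicas reached the required offset?". PartitionStatus, enoughReplicas and the string error names are illustrative stand-ins, not Kafka's types.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Predicate;

// Hypothetical, single-threaded model of DelayedProduce: errors are pre-set to
// REQUEST_TIMED_OUT and cleared only once enough replicas have caught up.
final class DelayedProduceSketch {
    static final class PartitionStatus {
        final long requiredOffset;
        boolean acksPending = true;
        String error = "REQUEST_TIMED_OUT";   // what the client sees if the operation expires
        PartitionStatus(long requiredOffset) { this.requiredOffset = requiredOffset; }
    }

    private final Map<String, PartitionStatus> status = new LinkedHashMap<>();
    private final Predicate<PartitionStatus> enoughReplicas;   // checkEnoughReplicasReachOffset stand-in
    private final Consumer<Map<String, String>> responseCallback;
    private final AtomicBoolean completed = new AtomicBoolean(false);

    DelayedProduceSketch(Map<String, Long> requiredOffsets,
                         Predicate<PartitionStatus> enoughReplicas,
                         Consumer<Map<String, String>> responseCallback) {
        requiredOffsets.forEach((tp, offset) -> status.put(tp, new PartitionStatus(offset)));
        this.enoughReplicas = enoughReplicas;
        this.responseCallback = responseCallback;
    }

    /** Re-checked on each trigger, e.g. after a follower fetch advances the high watermark. */
    boolean tryComplete() {
        for (PartitionStatus s : status.values()) {
            if (s.acksPending && enoughReplicas.test(s)) {
                s.acksPending = false;
                s.error = "NONE";
            }
        }
        boolean anyPending = status.values().stream().anyMatch(s -> s.acksPending);
        return !anyPending && forceComplete();
    }

    /** Called at the deadline; the pre-set errors are returned unchanged. */
    void onExpiration() {
        forceComplete();
    }

    /** The first caller wins, so the response callback runs at most once. */
    private boolean forceComplete() {
        if (!completed.compareAndSet(false, true)) return false;
        Map<String, String> response = new LinkedHashMap<>();
        status.forEach((tp, s) -> response.put(tp, s.error));
        responseCallback.accept(response);
        return true;
    }
}
```

The compare-and-set guard mirrors the idea that completion and expiration race, and only one of them may send the response.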
Walkthrough 2: Fetch
handleFetchRequest (KafkaApis.scala:564) mirrors produce but for reads, and additionally manages incremental fetch sessions.
- Build a fetch context. fetchManager.newContext(version, metadata, isFromFollower, fetchData, forgottenTopics, topicNames) resolves the incremental-fetch session (full vs. incremental) and yields a FetchContext (KafkaApis.scala:577-583). See The Fetch Path & Replica Fetchers.
- Authorize by caller type. A follower fetch (replicaId ≥ 0) requires CLUSTER_ACTION on CLUSTER; a consumer fetch requires READ on each topic (batched). Unauthorized / unknown partitions go into erroneous; the rest into interesting (KafkaApis.scala:587-621).
- Define the response callback. processResponseCallback(responsePartitionData) builds each FetchResponseData.PartitionData (high watermark, last-stable-offset, log-start, aborted txns, records, preferred read replica, diverging epoch, and current-leader hints for v16+), then either records leader-quota usage (follower path, exempt from client throttle) or applies fetch+request quotas and possibly returns an empty throttled response (consumer path) (KafkaApis.scala:636-731).
- Act. If interesting is non-empty, cap fetchMaxBytes against fetch.max.bytes and the quota window, build FetchParams, and call replicaManager.fetchMessages(params, interesting, quota, responseCallback) (KafkaApis.scala:733-776).
When the leader log has fewer than minBytes immediately available, ReplicaManager creates a DelayedFetch and watches it. DelayedFetch.tryComplete (DelayedFetch.scala:70) re-evaluates eight cases on each trigger: leadership loss (A/B), unknown partition (C), offline dir (D), fenced epoch (E), fetch offset on an older/truncated segment (F), accumulated bytes ≥ minBytes (G), or a newly-discovered diverging epoch requiring truncation (H). On completion it re-reads from the log (readFromLog(..., readFromPurgatory = true)) and invokes the callback (DelayedFetch.scala:158-178).
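As a hedged illustration, the byte-accumulation test (case G) plus the early exit on leadership loss (cases A/B) reduce to a loop like the one below. PartitionState and shouldComplete are hypothetical names, and the remaining cases (C to F and H), which also complete the fetch early, are omitted.

```java
import java.util.List;

// Hypothetical, heavily simplified DelayedFetch.tryComplete(): only cases A/B and G.
final class DelayedFetchSketch {
    record PartitionState(boolean stillLeader, long bytesAvailableSinceFetchOffset) {}

    /**
     * True when the parked fetch should complete now: some partition lost leadership
     * (the client must re-route), or the accumulated bytes reached minBytes.
     */
    static boolean shouldComplete(List<PartitionState> partitions, int minBytes) {
        long accumulated = 0;
        for (PartitionState p : partitions) {
            if (!p.stillLeader()) return true;               // complete early
            accumulated += p.bytesAvailableSinceFetchOffset();
        }
        return accumulated >= minBytes;
    }
}
```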
Follower fetches are exempt from client throttling (sendResponseExemptThrottle) but are recorded against the leader replication quota; consumer fetches are subject to both the per-client fetch-bandwidth quota and the request quota (KafkaApis.scala:686-730).
Walkthrough 3: Metadata (read from the cache)
handleTopicMetadataRequest (KafkaApis.scala:885) reads only from the cache and never touches a log or coordinator. It authorizes DESCRIBE on the requested topics, then delegates to the private getTopicMetadata helper (KafkaApis.scala:838-883, invoked at :962), which resolves them through metadataCache.getTopicMetadata(topics, listenerName, …) (KafkaApis.scala:847). For topics the cache does not know, the helper either (a) asks the AutoTopicCreationManager to create them when auto-creation is allowed (itself a forward-to-controller path gated by the controller-mutation quota), or (b) returns UNKNOWN_TOPIC_OR_PARTITION / INVALID_TOPIC_EXCEPTION (KafkaApis.scala:856-882). The MetadataCache is the broker's read-optimized projection of the KRaft metadata log; see Metadata Propagation & Broker Lifecycle.
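The branching for topics missing from the cache can be sketched as follows. This is a simplification under stated assumptions: AUTO_CREATE is a hypothetical marker for "handed to the auto-creation path" (the real per-topic result is produced by AutoTopicCreationManager and is not modeled), and isValidName stands in for topic-name validation, applied here only when auto-creation is off.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;

// Hypothetical sketch of how a metadata request treats topics missing from the cache.
final class MetadataUnknownTopicSketch {
    /** Hypothetical marker meaning "handed to the auto-creation path"; not a Kafka error code. */
    static final String AUTO_CREATE = "<auto-create requested>";

    static Map<String, String> resolve(List<String> requested,
                                       Set<String> cachedTopics,        // MetadataCache stand-in
                                       boolean allowAutoCreation,
                                       Predicate<String> isValidName) { // name-validation stand-in
        Map<String, String> result = new LinkedHashMap<>();
        for (String topic : requested) {
            if (cachedTopics.contains(topic)) {
                result.put(topic, "NONE");
            } else if (allowAutoCreation) {
                result.put(topic, AUTO_CREATE);        // would be forwarded toward the controller
            } else {
                result.put(topic, isValidName.test(topic)
                        ? "UNKNOWN_TOPIC_OR_PARTITION"
                        : "INVALID_TOPIC_EXCEPTION");
            }
        }
        return result;
    }
}
```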
Walkthrough 4: Coordinator-delegating APIs
Group, transaction and share-group APIs are thin: authorize, then hand off to a coordinator and let its (asynchronous) result drive the response.
- InitProducerId (KafkaApis.scala:1621): authorizes WRITE on the transactional id (or IDEMPOTENT_WRITE/WRITE-by-resource-type for idempotent-only producers, plus TWO_PHASE_COMMIT when enable2Pc), validates the producer-id/epoch pair, then calls txnCoordinator.handleInitProducerId(... sendResponseCallback, requestLocal). The callback down-converts PRODUCER_FENCED to INVALID_PRODUCER_EPOCH for v<4 clients (KafkaApis.scala:1640-1681).
- AddPartitionsToTxn (KafkaApis.scala:1874): v4+ requests come only from other brokers and require CLUSTER_ACTION; v<4 requests come from clients and require WRITE on the transactional id and topics. Each transaction is dispatched to txnCoordinator.handleAddPartitionsToTransaction (or handleVerifyPartitionsInTransaction for verify-only), and a result accumulator guarded by responses.synchronized sends the batched response only once all transactions have reported back (KafkaApis.scala:1904-1980).
- FindCoordinator (KafkaApis.scala:1192): resolves the coordinator node for a group/transaction/share key by hashing the key onto a partition of the corresponding internal topic and reading that partition's leader from the MetadataCache.
- Offset & group APIs (OFFSET_COMMIT, OFFSET_FETCH, JOIN_GROUP, SYNC_GROUP, HEARTBEAT, CONSUMER_GROUP_HEARTBEAT, …): their handlers return CompletableFuture[Unit] and delegate to the new, asynchronous GroupCoordinator interface; see Group Coordination & Rebalance Protocols.
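The AddPartitionsToTxn accumulation pattern (per-transaction callbacks that may fire on different threads, but exactly one batched response) can be sketched in Java. BatchedResponseSketch and its methods are hypothetical. This sketch takes a snapshot under the lock and invokes the send callback after releasing it, a choice of ours rather than necessarily Kafka's, and a sent flag ensures a duplicate report cannot trigger a second response.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical sketch: per-transaction results may arrive on different threads, but the
// batched response must be sent exactly once, after the last result is in.
final class BatchedResponseSketch {
    private final Map<String, String> responses = new HashMap<>();
    private final int expected;
    private final Consumer<Map<String, String>> sendResponse;
    private boolean sent = false;   // guarded by responses

    BatchedResponseSketch(int expected, Consumer<Map<String, String>> sendResponse) {
        this.expected = expected;
        this.sendResponse = sendResponse;
    }

    /** Per-transaction callback; safe to call concurrently. */
    void onTransactionResult(String transactionalId, String error) {
        Map<String, String> snapshot = null;
        synchronized (responses) {
            responses.put(transactionalId, error);
            if (!sent && responses.size() == expected) {
                sent = true;
                snapshot = new HashMap<>(responses);
            }
        }
        if (snapshot != null) sendResponse.accept(snapshot);  // send outside the lock
    }
}
```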
Transaction-specific handling is covered in Transactions & Exactly-Once Semantics; share-group APIs (the SHARE_* and *_SHARE_GROUP_STATE families dispatched at KafkaApis.scala:238-249) in Share Groups.
ApiVersions & the ApiVersionManager
ApiVersionManager (ApiVersionManager.java:28) is the authority on which API/version combinations this node accepts. Its default isApiEnabled(apiKey, version) is a conjunction of three predicates (ApiVersionManager.java:62-64):
```java
apiKey != null
    && apiKey.inScope(listenerType())                                 // listener-type scope
    && apiKey.isVersionEnabled(version, enableUnstableLastVersion())  // version range
```
inScope (ApiKeys.java:312) tests whether the API is exposed on this node's listener type (BROKER vs. CONTROLLER); isVersionEnabled (ApiKeys.java:245) tests the version range, with ApiVersions itself accepted at any version so an old client can always negotiate down to v0. The enableUnstableLastVersion flag, backing the config unstable.api.versions.enable, decides whether the newest, not-yet-stable version of each API is advertised.
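A self-contained Java model of the three-predicate gate follows. The Api enum, its scopes, and its version ranges are invented for illustration and do not match Kafka's real ApiKeys values; only the structure of the conjunction, the listener-type scoping, the unstable-version switch, and the always-enabled ApiVersions rule follow the text.

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical model of the three-predicate API gate. Scopes and version ranges are invented.
final class ApiGateSketch {
    enum ListenerType { BROKER, CONTROLLER }

    enum Api {
        // (scope, oldest version, latest stable version, latest version including unstable)
        PRODUCE(EnumSet.of(ListenerType.BROKER), 3, 12, 13),
        VOTE(EnumSet.of(ListenerType.CONTROLLER), 0, 2, 2),
        API_VERSIONS(EnumSet.allOf(ListenerType.class), 0, 4, 4);

        private final Set<ListenerType> scope;
        private final int oldest;
        private final int latestStable;
        private final int latestUnstable;

        Api(Set<ListenerType> scope, int oldest, int latestStable, int latestUnstable) {
            this.scope = scope;
            this.oldest = oldest;
            this.latestStable = latestStable;
            this.latestUnstable = latestUnstable;
        }

        boolean inScope(ListenerType listener) {
            return scope.contains(listener);
        }

        boolean isVersionEnabled(int version, boolean enableUnstableLastVersion) {
            if (this == API_VERSIONS) return true;   // accepted at any version, per the text
            int latest = enableUnstableLastVersion ? latestUnstable : latestStable;
            return version >= oldest && version <= latest;
        }
    }

    static boolean isApiEnabled(Api api, int version, ListenerType listener, boolean enableUnstable) {
        return api != null
                && api.inScope(listener)
                && api.isVersionEnabled(version, enableUnstable);
    }
}
```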
Two implementations exist:
| | DefaultApiVersionManager (broker) | SimpleApiVersionManager (controller) |
|---|---|---|
| Listener type | BROKER | CONTROLLER |
| Features source | metadataCache.features() | controller-supplied featuresProvider |
| Advertised versions | Intersection of broker APIs with the active controller's NodeApiVersions (so a client only sees versions the controller can actually service for forwarded APIs) | A fixed collection computed once from the listener type |
| Forwarding-aware? | Yes, takes () => forwardingManager.controllerApiVersions | No |
| Built at | BrokerServer.scala:267 | ControllerServer.scala:162 |
handleApiVersionsRequest (KafkaApis.scala:1524) is special-cased: it returns the full supported-API list regardless of authentication state, because clients must negotiate versions before they can SASL-authenticate. The handler builds the response via apiVersionManager.apiVersionResponse(throttleMs, alterFeatureLevel0), where alterFeatureLevel0 = (apiVersion < 4). It also validates the request: UNSUPPORTED_VERSION for an unknown request version, INVALID_REQUEST if malformed, and, for v5+ that carry a cluster id / node id (KIP-1242), REBOOTSTRAP_REQUIRED if they don't match this broker (KafkaApis.scala:1531-1551). The response also carries the cluster's finalized and supported features, which the broker uses for feature-gating (DefaultApiVersionManager.java:78-101).
Folding the controller's API versions into the broker's ApiVersionsResponse means a client never attempts a forwardable admin request at a version the controller cannot handle: the broker advertises only the intersection. Forwarding of administrative requests through brokers was introduced by KIP-590; in the post-ZooKeeper, KRaft-only world the broker is a pure proxy for these mutations.
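The intersection rule can be sketched as a small function over inclusive version ranges. The map-based inputs, the forwardable set, and the concrete ranges in the usage below are hypothetical; the sketch also assumes, as a simplification, that a forwardable API the controller does not support at all (or supports with no overlapping version) is left out of the advertisement.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

// Hypothetical sketch: advertise forwardable APIs only at versions both broker and controller support.
final class VersionIntersectionSketch {
    record Range(int min, int max) {}   // inclusive version range

    static Optional<Range> intersect(Range a, Range b) {
        int min = Math.max(a.min(), b.min());
        int max = Math.min(a.max(), b.max());
        return min <= max ? Optional.of(new Range(min, max)) : Optional.empty();
    }

    static Map<String, Range> advertised(Map<String, Range> brokerApis,
                                         Set<String> forwardable,
                                         Map<String, Range> controllerApis) {
        Map<String, Range> out = new LinkedHashMap<>();
        brokerApis.forEach((api, brokerRange) -> {
            if (!forwardable.contains(api)) {
                out.put(api, brokerRange);                         // served locally: advertise as-is
            } else {
                Range controllerRange = controllerApis.get(api);
                if (controllerRange != null)                       // otherwise omitted (assumption)
                    intersect(brokerRange, controllerRange).ifPresent(r -> out.put(api, r));
            }
        });
        return out;
    }
}
```

For instance, a broker range of 2..7 for a forwardable API and a controller range of 0..5 yield an advertised range of 2..5.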
Broker APIs vs. controller APIs (the split)
The two dispatchers are deliberately disjoint in scope. The split is enforced structurally by ApiKeys.inScope(listenerType): a broker listener never even surfaces, say, VOTE or BROKER_HEARTBEAT to KafkaApis, and a controller listener never surfaces PRODUCE.
| | KafkaApis (broker) | ControllerApis (KRaft controller) |
|---|---|---|
| Handles | Data plane (produce/fetch/list-offsets), group/txn/share coordination, metadata reads, client telemetry, SASL, plus forwarding of admin writes. | Raft RPCs (VOTE, BEGIN/END_QUORUM_EPOCH, FETCH/FETCH_SNAPSHOT for the metadata log), broker registration/heartbeat, ALTER_PARTITION, and the write side of admin operations (create/delete topics, ACLs, configs, quotas, reassignments, feature updates). |
| Return style | Mixed: Unit + CompletableFuture[Unit] | Uniformly CompletableFuture[Unit] per case (ControllerApis.scala:96-137) |
| Backing engine | ReplicaManager, coordinators, MetadataCache | The Controller (QuorumController) and RaftManager |
| ApiVersionManager | DefaultApiVersionManager | SimpleApiVersionManager |
On the controller, Raft RPCs are answered by handleRaftRequest (ControllerApis.scala:685), which forwards the request straight to raftManager.handleRequest(...) and sends the result exempt from throttling. Admin writes (e.g. handleDeleteTopics, ControllerApis.scala:198) wrap the call to controller.* in a ControllerRequestContext carrying a deadline and a controller-mutation quota recorder, and respond via sendResponseMaybeThrottleWithControllerQuota. See KRaft Consensus and The KRaft Controller.
Envelopes & forwarding
When a broker forwards a request, it wraps the original (with its original principal) in an EnvelopeRequest and sends it to the controller via ForwardingManager.forwardRequest (ForwardingManager.java:46). The controller's handleEnvelopeRequest (ControllerApis.scala:164) requires CLUSTER_ACTION (only brokers may envelope), unwraps it via EnvelopeUtils.handleEnvelopeRequest, and re-dispatches the inner request through its own handle. The inner response is re-wrapped; crucially, if the inner response indicates NOT_CONTROLLER, Request.buildResponseSend rewrites the envelope to carry NOT_CONTROLLER so the broker knows to re-resolve the active controller and retry (Request.java:236-256). A forwarded request is marked by request.envelope().isPresent(); request.isForwarded gates throttling so forwarded requests are not double-throttled (RequestHandlerHelper.scala:87, 97, 105).
Concurrency & threading model
The model is "synchronous handler, asynchronous completion":
- Who runs handlers: the data-plane-kafka-request-handler-N threads. One handler runs start-to-finish on one such thread for its synchronous portion, so state internal to a single handle call is single-threaded and lock-free.
- RequestLocal: created per thread (RequestLocal.withThreadConfinedCaching, KafkaRequestHandler.scala:105) and reused; it holds a buffer-supplier cache that must only be touched by its owning thread.
- Cross-thread completion: a DelayedProduce/DelayedFetch may be completed by a different thread, such as a replica-fetcher thread advancing the high watermark, the purgatory's expiration-reaper thread, or another request-handler thread processing a follower fetch. Its onComplete invokes the response callback on that thread.
- Re-scheduling onto a handler thread: when an asynchronous callback (e.g. a coordinator append callback) must run on a request-handler thread with a handler thread's RequestLocal, KafkaRequestHandler.wrapAsyncCallback (KafkaRequestHandler.scala:65) checks whether it is still executing inside the originating request on its handler thread. If so, it runs inline; if not, it enqueues a CallbackRequest on the channel's callbackQueue and pushes a WakeupRequest to nudge a waiting handler. The callback then runs on whichever handler thread picks it up, with that thread's RequestLocal (KafkaRequestHandler.scala:131-160).
- Shared mutable state in handlers: deliberately minimal. The AddPartitionsToTxn handler is the notable exception: it guards its result accumulator with responses.synchronized because per-transaction callbacks may fire concurrently (KafkaApis.scala:1905).
- Memory visibility on Request: the timing fields are volatile precisely because they are written by request/purgatory threads and read by network threads (Request.java:70-82).
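The core decision in wrapAsyncCallback (run inline if still inside the originating request on its handler thread, otherwise enqueue for a handler thread) can be modeled with a ThreadLocal and a queue. This Java sketch is hypothetical: currentRequest and callbackQueue stand in for threadCurrentRequest and the RequestChannel callback queue, and RequestLocal, WakeupRequest, and the check that wrapping happens on a handler thread at all are omitted.

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Consumer;

// Hypothetical sketch of wrapAsyncCallback: run inline when still inside the originating
// request on its handler thread, otherwise enqueue a callback for any handler thread.
final class AsyncCallbackSketch {
    static final ThreadLocal<Object> currentRequest = new ThreadLocal<>();
    static final ConcurrentLinkedQueue<Runnable> callbackQueue = new ConcurrentLinkedQueue<>();

    static <T> Consumer<T> wrap(Consumer<T> callback) {
        Object captured = currentRequest.get();   // captured while on the request-handler thread
        return value -> {
            if (captured != null && currentRequest.get() == captured) {
                callback.accept(value);           // same thread, same request: run directly
            } else {
                callbackQueue.add(() -> callback.accept(value));  // reschedule onto a handler thread
            }
        };
    }
}
```

A handler thread draining callbackQueue would then run the queued Runnable; in Kafka that run also receives the draining thread's RequestLocal.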
A handler must never perform a long blocking call (waiting for replication, a coordinator commit) on the request-handler thread: with only num.io.threads (default 8) threads, a few stuck handlers stall the whole node. The purgatory and callback-rescheduling machinery exists precisely to keep handler threads non-blocking.
Authorization, request context & principal
Every Request carries a RequestContext (Request.java:59) with the authenticated KafkaPrincipal, the client address, listener name, security protocol, and the parsed header. A Session (principal + address) is derived for quota keying (Request.java:100).
Handlers authorize through AuthHelper (AuthHelper.scala:42), which wraps an optional Plugin[Authorizer]; when no authorizer is configured, everything is allowed (the authorizer.forall / "None ⇒ all" branches). Three shapes are used:
- authorize(ctx, op, resourceType, name): a single allow/deny (AuthHelper.scala:43).
- filterByAuthorized(ctx, op, type, resources)(nameOf): one batched authorizer call for many resources, returning the allowed names; this is why produce/fetch authorize all partitions in a single round-trip (AuthHelper.scala:100). It deduplicates and counts references per resource name so the audit log sees the true access count.
- authorizeByResourceType / authorizeClusterOperation: cluster-wide and idempotent-write checks (AuthHelper.scala:57, 78).
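The deduplicate-and-count step of a batched filter can be sketched generically. The signature is hypothetical: here authorizeBatch receives a map from each distinct resource name to its reference count and is invoked once per call, standing in for the single batched authorizer round-trip described above.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

// Hypothetical sketch of a batched authorization filter: deduplicate, count, ask once.
final class FilterByAuthorizedSketch {
    static <R> Set<String> filterByAuthorized(
            List<R> resources,
            Function<R, String> nameOf,                                // e.g. partition -> topic name
            Function<Map<String, Integer>, Set<String>> authorizeBatch) {
        Map<String, Integer> refCounts = new LinkedHashMap<>();
        for (R r : resources) refCounts.merge(nameOf.apply(r), 1, Integer::sum);
        return authorizeBatch.apply(refCounts);                      // one call for all names
    }
}
```

For four partitions spread over two topics, the authorizer sees two names (with counts 3 and 1) instead of four separate checks.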
Denials never throw at the partition granularity; they translate to a per-resource error code (TOPIC_AUTHORIZATION_FAILED, GROUP_AUTHORIZATION_FAILED, …) folded into the response, so a partially-authorized request still succeeds for the parts it may touch. Cluster-level denials throw ClusterAuthorizationException, which the error funnel converts to CLUSTER_AUTHORIZATION_FAILED. Full details in Security: Authentication & Authorization.
Throttling integration
Throttling is computed inside the handler (or in RequestHandlerHelper) right before the response is built, and is expressed two ways at once: (1) the integer throttleTimeMs in the response body that tells the client to back off, and (2) actual server-side channel muting for that duration. The quota managers live in QuotaManagers (quotas.produce, quotas.fetch, quotas.request, quotas.controllerMutation, quotas.leader).
The common helpers (RequestHandlerHelper.scala):
| Helper | Behaviour |
|---|---|
| sendResponseMaybeThrottle | Record request-quota usage, throttle (mute) if violated and not forwarded, then send (:93). |
| sendErrorResponseMaybeThrottle | As above for error responses; also throttles cluster-auth failures even when forwarded (:102). |
| sendResponseExemptThrottle | Record as exempt and send without muting; used for follower fetch and Raft RPCs (:143). |
| sendForwardedResponse | Take max(controllerThrottle, localRequestThrottle), so the client sees whichever of the controller's and the broker's throttle is larger (:67). |
| sendResponseMaybeThrottleWithControllerQuota | Compare controller-mutation throttle vs. request throttle; mute on whichever is larger (:120). |
| throttle | Mute the channel by sending StartThrottlingResponse/EndThrottlingResponse around the delay (:34). |
The produce and fetch handlers compute throttle inline because they juggle two quotas (bandwidth + request) and pick the larger; maxThrottleTimeMs = max(bandwidthThrottleTimeMs, requestThrottleTimeMs) is recorded onto request.apiThrottleTimeMs and used as the muting duration (KafkaApis.scala:491-505 for produce, :700-717 for fetch). Note that for acks=0 the request quota is not enforced (KafkaApis.scala:494-496). Quotas, throttling, and client metrics are detailed in Quotas, Throttling & Client Metrics.
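The arithmetic is small enough to state directly. This Java sketch mirrors the rules in the text: for produce, the larger of the bandwidth and request throttles, with the request quota ignored for acks=0; for forwarded requests, the larger of the controller's and the local throttle. The method names are hypothetical.

```java
// Hypothetical sketch of how throttle durations are combined.
final class ThrottleSketch {
    /**
     * @param bandwidthThrottleMs throttle from the produce byte-rate quota
     * @param requestThrottleMs   throttle from the request (handler-time) quota
     * @param acks                producer acks; with acks=0 the request quota is not enforced
     */
    static int produceThrottleMs(int bandwidthThrottleMs, int requestThrottleMs, short acks) {
        int effectiveRequestThrottle = (acks == 0) ? 0 : requestThrottleMs;
        return Math.max(bandwidthThrottleMs, effectiveRequestThrottle);
    }

    /** Forwarded requests: the client sees the larger of the two throttles. */
    static int forwardedThrottleMs(int controllerThrottleMs, int localRequestThrottleMs) {
        return Math.max(controllerThrottleMs, localRequestThrottleMs);
    }
}
```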
Error mapping
Kafka has a fixed catalog of Errors codes; handlers map exceptions and conditions to them. There are two layers:
- Per-resource: a handler explicitly sets a partition/topic/group error code (e.g. UNKNOWN_TOPIC_OR_PARTITION, NOT_LEADER_OR_FOLLOWER). Some are version-aware: a KafkaStorageException is down-converted to NOT_LEADER_OR_FOLLOWER for fetch ≤ v5 (KafkaApis.scala:623-633); PRODUCER_FENCED becomes INVALID_PRODUCER_EPOCH for old InitProducerId/AddPartitionsToTxn clients (KafkaApis.scala:1643-1646, :1955-1958).
- Whole-request: an uncaught exception hits the funnel. RequestHandlerHelper.handleError (RequestHandlerHelper.scala:46) decides between sendErrorResponseMaybeThrottle and sendErrorResponseExemptThrottle (cluster-action requests are exempt unless they failed with ClusterAuthorizationException). Building the error response calls request.body(...).getErrorResponse(throttleMs, error); if that returns null (the API expects no response, e.g. acks=0 produce), the connection is closed via closeConnection instead (RequestHandlerHelper.scala:54-65).
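Both layers can be sketched as pure functions. Error names are plain strings here rather than Kafka's Errors enum, Optional stands in for the nullable getErrorResponse result, and all method names are hypothetical; the version thresholds are the ones given in the text.

```java
import java.util.Optional;

// Hypothetical sketch of the two error-mapping layers, using plain strings for error names.
final class ErrorMappingSketch {
    /** Per-resource, version-aware: fetch clients at v5 or below get NOT_LEADER_OR_FOLLOWER. */
    static String fetchPartitionError(String error, int fetchVersion) {
        if (error.equals("KAFKA_STORAGE_ERROR") && fetchVersion <= 5) return "NOT_LEADER_OR_FOLLOWER";
        return error;
    }

    /** Per-resource, version-aware: InitProducerId clients below v4 get INVALID_PRODUCER_EPOCH. */
    static String initProducerIdError(String error, int version) {
        if (error.equals("PRODUCER_FENCED") && version < 4) return "INVALID_PRODUCER_EPOCH";
        return error;
    }

    /** Whole-request funnel: no error response means no response was expected, so close. */
    static String handleError(Optional<String> errorResponse) {
        return errorResponse.map(body -> "send:" + body).orElse("closeConnection");
    }
}
```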
Failure modes, edge cases & recovery
- Handler exception. Caught by the dispatcher and translated to an error response (or a connection close); the handler thread survives.
- FatalExitError. Propagated out of the loop; the thread shuts down and the JVM exits. Reserved for truly unrecoverable states.
- Queue full. requestQueue.put blocks the network thread, throttling intake at the socket: intentional back-pressure rather than dropped requests.
- Processor gone. If the owning network thread has shut down, sendResponse finds null in processors and drops the response, since the connection is already closed (RequestChannel.scala:189-194).
- acks=0 with errors. No response is expected, so on any error the broker closes the connection to force the client to refresh metadata (KafkaApis.scala:508-521).
- Forwarding to a stale controller. If the controller replies with an unsupported-version error during forwarding, the broker closes the client connection, because the active controller may have changed since the connection was established (KafkaApis.scala:145-150); a NOT_CONTROLLER envelope reply makes the broker re-resolve the controller.
- Delayed-op timeout. A DelayedProduce/DelayedFetch that never becomes completable fires onExpiration at its deadline, returning whatever is available (with REQUEST_TIMED_OUT for produce partitions that never got enough acks).
- Buffer lifecycle. Most parsed requests release their network buffer immediately; produce (and any schema with BYTES/NULLABLE_BYTES fields, flagged requiresDelayedAllocation) keeps it until processing finishes, after which the loop's finally block calls releaseBuffer() (Request.java:113-115, ApiKeys.java:194, KafkaRequestHandler.scala:175).
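The handler-loop behavior behind several of these items (exception to error response, FatalExitError escaping, bounded-queue back-pressure, and the buffer release in finally) can be sketched as one handler pass. Assumptions: SketchRequest, SketchFatalExitError and HandlerLoopSketch are hypothetical stand-ins, and trySubmit uses offer() so that a full queue is observable without blocking, whereas the network thread described above blocks in put().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.function.Consumer;

// Hypothetical stand-in for Kafka's FatalExitError.
class SketchFatalExitError extends Error {
    SketchFatalExitError(String message) { super(message); }
}

// Hypothetical request that tracks whether its network buffer was released.
class SketchRequest {
    final String name;
    boolean bufferReleased = false;
    SketchRequest(String name) { this.name = name; }
    void releaseBuffer() { bufferReleased = true; }
}

class HandlerLoopSketch {
    // Bounded FIFO shared by network threads (producers) and handler threads (consumers).
    private final BlockingQueue<SketchRequest> queue;
    final List<String> responses = new ArrayList<>();

    HandlerLoopSketch(int capacity) { queue = new ArrayBlockingQueue<>(capacity); }

    // Non-blocking enqueue; false when full (the real network thread would block instead).
    boolean trySubmit(SketchRequest request) { return queue.offer(request); }

    // One synchronous handler pass; returns false if there was nothing to do.
    boolean processOne(Consumer<SketchRequest> handler) {
        SketchRequest request = queue.poll();
        if (request == null) return false;
        try {
            handler.accept(request);
            responses.add(request.name + ":OK");
        } catch (SketchFatalExitError fatal) {
            throw fatal;                              // unrecoverable: escape and stop the thread
        } catch (Throwable t) {
            responses.add(request.name + ":ERROR");   // mapped to an error response; thread survives
        } finally {
            request.releaseBuffer();                  // runs on every path, including the fatal one
        }
        return true;
    }
}
```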
Invariants & guarantees
A request is processed by at most one handler thread's synchronous pass; any continuation that needs that thread's RequestLocal is rescheduled back onto a handler thread rather than run on a foreign (purgatory/fetcher) thread.
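The rescheduling rule can be illustrated with a thread-local standing in for RequestLocal. In this sketch all names are hypothetical, and the single LinkedBlockingQueue shared by requests and continuations is an assumption rather than Kafka's actual queue layout: a foreign thread that finishes an operation never runs the continuation itself; it enqueues it, and a handler thread runs it with its own thread-local state in place.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class RescheduleSketch {
    // Work items for handler threads: new requests or rescheduled continuations.
    final BlockingQueue<Runnable> handlerQueue = new LinkedBlockingQueue<>();

    // Hypothetical stand-in for per-handler-thread state such as RequestLocal.
    static final ThreadLocal<String> REQUEST_LOCAL = new ThreadLocal<>();

    // Called from a foreign (e.g. purgatory) thread: hand the continuation back
    // to a handler thread instead of running it here.
    void completeFromForeignThread(Runnable continuation) {
        handlerQueue.add(continuation);
    }

    // Handler thread: install thread-local state, then run exactly n work items.
    Thread startHandler(String name, int n) {
        Thread t = new Thread(() -> {
            REQUEST_LOCAL.set("state-of-" + name);
            try {
                for (int i = 0; i < n; i++) handlerQueue.take().run();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, name);
        t.start();
        return t;
    }
}
```

If the continuation ran directly on the foreign thread, REQUEST_LOCAL.get() would return null there; routing it through the queue is what keeps the handler-owned state valid.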
Per-partition ordering for a producer is preserved because each connection's requests are read in order by one network thread, the request queue is FIFO, and (with idempotence) the log append enforces sequence ordering. The dispatcher itself adds no reordering.
Exactly one terminal response (or connection-close) is emitted per request; throttle muting is layered on top without consuming that response.
Interactions with other subsystems
- Network Layer & Threading: produces the parsed Request, owns the Processor that sends the response, and authenticates the principal in RequestContext.
- Replication, ISR & HW: ReplicaManager.appendRecords and the produce purgatory.
- Fetch Path: ReplicaManager.fetchMessages, fetch sessions, and the fetch purgatory.
- Group Coordination / Transactions / Share Groups: the coordinators these handlers delegate to.
- KRaft Controller & Metadata Propagation: the forwarding target and the source of the MetadataCache.
- Security & Quotas: the authorization and throttling hooks invoked by every handler.
- Wire Protocol & RPC Framework: defines ApiKeys, request/response schemas, and the version negotiation surfaced by ApiVersionsResponse.
Design rationale & evolution
The single shared queue plus a small, fixed thread pool keeps the broker's concurrency model bounded and easy to reason about, while the purgatory pattern decouples a request's arrival from its completion, so long-poll fetches and quorum-acked produces never tie up a scarce I/O thread. The cost is that any work that must wait for an external condition has to be expressible as a delayed operation with a tryComplete predicate.
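The delayed-operation contract can be sketched as follows. It loosely follows the tryComplete / onComplete / onExpiration shape described for the purgatory, but SketchDelayedOperation and SketchDelayedAcks are hypothetical. The compare-and-set guard is how this sketch guarantees a single terminal response when a completion check and the deadline timer race.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch of the purgatory's delayed-operation contract; not Kafka's class.
abstract class SketchDelayedOperation {
    private final AtomicBoolean completed = new AtomicBoolean(false);

    // Checks the completion condition; implementations call forceComplete() when it holds.
    abstract boolean tryComplete();

    // Builds and sends the response; runs at most once.
    abstract void onComplete();

    // Extra bookkeeping when the deadline fired first.
    abstract void onExpiration();

    // Only the first caller wins, so exactly one response is produced.
    final boolean forceComplete() {
        if (completed.compareAndSet(false, true)) {
            onComplete();
            return true;
        }
        return false;
    }

    // Called by the timer at the deadline; a no-op if the operation already completed.
    final void expire() {
        if (forceComplete()) onExpiration();
    }

    final boolean isCompleted() { return completed.get(); }
}

// Hypothetical produce-like operation that waits for enough replica acks.
class SketchDelayedAcks extends SketchDelayedOperation {
    private final int requiredAcks;
    int acks = 0;
    String response = null;
    boolean expired = false;

    SketchDelayedAcks(int requiredAcks) { this.requiredAcks = requiredAcks; }

    @Override
    boolean tryComplete() {
        return acks >= requiredAcks && forceComplete();
    }

    @Override
    void onComplete() {
        // Still short of the required acks at completion time means the deadline fired.
        response = acks >= requiredAcks ? "NONE" : "REQUEST_TIMED_OUT";
    }

    @Override
    void onExpiration() {
        expired = true;
    }
}
```

Usage: a handler creates the operation, calls tryComplete() once synchronously, and only if that fails parks it; later events call tryComplete() again, and the timer calls expire() at the deadline.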
Major evolutionary points visible in the code: request forwarding through brokers (KIP-590) which makes KafkaApis a proxy for admin mutations; incremental fetch sessions (KIP-227) behind FetchManager.newContext; the new asynchronous GroupCoordinator consumer-group protocol (KIP-848) that turned many group handlers into CompletableFuture-returning methods; share groups / queues (KIP-932) adding the whole SHARE_* handler family; and the cluster-id/node-id bootstrap check in ApiVersions (KIP-1242, KafkaApis.scala:1544). With ZooKeeper removed in 4.0, the broker/controller dispatcher split is now the clean boundary between the data plane (KafkaApis) and the metadata control plane (ControllerApis + the KRaft controller).
Gotchas & operational notes
The version check in KafkaApis.handle (:166) is a defensive backstop: the socket server normally rejects out-of-scope or unsupported APIs and closes the connection before the request reaches the handler. If you ever see that IllegalStateException, the network-layer gate was bypassed (typically only in tests).
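A hedged sketch of the two-layer check: ApiGateSketch and its name-to-version-range map are hypothetical (on a real node the decision comes from the ApiVersionManager), and the "Produce" range in the usage below is purely illustrative. The point is that the handler-side method should never throw when the network layer applies the same predicate first.

```java
import java.util.Map;

// Hypothetical gate: API name -> inclusive {min, max} version range this node exposes.
class ApiGateSketch {
    private final Map<String, short[]> enabledRanges;

    ApiGateSketch(Map<String, short[]> enabledRanges) { this.enabledRanges = enabledRanges; }

    // The predicate both layers share.
    boolean isApiEnabled(String api, short version) {
        short[] range = enabledRanges.get(api);
        return range != null && version >= range[0] && version <= range[1];
    }

    // Network layer: refuse before dispatch (the real socket server closes the connection).
    boolean admit(String api, short version) {
        return isApiEnabled(api, version);
    }

    // Handler layer: defensive backstop that should be unreachable in practice.
    void checkInHandler(String api, short version) {
        if (!isApiEnabled(api, version)) {
            throw new IllegalStateException("API " + api + " with version " + version + " is not enabled");
        }
    }
}
```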
acks=0 produce is the one path that closes the client connection on error rather than replying. Spurious connection resets under load on producers configured with acks=0 usually mean broker-side produce errors, not network faults.
Per-request metrics distinguish sub-categories under the same API key: follower vs. consumer fetch (FetchFollower/FetchConsumer), verify-only AddPartitionsToTxn, and the legacy client-metrics resource listing (see the metric-name override switch in Request.updateRequestMetrics, Request.java:339-356). Monitor the right one.
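The override amounts to choosing a metric name from the API key plus a property of the parsed body. This hypothetical sketch covers only the fetch split, whose names (FetchFollower/FetchConsumer) appear above; it assumes the caller already knows whether the fetch came from a follower and falls back to the plain API name for everything else.

```java
// Hypothetical sketch of a per-request metric-name override; not Kafka's implementation.
class MetricNameSketch {
    static String requestMetricName(String apiName, boolean fetchFromFollower) {
        if (apiName.equals("Fetch")) {
            // Replication traffic and consumer traffic are reported under separate names.
            return fetchFromFollower ? "FetchFollower" : "FetchConsumer";
        }
        return apiName; // every other API keeps its own name in this sketch
    }
}
```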
The RequestHandlerAvgIdlePercent meter is the canonical "is my broker CPU/IO-thread saturated?" signal: values near 0 mean the handler pool is fully busy and you may need more num.io.threads (or to relieve the underlying bottleneck). It is computed by discounting each handler's idle time by the thread count so it reads as a true fraction (KafkaRequestHandler.scala:111-123).
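The discounting can be checked with a little arithmetic. In this sketch IdleMeterSketch is hypothetical and a plain AtomicLong replaces the real rate meter: each handler divides its idle nanoseconds by the pool size before recording them, so the shared total over a window equals the pool's average idle fraction.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch of the idle aggregation; a plain counter replaces the real meter.
class IdleMeterSketch {
    private final int totalHandlerThreads;
    private final AtomicLong discountedIdleNanos = new AtomicLong();

    IdleMeterSketch(int totalHandlerThreads) { this.totalHandlerThreads = totalHandlerThreads; }

    // Each handler records its idle time divided by the pool size.
    void markIdle(long idleNanos) {
        discountedIdleNanos.addAndGet(idleNanos / totalHandlerThreads);
    }

    // Fraction of the pool that was idle over a window of windowNanos wall-clock time.
    double idleFraction(long windowNanos) {
        return (double) discountedIdleNanos.get() / windowNanos;
    }
}
```

With four handlers over a one-second window, two fully idle and two fully busy, the meter reads 0.5: each idle thread contributes 1 s / 4 = 250 ms, and the two contributions sum to half the window.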