06 · Network Layer & Threading Model
Source: Apache Kafka 4.4.0-SNAPSHOT (git 04bfe7d, 2026-06-15), KRaft mode. Derived from source code, not copied from official documentation.
A Kafka broker is, at its core, a hand-rolled single-reactor-per-thread NIO server bolted onto a bounded hand-off queue and a pool of worker threads. This chapter dissects that machine end to end: the SocketServer with its per-listener Acceptor and Processor threads, the shared common.network.Selector/KafkaChannel reactor that performs size-delimited reads and writes, the RequestChannel that decouples the network threads from the KafkaRequestHandler I/O-thread pool, ConnectionQuotas (per-IP / per-listener / broker caps plus a connection-creation-rate throttle), and the request purgatory, a DelayedOperationPurgatory backed by a hierarchical TimingWheel with sharded watcher lists. It closes with the full request-lifecycle timeline and a census of every broker thread.
Role & responsibilities
The network layer owns the broker's TCP sockets and the thread anatomy that turns bytes on the wire into in-flight Request objects and back into responses. Its responsibilities:
- Accept connections on each configured listener and configure each socket (non-blocking, TCP_NODELAY, SO_KEEPALIVE), subject to connection quotas (kafka/network/SocketServer.scala:667).
- Run the NIO reactor: register channels, drive the TLS/SASL handshake, read complete size-delimited requests, write responses, and expire idle connections, with one java.nio.channels.Selector per Processor (clients/.../network/Selector.java:445).
- Decouple I/O from request handling via the bounded RequestChannel hand-off queue, so slow request processing back-pressures the network threads rather than dropping data (kafka/network/RequestChannel.scala:90).
- Dispatch dequeued requests to KafkaApis on the KafkaRequestHandler pool (kafka/server/KafkaRequestHandler.scala:108). See Request Processing (KafkaApis).
- Park requests that cannot complete immediately (produce awaiting acks, fetch awaiting bytes, …) in a purgatory with an O(1) timer, and re-check them when triggering events occur (server-common/.../purgatory/DelayedOperationPurgatory.java:122).
- Enforce connection quotas and connection-creation-rate limits (ConnectionQuotas, kafka/network/SocketServer.scala:1276).
Kafka deliberately does not use a servlet container or Netty. It implements a thin reactor (Selector + KafkaChannel + TransportLayer) shared verbatim between clients and the broker, plus a Scala SocketServer that wires N reactor threads to a bounded queue and M worker threads. The same Selector code path drives producers, consumers, replica fetchers and the broker's accept side, which is why the protocol framing lives in clients/ and not core/.
Where it lives in the code
| Concern | Principal class | File |
|---|---|---|
| Top-level socket server / lifecycle | SocketServer | core/src/main/scala/kafka/network/SocketServer.scala:75 |
| Accept thread (per listener) | Acceptor / DataPlaneAcceptor | core/src/main/scala/kafka/network/SocketServer.scala:462, :364 |
| NIO reactor thread | Processor | core/src/main/scala/kafka/network/SocketServer.scala:801 |
| Bounded hand-off queue | RequestChannel | core/src/main/scala/kafka/network/RequestChannel.scala:80 |
| In-flight request (timing fields) | Request | server/src/main/java/org/apache/kafka/network/Request.java:54 |
| I/O worker thread + pool | KafkaRequestHandler / ...Pool | core/src/main/scala/kafka/server/KafkaRequestHandler.scala:91, :227 |
| NIO multiplexer (reactor core) | Selector | clients/src/main/java/org/apache/kafka/common/network/Selector.java:88 |
| Per-connection state machine | KafkaChannel | clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java:67 |
| Plaintext / TLS byte transport | PlaintextTransportLayer / SslTransportLayer | clients/.../network/PlaintextTransportLayer.java:32 |
| Size-delimited receive | NetworkReceive | clients/.../network/NetworkReceive.java:32 |
| Outbound send wrapper | NetworkSend | clients/.../network/NetworkSend.java:21 |
| Connection quotas | ConnectionQuotas | core/src/main/scala/kafka/network/SocketServer.scala:1276 |
| Delayed-operation purgatory | DelayedOperationPurgatory | server-common/.../purgatory/DelayedOperationPurgatory.java:38 |
| Hierarchical timing wheel | TimingWheel / SystemTimer | server-common/.../util/timer/TimingWheel.java:97, SystemTimer.java:30 |
| Socket-server config keys | SocketServerConfigs | server/src/main/java/org/apache/kafka/network/SocketServerConfigs.java:43 |
Core concepts & terminology
- Listener / endpoint: A (name, securityProtocol, host, port) tuple from listeners. Each gets its own Acceptor + pool of Processors and its own metric tags (SocketServer.scala:217).
- Acceptor thread: One non-daemon thread per listener that owns the ServerSocketChannel, accepts new sockets, and round-robins each to a Processor (SocketServer.scala:462).
- Processor (network) thread: One of num.network.threads threads per listener; runs a private Selector reactor loop. Reads requests, enqueues them, writes responses (SocketServer.scala:893).
- Request handler (I/O) thread: One of num.io.threads daemon threads pulling from RequestChannel and calling KafkaApis.handle (KafkaRequestHandler.scala:108).
- Connection id: String localAddr:localPort-remoteAddr:remotePort-index; the trailing monotonically-incrementing index prevents id reuse while old requests are still in flight (SocketServer.scala:888, :1202).
- Muting: A channel is "muted" (read interest dropped) the instant a request is enqueued, and un-muted only after its response is sent. This is how single-request-per-connection ordering is guaranteed across many handler threads (SocketServer.scala:1044).
- Purgatory: A holding area for operations that block until a condition or timeout. Backed by a hierarchical timing wheel for O(1) insert/cancel (DelayedOperationPurgatory.java).
- Privileged listener: The inter-broker listener (or, historically, a control-plane listener). A flag on the RequestContext that KafkaApis uses to decide whether forwarding/envelope handling is permitted (SocketServer.scala:224, :1027).
Earlier Kafka exposed two "request planes": a data plane and an optional control plane listener (KIP-291). In this KRaft-only build the control-plane listener is gone; SocketServer models only dataPlaneAcceptors (SocketServer.scala:103). Controller traffic is isolated instead by the process boundary: the controller runs its own SocketServer over controller.listener.names with ListenerType.CONTROLLER (SocketServer.scala:150) and its own request-handler pool named "controller" (ControllerServer.scala:300). The string "control plane" survives only in a doc comment about request forwarding (SocketServer.scala:796).
Architecture & control/data flow
Construction and start-up ordering
The SocketServer constructor only opens the ports and builds the data structures; it does not start any thread (SocketServer.scala:146). Acceptors and their processors are created eagerly per endpoint (createDataPlaneAcceptorAndProcessors, :217), but thread start is deferred to enableRequestProcessing (:177), which chains each acceptor's start to a per-endpoint authorizer future so that no listener accepts traffic until its authorizer is initialized. Each acceptor exposes a startedFuture; the aggregate future returned lets the broker know when all listeners are live (:212).
Detailed mechanics
The Acceptor loop
The acceptor registers OP_ACCEPT and loops while shouldRun (an AtomicBoolean) is true (SocketServer.scala:576). Each iteration calls nioSelector.select(500) then drains ready keys. For each acceptable key it calls accept(), which:
- Accepts the SocketChannel and calls connectionQuotas.inc(...); this may block the acceptor when a connection slot is unavailable or throw on quota violation (:672).
- Configures the socket: configureBlocking(false), setTcpNoDelay(true), setKeepAlive(true), optional SO_SNDBUF (:693).
- Assigns the channel to the next processor by round-robin over currentProcessorIndex. If a processor's newConnections queue is full it tries the next; if all are full it blocks on the last one (mayBlock=true) so no accepted socket is dropped (:638).
On TooManyConnectionsException the socket is closed; on ConnectionThrottledException the socket is parked in a per-acceptor throttledSockets priority queue and closed later by closeThrottledConnections after the throttle delay (:680, :704). The acceptor catches all Throwables (except ControlThrowable) to avoid dying on a single bad connection (:585).
The Acceptor → Processor hand-off never silently drops an accepted connection. assignNewConnection retries across all processors and, on the final attempt, performs a blocking newConnections.put (SocketServer.scala:1143). The cost is that a saturated reactor can stall the accept loop, visible as the AcceptorBlockedPercent meter (:504).
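The hand-off policy above (try each processor in round-robin order, block only on the last attempt) can be sketched with plain java.util.concurrent queues. This is a minimal illustration under stated assumptions, not the SocketServer code: the class RoundRobinHandOff, its methods, and the use of strings in place of SocketChannels are all hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical sketch of the Acceptor -> Processor hand-off; not Kafka's actual classes.
final class RoundRobinHandOff {
    private final List<BlockingQueue<String>> newConnections = new ArrayList<>();
    private int currentProcessorIndex = 0;

    RoundRobinHandOff(int processors, int queueCapacity) {
        for (int i = 0; i < processors; i++) {
            newConnections.add(new ArrayBlockingQueue<>(queueCapacity));
        }
    }

    // Hands the connection to a processor queue and returns that processor's index.
    int assign(String connection) {
        int n = newConnections.size();
        for (int attempt = 0; attempt < n; attempt++) {
            int index = currentProcessorIndex;
            currentProcessorIndex = (currentProcessorIndex + 1) % n;
            BlockingQueue<String> queue = newConnections.get(index);
            if (attempt < n - 1) {
                if (queue.offer(connection)) return index; // non-blocking try
            } else {
                try {
                    queue.put(connection);                 // last attempt: block, never drop
                    return index;
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("interrupted during hand-off", e);
                }
            }
        }
        throw new IllegalStateException("no processors configured");
    }

    // Simulates a processor draining one queued connection (null if none).
    String drainOne(int processor) {
        return newConnections.get(processor).poll();
    }
}
```

With three single-slot queues that are all full, freeing a slot on processor 1 lets the next accepted connection skip the still-full processor 0 and land on processor 1 without blocking.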
The Processor reactor loop
Each Processor owns a common.network.Selector built from a server-side ChannelBuilder (SocketServer.scala:853). Its run() performs a fixed pipeline every iteration (:893):
configureNewConnections() // register queued sockets with Selector (≤ 20/iter)
processNewResponses() // pull responseQueue, hand Sends to Selector, mute/unmute
poll() // selector.poll(timeout): NIO select + read/write
processCompletedReceives() // parse header, build Request, enqueue + MUTE channel
processCompletedSends() // drop inflightResponses entry, unmute channel
processDisconnected() // decrement quotas, fire disconnect listeners
closeExcessConnections() // if broker max.connections exceeded, close 1 LRU
The poll timeout is 0 when there are queued new connections, otherwise 300ms (:996). Newly registered connections are capped at ConnectionQueueSize = 20 per iteration so that existing-channel traffic and close notifications stay responsive (:777, :1167).
Reading a request: size-delimited framing
Inside Selector.poll, ready/readable channels are handled by pollSelectionKeys → attemptRead → channel.read() (Selector.java:677). KafkaChannel.read() lazily creates a NetworkReceive and delegates to receive.readFrom(transportLayer) (KafkaChannel.java:407). A NetworkReceive is a 4-byte big-endian size N followed by N payload bytes (NetworkReceive.java:32). A size above socket.request.max.bytes is rejected with InvalidReceiveException (NetworkReceive.java:92-95).
Crucially the payload buffer is allocated from a MemoryPool (memoryPool.tryAllocate, NetworkReceive.java:103). If the pool is exhausted the allocation returns null and the channel mutes itself until memory is available (KafkaChannel.java:414; Selector.attemptRead sets outOfMemory=true, Selector.java:691). This is the back-pressure mechanism for queued.max.request.bytes (the pool size). Selector.poll re-checks memoryPool and un-mutes channels once pressure clears (Selector.java:457), and shuffles the key-handling order under low memory to avoid read starvation (determineHandlingOrder, :665).
Selector adds at most one entry to completedReceives for any channel in a single poll() (Selector.java:434-437, enforced by hasCompletedReceive guarding attemptRead at :580). Combined with muting the channel on enqueue, this guarantees the broker processes requests from one connection strictly in send order even though they fan out to many handler threads.
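As a rough illustration of the framing just described, the sketch below frames a payload with a 4-byte big-endian length and reads it back only once the whole payload has arrived. It is not NetworkReceive: the class and method names are hypothetical, it works on an in-memory ByteBuffer instead of a TransportLayer, it allocates from the heap rather than a MemoryPool, and it throws IllegalArgumentException where the broker throws InvalidReceiveException.

```java
import java.nio.ByteBuffer;

// Hypothetical sketch of 4-byte size-prefixed framing; not Kafka's NetworkReceive.
final class SizeDelimitedFraming {
    // Frames a payload: 4-byte big-endian length followed by the payload bytes.
    static ByteBuffer frame(byte[] payload) {
        ByteBuffer buf = ByteBuffer.allocate(4 + payload.length); // big-endian by default
        buf.putInt(payload.length).put(payload);
        buf.flip();
        return buf;
    }

    // Tries to read one complete frame. Returns null (and leaves the buffer
    // position untouched) if more bytes are needed.
    static byte[] tryRead(ByteBuffer in, int maxBytes) {
        if (in.remaining() < 4) return null;
        in.mark();
        int size = in.getInt();
        if (size < 0 || size > maxBytes) {
            throw new IllegalArgumentException("invalid receive size " + size);
        }
        if (in.remaining() < size) {
            in.reset(); // wait for the rest of the payload
            return null;
        }
        byte[] payload = new byte[size];
        in.get(payload);
        return payload;
    }
}
```

Validating the size before allocating the payload buffer is the point of the prefix: a peer cannot make the reader reserve more than maxBytes for a single frame.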
From bytes to a Request
Processor.processCompletedReceives parses the header (parseRequestHeader validates the API key/version against the ApiVersionManager, SocketServer.scala:779), checks for SASL re-authentication and session expiry, then builds a RequestContext and a Request (:1027, :1031). It intercepts ApiVersionsRequest here to record the client software name/version on the channel (KIP-511, :1035). It then calls requestChannel.sendRequest(req) and immediately selector.mute(connectionId) (:1043). The selector's completedReceives map is cleared right after, returning buffers promptly (:1062).
Writing a response
Handlers never touch sockets. They call RequestChannel.sendResponse, which routes the Response to the originating Processor's responseQueue via processor.enqueueResponse and wakes its selector (RequestChannel.scala:189, SocketServer.scala:1208). On its next loop the processor's processNewResponses pattern-matches the response type (SocketServer.scala:937):
| Response type | Processor action |
|---|---|
| SendResponse | selector.send(NetworkSend); record in inflightResponses (:978) |
| NoOpResponse | no bytes sent (e.g. produce acks=0); just unmute to read pipelined requests (:943) |
| CloseConnectionResponse | actively close the channel (used on errors with no client response) (:956) |
| StartThrottlingResponse / EndThrottlingResponse | drive the channel mute state machine for quota throttling (:960) |
The send itself is a NetworkSend wrapping the connection id and the serialized payload (NetworkSend.java:21). The Selector registers OP_WRITE (KafkaChannel.setSend, :389), drains it across one or more polls, and on completion populates completedSends; processCompletedSends then removes the inflight entry and unmutes the channel (SocketServer.scala:1065).
The RequestChannel hand-off
RequestChannel holds a bounded requestQueue of size queued.max.requests (default 500), a same-sized callbackQueue, and a ConcurrentHashMap of processor id → Processor (each owning its own response deque) (RequestChannel.scala:90). sendRequest does a blocking put; this is the back-pressure point: when handlers fall behind, network threads block here and stop reading sockets (:117).
receiveRequest(timeout) services the callbackQueue first (those requests already waited once), then polls the request queue. A special WakeupRequest sentinel placed on the request queue lets a callback enqueued from a foreign thread wake a blocked handler so it checks the callback queue (:200, :235). ShutdownRequest.INSTANCE is the poison pill that tells a handler to exit (:233).
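The two-queue protocol above can be sketched as follows. This is a simplified illustration, not RequestChannel itself: the class HandOffChannel, the string requests and the WAKEUP sentinel are hypothetical stand-ins, and the network-side method uses a non-blocking offer (rather than the blocking put described above) so that a full queue can be observed without a second thread.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of a bounded hand-off queue with a priority callback queue.
final class HandOffChannel {
    static final String WAKEUP = "<wakeup>"; // sentinel standing in for WakeupRequest
    private final BlockingQueue<String> requestQueue;
    private final BlockingQueue<String> callbackQueue;

    HandOffChannel(int capacity) {
        requestQueue = new ArrayBlockingQueue<>(capacity);
        callbackQueue = new ArrayBlockingQueue<>(capacity);
    }

    // Network-thread side. A real reactor would block here when the queue is full;
    // offer() is used in this sketch so the full condition is visible to the caller.
    boolean offerRequest(String request) {
        return requestQueue.offer(request);
    }

    // Foreign-thread side: queue the callback, then nudge a handler that may be
    // blocked on the request queue. A failed nudge means the request queue is
    // full, so handlers are not idle and will reach the callback queue anyway.
    void sendCallback(String callback) {
        callbackQueue.add(callback);
        requestQueue.offer(WAKEUP);
    }

    // Handler side: callbacks first, then requests; returns null on timeout.
    String receive(long timeoutMs) {
        String callback = callbackQueue.poll();
        if (callback != null) return callback;
        try {
            String request = requestQueue.poll(timeoutMs, TimeUnit.MILLISECONDS);
            return WAKEUP.equals(request) ? callbackQueue.poll() : request;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }
}
```

A handler that dequeues the sentinel after the callback has already been served simply gets null and loops again, which is why a stale wakeup is harmless.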
The KafkaRequestHandler pool
A KafkaRequestHandler is a Runnable on a daemon thread named data-plane-kafka-request-handler-N (KafkaRequestHandler.scala:275). Its loop blocks on receiveRequest(300), measures idle time into two meters (a per-pool meter and a shared aggregate meter; the denominators differ so combined-mode controllers report correctly, cf. KIP-1207), then dispatches by type (:108):
- Request → set requestDequeueTimeNanos, stash it in a ThreadLocal, call apis.handle(request, requestLocal), then in finally release the request buffer (:162).
- CallbackRequest → re-schedule a previously-deferred completion on a handler thread, carefully (re)computing callback timing so multi-callback requests aggregate correctly (:131).
- ShutdownRequest → close the thread-local RequestLocal and exit (:126).
A FatalExitError from a handler triggers Exit.exit; the broker treats it as unrecoverable (:150). The pool is built by KafkaRequestHandlerPoolFactory so that broker and controller pools share one aggregate-threads counter; resizeThreadPool supports the dynamic num.io.threads change (:283).
Splitting network threads (reactor I/O) from I/O / request-handler threads (request processing, which may touch disk) lets each be sized and back-pressured independently: a small fixed reactor count multiplexes thousands of sockets, while a larger handler pool absorbs blocking work. The bounded RequestChannel converts handler slowness into TCP back-pressure rather than unbounded memory growth. Isolating controller traffic on its own pool descends from KIP-291 (originally a control-plane listener; now a separate controller process/pool in KRaft).
Re-scheduling callbacks onto handler threads
Some APIs need to finish work on a request thread after an async action (e.g. a transaction-coordinator verification) completes on a foreign thread. KafkaRequestHandler.wrapAsyncCallback captures the current RequestChannel and Request from thread-locals; if the callback later fires on the same handler thread it runs inline, otherwise it is posted via requestChannel.sendCallbackRequest, which enqueues to the callback queue and drops a WakeupRequest on the main queue (KafkaRequestHandler.scala:65). This keeps RequestLocal thread-confinement intact. See Transactions & EOS.
The request purgatory & hierarchical timing wheel
Many requests cannot be answered synchronously: a produce with acks=all waits for ISR replication; a fetch with fetch.min.bytes waits for data or fetch.max.wait.ms. These become DelayedOperations parked in a DelayedOperationPurgatory. The broker instantiates several, all keyed off ReplicaManager (ReplicaManager.scala:184):
| Purgatory name | Operation | Purge interval source |
|---|---|---|
| Produce | DelayedProduce | producer.purgatory.purge.interval.requests |
| Fetch | DelayedFetch | fetch.purgatory.purge.interval.requests |
| DeleteRecords | DelayedDeleteRecords | delete.records.purgatory.purge.interval.requests |
| RemoteFetch | DelayedRemoteFetch | 0 (purge each cycle to free large buffers, ReplicaManager.scala:196) |
| RemoteListOffsets | DelayedRemoteListOffsets | default 1000 |
| ShareFetch | DelayedShareFetch | share.fetch.purgatory.purge.interval.requests |
The transaction subsystem adds a DelayedFuturePurgatory (e.g. for async ACL/quota completion in AclApis). The new (KIP-848) group coordinator is not purgatory-based; it runs an event/timer-loop in its own threads rather than parking DelayedOperations, so there are no DelayedOperationPurgatory instances under group-coordinator/. See Replication, ISR & HWM, Fetch Path, and Group Coordination.
DelayedOperation contract
A DelayedOperation extends TimerTask and carries a completed flag plus a protected ReentrantLock (DelayedOperation.java:42). Subclasses implement tryComplete(), onComplete(), and onExpiration(). forceComplete() is the single chokepoint: it double-checks completed under the lock, cancels the timer task, and calls onComplete() exactly once; concurrent callers race, but only the winner returns true (:60). When the timer fires, run() calls forceComplete() then onExpiration() (:146).
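The exactly-once chokepoint can be sketched as a volatile flag that is checked once without the lock and once more under it. This is a minimal illustration, not DelayedOperation itself: the class OnceOperation and its call counter are hypothetical, and the timer-task cancellation step is omitted.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical sketch of an "exactly once" completion chokepoint.
class OnceOperation {
    private final ReentrantLock lock = new ReentrantLock();
    private volatile boolean completed = false;
    final AtomicInteger onCompleteCalls = new AtomicInteger(); // for observation only

    // Returns true only for the single caller that actually completed the operation.
    boolean forceComplete() {
        if (completed) return false;     // cheap check without the lock
        lock.lock();
        try {
            if (completed) return false; // re-check: another thread may have won
            completed = true;
            onComplete();                // runs at most once
            return true;
        } finally {
            lock.unlock();
        }
    }

    void onComplete() {
        onCompleteCalls.incrementAndGet();
    }

    boolean isCompleted() {
        return completed;
    }
}
```

However many threads call forceComplete() concurrently, exactly one observes true and the completion callback runs once.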
tryCompleteElseWatch
The core enqueue path is tryCompleteElseWatch(op, watchKeys) (DelayedOperationPurgatory.java:122). The algorithm, with its careful locking:
- Under the operation's lock (safeTryCompleteOrElse), call tryComplete(). If it succeeds, done, return true.
- Otherwise add the operation to the watcher list for each key (a key can be e.g. a TopicPartitionOperationKey), then call tryComplete() once more. Holding the lock across "add to watchlist + final tryComplete" guarantees the op cannot miss a concurrent triggering event after it is watched (:155).
- If still not complete, add it to the timeout timer (timeoutTimer.add(op)). If it completed in the race, cancel the timer task (:167).
The source carries a long comment dissecting a potential deadlock between tryCompleteElseWatch and checkAndComplete, and recommends that checkAndComplete be called without holding any external exclusive lock (:133-154).
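The try, watch, re-check sequence can be sketched in a few dozen lines. This is a deliberately reduced model, not DelayedOperationPurgatory: MiniPurgatory, Op, the string keys and the BooleanSupplier condition are all hypothetical, and the timer, sharding and purge logic are left out.

```java
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BooleanSupplier;

// Hypothetical, simplified purgatory: no timer, no sharding, no purge.
final class MiniPurgatory {
    static final class Op {
        final ReentrantLock lock = new ReentrantLock();
        final BooleanSupplier condition; // "can this operation complete now?"
        volatile boolean completed = false;

        Op(BooleanSupplier condition) { this.condition = condition; }

        boolean tryComplete() {
            if (!completed && condition.getAsBoolean()) {
                completed = true;
                return true;
            }
            return false;
        }
    }

    private final Map<String, Queue<Op>> watchers = new ConcurrentHashMap<>();

    // Returns true if the operation completed on the caller's thread.
    boolean tryCompleteElseWatch(Op op, List<String> keys) {
        op.lock.lock();
        try {
            if (op.tryComplete()) return true;   // step 1: maybe no need to park
            for (String key : keys) {            // step 2: watch every key...
                watchers.computeIfAbsent(key, k -> new ConcurrentLinkedQueue<>()).add(op);
            }
            if (op.tryComplete()) return true;   // ...then re-check under the same lock
        } finally {
            op.lock.unlock();
        }
        return false; // step 3 (omitted): register with the timeout timer
    }

    // Called on a triggering event for key; returns how many operations completed.
    int checkAndComplete(String key) {
        Queue<Op> queue = watchers.get(key);
        if (queue == null) return 0;
        int completedNow = 0;
        for (Iterator<Op> it = queue.iterator(); it.hasNext(); ) {
            Op op = it.next();
            op.lock.lock();
            try {
                if (op.completed) {
                    it.remove();                 // stale entry completed via another key
                } else if (op.tryComplete()) {
                    completedNow++;
                    it.remove();
                }
            } finally {
                op.lock.unlock();
            }
        }
        return completedNow;
    }

    int watched(String key) {
        Queue<Op> queue = watchers.get(key);
        return queue == null ? 0 : queue.size();
    }
}
```

An operation watched under two keys and completed through one of them stays on the other key's list until that key is next checked, which is the kind of stale entry a periodic purge has to sweep.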
Watcher lists, sharding, and purge
Watchers are sharded into SHARDS = 512 WatcherLists; a key maps to a shard by abs(key.hashCode() % 512) to reduce lock contention (DelayedOperationPurgatory.java:41, :105). Each shard has a ConcurrentHashMap<DelayedOperationKey, Watchers> guarded by a ReentrantLock; each Watchers holds a ConcurrentLinkedQueue of operations (:292, :311). When a triggering event happens, checkAndComplete(key) looks up the watchers and runs tryCompleteWatched, removing completed entries as it walks (:184, :333).
Because a completed op may linger on the watcher lists of keys that have not been re-checked since it completed, the purgatory tracks estimatedTotalOperations and, in advanceClock, purges all shards when estimatedTotalOperations - numDelayed() > purgeInterval (:386). This is the GC for orphaned watcher entries that KAFKA-1989 was designed to make cheap.
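The shard selection and the purge trigger are both one-line computations, sketched below. The class WatcherShards and its method names are hypothetical; only the formulas come from the text above.

```java
// Hypothetical sketch of the shard-selection and purge-trigger arithmetic.
final class WatcherShards {
    static final int SHARDS = 512;

    // Taking the remainder first keeps the value in -511..511, so Math.abs
    // always yields a valid index in 0..511.
    static int shardFor(Object key) {
        return Math.abs(key.hashCode() % SHARDS);
    }

    // Purge when the estimated number of stale watcher entries exceeds the interval.
    static boolean shouldPurge(int estimatedTotalOperations, int numDelayed, int purgeInterval) {
        return estimatedTotalOperations - numDelayed > purgeInterval;
    }
}
```

For example, with a purge interval of 1000, an estimate of 1700 total operations against 600 still-delayed ones leaves 1100 presumed-stale entries and triggers a purge, while an estimate of 1500 does not.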
The timing wheel
The timeout side is a SystemTimer wrapping a TimingWheel (SystemTimer.java:30). A timing wheel is a circular array of wheelSize buckets, each a TimerTaskList (a doubly-linked list with a dummy root), spanning tickMs each. The broker's SystemTimer default is tickMs=1, wheelSize=20 → a 20 ms span at the lowest level (SystemTimer.java:45). Tasks that expire beyond that span spill into a lazily-created overflow wheel whose tick is the parent's full interval, giving coarser resolution as you go up, hence "hierarchical" (TimingWheel.java:131, :174).
Cancelling a task is O(1): it unlinks the entry from its TimerTaskList (under nested synchronized blocks, with an AtomicInteger task counter). Insert is O(1) within a wheel level but may recurse O(m) over m wheel levels (TimingWheel.java:147).
Buckets are driven by a single DelayQueue<TimerTaskList> shared across all wheel levels; a bucket enqueues itself when its expiration is (re)set (TimingWheel.java:164). SystemTimer.advanceClock(timeoutMs) polls that delay queue; on a due bucket it advances the wheel clock and flushes the bucket, re-inserting each entry, which either lands in a finer bucket or, being now expired, is submitted to the executor (SystemTimer.java:89, addTimerTaskEntry at :76). Expired tasks run on a single-thread executor named executor-<purgatoryName> (SystemTimer.java:54). The actual ticking is performed by the purgatory's own background reaper.
A DelayQueue/priority-queue timer is O(log n) per insert and delete, and (in old Kafka) leaked completed operations until a sweep ran, risking OOM. The hierarchical timing wheel makes insert O(1) amortized and cancel O(1) exactly, decoupling timer cost from the number of in-flight delayed operations, most of which complete before they ever expire (KAFKA-1989).
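To make the level and bucket arithmetic concrete, the sketch below computes where a task with a given expiration would land. It is a stateless illustration under stated assumptions, not TimingWheel: the class WheelLocator is hypothetical, it keeps no bucket lists or DelayQueue, and it assumes each overflow wheel reuses the same wheelSize with a tick equal to the parent's full interval and a clock aligned down to a multiple of its tick.

```java
// Hypothetical sketch: find the wheel level and bucket for an expiration time.
final class WheelLocator {
    // Returns {level, bucketIndex}, or {-1, -1} if the task is already due.
    static long[] locate(long expirationMs, long nowMs, long tickMs, int wheelSize) {
        if (tickMs < 1 || wheelSize < 2) {
            throw new IllegalArgumentException("need tickMs >= 1 and wheelSize >= 2");
        }
        int level = 0;
        long tick = tickMs;
        while (true) {
            long currentTime = nowMs - (nowMs % tick); // wheel clock is tick-aligned
            long interval = tick * wheelSize;          // span covered by this level
            if (expirationMs < currentTime + tick) {
                return new long[] {-1, -1};            // within the current tick: run now
            }
            if (expirationMs < currentTime + interval) {
                long virtualId = expirationMs / tick;
                return new long[] {level, virtualId % wheelSize};
            }
            tick = interval;                           // overflow wheel: coarser tick
            level++;
        }
    }
}
```

With tickMs=1 and wheelSize=20 at time 0, an expiration of 5 ms lands in level 0 bucket 5, 20 ms overflows to level 1 bucket 1 (tick 20 ms, span 400 ms), and 400 ms overflows again to level 2 bucket 1. The loop runs once per level, which is the O(m) insert cost across m levels.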
Connection quotas
ConnectionQuotas enforces three count limits and a creation-rate limit, all guarded by synchronizing on a single counts mutable map (SocketServer.scala:1276):
- Per-IP cap: max.connections.per.ip (default Integer.MAX_VALUE), overridable per host via max.connections.per.ip.overrides. Exceeding it throws TooManyConnectionsException and the socket is closed (:1306).
- Per-listener cap: listener.name.X.max.connections, tracked in listenerCounts (:1490).
- Broker-wide cap: max.connections. The inter-broker listener is "protected": it may exceed the broker cap, and an LRU connection on another listener is closed to make room (maxConnectionsExceeded + closeExcessConnections, :1474, SocketServer.scala:1110).
- Creation-rate: max.connection.creation.rate (broker-wide and per-listener), and per-IP rates via dynamic ip-connection-rate quotas. Exceeding the rate yields a throttle delay (ConnectionThrottledException) computed from the quota window (recordConnectionAndGetThrottleTimeMs, :1501).
When a slot is unavailable, waitForConnectionSlot blocks the acceptor on counts.wait(...) and is woken by counts.notifyAll() on every dec, listener add/remove, or dynamic limit increase (:1453, :1425). Throttle time spent blocked is folded into the AcceptorBlockedPercent meter.
connectionQuotas.inc runs on the acceptor thread and can block it for the full throttle window. Because there is exactly one acceptor per listener, an aggressive client hammering a single IP can stall accepts for that whole listener (not just that IP). Rate-limit overrides per IP exist precisely to bound this, and the inter-broker listener is explicitly protected so cluster traffic is never starved by client churn.
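The single-monitor pattern (count under synchronized, wait for a slot, notifyAll on release) can be sketched as below. This is an illustration only, not ConnectionQuotas: the class ConnectionCaps is hypothetical, it models just a per-IP cap and one overall cap, it waits with a timeout so the behaviour is observable in a single thread, and it throws IllegalStateException where the broker throws TooManyConnectionsException. Listener caps, the protected inter-broker listener and rate throttling are omitted.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of per-IP and overall connection caps behind one monitor.
final class ConnectionCaps {
    private final Map<String, Integer> counts = new HashMap<>();
    private final int maxPerIp;
    private final int maxTotal;
    private int total = 0;

    ConnectionCaps(int maxPerIp, int maxTotal) {
        this.maxPerIp = maxPerIp;
        this.maxTotal = maxTotal;
    }

    // Waits up to timeoutMs for an overall slot, then applies the per-IP cap.
    // Returns false if no slot freed up in time; throws if the IP is at its cap.
    boolean inc(String ip, long timeoutMs) {
        synchronized (counts) {
            long deadline = System.currentTimeMillis() + timeoutMs;
            while (total >= maxTotal) {
                long remaining = deadline - System.currentTimeMillis();
                if (remaining <= 0) return false;
                try {
                    counts.wait(remaining);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return false;
                }
            }
            int current = counts.getOrDefault(ip, 0);
            if (current >= maxPerIp) {
                throw new IllegalStateException("too many connections from " + ip);
            }
            counts.put(ip, current + 1);
            total++;
            return true;
        }
    }

    // Releases one connection and wakes any thread waiting for a slot.
    void dec(String ip) {
        synchronized (counts) {
            int current = counts.getOrDefault(ip, 0);
            if (current == 0) throw new IllegalArgumentException("no connections from " + ip);
            if (current == 1) counts.remove(ip); else counts.put(ip, current - 1);
            total--;
            counts.notifyAll();
        }
    }
}
```

Because the waiting happens on the caller's thread, a single-threaded acceptor that calls inc is stalled for every client while it waits, which is the trade-off the paragraph above describes.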
Concurrency & threading
The state-ownership model is strict and is what makes this lock-light design correct:
| State | Owner / guard | Notes |
|---|---|---|
| Selector.channels, completedReceives, all reactor state | Its single Processor thread, no lock | Selector is documented "not thread safe" (Selector.java:86) |
| newConnections (Acceptor → Processor) | ArrayBlockingQueue(20) | cross-thread hand-off (SocketServer.scala:831) |
| responseQueue (handlers → Processor) | LinkedBlockingDeque | plus selector.wakeup() (:833, :1208) |
| requestQueue / callbackQueue | ArrayBlockingQueue(queued.max.requests) | many writers (processors) / many readers (handlers) |
| Request timing fields | volatile longs | written by handler/purgatory threads, read by network thread (Request.java:70) |
| processors map in RequestChannel | ConcurrentHashMap | resized on dynamic thread changes |
| ConnectionQuotas.counts | synchronized(counts) + wait/notifyAll | plus @volatile limit fields (:1278) |
| Purgatory completed flag | volatile + per-op ReentrantLock | (DelayedOperation.java:40, :42) |
| Watcher shard map | per-shard ReentrantLock (512 shards) | (DelayedOperationPurgatory.java:295) |
| SystemTimer wheel mutation | ReentrantReadWriteLock | read lock on add, write lock while ticking (SystemTimer.java:40) |
| TimerTaskList linkage | nested synchronized + AtomicInteger counter | (TimerTaskList.java:72) |
Broker thread census
The principal threads on a running broker (and their naming) relevant to this subsystem:
| Thread name pattern | Count | Daemon? | Role | Source |
|---|---|---|---|---|
| data-plane-kafka-socket-acceptor-<listener>-<proto>-<port> | 1 per listener | non-daemon | accept + round-robin | SocketServer.scala:510 |
| data-plane-kafka-network-thread-<node>-<listener>-<proto>-<id> | num.network.threads per listener | non-daemon | NIO reactor | SocketServer.scala:750, :829 |
| data-plane-kafka-request-handler-<id> | num.io.threads | daemon | KafkaApis dispatch | KafkaRequestHandler.scala:275 |
| ExpirationReaper-<brokerId>-<Purgatory> | 1 per purgatory | non-daemon | ticks the timer (advanceClock(200)) | DelayedOperationPurgatory.java:409 |
| executor-<Purgatory> | 1 per SystemTimer | non-daemon | runs expired TimerTasks | SystemTimer.java:31, :54 |
(A KRaft broker also runs replica-fetcher threads, log cleaner/flusher threads, the Raft I/O thread, controller-side network/handler/reaper threads when in combined mode, and various scheduler/metadata threads; those belong to the fetch path, storage management, KRaft consensus and the controller.) Note the reaper and timer-executor threads are non-daemon (ShutdownableThread sets daemon=false, ShutdownableThread.java:56); they are stopped explicitly on shutdown via a no-op task that wakes the blocked DelayQueue.poll (DelayedOperationPurgatory.java:278).
The request lifecycle timeline
Request records five lifecycle timestamps (all in nanoseconds) that decompose end-to-end latency. They are populated at well-defined handoff points and rendered into the per-API request metrics by updateRequestMetrics (Request.java:325).
Each timestamp is written by a different actor: startTimeNanos by the network thread (Request.java:89); requestDequeueTimeNanos by the handler (KafkaRequestHandler.scala:164); apiLocalCompleteTimeNanos and responseCompleteTimeNanos by RequestChannel.sendResponse when the response is produced (RequestChannel.scala:179); responseDequeueTimeNanos by the processor's dequeueResponse (SocketServer.scala:1213). The network-thread time accumulated on the KafkaChannel (auth, read, write) is read and reset when the send completes and fed into the client's request-quota accounting (SocketServer.scala:1086, KafkaChannel.java:454). See Quotas & Throttling.
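The five timestamps, together with the moment the response send completes, split total latency into consecutive intervals. The sketch below shows only that arithmetic. The class LatencyBreakdown and its parameter names are hypothetical, the send-completion time is passed in as an extra argument, and the mapping of intervals to metric names such as RemoteTimeMs is an assumption about how updateRequestMetrics labels them rather than a transcription of it.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: derive per-stage latencies from lifecycle timestamps (nanoseconds).
final class LatencyBreakdown {
    static Map<String, Long> breakdownMs(long startNanos,
                                         long requestDequeueNanos,
                                         long apiLocalCompleteNanos,
                                         long responseCompleteNanos,
                                         long responseDequeueNanos,
                                         long endNanos) { // endNanos: send finished (assumed input)
        Map<String, Long> stages = new LinkedHashMap<>();
        stages.put("RequestQueueTimeMs", ms(requestDequeueNanos - startNanos));
        stages.put("LocalTimeMs", ms(apiLocalCompleteNanos - requestDequeueNanos));
        stages.put("RemoteTimeMs", ms(responseCompleteNanos - apiLocalCompleteNanos));
        stages.put("ResponseQueueTimeMs", ms(responseDequeueNanos - responseCompleteNanos));
        stages.put("ResponseSendTimeMs", ms(endNanos - responseDequeueNanos));
        stages.put("TotalTimeMs", ms(endNanos - startNanos));
        return stages;
    }

    // Clamp negative intervals to zero before converting to milliseconds.
    private static long ms(long nanos) {
        return TimeUnit.NANOSECONDS.toMillis(Math.max(nanos, 0L));
    }
}
```

Reading a breakdown this way shows where a slow request spent its time: a large first interval points at a saturated handler pool, while a large third interval points at time parked in purgatory.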
Configuration reference
All keys below are defined in SocketServerConfigs.java unless noted; num.io.threads/background.threads live in ServerConfigs.
| Key | Default | Effect |
|---|---|---|
| num.network.threads | 3 | Processors (reactor threads) per listener; dynamically reconfigurable, bounded to [½×, 2×] current (SocketServerConfigs.java:151, SocketServer.scala:413) |
| num.io.threads | 8 | Request-handler threads; processing + disk I/O (ServerConfigs.java:50) |
| queued.max.requests | 500 | Size of the RequestChannel request queue; full queue blocks network threads (:143) |
| queued.max.request.bytes | -1 (off) | If >0, sizes the SimpleMemoryPool; pool exhaustion mutes channels and stops reads (:147, SocketServer.scala:101) |
| socket.request.max.bytes | 104857600 (100 MiB) | Max single receive size; NetworkReceive rejects larger frames (:95) |
| socket.send.buffer.bytes | 102400 (100 KiB) | SO_SNDBUF; -1 = OS default (:87) |
| socket.receive.buffer.bytes | 102400 (100 KiB) | SO_RCVBUF; -1 = OS default (:91) |
| socket.listen.backlog.size | 50 | Server-socket accept backlog (:99) |
| connections.max.idle.ms | 600000 (10 min) | Idle connections closed by the Selector's LRU idle-expiry manager (:134, Selector.java:1427) |
| connection.failed.authentication.delay.ms | 100 | Delay before closing a connection that failed auth (slows brute-force) (:138, Selector.java:1389) |
| max.connections.per.ip | 2147483647 | Per-IP connection cap (:110) |
| max.connections.per.ip.overrides | "" (none) | Per-host overrides, e.g. 10.0.0.1:200 (:105) |
| max.connections | 2147483647 | Broker-wide cap; inter-broker listener is exempt (closes LRU elsewhere) (:115) |
| max.connection.creation.rate | 2147483647 | Conn-creation rate cap (broker & listener); exceeding throttles new accepts (:126) |
| producer.purgatory.purge.interval.requests | 1000 | Watcher-list purge threshold for the Produce purgatory (ReplicationConfigs.java:111; wired at ReplicaManager.scala:186) |
| fetch.purgatory.purge.interval.requests | 1000 | Same, for the Fetch purgatory (ReplicaManager.scala:190) |
The reconfigurable set is SocketServer.RECONFIGURABLE_CONFIGS; reconfigure live-applies max.connections* and the creation rate, while per-listener num.network.threads resizes the processor pool (SocketServer.scala:323, :438). See Metadata Propagation & Broker Lifecycle for how dynamic config reaches here.
Failure modes, edge cases & recovery
- Bad frame / disabled API. parseRequestHeader throws InvalidRequestException / UnsupportedVersionException; the processor closes just that channel via processChannelException and keeps serving others (SocketServer.scala:929, :779).
- Oversize receive. receiveSize > socket.request.max.bytes → InvalidReceiveException, channel closed (NetworkReceive.java:94).
- Memory pressure. Pool-exhausted reads mute the channel; the broker recovers and un-mutes once memory frees, shuffling key order to avoid starving any one channel (Selector.java:457, :668).
- Idle / expired connections. The IdleExpiryManager closes the least-recently-active connection past connections.max.idle.ms on each poll (Selector.java:795).
- SASL session expiry / re-auth. A receive arriving on an expired session closes the channel and increments expired-connections-killed-count (SocketServer.scala:1019).
- acks=0 produce. The handler returns a NoOpResponse (or CloseConnectionResponse on error); no bytes are written but the channel is still unmuted to read pipelined requests (SocketServer.scala:943, RequestChannel.scala:121).
- Response for a dead connection. If the channel vanished, sendResponse logs a warning and updates metrics with zero network time rather than throwing (SocketServer.scala:982).
- Processor shutdown with queued responses. RequestChannel.sendResponse tolerates a null processor (shut down) and silently drops the response; the socket is already closed (RequestChannel.scala:191).
- Graceful close with outstanding receives. The Selector keeps a remotely-closed channel in closingChannels until its buffered requests are processed (so acks=0 records aren't lost), then closes it (Selector.java:929, :849).
- Connection storm. Per-IP/listener/broker caps and rate limits throttle or reject; the inter-broker listener is protected so replication survives (SocketServer.scala:1474).
- Purgatory deadlock avoidance. Operation locks are held across watch+recheck; callers are advised not to call checkAndComplete under an external exclusive lock (DelayedOperationPurgatory.java:133).
Invariants & guarantees
- Per-connection ordering. One completed receive per channel per poll + mute-until-response ⇒ requests on a single TCP connection are processed in send order despite multi-threaded handling (Selector.java:434, SocketServer.scala:1044).
- No accepted connection is dropped on hand-off. The acceptor blocks rather than discarding (SocketServer.scala:1143).
- Exactly-once completion. A DelayedOperation's onComplete() runs exactly once, racing callers notwithstanding (DelayedOperation.java:60).
- O(1) timer insert/cancel. Cancel just unlinks an entry; insert is O(1) within a wheel level, O(m) across overflow levels (TimingWheel.java:147, TimerTaskList.java:98).
- Connection-id uniqueness during overlap. The incrementing index prevents reuse while old in-flight requests reference the id (SocketServer.scala:888).
- Bounded request memory. The request queue is bounded and (optionally) the receive memory pool is bounded; together they cap how much un-processed request data the broker holds.
Interactions with other subsystems
- Request Processing (KafkaApis): the handler pool's apis.handle target; ApiRequestHandler is defined alongside the handler (KafkaRequestHandler.scala:37).
- Wire Protocol & RPC Framework: RequestHeader, RequestContext, NetworkReceive framing, and the ApiVersionManager gate used in parseRequestHeader.
- Security: the ChannelBuilder/TransportLayer/Authenticator trio runs inside Selector.poll; SASL re-auth is intercepted in processCompletedReceives.
- Quotas & Throttling: throttle responses drive the channel mute state machine; network-thread time feeds request-quota accounting; ConnectionQuotas rate-limits accepts.
- Replication, Fetch Path, Group Coordination, Transactions, Tiered Storage: owners of the various purgatories.
- KRaft Controller & Metadata Propagation: the controller runs an analogous SocketServer/handler pool over its own listeners.
Design rationale & evolution
- KAFKA-1989: Replaced the O(log n) DelayQueue purgatory with hierarchical timing wheels for O(1) insert/cancel and bounded purge cost, the design captured in TimingWheel/SystemTimer.
- KIP-291: Separated controller from data-plane traffic (originally via a control-plane listener). In KRaft 4.x the listener is removed; isolation is achieved by the controller's own process/SocketServer/handler pool.
- KIP-511: ApiVersions carries client software name/version; the broker harvests it in the processor for the per-client connection metrics (SocketServer.scala:1035).
- KIP-1207: Motivates the dual per-pool vs aggregate idle meters in the handler pool so combined-mode RequestHandlerAvgIdlePercent is not skewed (KafkaRequestHandler.scala:111).
- The receive MemoryPool (queued.max.request.bytes) lets operators cap request-side memory independently of the queue depth, a defense against many large in-flight requests (SocketServer.scala:101).
Gotchas & operational notes
- num.network.threads is per listener, not global (SocketServerConfigs.java:153). A broker with three listeners and the default 3 runs nine reactor threads. Dynamic changes are clamped: you cannot drop below half (or rise above double) the current value in one step (SocketServer.scala:421).
- High RemoteTimeMs is normal on Fetch (consumers long-poll in purgatory) but on Produce signals slow ISR; use the per-API histograms emitted by updateRequestMetrics.
- NetworkProcessorAvgIdlePercent near 0 means reactor saturation: accepts may stall (watch AcceptorBlockedPercent), so scale num.network.threads. RequestHandlerAvgIdlePercent near 0 means handler saturation: the request queue will fill and back-pressure the network, so scale num.io.threads (SocketServer.scala:120, KafkaRequestHandler.scala:253).
- queued.max.request.bytes defaults off (-1). Without it the only request-memory bound is queue depth × request size; on memory-tight brokers consider enabling it.
- The RemoteFetch purgatory uses purgeInterval 0 on purpose, to release potentially-50 MiB RemoteLogReadResult buffers immediately for GC (ReplicaManager.scala:196).
- Failed-auth close delay must be < idle timeout (connection.failed.authentication.delay.ms < connections.max.idle.ms) or the connection may be reaped before the delayed close fires (SocketServerConfigs.java:140).
- Acceptor and reaper threads are non-daemon; handler threads are daemon. Clean shutdown depends on the explicit poison-pill/no-op mechanisms, not on JVM exit.