krivaltsevich.com · Kafka Internals 4.4

06 · Network Layer & Threading Model

Source: Apache Kafka 4.4.0-SNAPSHOT (git 04bfe7d, 2026-06-15), KRaft mode. Derived from source code, not copied from official documentation.

A Kafka broker is, at its core, a hand-rolled single-reactor-per-thread NIO server bolted onto a bounded hand-off queue and a pool of worker threads. This chapter dissects that machine end to end: the SocketServer with its per-listener Acceptor and Processor threads, the shared common.network.Selector/KafkaChannel reactor that performs size-delimited reads and writes, the RequestChannel that decouples the network threads from the KafkaRequestHandler I/O-thread pool, ConnectionQuotas (per-IP / per-listener / broker caps plus a connection-creation-rate throttle), and the request purgatory, a DelayedOperationPurgatory backed by a hierarchical TimingWheel with sharded watcher lists. It closes with the full request-lifecycle timeline and a census of every broker thread.

Role & responsibilities

The network layer owns the broker's TCP sockets and the thread anatomy that turns bytes on the wire into in-flight Request objects and back into responses. Its responsibilities:

  • Accept connections on each configured listener and configure each socket (non-blocking, TCP_NODELAY, SO_KEEPALIVE), subject to connection quotas (kafka/network/SocketServer.scala:667).
  • Run the NIO reactor, with one java.nio.channels.Selector per Processor: register channels, drive the TLS/SASL handshake, read complete size-delimited requests, write responses, and expire idle connections (clients/.../network/Selector.java:445).
  • Decouple I/O from request handling via the bounded RequestChannel hand-off queue, so slow request processing back-pressures the network threads rather than dropping data (kafka/network/RequestChannel.scala:90).
  • Dispatch dequeued requests to KafkaApis on the KafkaRequestHandler pool (kafka/server/KafkaRequestHandler.scala:108). See Request Processing (KafkaApis).
  • Park requests that cannot complete immediately (produce awaiting acks, fetch awaiting bytes, …) in a purgatory with an O(1) timer, and re-check them when triggering events occur (server-common/.../purgatory/DelayedOperationPurgatory.java:122).
  • Enforce connection quotas and connection-creation-rate limits (ConnectionQuotas, kafka/network/SocketServer.scala:1276).

Key idea

Kafka deliberately does not use a servlet container or Netty. It implements a thin reactor (Selector + KafkaChannel + TransportLayer) shared verbatim between clients and the broker, plus a Scala SocketServer that wires N reactor threads to a bounded queue and M worker threads. The same Selector code path drives producers, consumers, replica fetchers and the broker's accept side, which is why the protocol framing lives in clients/ and not core/.

Where it lives in the code

Concern | Principal class | File
Top-level socket server / lifecycle | SocketServer | core/src/main/scala/kafka/network/SocketServer.scala:75
Accept thread (per listener) | Acceptor / DataPlaneAcceptor | core/src/main/scala/kafka/network/SocketServer.scala:462, :364
NIO reactor thread | Processor | core/src/main/scala/kafka/network/SocketServer.scala:801
Bounded hand-off queue | RequestChannel | core/src/main/scala/kafka/network/RequestChannel.scala:80
In-flight request (timing fields) | Request | server/src/main/java/org/apache/kafka/network/Request.java:54
I/O worker thread + pool | KafkaRequestHandler / ...Pool | core/src/main/scala/kafka/server/KafkaRequestHandler.scala:91, :227
NIO multiplexer (reactor core) | Selector | clients/src/main/java/org/apache/kafka/common/network/Selector.java:88
Per-connection state machine | KafkaChannel | clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java:67
Plaintext / TLS byte transport | PlaintextTransportLayer / SslTransportLayer | clients/.../network/PlaintextTransportLayer.java:32
Size-delimited receive | NetworkReceive | clients/.../network/NetworkReceive.java:32
Outbound send wrapper | NetworkSend | clients/.../network/NetworkSend.java:21
Connection quotas | ConnectionQuotas | core/src/main/scala/kafka/network/SocketServer.scala:1276
Delayed-operation purgatory | DelayedOperationPurgatory | server-common/.../purgatory/DelayedOperationPurgatory.java:38
Hierarchical timing wheel | TimingWheel / SystemTimer | server-common/.../util/timer/TimingWheel.java:97, SystemTimer.java:30
Socket-server config keys | SocketServerConfigs | server/src/main/java/org/apache/kafka/network/SocketServerConfigs.java:43

Core concepts & terminology

Listener / endpoint
A (name, securityProtocol, host, port) tuple from listeners. Each gets its own Acceptor + pool of Processors and its own metric tags (SocketServer.scala:217).
Acceptor thread
One non-daemon thread per listener that owns the ServerSocketChannel, accepts new sockets, and round-robins each to a Processor (SocketServer.scala:462).
Processor (network) thread
One of num.network.threads threads per listener; runs a private Selector reactor loop. Reads requests, enqueues them, writes responses (SocketServer.scala:893).
Request handler (I/O) thread
One of num.io.threads daemon threads pulling from RequestChannel and calling KafkaApis.handle (KafkaRequestHandler.scala:108).
Connection id
String localAddr:localPort-remoteAddr:remotePort-index; the trailing monotonically-incrementing index prevents id reuse while old requests are still in flight (SocketServer.scala:888, :1202).
Muting
A channel is "muted" (read interest dropped) the instant a request is enqueued, and un-muted only after its response is sent. This is how single-request-per-connection ordering is guaranteed across many handler threads (SocketServer.scala:1044).
Purgatory
A holding area for operations that block until a condition or timeout. Backed by a hierarchical timing wheel for O(1) insert/cancel (DelayedOperationPurgatory.java).
Privileged listener
The inter-broker listener (or, historically, a control-plane listener). A flag on the RequestContext that KafkaApis uses to decide whether forwarding/envelope handling is permitted (SocketServer.scala:224, :1027).

Note · request planes in KRaft 4.x

Earlier Kafka exposed two "request planes": a data plane and an optional control plane listener (KIP-291). In this KRaft-only build the control-plane listener is gone: SocketServer models only dataPlaneAcceptors (SocketServer.scala:103). Controller traffic is isolated instead by the process boundary: the controller runs its own SocketServer over controller.listener.names with ListenerType.CONTROLLER (SocketServer.scala:150) and its own request-handler pool named "controller" (ControllerServer.scala:300). The string "control plane" survives only in a doc comment about request forwarding (SocketServer.scala:796).

Architecture & control/data flow

Figure: broker request path, stage by stage.

  1. TCP clients / peer brokers: one TCP connection per peer.
  2. Acceptor thread (1 per listener): ServerSocket, OP_ACCEPT, e.g. PLAINTEXT://:9092.
  3. Processor threads (num.network.threads): Processor0, Processor1, Processor2, …; each runs a java.nio Selector + KafkaChannel reactor.
  4. RequestChannel (one per request plane): requestQueue ArrayBlockingQueue(500), callbackQueue ArrayBlockingQueue(500), per-Processor responseQueue (linked deque).
  5. Handler pool (num.io.threads, daemon): data-plane-kafka-request-handler-N (Handler0/1/2…) calling KafkaApis.handle(); a request may park in the purgatory here.
  6. Processor responseQueue: Selector write, unmute, back to client.

Broker request path: Acceptor → Processor (reactor) → RequestChannel → handler pool → response queue → Processor; a bounded queue back-pressures the network threads.

Construction and start-up ordering

The SocketServer constructor only opens the ports and builds the data structures; it does not start any thread (SocketServer.scala:146). Acceptors and their processors are created eagerly per endpoint (createDataPlaneAcceptorAndProcessors, :217), but thread start is deferred to enableRequestProcessing (:177), which chains each acceptor's start to a per-endpoint authorizer future so that no listener accepts traffic until its authorizer is initialized. Each acceptor exposes a startedFuture; the aggregate future returned lets the broker know when all listeners are live (:212).

Detailed mechanics

The Acceptor loop

The acceptor registers OP_ACCEPT and loops while shouldRun (an AtomicBoolean) is true (SocketServer.scala:576). Each iteration calls nioSelector.select(500) then drains ready keys. For each acceptable key it calls accept(), which:

  1. Accepts the SocketChannel and calls connectionQuotas.inc(...); this may block the acceptor when a connection slot is unavailable, or throw on a quota violation (:672).
  2. Configures the socket: configureBlocking(false), setTcpNoDelay(true), setKeepAlive(true), optional SO_SNDBUF (:693).
  3. Assigns the channel to the next processor by round-robin over currentProcessorIndex. If a processor's newConnections queue is full it tries the next; if all are full it blocks on the last one (mayBlock=true) so no accepted socket is dropped (:638).

On TooManyConnectionsException the socket is closed; on ConnectionThrottledException the socket is parked in a per-acceptor throttledSockets priority queue and closed later by closeThrottledConnections after the throttle delay (:680, :704). The acceptor catches all Throwables (except ControlThrowable) to avoid dying on a single bad connection (:585).

Invariant

The Acceptor → Processor hand-off never silently drops an accepted connection. assignNewConnection retries across all processors and, on the final attempt, performs a blocking newConnections.put (SocketServer.scala:1143). The cost is that a saturated reactor can stall the accept loop, visible as the AcceptorBlockedPercent meter (:504).
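
The hand-off invariant can be illustrated with a minimal sketch. The class RoundRobinHandOff and its methods are hypothetical names invented for this example, not Kafka's API; the sketch only assumes the behavior described above, namely non-blocking attempts on every processor queue but the last, then a blocking put.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;

/** Hypothetical model of the acceptor-to-processor hand-off; not Kafka's code. */
final class RoundRobinHandOff<C> {
    private final List<BlockingQueue<C>> queues; // one bounded queue per processor
    private int next = 0;                        // used only by the acceptor thread

    RoundRobinHandOff(List<BlockingQueue<C>> queues) {
        if (queues.isEmpty()) throw new IllegalArgumentException("need at least one processor");
        this.queues = queues;
    }

    /** Hands conn to a processor and returns that processor's index. */
    int assign(C conn) {
        int n = queues.size();
        for (int attempt = 0; attempt < n - 1; attempt++) {
            int idx = advance();
            if (queues.get(idx).offer(conn)) return idx; // non-blocking try
        }
        int last = advance();
        try {
            queues.get(last).put(conn);                  // final candidate: block, never drop
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted during hand-off", e);
        }
        return last;
    }

    /** Returns the current round-robin index and moves the cursor forward. */
    private int advance() {
        int idx = next;
        next = (next + 1) % queues.size();
        return idx;
    }
}
```

The blocking put on the final attempt is what trades accept-loop latency for the no-drop guarantee: the caller stalls until that processor drains its queue.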

The Processor reactor loop

Each Processor owns a common.network.Selector built from a server-side ChannelBuilder (SocketServer.scala:853). Its run() performs a fixed pipeline every iteration (:893):

configureNewConnections()   // register queued sockets with Selector (≤ 20/iter)
processNewResponses()       // pull responseQueue, hand Sends to Selector, mute/unmute
poll()                      // selector.poll(timeout): NIO select + read/write
processCompletedReceives()  // parse header, build Request, enqueue + MUTE channel
processCompletedSends()     // drop inflightResponses entry, unmute channel
processDisconnected()       // decrement quotas, fire disconnect listeners
closeExcessConnections()    // if broker max.connections exceeded, close 1 LRU

The poll timeout is 0 when there are queued new connections, otherwise 300ms (:996). Newly registered connections are capped at ConnectionQueueSize = 20 per iteration so that existing-channel traffic and close notifications stay responsive (:777, :1167).

Reading a request: size-delimited framing

Inside Selector.poll, ready/readable channels are handled by pollSelectionKeys → attemptRead → channel.read() (Selector.java:677). KafkaChannel.read() lazily creates a NetworkReceive and delegates to receive.readFrom(transportLayer) (KafkaChannel.java:407). A NetworkReceive is a 4-byte big-endian size N followed by N payload bytes (NetworkReceive.java:32):

Field | Type | Offset | Read phase
size | int32 BE | @0 | phase 1
payload = RequestHeader ++ request body | N bytes | @4 | phase 2

Phase 1: fill the 4-byte size buffer; validate 0 ≤ N ≤ socket.request.max.bytes.
Phase 2: memoryPool.tryAllocate(N); read until buffer full ⇒ complete().

The broker rejects N<0 or N>socket.request.max.bytes with InvalidReceiveException (NetworkReceive.java:92-95).

Crucially the payload buffer is allocated from a MemoryPool (memoryPool.tryAllocate, NetworkReceive.java:103). If the pool is exhausted the allocation returns null and the channel mutes itself until memory is available (KafkaChannel.java:414; Selector.attemptRead sets outOfMemory=true, Selector.java:691). This is the back-pressure mechanism for queued.max.request.bytes (the pool size). Selector.poll re-checks memoryPool and un-mutes channels once pressure clears (:457), and shuffles the key-handling order under low memory to avoid read starvation (determineHandlingOrder, :665).
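
The two-phase read is easy to model in isolation. The sketch below is a hypothetical FrameReader, not NetworkReceive itself: it assumes bytes arrive in arbitrary chunks (as they do on a non-blocking socket), allocates the payload with ByteBuffer.allocate instead of a MemoryPool, and signals a bad size with IllegalArgumentException where the broker uses InvalidReceiveException.

```java
import java.nio.ByteBuffer;

/** Hypothetical two-phase reader: 4-byte big-endian size, then N payload bytes. */
final class FrameReader {
    private final ByteBuffer size = ByteBuffer.allocate(4); // ByteBuffer is big-endian by default
    private final int maxBytes;
    private ByteBuffer payload;                             // null until the size is known

    FrameReader(int maxBytes) {
        this.maxBytes = maxBytes;
    }

    /** Consumes available bytes from in; returns true once a whole frame is buffered. */
    boolean feed(ByteBuffer in) {
        // Phase 1: fill the 4-byte size prefix.
        while (payload == null && size.hasRemaining() && in.hasRemaining()) {
            size.put(in.get());
        }
        if (payload == null) {
            if (size.hasRemaining()) return false;          // size prefix still incomplete
            size.flip();
            int n = size.getInt();
            if (n < 0 || n > maxBytes) {
                throw new IllegalArgumentException("invalid receive size " + n);
            }
            payload = ByteBuffer.allocate(n);               // the broker would ask a MemoryPool here
        }
        // Phase 2: copy as many payload bytes as are available.
        while (payload.hasRemaining() && in.hasRemaining()) {
            payload.put(in.get());
        }
        return !payload.hasRemaining();
    }

    /** Returns a read-only-position view of the completed payload. */
    ByteBuffer payload() {
        if (payload == null || payload.hasRemaining()) {
            throw new IllegalStateException("frame not complete");
        }
        ByteBuffer view = payload.duplicate();
        view.flip();
        return view;
    }
}
```

Because the size is validated before any payload buffer exists, an oversized or negative length costs the reader only four bytes of state.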

Invariant · at most one completed receive per channel per poll

Selector adds at most one entry to completedReceives for any channel in a single poll() (Selector.java:434-437, enforced by hasCompletedReceive guarding attemptRead at :580). Combined with muting the channel on enqueue, this guarantees the broker processes requests from one connection strictly in send order even though they fan out to many handler threads.

From bytes to a Request

Processor.processCompletedReceives parses the header (parseRequestHeader validates the API key/version against the ApiVersionManager, SocketServer.scala:779), checks for SASL re-authentication and session expiry, then builds a RequestContext and a Request (:1027, :1031). It intercepts ApiVersionsRequest here to record the client software name/version on the channel (KIP-511, :1035). It then calls requestChannel.sendRequest(req) and immediately selector.mute(connectionId) (:1043). The receive's completedReceives map is cleared right after, returning buffers promptly (:1062).

Writing a response

Handlers never touch sockets. They call RequestChannel.sendResponse, which routes the Response to the originating Processor's responseQueue via processor.enqueueResponse and wakes its selector (RequestChannel.scala:189, SocketServer.scala:1208). On its next loop the processor's processNewResponses pattern-matches the response type (SocketServer.scala:937):

Response type | Processor action
SendResponse | selector.send(NetworkSend); record in inflightResponses (:978)
NoOpResponse | no bytes sent (e.g. produce acks=0); just unmute to read pipelined requests (:943)
CloseConnectionResponse | actively close the channel (used on errors with no client response) (:956)
StartThrottlingResponse / EndThrottlingResponse | drive the channel mute state machine for quota throttling (:960)

The send itself is a NetworkSend wrapping the connection id and the serialized payload (NetworkSend.java:21). The Selector registers OP_WRITE (KafkaChannel.setSend, :389), drains it across one or more polls, and on completion populates completedSends; processCompletedSends then removes the inflight entry and unmutes the channel (SocketServer.scala:1065).

The RequestChannel hand-off

RequestChannel holds a bounded requestQueue of size queued.max.requests (default 500), a same-sized callbackQueue, and a ConcurrentHashMap of processor id → Processor (each owning its own response deque) (RequestChannel.scala:90). sendRequest does a blocking put. This is the back-pressure point: when handlers fall behind, network threads block here and stop reading sockets (:117).

receiveRequest(timeout) services the callbackQueue first (those requests already waited once), then polls the request queue. A special WakeupRequest sentinel placed on the request queue lets a callback enqueued from a foreign thread wake a blocked handler so it checks the callback queue (:200, :235). ShutdownRequest.INSTANCE is the poison pill that tells a handler to exit (:233).
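
The interplay of the two queues and the wakeup sentinel can be sketched as follows. HandOffChannel and its method names are hypothetical; the sketch assumes the behavior described above, and additionally assumes that the sentinel is offered without blocking (if the request queue is full, handlers are busy and will find the callback on their next receive).

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical two-queue hand-off with a wakeup sentinel; not Kafka's code. */
final class HandOffChannel {
    static final Object WAKEUP = new Object();   // sentinel, carries no work itself

    private final BlockingQueue<Object> requests;
    private final BlockingQueue<Object> callbacks;

    HandOffChannel(int capacity) {
        requests = new ArrayBlockingQueue<>(capacity);
        callbacks = new ArrayBlockingQueue<>(capacity);
    }

    /** Network-thread side: blocks when handlers fall behind (back-pressure). */
    void sendRequest(Object request) {
        put(requests, request);
    }

    /** Foreign-thread side: queue the callback, then nudge a possibly blocked handler. */
    void sendCallback(Object callback) {
        put(callbacks, callback);
        requests.offer(WAKEUP);                  // best effort; see the assumption above
    }

    /** Handler side: callbacks first, then requests; returns null on timeout. */
    Object receive(long timeoutMs) {
        Object callback = callbacks.poll();
        if (callback != null) return callback;
        try {
            Object request = requests.poll(timeoutMs, TimeUnit.MILLISECONDS);
            // A wakeup means "look at the callback queue"; another handler may have won the race.
            return request == WAKEUP ? callbacks.poll() : request;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    private static void put(BlockingQueue<Object> queue, Object item) {
        try {
            queue.put(item);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while enqueueing", e);
        }
    }
}
```

A handler that dequeues a stale sentinel simply gets null and loops again, which is why the sentinel never needs to be matched one-to-one with callbacks.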

The KafkaRequestHandler pool

A KafkaRequestHandler is a Runnable on a daemon thread named data-plane-kafka-request-handler-N (KafkaRequestHandler.scala:275). Its loop blocks on receiveRequest(300), measures idle time into two meters (a per-pool meter and a shared aggregate meter; the denominators differ so that combined-mode controllers report correctly, cf. KIP-1207), then dispatches by type (:108):

  • Request → set requestDequeueTimeNanos, stash it in a ThreadLocal, call apis.handle(request, requestLocal), then in finally release the request buffer (:162).
  • CallbackRequest → re-schedule a previously-deferred completion on a handler thread, carefully (re)computing callback timing so multi-callback requests aggregate correctly (:131).
  • ShutdownRequest → close the thread-local RequestLocal and exit (:126).

A FatalExitError from a handler triggers Exit.exit; the broker treats it as unrecoverable (:150). The pool is built by KafkaRequestHandlerPoolFactory so that broker and controller pools share one aggregate-threads counter; resizeThreadPool supports the dynamic num.io.threads change (:283).

Design rationale

Splitting network threads (reactor I/O) from I/O / request-handler threads (request processing, which may touch disk) lets each be sized and back-pressured independently: a small fixed reactor count multiplexes thousands of sockets, while a larger handler pool absorbs blocking work. The bounded RequestChannel converts handler slowness into TCP back-pressure rather than unbounded memory growth. Isolating controller traffic on its own pool descends from KIP-291 (originally a control-plane listener; now a separate controller process/pool in KRaft).

Re-scheduling callbacks onto handler threads

Some APIs need to finish work on a request thread after an async action (e.g. a transaction-coordinator verification) completes on a foreign thread. KafkaRequestHandler.wrapAsyncCallback captures the current RequestChannel and Request from thread-locals; if the callback later fires on the same handler thread it runs inline, otherwise it is posted via requestChannel.sendCallbackRequest, which enqueues to the callback queue and drops a WakeupRequest on the main queue (KafkaRequestHandler.scala:65). This keeps RequestLocal thread-confinement intact. See Transactions & EOS.

The request purgatory & hierarchical timing wheel

Many requests cannot be answered synchronously: a produce with acks=all waits for ISR replication; a fetch with fetch.min.bytes waits for data or fetch.max.wait.ms. These become DelayedOperations parked in a DelayedOperationPurgatory. The broker instantiates several, all keyed off ReplicaManager (ReplicaManager.scala:184):

Purgatory name | Operation | Purge interval source
Produce | DelayedProduce | producer.purgatory.purge.interval.requests
Fetch | DelayedFetch | fetch.purgatory.purge.interval.requests
DeleteRecords | DelayedDeleteRecords | delete.records.purgatory.purge.interval.requests
RemoteFetch | DelayedRemoteFetch | 0 (purge each cycle to free large buffers, ReplicaManager.scala:196)
RemoteListOffsets | DelayedRemoteListOffsets | default 1000
ShareFetch | DelayedShareFetch | share.fetch.purgatory.purge.interval.requests

The transaction subsystem adds a DelayedFuturePurgatory (e.g. for async ACL/quota completion in AclApis). The new (KIP-848) group coordinator is not purgatory-based: it runs an event/timer loop in its own threads rather than parking DelayedOperations, so there are no DelayedOperationPurgatory instances under group-coordinator/. See Replication, ISR & HWM, Fetch Path, and Group Coordination.

DelayedOperation contract

A DelayedOperation extends TimerTask and carries a completed flag plus a protected ReentrantLock (DelayedOperation.java:42). Subclasses implement tryComplete(), onComplete(), and onExpiration(). forceComplete() is the single chokepoint: it double-checks completed under the lock, cancels the timer task, and calls onComplete() exactly once. Concurrent callers race, but only the winner returns true (:60). When the timer fires, run() calls forceComplete() then onExpiration() (:146).
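
The exactly-once contract can be shown with a minimal sketch. OnceOperation is a hypothetical class written for this example; it assumes only what the paragraph above states (a volatile flag double-checked under a ReentrantLock) and reduces timer cancellation to an overridable hook named cancelTimer, which is an invented name.

```java
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical exactly-once completion guard; not Kafka's DelayedOperation. */
abstract class OnceOperation {
    private volatile boolean completed = false;
    protected final ReentrantLock lock = new ReentrantLock();

    /** Runs at most once, on whichever thread wins forceComplete(). */
    protected abstract void onComplete();

    /** Stand-in for removing the operation from its timer; a no-op by default. */
    protected void cancelTimer() { }

    boolean isCompleted() {
        return completed;
    }

    /** Returns true only for the single caller that actually completed the operation. */
    boolean forceComplete() {
        if (completed) return false;       // cheap first check without the lock
        lock.lock();
        try {
            if (completed) return false;   // second check under the lock
            cancelTimer();
            onComplete();
            completed = true;
            return true;
        } finally {
            lock.unlock();
        }
    }
}
```

The unlocked first check keeps the common "already done" path cheap, while the locked second check is what makes the winner unique.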

tryCompleteElseWatch

The core enqueue path is tryCompleteElseWatch(op, watchKeys) (DelayedOperationPurgatory.java:122). The algorithm, with its careful locking:

  1. Under the operation's lock (safeTryCompleteOrElse), call tryComplete(). If it succeeds, the operation is done: return true.
  2. Otherwise add the operation to the watcher list for each key (a key can be e.g. a TopicPartitionOperationKey), then call tryComplete() once more. Holding the lock across "add to watchlist + final tryComplete" guarantees the op cannot miss a concurrent triggering event after it is watched (:155).
  3. If still not complete, add it to the timeout timer (timeoutTimer.add(op)). If it completed in the race, cancel the timer task (:167).

The source carries a long comment dissecting a potential deadlock between tryCompleteElseWatch and checkAndComplete, and recommends that checkAndComplete be called without holding any external exclusive lock (:133-154).
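
The three steps above can be sketched in a few dozen lines. MiniPurgatory and Op are hypothetical classes for illustration only: keys are plain strings, the timer is replaced by a simple queue, and tryComplete() is reduced to a boolean condition. None of this is Kafka's API.

```java
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BooleanSupplier;

/** Hypothetical miniature of the watch-then-recheck protocol; not Kafka's code. */
final class MiniPurgatory {
    static final class Op {
        final ReentrantLock lock = new ReentrantLock();
        final BooleanSupplier condition;
        volatile boolean completed;

        Op(BooleanSupplier condition) {
            this.condition = condition;
        }

        /** Caller must hold lock. Completes the op if its condition now holds. */
        boolean tryComplete() {
            if (completed || !condition.getAsBoolean()) return false;
            completed = true;
            return true;
        }
    }

    private final Map<String, Queue<Op>> watchers = new ConcurrentHashMap<>();
    private final Queue<Op> timer = new ConcurrentLinkedQueue<>(); // stand-in for the timeout timer

    boolean tryCompleteElseWatch(Op op, List<String> keys) {
        op.lock.lock();
        try {
            if (op.tryComplete()) return true;                      // step 1
            for (String key : keys) {                               // step 2: watch...
                watchers.computeIfAbsent(key, k -> new ConcurrentLinkedQueue<>()).add(op);
            }
            if (op.tryComplete()) return true;                      // ...then re-check
        } finally {
            op.lock.unlock();
        }
        if (!op.completed) timer.add(op);                           // step 3
        return false;
    }

    /** Called on a triggering event; returns how many watched ops it completed. */
    int checkAndComplete(String key) {
        Queue<Op> queue = watchers.get(key);
        if (queue == null) return 0;
        int done = 0;
        for (Iterator<Op> it = queue.iterator(); it.hasNext(); ) {
            Op op = it.next();
            op.lock.lock();
            try {
                if (op.completed) {
                    it.remove();                                    // completed via another key
                } else if (op.tryComplete()) {
                    it.remove();
                    done++;
                }
            } finally {
                op.lock.unlock();
            }
        }
        return done;
    }
}
```

Because checkAndComplete takes the same per-operation lock, an event that fires between the watch and the re-check either sees the op on the list or is observed by the re-check; it cannot fall between the two.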

Watcher lists, sharding, and purge

Watchers are sharded into SHARDS = 512 WatcherLists; a key maps to a shard by abs(key.hashCode() % 512) to reduce lock contention (DelayedOperationPurgatory.java:41, :105). Each shard has a ConcurrentHashMap<DelayedOperationKey, Watchers> guarded by a ReentrantLock; each Watchers holds a ConcurrentLinkedQueue of operations (:292, :311). When a triggering event happens, checkAndComplete(key) looks up the watchers and runs tryCompleteWatched, removing completed entries as it walks (:184, :333).

Because a completed op may linger on watcher lists for keys it never reached, the purgatory tracks estimatedTotalOperations and, in advanceClock, purges all shards when estimatedTotalOperations - numDelayed() > purgeInterval (:386). This is the GC for orphaned watcher entries that KAFKA-1989 was designed to make cheap.
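
Both formulas are small enough to state directly. The class and method names below are hypothetical; the arithmetic is taken from the text above.

```java
/** Hypothetical helpers restating the shard and purge arithmetic; not Kafka's code. */
final class WatcherShards {
    static final int SHARDS = 512;

    /** Maps a key to a shard index in [0, 512). */
    static int shardFor(Object key) {
        // The remainder lies in (-512, 512), so Math.abs cannot overflow here,
        // unlike Math.abs(hashCode) applied before the modulo.
        return Math.abs(key.hashCode() % SHARDS);
    }

    /** True when enough orphaned watcher entries have piled up to justify a sweep. */
    static boolean shouldPurge(int estimatedTotalOperations, int numDelayed, int purgeInterval) {
        return estimatedTotalOperations - numDelayed > purgeInterval;
    }
}
```

Taking the remainder before the absolute value matters: Math.abs(Integer.MIN_VALUE) is still negative, whereas Integer.MIN_VALUE % 512 is 0.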

The timing wheel

The timeout side is a SystemTimer wrapping a TimingWheel (SystemTimer.java:30). A timing wheel is a circular array of wheelSize buckets, each a TimerTaskList (a doubly-linked list with a dummy root), spanning tickMs each. The broker's SystemTimer default is tickMs=1, wheelSize=20 → a 20 ms span at the lowest level (SystemTimer.java:45). Overflows spill into a lazily-created overflow wheel whose tick is the parent's full interval, giving coarser resolution as you go up, hence "hierarchical" (TimingWheel.java:131, :174).

Figure: add(entry), with expiration E, currentTimeMs C, and interval = tickMs × wheelSize. Where does E fall?

  • Already expired: E < C+tickMs ⇒ return false (run immediately).
  • Own bucket, O(1): E < C+interval; bucket = (E / tickMs) mod wheelSize; list.add(entry).
  • Overflow, O(m): E ≥ C+interval ⇒ overflowWheel.add(entry) (created on demand).

Where an entry lands when added: run-immediately if already expired, this level's bucket if in range, else recurse up into the overflow wheel.

Figure: wheel levels.

  • Level 0: tick 1 ms, 20 buckets (bucket[0] … bucket[19]), span 20 ms. Finest resolution; each bucket is a TimerTaskList (doubly-linked, root sentinel).
  • Between levels: entries overflow to the next level when E ≥ C+20 ms, and are flushed back toward level 0 on tick.
  • Level 1: tick 20 ms, 20 buckets, span 400 ms. Coarser; created lazily on first overflow.
  • Level 2 and above: coarser still.
  • One shared DelayQueue<TimerTaskList> drives every level.

Hierarchical timing wheels: each level's tick is the level-below's full span, so resolution coarsens as you climb. Insert/cancel are O(1) (delete just unlinks the entry and decrements an AtomicInteger counter); insert may recurse O(m) over m wheel levels (TimingWheel.java:147).

Buckets are driven by a single DelayQueue<TimerTaskList> shared across all wheel levels; a bucket enqueues itself when its expiration is (re)set (TimingWheel.java:164). SystemTimer.advanceClock(timeoutMs) polls that delay queue; on a due bucket it advances the wheel clock and flushes the bucket, re-inserting each entry, which either lands in a finer bucket or, being now expired, is submitted to the executor (SystemTimer.java:89, addTimerTaskEntry at :76). Expired tasks run on a single-thread executor named executor-<purgatoryName> (SystemTimer.java:54). The actual ticking is performed by the purgatory's own background reaper.
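
The placement rule for a new entry is pure arithmetic, so it can be checked without any threads. WheelPlacement below is a hypothetical helper, not part of TimingWheel; it assumes that each level truncates its clock to a multiple of its own tick and that level k uses a tick of tickMs × wheelSize^k, as the hierarchy described above implies.

```java
/** Hypothetical restatement of the timing-wheel placement arithmetic; not Kafka's code. */
final class WheelPlacement {
    enum Where { EXPIRED, THIS_WHEEL, OVERFLOW }

    /** Decides where an entry goes relative to one wheel with the given clock. */
    static Where place(long expirationMs, long currentTimeMs, long tickMs, int wheelSize) {
        long interval = tickMs * wheelSize;
        if (expirationMs < currentTimeMs + tickMs) return Where.EXPIRED;
        if (expirationMs < currentTimeMs + interval) return Where.THIS_WHEEL;
        return Where.OVERFLOW;
    }

    /** Bucket index within a wheel: (E / tickMs) mod wheelSize. */
    static int bucketIndex(long expirationMs, long tickMs, int wheelSize) {
        return (int) ((expirationMs / tickMs) % wheelSize);
    }

    /** Level on which the entry lands (0 = finest), or -1 if it is already expired. */
    static int level(long expirationMs, long nowMs, long tickMs, int wheelSize) {
        long tick = tickMs;
        for (int level = 0; ; level++) {
            long current = nowMs - (nowMs % tick);   // assumed: clock truncated to this level's tick
            Where where = place(expirationMs, current, tick, wheelSize);
            if (where == Where.EXPIRED) return -1;
            if (where == Where.THIS_WHEEL) return level;
            tick *= wheelSize;                       // next level's tick = this level's span
        }
    }
}
```

With the defaults (tickMs=1, wheelSize=20) and a clock at 1000 ms, an entry expiring at 1005 ms lands in level 0, one at 1020 ms in level 1, and one at 1400 ms in level 2.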

Key idea · why a timing wheel

A DelayQueue/priority-queue timer is O(log n) per insert and delete, and (in old Kafka) leaked completed operations until a sweep ran, risking OOM. The hierarchical timing wheel makes insert O(1) amortized and cancel O(1) exactly, decoupling timer cost from the number of in-flight delayed operations, most of which complete before they ever expire (KAFKA-1989).

Connection quotas

ConnectionQuotas enforces three count limits and a creation-rate limit, all guarded by synchronizing on a single counts mutable map (SocketServer.scala:1276):

  • Per-IP cap, max.connections.per.ip (default Integer.MAX_VALUE), overridable per host via max.connections.per.ip.overrides. Exceeding it throws TooManyConnectionsException and the socket is closed (:1306).
  • Per-listener cap, listener.name.X.max.connections, tracked in listenerCounts (:1490).
  • Broker-wide cap, max.connections. The inter-broker listener is "protected": it may exceed the broker cap, and an LRU connection on another listener is closed to make room (maxConnectionsExceeded + closeExcessConnections, :1474, SocketServer.scala:1110).
  • Creation-rate, max.connection.creation.rate (broker-wide and per-listener), and per-IP rates via dynamic ip-connection-rate quotas. Exceeding the rate yields a throttle delay (ConnectionThrottledException) computed from the quota window (recordConnectionAndGetThrottleTimeMs, :1501).

When a slot is unavailable, waitForConnectionSlot blocks the acceptor on counts.wait(...) and is woken by counts.notifyAll() on every dec, listener add/remove, or dynamic limit increase (:1453, :1425). Throttle time spent blocked is folded into the AcceptorBlockedPercent meter.
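
The wait/notify pattern behind the connection-slot wait looks roughly like the sketch below. ConnectionSlots is a hypothetical class tracking a single cap; the timeout parameter is an assumption added so the example terminates, and the sketch ignores per-IP and per-listener counts entirely.

```java
import java.util.concurrent.TimeUnit;

/** Hypothetical single-cap slot counter using wait/notifyAll; not Kafka's ConnectionQuotas. */
final class ConnectionSlots {
    private final Object lock = new Object();
    private int max;
    private int count = 0;

    ConnectionSlots(int max) {
        this.max = max;
    }

    /** Blocks up to timeoutMs for a free slot; returns false on timeout or interrupt. */
    boolean acquire(long timeoutMs) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        synchronized (lock) {
            while (count >= max) {
                long remainingMs = TimeUnit.NANOSECONDS.toMillis(deadline - System.nanoTime());
                if (remainingMs <= 0) return false;  // also avoids wait(0), which waits forever
                try {
                    lock.wait(remainingMs);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return false;
                }
            }
            count++;
            return true;
        }
    }

    /** Frees a slot and wakes every waiter so each can re-check the cap. */
    void release() {
        synchronized (lock) {
            if (count > 0) count--;
            lock.notifyAll();
        }
    }

    /** Dynamic limit change; waiters are woken in case the cap went up. */
    void setMax(int newMax) {
        synchronized (lock) {
            max = newMax;
            lock.notifyAll();
        }
    }
}
```

The while loop around wait is essential: a woken waiter must re-check the condition, because another thread may have taken the freed slot first.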

Gotcha

connectionQuotas.inc runs on the acceptor thread and can block it for the full throttle window. Because there is exactly one acceptor per listener, an aggressive client hammering a single IP can stall accepts for that whole listener (not just that IP). Rate-limit overrides per IP exist precisely to bound this, and the inter-broker listener is explicitly protected so cluster traffic is never starved by client churn.

Concurrency & threading

The state-ownership model is strict and is what makes this lock-light design correct:

State | Owner / guard | Notes
Selector.channels, completedReceives, all reactor state | Its single Processor thread, no lock | Selector is documented "not thread safe" (Selector.java:86)
newConnections (Acceptor → Processor) | ArrayBlockingQueue(20) | cross-thread hand-off (SocketServer.scala:831)
responseQueue (handlers → Processor) | LinkedBlockingDeque | plus selector.wakeup() (:833, :1208)
requestQueue / callbackQueue | ArrayBlockingQueue(queued.max.requests) | many writers (processors) / many readers (handlers)
Request timing fields | volatile longs | written by handler/purgatory threads, read by network thread (Request.java:70)
processors map in RequestChannel | ConcurrentHashMap | resized on dynamic thread changes
ConnectionQuotas.counts | synchronized(counts) + wait/notifyAll | plus @volatile limit fields (:1278)
Purgatory completed flag | volatile + per-op ReentrantLock | (DelayedOperation.java:40, :42)
Watcher shard map | per-shard ReentrantLock (512 shards) | (DelayedOperationPurgatory.java:295)
SystemTimer wheel mutation | ReentrantReadWriteLock | read lock on add, write lock while ticking (SystemTimer.java:40)
TimerTaskList linkage | nested synchronized + AtomicInteger counter | (TimerTaskList.java:72)

Broker thread census

The principal threads on a running broker (and their naming) relevant to this subsystem:

Thread name pattern | Count | Daemon? | Role | Source
data-plane-kafka-socket-acceptor-<listener>-<proto>-<port> | 1 per listener | non-daemon | accept + round-robin | SocketServer.scala:510
data-plane-kafka-network-thread-<node>-<listener>-<proto>-<id> | num.network.threads per listener | non-daemon | NIO reactor | SocketServer.scala:750, :829
data-plane-kafka-request-handler-<id> | num.io.threads | daemon | KafkaApis dispatch | KafkaRequestHandler.scala:275
ExpirationReaper-<brokerId>-<Purgatory> | 1 per purgatory | non-daemon | ticks the timer (advanceClock(200)) | DelayedOperationPurgatory.java:409
executor-<Purgatory> | 1 per SystemTimer | non-daemon | runs expired TimerTasks | SystemTimer.java:31, :54

(A KRaft broker also runs replica-fetcher threads, log cleaner/flusher threads, the Raft I/O thread, controller-side network/handler/reaper threads when in combined mode, and various scheduler/metadata threads; those belong to the fetch path, storage management, KRaft consensus and the controller.) Note that the reaper and timer-executor threads are non-daemon (ShutdownableThread sets daemon=false, ShutdownableThread.java:56); they are stopped explicitly on shutdown via a no-op task that wakes the blocked DelayQueue.poll (DelayedOperationPurgatory.java:278).

The request lifecycle timeline

Request records five lifecycle timestamps (all in nanoseconds) that decompose end-to-end latency. They are populated at well-defined handoff points and rendered into the per-API request metrics by updateRequestMetrics (Request.java:325):

  • T0 · startTimeNanos: request fully received on network thread (Request ctor)
  • T1 · requestDequeueTimeNanos: handler picked it up
  • T2 · apiLocalCompleteTimeNanos: KafkaApis done with synchronous part
  • T3 · responseCompleteTimeNanos: response object handed to RequestChannel
  • T4 · responseDequeueTimeNanos: Processor pulled response to write
  • T5 · endTime (now): totalTime = endTime − startTimeNanos
  • Plus (orthogonal): throttleTime (quota, ms) · messageConversionsTime · temporaryMemoryBytes

The "remote time" bucket is where purgatory waiting shows up: a large RemoteTime on Fetch is normal, whereas on Produce it indicates slow ISR replication.

Each timestamp is written by a different actor: startTimeNanos by the network thread (Request.java:89); requestDequeueTimeNanos by the handler (KafkaRequestHandler.scala:164); apiLocalCompleteTimeNanos and responseCompleteTimeNanos by RequestChannel.sendResponse when the response is produced (RequestChannel.scala:179); responseDequeueTimeNanos by the processor's dequeueResponse (SocketServer.scala:1213). The network-thread time accumulated on the KafkaChannel (auth, read, write) is read and reset when the send completes and fed into the client's request-quota accounting (SocketServer.scala:1086, KafkaChannel.java:454). See Quotas & Throttling.
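
Given the six instants T0 to T5, the latency buckets are differences of adjacent timestamps. The sketch below is hypothetical: the class name and the bucket labels are chosen for this example (only "remote time" and totalTime are named in the text above), and clamping negative differences to zero is an assumption.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical latency decomposition from the lifecycle timestamps; not Kafka's code. */
final class LatencyBuckets {
    /** All inputs in nanoseconds; results in milliseconds, in lifecycle order. */
    static Map<String, Double> decompose(long t0, long t1, long t2, long t3, long t4, long t5) {
        Map<String, Double> ms = new LinkedHashMap<>();
        ms.put("requestQueueTime", nanosToMs(t1 - t0));   // waiting in the request queue
        ms.put("localTime", nanosToMs(t2 - t1));          // synchronous handler work
        ms.put("remoteTime", nanosToMs(t3 - t2));         // purgatory / waiting on others
        ms.put("responseQueueTime", nanosToMs(t4 - t3));  // waiting in the response queue
        ms.put("responseSendTime", nanosToMs(t5 - t4));   // writing to the socket
        ms.put("totalTime", nanosToMs(t5 - t0));
        return ms;
    }

    /** Converts to milliseconds, clamping negative differences to zero (assumption). */
    private static double nanosToMs(long nanos) {
        return Math.max(nanos, 0L) / 1_000_000.0;
    }
}
```

For a Fetch that waits 40 ms in purgatory, almost all of totalTime lands in the remote bucket while the local bucket stays small, which matches the reading of RemoteTime given above.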

Configuration reference

All keys below are defined in SocketServerConfigs.java unless noted; num.io.threads/background.threads live in ServerConfigs.

Key | Default | Effect
num.network.threads | 3 | Processors (reactor threads) per listener; dynamically reconfigurable, bounded to [½×, 2×] current (SocketServerConfigs.java:151, SocketServer.scala:413)
num.io.threads | 8 | Request-handler threads; processing + disk I/O (ServerConfigs.java:50)
queued.max.requests | 500 | Size of the RequestChannel request queue; full queue blocks network threads (:143)
queued.max.request.bytes | -1 (off) | If >0, sizes the SimpleMemoryPool; pool exhaustion mutes channels and stops reads (:147, SocketServer.scala:101)
socket.request.max.bytes | 104857600 (100 MiB) | Max single receive size; NetworkReceive rejects larger frames (:95)
socket.send.buffer.bytes | 102400 (100 KiB) | SO_SNDBUF; -1 = OS default (:87)
socket.receive.buffer.bytes | 102400 (100 KiB) | SO_RCVBUF; -1 = OS default (:91)
socket.listen.backlog.size | 50 | Server-socket accept backlog (:99)
connections.max.idle.ms | 600000 (10 min) | Idle connections closed by the Selector's LRU idle-expiry manager (:134, Selector.java:1427)
connection.failed.authentication.delay.ms | 100 | Delay before closing a connection that failed auth (slows brute-force) (:138, Selector.java:1389)
max.connections.per.ip | 2147483647 | Per-IP connection cap (:110)
max.connections.per.ip.overrides | "" (none) | Per-host overrides, e.g. 10.0.0.1:200 (:105)
max.connections | 2147483647 | Broker-wide cap; inter-broker listener is exempt (closes LRU elsewhere) (:115)
max.connection.creation.rate | 2147483647 | Conn-creation rate cap (broker & listener); exceeding throttles new accepts (:126)
producer.purgatory.purge.interval.requests | 1000 | Watcher-list purge threshold for the Produce purgatory (ReplicationConfigs.java:111; wired at ReplicaManager.scala:186)
fetch.purgatory.purge.interval.requests | 1000 | Same, for the Fetch purgatory (ReplicaManager.scala:190)

The reconfigurable set is SocketServer.RECONFIGURABLE_CONFIGS; reconfigure live-applies max.connections* and the creation rate, while per-listener num.network.threads resizes the processor pool (SocketServer.scala:323, :438). See Metadata Propagation & Broker Lifecycle for how dynamic config reaches here.
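To make the interplay of max.connections.per.ip and max.connections.per.ip.overrides concrete, here is a minimal sketch of a per-host cap lookup. It is not the ConnectionQuotas implementation: the class PerIpLimits and its methods are hypothetical, the comma-separated host:count list format is an assumption, and hosts are matched as plain strings without any address resolution.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical per-host connection cap: a default plus host:count overrides. */
final class PerIpLimits {
    private final int defaultMax;
    private final Map<String, Integer> overrides = new HashMap<>();

    /** overridesSpec is assumed to look like "10.0.0.1:200,10.0.0.2:5"; empty means none. */
    PerIpLimits(int defaultMax, String overridesSpec) {
        this.defaultMax = defaultMax;
        if (overridesSpec == null || overridesSpec.trim().isEmpty()) {
            return;
        }
        for (String entry : overridesSpec.split(",")) {
            String e = entry.trim();
            // Split on the last ':' so a host that itself contains colons still parses.
            int sep = e.lastIndexOf(':');
            if (sep <= 0 || sep == e.length() - 1) {
                throw new IllegalArgumentException("Expected host:count, got '" + e + "'");
            }
            overrides.put(e.substring(0, sep), Integer.parseInt(e.substring(sep + 1)));
        }
    }

    /** The cap that applies to this host: its override if present, else the default. */
    int maxFor(String host) {
        return overrides.getOrDefault(host, defaultMax);
    }

    /** True if a host currently holding currentConnections may open one more. */
    boolean admits(String host, int currentConnections) {
        return currentConnections < maxFor(host);
    }
}
```

With the default of 2147483647 the per-IP cap is effectively off, so in this sketch only hosts listed in the overrides are ever refused.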

Failure modes, edge cases & recovery

  • Bad frame / disabled API. parseRequestHeader throws InvalidRequestException/UnsupportedVersionException; the processor closes just that channel via processChannelException and keeps serving others (SocketServer.scala:929, :779).
  • Oversize receive. receiveSize > socket.request.max.bytes ⇒ InvalidReceiveException, channel closed (NetworkReceive.java:94).
  • Memory pressure. Pool-exhausted reads mute the channel; the broker recovers and un-mutes once memory frees, shuffling key order to avoid starving any one channel (Selector.java:457, :668).
  • Idle / expired connections. The IdleExpiryManager closes the least-recently-active connection past connections.max.idle.ms on each poll (Selector.java:795).
  • SASL session expiry / re-auth. A receive arriving on an expired session closes the channel and increments expired-connections-killed-count (SocketServer.scala:1019).
  • acks=0 produce. The handler returns a NoOpResponse (or CloseConnectionResponse on error); no bytes are written but the channel is still unmuted to read pipelined requests (SocketServer.scala:943, RequestChannel.scala:121).
  • Response for a dead connection. If the channel vanished, sendResponse logs a warning and updates metrics with zero network time rather than throwing (SocketServer.scala:982).
  • Processor shutdown with queued responses. RequestChannel.sendResponse tolerates a null processor (shut down) and silently drops the response, since the socket is already closed (RequestChannel.scala:191).
  • Graceful close with outstanding receives. The Selector keeps a remotely-closed channel in closingChannels until its buffered requests are processed (so acks=0 records aren't lost), then closes it (Selector.java:929, :849).
  • Connection storm. Per-IP/listener/broker caps and rate limits throttle or reject; the inter-broker listener is protected so replication survives (SocketServer.scala:1474).
  • Purgatory deadlock avoidance. Operation locks are held across watch+recheck; callers are advised not to call checkAndComplete under an external exclusive lock (DelayedOperationPurgatory.java:133).
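The oversize-receive check in the list above is easiest to see on a bare size-delimited frame: a 4-byte big-endian length followed by that many payload bytes. The sketch below illustrates the idea only. SizeDelimitedFrames and tryReadFrame are hypothetical names, IllegalStateException stands in for the broker's InvalidReceiveException, and rejecting a negative size is an assumption.

```java
import java.nio.ByteBuffer;

/** Hypothetical reader for one size-delimited frame: 4-byte length, then payload. */
final class SizeDelimitedFrames {
    /**
     * Returns the payload if a whole frame is buffered, or null if more bytes are
     * needed (the buffer position is then left untouched). Throws if the declared
     * size is negative or exceeds maxBytes, before any payload is allocated.
     */
    static byte[] tryReadFrame(ByteBuffer in, int maxBytes) {
        if (in.remaining() < 4) {
            return null; // size prefix not complete yet
        }
        int size = in.getInt(in.position()); // absolute read: peek without consuming
        if (size < 0 || size > maxBytes) {
            throw new IllegalStateException("Invalid receive (size = " + size
                    + ", limit = " + maxBytes + ")");
        }
        if (in.remaining() - 4 < size) {
            return null; // payload not complete yet
        }
        in.getInt(); // consume the size prefix
        byte[] payload = new byte[size];
        in.get(payload);
        return payload;
    }
}
```

Validating the size before allocating is the point of the check: a peer cannot make the reader reserve an arbitrarily large buffer just by sending a large length prefix.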

Invariants & guarantees

  • Per-connection ordering. One completed receive per channel per poll + mute-until-response ⇒ requests on a single TCP connection are processed in send order despite multi-threaded handling (Selector.java:434, SocketServer.scala:1044).
  • No accepted connection is dropped on hand-off. The acceptor blocks rather than discard (SocketServer.scala:1143).
  • Exactly-once completion. A DelayedOperation's onComplete() runs exactly once, racing callers notwithstanding (DelayedOperation.java:60).
  • O(1) timer insert/cancel. Cancel just unlinks an entry; insert is O(1) within a wheel level, O(m) across overflow levels (TimingWheel.java:147, TimerTaskList.java:98).
  • Connection-id uniqueness during overlap. The incrementing index prevents reuse while old in-flight requests reference the id (SocketServer.scala:888).
  • Bounded request memory. The request queue is bounded and (optionally) the receive memory pool is bounded; together they cap how much un-processed request data the broker holds.
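One common way to obtain the exactly-once completion guarantee listed above is a compare-and-set on a completion flag, so that only the first of several racing callers runs the callback. The sketch below shows that technique in isolation. It is not a copy of DelayedOperation: OnceOnlyOperation and OnceOnlyDemo are hypothetical names, and the real class may guard completion differently.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical operation whose onComplete() runs at most once, however many callers race. */
abstract class OnceOnlyOperation {
    private final AtomicBoolean completed = new AtomicBoolean(false);

    protected abstract void onComplete();

    /** Returns true only for the single caller that flips the flag and runs onComplete(). */
    final boolean forceComplete() {
        if (completed.compareAndSet(false, true)) {
            onComplete();
            return true;
        }
        return false;
    }

    final boolean isCompleted() {
        return completed.get();
    }
}

final class OnceOnlyDemo {
    /** Races the given number of threads on one operation; returns how often onComplete() ran. */
    static int race(int threads) {
        AtomicInteger runs = new AtomicInteger();
        OnceOnlyOperation op = new OnceOnlyOperation() {
            @Override
            protected void onComplete() {
                runs.incrementAndGet();
            }
        };
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(op::forceComplete);
            workers[i].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return runs.get();
    }
}
```

In this sketch the callers that lose the race return false immediately and do not wait for the winner's onComplete() to finish.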

Interactions with other subsystems

Design rationale & evolution

  • KAFKA-1989: Replaced the O(log n) DelayQueue purgatory with hierarchical timing wheels for O(1) insert/cancel and bounded purge cost; this is the design captured in TimingWheel/SystemTimer.
  • KIP-291: Separated controller from data-plane traffic (originally via a control-plane listener). In KRaft 4.x the listener is removed; isolation is achieved by the controller's own process/SocketServer/handler pool.
  • KIP-511: ApiVersions carries client software name/version; the broker harvests it in the processor for the per-client connection metrics (SocketServer.scala:1035).
  • KIP-1207: Motivates the dual per-pool vs aggregate idle meters in the handler pool so combined-mode RequestHandlerAvgIdlePercent is not skewed (KafkaRequestHandler.scala:111).
  • The receive MemoryPool (queued.max.request.bytes) lets operators cap request-side memory independently of the queue depth, a defense against many large in-flight requests (SocketServer.scala:101).
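The hierarchical timing wheel mentioned above places a timer by pure arithmetic: find the first wheel whose span covers the expiration, then index the bucket by expiration divided by that wheel's tick. The following is a minimal sketch of that placement rule under stated assumptions. WheelPlacement and place are hypothetical names, each overflow wheel is assumed to use the previous wheel's full span as its tick, and the tick and wheel-size values used in the examples are illustrative.

```java
/** Hypothetical helper: which wheel level and bucket a timer would land in. */
final class WheelPlacement {
    /**
     * Returns {level, bucket}, where level 0 is the finest wheel, or null if the
     * timer is already due within the current tick. The loop runs once per
     * overflow level, so placement costs O(m) for m levels and O(1) within a level.
     */
    static int[] place(long expirationMs, long nowMs, long tickMs, int wheelSize) {
        long levelTick = tickMs;
        long levelNow = nowMs - (nowMs % levelTick); // wheel time is truncated to its tick
        if (expirationMs < levelNow + levelTick) {
            return null; // already expired: run immediately instead of queueing
        }
        int level = 0;
        // Climb while the expiration lies beyond this wheel's span (tick * wheelSize).
        while (expirationMs >= levelNow + Math.multiplyExact(levelTick, (long) wheelSize)) {
            levelTick = Math.multiplyExact(levelTick, (long) wheelSize);
            levelNow = nowMs - (nowMs % levelTick);
            level++;
        }
        long virtualId = expirationMs / levelTick;
        return new int[] {level, (int) (virtualId % wheelSize)};
    }
}
```

For example, with a 1 ms tick and 20 buckets per wheel at time 0, a timer due at 5 ms lands in level 0, bucket 5, while one due at 25 ms lands in level 1, bucket 1, and is re-inserted into the finer wheel once that coarser bucket expires.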

Gotchas & operational notes

Caution

num.network.threads is per listener, not global (SocketServerConfigs.java:153). A broker with three listeners and the default 3 runs nine reactor threads. Dynamic changes are clamped: in one step you cannot go below half or above double the current value (SocketServer.scala:421).
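The two rules in this caution reduce to a line of arithmetic each. The sketch below is illustrative only: NetworkThreadSizing and its methods are hypothetical, and both the integer division used for the lower bound and the rejection of non-positive values are assumptions rather than confirmed broker behaviour.

```java
/** Hypothetical helpers for reasoning about num.network.threads. */
final class NetworkThreadSizing {
    /** Reactor threads in total: the setting applies to every listener separately. */
    static int totalProcessors(int listeners, int numNetworkThreads) {
        return Math.multiplyExact(listeners, numNetworkThreads);
    }

    /** True if one dynamic step from current to requested stays within [current/2, current*2]. */
    static boolean isAllowedResize(int current, int requested) {
        return requested > 0
                && requested >= current / 2            // assumption: integer division
                && requested <= (long) current * 2;    // widen to long to avoid overflow
    }
}
```

Under these assumptions, shrinking from 8 to 2 processors takes two steps (8 to 4, then 4 to 2), because each step is validated against the value current at that moment.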

  • High RemoteTimeMs is normal on Fetch (consumers long-poll in purgatory) but on Produce signals slow ISR; use the per-API histograms emitted by updateRequestMetrics.
  • NetworkProcessorAvgIdlePercent near 0 means reactor saturation: accepts may stall (watch AcceptorBlockedPercent), so scale num.network.threads. RequestHandlerAvgIdlePercent near 0 means handler saturation: the request queue will fill and back-pressure the network, so scale num.io.threads (SocketServer.scala:120, KafkaRequestHandler.scala:253).
  • queued.max.request.bytes defaults off (-1). Without it the only request-memory bound is queue depth × request size; on memory-tight brokers consider enabling it.
  • The RemoteFetch purgatory uses purgeInterval 0 on purpose, to release potentially-50 MiB RemoteLogReadResult buffers immediately for GC (ReplicaManager.scala:196).
  • Failed-auth close delay must be < idle timeout (connection.failed.authentication.delay.ms < connections.max.idle.ms) or the connection may be reaped before the delayed close fires (SocketServerConfigs.java:140).
  • Acceptor and reaper threads are non-daemon; handler threads are daemon. Clean shutdown depends on the explicit poison-pill/no-op mechanisms, not on JVM exit.
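The "queue depth × request size" bound from the queued.max.request.bytes note above is worth computing once with the defaults. This is a back-of-the-envelope sketch: RequestMemoryBound is a hypothetical helper, and it counts only requests sitting in the queue, ignoring requests already being handled and receives still in progress.

```java
/** Hypothetical helper: worst-case bytes held by a full request queue. */
final class RequestMemoryBound {
    /** queued.max.requests multiplied by socket.request.max.bytes, overflow-checked. */
    static long worstCaseQueuedBytes(int queuedMaxRequests, long socketRequestMaxBytes) {
        return Math.multiplyExact((long) queuedMaxRequests, socketRequestMaxBytes);
    }

    /** Converts a byte count to whole mebibytes, discarding any remainder. */
    static long toMiB(long bytes) {
        return bytes / (1024L * 1024L);
    }
}
```

With the defaults from the configuration table (500 requests, 104857600 bytes each) the worst case is 52,428,800,000 bytes, or 50,000 MiB, which is why the queue depth alone is a weak bound on memory-tight brokers.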

krivaltsevich.com · Part of Apache Kafka Internals · derived from Apache Kafka 4.4 source · GitHub · MIT-licensed.

Apache Kafka® is a registered trademark of the Apache Software Foundation. This is an independent, unofficial guide, not affiliated with or endorsed by the ASF.