krivaltsevich.com · Kafka Internals 4.4

02 · The Wire Protocol & RPC Framework

Source: Apache Kafka 4.4.0-SNAPSHOT (git 04bfe7d, 2026-06-15), KRaft mode. Derived from source code, not copied from official documentation.

Every byte that crosses a Kafka connection (producer to broker, broker to broker, broker to controller, controller-quorum peer to peer) belongs to a request/response pair framed and encoded by one small, uniform machine: a 4-byte length-prefixed frame, a versioned header, and a body whose schema is described in JSON and compiled to Java by a bespoke code generator. This chapter dissects that machine: the ApiKeys registry, the Readable/Writable abstraction, the message generator, the flexible (compact + tagged-field) encoding from KIP-482, version negotiation via ApiVersions (KIP-35), and the Errors code space. It ends with a worked byte-by-byte layout of a real request.

Role & Responsibilities

The wire-protocol subsystem is the lingua franca of the cluster. It has four jobs:

  • Framing. Delimit one logical message on a byte stream using a 4-byte big-endian size prefix, so a reader knows exactly how many bytes constitute the next request or response.
  • Addressing & versioning. A request header carries the API key (which RPC), the API version (which schema variant), a client-chosen correlation id (to match responses to requests), and a client id. The response header echoes the correlation id.
  • (De)serialization. Turn a typed, generated message object into bytes and back, in a version-specific way, with zero-copy handling for record sets.
  • Evolution. Allow every RPC to gain fields over time without breaking old peers, both by bumping versions and, in flexible versions, by adding optional tagged fields that unknown readers skip cleanly.

This layer is deliberately policy-free: it knows how to read and write a ProduceRequest, but nothing about logs, ISRs, or quotas. Those live one layer up in Request Processing (KafkaApis), fed by the Network Layer & Threading Model that owns the sockets.

Key idea

Kafka does not hand-write serializers. A declarative *.json schema per message is compiled into a Java ApiMessage implementation by the generator module. The schema is the single source of truth for field order, types, version ranges, nullability, and tags; the generated code and the protocol documentation are both derived from it.

Where It Lives in the Code

| Concern | Principal class / file |
| --- | --- |
| API registry (key ↔ schema, versions, listeners) | clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java |
| Read interface (primitives, varints, records, tags) | clients/src/main/java/org/apache/kafka/common/protocol/Readable.java |
| Write interface | clients/src/main/java/org/apache/kafka/common/protocol/Writable.java |
| Concrete buffer-backed accessor (both interfaces) | clients/src/main/java/org/apache/kafka/common/protocol/ByteBufferAccessor.java |
| Generated-message contract | protocol/Message.java, protocol/ApiMessage.java |
| Zero-copy send assembly | protocol/SendBuilder.java |
| Two-pass size/value cache | protocol/ObjectSerializationCache.java, protocol/MessageSizeAccumulator.java |
| Error code space | protocol/Errors.java |
| Request/response base + dispatch | requests/AbstractRequest.java, requests/AbstractResponse.java |
| Headers | requests/RequestHeader.java, requests/ResponseHeader.java |
| Serialize helpers | requests/RequestUtils.java, protocol/MessageUtil.java |
| The code generator | generator/src/main/java/org/apache/kafka/message/* (notably MessageDataGenerator.java, ApiMessageTypeGenerator.java, FieldType.java, Versions.java) |
| Schemas | clients/src/main/resources/common/message/*.json |
| Frame reader (network) | clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java |

Core Concepts & Terminology

Frame
One int32 big-endian length N, followed by exactly N bytes of header+body. Read by NetworkReceive.readFrom (network/NetworkReceive.java:91).
API key
A permanent int16 id identifying the RPC (e.g. PRODUCE = 0, FETCH = 1, API_VERSIONS = 18). Defined by the ApiKeys enum; the id is "permanent and immutable … this can't change ever" (protocol/ApiKeys.java:161).
API version
An int16 selecting a schema variant of that RPC. Each RPC has an inclusive [oldestVersion, latestVersion] range.
Correlation id
A client-assigned int32 tag copied verbatim into the response header so the client can match an out-of-order reply to its pending request.
Flexible version
A version at or above the schema's flexibleVersions floor. Flexible versions use compact length encodings (unsigned varints) and append tagged fields to every struct. Introduced by KIP-482.
Tagged field
An optional field carried as (tag, length, bytes) in a trailing section. Readers that do not know a tag store it as a RawTaggedField and re-emit it, preserving forward compatibility.
Records type
A special field type whose payload is a raw record batch buffer, written zero-copy rather than copied into the message buffer.

The API Registry: ApiKeys

ApiKeys is an enum where each constant wraps a generated ApiMessageType (the per-RPC metadata object produced by ApiMessageTypeGenerator). As of this build the enum spans ids 0 through 93 and ends at STREAMS_GROUP_TOPOLOGY_DESCRIPTION_UPDATE (protocol/ApiKeys.java:48-141). Each constant records:

  • id: the immutable int16 from messageType.apiKey() (ApiKeys.java:162,191).
  • name: used for metrics and debugging (ApiKeys.java:165).
  • clusterAction: true if the RPC is broker/controller-internal only (e.g. VOTE, BEGIN_QUORUM_EPOCH, WRITE_TXN_MARKERS, BROKER_REGISTRATION). Used by authorization to require CLUSTER_ACTION; see Security.
  • forwardable: true if a broker may forward this admin RPC to the controller through an ENVELOPE (ApiKeys.java:171); see Metadata Propagation & Broker Lifecycle.
  • requiresDelayedAllocation: true if any version's schema contains a bytes or records type, meaning the request retains a reference into the receive buffer and the buffer cannot be recycled early (ApiKeys.java:194,198-207,343-360).

Version range is delegated to the message type: oldestVersion() = messageType.lowestSupportedVersion() and latestVersion() = messageType.highestSupportedVersion(...) (ApiKeys.java:221-231). An RPC may be removed while keeping its id reserved: hasValidVersion() returns false when oldestVersion > latestVersion, so the slot is never reused for a different API (ApiKeys.java:263-265).

Each listener type (BROKER, CONTROLLER) exposes a different subset. The APIS_BY_LISTENER map is built once at class-load by filtering on messageType.listeners() (ApiKeys.java:143-150, 374-383), driven by the "listeners" array in each request schema, e.g. FetchRequest.json declares "listeners": ["broker","controller"] while ProduceRequest.json is broker-only.

Gotcha

Versions 0–2 of PRODUCE (and 0–3 of FETCH) were removed in Kafka 4.0; v3/v4 are the new baselines. But a librdkafka bug (KAFKA-18659) requires the broker to advertise produce min-version 0 in its ApiVersionsResponse even though it rejects those versions. The constant PRODUCE_API_VERSIONS_RESPONSE_MIN_VERSION = 0 and the special-case in toApiVersion(...) (ApiKeys.java:159,286-302) implement exactly this lie-to-the-client, scoped to the BROKER listener.

The Frame and the Headers

The 4-byte frame

On the wire a message is size(int32) ‖ header ‖ body. The size counts header+body only, never itself. On send, SendBuilder.buildSend computes the combined size in a single sizing phase: two addSize calls (header, then body) accumulate into a shared MessageSizeAccumulator, and the total is then written as the first 4 bytes via builder.writeInt(messageSize.totalSize()) (protocol/SendBuilder.java:208-226). On receive, NetworkReceive first reads 4 bytes into a fixed size buffer, rejects negative or over-limit values, then allocates a payload buffer of exactly that many bytes (network/NetworkReceive.java:84-101). The maxSize guard is the broker's socket.request.max.bytes.

[Diagram: size (int32 BE, offset 0, value N) ‖ RequestHeader (variable, offset 4) ‖ Request body (ApiMessage, variable)]
Length-prefixed framing. The 4-byte size counts the N bytes of header+body only; it never includes itself.
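
The framing rule can be illustrated with a minimal sketch. The class and method names below (FrameSketch, frame, unframe) are hypothetical and the exception type is a stand-in; this is not Kafka's NetworkReceive, only the same size-then-payload idea under a caller-supplied limit.

```java
import java.nio.ByteBuffer;

/** Hypothetical helper illustrating 4-byte length-prefixed framing; not Kafka's NetworkReceive. */
final class FrameSketch {

    /** Prepends the big-endian int32 size; the size counts the payload only, never itself. */
    static ByteBuffer frame(byte[] headerAndBody) {
        ByteBuffer out = ByteBuffer.allocate(4 + headerAndBody.length); // ByteBuffer is big-endian by default
        out.putInt(headerAndBody.length);
        out.put(headerAndBody);
        out.flip();
        return out;
    }

    /** Reads one frame, rejecting negative or over-limit sizes before allocating the payload. */
    static byte[] unframe(ByteBuffer in, int maxSize) {
        int size = in.getInt();
        if (size < 0 || size > maxSize) {
            throw new IllegalStateException("Invalid frame size " + size + " (limit " + maxSize + ")");
        }
        byte[] payload = new byte[size];
        in.get(payload); // throws BufferUnderflowException if fewer than size bytes remain
        return payload;
    }

    public static void main(String[] args) {
        ByteBuffer wire = frame(new byte[] {1, 2, 3});
        System.out.println("frame bytes = " + wire.remaining());
        System.out.println("payload bytes = " + unframe(wire, 1024).length);
    }
}
```

Checking the size before allocating is the point of the guard: a corrupt or hostile prefix is rejected without ever reserving memory for it.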

Request header (schema)

The header is itself a generated message, schema RequestHeader.json. Its valid versions are 1–2, flexible at 2+ (common/message/RequestHeader.json:26-27). Fields in wire order:

| Field | Type | Versions | Notes |
| --- | --- | --- | --- |
| RequestApiKey | int16 | 0+ | which RPC |
| RequestApiVersion | int16 | 0+ | which schema variant |
| CorrelationId | int32 | 0+ | echoed in response |
| ClientId | string (nullable) | 1+ | "flexibleVersions": "none"; always old-style 2-byte length, even in header v2 |
| (tagged fields) | | 2+ only | empty section (a single 0x00) in practice |

Design rationale

ClientId is forced to non-compact even in flexible header v2 (RequestHeader.json:36-42). The reason, stated in the schema comment, is that an old broker must be able to parse the header of an ApiVersionsRequest sent by a newer client that does not yet know which versions the broker supports. Keeping the client-id length encoding fixed guarantees the header prefix has a stable, parseable shape regardless of negotiated version. This is the bootstrap that makes KIP-35 version discovery work.

Header version is derived, not sent

The header version is never on the wire; it is computed from the API key and API version. The generator emits a requestHeaderVersion(short) / responseHeaderVersion(short) switch per API (generator/.../ApiMessageTypeGenerator.java:346-389): for a given API version, the request header is v2 if that version is flexible else v1; the response header is v1 if flexible else v0. RequestHeader.parse exploits this: it peeks the first two int16s (api key, api version), looks up the header version via apiKey.requestHeaderVersion(apiVersion), rewinds, then parses the header with the correct version (requests/RequestHeader.java:123-152).
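
The derivation can be sketched in a few lines. This is an illustrative sketch only: the FIRST_FLEXIBLE table is a hand-written assumption holding just the two floors mentioned in this chapter (Kafka generates the equivalent switch from the schemas), and it uses absolute buffer reads instead of the read-then-rewind that RequestHeader.parse performs.

```java
import java.nio.ByteBuffer;
import java.util.Map;

/** Sketch of deriving header versions from the first two int16s of a request. */
final class HeaderVersionSketch {

    // Assumption: a hand-written "first flexible version" table with only two entries.
    static final Map<Short, Short> FIRST_FLEXIBLE = Map.of(
            (short) 0, (short) 9,    // PRODUCE: flexible at 9+
            (short) 18, (short) 3);  // API_VERSIONS: flexible at 3+

    /** Request header is v2 for flexible API versions, else v1. */
    static short requestHeaderVersion(short apiKey, short apiVersion) {
        Short floor = FIRST_FLEXIBLE.get(apiKey);
        if (floor == null) {
            throw new IllegalArgumentException("Unknown api key " + apiKey);
        }
        return (short) (apiVersion >= floor ? 2 : 1);
    }

    /** Response header is v1 for flexible API versions, else v0; ApiVersions is pinned to v0. */
    static short responseHeaderVersion(short apiKey, short apiVersion) {
        if (apiKey == 18) {
            return 0; // the KIP-511 exception described in this chapter
        }
        return (short) (requestHeaderVersion(apiKey, apiVersion) - 1);
    }

    /** Peeks api key and version without consuming them, so the header can then be parsed in full. */
    static short peekHeaderVersion(ByteBuffer buf) {
        int start = buf.position();
        short apiKey = buf.getShort(start);         // absolute reads leave the position untouched
        short apiVersion = buf.getShort(start + 2);
        return requestHeaderVersion(apiKey, apiVersion);
    }
}
```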

Gotcha

The one exception: the response header of ApiVersions (api key 18) is always v0 (no tagged-field section), hard-coded in the generator (ApiMessageTypeGenerator.java:362-368, "See KIP-511"). The response body is flexible from v3, but the header length must stay constant so that a client which guessed the wrong version can still locate the body. ApiVersionsResponse.json states the same constraint at lines 24-26.

Response header

Trivially small: schema ResponseHeader.json is just CorrelationId(int32), valid versions 0–1, flexible at 1+ (so v1 carries a trailing empty tagged-field byte). RequestHeader.toResponseHeader() builds it by copying the correlation id and resolving the response header version (requests/RequestHeader.java:119-121). On the client, AbstractResponse.parseResponse parses this header and throws CorrelationIdMismatchException if it does not equal the pending request's id (requests/AbstractResponse.java:97-111).

Reading and Writing: Readable / Writable / ByteBufferAccessor

All (de)serialization funnels through two interfaces. Writable declares writeByte/Short/Int/Long/Double, writeByteArray, writeByteBuffer, writeUnsignedVarint, writeVarint, writeVarlong, plus default helpers writeUuid (two big-endian longs, MSB first) and writeRecords (protocol/Writable.java:27-65). Readable is the mirror image, with default helpers readString(len) (UTF-8), readUuid, readRecords(len) (returns null for length < 0), and readUnknownTaggedField which appends a RawTaggedField (protocol/Readable.java:29-89).

ByteBufferAccessor is the production implementation of both, wrapping a single java.nio.ByteBuffer (protocol/ByteBufferAccessor.java:24-160). Fixed-width primitives go straight to the buffer's big-endian accessors (getInt, putLong, …). Variable-length integers delegate to ByteUtils. Notably readByteBuffer(len) returns a slice over the underlying buffer and advances the position, no copy, which is how records and bytes fields are read zero-copy.

Varints and zig-zag

Compact lengths and the tagged-field framing use Protocol-Buffers-style base-128 varints. writeUnsignedVarint emits 7 bits per byte, high bit as continuation (utils/internals/ByteUtils.java:408). Signed varints zig-zag first: writeVarint(v) = writeUnsignedVarint((v << 1) ^ (v >> 31)) (ByteUtils.java:486-487), so small-magnitude negatives stay short. sizeOfUnsignedVarint computes the byte count from leading-zero count without looping (ByteUtils.java:557-573).
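
As a rough illustration of the encoding described above, the following self-contained sketch implements unsigned varints, zig-zag signed varints, and a loop-free size computation. The method names mirror the ones mentioned in the text, but the bodies are written for this example and are not copied from ByteUtils; in particular, the size formula uses plain integer division rather than whatever arithmetic the real method uses.

```java
import java.nio.ByteBuffer;

/** Minimal base-128 varint and zig-zag sketch; an illustration, not Kafka's ByteUtils. */
final class VarintSketch {

    static void writeUnsignedVarint(int value, ByteBuffer buf) {
        while ((value & 0xFFFFFF80) != 0) {           // more than 7 significant bits remain
            buf.put((byte) ((value & 0x7F) | 0x80));  // low 7 bits, continuation bit set
            value >>>= 7;
        }
        buf.put((byte) value);                        // final byte, continuation bit clear
    }

    static int readUnsignedVarint(ByteBuffer buf) {
        int value = 0;
        for (int shift = 0; shift <= 28; shift += 7) {
            byte b = buf.get();
            value |= (b & 0x7F) << shift;
            if ((b & 0x80) == 0) {
                return value;
            }
        }
        throw new IllegalArgumentException("Varint is longer than 5 bytes");
    }

    /** Zig-zag maps 0, -1, 1, -2, ... to 0, 1, 2, 3, ... so small magnitudes stay short. */
    static void writeVarint(int v, ByteBuffer buf) {
        writeUnsignedVarint((v << 1) ^ (v >> 31), buf);
    }

    static int readVarint(ByteBuffer buf) {
        int u = readUnsignedVarint(buf);
        return (u >>> 1) ^ -(u & 1);
    }

    /** Byte count derived from the number of leading zeros, with no loop. */
    static int sizeOfUnsignedVarint(int value) {
        int leadingZeros = Integer.numberOfLeadingZeros(value);
        return (38 - leadingZeros) / 7 + (leadingZeros >>> 5);
    }
}
```

For instance, the unsigned value 300 encodes as the two bytes AC 02, and the signed value -1 zig-zags to 1 and therefore occupies a single byte.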

Two-pass serialization

Writing takes exactly two passes over the object graph, both sharing an ObjectSerializationCache (an IdentityHashMap). Pass 1 (addSize) accumulates the total byte count into a MessageSizeAccumulator and stashes derived values (the UTF-8 bytes of strings, computed array sizes) in the cache so they are not recomputed (protocol/ObjectSerializationCache.java:32-55, protocol/Message.java:49-62). Pass 2 (write) emits bytes, reusing the cached values; for example, the generator writes a string via _cache.getSerializedValue(name) (MessageDataGenerator.java:962-964). RequestUtils.serialize shows the canonical flow: size header, size body, allocate exactly headerSize + messageSize bytes, write header then body, flip (requests/RequestUtils.java:69-86).

[Diagram: AbstractRequest.toSend(header) → SendBuilder.buildSend → header.addSize + body.addSize (into MessageSizeAccumulator) → writeInt(totalSize) (the 4-byte frame) → header.write(…) → body.write(…) (records / bytes kept as buffer refs, zero-copy) → Send (ByteBufferSend or MultiRecordsSend)]
Outbound serialize path. A strict two-pass walk (size, then write) shares an ObjectSerializationCache; record/bytes payloads are appended by reference for zero-copy.

[Diagram: AbstractRequest.parseRequest(apiKey, ver, readable) → doParseRequest (big switch) → per-API branch such as ProduceRequest.parse or FetchRequest.parse → new XxxRequestData(readable, version) (version-aware decode)]
Inbound parse path. The doParseRequest switch in AbstractRequest / AbstractResponse bridges the dynamic ApiKeys id to the static, typed XxxRequest.parse, which constructs the generated *Data object.
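
A toy version of the size-then-write discipline is sketched below. Everything in it is hypothetical (the Greeting message, its field layout, and the plain Map used as a cache); it only demonstrates why caching the UTF-8 bytes in pass 1 makes the exact-size allocation in pass 2 safe.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.IdentityHashMap;
import java.util.Map;

/** Toy two-pass serializer: pass 1 sizes and caches, pass 2 writes from the cache. */
final class TwoPassSketch {

    /** A tiny non-flexible "message": int32 id followed by an int16-length-prefixed string. */
    static final class Greeting {
        final int id;
        final String text;

        Greeting(int id, String text) {
            this.id = id;
            this.text = text;
        }

        /** Pass 1: return the encoded size and remember the UTF-8 bytes for pass 2. */
        int addSize(Map<Object, byte[]> cache) {
            byte[] utf8 = text.getBytes(StandardCharsets.UTF_8);
            cache.put(text, utf8);
            return 4 + 2 + utf8.length;
        }

        /** Pass 2: emit bytes, reusing the cached encoding instead of re-encoding the string. */
        void write(ByteBuffer out, Map<Object, byte[]> cache) {
            byte[] utf8 = cache.get(text);
            out.putInt(id);
            out.putShort((short) utf8.length);
            out.put(utf8);
        }
    }

    static ByteBuffer serialize(Greeting msg) {
        Map<Object, byte[]> cache = new IdentityHashMap<>(); // keyed by object identity
        int size = msg.addSize(cache);
        ByteBuffer out = ByteBuffer.allocate(size);          // exact allocation, no slack
        msg.write(out, cache);
        out.flip();
        return out;
    }
}
```

Because the length written in pass 2 comes from the same byte array that was measured in pass 1, the declared size and the written size cannot disagree.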

Zero-copy via SendBuilder

For payloads that contain record sets, copying into one contiguous buffer would defeat sendfile-style efficiency. SendBuilder implements Writable but treats writeByteBuffer and writeRecords specially: it flushes the bytes accumulated so far into a slice, then appends the records buffer by reference (no copy) to a list (protocol/SendBuilder.java:99-148). build() returns a single ByteBufferSend if there is one buffer, otherwise a MultiRecordsSend that the network layer gathers (SendBuilder.java:173-181). This is why ApiKeys.requiresDelayedAllocation matters: a request whose schema has a records field keeps a live reference into the original receive buffer, so that buffer must not be returned to the pool until the request is fully handled.
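
The flush-then-append-by-reference idea can be sketched as follows. This is a simplification under stated assumptions: ZeroCopySketch, its scratch buffer, and the flushedUpTo bookkeeping are invented for the example and do not reproduce SendBuilder's actual fields or methods.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

/** Sketch of building a multi-buffer send where large payloads are referenced, not copied. */
final class ZeroCopySketch {
    private final ByteBuffer scratch;                       // holds headers and small fields
    private final List<ByteBuffer> buffers = new ArrayList<>();
    private int flushedUpTo = 0;                            // scratch bytes already handed out

    ZeroCopySketch(int overheadBytes) {
        this.scratch = ByteBuffer.allocate(overheadBytes);
    }

    void writeInt(int v) {
        scratch.putInt(v);
    }

    /** Flushes the bytes written so far as a slice, then appends the payload without copying it. */
    void writeByReference(ByteBuffer payload) {
        flush();
        buffers.add(payload.duplicate()); // shares memory; only position and limit are independent
    }

    private void flush() {
        int end = scratch.position();
        if (end > flushedUpTo) {
            ByteBuffer view = scratch.duplicate();
            view.position(flushedUpTo);
            view.limit(end);
            buffers.add(view.slice());
            flushedUpTo = end;
        }
    }

    /** Returns the ordered buffers a gathering write would send. */
    List<ByteBuffer> build() {
        flush();
        return buffers;
    }
}
```

Writing an int, a payload by reference, and another int yields three buffers in wire order, and the middle one still aliases the caller's payload memory. That aliasing is the reason the source buffer must stay alive until the send completes.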

The Message Generator

Each RPC has a JSON schema under clients/src/main/resources/common/message/. At build time the generator module reads these and emits a Java class per message (e.g. ProduceRequestData) implementing ApiMessage, plus the ApiMessageType registry. The generated class is a plain mutable data holder with fluent setters and version-aware read/write/addSize methods; there is no hand-written serializer for any RPC.

Schema anatomy

A schema (e.g. ProduceRequest.json) declares apiKey, type (request/response/header/data), listeners, validVersions, flexibleVersions, and a recursive fields list. Each field carries:

| Attribute | Meaning |
| --- | --- |
| name, type | Field name and type. Types parsed by FieldType.parse (generator/.../FieldType.java:386-432): bool, int8/16/32/64, uint16/32, uuid, float64, string, bytes, records, struct names, and []X arrays. |
| versions | Version range in which the field exists (e.g. "3+", "0-12"). Parsed by Versions.parse (generator/.../Versions.java:40-65). |
| nullableVersions | Subrange in which null is a legal value. |
| taggedVersions + tag | Marks the field as a tagged (optional) field with a numeric tag, present only in flexible versions. |
| ignorable | If true, a non-default value may be silently dropped when serializing to a version that lacks the field. |
| mapKey | The field identifies its struct within an array, enabling a generated keyed collection. |
| entityType | Semantic tag (e.g. topicName, transactionalId) used by tooling and for type-safe IDs. |
| default, about | Default value; human description (feeds protocol docs). |

For ProduceRequest the field tree is TransactionalId, Acks(int16), TimeoutMs(int32), TopicData[]{ Name|TopicId, PartitionData[]{ Index, Records } }, with valid versions 3–13 and flexible at 9+ (ProduceRequest.json:50-72). Note how v13 swaps the topic Name(string, 0-12) for TopicId(uuid, 13+) (KIP-516): a pure schema change that required no serializer edits.

What the generator emits

Per message it produces fields, fluent getters/setters, and three core methods. read(Readable, version) branches on version to decode each field (and, in flexible versions, a trailing tagged-field loop). write(Writable, cache, version) mirrors it. addSize(...) is the sizing pass. It also tracks unknown tags in a List<RawTaggedField> _unknownTaggedFields (generator/.../MessageDataGenerator.java:316,348) so that a message round-trips fields it does not understand.

Flexible Versions: Compact Encodings & Tagged Fields (KIP-482)

A version is "flexible" if it is ≥ the schema's flexibleVersions floor. Flexible versions change two things on the wire: variable-length prefixes become compact (unsigned varint instead of fixed int16/int32), and every struct gains a trailing tagged-field section.

Compact strings, bytes, and arrays

The generator's generateVariableLengthWriter contains the exact rule (MessageDataGenerator.java:985-994):

// flexible:     length is written as (len + 1) as an unsigned varint
_writable.writeUnsignedVarint(len + 1);
// non-flexible: string → int16 length, bytes/array → int32 length
_writable.writeShort((short) len);   // or writeInt(len)

The +1 bias reserves the encoded value 0 for null. A non-null empty string has length 0 and is encoded as varint 1 (one byte 0x01). A null string is encoded as varint 0 (one byte 0x00); the null branch emits writeUnsignedVarint(0) (MessageDataGenerator.java:946). Compact arrays follow the same convention: count + 1 as an unsigned varint, then the elements. The read side reverses it: read the varint; an encoded 0 means null, otherwise subtract 1 to recover the length or count.

| Type | Non-flexible prefix | Flexible prefix | null |
| --- | --- | --- | --- |
| string | int16 length | uvarint(len+1) | int16 = -1 / uvarint 0 |
| bytes | int32 length | uvarint(len+1) | int32 = -1 / uvarint 0 |
| array []X | int32 count | uvarint(count+1) | int32 = -1 / uvarint 0 |
| records | int32 byte length | uvarint(len+1) | length < 0 → null records |
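
The string row of this table can be exercised with a small sketch of the flexible form. The class and method names are illustrative rather than generated Kafka code, and the varint helpers are included only so the example is self-contained.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Sketch of the compact (flexible-version) nullable string encoding. */
final class CompactStringSketch {

    static void writeCompactString(String s, ByteBuffer buf) {
        if (s == null) {
            writeUnsignedVarint(0, buf);             // encoded 0 is reserved for null
            return;
        }
        byte[] utf8 = s.getBytes(StandardCharsets.UTF_8);
        writeUnsignedVarint(utf8.length + 1, buf);   // +1 bias: the empty string encodes as 1
        buf.put(utf8);
    }

    static String readCompactString(ByteBuffer buf) {
        int encoded = readUnsignedVarint(buf);
        if (encoded == 0) {
            return null;
        }
        byte[] utf8 = new byte[encoded - 1];
        buf.get(utf8);
        return new String(utf8, StandardCharsets.UTF_8);
    }

    static void writeUnsignedVarint(int value, ByteBuffer buf) {
        while ((value & 0xFFFFFF80) != 0) {
            buf.put((byte) ((value & 0x7F) | 0x80));
            value >>>= 7;
        }
        buf.put((byte) value);
    }

    static int readUnsignedVarint(ByteBuffer buf) {
        int value = 0;
        for (int shift = 0; shift <= 28; shift += 7) {
            byte b = buf.get();
            value |= (b & 0x7F) << shift;
            if ((b & 0x80) == 0) {
                return value;
            }
        }
        throw new IllegalArgumentException("Varint is longer than 5 bytes");
    }
}
```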

The tagged-field section

In a flexible version, every struct ends with: an unsigned varint numTaggedFields, then that many entries, each tag(uvarint) ‖ size(uvarint) ‖ value(size bytes). The write side counts how many tagged fields are actually present (those differing from default) plus any preserved unknowns, writes the count, then writes each in ascending tag order (MessageDataGenerator.java:794-879). Crucially, tagged values are always serialized using the flexible/compact form regardless of the field's own declared encoding; a comment in the read code confirms that "All tagged fields are serialized using the new-style flexible versions serialization" (MessageDataGenerator.java:506-508).

The read side is the forward-compatibility linchpin (MessageDataGenerator.java:489-541): read numTaggedFields, loop reading tag and size, switch on known tags; the default case calls readUnknownTaggedField to stash the raw (tag, bytes) for re-emission (MessageDataGenerator.java:533-536). A known tag at an out-of-range version throws (the tag is reserved). Because unknown tags carry their own length, a reader can skip them precisely even though it has no schema for them.
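
The skip-and-preserve behaviour can be sketched with a reader that treats every tag as unknown. This is an assumption-laden simplification: generated code switches on known tags first and decodes them, whereas the hypothetical TaggedFieldSketch below keeps all entries as raw bytes in a TreeMap so they can be re-emitted in ascending tag order.

```java
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.TreeMap;

/** Sketch of reading and re-emitting a tagged-field section; every tag is kept raw here. */
final class TaggedFieldSketch {

    /** Reads numTaggedFields, then that many (tag, size, value) entries. */
    static TreeMap<Integer, byte[]> readSection(ByteBuffer buf) {
        TreeMap<Integer, byte[]> raw = new TreeMap<>();
        int count = readUnsignedVarint(buf);
        for (int i = 0; i < count; i++) {
            int tag = readUnsignedVarint(buf);
            int size = readUnsignedVarint(buf);
            byte[] value = new byte[size]; // the explicit size lets a reader carry a tag it has no schema for
            buf.get(value);
            raw.put(tag, value);
        }
        return raw;
    }

    /** Writes the count, then each entry in ascending tag order (TreeMap iteration order). */
    static void writeSection(TreeMap<Integer, byte[]> raw, ByteBuffer buf) {
        writeUnsignedVarint(raw.size(), buf);
        for (Map.Entry<Integer, byte[]> e : raw.entrySet()) {
            writeUnsignedVarint(e.getKey(), buf);
            writeUnsignedVarint(e.getValue().length, buf);
            buf.put(e.getValue());
        }
    }

    static void writeUnsignedVarint(int value, ByteBuffer buf) {
        while ((value & 0xFFFFFF80) != 0) {
            buf.put((byte) ((value & 0x7F) | 0x80));
            value >>>= 7;
        }
        buf.put((byte) value);
    }

    static int readUnsignedVarint(ByteBuffer buf) {
        int value = 0;
        for (int shift = 0; shift <= 28; shift += 7) {
            byte b = buf.get();
            value |= (b & 0x7F) << shift;
            if ((b & 0x80) == 0) {
                return value;
            }
        }
        throw new IllegalArgumentException("Varint is longer than 5 bytes");
    }
}
```

An empty section is just the single count byte 0x00, which is the trailing byte seen after every flexible struct that carries no tags.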

Key idea

Tagged fields decouple field addition from version bumps. A new optional field can be added at the same flexible version with a fresh tag; old readers preserve it untouched, new readers decode it. Mandatory or layout-changing edits still require a version bump. Look at ApiVersionsResponse.json:50-79: SupportedFeatures(tag 0), FinalizedFeaturesEpoch(tag 1), FinalizedFeatures(tag 2), ZkMigrationReady(tag 3) are all tagged at 3+.

Version Negotiation: ApiVersions (KIP-35)

A client cannot assume a broker supports any particular version of any RPC. The handshake (KIP-35) is: open the connection, send an ApiVersionsRequest (api key 18) at the highest version the client knows, and read back an ApiVersionsResponse listing, for every API the broker supports on that listener, an (ApiKey, MinVersion, MaxVersion) triple (ApiVersionsResponse.json:39-47). The client then picks, per RPC, the highest mutually supported version.

The bootstrap quirk

The chicken-and-egg problem (you must agree on a version in order to discover supported versions) is solved by making ApiVersions uniquely tolerant. ApiKeys.isVersionEnabled short-circuits to true for API_VERSIONS regardless of the requested version (ApiKeys.java:245-252). If a client sends a version the broker does not support, the broker treats the request as v0 and replies using the v0 response schema with UNSUPPORTED_VERSION; the response still enumerates the broker's supported version range of ApiVersions so the client can retry correctly. ApiVersionsRequest.getErrorResponse implements exactly that: on UNSUPPORTED_VERSION it populates the ApiKeys collection with just ApiKeys.API_VERSIONS's range (requests/ApiVersionsRequest.java:88-98,126-143, "KIP-511"). This is also why the ApiVersions response header must stay at v0, with no tagged-field section (see the Gotcha above).

Intersection for forwarding

When a broker forwards admin RPCs to the controller (the ENVELOPE path), it must only advertise versions both it and the controller support. ApiVersionsResponse.intersectForwardableApis intersects the broker's range with the controller's per forwardable API (requests/ApiVersionsResponse.java:232-262); the core operation is intersect = [max(minA,minB), min(maxA,maxB)], returning empty when the ranges do not overlap (ApiVersionsResponse.java:311-325). The non-forwardable APIs are simply filtered to those in scope for the listener via filterApis (ApiVersionsResponse.java:195-219). See The KRaft Controller for the forwarding mechanics.
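
The core range operation is small enough to sketch directly. The Range type and the use of Optional for the non-overlapping case are choices made for this example, not the signatures used in ApiVersionsResponse.

```java
import java.util.Optional;

/** Sketch of intersecting two inclusive version ranges. */
final class VersionRangeSketch {

    static final class Range {
        final short min;
        final short max;

        Range(int min, int max) {
            this.min = (short) min;
            this.max = (short) max;
        }
    }

    /** Returns [max(minA, minB), min(maxA, maxB)], or empty when the ranges do not overlap. */
    static Optional<Range> intersect(Range a, Range b) {
        int min = Math.max(a.min, b.min);
        int max = Math.min(a.max, b.max);
        return min > max ? Optional.empty() : Optional.of(new Range(min, max));
    }
}
```

For example, a broker range of [0, 7] and a controller range of [3, 9] intersect to [3, 7], while [0, 2] and [3, 9] have no common version and yield an empty result.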

Newer response fields (v5, KIP-1242)

ApiVersionsResponse v5 adds ClusterId/NodeId checking and a REBOOTSTRAP_REQUIRED error so a client with stale bootstrap metadata is told to re-bootstrap (ApiVersionsResponse.json:33-34; ApiVersionsRequest.isValid enforces that cluster id and node id are both set or both absent, ApiVersionsRequest.java:104-111).

The Error Code Space: Errors

Errors is an enum mapping a stable int16 code to an ApiException factory. As of this build it defines 137 entries from UNKNOWN_SERVER_ERROR(-1) and NONE(0) up to STREAMS_TOPOLOGY_DESCRIPTION_UPDATE_FAILED(135) (protocol/Errors.java:181-425). Each constant carries code, a default message, and a constructor reference for the exception (Errors.java:443-451). Two static maps are built at load: CODE_TO_ERROR and CLASS_TO_ERROR.

  • forCode(short): wire code → enum; an unknown code logs a warning and falls back to UNKNOWN_SERVER_ERROR (Errors.java:511-519). This is how an old client tolerates an error code added in a newer broker.
  • forException(Throwable): walks the exception's superclass chain, returning the first matching mapped error, so subclasses inherit their parent's code (Errors.java:525-532).
  • code(), exception(), exception(message), maybeThrow(): translate between the two worlds (Errors.java:456-496).

Invariant

Error codes are immutable forever; only names and messages may change (class doc, Errors.java:167-179). A code is the cross-version contract: a broker and a client built years apart must agree on what code 6 (NOT_LEADER_OR_FOLLOWER) means.

Error codes appear in responses either as a top-level ErrorCode(int16) or per-entity (per topic, per partition). AbstractResponse.errorCounts() aggregates them across the whole response for metrics (requests/AbstractResponse.java:64-91). RequestUtils.isFatalException classifies auth/version errors as fatal (non-retriable) for client-side handling (requests/RequestUtils.java:88-96).
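
The forCode fallback and the forException superclass walk described earlier in this section can be sketched with a three-entry enum. The exception classes are stand-ins invented for the example (including the hypothetical subclass), and the warning log on unknown codes is omitted.

```java
import java.util.HashMap;
import java.util.Map;

/** Sketch of code, enum, and exception-class lookup; not Kafka's Errors enum. */
final class ErrorsSketch {

    static class NotLeaderException extends RuntimeException {}

    /** Hypothetical subclass with no code of its own; it inherits its parent's mapping. */
    static class FencedLeaderException extends NotLeaderException {}

    enum Err {
        UNKNOWN_SERVER_ERROR((short) -1, null),   // exception class omitted in this sketch
        NONE((short) 0, null),
        NOT_LEADER_OR_FOLLOWER((short) 6, NotLeaderException.class);

        final short code;
        final Class<?> exceptionClass;

        Err(short code, Class<?> exceptionClass) {
            this.code = code;
            this.exceptionClass = exceptionClass;
        }

        private static final Map<Short, Err> CODE_TO_ERROR = new HashMap<>();
        private static final Map<Class<?>, Err> CLASS_TO_ERROR = new HashMap<>();

        static {
            // Built once at class initialization and only read afterwards.
            for (Err e : values()) {
                CODE_TO_ERROR.put(e.code, e);
                if (e.exceptionClass != null) {
                    CLASS_TO_ERROR.put(e.exceptionClass, e);
                }
            }
        }

        /** Unknown wire codes fall back instead of failing, so old clients tolerate new codes. */
        static Err forCode(short code) {
            return CODE_TO_ERROR.getOrDefault(code, UNKNOWN_SERVER_ERROR);
        }

        /** Walks the superclass chain and returns the first mapped error. */
        static Err forException(Throwable t) {
            for (Class<?> c = t.getClass(); c != null; c = c.getSuperclass()) {
                Err e = CLASS_TO_ERROR.get(c);
                if (e != null) {
                    return e;
                }
            }
            return UNKNOWN_SERVER_ERROR;
        }
    }
}
```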

Request & Response Base Classes

AbstractRequest wraps a generated *RequestData with an apiKey and concrete version, validated in the constructor (requests/AbstractRequest.java:91-96). A nested Builder<T> captures an allowed version range and builds the concrete request at a chosen version (AbstractRequest.java:34-86). Key methods:

  • toSend(header) → SendBuilder.buildRequestSend (zero-copy, size-prefixed) (AbstractRequest.java:109-111).
  • serializeWithHeader(header) → header+body, no size prefix, via RequestUtils.serialize; it asserts the header's api key and version match the request (AbstractRequest.java:116-124).
  • getErrorResponse(throttleMs, e): abstract; each RPC builds a schema-shaped error response (used when a request fails before normal handling).
  • parseRequest(apiKey, version, readable): the inbound factory. It records the buffer size, then dispatches through a giant switch in doParseRequest to the right XxxRequest.parse (AbstractRequest.java:172-363).

AbstractResponse mirrors this with toSend(header, version), a parallel parse switch (requests/AbstractResponse.java:113-299), an abstract throttleTimeMs()/maybeSetThrottleTimeMs(int), and shouldClientThrottle(version) (default false). The throttle plumbing is how quota enforcement surfaces on the wire; see Quotas, Throttling & Client Metrics.

Note

The two big switch statements in AbstractRequest/AbstractResponse are the manual bridge between the dynamic ApiKeys id and the static, typed XxxRequest/XxxResponse classes. The default branch throws an AssertionError reminding maintainers to wire up any newly added API (AbstractRequest.java:359-362).

Throttling Fields on the Wire

Most responses carry a ThrottleTimeMs(int32) field telling the client how long the broker delayed (or wants the client to delay) due to a quota violation. In ApiVersionsResponse it is v1+ and ignorable (ApiVersionsResponse.json:48-49). The contract evolved across versions: in early versions the broker held the response then sent it (server-side throttle); from a later version the broker sends immediately and expects the client to self-throttle, encoded per RPC by overriding shouldClientThrottle(version). The base returns false (AbstractResponse.java:306-308); a comment there explains client-side throttling is needed "when communicating with a newer version of broker which, on quota violation, sends out responses before throttling."

Worked Example: a Minimal ApiVersionsRequest on the Wire

Consider a Java client opening a connection and sending an ApiVersionsRequest at version 3 (flexible) with client id "c1", correlation id 7, software name "apache-kafka-java" and some version string. Because v3 is flexible, the request header is v2 (flexible), but recall ClientId stays old-style two-byte-length. The bytes:

| Field | Encoding | Offset | Notes |
| --- | --- | --- | --- |
| size = NN | int32 BE | @0 | frame |
| RequestApiKey = 18 | int16 | @4 | header v2 |
| RequestApiVersion = 3 | int16 | @6 | |
| CorrelationId = 7 | int32 | @8 | |
| ClientId len = 2 | int16 | @12 | NOT compact |
| "c1" | 2 bytes | @14 | 63 31 |
| tags = 0 | uvarint | @16 | header end |
| SoftwareName len+1 = 18 | uvarint | @17 | body v3 |
| "apache-kafka-java" | 17 bytes | @18 | |
| SoftwareVersion | uvarint(len+1) ‖ bytes | @35 | |
| tags = 0 | uvarint | | body end |

An ApiVersionsRequest v3. The header's ClientId uses a fixed 2-byte length, while the body's strings use compact (varint len+1) encoding because the body is flexible. Offsets (@N) are byte offsets from the start of the frame.

Reading it back: NetworkReceive consumes the 4-byte size and allocates NN bytes. RequestHeader.parse peeks 00 12 00 03 → api key 18, version 3 → requestHeaderVersion(3)=2, rewinds, parses the v2 header (decoding the 2-byte-length client id and the trailing 0x00 tag count). The remaining bytes go to ApiVersionsRequest.parse → new ApiVersionsRequestData(readable, 3), which reads the two compact strings and the trailing tag count (requests/ApiVersionsRequest.java:145-147).
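
The layout above can be reproduced by hand as a sanity check. The sketch below builds the bytes directly with a ByteBuffer rather than through Kafka's generated classes; the class name is hypothetical, the software version "4.4.0" used in the usage note is a placeholder for "some version string", and the compact-string helper only supports strings short enough for a one-byte varint.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Hand-rolled ApiVersionsRequest v3 frame, for checking offsets against the layout above. */
final class ApiVersionsV3Sketch {

    static ByteBuffer build(int correlationId, String clientId, String softwareName, String softwareVersion) {
        ByteBuffer msg = ByteBuffer.allocate(512);   // assumption: inputs are short strings
        msg.putShort((short) 18);                    // RequestApiKey = API_VERSIONS
        msg.putShort((short) 3);                     // RequestApiVersion
        msg.putInt(correlationId);
        byte[] cid = clientId.getBytes(StandardCharsets.UTF_8);
        msg.putShort((short) cid.length);            // ClientId: fixed int16 length even in header v2
        msg.put(cid);
        msg.put((byte) 0);                           // header tagged-field count
        putCompactString(msg, softwareName);
        putCompactString(msg, softwareVersion);
        msg.put((byte) 0);                           // body tagged-field count
        msg.flip();

        ByteBuffer frame = ByteBuffer.allocate(4 + msg.remaining());
        frame.putInt(msg.remaining());               // size excludes its own 4 bytes
        frame.put(msg);
        frame.flip();
        return frame;
    }

    /** Compact string for lengths below 127, where len+1 fits in a single varint byte. */
    static void putCompactString(ByteBuffer buf, String s) {
        byte[] utf8 = s.getBytes(StandardCharsets.UTF_8);
        if (utf8.length >= 127) {
            throw new IllegalArgumentException("sketch supports short strings only");
        }
        buf.put((byte) (utf8.length + 1));
        buf.put(utf8);
    }
}
```

Calling build(7, "c1", "apache-kafka-java", "4.4.0") produces a 42-byte frame whose size prefix is 38, with the header tag count at offset 16, the SoftwareName prefix byte 18 at offset 17, and the SoftwareVersion prefix at offset 35.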

Concurrency & Threading

The protocol classes are intentionally simple regarding threads; the heavy lifting of which thread reads which socket lives in the Network Layer. The relevant facts here:

  • Generated *Data objects are mutable and not thread-safe. They are owned by a single thread for the duration of a request/response. A request is parsed on a network/processor thread and handed off; a response is built on a request-handler thread.
  • ByteBufferAccessor wraps one ByteBuffer, which is single-threaded by nature (position is mutated on every read/write).
  • ObjectSerializationCache is per-serialization, single-threaded. Reusing the same cache instance across the size and write passes is the entire point; sharing it across threads would corrupt sizing.
  • The expensive, allocation-laden work is the parse/serialize itself, which is why it runs on pooled threads rather than the acceptor: requiresDelayedAllocation exists precisely to avoid copying records and to defer freeing receive buffers (ApiKeys.java:173,194).
  • Static lookup tables (ID_TO_TYPE, APIS_BY_LISTENER, CODE_TO_ERROR, CLASS_TO_ERROR) are built once during class initialization and then read-only, so they need no locking (ApiKeys.java:143-154, Errors.java static block).

Failure Modes, Edge Cases & Recovery

| Condition | Detection / handling |
| --- | --- |
| Negative or oversized frame size | NetworkReceive.readFrom throws InvalidReceiveException (NetworkReceive.java:92-95); the connection is dropped. |
| Unknown api key | ApiKeys.forId throws IllegalArgumentException; RequestHeader.parse wraps parse failures as InvalidRequestException with a best-guess api key (RequestHeader.java:153-160). |
| Removed API (id reserved, no versions) | hasValidVersion() false → RequestHeader.parse throws InvalidRequestException early with a helpful message (RequestHeader.java:135-136). |
| Unsupported version (normal RPC) | Server returns UNSUPPORTED_VERSION(35); on the request side AbstractRequest constructor throws UnsupportedVersionException if asked to build an out-of-range version. |
| Unsupported version (ApiVersions) | Special-cased: broker answers as v0 with the supported range, never drops the connection (ApiVersionsRequest.java:92-98). |
| Null client id | Treated as empty string at parse time (RequestHeader.java:141-145). |
| Correlation id mismatch | CorrelationIdMismatchException on the client (AbstractResponse.java:103-108); usually indicates a protocol desync, and the client closes the connection. |
| Unknown tagged field | Preserved as RawTaggedField and re-emitted; never an error (MessageDataGenerator.java:533-536). |
| Unknown error code | Mapped to UNKNOWN_SERVER_ERROR with a warning (Errors.java:516-518); treated as non-retriable. |
| Buffer underrun on read | ByteBufferAccessor.readArray throws a RuntimeException reporting requested vs available bytes (ByteBufferAccessor.java:58-61). |

Invariants & Guarantees

Invariant

An API key id and an error code, once assigned, are permanent. Schemas may add fields and versions but must never change the on-wire meaning of an existing (apiKey, apiVersion) pairing; that is what lets a v4-era client talk to a v4.4 broker and vice versa.

  • Round-trip fidelity: a flexible message decoded by software that does not know some tags, then re-encoded, preserves those tags byte-for-byte (via _unknownTaggedFields).
  • Self-delimiting: every variable-length element carries an explicit length (fixed or compact), so a reader never needs out-of-band schema knowledge to skip a field it does not recognize.
  • Header derivability: the header version is always recoverable from the first two int16s, guaranteeing the header is parseable before the body version is fully trusted (RequestHeader.java:126-138).
  • Size correctness: the two-pass scheme guarantees the declared frame size exactly equals the bytes written, because both passes share derived values through the cache.

Interactions with Other Subsystems

Design Rationale & Evolution

Kafka's protocol was originally a set of hand-written, hand-versioned serializers, which were error-prone and made adding fields painful. The pivotal evolutions:

KIP-35
Versioned ApiVersions handshake, so clients discover broker capabilities instead of assuming them. This is the foundation of independent client/broker upgrades.
KIP-482
Flexible versions: compact varint length prefixes and tagged optional fields, letting RPCs add fields without a version bump and making messages smaller. This is what the generator implements at MessageDataGenerator.java:489-541,794-879,985-994.
KIP-511
Client software name/version reporting in ApiVersionsRequest, and the rule that ApiVersionsResponse keeps a v0-shaped header (ApiMessageTypeGenerator.java:362-368).
KIP-516
Topic IDs replacing topic names in hot-path RPCs (ProduceRequest v13, FetchRequest v13), a schema-only change thanks to the generator.
KIP-1242
ApiVersionsResponse v5 cluster/node-id checking and REBOOTSTRAP_REQUIRED.

The generator-driven approach is the through-line: by making the schema the single source of truth, Kafka guarantees that the serializer, the size calculator, the equals/hashCode/toString, the JSON converters, and the published protocol documentation can never drift from one another.

Gotchas & Operational Notes

  • The header version is invisible on the wire. When debugging captures, you must derive it from api key + api version exactly as RequestHeader.parse does, or you will mis-align the body.
  • ClientId is never compact, even in flexible header v2. A hexdump showing a 2-byte length where you expected a varint is correct, not corrupt.
  • An empty trailing 0x00 after a flexible struct is the (empty) tagged-field count, not padding. Do not strip it.
  • Produce's advertised min version differs from its real min version on the broker listener (the librdkafka workaround). Tooling that derives "supported versions" from ApiVersionsResponse will see produce min=0 even though v0–2 are rejected (ApiKeys.java:267-302).
  • Removed APIs keep their ids. Never assume a contiguous, fully-live id space; check hasValidVersion().
  • Records-bearing requests pin their receive buffer. If you see delayed buffer recycling under produce load, that is requiresDelayedAllocation doing its job, not a leak.

krivaltsevich.com · Part of Apache Kafka Internals · derived from Apache Kafka 4.4 source · GitHub · MIT-licensed.

Apache Kafka® is a registered trademark of the Apache Software Foundation. This is an independent, unofficial guide, not affiliated with or endorsed by the ASF.