krivaltsevich.com Kafka Internals 4.4

16 · The Producer Client

Source: Apache Kafka 4.4.0-SNAPSHOT (git 04bfe7d, 2026-06-15), KRaft mode. Derived from source code, not copied from official documentation.

The KafkaProducer is an asynchronous, thread-safe client that turns user records into compact, compressed record batches and ships them to partition leaders. A user thread calling send() never touches a socket: it serializes the key/value, picks a partition, and appends the bytes into a per-partition queue inside the RecordAccumulator, returning a Future immediately. A single background Sender I/O thread drains ready batches, groups them by leader broker, builds ProduceRequests, pushes them through the NetworkClient pipeline, and dispatches retries, backoff, and completion callbacks. Idempotence (default-on since 3.0) stamps each batch with a producer id, epoch, and per-partition sequence number so the broker can deduplicate retries while preserving order. This chapter traces the entire path from send() to broker ack, field by field and lock by lock.

Role & responsibilities

The producer client sits at the boundary between application code and the Kafka cluster. Its responsibilities are:

  • Serialization & interception. Convert typed keys/values to byte[] via configured Serializers, after optionally rewriting the record through a chain of ProducerInterceptors.
  • Partition selection. Honour an explicit partition, else hash the key, else delegate to the adaptive sticky BuiltInPartitioner (or a user Partitioner).
  • Batching & buffering. Coalesce records destined for the same (topic, partition) into ProducerBatch objects backed by a bounded BufferPool, trading a little latency (linger.ms) for throughput.
  • Transmission. Decide which batches are ready, group them by destination node, encode ProduceRequests, and pipeline them with at most max.in.flight.requests.per.connection outstanding per broker.
  • Reliability. Classify responses as success / retriable / fatal; reenqueue retriable batches with exponential backoff; enforce a hard delivery.timeout.ms deadline; and, when idempotence or transactions are on, manage producer ids, epochs, and sequence numbers (see Transactions & EOS).

Where it lives in the code

Class | File | Role
KafkaProducer<K,V> | clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java | Public entry point; send(), flush(), close(), transaction API; owns and wires every subsystem.
RecordAccumulator | clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java | Per-partition deque of batches; append, ready-check, drain, reenqueue, expiry.
ProducerBatch | clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java | One MemoryRecords builder plus its callbacks (thunks), state machine, and split logic.
BufferPool | clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java | Bounded buffer.memory allocator with a free-list of poolable buffers and fair waiter queue.
Sender | clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java | Runnable run by the I/O thread; the drain → request → response → retry loop.
BuiltInPartitioner | clients/src/main/java/org/apache/kafka/clients/producer/internals/BuiltInPartitioner.java | Adaptive uniform sticky partitioner (one per topic).
TransactionManager | clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java | Producer id/epoch/sequence state for idempotence and transactions.
NetworkClient | clients/src/main/java/org/apache/kafka/clients/NetworkClient.java | Connection management and the per-broker in-flight request pipeline.
ProducerConfig | clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java | Config keys, defaults, and idempotence post-validation.

Core concepts & terminology

RecordAccumulator
The in-memory staging area. A ConcurrentMap<String, TopicInfo> keyed by topic; each TopicInfo holds a ConcurrentMap<Integer, Deque<ProducerBatch>> keyed by partition (RecordAccumulator.java:1301).
ProducerBatch
A single record batch under construction (one MemoryRecordsBuilder) plus a list of thunks (callback + future pairs). Not thread-safe; guarded by the owning deque's monitor (ProducerBatch.java:58).
Thunk
A (Callback, FutureRecordMetadata) pair kept for every appended record so callbacks can fire and so the batch can be split and resent if rejected (ProducerBatch.java:426).
BufferPool
A buffer.memory-bounded allocator. Buffers of exactly batch.size ("poolable size") are recycled on a free list; larger ones are tracked as raw byte counts (BufferPool.java:45).
Sender / I/O thread
The lone background thread named kafka-producer-network-thread | <clientId> that owns all network interaction (KafkaProducer.java:251, KafkaProducer.java:475).
Sticky partition
The current target partition for keyless records, held in an AtomicReference<StickyPartitionInfo> and rotated after roughly batch.size bytes (BuiltInPartitioner.java:48).
Muting
When max.in.flight=1 (so ordering must be exact even without idempotence) the Sender mutes a partition while a batch is in flight to forbid concurrent draining (Sender.java:420).

The send pipeline, end to end

Everything below the public send(record, callback) happens on the calling thread. The interceptor chain runs first and outside the try/catch, so it sees every record even on later failure (KafkaProducer.java:1092); the heavy lifting is in doSend (KafkaProducer.java:1125).

User thread (send path):

  1. send(record, cb): returns a Future immediately.
  2. interceptors.onSend · waitOnMetadata · serialize K/V (may block up to max.block.ms on metadata).
  3. partition() · ensureValidRecordSize.
  4. RecordAccumulator.append: topic → TopicInfo → partition → Deque<ProducerBatch> (last open batch); BufferPool.allocate may block.
  5. If batchIsFull or newBatchCreated: sender.wakeup(), then return Future<RecordMetadata>.

After the hand-off, the lone Sender (I/O) thread drains the accumulator in its runOnce() loop:

  1. ready(metadata): which leader nodes have sendable batches.
  2. drain(node, maxSize): Map<nodeId, List<ProducerBatch>>.
  3. Build the ProduceRequest.
  4. client.send(req) · client.poll(): the NetworkClient pipeline.
  5. handleProduceResponse.
  6. complete / retry / fail · fire thunk callbacks.

From send() to broker ack: the user thread fills the accumulator and returns a Future; the single Sender (I/O) thread drains ready batches, builds the request, and completes the callbacks. The RecordAccumulator is the shared hand-off between the two threads.

1. Metadata wait

waitOnMetadata returns immediately if the cluster already knows the topic's partition count (and the requested partition is in range); otherwise it adds the topic to the metadata set, calls sender.wakeup() to nudge the I/O thread into issuing a MetadataRequest, and blocks on metadata.awaitUpdate until the topic appears or max.block.ms elapses, at which point it throws TimeoutException (KafkaProducer.java:1244). This is the first of two places send() can block. See Metadata Propagation.

2. Serialization & size validation

Key and value serializers run on the user thread; a ClassCastException is rewrapped as SerializationException (KafkaProducer.java:1150). After serialization, headers are frozen read-only and the upper-bound serialized size is computed via AbstractRecords.estimateSizeInBytesUpperBound. ensureValidRecordSize rejects records larger than max.request.size or buffer.memory with RecordTooLargeException (KafkaProducer.java:1313). The record timestamp defaults to "now" when the user did not supply one (KafkaProducer.java:1176).

3. Partition selection

partition() resolves the target partition in priority order (KafkaProducer.java:1625):

  1. If record.partition() is non-null, use it verbatim.
  2. Else if a custom partitioner.class is configured, call it (a negative result is an IllegalArgumentException).
  3. Else if the serialized key is non-null and partitioner.ignore.keys=false, hash it: Utils.toPositive(Utils.murmur2(serializedKey)) % numPartitions (BuiltInPartitioner.java:397). Keyed records are always distributed this way; the adaptive logic never touches them.
  4. Else return RecordMetadata.UNKNOWN_PARTITION (the sentinel -1), deferring the choice to the BuiltInPartitioner inside the accumulator.
Key idea

For keyless records the partition is not chosen in partition(). It is chosen inside RecordAccumulator.append under the deque lock, so the sticky partition can be peeked, validated against concurrent switches, and committed atomically with the byte append. The chosen partition is reported back to the caller through AppendCallbacks.setPartition (RecordAccumulator.java:314).
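The priority order above can be sketched as a single function. This is a minimal illustration, not Kafka's code: the class and method names are hypothetical, the custom partitioner is modelled as a nullable function, and the key hash is passed in as a parameter standing in for Utils.murmur2.

```java
import java.util.function.ToIntFunction;

/** Sketch of the partition() priority order; names are illustrative, not Kafka's. */
final class PartitionChoiceSketch {
    /** Sentinel meaning "let the accumulator pick a sticky partition". */
    static final int UNKNOWN_PARTITION = -1;

    /** Clears the sign bit, so Integer.MIN_VALUE maps to 0 instead of staying negative. */
    static int toPositive(int n) {
        return n & 0x7fffffff;
    }

    static int choose(Integer explicitPartition,
                      ToIntFunction<byte[]> customPartitioner, // null when none is configured
                      byte[] serializedKey,
                      boolean ignoreKeys,
                      int numPartitions,
                      ToIntFunction<byte[]> keyHash) {          // stand-in for murmur2
        if (explicitPartition != null) {
            return explicitPartition;                           // 1. explicit partition wins
        }
        if (customPartitioner != null) {
            int p = customPartitioner.applyAsInt(serializedKey);
            if (p < 0) {
                throw new IllegalArgumentException("negative partition: " + p);
            }
            return p;                                           // 2. user partitioner
        }
        if (serializedKey != null && !ignoreKeys) {
            // 3. hash the key; masking the sign bit keeps the modulo non-negative
            return toPositive(keyHash.applyAsInt(serializedKey)) % numPartitions;
        }
        return UNKNOWN_PARTITION;                               // 4. defer to the sticky logic
    }
}
```

Masking the sign bit rather than calling Math.abs matters for one input: Math.abs(Integer.MIN_VALUE) is still negative, whereas the mask yields 0.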

4. The adaptive sticky partitioner (KIP-794 / KIP-480)

There is one BuiltInPartitioner per topic. It keeps the chosen partition in an AtomicReference<StickyPartitionInfo>; each StickyPartitionInfo carries an immutable partition index and an AtomicInteger producedBytes counter (BuiltInPartitioner.java:381). The append protocol is a careful lock-free peek / lock / re-validate dance (BuiltInPartitioner.java:156):

  1. peekCurrentPartitionInfo returns the current sticky info, creating one via nextPartition on first use (CAS-guarded against racing creators).
  2. The caller locks the partition's batch deque.
  3. isPartitionChanged confirms no other thread switched partitions; if it did, append loops and retries (RecordAccumulator.java:319).
  4. Bytes are appended.
  5. updatePartitionInfo adds the appended bytes and may rotate the sticky partition.

When to switch. The partition rotates once producedBytes ≥ stickyBatchSize (which is batch.size) and switching is enabled, or unconditionally once producedBytes ≥ 2 × stickyBatchSize (BuiltInPartitioner.java:251). Switching is disabled while the deque has an incomplete (non-full) last batch, so the producer does not strand half-filled 4 KB batches when linger.ms is high; the cap at 2× prevents pathological non-switching (RecordAccumulator.java:327, RecordAccumulator.java:355).
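The switch rule reduces to one boolean expression. A minimal sketch, with a hypothetical class name and with enableSwitch standing for "the deque's last batch is not incomplete":

```java
/** Sketch of the sticky-partition switch rule; names are illustrative. */
final class StickySwitchSketch {
    static boolean shouldSwitch(int producedBytes, int stickyBatchSize, boolean enableSwitch) {
        boolean reachedBatch = producedBytes >= stickyBatchSize;
        // 2L forces long arithmetic so a large batch size cannot overflow.
        boolean reachedCap = producedBytes >= 2L * stickyBatchSize;
        // Switch at one batch worth of bytes if allowed, or at two regardless.
        return (reachedBatch && enableSwitch) || reachedCap;
    }
}
```

With a batch size of 16384 bytes, for example, 16384 produced bytes rotate the partition only when switching is enabled, while 32768 rotate it unconditionally.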

Where to switch. nextPartition picks the next partition. With adaptive partitioning disabled (partitioner.adaptive.partitioning.enable=false) or before stats exist, it chooses uniformly at random among partitions that have a leader (BuiltInPartitioner.java:79). With adaptive partitioning enabled, it draws from a cumulative frequency table weighted inversely by queue depth: each partition's weight is maxQueueSize + 1 − queueSize, folded into a running sum, then a uniform random in [0, total) is mapped via binary search (BuiltInPartitioner.java:270, BuiltInPartitioner.java:111). Partitions with shorter queues, i.e. faster brokers, get proportionally more records.

Partition | Queue depth | Weight (max+1−depth, max=3) | Cumulative sum (CFT) | Random r range | Probability
p0 | 0 (idle) | 4 | 4 | 0 1 2 3 | 4/8
p1 | 3 (slowest) | 1 | 5 | 4 | 1/8
p2 | 1 | 3 | 8 | 5 6 7 | 3/8

Adaptive partition selection: a queue-depth-weighted cumulative frequency table (KIP-794). A longer queue means a slower broker and a smaller weight; a uniform random r in [0, 8) is mapped via binary search, so the idle p0 wins most often.
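The worked example (queue depths 0, 3, 1) can be reproduced with a short sketch. The class and method names are hypothetical, and the mapping assumes r is already in [0, total); it is an illustration of the cumulative-table technique rather than the BuiltInPartitioner code itself.

```java
import java.util.Arrays;

/** Sketch of a queue-depth-weighted cumulative frequency table; names are illustrative. */
final class CftSketch {
    /** Turns queue depths into a running sum of inverted weights (max + 1 - depth). */
    static int[] buildTable(int[] queueDepths) {
        int max = 0;
        for (int depth : queueDepths) {
            max = Math.max(max, depth);
        }
        int[] table = new int[queueDepths.length];
        int running = 0;
        for (int i = 0; i < queueDepths.length; i++) {
            running += max + 1 - queueDepths[i]; // shorter queue, larger weight
            table[i] = running;
        }
        return table;
    }

    /** Maps r in [0, total) to a partition index via binary search. */
    static int pick(int[] table, int r) {
        int found = Arrays.binarySearch(table, r);
        // An exact hit at index i means r is the first value owned by partition i + 1.
        // A miss returns -(insertionPoint) - 1, and the insertion point is the partition.
        return Math.abs(found + 1);
    }
}
```

For depths {0, 3, 1} the table is {4, 5, 8}: r in 0..3 selects p0, r = 4 selects p1, and r in 5..7 selects p2.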

The stats feeding this table are recomputed by the Sender thread during ready(): partitionReady gathers per-partition deque sizes and calls updatePartitionLoadStats (RecordAccumulator.java:653). If partitioner.availability.timeout.ms > 0, a partition whose leader has not drained data for that long is excluded from the available set entirely, so the partitioner stops sending to a stuck broker (RecordAccumulator.java:729). When partitioner.rack.aware=true, a parallel rack-local CFT is built and preferred when non-empty (BuiltInPartitioner.java:104, BuiltInPartitioner.java:345).

Design rationale

The original sticky partitioner (KIP-480) batched keyless records to one partition per batch but sent uniformly, so a slow broker built a backlog. KIP-794 made the partitioner adaptive: it weights partition choice by observed queue depth and broker latency, producing larger batches to faster brokers and uniform data distribution rather than uniform record-count distribution. Adaptive logic is skipped entirely when a custom partitioner is configured (KafkaProducer.java:447).

RecordAccumulator: batching & the append algorithm

append is the hot path shared by all user threads (RecordAccumulator.java:280). It increments appendsInProgress (so abortIncompleteBatches on close can't miss in-flight appends) and runs a retry loop:

computeIfAbsent topic → TopicInfo (creates the per-topic BuiltInPartitioner)
loop:
  if partition == UNKNOWN: peek sticky partition, else use the explicit partition
  setPartition(callbacks, effectivePartition)
  dq = topicInfo.batches.computeIfAbsent(partition, ArrayDeque::new)
  synchronized (dq):
    if partitionChanged(...): continue           // racy switch; retry
    r = tryAppend(...) on last batch
    if r != null: updatePartitionInfo; return r  // fit into existing batch
  // no room: allocate a fresh buffer (may block on BufferPool up to maxTimeToBlock)
  buffer = free.allocate(max(batch.size, estimatedRecordSize), remainingBlockMs)
  synchronized (dq):
    if partitionChanged(...): continue
    r = appendNewBatch(...)                      // create ProducerBatch, append, addLast
    if r.newBatchCreated: buffer = null          // buffer now owned by the batch
    updatePartitionInfo; return r
finally: free.deallocate(buffer); appendsInProgress--

tryAppend calls ProducerBatch.tryAppend, which checks recordsBuilder.hasRoomFor and returns null when the batch is full; on a full batch it eagerly calls closeForRecordAppends() to release compression buffers (RecordAccumulator.java:430, ProducerBatch.java:147). The buffer is sized as max(batch.size, upperBound(record)), so an over-sized record gets its own larger-than-batch.size buffer (RecordAccumulator.java:334). Note the careful buffer ownership transfer: when a new batch is created the local buffer is nulled so the finally block does not return it to the pool.
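The shape of this algorithm (try under the lock, allocate outside it, re-check under the lock, hand over ownership) can be shown with a toy model. Everything here is an assumption made for illustration: Batch is a byte counter rather than a ProducerBatch, plain object creation stands in for BufferPool.allocate, and the sticky-partition re-validation is left out.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.atomic.AtomicInteger;

/** Toy model of the two-phase append; not Kafka's RecordAccumulator. */
final class TwoPhaseAppendSketch {
    /** A stand-in batch: just a capacity and a running byte count. */
    static final class Batch {
        final int capacity;
        int used;
        Batch(int capacity) { this.capacity = capacity; }
        boolean tryAppend(int bytes) {
            if (used + bytes > capacity) return false; // full: caller must open a new batch
            used += bytes;
            return true;
        }
    }

    final Deque<Batch> deque = new ArrayDeque<>();
    final AtomicInteger releasedSpares = new AtomicInteger();
    private final int batchSize;

    TwoPhaseAppendSketch(int batchSize) { this.batchSize = batchSize; }

    /** Returns true when this call created a new batch. */
    boolean append(int recordBytes) {
        Batch spare = null;
        try {
            synchronized (deque) {
                Batch last = deque.peekLast();
                if (last != null && last.tryAppend(recordBytes)) return false;
            }
            // Allocation happens outside the lock because a real pool may block.
            spare = new Batch(Math.max(batchSize, recordBytes));
            synchronized (deque) {
                Batch last = deque.peekLast();
                // Another thread may have opened a batch while we were allocating.
                if (last != null && last.tryAppend(recordBytes)) return false;
                spare.tryAppend(recordBytes);
                deque.addLast(spare);
                spare = null; // ownership moved to the deque; finally must not release it
                return true;
            }
        } finally {
            if (spare != null) releasedSpares.incrementAndGet(); // unused spare is given back
        }
    }
}
```

Nulling the local reference is the whole ownership protocol: the finally block releases only what the deque did not take.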

What "ready" and "full" mean

A batch counts as full when the deque has more than one batch (so all but the last are sealed) or the last batch's builder reports isFull() (RecordAccumulator.java:719). A node is ready to send if it has at least one non-muted, non-backing-off partition where any of these hold (RecordAccumulator.java:613):

  • the batch is full, or
  • it has lingered ≥ linger.ms (or, when retrying, ≥ the exponential retry backoff), or
  • the BufferPool is exhausted (threads are blocked waiting), or
  • the accumulator is closed, or a flush() is in progress, or the transaction is completing.

The RecordAppendResult returned to doSend carries batchIsFull and newBatchCreated; if either is true, doSend wakes the Sender so it does not wait out a full poll timeout (KafkaProducer.java:1192).
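The "full" and "ready" conditions listed above combine as plain boolean logic. A minimal sketch with hypothetical names; the muted and backing-off checks are assumed to have been applied before this predicate is consulted:

```java
/** Sketch of the full / sendable predicates; names are illustrative. */
final class ReadySketch {
    /** More than one batch means every batch but the last is sealed. */
    static boolean isFull(int batchesInDeque, boolean lastBatchFull) {
        return batchesInDeque > 1 || lastBatchFull;
    }

    static boolean isSendable(boolean full, long waitedMs, long timeToWaitMs,
                              boolean poolExhausted, boolean closed,
                              boolean flushInProgress, boolean transactionCompleting) {
        // timeToWaitMs is linger.ms for a fresh batch, or the retry backoff for a retry.
        boolean expired = waitedMs >= timeToWaitMs;
        return full || expired || poolExhausted || closed
                || flushInProgress || transactionCompleting;
    }
}
```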

BufferPool & back-pressure

The pool is bounded by buffer.memory (default 32 MiB) and guarded by a single ReentrantLock (BufferPool.java:70). Free memory is the sum of nonPooledAvailableMemory plus the recycled free-list count times poolableSize (which equals batch.size). allocate(size, maxTimeToBlockMs):

  • returns a recycled buffer instantly if size == poolableSize and the free list is non-empty;
  • otherwise, if enough memory is on hand, reserves it (freeing pooled buffers as needed via freeUp);
  • otherwise blocks the caller on a per-request Condition appended to a FIFO waiters deque, accumulating memory incrementally as buffers are returned, until it has enough or maxTimeToBlockMs elapses; on timeout it throws BufferExhaustedException (BufferPool.java:159).

The maxTimeToBlockMs passed in is the remaining max.block.ms budget after the metadata wait already consumed part of it (KafkaProducer.java:1146). This is the second place send() can block. The waiter queue is strictly FIFO and memory is handed to the longest-waiting thread first, preventing starvation when a large request must wait for several small buffers to return (BufferPool.java:38). deallocate returns exact-poolableSize buffers to the free list and signals the head waiter (BufferPool.java:260).

Invariant

availableMemory() = nonPooledAvailableMemory + free.size() × poolableSize ≤ totalMemory at all times. Memory reserved by a thread that ultimately throws is returned in the allocate finally block (BufferPool.java:185), so a timed-out or interrupted allocation never leaks pool memory.
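The accounting behind this invariant can be sketched with a simplified pool. This version is deliberately non-blocking: where the real pool would park the caller on a Condition, it throws IllegalStateException. The class name and that exception choice are assumptions made for the example.

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Deque;

/** Simplified, non-blocking sketch of a bounded buffer pool; names are illustrative. */
final class BoundedPoolSketch {
    private final long totalMemory;
    private final int poolableSize;
    private final Deque<ByteBuffer> free = new ArrayDeque<>();
    private long nonPooledAvailableMemory;

    BoundedPoolSketch(long totalMemory, int poolableSize) {
        this.totalMemory = totalMemory;
        this.poolableSize = poolableSize;
        this.nonPooledAvailableMemory = totalMemory;
    }

    synchronized long availableMemory() {
        return nonPooledAvailableMemory + (long) free.size() * poolableSize;
    }

    synchronized ByteBuffer allocate(int size) {
        if (size > totalMemory) {
            throw new IllegalArgumentException("request exceeds total pool memory");
        }
        if (size == poolableSize && !free.isEmpty()) {
            return free.pollFirst();                 // recycled buffer, no accounting change
        }
        if (availableMemory() < size) {
            throw new IllegalStateException("pool exhausted"); // the real pool blocks here
        }
        // Convert pooled buffers back into raw bytes until the request fits.
        while (nonPooledAvailableMemory < size && !free.isEmpty()) {
            nonPooledAvailableMemory += free.pollLast().capacity();
        }
        nonPooledAvailableMemory -= size;
        return ByteBuffer.allocate(size);
    }

    synchronized void deallocate(ByteBuffer buffer) {
        if (buffer.capacity() == poolableSize) {
            buffer.clear();
            free.addLast(buffer);                    // exact-size buffers are recycled
        } else {
            nonPooledAvailableMemory += buffer.capacity(); // others return as raw bytes
        }
    }
}
```

After any sequence of matched allocate and deallocate calls, availableMemory() returns to the configured total, which is the invariant stated above.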

ProducerBatch: structure, futures, and the state machine

A ProducerBatch wraps one MemoryRecordsBuilder (the actual on-wire v2 batch, see Record Format & Batches) and a List<Thunk>. Each appended record produces a FutureRecordMetadata chained to the batch's shared ProduceRequestResult; that future is what send() hands back (ProducerBatch.java:147). Key mutable fields: recordCount, maxRecordSize, an AtomicInteger attempts, and an AtomicReference<FinalState> (ProducerBatch.java:64).

ProducerBatch final-state transitions:

  • batch created → Open (no final state yet).
  • Open → SUCCEEDED: done(success) sets baseOffset and logAppendTime. SUCCEEDED is terminal; any further change raises IllegalStateException.
  • Open → FAILED: completeExceptionally (e.g. delivery.timeout.ms expiry).
  • Open → ABORTED: abort(reason).
  • FAILED → FAILED: ignored (idempotent re-fail).
  • FAILED → SUCCEEDED: logged; a late broker success after local expiry.
  • ABORTED → SUCCEEDED: logged; a late broker success after abort.

ProducerBatch final-state machine. The first compareAndSet on finalState wins; done() tolerates the double-completion races (FAILED→FAILED is ignored, FAILED/ABORTED→SUCCEEDED is logged) but throws on any transition out of SUCCEEDED, which is terminal (ProducerBatch.java:262).

The double-completion tolerance matters: an in-flight batch may expire on delivery.timeout.ms (set FAILED) yet still have been written by the broker (a later SUCCEEDED attempt). The CAS-based done() ignores FAILED→FAILED and ABORTED→FAILED, logs the rare FAILED/ABORTED→SUCCEEDED, and throws only on SUCCEEDED→anything (ProducerBatch.java:280). completeFutureAndFireCallbacks sets the future, then invokes every thunk's callback in append order, swallowing exceptions thrown by user callbacks so one bad callback cannot poison the rest (ProducerBatch.java:297).
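The first-CAS-wins rule can be sketched in a few lines. This is an illustration with hypothetical names; logging and callback firing are reduced to comments, and the stored state is left unchanged on every tolerated late completion.

```java
import java.util.concurrent.atomic.AtomicReference;

/** Sketch of a CAS-guarded final state; names are illustrative. */
final class FinalStateSketch {
    enum FinalState { ABORTED, FAILED, SUCCEEDED }

    private final AtomicReference<FinalState> finalState = new AtomicReference<>(null);

    /** Returns true only for the call that actually completes the batch. */
    boolean done(FinalState attempted) {
        if (finalState.compareAndSet(null, attempted)) {
            return true; // first completion wins; futures and callbacks would fire here
        }
        if (finalState.get() == FinalState.SUCCEEDED) {
            throw new IllegalStateException("already SUCCEEDED, cannot move to " + attempted);
        }
        // Earlier state was FAILED or ABORTED: tolerate the late call (it would be logged).
        return false;
    }

    FinalState state() {
        return finalState.get();
    }
}
```

Because only the successful compareAndSet returns true, callbacks fire exactly once however many completion attempts race.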

Invariant

Callbacks for records sent to the same partition fire in send order. This holds because records keep their append order within a batch (thunks are a List), and batches for a partition are completed in deque order; reenqueued retries are reinserted in sequence order (RecordAccumulator.java:557).

The Sender thread: drain, encode, transmit, complete

The I/O thread runs Sender.run, which loops runOnce() until running is cleared, then drains residual data and closes the client (Sender.java:240). runOnce first services the TransactionManager (coordinator discovery, InitProducerId, AddPartitions, commit/abort, idempotent epoch bumps), then calls sendProducerData followed by client.poll (Sender.java:310).

sendProducerData

One iteration (Sender.java:380):

  1. accumulator.ready(metadataSnapshot, now) → the set of ready leader nodes, the next-check delay, and topics with unknown leaders (which trigger a metadata refresh).
  2. For each ready node, gate on client.ready(node, now); nodes whose connection isn't established are dropped this round, and node latency stats are nudged.
  3. accumulator.drain(metadataSnapshot, readyNodes, maxRequestSize, now) → Map<nodeId, List<ProducerBatch>>.
  4. addToInflightBatches records them for delivery-timeout tracking; if guaranteeMessageOrder (i.e. max.in.flight==1), every drained partition is muted.
  5. Expire batches that blew delivery.timeout.ms (both queued and in-flight) via failExpiredBatches.
  6. sendProduceRequests encodes and sends one ProduceRequest per node.

drainBatchesForOneNode

For a node, the accumulator walks that node's partitions starting at a per-node round-robin drainIndex (so no single topic-partition starves the others), and for each it pulls the head batch while (RecordAccumulator.java:860): the partition isn't muted; the batch isn't in retry backoff; adding it won't exceed max.request.size (unless the batch is the first and alone); and idempotence ordering permits it (shouldStopDrainBatchesForPartition). When idempotence is on and the drained batch has no sequence yet, it is stamped here: batch.setProducerState(producerIdAndEpoch, sequenceNumber(tp), isTransactional), the per-partition sequence is advanced by recordCount, and the batch is registered as in-flight (RecordAccumulator.java:911). The batch is then close()d (header written, compression finalized) outside the lock because closing is expensive.
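The sequence stamping done at drain time amounts to a per-partition counter advanced by each batch's record count. A minimal sketch under stated assumptions: partitions are keyed by a plain string, the class name is hypothetical, and sequence wraparound is not modelled.

```java
import java.util.HashMap;
import java.util.Map;

/** Sketch of per-partition base-sequence assignment; names are illustrative. */
final class SequenceSketch {
    private final Map<String, Integer> nextSequence = new HashMap<>();

    /** Returns the base sequence for a batch and advances the partition's counter. */
    int stamp(String topicPartition, int recordCount) {
        int base = nextSequence.getOrDefault(topicPartition, 0);
        nextSequence.put(topicPartition, base + recordCount); // next batch starts after this one
        return base;
    }
}
```

A batch of 3 records followed by a batch of 2 on the same partition gets base sequences 0 and 3; another partition starts again at 0.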

Building and sending the ProduceRequest

sendProduceRequest assembles a ProduceRequestData with the configured acks and request.timeout.ms, grouping batches by topic (using topic id where available; modern ProduceRequest v13 drops topic names in favour of ids) and setting each batch's MemoryRecords as the partition payload (Sender.java:896). For transactional producers it stamps the transactionalId and selects the request version. Each batch is marked setInflight(true). The client request is created with expectResponse = acks != 0; for acks=0 there is no response and batches complete as soon as the bytes are written (Sender.java:945, Sender.java:653).

Handling the response

handleProduceResponse demultiplexes per-partition results, resolving the TopicPartition by topic id (v13) or name (older) and dispatching each to completeBatch (Sender.java:580). Disconnects map to NETWORK_EXCEPTION, timeouts to REQUEST_TIMED_OUT, version mismatches to UNSUPPORTED_VERSION. The response may carry updated leader id/epoch which is fed back into the metadata cache for NOT_LEADER_OR_FOLLOWER / FENCED_LEADER_EPOCH errors (Sender.java:638).

completeBatch: the success / split / retry / fail decision

completeBatch decision flow (starts with setInflight(false)):

  1. MESSAGE_TOO_LARGE and recordCount ≥ 2 and (v2 or compressed)? Yes: splitAndReenqueue, splitting into ≤ batch.size pieces with no attempt charged.
  2. Otherwise, error ≠ NONE? No: complete with success (baseOffset, logAppendTime).
  3. Yes, and canRetry? Yes: reenqueue(batch) (retriable).
  4. Not retriable, DUPLICATE_SEQUENCE_NUMBER? Yes: complete with success (offset unknown). No: failBatch (fatal for this batch).
  5. After reenqueue, duplicate, or failBatch: if the error is an InvalidMetadataException, call metadata.requestUpdate().
  6. End of completion (all paths converge here): if guaranteeMessageOrder, unmutePartition.

The per-batch completion decision. The split path charges no retry attempt; the retry/duplicate/fail paths additionally call metadata.requestUpdate() when the error is an InvalidMetadataException; every path ends by un-muting the partition when guaranteeMessageOrder is set (Sender.java:671).
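The completion decision can be written as a pure function, which makes the branch order easy to check. The enums and names below are hypothetical simplifications: errors are collapsed into four kinds, and the retriability result is passed in as a boolean.

```java
/** Sketch of the per-batch completion decision; names are illustrative. */
final class CompleteBatchSketch {
    enum ErrorKind { NONE, MESSAGE_TOO_LARGE, DUPLICATE_SEQUENCE_NUMBER, OTHER }
    enum Outcome { SPLIT_AND_REENQUEUE, COMPLETE, REENQUEUE, COMPLETE_OFFSET_UNKNOWN, FAIL }

    static Outcome decide(ErrorKind error, int recordCount,
                          boolean v2OrCompressed, boolean canRetry) {
        if (error == ErrorKind.MESSAGE_TOO_LARGE && recordCount >= 2 && v2OrCompressed) {
            return Outcome.SPLIT_AND_REENQUEUE;      // no retry attempt is charged
        }
        if (error == ErrorKind.NONE) {
            return Outcome.COMPLETE;                 // offset and timestamp are known
        }
        if (canRetry) {
            return Outcome.REENQUEUE;
        }
        if (error == ErrorKind.DUPLICATE_SEQUENCE_NUMBER) {
            return Outcome.COMPLETE_OFFSET_UNKNOWN;  // the broker already has the batch
        }
        return Outcome.FAIL;
    }
}
```

A single-record batch rejected as too large cannot be split, so it falls through to the retry check and, if that fails, to FAIL.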

canRetry is the retriability gate (Sender.java:876): the batch must not have reached delivery.timeout.ms, must have attempts() < retries, must not be already done, and the error must be a RetriableException, or, when idempotence is on, must satisfy TransactionManager.canRetry, which additionally retries certain UNKNOWN_PRODUCER_ID and OUT_OF_ORDER_SEQUENCE_NUMBER cases and may request an epoch bump (TransactionManager.java:1043).

Ordering & idempotence

Idempotence is on by default (enable.idempotence=true, ProducerConfig.java:541). When enabled, the TransactionManager is instantiated even with no transactional.id ("an idempotent producer"; KafkaProducer.java:612). On first send the producer fetches a producer id and epoch via InitProducerId (TransactionManager.java:691), then stamps each batch with (producerId, epoch, baseSequence) at drain time. The broker retains the last five batches per producer-partition and rejects duplicates (a retried batch with an already-committed sequence) and gaps (OUT_OF_ORDER_SEQUENCE_NUMBER), giving exactly-once, in-order delivery per partition within a producer session.

Why max.in.flight ≤ 5 preserves ordering

With idempotence, the broker enforces strictly increasing per-partition sequences, so even with up to five concurrent in-flight requests it rejects anything out of order; the client reorders and resends from its queue. Therefore ordering holds for any max.in.flight ∈ [1,5]. ProducerConfig hard-caps idempotent producers at 5 (MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_FOR_IDEMPOTENCE); a larger value is a ConfigException (ProducerConfig.java:639). Without idempotence, only max.in.flight=1 guarantees order (enforced by partition muting); larger values can reorder on retry.

Reorder-on-retry handling. Once a batch is retried, the accumulator must keep the partition's queue in sequence order. reenqueue adds the batch to the front via insertInSequenceOrder, which linearly scans the in-flight batches and inserts so base sequences stay ascending (RecordAccumulator.java:557). To avoid writing a new-epoch sequence-0 batch ahead of older batches, shouldStopDrainBatchesForPartition stops draining new (sequence-less) batches while the partition has in-flight batches with a stale producer id/epoch, has unresolved sequences, or has an earlier in-flight batch being retried, effectively collapsing to one in-flight request for that partition during recovery (RecordAccumulator.java:823).
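Keeping a retried batch in sequence order is a small deque manipulation. The sketch below is a simplification with a hypothetical class name: each deque element is just a base sequence, and every queued element is assumed to carry one (in the real accumulator, batches that have not been stamped yet sit behind the stamped ones).

```java
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Sketch of reinserting a retried batch in sequence order; names are illustrative. */
final class SequenceOrderSketch {
    /** Inserts baseSequence so that sequences at the head of the deque stay ascending. */
    static void insertInSequenceOrder(Deque<Integer> deque, int baseSequence) {
        List<Integer> earlier = new ArrayList<>();
        // Pull off every queued batch that must be sent before the retried one.
        while (!deque.isEmpty() && deque.peekFirst() < baseSequence) {
            earlier.add(deque.pollFirst());
        }
        deque.addFirst(baseSequence);
        // Put the earlier batches back in front, preserving their original order.
        for (int i = earlier.size() - 1; i >= 0; i--) {
            deque.addFirst(earlier.get(i));
        }
    }
}
```

Retrying sequence 6 into a queue holding 3 and 9 yields 3, 6, 9, so the broker never sees 9 before 6.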

Epoch bumps. On a fatal sequence gap an idempotent (non-transactional) producer bumps its epoch locally and restarts sequences at 0 for the affected partition. An OutOfOrderSequenceException from the broker calls requestIdempotentEpochBumpForPartition; on the next runOnce, bumpIdempotentEpochAndResetIdIfNeeded increments the epoch (or, at Short.MAX_VALUE, resets to a brand-new producer id) and rewrites in-flight sequences from the beginning (TransactionManager.java:827, TransactionManager.java:673). An expired in-flight batch also marks the partition's sequence unresolved so no new batch is drained until the gap resolves (Sender.java:373).

acks, timeouts & retries

acks=0
Fire-and-forget. No response is requested; the batch completes when written to the socket and RecordMetadata.offset() is -1. Incompatible with idempotence.
acks=1
Leader-only durability. The leader acks after writing to its log, before followers replicate. Incompatible with idempotence.
acks=all (-1)
The leader acks only after the in-sync replica set has the record (subject to min.insync.replicas). The only value compatible with idempotence; it is the default (ProducerConfig.java:403). See Replication, ISR & High Watermark.

Three limits compose (KafkaProducer.java:589): request.timeout.ms bounds one network round trip; retries bounds the number of attempts; and delivery.timeout.ms is the hard wall-clock deadline from when the record enters the accumulator until success or failure is reported, spanning lingering, backoff, and all retries. The constructor enforces delivery.timeout.ms ≥ linger.ms + request.timeout.ms, raising ConfigException if the user set an inconsistent value or silently bumping the default otherwise. Because retries defaults to Integer.MAX_VALUE, in practice delivery.timeout.ms (default 120 s) is what actually terminates retrying; the docs and code both steer users to tune it instead of retries (ProducerConfig.java:402).
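The constructor's consistency check on delivery.timeout.ms can be sketched as follows. The class name is hypothetical, and IllegalArgumentException stands in for Kafka's ConfigException so the example needs only the JDK.

```java
/** Sketch of the delivery.timeout.ms consistency rule; names are illustrative. */
final class DeliveryTimeoutSketch {
    /**
     * Returns the effective delivery timeout, or throws when an explicitly
     * configured value is smaller than linger.ms + request.timeout.ms.
     */
    static long resolve(long lingerMs, long requestTimeoutMs,
                        long deliveryTimeoutMs, boolean userSetDeliveryTimeout) {
        long floor = lingerMs + requestTimeoutMs;
        if (deliveryTimeoutMs >= floor) {
            return deliveryTimeoutMs;
        }
        if (userSetDeliveryTimeout) {
            throw new IllegalArgumentException(
                "delivery.timeout.ms must be >= linger.ms + request.timeout.ms");
        }
        return floor; // the default is raised to the minimum consistent value
    }
}
```

The asymmetry is deliberate: an explicit value that cannot be honoured is an error, while a default that merely became too small after tuning linger.ms is adjusted.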

Error classification

Class | Examples | Producer action
Retriable (transient) | NETWORK_EXCEPTION, REQUEST_TIMED_OUT, NOT_LEADER_OR_FOLLOWER, NOT_ENOUGH_REPLICAS | Reenqueue with exponential backoff until retries or delivery.timeout.ms exhausted (Sender.java:692).
Too large | MESSAGE_TOO_LARGE on a multi-record batch | Split into ≤ batch.size batches and resend; does not consume a retry attempt (Sender.java:676, ProducerBatch.java:326).
Idempotence-recoverable | OUT_OF_ORDER_SEQUENCE_NUMBER, UNKNOWN_PRODUCER_ID | Retry, possibly bumping epoch / resetting sequences (TransactionManager.java:1043).
Duplicate | DUPLICATE_SEQUENCE_NUMBER | Treated as success but with unknown offset/timestamp (Sender.java:700).
Fatal | TOPIC_AUTHORIZATION_FAILED, CLUSTER_AUTHORIZATION_FAILED, RecordTooLargeException (single record), UnsupportedVersionException | Fail the batch's futures/callbacks; for txn producers transition to an abortable/fatal error (Sender.java:770).

Compression

Compression is configured once via compression.type (default none; gzip/snappy/lz4/zstd supported, with optional per-codec level) and applied at the batch level by the MemoryRecordsBuilder as records are appended (KafkaProducer.java:562, RecordAccumulator.java:409). The producer maintains a per-(topic, codec) CompressionRatioEstimator so it can size buffers realistically; on close() each batch updates the estimate (ProducerBatch.java:518). Because compression operates on whole batches, larger batches (higher linger.ms/batch.size) compress better. When the broker nonetheless rejects a compressed batch as too large (MESSAGE_TOO_LARGE), the split path resets the estimate conservatively to avoid repeated splits (RecordAccumulator.java:516).

flush() and close()

flush() makes every buffered record immediately sendable and blocks until all in-flight requests complete. It increments flushesInProgress (which makes ready() return everything as sendable), wakes the Sender, then awaits each incomplete batch's ProduceRequestResult, using awaitAllDependents() so split batches are also awaited (KafkaProducer.java:1366, RecordAccumulator.java:1106). Calling flush() from inside a callback throws, because the callback runs on the I/O thread and would deadlock waiting on itself (KafkaProducer.java:1368).

close(timeout) closes the accumulator first (so no new appends are accepted), clears running, and joins the I/O thread, which drains residual batches and completes in-flight requests up to the timeout; if the deadline passes it forceClose()s, failing all incomplete batches via abortIncompleteBatches (KafkaProducer.java:1546, Sender.java:295). Called from a callback, close() degrades to a non-blocking close(0) to avoid a self-join (KafkaProducer.java:1554).

Concurrency & threading model

State | Guarded by | Accessed by
Per-partition Deque<ProducerBatch> | the deque object's monitor (synchronized(dq)) | user threads (append) and Sender (ready/drain)
topicInfoMap, TopicInfo.batches | CopyOnWriteMap (lock-free reads) | both
Sticky partition | AtomicReference + deque lock for the peek/validate/update protocol | both
BufferPool free list / waiters | one ReentrantLock + per-waiter Condition | user threads (allocate) and Sender (deallocate)
ProducerBatch.finalState, attempts | AtomicReference / AtomicInteger | Sender; user threads read via futures
muted, nodesDrainIndex, inFlightBatches | unsynchronized, Sender-thread-only | Sender only (RecordAccumulator.java:90, Sender.java:126)
Transaction/idempotence state | synchronized methods on TransactionManager | Sender and (for the public txn API) user threads
appendsInProgress, flushesInProgress | AtomicInteger | both

The design keeps user threads off the network entirely and confines all socket I/O, retry bookkeeping, and the in-flight pipeline to one Sender thread, so the only cross-thread contention is the short critical section around each partition's deque and the BufferPool lock. The hot ready()/partitionReady() loop is deliberately minimal under the deque lock to avoid contending with appends at large partition counts (a documented hot path; RecordAccumulator.java:697).

The NetworkClient pipeline

The Sender sends through a NetworkClient configured with max.in.flight.requests.per.connection. canSendRequest returns true only when the connection is ready, the channel is writable, and InFlightRequests.canSendMore(node) holds; the latter is true when the node has no outstanding requests, or the most recently queued send has been fully written and the queue size is below the max (NetworkClient.java:544, InFlightRequests.java:96). This is the mechanism that bounds pipelining per broker. Throttling from quotas is observed as a throttleDelayMs on the connection, which the Sender folds into its poll timeout. The wire encoding of ProduceRequest/ProduceResponse is covered in Wire Protocol & RPC Framework.

Failure modes, edge cases & recovery

  • Metadata never arrives. send() blocks up to max.block.ms then throws TimeoutException; if a partial wait already elapsed, the BufferPool gets the remainder (KafkaProducer.java:1146).
  • Buffer exhausted. allocate blocks up to the remaining max.block.ms, then throws BufferExhaustedException (a subclass of TimeoutException), incrementing buffer-exhausted-records (BufferPool.java:159).
  • Delivery timeout. A batch that exceeds delivery.timeout.ms in queue or in flight is failed with TimeoutException regardless of remaining retries (Sender.java:362). With idempotence this triggers a sequence-unresolved marker and eventual epoch bump.
  • Leader change. NOT_LEADER_OR_FOLLOWER/FENCED_LEADER_EPOCH are retriable; the response's new leader info updates metadata, and the batch tracks the new leader epoch so backoff is skipped when retrying to a fresh leader (ProducerBatch.java:116, RecordAccumulator.java:804).
  • Forced close. abortIncompleteBatches loops until appendsInProgress == 0 so no batch from a racing appender is missed, then aborts all incomplete batches (RecordAccumulator.java:1135).
  • Batch still in network use (KAFKA-19012). A batch sent to the NetworkClient may have its pooled buffer in use; deallocation is deferred (maybeRemoveAndDeallocateBatchLater) until the request truly completes, to prevent reusing a buffer that the selector is still reading (Sender.java:854, RecordAccumulator.java:1168).
  • Authorization mid-transaction. TransactionalIdAuthorizationException/ClusterAuthorizationException fail pending requests and transition the manager to UNINITIALIZED so the user need not reconstruct the producer (Sender.java:351).
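The two time bounds in the list above reduce to simple arithmetic. The following is a sketch with hypothetical names (ProducerDeadlines and both methods are invented for illustration): the first method shows how a single max.block.ms budget is shared between the metadata wait and buffer allocation, the second shows a delivery-timeout check measured from batch creation.

```java
// Hypothetical helpers illustrating the two deadlines (not Kafka code).
class ProducerDeadlines {
    // Time left for buffer allocation after the metadata wait used part of max.block.ms.
    static long remainingForAllocation(long maxBlockMs, long waitedOnMetadataMs) {
        return Math.max(0L, maxBlockMs - waitedOnMetadataMs);
    }

    // A batch is expired once delivery.timeout.ms has elapsed since it was created,
    // no matter how many retries remain.
    static boolean hasReachedDeliveryTimeout(long deliveryTimeoutMs, long createdMs, long nowMs) {
        return deliveryTimeoutMs <= nowMs - createdMs;
    }
}
```

With the default max.block.ms of 60000, a send() that waited 15 seconds for metadata has at most 45 seconds left to obtain buffer space.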

Invariants & guarantees

Invariant

A user thread never performs network I/O. send() only serializes, partitions, and appends; the Sender thread alone talks to the cluster. The two bounded waits a user thread can experience are metadata refresh and buffer allocation, both capped by max.block.ms.

Invariant

With idempotence on and acks=all, retries never produce duplicates and never reorder records within a partition, for any max.in.flight ∈ [1,5], within a single producer session. Across sessions, only a transactional.id provides recovery (see Transactions & EOS).
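To see why sequence numbers give both deduplication and ordering, consider a deliberately simplified model of the broker-side check for one partition. SequenceChecker is hypothetical; it ignores producer epochs and sequence wraparound, and it tracks only the last appended sequence rather than whatever per-producer state the real broker keeps.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified, hypothetical model of sequence validation for one partition.
class SequenceChecker {
    enum Result { APPENDED, DUPLICATE, OUT_OF_ORDER }

    // producerId -> sequence of the last record appended; absent means nothing yet.
    private final Map<Long, Integer> lastSeq = new HashMap<>();

    Result tryAppend(long producerId, int baseSeq, int recordCount) {
        int last = lastSeq.getOrDefault(producerId, -1);
        int lastOfBatch = baseSeq + recordCount - 1;
        if (lastOfBatch <= last) {
            return Result.DUPLICATE;      // a retry of records already appended
        }
        if (baseSeq != last + 1) {
            return Result.OUT_OF_ORDER;   // a gap: an earlier batch has not landed
        }
        lastSeq.put(producerId, lastOfBatch);
        return Result.APPENDED;
    }
}
```

In this model a retried batch is acknowledged without being written twice, and a batch that overtakes a failed predecessor is rejected until the predecessor is retried, which is what keeps several in-flight requests from reordering records.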

Invariant

Every accepted record's future and callback fire exactly once: on success with a valid offset, or on failure with an exception. Either outcome arrives within delivery.timeout.ms.
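One common way to guarantee single completion is a compare-and-set on an atomic final-state field, in the spirit of the AtomicReference that guards ProducerBatch.finalState. The sketch below is an assumption-laden simplification: OnceOnlyBatch and its done() method are hypothetical, and the real batch-completion logic handles more state transitions than shown.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch of complete-exactly-once via compare-and-set (not Kafka code).
class OnceOnlyBatch {
    enum FinalState { SUCCEEDED, FAILED }

    private final AtomicReference<FinalState> finalState = new AtomicReference<>(null);
    final CompletableFuture<Long> future = new CompletableFuture<>();

    // Returns true only for the single call that wins the race to complete the batch.
    boolean done(long baseOffset, Exception error) {
        FinalState target = (error == null) ? FinalState.SUCCEEDED : FinalState.FAILED;
        if (!finalState.compareAndSet(null, target)) {
            return false; // already completed by an earlier response, timeout, or abort
        }
        if (error == null) {
            future.complete(baseOffset);
        } else {
            future.completeExceptionally(error);
        }
        return true;
    }
}
```

If a late broker response races with a delivery-timeout expiry, only the first caller completes the future; the loser observes a non-null state and does nothing.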

Configuration reference

| Key | Default | Effect |
|---|---|---|
| acks | all | Durability level; all required for idempotence (ProducerConfig.java:403). |
| enable.idempotence | true | Dedup + ordered retries via producer id/epoch/sequence (ProducerConfig.java:541). |
| batch.size | 16384 | Target batch size and BufferPool poolable size; max(1, …) applied (ProducerConfig.java:413, KafkaProducer.java:458). |
| linger.ms | 5 | Max time to wait filling a batch before it is ready (ProducerConfig.java:418). |
| buffer.memory | 33554432 (32 MiB) | Total accumulator memory; over-budget sends block (ProducerConfig.java:401). |
| max.block.ms | 60000 | Combined cap on metadata wait + buffer allocation in send() (ProducerConfig.java:449). |
| max.in.flight.requests.per.connection | 5 | Pipeline depth per broker; capped at 5 with idempotence (ProducerConfig.java:487). |
| retries | 2147483647 | Max attempts; prefer delivery.timeout.ms (ProducerConfig.java:402). |
| delivery.timeout.ms | 120000 | Hard deadline for success/failure; must be ≥ linger.ms + request.timeout.ms (ProducerConfig.java:419). |
| request.timeout.ms | 30000 | Per-round-trip timeout (ProducerConfig.java:455). |
| retry.backoff.ms | 100 (common default) | Base of exponential reenqueue backoff (ProducerConfig.java:432). |
| compression.type | none | Batch-level codec: none/gzip/snappy/lz4/zstd (ProducerConfig.java:409). |
| max.request.size | 1048576 (1 MiB) | Rejects over-sized records; bounds drained request size (ProducerConfig.java:424). |
| partitioner.class | null | Custom partitioner; null ⇒ built-in (ProducerConfig.java:517). |
| partitioner.adaptive.partitioning.enable | true | Weight keyless partition choice by broker load (ProducerConfig.java:414). |
| partitioner.ignore.keys | false | If true, never use the key for partitioning (ProducerConfig.java:416). |
| partitioner.availability.timeout.ms | 0 (off) | Exclude partitions whose leader hasn't drained for this long (ProducerConfig.java:415). |
| transactional.id | null | Enables transactions and cross-session recovery; forces idempotence on (ProducerConfig.java:551). |
| transaction.timeout.ms | 60000 | Broker-side max open-transaction time (ProducerConfig.java:546). |
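For reference, the defaults above can be written out as plain java.util.Properties using the string keys from the table. ProducerProps and both of its methods are hypothetical helpers; the check only restates the documented constraint delivery.timeout.ms ≥ linger.ms + request.timeout.ms and does not reproduce how the client itself reacts to a violation.

```java
import java.util.Properties;

// Hypothetical helper: the table's defaults as explicit properties (JDK only).
class ProducerProps {
    static Properties defaultsFromTable() {
        Properties p = new Properties();
        p.setProperty("acks", "all");
        p.setProperty("enable.idempotence", "true");
        p.setProperty("batch.size", "16384");
        p.setProperty("linger.ms", "5");
        p.setProperty("buffer.memory", "33554432");
        p.setProperty("max.block.ms", "60000");
        p.setProperty("max.in.flight.requests.per.connection", "5");
        p.setProperty("retries", "2147483647");
        p.setProperty("delivery.timeout.ms", "120000");
        p.setProperty("request.timeout.ms", "30000");
        p.setProperty("compression.type", "none");
        p.setProperty("max.request.size", "1048576");
        // partitioner.class and transactional.id default to null, so they are left unset.
        return p;
    }

    // Restates the constraint delivery.timeout.ms >= linger.ms + request.timeout.ms.
    static boolean deliveryTimeoutIsValid(Properties p) {
        long delivery = Long.parseLong(p.getProperty("delivery.timeout.ms"));
        long linger = Long.parseLong(p.getProperty("linger.ms"));
        long request = Long.parseLong(p.getProperty("request.timeout.ms"));
        return delivery >= linger + request;
    }
}
```

With the defaults, 120000 ≥ 5 + 30000 holds comfortably; lowering delivery.timeout.ms to 20000 without also lowering request.timeout.ms would violate the constraint.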

Interactions with other subsystems

Design rationale & evolution

Design rationale

Idempotence and acks=all became the defaults in 3.0 under KIP-679 to give the strongest delivery guarantee out of the box. ProducerConfig.postProcessAndValidateIdempotenceConfigs silently disables idempotence if the user explicitly set a weaker acks or retries=0 (to preserve old behaviour), but throws if the user explicitly asked for idempotence with conflicting settings (ProducerConfig.java:610). The original idempotent/transactional producer landed under KIP-98.
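The asymmetry between "disable quietly" and "throw" can be sketched as a small decision function. Everything here is a hypothetical approximation: IdempotenceResolver is not the Kafka method, IllegalArgumentException stands in for whatever exception the client raises, and treating max.in.flight above 5 as a conflict is an assumption drawn from the cap noted in the configuration table.

```java
// Hypothetical approximation of the idempotence resolution rule (not Kafka code).
class IdempotenceResolver {
    // explicitIdempotence is null when the user did not set enable.idempotence.
    static boolean resolve(Boolean explicitIdempotence, String acks, int retries, int maxInFlight) {
        boolean acksIsAll = acks.equals("all") || acks.equals("-1");
        // Assumption: max.in.flight > 5 is treated as a conflict alongside acks and retries.
        boolean conflict = !acksIsAll || retries == 0 || maxInFlight > 5;
        if (explicitIdempotence == null) {
            return !conflict; // default-on, but yields to explicitly weaker settings
        }
        if (explicitIdempotence && conflict) {
            throw new IllegalArgumentException(
                "enable.idempotence=true conflicts with acks/retries/max.in.flight settings");
        }
        return explicitIdempotence;
    }
}
```

The practical consequence is that a legacy configuration with acks=1 keeps working after an upgrade but loses idempotence, so setting enable.idempotence=true explicitly is the way to turn that silent downgrade into a startup error.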

Design rationale

The partitioner evolved from round-robin → sticky (KIP-480, better batching) → adaptive uniform sticky (KIP-794, load- and latency-aware). The built-in partitioner is intentionally not a Partitioner implementation; it lives inside RecordAccumulator so that it can observe per-partition queue depth, which an external Partitioner cannot see (BuiltInPartitioner.java:34).

Gotchas & operational notes

Gotcha

User callbacks run on the single Sender I/O thread. A slow or blocking callback stalls delivery for every partition. Offload heavy work to your own executor (KafkaProducer.java:1075).
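One way to offload is to wrap the handler so that the I/O thread only pays for a task submission. In the sketch below, OffloadingCallback is hypothetical and a BiConsumer stands in for the producer's Callback interface (whose onCompletion method takes a RecordMetadata and an Exception) so that the example compiles with the JDK alone.

```java
import java.util.concurrent.Executor;
import java.util.function.BiConsumer;

// Hypothetical wrapper; BiConsumer stands in for the producer Callback interface.
class OffloadingCallback {
    // Returns a callback that hands the real work to the given executor and returns at once.
    static <M> BiConsumer<M, Exception> offload(Executor executor, BiConsumer<M, Exception> slowHandler) {
        return (metadata, exception) ->
            executor.execute(() -> slowHandler.accept(metadata, exception));
    }
}
```

Callbacks handed to a multi-threaded executor may run out of order, so a single-threaded executor is the safer choice when completion order matters to the application.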

Gotcha

batch.size=0 does not disable batching; the constructor coerces it to 1, so you still get a 1-byte-target batch per record rather than zero batching (KafkaProducer.java:456).

Gotcha

The adaptive partitioner needs a batch scheduled for every partition before it engages; until then it falls back to uniform selection, and it also bails to uniform when all queue depths are equal (RecordAccumulator.java:661, BuiltInPartitioner.java:322).
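The idea of weighting partition choice by queue depth can be illustrated with a cumulative table and a binary search. This is a sketch under assumptions: QueueWeightedChooser is hypothetical, the inversion formula (max + 1 − depth) is one plausible weighting rather than a confirmed copy of BuiltInPartitioner, and returning null models the fallback to uniform selection when all depths are equal.

```java
import java.util.Arrays;

// Hypothetical sketch of queue-depth-weighted partition choice (not Kafka code).
class QueueWeightedChooser {
    // Builds a cumulative table in which shallower queues receive more weight.
    // Returns null when all queues are equally deep, meaning "use uniform selection".
    static int[] cumulativeTable(int[] queueSizes) {
        int max = Arrays.stream(queueSizes).max().orElse(0);
        int min = Arrays.stream(queueSizes).min().orElse(0);
        if (queueSizes.length < 2 || max == min) {
            return null;
        }
        int[] table = new int[queueSizes.length];
        int running = 0;
        for (int i = 0; i < queueSizes.length; i++) {
            running += (max + 1) - queueSizes[i]; // invert: deep queue -> small weight
            table[i] = running;
        }
        return table;
    }

    // Maps a non-negative random value onto a partition index via the cumulative table.
    static int pick(int[] table, int random) {
        int weighted = random % table[table.length - 1];
        int pos = Arrays.binarySearch(table, weighted);
        // Not found: pos = -(insertionPoint) - 1. Found at i: the value belongs to slot i + 1.
        return Math.abs(pos + 1);
    }
}
```

For queue depths {0, 3, 1} the weights are 4, 1, and 3, so the table is {4, 5, 8}: out of every eight draws, the idle partition receives four and the deepest queue receives one.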

Caution

Application-level resends defeat idempotence: the broker can only deduplicate the producer's own retries, not a fresh record with a new sequence number. If a send() ultimately fails even with infinite retries (e.g. a delivery-timeout expiry), the safe recovery is to close the producer and inspect the last produced offset rather than blindly resend (KafkaProducer.java:177).

Note

A send() that fails synchronously with an ApiException still fires the user callback and interceptor onSendError, and returns a pre-completed FutureFailure rather than throwing. Only non-API exceptions (e.g. SerializationException, InterruptException) propagate to the caller (KafkaProducer.java:1200).

krivaltsevich.com · Part of Apache Kafka Internals · derived from Apache Kafka 4.4 source · GitHub · MIT-licensed.

Apache Kafka® is a registered trademark of the Apache Software Foundation. This is an independent, unofficial guide, not affiliated with or endorsed by the ASF.