16 · The Producer Client
Source: Apache Kafka 4.4.0-SNAPSHOT (git 04bfe7d, 2026-06-15), KRaft mode. Derived from source code, not copied from official documentation.
The KafkaProducer is an asynchronous, thread-safe client that turns user records into compact, compressed record batches and ships them to partition leaders. A user thread calling send() never touches a socket: it serializes the key/value, picks a partition, and appends the bytes into a per-partition queue inside the RecordAccumulator, returning a Future immediately. A single background Sender I/O thread drains ready batches, groups them by leader broker, builds ProduceRequests, pushes them through the NetworkClient pipeline, and dispatches retries, backoff, and completion callbacks. Idempotence (default-on since 3.0) stamps each batch with a producer id, epoch, and per-partition sequence number so the broker can deduplicate retries while preserving order. This chapter traces the entire path from send() to broker ack, field by field and lock by lock.
Role & responsibilities
The producer client sits at the boundary between application code and the Kafka cluster. Its responsibilities are:
- Serialization & interception. Convert typed keys/values to byte[] via configured Serializers, after optionally rewriting the record through a chain of ProducerInterceptors.
- Partition selection. Honour an explicit partition, else hash the key, else delegate to the adaptive sticky BuiltInPartitioner (or a user Partitioner).
- Batching & buffering. Coalesce records destined for the same (topic, partition) into ProducerBatch objects backed by a bounded BufferPool, trading a little latency (linger.ms) for throughput.
- Transmission. Decide which batches are ready, group them by destination node, encode ProduceRequests, and pipeline them with at most max.in.flight.requests.per.connection outstanding per broker.
- Reliability. Classify responses as success / retriable / fatal; reenqueue retriable batches with exponential backoff; enforce a hard delivery.timeout.ms deadline; and, when idempotence or transactions are on, manage producer ids, epochs, and sequence numbers (see Transactions & EOS).
Where it lives in the code
| Class | File | Role |
|---|---|---|
| KafkaProducer<K,V> | clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java | Public entry point; send(), flush(), close(), transaction API; owns and wires every subsystem. |
| RecordAccumulator | clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java | Per-partition deque of batches; append, ready-check, drain, reenqueue, expiry. |
| ProducerBatch | clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java | One MemoryRecords builder plus its callbacks (thunks), state machine, and split logic. |
| BufferPool | clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java | Bounded buffer.memory allocator with a free-list of poolable buffers and fair waiter queue. |
| Sender | clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java | Runnable run by the I/O thread; the drain → request → response → retry loop. |
| BuiltInPartitioner | clients/src/main/java/org/apache/kafka/clients/producer/internals/BuiltInPartitioner.java | Adaptive uniform sticky partitioner (one per topic). |
| TransactionManager | clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java | Producer id/epoch/sequence state for idempotence and transactions. |
| NetworkClient | clients/src/main/java/org/apache/kafka/clients/NetworkClient.java | Connection management and the per-broker in-flight request pipeline. |
| ProducerConfig | clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java | Config keys, defaults, and idempotence post-validation. |
Core concepts & terminology
- RecordAccumulator. The in-memory staging area. A ConcurrentMap<String, TopicInfo> keyed by topic; each TopicInfo holds a ConcurrentMap<Integer, Deque<ProducerBatch>> keyed by partition (RecordAccumulator.java:1301).
- ProducerBatch. A single record batch under construction (one MemoryRecordsBuilder) plus a list of thunks (callback + future pairs). Not thread-safe; guarded by the owning deque's monitor (ProducerBatch.java:58).
- Thunk. A (Callback, FutureRecordMetadata) pair kept for every appended record so callbacks can fire and so the batch can be split and resent if rejected (ProducerBatch.java:426).
- BufferPool. A buffer.memory-bounded allocator. Buffers of exactly batch.size ("poolable size") are recycled on a free list; larger ones are tracked as raw byte counts (BufferPool.java:45).
- Sender / I/O thread. The lone background thread, named kafka-producer-network-thread | <clientId>, that owns all network interaction (KafkaProducer.java:251, KafkaProducer.java:475).
- Sticky partition. The current target partition for keyless records, held in an AtomicReference<StickyPartitionInfo> and rotated after roughly batch.size bytes (BuiltInPartitioner.java:48).
- Muting. When max.in.flight=1 (so ordering must be exact even without idempotence) the Sender mutes a partition while a batch is in flight to forbid concurrent draining (Sender.java:420).
The send pipeline, end to end
Everything below the public send(record, callback) happens on the calling thread. The interceptor chain runs first and outside the try/catch, so it sees every record even on later failure (KafkaProducer.java:1092); the heavy lifting is in doSend (KafkaProducer.java:1125).
Figure: send() to broker ack. The user thread fills the accumulator and returns a Future; the single Sender (I/O) thread drains ready batches, builds the request, and completes the callbacks. The RecordAccumulator (drawn as a cylinder) is the shared hand-off between the two threads. Legend: send path, accumulator (shared store), Sender / network stage, completion / callbacks; cylinder = log / in-memory store; control / data flow; cross-thread hand-off / response.
1. Metadata wait
waitOnMetadata returns immediately if the cluster already knows the topic's partition count (and the requested partition is in range); otherwise it adds the topic to the metadata set, calls sender.wakeup() to nudge the I/O thread into issuing a MetadataRequest, and blocks on metadata.awaitUpdate until the topic appears or max.block.ms elapses, at which point it throws TimeoutException (KafkaProducer.java:1244). This is the first of two places send() can block. See Metadata Propagation.
2. Serialization & size validation
Key and value serializers run on the user thread; a ClassCastException is rewrapped as SerializationException (KafkaProducer.java:1150). After serialization, headers are frozen read-only and the upper-bound serialized size is computed via AbstractRecords.estimateSizeInBytesUpperBound. ensureValidRecordSize rejects records larger than max.request.size or buffer.memory with RecordTooLargeException (KafkaProducer.java:1313). The record timestamp defaults to "now" when the user did not supply one (KafkaProducer.java:1176).
3. Partition selection
partition() resolves the target partition in priority order (KafkaProducer.java:1625):
- If record.partition() is non-null, use it verbatim.
- Else if a custom partitioner.class is configured, call it (a negative result is an IllegalArgumentException).
- Else if the serialized key is non-null and partitioner.ignore.keys=false, hash it: Utils.toPositive(Utils.murmur2(serializedKey)) % numPartitions (BuiltInPartitioner.java:397). Keyed records are always distributed this way; the adaptive logic never touches them.
- Else return RecordMetadata.UNKNOWN_PARTITION (the sentinel -1), deferring the choice to the BuiltInPartitioner inside the accumulator.
For keyless records the partition is not chosen in partition(). It is chosen inside RecordAccumulator.append under the deque lock, so the sticky partition can be peeked, validated against concurrent switches, and committed atomically with the byte append. The chosen partition is reported back to the caller through AppendCallbacks.setPartition (RecordAccumulator.java:314).
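The priority order above can be sketched in plain Java. This is a simplified model under stated assumptions, not Kafka's code: the class name PartitionChoiceSketch and the hash parameter are hypothetical, the custom-partitioner branch is omitted, and the demo uses Arrays.hashCode as a stand-in where the producer uses murmur2.

```java
import java.util.Arrays;
import java.util.function.ToIntFunction;

/** Simplified model of the partition priority order; not Kafka's actual code. */
final class PartitionChoiceSketch {
    static final int UNKNOWN_PARTITION = -1; // the sentinel described in the text

    static int choose(Integer explicit, byte[] key, boolean ignoreKeys,
                      int numPartitions, ToIntFunction<byte[]> hash) {
        if (explicit != null) {
            return explicit; // 1. an explicit partition is used verbatim
        }
        // 2. a custom partitioner would be consulted here (omitted in this sketch)
        if (key != null && !ignoreKeys) {
            int positive = hash.applyAsInt(key) & 0x7fffffff; // clear the sign bit
            return positive % numPartitions;                  // 3. keyed record
        }
        return UNKNOWN_PARTITION; // 4. keyless: decided later inside the accumulator
    }

    public static void main(String[] args) {
        ToIntFunction<byte[]> standIn = Arrays::hashCode; // stand-in hash, NOT murmur2
        byte[] key = "order-42".getBytes(java.nio.charset.StandardCharsets.UTF_8);
        System.out.println("keyed   -> " + choose(null, key, false, 6, standIn));
        System.out.println("keyless -> " + choose(null, null, false, 6, standIn));
    }
}
```

Masking with 0x7fffffff keeps the modulo result non-negative even when the hash is negative, which is the role the text assigns to Utils.toPositive.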
4. The adaptive sticky partitioner (KIP-794 / KIP-480)
There is one BuiltInPartitioner per topic. It keeps the chosen partition in an AtomicReference<StickyPartitionInfo>; each StickyPartitionInfo carries an immutable partition index and an AtomicInteger producedBytes counter (BuiltInPartitioner.java:381). The append protocol is a careful lock-free peek / lock / re-validate dance (BuiltInPartitioner.java:156):
- peekCurrentPartitionInfo returns the current sticky info, creating one via nextPartition on first use (CAS-guarded against racing creators).
- The caller locks the partition's batch deque.
- isPartitionChanged confirms no other thread switched partitions; if it did, append loops and retries (RecordAccumulator.java:319).
- Bytes are appended.
- updatePartitionInfo adds the appended bytes and may rotate the sticky partition.
When to switch. The partition rotates once producedBytes ≥ stickyBatchSize (which is batch.size) and switching is enabled, or unconditionally once producedBytes ≥ 2 × stickyBatchSize (BuiltInPartitioner.java:251). Switching is disabled while the deque has an incomplete (non-full) last batch, so the producer does not strand half-filled 4 KB batches when linger.ms is high; the cap at 2× prevents pathological non-switching (RecordAccumulator.java:327, RecordAccumulator.java:355).
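The switching rule reduces to a two-clause predicate. The following is a minimal sketch of that rule as described here; the class and parameter names are illustrative, not the names used in BuiltInPartitioner.

```java
/** Sketch of the sticky-partition switch rule; names are illustrative only. */
final class StickySwitchSketch {
    static boolean shouldSwitch(int producedBytes, int stickyBatchSize, boolean enableSwitch) {
        // Hard cap: at twice the sticky batch size, switch no matter what.
        if (producedBytes >= 2L * stickyBatchSize) {
            return true;
        }
        // Normal case: switch at one batch worth of bytes, if switching is allowed.
        return producedBytes >= stickyBatchSize && enableSwitch;
    }

    public static void main(String[] args) {
        System.out.println(shouldSwitch(16384, 16384, true));
        System.out.println(shouldSwitch(16384, 16384, false));
        System.out.println(shouldSwitch(32768, 16384, false));
    }
}
```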
Where to switch. nextPartition picks the next partition. With adaptive partitioning disabled (partitioner.adaptive.partitioning.enable=false) or before stats exist, it chooses uniformly at random among partitions that have a leader (BuiltInPartitioner.java:79). With adaptive partitioning enabled, it draws from a cumulative frequency table weighted inversely by queue depth: each partition's weight is maxQueueSize + 1 − queueSize, folded into a running sum, then a uniform random in [0, total) is mapped via binary search (BuiltInPartitioner.java:270, BuiltInPartitioner.java:111). Partitions with shorter queues, i.e. faster brokers, get proportionally more records.
| Partition | Queue depth | Weight (max+1−depth, max=3) | Cumulative sum (CFT) | Random r range | Probability |
|---|---|---|---|---|---|
| p0 | 0 (idle) | 4 | 4 | 0, 1, 2, 3 | 4/8 |
| p1 | 3 (slowest) | 1 | 5 | 4 | 1/8 |
| p2 | 1 | 3 | 8 | 5, 6, 7 | 3/8 |

Worked example: r in [0, 8) is mapped via binary search, so the idle p0 wins most often.
The stats feeding this table are recomputed by the Sender thread during ready(): partitionReady gathers per-partition deque sizes and calls updatePartitionLoadStats (RecordAccumulator.java:653). If partitioner.availability.timeout.ms > 0, a partition whose leader has not drained data for that long is excluded from the available set entirely, so the partitioner stops sending to a stuck broker (RecordAccumulator.java:729). When partitioner.rack.aware=true, a parallel rack-local CFT is built and preferred when non-empty (BuiltInPartitioner.java:104, BuiltInPartitioner.java:345).
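The cumulative frequency table from the worked example can be reproduced with a few lines of Java. This is a sketch under assumptions: CftSketch, buildCft, and pick are hypothetical names, and the real partitioner also handles leaderless partitions, availability timeouts, and rack awareness, all omitted here.

```java
import java.util.Arrays;

/** Sketch of the queue-depth-weighted choice; not Kafka's actual code. */
final class CftSketch {
    /** weight = max + 1 - depth, folded into a running (cumulative) sum. */
    static int[] buildCft(int[] queueSizes) {
        int max = Arrays.stream(queueSizes).max().orElse(0);
        int[] cft = new int[queueSizes.length];
        int running = 0;
        for (int i = 0; i < queueSizes.length; i++) {
            running += max + 1 - queueSizes[i];
            cft[i] = running;
        }
        return cft;
    }

    /** Map r in [0, total) to a partition index via binary search. */
    static int pick(int[] cft, int r) {
        int found = Arrays.binarySearch(cft, r);
        // Exact hit at index i: r is the first value of bucket i + 1.
        // Miss: binarySearch returns -(insertionPoint) - 1, and the
        // insertion point is the bucket that contains r.
        return Math.abs(found + 1);
    }

    public static void main(String[] args) {
        int[] cft = buildCft(new int[] {0, 3, 1});
        System.out.println(Arrays.toString(cft));
        for (int r = 0; r < cft[cft.length - 1]; r++) {
            System.out.println("r=" + r + " -> p" + pick(cft, r));
        }
    }
}
```

Because every weight is at least 1, the table is strictly increasing, so the binary search never meets duplicate entries.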
The original sticky partitioner (KIP-480) batched keyless records to one partition per batch but sent uniformly, so a slow broker built a backlog. KIP-794 made the partitioner adaptive: it weights partition choice by observed queue depth and broker latency, producing larger batches to faster brokers and uniform data distribution rather than uniform record-count distribution. Adaptive logic is skipped entirely when a custom partitioner is configured (KafkaProducer.java:447).
RecordAccumulator: batching & the append algorithm
append is the hot path shared by all user threads (RecordAccumulator.java:280). It increments appendsInProgress (so abortIncompleteBatches on close can't miss in-flight appends) and runs a retry loop:
    computeIfAbsent topic → TopicInfo   (creates per-topic BuiltInPartitioner)
    try:
        loop:
            if partition == UNKNOWN: peek sticky partition, else use explicit partition
            setPartition(callbacks, effectivePartition)
            dq = topicInfo.batches.computeIfAbsent(effectivePartition, ArrayDeque::new)
            synchronized (dq):
                if partitionChanged(...): continue           // racy switch; retry
                r = tryAppend(...) on last batch
                if r != null: updatePartitionInfo; return r  // fit into existing batch
            // no room: allocate a fresh buffer (may block on BufferPool up to maxTimeToBlock)
            buffer = free.allocate(max(batch.size, estimatedRecordSize), remainingBlockMs)
            synchronized (dq):
                if partitionChanged(...): continue
                r = appendNewBatch(...)                      // create ProducerBatch, append, addLast
                if r.newBatchCreated: buffer = null          // buffer now owned by the batch
                updatePartitionInfo; return r
    finally:
        free.deallocate(buffer); appendsInProgress--
tryAppend calls ProducerBatch.tryAppend, which checks recordsBuilder.hasRoomFor and returns null when the batch is full; on a full batch it eagerly calls closeForRecordAppends() to release compression buffers (RecordAccumulator.java:430, ProducerBatch.java:147). The buffer is sized as max(batch.size, upperBound(record)), so an over-sized record gets its own larger-than-batch.size buffer (RecordAccumulator.java:334). Note the careful buffer ownership transfer: when a new batch is created the local buffer is nulled so the finally block does not return it to the pool.
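The lock / allocate-outside-the-lock / re-check shape of append can be modelled without any Kafka classes. The sketch below is a deliberately reduced model: MiniAccumulator, Batch, and Result are hypothetical names, records are represented only by their size, and the sticky-partition check and BufferPool ownership transfer are left out.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Reduced model of the accumulator append path; not Kafka's actual code. */
final class MiniAccumulator {
    static final class Batch {
        final int capacity;
        int used;
        boolean closed;
        Batch(int capacity) { this.capacity = capacity; }
        boolean tryAppend(int size) {
            if (closed || used + size > capacity) { closed = true; return false; }
            used += size;
            return true;
        }
        boolean isFull() { return closed || used >= capacity; }
    }

    static final class Result {
        final boolean batchIsFull;
        final boolean newBatchCreated;
        Result(boolean batchIsFull, boolean newBatchCreated) {
            this.batchIsFull = batchIsFull;
            this.newBatchCreated = newBatchCreated;
        }
    }

    private final int batchSize;
    private final Map<Integer, Deque<Batch>> batches = new ConcurrentHashMap<>();

    MiniAccumulator(int batchSize) { this.batchSize = batchSize; }

    Result append(int partition, int recordSize) {
        Deque<Batch> dq = batches.computeIfAbsent(partition, p -> new ArrayDeque<>());
        synchronized (dq) {
            Batch last = dq.peekLast();
            if (last != null && last.tryAppend(recordSize)) {
                return new Result(dq.size() > 1 || last.isFull(), false);
            }
        }
        // Allocation happens outside the lock (in the real producer it may block).
        Batch fresh = new Batch(Math.max(batchSize, recordSize));
        synchronized (dq) {
            // Re-check: another thread may have added a batch in the meantime.
            Batch last = dq.peekLast();
            if (last != null && last.tryAppend(recordSize)) {
                return new Result(dq.size() > 1 || last.isFull(), false);
            }
            fresh.tryAppend(recordSize);
            dq.addLast(fresh);
            return new Result(dq.size() > 1 || fresh.isFull(), true);
        }
    }

    int batchCount(int partition) {
        Deque<Batch> dq = batches.get(partition);
        if (dq == null) return 0;
        synchronized (dq) { return dq.size(); }
    }

    public static void main(String[] args) {
        MiniAccumulator acc = new MiniAccumulator(10);
        for (int i = 0; i < 3; i++) {
            Result r = acc.append(0, 4);
            System.out.println("new=" + r.newBatchCreated + " full=" + r.batchIsFull);
        }
    }
}
```

Sizing the fresh batch as max(batchSize, recordSize) mirrors the text: an over-sized record gets its own larger buffer rather than being rejected.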
What "ready" and "full" mean
A batch counts as full when the deque has more than one batch (so all but the last are sealed) or the last batch's builder reports isFull() (RecordAccumulator.java:719). A node is ready to send if it has at least one non-muted, non-backing-off partition where any of these hold (RecordAccumulator.java:613):
- the batch is full, or
- it has lingered ≥ linger.ms (or, when retrying, ≥ the exponential retry backoff), or
- the BufferPool is exhausted (threads are blocked waiting), or
- the accumulator is closed, or a flush() is in progress, or the transaction is completing.
The RecordAppendResult returned to doSend carries batchIsFull and newBatchCreated; if either is true, doSend wakes the Sender so it does not wait out a full poll timeout (KafkaProducer.java:1192).
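The readiness conditions combine into a single boolean. The sketch below states that rule for one partition's head batch; the class name and the flat parameter list are illustrative, and a real implementation reads these values from accumulator and batch state rather than taking them as arguments.

```java
/** Sketch of the per-batch readiness rule; parameter names are illustrative. */
final class ReadySketch {
    static boolean ready(boolean full, long waitedMs, long lingerMs,
                         boolean retrying, long retryBackoffMs,
                         boolean exhausted, boolean closed,
                         boolean flushInProgress, boolean txnCompleting) {
        // A retried batch inside its backoff window is never ready.
        boolean backingOff = retrying && waitedMs < retryBackoffMs;
        // The wait threshold is the backoff when retrying, else linger.ms.
        long timeToWaitMs = retrying ? retryBackoffMs : lingerMs;
        boolean expired = waitedMs >= timeToWaitMs;
        boolean sendable = full || expired || exhausted || closed
                || flushInProgress || txnCompleting;
        return sendable && !backingOff;
    }

    public static void main(String[] args) {
        System.out.println(ready(false, 3, 5, false, 100, false, false, false, false));
        System.out.println(ready(false, 5, 5, false, 100, false, false, false, false));
    }
}
```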
BufferPool & back-pressure
The pool is bounded by buffer.memory (default 32 MiB) and guarded by a single ReentrantLock (BufferPool.java:70). Free memory is the sum of nonPooledAvailableMemory plus the recycled free-list count times poolableSize (which equals batch.size). allocate(size, maxTimeToBlockMs):
- returns a recycled buffer instantly if size == poolableSize and the free list is non-empty;
- otherwise, if enough memory is on hand, reserves it (freeing pooled buffers as needed via freeUp);
- otherwise blocks the caller on a per-request Condition appended to a FIFO waiters deque, accumulating memory incrementally as buffers are returned, until it has enough or maxTimeToBlockMs elapses; on timeout it throws BufferExhaustedException (BufferPool.java:159).
The maxTimeToBlockMs passed in is the remaining max.block.ms budget after the metadata wait already consumed part of it (KafkaProducer.java:1146). This is the second place send() can block. The waiter queue is strictly FIFO and memory is handed to the longest-waiting thread first, preventing starvation when a large request must wait for several small buffers to return (BufferPool.java:38). deallocate returns exact-poolableSize buffers to the free list and signals the head waiter (BufferPool.java:260).
availableMemory() = nonPooledAvailableMemory + free.size() × poolableSize ≤ totalMemory at all times. Memory reserved by a thread that ultimately throws is returned in the allocate finally block (BufferPool.java:185), so a timed-out or interrupted allocation never leaks pool memory.
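A bounded pool with a free list, one lock, and per-waiter conditions can be sketched as follows. This is a simplified model, not Kafka's BufferPool: MiniBufferPool and ExhaustedException are hypothetical names, a waiter here waits for its whole request at once instead of accumulating memory incrementally, and new arrivals queue strictly behind existing waiters.

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/** Simplified bounded buffer pool with FIFO waiters; not Kafka's BufferPool. */
final class MiniBufferPool {
    /** Hypothetical unchecked exception standing in for BufferExhaustedException. */
    static final class ExhaustedException extends RuntimeException {
        ExhaustedException(String msg) { super(msg); }
    }

    private final long totalMemory;
    private final int poolableSize;
    private final ReentrantLock lock = new ReentrantLock();
    private final Deque<ByteBuffer> free = new ArrayDeque<>();   // recycled poolable buffers
    private final Deque<Condition> waiters = new ArrayDeque<>(); // FIFO queue of blocked callers
    private long nonPooledAvailable;

    MiniBufferPool(long totalMemory, int poolableSize) {
        this.totalMemory = totalMemory;
        this.poolableSize = poolableSize;
        this.nonPooledAvailable = totalMemory;
    }

    private long availableLocked() {
        return nonPooledAvailable + (long) free.size() * poolableSize;
    }

    long availableMemory() {
        lock.lock();
        try { return availableLocked(); } finally { lock.unlock(); }
    }

    ByteBuffer allocate(int size, long maxWaitMs) {
        if (size > totalMemory) throw new IllegalArgumentException("request exceeds pool size");
        lock.lock();
        try {
            if (size == poolableSize && !free.isEmpty()) return free.pollFirst(); // fast path
            Condition me = lock.newCondition();
            waiters.addLast(me);
            try {
                long remainingNs = TimeUnit.MILLISECONDS.toNanos(maxWaitMs);
                // Proceed only at the head of the queue and with enough memory on hand.
                while (waiters.peekFirst() != me || availableLocked() < size) {
                    if (remainingNs <= 0) throw new ExhaustedException("timed out waiting for " + size + " bytes");
                    remainingNs = me.awaitNanos(remainingNs);
                }
                // Convert pooled buffers back to raw bytes until the request fits.
                while (nonPooledAvailable < size) {
                    free.pollLast();
                    nonPooledAvailable += poolableSize;
                }
                nonPooledAvailable -= size;
                return ByteBuffer.allocate(size);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for memory", e);
            } finally {
                waiters.remove(me);
                Condition next = waiters.peekFirst();
                if (next != null) next.signal(); // let the new head re-check
            }
        } finally {
            lock.unlock();
        }
    }

    void deallocate(ByteBuffer buffer) {
        lock.lock();
        try {
            if (buffer.capacity() == poolableSize) {
                buffer.clear();
                free.addLast(buffer); // exact-size buffers are recycled
            } else {
                nonPooledAvailable += buffer.capacity(); // others return as raw bytes
            }
            Condition head = waiters.peekFirst();
            if (head != null) head.signal();
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        MiniBufferPool pool = new MiniBufferPool(64, 16);
        ByteBuffer b = pool.allocate(16, 0);
        System.out.println("available after allocate: " + pool.availableMemory());
        pool.deallocate(b);
        System.out.println("available after deallocate: " + pool.availableMemory());
    }
}
```

Because this model reserves memory only after the wait succeeds, a timed-out caller has nothing to give back; the real pool reserves incrementally, which is why it needs the cleanup described above.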
ProducerBatch: structure, futures, and the state machine
A ProducerBatch wraps one MemoryRecordsBuilder (the actual on-wire v2 batch, see Record Format & Batches) and a List<Thunk>. Each appended record produces a FutureRecordMetadata chained to the batch's shared ProduceRequestResult; that future is what send() hands back (ProducerBatch.java:147). Key mutable fields: recordCount, maxRecordSize, an AtomicInteger attempts, and an AtomicReference<FinalState> (ProducerBatch.java:64).
Figure: ProducerBatch final-state transitions.
- Open → FAILED: completeExceptionally (e.g. delivery.timeout expiry)
- Open → ABORTED: abort(reason)
- FAILED → FAILED: ignored (idempotent re-fail)
- FAILED → SUCCEEDED: logged; late broker success after local expiry
- ABORTED → SUCCEEDED: logged; late broker success after abort
The first compareAndSet on finalState wins; done() tolerates the double-completion races (FAILED→FAILED is ignored, FAILED/ABORTED→SUCCEEDED is logged) but throws on any transition out of SUCCEEDED, which is terminal (ProducerBatch.java:262).
The double-completion tolerance matters: an in-flight batch may expire on delivery.timeout.ms (set FAILED) yet still have been written by the broker (a later SUCCEEDED attempt). The CAS-based done() ignores FAILED→FAILED and ABORTED→FAILED, logs the rare FAILED/ABORTED→SUCCEEDED, and throws only on SUCCEEDED→anything (ProducerBatch.java:280). completeFutureAndFireCallbacks sets the future, then invokes every thunk's callback in append order, swallowing exceptions thrown by user callbacks so one bad callback cannot poison the rest (ProducerBatch.java:297).
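The compare-and-set completion rule is small enough to sketch directly. The class below is an illustrative model (BatchStateSketch is a hypothetical name); it keeps only the state transition logic and leaves out futures, callbacks, and logging.

```java
import java.util.concurrent.atomic.AtomicReference;

/** Sketch of CAS-based batch completion; not Kafka's ProducerBatch. */
final class BatchStateSketch {
    enum FinalState { ABORTED, FAILED, SUCCEEDED }

    // null means the batch is still open.
    private final AtomicReference<FinalState> finalState = new AtomicReference<>(null);

    /** Returns true only for the call that actually completed the batch. */
    boolean done(FinalState tryState) {
        if (finalState.compareAndSet(null, tryState)) {
            return true; // first completion wins; callbacks would fire here
        }
        if (finalState.get() == FinalState.SUCCEEDED) {
            // SUCCEEDED is terminal: any further transition is a bug.
            throw new IllegalStateException("batch already succeeded; cannot move to " + tryState);
        }
        // Already FAILED or ABORTED: tolerate the race. Real code would log a
        // late SUCCEEDED and silently ignore a repeated failure.
        return false;
    }

    FinalState state() { return finalState.get(); }

    public static void main(String[] args) {
        BatchStateSketch batch = new BatchStateSketch();
        System.out.println(batch.done(FinalState.FAILED));
        System.out.println(batch.done(FinalState.SUCCEEDED));
        System.out.println(batch.state());
    }
}
```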
Callbacks for records sent to the same partition fire in send order. This holds because records keep their append order within a batch (thunks are a List), and batches for a partition are completed in deque order; reenqueued retries are reinserted in sequence order (RecordAccumulator.java:557).
The Sender thread: drain, encode, transmit, complete
The I/O thread runs Sender.run, which loops runOnce() until running is cleared, then drains residual data and closes the client (Sender.java:240). runOnce first services the TransactionManager (coordinator discovery, InitProducerId, AddPartitions, commit/abort, idempotent epoch bumps), then calls sendProducerData followed by client.poll (Sender.java:310).
sendProducerData
One iteration (Sender.java:380):
- accumulator.ready(metadataSnapshot, now) → the set of ready leader nodes, the next-check delay, and topics with unknown leaders (which trigger a metadata refresh).
- For each ready node, gate on client.ready(node, now); nodes whose connection isn't established are dropped this round, and node latency stats are nudged.
- accumulator.drain(metadataSnapshot, readyNodes, maxRequestSize, now) → Map<nodeId, List<ProducerBatch>>.
- addToInflightBatches records them for delivery-timeout tracking; if guaranteeMessageOrder (i.e. max.in.flight==1), every drained partition is muted.
- Expire batches that blew delivery.timeout.ms (both queued and in-flight) via failExpiredBatches.
- sendProduceRequests encodes and sends one ProduceRequest per node.
drainBatchesForOneNode
For a node, the accumulator walks that node's partitions starting at a per-node round-robin drainIndex (so no single topic-partition starves the others), and for each it pulls the head batch while (RecordAccumulator.java:860): the partition isn't muted; the batch isn't in retry backoff; adding it won't exceed max.request.size (unless the batch is the first and alone); and idempotence ordering permits it (shouldStopDrainBatchesForPartition). When idempotence is on and the drained batch has no sequence yet, it is stamped here: batch.setProducerState(producerIdAndEpoch, sequenceNumber(tp), isTransactional), the per-partition sequence is advanced by recordCount, and the batch is registered as in-flight (RecordAccumulator.java:911). The batch is then close()d (header written, compression finalized) outside the lock because closing is expensive.
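Sequence stamping at drain time amounts to a per-partition counter that hands out a base sequence and advances by the batch's record count. The sketch below assumes a plain string key per partition and ignores integer wraparound; SequenceSketch and stamp are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;

/** Sketch of per-partition sequence assignment; overflow handling is omitted. */
final class SequenceSketch {
    private final Map<String, Integer> nextSequence = new HashMap<>();

    /** Returns the base sequence for a batch and advances the partition's counter. */
    int stamp(String topicPartition, int recordCount) {
        int base = nextSequence.getOrDefault(topicPartition, 0);
        nextSequence.put(topicPartition, base + recordCount);
        return base;
    }

    public static void main(String[] args) {
        SequenceSketch seq = new SequenceSketch();
        System.out.println(seq.stamp("orders-0", 3));
        System.out.println(seq.stamp("orders-0", 2));
    }
}
```

Stamping late, at drain rather than at append, means a batch that is still filling never holds a sequence range that later appends would invalidate.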
Building and sending the ProduceRequest
sendProduceRequest assembles a ProduceRequestData with the configured acks and request.timeout.ms, grouping batches by topic (using topic id where available; modern ProduceRequest v13 drops topic names in favour of ids) and setting each batch's MemoryRecords as the partition payload (Sender.java:896). For transactional producers it stamps the transactionalId and selects the request version. Each batch is marked setInflight(true). The client request is created with expectResponse = (acks != 0); for acks=0 there is no response, and batches complete as soon as the bytes are written (Sender.java:945, Sender.java:653).
Handling the response
handleProduceResponse demultiplexes per-partition results, resolving the TopicPartition by topic id (v13) or name (older) and dispatching each to completeBatch (Sender.java:580). Disconnects map to NETWORK_EXCEPTION, timeouts to REQUEST_TIMED_OUT, version mismatches to UNSUPPORTED_VERSION. The response may carry updated leader id/epoch which is fed back into the metadata cache for NOT_LEADER_OR_FOLLOWER / FENCED_LEADER_EPOCH errors (Sender.java:638).
completeBatch: the success / split / retry / fail decision
completeBatch calls metadata.requestUpdate() when the error is an InvalidMetadataException; every path ends by un-muting the partition when guaranteeMessageOrder is set (Sender.java:671).
canRetry is the retriability gate (Sender.java:876): the batch must not have reached delivery.timeout.ms, must have attempts() < retries, must not already be done, and the error must be a RetriableException or, when idempotence is on, must satisfy TransactionManager.canRetry, which additionally retries certain UNKNOWN_PRODUCER_ID and OUT_OF_ORDER_SEQUENCE_NUMBER cases and may request an epoch bump (TransactionManager.java:1043).
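The retriability gate is a conjunction of four checks. In this sketch the error test is collapsed into one boolean, errorAllowsRetry, which stands for "is a RetriableException" without idempotence and for the TransactionManager's verdict with it; all names are illustrative.

```java
/** Sketch of the retry gate; parameter names are illustrative only. */
final class RetryGateSketch {
    static boolean canRetry(boolean deliveryTimeoutReached, int attempts, int retries,
                            boolean alreadyDone, boolean errorAllowsRetry) {
        return !deliveryTimeoutReached   // still inside delivery.timeout.ms
                && attempts < retries    // attempts budget not exhausted
                && !alreadyDone          // batch has no final state yet
                && errorAllowsRetry;     // error class permits a retry
    }

    public static void main(String[] args) {
        System.out.println(canRetry(false, 1, Integer.MAX_VALUE, false, true));
        System.out.println(canRetry(true, 1, Integer.MAX_VALUE, false, true));
    }
}
```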
Ordering & idempotence
Idempotence is on by default (enable.idempotence=true, ProducerConfig.java:541). When enabled, the TransactionManager is instantiated even with no transactional.id ("an idempotent producer"; KafkaProducer.java:612). On first send the producer fetches a producer id and epoch via InitProducerId (TransactionManager.java:691), then stamps each batch with (producerId, epoch, baseSequence) at drain time. The broker retains the last five batches per producer-partition and rejects duplicates (a retried batch with an already-committed sequence) and gaps (OUT_OF_ORDER_SEQUENCE_NUMBER), giving exactly-once delivery within a producer session.
With idempotence, the broker enforces strictly increasing per-partition sequences, so even with up to five concurrent in-flight requests it rejects anything out of order; the client reorders and resends from its queue. Therefore ordering holds for any max.in.flight ∈ [1,5]. ProducerConfig hard-caps idempotent producers at 5 (MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_FOR_IDEMPOTENCE); a larger value is a ConfigException (ProducerConfig.java:639). Without idempotence, only max.in.flight=1 guarantees order (enforced by partition muting); larger values can reorder on retry.
Reorder-on-retry handling. Once a batch is retried, the accumulator must keep the partition's queue in sequence order. reenqueue adds the batch to the front via insertInSequenceOrder, which linearly scans the in-flight batches and inserts so base sequences stay ascending (RecordAccumulator.java:557). To avoid writing a new-epoch sequence-0 batch ahead of older batches, shouldStopDrainBatchesForPartition stops draining new (sequence-less) batches while the partition has in-flight batches with a stale producer id/epoch, has unresolved sequences, or has an earlier in-flight batch being retried, effectively collapsing to one in-flight request for that partition during recovery (RecordAccumulator.java:823).
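Reinserting a retried batch so that base sequences stay ascending can be sketched on a deque of integers. This is a reduced model: each integer stands for a batch's base sequence, every queued entry is assumed to already carry a sequence, and ReenqueueSketch is a hypothetical name.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

/** Sketch of sequence-ordered reinsertion; integers stand in for batches. */
final class ReenqueueSketch {
    /** Insert baseSequence so the deque stays ascending from head to tail. */
    static void insertInSequenceOrder(Deque<Integer> deque, int baseSequence) {
        List<Integer> smaller = new ArrayList<>();
        // Pop every queued batch that must go out before the one being reinserted.
        while (!deque.isEmpty() && deque.peekFirst() < baseSequence) {
            smaller.add(deque.pollFirst());
        }
        deque.addFirst(baseSequence);
        // Put the popped batches back in front, preserving their order.
        for (int i = smaller.size() - 1; i >= 0; i--) {
            deque.addFirst(smaller.get(i));
        }
    }

    public static void main(String[] args) {
        Deque<Integer> dq = new ArrayDeque<>(Arrays.asList(10, 30));
        insertInSequenceOrder(dq, 20);
        System.out.println(dq);
    }
}
```

The scan is linear, which is acceptable because at most a handful of batches per partition are in flight at once.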
Epoch bumps. On a fatal sequence gap an idempotent (non-transactional) producer bumps its epoch locally and restarts sequences at 0 for the affected partition. An OutOfOrderSequenceException from the broker calls requestIdempotentEpochBumpForPartition; on the next runOnce, bumpIdempotentEpochAndResetIdIfNeeded increments the epoch (or, at Short.MAX_VALUE, resets to a brand-new producer id) and rewrites in-flight sequences from the beginning (TransactionManager.java:827, TransactionManager.java:673). An expired in-flight batch also marks the partition's sequence unresolved so no new batch is drained until the gap resolves (Sender.java:373).
acks, timeouts & retries
- acks=0. Fire-and-forget. No response is requested; the batch completes when written to the socket and RecordMetadata.offset() is -1. Incompatible with idempotence.
- acks=1. Leader-only durability. The leader acks after writing to its log, before followers replicate. Incompatible with idempotence.
- acks=all (-1). The leader acks only after the in-sync replica set has the record (subject to min.insync.replicas). The only value compatible with idempotence; it is the default (ProducerConfig.java:403). See Replication, ISR & High Watermark.
Three limits compose (KafkaProducer.java:589): request.timeout.ms bounds one network round trip; retries bounds the number of attempts; and delivery.timeout.ms is the hard wall-clock deadline from when the record enters the accumulator until success or failure is reported, spanning lingering, backoff, and all retries. The constructor enforces delivery.timeout.ms ≥ linger.ms + request.timeout.ms, raising ConfigException if the user set an inconsistent value or silently bumping the default otherwise. Because retries defaults to Integer.MAX_VALUE, in practice delivery.timeout.ms (default 120 s) is what actually terminates retrying; the docs and code both steer users to tune it instead of retries (ProducerConfig.java:402).
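The timeout constraint can be checked against a plain Properties object before a producer is ever constructed. The sketch below uses only the JDK; TimeoutConfigSketch and isConsistent are hypothetical helpers, and the values shown are the defaults quoted in this chapter.

```java
import java.util.Properties;

/** Sketch of the delivery-timeout consistency rule; helper names are hypothetical. */
final class TimeoutConfigSketch {
    static Properties reliableDefaults() {
        Properties p = new Properties();
        p.setProperty("acks", "all");
        p.setProperty("enable.idempotence", "true");
        p.setProperty("linger.ms", "5");
        p.setProperty("request.timeout.ms", "30000");
        p.setProperty("delivery.timeout.ms", "120000");
        return p;
    }

    /** delivery.timeout.ms must cover lingering plus one full request timeout. */
    static boolean isConsistent(Properties p) {
        long linger = Long.parseLong(p.getProperty("linger.ms"));
        long request = Long.parseLong(p.getProperty("request.timeout.ms"));
        long delivery = Long.parseLong(p.getProperty("delivery.timeout.ms"));
        return delivery >= linger + request;
    }

    public static void main(String[] args) {
        Properties p = reliableDefaults();
        System.out.println("consistent: " + isConsistent(p));
        p.setProperty("delivery.timeout.ms", "20000"); // shorter than one request timeout
        System.out.println("consistent: " + isConsistent(p));
    }
}
```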
Error classification
| Class | Examples | Producer action |
|---|---|---|
| Retriable (transient) | NETWORK_EXCEPTION, REQUEST_TIMED_OUT, NOT_LEADER_OR_FOLLOWER, NOT_ENOUGH_REPLICAS | Reenqueue with exponential backoff until retries or delivery.timeout.ms exhausted (Sender.java:692). |
| Too large | MESSAGE_TOO_LARGE on a multi-record batch | Split into ≤ batch.size batches and resend; does not consume a retry attempt (Sender.java:676, ProducerBatch.java:326). |
| Idempotence-recoverable | OUT_OF_ORDER_SEQUENCE_NUMBER, UNKNOWN_PRODUCER_ID | Retry, possibly bumping epoch / resetting sequences (TransactionManager.java:1043). |
| Duplicate | DUPLICATE_SEQUENCE_NUMBER | Treated as success but with unknown offset/timestamp (Sender.java:700). |
| Fatal | TOPIC_AUTHORIZATION_FAILED, CLUSTER_AUTHORIZATION_FAILED, RecordTooLargeException (single record), UnsupportedVersionException | Fail the batch's futures/callbacks; for txn producers transition to an abortable/fatal error (Sender.java:770). |
Compression
Compression is configured once via compression.type (default none; gzip/snappy/lz4/zstd supported, with optional per-codec level) and applied at the batch level by the MemoryRecordsBuilder as records are appended (KafkaProducer.java:562, RecordAccumulator.java:409). The producer maintains a per-(topic, codec) CompressionRatioEstimator so it can size buffers realistically; on close() each batch updates the estimate (ProducerBatch.java:518). Because compression operates on whole batches, larger batches (higher linger.ms/batch.size) compress better. When the broker nonetheless rejects a compressed batch as too large (MESSAGE_TOO_LARGE), the split path resets the estimate conservatively to avoid repeated splits (RecordAccumulator.java:516).
flush() and close()
flush() makes every buffered record immediately sendable and blocks until all in-flight requests complete. It increments flushesInProgress (which makes ready() return everything as sendable), wakes the Sender, then awaits each incomplete batch's ProduceRequestResult, using awaitAllDependents() so split batches are also awaited (KafkaProducer.java:1366, RecordAccumulator.java:1106). Calling flush() from inside a callback throws, because the callback runs on the I/O thread and would deadlock waiting on itself (KafkaProducer.java:1368).
close(timeout) closes the accumulator first (so no new appends are accepted), clears running, and joins the I/O thread, which drains residual batches and completes in-flight requests up to the timeout; if the deadline passes it forceClose()s, failing all incomplete batches via abortIncompleteBatches (KafkaProducer.java:1546, Sender.java:295). Called from a callback, close() degrades to a non-blocking close(0) to avoid a self-join (KafkaProducer.java:1554).
Concurrency & threading model
| State | Guarded by | Accessed by |
|---|---|---|
| Per-partition Deque<ProducerBatch> | the deque object's monitor (synchronized(dq)) | user threads (append) and Sender (ready/drain) |
| topicInfoMap, TopicInfo.batches | CopyOnWriteMap (lock-free reads) | both |
| Sticky partition | AtomicReference + deque lock for the peek/validate/update protocol | both |
| BufferPool free list / waiters | one ReentrantLock + per-waiter Condition | user threads (allocate) and Sender (deallocate) |
| ProducerBatch.finalState, attempts | AtomicReference / AtomicInteger | Sender; user threads read via futures |
| muted, nodesDrainIndex, inFlightBatches | unsynchronized, Sender-thread-only | Sender only (RecordAccumulator.java:90, Sender.java:126) |
| Transaction/idempotence state | synchronized methods on TransactionManager | Sender and (for the public txn API) user threads |
| appendsInProgress, flushesInProgress | AtomicInteger | both |
The design keeps user threads off the network entirely and confines all socket I/O, retry bookkeeping, and the in-flight pipeline to one Sender thread, so the only cross-thread contention is the short critical section around each partition's deque and the BufferPool lock. The hot ready()/partitionReady() loop is deliberately minimal under the deque lock to avoid contending with appends at large partition counts (a documented hot path; RecordAccumulator.java:697).
The NetworkClient pipeline
The Sender sends through a NetworkClient configured with max.in.flight.requests.per.connection. canSendRequest returns true only when the connection is ready, the channel is writable, and InFlightRequests.canSendMore(node) holds; the latter is true when the node has no outstanding requests, or when the most recently queued request has been fully written to the socket and the queue size is below the max (NetworkClient.java:544, InFlightRequests.java:96). This is the mechanism that bounds pipelining per broker. Throttling from quotas is observed as a throttleDelayMs on the connection, which the Sender folds into its poll timeout. The wire encoding of ProduceRequest/ProduceResponse is covered in Wire Protocol & RPC Framework.
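The per-broker pipelining bound can be sketched as a deque with a size cap. This model makes an assumption worth stating: new requests are pushed at the head and completed from the tail, so the head is the most recently queued request and the check looks at whether that one has finished writing. InFlightSketch and Request are hypothetical names.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Sketch of the per-node in-flight gate; not Kafka's InFlightRequests. */
final class InFlightSketch {
    static final class Request {
        boolean sendCompleted; // true once all bytes are written to the socket
    }

    private final Deque<Request> queue = new ArrayDeque<>();
    private final int maxInFlight;

    InFlightSketch(int maxInFlight) { this.maxInFlight = maxInFlight; }

    /** Assumption: newest request at the head, oldest at the tail. */
    void add(Request request) { queue.addFirst(request); }

    /** A response arrived for the oldest outstanding request. */
    void completeOldest() { queue.pollLast(); }

    boolean canSendMore() {
        return queue.isEmpty()
                || (queue.peekFirst().sendCompleted && queue.size() < maxInFlight);
    }

    public static void main(String[] args) {
        InFlightSketch node = new InFlightSketch(2);
        Request first = new Request();
        node.add(first);
        System.out.println("while writing: " + node.canSendMore());
        first.sendCompleted = true;
        System.out.println("after write:   " + node.canSendMore());
    }
}
```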
Failure modes, edge cases & recovery
- Metadata never arrives. send() blocks up to max.block.ms then throws TimeoutException; if a partial wait already elapsed, the BufferPool gets the remainder (KafkaProducer.java:1146).
- Buffer exhausted. allocate blocks up to the remaining max.block.ms, then throws BufferExhaustedException (a subclass of TimeoutException), incrementing buffer-exhausted-records (BufferPool.java:159).
- Delivery timeout. A batch that exceeds delivery.timeout.ms in queue or in flight is failed with TimeoutException regardless of remaining retries (Sender.java:362). With idempotence this triggers a sequence-unresolved marker and eventual epoch bump.
- Leader change. NOT_LEADER_OR_FOLLOWER / FENCED_LEADER_EPOCH are retriable; the response's new leader info updates metadata, and the batch tracks the new leader epoch so backoff is skipped when retrying to a fresh leader (ProducerBatch.java:116, RecordAccumulator.java:804).
- Forced close. abortIncompleteBatches loops until appendsInProgress == 0 so no batch from a racing appender is missed, then aborts all incomplete batches (RecordAccumulator.java:1135).
- Batch still in network use (KAFKA-19012). A batch sent to the NetworkClient may have its pooled buffer in use; deallocation is deferred (maybeRemoveAndDeallocateBatchLater) until the request truly completes, to prevent reusing a buffer that the selector is still reading (Sender.java:854, RecordAccumulator.java:1168).
- Authorization mid-transaction. TransactionalIdAuthorizationException / ClusterAuthorizationException fail pending requests and transition the manager to UNINITIALIZED so the user need not reconstruct the producer (Sender.java:351).
Invariants & guarantees
A user thread never performs network I/O. send() only serializes, partitions, and appends; the Sender thread alone talks to the cluster. The two bounded waits a user thread can experience are metadata refresh and buffer allocation, both capped by max.block.ms.
With idempotence on and acks=all, retries never produce duplicates and never reorder records within a partition, for any max.in.flight ∈ [1,5], within a single producer session. Across sessions, only a transactional.id provides recovery (see Transactions & EOS).
Every accepted record's future and callback fire exactly once: on success with a valid offset, or on failure with an exception, within delivery.timeout.ms.
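One common way to get a fire-exactly-once guarantee when several paths (a broker response, a timeout expiry, a forced close) can race to finish the same record is a compare-and-set guard. The sketch below illustrates that technique only; OnceOnlyCompletion is a hypothetical class, and a BiConsumer stands in for the producer's callback type.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;

/** Hypothetical completion holder: the first caller of complete() wins, later callers are ignored. */
final class OnceOnlyCompletion {
    private final AtomicBoolean finished = new AtomicBoolean(false);
    private final CompletableFuture<Long> future = new CompletableFuture<>();
    private final BiConsumer<Long, Exception> callback;

    OnceOnlyCompletion(BiConsumer<Long, Exception> callback) {
        this.callback = callback;
    }

    CompletableFuture<Long> future() {
        return future;
    }

    /** Returns true if this call completed the record, false if it had already been completed. */
    boolean complete(Long offset, Exception error) {
        if (!finished.compareAndSet(false, true)) {
            return false; // someone else already completed it
        }
        if (error == null) {
            future.complete(offset);
        } else {
            future.completeExceptionally(error);
        }
        callback.accept(offset, error);
        return true;
    }
}
```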
Configuration reference
| Key | Default | Effect |
|---|---|---|
| acks | all | Durability level; all required for idempotence (ProducerConfig.java:403). |
| enable.idempotence | true | Dedup + ordered retries via producer id/epoch/sequence (ProducerConfig.java:541). |
| batch.size | 16384 | Target batch size and BufferPool poolable size; max(1, …) applied (ProducerConfig.java:413, KafkaProducer.java:458). |
| linger.ms | 5 | Max time to wait filling a batch before it is ready (ProducerConfig.java:418). |
| buffer.memory | 33554432 (32 MiB) | Total accumulator memory; over-budget sends block (ProducerConfig.java:401). |
| max.block.ms | 60000 | Combined cap on metadata wait + buffer allocation in send() (ProducerConfig.java:449). |
| max.in.flight.requests.per.connection | 5 | Pipeline depth per broker; capped at 5 with idempotence (ProducerConfig.java:487). |
| retries | 2147483647 | Max attempts; prefer delivery.timeout.ms (ProducerConfig.java:402). |
| delivery.timeout.ms | 120000 | Hard deadline for success/failure; must be ≥ linger.ms + request.timeout.ms (ProducerConfig.java:419). |
| request.timeout.ms | 30000 | Per-round-trip timeout (ProducerConfig.java:455). |
| retry.backoff.ms | 100 (common default) | Base of exponential reenqueue backoff (ProducerConfig.java:432). |
| compression.type | none | Batch-level codec: none/gzip/snappy/lz4/zstd (ProducerConfig.java:409). |
| max.request.size | 1048576 (1 MiB) | Rejects over-sized records; bounds drained request size (ProducerConfig.java:424). |
| partitioner.class | null | Custom partitioner; null ⇒ built-in (ProducerConfig.java:517). |
| partitioner.adaptive.partitioning.enable | true | Weight keyless partition choice by broker load (ProducerConfig.java:414). |
| partitioner.ignore.keys | false | If true, never use the key for partitioning (ProducerConfig.java:416). |
| partitioner.availability.timeout.ms | 0 (off) | Exclude partitions whose leader hasn't drained for this long (ProducerConfig.java:415). |
| transactional.id | null | Enables transactions and cross-session recovery; forces idempotence on (ProducerConfig.java:551). |
| transaction.timeout.ms | 60000 | Broker-side max open-transaction time (ProducerConfig.java:546). |
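As a quick illustration of the table, the sketch below writes a few of these keys into a plain java.util.Properties object and checks the one cross-key constraint the table states (delivery.timeout.ms ≥ linger.ms + request.timeout.ms). ProducerConfigSketch and its method names are hypothetical; the key strings and default values are the ones listed above, and the check is a stand-alone approximation rather than the client's own validation.

```java
import java.util.Properties;

/** Hypothetical helper: a subset of the documented defaults plus one consistency check. */
final class ProducerConfigSketch {

    /** Builds Properties holding a few defaults from the configuration table. */
    static Properties tableDefaults() {
        Properties p = new Properties();
        p.setProperty("acks", "all");
        p.setProperty("enable.idempotence", "true");
        p.setProperty("linger.ms", "5");
        p.setProperty("request.timeout.ms", "30000");
        p.setProperty("delivery.timeout.ms", "120000");
        p.setProperty("max.in.flight.requests.per.connection", "5");
        return p;
    }

    /** True when delivery.timeout.ms >= linger.ms + request.timeout.ms. */
    static boolean deliveryTimeoutIsConsistent(Properties p) {
        long linger = Long.parseLong(p.getProperty("linger.ms"));
        long request = Long.parseLong(p.getProperty("request.timeout.ms"));
        long delivery = Long.parseLong(p.getProperty("delivery.timeout.ms"));
        return delivery >= linger + request;
    }
}
```

A real producer would additionally need bootstrap.servers and the key and value serializers, which are omitted here so the sketch runs without a Kafka dependency.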
Interactions with other subsystems
- Record Format & Batches: the v2 batch the MemoryRecordsBuilder emits, including producer id/epoch/baseSequence header fields.
- Wire Protocol: ProduceRequest / ProduceResponse schemas and versioning (topic ids in v13).
- Replication, ISR & HWM: what acks=all and min.insync.replicas actually require on the broker.
- Transactions & EOS: the TransactionManager state machine, AddPartitions, and the consume-transform-produce pattern via sendOffsetsToTransaction.
- Metadata Propagation: how the producer learns partition leaders and reacts to leader changes.
- Quotas & Throttling: produce throttling surfaced as connection poll delays.
- Network Layer and Request Processing: the broker side that the Sender talks to.
Design rationale & evolution
Idempotence and acks=all became the defaults in 3.0 under KIP-679 to give the strongest delivery guarantee out of the box. ProducerConfig.postProcessAndValidateIdempotenceConfigs silently disables idempotence if the user explicitly set a weaker acks or retries=0 (to preserve old behaviour), but throws if the user explicitly asked for idempotence with conflicting settings (ProducerConfig.java:610). The original idempotent/transactional producer landed under KIP-98.
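The two behaviours described above (quietly falling back versus throwing) can be modelled as a small decision function. This is a sketch of the rule as stated in this section, not the body of postProcessAndValidateIdempotenceConfigs: IdempotenceResolutionSketch is a hypothetical name, null stands for "the user did not set this key", and IllegalArgumentException stands in for whatever exception the real validation throws.

```java
/** Hypothetical model of how explicit user settings resolve the effective idempotence flag. */
final class IdempotenceResolutionSketch {

    /** "all" and "-1" are equivalent spellings of the strongest acks level. */
    private static boolean isAcksAll(String acks) {
        return "all".equals(acks) || "-1".equals(acks);
    }

    /**
     * @param idempotence user's explicit enable.idempotence, or null if unset
     * @param acks        user's explicit acks, or null if unset
     * @param retries     user's explicit retries, or null if unset
     * @return the effective idempotence setting
     */
    static boolean resolve(Boolean idempotence, String acks, Integer retries) {
        boolean conflict = (acks != null && !isAcksAll(acks))
                || (retries != null && retries == 0);
        if (idempotence == null) {
            // Default-on, but an explicit weaker setting wins to preserve old behaviour.
            return !conflict;
        }
        if (idempotence && conflict) {
            throw new IllegalArgumentException(
                    "enable.idempotence=true conflicts with the explicit acks/retries setting");
        }
        return idempotence;
    }
}
```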
The partitioner evolved from round-robin → sticky (KIP-480, better batching) → adaptive uniform sticky (KIP-794, load- and latency-aware). The built-in partitioner is intentionally not a Partitioner implementation; it lives inside RecordAccumulator so it can observe per-partition queue depth, which an external Partitioner cannot see (BuiltInPartitioner.java:34).
Gotchas & operational notes
User callbacks run on the single Sender I/O thread. A slow or blocking callback stalls delivery for every partition. Offload heavy work to your own executor (KafkaProducer.java:1075).
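A minimal sketch of the offloading advice follows. It does not use the Kafka API: a BiConsumer of (metadata, exception) stands in for the producer's callback interface, and OffloadingCallback is a hypothetical helper. The wrapper does nothing on the calling thread except enqueue the work, so the thread that invokes it (the Sender thread, in the real client) returns immediately.

```java
import java.util.concurrent.Executor;
import java.util.function.BiConsumer;

/** Hypothetical wrapper that moves callback work off the invoking thread. */
final class OffloadingCallback {

    /**
     * Wraps a potentially slow callback so that invoking it only schedules the
     * work on the supplied executor and then returns.
     */
    static <M> BiConsumer<M, Exception> offload(BiConsumer<M, Exception> heavy, Executor worker) {
        return (metadata, error) -> worker.execute(() -> heavy.accept(metadata, error));
    }
}
```

The trade-off is ordering: once callbacks run on a multi-threaded pool, they may no longer execute in the order the records completed, so a single-threaded executor is the conservative choice when that order matters.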
batch.size=0 does not disable batching; the constructor coerces it to 1, so each record still goes through a batch with a 1-byte target size rather than bypassing batching (KafkaProducer.java:456).
The adaptive partitioner needs a batch scheduled for every partition before it engages; until then it falls back to uniform selection, and it also bails to uniform when all queue depths are equal (RecordAccumulator.java:661, BuiltInPartitioner.java:322).
Application-level resends defeat idempotence: the broker can only deduplicate the producer's own retries, not a fresh record with a new sequence. If a send() ultimately fails even with infinite retries (e.g. a delivery-timeout expiry), the safe recovery is to close the producer and inspect the last produced offset rather than blindly resend (KafkaProducer.java:177).
A send() that fails synchronously with an ApiException still fires the user callback and interceptor onSendError, and returns a pre-completed FutureFailure rather than throwing; only non-API exceptions (e.g. SerializationException, InterruptException) propagate to the caller (KafkaProducer.java:1200).
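The split between "reported through the future" and "thrown at the caller" can be sketched without the Kafka classes. In the model below, ApiLikeException is a hypothetical stand-in for the ApiException family, IllegalArgumentException plays the role of a non-API error, and the length check is an invented trigger; interceptors are left out.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;

/** Hypothetical model of synchronous failure handling in a send() call. */
final class SyncFailureSketch {

    /** Stand-in for the ApiException family (not a Kafka class). */
    static final class ApiLikeException extends RuntimeException {
        ApiLikeException(String message) {
            super(message);
        }
    }

    static CompletableFuture<Long> send(String value, int maxLength,
                                        BiConsumer<Long, Exception> callback) {
        try {
            if (value == null) {
                // Non-API error: not caught below, so it propagates to the caller.
                throw new IllegalArgumentException("value must not be null");
            }
            if (value.length() > maxLength) {
                throw new ApiLikeException("record longer than " + maxLength + " characters");
            }
            // Accepted: the real client would complete this later from the I/O thread.
            return new CompletableFuture<>();
        } catch (ApiLikeException e) {
            // API-style error: notify the callback and hand back an already-failed future.
            callback.accept(null, e);
            CompletableFuture<Long> failed = new CompletableFuture<>();
            failed.completeExceptionally(e);
            return failed;
        }
    }
}
```

The practical consequence for callers is that a try/catch around send() is not enough on its own; the returned future or the callback must also be checked.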