II · 02 · Limits & Boundaries: Hard, Soft & Emergent
Source: Apache Kafka 4.4.0-SNAPSHOT (git 04bfe7d, 2026-06-15), KRaft mode. Operational guidance grounded in source code and cited benchmarks.
Every Kafka cluster runs inside a fence of limits, and the single most common production mistake is misjudging which kind of fence you are leaning on. A HARD limit is a compiled-in constant, a number baked into a .java file that no config can move (the 2 GiB segment ceiling, the 10,000-partition CreateTopics cap, the 10,000-record controller batch). A SOFT limit is a config with a default: movable, but movable with consequences (max.message.bytes, queued.max.requests, max.connections). An EMERGENT limit is a consequence of resources and architecture. No single constant defines it; it falls out of mmap counts, file descriptors, RAM, replication bandwidth, and recovery time (the per-broker partition ceiling, per-topic throughput). This chapter is the definitive limits catalogue: each boundary named, classified, sourced to file:line, and (the cardinal rule) explained by the mechanism that produces it. We answer the two questions operators ask most: What is the maximum number of partitions per topic and per cluster? Is ~1 GiB/s per topic a hard limit? Both answers are "it depends, and here is exactly on what."
The three kinds of limit
Before the catalogue, internalize the taxonomy: it changes how you respond to a wall. If you hit a HARD limit you must redesign (more partitions, more topics, more brokers); tuning will never help. If you hit a SOFT limit you change a config and accept the tradeoff. If you hit an EMERGENT limit you add resources or change topology, and the "limit" moves with them.
| Kind | Defined by | How you move it | Canonical example |
|---|---|---|---|
| HARD | A compiled constant / type width in source | You cannot. Only a code change in a future release moves it. Redesign around it. | 2 GiB segment ceiling; MAX_PARTITIONS_PER_BATCH=10000; MAX_RECORDS_PER_USER_OP=10000 |
| SOFT | A ConfigDef default | Set the key (static, dynamic, or per-topic). Accept the documented tradeoff. | max.message.bytes; queued.max.requests=500; max.connections |
| EMERGENT | Resource × architecture interaction | Add hardware, raise OS limits, change topology. The bound shifts. | partitions/broker (mmap, FDs, RAM, recovery); per-topic throughput (NIC, disk, RF) |
When you hit a wall, first classify it. "Cannot create topic, excessively large number of partitions per request" is a HARD batch limit (MAX_PARTITIONS_PER_BATCH): split the request, do not file a ticket asking to raise it. "Producer rejected, message too large" is a SOFT limit: decide whether to raise max.message.bytes (and its three downstream configs) or shrink the message. "p99 produce latency climbing as partition count grows" is EMERGENT: you are spending replication/fetcher budget, not hitting a constant.
Partitions, part 1: per topic
The blunt answer to "what is the maximum number of partitions per topic?": there is no per-topic HARD ceiling on the resulting partition count. A topic is just a name with N partition replicas distributed across brokers; the partition id is an int32, so the absolute type ceiling is ~2.1 billion partitions per topic, a number you will never reach. What you will hit first are two HARD batch limits on the controller operation that creates or grows the topic, and then the EMERGENT per-broker and per-cluster ceilings below.
The controller batch limits (HARD)
Topic creation and partition growth are controller operations: the active controller must materialize one metadata record per partition (a PartitionRecord) and append them to the __cluster_metadata log (Part I 11 · KRaft Controller). Two compiled constants bound a single request:
| Limit | Value | Source | Class | What it caps |
|---|---|---|---|---|
| MAX_PARTITIONS_PER_BATCH | 10,000 | metadata/.../controller/ReplicationControlManager.java:152 | HARD | Total partitions across all topics in one CreateTopics request. |
| MAX_RECORDS_PER_USER_OP | 10,000 | metadata/.../controller/QuorumController.java:187 (= DEFAULT_MAX_RECORDS_PER_BATCH, :180) | HARD | Metadata records any single user-initiated controller op may generate. |
The check is explicit: validateTotalNumberOfPartitions() sums the requested partitions and throws PolicyViolationException("Excessively large number of partitions per request.") if the total exceeds MAX_PARTITIONS_PER_BATCH (ReplicationControlManager.java:1234–1237). The code comment states the why plainly: "Exceeding this number of topics per batch has led to out-of-memory exceptions. We use this validation to fail earlier to avoid allocating the memory" (:1214–1215). It is a memory-safety guard on the controller, validated before any allocation.
The controller builds result records in a BoundedList sized to MAX_RECORDS_PER_USER_OP (ReplicationControlManager.java:634, 988, 1566, 1734). A BoundedList rejects the overflowing element rather than growing unboundedly, so a runaway request fails fast instead of OOM-killing the active controller and triggering a failover. The limit is per-request, not per-cluster: to create 50,000 partitions you need at least ⌈50,000 partitions ÷ 10,000 partitions/request⌉ = 5 CreateTopics calls. Most CLIs and the AdminClient do not auto-split, so a single --partitions 20000 call is rejected; this is a tooling reality, not a Kafka maximum.
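The per-request cap is easy to respect client-side. A minimal sketch, assuming you plan requests before calling the AdminClient: it computes the lower bound on CreateTopics calls and packs a list of illustrative topics into requests that each stay at or under MAX_PARTITIONS_PER_BATCH. It models only the partition-sum check in validateTotalNumberOfPartitions(), not the separate MAX_RECORDS_PER_USER_OP record cap, and the helper names are hypothetical.

```python
import math

# Mirrors ReplicationControlManager.MAX_PARTITIONS_PER_BATCH (a HARD limit).
MAX_PARTITIONS_PER_BATCH = 10_000


def min_create_requests(total_partitions: int,
                        limit: int = MAX_PARTITIONS_PER_BATCH) -> int:
    """Lower bound on CreateTopics calls: ceil(total / limit)."""
    return math.ceil(total_partitions / limit)


def pack_create_requests(topics, limit=MAX_PARTITIONS_PER_BATCH):
    """Pack (topic, partitions) pairs into batches whose sum stays <= limit.

    Next-fit in input order: simple and predictable, not an optimal packing.
    A single topic above the limit cannot fit in any one request, so it is
    rejected instead of being silently split.
    """
    batches, current, current_total = [], [], 0
    for name, partitions in topics:
        if partitions > limit:
            raise ValueError(
                f"{name}: {partitions} partitions exceeds the {limit}-per-request cap")
        if current_total + partitions > limit:
            batches.append(current)  # close the full batch
            current, current_total = [], 0
        current.append((name, partitions))
        current_total += partitions
    if current:
        batches.append(current)
    return batches


if __name__ == "__main__":
    print(min_create_requests(50_000))  # 5
    topics = [("orders", 6_000), ("payments", 5_000), ("audit", 4_000)]
    for batch in pack_create_requests(topics):
        print(batch, "->", sum(n for _, n in batch), "partitions")
```

Submitting each batch as its own call keeps every request under the cap; a topic that alone exceeds the cap is rejected here on purpose, because the obvious workaround (create smaller, then grow) runs into the constraint described next.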
The deeper trap: you can only ever increase partitions
There is no API to reduce a topic's partition count, and this is the single most under-appreciated constraint in Kafka. Worse, increasing partitions on a keyed topic silently breaks per-key ordering: the default partitioner maps a key via hash(key) % numPartitions (the hash is murmur2 over the serialized key bytes), so growing N remaps existing keys to different partitions. For a key hashing to 7654321, 7654321 % 4 = 1 but 7654321 % 7 = 3 (Confluent docs; Arpit Bhayani, empirical). Co-partitioned Kafka Streams state stores do not follow the keys either. Treat partition count as near-immutable for any keyed topic; the accepted migration is to create a new topic at the target count and replay. See II · 03 · Partitioning for the migration runbook.
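The remapping is easy to quantify. A minimal sketch of the hash % numPartitions rule, using CRC32 (zlib.crc32) as a stand-in hash because it ships with the standard library; Kafka's default partitioner uses murmur2, so individual keys land on different partitions than this sketch predicts, but the fraction that moves is a property of the modulo step, not of the hash. For uniformly distributed hashes, growing 4 → 6 partitions moves about two thirds of keys, and even a clean doubling (4 → 8) moves half. The customer-N keys are hypothetical.

```python
import zlib


def partition_for(key_hash: int, num_partitions: int) -> int:
    """hash % numPartitions, assuming a non-negative hash value."""
    return key_hash % num_partitions


def moved_fraction(key_hashes, old_n: int, new_n: int) -> float:
    """Fraction of keys whose partition changes when the count goes old_n -> new_n."""
    hashes = list(key_hashes)
    moved = sum(1 for h in hashes
                if partition_for(h, old_n) != partition_for(h, new_n))
    return moved / len(hashes)


if __name__ == "__main__":
    # Stand-in hash (CRC32, always non-negative in Python 3), not Kafka's murmur2.
    hashes = [zlib.crc32(f"customer-{i}".encode()) for i in range(100_000)]
    print(f"4 -> 6 partitions: {moved_fraction(hashes, 4, 6):.1%} of keys move")
    print(f"4 -> 8 partitions: {moved_fraction(hashes, 4, 8):.1%} of keys move")
```

Over one period of lcm(4, 6) = 12, only hashes with h % 12 in 0..3 keep their partition, which is where the two-thirds figure comes from.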
Partitions, part 2: per broker (EMERGENT)
The per-broker partition ceiling is the most important EMERGENT limit in the system, because it gates blast radius, recovery time, and how many brokers you must buy. No constant defines it. It is the minimum of four independent resource bounds: whichever you hit first wins.
Bound 1: vm.max_map_count and memory-mapped index files
Every open log segment carries memory-mapped index files. The offset index and time index are mapped via MappedByteBuffer (storage/.../log/AbstractIndex.java:72). That is the whole point of the design: the OS pages index lookups directly from the buffer cache without a syscall (AbstractIndex.java:50–58). Linux caps the number of mmap regions per process at vm.max_map_count; a partition's active segment maps its offset and time indexes, so the planning multiplier is ~2 mmap regions per partition. That yields the canonical ceiling, derived from these two inputs:
- Assumption: vm.max_map_count = 65,530 regions/process
  - kind: OS default (Linux kernel vm.max_map_count sysctl). The maximum number of memory-map regions one process may hold. This is the value Kafka inherits unless you raise it.
- Assumption: ~2 mmap regions per partition
  - kind: derived from Kafka source (planning figure). Each open segment has an OffsetIndex and a TimeIndex, each an AbstractIndex with its own MappedByteBuffer (AbstractIndex.java:72); the active segment that must be mapped contributes ≈2. A transaction index or retained older segments push it higher, so 2 is a floor per partition, not a guarantee.
- Derivation: default mmap ceiling
  - partitions/broker ≈ vm.max_map_count ÷ (mmap regions per partition) = 65,530 regions ÷ 2 regions/partition ≈ 32,765 partitions/broker before mmap allocation fails (the regions unit cancels, leaving partitions); Instaclustr (empirical); ops reference §2.
- Raised ceiling
  - Set vm.max_map_count = 262144 (or higher) to lift this bound above what RAM/FDs allow first: at 2 regions/partition that is 262,144 ÷ 2 = 131,072 partitions/broker of mmap headroom. Heuristic: keep it comfortably above (open segments per broker × mmaps per segment); ops reference §3 (empirical).
A segment, not a partition, is the unit of mapping: each segment has its own OffsetIndex and TimeIndex, each an AbstractIndex with its own MappedByteBuffer, so the multiplier tracks open segments. The ~32,765 figure is therefore the headroom for active-segment maps only. A partition with many retained segments (large topic, short segment.bytes) consumes more than 2, so a broker whose partitions each retain many segments hits the ceiling at a lower partition count; for the active segment, which must be writable and mapped, ~2 per partition remains the planning number. This is why vm.max_map_count is the first OS knob to raise on a high-partition broker: the failure mode is an mmap error during segment roll, not a graceful rejection.
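The ceiling derivation fits in a small calculator. A minimal sketch, assuming ~2 mappings per mapped segment (offset + time index) as in the planning figure; mapped_segments_per_partition is a hypothetical knob for partitions whose older segments also hold mappings. /proc/sys/vm/max_map_count is the standard Linux location of the sysctl; elsewhere the reader helper returns None.

```python
from typing import Optional

DEFAULT_MAX_MAP_COUNT = 65_530  # Linux default vm.max_map_count


def mmap_partition_ceiling(max_map_count: int = DEFAULT_MAX_MAP_COUNT,
                           mapped_segments_per_partition: int = 1,
                           maps_per_segment: int = 2) -> int:
    """Partitions a broker can host before mmap allocation fails.

    maps_per_segment=2 covers the offset and time index; raise
    mapped_segments_per_partition when older segments stay mapped too.
    """
    regions_per_partition = mapped_segments_per_partition * maps_per_segment
    return max_map_count // regions_per_partition


def read_max_map_count(path: str = "/proc/sys/vm/max_map_count") -> Optional[int]:
    """Read the live sysctl on Linux; return None if it is unavailable."""
    try:
        with open(path) as f:
            return int(f.read().strip())
    except (OSError, ValueError):
        return None


if __name__ == "__main__":
    live = read_max_map_count()
    print("live vm.max_map_count:", live)
    print("default ceiling:", mmap_partition_ceiling())
    print("raised ceiling:", mmap_partition_ceiling(262_144))
    print("5 mapped segments/partition:", mmap_partition_ceiling(mapped_segments_per_partition=5))
```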
Bound 2: file descriptors (open files)
Every segment is several files on disk: .log, .index, .timeindex (and possibly .txnindex), each counted as an open file descriptor; on top of those, the broker holds one FD per client/replication socket. A broker with thousands of partitions and multiple segments each easily runs tens of thousands of open files.
- Formula
  - open files ≈ (#partitions) × (partition_size ÷ segment.bytes) × (files per segment) + connections (Jun Rao / Confluent, empirical). Each segment is ~3 files on disk (.log, .index, .timeindex; .txnindex adds a 4th), so segments-per-partition = retained bytes ÷ segment size, and each segment costs ~3 FDs.
Worked example (illustrative, substitute your measured values):
- Assumption: partitions/broker = 2,000
  - kind: workload-dependent. A mid-sized broker; chosen so the result is concrete.
- Assumption: retained data per partition = 20 GiB
  - kind: workload-dependent (retention × throughput). What a partition holds on disk after retention settles.
- Assumption: segment.bytes = 1 GiB
  - kind: cited Kafka default (log.segment.bytes, LogConfig.java:131).
- Assumption: files per segment = 3
  - kind: derived from on-disk layout (.log + .index + .timeindex).
- Assumption: open client/replication sockets = 5,000
  - kind: workload-dependent. One FD per connection; illustrative.
- Derivation
  - segments/partition = 20 GiB ÷ 1 GiB/segment = 20 segments/partition. FDs from segment files = 2,000 partitions × 20 segments/partition × 3 files/segment = 120,000 files. Plus 5,000 socket FDs ⇒ ≈ 125,000 open files, already above a 100,000 ulimit, which is why the recommendation below is a floor, not a ceiling.
- Recommendation
  - Set the broker process nofile ulimit to ≥ 100,000 (raise further when the derivation above exceeds it). Production clusters routinely run "more than 30 thousand open file handles per broker" (Jun Rao, empirical).
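The worked example, as a function you can rerun with your own numbers. A minimal sketch of the formula above; it rounds segments up with math.ceil on the assumption that a partially filled active segment still costs a full set of files, and the inputs are the illustrative values from the example, not measurements.

```python
import math

GIB = 1024 ** 3


def estimate_open_files(partitions: int, retained_bytes_per_partition: int,
                        segment_bytes: int = 1 * GIB, files_per_segment: int = 3,
                        sockets: int = 0) -> int:
    """partitions x segments/partition x files/segment + one FD per socket."""
    segments_per_partition = math.ceil(retained_bytes_per_partition / segment_bytes)
    return partitions * segments_per_partition * files_per_segment + sockets


def nofile_headroom(estimate: int, nofile_limit: int) -> int:
    """Positive: spare FDs under the ulimit. Negative: the ulimit is already too low."""
    return nofile_limit - estimate


if __name__ == "__main__":
    fds = estimate_open_files(2_000, 20 * GIB, sockets=5_000)
    print(fds, "open files; headroom at 100,000:", nofile_headroom(fds, 100_000))
```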
Bound 3: memory (heap and page cache)
Per-partition memory is small but not free: each partition carries leader/follower state, fetch-session bookkeeping, and producer-state metadata, and on the client side the producer must buffer per partition. The architecture deliberately keeps the JVM heap small (~6 GB) and hands the rest of RAM to the OS page cache, which is the real read/write accelerator via zero-copy sendfile (Part I 09 · Fetch Path). Two empirical planning rules:
- The producer client should allocate "at least a few tens of KB per partition being produced" against buffer.memory (default 32 MiB, ProducerConfig.java:401) (Jun Rao, empirical).
- A large heap to chase performance is an anti-pattern: it starves the page cache and lengthens GC pauses; a 32 GB G1GC heap can incur 100–200 ms pauses that drop replicas from ISR (Conduktor, empirical).
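The producer-side rule becomes a budget check. A minimal sketch, assuming 32 KiB per partition as one reading of "a few tens of KB"; that figure is an assumption, so substitute a value derived from your own batch.size. The helper names are hypothetical.

```python
KIB = 1024
MIB = 1024 * KIB

DEFAULT_BUFFER_MEMORY = 32 * MIB   # producer buffer.memory default
ASSUMED_PER_PARTITION = 32 * KIB   # assumption: "a few tens of KB" per partition


def max_partitions_for_buffer(buffer_memory: int = DEFAULT_BUFFER_MEMORY,
                              bytes_per_partition: int = ASSUMED_PER_PARTITION) -> int:
    """Partitions one producer can feed before the buffer is the constraint."""
    return buffer_memory // bytes_per_partition


def buffer_needed(partitions: int,
                  bytes_per_partition: int = ASSUMED_PER_PARTITION) -> int:
    """buffer.memory (bytes) needed to give every produced partition its share."""
    return partitions * bytes_per_partition


if __name__ == "__main__":
    print(max_partitions_for_buffer())
    print(buffer_needed(5_000) / MIB, "MiB for 5,000 partitions")
```

Under these assumptions the 32 MiB default covers about 1,024 partitions, and a producer writing to 5,000 partitions would want roughly 156 MiB of buffer.memory.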
Bound 4: recovery / replay time (the one that actually bites)
The bound operators feel in an incident is not steady-state memory; it is how long a broker takes to come back. On unclean restart the broker must replay and re-index the tail of every partition's log; on a leadership move the controller must elect a new leader per affected partition. Empirically:
- Leader election costs ~5 ms per partition (assumption; kind: empirical planning figure, Jun Rao 2015, ZK-era hardware; teach the mechanism, do not predict 2026 latency). Derivation: a broker holding 1,000 leaderships ⇒ recovery time ≈ 1,000 leaders × 5 ms/leader = 5,000 ms ≈ 5 s of unavailability for those partitions on unclean loss (the leader unit cancels, leaving ms). The conclusion, that recovery time grows linearly with leaders/broker, is why blast radius, not the raw mmap ceiling, caps the practical partition count.
- Set num.recovery.threads.per.data.dir above its default of 2 (ServerLogConfigs.java:147) to parallelize startup log recovery across cores.
- Tiered storage (KIP-405) cuts recovery dramatically: a rejoining broker need not re-replicate cold data (Uber, empirical). See Part I 05 · Tiered Storage.
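Treating the ~5 ms/leader figure as a linear, serial cost gives both directions of the calculation: unavailability for a given leader count, and the leader count a given unavailability budget allows. A minimal sketch under exactly that assumption; real controllers batch election work, so read the output as a planning bound, not a prediction.

```python
MS_PER_LEADER_ELECTION = 5.0  # ZK-era planning figure (assumption, not a 2026 measurement)


def election_unavailability_ms(leaders: int,
                               ms_per_leader: float = MS_PER_LEADER_ELECTION) -> float:
    """Linear model: each leadership moved costs ms_per_leader of unavailability."""
    return leaders * ms_per_leader


def max_leaders_for_budget(budget_ms: float,
                           ms_per_leader: float = MS_PER_LEADER_ELECTION) -> int:
    """Largest leader count per broker whose failover fits in budget_ms."""
    return int(budget_ms // ms_per_leader)


if __name__ == "__main__":
    print(election_unavailability_ms(1_000), "ms for 1,000 leaders")
    print(max_leaders_for_budget(2_000), "leaders fit a 2 s budget")
```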
Summary: vm.max_map_count needs ~2 regions per partition; nofile ≈ partitions × (retained bytes ÷ segment.bytes) × ~3 files/segment + 1 per socket, so set it ≥ 100,000.
Raw ceilings are high: ~32,765 partitions at the default mmap limit, and lab demonstrations of ~600,000 partitions on one KRaft broker (Kafka 3.2.1, 64 GB EC2, RF=1; Instaclustr, empirical). The practical per-broker count is nonetheless set by recovery time and blast radius, not by the raw ceiling. ZooKeeper-era guidance of ~4,000 partitions/broker is obsolete under KRaft but still a sane conservative baseline; large fleets cap clusters at ~200 brokers to bound blast radius (Pinterest, Netflix, empirical). "KRaft means partitions are free" is wrong: every partition still costs an mmap, FDs, fetcher overhead, and failover work.
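The four bounds combine as a minimum. A minimal sketch that computes three of them from the chapter's illustrative inputs and reports which one binds; memory is an optional caller-supplied partition count because the text gives no formula for it. Converting the leader budget into hosted partition replicas by multiplying by RF assumes leadership is evenly balanced, which is an assumption, not something Kafka guarantees. Function names are hypothetical.

```python
from typing import Dict, Optional, Tuple


def partition_bounds(max_map_count: int = 65_530, maps_per_partition: int = 2,
                     nofile: int = 100_000, sockets: int = 5_000,
                     segments_per_partition: int = 20, files_per_segment: int = 3,
                     recovery_budget_ms: float = 5_000, ms_per_leader: float = 5.0,
                     replication_factor: int = 3,
                     memory_bound: Optional[int] = None) -> Dict[str, int]:
    """Per-broker partition count allowed by each resource, in isolation."""
    bounds = {
        "mmap": max_map_count // maps_per_partition,
        "fds": (nofile - sockets) // (segments_per_partition * files_per_segment),
        # Assumes balanced leadership: hosted replicas ~= leaders x RF.
        "recovery": int(recovery_budget_ms // ms_per_leader) * replication_factor,
    }
    if memory_bound is not None:
        bounds["memory"] = memory_bound
    return bounds


def binding_bound(bounds: Dict[str, int]) -> Tuple[str, int]:
    """Whichever bound you hit first wins."""
    return min(bounds.items(), key=lambda kv: kv[1])


if __name__ == "__main__":
    b = partition_bounds()
    print(b, "->", binding_bound(b))
    print(binding_bound(partition_bounds(nofile=1_000_000)))
```

With the worked-example inputs (100,000 nofile, 20 segments per partition), file descriptors bind at about 1,583 partitions, well before mmap or recovery; raising nofile to 1,000,000 moves the binding bound to recovery time.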
Partitions, part 3: per cluster (EMERGENT, transformed by KRaft)
Here is the headline answer to "what is the maximum number of partitions per cluster?": there is no compiled per-cluster limit. The ceiling is EMERGENT and was structurally rewritten by the removal of ZooKeeper (KIP-500; ZK removed in 4.0). The number you may have memorized, ~200,000/cluster, is a ZooKeeper-era figure and no longer the binding constraint.
| Era | Per-cluster ceiling | Binding mechanism | Source |
|---|---|---|---|
| ZooKeeper, pre-1.1.0 | far lower; 50K partitions = 6.5 min controlled shutdown | O(partitions) serial controller writes to ZK | Confluent 200K post (empirical) |
| ZooKeeper, 1.1.0+ (KIP-227) | ~200,000/cluster, ~4,000/broker | Batched/async controller writes dropped 50K-partition shutdown to 3 s; 100K-partition metadata reload 28 s → 14 s | Confluent 200K post (empirical) |
| KRaft (4.x) | "a million or more"; lab-verified 2,000,000 | In-memory, log-replicated metadata quorum; hot-standby controllers ⇒ near-instant failover, no reload phase | KIP-500; Confluent KRaft lab (empirical) |
Under ZooKeeper the cluster partition count was bounded by controller hard-failover recovery: the new controller had to reload all partition metadata from ZK, an O(partitions) operation that could run for minutes at scale, during which the cluster had no leader-assignment authority. KRaft replaces ZK with a Raft-replicated metadata log; the controller's state is already in memory, kept current by standby controllers, so failover does not require a reload (Part I 10 · KRaft Consensus, 12 · Metadata Propagation). The structural O(partitions) reload term that capped ZK clusters is simply gone (KIP-500).
Raising partition count to chase throughput is a documented anti-pattern. In one test (3× r6g.large, Kafka 3.1.1, acks=all, 8-byte messages), peak producer rate hit ~2,000,000 msg/s at 100 partitions and latency rose sharply past ~1,000 partitions; KRaft and ZooKeeper throughput were identical in this test (Instaclustr, empirical). Scalability (how many partitions you can host) is not performance (throughput at a given count). Size partitions from max(t/p, t/c) (II · 03), not from "more is faster."
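The sizing rule as code. A minimal sketch, assuming t is the target topic throughput, p the measured per-partition produce throughput, and c the measured per-partition consume throughput (a reading of the II · 03 notation, labeled here as an assumption); all three must be in the same unit.

```python
import math


def partitions_needed(t: float, p: float, c: float) -> int:
    """ceil(max(t/p, t/c)): enough partitions for both producers and consumers."""
    if p <= 0 or c <= 0:
        raise ValueError("per-partition throughputs must be positive")
    return math.ceil(max(t / p, t / c))


if __name__ == "__main__":
    # Illustrative: 1,024 MB/s target, 50 MB/s produce, 20 MB/s consume per partition.
    print(partitions_needed(1_024, 50, 20))
```

Here the slower consumer side sets the count; measuring c matters as much as measuring p.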
Message and batch size limits
The maximum record-batch size is a SOFT limit set by config, but it is governed by multiple interlocking configs across the producer, broker, replica fetcher, and consumer. Setting one without the others is the classic "I raised max.message.bytes and replication/consumption broke" footgun.
| Config | Default | Source | Plane | Role |
|---|---|---|---|---|
| max.message.bytes (topic) | 1 MiB + overhead (1048576 + LOG_OVERHEAD) | ServerLogConfigs.java:177; wired LogConfig.java:214 | broker/topic | Largest record batch (after compression) the broker will accept & store. |
| message.max.bytes (broker) | same default (MAX_MESSAGE_BYTES_DEFAULT) | ServerConfigs.java:130 | broker | Cluster-wide default for the above; topic config overrides it. |
| max.request.size (producer) | 1,048,576 (1 MiB) | ProducerConfig.java:424–429 | producer | Caps one produce request; "effectively a cap on the maximum uncompressed record batch size." |
| replica.fetch.max.bytes | 1,048,576 (1 MiB)/partition | ReplicationConfigs.java:68 | broker (follower) | Bytes a follower attempts to fetch per partition (not an absolute max; see below). |
| replica.fetch.response.max.bytes | 10 MiB | ReplicationConfigs.java:88 | broker (follower) | Cap on the whole follower fetch response (not absolute). |
| fetch.max.bytes (broker default) | 55 MiB (55 * 1024 * 1024) | ServerConfigs.java:106 | broker | Max bytes returned for a consumer fetch request; must be ≥ 1024. |
| fetch.max.bytes (consumer) | 52,428,800 (50 MiB) | ConsumerConfig.java:200 | consumer | Max bytes the consumer requests across all partitions. |
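| max.partition.fetch.bytes (consumer) | 1,048,576 (1 MiB) | ConsumerConfig.java:225 | consumer | Max bytes per partition the consumer requests; not an absolute max (an oversized first batch is still returned so the consumer makes progress). Keep it ≥ topic max.message.bytes for predictable fetch sizing. |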
Does a too-large message wedge replication? The modern answer
Historically, if a topic's max.message.bytes exceeded replica.fetch.max.bytes, a follower could be unable to fetch the oversized batch and replication for that partition would stall, a genuine wedge. Modern Kafka closed this hole in the protocol: both replica-fetch limits are explicitly not absolute maximums. The source documents the override: "if the first record batch in the first non-empty partition of the fetch is larger than this value, the record batch will still be returned to ensure that progress can be made" (ReplicationConfigs.java:69–72 for replica.fetch.max.bytes; :89–92 for the response cap). The broker always returns the oversized first batch so the follower can advance.
The replication wedge is gone, but a related trap remains. The broker's accepted batch size is governed by message.max.bytes / max.message.bytes, and the replica-fetch limits explicitly defer to it (ReplicationConfigs.java:71–72, 91–92). If you raise a topic's max.message.bytes above the producer's max.request.size (1 MiB), the producer rejects the larger records client-side (RecordTooLargeException) even though brokers would store and replicate them fine. The consumer is not wedged, because max.partition.fetch.bytes carries the same first-batch exception, but when it sits below max.message.bytes its fetch sizing and memory planning no longer match the data. Raise all the relevant configs together, or none. The mechanical reason: each plane validates the batch against its own config independently; there is no automatic propagation between topic, producer, and consumer.
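A pre-flight check for the coupled size configs. A minimal sketch that classifies mismatches by effect: a producer limit below the topic limit is blocking, because the producer rejects oversized records client-side, while the fetch limits are advisory, because both return an oversized first batch so fetching still makes progress. The function and its severity labels are hypothetical, not a Kafka tool.

```python
MIB = 1024 * 1024


def check_size_configs(topic_max_message_bytes: int,
                       producer_max_request_size: int,
                       consumer_max_partition_fetch_bytes: int,
                       replica_fetch_max_bytes: int):
    """Return (severity, message) pairs for limits set below the topic's max batch."""
    issues = []
    if producer_max_request_size < topic_max_message_bytes:
        issues.append(("blocking",
                       "max.request.size < max.message.bytes: producer rejects large records"))
    if consumer_max_partition_fetch_bytes < topic_max_message_bytes:
        issues.append(("advisory",
                       "max.partition.fetch.bytes < max.message.bytes: oversized batches "
                       "still arrive, but fetch sizing no longer matches the data"))
    if replica_fetch_max_bytes < topic_max_message_bytes:
        issues.append(("advisory",
                       "replica.fetch.max.bytes < max.message.bytes: followers still progress "
                       "via the first-batch exception"))
    return issues


if __name__ == "__main__":
    # Topic raised to 5 MiB, everything else left at 1 MiB defaults.
    for severity, message in check_size_configs(5 * MIB, 1 * MIB, 1 * MIB, 1 * MIB):
        print(f"[{severity}] {message}")
```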
Beyond config coordination, large messages tax the page cache (fewer messages fit hot), inflate replication and cross-AZ bytes, and increase per-request latency. For payloads beyond a few MiB, prefer claim-check (store the blob in object storage, send a reference). The 1 MiB defaults are deliberate, not timid.
The request plane limits
Several SOFT limits and one HARD-feeling cap bound how much in-flight work a broker accepts. These are the knobs behind "broker is dropping connections" and "fetch sessions evicted."
| Limit | Default | Source | Class | Mechanism & tuning |
|---|---|---|---|---|
| queued.max.requests | 500 | SocketServerConfigs.java:144 | SOFT | Depth of the data-plane request queue between network and I/O threads. When full, network threads stop reading: backpressure to clients. Raise for bursty load if I/O threads keep up; lower to bound memory. |
| queued.max.request.bytes | -1 (disabled) | SocketServerConfigs.java:148 | SOFT | Byte-based variant of the above; -1 = no byte cap. Use to bound queue memory when request sizes vary widely. |
| max.incremental.fetch.session.cache.slots | 1000 | ServerConfigs.java:102 | SOFT | Total incremental-fetch sessions cached, sharded across 8 shards (round-robin), LRU eviction within a shard. Cap on concurrent incremental-fetch clients before sessions are evicted (falling back to full fetches). Raise on brokers with many consumers/followers. |
| max.request.partition.size.limit (MAX_REQUEST_PARTITION_SIZE_LIMIT) | 2000 | ServerConfigs.java:110–111 | SOFT (config default; behaves as a per-request cap) | "The maximum number of partitions [that] can be served in one request." Validated atLeast(1) (:155). Protects the broker from pathologically wide requests. |
| socket.request.max.bytes | 104,857,600 (100 MiB) | SocketServerConfigs.java:96 | SOFT | Hard cap on a single socket request's size; a larger request is rejected at the wire. The true ceiling on any one request. |
queued.max.requests=500 creates backpressure, not errors
The data plane is a bounded producer/consumer pipeline: network threads parse requests and enqueue them; a pool of num.io.threads (default 8, ServerConfigs.java:51) dequeues and processes them (Part I 06 · Network & Threading, 07 · Request Processing). When the queue hits 500 the network threads block on the enqueue and stop reading sockets, the kernel's TCP receive buffers fill, and clients experience slowdown rather than rejection. This is intentional, graceful backpressure. If you see RequestQueueTimeMs climbing and RequestHandlerAvgIdlePercent near zero, the I/O threads are the bottleneck: add num.io.threads or brokers, not queue depth (see II · 08 · Metrics & Signals).
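A toy, discrete-time model makes the behaviour concrete. A minimal sketch, not Kafka's code: each tick, clients send arrivals_per_tick requests, the network thread admits only what fits in the bounded queue, and the I/O pool serves up to served_per_tick. Nothing is ever rejected; excess load accumulates as socket_backlog, standing in for unread bytes in TCP buffers and client-side waiting.

```python
def simulate(arrivals_per_tick: int, served_per_tick: int, ticks: int,
             queue_cap: int = 500):
    """Return (final queue depth, peak queue depth, socket backlog)."""
    queue_depth = 0
    peak_depth = 0
    socket_backlog = 0  # sent by clients, not yet read by the broker
    for _ in range(ticks):
        socket_backlog += arrivals_per_tick
        # Network thread: read only as many requests as the bounded queue accepts.
        admitted = min(socket_backlog, queue_cap - queue_depth)
        socket_backlog -= admitted
        queue_depth += admitted
        peak_depth = max(peak_depth, queue_depth)
        # I/O threads: drain up to their per-tick capacity.
        queue_depth -= min(queue_depth, served_per_tick)
    return queue_depth, peak_depth, socket_backlog


if __name__ == "__main__":
    for arrivals in (300, 600):
        depth, peak, backlog = simulate(arrivals, served_per_tick=400, ticks=10)
        print(f"arrivals={arrivals}/tick: depth={depth} peak={peak} backlog={backlog}")
```

When arrivals exceed service, the queue pins at its cap and the backlog grows every tick, which is the slowdown clients see; in this model, raising queue_cap would only move that backlog into broker memory.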
Connection caps
Connection limits default to unbounded: they are an opt-in safety mechanism, not a default protection. This surprises operators who assume a broker self-limits.
| Config | Default | Source | Class |
|---|---|---|---|
| max.connections | Integer.MAX_VALUE (~2.1 billion) | SocketServerConfigs.java:116 | SOFT (unbounded by default) |
| max.connections.per.ip | Integer.MAX_VALUE | SocketServerConfigs.java:111 | SOFT (unbounded by default) |
| max.connection.creation.rate | Integer.MAX_VALUE | SocketServerConfigs.java:127 | SOFT (unbounded by default) |
| max.connections.per.ip.overrides | "" (none) | SocketServerConfigs.java:106 | SOFT |
| connections.max.idle.ms | 600,000 (10 min) | SocketServerConfigs.java:135 | SOFT |
max.connections is applied per-broker and per-listener, "but connections on the inter-broker listener are permitted even if [the] broker-wide limit is reached. The least recently used connection on another listener will be closed in this case" (SocketServerConfigs.java:122–123). The design protects replication and controller traffic from being starved by a client connection storm: a client flood evicts client connections, never inter-broker ones. Set a real max.connections based on broker capacity (memory per connection is non-trivial: TLS channels can cost ~122 KB each, per Pinterest, empirical) plus per-listener limits, because the default protects you from nothing.
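Sizing a real max.connections from a memory budget is simple arithmetic. A minimal sketch, assuming the ~122 KB-per-TLS-connection figure (read as decimal KB) and a memory budget for connection state that you choose; the helper also renders the broker-wide, per-IP, and listener-prefixed (listener.name.&lt;listener&gt;.max.connections) keys as properties lines. Listener names and limits in the example are illustrative.

```python
KB = 1000  # the ~122 KB figure is treated as decimal kilobytes


def max_connections_for_budget(memory_budget_bytes: int,
                               bytes_per_connection: int = 122 * KB) -> int:
    """Connections that fit in a memory budget at a fixed per-connection cost."""
    return memory_budget_bytes // bytes_per_connection


def connection_limit_properties(broker_limit: int, per_ip: int,
                                listener_limits: dict) -> list:
    """Render broker-wide, per-IP, and per-listener limits as properties lines."""
    lines = [f"max.connections={broker_limit}",
             f"max.connections.per.ip={per_ip}"]
    for listener, limit in sorted(listener_limits.items()):
        lines.append(f"listener.name.{listener.lower()}.max.connections={limit}")
    return lines


if __name__ == "__main__":
    budget = 1_000_000_000  # hypothetical 1 GB reserved for connection state
    cap = max_connections_for_budget(budget)
    print("\n".join(connection_limit_properties(cap, 500, {"CLIENT": cap - 500})))
```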
Storage limits: the 2 GiB segment and the int64 offset
Two storage boundaries shape every capacity calculation: a HARD 2 GiB ceiling on a single segment, and an effectively unbounded offset space.
The 2 GiB segment ceiling (HARD)
A log segment's data file cannot exceed Integer.MAX_VALUE bytes, ~2 GiB (2,147,483,647 bytes). This is not an arbitrary config maximum; it is forced by a 32-bit type used for the byte position within a segment. The mechanism:
- The append path uses an int for physical position. LogSegment.append() reads the current size as int physicalPosition = log.sizeInBytes(); (storage/.../log/LogSegment.java:256) and increments it per batch (:277). An int overflows past ~2 GiB.
- The offset index entry is 8 bytes: a 4-byte relative offset + a 4-byte physical position. OffsetIndex.ENTRY_SIZE = 8 (OffsetIndex.java:56); the file format is "a 4 byte 'relative' offset and a 4 byte file location" (OffsetIndex.java:44, stored via putInt at :207–208). A 4-byte file location addresses at most ~2 GiB.
- The relative offset must also fit in an int. Before each append the segment checks canConvertToRelativeOffset() → OffsetIndex.canAppendOffset() (LogSegment.java:237–239); toRelative() returns empty if relativeOffset > Integer.MAX_VALUE (AbstractIndex.java:556–557), and the append throws LogSegmentOffsetOverflowException (LogSegment.java:258, 281–283) / IndexOffsetOverflowException (AbstractIndex.java:312–314). The log rolls a new segment rather than overflow.
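The two index layouts can be mirrored with Python's struct module. A minimal sketch, assuming big-endian fields (Java's ByteBuffer default): it packs offset-index and time-index entries and refuses, as the segment does, any relative offset or position that will not fit in a signed 32-bit int. OverflowError stands in for Kafka's overflow exceptions; the real log avoids it by rolling first.

```python
import struct

INT32_MAX = 2**31 - 1

# OffsetIndex entry: 4-byte relative offset + 4-byte physical position.
OFFSET_ENTRY = struct.Struct(">ii")
# TimeIndex entry: 8-byte timestamp + 4-byte relative offset.
TIME_ENTRY = struct.Struct(">qi")


def _relative(base_offset: int, offset: int) -> int:
    relative = offset - base_offset  # the absolute offset itself is int64
    if not 0 <= relative <= INT32_MAX:
        raise OverflowError("relative offset does not fit in 4 bytes: roll a new segment")
    return relative


def offset_index_entry(base_offset: int, offset: int, position: int) -> bytes:
    if not 0 <= position <= INT32_MAX:
        raise OverflowError("physical position does not fit in 4 bytes: roll a new segment")
    return OFFSET_ENTRY.pack(_relative(base_offset, offset), position)


def time_index_entry(timestamp_ms: int, base_offset: int, offset: int) -> bytes:
    return TIME_ENTRY.pack(timestamp_ms, _relative(base_offset, offset))


if __name__ == "__main__":
    entry = offset_index_entry(base_offset=2**40, offset=2**40 + 5, position=4096)
    print(len(entry), OFFSET_ENTRY.unpack(entry))
```

Storing offsets relative to a 64-bit baseOffset is what lets each entry stay 4 bytes wide even when the partition's absolute offsets are enormous.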
| Config / constant | Default | Max | Source |
|---|---|---|---|
| log.segment.bytes / segment.bytes | 1 GiB (1024*1024*1024) | Integer.MAX_VALUE (~2 GiB); floor 1024*1024 (1 MiB) | default & validator LogConfig.java:131, 160, 198 |
| segment.index.bytes / log.index.size.max.bytes | 10 MiB | n/a | ServerLogConfigs.java:93 |
| index.interval.bytes | 4096 (sparse: ~1 entry / 4 KiB) | n/a | ServerLogConfigs.java:97 |
| OffsetIndex entry size | 8 bytes (4 rel-offset + 4 position) | HARD | OffsetIndex.java:56 |
| TimeIndex entry size | 12 bytes (8 timestamp + 4 rel-offset) | HARD | TimeIndex.java:56 |
Because both the in-memory append position and the on-disk index file-location field are 32-bit, a single segment's data file is hard-bounded at Integer.MAX_VALUE bytes. The default 1 GiB leaves comfortable headroom and keeps each segment's index small enough to mmap cheaply. Setting segment.bytes near the 2 GiB max is legal but raises recovery cost (more to replay per segment) and lowers roll frequency (delaying retention/compaction granularity). See Part I 03 · Storage Log Engine and 04 · Storage Management.
The int64 offset (effectively unbounded)
While the relative offset within a segment is 32-bit, the absolute partition offset is a 64-bit signed integer. The segment's baseOffset is a long (AbstractIndex.java:63), and the index stores offsets relative to it precisely so the per-segment field can stay 4 bytes while the partition's absolute offset space remains effectively infinite. How effectively infinite? The derivation, from two stated inputs:
- Assumption: int64 offset ceiling = 2⁶³ − 1 ≈ 9.2×10¹⁸ records/partition
  - kind: HARD type width (signed 64-bit long, AbstractIndex.java:63).
- Assumption: sustained write rate = 1,000,000 records/second
  - kind: illustrative; substitute your measured per-partition rate. A deliberately aggressive single-partition rate (at ~100-byte records it would be ~100 MB/s, far above the ~10 MB/s planning figure), chosen to make the bound a worst case.
- Derivation
  - time-to-exhaust = 9.2×10¹⁸ records ÷ 10⁶ records/second ≈ 9.2×10¹² seconds (records cancel); ÷ (60×60×24×365 ≈ 3.15×10⁷ seconds/year) ≈ 290,000 years.
So offset exhaustion is not a real-world limit; segment-position overflow (handled by rolling, above) is the only practical 32-bit boundary in the storage layer.
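The same derivation as a function, so you can plug in your own rate. A minimal sketch; it uses a 365-day year, matching the text.

```python
INT64_MAX = 2**63 - 1
SECONDS_PER_YEAR = 60 * 60 * 24 * 365


def years_to_exhaust(records_per_second: float, ceiling: int = INT64_MAX) -> float:
    """Years until a partition's int64 offset space runs out at a constant rate."""
    return ceiling / records_per_second / SECONDS_PER_YEAR


if __name__ == "__main__":
    for rate in (1_000_000, 10_000_000):
        print(f"{rate:,} records/s -> {years_to_exhaust(rate):,.0f} years")
```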
The headline question: is ~1 GiB/s per topic a hard limit?
No. There is no compiled constant that caps a topic's throughput at 1 GiB/s, or at any value. Per-topic throughput is the canonical EMERGENT limit, and understanding why is the most useful mental model in this chapter. The reasoning rests on four numbers, each labelled below before it is used in the derivation:
- Assumption: per-partition produce ceiling p ≈ 50 MB/s (conservative ~10 MB/s)
  - kind: workload/hardware-dependent empirical planning figure. A single partition sustains "tens of MB/sec"; ~10 MB/s is the conservative number, and tuned modern Kafka reaches 50–100+ MB/s (Jun Rao; ops reference §7; version/hardware-dependent, substitute your measured value).
- Assumption: replication factor RF = 3
  - kind: topic config (the durable default; leader + 2 followers).
- Assumption: NIC byte amplification ≈ RF (≈3× the raw write per leader)
  - kind: derived from RF. A leader receives each write once and sends (RF−1) replication copies to followers; counting the inbound write plus the RF−1 outbound copies, a leader's byte budget is ≈ RF × ingress.
- Assumption: topic ingress target T = 1 GiB/s ≈ 1,024 MB/s
  - kind: illustrative engineering target (the headline question's number; substitute your own).
- A topic's writes are spread across its partitions, and each partition has exactly one leader broker (Part I 08 · Replication). All producer traffic for a partition lands on that one leader. So a topic's aggregate write throughput is bounded by the sum of its partition-leaders' capacities, and any single partition is bounded by its single leader.
- The per-partition ceiling is itself emergent, set by that leader's disk write bandwidth, its NIC, and the replication it must drive; this is assumption p above.
- Therefore topic throughput ≈ (#partitions) × (per-partition ceiling p), a product whose units cancel to a rate: e.g. 20 partitions × 50 MB/s per partition = 1,000 MB/s ≈ 1 GiB/s. It is capped by the aggregate capacity of the brokers hosting those leaders: their NICs, disks, and the replication amplification of RF.
- Replication multiplies the cost. With RF=3, every byte produced is copied to RF−1 = 2 followers, so a leader's egress (and cross-AZ bytes) ≈ (RF−1) × ingress = 2 × ingress. Plan egress at ≥2× ingress (Confluent, empirical). This is why "the NIC" is often the true per-broker ceiling, not the disk: at RF=3 the NIC carries ingress + 2× ingress ≈ 3× the raw write.
Putting the four assumptions together: partition count N = ⌈T ÷ p⌉ = ⌈1,024 MB/s ÷ 50 MB/s per partition⌉ = ⌈20.48⌉ = 21 partitions of pure leader write capacity (the MB/s cancels, leaving partitions). At the conservative p = 10 MB/s the same target needs ⌈1,024 ÷ 10⌉ = ⌈102.4⌉ = 103 partitions; the figure swings by ~5× purely with the per-partition assumption, which is why you measure p rather than guess it.
But each leader must also drive replication: at RF=3 a leader's NIC carries write + (RF−1) × write = 1 + 2 = 3× the raw write, so you must spread those leaders across enough brokers that no single NIC exceeds (its line rate ÷ 3). Concretely, for the per-broker usable ingress ceiling, assume a 10 GbE NIC (kind: hardware): 10 Gbit/s ÷ 8 bit/byte ≈ 1,250 MB/s of raw line rate. With the ~3× amplification just derived, usable ingress per broker ≈ NIC ÷ amplification = 1,250 MB/s ÷ 3 ≈ 417 MB/s (~400 MB/s), or ÷5 = 250 MB/s once you also reserve headroom for consumer-fetch egress and cross-AZ traffic (the planning figure used in II · 04). To reach 1,024 MB/s you therefore need at least ⌈1,024 ÷ 250⌉ = 5 brokers carrying leadership, before any availability floor.
This is the conclusion: ~1 GiB/s holds because the partition count provides the parallelism and the leaders are spread thin enough that no NIC, disk, or replication budget is exceeded, not because Kafka guarantees it. The headline cluster benchmarks (LinkedIn's 2,024,032 rec/s, or 193 MB/s, with three producers and async replication; Confluent's 605 MB/s peak at RF=3; both empirical, on specific hardware and settings) are aggregate, multi-broker, multi-producer results, not single-topic guarantees. ~1 GiB/s per topic is an engineering target you reach by partitioning and provisioning, not a constant Kafka enforces. See II · 04 · Capacity Planning for the full sizing math.
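The sizing chain, as code you can rerun with measured values. A minimal sketch, assuming decimal units for the NIC (1 Gbit/s = 125 MB/s) and MB ≈ MiB for the target, as the text does; divisor is the amplification-plus-headroom factor (3 for replication only, 5 for the II · 04 planning figure). Function names are hypothetical.

```python
import math


def leader_partitions(target_mb_s: float, per_partition_mb_s: float) -> int:
    """N = ceil(T / p): partitions of pure leader write capacity."""
    return math.ceil(target_mb_s / per_partition_mb_s)


def usable_ingress_per_broker(nic_gbit_s: float, divisor: float) -> float:
    """Raw NIC line rate in MB/s, divided by amplification (plus headroom)."""
    raw_mb_s = nic_gbit_s * 1000 / 8  # Gbit/s -> MB/s, decimal units
    return raw_mb_s / divisor


def leader_brokers(target_mb_s: float, nic_gbit_s: float = 10,
                   divisor: float = 5) -> int:
    """Minimum brokers carrying leadership, before any availability floor."""
    return math.ceil(target_mb_s / usable_ingress_per_broker(nic_gbit_s, divisor))


if __name__ == "__main__":
    target = 1_024  # MB/s, ~1 GiB/s
    print("partitions @50 MB/s:", leader_partitions(target, 50))
    print("partitions @10 MB/s:", leader_partitions(target, 10))
    print("usable ingress/broker (RF only):", round(usable_ingress_per_broker(10, 3)))
    print("leader brokers (divisor 5):", leader_brokers(target))
```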
Where the limit lives: Kafka vs Kinesis
One question surfaces so often it belongs in the catalogue, because answering it exercises the whole taxonomy: "Kinesis is Kafka under the hood, so where in Kafka's source is the 10 MB/s-per-shard limit?" The premise is wrong twice, and the untangling is the point. First, Kinesis is not built on Kafka. AWS Kinesis Data Streams is an independent, proprietary implementation of the same distributed-log pattern (Part III III · 00 · The Distributed Log as a Pattern); the Kafka-on-AWS product is Amazon MSK, a separate thing. A Kinesis shard and a Kafka partition are convergent designs, not shared code. Second, the number: a Kinesis shard sustains 1 MB/s (or 1,000 records/s) write and 2 MB/s (or 2,000 records/s) read (AWS Kinesis quotas). The "10 MB" is not a per-second rate: it is the most a single GetRecords call may return (5 calls/s per shard, but sustained read stays at 2 MB/s), and separately the per-record payload ceiling (raised to ~10 MiB, served via burst capacity).
The instructive part is where each limit is defined. In Kafka, per-partition throughput is the canonical EMERGENT limit from the section above: no constant sets it; it falls out of the leader's disk, NIC, and replication budget. In Kinesis the same logical number is a hard service quota, a value AWS chose and enforces at the API edge (the ProvisionedThroughputExceededException), because in Kinesis the shard is the unit of billing, so its capacity must be a declared constant. Same abstraction, opposite home for the limit.
| Limit | Kafka, where it lives | Kinesis, where it lives |
|---|---|---|
| Per-partition / per-shard write throughput | EMERGENT, ~10–100+ MB/s. No constant; set by leader disk, NIC, RF (the throughput section above). | Hard service quota: 1 MB/s or 1,000 rec/s. AWS-defined, API-enforced. |
| Per-partition / per-shard read throughput | EMERGENT, page-cache + zero-copy bound; optionally governed by quotas (below). | Hard service quota: 2 MB/s shared, or 2 MB/s per consumer with Enhanced Fan-Out. |
| Max single record / batch | SOFT: max.message.bytes ≈ 1 MiB default (ServerLogConfigs.java:177); raise it with its downstream configs. | Hard service quota: ~10 MiB payload; 10 MB per GetRecords call. |
| Rate limiting as a feature | SOFT, opt-in: per-(user, client-id) byte-rate quotas via ClientQuotaManager (Part I 19 · Quotas & Throttling); default unlimited. | Mandatory: the per-shard quota is the model. You cannot disable it, only buy more shards. |
Kafka and Kinesis implement the same abstraction and make the opposite decision about throughput. Kafka leaves the per-partition rate EMERGENT and unbounded by default, then offers quotas (ClientQuotaManager) as an opt-in governor; you trade a predictable bill for a higher ceiling, and you measure your per-partition p rather than read it off a price list. Kinesis hardens the per-shard rate into a mandatory, visible quota precisely so capacity equals billing. So the honest answer to "where does the 10 MB/s come from?" is: not from Kafka, and not from a constant at all; it is a vendor quota on a different system, the mirror image of Kafka's deliberately emergent per-partition throughput. When you design a metered system this is the lever to reach for: deciding which limits stay emergent and which harden into the contract is a first-class design decision, not an implementation detail. The full Kafka-versus-Kinesis architecture trade is in Part III III · 06 · Comparative Architecture.
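To see "capacity equals billing" in numbers, the following minimal Java sketch sizes the same 1,024 MB/s write target both ways. It assumes only the per-shard write quota quoted above (1 MB/s or 1,000 records/s, whichever binds first) and, for Kafka, the measured-p approach from the throughput section. The class, its method names, and the 500,000 records/s workload are hypothetical, and the sketch ignores read fan-out entirely.

```java
// Hypothetical comparison, not an AWS or Kafka API.
final class ShardVsPartition {

    static long ceilDiv(long a, long b) {
        return (a + b - 1) / b;
    }

    // Kinesis: the per-shard write quota is a declared constant
    // (1 MB/s or 1,000 records/s), so shard count is pure arithmetic.
    static long kinesisWriteShards(long writeMBps, long recordsPerSec) {
        long byBytes = ceilDiv(writeMBps, 1);           // 1 MB/s per shard
        long byRecords = ceilDiv(recordsPerSec, 1_000); // 1,000 records/s per shard
        return Math.max(byBytes, byRecords);            // whichever quota binds first
    }

    // Kafka: no constant exists; p is whatever you measured on your hardware.
    static long kafkaPartitions(long writeMBps, long measuredPerPartitionMBps) {
        return ceilDiv(writeMBps, measuredPerPartitionMBps);
    }

    public static void main(String[] args) {
        long mbps = 1_024;
        long recs = 500_000; // hypothetical workload, ~2 KB average record
        System.out.println("Kinesis shards:             " + kinesisWriteShards(mbps, recs)); // 1024
        System.out.println("Kafka partitions @ 50 MB/s: " + kafkaPartitions(mbps, 50));      // 21
    }
}
```

With small records the record-count term binds instead: kinesisWriteShards(1_024, 10_000_000) returns 10,000. The Kinesis figure is a price-list lookup; the Kafka figure is only as good as your measured p.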
Durability limits and the configs that bound them
A few SOFT limits govern durability and availability; their defaults are conservative-leaning, but one default in particular is a footgun. These are covered in depth in II · 06 · Durability; here is the limit-relevant summary.
| Config | Default | Source | Why it matters as a limit |
|---|---|---|---|
| min.insync.replicas | 1 | ServerLogConfigs.java:155; wired LogConfig.java:186, 232 | With acks=all, the minimum number of ISR members that must acknowledge a write. The default of 1 means that once the ISR shrinks to the leader alone, a write is acknowledged from a single replica's page cache; set it to 2 with RF=3 so that losing one broker cannot lose acknowledged data. |
| min.insync.replicas vs RF | silently capped | effectiveMinIsr (broker) | The broker silently caps min.insync.replicas at the actual replica count: an RF=1 topic still accepts writes even with min.insync.replicas=2. Always verify min.insync.replicas ≤ RF, Conduktor (empirical). |
| replica.lag.time.max.ms | 30,000 (30 s) | ReplicationConfigs (KIP-16) | A follower that has not caught up within this window is dropped from the ISR. It bounds how stale an ISR member may be, and so sets the implicit budget for how much in-flight data a single failure can lose. |
| replica.fetch.wait.max.ms | 500 | ReplicationConfigs.java:75 | "Should always be less than replica.lag.time.max.ms … to prevent frequent shrinking of ISR for low throughput topics" (:76–77): a config invariant. |
| unclean.leader.election.enable | false (modern/KRaft) | n/a | When false, an out-of-sync replica may not become leader, trading availability for no data loss. The default was true before 0.11.0; any UncleanLeaderElectionsPerSec > 0 is a data-loss event, Datadog (empirical). |
replication.factor=3 + min.insync.replicas=2 + acks=all (+ unclean.leader.election.enable=false) makes data loss require ≥2 simultaneous failures. The why is mechanical: with min-ISR=2 a write is acknowledged only after the leader and ≥1 follower have it; losing the leader leaves a caught-up follower to take over without truncating committed records. Drop min-ISR to 1 and you reintroduce single-failure data loss; the default of 1 exists for backward compatibility, not because it is safe. See Part I 08 · Replication and II · 06 · Durability.
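The failure arithmetic in the paragraph above can be made explicit. This is a minimal Java sketch that assumes acks=all, unclean.leader.election.enable=false, and the silent cap from the table (effective min-ISR = min(min.insync.replicas, RF)). It counts whole-replica failures of one partition and ignores correlated or disk-level failures. DurabilityBudget and its methods are hypothetical names, not Kafka code.

```java
// Hypothetical failure-budget calculator, not Kafka source.
final class DurabilityBudget {

    // The broker caps min.insync.replicas at the replica count.
    static int effectiveMinIsr(int rf, int minIsr) {
        return Math.min(minIsr, rf);
    }

    // An acks=all write is acknowledged only once at least effectiveMinIsr
    // replicas hold it, so losing it takes that many failures; one fewer is survivable.
    static int failuresWithoutDataLoss(int rf, int minIsr) {
        return effectiveMinIsr(rf, minIsr) - 1;
    }

    // acks=all writes keep succeeding while the ISR has >= effectiveMinIsr members.
    static int failuresWithWritesAvailable(int rf, int minIsr) {
        return rf - effectiveMinIsr(rf, minIsr);
    }

    public static void main(String[] args) {
        int[][] cases = {{3, 2}, {3, 1}, {1, 2}};
        for (int[] c : cases) {
            int rf = c[0];
            int minIsr = c[1];
            System.out.printf(
                    "RF=%d min.insync.replicas=%d -> survives %d failure(s) without loss, %d with writes available%s%n",
                    rf, minIsr,
                    failuresWithoutDataLoss(rf, minIsr),
                    failuresWithWritesAvailable(rf, minIsr),
                    minIsr > rf ? " (min.insync.replicas > RF: silently capped)" : "");
        }
    }
}
```

Of the three cases, only RF=3 with min.insync.replicas=2 survives one failure on both axes. RF=3 with the default of 1 survives zero failures without loss, and the RF=1 row shows why the silent cap makes min.insync.replicas=2 meaningless there.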
Quick-reference: the limits catalogue
One table to scan in an incident. Class column: H = HARD (compiled), S = SOFT (config default), E = EMERGENT (resource/architecture).
| Boundary | Value | Class | Source / mechanism |
|---|---|---|---|
| Partitions per topic (total) | int32 id space (~2.1B); never the binding cap in practice | E | partition id is int32; the per-broker and per-cluster ceilings below bind first |
| Partitions per request | 10,000 | H | ReplicationControlManager.java:152 |
| Metadata records per user op | 10,000 | H | QuorumController.java:187 |
| Partitions per broker (mmap ceiling) | ~32,765 (default vm.max_map_count) | E | 65,530 regions ÷ ~2 regions/partition; AbstractIndex.java:72 |
| Partitions per broker (practical) | gated by FDs, RAM, recovery time | E | FDs ≥ 100k; ~5 ms/partition election (empirical) |
| Partitions per cluster | ~200k (ZK-era) → millions / 2M lab (KRaft) | E | KIP-500; O(partitions) reload removed |
| Reduce partition count | not supported | H | no shrink API; keyed-topic ordering breaks |
| Segment data file | ≤ Integer.MAX_VALUE (~2 GiB) | H | int position LogSegment.java:256; OffsetIndex.java:44,56 |
| log.segment.bytes default / floor | 1 GiB / 1 MiB | S | LogConfig.java:131,160 |
| Absolute offset | int64 (~9.2×10¹⁸) | H | baseOffset long, AbstractIndex.java:63 |
| max.message.bytes / message.max.bytes | 1 MiB + overhead | S | ServerLogConfigs.java:177 |
| max.request.size (producer) | 1 MiB | S | ProducerConfig.java:424 |
| replica.fetch.max.bytes / replica.fetch.response.max.bytes | 1 MiB / 10 MiB (not absolute) | S | ReplicationConfigs.java:68,88 |
| fetch.max.bytes broker / consumer | 55 MiB / 50 MiB | S | ServerConfigs.java:106; ConsumerConfig.java:200 |
| queued.max.requests | 500 | S | SocketServerConfigs.java:144 |
| max.incremental.fetch.session.cache.slots | 1000 (8 shards) | S | ServerConfigs.java:102 |
| max.request.partition.size.limit | 2000 | S | ServerConfigs.java:111 |
| socket.request.max.bytes | 100 MiB | S | SocketServerConfigs.java:96 |
| max.connections / max.connections.per.ip / max.connection.creation.rate | Integer.MAX_VALUE (unbounded) | S | SocketServerConfigs.java:116,111,127 |
| Per-partition throughput | ~10–100+ MB/s (hardware/version) | E | leader disk/NIC/RF (empirical) |
| Per-topic throughput | N × per-partition, bounded by leader NIC/disk/RF | E | one leader/partition; egress ≥2× ingress at RF=3 |
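Several rows above, together with the invariants in the durability table, can be checked mechanically before an incident rather than during one. The following is a minimal Java sketch under stated assumptions. The config keys are real Kafka names and the fallback values are the defaults cited in this chapter, but LimitsPreflight and its method signatures are hypothetical. The sketch assumes a producer request larger than socket.request.max.bytes is refused by the broker, and the mmap check uses the table's ~2 regions per partition estimate.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical pre-flight checker; config keys are real Kafka names,
// the checker itself is illustrative.
final class LimitsPreflight {

    // EMERGENT mmap ceiling: vm.max_map_count / ~2 mapped index regions per partition.
    static long mmapPartitionCeiling(long vmMaxMapCount) {
        return vmMaxMapCount / 2;
    }

    static List<String> check(Map<String, Long> cfg, int replicationFactor,
                              long partitionsPerBroker, long vmMaxMapCount) {
        List<String> problems = new ArrayList<>();

        // Invariant quoted in ReplicationConfigs: fetch wait < lag window.
        long fetchWait = cfg.getOrDefault("replica.fetch.wait.max.ms", 500L);
        long lagMax = cfg.getOrDefault("replica.lag.time.max.ms", 30_000L);
        if (fetchWait >= lagMax) {
            problems.add("replica.fetch.wait.max.ms should be < replica.lag.time.max.ms");
        }

        // min.insync.replicas above RF is silently capped, not rejected.
        long minIsr = cfg.getOrDefault("min.insync.replicas", 1L);
        if (minIsr > replicationFactor) {
            problems.add("min.insync.replicas=" + minIsr + " > RF=" + replicationFactor
                    + " (silently capped)");
        }

        // Assumption: requests larger than socket.request.max.bytes are refused.
        long maxRequest = cfg.getOrDefault("max.request.size", 1_048_576L);
        long socketMax = cfg.getOrDefault("socket.request.max.bytes", 104_857_600L);
        if (maxRequest > socketMax) {
            problems.add("max.request.size exceeds socket.request.max.bytes");
        }

        long ceiling = mmapPartitionCeiling(vmMaxMapCount);
        if (partitionsPerBroker > ceiling) {
            problems.add(partitionsPerBroker + " partitions/broker exceeds mmap ceiling ~" + ceiling);
        }
        return problems;
    }

    public static void main(String[] args) {
        // Hypothetical deployment: RF=1 topic with min.insync.replicas=2,
        // 40,000 partitions per broker on the default vm.max_map_count.
        Map<String, Long> cfg = Map.of("min.insync.replicas", 2L);
        check(cfg, 1, 40_000, 65_530).forEach(System.out::println);
    }
}
```

Run as written, it reports two findings: the capped min-ISR and the 40,000 > 32,765 mmap overrun. Every other check passes on the defaults.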
Partition sizing math and the keyed-topic migration runbook: II · 03 · Partitioning. Turning these ceilings into a broker/disk/RAM bill: II · 04 · Capacity Planning. The signals that tell you which limit you are approaching: II · 08 · Metrics & Signals. Configuration mechanics (how a default is resolved, static vs dynamic): II · 01 · Configuration.