20 · Kafka Streams Architecture
Source: Apache Kafka 4.4.0-SNAPSHOT (git 04bfe7d, 2026-06-15), KRaft mode. Derived from source code, not copied from official documentation.
Kafka Streams is a client-side stream-processing library; there is no separate processing cluster. An application is a topology: a directed acyclic graph of source, processor and sink nodes built either from the low-level Processor API or from the high-level DSL (KStream/KTable/GlobalKTable). At build time the topology is split into sub-topologies, and at run time each sub-topology is instantiated once per input partition as a task. Tasks run on a small pool of StreamThreads, each owning its own consumer and (for exactly-once) its own transactional producer. State lives in pluggable key/value, window and session stores (RocksDB or in-memory), each backed by a compacted changelog topic so that a failed task can be re-created elsewhere and rebuilt by replaying its changelog. This chapter dissects the topology compiler, the task model, the StreamThread event loop, state restoration via the dedicated state-updater thread, the custom partition assignor (co-partitioning, sticky placement, warmup/probing rebalances per KIP-441) and the newer broker-driven Streams group protocol (KIP-1071), processing guarantees, time/windowing and interactive queries.
Role & responsibilities
Kafka Streams sits entirely on top of the consumer and producer clients. Its job is to turn a declarative or imperative description of a computation into a fault-tolerant, horizontally-scalable, stateful dataflow that consumes input topics, maintains local materialized state, and produces output topics, with at-least-once or exactly-once semantics. Concretely it must:
- Compile the DSL/Processor-API graph into an executable ProcessorTopology, inserting internal repartition topics where a key changes and changelog topics for every persistent store (InternalTopologyBuilder).
- Partition the work: split the topology into sub-topologies and map each (sub-topology, partition) pair to a TaskId, enforcing co-partitioning of joined inputs.
- Assign tasks to instances and to threads within an instance, stickily, while warming up state on new instances before moving active work (StreamsPartitionAssignor + HighAvailabilityTaskAssignor).
- Execute the poll → buffer → process → punctuate → commit loop (StreamThread + TaskExecutor), respecting record timestamps and stream-time.
- Restore local state from changelogs on assignment and keep hot standby replicas updated (StoreChangelogReader, DefaultStateUpdater).
- Guarantee exactly-once via producer transactions that atomically commit output records, changelog records and input offsets (StreamsProducer).
- Serve point and range queries against local state to the application (Interactive Queries / IQv2).
Where it lives in the code
| Concern | Principal class | File |
|---|---|---|
| App lifecycle / state machine | KafkaStreams | streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java |
| DSL entry point | StreamsBuilder → InternalStreamsBuilder | streams/.../StreamsBuilder.java, kstream/internals/InternalStreamsBuilder.java |
| Topology compiler | InternalTopologyBuilder | processor/internals/InternalTopologyBuilder.java |
| DSL operators | KStreamImpl, KTableImpl, KGroupedStreamImpl … | kstream/internals/*.java |
| Executable graph node | ProcessorNode, SourceNode, SinkNode | processor/internals/ProcessorNode.java |
| Per-instance worker | StreamThread | processor/internals/StreamThread.java |
| Task lifecycle owner | TaskManager, TaskExecutor | processor/internals/TaskManager.java, TaskExecutor.java |
| Active / standby task | StreamTask, StandbyTask | processor/internals/StreamTask.java, StandbyTask.java |
| Partition assignor (classic) | StreamsPartitionAssignor | processor/internals/StreamsPartitionAssignor.java |
| State restoration | StoreChangelogReader, DefaultStateUpdater | processor/internals/StoreChangelogReader.java, DefaultStateUpdater.java |
| State manager / checkpoint | ProcessorStateManager | processor/internals/ProcessorStateManager.java |
| EOS producer wrapper | StreamsProducer | processor/internals/StreamsProducer.java |
| State stores | RocksDBStore, CachingKeyValueStore, ThreadCache | state/internals/*.java |
| Configuration | StreamsConfig | streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java |
| Broker-side Streams group | StreamsGroup | group-coordinator/.../coordinator/group/streams/StreamsGroup.java |
Core concepts & terminology
- Topology: a DAG of ProcessorNodes. Sources read from Kafka topics, processors transform/aggregate, sinks write to topics. Built once, instantiated many times.
- Sub-topology (node group): a maximal connected component of the graph that is not crossed by a repartition topic. InternalTopologyBuilder.makeNodeGroups() assigns each an integer id; that id is the subtopology field of every task derived from it (InternalTopologyBuilder.java:937).
- Task / TaskId: the unit of parallelism and of assignment. TaskId is (subtopology, partition[, topologyName]) (processor/TaskId.java:43). One task owns one partition from each input topic of its sub-topology; that set of partitions is the task's partition group.
- StreamTask vs StandbyTask: a StreamTask actively processes records; a StandbyTask is a passive hot replica that only replays the changelog to keep a local copy of state warm (StandbyTask.java:46).
- Repartition topic: an internal topic (…-repartition) inserted when the DSL changes the key (e.g. selectKey, groupBy) so that downstream stateful operators see co-located keys.
- Changelog topic: a compacted internal topic (…-changelog) that records every write to a persistent store, enabling restoration after failure or relocation.
- Stream-time: the maximum record timestamp processed so far by a task's partition group; monotonically non-decreasing. Drives STREAM_TIME punctuation and window advancement.
- Record cache: a per-thread, byte-bounded write-back cache (ThreadCache) in front of stores that deduplicates and batches updates, reducing downstream and changelog traffic.
From DSL/Processor API to a topology
Building the graph
The Processor API talks directly to InternalTopologyBuilder via addSource / addProcessor / addSink / addStateStore / connectProcessorAndStateStores (InternalTopologyBuilder.java:474, :552, :522, :598, :728). Each call appends a NodeFactory (a deferred build() of a ProcessorNode) and invalidates the cached node-grouping (nodeGroups = null). The DSL is a fluent layer: StreamsBuilder.stream(...) / .table(...) create KStreamImpl / KTableImpl objects that record a logical GraphNode in an intermediate graph held by InternalStreamsBuilder; only when build() is called does buildAndOptimizeTopology() walk that graph and translate each logical node into Processor-API calls on the same InternalTopologyBuilder (InternalStreamsBuilder.java:304).
The DSL is a thin compiler over the Processor API. There is exactly one executable abstraction, the ProcessorNode DAG inside InternalTopologyBuilder. Every KStream operator ultimately becomes one or more addProcessor/addSink/addSource calls.
DSL optimizations
optimizeTopology() runs when StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG is set (value all = OPTIMIZE, StreamsConfig.java:266). Two notable rules: reuseKTableSourceTopics() lets a KTable read directly from its source topic instead of creating a redundant changelog (the source topic is the changelog); and mergeRepartitionTopics() collapses multiple repartition operations on the same key into a single shared topic (InternalStreamsBuilder.java:353, :357, :472, :477). Finally rewriteRepartitionNodes() finalizes the repartition node names.
Inserting repartition topics
Whenever an operator may have changed the key, the DSL routes the stream through a repartition topic so the new key is correctly partitioned. KStreamImpl.createRepartitionedSource() (kstream/internals/KStreamImpl.java:969) emits a triple of nodes: a sink (…-sink) that writes to the repartition topic, a filter (…-filter), and a source (…-source) that reads the repartition topic back in. The filter is created as a pass-through KStreamFilter with predicate (k,v) -> true (KStreamImpl.java:1005). The null-key-dropping predicate (k,v) -> k != null is installed later by the optimization pass InternalStreamsBuilder.rewriteRepartitionNodes() (InternalStreamsBuilder.java:372). The topic name is the user prefix plus the suffix -repartition (REPARTITION_TOPIC_SUFFIX, KStreamImpl.java:102, :978). The presence of a repartition source topic is exactly what severs one sub-topology from the next.
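The routing effect of the sink → filter → source triple can be modelled with plain Java collections. The sketch is hypothetical: RepartitionSketch, route and KV are invented names, and Math.floorMod(key.hashCode(), n) stands in for the producer's real murmur2-based partitioner. It re-keys each record and drops null keys, as the (k,v) -> k != null filter does. It then assigns the remaining records to repartition-topic partitions, so all records that share a new key land in one partition and therefore in one downstream task.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Simplified model of a re-key followed by a repartition topic (not Kafka code).
// Math.floorMod(key.hashCode(), n) stands in for the producer's murmur2-based partitioner.
final class RepartitionSketch {

    record KV(String key, String value) {}

    static List<List<KV>> route(List<KV> input, Function<KV, String> newKey, int numPartitions) {
        List<List<KV>> partitions = new ArrayList<>();
        for (int p = 0; p < numPartitions; p++) {
            partitions.add(new ArrayList<>());
        }
        for (KV rec : input) {
            String key = newKey.apply(rec);                        // e.g. selectKey / groupBy
            if (key == null) {
                continue;                                          // the "-filter" node drops null keys
            }
            int partition = Math.floorMod(key.hashCode(), numPartitions);
            partitions.get(partition).add(new KV(key, rec.value())); // the "-sink" node writes here
        }
        return partitions;                                         // the "-source" node reads these back
    }

    public static void main(String[] args) {
        List<KV> clicks = List.of(
            new KV("u1", "page=a"), new KV("u2", "page=b"),
            new KV("u3", "page=a"), new KV("u4", null));
        // Re-key each click by its page; a click without a page gets a null key.
        List<List<KV>> parts = route(clicks, kv -> kv.value() == null ? null : kv.value().substring(5), 4);
        System.out.println(parts); // both "a" records share one partition; the null-key click is gone
    }
}
```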
Identifying sub-topologies and their topics
makeNodeGroups() performs a union-find over the node graph, breaking connections at repartition boundaries, to produce a Map<Integer, Set<String>> of node-group id → node names (InternalTopologyBuilder.java:937). For each group, subtopologyToTopicsInfo() assembles a TopicsInfo record with four sets (InternalTopologyBuilder.java:1209–1271):
| Field | Meaning |
|---|---|
| sourceTopics | External + repartition-source topics feeding this sub-topology. |
| sinkTopics | Topics this sub-topology writes to (output + repartition-sink). |
| repartitionSourceTopics | Internal repartition topics, with their computed partition counts. |
| stateChangelogTopics | Changelog topics for each persistent store, typed by store kind. |
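The grouping step can be approximated with a textbook union-find. In this hypothetical sketch (NodeGroupsSketch is an invented name, not InternalTopologyBuilder code), every node is united with its parents, and only parent edges are modelled. A repartition topic contributes no edge, because its sink and its source meet only through a Kafka topic, so the graph splits into separate groups at that point. Group ids are handed out in first-seen order, which need not match the real numbering.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical sketch: group topology nodes into sub-topologies with union-find.
// Only parent edges are modelled; a repartition topic adds no edge between its sink and source.
final class NodeGroupsSketch {
    private final Map<String, String> parent = new LinkedHashMap<>();

    void addNode(String name, String... predecessors) {
        parent.putIfAbsent(name, name);
        for (String p : predecessors) {
            parent.putIfAbsent(p, p);
            union(name, p);                      // a node shares a sub-topology with its parents
        }
    }

    private String find(String node) {
        String root = parent.get(node);
        if (!root.equals(node)) {
            root = find(root);
            parent.put(node, root);              // path compression
        }
        return root;
    }

    private void union(String a, String b) {
        parent.put(find(a), find(b));
    }

    // Node-group id -> node names; ids are handed out in first-seen order.
    Map<Integer, Set<String>> nodeGroups() {
        Map<String, Integer> idByRoot = new LinkedHashMap<>();
        Map<Integer, Set<String>> groups = new LinkedHashMap<>();
        for (String node : parent.keySet()) {
            int id = idByRoot.computeIfAbsent(find(node), root -> idByRoot.size());
            groups.computeIfAbsent(id, i -> new TreeSet<>()).add(node);
        }
        return groups;
    }

    public static void main(String[] args) {
        NodeGroupsSketch g = new NodeGroupsSketch();
        g.addNode("source");                                 // reads the input topic
        g.addNode("selectKey", "source");
        g.addNode("repartition-filter", "selectKey");
        g.addNode("repartition-sink", "repartition-filter"); // writes the repartition topic
        g.addNode("repartition-source");                     // reads it back: no edge to the sink
        g.addNode("aggregate", "repartition-source");
        System.out.println(g.nodeGroups());
        // {0=[repartition-filter, repartition-sink, selectKey, source], 1=[aggregate, repartition-source]}
    }
}
```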
Changelog topic configs are typed by store flavour in createChangelogTopicConfig() (InternalTopologyBuilder.java:~1330): versioned stores → VersionedChangelogTopicConfig, window stores → WindowedChangelogTopicConfig (retention = window size + grace + windowstore.changelog.additional.retention.ms), everything else → UnwindowedUnversionedChangelogTopicConfig (compaction). The assignor (or, under KIP-1071, the broker) materializes these topics with the right partition count before tasks start.
Figure: sub-topologies and the tasks instantiated from them (sub-topology 0 → tasks 0_x, sub-topology 1 → tasks 1_x).
Data structures
ProcessorNode and ProcessorTopology
A ProcessorNode<KIn,VIn,KOut,VOut> wraps a user Processor (or FixedKeyProcessor), a list of child nodes (children / childByName) and the set of state-store names it is connected to (stateStores) (ProcessorNode.java:50–57). process(Record) invokes the user processor and translates exceptions: a ClassCastException becomes a helpful Serde-mismatch error, and other exceptions are routed to the configured ProcessingExceptionHandler, which may drop the record, fail, or emit dead-letter-queue records (ProcessorNode.java:175–283). A ProcessorTopology (built by InternalTopologyBuilder.build(nodeGroup), :1019) is the per-sub-topology executable: ordered processors(), sources() keyed by topic, sinks(), the store factories, and the source-topic → SourceNode map a task uses to build its input queues.
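The forwarding and error-handling shape of a ProcessorNode can be sketched as follows. MiniNode, ErrorHandler and the DROP/FAIL decision are hypothetical simplifications, not the real ProcessorNode or ProcessingExceptionHandler API. A node applies its logic and forwards the result to every child. On an exception, it asks a handler whether to drop the record or fail.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical, simplified model of a processor DAG node (not Kafka's ProcessorNode API).
final class MiniNode {
    enum Decision { DROP, FAIL }

    interface ErrorHandler {
        Decision onError(String nodeName, Object record, RuntimeException error);
    }

    private final String name;
    private final Function<Object, Object> logic;   // user processing; returns the value to forward
    private final ErrorHandler handler;
    private final List<MiniNode> children = new ArrayList<>();

    MiniNode(String name, Function<Object, Object> logic, ErrorHandler handler) {
        this.name = name;
        this.logic = logic;
        this.handler = handler;
    }

    MiniNode addChild(MiniNode child) {
        children.add(child);
        return child;
    }

    void process(Object record) {
        Object out;
        try {
            out = logic.apply(record);
        } catch (RuntimeException e) {
            if (handler.onError(name, record, e) == Decision.FAIL) {
                throw new IllegalStateException("processing failed in node " + name, e);
            }
            return;                                   // DROP: skip the record, keep the task running
        }
        for (MiniNode child : children) {
            child.process(out);                       // forward down the DAG
        }
    }

    public static void main(String[] args) {
        List<Object> sunk = new ArrayList<>();
        ErrorHandler dropBadRecords = (node, rec, e) -> Decision.DROP;
        MiniNode source = new MiniNode("source", r -> r, dropBadRecords);
        MiniNode parse = source.addChild(new MiniNode("parse", r -> Integer.parseInt((String) r), dropBadRecords));
        parse.addChild(new MiniNode("sink", r -> { sunk.add(r); return r; }, dropBadRecords));
        for (String s : List.of("1", "oops", "3")) {
            source.process(s);
        }
        System.out.println(sunk); // [1, 3]
    }
}
```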
The task and its partition group
A StreamTask holds an AbstractPartitionGroup, which keeps one RecordQueue per input partition, each tagged with its SourceNode and a TimestampExtractor (StreamTask.java:238, RecordQueueCreator at :1446). The partition group also tracks per-partition timestamps and the task's stream-time. Key task fields (StreamTask.java:85–120):
| Field | Purpose |
|---|---|
| partitionGroup | Buffered records per partition + stream-time bookkeeping; chooses the next record by smallest timestamp across partitions. |
| recordCollector | RecordCollectorImpl that sends sink/changelog records through the thread's StreamsProducer. |
| consumedOffsets / committedOffsets / highWatermark | Per-partition progress used for commits, lag and purging. |
| streamTimePunctuationQueue / systemTimePunctuationQueue | Scheduled punctuators, by event-time and wall-clock. |
| maxBufferedSize | buffered.records.per.partition (default 1000); when a queue exceeds it the consumer is paused for that partition (StreamTask.java:1151). |
| commitNeeded / commitRequested / hasPendingTxCommit | Drive whether and how the next commit flushes and whether the task is processable. |
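The maxBufferedSize back-pressure can be modelled as a per-partition queue. Fetching pauses when the queue grows past the limit and resumes once processing brings it back down. BufferSketch and its methods are hypothetical. The paused set stands in for the consumer's pause() and resume() calls, and the exact resume point is an assumption of this sketch.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical model of per-partition buffering with pause/resume back-pressure.
final class BufferSketch {
    private final int maxBufferedSize;
    private final Map<String, Deque<Long>> queues = new HashMap<>();
    private final Set<String> paused = new TreeSet<>(); // stand-in for consumer.pause()/resume()

    BufferSketch(int maxBufferedSize) {
        this.maxBufferedSize = maxBufferedSize;
    }

    // Records returned by poll() for one partition are appended to that partition's queue.
    void addRecords(String partition, List<Long> offsets) {
        Deque<Long> queue = queues.computeIfAbsent(partition, p -> new ArrayDeque<>());
        queue.addAll(offsets);
        if (queue.size() > maxBufferedSize) {
            paused.add(partition);                     // stop fetching this partition
        }
    }

    // Processing one record may bring the queue back to the limit, so fetching resumes.
    Long processOne(String partition) {
        Deque<Long> queue = queues.get(partition);
        if (queue == null || queue.isEmpty()) {
            return null;
        }
        Long offset = queue.poll();
        if (queue.size() <= maxBufferedSize) {
            paused.remove(partition);
        }
        return offset;
    }

    Set<String> paused() {
        return paused;
    }

    public static void main(String[] args) {
        BufferSketch task = new BufferSketch(2);
        task.addRecords("orders-0", List.of(0L, 1L, 2L)); // 3 buffered > 2
        System.out.println(task.paused());                // [orders-0]
        task.processOne("orders-0");                       // 2 left
        System.out.println(task.paused());                // []
    }
}
```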
State stores, changelogs and the offset metadata
ProcessorStateManager owns a task's stores. For each registered store it keeps a StateStoreMetadata recording the StateStore, its changelogPartition (= changelogFor(store) with the task's partition), the current restored offset, the changelog endOffset, the restore callback, and a record converter (ProcessorStateManager.java:75, :374). changelogOffsets() returns offset + 1 per changelog partition (or 0 if unknown) and is what gets written to the on-disk checkpoint and to the assignor's lag computation (ProcessorStateManager.java:446).
Committed input offsets carry encoded metadata: StreamTask.findOffsetAndMetadata() attaches a TopicPartitionMetadata string holding the partition's stream-time and serialized ProcessorMetadata, so that after a restart the task can restore its stream-time and processor state from the committed offset (StreamTask.java:465, :1075).
Runtime architecture & threading model
One application, N stream threads, plus helpers
KafkaStreams creates num.stream.threads (default 1, StreamsConfig.java:1034) StreamThreads via StreamThread.create(), each with its own set of clients: a main consumer, a restore consumer (no group), a shared admin client, and, through ActiveTaskCreator, exactly one StreamsProducer per thread (StreamThread.java:399; ActiveTaskCreator.java:95). Each thread also creates a DefaultStateUpdater (its own restoration thread) and a ThreadCache sized to its share of the cache budget. If a GlobalKTable exists, a single GlobalStreamThread runs for the whole instance, fully bootstrapping the global topic before processing begins.
Figure: the layers of one KafkaStreams instance. The top two layers repeat num.stream.threads times: each StreamThread owns its clients, TaskManager, ThreadCache and its own state-updater thread. The bottom two layers (the optional GlobalStreamThread, present only if a GlobalKTable exists, and the shared services) exist exactly once per instance.
The StreamThread event loop
StreamThread.run() transitions to STARTING, calls taskManager.init() (which starts the state-updater thread) and enters runLoop() (StreamThread.java:907, :934). The thread keeps polling while it is alive or a rebalance is in progress. Each iteration is runOnceWithoutProcessingThreads() (the default; a separate runOnceWithProcessingThreads() path exists behind the internal processing.threads flag). The default iteration (StreamThread.java:1198):
- Poll (pollPhase(), :1445): call mainConsumer.poll() with a duration that depends on state (poll.ms = 100 ms when running, Duration.ZERO while PARTITIONS_REVOKED so the join can complete), then hand records to taskManager.addRecordsToTasks(), which routes each partition's records into the matching task's queue. If a queue exceeds maxBufferedSize, the consumer is paused for that partition (back-pressure).
- Check the state updater (checkStateUpdater(), :1420): drain tasks that finished restoring and transition them to RUNNING; once all are running and the thread was PARTITIONS_ASSIGNED, the thread becomes RUNNING.
- Process in a tight inner loop, up to numIterations records per task: taskManager.process(numIterations, time) → TaskExecutor.process() walks initialized active tasks and calls StreamTask.process() for each (TaskExecutor.java:67).
- Punctuate: taskManager.punctuate() fires due stream-time and wall-clock punctuators.
- Commit if commit.interval.ms has elapsed (maybeCommit(), :1817); every repartition.purge.interval.ms, purge consumed repartition records via deleteRecords().
The inner loop uses an adaptive numIterations: it grows when there is steady work and is halved whenever a punctuate/commit fired or more than half of max.poll.interval.ms has elapsed since the last poll(). This bounds how long the thread can go without returning to poll() and so keeps it from being kicked from the group (StreamThread.java:1298–1308).
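The adaptation rule can be sketched as a small hypothetical model. IterationSketch and adapt are invented names, and the branch that returns to poll() when nothing was processed is an assumption of this sketch.

```java
// Hypothetical model of the adaptive inner-loop batch size (not StreamThread code).
final class IterationSketch {
    // Outcome of one inner-loop round: the new batch size and whether to return to poll().
    record Step(int numIterations, boolean backToPoll) {}

    static Step adapt(int numIterations, int processed, boolean punctuatedOrCommitted,
                      long msSinceLastPoll, long maxPollIntervalMs) {
        int halved = numIterations > 1 ? numIterations / 2 : numIterations;
        if (processed == 0) {
            return new Step(numIterations, true);      // assumption: no work left, go back to poll()
        }
        if (msSinceLastPoll > maxPollIntervalMs / 2) {
            return new Step(halved, true);             // poll deadline is approaching
        }
        if (punctuatedOrCommitted) {
            return new Step(halved, false);            // expensive work happened: smaller batches
        }
        return new Step(numIterations + 1, false);     // steady work: grow the batch
    }

    public static void main(String[] args) {
        long maxPollIntervalMs = 300_000L;             // example value for max.poll.interval.ms
        System.out.println(adapt(8, 8, false, 1_000, maxPollIntervalMs));   // grows to 9
        System.out.println(adapt(8, 8, true, 1_000, maxPollIntervalMs));    // halves to 4
        System.out.println(adapt(8, 8, false, 200_000, maxPollIntervalMs)); // halves to 4, back to poll()
    }
}
```

In this model, halving keeps a thread with expensive punctuators away from its poll deadline. The +1 growth lets a quiet thread gradually take larger batches between polls.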
A StreamTask is only processable when every input partition's queue is non-empty (or idling has timed out via max.task.idle.ms), and it has no pending transaction commit. This is enforced by partitionGroup.readyToProcess() and the hasPendingTxCommit check in StreamTask.isProcessable() (StreamTask.java:716). Requiring data on all inputs is what makes joins deterministic with respect to timestamps.
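The processability rule can be sketched as below. ProcessableSketch is hypothetical and simplified: the real PartitionGroup.readyToProcess() distinguishes more cases than "all inputs buffered" versus "idle timeout expired".

```java
import java.util.Map;

// Hypothetical, simplified model of the processability rule (not StreamTask code).
final class ProcessableSketch {
    private final long maxTaskIdleMs;
    private long idleSinceMs = -1;                 // when an input first ran dry; -1 = not idling

    ProcessableSketch(long maxTaskIdleMs) {
        this.maxTaskIdleMs = maxTaskIdleMs;
    }

    boolean isProcessable(Map<String, Integer> bufferedByPartition, boolean hasPendingTxCommit, long nowMs) {
        if (hasPendingTxCommit) {
            return false;                          // wait for the in-flight transaction to finish
        }
        boolean allBuffered = bufferedByPartition.values().stream().allMatch(n -> n > 0);
        if (allBuffered) {
            idleSinceMs = -1;
            return true;                           // every input has data: timestamp order is well defined
        }
        if (idleSinceMs < 0) {
            idleSinceMs = nowMs;                   // start waiting for the empty input(s)
        }
        return nowMs - idleSinceMs >= maxTaskIdleMs; // give up waiting after max.task.idle.ms
    }

    public static void main(String[] args) {
        ProcessableSketch join = new ProcessableSketch(100);
        Map<String, Integer> buffered = Map.of("left-0", 5, "right-0", 0);
        System.out.println(join.isProcessable(buffered, false, 0));   // false: right side empty
        System.out.println(join.isProcessable(buffered, false, 150)); // true: idled 150 ms >= 100 ms
    }
}
```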
Record selection and time
Within a processable task, partitionGroup.nextRecord() picks the record with the smallest timestamp across all partition queues, advancing the task's stream-time monotonically. StreamTask.doProcess() builds a ProcessorRecordContext (timestamp, offset, partition, topic, headers, raw key/value) and calls process() on the partition's SourceNode, which forwards down the DAG (StreamTask.java:773, :868). Three notions of time coexist: event-time (extracted from the record by a TimestampExtractor; default FailOnInvalidTimestamp), ingestion-time (broker append time, if the source topic uses LogAppendTime), and processing-time (wall clock). Windowed operators use stream-time to close windows. A window keeps admitting out-of-order records until stream-time reaches its end plus the grace period; after that the window is closed, and late records that fall into it are dropped.
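The following hypothetical model shows smallest-timestamp-first selection, monotone stream-time and the window-close test. StreamTimeSketch and its methods are invented names. The sketch assumes a window closes once stream-time reaches its end plus the grace period.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical model of smallest-timestamp-first selection, stream-time, and window closing.
final class StreamTimeSketch {
    record Rec(String partition, long timestamp) {}

    private final Map<String, Deque<Rec>> queues = new LinkedHashMap<>();
    private long streamTime = Long.MIN_VALUE;      // nothing processed yet

    void add(Rec rec) {
        queues.computeIfAbsent(rec.partition(), p -> new ArrayDeque<>()).add(rec);
    }

    // Take the head record with the smallest timestamp across all non-empty queues.
    Rec nextRecord() {
        Deque<Rec> best = null;
        for (Deque<Rec> queue : queues.values()) {
            if (!queue.isEmpty() && (best == null || queue.peek().timestamp() < best.peek().timestamp())) {
                best = queue;
            }
        }
        if (best == null) {
            return null;
        }
        Rec rec = best.poll();
        streamTime = Math.max(streamTime, rec.timestamp()); // never moves backwards
        return rec;
    }

    long streamTime() {
        return streamTime;
    }

    // A window ending at windowEnd stops accepting records once stream-time reaches end + grace.
    static boolean windowClosed(long windowEnd, long graceMs, long streamTime) {
        return streamTime >= windowEnd + graceMs;
    }

    public static void main(String[] args) {
        StreamTimeSketch task = new StreamTimeSketch();
        task.add(new Rec("left-0", 5));
        task.add(new Rec("left-0", 20));
        task.add(new Rec("right-0", 10));
        Rec r;
        while ((r = task.nextRecord()) != null) {
            System.out.println(r.partition() + "@" + r.timestamp() + " streamTime=" + task.streamTime());
        }
        // A [0, 10) window with 5 ms grace is closed once stream-time reaches 15.
        System.out.println(windowClosed(10, 5, task.streamTime())); // true
    }
}
```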
Punctuation
StreamTask.schedule() registers a Punctuator on either the STREAM_TIME or WALL_CLOCK_TIME queue (StreamTask.java:1167). Stream-time punctuators are data-driven: they fire only when stream-time (derived from records) crosses the schedule. Wall-clock punctuators fire on the system clock regardless of input. Either firing sets commitNeeded = true (:1247, :1272).
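One punctuation schedule can be modelled as a next-fire time on a fixed grid. In this hypothetical sketch (PunctuationSketch is an invented name), a large jump in time fires the punctuator once and skips the missed intervals rather than firing once per interval. That catch-up policy is an assumption of the sketch, not something stated above.

```java
// Hypothetical model of one punctuation schedule. The skip-missed-intervals rule is an
// assumption of this sketch: a big time jump fires the punctuator once, not once per interval.
final class PunctuationSketch {
    private final long intervalMs;
    private long nextTime;
    private int fired = 0;

    PunctuationSketch(long startTime, long intervalMs) {
        this.nextTime = startTime;
        this.intervalMs = intervalMs;
    }

    // 'now' is the task's stream-time for STREAM_TIME, or the system clock for WALL_CLOCK_TIME.
    boolean mayPunctuate(long now) {
        if (now < nextTime) {
            return false;
        }
        fired++;                                          // the Punctuator would run here
        long missed = (now - nextTime) / intervalMs;      // whole intervals jumped over
        nextTime += (missed + 1) * intervalMs;            // next grid point strictly after 'now'
        return true;
    }

    int fired() {
        return fired;
    }

    long nextTime() {
        return nextTime;
    }

    public static void main(String[] args) {
        PunctuationSketch everyTen = new PunctuationSketch(0, 10);
        for (long streamTime : new long[] {5, 8, 35, 39, 40}) {
            boolean ran = everyTen.mayPunctuate(streamTime);
            System.out.println("streamTime=" + streamTime + " fired=" + ran + " next=" + everyTen.nextTime());
        }
    }
}
```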
Task lifecycle & the TaskManager
Assignment handling
When the consumer delivers a new assignment, the rebalance listener calls TaskManager.handleAssignment(activeTasks, standbyTasks) (TaskManager.java:356). The manager reconciles desired vs. current tasks:
- Tasks already owned with the same role → keep, only update input partitions / resume.
- Tasks that flipped active⇄standby → recycle: reuse the open state store and changelog position, swapping only the task wrapper (recycleTaskFromStateUpdater).
- Tasks no longer owned → close clean (commit + checkpoint + release the state-dir lock).
- Genuinely new tasks → created via ActiveTaskCreator/StandbyTaskCreator and added to tasks.addPendingTasksToInit() (TaskManager.java:457).
New and recycled tasks start in CREATED; the state-updater thread initializes their stores and restores them before they are handed back to the thread as RUNNING. On revocation (handleRevocation, :1027) the manager commits the revoked active tasks (or, under EOS-v2, all tasks, since one transaction spans them) before suspending them, so no data is lost across the ownership change.
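The reconciliation amounts to comparing two maps from task id to role. ReconcileSketch is hypothetical and ignores the commit and state-updater mechanics. It only shows which of the four outcomes each task gets.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model of reconciling a new assignment with the tasks a thread already owns.
final class ReconcileSketch {
    enum Role { ACTIVE, STANDBY }
    enum Action { KEEP, RECYCLE, CLOSE, CREATE }

    static Map<String, Action> reconcile(Map<String, Role> owned, Map<String, Role> assigned) {
        Map<String, Action> plan = new TreeMap<>();           // sorted by task id for readability
        for (Map.Entry<String, Role> task : owned.entrySet()) {
            Role wanted = assigned.get(task.getKey());
            if (wanted == null) {
                plan.put(task.getKey(), Action.CLOSE);         // commit, checkpoint, release the lock
            } else if (wanted == task.getValue()) {
                plan.put(task.getKey(), Action.KEEP);          // same role: update partitions / resume
            } else {
                plan.put(task.getKey(), Action.RECYCLE);       // active <-> standby: reuse open stores
            }
        }
        for (String taskId : assigned.keySet()) {
            plan.putIfAbsent(taskId, Action.CREATE);           // genuinely new: create, then initialize
        }
        return plan;
    }

    public static void main(String[] args) {
        Map<String, Role> owned = Map.of("0_0", Role.ACTIVE, "0_1", Role.STANDBY, "1_0", Role.ACTIVE);
        Map<String, Role> assigned = Map.of("0_0", Role.ACTIVE, "0_1", Role.ACTIVE, "1_1", Role.STANDBY);
        System.out.println(reconcile(owned, assigned)); // {0_0=KEEP, 0_1=RECYCLE, 1_0=CLOSE, 1_1=CREATE}
    }
}
```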
Active task state machine
Figure: active-task state machine, driven by StreamTask.initializeIfNeeded / completeRestoration / suspend / resume / close. Main spine: CREATED → RESTORING → RUNNING → SUSPENDED → CLOSED. Off-spine edges:
- RESTORING → SUSPENDED: suspend mid-restore
- SUSPENDED → RESTORING: resume (re-restore, then run)
- CREATED → SUSPENDED: suspend before init
- CREATED → CLOSED: recycle / close before init
A StandbyTask has no separate restoring phase. Initialization moves it CREATED → RESTORING → RUNNING immediately, and it then stays RUNNING while it keeps replaying its changelog indefinitely (StandbyTask.java:112).
State restoration: the changelog reader & state updater
Dedicated restoration thread
Restoration does not happen on the StreamThread. Each thread owns a DefaultStateUpdater whose inner StateUpdaterThread runs an independent loop (DefaultStateUpdater.java:82, :193): drain add/remove actions from a queue, pause/resume tasks for paused topologies, call changelogReader.restore(updatingTasks), checkpoint, and idle if all changelogs are read. Restored active tasks are placed on a restoredActiveTasks queue that the StreamThread drains in checkStateUpdater(). This decoupling lets the main loop keep processing already-running tasks while other tasks are still rebuilding state.
Moving restoration and standby updating onto a separate StateUpdaterThread (the DefaultStateUpdater) means a long state rebuild on a newly-assigned task no longer stalls processing of tasks that are already caught up. It also cleanly separates the restore consumer's poll loop from the main consumer's. The restore consumer is deliberately not in a consumer group: it seeks explicitly by changelog offset and is paused/resumed per partition (StoreChangelogReader.pauseResumePartitions, :518).
The restore algorithm
StoreChangelogReader tracks each changelog partition as a ChangelogMetadata with a ChangelogState of REGISTERED → RESTORING → COMPLETED (StoreChangelogReader.java:75, :111). A single restore consumer multiplexes all of a thread's changelogs. The reader itself has a top-level ChangelogReaderState: ACTIVE_RESTORING (restoring active-task changelogs to their end offsets) vs STANDBY_UPDATING (continuously tailing standby changelogs). Per restore() call (StoreChangelogReader.java:439):
- Initialize any newly REGISTERED changelogs: look up their end offsets, seek the restore consumer, transition to RESTORING.
- restoreConsumer.poll() a batch; buffer records per partition. An InvalidOffsetException (changelog truncated/compacted past the needed offset) is turned into a TaskCorruptedException so the task is rebuilt from scratch (:499).
- For each restoring changelog call restoreChangelog(): hand the buffered batch to stateManager.restore(), advance the store's offset, fire onBatchRestored (active) or onBatchLoaded (standby) listeners (:645).
- An active changelog whose store offset has reached its end offset transitions to COMPLETED, is paused on the restore consumer, and triggers onRestoreEnd (:692).
stateManager.restore() converts records through the store's record converter and replays them via the RecordBatchingStateRestoreCallback, then records the batch end offset and persists task offsets to the StateDirectory (ProcessorStateManager.java:494). When all active changelogs for a thread complete, the reader transitions to STANDBY_UPDATING and thereafter only tails standby changelogs, updating their limit offsets from the broker so a standby never reads past the committed high-watermark of its source.
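A single active changelog's progress through REGISTERED → RESTORING → COMPLETED can be modelled as below. ChangelogSketch is hypothetical: the store is reduced to a counter, and compaction shows up as gaps in the offsets. Completion is judged from the last replayed offset, a simplification of the real end-offset check.

```java
import java.util.List;

// Hypothetical model of one active changelog partition during restoration.
final class ChangelogSketch {
    enum ChangelogState { REGISTERED, RESTORING, COMPLETED }

    private ChangelogState state = ChangelogState.REGISTERED;
    private long endOffset = -1;      // log-end offset captured at initialization
    private long restoredOffset = -1; // offset of the last record replayed into the store
    private int storeSize = 0;        // stand-in for the local store contents

    // Look up the end offset and (conceptually) seek the restore consumer.
    void initialize(long logEndOffset) {
        endOffset = logEndOffset;
        state = logEndOffset == 0 ? ChangelogState.COMPLETED : ChangelogState.RESTORING;
    }

    // Replay one polled batch; offsets may have gaps because the changelog is compacted.
    void restoreBatch(List<Long> offsets) {
        if (state != ChangelogState.RESTORING) {
            return;
        }
        for (long offset : offsets) {
            if (offset >= endOffset) {
                break;                // an active changelog is restored only up to its end offset
            }
            storeSize++;              // write the record into the store
            restoredOffset = offset;
        }
        if (restoredOffset + 1 >= endOffset) {
            state = ChangelogState.COMPLETED; // pause the partition and fire onRestoreEnd
        }
    }

    ChangelogState state() {
        return state;
    }

    int storeSize() {
        return storeSize;
    }

    public static void main(String[] args) {
        ChangelogSketch changelog = new ChangelogSketch();
        changelog.initialize(6);                    // records at offsets 0..5, some compacted away
        changelog.restoreBatch(List.of(0L, 2L));
        System.out.println(changelog.state());      // RESTORING
        changelog.restoreBatch(List.of(3L, 5L));
        System.out.println(changelog.state() + " with " + changelog.storeSize() + " records"); // COMPLETED with 4 records
    }
}
```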
Partition assignment
Kafka Streams ships a custom consumer assignor, StreamsPartitionAssignor, plugged into the classic consumer group protocol. It runs on the group leader and is responsible for (a) verifying/creating internal topics with correct partition counts, (b) grouping partitions into tasks honoring co-partitioning, (c) placing tasks on instances (the task assignor), and (d) distributing each instance's tasks across its threads.
Subscription & assignment payloads
Each member encodes a SubscriptionInfo in its subscription user-data: its processId (a per-instance UUID), per-task offset sums (a proxy for how caught-up its local state is), its end-point for IQ routing, client tags, an error code, and a monotonically-incrementing uniqueField that forces the bytes to differ on every rejoin (StreamsPartitionAssignor.java:281). The leader returns an AssignmentInfo per member with the active task list, standby task map, host→partition routing tables, and an optional next rebalance time used to schedule follow-up rebalances (StreamsPartitionAssignor.java:1151).
Co-partitioning
Sources that are joined must be co-partitioned: they must have the same partition count so that key k lands in the same task on both sides. copartitionSources() records co-partition groups (InternalTopologyBuilder.java:771). The assignor validates that all topics in a group share a partition count and uses that count when creating repartition topics, throwing if external topics disagree.
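The co-partition check for one group can be sketched as follows; CoPartitionSketch and requiredPartitions are invented names. Repartition topics do not exist yet, so only the counts of existing topics are compared. A group with no existing topic is simply rejected, a case this sketch does not model.

```java
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of checking one co-partition group (not the assignor's code).
final class CoPartitionSketch {
    // partitionCounts holds counts for existing (external) topics only;
    // repartition topics in the group are not created yet and are therefore absent.
    static int requiredPartitions(Set<String> group, Map<String, Integer> partitionCounts) {
        Integer required = null;
        for (String topic : group) {
            Integer count = partitionCounts.get(topic);
            if (count == null) {
                continue;                                  // repartition topic: will be created
            }
            if (required != null && !required.equals(count)) {
                throw new IllegalStateException("topics " + group + " are not co-partitioned");
            }
            required = count;
        }
        if (required == null) {
            throw new IllegalArgumentException("this sketch needs at least one existing topic in " + group);
        }
        return required;                                   // count to use for the group's repartition topics
    }

    public static void main(String[] args) {
        Map<String, Integer> counts = Map.of("orders", 6, "customers", 4);
        System.out.println(requiredPartitions(Set.of("orders", "customers-by-id-repartition"), counts)); // 6
        try {
            requiredPartitions(Set.of("orders", "customers"), counts);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());            // 6 vs 4 partitions: join rejected
        }
    }
}
```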
Task placement: the High-Availability assignor (KIP-441)
The default task assignor is the High-Availability assignor (selected when task.assignor.class is null, StreamsConfig.java:1055). Its goal is to return to processing quickly without shipping cold state synchronously. Each instance reports, via offset sums, its lag on every stateful task. The assignor:
- Assigns an active task to an instance that is already caught up, i.e. whose lag is within acceptable.recovery.lag (default 10000 offsets, StreamsConfig.java:944). Sometimes the intended destination is not caught up while another instance is. In that case the assignor temporarily leaves the active task on a caught-up instance, where it can run right away, and assigns the task as a warmup standby on the intended destination.
- Caps concurrent warmups at max.warmup.replicas (default 2, StreamsConfig.java:1028) to throttle restore traffic.
- Schedules a probing rebalance every probing.rebalance.interval.ms (default 10 minutes, StreamsConfig.java:1242) by encoding a next rebalance time. When the warmup catches up, the next probing rebalance moves the active task to the now-warm instance. A single member per client is chosen to trigger it (StreamsPartitionAssignor.java:834, :1168); the StreamThread fires it via mainConsumer.enforceRebalance() when nextProbingRebalanceMs elapses (StreamThread.java:963).
Scaling out is eventually balanced, immediately available: a new instance first receives standby (warmup) copies and only takes over active tasks once its state is within acceptable.recovery.lag. This trades transient imbalance for never blocking processing on a cold restore.
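The per-task decision behind the warmup scheme can be sketched as below. PlacementSketch is hypothetical, not HighAvailabilityTaskAssignor code. Its destination parameter is the instance a balanced assignment would pick, and lagByClient holds the lags derived from the reported offset sums. Among several caught-up instances, the sketch simply prefers the current owner, else the first by name.

```java
import java.util.Map;

// Hypothetical sketch of the warmup decision for one stateful task (not HighAvailabilityTaskAssignor code).
final class PlacementSketch {
    record Placement(String active, String warmup) {} // warmup == null means no warmup replica

    static Placement place(String destination, String currentOwner, Map<String, Long> lagByClient,
                           long acceptableRecoveryLag, int warmupsInFlight, int maxWarmupReplicas) {
        long destinationLag = lagByClient.getOrDefault(destination, Long.MAX_VALUE);
        if (destinationLag <= acceptableRecoveryLag) {
            return new Placement(destination, null);          // destination is already warm: move now
        }
        boolean anyCaughtUp = lagByClient.values().stream().anyMatch(lag -> lag <= acceptableRecoveryLag);
        if (!anyCaughtUp) {
            return new Placement(destination, null);          // nobody is warm: no better place to run it
        }
        // Keep the task active on a caught-up client (preferably its current owner) meanwhile.
        String keeper = lagByClient.getOrDefault(currentOwner, Long.MAX_VALUE) <= acceptableRecoveryLag
            ? currentOwner
            : lagByClient.entrySet().stream()
                .filter(e -> e.getValue() <= acceptableRecoveryLag)
                .map(Map.Entry::getKey)
                .sorted()
                .findFirst()
                .orElseThrow();
        String warmup = warmupsInFlight < maxWarmupReplicas ? destination : null; // throttle restore traffic
        return new Placement(keeper, warmup);
    }

    public static void main(String[] args) {
        long acceptable = 10_000L; // acceptable.recovery.lag default
        int maxWarmups = 2;        // max.warmup.replicas default
        // New instance "b" joins with cold state; "a" currently owns the task.
        System.out.println(place("b", "a", Map.of("a", 0L, "b", 2_000_000L), acceptable, 0, maxWarmups));
        // Later, once b's warmup has caught up, a probing rebalance moves the task.
        System.out.println(place("b", "a", Map.of("a", 0L, "b", 500L), acceptable, 0, maxWarmups));
    }
}
```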
Sticky distribution across threads
Within an instance, assignTasksToThreads() distributes the client's tasks over its consumer threads (StreamsPartitionAssignor.java:1293). It targets floor(totalTasks / consumers) per thread, first giving each thread back its previously-owned stateful tasks (ordered by lag via prevTasksByLag) up to that target, then interleaving the remainder. This keeps stateful tasks on the same thread across rebalances so their open RocksDB stores can be reused. Tasks revoked to satisfy a move are encoded with an immediate follow-up rebalance request so the new owner can pick them up.
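The sticky distribution can be sketched as follows; ThreadAssignSketch is an invented name. The previous-task lists are assumed to be pre-sorted, standing in for prevTasksByLag. Leftover tasks go to the least-loaded thread, a simplification of the interleaving step.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of sticky task-to-thread distribution (not assignTasksToThreads() itself).
final class ThreadAssignSketch {
    static Map<String, List<String>> assign(List<String> tasks, List<String> threads,
                                            Map<String, List<String>> previousByThread) {
        int target = tasks.size() / threads.size();                 // floor(totalTasks / consumers)
        Set<String> unassigned = new LinkedHashSet<>(tasks);
        Map<String, List<String>> result = new LinkedHashMap<>();
        for (String thread : threads) {
            List<String> mine = new ArrayList<>();
            // previousByThread lists are assumed pre-sorted, standing in for prevTasksByLag
            for (String task : previousByThread.getOrDefault(thread, List.of())) {
                if (mine.size() < target && unassigned.remove(task)) {
                    mine.add(task);                                  // sticky: reuse the local store
                }
            }
            result.put(thread, mine);
        }
        for (String task : unassigned) {                             // spread what is left
            String leastLoaded = threads.stream()
                .min(Comparator.comparingInt(t -> result.get(t).size()))
                .orElseThrow();
            result.get(leastLoaded).add(task);
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> tasks = List.of("0_0", "0_1", "0_2", "0_3", "0_4");
        Map<String, List<String>> previous = Map.of(
            "thread-1", List.of("0_0", "0_1", "0_2"),
            "thread-2", List.of("0_3"));
        System.out.println(assign(tasks, List.of("thread-1", "thread-2"), previous));
        // {thread-1=[0_0, 0_1, 0_4], thread-2=[0_3, 0_2]}
    }
}
```

In this example, task 0_2 moves from thread-1 to thread-2 because thread-1 already holds its target of two tasks. This is the kind of move the text above says is paired with a follow-up rebalance.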
Version probing & follow-up rebalances
The subscription carries the protocol version. If the leader sees a future version it cannot fully decode, it falls back and schedules another rebalance. This is version probing, and it makes rolling upgrades safe (StreamsPartitionAssignor.java:607, :1416). The StreamThread bridges assignor↔thread via shared state seeded into a ReferenceContainer: the atomics assignmentErrorCode (e.g. SHUTDOWN_REQUESTED, INCOMPLETE_SOURCE_TOPIC_METADATA) and nextScheduledRebalanceMs, plus a queue of non-fatal exceptions (StreamThread.java:364–369, :506).
The new Streams group protocol (KIP-1071)
Kafka 4.x introduces a broker-driven protocol for Streams, analogous to KIP-848 for consumers. When group.protocol=streams, the StreamThread builds an AsyncKafkaConsumer with a StreamsRebalanceData describing the sub-topologies. Assignment, topic creation and warmup are then driven by the broker's group coordinator through the StreamsGroupHeartbeat RPC rather than by the client-side StreamsPartitionAssignor (StreamThread.java:556, :686; broker side: group-coordinator/.../streams/StreamsGroup.java). The client surfaces coordinator status codes (SHUTDOWN_APPLICATION, MISSING_SOURCE_TOPICS, INCORRECTLY_PARTITIONED_TOPICS) in handleStreamsRebalanceData(). For example, it waits up to two heartbeat intervals for missing topics before failing (StreamThread.java:1531, :1573). The broker provides two assignors mirroring the classic ones: a highly-available assignor (KIP-441 semantics) and a sticky assignor. In this mode the client cannot trigger probing/enforced rebalances itself; the code explicitly skips enforceRebalance() when streamsRebalanceData is present (StreamThread.java:981, :1091).
The Streams group protocol was a preview in 4.1 and reached General Availability in 4.2. Its core feature set is now stable and production-ready, gated by streams.version (SV_1, bootstrap IBP_4_2_IV1, with LATEST_PRODUCTION = SV_1; StreamsVersion.java:27, :32). The default client group.protocol is nonetheless still classic (DEFAULT_GROUP_PROTOCOL = CLASSIC, StreamsConfig.java:606). The classic StreamsPartitionAssignor path therefore remains the out-of-the-box default, and it is what most of this chapter describes for assignment.
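Opting an application into the Streams group protocol is a client configuration change. The sketch uses plain string keys so it compiles without the kafka-streams jar. The application id and broker address are placeholders, and the brokers must also have the feature enabled (streams.version). In a real application the Properties would be passed to new KafkaStreams(topology, props).

```java
import java.util.Properties;

// Configuration sketch: opting into the KIP-1071 Streams group protocol.
// Plain string keys keep it compilable without the kafka-streams jar; values are placeholders.
final class StreamsProtocolConfigSketch {
    static Properties config() {
        Properties props = new Properties();
        props.put("application.id", "orders-enricher");  // placeholder application id
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker address
        props.put("group.protocol", "streams");          // the default is "classic"
        return props;
    }

    public static void main(String[] args) {
        Properties props = config();
        // In a real application: new KafkaStreams(topology, props).start();
        System.out.println(props.getProperty("group.protocol"));
    }
}
```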
State stores & the record cache
Store types and layering
Three logical store abstractions exist: key/value, window and session stores (plus versioned key/value stores). The default physical engine is RocksDB (default.dsl.store = rocksDB, StreamsConfig.java:541); an in-memory option exists. Stores are composed as a stack of decorators, each adding one concern:
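Figure: the decorator stack of a key/value store, from top to bottom:
- the typed <K,V> layer (serdes);
- the caching layer (ThreadCache), which deduplicates and batches puts and flushes on eviction or commit;
- the change-logging layer (writes through the RecordCollector);
- the physical store.

A put flows down through serde → cache → changelog → physical store. Window and session stores follow the same pattern, with segmented RocksDB stores underneath.
RocksDBStore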
RocksDBStore implements KeyValueStore<Bytes,byte[]> over an embedded RocksDB instance, one directory per store under the task directory (rocksdb/<name>, RocksDBStore.java:111, :248). It opens with a configurable BlockBasedTableConfig (block cache), write options, and column families; put() and range scans are synchronized, and putAll() batches into a single WriteBatch for throughput (RocksDBStore.java:485, :508). Restoration plugs in a RecordBatchingStateRestoreCallback (this::restoreBatch) so the changelog reader writes directly into RocksDB in bulk (RocksDBStore.java:183).
The record cache (ThreadCache)
ThreadCache is a single byte-bounded LRU cache per thread, shared across all of that thread's caching stores via namespaces (ThreadCache.java:39). Its capacity is statestore.cache.max.bytes (default 10 MiB, StreamsConfig.java:956) split across threads; KafkaStreams can dynamically resize() it, signalled into the thread via the cacheResizeSize atomic (StreamThread.java:1175). The cache is write-back: dirty entries accumulate and are flushed downstream (to the changelog and to forwarding) on eviction (when total bytes exceed the cap, ThreadCache.java:294) or on commit. This both reduces changelog traffic and changes emission semantics: with caching, only the latest value per key in a flush interval is emitted, so intermediate results are suppressed.
The record cache controls update rate and ordering, not just memory. Setting statestore.cache.max.bytes=0 makes every update flush immediately, so downstream operators and changelogs see every intermediate result, useful for low latency or tests, but it can dramatically increase output and changelog volume. The legacy cache.max.bytes.buffering is deprecated in favour of statestore.cache.max.bytes (StreamsConfig.java:466).
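The write-back and deduplication behaviour can be demonstrated with an access-ordered LinkedHashMap. WriteBackCacheSketch is hypothetical and is not ThreadCache. It uses string lengths as a crude byte count and models a single namespace only. With a generous budget, three updates to one key collapse into a single forwarded value at flush time. A zero budget forwards every update, mirroring statestore.cache.max.bytes=0.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of a byte-bounded, write-back LRU cache (not ThreadCache itself).
final class WriteBackCacheSketch {
    private final long maxBytes;
    private long currentBytes = 0;
    // accessOrder = true makes iteration order least-recently-used first
    private final LinkedHashMap<String, String> dirty = new LinkedHashMap<>(16, 0.75f, true);
    private final List<String> forwarded = new ArrayList<>(); // what downstream and the changelog see

    WriteBackCacheSketch(long maxBytes) {
        this.maxBytes = maxBytes;
    }

    private static long sizeOf(String key, String value) {
        return key.length() + value.length();                 // crude stand-in for a byte count
    }

    void put(String key, String value) {
        String old = dirty.put(key, value);                   // overwrite: only the latest value survives
        currentBytes += sizeOf(key, value) - (old == null ? 0 : sizeOf(key, old));
        while (currentBytes > maxBytes && !dirty.isEmpty()) {
            Map.Entry<String, String> eldest = dirty.entrySet().iterator().next();
            forwarded.add(eldest.getKey() + "=" + eldest.getValue()); // evict the LRU entry downstream
            currentBytes -= sizeOf(eldest.getKey(), eldest.getValue());
            dirty.remove(eldest.getKey());
        }
    }

    void flush() {                                            // what a commit does
        dirty.forEach((k, v) -> forwarded.add(k + "=" + v));
        dirty.clear();
        currentBytes = 0;
    }

    List<String> forwarded() {
        return forwarded;
    }

    public static void main(String[] args) {
        WriteBackCacheSketch cached = new WriteBackCacheSketch(1_000);
        WriteBackCacheSketch uncached = new WriteBackCacheSketch(0); // like statestore.cache.max.bytes=0
        for (String count : List.of("1", "2", "3")) {
            cached.put("clicks:alice", count);
            uncached.put("clicks:alice", count);
        }
        cached.flush();
        System.out.println(cached.forwarded());   // [clicks:alice=3]
        System.out.println(uncached.forwarded()); // [clicks:alice=1, clicks:alice=2, clicks:alice=3]
    }
}
```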
Processing guarantees
At-least-once vs exactly-once-v2
processing.guarantee is at_least_once (default) or exactly_once_v2 (StreamsConfig.java:410, :418). Under EOS-v2, choosing the guarantee also lowers the default commit.interval.ms from 30000 ms to 100 ms (EOS_DEFAULT_COMMIT_INTERVAL_MS, StreamsConfig.java:169, applied in StreamsConfig at :1695) to keep end-to-end latency reasonable despite transactional commits.
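The default-selection rule can be expressed as a small helper. EosConfigSketch is hypothetical, not StreamsConfig code. It assumes an explicitly configured commit.interval.ms is kept as-is, since only the default changes.

```java
import java.util.Properties;

// Hypothetical helper mirroring the default-selection rule; not StreamsConfig code.
final class EosConfigSketch {
    static long effectiveCommitIntervalMs(Properties props) {
        String explicit = props.getProperty("commit.interval.ms");
        if (explicit != null) {
            return Long.parseLong(explicit);                  // assumption: an explicit setting always wins
        }
        String guarantee = props.getProperty("processing.guarantee", "at_least_once");
        return "exactly_once_v2".equals(guarantee) ? 100L : 30_000L;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("processing.guarantee", "exactly_once_v2");
        System.out.println(effectiveCommitIntervalMs(props)); // 100
        props.put("commit.interval.ms", "1000");
        System.out.println(effectiveCommitIntervalMs(props)); // 1000
    }
}
```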
How EOS-v2 commits atomically
The thread's single StreamsProducer is transactional when EOS is on; its transactional.id is <application.id>-<processId>-<threadIdx> (ActiveTaskCreator.java:108). On commit, TaskExecutor.commitOffsetsOrTransaction() gathers the consumed input offsets across all EOS tasks and issues a single sendOffsetsToTransaction(offsets, consumerGroupMetadata) + commitTransaction() (TaskExecutor.java:173, :178; StreamsProducer.commitTransaction at :242). Because output records, changelog records and input-offset commits all live in one transaction, a consumer reading downstream with read_committed never sees the effects of a partially-processed batch.
Under EOS-v2 the unit of atomicity is the thread, not the task: one transaction per StreamThread spans every active task it owns. That is why handleRevocation commits all tasks before any are revoked (TaskManager.java:1027). It is also why a ProducerFencedException / InvalidProducerEpochException on commit is treated as a TaskMigratedException: the thread has lost its place and must drop and re-acquire all tasks.
Fencing
If two instances ever both believe they own a task (e.g. a missed rebalance), the transactional producer fences the zombie: any transactional call throwing ProducerFenced/InvalidProducerEpoch is converted to TaskMigratedException (StreamsProducer.java:185–:268), and StreamThread.handleTaskMigrated() closes all tasks dirty and rejoins the group (StreamThread.java:1139). This is the mechanism that prevents duplicate writes during ownership ambiguity.
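The epoch check behind ProducerFencedException can be modelled with a map from transactional.id to its latest epoch. FencingSketch is hypothetical and heavily simplified; for example, it does not model transaction state at all. The transactional.id value is a made-up example of the <application.id>-<processId>-<threadIdx> shape.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of epoch-based zombie fencing; not the real transaction coordinator.
final class FencingSketch {
    static final class FencedException extends RuntimeException {
        FencedException(String message) {
            super(message);
        }
    }

    // Plays the broker-side role: remembers the latest epoch per transactional.id.
    static final class Coordinator {
        private final Map<String, Integer> epochs = new HashMap<>();

        int initTransactions(String transactionalId) {
            return epochs.merge(transactionalId, 1, Integer::sum); // a new incarnation bumps the epoch
        }

        void commit(String transactionalId, int epoch) {
            if (epoch < epochs.getOrDefault(transactionalId, 0)) {
                throw new FencedException(transactionalId + " epoch " + epoch + " is stale");
            }
        }
    }

    // Thread-side handling: a fenced commit means the tasks now belong to someone else.
    static String commitOrMigrate(Coordinator coordinator, String transactionalId, int epoch) {
        try {
            coordinator.commit(transactionalId, epoch);
            return "committed";
        } catch (FencedException e) {
            return "migrated"; // cf. TaskMigratedException: close all tasks dirty and rejoin
        }
    }

    public static void main(String[] args) {
        Coordinator coordinator = new Coordinator();
        String txId = "my-app-4f1c-1";                               // hypothetical transactional.id
        int zombieEpoch = coordinator.initTransactions(txId);       // 1
        int ownerEpoch = coordinator.initTransactions(txId);        // 2: fences epoch 1
        System.out.println(commitOrMigrate(coordinator, txId, zombieEpoch)); // migrated
        System.out.println(commitOrMigrate(coordinator, txId, ownerEpoch));  // committed
    }
}
```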
Interactive Queries (IQv2)
Local state is queryable. KafkaStreams.query(StateQueryRequest) dispatches a typed query (e.g. KeyQuery, RangeQuery, WindowKeyQuery) to the right store partition and returns a StateQueryResult (query/StateQueryRequest.java, query/StateQueryResult.java). To route a key to the instance that owns it, StreamsMetadataState (fed from the assignor's host→partition tables) answers keyQueryMetadataForKey(store, key, partitioner) (the public KafkaStreams.queryMetadataForKey delegates to it), returning the active host plus standby hosts; an application can then forward the query over its own RPC layer. Window/session stores additionally support time-range queries. IQ reads go through the MeteredXxxStore layer, so they observe the same Serdes as writes.
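Routing a key to its owner is a pure function of the serialized key and the partition count. The sketch below is a Python port of the murmur2 hash that Kafka's default partitioner applies (toPositive(murmur2(keyBytes)) % numPartitions). It adds a hypothetical `route` helper over made-up host tables. Several assumptions apply: the key is serialized as UTF-8 (as StringSerializer does), no custom StreamPartitioner is in play, and store partitions map one-to-one to input partitions. In a real application the host lookup comes from KafkaStreams.queryMetadataForKey(storeName, key, keySerializer), whose KeyQueryMetadata exposes activeHost() and standbyHosts().

```python
def murmur2(data: bytes) -> int:
    """Port of Kafka's Utils.murmur2 (seed 0x9747b28c); returns the unsigned 32-bit hash."""
    mask = 0xFFFFFFFF
    m, r = 0x5BD1E995, 24
    length = len(data)
    h = (0x9747B28C ^ length) & mask
    for i in range(0, length - length % 4, 4):
        # Little-endian 4-byte block, as in the Java implementation.
        k = data[i] | (data[i + 1] << 8) | (data[i + 2] << 16) | (data[i + 3] << 24)
        k = (k * m) & mask
        k ^= k >> r          # on an unsigned value, >> matches Java's >>>
        k = (k * m) & mask
        h = (h * m) & mask
        h ^= k
    tail, rem = length & ~3, length % 4
    if rem == 3:             # the Java switch falls through 3 -> 2 -> 1
        h ^= data[tail + 2] << 16
    if rem >= 2:
        h ^= data[tail + 1] << 8
    if rem >= 1:
        h ^= data[tail]
        h = (h * m) & mask
    h ^= h >> 13
    h = (h * m) & mask
    h ^= h >> 15
    return h


def partition_for_key(key_bytes: bytes, num_partitions: int) -> int:
    # toPositive() clears the sign bit before the modulo.
    return (murmur2(key_bytes) & 0x7FFFFFFF) % num_partitions


def route(key: str, num_partitions: int, active_hosts: dict, standby_hosts: dict):
    """Hypothetical router: partition plus the hosts holding that store partition."""
    p = partition_for_key(key.encode("utf-8"), num_partitions)
    return p, active_hosts[p], standby_hosts.get(p, [])


active = {0: "host-a:8080", 1: "host-b:8080", 2: "host-c:8080"}
standby = {0: ["host-b:8080"], 1: ["host-c:8080"], 2: ["host-a:8080"]}
partition, host, replicas = route("user-42", 3, active, standby)
print(partition, host, replicas)
```

If the active host is unreachable, the application can retry against one of the standby hosts, accepting that standby state may lag slightly behind.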
Failure modes, edge cases & recovery
| Situation | Detection | Recovery |
|---|---|---|
| Changelog truncated/compacted past needed offset | InvalidOffsetException in restore poll (StoreChangelogReader.java:499) | TaskCorruptedException → close task dirty, wipe local store, re-bootstrap from changelog start; under classic protocol may trigger a rebalance. |
| Zombie task after missed rebalance (EOS) | ProducerFenced/InvalidProducerEpoch on send/commit | Mapped to TaskMigratedException; handleTaskMigrated closes all tasks dirty and rejoins (StreamThread.java:1139). |
| Transient broker timeout during commit/process | TimeoutException | Non-EOS: retry, bounded by task.timeout.ms via maybeInitTaskTimeoutOrThrow. EOS: a process/commit timeout escalates to TaskCorruptedException (StreamTask.java:816, TaskExecutor.java:186). |
| One input partition starved in a join | Task never processable | Idle up to max.task.idle.ms (default 0), then proceed with available data; logged every 2 min via maybeLogNotReady (StreamTask.java:757). |
| Missing source topics (Streams protocol) | Coordinator status MISSING_SOURCE_TOPICS | Wait up to 2× heartbeat interval; then throw MissingSourceTopicException (StreamThread.java:1573). |
| Uncaught exception in a thread | run() catch-all | Invoke the StreamsUncaughtExceptionHandler (replace-thread / shutdown-client / shutdown-application); completeShutdown closes clients and marks thread DEAD (StreamThread.java:918, :1883). |
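The non-EOS retry row can be sketched as a per-task deadline. The first timeout starts a task.timeout.ms clock (default 300000 ms), further timeouts retry until the deadline passes, and any successful attempt clears the clock. This is a simplified model in the spirit of maybeInitTaskTimeoutOrThrow. The class and exception names are hypothetical, edge cases such as task.timeout.ms=0 are not modeled, and neither is the EOS escalation to TaskCorruptedException.

```python
class TaskTimeoutExceeded(Exception):
    """Stand-in for the StreamsException raised once the deadline has passed."""


class TaskTimeoutSketch:
    NO_DEADLINE = -1

    def __init__(self, task_timeout_ms: int = 300_000) -> None:  # task.timeout.ms default
        self.task_timeout_ms = task_timeout_ms
        self.deadline_ms = self.NO_DEADLINE

    def on_timeout(self, now_ms: int) -> str:
        """Called when a broker call times out; returns 'retry' or raises."""
        if self.deadline_ms == self.NO_DEADLINE:
            self.deadline_ms = now_ms + self.task_timeout_ms  # first timeout starts the clock
        elif now_ms > self.deadline_ms:
            raise TaskTimeoutExceeded(f"no progress for more than {self.task_timeout_ms} ms")
        return "retry"

    def on_progress(self) -> None:
        self.deadline_ms = self.NO_DEADLINE  # any success clears the timer


timer = TaskTimeoutSketch(task_timeout_ms=1_000)
print(timer.on_timeout(now_ms=0))      # retry (deadline becomes 1000)
print(timer.on_timeout(now_ms=900))    # retry
timer.on_progress()                    # a successful attempt resets the deadline
print(timer.on_timeout(now_ms=5_000))  # retry (new deadline 6000)
try:
    timer.on_timeout(now_ms=6_001)
except TaskTimeoutExceeded as err:
    print("give up:", err)
```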
Configuration reference
| Key | Default | Effect |
|---|---|---|
| application.id | (required) | Consumer group id and prefix for all internal topics; defines an application's identity. |
| num.stream.threads | 1 | Processing threads per instance (StreamsConfig.java:1034). |
| processing.guarantee | at_least_once | at_least_once or exactly_once_v2 (:410). |
| commit.interval.ms | 30000 (100 under EOS) | How often tasks commit/checkpoint (:481, :169). |
| statestore.cache.max.bytes | 10485760 | Record-cache budget per instance, split across threads (:956). |
| num.standby.replicas | 0 | Hot standby copies of each stateful task for fast failover (:920). |
| max.warmup.replicas | 2 | Concurrent warmup standbys the HA assignor may create (:1028). |
| acceptable.recovery.lag | 10000 | Max changelog lag for an instance to be deemed "caught up" and given the active task (:944). |
| probing.rebalance.interval.ms | 600000 | How often a probing rebalance checks whether warmups have caught up (:1242). |
| max.task.idle.ms | 0 | How long a task waits for empty input partitions before processing available data (:1023). |
| buffered.records.per.partition | 1000 | Per-partition buffer cap before back-pressuring the consumer (:1104). |
| poll.ms | 100 | Main-consumer max poll block time (:1237). |
| replication.factor | -1 | RF for internal topics; -1 uses the broker default (:1060). |
| task.assignor.class | null → HA assignor | Custom task placement (KIP-924); default is the HighAvailabilityTaskAssignor (:1055). |
| topology.optimization | none | all enables source-topic reuse and repartition merging (:266). |
| group.protocol | classic | streams opts into the broker-driven protocol (KIP-1071, :605). |
| default.dsl.store | rocksDB | Default physical store type for DSL operators (:532). |
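Because statestore.cache.max.bytes is an instance-wide budget, adding StreamThreads shrinks each thread's share. The sketch below assumes an even integer split across StreamThreads. The real computation may also reserve a share for a global-store thread, which this ignores, and the function name is hypothetical.

```python
DEFAULT_CACHE_MAX_BYTES = 10_485_760  # statestore.cache.max.bytes default (10 MiB)


def cache_bytes_per_thread(total_bytes: int, num_stream_threads: int) -> int:
    """Even integer split of the instance-wide cache budget (simplifying assumption)."""
    if num_stream_threads <= 0:
        return total_bytes  # nothing to split across
    return total_bytes // num_stream_threads


for threads in (1, 2, 4):
    print(threads, cache_bytes_per_thread(DEFAULT_CACHE_MAX_BYTES, threads))
# 1 10485760
# 2 5242880
# 4 2621440
```

When raising num.stream.threads, it can therefore make sense to raise statestore.cache.max.bytes too, so that per-thread coalescing behaves as before.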
Invariants & guarantees
- Per-key ordering. A key is always processed by exactly one task (its partition's task), so per-key processing order matches input order within a partition.
- Co-partitioning. Joined inputs must share partition counts (and the same key partitioning), so key k meets its counterpart in the same task.
- Stream-time monotonicity. A task's stream-time never decreases; windows close and stream-time punctuators fire deterministically from data, independent of wall clock.
- State ⇔ changelog equivalence. Every persistent store write is mirrored to its changelog (offset checkpointed), so a relocated task rebuilds an identical store by replaying the changelog.
- EOS atomicity per thread. Output records, changelog records and input-offset commits for all of a thread's active tasks commit in one transaction or not at all.
- Sticky, available scaling. Stateful tasks stay on the same instance/thread across rebalances when possible; active work is never blocked on a cold restore (warmup-then-move).
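Stream-time monotonicity and data-driven window closing can be sketched with a tumbling-window count. Stream-time is the maximum timestamp seen so far, and a window accepts records while its end is later than stream-time minus the grace period. This mirrors the close-time check of windowed aggregations, but `WindowedCountSketch` is a simplified, hypothetical model with no suppression, retention or persistence.

```python
class WindowedCountSketch:
    """Tumbling-window count driven only by record timestamps (toy model)."""

    def __init__(self, size_ms: int, grace_ms: int) -> None:
        self.size_ms = size_ms
        self.grace_ms = grace_ms
        self.stream_time = -1    # no records seen yet
        self.counts: dict = {}   # (key, window_start) -> count
        self.dropped: list = []  # records that arrived after their window closed

    def process(self, key: str, ts: int) -> None:
        # Stream-time = max timestamp observed; out-of-order records never lower it.
        self.stream_time = max(self.stream_time, ts)
        window_start = ts - ts % self.size_ms
        window_end = window_start + self.size_ms
        close_time = self.stream_time - self.grace_ms
        if window_end <= close_time:
            self.dropped.append((key, ts))  # window closed: late record is discarded
            return
        slot = (key, window_start)
        self.counts[slot] = self.counts.get(slot, 0) + 1


agg = WindowedCountSketch(size_ms=10, grace_ms=5)
for ts in (3, 12, 8, 16, 4):  # out of order on purpose
    agg.process("a", ts)
print(agg.stream_time)  # 16
print(agg.counts)       # {('a', 0): 2, ('a', 10): 2}
print(agg.dropped)      # [('a', 4)]
```

The record at ts=8 is late but still within grace, so it is counted. The record at ts=4 arrives after stream-time 16 has closed window [0, 10), so it is dropped. Replaying the same input gives the same result regardless of wall-clock time.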
Interactions with other subsystems
- Consumer client: each thread's main consumer drives the loop; the classic path plugs in StreamsPartitionAssignor, while group.protocol=streams uses the AsyncKafkaConsumer with the StreamsGroupHeartbeat RPC. The restore consumer is group-less and positioned by explicit seeks.
- Producer client: sink and changelog writes flow through one StreamsProducer per thread; EOS uses its transactional API.
- Transactions & EOS: exactly_once_v2 relies on idempotent, transactional producers and on consumer-group offset commits made inside transactions.
- Group coordination: assignment runs over the consumer rebalance protocol (classic) or the dedicated Streams group on the broker (KIP-1071).
- Log management: changelog topics are compacted; windowed changelogs add time-based retention on top of compaction; repartition topics are purged via deleteRecords up to the committed offset.
- Share groups (KIP-932) and Kafka Connect are sibling client frameworks that, like Streams, are built on the core clients but solve different problems (queues / external connectors).
Design rationale & evolution
Streams pushes all processing logic to the client and reuses the broker only as a durable, partitioned log, for input, output, repartitioning and state changelogs. This means scaling is just "run more instances/threads," fault tolerance is "replay a compacted topic," and there is no bespoke storage tier to operate. The cost is that state lives on the processing nodes, which is exactly what standbys (num.standby.replicas) and the warmup machinery (KIP-441) exist to make cheap to move.
- KIP-441, Smooth scaling out: warmup standbys + probing rebalances, embodied by the HighAvailabilityTaskAssignor and acceptable.recovery.lag/max.warmup.replicas.
- KIP-924, Pluggable custom task assignment (task.assignor.class).
- KIP-925, Rack-aware task assignment (RackAwareTaskAssignor).
- KIP-848, the consumer-side next-generation protocol that KIP-1071 mirrors for Streams, moving assignment into the broker's group coordinator.
- KIP-1071, Streams Rebalance Protocol: dedicated StreamsGroupHeartbeat RPC and broker-side StreamsGroup, with highly-available and sticky broker assignors. Released as a preview in 4.1, its core feature set reached General Availability in 4.2 (streams.version SV_1); the default group.protocol is still classic.
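The KIP-441 idea of keeping the active task where state is warm and warming up the target until it catches up can be sketched as a per-task decision re-run at each probing rebalance. This is a deliberately simplified, hypothetical model of what the HighAvailabilityTaskAssignor does, not its actual algorithm. A client counts as caught up when its changelog lag is at most acceptable.recovery.lag. The branch that moves immediately when no client is caught up is an assumption of this sketch, and the restore speed in the loop is invented.

```python
def place_task(current, target, lags, acceptable_recovery_lag, warmups_left):
    """Simplified KIP-441-style decision for one stateful task (hypothetical).

    current: client now running the task as active; target: client a balanced
    assignment would prefer; lags: client -> changelog lag for this task.
    Returns (active_client, warmup_client_or_None).
    """
    caught_up = sorted(c for c, lag in lags.items() if lag <= acceptable_recovery_lag)
    if target in caught_up or not caught_up:
        # Target can take over without a long restore (assumption: if nobody is
        # caught up, moving costs no extra downtime, so move immediately).
        return target, None
    # Keep the active task where its state is warm ...
    active = current if current in caught_up else min(caught_up, key=lambda c: lags[c])
    # ... and restore the target in the background as a warmup replica, if budget allows.
    return active, (target if warmups_left > 0 else None)


lags = {"A": 0, "B": 50_000}
history = []
for _ in range(4):  # each iteration stands for one probing rebalance
    history.append(place_task("A", "B", lags, acceptable_recovery_lag=10_000, warmups_left=2))
    lags["B"] = max(0, lags["B"] - 20_000)  # hypothetical restore progress per interval
print(history)  # [('A', 'B'), ('A', 'B'), ('B', None), ('B', None)]
```

Active processing never waits on a cold restore in this model. The task stays on A until B's lag drops within acceptable.recovery.lag, and a probing rebalance then hands it over.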
Gotchas & operational notes
- Topology identity must match across instances. Tasks are positional; if two instances add operators in different orders, a task may reference a topic unknown to one instance, raising a TopologyException when building record queues (StreamTask.java:1461). Keep the topology code identical across the application.
- Internal topic naming. Repartition/changelog names are derived from operator names/positions; ensure.explicit.internal.resource.naming can force explicit names so a topology change does not orphan topics. Renaming operators changes internal topic names and can require re-bootstrapping state.
- Standbys cost changelog reads but speed failover. With num.standby.replicas > 0, a failed active task's replacement is already warm; the trade-off is continuous restore-consumer traffic on every standby.
- EOS lowers the commit interval. Expect more frequent commits (default 100 ms) and the corresponding producer-transaction overhead; tune commit.interval.ms for your latency/throughput balance.
- The restore path is separate. Long restores show up on the StateUpdater thread, not the StreamThread; monitor restore/standby metrics there. A thread can be RUNNING while some of its tasks are still RESTORING.
- Caching changes what you see. The record cache coalesces intermediate aggregation results. If you need every update (CDC-style), set statestore.cache.max.bytes to 0; if you want only final per-window results, use EmitStrategy or Suppressed deliberately rather than relying on cache behaviour.
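The naming gotcha can be sketched by deriving changelog names the way DSL auto-naming does: an operator prefix plus a zero-padded index from a shared counter, inside <application.id>-<store name>-changelog. Inserting one stateless operator upstream shifts the index and yields a new changelog topic, which orphans the old one. An explicit store name keeps it stable. This is a simplified, hypothetical model. Each operator consumes exactly one index here, whereas the real builder can draw several per operator (processor and store names both use the counter), so the exact indices are illustrative.

```python
import itertools


def changelog_topics(application_id: str, operators: list) -> list:
    """Derive changelog topic names for a DSL-like operator chain (sketch).

    operators: (prefix, is_stateful, explicit_store_name_or_None) tuples in the
    order they are added. Simplification: one counter index per operator.
    """
    counter = itertools.count()
    topics = []
    for prefix, stateful, explicit_name in operators:
        index = next(counter)  # every operator consumes an index, named or not
        if not stateful:
            continue
        store = explicit_name or f"{prefix}-STATE-STORE-{index:010d}"
        topics.append(f"{application_id}-{store}-changelog")
    return topics


v1 = [("KSTREAM-SOURCE", False, None), ("KSTREAM-AGGREGATE", True, None)]
v2 = [("KSTREAM-SOURCE", False, None), ("KSTREAM-FILTER", False, None),
      ("KSTREAM-AGGREGATE", True, None)]  # a filter inserted upstream
print(changelog_topics("app", v1))  # ['app-KSTREAM-AGGREGATE-STATE-STORE-0000000001-changelog']
print(changelog_topics("app", v2))  # ['app-KSTREAM-AGGREGATE-STATE-STORE-0000000002-changelog']

named_v1 = [(p, s, "counts" if s else None) for p, s, _ in v1]
named_v2 = [(p, s, "counts" if s else None) for p, s, _ in v2]
print(changelog_topics("app", named_v1) == changelog_topics("app", named_v2))  # True
```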