21 · Kafka Connect Architecture
Source: Apache Kafka 4.4.0-SNAPSHOT (git 04bfe7d, 2026-06-15), KRaft mode. Derived from source code, not copied from official documentation.
Kafka Connect is a framework for streaming data between Apache Kafka and external systems without writing bespoke producers and consumers. It defines a small plugin SPI (SourceConnector/SinkConnector plus SourceTask/SinkTask) in which a connector splits a job into task configs and tasks perform the actual IO. A Worker JVM instantiates these plugins and runs each task in its own thread. It wraps each task with a producer (source) or consumer (sink), converters for serialization, and a chain of Single Message Transforms (SMTs). In distributed mode a DistributedHerder forms a cluster of workers over Kafka's group-membership protocol and balances connectors and tasks with the incremental cooperative assignor. It keeps three internal compacted topics (config, offsets, and status) as the source of truth. This chapter reconstructs Connect's class collaboration, data structures, algorithms, threading model and failure handling directly from the runtime code.
Role & responsibilities
Connect exists to make data integration a matter of configuration rather than code. A user POSTs a connector config to a REST endpoint; the framework instantiates the connector class, asks it to enumerate the work as a list of task configurations, distributes those tasks across a cluster of worker processes, and runs each task in a managed loop that handles serialization, transformation, offset commit, error tolerance, retries, metrics and lifecycle. The responsibilities decompose cleanly:
- Connector plugins (SourceConnector, SinkConnector) own no data flow. They divide a job into at most maxTasks task configs via taskConfigs(int). They also monitor the external system for changes that require reconfiguration, notifying the runtime through a ConnectorContext handed to the connector via initialize() (connect/api/src/main/java/org/apache/kafka/connect/connector/Connector.java:40, :56).
- Task plugins (SourceTask, SinkTask) do the IO. A source task poll()s records from upstream; a sink task receives records via put(Collection) and is asked to flush()/preCommit() them downstream.
- Worker: the per-JVM container. It instantiates connectors and tasks, manages their class loaders, builds the Kafka clients, owns thread pools, and exposes start/stop primitives (connect/runtime/.../runtime/Worker.java:140).
- Herder: the control plane. It owns the desired-state model (which connectors exist, their configs, target states), validates configs, serves the REST API, and decides what each worker should run. StandaloneHerder keeps this in memory; DistributedHerder coordinates a cluster.
- Backing stores: durable state. In distributed mode, config, source offsets and status each live in a dedicated compacted Kafka topic.
The split between connector (plans the work, single instance) and task (does the work, parallel instances) is the central abstraction. A connector is cheap and mostly idle; tasks are where throughput and parallelism live. tasks.max bounds the parallelism, and the connector chooses how to partition work, e.g. tables across tasks, or topic-partitions round-robin (MirrorMaker does the latter at connect/mirror/.../MirrorSourceConnector.java:212).
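The sketch below shows the connector/task split concretely: one way a connector's taskConfigs(maxTasks) might deal units of work out round-robin. It is a minimal sketch that deliberately avoids the Connect API. RoundRobinTaskConfigs and the "tables" property key are hypothetical. A real connector would return these maps from its taskConfigs(int) override.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: splits work items into at most maxTasks task configs.
// The "tables" key and comma-joined encoding are illustrative assumptions.
final class RoundRobinTaskConfigs {

    static List<Map<String, String>> taskConfigs(List<String> tables, int maxTasks) {
        if (maxTasks < 1) {
            throw new IllegalArgumentException("maxTasks must be >= 1");
        }
        // tasks.max is an upper bound: never create more tasks than work items.
        int numTasks = Math.min(maxTasks, tables.size());
        List<List<String>> groups = new ArrayList<>();
        for (int i = 0; i < numTasks; i++) {
            groups.add(new ArrayList<>());
        }
        // Deal items out round-robin: item i goes to task i % numTasks.
        for (int i = 0; i < tables.size(); i++) {
            groups.get(i % numTasks).add(tables.get(i));
        }
        // One property map per task; the runtime would hand each one to a task's start().
        List<Map<String, String>> configs = new ArrayList<>();
        for (List<String> group : groups) {
            Map<String, String> config = new HashMap<>();
            config.put("tables", String.join(",", group));
            configs.add(config);
        }
        return configs;
    }
}
```

Capping the task count at the number of work items reflects that tasks.max is a bound, not a target. With two tables and tasks.max=8, this sketch returns only two configs.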
Where it lives in the code
| Concern | Class | File |
|---|---|---|
| Connector SPI base | Connector / SourceConnector / SinkConnector | connect/api/.../connector/Connector.java, .../source/SourceConnector.java, .../sink/SinkConnector.java |
| Task SPI base | Task / SourceTask / SinkTask | connect/api/.../connector/Task.java, .../source/SourceTask.java, .../sink/SinkTask.java |
| Serialization SPI | Converter, HeaderConverter | connect/api/.../storage/Converter.java |
| Transform SPI | Transformation, Predicate | connect/api/.../transforms/Transformation.java |
| Worker container | Worker | connect/runtime/.../runtime/Worker.java |
| Task execution (base) | WorkerTask | connect/runtime/.../runtime/WorkerTask.java |
| Source loop | AbstractWorkerSourceTask, WorkerSourceTask, ExactlyOnceWorkerSourceTask | connect/runtime/.../runtime/AbstractWorkerSourceTask.java |
| Sink loop | WorkerSinkTask | connect/runtime/.../runtime/WorkerSinkTask.java |
| Connector lifecycle | WorkerConnector | connect/runtime/.../runtime/WorkerConnector.java |
| Herder (control plane) | AbstractHerder, StandaloneHerder, DistributedHerder | connect/runtime/.../runtime/AbstractHerder.java, .../standalone/StandaloneHerder.java, .../distributed/DistributedHerder.java |
| Cluster membership | WorkerCoordinator, WorkerGroupMember, ConnectProtocol | connect/runtime/.../distributed/WorkerCoordinator.java |
| Assignment | IncrementalCooperativeAssignor, EagerAssignor | connect/runtime/.../distributed/IncrementalCooperativeAssignor.java |
| Config store | KafkaConfigBackingStore | connect/runtime/.../storage/KafkaConfigBackingStore.java |
| Offset store (source) | KafkaOffsetBackingStore, OffsetStorageWriter | connect/runtime/.../storage/KafkaOffsetBackingStore.java |
| Status store | KafkaStatusBackingStore | connect/runtime/.../storage/KafkaStatusBackingStore.java |
| Cross-cluster replication | MirrorSourceConnector, MirrorHeartbeatConnector, MirrorCheckpointConnector | connect/mirror/.../mirror/*.java |
Core concepts & terminology
- Connector: a plugin (instance of SourceConnector or SinkConnector) that produces task configurations and watches for reconfiguration. One instance per connector name, run on exactly one worker.
- Task: a plugin (instance of SourceTask or SinkTask) that streams records. Identified by a ConnectorTaskId = (connector name, task index).
- Worker: a JVM process running the Connect runtime. Hosts a Worker object plus a herder. Multiple workers with the same group.id form a distributed cluster.
- Herder: the component that reconciles desired state (configs) with running state by telling its Worker what to start/stop.
- Converter: translates between Connect's in-memory SchemaAndValue data model and byte[] on the wire (e.g. JsonConverter, StringConverter). Separate key, value and header converters.
- SMT: a Single Message Transform, i.e. a Transformation applied per record, optionally gated by a Predicate. Composed into a TransformationChain.
- Source partition / source offset: connector-defined coordinates (arbitrary maps) that let a source task resume after restart. Distinct from Kafka topic-partitions/offsets (connect/api/.../source/SourceRecord.java:45).
- Target state: the administratively requested run-state of a connector (STARTED, PAUSED, or STOPPED; connect/runtime/.../runtime/TargetState.java).
The plugin SPI in detail
Connector contract
A Connector is given a ConnectorContext via initialize(), then start(props). The two abstract methods that define its job are taskClass() (which Task implementation to run) and taskConfigs(int maxTasks) (return up to maxTasks property maps, one per task). The default reconfigure() simply stops and restarts the connector (Connector.java:107), and validate() runs the connector's ConfigDef over the supplied properties (Connector.java:137). Source connectors add three optional methods of note: exactlyOnceSupport() and canDefineTransactionBoundaries() (queried before start when EOS is enabled), and alterOffsets() for the offsets REST API (SourceConnector.java:55, :74, :112).
SourceTask contract
poll() returns a List<SourceRecord> or blocks; it must periodically return (possibly null) so the framework can pause or stop the task (SourceTask.java:105). commit() and commitRecord(record, metadata) are optional callbacks invoked after the framework has durably recorded offsets, useful for connectors that also track positions in the upstream system. stop() may be called from a different thread and only needs to signal poll() to unblock (SourceTask.java:122). Exactly-once introduces the transaction.boundary config with values POLL (default), INTERVAL, or CONNECTOR (SourceTask.java:36).
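The poll()/stop() contract can be sketched without the Connect API. QueueSourceTask, its String records and the 100 ms wait are hypothetical. poll() waits only briefly and returns null when idle. stop() merely flips a volatile flag that the polling thread observes. Unlike the real SourceTask.poll(), this sketch handles InterruptedException internally instead of declaring it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of the SourceTask poll()/stop() contract (no Connect API).
final class QueueSourceTask {
    private final BlockingQueue<String> upstream = new LinkedBlockingQueue<>();
    private volatile boolean stopping = false;

    // Stand-in for data arriving from the external system.
    void offer(String record) {
        upstream.add(record);
    }

    // Waits at most 100 ms, then returns null when idle so the framework
    // regains control and can pause or stop the task.
    List<String> poll() {
        if (stopping) {
            return null;
        }
        String first;
        try {
            first = upstream.poll(100, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return null;
        }
        if (first == null) {
            return null;
        }
        List<String> batch = new ArrayList<>();
        batch.add(first);
        upstream.drainTo(batch); // take whatever else is already available
        return batch;
    }

    // May be called from another thread: only signal, never block here.
    void stop() {
        stopping = true;
    }
}
```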
SinkTask contract
A sink task's lifecycle (documented at SinkTask.java:35) is: initialize → start → open(partitions) → repeated put(records) and flush(offsets) → on rebalance, close(partitions) then open(...) again → finally stop. The crucial offset-safety method is preCommit(currentOffsets): it returns the map of topic-partition offsets that are safe to commit (i.e. already written downstream), defaulting to flushing everything (SinkTask.java:138). A RetriableException from put() tells the framework to retry the same batch after a backoff; any other exception kills the task.
Sink offsets passed to flush/preCommit always use the original Kafka topic/partition/offset (before SMTs), because transforms may rewrite the destination topic. The framework correlates each transformed SinkRecord back to its source consumer record via InternalSinkRecord for error reporting (WorkerSinkTask.java:590).
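The following sketches the preCommit() idea, assuming partitions are plain strings and offsets plain longs rather than TopicPartition and OffsetAndMetadata. BufferingSink and its methods are hypothetical. Following the Kafka convention, the returned value is the next offset to consume, i.e. the last durably written offset + 1. Partitions with nothing written downstream are omitted, so their committed position does not move.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: String partitions and long offsets stand in for
// TopicPartition and OffsetAndMetadata.
final class BufferingSink {
    // Last offset per partition accepted by put() but not yet written downstream.
    private final Map<String, Long> bufferedUpTo = new HashMap<>();
    // Last offset per partition durably written downstream.
    private final Map<String, Long> writtenUpTo = new HashMap<>();

    void put(String partition, long offset) {
        bufferedUpTo.merge(partition, offset, Long::max);
    }

    // Simulates a successful downstream write of everything buffered so far.
    void writeBuffered() {
        bufferedUpTo.forEach((p, o) -> writtenUpTo.merge(p, o, Long::max));
        bufferedUpTo.clear();
    }

    // Only durably written data is safe to commit. Kafka commits the next
    // offset to consume, hence written + 1; unwritten partitions are omitted.
    Map<String, Long> preCommit(Map<String, Long> currentOffsets) {
        Map<String, Long> safe = new HashMap<>();
        for (String partition : currentOffsets.keySet()) {
            Long written = writtenUpTo.get(partition);
            if (written != null) {
                safe.put(partition, written + 1);
            }
        }
        return safe;
    }
}
```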
Converters and transforms
A Converter implements fromConnectData(topic, headers, schema, value) → byte[] and toConnectData(topic, headers, byte[]) → SchemaAndValue (Converter.java:73, :97). Key and value use independent converters, so a connector can, e.g., emit a string key and a schema-bearing JSON value. Transformation.apply(record) returns a (possibly new) record, or null to drop it. It must not mutate objects reachable from the input record, and it must be thread-safe (Transformation.java:55). A TransformationStage wraps one transform with an optional predicate and a negate flag; the transform runs only when negate ^ predicate.test(record) is true (TransformationStage.java:90). Stages are composed left-to-right into a TransformationChain that the worker applies in the pipeline.
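Predicate gating and left-to-right composition can be sketched as follows, with records reduced to Strings. GatedStage and StageChain are hypothetical stand-ins for TransformationStage and TransformationChain, not their actual code. In this sketch a stage without a predicate always runs, and a null result short-circuits the rest of the chain.

```java
import java.util.List;
import java.util.function.Predicate;
import java.util.function.UnaryOperator;

// Hypothetical stand-in for TransformationStage; records are plain Strings.
final class GatedStage {
    private final UnaryOperator<String> transform;
    private final Predicate<String> predicate; // may be null: always run
    private final boolean negate;

    GatedStage(UnaryOperator<String> transform, Predicate<String> predicate, boolean negate) {
        this.transform = transform;
        this.predicate = predicate;
        this.negate = negate;
    }

    String apply(String record) {
        // Run the transform only when negate ^ predicate.test(record) is true.
        boolean run = predicate == null || (negate ^ predicate.test(record));
        return run ? transform.apply(record) : record;
    }
}

// Hypothetical stand-in for TransformationChain: stages applied left to right.
final class StageChain {
    private final List<GatedStage> stages;

    StageChain(List<GatedStage> stages) {
        this.stages = List.copyOf(stages);
    }

    // Returns the transformed record, or null if some stage dropped it.
    String apply(String record) {
        String current = record;
        for (GatedStage stage : stages) {
            current = stage.apply(current);
            if (current == null) {
                return null; // filtered: later stages never see it
            }
        }
        return current;
    }
}
```

For example, with negate=true and a predicate matching records that start with "A", the stage runs only on records that do not start with "A".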
Data structures
SourceRecord and the source-offset model
A SourceRecord extends ConnectRecord (topic, kafkaPartition, key+schema, value+schema, timestamp, headers) and adds two opaque maps: sourcePartition and sourceOffset (SourceRecord.java:45). These are connector-defined, e.g. {db,table} → {rowTimestamp}. The framework persists sourcePartition → sourceOffset so that, after a crash, the task can resume by reading its last committed offset through the OffsetStorageReader available on its SourceTaskContext (SourceTaskContext.java:41).
Internal compacted topics (distributed mode)
Distributed Connect stores all durable state in three compacted Kafka topics, each fronted by a KafkaBasedLog (a tail-following producer+consumer pair). Their names and shapes:
| Store | Config key | Partitions | Cleanup | Holds |
|---|---|---|---|---|
| Config | config.storage.topic | 1 (enforced) | compact | connector configs, task configs, commit markers, target states, session keys, task-count records, restart requests, cluster log levels |
| Offsets | offset.storage.topic | offset.storage.partitions (default 25) | compact | source-connector offsets (key = connector + sourcePartition) |
| Status | status.storage.topic | status.storage.partitions (default 5) | compact | connector/task/topic status records |
The config topic must be a single partition so Kafka acts as a totally-ordered write-ahead log; KafkaConfigBackingStore verifies partitionCount on startup (KafkaConfigBackingStore.java:390). Keys are ad-hoc strings (to be immune to converter changes) while values are serialized with the internal JSON converter. The string key prefixes (KafkaConfigBackingStore.java:197–275):
| Key prefix | Example key | Value schema | Meaning |
|---|---|---|---|
| connector- | connector-foo | {properties: map} | connector config |
| task- | task-foo-0 | {properties: map} | config for one task (buffered until commit) |
| commit- | commit-foo | {tasks: int} | atomically applies the buffered task configs; declares the task count |
| target-state- | target-state-foo | {state, state.v2?} | STARTED / PAUSED / STOPPED |
| tasks-fencing- | tasks-fencing-foo | {task-count: int} | task-count record used by EOS zombie fencing |
| session-key | session-key | {key, algorithm, creation-timestamp} | HMAC key for signing internal REST requests |
| restart-connector- | restart-connector-foo | {include-tasks, only-failed} | restart request marker |
Connect deliberately does not use Kafka transactions for the config topic. To update a connector's tasks atomically it writes each task-foo-N record, then a single commit-foo with the task count. Readers buffer task config records and only apply the whole batch when the commit marker arrives and the buffered task-id set is complete (KafkaConfigBackingStore.java:1107). This prevents a half-written reconfiguration from causing two workers to double-assign the same partitions. Because the topic is compacted, an incomplete set after compaction marks the connector inconsistent rather than applying garbage (KafkaConfigBackingStore.java:1113).
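The commit-marker protocol can be sketched as a small reader over the config log, assuming task configs are Strings keyed by task index. ConfigLogReader and its callbacks are hypothetical, not KafkaConfigBackingStore's API. The sketch clears the buffered task records on every commit marker. When the declared task count is not fully covered, it marks the connector inconsistent and leaves the previously applied configs in place.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical sketch of commit-marker buffering; not KafkaConfigBackingStore's API.
final class ConfigLogReader {
    private final Map<String, Map<Integer, String>> buffered = new HashMap<>(); // since last commit
    private final Map<String, Map<Integer, String>> applied = new HashMap<>();  // visible to the herder
    private final Set<String> inconsistent = new HashSet<>();

    // "task-<connector>-<n>" record: buffer it, do not apply yet.
    void onTaskConfig(String connector, int task, String config) {
        buffered.computeIfAbsent(connector, c -> new TreeMap<>()).put(task, config);
    }

    // "commit-<connector>" record declaring taskCount tasks.
    void onCommit(String connector, int taskCount) {
        Map<Integer, String> pending = buffered.remove(connector); // cleared either way
        if (pending == null) {
            pending = Map.of();
        }
        for (int i = 0; i < taskCount; i++) {
            if (!pending.containsKey(i)) {
                // Incomplete set (e.g. after compaction): never apply a partial update.
                inconsistent.add(connector);
                return;
            }
        }
        Map<Integer, String> next = new TreeMap<>();
        for (int i = 0; i < taskCount; i++) {
            next.put(i, pending.get(i));
        }
        applied.put(connector, next);
        inconsistent.remove(connector);
    }

    Map<Integer, String> tasks(String connector) {
        return applied.getOrDefault(connector, Map.of());
    }

    boolean isInconsistent(String connector) {
        return inconsistent.contains(connector);
    }
}
```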
ConnectProtocol wire format
Workers join the group with a subscription carrying their state, and the leader replies with an assignment. The V0 (eager) layout (ConnectProtocol.java:114, :170):
Figure: Subscription V0 (worker → coordinator on JoinGroup) and Assignment V0 (leader → each worker on SyncGroup). A task id of -1 (CONNECTOR_TASK) inside Tasks means "run the Connector instance itself"; strings are length-prefixed (variable width).
The sentinel task id -1 (ConnectProtocol.CONNECTOR_TASK) is how the leader tells a worker to run a connector instance as opposed to a task (ConnectProtocol.java:55, :229). V1/V2 (IncrementalCooperativeConnectProtocol) add a revoked list and a delay field to support cooperative rebalancing. V2 ("sessioned") is schematically identical to V1 and does not embed the session key in the wire format. It merely signifies that internal-request verification and session-key distribution (via the config topic) are enabled (IncrementalCooperativeConnectProtocol.java:73–82).
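On the receiving side, a worker splits each connector's task-id list into "run the connector" and "run task n". This sketch assumes the assignment has already been decoded into a Map from connector name to task ids. WorkerAssignmentView is hypothetical; only the -1 sentinel value comes from ConnectProtocol.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical view of a decoded assignment: connector name -> task ids,
// where -1 means "run the connector instance itself".
final class WorkerAssignmentView {
    static final int CONNECTOR_TASK = -1;

    final List<String> connectors = new ArrayList<>();
    final List<String> tasks = new ArrayList<>(); // rendered as "<connector>-<index>"

    static WorkerAssignmentView decode(Map<String, List<Integer>> assignment) {
        WorkerAssignmentView view = new WorkerAssignmentView();
        for (Map.Entry<String, List<Integer>> entry : assignment.entrySet()) {
            for (int id : entry.getValue()) {
                if (id == CONNECTOR_TASK) {
                    view.connectors.add(entry.getKey()); // start a WorkerConnector
                } else {
                    view.tasks.add(entry.getKey() + "-" + id); // start a WorkerTask
                }
            }
        }
        return view;
    }
}
```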
Architecture & control / data flow
Source task data flow (per iteration)
Figure: source task data flow per iteration (AbstractWorkerSourceTask.java:408, WorkerSourceTask.java:218). Only records that survive the SMT chain are converted, submitted to the in-flight deque, and sent; offsets are flushed only behind fully-acked prefixes.
Detailed mechanics
Worker: instantiating and running a task
Worker.startConnector() resolves the connector's class loader, instantiates the connector, builds a SinkConnectorConfig or SourceConnectorConfig, wires up the per-connector offset store (source only), wraps everything in a WorkerConnector, registers it in the connectors map, and submits it to the shared ExecutorService (Worker.java:300). connectorTaskConfigs() later calls connector.taskConfigs(maxTasks) under the connector's class loader, stamps each map with TaskConfig.TASK_CLASS_CONFIG, copies the topics/topics.regex through for sinks, and (since the tasks.max.enforce default is true) throws TooManyTasksException if the connector returns more configs than allowed (Worker.java:412, :438).
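The post-processing in connectorTaskConfigs() can be approximated as follows. TaskConfigPostProcessor and its TooManyTasks exception are hypothetical. "task.class" is the value of TaskConfig.TASK_CLASS_CONFIG. The warn-and-continue branch for enforce=false follows the tasks.max.enforce description in the configuration reference rather than Worker's exact code.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of stamping and bounding a connector's task configs.
final class TaskConfigPostProcessor {
    static final class TooManyTasks extends RuntimeException {
        TooManyTasks(String msg) {
            super(msg);
        }
    }

    static List<Map<String, String>> process(List<Map<String, String>> raw, String taskClass,
                                             int maxTasks, boolean enforce) {
        if (raw.size() > maxTasks) {
            String msg = "Connector generated " + raw.size()
                    + " task configs but tasks.max is " + maxTasks;
            if (enforce) {
                throw new TooManyTasks(msg); // tasks.max.enforce=true: fail the connector
            }
            System.err.println("WARN " + msg); // enforce=false: warn but keep every config
        }
        List<Map<String, String>> result = new ArrayList<>();
        for (Map<String, String> config : raw) {
            Map<String, String> copy = new HashMap<>(config); // don't mutate the connector's maps
            copy.put("task.class", taskClass);                // tells the worker which Task to instantiate
            result.add(copy);
        }
        return result;
    }
}
```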
Worker.startTask() is where a task becomes a thread. It instantiates the Task implementation, resolves key/value/header converters (preferring connector-supplied converters, falling back to worker defaults, Worker.java:703), then uses a TaskBuilder to assemble a WorkerSinkTask, WorkerSourceTask or ExactlyOnceWorkerSourceTask and submits it (Worker.java:726, :748):
- SinkTaskBuilder creates a KafkaConsumer with client.id = connector-consumer-<taskId> and a group id derived from the connector; the sink task then subscribes to topics or topics.regex (Worker.java:1914; WorkerSinkTask.java:324).
- SourceTaskBuilder creates a KafkaProducer (client.id = connector-producer-<taskId>) and, optionally, a TopicAdmin for automatic topic creation. It also creates an OffsetStorageWriter and reader bound to that task's offset store (Worker.java:1955).
Every task is a Runnable; its thread is renamed task-thread-<connector>-<index> for the duration of run() (WorkerTask.java:67, :297). The base run() sets up an MDC LoggingContext, runs doRun() (pause-handling → doStart() → execute()), then on exit invokes the appropriate status listener and counts down a shutdown latch.
Source loop, in-flight tracking and offset commit
AbstractWorkerSourceTask.execute() loops while not stopping: handle pause, poll() the task, then sendRecords() (AbstractWorkerSourceTask.java:346). For each polled record it applies the SMT chain, converts to a ProducerRecord, optionally creates the destination topic, and calls producer.send with a callback. Offset durability relies on SubmittedRecords: submit() records each in-flight record in a per-source-partition FIFO deque, and committableOffsets() returns, for each source partition, the offset of the longest fully-acknowledged prefix, so a stalled record never lets a later record's offset be committed prematurely (WorkerSourceTask.java:141, :337).
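The longest-acknowledged-prefix rule can be sketched with one FIFO deque per source partition. InFlightTracker is hypothetical and much simpler than SubmittedRecords: source partitions are Strings and offsets are longs. It still shows why a single un-acked record holds back the committable offset of everything submitted after it in the same partition.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the "longest fully-acknowledged prefix" rule.
final class InFlightTracker {
    static final class Submitted {
        final String partition;
        final long offset;
        volatile boolean acked = false; // set from the producer callback thread

        Submitted(String partition, long offset) {
            this.partition = partition;
            this.offset = offset;
        }

        void ack() {
            acked = true;
        }
    }

    private final Map<String, Deque<Submitted>> inFlight = new HashMap<>();

    // Task thread, before producer.send(): append to the partition's FIFO.
    synchronized Submitted submit(String partition, long offset) {
        Submitted s = new Submitted(partition, offset);
        inFlight.computeIfAbsent(partition, p -> new ArrayDeque<>()).addLast(s);
        return s;
    }

    // Pops acked records from each deque head and returns, per partition, the
    // offset of the last record in the acked prefix. An un-acked head blocks
    // everything behind it, even records that were already acked.
    synchronized Map<String, Long> committableOffsets() {
        Map<String, Long> result = new HashMap<>();
        for (Map.Entry<String, Deque<Submitted>> e : inFlight.entrySet()) {
            Deque<Submitted> deque = e.getValue();
            while (!deque.isEmpty() && deque.peekFirst().acked) {
                result.put(e.getKey(), deque.pollFirst().offset);
            }
        }
        return result;
    }
}
```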
Because source tasks don't drive a poll loop with predictable wakeups, a dedicated SourceTaskOffsetCommitter single-thread scheduler calls commitOffsets() every offset.flush.interval.ms (default 60 000 ms) for each source task (SourceTaskOffsetCommitter.java:77). commitOffsets() pushes the committable offsets into the OffsetStorageWriter, calls beginFlush() then doFlush() with an offset.flush.timeout.ms budget (default 5 000 ms), and on success calls the task's commit() hook (WorkerSourceTask.java:218). The scheduler is extremely careful to swallow exceptions, because an uncaught throwable in a scheduleWithFixedDelay task silently halts that task's schedule forever (SourceTaskOffsetCommitter.java:115).
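The reason for the defensive catch is easy to reproduce with the JDK alone. ScheduledExecutorService.scheduleWithFixedDelay suppresses all subsequent executions once a run throws. SafeCommitScheduler is a hypothetical sketch of the guard, with logging reduced to System.err.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of a periodic committer that can never kill its own schedule.
final class SafeCommitScheduler {
    private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();

    ScheduledFuture<?> schedule(Runnable commitOffsets, long intervalMs) {
        Runnable guarded = () -> {
            try {
                commitOffsets.run();
            } catch (Throwable t) {
                // Swallow and log: an escaping throwable would cancel all later runs.
                System.err.println("Unhandled exception when committing offsets: " + t);
            }
        };
        return executor.scheduleWithFixedDelay(guarded, intervalMs, intervalMs, TimeUnit.MILLISECONDS);
    }

    void close() {
        executor.shutdownNow();
    }
}
```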
For at-least-once source semantics, an offset is only flushed once every record up to and including it has been acknowledged by the producer. The longest-acked-prefix rule in SubmittedRecords guarantees that a crash after a flush can lose no acknowledged record and can only ever re-deliver records whose offset had not yet been flushed.
Sink loop, rebalance and offset commit
WorkerSinkTask.execute() runs iteration() until stopping (WorkerSinkTask.java:219). Each iteration: (1) maybe commit offsets if the interval elapsed or a commit was requested; (2) check for a timed-out commit; (3) poll(timeoutMs) the consumer. poll() first rewind()s any seek requested by the task, fetches a batch, runs convertMessages() (key/value/header convert + SMT chain, building SinkRecords; dropped records advance the offset but are not delivered, WorkerSinkTask.java:525), then deliverMessages() calls task.put(batch) (WorkerSinkTask.java:622).
Offset commit (commitOffsets(), WorkerSinkTask.java:417) calls task.preCommit(currentOffsets); if the task returns offsets it commits them via the consumer (sync when closing, async otherwise). Because async commits can complete out of order, each commit carries a monotonically increasing commitSeqno and onCommitCompleted() ignores stale callbacks (WorkerSinkTask.java:288). Consumer-group rebalances are handled by an inner HandleRebalance listener: on onPartitionsRevoked it commits offsets and calls task.close(); on onPartitionsAssigned it primes per-partition offsets and calls task.open() (WorkerSinkTask.java:732). A RetriableException from put() pauses all partitions ("pausedForRedelivery") so the same batch can be retried without fetching new data (WorkerSinkTask.java:646).
Sink tasks join an ordinary Kafka consumer group and commit to __consumer_offsets; there is no per-sink offset store. This is why sink connectors reuse the entire group coordination machinery and inherit the same delivery semantics as a regular consumer (offset committed after put/flush ⇒ at-least-once).
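The out-of-order guard on asynchronous sink commits amounts to a counter check. CommitTracker is a hypothetical, single-partition sketch of the commitSeqno idea. Every commit takes a fresh sequence number, and a completion callback only takes effect if its number still matches the latest one.

```java
// Hypothetical single-partition sketch of commitSeqno-style stale-callback filtering.
final class CommitTracker {
    private int commitSeqno = 0;
    private long lastCommittedOffset = -1;

    // Starts an async commit; the callback must present the returned number.
    synchronized int beginCommit() {
        return ++commitSeqno;
    }

    // Called from the consumer's commit callback, possibly out of order.
    // Returns false when the callback is stale and therefore ignored.
    synchronized boolean onCommitCompleted(int seqno, long offset, Throwable error) {
        if (seqno != commitSeqno) {
            return false; // superseded by a newer commit
        }
        if (error == null) {
            lastCommittedOffset = offset;
        }
        return true;
    }

    synchronized long lastCommittedOffset() {
        return lastCommittedOffset;
    }
}
```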
Connector lifecycle state machine
A WorkerConnector wraps the connector in a small state machine (INIT, STARTED, PAUSED, STOPPED, FAILED; WorkerConnector.java:60). Like tasks, it runs on the shared executor and reconciles its actual state toward the herder-requested TargetState in doTransitionTo() (WorkerConnector.java:378). The STOPPED target is special: transitioning there fully shuts the connector/task down and releases its Kafka clients and SMTs. The worker treats a task whose target became STOPPED as a shutdown request and will not resume it without a restart (WorkerTask.java:208, :338).
Distributed coordination: the herder tick loop
The DistributedHerder "uses a single thread for most of its processing… config changes, task rebalances and serving requests" (class doc, DistributedHerder.java:145). That thread runs tick() in a loop (DistributedHerder.java:409):
- If a previous attempt failed to read the config log to the end, retry that first (readConfigToEnd), since joining the group only to leave it immediately would be impolite to the other members.
- member.ensureActive() drives JoinGroup/SyncGroup; handleRebalanceCompleted() finishes any pending assignment.
- If this worker is the leader and the session key is due for rotation, generate a new SessionKey and write it to the config topic.
- Drain queued external requests (REST callbacks) whose scheduled time has arrived, then process restart requests.
- Apply batched config/target-state/task-config updates observed asynchronously from the config log.
- Call member.poll(timeout) to let the group make progress, waking early on a new request or config update.
REST handlers don't touch shared state directly; they enqueue a DistributedHerderRequest and the tick thread runs it, which "keeps synchronization straightforward at the cost of some operations possibly blocking up this thread" (DistributedHerder.java:410). One consequence is that a request can get stuck behind an in-progress rebalance. The herder therefore often returns RebalanceNeededException immediately rather than blocking. Heavy start/stop work is offloaded to a fixed startAndStopExecutor of 8 threads so a rebalance can start/stop many connectors and tasks in parallel (DistributedHerder.java:162, :333).
Leadership, config writes and request forwarding
The group's leader is elected by the broker-side group coordinator and surfaced to every worker in the assignment. It is the only writer permitted to mutate the config topic for important records (connector configs, session keys, task-count records). On becoming leader it calls configBackingStore.claimWritePrivileges() (DistributedHerder.java:2677). A non-leader worker that receives a config-change REST request forwards it to the leader via the internal REST client, using the leader URL carried in the assignment. This is the single-writer discipline that KafkaConfigBackingStore assumes (KafkaConfigBackingStore.java:181). Internal REST calls are HMAC-signed with the rotating session key when the protocol is sessioned (V2), per KIP-507 (DistributedHerder.java:354).
Incremental cooperative assignment (KIP-415)
The default assignor is IncrementalCooperativeAssignor (KIP-415, cited at IncrementalCooperativeAssignor.java:58). Unlike the legacy EagerAssignor, which revokes everything from every worker on each rebalance (stop-the-world), the cooperative assignor computes a minimal delta so that connectors/tasks that stay on the same worker are never interrupted. The leader's performTaskAssignment() works in terms of set algebra over snapshots (IncrementalCooperativeAssignor.java:197):
| Derived set | Definition | Action |
|---|---|---|
| configured | all connectors+tasks in the config snapshot | baseline of what should run |
| active | union of what members report running | baseline of what does run |
| deleted | previous − configured | revoke from owners |
| duplicated | running on >1 worker | revoke the redundant copies |
| lost | previous − active − deleted | delayed reassignment (see below) |
| created | configured − previous − active | assign to least-loaded workers |
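The derived sets in this table can be computed with plain set differences. This sketch treats connectors and tasks uniformly as String ids and takes each worker's reported running set as input. AssignmentDeltas is hypothetical and stops at the set algebra, leaving out the actual assignment and revocation steps.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of the leader's derived sets; connectors and tasks are String ids.
final class AssignmentDeltas {
    final Set<String> deleted;
    final Set<String> duplicated;
    final Set<String> lost;
    final Set<String> created;

    AssignmentDeltas(Set<String> configured, Set<String> previous,
                     Map<String, Set<String>> runningByWorker) {
        // active: union of what members report; owners: how many members report each id.
        Set<String> active = new HashSet<>();
        Map<String, Integer> owners = new HashMap<>();
        for (Set<String> running : runningByWorker.values()) {
            active.addAll(running);
            for (String id : running) {
                owners.merge(id, 1, Integer::sum);
            }
        }
        Set<String> dup = new HashSet<>();
        for (Map.Entry<String, Integer> e : owners.entrySet()) {
            if (e.getValue() > 1) {
                dup.add(e.getKey()); // running on more than one worker
            }
        }
        duplicated = dup;
        deleted = minus(previous, List.of(configured));         // previous - configured
        lost = minus(previous, List.of(active, deleted));       // previous - active - deleted
        created = minus(configured, List.of(previous, active)); // configured - previous - active
    }

    private static Set<String> minus(Set<String> base, List<Set<String>> subtract) {
        Set<String> result = new HashSet<>(base);
        for (Set<String> s : subtract) {
            result.removeAll(s);
        }
        return result;
    }
}
```

For instance, c4 below was in the previous assignment and is still configured, but no worker reports running it. It therefore lands in lost rather than created, making it a candidate for delayed reassignment instead of immediate placement.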
New and lost-then-reclaimed work is spread by assignConnectors()/assignTasks(), which sort workers by current load and hand each item to the least-loaded worker (a load-balancing round robin, IncrementalCooperativeAssignor.java:771, :801). When the cluster is imbalanced, performLoadBalancingRevocations() revokes a few items from overloaded workers so a later round can rebalance them; an exponential backoff (consecutiveRevokingRebalancesBackoff) prevents thrashing across successive revoking rounds (IncrementalCooperativeAssignor.java:298).
When a worker leaves, its tasks become lost. Rather than immediately reshuffling them (only to reshuffle again seconds later when the worker rejoins after a transient blip or a rolling restart), the leader defers reassignment by up to scheduled.rebalance.max.delay.ms (default 300 000 ms). It remembers the departed worker as a candidate; if that worker rejoins within the delay, its work is simply returned to it. Only when the delay expires are the lost tasks reassigned to the remaining workers (IncrementalCooperativeAssignor.java:439, :470). The per-worker delay is carried back in the assignment's Delay field so members know to schedule a follow-up rebalance.
Status propagation
Task and connector state changes flow through the TaskStatus.Listener/ConnectorStatus.Listener interfaces (which AbstractHerder implements) into KafkaStatusBackingStore, which writes to the status topic; the new state only becomes visible to readers (and the REST API) after it is read back from the topic (KafkaStatusBackingStore.java:70). To reduce the window where a stale worker overwrites a newer status, putSafe() only writes when there is no prior value, the prior value came from the same workerId, or the current generation is higher; each cache entry also carries a sequence number so retried sends from an out-of-date generation are dropped (KafkaStatusBackingStore.java:75, :293). A DESTROYED status serializes to a null value, a compaction tombstone.
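The write guard can be sketched as follows, with a status reduced to (state, workerId, generation). StatusCache is hypothetical and makes two simplifications. It updates its cache immediately on an accepted write, whereas the real store only learns the new value when it reads the record back from the status topic. It also omits the per-entry sequence numbers. The generation comparison follows the prose above; the exact operator used in KafkaStatusBackingStore may differ.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of a guarded ("safe") status write.
final class StatusCache {
    static final class Status {
        final String state;
        final String workerId;
        final int generation;

        Status(String state, String workerId, int generation) {
            this.state = state;
            this.workerId = workerId;
            this.generation = generation;
        }
    }

    private final Map<String, Status> latest = new HashMap<>();

    // Returns true if the write is accepted (the real store would then send it).
    synchronized boolean putSafe(String key, Status status) {
        Status prior = latest.get(key);
        boolean allowed = prior == null                         // nothing written yet
                || prior.workerId.equals(status.workerId)       // same worker updating itself
                || status.generation > prior.generation;        // writer is in a newer generation
        if (allowed) {
            latest.put(key, status); // simplification: real store updates on read-back
        }
        return allowed;
    }

    synchronized String state(String key) {
        Status s = latest.get(key);
        return s == null ? null : s.state;
    }
}
```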
Concurrency & threading
| Thread / pool | Owner | Responsibility | Guards / notes |
|---|---|---|---|
| DistributedHerder-<clientId>-0 (1 thread, queue depth 1) | DistributedHerder | the tick loop: membership, config reconciliation, REST request execution | single thread ⇒ most herder state needs no locks; long ops can block requests (DistributedHerder.java:321) |
| StartAndStopExecutor-… (8 threads) | DistributedHerder | parallel start/stop of connectors and tasks during rebalance | DistributedHerder.java:333 |
| ForwardRequestExecutor-… (1 thread) | DistributedHerder | forward config-change requests to the leader | DistributedHerder.java:327 |
| task-thread-<conn>-<n> (cached pool) | Worker | runs each WorkerTask/WorkerConnector run() | one per running task; Executors.newCachedThreadPool() (Worker.java:176) |
| SourceTaskOffsetCommitter-0 (1 scheduled thread) | Worker (non-EOS) | periodic source offset flush | per-task scheduleWithFixedDelay; must never let an exception escape (SourceTaskOffsetCommitter.java:66) |
| KafkaBasedLog consumer threads | each backing store | tail config/offset/status topics and fire update callbacks | callbacks run on background threads ⇒ stores expose immutable snapshots, not live state |
Within a WorkerTask, state transitions are synchronized on the task's own monitor because pause/resume/stop arrive from the herder thread while the task runs on its own thread (WorkerTask.java:57). targetState, failed, stopping and cancelled are volatile; transitionTo(), onShutdown(), onFailure() and awaitUnpause() all take the monitor and notifyAll() so a paused task wakes promptly (WorkerTask.java:333). A sink task additionally calls consumer.wakeup() on stop/transition to break out of a blocking poll (WorkerSinkTask.java:172, :213). The worker's connectors and tasks registries are ConcurrentHashMaps keyed by name/ConnectorTaskId (Worker.java:162).
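The pause/resume handshake is the classic guarded-wait pattern. PausableTask and its three-value TargetState enum are hypothetical simplifications of WorkerTask. The herder thread calls transitionTo(), which updates the volatile target and calls notifyAll(). The task thread loops in awaitUnpause() until it is no longer paused.

```java
// Hypothetical simplification of WorkerTask's pause/resume monitor.
final class PausableTask {
    enum TargetState { STARTED, PAUSED, STOPPED }

    private volatile TargetState targetState = TargetState.STARTED;

    // Herder thread: change the target and wake any waiter promptly.
    synchronized void transitionTo(TargetState state) {
        targetState = state;
        notifyAll();
    }

    // Task thread: block while paused; returns true if the task should keep running.
    synchronized boolean awaitUnpause() throws InterruptedException {
        while (targetState == TargetState.PAUSED) {
            wait(); // releases the monitor; re-checks after notifyAll or a spurious wakeup
        }
        return targetState == TargetState.STARTED;
    }
}
```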
Backing-store accessors return point-in-time snapshots (e.g. configBackingStore.snapshot() → ClusterConfigState). Updates continue to be applied on the background log-reading thread after you take a snapshot. The herder caches a snapshot in configState and only refreshes it at well-defined points in tick(); never assume the live store and your snapshot agree.
Configuration reference
| Key | Default | Effect |
|---|---|---|
| name | (no default) | connector name; primary key (ConnectorConfig.java:81) |
| connector.class | (no default) | connector implementation class (ConnectorConfig.java:85) |
| tasks.max | 1 | upper bound on task configs the connector may generate (ConnectorConfig.java:137) |
| tasks.max.enforce | true | if true, exceeding tasks.max fails the connector instead of only logging a warning (ConnectorConfig.java:144) |
| key.converter / value.converter | worker default | serialization for key/value; connector override falls back to worker config (Worker.java:703) |
| header.converter | SimpleHeaderConverter | serialization for record headers |
| transforms / predicates | [] | ordered SMT aliases and predicate aliases (ConnectorConfig.java:154, :158) |
| errors.tolerance | none | none fails the task on the first error; all drops failed records (or routes them to a DLQ, if one is configured) instead of killing the task (ConnectorConfig.java:188) |
| offset.flush.interval.ms | 60000 | source offset flush period; also the sink offset-commit period (WorkerConfig.java:116) |
| offset.flush.timeout.ms | 5000 | budget for a source offset flush before it's abandoned (WorkerConfig.java:121) |
| task.shutdown.graceful.timeout.ms | 5000 | how long the worker waits for tasks to stop cleanly (WorkerConfig.java:109) |
| group.id | (no default) | distributed cluster identity (group membership) |
| config.storage.topic | (no default) | config topic; created with 1 partition, compacted (DistributedConfig.java:156) |
| offset.storage.topic / .partitions | (no default) / 25 | source offsets topic; compacted (DistributedConfig.java:138, :144) |
| status.storage.topic / .partitions | (no default) / 5 | status topic; compacted (DistributedConfig.java:168, :174) |
| connect.protocol | sessioned | group protocol: eager / compatible / sessioned (DistributedConfig.java:186) |
| scheduled.rebalance.max.delay.ms | 300000 | how long lost tasks wait before reassignment (DistributedConfig.java:193) |
| inter.worker.key.ttl.ms | 3600000 | session-key rotation period for signed internal requests (DistributedConfig.java:211) |
| exactly.once.source.support | disabled | cluster-wide EOS for source connectors (DistributedConfig.java:251) |
Exactly-once source (KIP-618)
When exactly.once.source.support is enabled, the worker builds an ExactlyOnceSourceTaskBuilder and runs an ExactlyOnceWorkerSourceTask (Worker.java:2003; DistributedHerder.java:2016). It differs from the plain source task in two ways. First, records and their source offsets are written in the same producer transaction. The offset store for an EOS task writes its offset records through the task's transactional producer, so one transaction atomically commits both the data and the offsets (the ConnectorOffsetBackingStore participates in the producer transaction). Transaction boundaries follow the connector's transaction.boundary setting in one of three modes:
- one transaction per poll() batch (POLL, the default);
- one per time INTERVAL;
- boundaries defined by the connector through a TransactionContext (CONNECTOR).

See ExactlyOnceWorkerSourceTask.java:124 and SourceTask.java:41.
Second, zombie fencing: before a new generation of source tasks starts, the leader writes a tasks-fencing-<connector> (task-count) record and the worker fences out any older transactional producers using the connector's admin principal, via Worker.fenceZombies() (Worker.java:764). The task-start path runs fenceZombieSourceTasks() as a pre-producer check and verifies its own generation/ownership as a post-producer check (DistributedHerder.java:2026). This guarantees a crashed-but-still-producing old task cannot keep appending after a new one takes over.
Kafka clients and Streams had exactly-once for years, but Connect source connectors lacked it until Kafka 3.3. KIP-618 added the atomic record+offset write plus producer fencing of zombie tasks. It imposes two requirements on connectors: each source partition is owned by at most one task, and it is resumable from committed offsets. These are exactly what SubmittedRecords and the per-task offset store already model, so EOS reuses the same offset machinery with a transactional producer bolted on. (Sink-side exactly-once is a separate concern, handled by the consumer plus idempotent or transactional writes in the sink system.)
Failure modes, edge cases & recovery
- Task throws a non-retriable exception: WorkerTask.doRun() sets failed=true, fires onFailure(), and the task stays FAILED until it is manually restarted via the REST API. The connector keeps running and other tasks are unaffected (WorkerTask.java:244).
- Source producer send fails with errors.tolerance=none: the failure is stashed in producerSendException and rethrown on the next poll/send, killing the task without committing the failed offset (WorkerSourceTask.java:344). With errors.tolerance=all the record is dropped and reported through the task's error reporters (dead letter queue routing applies only to sink connectors). Its offset is committed so the task proceeds.
- Sink put() throws RetriableException: the batch is retried after pausing all partitions. The consumer keeps polling (to stay in the group and honor timeouts) but fetches no new data until delivery succeeds (WorkerSinkTask.java:646).
- Offset flush times out (source): the flush is cancelled and recorded as a failure. The writer retains the offsets and retries on the next interval with the latest offsets (WorkerSourceTask.java:319).
- Config topic half-write + compaction: a connector with an incomplete buffered task-config set is marked inconsistent in ClusterConfigState. The herder will not start its tasks until the connector regenerates a complete set (KafkaConfigBackingStore.java:1108).
- Leader fenced from config topic: if a zombie leader caused a fence, the real leader reclaims write privileges at the top of the next tick; a demoted leader simply relinquishes them (DistributedHerder.java:440).
- Worker falls behind on configs: the assignor computes assignments at the maximum config offset seen. Lagging workers receive an assignment tagged with that offset and refuse to act on it until they catch up (IncrementalCooperativeAssignor.java:111; DistributedHerder.java:1826 forces a rebalance on offset mismatch).
- Worker crash / network partition: its tasks are lost. Reassignment is deferred for up to scheduled.rebalance.max.delay.ms, so a transient outage or rolling restart does not trigger an avoidable global reshuffle.
- Slow task shutdown: the worker can cancel() a task. Cancelling proactively closes the task's producer/offset reader to unblock a hung send or read. It also suppresses any further status updates, so a late-dying orphan cannot clobber the state of its replacement (WorkerTask.java:160; AbstractWorkerSourceTask.java:293).
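The RetriableException path for sink tasks can be sketched as a loop that keeps polling but stops fetching. FakeConsumer, sink_loop and failing_put are hypothetical. The sketch also omits the backoff the real task derives from SinkTaskContext.timeout().

```python
from collections import deque


class RetriableException(Exception):
    """Stand-in for org.apache.kafka.connect.errors.RetriableException."""


class FakeConsumer:
    """Hypothetical consumer: while paused, poll() still runs (keeping group membership) but returns nothing."""

    def __init__(self, batches):
        self._batches = deque(batches)
        self.paused = False
        self.polls = 0

    def poll(self):
        self.polls += 1
        if self.paused or not self._batches:
            return []
        return self._batches.popleft()


def sink_loop(consumer, put, iterations):
    """Deliver batches to put(); on RetriableException, pause and retry the same batch."""
    pending = None  # batch whose delivery must be retried
    delivered = []
    for _ in range(iterations):
        fetched = consumer.poll()  # always poll, even while paused
        batch = pending if pending is not None else fetched
        if not batch:
            continue
        try:
            put(batch)
        except RetriableException:
            pending = batch
            consumer.paused = True  # stop fetching new data until delivery succeeds
            continue
        delivered.append(batch)
        pending = None
        consumer.paused = False  # resume fetching
    return delivered


def failing_put(times):
    """Hypothetical sink whose put() fails the first `times` calls."""
    calls = {"n": 0}

    def put(batch):
        calls["n"] += 1
        if calls["n"] <= times:
            raise RetriableException("downstream temporarily unavailable")

    return put


consumer = FakeConsumer([["a", "b"], ["c"]])
delivered = sink_loop(consumer, failing_put(2), iterations=5)
print(delivered)  # [['a', 'b'], ['c']]
```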
Invariants & guarantees
- Single config writer. Only the elected leader writes connector configs, task configs, session keys and task-count records to the config topic; every other worker forwards such writes to the leader (KafkaConfigBackingStore.java:181).
- Atomic task reconfiguration. A set of task configs becomes visible to readers only when its commit-<connector> marker arrives with a matching, complete task-id set.
- One connector instance. A given connector name runs on exactly one worker. The tasks of a connector are distributed, and each ConnectorTaskId runs on exactly one worker (duplicates are revoked).
- At-least-once by default. Source offsets are flushed only behind fully-acked record prefixes; sink offsets are committed only after put/preCommit approves them. Exactly-once source requires exactly.once.source.support plus a connector that declares SUPPORTED.
- Cooperative continuity. Under the V1/V2 (compatible/sessioned) protocols, connectors and tasks that remain assigned to the same worker across a rebalance are never stopped (IncrementalCooperativeAssignor only revokes the delta).
- Status read-after-write. A status change is authoritative only once it has round-tripped through the status topic.
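The atomic-reconfiguration invariant amounts to buffering task records until a commit marker proves the set complete. The Python sketch below is a simplified model with hypothetical tuple-shaped records, and replay_config_log is an illustrative name. As an assumption of this sketch, a set counts as complete when every id from 0 to n-1 is present. Under that rule, a stale higher-numbered task record that survived compaction does not block the commit.

```python
def replay_config_log(records):
    """Hypothetical replay of config-topic records into the task configs readers may use.

    records: ("task", connector, task_id, config) or ("commit", connector, task_count).
    """
    deferred = {}        # connector -> {task_id: config} written but not yet committed
    visible = {}         # connector -> list of task configs indexed by task id
    inconsistent = set()
    for kind, connector, *rest in records:
        if kind == "task":
            task_id, config = rest
            deferred.setdefault(connector, {})[task_id] = config
        elif kind == "commit":
            (task_count,) = rest
            buffered = deferred.pop(connector, {})
            # Every id 0..task_count-1 must be present; leftover higher ids are tolerated.
            if all(i in buffered for i in range(task_count)):
                visible[connector] = [buffered[i] for i in range(task_count)]
                inconsistent.discard(connector)
            else:
                inconsistent.add(connector)  # never start a partial task set
    return visible, inconsistent


log = [
    ("task", "jdbc", 1, "gen1-task1"),  # survivor of an older 2-task generation
    ("task", "jdbc", 0, "gen2-task0"),
    ("commit", "jdbc", 1),
    ("task", "s3", 0, "cfg-x"),
    ("commit", "s3", 2),                # task 1 record missing: incomplete set
    ("task", "jdbc", 0, "gen3-task0"),  # written but not yet committed
]
visible, inconsistent = replay_config_log(log)
print(visible, inconsistent)  # {'jdbc': ['gen2-task0']} {'s3'}
```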
Interactions with other subsystems
- Group Coordination: distributed workers form a group via the broker's GroupCoordinator using a Connect-specific embedded protocol; sink tasks additionally form an ordinary consumer group. The Connect assignor plugs into the same JoinGroup/SyncGroup mechanics described there.
- Producer Client: every source task owns a KafkaProducer; EOS source tasks use the producer's transactional API (one transactional.id per task) and zombie fencing.
- Consumer Client: every sink task owns a KafkaConsumer, subscribes to its topics, and commits to __consumer_offsets; rebalance callbacks drive open/close.
- Transactions & EOS: KIP-618 exactly-once source builds directly on Kafka transactions and producer fencing.
- Log Management: Connect's three internal topics are compacted; their correctness depends on log compaction semantics, which the config store explicitly reasons about.
- Security: internal worker-to-worker REST calls are HMAC-signed with the rotating session key (KIP-507); connector client configs can be overridden subject to a ConnectorClientConfigOverridePolicy.
- Architecture Overview: Connect is a Kafka client application. It is KRaft-agnostic and talks only to broker APIs, never to the controller quorum directly.
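The request-signing idea behind KIP-507 can be shown with Python's standard hmac module. HMAC-SHA256, hex encoding, the 32-byte keys and the JSON body are all assumptions for illustration. The sketch shows three things: a follower signs with the current session key, the receiver accepts the untouched body, and it rejects both a tampered body and a signature made with a rotated-out key.

```python
import hashlib
import hmac
import secrets


def sign_request(session_key: bytes, body: bytes) -> str:
    """Sign an internal REST request body with the current session key."""
    return hmac.new(session_key, body, hashlib.sha256).hexdigest()


def verify_request(session_key: bytes, body: bytes, signature: str) -> bool:
    """Constant-time comparison, so timing does not leak the expected signature."""
    expected = sign_request(session_key, body)
    return hmac.compare_digest(expected, signature)


old_key = secrets.token_bytes(32)
body = b'[{"topic": "orders"}]'  # illustrative task-config payload
sig = sign_request(old_key, body)

ok_before = verify_request(old_key, body, sig)
new_key = secrets.token_bytes(32)  # the leader rotates the session key
ok_after_rotation = verify_request(new_key, body, sig)
tampered = verify_request(old_key, b'[{"topic": "evil"}]', sig)
print(ok_before, ok_after_rotation, tampered)
```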
MirrorMaker 2: Connect for cross-cluster replication
MirrorMaker 2 is implemented as a set of Connect source connectors in the connect/mirror module, so it inherits the entire runtime above. Three connectors cooperate (one logical direction per source→target cluster pair):
| Connector | What it replicates | Mechanism |
|---|---|---|
| MirrorSourceConnector | topic data, configs and ACLs | discovers source topic-partitions, creates matching downstream topics, and splits partitions round-robin across tasks (MirrorSourceConnector.java:202) |
| MirrorHeartbeatConnector | liveness/lag heartbeats | its task's poll() emits a Heartbeat record to the heartbeats topic at a fixed interval (MirrorHeartbeatTask.java:64) |
| MirrorCheckpointConnector | consumer-group offsets | divides source consumer groups among tasks and emits checkpoint records translating committed offsets into the target cluster (MirrorCheckpointConnector.java:132) |
MirrorSourceConnector.taskConfigs() shows the canonical "partition the work" pattern: it computes numTasks = min(maxTasks, knownSourceTopicPartitions.size()) and round-robins the known topic-partitions into per-task config maps (MirrorSourceConnector.java:206). The connector also runs a background Scheduler to periodically refresh the set of source topic-partitions, creating downstream topics and notifying the runtime to reconfigure tasks when the set changes (MirrorSourceConnector.java:162). Heartbeat and checkpoint records use the SourceRecord source-partition/source-offset fields to remain resumable, exactly like any other source connector. MirrorMaker can run inside a dedicated driver (MirrorMaker.java) that embeds Connect herders, or its connectors can be deployed into an ordinary distributed Connect cluster.
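The round-robin split that taskConfigs() performs can be sketched in Python. task_configs and the assigned.partitions key are hypothetical, and the real connector builds per-task Java config maps. The arithmetic is the same: numTasks = min(maxTasks, partition count), then each partition goes to task (index modulo numTasks).

```python
def task_configs(partitions, max_tasks):
    """Round-robin source topic-partitions into at most max_tasks task configs.

    "assigned.partitions" is a hypothetical config key used only for this sketch.
    """
    num_tasks = min(max_tasks, len(partitions))
    groups = [[] for _ in range(num_tasks)]
    for i, partition in enumerate(partitions):
        groups[i % num_tasks].append(partition)
    return [{"assigned.partitions": ",".join(group)} for group in groups]


parts = ["orders-0", "orders-1", "orders-2", "clicks-0", "clicks-1"]
configs = task_configs(parts, max_tasks=2)
# [{'assigned.partitions': 'orders-0,orders-2,clicks-1'},
#  {'assigned.partitions': 'orders-1,clicks-0'}]
```

Capping numTasks at the partition count means the connector never asks the runtime for idle tasks. Dealing partitions round-robin keeps per-task partition counts within one of each other.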
Gotchas & operational notes
- The config topic must have exactly one partition. Multiple partitions would break the total ordering the config store relies on, so Connect refuses to start against a multi-partition config topic. Give it a high replication factor, and never add partitions to it later, since doing so would break that ordering.
- Default offset.flush.interval.ms is 60 s. On restart, the window of duplicated source records is bounded by this interval. Lower it for a tighter at-least-once bound, at the cost of more writes to the offsets topic.
- The herder runs on a single tick thread. A connector whose taskConfigs() or validation logic blocks (e.g. on a slow network call to the upstream system) can stall REST responses cluster-wide, so connector authors must keep control-plane methods fast.
- Cooperative vs eager. Mixing workers configured with connect.protocol=eager and sessioned downgrades the whole cluster to the lowest common protocol and disables internal request signing; the herder logs a prominent warning (DistributedHerder.java:2648).
- Scaling and the rebalance delay. Membership changes trigger rebalances, and work lost with a departed worker is held back for up to scheduled.rebalance.max.delay.ms. During a rolling restart, set this delay high enough that work returns to a bouncing worker rather than churning across the cluster.
- Converter mismatch is the most common data bug. Producing with a schema-bearing JSON value and consuming with StringConverter (or vice versa) silently corrupts data. Key and value converters are independent, and each must match the data on the topic.
- Failed tasks do not auto-restart. A task that hits a non-retriable error stays FAILED until POST /connectors/{name}/tasks/{id}/restart is called; monitor the status topic or the REST status endpoint.
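The downgrade described under "Cooperative vs eager" follows from how a shared protocol is chosen. The following is a hypothetical Python model, not the GroupCoordinator's actual selection code. It assumes each worker also supports every older protocol, so the cluster settles on the minimum configured value, and signing stays on only when that minimum is sessioned.

```python
from enum import IntEnum


class ConnectProtocol(IntEnum):
    """connect.protocol values, ordered from oldest to newest."""
    EAGER = 0
    COMPATIBLE = 1
    SESSIONED = 2


def negotiate(configured):
    """Hypothetical model of the cluster-wide protocol choice.

    A worker supports its configured protocol and every older one, so the newest
    protocol shared by all members is the minimum of the configured values.
    """
    chosen = min(configured)
    signing_enabled = chosen == ConnectProtocol.SESSIONED  # request signing needs sessioned
    return chosen, signing_enabled


mixed = negotiate([ConnectProtocol.SESSIONED, ConnectProtocol.SESSIONED, ConnectProtocol.EAGER])
uniform = negotiate([ConnectProtocol.SESSIONED] * 3)
print(mixed, uniform)
```

A single eager worker pulls the cluster back to stop-the-world rebalancing and unsigned internal requests. This is why a mixed configuration deserves the loud warning.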