1. Overview
Apache Kafka is a distributed streaming platform that handles high-throughput data feeds through a partition-based architecture. When we send messages to a Kafka topic, they’re distributed across multiple partitions for parallel processing. This design enables Kafka to scale horizontally while maintaining performance.
Understanding how Kafka delivers messages across partitions is crucial for building reliable distributed systems. The partitioning strategy affects message ordering, consumer scaling, and overall system behavior. In this article, we’ll explore how Kafka delivers messages when topics contain multiple partitions, focusing on routing strategies, ordering guarantees, and consumer coordination.
2. Message Routing to Partitions
Kafka uses two primary strategies to determine which partition receives a message, based on whether the message includes a key. This decision fundamentally impacts how messages are distributed and processed.
2.1. Key-Based Partitioning
When we send a message with a key, Kafka’s default partitioner applies a deterministic hash function (murmur2) to the key, so the message consistently routes to the same partition. This ensures related messages stay together:
public void sendMessagesWithKey() {
    String key = "user-123";
    for (int i = 0; i <= 5; i++) {
        ProducerRecord<String, String> record = new ProducerRecord<>("user-events", key, "Event " + i);
        producer.send(record, (metadata, exception) -> {
            if (exception == null) {
                logger.info("Key: {}, Partition: {}, Offset: {}", key, metadata.partition(), metadata.offset());
            }
        });
    }
    producer.flush();
}
Kafka applies the murmur2 hash to the serialized key and uses modulo arithmetic with the partition count to select the target partition. All messages with key “user-123” will consistently land in the same partition, ensuring they’re processed in order. This is particularly useful when we need to maintain state or sequence for specific entities.
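As a rough illustration, we can reproduce this hash-and-modulo step ourselves with the murmur2 utilities shipped in the kafka-clients library. This is a simplified sketch of the idea, not the actual partitioner implementation:
public int selectPartitionForKey(String key, int numPartitions) {
    byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
    // hash the serialized key, force the result positive, then take it modulo the partition count
    return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
Calling this method twice with “user-123” and the same partition count always yields the same partition index, which is exactly why keyed messages stay together.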
2.2. Round-Robin for Keyless Messages
Messages without keys are distributed using sticky partitioning, a strategy that improves throughput by batching messages efficiently:
public Map<Integer, Integer> sendMessagesWithoutKey() {
    Map<Integer, Integer> partitionCounts = new HashMap<>();
    for (int i = 0; i < 100; i++) {
        ProducerRecord<String, String> record = new ProducerRecord<>("events", null, "Message " + i); // no key
        producer.send(record, (metadata, exception) -> {
            if (exception == null) {
                synchronized (partitionCounts) {
                    partitionCounts.merge(metadata.partition(), 1, Integer::sum);
                }
            }
        });
    }
    producer.flush();
    logger.info("Distribution across partitions: {}", partitionCounts);
    return partitionCounts;
}
Without a key, Kafka fills batches in one partition before moving to the next. This reduces network requests and improves compression ratios compared to pure round-robin distribution. The sticky behavior continues until the batch is full or the linger time expires.
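The relevant knobs here are the producer’s batch.size and linger.ms settings. The values below are purely illustrative starting points, not recommendations:
public KafkaProducer<String, String> createBatchingProducer() {
    Properties props = new Properties();
    props.put("bootstrap.servers", bootstrapServers);
    props.put("key.serializer", StringSerializer.class.getName());
    props.put("value.serializer", StringSerializer.class.getName());
    props.put("batch.size", "32768"); // sticky partitioning sticks to one partition until this many bytes are batched
    props.put("linger.ms", "20");     // or until this much time has passed
    return new KafkaProducer<>(props);
}
Larger batches and longer linger times increase the stickiness window, trading a little latency for better throughput and compression.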
3. Ordering Guarantees Across Partitions
Kafka’s ordering guarantees depend entirely on the partition structure. Understanding them is essential for designing systems that handle sequential operations correctly.
3.1. Within-Partition Ordering
Each partition maintains strict ordering through sequential offset assignment. Messages are appended to the partition log and consumed in that exact order:
public void demonstratePartitionOrdering() throws InterruptedException {
    String orderId = "order-789";
    String[] events = { "created", "validated", "paid", "shipped", "delivered" };
    for (String event : events) {
        ProducerRecord<String, String> record = new ProducerRecord<>("orders", orderId, event);
        producer.send(record, (metadata, exception) -> {
            if (exception == null) {
                logger.info("Event: {} -> Partition: {}, Offset: {}", event, metadata.partition(), metadata.offset());
            }
        });
        // small delay to demonstrate sequential processing
        Thread.sleep(100);
    }
    producer.flush();
}
Since all messages share the same key, they route to the same partition and maintain their sequence during consumption. This guarantee is absolute within a partition: consumers will always read these events in the order they were produced.
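To see the guarantee in action, we can read the events back and check that the offsets we observe only ever increase. This is a minimal sketch, assuming a consumer already subscribed to the orders topic:
public void verifySequentialOffsets(KafkaConsumer<String, String> consumer) {
    long lastOffset = -1;
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
    for (ConsumerRecord<String, String> record : records) {
        if ("order-789".equals(record.key())) {
            // offsets within a single partition are strictly increasing
            if (record.offset() <= lastOffset) {
                throw new IllegalStateException("Out-of-order record at offset " + record.offset());
            }
            lastOffset = record.offset();
            logger.info("Offset: {}, Event: {}", record.offset(), record.value());
        }
    }
}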
3.2. No Cross-Partition Ordering
Messages with different keys may land in different partitions, and Kafka provides no ordering guarantees across partitions:
public void demonstrateCrossPartitionBehavior() {
    long startTime = System.currentTimeMillis();
    // these will likely go to different partitions
    producer.send(new ProducerRecord<>("events", "key-A", "First at " + (System.currentTimeMillis() - startTime) + "ms"));
    producer.send(new ProducerRecord<>("events", "key-B", "Second at " + (System.currentTimeMillis() - startTime) + "ms"));
    producer.send(new ProducerRecord<>("events", "key-C", "Third at " + (System.currentTimeMillis() - startTime) + "ms"));
    producer.flush();
}
Even though we sent these messages sequentially, consumers might process them out of order since they’re in different partitions. One partition might be processed faster than another due to consumer load or network conditions.
4. Consumer Group Coordination
Kafka enables horizontal scaling by distributing partitions among consumers in a group. This coordination is fundamental to Kafka’s scalability model.
4.1. Partition Assignment Within Groups
When multiple consumers join the same group, Kafka assigns each partition to exactly one consumer, preventing duplicate processing within the group:
public void createConsumerGroup() {
    Properties props = new Properties();
    props.put("bootstrap.servers", bootstrapServers);
    props.put("group.id", "order-processors");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("auto.offset.reset", "earliest");
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(List.of("orders"));
    int recordCount = 0;
    while (recordCount < 10) { // process limited records for demo
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, String> record : records) {
            logger.info("Consumer: {}, Partition: {}, Offset: {}, Value: {}",
                Thread.currentThread().getName(), record.partition(), record.offset(), record.value());
            recordCount++;
        }
        consumer.commitSync();
    }
    consumer.close();
}
If we have six partitions and three consumers in the group, each consumer typically handles two partitions. This distribution ensures balanced load and no message duplication within the group. Kafka’s group coordinator manages these assignments automatically.
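If we want to see which partitions the coordinator actually handed to a consumer, we can inspect its assignment after a poll. A small sketch, assuming the consumer created above:
public void logAssignedPartitions(KafkaConsumer<String, String> consumer) {
    // the assignment is only populated once the consumer has joined the group during poll()
    consumer.poll(Duration.ofMillis(500));
    for (TopicPartition partition : consumer.assignment()) {
        logger.info("This consumer owns {}-{}", partition.topic(), partition.partition());
    }
}
Running this from each consumer in the group shows the partitions split across them without overlap.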
4.2. Multiple Groups for Fan-Out
Different consumer groups can independently process the same messages, enabling multiple applications to react to the same events:
public void startMultipleGroups() {
    String[] groupIds = { "analytics-group", "audit-group", "notification-group" };
    CountDownLatch latch = new CountDownLatch(groupIds.length);
    for (String groupId : groupIds) {
        startConsumerGroup(groupId, latch);
    }
    try {
        latch.await(10, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
}

private void startConsumerGroup(String groupId, CountDownLatch latch) {
    Properties props = new Properties();
    props.put("bootstrap.servers", bootstrapServers);
    props.put("group.id", groupId);
    // other properties (deserializers, auto.offset.reset)
    new Thread(() -> {
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Arrays.asList("orders"));
            int recordCount = 0;
            while (recordCount < 5) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                recordCount += processRecordsForGroup(groupId, records);
            }
        } finally {
            latch.countDown();
        }
    }).start();
}
Each group maintains its own offset tracking, allowing different services to process messages at their own pace. This pattern enables event-driven architectures where multiple systems react to the same business events.
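We can observe this independence directly by asking the cluster for each group’s committed offsets through the admin client. A sketch, assuming an Admin instance connected to the same brokers:
public void printCommittedOffsets(Admin admin, String groupId) throws Exception {
    // every group stores its own committed offset per partition
    Map<TopicPartition, OffsetAndMetadata> offsets = admin.listConsumerGroupOffsets(groupId)
        .partitionsToOffsetAndMetadata()
        .get();
    offsets.forEach((partition, metadata) ->
        logger.info("Group: {}, Partition: {}, Committed offset: {}", groupId, partition.partition(), metadata.offset()));
}
Comparing the output for analytics-group and audit-group typically shows different committed positions on the same partitions, since each group consumes at its own pace.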
5. Handling Consumer Rebalancing
When consumers join or leave a group, Kafka rebalances partition assignments. This process ensures continued operation but can cause temporary disruptions. We can use cooperative rebalancing to minimize impact:
public void configureCooperativeRebalancing() {
    Properties props = new Properties();
    props.put("bootstrap.servers", bootstrapServers);
    props.put("group.id", "cooperative-group");
    props.put("partition.assignment.strategy", "org.apache.kafka.clients.consumer.CooperativeStickyAssignor");
    props.put("key.deserializer", StringDeserializer.class.getName());
    props.put("value.deserializer", StringDeserializer.class.getName());
    props.put("auto.offset.reset", "earliest");
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Arrays.asList("orders"), new ConsumerRebalanceListener() {
        @Override
        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            logger.info("Revoked partitions: {}", partitions);
            // complete processing of current records
        }

        @Override
        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            logger.info("Assigned partitions: {}", partitions);
            // initialize any partition-specific state
        }
    });

    // process a few records to demonstrate
    int recordCount = 0;
    while (recordCount < 5) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        recordCount += records.count();
    }
    consumer.close();
}
Cooperative rebalancing allows unaffected consumers to continue processing while only reassigning necessary partitions. This significantly reduces the impact of scaling operations.
6. Processing Guarantees
For reliable message processing, we typically implement at-least-once delivery by manually controlling offset commits:
public void processWithManualCommit() {
    Properties props = new Properties();
    props.put("bootstrap.servers", bootstrapServers);
    props.put("group.id", "manual-commit-group");
    props.put("enable.auto.commit", "false");
    props.put("max.poll.records", "10");
    props.put("key.deserializer", StringDeserializer.class.getName());
    props.put("value.deserializer", StringDeserializer.class.getName());
    props.put("auto.offset.reset", "earliest");
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Arrays.asList("orders"));
    int totalProcessed = 0;
    while (totalProcessed < 10) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        boolean batchFailed = false;
        for (ConsumerRecord<String, String> record : records) {
            try {
                processOrder(record);
                totalProcessed++;
            } catch (Exception e) {
                logger.error("Processing failed for offset: {}", record.offset(), e);
                batchFailed = true;
                break;
            }
        }
        // commit only when the whole batch succeeded, so failed records are redelivered
        if (!records.isEmpty() && !batchFailed) {
            consumer.commitSync();
            logger.info("Committed {} records", records.count());
        }
    }
    consumer.close();
}
This approach ensures that our messages aren’t lost if processing fails, though we must design our processing logic to handle potential duplicates gracefully.
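A common way to handle those duplicates is to make the processing itself idempotent, for example by remembering which order events we’ve already applied. Here’s a simplified in-memory sketch; a production system would persist this state, for instance in a database:
private final Set<String> processedEventIds = ConcurrentHashMap.newKeySet();

public void processOrderIdempotently(ConsumerRecord<String, String> record) {
    String eventId = record.key() + "-" + record.offset();
    // a redelivered record carries the same partition offset, so it's skipped the second time
    if (!processedEventIds.add(eventId)) {
        logger.info("Skipping duplicate event {}", eventId);
        return;
    }
    processOrder(record);
}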
7. Putting It All Together
When a producer sends a message to Kafka, the process starts with partition selection. Messages with keys use consistent hashing to ensure related data stays together in the same partition. Keyless messages use sticky partitioning to improve batching efficiency. Within each partition, Kafka assigns sequential offsets to maintain strict order, but there’s no global ordering across partitions.
Each partition is assigned to one consumer per group, enabling parallel processing without duplication. Different consumer groups consume the same messages independently, each with separate offset tracking. When consumers join or leave, Kafka rebalances partition assignments using cooperative strategies to reduce disruption. This design allows Kafka to scale horizontally while preserving order within each partition.
8. Conclusion
In this article, we’ve explored how Kafka’s partition-based architecture handles message delivery while maintaining ordering guarantees where they matter most. We’ve seen that Kafka prioritizes scalability and throughput over global ordering, providing partition-level guarantees that align with most real-world requirements.
The key takeaway is understanding that partitions are the unit of parallelism and ordering in Kafka. By designing our applications around these constraints, we can build scalable systems that process millions of messages efficiently.
The code backing this article is available on GitHub.