Course – LS – All

Get started with Spring and Spring Boot, through the Learn Spring course:

>> CHECK OUT THE COURSE

1. Introduction

Apache Kafka is a distributed streaming platform that excels in handling massive real-time data streams. Kafka organizes data into topics and further divides topics into partitions. Each partition acts as an independent channel, enabling parallel processing and fault tolerance.

In this tutorial, we delve into the techniques for sending data to specific partitions in Kafka. We’ll explore the benefits, implementation methods, and potential challenges associated with this approach.

2. Understanding Kafka Partitions

Now, let’s explore the fundamental concept of Kafka partitions.

2.1. What Are Kafka Partitions

When a producer sends messages to a Kafka topic, Kafka organizes these messages into partitions using a specified partitioning strategy. A partition is a fundamental unit that represents a linear, ordered sequence of messages. Once a message is produced, it is assigned to a particular partition based on the chosen partitioning strategy. Subsequently, the message is appended to the end of the log within that partition.

2.2. Parallelism and Consumer Groups

A Kafka topic may be divided into multiple partitions, and a consumer group can be assigned a subset of these partitions. Each consumer within the group processes messages independently from its assigned partitions. This parallel processing mechanism enhances overall throughput and scalability, allowing Kafka to handle large volumes of data efficiently.

2.3. Ordering and Processing Guarantee

Within a single partition, Kafka ensures that messages are processed in the same order they were received. This guarantees sequential processing for applications that rely on message order, like financial transactions or event logs. However, note that the order messages are received may differ from the order they were originally sent due to network delays and other operational considerations.

Across different partitions, Kafka does not impose a guaranteed order. Messages from different partitions may be processed concurrently, introducing the possibility of variations in the order of events. This characteristic is essential to consider when designing applications that rely on the strict ordering of messages.

2.4. Fault Tolerance and High Availability

Partitions also contribute to Kafka’s exceptional fault tolerance. Each partition can be replicated across multiple brokers. In the event of a broker failure, the replicated partitions can still be accessed and ensure continuous access to the data.

The Kafka cluster can seamlessly redirect consumers to healthy brokers, maintaining data availability and high system reliability.

3. Why Send Data to Specific Partitions

In this section, let’s explore the reasons for sending data to specific partitions.

3.1. Data Affinity

Data affinity refers to the intentional grouping of related data within the same partition. By sending related data to specific partitions, we ensure that it is processed together, leading to increased processing efficiency.

For instance, consider a scenario where we might want to ensure a customer’s orders reside in the same partition for order tracking and analytics. Guaranteeing that all orders from a specific customer end up in the same partition simplifies tracking and analysis processes.

3.2. Load Balancing

Additionally, distributing data evenly across partitions can help to ensure optimal resource utilization. Evenly distributing data across partitions helps optimize resource utilization within a Kafka cluster. By sending data to partitions based on load considerations, we can prevent resource bottlenecks and ensure that each partition receives a manageable and balanced workload.

3.3. Prioritization

In certain scenarios, not all data has equal priority or urgency. Kafka’s partitioning capabilities enable the prioritization of critical data by directing it to dedicated partitions for expedited handling. This prioritization ensures that high-priority messages receive prompt attention and faster processing compared to less critical ones.

4. Methods for Sending to Specific Partitions

Kafka provides various strategies for assigning messages to partitions, offering data distribution and processing flexibility. Below are some common methods that can be used to send messages to a specific partition.

4.1. Sticky Partitioner

In Kafka versions 2.4 and above, the sticky partitioner aims to keep messages without keys together in the same partition. However, this behavior isn’t absolute and interacts with batching settings such as batch.size and linger.ms.

To optimize the message delivery, Kafka groups messages into batches before sending them to brokers. The batch.size setting (default 16,384 bytes) controls the maximum batch size, affecting how long messages stay in the same partition under the sticky partitioner.

The linger.ms configuration (default: 0 milliseconds) introduces a delay before sending batches, potentially prolonging sticky behavior for messages without keys.

In the following test case, assuming the default batching configuration remains in place. We’ll send three messages without explicitly assigning a key. We should be expecting them to be initially assigned to the same partition:

kafkaProducer.send("default-topic", "message1");
kafkaProducer.send("default-topic", "message2");
kafkaProducer.send("default-topic", "message3");

await().atMost(2, SECONDS)
  .until(() -> kafkaMessageConsumer.getReceivedMessages()
    .size() >= 3);

List<ReceivedMessage> records = kafkaMessageConsumer.getReceivedMessages();

Set<Integer> uniquePartitions = records.stream()
  .map(ReceivedMessage::getPartition)
  .collect(Collectors.toSet());

Assert.assertEquals(1, uniquePartitions.size());

4.2. Key-based Approach

In the key-based approach, Kafka directs messages with identical keys to the same partition, optimizing the processing of related data. This is achieved through a hash function, ensuring deterministic mapping of message keys to partitions.

In this test case, messages with the same key partitionA should always land in the same partition. Let’s illustrate key-based partitioning with the following code snippet:

kafkaProducer.send("order-topic", "partitionA", "critical data");
kafkaProducer.send("order-topic", "partitionA", "more critical data");
kafkaProducer.send("order-topic", "partitionB", "another critical message");
kafkaProducer.send("order-topic", "partitionA", "another more critical data");

await().atMost(2, SECONDS)
  .until(() -> kafkaMessageConsumer.getReceivedMessages()
    .size() >= 4);

List<ReceivedMessage> records = kafkaMessageConsumer.getReceivedMessages();
Map<String, List<ReceivedMessage>> messagesByKey = groupMessagesByKey(records);

messagesByKey.forEach((key, messages) -> {
    int expectedPartition = messages.get(0)
      .getPartition();
    for (ReceivedMessage message : messages) {
        assertEquals("Messages with key '" + key + "' should be in the same partition", message.getPartition(), expectedPartition);
    }
});

In addition, with the key-based approach, messages sharing the same key are consistently received in the order they were produced within a specific partition. This guarantees the preservation of message order within a partition, especially for related messages.

In this test case, we produce messages with the key partitionA in a specific order, and the test actively verifies that these messages are received in the same order within the partition:

kafkaProducer.send("order-topic", "partitionA", "message1");
kafkaProducer.send("order-topic", "partitionA", "message3");
kafkaProducer.send("order-topic", "partitionA", "message4");

await().atMost(2, SECONDS)
  .until(() -> kafkaMessageConsumer.getReceivedMessages()
    .size() >= 3);

List<ReceivedMessage> records = kafkaMessageConsumer.getReceivedMessages();

StringBuilder resultMessage = new StringBuilder();
records.forEach(record -> resultMessage.append(record.getMessage()));
String expectedMessage = "message1message3message4";

assertEquals("Messages with the same key should be received in the order they were produced within a partition", 
  expectedMessage, resultMessage.toString());

4.3. Custom Partitioning

For fine-grained control, Kafka allows defining custom partitioners. These classes implement the Partitioner interface, enabling us to write logic based on message content, metadata, or other factors to determine the target partition.

In this section, we’ll create a custom partitioning logic based on the customer type when dispatching orders to a Kafka topic. Specifically, premium customer orders will be directed to one partition, while normal customer orders will find their way to another.

To begin, we create a class named CustomPartitioner, inheriting from the Kafka Partitioner interface. Within this class, we override the partition() method with custom logic to determine the destination partition for each message:

public class CustomPartitioner implements Partitioner {
    private static final int PREMIUM_PARTITION = 0;
    private static final int NORMAL_PARTITION = 1;

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        String customerType = extractCustomerType(key.toString());
        return "premium".equalsIgnoreCase(customerType) ? PREMIUM_PARTITION : NORMAL_PARTITION;
    }

    private String extractCustomerType(String key) {
        String[] parts = key.split("_");
        return parts.length > 1 ? parts[1] : "normal";
    }
   
    // more methods
}

Next, to apply this custom partitioner in Kafka, we need to set the PARTITIONER_CLASS_CONFIG property in the producer configuration. Kafka will use this partitioner to determine the partition for each message based on the logic defined in the CustomPartitioner class.

The method setProducerToUseCustomPartitioner() is used to set up the Kafka producer to use the CustomPartitioner:

private KafkaTemplate<String, String> setProducerToUseCustomPartitioner() {
    Map<String, Object> producerProps = KafkaTestUtils.producerProps(embeddedKafkaBroker.getBrokersAsString());
    producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    producerProps.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, CustomPartitioner.class.getName());
    DefaultKafkaProducerFactory<String, String> producerFactory = new DefaultKafkaProducerFactory<>(producerProps);

    return new KafkaTemplate<>(producerFactory);
}

We then construct a test case to ensure that the custom partitioning logic correctly routes premium and normal customer orders to their respective partitions:

KafkaTemplate<String, String> kafkaTemplate = setProducerToUseCustomPartitioner();

kafkaTemplate.send("order-topic", "123_premium", "Order 123, Premium order message");
kafkaTemplate.send("order-topic", "456_normal", "Normal order message");

await().atMost(2, SECONDS)
  .until(() -> kafkaMessageConsumer.getReceivedMessages()
    .size() >= 2);

consumer.assign(Collections.singletonList(new TopicPartition("order-topic", 0)));
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

for (ConsumerRecord<String, String> record : records) {
    assertEquals("Premium order message should be in partition 0", 0, record.partition());
    assertEquals("123_premium", record.key());
}

4.4. Direct Partition Assignment

When manually migrating data between topics or adjusting data distribution across partitions, direct partition assignment could help control the message placement. Kafka also offers the ability to send messages directly to specific partitions using the ProductRecord constructor that accepts a partition number. By specifying the partition number, we can explicitly dictate the destination partition for each message.

In this test case, we specify the second argument in the send() method to take in the partition number:

kafkaProducer.send("order-topic", 0, "123_premium", "Premium order message");
kafkaProducer.send("order-topic", 1, "456_normal", "Normal order message");

await().atMost(2, SECONDS)
  .until(() -> kafkaMessageConsumer.getReceivedMessages()
    .size() >= 2);

List<ReceivedMessage> records = kafkaMessageConsumer.getReceivedMessages();

for (ReceivedMessage record : records) {
    if ("123_premium".equals(record.getKey())) {
        assertEquals("Premium order message should be in partition 0", 0, record.getPartition());
    } else if ("456_normal".equals(record.getKey())) {
        assertEquals("Normal order message should be in partition 1", 1, record.getPartition());
    }
}

5. Consume from Specific Partitions

To consume data from specific partitions in Kafka on the consumer side, we can specify the partitions we want to subscribe to using the KafkaConsumer.assign() method. This grants fine-grained control over consumption but requires managing partition offsets manually.

Here’s an example of consuming messages from specific partitions using the assign() method:

KafkaTemplate<String, String> kafkaTemplate = setProducerToUseCustomPartitioner();

kafkaTemplate.send("order-topic", "123_premium", "Order 123, Premium order message");
kafkaTemplate.send("order-topic", "456_normal", "Normal order message");

await().atMost(2, SECONDS)
  .until(() -> kafkaMessageConsumer.getReceivedMessages()
    .size() >= 2);

consumer.assign(Collections.singletonList(new TopicPartition("order-topic", 0)));
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

for (ConsumerRecord<String, String> record : records) {
    assertEquals("Premium order message should be in partition 0", 0, record.partition());
    assertEquals("123_premium", record.key());
}

6. Potential Challenges and Considerations

When sending messages to specific partitions, there is a risk of uneven load distribution among partitions. This can occur if the logic used for partitioning doesn’t distribute messages uniformly across all partitions. Moreover, scaling the Kafka cluster, which involves adding or removing brokers, can trigger partition reassignment. During reassignment, brokers may move partitions, potentially disrupting the order of messages or causing temporary unavailability.

Therefore, we should regularly monitor the load on each partition using Kafka tools or metrics. For example, Kafka Admin Client and Micrometer can assist in gaining insights into partition health and performance. We can use the Admin Client to retrieve information about topics, partitions, and their current state; and use the Micrometer for metrics monitoring.

Additionally, anticipate the need to proactively adjust the partitioning strategy or horizontally scale the Kafka cluster to manage the increased load on specific partitions effectively. We may also consider increasing the number of partitions or adjusting key ranges for a more even spread.

7. Conclusion

In summary, the ability to send messages to specific partitions in Apache Kafka opens up powerful possibilities for optimizing data processing and enhancing overall system efficiency.

Throughout this tutorial, we explored various methods for directing messages to specific partitions, including the key-based approach, custom partitioning, and direct partition assignment. Each method offers distinct advantages, allowing us to tailor based on the specific requirements of the applications.

As always, the source code for the examples is available over on GitHub.

Course – LS – All

Get started with Spring and Spring Boot, through the Learn Spring course:

>> CHECK OUT THE COURSE
res – REST with Spring (eBook) (everywhere)
Comments are open for 30 days after publishing a post. For any issues past this date, use the Contact form on the site.