1. Introduction

In this tutorial, we’ll explore how to dynamically route messages in Kafka Streams. Dynamic routing is particularly useful when the destination topic for a message depends on its content, enabling us to direct messages based on specific conditions or attributes within the payload. This kind of conditional routing finds real-world applications in various domains like IoT event handling, user activity tracking, and fraud detection.

We’ll walk through the problem of consuming messages from a single Kafka topic and conditionally routing them to multiple destination topics. The primary focus will be on how to set this up in a Spring Boot application using the Kafka Streams library.

2. Kafka Streams Routing Techniques

Kafka Streams supports several techniques for dynamic message routing, each with its own advantages, challenges, and suitability for different scenarios:

  • KStream Conditional Branching: The KStream.split().branch() method is the conventional means to segregate a stream based on predicates. While this method is easy to implement, each condition needs its own branch() call, so the topology becomes harder to manage as the number of conditions grows.
  • Branching with KafkaStreamBrancher: This feature appeared in Spring Kafka version 2.2.4. It offers a more elegant and readable way to create branches in a Kafka Stream, eliminating the need for ‘magic numbers’ and allowing more fluid chaining of stream operations.
  • Dynamic Routing with TopicNameExtractor: A TopicNameExtractor selects the destination topic at runtime based on the message key, value, or even the entire record context. This affords more granular control over topic selection and adapts well to complex use cases. However, it requires the target topics to be created in advance.
  • Custom Processors: For scenarios requiring complex routing logic or multiple chained operations, we can apply custom processor nodes in the Kafka Streams topology. This approach is the most flexible but also the most complex to implement.

Throughout this article, we’ll focus on implementing the first three approaches: KStream Conditional Branching, Branching with KafkaStreamBrancher, and Dynamic Routing with TopicNameExtractor.

3. Setting Up Environment

In our scenario, we have a network of IoT sensors streaming various types of data, such as temperature, humidity, and motion, to a centralized Kafka topic named iot_sensor_data. Each incoming message contains a JSON object with a field named sensorType that indicates the type of data the sensor is sending. Our aim is to dynamically route these messages to dedicated topics for each type of sensor data.

First, let’s establish a running Kafka instance. We can set up Kafka, Zookeeper, and Kafka UI using Docker, along with Docker Compose, by creating a docker-compose.yml file:

version: '3.8'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - 22181:2181
  kafka:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    ports:
      - 9092:9092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENERS: "INTERNAL://:29092,EXTERNAL://:9092"
      KAFKA_ADVERTISED_LISTENERS: "INTERNAL://kafka:29092,EXTERNAL://localhost:9092"
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT"
      KAFKA_INTER_BROKER_LISTENER_NAME: "INTERNAL"
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
  kafka_ui:
    image: provectuslabs/kafka-ui:latest
    depends_on:
      - kafka
    ports:
      - 8082:8080
    environment:
      KAFKA_CLUSTERS_0_ZOOKEEPER: zookeeper:2181
      KAFKA_CLUSTERS_0_NAME: local
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:29092
  kafka-init-topics:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - kafka
    command: "bash -c 'echo Waiting for Kafka to be ready... && \
               cub kafka-ready -b kafka:29092 1 30 && \
               kafka-topics --create --topic iot_sensor_data --partitions 1 --replication-factor 1 --if-not-exists --bootstrap-server kafka:29092'"

Here we set all required environment variables and dependencies between services. Furthermore, we create the iot_sensor_data topic by running the kafka-topics command in the kafka-init-topics service.

Now we can run Kafka inside Docker by executing docker-compose up -d.

Next, we have to add the Kafka Streams dependencies to the pom.xml file:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>3.6.1</version>
</dependency>
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>3.1.2</version>
</dependency>

The first dependency is the org.apache.kafka:kafka-streams artifact, which provides Kafka Streams functionality. The second artifact, org.springframework.kafka:spring-kafka, facilitates the configuration and integration of Kafka with Spring Boot.

Another essential aspect is configuring the address of the Kafka broker. This is generally done by specifying the broker details in the application’s properties file. Let’s add this configuration along with other properties to our application.properties file:

spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.streams.application-id=baeldung-streams
spring.kafka.consumer.group-id=baeldung-group
spring.kafka.streams.properties[default.key.serde]=org.apache.kafka.common.serialization.Serdes$StringSerde
kafka.topics.iot=iot_sensor_data

Next, let’s define a sample data class IotSensorData:

public class IotSensorData {
    private String sensorType;
    private String value;
    private String sensorId;

    // standard no-arg constructor, getters, and setters
}

Lastly, we need to configure a Serde for the serialization and deserialization of typed messages in Kafka:

@Bean
public Serde<IotSensorData> iotSerde() {
    return Serdes.serdeFrom(new JsonSerializer<>(), new JsonDeserializer<>(IotSensorData.class));
}

4. Implementing Dynamic Routing in Kafka Streams

After setting up the environment and installing the required dependencies, let’s focus on implementing dynamic routing logic in Kafka Streams.

Dynamic message routing can be an essential part of an event-driven application, as it enables the system to adapt to various types of data flows and conditions without requiring code changes.

4.1. KStream Conditional Branching

Branching in Kafka Streams allows us to take a single stream of data and split it into multiple streams based on some conditions. These conditions are provided as predicates that evaluate each message as it passes through the stream.

In recent versions of Kafka Streams, the original branch() method, which returned an array of streams, has been deprecated in favor of split().branch(), which is designed to improve the API’s usability and flexibility. We use the newer method for the same purpose: splitting a KStream into multiple streams based on predicates.

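Here we define the configuration that uses the split().branch() method for dynamic topic routing. In the snippets that follow, iotTopicName is assumed to be a field holding the value of the kafka.topics.iot property: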

@Bean
public KStream<String, IotSensorData> iotStream(StreamsBuilder streamsBuilder) {
    KStream<String, IotSensorData> stream = streamsBuilder.stream(iotTopicName, Consumed.with(Serdes.String(), iotSerde()));
    stream.split()
      .branch((key, value) -> "temp".equals(value.getSensorType()), Branched.withConsumer((ks) -> ks.to(iotTopicName + "_temp")))
      .branch((key, value) -> "move".equals(value.getSensorType()), Branched.withConsumer((ks) -> ks.to(iotTopicName + "_move")))
      .branch((key, value) -> "hum".equals(value.getSensorType()), Branched.withConsumer((ks) -> ks.to(iotTopicName + "_hum")))
      .noDefaultBranch();
    return stream;
}

In the example above, we split the initial stream from the iot_sensor_data topic into multiple streams based on the sensorType property and route them to other topics accordingly.
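To make the branching semantics more concrete, here’s a plain-Java sketch that imitates how predicates are evaluated: in registration order, with each record going to the first branch that matches, and unmatched records being dropped when there’s no default branch. It deliberately doesn’t use the Kafka Streams API; BranchRoutingSketch and its methods are hypothetical names introduced only for illustration:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Predicate;

// Hypothetical, Kafka-free model of first-match branching; not part of Kafka Streams.
class BranchRoutingSketch {

    // Branches are kept in registration order, because the first matching predicate wins.
    private final List<Map.Entry<Predicate<String>, String>> branches = new ArrayList<>();

    BranchRoutingSketch branch(Predicate<String> predicate, String topic) {
        branches.add(Map.entry(predicate, topic));
        return this;
    }

    // Returns the topic of the first matching branch, or empty if the record would be dropped.
    Optional<String> route(String sensorType) {
        for (Map.Entry<Predicate<String>, String> branch : branches) {
            if (branch.getKey().test(sensorType)) {
                return Optional.of(branch.getValue());
            }
        }
        return Optional.empty();
    }

    // Same three conditions as in the topology above, expressed on the sensor type alone.
    static BranchRoutingSketch forTopic(String sourceTopic) {
        return new BranchRoutingSketch()
          .branch("temp"::equals, sourceTopic + "_temp")
          .branch("move"::equals, sourceTopic + "_move")
          .branch("hum"::equals, sourceTopic + "_hum");
    }
}
```

In this model, a reading with an unlisted sensor type such as "pressure" matches no branch, so route() returns an empty Optional, which corresponds to the record being dropped under noDefaultBranch().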

If a target topic name can be generated based on the message content, we can use a lambda function within the to method for more dynamic topic routing:

@Bean
public KStream<String, IotSensorData> iotStreamDynamic(StreamsBuilder streamsBuilder) {
    KStream<String, IotSensorData> stream = streamsBuilder.stream(iotTopicName, Consumed.with(Serdes.String(), iotSerde()));
    stream.split()
      .branch((key, value) -> value.getSensorType() != null, 
        Branched.withConsumer(ks -> ks.to((key, value, recordContext) -> "%s_%s".formatted(iotTopicName, value.getSensorType()))))
      .noDefaultBranch();
    return stream;
}

This approach removes the need for a separate branch per sensor type, as long as the topic name can be derived from the message content.
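Because the topic name now comes from the payload, an unexpected sensorType could produce a name that Kafka rejects. Below is a minimal sketch of a guard, assuming Kafka’s topic naming rules (ASCII letters, digits, '.', '_' and '-', at most 249 characters). TopicNames, its methods, and the "_unknown" fallback suffix are hypothetical choices for this illustration, not part of Kafka or Spring Kafka:

```java
import java.util.regex.Pattern;

// Hypothetical helper for illustration; not part of Kafka or Spring Kafka.
final class TopicNames {

    // Assumed from Kafka's topic naming rules: letters, digits, '.', '_' and '-', up to 249 characters.
    private static final Pattern LEGAL = Pattern.compile("[a-zA-Z0-9._-]{1,249}");

    private TopicNames() {
    }

    static boolean isValid(String topic) {
        return topic != null
          && !topic.equals(".")
          && !topic.equals("..")
          && LEGAL.matcher(topic).matches();
    }

    // Builds "<sourceTopic>_<sensorType>", falling back to "<sourceTopic>_unknown"
    // when the sensor type is missing or the resulting name would be invalid.
    static String forSensorType(String sourceTopic, String sensorType) {
        String candidate = sourceTopic + "_" + sensorType;
        return sensorType != null && isValid(candidate) ? candidate : sourceTopic + "_unknown";
    }
}
```

A helper like this could be called from the lambda passed to to(), so that malformed sensor types end up in a single fallback topic instead of producing an invalid topic name.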

4.2. Routing With KafkaStreamBrancher

The KafkaStreamBrancher class provides a builder-style API that allows easier chaining of branching conditions, making code more readable and maintainable.

The primary benefit is the removal of the complexities associated with managing an array of branched streams, which is how the deprecated KStream.branch method works. Instead, KafkaStreamBrancher lets us define each branch along with the operations that should happen on that branch, removing the need for magic numbers or complex indexing to identify the correct branch. Since split().branch() also pairs each predicate with its own consumer, the two approaches are closely related.

Let’s apply this approach to a stream:

@Bean
public KStream<String, IotSensorData> kStream(StreamsBuilder streamsBuilder) {
    KStream<String, IotSensorData> stream = streamsBuilder.stream(iotTopicName, Consumed.with(Serdes.String(), iotSerde()));
    new KafkaStreamBrancher<String, IotSensorData>()
      .branch((key, value) -> "temp".equals(value.getSensorType()), (ks) -> ks.to(iotTopicName + "_temp"))
      .branch((key, value) -> "move".equals(value.getSensorType()), (ks) -> ks.to(iotTopicName + "_move"))
      .branch((key, value) -> "hum".equals(value.getSensorType()), (ks) -> ks.to(iotTopicName + "_hum"))
      .defaultBranch(ks -> ks.to("%s_unknown".formatted(iotTopicName)))
      .onTopOf(stream);
    return stream;
}

We’ve used the fluent API to route each message to a specific topic. Similarly, we can use a single branch() method call to route to multiple topics by using content as a part of a topic name:

@Bean
public KStream<String, IotSensorData> iotBrancherStream(StreamsBuilder streamsBuilder) {
    KStream<String, IotSensorData> stream = streamsBuilder.stream(iotTopicName, Consumed.with(Serdes.String(), iotSerde()));
    new KafkaStreamBrancher<String, IotSensorData>()
      .branch((key, value) -> value.getSensorType() != null, (ks) ->
        ks.to((key, value, recordContext) -> String.format("%s_%s", iotTopicName, value.getSensorType())))
      .defaultBranch(ks -> ks.to("%s_unknown".formatted(iotTopicName)))
      .onTopOf(stream);
    return stream;
}

By providing a higher level of abstraction for branching logic, KafkaStreamBrancher not only makes the code cleaner but also enhances its manageability, especially for applications with complex routing requirements.

4.3. Dynamic Topic Routing With TopicNameExtractor

Another approach to managing conditional routing in Kafka Streams is to use a TopicNameExtractor which, as the name suggests, extracts the topic name dynamically for each message in the stream. This method can be more straightforward for certain use cases compared to the previously discussed split().branch() and KafkaStreamBrancher approaches.

Here’s a sample configuration using TopicNameExtractor in a Spring Boot application:

@Bean
public KStream<String, IotSensorData> iotExtractorStream(StreamsBuilder streamsBuilder) {
    KStream<String, IotSensorData> stream = streamsBuilder.stream(iotTopicName, Consumed.with(Serdes.String(), iotSerde()));
    TopicNameExtractor<String, IotSensorData> sensorTopicExtractor = (key, value, recordContext) -> "%s_%s".formatted(iotTopicName, value.getSensorType());
    stream.to(sensorTopicExtractor);
    return stream;
}

While TopicNameExtractor handles its primary task of routing records to specific topics well, it has some limitations compared to split().branch() and KafkaStreamBrancher. Specifically, it doesn’t let us perform additional transformations, such as mapping or filtering, within the same routing step.
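One way to picture this limitation is to treat filtering as a separate step that runs before the topic name is extracted. The following plain-Java sketch simulates that flow on an in-memory list instead of a Kafka stream. ExtractorRoutingSketch, Reading, and their methods are hypothetical stand-ins, and Reading is a simplified substitute for IotSensorData:

```java
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Hypothetical, Kafka-free simulation of "filter first, then route by extracted topic name".
class ExtractorRoutingSketch {

    // Simplified stand-in for IotSensorData, reduced to the fields used here.
    static final class Reading {
        final String sensorType;
        final String value;

        Reading(String sensorType, String value) {
            this.sensorType = sensorType;
            this.value = value;
        }
    }

    // Mirrors the extractor lambda: the topic depends only on the source topic and the sensor type.
    static String extractTopic(String sourceTopic, Reading reading) {
        return sourceTopic + "_" + reading.sensorType;
    }

    // Filtering is a separate step before routing, since the extractor itself only names a topic.
    static Map<String, List<String>> route(String sourceTopic, List<Reading> readings) {
        return readings.stream()
          .filter(Objects::nonNull)
          .filter(reading -> reading.sensorType != null)
          .collect(Collectors.groupingBy(
            reading -> extractTopic(sourceTopic, reading),
            TreeMap::new,
            Collectors.mapping(reading -> reading.value, Collectors.toList())));
    }
}
```

In an actual topology, the comparable arrangement would be a filter() call on the KStream ahead of to(), so that records without a usable sensorType never reach the extractor.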

5. Conclusion

In this article, we’ve seen different approaches for dynamic topic routing using Kafka Streams and Spring Boot.

We began by exploring the modern branching mechanisms like the split().branch() method and the KafkaStreamBrancher class. Furthermore, we examined the dynamic topic routing capabilities offered by TopicNameExtractor.

Each technique has its advantages and challenges. For instance, split().branch() can become cumbersome when handling numerous conditions, whereas TopicNameExtractor keeps the routing compact but doesn’t allow transformations such as mapping or filtering within the routing step. Understanding these differences is key to choosing an effective routing implementation.

As always, the full source code is available over on GitHub.
