Course – LS – All

Get started with Spring and Spring Boot, through the Learn Spring course:

>> CHECK OUT THE COURSE

1. Overview

In this tutorial, we’ll learn how to subscribe a Kafka consumer to multiple topics. This is a common requirement when the same business logic is used for various topics.

2. Create a Model Class

We’ll consider a simple payment system with two Kafka topics, one for card payments and the other for bank transfers. Let’s create the model class:

public class PaymentData {
    private String paymentReference;
    private String type;
    private BigDecimal amount;
    private Currency currency;

    // standard getters and setters
}

3. Subscribe to Multiple Topics Using Kafka Consumer API

The first method we’ll discuss uses the Kafka Consumer API. Let’s add the required Maven dependency:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.5.1</version>
</dependency>

Let’s also configure the Kafka consumer:

Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers());
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "payments");
kafkaConsumer = new KafkaConsumer<>(properties);

Before consuming messages, we need to subscribe kafkaConsumer to both topics using the subscribe() method:

kafkaConsumer.subscribe(Arrays.asList("card-payments", "bank-transfers"));

We’re ready now to test our configuration. Let’s publish one message on each of the topics:

void publishMessages() throws Exception {
    ProducerRecord<String, String> cardPayment = new ProducerRecord<>("card-payments", 
      "{\"paymentReference\":\"A184028KM0013790\", \"type\":\"card\", \"amount\":\"275\", \"currency\":\"GBP\"}");
    kafkaProducer.send(cardPayment).get();
    
    ProducerRecord<String, String> bankTransfer = new ProducerRecord<>("bank-transfers",
      "{\"paymentReference\":\"19ae2-18mk73-009\", \"type\":\"bank\", \"amount\":\"150\", \"currency\":\"EUR\"}");
    kafkaProducer.send(bankTransfer).get();
}

Finally, we can write the integration test:

@Test
void whenSendingMessagesOnTwoTopics_thenConsumerReceivesMessages() throws Exception {
    publishMessages();
    kafkaConsumer.subscribe(Arrays.asList("card-payments", "bank-transfers"));

    int eventsProcessed = 0;
    for (ConsumerRecord<String, String> record : kafkaConsumer.poll(Duration.ofSeconds(10))) {
        log.info("Event on topic={}, payload={}", record.topic(), record.value());
        eventsProcessed++;
    }
    assertThat(eventsProcessed).isEqualTo(2);
}

4. Subscribe to Multiple Topics Using Spring Kafka

The second method we’ll discuss uses Spring Kafka.

Let’s add the spring-kafka and jackson-databind dependencies to our pom.xml:

<dependency> 
    <groupId>org.springframework.kafka</groupId> 
    <artifactId>spring-kafka</artifactId>
    <version>3.1.2</version>
</dependency>
<dependency> 
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
    <version>2.15.2</version>
</dependency>

Let’s also define the ConsumerFactory and ConcurrentKafkaListenerContainerFactory beans:

@Bean
public ConsumerFactory<String, PaymentData> consumerFactory() {
    List<String, String> config = new HashMap<>();
    config.put(BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
    config.put(VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);

    return new DefaultKafkaConsumerFactory<>(
      config, new StringDeserializer(), new JsonDeserializer<>(PaymentData.class));
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, PaymentData> containerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, PaymentData> factory =
      new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    return factory;
}

We need to subscribe to both topics using the topics attribute of the @KafkaListener annotation:

@KafkaListener(topics = { "card-payments", "bank-transfers" }, groupId = "payments")

Finally, we can create the consumer. Additionally, we’re also including the Kafka header to identify the topic where the message was received:

@KafkaListener(topics = { "card-payments", "bank-transfers" }, groupId = "payments")
public void handlePaymentEvents(
  PaymentData paymentData, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
    log.info("Event on topic={}, payload={}", topic, paymentData);
}

Let’s validate our configuration:

@Test
public void whenSendingMessagesOnTwoTopics_thenConsumerReceivesMessages() throws Exception {
    CountDownLatch countDownLatch = new CountDownLatch(2);
    doAnswer(invocation -> {
        countDownLatch.countDown();
        return null;
    }).when(paymentsConsumer)
      .handlePaymentEvents(any(), any());

    kafkaTemplate.send("card-payments", createCardPayment());
    kafkaTemplate.send("bank-transfers", createBankTransfer());

    assertThat(countDownLatch.await(5, TimeUnit.SECONDS)).isTrue();
}

5. Subscribe to Multiple Topics Using Kafka CLI

Kafka CLI is the last method we’ll discuss.

First, let’s send a message on each topic:

$ bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic card-payments
>{"paymentReference":"A184028KM0013790", "type":"card", "amount":"275", "currency":"GBP"}

$ bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic bank-transfers
>{"paymentReference":"19ae2-18mk73-009", "type":"bank", "amount":"150", "currency":"EUR"}

Now, we can start the Kafka CLI consumer. The include option allows us to specify the list of topics to include for message consumption:

$ bin/kafka-console-consumer.sh --from-beginning --bootstrap-server localhost:9092 --include "card-payments|bank-transfers"

Here’s the output when we run the previous command:

{"paymentReference":"A184028KM0013790", "type":"card", "amount":"275", "currency":"GBP"}
{"paymentReference":"19ae2-18mk73-009", "type":"bank", "amount":"150", "currency":"EUR"}

6. Conclusion

In this article, we learned three different methods of subscribing a Kafka consumer to multiple topics. This is useful when implementing the same functionality for several topics.

The first two methods are based on Kafka Consumer API and Spring Kafka and can be integrated into an existing application. The last one uses Kafka CLI and can be used to verify multiple topics quickly.

As always, the complete code can be found over on GitHub.

Course – LS – All

Get started with Spring and Spring Boot, through the Learn Spring course:

>> CHECK OUT THE COURSE
res – REST with Spring (eBook) (everywhere)
Comments are open for 30 days after publishing a post. For any issues past this date, use the Contact form on the site.