1. Introduction

Apache Kafka is a distributed streaming platform that allows us to publish and subscribe to streams of records, often referred to as messages. Additionally, Kafka headers provide a way to attach metadata to Kafka messages, enabling additional context and flexibility in message processing.

In this tutorial, we’ll delve into commonly used Kafka headers and learn how to view and extract them using Java.

2. Overview of Kafka Headers

Kafka headers represent key-value pairs attached to Kafka messages, offering a means to include supplementary metadata alongside the primary message content.

For example, Kafka headers facilitate message routing by providing data for directing messages to specific processing pipelines or consumers. Moreover, headers are versatile in carrying custom application metadata tailored to the application’s processing logic.
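
As a minimal sketch of header-based routing, the snippet below chooses a processing pipeline from a header value. It uses only the JDK and models the headers as a plain map of byte arrays, since Kafka stores header values as raw bytes. The header name route-to and the pipeline names are hypothetical and exist only for this illustration; in a real consumer, the values would come from the record’s headers rather than a hand-built map.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;

class HeaderRouter {

    // Hypothetical header name and fallback pipeline, chosen only for this sketch.
    static final String ROUTE_HEADER = "route-to";
    static final String DEFAULT_PIPELINE = "default-pipeline";

    /**
     * Picks a pipeline name from the headers. Values are modeled as byte arrays
     * because Kafka header values are raw bytes; here we assume UTF-8 text.
     */
    public static String resolvePipeline(Map<String, byte[]> headers) {
        byte[] raw = headers.get(ROUTE_HEADER);
        if (raw == null || raw.length == 0) {
            return DEFAULT_PIPELINE; // no routing hint, fall back
        }
        return new String(raw, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        Map<String, byte[]> headers =
          Map.of(ROUTE_HEADER, "billing".getBytes(StandardCharsets.UTF_8));

        System.out.println(resolvePipeline(headers));  // billing
        System.out.println(resolvePipeline(Map.of())); // default-pipeline
    }
}
```

Keeping the routing hint in a header rather than in the payload means a consumer can decide where a message goes without deserializing its body.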

3. Kafka Default Headers

Spring for Apache Kafka defines a set of standard headers as constants in the KafkaHeaders class. These headers carry metadata about a message, such as its topic, key, partition, and timestamp. Kafka itself stores these values as fields of the record rather than as record headers, and Spring exposes them as message headers. In this section, we’ll look at a few commonly used headers and what they mean.

3.1. Producer Headers

When we send a Spring message through a KafkaTemplate, the following headers describe how the record should be produced:

  • KafkaHeaders.TOPIC – This header (kafka_topic) contains the name of the topic to which the message is sent.
  • KafkaHeaders.KEY – If the message is produced with a key, this header (kafka_messageKey) contains that key.
  • KafkaHeaders.PARTITION – This header (kafka_partitionId) indicates the ID of the partition to which the message is sent.
  • KafkaHeaders.TIMESTAMP – This header (kafka_timestamp) indicates the timestamp of the produced message.

3.2. Consumer Headers

Headers prefixed with RECEIVED_ are populated on the consumer side when a record is consumed, and they describe the record that was received:

  • KafkaHeaders.RECEIVED_TOPIC – This header contains the name of the topic from which the message was received.
  • KafkaHeaders.RECEIVED_KEY – This header allows consumers to access the key associated with the message.
  • KafkaHeaders.RECEIVED_PARTITION – This header indicates the ID of the partition from which the message was read.
  • KafkaHeaders.RECEIVED_TIMESTAMP – This header contains the timestamp of the received record, that is, the time set when the record was created or appended to the log, not the time at which the consumer read it.
  • KafkaHeaders.OFFSET – The offset indicates the position of the message in the partition’s log.
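
To make the timestamp header more concrete, here is a small JDK-only sketch that converts it into an Instant. It assumes the header map contains kafka_receivedTimestamp as a Long holding epoch milliseconds, which is how a Kafka record timestamp is represented. The sample values are made up for illustration.

```java
import java.time.Instant;
import java.util.Map;

class ReceivedTimestampExample {

    /**
     * Reads kafka_receivedTimestamp (assumed to be epoch milliseconds stored
     * as a Long) and returns it as an Instant, or null if it is absent.
     */
    public static Instant receivedTimestamp(Map<String, Object> headers) {
        Object value = headers.get("kafka_receivedTimestamp");
        return (value instanceof Long) ? Instant.ofEpochMilli((Long) value) : null;
    }

    public static void main(String[] args) {
        // Made-up sample headers, only for this sketch.
        Map<String, Object> headers = Map.of(
          "kafka_receivedTimestamp", 1700000000000L,
          "kafka_offset", 123L);

        System.out.println(receivedTimestamp(headers)); // 2023-11-14T22:13:20Z
    }
}
```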

4. Consuming Messages With Headers

To consume messages, we need a Kafka consumer that subscribes to a topic and fetches records from it. With Spring for Apache Kafka, we don’t instantiate the KafkaConsumer ourselves: when we annotate a method with @KafkaListener, the listener container creates the consumer, subscribes it to the topic, and polls for records on our behalf.

Each fetched record is then passed to the listener method, along with its associated headers.

Here’s a code example demonstrating how to consume messages with headers:

@KafkaListener(topics = "my-topic")
public void listen(String message, @Headers Map<String, Object> headers) {
    System.out.println("Received message: " + message);
    System.out.println("Headers:");
    headers.forEach((key, value) -> System.out.println(key + ": " + value));
}

The Kafka listener container invokes the listen() method whenever it receives a message from the specified topic, in this case “my-topic”. The @Headers annotation indicates that the parameter should be populated with all the headers of the received message.

Below is an example output:

Received message: Hello Baeldung!
Headers:
kafka_receivedMessageKey: null
kafka_receivedPartitionId: 0
kafka_receivedTopic: my-topic
kafka_offset: 123
... // other headers

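To access a specific header, we can use the get() method of the headers map, providing the key of the desired header. Since the map values are of type Object, we need to cast the result. Below is an example to access the name of the topic from which the message was received:

String topicName = (String) headers.get(KafkaHeaders.RECEIVED_TOPIC);

The topicName variable should contain my-topic.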
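
Because the map holds Object values, a small helper can keep the casts in one place. The following is a sketch that uses only the JDK. The HeaderLookup class and its header() method are hypothetical names, and the sample map simply mirrors the example output shown earlier, using the literal header names instead of the KafkaHeaders constants.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

class HeaderLookup {

    /**
     * Returns the header cast to the requested type, or an empty Optional
     * if it is missing, null, or of a different type.
     */
    public static <T> Optional<T> header(Map<String, Object> headers, String name, Class<T> type) {
        Object value = headers.get(name);
        return type.isInstance(value) ? Optional.of(type.cast(value)) : Optional.empty();
    }

    public static void main(String[] args) {
        // Sample values mirroring the example output; HashMap allows the null key header.
        Map<String, Object> headers = new HashMap<>();
        headers.put("kafka_receivedMessageKey", null);
        headers.put("kafka_receivedPartitionId", 0);
        headers.put("kafka_receivedTopic", "my-topic");
        headers.put("kafka_offset", 123L);

        System.out.println(header(headers, "kafka_receivedTopic", String.class).orElse("unknown")); // my-topic
        System.out.println(header(headers, "kafka_offset", Long.class).orElse(-1L));                // 123
        System.out.println(header(headers, "kafka_receivedMessageKey", String.class).isPresent());  // false
    }
}
```

Returning an Optional avoids both a ClassCastException on an unexpected type and a NullPointerException on a missing header.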

Additionally, when consuming messages, we can directly extract the headers needed for processing as method parameters if we already know them. This approach offers a more concise and targeted way to access specific header values without iterating through all headers.

Here’s a code example demonstrating how to consume messages with headers, directly extracting specific headers as method parameters:

@KafkaListener(topics = "my-topic")
public void listen(String message, @Header(KafkaHeaders.RECEIVED_PARTITION) int partition) {
    System.out.println("Received message: " + message);
    System.out.println("Partition: " + partition);
}

In the listen() method, we directly extract the RECEIVED_PARTITION header using the @Header annotation. This annotation allows us to specify the header we want to extract, while the parameter declares its type. Spring then injects the header’s value into the partition parameter, so we can use it directly in the method body.

Below is the output:

Received message: Hello Baeldung!
Partition: 0

5. Conclusion

In this article, we’ve explored the role of Kafka headers in message processing. We’ve looked at the standard headers available on the producer and consumer sides, and we’ve learned how to extract and work with these headers in a listener.

As always, the code for the examples is available over on GitHub.
