Course – LS – All

Get started with Spring and Spring Boot, through the Learn Spring course:

>> CHECK OUT THE COURSE

1. Introduction

Apache Kafka is an open-source, distributed event streaming platform that can publish, subscribe to, store, and process streams of records.

Kafka provides a high-throughput and low-latency platform for real-time data processing. It implements a publish-subscribe model in which producer applications publish events to Kafka, while consumer applications subscribe to these events.

In this tutorial, we’ll learn how we can read data from the beginning of a Kafka topic using the Kafka Consumer API.

2. Setup

Before we begin, let’s set up the dependencies, initialize the Kafka cluster connection, and publish a few messages to Kafka.

Kafka provides a convenient Java client library that we can use to perform various operations on the Kafka cluster.

2.1. Dependencies

Firstly, let’s add the Kafka Clients Java library’s Maven dependency to our project’s pom.xml file:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.4.0</version>
</dependency>

2.2. Cluster and Topic Initialization

Throughout the guide, we’ll assume that a Kafka cluster is running on our local system with the default configurations.

Secondly, we need to create a Kafka topic that we can use to publish and consume messages. Let’s create a Kafka topic named “baeldung” by referring to our Kafka Topic Creation guide.
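Alternatively, if we prefer to create the topic from code rather than from the command line, we can use Kafka’s AdminClient API. Here’s a minimal sketch, assuming a single-broker local cluster listening on localhost:9092:

```java
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

Properties adminProperties = new Properties();
adminProperties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

try (AdminClient admin = AdminClient.create(adminProperties)) {
    // one partition and a replication factor of 1 are enough for a local, single-broker setup
    NewTopic topic = new NewTopic("baeldung", 1, (short) 1);
    admin.createTopics(Collections.singleton(topic)).all().get();
}
```

The createTopics() call is asynchronous, so we wait on the returned future with all().get() to make sure the topic exists before we start publishing.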

Now that we have the Kafka cluster up and running with a topic created, let’s publish some messages to Kafka.

2.3. Publishing Messages

Lastly, let’s publish a few dummy messages to the Kafka topic “baeldung“.

To publish messages, let’s create an instance of KafkaProducer with a basic configuration defined by a Properties instance:

Properties producerProperties = new Properties();
producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
producerProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
producerProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

KafkaProducer<String, String> producer = new KafkaProducer<>(producerProperties);

We use the KafkaProducer.send(ProducerRecord) method to publish messages to the Kafka topic “baeldung“:

for (int i = 1; i <= 10; i++) {
    ProducerRecord<String, String> record = new ProducerRecord<>("baeldung", String.valueOf(i));
    producer.send(record);
}
producer.flush();

Here, we published ten messages to our Kafka cluster. Since send() is asynchronous, we also call flush() to make sure the messages reach the broker before we start consuming. We’ll use these messages to demonstrate our consumer implementations.

3. Consuming Messages From the Beginning

Until now, we have initialized our Kafka cluster and published a few sample messages to the Kafka topic. Next, let’s see how we can read messages from the beginning.

To demonstrate this, we first initialize an instance of KafkaConsumer with a specific set of consumer properties defined by a Properties instance. Then, we use the created KafkaConsumer instance to consume messages and seek back again to the start of the partition offsets.

Let’s take a look at each of these steps in detail.

3.1. Consumer Properties

To consume messages from the beginning of a Kafka topic, we create an instance of KafkaConsumer with a randomly generated consumer group id. We do so by setting the “group.id” property of the consumer to a randomly generated UUID:

Properties consumerProperties = new Properties();
consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());

When we generate a new consumer group id for the consumer, the consumer will always belong to a new consumer group identified by the “group.id” property. A new consumer group won’t have any offset associated with it. In such cases, Kafka provides a property “auto.offset.reset” that indicates what should be done when there’s no initial offset in Kafka or if the current offset doesn’t exist anymore on the server.

The “auto.offset.reset” property accepts the following values:

  • earliest: This value automatically resets the offset to the earliest offset
  • latest: This value automatically resets the offset to the latest offset
  • none: This value throws an exception to the consumer if no previous offset is found for the consumer’s group
  • anything else: Any other value causes an exception to be thrown to the consumer

Since we want to read from the beginning of the Kafka topic, we set the value of the “auto.offset.reset” property to “earliest”:

consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

Let’s now create an instance of KafkaConsumer using the consumer properties:

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProperties);

We use this KafkaConsumer instance to consume messages from the beginning of the topic.

3.2. Consuming Messages

To consume messages, we first subscribe our consumer to the topic “baeldung”:

consumer.subscribe(Arrays.asList("baeldung"));

Next, we use the KafkaConsumer.poll(Duration duration) method to fetch messages from the topic “baeldung”, waiting up to the time specified by the Duration parameter:

ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(10));

for (ConsumerRecord<String, String> record : records) {
    logger.info(record.value());
}

With this, we have read all the messages from the beginning of the “baeldung” topic.

Additionally, to reset an existing consumer to read from the beginning of the topic, we use the KafkaConsumer.seekToBeginning(Collection&lt;TopicPartition&gt; partitions) method. This method accepts a collection of TopicPartition instances and points the consumer’s offset to the beginning of each of those partitions:

consumer.seekToBeginning(consumer.assignment());

Here, we pass the value of KafkaConsumer.assignment() to the seekToBeginning() method. The KafkaConsumer.assignment() method returns the set of partitions currently assigned to the consumer.
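One detail to keep in mind: when we use subscribe() rather than assigning partitions manually, assignment() remains empty until a first poll() completes the group’s partition assignment. So the seek only takes effect once the consumer has already polled at least once, as in the flow above:

```java
// the first poll() joins the consumer group and triggers partition assignment
consumer.poll(Duration.ofSeconds(10));

// assignment() is now populated, so the seek applies to the consumer's actual partitions
consumer.seekToBeginning(consumer.assignment());
```

Calling seekToBeginning() before any partitions are assigned is effectively a no-op, which is a common source of confusion.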

Finally, polling the same consumer again for messages now reads all the messages from the beginning of the partition:

ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(10));

for (ConsumerRecord<String, String> record : records) {
    logger.info(record.value());
}
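Finally, once we’re done reading, it’s good practice to close the consumer so that it leaves the consumer group cleanly and releases its network resources:

```java
// leave the consumer group and free the underlying network resources
consumer.close();
```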

4. Conclusion

In this article, we’ve learned how to read messages from the beginning of a Kafka topic using the Kafka Consumer API.

We first looked at how a new consumer can read messages from the beginning of a Kafka topic, along with its implementation. We then saw how a consumer that’s already consuming can seek its offset to read messages from the beginning.

As always, the complete code for all the examples is available over on GitHub.
