1. Introduction

Consumer groups help to create more scalable Kafka applications by allowing more than one consumer to read from the same topic.

In this tutorial, we’ll understand consumer groups and how they rebalance partitions between their consumers.

2. What Are Consumer Groups?

A consumer group is a set of unique consumers associated with one or more topics. Each consumer can read from zero, one, or more than one partition. Furthermore, within a group, each partition can only be assigned to a single consumer at a given time. The partition assignment changes as the group members change. This is known as group rebalancing.

The consumer group is a crucial part of Kafka applications. It allows us to group similar consumers and makes it possible for them to read in parallel from a partitioned topic. Hence, it improves the performance and scalability of Kafka applications.

2.1. The Group Coordinator and the Group Leader

When we instantiate a consumer group, Kafka also designates one of its brokers as the group coordinator. The group coordinator regularly receives requests from the consumers, known as heartbeats. If a consumer stops sending heartbeats, the coordinator assumes that the consumer has either left the group or crashed. That’s one possible trigger for a partition rebalance.

The first consumer that asks the group coordinator to join the group becomes the group leader. When a rebalance occurs for any reason, the group leader receives a list of the group members from the group coordinator. Then, the group leader reassigns the partitions among the consumers in that list using a customizable strategy set in the partition.assignment.strategy configuration.
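To make the leader's job more concrete, here is a minimal sketch in plain Java of what an assignment step could look like. It is not Kafka's real assignor code: the class name PartitionAssignmentSketch and the assign() method are hypothetical, and the round-robin rule is only one simple strategy we assume for illustration.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only: this is not Kafka's actual assignor implementation.
class PartitionAssignmentSketch {

    // Distributes partitions 0..partitionCount-1 over the given members in round-robin order.
    static Map<String, List<Integer>> assign(List<String> members, int partitionCount) {
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        for (String member : members) {
            assignment.put(member, new ArrayList<>());
        }
        if (members.isEmpty()) {
            return assignment; // nobody to assign partitions to
        }
        for (int partition = 0; partition < partitionCount; partition++) {
            String owner = members.get(partition % members.size());
            assignment.get(owner).add(partition);
        }
        return assignment;
    }

    public static void main(String[] args) {
        // Two members share two partitions: prints {consumer-0=[0], consumer-1=[1]}
        System.out.println(assign(List.of("consumer-0", "consumer-1"), 2));
        // One member left, so the assignment is recomputed: prints {consumer-0=[0, 1]}
        System.out.println(assign(List.of("consumer-0"), 2));
    }
}
```

Recomputing the assignment from the current member list, as the second call does, is the essence of a rebalance: every partition ends up with exactly one owner among the members that are still in the group.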

2.2. Committed Offsets

Kafka uses the committed offset to keep track of the last position read from a topic partition. The committed offset is the position up to which a consumer acknowledges having successfully processed the messages. In other words, it’s the starting point from which that consumer, or any other consumer in the group, resumes reading in subsequent rounds.

Kafka stores the committed offsets of all partitions in an internal topic named __consumer_offsets. We can safely trust its information since topics are durable and, when replicated across brokers, fault-tolerant.

2.3. Partition Rebalancing

A partition rebalance changes the partition ownership from one consumer to another. Kafka executes a rebalance automatically when a new consumer joins the group or when a consumer member of the group crashes or unsubscribes.

To improve scalability, when a new consumer joins the group, Kafka fairly shares the partitions from the other consumers with the newly added consumer. Additionally, when a consumer crashes, its partitions must be assigned to the remaining consumers in the group to avoid the loss of any unprocessed messages.

The partition rebalance uses the __consumer_offsets topic to make a consumer start reading a reassigned partition from the correct position.
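The idea can be sketched with a small in-memory model in plain Java. This is only an illustration under stated assumptions: the class CommittedOffsetSketch and its methods are hypothetical, the map stands in for the __consumer_offsets topic of a single group, and a partition with no committed offset is assumed to start at 0.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory stand-in for the committed offsets of one consumer group.
class CommittedOffsetSketch {

    // partition -> offset of the next record to read
    private final Map<Integer, Long> committedOffsets = new HashMap<>();

    // Called by the current owner after it has processed records up to nextOffset - 1.
    void commit(int partition, long nextOffset) {
        committedOffsets.put(partition, nextOffset);
    }

    // Called by whichever consumer owns the partition after a rebalance.
    // Assumption: without a committed offset, reading starts at 0.
    long startingPosition(int partition) {
        return committedOffsets.getOrDefault(partition, 0L);
    }

    public static void main(String[] args) {
        CommittedOffsetSketch offsets = new CommittedOffsetSketch();
        // consumer-1 processed offsets 0..41 of partition 1, then left the group
        offsets.commit(1, 42L);
        // consumer-0 takes over partition 1 and resumes from the committed position
        System.out.println(offsets.startingPosition(1)); // prints 42
    }
}
```

Because the position lives outside any single consumer, the new owner doesn't need to know anything about the previous one to continue where it stopped.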

During a rebalance, consumers can’t consume messages. In other words, the consumer group becomes unavailable until the rebalance is done. Additionally, consumers lose their state and need to recalculate their cached values. Both the pause in consumption and the cache recalculation make event consumption slower.

3. Setting up the Application

In this section, we’ll configure the basics to get a Spring Kafka application up and running.

3.1. Creating the Basic Configurations

First, let’s configure the topic and its partitions:

@Configuration
public class KafkaTopicConfiguration {

    @Value(value = "${spring.kafka.bootstrap-servers}")
    private String bootstrapAddress;

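    // Registers the admin client that Spring uses to create topics on startup
    @Bean
    public KafkaAdmin kafkaAdmin() {
        Map<String, Object> configs = new HashMap<>();
        configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        return new KafkaAdmin(configs);
    }

    // Declares topic-1 with two partitions; it must be a bean for KafkaAdmin to pick it up
    @Bean
    public NewTopic celsiusTopic() {
        return TopicBuilder.name("topic-1")
            .partitions(2)
            .build();
    }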
}

The above configuration is straightforward. We’re simply configuring a new topic named topic-1 with two partitions.

Now, let’s configure the producer:

@Configuration
public class KafkaProducerConfiguration {

    @Value(value = "${spring.kafka.bootstrap-servers}")
    private String bootstrapAddress;

    @Bean
    public ProducerFactory<String, Double> kafkaProducer() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, DoubleSerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<String, Double> kafkaProducerTemplate() {
        return new KafkaTemplate<>(kafkaProducer());
    }
}

In the Kafka producer configuration above, we’re setting the broker address and the serializers that the producer uses to write message keys and values.

Finally, let’s configure the consumer:

@Configuration
public class KafkaConsumerConfiguration {

    @Value(value = "${spring.kafka.bootstrap-servers}")
    private String bootstrapAddress;

    @Bean
    public ConsumerFactory<String, Double> kafkaConsumer() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, DoubleDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Double> kafkaConsumerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Double> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(kafkaConsumer());
        return factory;
    }
}

3.2. Setting up the Consumers

In our demo application, we’ll start with two consumers that read from topic-1 and belong to the same group, named group-1:

@Service
public class MessageConsumerService {
    @KafkaListener(topics = "topic-1", groupId = "group-1")
    public void consumer0(ConsumerRecord<?, ?> consumerRecord) {
        trackConsumedPartitions("consumer-0", consumerRecord);
    }

    @KafkaListener(topics = "topic-1", groupId = "group-1")
    public void consumer1(ConsumerRecord<?, ?> consumerRecord) {
        trackConsumedPartitions("consumer-1", consumerRecord);
    }
}

The MessageConsumerService class registers two consumers to listen to topic-1 inside group-1 using the @KafkaListener annotation.

Now, let’s also define a field and a method in the MessageConsumerService class to keep track of the consumed partition:

Map<String, Set<Integer>> consumedPartitions = new ConcurrentHashMap<>();

private void trackConsumedPartitions(String key, ConsumerRecord<?, ?> record) {
    consumedPartitions.computeIfAbsent(key, k -> new HashSet<>());
    consumedPartitions.computeIfPresent(key, (k, v) -> {
        v.add(record.partition());
        return v;
    });
}

In the code above, we used ConcurrentHashMap to map each consumer name to a HashSet of all partitions consumed by that consumer.

4. Visualizing Partition Rebalance When a Consumer Leaves

Now that we have all configurations set up and the consumers registered, we can visualize what Kafka does when one of the consumers leaves group-1. To do that, let’s define the skeleton for the Kafka integration test that uses an embedded broker:

@SpringBootTest(classes = ManagingConsumerGroupsApplicationKafkaApp.class)
@EmbeddedKafka(partitions = 2, brokerProperties = {"listeners=PLAINTEXT://localhost:9092", "port=9092"})
public class ManagingConsumerGroupsIntegrationTest {

    private static final String CONSUMER_1_IDENTIFIER = "org.springframework.kafka.KafkaListenerEndpointContainer#1";
    private static final int TOTAL_PRODUCED_MESSAGES = 50000;
    private static final int MESSAGE_WHERE_CONSUMER_1_LEAVES_GROUP = 10000;

    @Autowired
    KafkaTemplate<String, Double> kafkaTemplate;

    @Autowired
    KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;

    @Autowired
    MessageConsumerService consumerService;
}

In the above code, we inject the necessary beans to produce and consume messages: kafkaTemplate and consumerService. We’ve also injected the bean kafkaListenerEndpointRegistry to manipulate registered consumers.

Finally, we defined three constants that will be used in our test case.

Now, let’s define the test case method:

@Test
public void givenContinuousMessageFlow_whenAConsumerLeavesTheGroup_thenKafkaTriggersPartitionRebalance() throws InterruptedException {
    int currentMessage = 0;

    do {
        kafkaTemplate.send("topic-1", RandomUtils.nextDouble(10.0, 20.0));
        Thread.sleep(0, 100);
        currentMessage++;

        if (currentMessage == MESSAGE_WHERE_CONSUMER_1_LEAVES_GROUP) {
            // Look up the listener container that backs consumer-1
            String containerId = kafkaListenerEndpointRegistry.getListenerContainerIds()
                .stream()
                .filter(a -> a.equals(CONSUMER_1_IDENTIFIER))
                .findFirst()
                .orElse("");
            MessageListenerContainer container = kafkaListenerEndpointRegistry.getListenerContainer(containerId);
            // Stopping the container makes consumer-1 leave group-1
            Objects.requireNonNull(container).stop();
            kafkaListenerEndpointRegistry.unregisterListenerContainer(containerId);
        }

        // Report progress every 1,000 messages (log is assumed to be a logger declared in the test class)
        if (currentMessage % 1000 == 0) {
            log.info("Processed {} of {}", currentMessage, TOTAL_PRODUCED_MESSAGES);
        }
    } while (currentMessage != TOTAL_PRODUCED_MESSAGES);

    assertEquals(1, consumerService.consumedPartitions.get("consumer-1").size());
    assertEquals(2, consumerService.consumedPartitions.get("consumer-0").size());
}

In the test above, we’re creating a flow of messages, and at a certain point, we remove one of the consumers so Kafka will reassign its partitions to the remaining consumer. Let’s break down the logic to make it more transparent:

  1. The main loop uses kafkaTemplate to produce 50,000 events of random numbers using Apache Commons’ RandomUtils. Once an arbitrary number of messages has been produced (10,000 in our case), we stop and unregister one consumer.
  2. To unregister a consumer, we first use a stream to search the registry for the matching container ID and retrieve the container using the getListenerContainer() method. Then, we call stop() to stop the container’s execution, which makes its consumer leave the group. Finally, we call unregisterListenerContainer() to programmatically remove the listener container from the registry.

Before discussing the test assertions, let’s glance at a few log lines that Kafka generated during the test execution.

The first vital line to see is the one that shows the LeaveGroup request made by consumer-1 to the group coordinator:

INFO o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-group-1-1, groupId=group-1] Member consumer-group-1-1-4eb63bbc-336d-44d6-9d41-0a862029ba95 sending LeaveGroup request to coordinator localhost:9092

Then, the group coordinator automatically triggers a rebalance and shows the reason behind that:

INFO  k.coordinator.group.GroupCoordinator - [GroupCoordinator 0]: Preparing to rebalance group group-1 in state PreparingRebalance with old generation 2 (__consumer_offsets-4) (reason: Removing member consumer-group-1-1-4eb63bbc-336d-44d6-9d41-0a862029ba95 on LeaveGroup)

Returning to our test, we’ll assert that the partition rebalance occurred correctly. Since we unregistered the consumer ending in 1, its partitions should be reassigned to the remaining consumer, which is consumer-0. Hence, we’ve used the map of tracked consumed records to check that consumer-1 only consumed from one partition, whereas consumer-0 consumed from two partitions.

5. Useful Consumer Configurations

Now, let’s talk about a few consumer configurations that impact partition rebalance and the trade-offs of setting specific values for them.

5.1. Session Timeouts and Heartbeat Frequency

The session.timeout.ms parameter indicates the maximum time in milliseconds that the group coordinator waits for a heartbeat from a consumer before considering it dead and triggering a partition rebalance. Alongside session.timeout.ms, the heartbeat.interval.ms parameter indicates the interval in milliseconds at which a consumer sends heartbeats to the group coordinator.

We should modify the session timeout and the heartbeat interval together so that heartbeat.interval.ms is always lower than session.timeout.ms. Otherwise, a consumer could time out before it has the chance to send its next heartbeat. Typically, we set the heartbeat interval to no more than one-third of the session timeout so that more than one heartbeat can be sent before the consumer is considered dead.

The default consumer session timeout is 45 seconds (10 seconds in Kafka versions before 3.0). We can modify that value as long as we understand the trade-offs of modifying it.
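As a rough illustration of keeping the two settings in step, the sketch below derives the heartbeat interval from the session timeout using the one-third rule of thumb. It uses plain java.util.Properties with the real configuration keys; the class name HeartbeatConfigSketch and the helper timeoutProperties() are hypothetical, and the resulting properties would still need to be merged into the actual consumer configuration.

```java
import java.util.Properties;

// Hypothetical helper: derives heartbeat.interval.ms from session.timeout.ms.
class HeartbeatConfigSketch {

    static Properties timeoutProperties(int sessionTimeoutMs) {
        Properties props = new Properties();
        props.setProperty("session.timeout.ms", String.valueOf(sessionTimeoutMs));
        // Rule of thumb: heartbeat interval is one-third of the session timeout
        props.setProperty("heartbeat.interval.ms", String.valueOf(sessionTimeoutMs / 3));
        return props;
    }

    public static void main(String[] args) {
        Properties props = timeoutProperties(45_000);
        System.out.println(props.getProperty("session.timeout.ms"));    // prints 45000
        System.out.println(props.getProperty("heartbeat.interval.ms")); // prints 15000
    }
}
```

Deriving one value from the other avoids the situation where someone lowers the session timeout and forgets to lower the heartbeat interval with it.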

When we set the session timeout lower than the default, we increase the speed at which the consumer group recovers from a failure, improving the group availability. However, in Kafka versions before 0.10.1.0, if the main thread of a consumer is blocked while processing a message for longer than the session timeout, the consumer can’t send heartbeats. Therefore, the consumer is considered dead, and the group coordinator triggers an unwanted partition rebalance. This was fixed by KIP-62, which introduced a background thread dedicated to sending heartbeats.

If we set higher values for the session timeout, failures take longer to detect. However, this might fix the unwanted partition rebalance problem mentioned above for Kafka versions older than 0.10.1.0.

5.2. Max Poll Interval Time

Another configuration is max.poll.interval.ms, which indicates the maximum time allowed between two consecutive poll() calls from a consumer. If that time passes without a new poll, the consumer is considered failed and leaves the group, which triggers a rebalance. The default value for max.poll.interval.ms is five minutes.

If we set higher values for max.poll.interval.ms, we’re giving more room for consumers to remain idle, which might be helpful to avoid rebalances. However, increasing that time might also increase the number of idle consumers if there are no messages to consume. This can be a problem in a low-throughput environment because consumers can remain idle longer, increasing infrastructure costs.
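One way to reason about a suitable value is to compare it with the worst-case time a consumer needs to process one batch. The sketch below does that arithmetic in plain Java. It brings in max.poll.records, a consumer setting this article doesn't otherwise cover (its default is 500), and the class PollIntervalSketch, its methods, and the per-record processing times are all hypothetical.

```java
// Hypothetical back-of-the-envelope check for max.poll.interval.ms.
class PollIntervalSketch {

    // Worst-case time to process one full batch returned by a single poll.
    static long worstCaseBatchMillis(int maxPollRecords, long perRecordMillis) {
        return (long) maxPollRecords * perRecordMillis;
    }

    // True if a full batch can be processed before the poll interval expires.
    static boolean fitsWithin(long maxPollIntervalMs, int maxPollRecords, long perRecordMillis) {
        return worstCaseBatchMillis(maxPollRecords, perRecordMillis) < maxPollIntervalMs;
    }

    public static void main(String[] args) {
        long defaultMaxPollIntervalMs = 300_000L; // five minutes
        // 500 records at 500 ms each take 250,000 ms: prints true
        System.out.println(fitsWithin(defaultMaxPollIntervalMs, 500, 500));
        // 500 records at 700 ms each take 350,000 ms: prints false
        System.out.println(fitsWithin(defaultMaxPollIntervalMs, 500, 700));
    }
}
```

When the check fails, we can either raise max.poll.interval.ms or have each poll return fewer records, depending on which trade-off suits the workload.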

6. Conclusion

In this article, we’ve looked at the fundamentals of the roles of the group leader and the group coordinator. We’ve also looked into how Kafka manages consumer groups and partitions.

We’ve seen in practice how Kafka automatically rebalances the partitions within the group when one of its consumers leaves the group.

It’s essential to understand when Kafka triggers partition rebalance and tune the consumer configurations accordingly.

As always, the source code for the article is available over on GitHub.
