1. Overview

In this tutorial, we’ll briefly introduce Apache Kafka and then see how to programmatically create and configure topics in a Kafka cluster.

2. Introduction to Kafka

Apache Kafka is a powerful, high-performance, distributed event-streaming platform.

Generally, producer applications publish events to Kafka while consumers subscribe to these events in order to read and process them. Kafka uses topics to store and categorize these events; for example, in an e-commerce application, there could be an ‘orders’ topic.

Kafka topics are partitioned, which distributes data across multiple brokers for scalability. They can be replicated in order to make the data fault-tolerant and highly available. Topics also retain events even after consumption for as long as required. This is all managed on a per-topic basis via Kafka command-line tools and key-value configurations.

In addition to the command-line tools, Kafka also provides an Admin API to manage and inspect topics, brokers, and other Kafka objects. In our example, we’ll use this API to create new topics.

3. Dependencies

To use the Admin API, let’s add the kafka-clients dependency to our pom.xml:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.4.0</version>
</dependency>

4. Setting Up Kafka

Before creating new topics, we need at least a single-node Kafka cluster.

In this tutorial, we’ll use the Testcontainers framework to instantiate a Kafka container. We can then run reliable and self-contained integration tests that don’t rely on an external Kafka server being available. For this, we’ll need two more dependencies specifically for our tests.

First, let’s add the Testcontainers Kafka dependency to our pom.xml:

<dependency>
    <groupId>org.testcontainers</groupId>
    <artifactId>kafka</artifactId>
    <version>1.19.3</version>
    <scope>test</scope>
</dependency>

Next, we’ll add the junit-jupiter artifact for running Testcontainers tests with JUnit 5:

<dependency>
    <groupId>org.testcontainers</groupId>
    <artifactId>junit-jupiter</artifactId>
    <version>1.19.3</version>
    <scope>test</scope>
</dependency>

Now that we have all the necessary dependencies configured, we can write a simple application to programmatically create new topics.

5. Admin API

Let’s begin by creating a new Properties instance with minimal configuration for a local broker:

Properties properties = new Properties();
properties.put(
  AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers()
);

Now we can obtain an Admin instance:

Admin admin = Admin.create(properties);

The create method accepts a Properties object (or a Map) with the bootstrap.servers property and returns a thread-safe instance.

The admin client uses this property to discover the brokers in the cluster and subsequently perform any admin operations. As such, it would usually be enough to include two or three broker addresses in order to cover the possibility of some instances being unavailable.

The AdminClientConfig class contains constants for all the admin client configuration entries.
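
For instance, here’s a minimal sketch with several brokers listed (the hostnames here are hypothetical):

Properties properties = new Properties();
// Listing two or three brokers guards against one of them being unavailable
properties.put(
  AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
  "kafka-1:9092,kafka-2:9092,kafka-3:9092"
);
Admin admin = Admin.create(properties);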

6. Topic Creation

Let’s start by creating a JUnit 5 test with Testcontainers to verify successful topic creation. We’ll utilize the Kafka module, which uses the official Kafka Docker image for Confluent OSS Platform:

@Test
void givenTopicName_whenCreateNewTopic_thenTopicIsCreated() throws Exception {
    kafkaTopicApplication.createTopic("test-topic");

    String topicCommand = "/usr/bin/kafka-topics --bootstrap-server=localhost:9092 --list";
    String stdout = KAFKA_CONTAINER.execInContainer("/bin/sh", "-c", topicCommand)
      .getStdout();

    assertThat(stdout).contains("test-topic");
}

Here, Testcontainers will automatically instantiate and manage the Kafka container during test execution. We simply invoke our application code and verify that the topic has been successfully created in the running container.
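
For reference, a minimal sketch of how such a container might be declared with the junit-jupiter extension looks like this (the test class name and image tag are assumptions):

@Testcontainers
class KafkaTopicApplicationIntegrationTest {

    // Testcontainers starts this container before the tests and stops it afterwards
    @Container
    private static final KafkaContainer KAFKA_CONTAINER =
      new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.4.0"));

    // ...
}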

6.1. Create With Default Options

Topic partitions and the replication factor are the key considerations for new topics. We’ll keep things simple and create our example topic with 1 partition and a replication factor of 1:

try (Admin admin = Admin.create(properties)) {
    int partitions = 1;
    short replicationFactor = 1;
    NewTopic newTopic = new NewTopic(topicName, partitions, replicationFactor);
    
    CreateTopicsResult result = admin.createTopics(
      Collections.singleton(newTopic)
    );

    KafkaFuture<Void> future = result.values().get(topicName);
    future.get();
}

Here, we’ve used the Admin.createTopics method to create a batch of new topics with default options. As the Admin interface extends the AutoCloseable interface, we’ve used try-with-resources to execute our operation. This ensures that resources are released appropriately.

Importantly, this method communicates with the Controller Broker and executes asynchronously. The returned CreateTopicsResult object exposes a KafkaFuture for accessing the results of each item in the request batch. This follows the Java asynchronous programming pattern and allows callers to obtain the results of the operation using the Future.get method.

For synchronous behavior, we can call the get method immediately to retrieve the result of our operation. This blocks until the operation completes or fails. In case of failure, the call throws an ExecutionException, which wraps the underlying cause.
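
As a rough sketch, that blocking call with error handling might look like this:

try {
    // Block until the topic is created or the operation fails
    result.values().get(topicName).get();
} catch (ExecutionException e) {
    // The real failure, e.g. a TopicExistsException, is the wrapped cause
    Throwable cause = e.getCause();
    // handle or rethrow as appropriate
} catch (InterruptedException e) {
    Thread.currentThread().interrupt();
}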

6.2. Create With Options

Instead of default options, we can also use the overloaded form of the Admin.createTopics method and provide some options via the CreateTopicsOptions object. We can use these to modify the admin client behavior when creating new topics:

CreateTopicsOptions topicOptions = new CreateTopicsOptions()
  .validateOnly(true)
  .retryOnQuotaViolation(false);

CreateTopicsResult result = admin.createTopics(
  Collections.singleton(newTopic), topicOptions
);

Here, we’ve set the validateOnly option to true, meaning that the client will only validate without actually creating the topic. Similarly, the retryOnQuotaViolation option is set to false so that the operation is not retried in case of a quota violation.

6.3. New Topic Configuration

Kafka has a wide range of topic configurations that control topic behavior, such as data retention and compression. These have both a server default value and an optional per-topic override.

We can provide the topic configurations by using a config map for the new topic:

// Create a compacted topic with 'lz4' compression codec
Map<String, String> newTopicConfig = new HashMap<>();
newTopicConfig.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT);
newTopicConfig.put(TopicConfig.COMPRESSION_TYPE_CONFIG, "lz4");

NewTopic newTopic = new NewTopic(topicName, partitions, replicationFactor)
  .configs(newTopicConfig);

The TopicConfig class from the Admin API contains the keys that can be used to configure topics at creation time.
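
For example, the data retention mentioned earlier could be set in the same way (the seven-day value is just an illustration):

// Retain records for seven days (value in milliseconds)
newTopicConfig.put(TopicConfig.RETENTION_MS_CONFIG, "604800000");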

7. Other Topic Operations

In addition to creating new topics, the Admin API also provides operations to delete, list, and describe topics. All these topic-related operations follow the same pattern we’ve seen for topic creation.

Each of these operation methods has an overloaded version that takes an xxxTopicOptions object as input. All of these methods return the corresponding xxxTopicsResult object. This, in turn, provides the KafkaFuture for accessing the results of the asynchronous operation.
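
For illustration, here’s a minimal sketch of the list, describe, and delete operations following that pattern (assuming the kafka-clients 3.x method names):

try (Admin admin = Admin.create(properties)) {
    // List all topic names in the cluster
    Set<String> topicNames = admin.listTopics().names().get();

    // Describe a topic to inspect its partitions and replicas
    TopicDescription description = admin.describeTopics(Collections.singleton("test-topic"))
      .topicNameValues().get("test-topic").get();

    // Delete a topic; all() completes once every deletion in the batch succeeds
    admin.deleteTopics(Collections.singleton("test-topic")).all().get();
}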

Finally, it’s also worth mentioning that the Admin API, introduced in Kafka version 0.11.0.0, is still evolving, as indicated by the InterfaceStability.Evolving annotation. This means the API can change in the future, and even a minor release may break compatibility.

8. Conclusion

In this tutorial, we’ve seen how to create a new topic in Kafka using the Java admin client.

Initially, we created a topic with default options and then with explicit options. Following on from this, we saw how to configure the new topic using various properties. Finally, we briefly covered other topic-related operations using the admin client.

Along the way, we also saw how to use Testcontainers to set up a simple single-node cluster from our tests.

As always, the full source code of the article is available over on GitHub.
