Course – LS – All

Get started with Spring and Spring Boot, through the Learn Spring course:

>> CHECK OUT THE COURSE

1. Overview

Java 8 added CompletableFuture to its concurrency API as a tool for simplifying asynchronous, non-blocking programming.

In this article, we’ll discuss Java’s CompletableFuture and the thread pool it leverages. We’ll explore the differences between its async and non-async methods and learn how to maximize the potential of the CompletableFuture API.

2. Non-Async Methods

CompletableFuture offers an extensive API consisting of more than 50 methods. Many of these methods are available in two variants: non-async and async. Let’s start with the non-async counterparts and delve into practical examples using the thenApply() method:

[Figure: async vs. non-async method variants]

When using thenApply(), we pass a function that takes the value of the completed CompletableFuture as input, performs an operation, and returns a new value. thenApply() then returns a new CompletableFuture that wraps the result. To illustrate this, let’s consider a simple example where we convert a String value into an Integer representing its length. We’ll also print the name of the thread that executes this operation:

@Test
void whenUsingNonAsync_thenMainThreadIsUsed() throws Exception {
    CompletableFuture<String> name = CompletableFuture.supplyAsync(() -> "Baeldung");

    CompletableFuture<Integer> nameLength = name.thenApply(value -> {
        printCurrentThread(); // prints "main" if the future has already completed
        return value.length();
    });

    assertThat(nameLength.get()).isEqualTo(8);
}

private static void printCurrentThread() {
    System.out.println(Thread.currentThread().getName());
}

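The function passed to thenApply() isn’t handed to a thread pool. If the CompletableFuture is already complete when we call thenApply(), the function runs immediately on the calling thread, in our case the main thread. Otherwise, it runs later on the thread that completes the future, which here would be a worker of the common ForkJoinPool. Assuming the future has already completed, if we move the interaction with CompletableFuture to a different thread, we should notice the change: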

@Test
void whenUsingNonAsync_thenUsesCallersThread() throws Exception {
    Runnable test = () -> {
        CompletableFuture<String> name = CompletableFuture.supplyAsync(() -> "Baeldung");

        CompletableFuture<Integer> nameLength = name.thenApply(value -> {
            printCurrentThread(); // prints "test-thread" if the future has already completed
            return value.length();
        });

        try {
            assertThat(nameLength.get()).isEqualTo(8);
        } catch (Exception e) {
            fail(e.getMessage());
        }
    };

    Thread thread = new Thread(test, "test-thread");
    thread.start();
    thread.join(); // wait for the thread to finish instead of sleeping for a fixed time
}
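Since supplyAsync() races with the call to thenApply(), the tests above can't control which thread ends up running the function. The following minimal sketch removes the race to show both outcomes of a non-async method. The class name, method names, and the "completer-thread" name are assumptions made up for this illustration:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

class NonAsyncThreadDemo {

    // Case 1: the future is already complete, so thenApply() runs the function
    // immediately on the thread that calls thenApply().
    static String threadWhenAlreadyComplete() {
        CompletableFuture<String> name = CompletableFuture.completedFuture("Baeldung");
        AtomicReference<String> executingThread = new AtomicReference<>();
        name.thenApply(value -> {
            executingThread.set(Thread.currentThread().getName());
            return value.length();
        }).join();
        return executingThread.get();
    }

    // Case 2: the future is still incomplete when thenApply() registers the function,
    // so the function runs later on the thread that completes the future.
    static String threadWhenCompletedLater() {
        CompletableFuture<String> name = new CompletableFuture<>();
        AtomicReference<String> executingThread = new AtomicReference<>();
        CompletableFuture<Integer> nameLength = name.thenApply(value -> {
            executingThread.set(Thread.currentThread().getName());
            return value.length();
        });
        // Hypothetical thread name, chosen so it is easy to recognize
        new Thread(() -> name.complete("Baeldung"), "completer-thread").start();
        nameLength.join(); // blocks until the function has run
        return executingThread.get();
    }

    public static void main(String[] args) {
        System.out.println(threadWhenAlreadyComplete()); // name of the calling thread
        System.out.println(threadWhenCompletedLater());  // "completer-thread"
    }
}
```

In other words, a non-async method never picks a thread on its own. It simply runs on whichever thread happens to trigger the function.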

3. Async Methods

Most methods in the API have an async counterpart. We can use these async variants to ensure that the intermediate operations are always submitted to a thread pool instead of running on the calling or completing thread. Let’s change the previous code example and switch from thenApply() to thenApplyAsync():

@Test
void whenUsingAsync_thenUsesCommonPool() throws Exception {
    CompletableFuture<String> name = CompletableFuture.supplyAsync(() -> "Baeldung");

    CompletableFuture<Integer> nameLength = name.thenApplyAsync(value -> {
        printCurrentThread(); // will print "ForkJoinPool.commonPool-worker-1"
        return value.length();
    });

    assertThat(nameLength.get()).isEqualTo(8);
}

According to the official documentation, if we use the async methods without explicitly providing an Executor, the functions will be executed using ForkJoinPool.commonPool(). Therefore, if we run the code snippet, we should expect to see one of the common ForkJoinPool workers, in our case “ForkJoinPool.commonPool-worker-1”.
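As a rough check of this behavior, the sketch below calls thenApplyAsync() on a future that is already complete, a situation in which the non-async thenApply() would have run on the caller. The class and method names are assumptions for this illustration. The exact worker name depends on the machine, and if the common pool's parallelism is below two, CompletableFuture starts a new thread per task instead, so the example only compares the worker against the calling thread:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

class AsyncDefaultExecutorDemo {

    // Returns the name of the thread that ran the function passed to thenApplyAsync().
    static String threadUsedByThenApplyAsync() {
        // Already complete: the non-async thenApply() would run on the caller here.
        CompletableFuture<String> name = CompletableFuture.completedFuture("Baeldung");
        AtomicReference<String> executingThread = new AtomicReference<>();
        name.thenApplyAsync(value -> {
            executingThread.set(Thread.currentThread().getName());
            return value.length();
        }).join();
        return executingThread.get();
    }

    public static void main(String[] args) {
        System.out.println("caller: " + Thread.currentThread().getName());
        // Typically a "ForkJoinPool.commonPool-worker-N" name
        System.out.println("async:  " + threadUsedByThenApplyAsync());
    }
}
```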

4. Async Methods With Custom Executor

All the async methods are overloaded with a variant that accepts an Executor in addition to the code to execute. We can use this overload to run the async operations on an explicit thread pool. Let’s further update our test and provide a custom thread pool for the thenApplyAsync() method:

@Test
void whenUsingAsync_thenUsesCustomExecutor() throws Exception {
    ExecutorService testExecutor = Executors.newFixedThreadPool(5);
    try {
        CompletableFuture<String> name = CompletableFuture.supplyAsync(() -> "Baeldung");

        CompletableFuture<Integer> nameLength = name.thenApplyAsync(value -> {
            printCurrentThread(); // will print a name such as "pool-2-thread-1"
            return value.length();
        }, testExecutor);

        assertThat(nameLength.get()).isEqualTo(8);
    } finally {
        testExecutor.shutdown(); // release the pool's threads after the test
    }
}

As expected, when using the overloaded method, the CompletableFuture will no longer use the common ForkJoinPool.
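The default names of pool threads depend on how many pools were created earlier in the same JVM, so they are awkward to rely on. One option, sketched here with a hypothetical thread name and hypothetical class and method names, is to pass a ThreadFactory so that the executor's thread is easy to recognize:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

class NamedExecutorDemo {

    // Returns the name of the thread that ran the function passed to thenApplyAsync().
    static String threadUsedWithCustomExecutor() {
        // Hypothetical thread name, chosen only to make the output recognizable
        ExecutorService executor = Executors.newSingleThreadExecutor(
            runnable -> new Thread(runnable, "name-length-worker"));
        try {
            AtomicReference<String> executingThread = new AtomicReference<>();
            CompletableFuture.completedFuture("Baeldung")
                .thenApplyAsync(value -> {
                    executingThread.set(Thread.currentThread().getName());
                    return value.length();
                }, executor)
                .join();
            return executingThread.get();
        } finally {
            executor.shutdown(); // let the JVM exit once the task has finished
        }
    }

    public static void main(String[] args) {
        System.out.println(threadUsedWithCustomExecutor()); // "name-length-worker"
    }
}
```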

5. Extending CompletableFuture

Lastly, we can extend CompletableFuture and override defaultExecutor(), a method available since Java 9, to encapsulate a custom thread pool. As a result, we’ll be able to use the async methods without specifying an Executor, and the functions will be invoked by our thread pool instead of the common ForkJoinPool.

Let’s create a CustomCompletableFuture that extends CompletableFuture. We’ll use a newSingleThreadExecutor whose thread has a name that’s easy to recognize in the console while testing. Furthermore, we’ll override the defaultExecutor() method so that the async methods use our custom thread pool:

public class CustomCompletableFuture<T> extends CompletableFuture<T> {
    private static final Executor executor = Executors.newSingleThreadExecutor(
        runnable -> new Thread(runnable, "Custom-Single-Thread")
    );

    @Override
    public Executor defaultExecutor() {
        return executor;
    }
}

Additionally, let’s add a static factory method to CustomCompletableFuture that follows the CompletableFuture patterns. This will allow us to easily create and complete a CustomCompletableFuture object:

public static <TYPE> CustomCompletableFuture<TYPE> supplyAsync(Supplier<TYPE> supplier) {
    CustomCompletableFuture<TYPE> future = new CustomCompletableFuture<>();
    executor.execute(() -> {
        try {
            future.complete(supplier.get());
        } catch (Exception ex) {
            future.completeExceptionally(ex);
        }
    });
    return future;
}

Now, let’s create an instance of our CustomCompletableFuture and perform the same transformation on the String value inside thenApplyAsync(). This time, we’ll no longer specify the Executor, but we still expect the function to be invoked by our dedicated thread, “Custom-Single-Thread”:

@Test
void whenOverridingDefaultThreadPool_thenUsesCustomExecutor() throws Exception {
    CompletableFuture<String> name = CustomCompletableFuture.supplyAsync(() -> "Baeldung");

    CompletableFuture<Integer> nameLength = name.thenApplyAsync(value -> {
        printCurrentThread(); // will print "Custom-Single-Thread"
        return value.length();
    });

    assertThat(nameLength.get()).isEqualTo(8);
}
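One caveat worth noting: thenApplyAsync() returns a plain CompletableFuture by default, so a second async step chained onto the result would fall back to the common pool. A possible way around this, sketched below under the assumption of Java 9 or later, is to also override newIncompleteFuture(). The class ChainedCustomFuture and its demo method are hypothetical names for this illustration and not part of the article's code:

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

// Hypothetical variant of CustomCompletableFuture that keeps its executor across a chain
class ChainedCustomFuture<T> extends CompletableFuture<T> {
    private static final Executor EXECUTOR = Executors.newSingleThreadExecutor(runnable -> {
        Thread thread = new Thread(runnable, "Custom-Single-Thread");
        thread.setDaemon(true); // daemon, so this demo does not keep the JVM alive
        return thread;
    });

    @Override
    public Executor defaultExecutor() {
        return EXECUTOR;
    }

    // Dependent stages are created through this method, so returning our own
    // type makes every stage in the chain use defaultExecutor() from above.
    @Override
    public <U> CompletableFuture<U> newIncompleteFuture() {
        return new ChainedCustomFuture<>();
    }

    // Returns the thread names used by two consecutive async steps.
    static List<String> threadsUsedByTwoAsyncSteps() {
        List<String> threads = new CopyOnWriteArrayList<>();
        ChainedCustomFuture<String> name = new ChainedCustomFuture<>();
        name.complete("Baeldung");
        name.thenApplyAsync(value -> {
                threads.add(Thread.currentThread().getName());
                return value.length();
            })
            .thenApplyAsync(length -> {
                threads.add(Thread.currentThread().getName());
                return length * 2;
            })
            .join();
        return threads;
    }

    public static void main(String[] args) {
        // Both steps report "Custom-Single-Thread"
        System.out.println(threadsUsedByTwoAsyncSteps());
    }
}
```

Without the newIncompleteFuture() override, only the first step would run on the custom thread.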

6. Conclusion

In this article, we learned that most methods in CompletableFuture’s API come in both async and non-async variants. The non-async variants run the function on the calling thread if the future is already complete, and otherwise on the thread that completes it. The async counterparts, on the other hand, always hand the function to a thread pool, the default one being the common ForkJoinPool.

After that, we discussed further customization of the execution, using custom Executors for each of the async steps. Lastly, we learned how to create a custom CompletableFuture object and override the defaultExecutor() method. This allowed us to leverage the async methods without specifying a custom Executor each time.

As always, we can find working code examples over on GitHub.
