Course – LS – All

Get started with Spring and Spring Boot, through the Learn Spring course:

>> CHECK OUT THE COURSE

1. Overview

There are several ways in Java that we can run tasks asynchronously. Built into Java, we have Future and CompletableFuture. We can also use the RxJava library, which gives us the Observable class. In this article, we’ll examine the differences between the three and the benefits and potential use cases for each.

2. Future

The Future interface first appeared in Java 5 and provides very limited functionality. An instance of a Future is a placeholder for a result that will be produced by an asynchronous process and may not yet be available. There is a small range of methods provided to help with this process. We can either cancel a task or get the result from a completed task and also check if a task has been canceled or completed.

To see this in action, let’s create an example asynchronous task. We’ll have an object and a Callable, which acts like it’s retrieving that object from a database. Our object can be very simple:

class TestObject {
    int dataPointOne;
    int dataPointTwo;
    TestObject() {
        dataPointOne = 10;
    }
    // Standard getters and setters
}

So on calling the constructor, we return an instance of TestObject with one of the data points set. We can now create a second class implementing the Callable interface to create that object for us:

class ObjectCallable implements Callable<TestObject> {
    @Override
    TestObject call() {
        return new TestObject();
    }
}

With both of those objects set up, we can write a test to fetch a TestObject using a Future:

@Test
void whenRetrievingObjectWithBasicFuture_thenExpectOnlySingleDataPointSet() throws ExecutionException, InterruptedException {
    ExecutorService exec = Executors.newSingleThreadExecutor();
    Future<TestObject> future = exec.submit(new ObjectCallable());
    TestObject retrievedObject = future.get();
    assertEquals(10, retrievedObject.getDataPointOne());
    assertEquals(0, retrievedObject.getDataPointTwo());
}

Here we’ve created an ExecutorService to which we can submit tasks. Next, we submitted our ObjectCallable class and received a Future in return. Finally, we can call get() on our Future to get the result. We see from the asserts that we have our object with a single data point populated.

3. CompletableFuture

CompletableFuture is an implementation of the Future interface that was released with Java 8. It extends the basic functionality of Future to let us have a lot more control over the results of our asynchronous operations. One of the biggest pieces of added functionality is the option to chain function calls onto the result of the initial task. Let’s see that in action here by repeating the task we did in the previous section. But this time, we want to hydrate the object after we retrieve it. Let’s create an object with a hydration method to populate the second data point in TestObject:

class ObjectHydrator {
    TestObject hydrateTestObject(TestObject testObject){
        testObject.setDataPointTwo(20);
        return testObject;
    }
}

We’ll also need to retrieve our initial TestObject from an implementation of Supplier this time:

class ObjectSupplier implements Supplier<TestObject> {
    @Override
    TestObject get() {
        return new TestObject();
    }
}

With both of those classes ready, let’s put them to use:

@Test
void givenACompletableFuture_whenHydratingObjectAfterRetrieval_thenExpectBothDataPointsSet() throws ExecutionException, InterruptedException {
    ExecutorService exec = Executors.newSingleThreadExecutor();
    ObjectHydrator objectHydrator = new ObjectHydrator();
    CompletableFuture<TestObject> future = CompletableFuture.supplyAsync(new ObjectSupplier(), exec)
      .thenApply(objectHydrator::hydrateTestObject);
    TestObject retrievedObject = future.get();
    assertEquals(10, retrievedObject.getDataPointOne());
    assertEquals(20, retrievedObject.getDataPointTwo());
}

This time we can see from the assertions that thanks to the ability to chain on the hydration method, we’ve got both data points set on our object.

4. RxJava’s Observable

RxJava is a library that lets us build event-driven and asynchronous programs following the reactive programming paradigm.

To use RxJava in our project, we’ll need to import it into our pom.xml:

<dependency>
    <groupId>io.reactivex.rxjava3</groupId>
    <artifactId>rxjava</artifactId>
    <version>3.1.6</version>
</dependency>

The latest version is available in the Maven Repository.

This library can do a lot, but today, we’ll focus on the Observable class. An Observable provides data to an Observer either on demand or whenever data becomes available. To run a task asynchronously, much like we did with Future and CompletableFuture, we can create an Observable which will produce data from an asynchronous source when requested:

@Test
void givenAnObservable_whenRequestingData_thenItIsRetrieved() {
    ObjectHydrator objectHydrator = new ObjectHydrator();
    Observable<TestObject> observable = Observable.fromCallable(new ObjectCallable()).map(objectHydrator::hydrateTestObject);
    observable.subscribe(System.out::println);
}

Here we’ve created an Observable from our ObjectCallable class and used map() to apply our hydrator. We then subscribe to the Observable and provide a method to handle the results. In our case, we’ve simply logged the results out. This gives exactly the same end result as our CompletableFuture implementation. The subscribe() method fills the same role as CompletableFutures get().

While we can clearly use RxJava for the same purpose as CompletableFuture, its main use case is the extensive amount of other functionality it provides. One example is performing the same task again but in a completely different way. We can create an Observable which will wait for data to arrive, and then data can be pushed to it from elsewhere:

@Test
void givenAnObservable_whenPushedData_thenItIsReceived() {
    PublishSubject<Integer> source = PublishSubject.create();
    Observable<Integer> observable = source.observeOn(Schedulers.computation());
    observable.subscribe(System.out::println, (throwable) -> System.out.println("Error"), () -> System.out.println("Done"));

    source.onNext(1);
    source.onNext(2);
    source.onNext(3);
    source.onComplete();
}

When run, this test produces the following output:

1
2
3
Done

So we’re able to subscribe to a data source that was not yet producing anything and simply wait. Once the data was ready, we pushed it onto the source with onNext() and were alerted through our subscription. This is an example of the reactive programming style RxJava allows for. We reacted to events and new data that an external source pushed to us instead of requesting it ourselves.

5. Conclusion

In this article, we’ve seen how the Future interface from early Java allows a useful but limited ability to execute tasks asynchronously and get results later. Next, we explored the benefits brought about by the newer implementation CompletableFuture. This gives us the ability to string together method calls and offers greater control over the whole process.

Finally, we saw that we could perform the same job with RxJava, but we also noted that it is an extensive library that allows us to do much more. We briefly looked at how with RxJava, we can push tasks asynchronously to an Observer while subscribing to a data stream indefinitely.

As always, the full code for the examples is available over on GitHub.

Course – LS – All

Get started with Spring and Spring Boot, through the Learn Spring course:

>> CHECK OUT THE COURSE
res – Junit (guide) (cat=Reactive)
Comments are open for 30 days after publishing a post. For any issues past this date, use the Contact form on the site.