Last updated at Fri, 03 Nov 2017 18:19:27 GMT

Of the many additions to Java 8, such as the Stream API and lambdas, one of the less talked-about is CompletableFuture. I decided to try it out on the last Java component I wrote. My use case, in a nutshell, was piping large volumes of data from a distributed file system, compressing it, and uploading it to individual destinations on Amazon S3.

For anyone who has worked with Guava’s ListenableFutures, which let you add a completion listener to a regular Java Future, CompletableFutures work along the same lines but add more functionality, such as easily composing multiple stages of future work for a single computation. Creating a basic CompletableFuture is as simple as creating a regular Future. The following example creates one that runs a computation asynchronously:

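CompletableFuture<Integer> futureCount = CompletableFuture.supplyAsync(
    () -> {
        try {
            // Simulate long running task
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            // Restore the interrupt flag instead of swallowing it
            Thread.currentThread().interrupt();
        }
        return 10;
    });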
// Do some other work in the meantime

CompletableFuture.supplyAsync runs a task asynchronously on Java’s common ForkJoinPool by default, and an overload accepts your own Executor if you want more control over the thread pool. Now let’s get the result of the above computation.

try {
    int count = futureCount.get();
    System.out.println(count);
} catch (InterruptedException | ExecutionException ex) {
    // Exceptions thrown by the asynchronous task surface here, wrapped in an ExecutionException.
}
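To illustrate the option of supplying your own Executor, here is a minimal sketch. The class name, pool size, and thread name are made up for this example and are not from the component described in this post. It passes a fixed thread pool as the second argument to supplyAsync and reports which thread ran the task.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class CustomExecutorExample {

    // Runs the supplier on a caller-provided pool instead of the common
    // ForkJoinPool and returns the name of the thread that did the work.
    static String workerThreadName() {
        // Hypothetical pool size and thread name, chosen only for this sketch
        ExecutorService pool = Executors.newFixedThreadPool(2, runnable -> {
            Thread t = new Thread(runnable, "upload-worker");
            t.setDaemon(true);
            return t;
        });
        try {
            CompletableFuture<String> future = CompletableFuture.supplyAsync(
                () -> Thread.currentThread().getName(), pool);
            // join() waits like get() but throws unchecked exceptions
            return future.join();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(workerThreadName()); // prints upload-worker
    }
}
```

A dedicated pool like this is probably worth considering when the tasks block on I/O, since blocking work on the shared common pool can starve other users of it.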

As with regular Java Futures, any exception that occurs during the computation is thrown when .get() is called, wrapped in an ExecutionException. Now let’s look at composing multiple stages of work on a CompletableFuture. I’m going to use RxJava to populate my mock external data source. If you’re unfamiliar with Observables, they simply provide a stream of objects that can be pushed to subscribers asynchronously. They work quite nicely with CompletableFutures.

public CompletableFuture<Integer> countEvents() {
    CompletableFuture<Integer> result = new CompletableFuture<>();
    AtomicInteger count = new AtomicInteger();
    Observable.just("1", "2", "3", "err", "4").subscribe(
        ev -> {
            try {
                // Add each numeric event to the running total
                count.addAndGet(Integer.parseInt(ev));
            } catch (NumberFormatException e) {
                // Ignore events that are not numbers, such as "err"
            }
        },
        // Error callback: fall back to a count of 0
        throwable -> result.complete(0),
        // Completion callback: publish the final count
        () -> {
            try {
                // Simulate I/O delay
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            result.complete(count.get());
        }
    );
    return result;
}
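If you don’t have RxJava on the classpath, the same manual-completion pattern can be sketched with a plain background thread. This is only an illustration under that assumption, not the code from my component. The class name is hypothetical, and completeExceptionally is used here to report failures where the RxJava version falls back to 0.

```java
import java.util.concurrent.CompletableFuture;

class ManualCompletionExample {

    // Sums the parseable entries on a background thread and completes
    // the future by hand once the total is known.
    static CompletableFuture<Integer> countEvents(String... events) {
        CompletableFuture<Integer> result = new CompletableFuture<>();
        Thread producer = new Thread(() -> {
            try {
                int sum = 0;
                for (String ev : events) {
                    try {
                        sum += Integer.parseInt(ev);
                    } catch (NumberFormatException e) {
                        // Skip entries that are not numbers
                    }
                }
                result.complete(sum);
            } catch (RuntimeException e) {
                // Report unexpected failures to whoever is waiting on the future
                result.completeExceptionally(e);
            }
        });
        producer.setDaemon(true);
        producer.start();
        return result;
    }

    public static void main(String[] args) {
        // join() waits like get() but throws unchecked exceptions
        System.out.println(countEvents("1", "2", "3", "err", "4").join()); // prints 10
    }
}
```

The caller gets the future back immediately, and whichever thread produces the data decides when and how it completes.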

The RxJava-backed countEvents() follows the same idea as the first example, but here the external data arrives as a stream and the CompletableFuture is completed by hand once the final count is known. Next we’ll set up some additional stages of work to perform on the result before calling .get() on the future.

CompletableFuture<String> data = countEvents()
    .thenApply(count -> {
        int transformedValue = count * 25;
        return transformedValue;
    }).thenApply(transformed -> "data-" + transformed);

try {
    System.out.println(data.get());
} catch (InterruptedException | ExecutionException e) {
    e.printStackTrace();
}
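It is worth seeing what that catch block receives when one of the stages fails. The following is a rough sketch with a made-up class name and a deliberately failing stage. The first thenApply throws, the later stage is skipped, and get() raises an ExecutionException whose cause is the original exception.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

class StageFailureExample {

    // Builds a pipeline whose first stage throws, then returns the
    // message of the cause that get() reports.
    static String failureMessage() {
        CompletableFuture<String> data = CompletableFuture.supplyAsync(() -> 10)
            .thenApply(count -> {
                // Hypothetical validation rule, used only to force a failure
                if (count > 5) {
                    throw new IllegalStateException("count too large: " + count);
                }
                return count * 25;
            })
            // Skipped once the previous stage has failed
            .thenApply(transformed -> "data-" + transformed);
        try {
            return data.get();
        } catch (ExecutionException e) {
            // The original exception is available as the cause
            return e.getCause().getMessage();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        System.out.println(failureMessage()); // prints count too large: 10
    }
}
```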

This basic example shows how easily you can transform your data through several stages using CompletableFutures. In the countEvents() pipeline, the final value printed is data-250: the numeric events sum to 10, which is multiplied by 25 and then prefixed with "data-". I hope you found this helpful as a quick introduction to CompletableFutures. They are worth playing around with if you are interested in writing code with a more functional approach. There is plenty more functionality to take advantage of, which we will cover in a future blog post. Stay tuned!