Asynchronous REST Services with JAX-RS and CompletableFuture
One of the new features introduced by JAX-RS 2.0 is asynchronous processing in the Server and Client APIs.
We use these APIs together with CompletableFuture and Java 8 lambda expressions to show how to create scalable
and well-performing REST services in a more functional way.
Server API #
By default, request processing on the server works in a synchronous mode, which means that each request is processed in a single HTTP thread. When request processing is finished, the thread is returned to the thread pool. This is not a problem when resource methods execute quickly and the number of concurrent connections is relatively low.
In asynchronous mode the thread is returned to the thread pool before request processing is completed. Request processing then continues in another thread, called a Worker. The released I/O thread can be used to accept new incoming requests. In many cases just a few threads can handle lots of requests simultaneously, so the number of threads needed to handle incoming requests can be reduced significantly. By using far fewer threads we save memory, improve performance (by reducing thread context switching) and gain more resistance to [cascading failures](http://en.wikipedia.org/wiki/Cascading_failure) (a good explanation of how cascading failures affect performance can be found in this article).
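The hand-off described above can be modelled without any JAX-RS dependency. The following is only a sketch under simplifying assumptions: one thread named "io" stands in for the HTTP thread, a small fixed pool stands in for the Workers, and the names HandOffDemo and acceptAll are made up for this illustration (they are not part of Jersey or JAX-RS).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class HandOffDemo {

    /** Accepts all requests on a single "io" thread and processes them on worker threads. */
    static List<String> acceptAll(int requests, int workers) {
        ExecutorService io = Executors.newSingleThreadExecutor(r -> new Thread(r, "io"));
        ExecutorService workerPool =
            Executors.newFixedThreadPool(workers, r -> new Thread(r, "worker"));
        try {
            List<CompletableFuture<String>> pending = new ArrayList<>();
            for (int i = 0; i < requests; i++) {
                final int id = i;
                CompletableFuture<String> response = new CompletableFuture<>();
                pending.add(response);
                // The "io" thread only forks the work and is then free for the next request
                io.execute(() -> {
                    String acceptedOn = Thread.currentThread().getName();
                    workerPool.execute(() -> response.complete(
                        "request-" + id + " accepted on " + acceptedOn
                            + ", processed on " + Thread.currentThread().getName()));
                });
            }
            List<String> lines = new ArrayList<>();
            for (CompletableFuture<String> response : pending) {
                lines.add(response.join()); // wait for the worker to finish this request
            }
            return lines;
        } finally {
            io.shutdown();
            workerPool.shutdown();
        }
    }

    public static void main(String[] args) {
        acceptAll(5, 2).forEach(System.out::println);
    }
}
```

Every request is accepted on the same thread, while the (possibly slow) processing happens elsewhere. This is the property that lets a small number of I/O threads serve many concurrent connections.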
In the examples I’m using [Jersey](https://jersey.java.net/documentation/latest/index.html), the reference implementation of JAX-RS 2.0, with an embedded Grizzly HTTP server. A tutorial on setting up a clean project with Jersey and Grizzly can be found on the Jersey Getting Started page.
The following example shows a simple asynchronous resource method defined using the JAX-RS async API:
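import javax.inject.Inject;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.container.Suspended;
import java.util.concurrent.Executor;

@Path("/")
public class Resource {

    @Inject
    private Executor executor;

    // 'service' is an application-specific collaborator; its declaration is omitted here

    @GET
    public void asyncGet(@Suspended final AsyncResponse asyncResponse) {
        // fork the expensive work to another thread and return immediately
        executor.execute(() -> {
            String result = service.veryExpensiveOperation();
            asyncResponse.resume(result);
        });
    }
}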
A resource method that produces a response asynchronously must inject AsyncResponse using the @Suspended annotation. In the example above the response is not produced immediately: veryExpensiveOperation is forked to another thread and the resource method returns immediately. When veryExpensiveOperation completes, the connection is resumed and the response is returned by calling resume on AsyncResponse. If you are using Jersey as your JAX-RS implementation, then instead of executing the task manually with an Executor you can use the @ManagedAsync annotation, and Resource.asyncGet() will be executed by an internal Jersey executor service:
import org.glassfish.jersey.server.ManagedAsync;

public class Resource {

    @GET
    @ManagedAsync
    public void asyncGet(@Suspended final AsyncResponse asyncResponse) {
        String result = service.veryExpensiveOperation();
        asyncResponse.resume(result);
    }
}
The time-out for a suspended response can be set with the setTimeout method. The JAX-RS specification requires a ServiceUnavailableException (mapped to the 503 status) to be thrown on time-out, but this behaviour can be changed by registering a custom TimeoutHandler:
public class Resource {

    @GET
    @ManagedAsync
    public void asyncGet(@Suspended final AsyncResponse asyncResponse) {
        asyncResponse.setTimeout(1000, TimeUnit.MILLISECONDS);
        asyncResponse.setTimeoutHandler(ar -> ar.resume(
            Response.status(Response.Status.SERVICE_UNAVAILABLE)
                .entity("Operation timed out")
                .build()));
        String result = service.veryExpensiveOperation();
        asyncResponse.resume(result);
    }
}
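The time-out-plus-handler pattern is not specific to AsyncResponse. As a rough plain-JDK analogue (an illustration only, not a description of how Jersey implements setTimeout), a CompletableFuture can be completed with a fallback value by a scheduled task if the real result has not arrived in time. The names TimeoutDemo and withTimeout are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

class TimeoutDemo {

    private static final ScheduledExecutorService SCHEDULER =
        Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "timeout-scheduler");
            t.setDaemon(true); // do not keep the JVM alive because of the timer thread
            return t;
        });

    /** Returns a future that yields 'fallback' if 'source' is not done within the time-out. */
    static <T> CompletableFuture<T> withTimeout(CompletableFuture<T> source,
                                                long timeout, TimeUnit unit, T fallback) {
        CompletableFuture<T> result = new CompletableFuture<>();
        // Only the first completion of 'result' takes effect; later attempts are ignored
        ScheduledFuture<?> timer =
            SCHEDULER.schedule(() -> result.complete(fallback), timeout, unit);
        source.whenComplete((value, error) -> {
            timer.cancel(false); // the timer is no longer needed
            if (error != null) {
                result.completeExceptionally(error);
            } else {
                result.complete(value);
            }
        });
        return result;
    }

    public static void main(String[] args) {
        CompletableFuture<String> never = new CompletableFuture<>(); // never completed
        System.out.println(
            withTimeout(never, 100, TimeUnit.MILLISECONDS, "Operation timed out").join());
        System.out.println(
            withTimeout(CompletableFuture.completedFuture("done"),
                100, TimeUnit.MILLISECONDS, "Operation timed out").join());
    }
}
```

The first call prints the fallback text and the second prints "done". On Java 9 and later, CompletableFuture.completeOnTimeout covers the same case without a hand-written scheduler.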
It is possible to register callbacks on AsyncResponse:
- CompletionCallback for processing completion
- ConnectionCallback for connection termination
According to JSR-339, support for ConnectionCallback is OPTIONAL.
Client API #
The Client API supports asynchronous invocations as part of the invocation building process.
By default, invocations are synchronous, but they can be made asynchronous by calling the async() method.
Let's see this in an example call to the Facebook Graph API:
import javax.ws.rs.client.ClientBuilder;
import javax.ws.rs.client.WebTarget;
import java.util.concurrent.Future;

public class FacebookService {

    private final WebTarget target = ClientBuilder.newClient()
        .target("http://graph.facebook.com/");

    public Future<FacebookUser> userAsync(String user) {
        return target
            .path("/{user}")
            .resolveTemplate("user", user)
            .request()
            .async()
            .get(FacebookUser.class);
    }
}
Additionally, an instance of InvocationCallback can be registered:
import javax.ws.rs.client.InvocationCallback;

public class FacebookService {

    private final WebTarget target = ClientBuilder.newClient()
        .target("http://graph.facebook.com/");

    public Future<FacebookUser> userAsync(String user) {
        return target
            .path("/{user}")
            .resolveTemplate("user", user)
            .request()
            .async()
            .get(new InvocationCallback<FacebookUser>() {
                @Override
                public void completed(FacebookUser facebookUser) {
                    // on complete
                }

                @Override
                public void failed(Throwable throwable) {
                    // on fail
                }
            });
    }
}
In the examples above, the call to get() after async() returns immediately without blocking the caller’s thread.
Unfortunately, there is a bug in Jersey (the reference JAX-RS implementation) that causes the client to run each request in its own thread, which then blocks waiting for the response. Additionally, when the response comes back, yet another thread is started for each request, so there are two threads per request plus the thread pool of the underlying HTTP client. The comments under this bug mention that Jersey internally relies on a blocking I/O API. This may change in JAX-RS 2.1: the proposed specification mentions possible support for non-blocking I/O.
Spring Framework is an alternative that provides similar capabilities for both the Server and Client APIs.
Mixing with CompletableFuture #
CompletableFuture<T> is a new abstraction introduced in Java 8. It implements Future<T> and adds callback support for event-driven work.
In other languages this concept is called a promise or deferred object. A first example with CompletableFuture looks like this:
import java.util.concurrent.CompletableFuture;

public class Resource {

    @GET
    public void asyncGet(@Suspended final AsyncResponse asyncResponse) {
        CompletableFuture
            // supplyAsync (unlike runAsync) passes the computed value on to the next stage
            .supplyAsync(() -> service.veryExpensiveOperation())
            .thenApply(result -> asyncResponse.resume(result));
    }
}
The supplyAsync method returns a CompletableFuture that is asynchronously completed with the value produced by a task running in [ForkJoinPool.commonPool()](http://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ForkJoinPool.html#commonPool--). (runAsync works the same way but takes a Runnable, so it has no result to pass on.) A custom Executor can be provided as the second argument so that different thread pools can be used. Multiple CompletableFuture instances can be processed and combined in a non-blocking way, without retrieving their results first. In the example below we merge user data from Facebook with data from GitHub:
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.container.Suspended;
import java.util.concurrent.CompletableFuture;

@Path("/")
public class AsyncResource {

    @GET
    @Path("/userInfo/{user}")
    @Produces(MediaType.APPLICATION_JSON)
    public void userInfoAsync(@Suspended AsyncResponse asyncResponse,
                              @PathParam("user") String user) {
        CompletableFuture<GitHubUser> gitHubFuture =
            Futures.toCompletable(gitHubService.userAsync(user), executor);
        CompletableFuture<FacebookUser> facebookFuture =
            Futures.toCompletable(facebookService.userAsync(user), executor);

        gitHubFuture
            .thenCombine(facebookFuture, (g, f) -> new UserInfo(f, g))
            .thenApply(info -> asyncResponse.resume(info))
            .exceptionally(e -> asyncResponse.resume(
                Response.status(INTERNAL_SERVER_ERROR).entity(e).build()));

        asyncResponse.setTimeout(1000, TimeUnit.MILLISECONDS);
        asyncResponse.setTimeoutHandler(ar -> ar.resume(
            Response.status(SERVICE_UNAVAILABLE).entity("Operation timed out").build()));
    }
}
//----------------------------------------------------------------------
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;

public class Futures {

    // Transforms Future<T> to CompletableFuture<T>.
    // One executor thread blocks in future.get() until the Future completes.
    public static <T> CompletableFuture<T> toCompletable(Future<T> future, Executor executor) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                return future.get();
            } catch (InterruptedException | ExecutionException e) {
                throw new RuntimeException(e);
            }
        }, executor);
    }
}
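Note that toCompletable parks one executor thread in future.get() for every pending call. When the client offers a callback (as InvocationCallback does), one possible alternative is to complete a CompletableFuture from the callback, so that no thread has to wait. The sketch below shows the idea with plain JDK types only: the Callback interface and the fetchUser method are hypothetical stand-ins for InvocationCallback and for an asynchronous client call, not real JAX-RS API.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

class CallbackBridge {

    /** Hypothetical stand-in for javax.ws.rs.client.InvocationCallback<T>. */
    interface Callback<T> {
        void completed(T value);
        void failed(Throwable error);
    }

    /** Adapts a callback-style call to a CompletableFuture without blocking any thread. */
    static <T> CompletableFuture<T> toCompletable(Consumer<Callback<T>> asyncCall) {
        CompletableFuture<T> promise = new CompletableFuture<>();
        asyncCall.accept(new Callback<T>() {
            @Override
            public void completed(T value) {
                promise.complete(value);
            }

            @Override
            public void failed(Throwable error) {
                promise.completeExceptionally(error);
            }
        });
        return promise;
    }

    /** Hypothetical asynchronous call: answers on another thread through the callback. */
    static void fetchUser(String login, Callback<String> callback) {
        new Thread(() -> {
            if (login.isEmpty()) {
                callback.failed(new IllegalArgumentException("empty login"));
            } else {
                callback.completed("user:" + login);
            }
        }).start();
    }

    public static void main(String[] args) {
        CompletableFuture<String> user = toCompletable(cb -> fetchUser("octocat", cb));
        System.out.println(user.thenApply(String::toUpperCase).join()); // prints "USER:OCTOCAT"
    }
}
```

With a real JAX-RS client the same shape would apply by passing an InvocationCallback to get(), but whether this actually avoids blocked threads depends on the client implementation, as discussed in the Client API section.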
You can process and combine as many CompletableFuture instances as you want.
Each of them can be a call to a different external resource, such as a database, a REST service or the file system.
Let’s see another example:
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.container.Suspended;
import java.util.concurrent.CompletableFuture;

@Path("/")
public class AsyncResource {

    @GET
    @Path("/contributors/{user}")
    @Produces(MediaType.APPLICATION_JSON)
    public void contributorsAsync(@Suspended AsyncResponse asyncResponse,
                                  @PathParam("user") String user) {
        // get all user repos
        Futures.toCompletable(gitHubService.reposAsync(user), executor)
            // for all repos find contributors
            .thenCompose(repos -> getContributors(user, repos))
            // flatten Stream<List<T>> to Stream<T>
            .thenApply(contributors -> contributors.flatMap(list -> list.stream()))
            // group contributors by login and count them
            .thenApply(contributors ->
                contributors.collect(Collectors.groupingBy(c ->
                    c.getLogin(), Collectors.counting())))
            // resume response
            .thenApply(contributors ->
                asyncResponse.resume(contributors))
            // handle exceptions
            .exceptionally(e ->
                asyncResponse.resume(Response.status(INTERNAL_SERVER_ERROR).entity(e).build()));
    }

    private CompletableFuture<Stream<List<GitHubContributor>>> getContributors(
            String user, List<GitHubRepo> repos) {
        return Futures.sequence(
            repos.stream().map(r ->
                Futures.toCompletable(gitHubService.contributorsAsync(user, r.getName()), executor)));
    }
}
//----------------------------------------------------------------------------------
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class Futures {

    // Transforms Stream<CompletableFuture<T>> to CompletableFuture<Stream<T>>
    public static <T> CompletableFuture<Stream<T>> sequence(Stream<CompletableFuture<T>> futures) {
        List<CompletableFuture<T>> futureList = futures
            .filter(f -> f != null)
            .collect(Collectors.toList());

        CompletableFuture<Void> allDoneFuture = CompletableFuture.allOf(
            futureList.toArray(new CompletableFuture[futureList.size()]));

        // join() does not block here: all futures are complete when allDoneFuture fires
        return allDoneFuture.thenApply(v ->
            futureList.stream().map(future -> future.join()));
    }
}
In the code above we fetch all repositories of the given user, fetch the contributors of each repository, and count in how many of those repositories each contributor appears. The CompletableFuture instances are composed like functions, and the processing remains non-blocking. Complex blocks of code are extracted into methods, so the code stays readable. However, debugging and error handling are more difficult in this style.
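To try the same pipeline shape in isolation, here is a minimal sketch that runs without GitHub or JAX-RS. It repeats the idea of the sequence helper and replaces the remote calls with a hypothetical in-memory contributorsOf method; the class name, the repository names and the logins are made-up sample data.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class ContributorCount {

    /** Same idea as Futures.sequence: completes once all given futures are done. */
    static <T> CompletableFuture<Stream<T>> sequence(Stream<CompletableFuture<T>> futures) {
        List<CompletableFuture<T>> list = futures.collect(Collectors.toList());
        return CompletableFuture
            .allOf(list.toArray(new CompletableFuture<?>[0]))
            // join() does not block here because every future is already complete
            .thenApply(v -> list.stream().map(CompletableFuture::join));
    }

    /** Hypothetical stand-in for gitHubService.contributorsAsync: sample data only. */
    static CompletableFuture<List<String>> contributorsOf(String repo) {
        return CompletableFuture.supplyAsync(() -> {
            switch (repo) {
                case "repo-a": return Arrays.asList("alice", "bob");
                case "repo-b": return Arrays.asList("alice");
                default:       return Arrays.asList("bob", "carol");
            }
        });
    }

    /** Counts in how many of the given repos each login appears. */
    static CompletableFuture<Map<String, Long>> countContributors(List<String> repos) {
        return sequence(repos.stream().map(ContributorCount::contributorsOf))
            .thenApply(lists -> lists.flatMap(List::stream))
            .thenApply(logins -> logins.collect(
                Collectors.groupingBy(Function.identity(), Collectors.counting())));
    }

    public static void main(String[] args) {
        Map<String, Long> counts =
            countContributors(Arrays.asList("repo-a", "repo-b", "repo-c")).join();
        System.out.println(new TreeMap<>(counts)); // prints {alice=2, bob=2, carol=1}
    }
}
```

The only blocking call is the final join() in main, which a resource method would replace with asyncResponse.resume inside the chain.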
Summary #
Asynchronous processing is more complicated than synchronous processing, but with the new Java 8 features (streams, lambda expressions and CompletableFuture) it resembles functional processing of data. You can easily chain operations and fetch the result at the end of processing. The Java 8 functional API is far from perfect, but it still allows writing more expressive code than before. There is also a third-party library, RxJava, which implements a concept called reactive programming. RxJava is defined as “a library for composing asynchronous and event-based programs using observable sequences for the Java VM”. In my opinion it is more powerful and provides a better, more consistent API than standard Java 8. With asynchronous processing you can make your REST application more scalable: both latency and throughput can be improved. To take full advantage of it, clients with non-blocking I/O should be used (like AsyncHttpClient, which is based on Netty).
Full code is available on GitHub: Grizzly2 and Spring Boot examples.