An introduction to thread pools in Java
According to Moore’s law the number of transistors in an integrated circuit doubles approximately every two years. However, the exponential processor transistor growth does not always translate into exponentially greater practical CPU performance. Processor manufacturers for years delivered processors with higher clock rates and instruction parallelism. As a result, single-threaded code executed faster on newer generations of processors. Of course, it is impossible to speed up clock rates infinitely and processors like AMD FX-9590 with turbo clock speed at 5 GHz are rather unique. Today, processor manufacturers favour multi-core processors. It is common to have a quad-core CPU in smartphones, not to mention laptops or even desktop PCs. Consequently, software has to be written in a multi-threaded manner to take full advantage of the hardware. Thread pools can help programmers harness multi-core CPUs.
Thread pools #
Good software design techniques suggest that threads should not be created and destroyed manually. Thread creation and destruction are expensive processes which consume both CPU and memory, as they require JVM and OS activity. Default Java thread stack size is 1 MB for 64-bit JVMs. That is why creating a new thread for each request when requests are frequent and lightweight is a waste of resources. Thread pools can handle thread lifecycle automatically according to the strategy selected on thread pool creation. An important feature of a thread pool is that it allows applications to degrade gracefully. A server application can queue incoming requests and handle them when it has enough resources, like memory or CPU, to do so. Otherwise, without thread pools, the server application may crash. There are many reasons why there are no more resources. For example, multiple connections to the server caused by a denial-of-service attack may result in many threads running in parallel which in turn leads to thread starvation. Moreover, programmers who run threads manually must remember to handle exceptional situations when a thread dies due to an exception.
Even though you may not explicitly use thread pools in your application, they are heavily used in web servers like Tomcat or Undertow. It is good to know how thread pools work and how you can tune them in order to optimize performance.
Thread pools can be easily created using Executors
factory. All implementations of the ExecutorService
interface provided in JDK:
are Java implementations of the thread pool abstraction. The following code snippet presents the lifecycle of an ExecutorService:
public List<Future<T>> executeTasks(Collection<Callable<T>> tasks) {
// create an ExecutorService
final ExecutorService executorService = Executors.newSingleThreadExecutor();
// execute all tasks
final List<Future<T>> executedTasks = executorService.invokeAll(tasks);
// shutdown the ExecutorService after all tasks have completed
executorService.shutdown();
return executedTasks;
}
We begin with creating the simplest ExecutorService — a single-threaded executor. It uses one
thread to handle all incoming tasks. Of course, you can customize your ExecutorService in a wide
variety of ways or use one of the factory methods from the Executors
class:
newCachedThreadPool()
— creates an ExecutorService that creates new threads as needed and reuses existing threads to handle incoming tasks.newFixedThreadPool(int numberOfThreads)
— creates an ExecutorService that reuses a fixed number of threads.newScheduledThreadPool(int corePoolSize)
— creates an ExecutorService that schedules commands to run after a given delay (or to execute periodically).newSingleThreadExecutor()
— creates an ExecutorService that uses a single worker thread.newSingleThreadScheduledExecutor()
— creates a single-threaded ExecutorService that schedules commands to run after a given delay (or to execute periodically).newWorkStealingPool()
— creates an ExecutorService that uses multiple task queues to reduce contention.
In the example presented above we invoke all tasks at once, but you can use other methods to execute a task:
Finally, we gently ask the executorService to shutdown.
Shutdown()
is a non-blocking
method. Calling it makes an ExecutorService enter a “shutdown mode” in which all previously submitted tasks
are executed, but no new tasks are accepted. If you want to wait for submitted tasks to finish, you
should use the awaitTermination()
method.
ExecutorService is a very useful tool that allows us to execute all tasks in a convenient way.
What are the benefits? We don’t have to create any worker thread manually.
A worker thread is a thread which is used internally by the ExecutorService.
It is worth remembering that the Executor service manages the thread lifecycle for us.
It can increase the number of worker threads when the load increases. On the other
hand it can tear down threads which are inactive for a given period of time.
We shouldn’t think about any thread at all when we work with a thread pool.
We should instead think about tasks that are processed asynchronously.
Moreover, we don’t have
to recreate a thread when an unexpected exception occurs and we don’t have to worry about
reusing a thread when it finishes a task it was assigned.
Finally, after submitting a task we are provided with a useful future result
abstraction — a Future
.
Of course, since Java 8 we can use even better CompletableFuture
,
but converting a Future
into a CompletableFuture
is out of the scope of this post.
Please remember that working with Futures
makes sense only when we submit a Callable
,
because Callables
produce results while Runnables
don’t.
Internals #
Every thread pool consists of several building blocks:
- a task queue,
- a collection of worker threads,
- a thread factory,
- metadata for managing thread pool state.
There are many implementations of the ExecutorService interface, but let us focus on
the commonly used ThreadPoolExecutor
.
In fact, newCachedThreadPool()
, newFixedThreadPool()
and newSingleThreadExecutor()
methods
return instances of the ThreadPoolExecutor
class.
In order to create a ThreadPoolExecutor
manually you have to provide at least 5 arguments:
int corePoolSize
— the number of threads to keep in the pool.int maximumPoolSize
— the maximum number of threads in the pool.long keepAlive
andTimeUnit unit
— the number of threads abovecorePoolSize
will be torn down after being idle for the given amount of time.BlockingQueue<Runnable> workQueue
— submitted tasks wait in this queue to be executed.
(An image from Wikipedia)
BlockingQueue #
LinkedBlockingQueue
is used by default when you create a ThreadPoolExecutor
by calling one of the methods from the Executors
class.
PriorityBlockingQueue
is in fact an instance of a BlockingQueue
,
but processing tasks with respect to their priority is a tricky business. To begin with, submitted
Runnable
or Callable
tasks are wrapped in a RunnableFuture
which is then added to the queue. As a result, the ProrityBlockingQueue
compares wrong objects in
order to determine their priority (RunnableFuture wrappers instead of their payloads).
Moreover, when the corePoolSize
property is greater than 1 and
the worker threads are not busy, ThreadPoolExecutor
may serve requests in their insertion order,
before the PriorityBlockingQueue
can shuffle them with respect to their priority.
By default, the workQueue
used by ThreadPoolExecutor
is unbounded. It is OK in most
cases but, of course, you can change this behaviour. However, remember that
unbounded work queue may cause your application to fail due to out of memory error.
When you limit the size of the task queue remember to set
RejectionExecutionHandler
.
You can provide your custom implementation or choose from 4 handler flavours (AbortPolicy is used
by default):
- CallerRunsPolicy
- AbortPolicy
- DiscardPolicy
- DiscardOldestPolicy
Thread factory #
Thread Factories are often used to customize
the creation of worker threads. You can, for example, add a custom
Thread.UncaughtExceptionHandler
or set thread name. In the following example we log uncaught exceptions together with thread name and
thread sequential number:
public class LoggingThreadFactory implements ThreadFactory {
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private static final String THREAD_NAME_PREFIX = "worker-thread-";
private final AtomicInteger threadCreationCounter = new AtomicInteger();
@Override
public Thread newThread(Runnable task) {
int threadNumber = threadCreationCounter.incrementAndGet();
Thread workerThread = new Thread(task, THREAD_NAME_PREFIX + threadNumber);
workerThread.setUncaughtExceptionHandler(thread, throwable -> logger.error("Thread {} {}", thread.getName(), throwable));
return workerThread;
}
}
Producer-consumer example #
The producer-consumer problem is a common multi-process synchronization problem. In this example we solve this problem using an ExecutorService. However, this is not a textbook example of how this problem should be solved. My goal is to show that a thread pool can handle all of the synchronization issues and, as a result, programmers can instead focus on implementing the business logic.
Producer periodically fetches new data from the database to create business task objects and submits these tasks to the ExecutorService. Consumer, represented by a worker thread from a thread pool managed by the ExecutorService, processes business tasks (i.e. calculates prices and sends them back to customers).
To begin with, we start with a Spring configuration:
@Configuration
public class ProducerConsumerConfiguration {
@Bean
public ExecutorService executorService() {
// single consumer
return Executors.newSingleThreadExecutor();
}
// other beans such as a data source, a scheduler, etc.
}
Then, there is the Consumer
class together with the ConsumerFactory
component.
The factory is used by the producer in order to create a piece of work that will
be picked up by the worker thread at some point in the future.
public class Consumer implements Runnable {
private final BusinessTask businessTask;
private final BusinessLogic businessLogic;
public Consumer(BusinessTask businessTask, BusinessLogic businessLogic) {
this.businessTask = businessTask;
this.businessLogic = businessLogic;
}
@Override
public void run() {
businessLogic.processTask(businessTask);
}
}
@Component
public class ConsumerFactory {
private final BusinessLogic businessLogic;
public ConsumerFactory(BusinessLogic businessLogic) {
this.businessLogic = businessLogic;
}
public Consumer newConsumer(BusinessTask businessTask) {
return new Consumer(businessTask, businessLogic);
}
}
Finally, there is the Producer
class that fetches new data from the database
and creates business tasks. In this example we assume that the fetchData()
method is periodically called by a scheduler.
@Component
public class Producer {
private final DataRepository dataRepository;
private final ExecutorService executorService;
private final ConsumerFactory consumerFactory;
@Autowired
public Producer(DataRepository dataRepository, ExecutorService executorService,
ConsumerFactory consumerFactory) {
this.dataRepository = dataRepository;
this.executorService = executorService;
this.consumerFactory = consumerFactory;
}
public void fetchAndSubmitForProcessing() {
List<Data> data = dataRepository.fetchNew();
data.stream()
// create a business task from data fetched from the database
.map(BusinessTask::fromData)
// create a consumer for each business task
.map(consumerFactory::newConsumer)
// submit the task for further processing in the future (submit is a non-blocking method)
.forEach(executorService::submit);
}
}
Thanks to the ExecutorService we could focus on implementing the business logic and we don’t have to worry about synchronization issues. The code presented above uses only one producer and one consumer. However, it could be easily adapted to multi-producer and multi-consumer environment.
Summary #
JDK 5 arrived in 2004 and provided many useful concurrent goodies, with the ExecutorService class among them. The thread pool abstraction is commonly used in server environments under the hood (see Tomcat and Undertow). Of course, thread pools are not limited to server environments only. They are useful in solving any sort of embarrassingly parallel problems. Due to the fact that today it is more common to run software on a multi-core machine rather than on a single-core machine, thread pools are definitely worth considering.