Thursday, July 23, 2015

A Good Thread Pool

The java.util.concurrent package comes with a whole bunch of classes that can be extremely useful in concurrent Java applications. This article is about the ThreadPoolExecutor class, how it behaved unexpectedly for me, and what I did to make it do what I wanted.

As the name suggests, a thread pool is an Executor in Java. It's even an ExecutorService, but that's irrelevant for understanding the fundamental behavior. The only important operational method of a thread pool is void execute(Runnable task). You pass in your task and the pool will eventually execute it on one of its worker threads. A thread pool is made up of the following components: a work queue, an internal pool of worker threads, a thread factory, and a rejection handler.


When you create a thread pool you must pass in a BlockingQueue instance that will become the work queue of the thread pool. You can optionally pass in a thread factory and a rejection handler. You cannot control the implementation class of the internal worker pool, but you can influence its behavior with the following important parameters:

  1. The corePoolSize defines a kind of minimum number of worker threads to keep in the internal worker pool. The reason it's not called minPoolSize is probably that directly after the creation of the thread pool the internal worker pool starts with zero worker threads. Initial workers are then created as needed, but they're only ever removed from the worker pool if there are more of them than corePoolSize.
  2. The maxPoolSize (the constructor parameter is actually named maximumPoolSize) defines a strict upper bound for the number of worker threads in the internal worker pool.
  3. The keepAliveTime defines the time that an idle worker thread may stay in the internal worker pool if there are more than corePoolSize workers in the pool.
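
To make the three parameters concrete, here is a minimal sketch of a direct constructor call. The class name PoolParametersDemo, the helper awaitQuietly, and the concrete numbers (2, 4, 30 seconds, queue capacity 10) are my own choices for illustration, not anything prescribed by the JDK. It also shows that the worker pool really starts empty.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class PoolParametersDemo {

    // Returns {pool size right after construction, pool size after one task was submitted}.
    static int[] poolSizes() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2,                                       // corePoolSize
                4,                                       // maximumPoolSize ("maxPoolSize" in the text)
                30, TimeUnit.SECONDS,                    // keepAliveTime for workers beyond the core
                new LinkedBlockingQueue<Runnable>(10));  // work queue (bounded to 10 here, an arbitrary choice)
        CountDownLatch release = new CountDownLatch(1);
        try {
            int sizeAfterCreation = pool.getPoolSize();
            pool.execute(() -> awaitQuietly(release));   // keeps one worker busy
            int sizeAfterFirstTask = pool.getPoolSize();
            return new int[] {sizeAfterCreation, sizeAfterFirstTask};
        } finally {
            release.countDown();                         // let the task finish
            pool.shutdown();
        }
    }

    // Hypothetical helper: waits on the latch and restores the interrupt flag if interrupted.
    static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        int[] sizes = poolSizes();
        System.out.println("workers after creation: " + sizes[0]);
        System.out.println("workers after first task: " + sizes[1]);
    }
}
```

The pool reports zero workers right after construction and one worker once the first task has been handed over.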

The Javadoc of the ThreadPoolExecutor class recommends using the Executors.newCachedThreadPool() factory method to create a thread pool. The result is an unbounded thread pool with automatic thread reclamation. A look at the code of the factory method reveals that corePoolSize=0 and maxPoolSize=Integer.MAX_VALUE. The work queue is a SynchronousQueue, which has no internal capacity; it basically functions as a direct pipe to the next idle or newly created worker thread.

When you hammer this thread pool with lots of tasks the work queue will never grow; tasks will never be rejected because the worker pool is unbounded. Your JVM will soon become unresponsive because the pool will create thousands of worker threads!
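
A scaled-down sketch of that effect, using 50 blocking tasks instead of thousands. I construct the pool directly with the values described above (corePoolSize=0, maxPoolSize=Integer.MAX_VALUE, a SynchronousQueue) plus a 60 second keep-alive; the class and helper names are hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class UnboundedWorkersDemo {

    // Submits taskCount tasks that all block, then returns {pool size, queue size}.
    static int[] submitBlockingTasks(int taskCount) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                0, Integer.MAX_VALUE,                // no core workers, no upper bound
                60L, TimeUnit.SECONDS,               // idle workers are reclaimed after a minute
                new SynchronousQueue<Runnable>());   // zero-capacity hand-off queue
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < taskCount; i++) {
                // No worker is ever idle, so every hand-off fails and a new thread is created.
                pool.execute(() -> awaitQuietly(release));
            }
            return new int[] {pool.getPoolSize(), pool.getQueue().size()};
        } finally {
            release.countDown();                     // unblock all tasks
            pool.shutdown();
        }
    }

    // Hypothetical helper: waits on the latch and restores the interrupt flag if interrupted.
    static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        int[] result = submitBlockingTasks(50);
        System.out.println("workers: " + result[0] + ", queued: " + result[1]);
    }
}
```

With 50 blocking tasks the pool holds 50 worker threads and the queue stays empty; nothing in this configuration stops the same thing from happening with 50,000 tasks.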

What I really wanted was a thread pool with, let's say, maxPoolSize=100 and a work queue that temporarily keeps all the tasks that are scheduled while all of the 100 threads are busy. So I instantiated a ThreadPoolExecutor directly (without the recommended factory method), passed in corePoolSize=10, maxPoolSize=100, and a LinkedBlockingQueue to be used as the work queue. And here comes the big surprise: this thread pool never creates more than corePoolSize worker threads! Instead the work queue will grow and grow and grow. The tasks in it will always compete for the 10 core workers. Why is that?
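
The surprise is easy to reproduce. The following sketch uses the configuration from the paragraph above (corePoolSize=10, maxPoolSize=100, an unbounded LinkedBlockingQueue) and submits 50 tasks that all block; the class and helper names and the 30 second keep-alive are assumptions made for the example.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class StuckAtCoreDemo {

    // Submits taskCount blocking tasks and returns {pool size, queue size}.
    static int[] submitBlockingTasks(int taskCount) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                10, 100,                               // corePoolSize, maximumPoolSize
                30, TimeUnit.SECONDS,                  // keep-alive, arbitrary for this demo
                new LinkedBlockingQueue<Runnable>());  // unbounded work queue
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < taskCount; i++) {
                pool.execute(() -> awaitQuietly(release));
            }
            return new int[] {pool.getPoolSize(), pool.getQueue().size()};
        } finally {
            release.countDown();                       // unblock all tasks so the pool can drain
            pool.shutdown();
        }
    }

    // Hypothetical helper: waits on the latch and restores the interrupt flag if interrupted.
    static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        int[] result = submitBlockingTasks(50);
        System.out.println("workers: " + result[0] + ", queued: " + result[1]);
    }
}
```

Even though 90 more workers would be allowed, the pool stays at 10 workers and the remaining 40 tasks wait in the queue.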

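To understand this you need to know how the execute() method works. Of course it's all Javadoc'ed, but that doesn't mean it's expectation-compliant (well, I know that expectations can be subjective). In short, execute() checks three conditions in order: if fewer than corePoolSize workers are running, it creates a new worker; otherwise, if the work queue accepts the task, the task is enqueued; otherwise, if fewer than maxPoolSize workers are running, it creates a new worker. If none of the three conditions holds, the task is rejected.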


There are only three different outcomes: the task can be enqueued in the work queue, a new worker thread can be created, or the task can be rejected. Three conditions are checked to determine the outcome at a specific point in time. The first condition is only relevant in the warm-up phase of the pool, but then it becomes interesting:

enqueue is always preferred over newWorker!
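
The order of the three checks can be written down as a small pure function. This is a simplified model, not the JDK source: it ignores the pool's shutdown state and the re-checks that the real execute() performs, and the names ExecuteDecision, Outcome, and decide are made up for the sketch.

```java
class ExecuteDecision {

    enum Outcome { NEW_WORKER, ENQUEUE, REJECT }

    // queueAccepts stands for the result of workQueue.offer(task).
    static Outcome decide(int workerCount, int corePoolSize, int maxPoolSize, boolean queueAccepts) {
        if (workerCount < corePoolSize) {
            return Outcome.NEW_WORKER;   // condition 1: still warming up
        }
        if (queueAccepts) {
            return Outcome.ENQUEUE;      // condition 2: the queue took the task
        }
        if (workerCount < maxPoolSize) {
            return Outcome.NEW_WORKER;   // condition 3: queue refused, pool may still grow
        }
        return Outcome.REJECT;           // nothing worked
    }

    public static void main(String[] args) {
        System.out.println(decide(3, 10, 100, true));     // warm-up phase
        System.out.println(decide(10, 10, 100, true));    // queue accepts
        System.out.println(decide(10, 10, 100, false));   // queue refuses, pool can grow
        System.out.println(decide(100, 10, 100, false));  // queue refuses, pool is full
    }
}
```

As long as queueAccepts is true, the third check is never reached.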

That means that, with an unbounded work queue, no more than corePoolSize workers will ever be created; maxPoolSize becomes completely irrelevant. Now we have seen one pool configuration that only ever creates new workers (the default) and one that only ever enqueues tasks. Between these two evils is probably a thread pool with both a bounded worker pool and a bounded work queue, but obviously such a thread pool will reject tasks when hammered enough.
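
Here is a tiny sketch of that middle ground, deliberately sized so that rejection happens after only four tasks: corePoolSize=1, maxPoolSize=2, and an ArrayBlockingQueue with capacity 1. The numbers and names are chosen for the example; the default rejection handler throws a RejectedExecutionException.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class BoundedPoolDemo {

    // Submits four blocking tasks and returns {pool size, queue size, rejected count}.
    static int[] submitFourTasks() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2,                                  // corePoolSize, maximumPoolSize
                30, TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(1));  // bounded work queue
        CountDownLatch release = new CountDownLatch(1);
        int rejected = 0;
        try {
            for (int i = 0; i < 4; i++) {
                try {
                    // task 1: core worker, task 2: queued, task 3: second worker, task 4: rejected
                    pool.execute(() -> awaitQuietly(release));
                } catch (RejectedExecutionException e) {
                    rejected++;
                }
            }
            return new int[] {pool.getPoolSize(), pool.getQueue().size(), rejected};
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    // Hypothetical helper: waits on the latch and restores the interrupt flag if interrupted.
    static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        int[] r = submitFourTasks();
        System.out.println("workers: " + r[0] + ", queued: " + r[1] + ", rejected: " + r[2]);
    }
}
```

Note the order in which the pool fills up: the second worker is only created once the queue is full.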

That's all not what I wanted, but wait:

I control the work queue implementation!

Peeking again at the code of the execute() method shows that the only interaction between the thread pool and the work queue here is the call workQueue.offer(task), and per contract this method returns whether it accepted the offer or not. So the simple solution to my problem is a BlockingQueue implementation with an offer() method overridden to refuse the offered task as long as the worker pool contains fewer than maxPoolSize threads, and to accept it only once the pool has reached maxPoolSize. A refused offer makes execute() create a new worker instead.
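
A minimal sketch of such a queue, assuming tasks are submitted from a single thread so that the race discussed next cannot occur. This is not the code from my library; GrowFirstQueue and attachTo are hypothetical names, and the queue needs a reference to the pool, which is set after construction because the pool's constructor wants the queue first.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class GrowFirstQueueDemo {

    // Hypothetical queue: refuses offers while the pool is still allowed to grow.
    static class GrowFirstQueue extends LinkedBlockingQueue<Runnable> {
        private volatile ThreadPoolExecutor pool; // set after the pool is constructed

        void attachTo(ThreadPoolExecutor pool) {
            this.pool = pool;
        }

        @Override
        public boolean offer(Runnable task) {
            ThreadPoolExecutor p = pool;
            if (p != null && p.getPoolSize() < p.getMaximumPoolSize()) {
                return false; // makes execute() try to create a new worker
            }
            return super.offer(task); // pool is at its maximum: really enqueue
        }
    }

    // Submits eight blocking tasks to a 2..5 pool and returns {pool size, queue size}.
    static int[] submitEightTasks() {
        GrowFirstQueue queue = new GrowFirstQueue();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(2, 5, 30, TimeUnit.SECONDS, queue);
        queue.attachTo(pool);
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < 8; i++) {
                pool.execute(() -> awaitQuietly(release));
            }
            return new int[] {pool.getPoolSize(), pool.getQueue().size()};
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    // Hypothetical helper: waits on the latch and restores the interrupt flag if interrupted.
    static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        int[] r = submitEightTasks();
        System.out.println("workers: " + r[0] + ", queued: " + r[1]);
    }
}
```

The pool now grows to its maximum of 5 workers first, and only the remaining 3 tasks are queued. One consequence of this simple check is that a new worker is created even when an existing worker happens to be idle.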

Subclassing LinkedBlockingQueue would do the trick, but a small problem remains: the three conditions (see above) are checked in the execute() method of the thread pool without any synchronization. That means that if my work queue does not accept a task because there are still fewer than maxPoolSize workers allocated, the third condition is not necessarily still true a nanosecond later. The task would then be rejected from the pool entirely rather than enqueued. The solution to this problem is a custom rejection handler that takes the rejected task and puts it back at the beginning of the work queue. And now it becomes clear why subclassing LinkedBlockingDeque is the better alternative: it provides the needed addFirst(Runnable task) method.
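
Put together, the idea might look like the following sketch. It is an illustration under my own naming (GrowFirstDeque, RequeueHandler, attachTo), not the source code of my library. Because the race is hard to trigger on demand, the demo simulates it by calling the rejection handler directly.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class RequeueSketch {

    // Hypothetical work queue: refuses offers while the pool can still grow.
    static class GrowFirstDeque extends LinkedBlockingDeque<Runnable> {
        private volatile ThreadPoolExecutor pool;

        void attachTo(ThreadPoolExecutor pool) {
            this.pool = pool;
        }

        @Override
        public boolean offer(Runnable task) {
            ThreadPoolExecutor p = pool;
            if (p != null && p.getPoolSize() < p.getMaximumPoolSize()) {
                return false;
            }
            return super.offer(task);
        }
    }

    // Hypothetical handler: a task that lost the race goes to the head of the deque.
    static class RequeueHandler implements RejectedExecutionHandler {
        private final LinkedBlockingDeque<Runnable> deque;

        RequeueHandler(LinkedBlockingDeque<Runnable> deque) {
            this.deque = deque;
        }

        @Override
        public void rejectedExecution(Runnable task, ThreadPoolExecutor executor) {
            if (executor.isShutdown()) {
                throw new RejectedExecutionException("pool is shut down");
            }
            deque.addFirst(task); // addFirst does not go through the overridden offer()
        }
    }

    // Returns true if a task handed to the rejection handler ends up first in the queue.
    static boolean requeuedTaskIsFirst() {
        GrowFirstDeque queue = new GrowFirstDeque();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 30, TimeUnit.SECONDS, queue, new RequeueHandler(queue));
        queue.attachTo(pool);
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        Runnable queued = () -> { };
        Runnable raced = () -> { };
        try {
            pool.execute(blocker); // core worker
            pool.execute(blocker); // offer refused, second worker created
            pool.execute(queued);  // pool is at its maximum, task is enqueued
            // Simulate the race: pretend execute() could not add a worker for 'raced'.
            pool.getRejectedExecutionHandler().rejectedExecution(raced, pool);
            return queue.peekFirst() == raced && queue.size() == 2;
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("requeued task is first: " + requeuedTaskIsFirst());
    }
}
```

The handler refuses to requeue once the pool has been shut down, so that tasks submitted after shutdown are still rejected as usual.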

If you try to implement these ideas you'll likely discover a few technical complications, such as the LinkedBlockingDeque class not being available in Java 1.5. If you're interested in my concrete solution please have a look at the source code of my good thread pool. Enjoy...