Java Top

The early-bird price of the new Learn Spring Security OAuth course packages will increase by $50 tomorrow:

>> CHECK OUT THE COURSE

1. Overview

The Executor Framework in Java is an attempt to decouple task submission from task execution. While this approach abstracts away the task execution details very nicely, sometimes, we still need to configure it for even more optimal executions.

In this tutorial, we're going to see what happens when a thread pool can't accept any more tasks. Then, we'll learn how to control this corner case by applying saturation policies appropriately.

2. Revisiting the Thread Pools

The following diagram shows how the executor service works internally:

Here's what happens when we submit a new task to the executor:

  1. If one of the threads is available, it processes the task.
  2. Otherwise, the executor adds the new task to its queue.
  3. When a thread finishes the current task, it picks up another one from the queue.

2.1. The ThreadPoolExecutor

Most executor implementations use the well-known ThreadPoolExecutor as their base implementation. Therefore, to better understand how the task queueing works, we should take a closer look at its constructor:

public ThreadPoolExecutor(
  int corePoolSize,
  int maximumPoolSize,
  long keepAliveTime,
  TimeUnit unit,
  BlockingQueue<Runnable> workQueue,
  RejectedExecutionHandler handler
)

2.2. Core Pool Size

The corePoolSize parameter determines the initial size of the thread pool. Usually, the executor makes sure that the thread pool contains at least corePoolSize number of threads.

However, it's possible to have fewer threads if we enable the allowCoreThreadTimeOut parameter.

2.3. Maximum Pool Size

Let's suppose all core threads are busy executing a few tasks. As a result, the executor queues the new tasks until they get a chance to be processed later.

When this queue becomes full, the executor can add more threads to the thread pool. The maximumPoolSize puts an upper bound on the number of threads a thread pool can potentially contain.

When those threads remain idle for some time, the executor can remove them from the pool. Hence, the pool size can shrink back to its core size.

2.4. Queueing

As we saw earlier, when all core threads are busy, the executor adds the new tasks to a queue. There are three different approaches for queueing:

  • Unbounded Queue: The queue can hold an unlimited number of tasks. Since this queue never fills up, the executor ignores the maximum size. The fixed size and single thread executors both use this approach.
  • Bounded Queue: As its name suggests, the queue can only hold a limited number of tasks. As a result, the thread pool would grow when a bounded queue fills up.
  • Synchronous Handoff: Quite surprisingly, this queue can't hold any tasks! With this approach, we can queue a task if and only if there is another thread picking the same task on the other side at the same time. The cached thread pool executor uses this approach internally.

Let's suppose the following scenario when we're using either bounded queueing or synchronous handoff:

  • All core threads are busy
  • The internal queue becomes full
  • The thread pool grows to its maximum possible size, and all those threads are also busy

What happens when a new task comes in?

3. Saturation Policies

When all threads are busy, and the internal queue fills up, the executor becomes saturated.

Executors can perform predefined actions once they hit saturation. These actions are known as Saturation Policies. We can modify the saturation policy of an executor by passing an instance of RejectedExecutionHandler to its constructor.

Fortunately, Java provides a few built-in implementations for this class, each covering a specific use case. In the following sections, we'll evaluate those policies in detail.

3.1. Abort Policy

The default policy is the abort policy. Abort policy causes the executor to throw a RejectedExecutionException:

ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 0, MILLISECONDS, 
  new SynchronousQueue<>(), 
  new ThreadPoolExecutor.AbortPolicy());

executor.execute(() -> waitFor(100));

assertThatThrownBy(() -> executor.execute(() -> System.out.println("Will be rejected")))
  .isInstanceOf(RejectedExecutionException.class);

Since the first task takes a long time to execute, the executor rejects the second task.

3.2. Caller-Runs Policy

Instead of running a task asynchronously in another thread, this policy makes the caller thread execute the task:

ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 0, MILLISECONDS, 
  new SynchronousQueue<>(), 
  new ThreadPoolExecutor.CallerRunsPolicy());

executor.execute(() -> waitFor(100));

long startTime = System.nanoTime();
executor.execute(() -> waitFor(100));

double blockedDuration = (System.nanoTime() - startTime) / 1_000_000.0;
assertThat(blockedDuration).isGreaterThanOrEqualTo(100);

After submitting the first task, the executor can't accept any more new tasks. Therefore, the caller thread blocks until the second task returns.

The caller-runs policy makes it easy to implement a simple form of throttling. That is, a slow consumer can slow down a fast producer to control the task submission flow.

3.3. Discard Policy

The discard policy silently discards the new task when it fails to submit it:

ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 0, MILLISECONDS,
  new SynchronousQueue<>(), 
  new ThreadPoolExecutor.DiscardPolicy());

executor.execute(() -> waitFor(100));

BlockingQueue<String> queue = new LinkedBlockingDeque<>();
executor.execute(() -> queue.offer("Discarded Result"));

assertThat(queue.poll(200, MILLISECONDS)).isNull();

Here, the second task publishes a simple message to a queue. Since it never gets a chance to execute, the queue remains empty, even though we're blocking on it for some time.

3.4. Discard-Oldest Policy

The discard-oldest policy first removes a task from the head of the queue, then re-submits the new task:

ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 0, MILLISECONDS, 
  new ArrayBlockingQueue<>(2), 
  new ThreadPoolExecutor.DiscardOldestPolicy());

executor.execute(() -> waitFor(100));

BlockingQueue<String> queue = new LinkedBlockingDeque<>();
executor.execute(() -> queue.offer("First"));
executor.execute(() -> queue.offer("Second"));
executor.execute(() -> queue.offer("Third"));
waitFor(150);

List<String> results = new ArrayList<>();
queue.drainTo(results);

assertThat(results).containsExactlyInAnyOrder("Second", "Third");

This time, we're using a bounded queue that can hold just two tasks. Here's what happens when we submit these four tasks:

  • The first tasks hogs the single thread for 100 milliseconds
  • The executor queues the second and third tasks successfully
  • When the fourth task arrives, the discard-oldest policy removes the oldest task to make room for this new one

The discard-oldest policy and priority queues don't play well together. Because the head of a priority queue has the highest priority, we may simply lose the most important task.

3.5. Custom Policy

It's also possible to provide a custom saturation policy just by implementing the RejectedExecutionHandler interface:

class GrowPolicy implements RejectedExecutionHandler {

    private final Lock lock = new ReentrantLock();

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        lock.lock();
        try {
            executor.setMaximumPoolSize(executor.getMaximumPoolSize() + 1);
        } finally {
            lock.unlock();
        }

        executor.submit(r);
    }
}

In this example, when the executor becomes saturated, we increment the max pool size by one and then re-submit the same task:

ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 0, MILLISECONDS, 
  new ArrayBlockingQueue<>(2), 
  new GrowPolicy());

executor.execute(() -> waitFor(100));

BlockingQueue<String> queue = new LinkedBlockingDeque<>();
executor.execute(() -> queue.offer("First"));
executor.execute(() -> queue.offer("Second"));
executor.execute(() -> queue.offer("Third"));
waitFor(150);

List<String> results = new ArrayList<>();
queue.drainTo(results);

assertThat(results).contains("First", "Second", "Third");

As expected, all four tasks are executed.

3.6. Shutdown

In addition to overloaded executors, saturation policies also apply to all executors that have been shut down:

ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 0, MILLISECONDS, new LinkedBlockingQueue<>());
executor.shutdownNow();

assertThatThrownBy(() -> executor.execute(() -> {}))
  .isInstanceOf(RejectedExecutionException.class);

The same is true for all executors that are in the middle of a shutdown:

ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 0, MILLISECONDS, new LinkedBlockingQueue<>());
executor.execute(() -> waitFor(100));
executor.shutdown();

assertThatThrownBy(() -> executor.execute(() -> {}))
  .isInstanceOf(RejectedExecutionException.class);

4. Conclusion

In this tutorial, first, we had a reasonably quick refresher about thread pools in Java. Then, after introducing saturated executors, we learned how and when to apply different saturation policies.

As usual, the sample code is available over on GitHub.

Java bottom

The early-bird price of the new Learn Spring Security OAuth course packages will increase by $50 tomorrow:

>> CHECK OUT THE COURSE

Leave a Reply

avatar
  Subscribe  
Notify of