I just announced the new Learn Spring course, focused on the fundamentals of Spring 5 and Spring Boot 2:

>> CHECK OUT THE COURSE

1. Overview

In this tutorial, we'll look at the concept of work stealing in Java.

2. What Is Work Stealing?

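Work stealing arrived in Java 7 as part of the fork/join framework, with the aim of reducing contention in multi-threaded applications. Instead of all threads competing for one shared queue, each worker thread keeps its own queue of tasks, and idle workers take pending tasks from busy ones.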

2.1. Divide and Conquer Approach

In the fork/join framework, problems or tasks are recursively broken down into sub-tasks. The sub-tasks are then solved individually, with the sub-results combined to form the result:

Result solve(Problem problem) {
    if (problem is small)
        directly solve problem
    else {
        split problem into independent parts
        fork new subtasks to solve each part
        join all subtasks
        compose result from subresults
    }
}
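
To make the pseudocode concrete, here's a minimal sketch that sums an array with a RecursiveTask. The SumTask class, its THRESHOLD value and the sum helper are hypothetical names chosen for this illustration; they aren't part of the article's example code:

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Hypothetical example task: sums a slice of an int array by divide and conquer
class SumTask extends RecursiveTask<Long> {

    // assumed cut-off below which a problem counts as "small"
    private static final int THRESHOLD = 1_000;

    private final int[] numbers;
    private final int from; // inclusive
    private final int to;   // exclusive

    SumTask(int[] numbers, int from, int to) {
        this.numbers = numbers;
        this.from = from;
        this.to = to;
    }

    @Override
    protected Long compute() {
        if (to - from <= THRESHOLD) {
            // the problem is small: solve it directly
            long sum = 0;
            for (int i = from; i < to; i++) {
                sum += numbers[i];
            }
            return sum;
        }
        // split the problem into two independent parts
        int middle = from + (to - from) / 2;
        SumTask left = new SumTask(numbers, from, middle);
        SumTask right = new SumTask(numbers, middle, to);

        left.fork();                        // fork a sub-task for the left part
        long rightResult = right.compute(); // solve the right part in this thread
        long leftResult = left.join();      // join the forked sub-task

        return leftResult + rightResult;    // compose the result from the sub-results
    }

    static long sum(int[] numbers) {
        return ForkJoinPool.commonPool().invoke(new SumTask(numbers, 0, numbers.length));
    }
}
```

Calling SumTask.sum on an array holding the numbers 1 to 10,000 returns 50,005,000.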

2.2. Worker Threads

The broken-down task is solved with the help of worker threads provided by a thread pool. Each worker thread will have sub-tasks it's responsible for. These are stored in double-ended queues (deques).

Each worker thread gets sub-tasks from its deque by continuously popping a sub-task off the top of the deque. When a worker thread's deque is empty, it means that all the sub-tasks have been popped off and completed.

At this point, the worker thread randomly selects a peer thread in the pool that it can “steal” work from. It then takes sub-tasks from the tail end of the victim's deque, the end opposite to the one the victim works on, so the oldest sub-tasks are stolen first (first-in, first-out, or FIFO).
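
The two ends of the deque can be pictured with a plain ArrayDeque. This is only a single-threaded model of the access pattern, not how ForkJoinPool stores its tasks internally, and the DequeEndsDemo class and its method names are made up for the illustration:

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Single-threaded model of which end of the deque each party uses
class DequeEndsDemo {

    // the owner pops the most recently pushed sub-task off the top (LIFO)
    static String ownerTake(Deque<String> deque) {
        return deque.pollFirst();
    }

    // a thief takes the oldest sub-task from the tail end (FIFO)
    static String thiefSteal(Deque<String> deque) {
        return deque.pollLast();
    }

    public static void main(String[] args) {
        Deque<String> victim = new ArrayDeque<>();
        victim.push("task-1"); // oldest, ends up at the tail
        victim.push("task-2");
        victim.push("task-3"); // newest, sits at the top

        System.out.println(ownerTake(victim));  // prints task-3
        System.out.println(thiefSteal(victim)); // prints task-1
    }
}
```

Because the owner and the thief work on opposite ends, they rarely compete for the same sub-task.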

3. Fork/Join Framework Implementation

We can create a work-stealing thread pool using either the ForkJoinPool class or the Executors class:

ForkJoinPool commonPool = ForkJoinPool.commonPool();
ExecutorService workStealingPool = Executors.newWorkStealingPool();

The Executors class has an overloaded newWorkStealingPool method, which takes an integer argument representing the level of parallelism.

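Both methods give us a ForkJoinPool, but they differ in two ways. ForkJoinPool.commonPool returns the single pool that is shared across the whole application, whereas every call to Executors.newWorkStealingPool creates a new ForkJoinPool. In addition, Executors.newWorkStealingPool creates its pool in asynchronous mode and ForkJoinPool.commonPool doesn't.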
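
We can check the mode of each pool with getAsyncMode. The sketch below assumes that the ExecutorService returned by Executors.newWorkStealingPool can be cast to ForkJoinPool, which holds for current JDK implementations; the PoolModes class is a hypothetical name:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;

class PoolModes {

    static boolean commonPoolIsAsync() {
        return ForkJoinPool.commonPool().getAsyncMode();
    }

    static boolean workStealingPoolIsAsync() {
        ExecutorService executor = Executors.newWorkStealingPool();
        try {
            // assumption: the returned executor is a ForkJoinPool, as in current JDKs
            return ((ForkJoinPool) executor).getAsyncMode();
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(commonPoolIsAsync());       // prints false
        System.out.println(workStealingPoolIsAsync()); // prints true
    }
}
```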

4. Synchronous vs Asynchronous Thread Pools

ForkJoinPool.commonPool uses a last-in, first-out (LIFO) queue configuration, whereas Executors.newWorkStealingPool uses a first-in, first-out (FIFO) one.

According to Doug Lea, the FIFO approach has these advantages over LIFO:

  • It reduces contention by having stealers operate on the opposite side of the deque as owners
  • It exploits the property of recursive divide-and-conquer algorithms of generating “large” tasks early

The second point above means that an older task is likely to be a large one, so the thread that steals it can break it down further instead of having to steal again soon afterwards.

As per the Java documentation, setting asyncMode to true may be suitable for use with event-style tasks that are never joined.
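
As a rough sketch of what such event-style tasks could look like, the code below creates a pool with asyncMode set to true through the four-argument ForkJoinPool constructor. A dispatcher task forks one task per event and never joins any of them. The AsyncEvents class, the handleEvents method and the ten-second timeout are assumptions made for this illustration:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class AsyncEvents {

    // Handles eventCount dummy events and returns how many were handled
    static int handleEvents(int eventCount) {
        ForkJoinPool pool = new ForkJoinPool(
          Runtime.getRuntime().availableProcessors(),
          ForkJoinPool.defaultForkJoinWorkerThreadFactory,
          null,   // no custom handler for uncaught exceptions
          true);  // asyncMode: FIFO scheduling for forked tasks that are never joined

        AtomicInteger handled = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(eventCount);

        // the dispatcher forks one task per event and never calls join on them
        pool.execute(() -> {
            for (int i = 0; i < eventCount; i++) {
                ForkJoinTask.adapt(() -> {
                    handled.incrementAndGet();
                    done.countDown();
                }).fork();
            }
        });

        try {
            // we wait on the latch only so that the sketch can report a result
            if (!done.await(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("events were not handled in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } finally {
            pool.shutdown();
        }
        return handled.get();
    }
}
```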

5. Working Example – Finding Prime Numbers

We'll use the example of finding prime numbers from a collection of numbers to show the computation time benefits of the work-stealing framework. We'll also show the differences between using synchronous and asynchronous thread pools.

5.1. The Prime Numbers Problem

Finding prime numbers from a collection of numbers can be a computationally expensive process. This is mainly due to the size of the collection of numbers.

The PrimeNumbers class helps us find prime numbers:

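import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.atomic.AtomicInteger;

public class PrimeNumbers extends RecursiveAction {

    private int lowerBound;
    private int upperBound;
    private int granularity;
    static final List<Integer> GRANULARITIES
      = Arrays.asList(1, 10, 100, 1000, 10000);
    // shared by every sub-task, hence the thread-safe counter
    private AtomicInteger noOfPrimeNumbers;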

    PrimeNumbers(int lowerBound, int upperBound, int granularity, AtomicInteger noOfPrimeNumbers) {
        this.lowerBound = lowerBound;
        this.upperBound = upperBound;
        this.granularity = granularity;
        this.noOfPrimeNumbers = noOfPrimeNumbers;
    }

    // other constructors and methods

    // splits this task's own range into batches of at most granularity numbers
    private List<PrimeNumbers> subTasks() {
        List<PrimeNumbers> subTasks = new ArrayList<>();

        for (int lower = lowerBound; lower <= upperBound; lower += granularity) {
            int upper = Math.min(lower + granularity - 1, upperBound);
            subTasks.add(new PrimeNumbers(lower, upper, granularity, noOfPrimeNumbers));
        }
        return subTasks;
    }

    @Override
    protected void compute() {
        if (((upperBound + 1) - lowerBound) > granularity) {
            ForkJoinTask.invokeAll(subTasks());
        } else {
            findPrimeNumbers();
        }
    }

    void findPrimeNumbers() {
        for (int num = lowerBound; num <= upperBound; num++) {
            if (isPrime(num)) {
                noOfPrimeNumbers.getAndIncrement();
            }
        }
    }

    public int noOfPrimeNumbers() {
        return noOfPrimeNumbers.intValue();
    }
}

A few important things to note about this class:

  • It extends RecursiveAction, a ForkJoinTask that returns no result, and implements its compute method with the work that the thread pool should run
  • It recursively breaks down tasks into sub-tasks based on the granularity value
  • The constructors take lower and upper bound values which control the range of numbers we want to determine prime numbers for
  • It enables us to determine prime numbers using either a work-stealing thread pool or a single thread
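
The listing above calls an isPrime method that isn't shown. One way it could look is a simple trial division; this is our assumption for illustration, wrapped in a hypothetical PrimeCheck class, and the actual helper in the example project may differ:

```java
class PrimeCheck {

    // Trial division: checks odd divisors up to the square root of the number
    static boolean isPrime(int number) {
        if (number < 2) {
            return false;
        }
        if (number < 4) {
            return true; // 2 and 3 are prime
        }
        if (number % 2 == 0) {
            return false;
        }
        // the cast to long avoids overflow of divisor * divisor for large inputs
        for (int divisor = 3; (long) divisor * divisor <= number; divisor += 2) {
            if (number % divisor == 0) {
                return false;
            }
        }
        return true;
    }
}
```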

5.2. Solving the Problem Faster with Thread Pools

Let's determine prime numbers in a single-threaded manner and also using work-stealing thread pools.

First, let's see the single-threaded approach:

PrimeNumbers primes = new PrimeNumbers(10000);
primes.findPrimeNumbers();

And now, the ForkJoinPool.commonPool approach:

PrimeNumbers primes = new PrimeNumbers(10000);
ForkJoinPool pool = ForkJoinPool.commonPool();
pool.invoke(primes);
// no pool.shutdown() here: calling it on the shared common pool has no effect

Finally, we'll have a look at the Executors.newWorkStealingPool approach:

PrimeNumbers primes = new PrimeNumbers(10000);
int parallelism = ForkJoinPool.getCommonPoolParallelism();
ForkJoinPool stealer = (ForkJoinPool) Executors.newWorkStealingPool(parallelism);
stealer.invoke(primes);
stealer.shutdown();

We use the invoke method of the ForkJoinPool class to pass tasks to the thread pool. This method accepts any ForkJoinTask, which includes sub-classes of RecursiveAction such as our PrimeNumbers class, and returns once the task has completed. Using the Java Microbenchmark Harness (JMH), we benchmark these different approaches against each other in terms of the average time per operation:

# Run complete. Total time: 00:04:50

Benchmark                                                      Mode  Cnt    Score   Error  Units
PrimeNumbersUnitTest.Benchmarker.commonPoolBenchmark           avgt   20  119.885 ± 9.917  ms/op
PrimeNumbersUnitTest.Benchmarker.newWorkStealingPoolBenchmark  avgt   20  119.791 ± 7.811  ms/op
PrimeNumbersUnitTest.Benchmarker.singleThread                  avgt   20  475.964 ± 7.929  ms/op

Both ForkJoinPool.commonPool and Executors.newWorkStealingPool determine the prime numbers roughly four times faster than the single-threaded approach (about 120 ms versus 476 ms per operation).

The fork/join framework lets us break down the task into sub-tasks. We broke down the range of 10,000 integers into batches of 1-100, 101-200, 201-300 and so on. We then determined prime numbers for each batch and made the total number of prime numbers available with our noOfPrimeNumbers method.
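
Whichever approach we choose, the count has to come out the same. The compact stand-in below checks this; BatchPrimeCounter is a hypothetical, self-contained class with its own trial-division isPrime, not the article's PrimeNumbers class:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.atomic.AtomicInteger;

class BatchPrimeCounter extends RecursiveAction {

    private final int lower;
    private final int upper;
    private final int batchSize;
    private final AtomicInteger counter; // shared by all batches

    BatchPrimeCounter(int lower, int upper, int batchSize, AtomicInteger counter) {
        this.lower = lower;
        this.upper = upper;
        this.batchSize = batchSize;
        this.counter = counter;
    }

    @Override
    protected void compute() {
        if (upper - lower + 1 > batchSize) {
            // break the range down into batches such as 1-100, 101-200, ...
            List<BatchPrimeCounter> batches = new ArrayList<>();
            for (int start = lower; start <= upper; start += batchSize) {
                int end = Math.min(start + batchSize - 1, upper);
                batches.add(new BatchPrimeCounter(start, end, batchSize, counter));
            }
            ForkJoinTask.invokeAll(batches);
        } else {
            countSequentially();
        }
    }

    void countSequentially() {
        for (int n = lower; n <= upper; n++) {
            if (isPrime(n)) {
                counter.incrementAndGet();
            }
        }
    }

    static boolean isPrime(int n) {
        if (n < 2) {
            return false;
        }
        for (int d = 2; (long) d * d <= n; d++) {
            if (n % d == 0) {
                return false;
            }
        }
        return true;
    }

    static int countSingleThreaded(int upperBound) {
        AtomicInteger counter = new AtomicInteger();
        new BatchPrimeCounter(1, upperBound, upperBound, counter).countSequentially();
        return counter.get();
    }

    static int countWithCommonPool(int upperBound, int batchSize) {
        AtomicInteger counter = new AtomicInteger();
        ForkJoinPool.commonPool().invoke(new BatchPrimeCounter(1, upperBound, batchSize, counter));
        return counter.get();
    }
}
```

For an upper bound of 10,000, both countSingleThreaded(10000) and countWithCommonPool(10000, 100) return 1,229, the number of primes up to 10,000.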

5.3. Stealing Work to Compute

ForkJoinPool.commonPool is synchronous (LIFO), and it is a single shared pool that keeps its worker threads available for as long as tasks are in progress. In our measurements, its level of work stealing doesn't appear to depend on the level of task granularity.

Executors.newWorkStealingPool is asynchronous (FIFO) and gives us a fresh pool on every call. In our measurements, its level of work stealing depends on the level of task granularity.

We get the level of work stealing using the getStealCount of the ForkJoinPool class:

long steals = forkJoinPool.getStealCount();
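
Since getStealCount reports an estimated running total for the pool, a sketch like the following takes the difference between the values before and after a task to get the steals attributable to that task. The StealCounter class and its SplitTask workload are hypothetical names used only here:

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveAction;

class StealCounter {

    // Hypothetical workload: halves a range until a part holds at most granularity items
    static class SplitTask extends RecursiveAction {

        private final int size;
        private final int granularity;

        SplitTask(int size, int granularity) {
            this.size = size;
            this.granularity = Math.max(1, granularity); // guards against endless splitting
        }

        @Override
        protected void compute() {
            if (size > granularity) {
                invokeAll(
                  new SplitTask(size / 2, granularity),
                  new SplitTask(size - size / 2, granularity));
            }
            // a leaf does no real work in this sketch
        }
    }

    // Runs the task and returns how much the pool's steal count grew meanwhile
    static long stealsDuring(ForkJoinPool pool, ForkJoinTask<?> task) {
        long before = pool.getStealCount();
        pool.invoke(task);
        return pool.getStealCount() - before;
    }
}
```

Taking the difference matters mostly for a shared pool such as ForkJoinPool.commonPool, where earlier tasks have already contributed to the total.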

Determining the work-stealing count for Executors.newWorkStealingPool and ForkJoinPool.commonPool shows different behavior:

Executors.newWorkStealingPool ->
Granularity: [1], Steals: [6564]
Granularity: [10], Steals: [572]
Granularity: [100], Steals: [56]
Granularity: [1000], Steals: [60]
Granularity: [10000], Steals: [1]

ForkJoinPool.commonPool ->
Granularity: [1], Steals: [6923]
Granularity: [10], Steals: [7540]
Granularity: [100], Steals: [7605]
Granularity: [1000], Steals: [7681]
Granularity: [10000], Steals: [7681]

When granularity changes from fine to coarse (1 to 10,000) for Executors.newWorkStealingPool, the level of work stealing generally decreases. At a granularity of 10,000 the task isn't broken down at all, and the steal count drops to one.

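ForkJoinPool.commonPool behaves differently. The reported level of work stealing is always high and is not influenced much by the change in task granularity. One caveat when reading these figures: commonPool returns the same shared pool every time, and getStealCount reports a running total for that pool, so measurements taken one after another in the same JVM accumulate. The values above never decrease from one granularity to the next, which is consistent with that.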

Technically speaking, our prime numbers example lends itself to asynchronous processing of event-style tasks. This is because our sub-tasks extend RecursiveAction and return no results that need to be joined and combined; each one only updates the shared counter.

A case can be made that Executors.newWorkStealingPool offers the best use of resources in solving the problem.

6. Conclusion

In this article, we looked at work stealing and how to apply it using the fork/join framework. We also worked through an example to see how work stealing can improve processing time and the use of resources.

As always, the full source code of the example is available over on GitHub.
