1. Overview

In this tutorial, we’ll introduce the JCTools (Java Concurrency Tools) library.

Simply put, this provides a number of utility data structures suitable for working in a multi-threaded environment.

2. Non-Blocking Algorithms

Traditionally, multi-threaded code that works on mutable shared state uses locks to ensure data consistency and publication (changes made by one thread becoming visible to another).

This approach has a number of drawbacks:

  • threads might become blocked in an attempt to acquire a lock, making no progress until another thread’s operation is finished – this effectively prevents parallelism
  • the heavier lock contention is, the more time the JVM spends dealing with scheduling threads, managing contention and queues of waiting threads and the less real work it is doing
  • deadlocks are possible if more than one lock is involved and the locks are acquired/released in the wrong order
  • a priority inversion hazard is possible – a high-priority thread is blocked in an attempt to get a lock held by a low-priority thread
  • most of the time coarse-grained locks are used, hurting parallelism a lot – fine-grained locking requires more careful design, increases locking overhead and is more error-prone

An alternative is to use a non-blocking algorithm, i.e. an algorithm where failure or suspension of any thread cannot cause failure or suspension of another thread.

A non-blocking algorithm is lock-free if at least one of the involved threads is guaranteed to make progress over an arbitrary period of time, i.e. deadlocks cannot arise during the processing.

Furthermore, such an algorithm is wait-free if per-thread progress is also guaranteed, i.e. every thread completes its operation in a bounded number of steps.

Here’s a non-blocking Stack example from the excellent Java Concurrency in Practice book; it defines the basic state:

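import java.util.concurrent.atomic.AtomicReference;

public class ConcurrentStack<E> {

    // reference to the top node; holds null while the stack is empty
    AtomicReference<Node<E>> top = new AtomicReference<Node<E>>();

    private static class Node<E> {
        public E item;
        public Node<E> next;

        public Node(E item) {
            this.item = item;
        }
    }
}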

And also a couple of API methods:

public void push(E item){
    Node<E> newHead = new Node<E>(item);
    Node<E> oldHead;
    
    do {
        oldHead = top.get();
        newHead.next = oldHead;
    } while(!top.compareAndSet(oldHead, newHead));
}

public E pop() {
    Node<E> oldHead;
    Node<E> newHead;
    do {
        oldHead = top.get();
        if (oldHead == null) {
            return null;
        }
        newHead = oldHead.next;
    } while (!top.compareAndSet(oldHead, newHead));
    
    return oldHead.item;
}

We can see that the algorithm uses fine-grained compare-and-swap (CAS) instructions. It is lock-free: even if multiple threads call top.compareAndSet() simultaneously, one of them is guaranteed to succeed. It is not wait-free, though, as there’s no guarantee that CAS eventually succeeds for any particular thread.
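
To see the lock-free behavior in action, here’s a self-contained sketch that repeats the stack from above and pushes to it from several threads at once. The StackDemo class, the pushConcurrently helper and the thread and element counts are assumptions made only for this illustration:

```java
import java.util.concurrent.atomic.AtomicReference;

// Same algorithm as the stack above, repeated so that the sketch compiles on its own
class ConcurrentStack<E> {

    private final AtomicReference<Node<E>> top = new AtomicReference<>();

    private static class Node<E> {
        final E item;
        Node<E> next;

        Node(E item) {
            this.item = item;
        }
    }

    public void push(E item) {
        Node<E> newHead = new Node<>(item);
        Node<E> oldHead;
        do {
            oldHead = top.get();
            newHead.next = oldHead;
        } while (!top.compareAndSet(oldHead, newHead)); // retry if another thread won the race
    }

    public E pop() {
        Node<E> oldHead;
        Node<E> newHead;
        do {
            oldHead = top.get();
            if (oldHead == null) {
                return null; // empty stack
            }
            newHead = oldHead.next;
        } while (!top.compareAndSet(oldHead, newHead));
        return oldHead.item;
    }
}

class StackDemo {

    // Hypothetical helper: every thread pushes perThread items,
    // then we pop everything and return how many items came back
    static int pushConcurrently(int threads, int perThread) {
        ConcurrentStack<Integer> stack = new ConcurrentStack<>();
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    stack.push(i);
                }
            });
            workers[t].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        int popped = 0;
        while (stack.pop() != null) {
            popped++;
        }
        return popped;
    }

    public static void main(String[] args) {
        System.out.println(pushConcurrently(4, 10_000));
    }
}
```

A failed CAS only means that another thread has already changed the top, so no push is lost and the number of popped items equals threads * perThread. What the sketch can’t show is a bound on how many retries a single thread needs, which is exactly the missing wait-free guarantee.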

3. Dependency

First, let’s add the JCTools dependency to our pom.xml:

<dependency>
    <groupId>org.jctools</groupId>
    <artifactId>jctools-core</artifactId>
    <version>2.1.2</version>
</dependency>

Please note that the latest version is available on Maven Central.

4. JCTools Queues

The library offers a number of queues to use in a multi-threaded environment, i.e. one or more threads write to a queue and one or more threads read from it in a thread-safe lock-free manner.

The common interface for all Queue implementations is org.jctools.queues.MessagePassingQueue.

4.1. Types of Queues

All queues can be categorized by their producer/consumer policies:

  • single producer, single consumer – such classes are named using the prefix Spsc, e.g. SpscArrayQueue
  • single producer, multiple consumers – use Spmc prefix, e.g. SpmcArrayQueue
  • multiple producers, single consumer – use Mpsc prefix, e.g. MpscArrayQueue
  • multiple producers, multiple consumers – use Mpmc prefix, e.g. MpmcArrayQueue

It’s important to note that there are no policy checks internally, i.e. a queue might silently malfunction in case of incorrect usage.

For example, the test below populates a single-producer queue from two threads and passes, even though the consumer is not guaranteed to see data from different producers:

SpscArrayQueue<Integer> queue = new SpscArrayQueue<>(2);

Thread producer1 = new Thread(() -> queue.offer(1));
producer1.start();
producer1.join();

Thread producer2 = new Thread(() -> queue.offer(2));
producer2.start();
producer2.join();

Set<Integer> fromQueue = new HashSet<>();
Thread consumer = new Thread(() -> queue.drain(fromQueue::add));
consumer.start();
consumer.join();

assertThat(fromQueue).containsOnly(1, 2);
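
To get a feeling for why the producer/consumer policy matters, here’s a minimal sketch of a bounded single-producer, single-consumer queue built only on JDK classes. It is not the JCTools implementation; SimpleSpscQueue and SpscDemo are hypothetical names used for illustration:

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical, simplified bounded SPSC queue; not the JCTools implementation
class SimpleSpscQueue<E> {

    private final Object[] buffer;
    private final AtomicLong head = new AtomicLong(); // next slot to read, written only by the consumer
    private final AtomicLong tail = new AtomicLong(); // next slot to write, written only by the producer

    SimpleSpscQueue(int capacity) {
        buffer = new Object[capacity];
    }

    // Must be called from a single producer thread
    boolean offer(E e) {
        long t = tail.get();
        if (t - head.get() == buffer.length) {
            return false; // full
        }
        buffer[(int) (t % buffer.length)] = e;
        tail.set(t + 1); // volatile write publishes the element, no CAS involved
        return true;
    }

    // Must be called from a single consumer thread
    @SuppressWarnings("unchecked")
    E poll() {
        long h = head.get();
        if (h == tail.get()) {
            return null; // empty
        }
        int index = (int) (h % buffer.length);
        E e = (E) buffer[index];
        buffer[index] = null;
        head.set(h + 1); // volatile write hands the slot back to the producer
        return e;
    }
}

class SpscDemo {

    // Sends 0..count-1 from one producer thread to the calling thread
    // and returns the sum of everything the consumer received
    static long transfer(int count) {
        SimpleSpscQueue<Integer> queue = new SimpleSpscQueue<>(8);
        Thread producer = new Thread(() -> {
            for (int i = 0; i < count; i++) {
                while (!queue.offer(i)) {
                    Thread.yield(); // queue is full, wait for the consumer
                }
            }
        });
        producer.setDaemon(true);
        producer.start();

        long sum = 0;
        int received = 0;
        while (received < count) {
            Integer value = queue.poll();
            if (value == null) {
                Thread.yield(); // queue is empty, wait for the producer
            } else {
                sum += value;
                received++;
            }
        }
        return sum;
    }
}
```

The sketch needs no CAS because each index has exactly one writer. If two threads called offer() concurrently, both could read the same tail value and write to the same slot, so one element would silently disappear. This is the kind of malfunction that goes undetected when the policy is violated.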

4.2. Queue Implementations

Summarizing the classifications above, here is the list of JCTools queues:

  • SpscArrayQueue – single producer, single consumer, uses an array internally, bounded capacity
  • SpscLinkedQueue – single producer, single consumer, uses a linked list internally, unbounded capacity
  • SpscChunkedArrayQueue – single producer, single consumer, starts with initial capacity and grows up to max capacity
  • SpscGrowableArrayQueue – single producer, single consumer, starts with initial capacity and grows up to max capacity. This is the same contract as SpscChunkedArrayQueue; the only difference is the internal chunk management. It’s recommended to use SpscChunkedArrayQueue because it has a simplified implementation
  • SpscUnboundedArrayQueue – single producer, single consumer, uses an array internally, unbounded capacity
  • SpmcArrayQueue – single producer, multiple consumers, uses an array internally, bounded capacity
  • MpscArrayQueue – multiple producers, single consumer, uses an array internally, bounded capacity
  • MpscLinkedQueue – multiple producers, single consumer, uses a linked list internally, unbounded capacity
  • MpmcArrayQueue – multiple producers, multiple consumers, uses an array internally, bounded capacity

4.3. Atomic Queues

All queues mentioned in the previous section use sun.misc.Unsafe. However, since Java 9 and JEP 260, internal JDK APIs are being encapsulated, and sun.misc.Unsafe is an unsupported API that isn’t guaranteed to be available or efficient in every environment.

So, there are alternative queues which use java.util.concurrent.atomic.AtomicLongFieldUpdater (public API, less performant) instead of sun.misc.Unsafe.

They are generated from the queues above and their names have the word Atomic inserted in between, e.g. SpscChunkedAtomicArrayQueue or MpmcAtomicArrayQueue.

It’s recommended to use the ‘regular’ queues where possible and resort to the atomic queues only in environments where sun.misc.Unsafe is prohibited or ineffective, like HotSpot Java 9+ and JRockit.
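
As a rough illustration of the public API that the atomic queues rely on, here’s a small sketch that performs a CAS on a plain volatile long field through AtomicLongFieldUpdater. The ProducerIndex class and its methods are hypothetical and don’t mirror the actual JCTools code:

```java
import java.util.concurrent.atomic.AtomicLongFieldUpdater;

// Hypothetical index holder that shows the field-updater API without sun.misc.Unsafe
class ProducerIndex {

    // The updated field must be a volatile long
    private volatile long index;

    private static final AtomicLongFieldUpdater<ProducerIndex> INDEX_UPDATER =
        AtomicLongFieldUpdater.newUpdater(ProducerIndex.class, "index");

    // Claims the next index with a CAS retry loop and returns the claimed value
    long claimNext() {
        long current;
        do {
            current = index;
        } while (!INDEX_UPDATER.compareAndSet(this, current, current + 1));
        return current;
    }

    long current() {
        return index;
    }
}
```

The updater resolves the field by name via reflection when it’s created and validates the target object on each call, which is one plausible source of the overhead mentioned above.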

4.4. Capacity

A JCTools queue either has a maximum capacity or is unbounded. When a bounded queue is full, it stops accepting new elements.

In the following example, we:

  • fill the queue
  • ensure that it stops accepting new elements after that
  • drain from it and ensure that it’s possible to add more elements afterward

Please note that a couple of code statements are dropped for readability. The complete implementation can be found over on GitHub:

SpscChunkedArrayQueue<Integer> queue = new SpscChunkedArrayQueue<>(8, 16);
CountDownLatch startConsuming = new CountDownLatch(1);
CountDownLatch awakeProducer = new CountDownLatch(1);

Thread producer = new Thread(() -> {
    IntStream.range(0, queue.capacity()).forEach(i -> {
        assertThat(queue.offer(i)).isTrue();
    });
    assertThat(queue.offer(queue.capacity())).isFalse();
    startConsuming.countDown();
    awakeProducer.await();
    assertThat(queue.offer(queue.capacity())).isTrue();
});

producer.start();
startConsuming.await();

Set<Integer> fromQueue = new HashSet<>();
queue.drain(fromQueue::add);
awakeProducer.countDown();
producer.join();
queue.drain(fromQueue::add);

assertThat(fromQueue).containsAll(
  IntStream.range(0, 17).boxed().collect(toSet()));

5. Other JCTools Data Structures

JCTools offers a couple of non-Queue data structures as well.

All of them are listed below:

  • NonBlockingHashMap – a lock-free ConcurrentHashMap alternative with better-scaling properties and generally lower mutation costs. It’s implemented via sun.misc.Unsafe, so it’s not recommended to use this class in a HotSpot Java 9+ or JRockit environment
  • NonBlockingHashMapLong – like NonBlockingHashMap but uses primitive long keys
  • NonBlockingHashSet – a simple wrapper around NonBlockingHashMap, like the JDK’s java.util.Collections.newSetFromMap()
  • NonBlockingIdentityHashMap – like NonBlockingHashMap but compares keys by identity
  • NonBlockingSetInt – a multi-threaded bit-vector set implemented as an array of primitive longs. Works inefficiently when values are silently autoboxed
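
To illustrate the bit-vector idea behind the last entry, here’s a minimal fixed-size sketch on top of the JDK’s AtomicLongArray. It’s not the NonBlockingSetInt implementation; the SimpleConcurrentBitSet name and its fixed upper bound are assumptions made for brevity:

```java
import java.util.concurrent.atomic.AtomicLongArray;

// Hypothetical fixed-size concurrent bit set for values in 0..maxValue;
// not the JCTools implementation
class SimpleConcurrentBitSet {

    private final AtomicLongArray words;

    SimpleConcurrentBitSet(int maxValue) {
        // each long stores 64 values
        words = new AtomicLongArray((maxValue >>> 6) + 1);
    }

    // Returns true if the value was absent and has been added by this call
    boolean add(int value) {
        int word = value >>> 6;
        long mask = 1L << (value & 63);
        long old;
        do {
            old = words.get(word);
            if ((old & mask) != 0) {
                return false; // already present
            }
        } while (!words.compareAndSet(word, old, old | mask)); // retry if the word changed
        return true;
    }

    boolean contains(int value) {
        long mask = 1L << (value & 63);
        return (words.get(value >>> 6) & mask) != 0;
    }
}
```

The methods take primitive int arguments, so nothing is boxed. Going through a Set<Integer> view instead would allocate or look up an Integer for each call, which is the autoboxing cost mentioned above.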

6. Performance Testing

Let’s use JMH to compare the performance of the JDK’s ArrayBlockingQueue with that of the JCTools queues. JMH is an open-source micro-benchmark framework from Sun/Oracle JVM gurus that protects us from the non-determinism of compiler/JVM optimization algorithms. Please feel free to get more details on it in this article.

Note that the code snippet below misses a couple of statements in order to improve readability. Please find the complete source code on GitHub:

public class MpmcBenchmark {

    @Param({PARAM_UNSAFE, PARAM_AFU, PARAM_JDK})
    public volatile String implementation;

    public volatile Queue<Long> queue;

    @Benchmark
    @Group(GROUP_NAME)
    @GroupThreads(PRODUCER_THREADS_NUMBER)
    public void write(Control control) {
        // noinspection StatementWithEmptyBody
        while (!control.stopMeasurement && !queue.offer(1L)) {
            // intentionally left blank
        }
    }

    @Benchmark
    @Group(GROUP_NAME)
    @GroupThreads(CONSUMER_THREADS_NUMBER)
    public void read(Control control) {
        // noinspection StatementWithEmptyBody
        while (!control.stopMeasurement && queue.poll() == null) {
            // intentionally left blank
        }
    }
}

Results (excerpt for the 95th percentile, nanoseconds per operation):

MpmcBenchmark.MyGroup:MyGroup·p0.95 MpmcArrayQueue sample 1052.000 ns/op
MpmcBenchmark.MyGroup:MyGroup·p0.95 MpmcAtomicArrayQueue sample 1106.000 ns/op
MpmcBenchmark.MyGroup:MyGroup·p0.95 ArrayBlockingQueue sample 2364.000 ns/op

We can see that MpmcArrayQueue performs just slightly better than MpmcAtomicArrayQueue (1052 vs. 1106 ns/op, about 5%), while ArrayBlockingQueue is slower by a factor of more than two (2364 ns/op).

7. Drawbacks of Using JCTools

Using JCTools has an important drawback – it’s not possible to enforce that the library classes are used correctly. For example, consider a situation where we start using MpscArrayQueue in our large and mature project (note that there must be a single consumer).

Unfortunately, as the project is big, there is a possibility that someone makes a programming or configuration error and the queue is now read from more than one thread. The system seems to work as before but now there is a chance that consumers miss some messages. That is a real problem which might have a big impact and is very hard to debug.

Ideally, it should be possible to run a system with a particular system property which forces JCTools to ensure thread access policy. E.g. local/test/staging environments (but not production) might have it turned on. Sadly, JCTools does not provide such a property.
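
Until then, we could add such a check ourselves. Below is a sketch of a hypothetical wrapper (not part of JCTools) that remembers the first thread that polls and rejects any other one. It uses ConcurrentLinkedQueue as the delegate only to stay JDK-only; since the JCTools queues implement java.util.Queue, an MpscArrayQueue could be passed in the same way:

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical debugging wrapper, not part of JCTools
class SingleConsumerGuard<E> {

    private final Queue<E> delegate;
    // The first thread that polls becomes the only allowed consumer
    private final AtomicReference<Thread> consumer = new AtomicReference<>();

    SingleConsumerGuard(Queue<E> delegate) {
        this.delegate = delegate;
    }

    boolean offer(E e) {
        return delegate.offer(e);
    }

    E poll() {
        Thread current = Thread.currentThread();
        // Register the caller if nobody has polled yet, otherwise compare with the owner
        if (!consumer.compareAndSet(null, current) && consumer.get() != current) {
            throw new IllegalStateException(
                "Second consumer thread detected: " + current.getName());
        }
        return delegate.poll();
    }
}

class GuardDemo {

    // Polls once on the calling thread, then once on another thread;
    // returns true if the second poll was rejected
    static boolean secondConsumerRejected() {
        SingleConsumerGuard<Integer> queue =
            new SingleConsumerGuard<>(new ConcurrentLinkedQueue<>());
        queue.offer(1);
        queue.offer(2);
        queue.poll(); // the calling thread becomes the consumer

        AtomicBoolean rejected = new AtomicBoolean();
        Thread intruder = new Thread(() -> {
            try {
                queue.poll();
            } catch (IllegalStateException e) {
                rejected.set(true);
            }
        });
        intruder.start();
        try {
            intruder.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return rejected.get();
    }
}
```

The check costs an extra atomic operation per poll(), so it’s better suited to local/test/staging environments than to production, in line with the idea above.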

Another consideration is that even though we saw that JCTools is significantly faster than the JDK’s counterpart, this doesn’t mean our application gains the same amount of speed once we start using the custom queue implementations. Most applications don’t exchange a lot of objects between threads and are mostly I/O bound.

8. Conclusion

We now have a basic understanding of the utility classes offered by JCTools and have seen how well they perform compared to the JDK’s counterparts under heavy load.

In conclusion, it’s worth using the library only if we exchange a lot of objects between threads, and even then it’s necessary to be very careful to preserve the thread access policy.

As always, the full source code for the samples above can be found over on GitHub.
