Course – LS – All

Get started with Spring and Spring Boot, through the Learn Spring course:

>> CHECK OUT THE COURSE

1. Overview

In this article, we’ll be looking at the SynchronousQueue from the java.util.concurrent package.

Simply put, this implementation allows us to exchange information between threads in a thread-safe manner.

2. API Overview

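The SynchronousQueue has no internal capacity, so every insert has to pair up with a remove on another thread. The two operations we'll use are put() and take(), and both of them are blocking. The class also inherits offer() and poll() from BlockingQueue, which don't wait indefinitely for a partner thread.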

For example, when we want to add an element to the queue, we need to call the put() method. That method will block until some other thread calls the take() method, signaling that it is ready to take an element.
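To make the blocking behaviour concrete, here is a minimal sketch. The class name BlockingPutDemo, the value 42, and the 200 ms pause are illustrative choices rather than part of the original example. It records whether put() has returned before and after the matching take():

```java
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.atomic.AtomicBoolean;

class BlockingPutDemo {

    // Returns { put() returned before take(), put() returned after take(), taken value was 42 }
    static boolean[] run() {
        SynchronousQueue<Integer> queue = new SynchronousQueue<>();
        AtomicBoolean putReturned = new AtomicBoolean(false);

        Thread producer = new Thread(() -> {
            try {
                queue.put(42);           // blocks until a consumer calls take()
                putReturned.set(true);
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();

        try {
            // Arbitrary pause so the producer has time to reach put()
            Thread.sleep(200);
            boolean before = putReturned.get();

            Integer taken = queue.take();  // completes the handoff
            producer.join();
            boolean after = putReturned.get();

            return new boolean[] { before, after, taken == 42 };
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", ex);
        }
    }

    public static void main(String[] args) {
        boolean[] result = run();
        System.out.println("put() returned before take(): " + result[0]);
        System.out.println("put() returned after take(): " + result[1]);
    }
}
```

The first flag stays false because put() cannot return until a consumer has taken the element.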

Although the SynchronousQueue has an interface of a queue, we should think about it as an exchange point for a single element between two threads, in which one thread is handing off an element, and another thread is taking that element.
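The "exchange point" view shows up in the rest of the queue API as well. The sketch below (the class and method names are made up for illustration) calls the non-blocking offer() and poll() from BlockingQueue with no partner thread present, and also checks size() and peek():

```java
import java.util.concurrent.SynchronousQueue;

class NoCapacityDemo {

    // Summarizes what the queue reports when no other thread is involved
    static String describe() {
        SynchronousQueue<Integer> queue = new SynchronousQueue<>();

        // No consumer is waiting, so the non-blocking offer() is rejected
        boolean accepted = queue.offer(1);
        // No producer is waiting, so poll() has nothing to return
        Integer polled = queue.poll();

        return "offer=" + accepted
            + ", poll=" + polled
            + ", size=" + queue.size()   // a SynchronousQueue never holds elements
            + ", peek=" + queue.peek();
    }

    public static void main(String[] args) {
        System.out.println(describe());
    }
}
```

Because nothing is ever stored, the rejected element is not kept anywhere: an element only changes hands when both threads are present.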

3. Implementing Handoffs Using a Shared Variable

To see why the SynchronousQueue can be so useful, we will first implement a handoff using a variable shared between two threads. Then we will rewrite that logic using a SynchronousQueue, which makes the code simpler and more readable.

Let’s say that we have two threads, a producer and a consumer. When the producer sets the value of a shared variable, we want to signal that fact to the consumer thread. The consumer thread will then fetch the value from the shared variable.

We will use a CountDownLatch to coordinate those two threads, so that the consumer never reads the shared variable before it has been set.

We will define a sharedState variable and a CountDownLatch that will be used for coordinating processing:

ExecutorService executor = Executors.newFixedThreadPool(2);
AtomicInteger sharedState = new AtomicInteger();
CountDownLatch countDownLatch = new CountDownLatch(1);

The producer will save a random integer to the sharedState variable, and execute the countDown() method on the countDownLatch, signaling to the consumer that it can fetch a value from the sharedState:

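Runnable producer = () -> {
    Integer producedElement = ThreadLocalRandom
      .current()
      .nextInt();
    // Log the value so that the run produces the output shown below
    System.out.println("Saving an element: " + producedElement + " to the exchange point");
    sharedState.set(producedElement);
    countDownLatch.countDown();
};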

The consumer will wait on the countDownLatch using the await() method. When the producer signals that the variable was set, the consumer will fetch it from the sharedState:

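Runnable consumer = () -> {
    try {
        countDownLatch.await();
        Integer consumedElement = sharedState.get();
        System.out.println("consumed an element: " + consumedElement + " from the exchange point");
    } catch (InterruptedException ex) {
        // Restore the interrupt flag instead of swallowing the interruption
        Thread.currentThread().interrupt();
    }
};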

Last but not least, let’s start our program:

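executor.execute(producer);
executor.execute(consumer);

// shutdown() must come first; otherwise awaitTermination() just waits out the timeout
executor.shutdown();
executor.awaitTermination(500, TimeUnit.MILLISECONDS);
assertEquals(0, countDownLatch.getCount());

It will produce output similar to the following (the integer is random):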

Saving an element: -1507375353 to the exchange point
consumed an element: -1507375353 from the exchange point

That is a lot of code for something as simple as passing one element between two threads: we need both a shared variable and a separate latch to coordinate access to it. In the next section, we will try to make it better.

4. Implementing Handoffs Using the SynchronousQueue

Let’s now implement the same functionality as in the previous section, but with a SynchronousQueue. The queue does two jobs at once: it carries the state between the threads and it coordinates the exchange, so we don’t need anything besides the SynchronousQueue.

Firstly, we will define a queue:

ExecutorService executor = Executors.newFixedThreadPool(2);
SynchronousQueue<Integer> queue = new SynchronousQueue<>();

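The producer will call the put() method, which blocks until some other thread takes the element from the queue:

Runnable producer = () -> {
    Integer producedElement = ThreadLocalRandom
      .current()
      .nextInt();
    try {
        // Log the value so that the run produces the output shown below
        System.out.println("Saving an element: " + producedElement + " to the exchange point");
        queue.put(producedElement);
    } catch (InterruptedException ex) {
        // Restore the interrupt flag instead of swallowing the interruption
        Thread.currentThread().interrupt();
    }
};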
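Since put() waits for as long as it takes, a producer with no consumer would block forever. As an aside, and not part of the original example, the timed offer(element, timeout, unit) overload from BlockingQueue gives up after a deadline instead. A minimal sketch, where the helper name tryHandOff and the 100 ms timeout are assumptions chosen for illustration:

```java
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

class TimedHandoffDemo {

    // Tries to hand off one element and gives up once the timeout elapses
    static boolean tryHandOff(SynchronousQueue<Integer> queue, int element, long timeoutMillis) {
        try {
            return queue.offer(element, timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        SynchronousQueue<Integer> queue = new SynchronousQueue<>();
        // No consumer is running here, so the offer times out and returns false
        System.out.println("handed off: " + tryHandOff(queue, 7, 100));
    }
}
```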

The consumer will simply retrieve that element using the take() method:

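Runnable consumer = () -> {
    try {
        Integer consumedElement = queue.take();
        System.out.println("consumed an element: " + consumedElement + " from the exchange point");
    } catch (InterruptedException ex) {
        // Restore the interrupt flag instead of swallowing the interruption
        Thread.currentThread().interrupt();
    }
};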

Next, we will start our program:

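executor.execute(producer);
executor.execute(consumer);

// shutdown() must come first; otherwise awaitTermination() just waits out the timeout
executor.shutdown();
executor.awaitTermination(500, TimeUnit.MILLISECONDS);
// size() always returns 0 for a SynchronousQueue, since it never stores elements
assertEquals(0, queue.size());

It will produce output similar to the following (the integer is random):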

Saving an element: 339626897 to the exchange point
consumed an element: 339626897 from the exchange point

Here the SynchronousQueue acts as the exchange point between the threads: it carries the value and coordinates the handoff in one step, which is easier to follow than the previous example that combined shared state with a CountDownLatch.
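One further difference is worth a quick illustration, offered here as a sketch rather than as code from the article's project: a CountDownLatch cannot be reset once it reaches zero, whereas the same queue can carry any number of handoffs. The class name RepeatedHandoffDemo, the handOff helper, and the 5-second safety timeout are assumptions:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class RepeatedHandoffDemo {

    // Hands the integers 1..count from a producer to a consumer, one at a time
    static List<Integer> handOff(int count) {
        SynchronousQueue<Integer> queue = new SynchronousQueue<>();
        ExecutorService executor = Executors.newFixedThreadPool(2);
        try {
            // Producer: each put() waits for the matching take()
            executor.execute(() -> {
                try {
                    for (int i = 1; i <= count; i++) {
                        queue.put(i);
                    }
                } catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();
                }
            });

            // Consumer: collects the elements in the order they were handed off
            Future<List<Integer>> consumed = executor.submit(() -> {
                List<Integer> received = new ArrayList<>();
                for (int i = 0; i < count; i++) {
                    received.add(queue.take());
                }
                return received;
            });

            return consumed.get(5, TimeUnit.SECONDS);
        } catch (InterruptedException | ExecutionException | TimeoutException ex) {
            throw new IllegalStateException("handoff did not complete", ex);
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(handOff(3)); // prints [1, 2, 3]
    }
}
```

With a single producer and a single consumer, the elements arrive in the order they were put, and no extra latch or shared variable is needed per element.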

5. Conclusion

In this quick tutorial, we looked at the SynchronousQueue construct. We created a program that exchanges data between two threads using shared state, and then rewrote that program to leverage the SynchronousQueue construct. This serves as an exchange point that coordinates the producer and the consumer thread.

The implementation of all these examples and code snippets can be found in the GitHub project – this is a Maven project, so it should be easy to import and run as it is.
