1. Overview

In this article, we will look at one of the most useful constructs in java.util.concurrent for solving the concurrent producer-consumer problem. We’ll look at the API of the BlockingQueue interface and how its methods make writing concurrent programs easier.

Later in the article, we will show an example of a simple program that has multiple producer threads and multiple consumer threads.

2. BlockingQueue Types

We can distinguish two types of BlockingQueue:

  • unbounded queue – can grow almost indefinitely
  • bounded queue – with a defined maximum capacity

2.1. Unbounded Queue

Creating unbounded queues is simple:

BlockingQueue<String> blockingQueue = new LinkedBlockingDeque<>();

The capacity of blockingQueue will be set to Integer.MAX_VALUE. Operations that add an element to an unbounded queue never block, so the queue can grow to a very large size.

The most important thing when designing a producer-consumer program around an unbounded BlockingQueue is that consumers should be able to consume messages as quickly as producers add them to the queue. Otherwise, memory could fill up and we would get an OutOfMemoryError.
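To make the "never blocks" behavior concrete, here is a minimal sketch. The class name UnboundedQueueSketch and the helper allOffersAccepted are hypothetical names chosen for illustration; only the BlockingQueue and LinkedBlockingDeque calls come from the JDK.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;

class UnboundedQueueSketch {

    // Adds `count` elements while no consumer is running and reports
    // whether every offer() was accepted immediately.
    static boolean allOffersAccepted(BlockingQueue<String> queue, int count) {
        boolean allAccepted = true;
        for (int i = 0; i < count; i++) {
            allAccepted &= queue.offer("message-" + i);
        }
        return allAccepted;
    }

    public static void main(String[] args) {
        // No capacity argument: the capacity defaults to Integer.MAX_VALUE
        BlockingQueue<String> queue = new LinkedBlockingDeque<>();

        System.out.println(allOffersAccepted(queue, 1_000));
        System.out.println(queue.size());
        System.out.println(queue.remainingCapacity());
    }
}
```

Nothing here slows the producer down, which is exactly why a slow consumer can eventually exhaust the heap.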

2.2. Bounded Queue

The second type of queue is the bounded queue. We can create one by passing the capacity as an argument to the constructor:

BlockingQueue<String> blockingQueue = new LinkedBlockingDeque<>(10);

Here we have a blockingQueue with a capacity of 10. When a producer tries to add an element to an already full queue, the outcome depends on the method used: put() blocks until space becomes available, offer() returns false, and add() throws an IllegalStateException.

Using a bounded queue is a good way to design concurrent programs, because a blocking insert into an already full queue has to wait until consumers catch up and free some space. This gives us throttling without any effort on our part.
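The non-blocking outcomes on a full bounded queue can be sketched as follows. BoundedQueueSketch and its two helper methods are hypothetical names used only for this illustration.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;

class BoundedQueueSketch {

    // Creates a bounded queue and fills it up to its capacity.
    static BlockingQueue<String> filledQueue(int capacity) {
        BlockingQueue<String> queue = new LinkedBlockingDeque<>(capacity);
        for (int i = 0; i < capacity; i++) {
            queue.add("message-" + i);
        }
        return queue;
    }

    // offer() on a full queue returns false instead of waiting.
    static boolean offerToFullQueue(int capacity) {
        return filledQueue(capacity).offer("one-too-many");
    }

    // add() on a full queue throws IllegalStateException.
    static boolean addToFullQueueThrows(int capacity) {
        try {
            filledQueue(capacity).add("one-too-many");
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(offerToFullQueue(10));     // prints false
        System.out.println(addToFullQueueThrows(10)); // prints true
    }
}
```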

3. BlockingQueue API

There are two types of methods in the BlockingQueue interface – methods responsible for adding elements to a queue and methods that retrieve those elements. Each method from those two groups behaves differently in case the queue is full/empty.

3.1. Adding Elements

  • add() – returns true if insertion was successful, otherwise throws an IllegalStateException
  • put() – inserts the specified element into the queue, waiting for a free slot if necessary
  • offer() – returns true if insertion was successful, otherwise false
  • offer(E e, long timeout, TimeUnit unit) – tries to insert the element into the queue, waiting up to the specified timeout for a free slot; returns false if the timeout elapses first
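The two waiting variants from the list above can be sketched like this. InsertMethodsSketch and its helpers are hypothetical names, and the 50 ms delay is an arbitrary value picked for the demonstration.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class InsertMethodsSketch {

    // Builds a queue with capacity 1 that is already full.
    static BlockingQueue<Integer> fullQueue() {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(1);
        queue.add(1);
        return queue;
    }

    // Timed offer(): waits up to the timeout, then gives up with false.
    static boolean timedOfferOnFullQueue(long timeoutMillis) {
        try {
            return fullQueue().offer(2, timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    // put(): blocks until a consumer thread frees the single slot.
    static boolean putSucceedsOnceSlotIsFreed() {
        BlockingQueue<Integer> queue = fullQueue();
        Thread consumer = new Thread(() -> {
            try {
                Thread.sleep(50); // keep the queue full for a moment
                queue.take();     // frees the slot
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        try {
            queue.put(2);         // waits here until take() has run
            consumer.join();
            return queue.size() == 1 && queue.peek() == 2;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(timedOfferOnFullQueue(100));   // prints false
        System.out.println(putSucceedsOnceSlotIsFreed()); // prints true
    }
}
```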

3.2. Retrieving Elements

  • take() – retrieves and removes the head of the queue; if the queue is empty, it blocks until an element becomes available
  • poll(long timeout, TimeUnit unit) – retrieves and removes the head of the queue, waiting up to the specified wait time if necessary for an element to become available. Returns null after a timeout

These methods are the most important building blocks of the BlockingQueue interface when building producer-consumer programs.
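A short sketch of both retrieval methods follows. RetrievalMethodsSketch and its helpers are hypothetical names, and the 50 ms delay is arbitrary.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class RetrievalMethodsSketch {

    // Timed poll() on an empty queue: returns null once the timeout elapses.
    static Integer timedPollOnEmptyQueue(long timeoutMillis) {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
        try {
            return queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    // take() on an empty queue: blocks until a producer thread adds an element.
    static Integer takeWaitsForProducer(int value) {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
        Thread producer = new Thread(() -> {
            try {
                Thread.sleep(50); // the consumer is already waiting in take()
                queue.put(value);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        try {
            Integer taken = queue.take();
            producer.join();
            return taken;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public static void main(String[] args) {
        System.out.println(timedPollOnEmptyQueue(100)); // prints null
        System.out.println(takeWaitsForProducer(42));   // prints 42
    }
}
```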

4. Multithreaded Producer-Consumer Example

Let’s create a program that consists of two parts – a Producer and a Consumer.

Each producer will generate random numbers from 0 to 99 and put them in a BlockingQueue. We’ll have 4 producer threads and use the put() method to block until there’s space available in the queue.

The important thing to remember is that we need to stop our consumer threads from waiting for an element to appear in a queue indefinitely.

A good technique for signaling from a producer to a consumer that there are no more messages to process is to send a special message called a poison pill. We need to send as many poison pills as we have consumers. When a consumer takes a poison pill from the queue, it finishes execution gracefully.

Let’s look at a producer class:

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadLocalRandom;

public class NumbersProducer implements Runnable {
    private final BlockingQueue<Integer> numbersQueue;
    private final int poisonPill;
    private final int poisonPillPerProducer;

    public NumbersProducer(BlockingQueue<Integer> numbersQueue, int poisonPill, int poisonPillPerProducer) {
        this.numbersQueue = numbersQueue;
        this.poisonPill = poisonPill;
        this.poisonPillPerProducer = poisonPillPerProducer;
    }

    @Override
    public void run() {
        try {
            generateNumbers();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the interruption is not lost
            Thread.currentThread().interrupt();
        }
    }

    private void generateNumbers() throws InterruptedException {
        // put() blocks while the queue is full
        for (int i = 0; i < 100; i++) {
            numbersQueue.put(ThreadLocalRandom.current().nextInt(100));
        }
        // Tell the consumers that this producer has finished
        for (int j = 0; j < poisonPillPerProducer; j++) {
            numbersQueue.put(poisonPill);
        }
    }
}

Our producer constructor takes as an argument the BlockingQueue that is used to coordinate processing between the producer and the consumer. The generateNumbers() method puts 100 elements in the queue. The constructor also takes the poison pill value, so the producer knows which message to put into the queue when it has finished. That message is put into the queue poisonPillPerProducer times.

Each consumer will take an element from the BlockingQueue using the take() method, so it will block until there is an element in the queue. After taking an Integer from the queue, it checks whether the message is a poison pill; if it is, the thread finishes execution. Otherwise, it prints the number to standard output along with the current thread’s name.

This will give us insight into the inner workings of our consumers:

import java.util.concurrent.BlockingQueue;

public class NumbersConsumer implements Runnable {
    private final BlockingQueue<Integer> queue;
    private final int poisonPill;

    public NumbersConsumer(BlockingQueue<Integer> queue, int poisonPill) {
        this.queue = queue;
        this.poisonPill = poisonPill;
    }

    @Override
    public void run() {
        try {
            while (true) {
                // take() blocks while the queue is empty
                Integer number = queue.take();
                if (number.equals(poisonPill)) {
                    // No more work: finish this consumer
                    return;
                }
                System.out.println(Thread.currentThread().getName() + " result: " + number);
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the interruption is not lost
            Thread.currentThread().interrupt();
        }
    }
}

The important thing to notice is how the queue is used. As in the producer, the queue is passed in through the constructor. We can do this because a BlockingQueue can be shared between threads without any explicit synchronization.

Now that we have our producer and consumer, we can start our program. We need to define the queue’s capacity, and we set it to 10 elements.

We want to have 4 producer threads, and the number of consumer threads will be equal to the number of available processors:

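int BOUND = 10;
int N_PRODUCERS = 4;
int N_CONSUMERS = Runtime.getRuntime().availableProcessors();
int poisonPill = Integer.MAX_VALUE;
int poisonPillPerProducer = N_CONSUMERS / N_PRODUCERS;
// Integer division can leave a remainder; the first producer sends the extra
// pills so that the total number of pills equals N_CONSUMERS.
// This assumes N_CONSUMERS >= N_PRODUCERS, so every producer sends at least one pill.
int remainder = N_CONSUMERS % N_PRODUCERS;

BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(BOUND);

for (int i = 0; i < N_PRODUCERS; i++) {
    int pills = (i == 0) ? poisonPillPerProducer + remainder : poisonPillPerProducer;
    new Thread(new NumbersProducer(queue, poisonPill, pills)).start();
}

for (int j = 0; j < N_CONSUMERS; j++) {
    new Thread(new NumbersConsumer(queue, poisonPill)).start();
}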

The BlockingQueue is created using a constructor that takes a capacity. We’re creating 4 producers and N consumers. We specify our poison pill message to be Integer.MAX_VALUE because such a value will never be sent by our producers under normal working conditions. The most important thing to notice here is that the BlockingQueue is used to coordinate work between producers and consumers.

When we run the program, 4 producer threads will put random Integers in the BlockingQueue and consumers will take those elements from the queue. Each consumer thread will print its name to standard output together with the number it took.
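One way to check that such a run terminates and that no number is lost is to count what the consumers process. The following is a self-contained sketch rather than the article's own classes: PoisonPillRunSketch and runAndCount are hypothetical names, the producers and consumers are inlined as lambdas, and the sketch assumes at least as many consumers as producers so that every producer sends at least one pill.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;

class PoisonPillRunSketch {
    static final int POISON_PILL = Integer.MAX_VALUE;
    static final int NUMBERS_PER_PRODUCER = 100;

    // Runs all threads to completion and returns how many regular
    // (non-pill) numbers were consumed. Assumes consumers >= producers.
    static int runAndCount(int producers, int consumers, int bound) {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(bound);
        AtomicInteger consumed = new AtomicInteger();
        List<Thread> threads = new ArrayList<>();

        int pillsPerProducer = consumers / producers;
        int remainder = consumers % producers;

        for (int i = 0; i < producers; i++) {
            // The first producer also sends the leftover pills
            int pills = (i == 0) ? pillsPerProducer + remainder : pillsPerProducer;
            threads.add(new Thread(() -> {
                try {
                    for (int n = 0; n < NUMBERS_PER_PRODUCER; n++) {
                        queue.put(ThreadLocalRandom.current().nextInt(100));
                    }
                    for (int p = 0; p < pills; p++) {
                        queue.put(POISON_PILL);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }));
        }

        for (int j = 0; j < consumers; j++) {
            threads.add(new Thread(() -> {
                try {
                    // Stop at the first poison pill, count everything else
                    while (queue.take() != POISON_PILL) {
                        consumed.incrementAndGet();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }));
        }

        threads.forEach(Thread::start);
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return consumed.get();
    }

    public static void main(String[] args) {
        int producers = 4;
        int consumers = Math.max(producers, Runtime.getRuntime().availableProcessors());
        System.out.println(runAndCount(producers, consumers, 10)); // prints 400
    }
}
```

Because the total number of pills equals the number of consumers and every producer ends with a pill, the last pill is the last element in the queue, so all 400 numbers are consumed before the final consumer stops.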

5. Conclusion

This article shows a practical use of BlockingQueue and explains the methods that are used to add and retrieve elements from it. Also, we’ve shown how to build a multithreaded producer-consumer program using BlockingQueue to coordinate work between producers and consumers.

The implementation of all these examples and code snippets can be found in the GitHub project – this is a Maven-based project, so it should be easy to import and run as is.