Course – LS – All

Get started with Spring 5 and Spring Boot 2, through the Learn Spring course:

>> CHECK OUT THE COURSE

1. Overview

In this tutorial, we'll learn how to implement the Producer-Consumer problem in Java. This problem is also known as the bounded-buffer problem.

For more details on the problem, we can refer to the Producer-Consumer Problem wiki page. For Java threading/concurrency basics, make sure to visit our Java Concurrency article.

2. Producer-Consumer Problem

Producer and Consumer are two separate processes. Both processes share a common buffer or queue. The producer continuously produces certain data and pushes it onto the buffer, whereas the consumer consumes those data from the buffer.

Let's review a diagram showing this simple scenario:

Producer-Consumer

Inherently, this problem has certain complexities to deal with:

  • Both producer and consumer may try to update the queue at the same time. This could lead to data loss or inconsistencies.
  • Producers might be slower than consumers. In such cases, the consumer would process elements fast and wait.
  • In some cases, the consumer can be slower than a producer. This situation leads to a queue overflow issue.
  • In real scenarios, we may have multiple producers, multiple consumers, or both. This may cause the same message to be processed by different consumers.

The diagram below depicts a case with multiple producers and multiple consumers:

Multi-Producers Multi-Consumers

We need to handle resource sharing and synchronization to solve a few complexities:

  • Synchronization on queue while adding and removing data
  • On queue empty, the consumer has to wait until the producer adds new data to the queue
  • When the queue is full, the producer has to wait until the consumer consumes data and the queue has some empty buffer

3. Java Example Using Threads

We have defined a separate class for each entity of the problem.

3.1. Message Class

The Message class holds the produced data:

public class Message {
    private int id;
    private double data;

    // constructors and getter/setters
}

The data could be of any type. It may be a JSON String, a complex object, or just a number. Also, it's not mandatory to wrap data into a Message class.

3.2. DataQueue Class

The shared queue and related objects are wrapped into the DataQueue class:

public class DataQueue {
    private final Queue<Message> queue = new LinkedList<>();
    private final int maxSize;
    private final Object FULL_QUEUE = new Object();
    private final Object EMPTY_QUEUE = new Object();
    public boolean runFlag = true;

    DataQueue(int maxSize) {
        this.maxSize = maxSize;
    }

    // other methods
}

To make the bounded buffer, a queue and its maxSize are taken.

In Java, the synchronized block uses an object to achieve thread synchronization. Each object has an intrinsic lock. Only the thread that acquires the lock first is allowed to execute the synchronized block.

Here, we created two references, FULL_QUEUE and EMPTY_QUEUE, to use for synchronization. As there is no other purpose for these handles, we initialized them using the Object class.

When the queue is full, the producer waits on the FULL_QUEUE object. And, the consumer notifies as soon as it consumes a message.

The producer process calls the waitOnFull method:

public void waitOnFull() throws InterruptedException {
    synchronized (FULL_QUEUE) {
        FULL_QUEUE.wait();
    }
}

And the consumer process notifies the producer through the notifyAllForFull method:

public void notifyAllForFull() {
    synchronized (FULL_QUEUE) {
        FULL_QUEUE.notifyAll();
    }
}

If the queue is empty, the consumer waits on the EMPTY_QUEUE object. And, the producer notifies it as soon as a message is added to the queue.

The consumer process waits using the waitOnEmpty method:

public void waitOnEmpty() throws InterruptedException {
    synchronized (EMPTY_QUEUE) {
        EMPTY_QUEUE.wait();
    }
}

The producer notifies the consumer using the notifyAllForEmpty method:

public void notifyAllForEmpty() {
    synchronized (EMPTY_QUEUE) {
        EMPTY_QUEUE.notifyAll();
    }
}

And the producer uses the add() method to add a message to the queue:

public void add(Message message) {
    synchronized (queue) {
        queue.add(message);
    }
}

The consumer calls the remove method to retrieve a message from the queue:

public Message remove() {
    synchronized (queue) {
        return queue.poll();
    }
}

This flag will manage the graceful shutdown of the producer and consumer threads:

public boolean runFlag = true;

3.3. Producer Class

The Producer class implements the Runnable interface to enable thread creation:

public class Producer implements Runnable {
    private final DataQueue dataQueue;

    public Producer(DataQueue dataQueue) {
        this.dataQueue = dataQueue;
    }

    @Override
    public void run() {
        produce();
    }

    // Other methods
}

The constructor uses the shared dataQueue parameter. Member variable runFlag helps in stopping the producer process gracefully. It is initialized to true.

Thread start calls the produce() method:

public void produce() {
    while (dataQueue.runFlag) {
        synchronized (dataQueue) {
            while (dataQueue.isFull() && dataQueue.runFlag) {
                try {
                    dataQueue.waitOnFull();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                    break;
                }
            }
            if (!dataQueue.runFlag) {
                break;
            }
            Message message = generateMessage();
            dataQueue.add(message);
            dataQueue.notifyAllForEmpty();
        }
    }
    log.info("Producer Stopped");
}

The producer runs steps continuously in a while loop. This loop breaks when runFlag is false.

In each iteration, it generates a message. Then, it checks to see if the queue is full and waits as needed. Instead of an if block, a while loop is used to check whether the queue is full. This is to avoid a spurious wake-up from the wait state.

When the producer wakes up from wait, it checks whether it still needs to continue or break out from the process. It adds a message to the queue and notifies a consumer waiting on an empty queue.

The stop() method terminates the process gracefully:

public void stop() {
    dataQueue.runFlag = false;
    dataQueue.notifyAllForFull();
}

After changing runFlag to false, all the producers that are waiting in a “queue full” state are notified. This ensures that all producer threads terminate.

3.4. Consumer Class

The Consumer class implements Runnable to enable thread creation:

public class Consumer implements Runnable {
    private final DataQueue dataQueue;

    public Consumer(DataQueue dataQueue) {
        this.dataQueue = dataQueue;
    }

    @Override
    public void run() {
        consume();
    }

    // Other methods
}

Its constructor has a shared dataQueue as a parameter. The runFlag is initialized to true. This flag stops the consumer process when needed.

When the thread starts, it runs the consume method:

public void consume() {
    while (dataQueue.runFlag) {
        synchronized (dataQueue) {
            while (dataQueue.isEmpty() && dataQueue.runFlag) {
                try {
                    dataQueue.waitOnEmpty();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                    break;
                }
            }
            if (!dataQueue.runFlag) {
                break;
            }
            Message message = dataQueue.remove();
            dataQueue.notifyAllForFull();
            useMessage(message);
        }
    }
    log.info("Consumer Stopped");
}

It has a continuously running while loop. And, this process stops gracefully when the runFlag is false.

Each iteration checks if the queue is empty. If the queue is empty, the consumer waits for a message to be produced. This wait is also used by the while loop to avoid spurious wakeups.

When the consumer wakes up from wait, it checks the runFlag. If the flag is false, then it breaks out of the loop. Otherwise, it reads a message from the queue and notifies the producer that it's waiting in the “full queue” state. Finally, it consumes the message.

To stop the process gracefully, it uses the stop() method:

public void stop() {
    dataQueue.runFlag = false;
    dataQueue.notifyAllForEmpty();
}

After runFlag is set to false, all the consumers that are waiting in an empty queue state are notified. This ensures that all consumer threads terminate.

3.5. Running Producer and Consumer Threads

Let's create a dataQueue object with max required capacity:

DataQueue dataQueue = new DataQueue(MAX_QUEUE_CAPACITY);

Now, let's create producer object and a thread:

Producer producer = new Producer(dataQueue);
Thread producerThread = new Thread(producer);

Then, we'll initialize a consumer object and a thread:

Consumer consumer = new Consumer(dataQueue);
Thread consumerThread = new Thread(consumer);

Finally, we start the threads to initiate the process:

producerThread.start();
consumerThread.start();

It runs continuously until we want to stop those threads. Stopping them is simple:

producer.stop();
consumer.stop();

3.6. Running Multiple Producers and Consumers

Running multiple producers and consumers is similar to the single producer and consumer case. We just need to create the required number of threads and start them.

Let's create multiple producers and threads and start them:

Producer producer = new Producer(dataQueue);
for(int i = 0; i < producerCount; i++) {
    Thread producerThread = new Thread(producer);
    producerThread.start();
}

Next, let's create the required number of consumer objects and threads:

Consumer consumer = new Consumer(dataQueue);
for(int i = 0; i < consumerCount; i++) {
    Thread consumerThread = new Thread(consumer);
    consumerThread.start();
}

We can stop the process gracefully by calling the stop() method on producer and consumer objects:

producer.stop();
consumer.stop();

4. Simplified Example Using BlockingQueue

Java provides a BlockingQueue interface that is thread-safe. In other words, multiple threads can add and remove from this queue without any concurrency issues.

Its put() method blocks the calling thread if the queue is full. Similarly, if the queue is empty, its take() method blocks the calling thread.

4.1. Create Bounded BlockingQueue

We can create a bounded BlockingQueue using a capacity value in the constructor:

BlockingQueue<Double> blockingQueue = new LinkedBlockingDeque<>(5);

4.2. Simplified produce Method

In the produce() method, we can avoid explicit synchronization for our queue:

private void produce() {
    while (true) {
        double value = generateValue();
        try {
            blockingQueue.put(value);
        } catch (InterruptedException e) {
            break;
        }
    }
}

This method continuously produces objects and just adds them to the queue.

4.3. Simplified consume Method

The consume() method uses no synchronization explicitly:

private void consume() {
    while (true) {
        Double value;
        try {
            value = blockingQueue.take();
        } catch (InterruptedException e) {
            break;
        }
        // Consume value
    }
}

It just takes a value from the queue and consumes it, continuously.

4.4. Run Producer and Consumer Threads

We can create as many producers and consumer threads as required:

for (int i = 0; i < 2; i++) {
    Thread producerThread = new Thread(this::produce);
    producerThread.start();
}

for (int i = 0; i < 3; i++) {
    Thread consumerThread = new Thread(this::consume);
    consumerThread.start();
}

5. Conclusion

In this article, we've learned how to implement the Producer-Consumer problem using Java Threads. Also, we learned how to run scenarios with multiple producers and consumers.

A complete code sample can be found over on GitHub.

Course – LS – All

Get started with Spring 5 and Spring Boot 2, through the Learn Spring course:

>> CHECK OUT THE COURSE
res – REST with Spring (eBook) (everywhere)
Comments are closed on this article!