Course – LS – All

Get started with Spring and Spring Boot, through the Learn Spring course:

>> CHECK OUT THE COURSE

1. Overview

In this tutorial, we’ll learn how to implement the Producer-Consumer problem in Java. This problem is also known as a bounded-buffer problem.

For more details on the problem, we can refer to the Producer-Consumer Problem wiki page. For Java threading/concurrency basics, make sure to visit our Java Concurrency article.

2. Producer-Consumer Problem

Producer and Consumer are two separate processes. Both processes share a common buffer or queue. The producer continuously produces certain data and pushes it onto the buffer, whereas the consumer consumes those data from the buffer.

Let’s review a diagram showing this simple scenario:

Producer-Consumer

Inherently, this problem has certain complexities to deal with:

  • Both producer and consumer may try to update the queue at the same time. This could lead to data loss or inconsistencies.
  • Producers might be slower than consumers. In such cases, the consumer would process elements fast and wait.
  • In some cases, the consumer can be slower than the producer. This situation leads to a queue overflow issue.
  • In real scenarios, we may have multiple producers, multiple consumers, or both. This may cause the same message to be processed by different consumers.

The diagram below depicts a case with multiple producers and multiple consumers:

Multi-Producers Multi-Consumers

We need to handle resource sharing and synchronization to solve a few complexities:

  • Synchronization on queue while adding and removing data
  • When the queue is empty, the consumer has to wait until the producer adds new data to the queue
  • When the queue is full, the producer has to wait until the consumer consumes data and the queue has some empty buffer

3. Java Example Using Threads

We have defined a separate class for each entity of the problem.

3.1. Message Class

The Message class holds the produced data:

public class Message {
    private int id;
    private double data;

    // constructors and getter/setters
}

The data could be of any type. It may be a JSON String, a complex object, or just a number. Also, it’s not mandatory to wrap data into a Message class.

3.2. DataQueue Class

The shared queue and related objects are wrapped into the DataQueue class:

public class DataQueue {
    private final Queue<Message> queue = new LinkedList<>();
    private final int maxSize;
    private final Object IS_NOT_FULL = new Object();
    private final Object IS_NOT_EMPTY = new Object();

    DataQueue(int maxSize) {
        this.maxSize = maxSize;
    }

    // other methods
}

To make the bounded buffer, a queue and its maxSize are taken.

In Java, the synchronized block uses an object to achieve thread synchronization. Each object has an intrinsic lock. Only the thread that acquires the lock first is allowed to execute the synchronized block.

Here, we created two references, IS_NOT_FULL and IS_NOT_EMPTY, to use for synchronization. As there is no other purpose for these handles, we initialized them using the Object class.

When the queue is full, the producer waits on the IS_NOT_FULL object. And, as soon as we remove a message, we notify that the queue is no longer full.

The producer process calls the waitIsNotFull method:

public void waitIsNotFull() throws InterruptedException {
    synchronized (IS_NOT_FULL) {
        IS_NOT_FULL.wait();
    }
}

When the consumer polls a message, the dataQueue notifies producers through the notifyIsNotFull method:

private void notifyIsNotFull() {
    synchronized (IS_NOT_FULL) {
        IS_NOT_FULL.notify();
    }
}

If the queue is empty, the consumer waits on the IS_NOT_EMPTY object. And, as soon as we add a message, we notify that the queue is no longer empty.

The consumer process waits using the waitIsNotEmpty method:

public void waitIsNotEmpty() throws InterruptedException {
    synchronized (IS_NOT_EMPTY) {
        IS_NOT_EMPTY.wait();
    }
}

When the producer adds a message, the dataQueue notifies consumers through the notifyIsNotEmpty method:

public void notifyIsNotEmpty() {
    synchronized (IS_NOT_EMPTY) {
        IS_NOT_EMPTY.notify();
    }
}

And the producer uses the add() method to add a message to the queue:

public void add(Message message) {
    queue.add(message);
    notifyIsNotEmpty();
}

The consumer calls the remove method to retrieve a message from the queue:

public Message remove() {
    Message mess = queue.poll();
    notifyIsNotFull();
    return mess;
}

3.3. Producer Class

The Producer class implements the Runnable interface to enable thread creation:

public class Producer implements Runnable {
    private final DataQueue dataQueue;
    private boolean running = false; 
    public Producer(DataQueue dataQueue) {
        this.dataQueue = dataQueue;
    }

    @Override
    public void run() {
        running = true;
        produce();
    }

    // Other methods
}

The constructor uses the shared dataQueue parameter. Member variable running helps in stopping the producer process gracefully. It is initialized to true.

Thread start calls the produce() method:

public void produce() {
    while (running) {
        if(dataQueue.isFull()) {
            try {
                dataQueue.waitIsNotFull();
            } catch (InterruptedException e) {
                log.severe("Error while waiting to Produce messages.");
                break;
            }
        }
        if (!running) {
            break;
        }
        dataQueue.add(generateMessage());
    }
    log.info("Producer Stopped");
}

The producer runs steps continuously in a while loop. This loop breaks when the running is false.

In each iteration, it generates a message. Then, it checks to see if the queue is full and waits as needed.

When the producer wakes up from waiting, it checks whether it still needs to continue or break out from the process. It adds a message to the queue and notifies a consumer waiting on an empty queue.

The stop() method terminates the process gracefully:

public void stop() {
    running = false;
    dataQueue.notifyIsNotFull();
}

After changing the running flag to false, all the producers that are waiting in a “queue full” state are notified. This ensures that all producer threads terminate.

3.4. Consumer Class

The Consumer class implements Runnable to enable thread creation:

public class Consumer implements Runnable {
    private final DataQueue dataQueue;
    private boolean running = false;

    public Consumer(DataQueue dataQueue) {
        this.dataQueue = dataQueue;
    }

    @Override
    public void run() {
        consume();
    }

    // Other methods
}

Its constructor has a shared dataQueue as a parameter. The running flag is initialized to true. This flag stops the consumer process when needed.

When the thread starts, it runs the consume method:

public void consume() {
    while (running) {
        if(dataQueue.isEmpty()) {
            try {
                dataQueue.waitIsNotEmpty();
            } catch (InterruptedException e) {
                log.severe("Error while waiting to Consume messages.");
                break;
            }
        }
        if (!running) {
            break;
        }
        Message message = dataQueue.poll();
        useMessage(message);
    }
    log.info("Consumer Stopped");
}

It has a continuously running while loop. And, this process stops gracefully when the running flag is false.

Each iteration checks if the queue is empty. If the queue is empty, the consumer waits for a message to be produced.

When the consumer wakes up from waiting, it checks the running flag. If the flag is false, then it breaks out of the loop. Otherwise, it reads a message from the queue and notifies the producer that it’s waiting in the “full queue” state. Finally, it consumes the message.

To stop the process gracefully, it uses the stop() method:

public void stop() {
    running = false;
    dataQueue.notifyIsNotEmpty();
}

After the running flag is set to false, all the consumers that are waiting in an empty queue state are notified. This ensures that all consumer threads terminate.

3.5. Running Producer and Consumer Threads

Let’s create a dataQueue object with max required capacity:

DataQueue dataQueue = new DataQueue(MAX_QUEUE_CAPACITY);

Now, let’s create a producer object and a thread:

Producer producer = new Producer(dataQueue);
Thread producerThread = new Thread(producer);

Then, we’ll initialize a consumer object and a thread:

Consumer consumer = new Consumer(dataQueue);
Thread consumerThread = new Thread(consumer);

Finally, we start the threads to initiate the process:

producerThread.start();
consumerThread.start();

It runs continuously until we want to stop those threads. Stopping them is simple:

producer.stop();
consumer.stop();

3.6. Running Multiple Producers and Consumers

Running multiple producers and consumers is similar to the single producer and consumer case. We just need to create the required number of threads and start them.

Let’s create multiple producers and threads and start them:

List<Producer> producers = new ArrayList<>();
for(int i = 0; i < producerCount; i++) {
    Producer producer = new Producer(dataQueue);
    Thread producerThread = new Thread(producer);
    producerThread.start();
    producers.add(producer);
}

Next, let’s create the required number of consumer objects and threads:

List<Consumer> consumers = new ArrayList<>();
for(int i = 0; i < consumerCount; i++) {
    Consumer consumer = new Consumer(dataQueue);
    Thread consumerThread = new Thread(consumer);
    consumerThread.start();
    consumers.add(consumer);
}

We can stop the process gracefully by calling the stop() method on producer and consumer objects:

consumers.forEach(Consumer::stop);
producers.forEach(Producer::stop);

4. Simplified Example Using BlockingQueue

Java provides a BlockingQueue interface that is thread-safe. In other words, multiple threads can add and remove messages from this queue without any concurrency issues.

Its put() method blocks the calling thread if the queue is full. Similarly, if the queue is empty, its take() method blocks the calling thread.

4.1. Create Bounded BlockingQueue

We can create a bounded BlockingQueue using a capacity value in the constructor:

BlockingQueue<Double> blockingQueue = new LinkedBlockingDeque<>(5);

4.2. Simplified produce Method

In the produce() method, we can avoid explicit synchronization for our queue:

private void produce() {
    while (true) {
        double value = generateValue();
        try {
            blockingQueue.put(value);
        } catch (InterruptedException e) {
            break;
        }
    }
}

This method continuously produces objects and just adds them to the queue.

4.3. Simplified consume Method

The consume() method uses no synchronization explicitly:

private void consume() {
    while (true) {
        Double value;
        try {
            value = blockingQueue.take();
        } catch (InterruptedException e) {
            break;
        }
        // Consume value
    }
}

It just takes a value from the queue and consumes it, continuously.

4.4. Run Producer and Consumer Threads

We can create as many producers and consumer threads as required:

for (int i = 0; i < 2; i++) {
    Thread producerThread = new Thread(this::produce);
    producerThread.start();
}

for (int i = 0; i < 3; i++) {
    Thread consumerThread = new Thread(this::consume);
    consumerThread.start();
}

5. Conclusion

In this article, we’ve learned how to implement the Producer-Consumer problem using Java Threads. Also, we learned how to run scenarios with multiple producers and consumers.

A complete code sample can be found on GitHub.

Course – LS – All

Get started with Spring and Spring Boot, through the Learn Spring course:

>> CHECK OUT THE COURSE
res – REST with Spring (eBook) (everywhere)
Comments are open for 30 days after publishing a post. For any issues past this date, use the Contact form on the site.