A quick guide to producer-consumer using wait, notify, and notifyAll in Java

Let me start by saying that you should never end up using wait, notify, or notifyAll directly (unless you have a good reason).
If you need to coordinate threads, use a concurrency library instead, such as the utilities in java.util.concurrent.
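For comparison, here is a minimal sketch of the library route, assuming java.util.concurrent's ArrayBlockingQueue (a real JDK class). put blocks while the queue is full and take blocks while it’s empty, so all the waiting and waking this article does by hand happens inside the queue. The class and method names (BlockingQueueDemo, produceAndConsume) are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class BlockingQueueDemo {

    // Produces `count` strings on one thread and consumes them on another.
    // put() blocks when the queue is full, take() blocks when it is empty,
    // so no explicit wait/notify is needed.
    public static List<String> produceAndConsume(int count) throws InterruptedException {
        BlockingQueue<String> broker = new ArrayBlockingQueue<>(2); // capacity 2, like MAX_SIZE
        List<String> consumed = new ArrayList<>();                  // only touched by the consumer until join()

        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < count; i++) {
                    broker.put("item-" + i);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < count; i++) {
                    consumed.add(broker.take());
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        consumer.start();
        producer.join();
        consumer.join(); // join() makes the consumer's writes to `consumed` visible here
        return consumed;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(produceAndConsume(5)); // [item-0, item-1, item-2, item-3, item-4]
    }
}
```

With a single producer and a single consumer, items come out in the order they went in.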

Building a producer/consumer with wait/notify/notifyAll is one of those interview questions you may get when talking about multithreading in Java. Given how rarely you’d use wait, notify, or notifyAll, I couldn’t blame you for getting it wrong. It’s not necessarily a hard question, and most candidates get it right eventually, but it has some interesting aspects, some of which go into a lot of depth.

In case you’re unfamiliar with the producer/consumer problem, the Wikipedia article on it is an excellent introduction. In this article it simply means that one class produces strings and another class consumes them.

Let’s look at some code to get us started.

import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

public class Main {
    public static void main(String[] args) {
        Executor executor = Executors.newFixedThreadPool(2);

        Queue<String> broker = new LinkedList<>();

        Producer producer = new Producer(broker);
        Consumer consumer = new Consumer(broker);

        executor.execute(producer);
        executor.execute(consumer);
    }
}

Producer

import java.util.Queue;
import java.util.UUID;

public class Producer implements Runnable {
    private static final int MAX_SIZE = 2;
    private final Queue<String> broker;

    public Producer(Queue<String> broker) { this.broker = broker; }

    @Override
    public void run() {
        while(true){
            System.out.println("Producer thread waiting for lock...");
            synchronized (broker) {
                System.out.println("Producer acquired lock...");
                while(broker.size() == MAX_SIZE) {
                    try {
                        System.out.println("Producer waiting");
                        broker.wait();
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                }

                String uuid = UUID.randomUUID().toString();
                broker.add(uuid);
                System.out.println("Producer produced string" + uuid);
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
                broker.notifyAll();
            }
        }
    }
}

Consumer

import java.util.Queue;

public class Consumer implements Runnable {
    private final Queue<String> broker;

    public Consumer(Queue<String> broker) { this.broker = broker; }

    @Override
    public void run() {
        while(true){
            System.out.println("Consumer thread waiting for lock...");
            synchronized (broker) {
                System.out.println("Consumer acquired lock...");
                while(broker.isEmpty()) {
                    try {
                        System.out.println("Consumer waiting");
                        broker.wait();
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                }

                String took = broker.remove();
                System.out.println("Consumer got string " + took);
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
                broker.notifyAll();
            }
        }
    }
}

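As a small side note, you don’t need the Thread.sleep(500) at all; I only added it so the console doesn’t get spammed with messages. Keep in mind that, unlike wait, sleep does not release the monitor, so each thread holds the lock on broker for the whole half second.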

If you run this small program, you’ll get output like this:

Consumer thread waiting for lock...
Consumer acquired lock...
Producer thread waiting for lock...
Consumer waiting
Producer acquired lock...
Producer produced stringa2975d33-a6dd-4692-9988-3868df02daa1
Producer thread waiting for lock...
Consumer got string a2975d33-a6dd-4692-9988-3868df02daa1
Consumer thread waiting for lock...
Producer acquired lock...
Producer produced stringac36bede-779a-42bb-b98d-35e8b99a4035

And it would go on forever unless stopped.

Let’s run it again for the sake of variety.

Producer thread waiting for lock...
Producer acquired lock...
Consumer thread waiting for lock...
Producer produced string574b4875-0dc8-4f51-b2b4-e80ffbd31bec
Producer thread waiting for lock...
Consumer acquired lock...
Consumer got string 574b4875-0dc8-4f51-b2b4-e80ffbd31bec
Consumer thread waiting for lock...
Producer acquired lock...
Producer produced string6a1e3ac7-f606-4520-9afe-9805091a535d
Producer thread waiting for lock...
Consumer acquired lock...

The more perceptive of you may have noticed that the output is different. The code is the same; I didn’t change anything.

Which leads me to the next point: the Java thread scheduler.

You can safely assume I’m talking about the HotSpot JVM in this article.

Let’s talk about the different types of threads first.

Platform Threads: each platform thread is a thin wrapper around its own native (OS) thread, and that association is persistent and stable for the thread’s whole lifetime. Platform threads are more resource-intensive and are better suited for long-running work. This is what I’m using here, and what Java uses out of the box unless you ask for something else.

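Virtual Threads (similar in spirit to the older green threads, and a final feature since Java 21) are managed by the JVM. They still need platform threads to actually run: the JVM mounts them on a small pool of so-called carrier threads, so the ratio of virtual to platform threads is rarely 1:1. A single JVM can, in theory, support millions of virtual threads. They’re best suited for tasks that spend most of their time blocked (waiting on I/O, for example) rather than CPU-intensive work. They use far fewer resources than platform threads and are cheaper to create.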

For this example, the type of thread doesn’t change the picture: with either kind, you can’t predict which thread gets to run first.

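When it comes to thread scheduling, the two factors usually cited are priority and arrival time. Keep in mind that HotSpot hands the actual scheduling of platform threads to the operating system, so neither of them is a guarantee.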

Priority is an integer value ranging from 1 to 10; the higher the number, the higher the priority. The Thread class also defines three constants, MIN_PRIORITY, NORM_PRIORITY, and MAX_PRIORITY, with values of 1, 5, and 10 respectively. A new thread inherits the priority of the thread that creates it, which in practice usually means 5. In our case we didn’t specify a value, and the default thread factory behind Executors.newFixedThreadPool sets NORM_PRIORITY anyway, so both threads run at NORM_PRIORITY. Even then, priority is only a hint to the operating system.
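A quick way to check these values yourself is sketched below; the class and method names are made up for illustration. It prints the three constants, shows that a thread created from a priority-7 thread inherits priority 7, and shows that a worker from Executors.newFixedThreadPool runs at NORM_PRIORITY. It only reads the Java-level priority; how much the operating system honors it is a separate matter.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class PriorityDemo {

    // Returns the priority of a thread constructed by a parent running at parentPriority.
    public static int inheritedPriority(int parentPriority) throws InterruptedException {
        int[] childPriority = new int[1];
        Thread parent = new Thread(() -> {
            Thread child = new Thread(() -> { }); // never started; priority is set at construction
            childPriority[0] = child.getPriority();
        });
        parent.setPriority(parentPriority);
        parent.start();
        parent.join(); // join() makes childPriority[0] visible here
        return childPriority[0];
    }

    // Returns the priority of a worker thread created by Executors.newFixedThreadPool.
    public static int poolThreadPriority() throws Exception {
        ExecutorService executor = Executors.newFixedThreadPool(1);
        try {
            Future<Integer> priority = executor.submit(() -> Thread.currentThread().getPriority());
            return priority.get();
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(Thread.MIN_PRIORITY + " " + Thread.NORM_PRIORITY + " " + Thread.MAX_PRIORITY); // 1 5 10
        System.out.println("child of a priority-7 thread: " + inheritedPriority(7)); // 7
        System.out.println("pool worker: " + poolThreadPriority());                  // 5
    }
}
```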

Time of arrival is pretty much what you’d expect it to be, but exactly when a thread becomes runnable is at the whims of the thread scheduler.

If both threads have the same priority, it boils down to which thread enters the Runnable state first, and that can be either of the two. You may assume that, because I wrote the code in this order,

executor.execute(producer);
executor.execute(consumer);

the producer thread would run first. The call to execute with the producer does happen first, yes, but that doesn’t mean the Producer thread will run immediately. The Executor documentation only promises that the given task will run “at some time in the future”. That is intentionally vague, since there is no guarantee about when a thread becomes runnable.

Creating the threads yourself with the Thread class won’t get you better results either. The thread scheduler cares very little about how a thread was created.

You can try this yourself by running the code multiple times, and you’ll notice that it’s anyone’s guess which thread will start first.

If you want to guarantee that one thread does its work before another, use a synchronizer; in this case a CountDownLatch would work nicely. For our code it doesn’t matter which thread starts first: if the consumer starts first, it finds the broker empty and waits for the producer.
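A minimal sketch of the CountDownLatch approach, with hypothetical class and method names. The latch doesn’t control when either thread is scheduled; what it guarantees is that the second thread’s work only happens after the first thread has called countDown. The second thread is deliberately started first to show that start order doesn’t matter.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;

public class LatchOrderingDemo {

    // Records which thread did its work first.
    public static List<String> runInOrder() throws InterruptedException {
        List<String> events = new CopyOnWriteArrayList<>(); // thread-safe list shared by both threads
        CountDownLatch firstDone = new CountDownLatch(1);

        Thread second = new Thread(() -> {
            try {
                firstDone.await(); // blocks until the count reaches zero
                events.add("second");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        Thread first = new Thread(() -> {
            events.add("first");
            firstDone.countDown(); // releases every thread waiting on the latch
        });

        second.start(); // started first on purpose
        first.start();
        second.join();
        first.join();
        return events;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runInOrder()); // [first, second]
    }
}
```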


Looking at the code again, you’ll probably notice that we’re using a Queue as the broker and also as the monitor object. Remember, locking is done on the object you’re sharing, not on the threads themselves.

Doing something like

synchronized(this)

would only lock the current instance (the Producer or Consumer object itself), not the broker. Each thread would hold a different lock, leaving the broker free to suffer from all the horrors of multithreading. In fact, you’d very soon be greeted with an IllegalMonitorStateException, since you’d be calling wait or notifyAll on broker, a monitor the thread doesn’t own.
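The exception is easy to reproduce in isolation. This sketch (names are illustrative) holds one monitor and calls notifyAll on a different object, which is the same mistake as synchronizing on this and then calling broker.notifyAll() or broker.wait().

```java
import java.util.LinkedList;
import java.util.Queue;

public class WrongMonitorDemo {

    // Returns true if calling notifyAll() on `target` while holding only `held`'s monitor throws.
    public static boolean notifyAllThrows(Object held, Object target) {
        synchronized (held) {
            try {
                target.notifyAll();
                return false;
            } catch (IllegalMonitorStateException e) {
                return true; // the current thread does not own target's monitor
            }
        }
    }

    public static void main(String[] args) {
        Queue<String> broker = new LinkedList<>();
        Object producer = new Object(); // stands in for `this` inside Producer

        System.out.println(notifyAllThrows(producer, broker)); // true: wrong monitor held
        System.out.println(notifyAllThrows(broker, broker));   // false: broker's monitor held
    }
}
```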

In short, we need to lock the object we’re sharing. You can think of it as two (or more) people sharing a single key to a room. Whoever has the key enters the room, locks the door, does whatever they need to, unlocks the door, leaves the room, locks the door again, and hands the key to the front desk (the thread scheduler). The front desk decides who gets the key next.

The monitor doesn’t have to be the broker. Any shared object can act as the monitor (it’s just very convenient to have the broker be the monitor). Let’s modify the code so that we lock on another object. I’ve removed parts of the code for clarity.

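public class Main {
    public static void main(String[] args) throws InterruptedException {
        ...
        Object lock = new Object(); // new monitor here

        Producer producer = new Producer(broker, lock);
        Consumer consumer = new Consumer(broker, lock);
        ...
    }
}

public class Producer implements Runnable {
    private static final int MAX_SIZE = 2;
    private final Queue<String> broker;
    private final Object lock;

    public Producer(Queue<String> broker, Object lock) {
        this.broker = broker;
        this.lock = lock;
    }

    @Override
    public void run() {
        while (true) {
            synchronized (lock) { // synchronized on the new monitor
                while (broker.size() == MAX_SIZE) {
                    try {
                        lock.wait(); // wait on the new monitor
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                }
                String uuid = UUID.randomUUID().toString();
                broker.add(uuid);
                lock.notifyAll(); // don't forget to notify too
            }
        }
    }
}

public class Consumer implements Runnable {
    private final Queue<String> broker;
    private final Object lock;

    public Consumer(Queue<String> broker, Object lock) {
        this.broker = broker;
        this.lock = lock;
    }

    @Override
    public void run() {
        while (true) {
            synchronized (lock) {
                while (broker.isEmpty()) {
                    try {
                        lock.wait(); // wait on the new monitor
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                }

                broker.remove();
                lock.notifyAll(); // use the new monitor
            }
        }
    }
}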

Running the above (with the println calls put back) will output something like

Producer thread waiting for lock...
Producer acquired lock...
Consumer thread waiting for lock...
Producer produced string2e63f16b-af44-46b3-8733-32c6ae886547
Producer thread waiting for lock...
Consumer acquired lock...
...

So pretty much nothing has changed.

You just have to remember to call wait and notifyAll on the new lock, not the broker.

You could also use an explicit Lock from java.util.concurrent.locks, but that defeats the purpose of an exercise about wait and notifyAll.
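For comparison, this is roughly what the broker looks like with an explicit lock: a sketch using ReentrantLock and two Condition objects, where await and signalAll play the roles of wait and notifyAll. Having separate notFull and notEmpty conditions means a producer only wakes consumers and vice versa, which a single monitor can’t express. The class name LockConditionBroker and the sumThrough helper are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class LockConditionBroker {
    private static final int MAX_SIZE = 2;
    private final Queue<String> queue = new ArrayDeque<>();
    private final Lock lock = new ReentrantLock();
    private final Condition notFull = lock.newCondition();
    private final Condition notEmpty = lock.newCondition();

    public void put(String item) throws InterruptedException {
        lock.lock();
        try {
            while (queue.size() == MAX_SIZE) { // still a loop: spurious wakeups apply here too
                notFull.await();               // like wait(): releases the lock while waiting
            }
            queue.add(item);
            notEmpty.signalAll();              // like notifyAll(), but only wakes consumers
        } finally {
            lock.unlock();
        }
    }

    public String take() throws InterruptedException {
        lock.lock();
        try {
            while (queue.isEmpty()) {
                notEmpty.await();
            }
            String item = queue.remove();
            notFull.signalAll();               // only wakes producers
            return item;
        } finally {
            lock.unlock();
        }
    }

    // Hypothetical helper: this thread puts "0".."n-1", a consumer thread sums what it takes.
    public static int sumThrough(int n) throws InterruptedException {
        LockConditionBroker broker = new LockConditionBroker();
        int[] sum = {0};
        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < n; i++) {
                    sum[0] += Integer.parseInt(broker.take());
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        for (int i = 0; i < n; i++) {
            broker.put(Integer.toString(i));
        }
        consumer.join(); // join() makes sum[0] visible to this thread
        return sum[0];
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(sumThrough(100)); // 4950
    }
}
```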


The next thing you’ll notice is this interesting piece of code in the Consumer

while(broker.isEmpty())

and this one for the Producer

while(broker.size() == MAX_SIZE) // MAX_SIZE = 2

Why not just use an if here?

The reason is spurious wake-ups.

This is also mentioned in the documentation for Object.wait.

Ok, so what’s a spurious wakeup?

According to Wikipedia, between the moment a condition is signaled and the moment the woken thread finally runs, another thread may run and change that condition. A related case is the so-called stolen wakeup: after the thread has been woken up but before it gets to run, another thread comes in and takes whatever the first thread was waiting for, so it must go back to waiting. (Strictly speaking, the Object.wait documentation uses “spurious wakeup” for a thread that wakes up without being notified, interrupted, or timing out at all; either way, the condition may not hold when wait returns.)

Spurious wakeups aren’t specific to Java: the POSIX pthread_cond_wait function, used on Linux, can also wake up spuriously. Quoting from the docs:

Spurious wakeups from the pthread_cond_timedwait() or pthread_cond_wait() functions may occur. Since the return from pthread_cond_timedwait() or pthread_cond_wait() does not imply anything about the value of this predicate, the predicate should be re-evaluated upon such return.

Windows isn’t any better, so don’t go thinking you’re safe there. From Concurrent Programming on Windows:

Condition variables are not fair. It’s possible – and even likely – that another thread will acquire the associated lock and make the condition false again before the awakened thread has a chance to reacquire the lock and return to the critical region.

Point is, check in a loop.
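To make the stolen wakeup concrete, here is a deterministic sketch with hypothetical names. A consumer waits on an empty broker; the main thread adds an item, calls notifyAll, and then removes the item itself before releasing the lock, so the woken consumer finds the broker empty again. The waiting flag is set while holding the lock right before wait(), so once the main thread holds the lock and sees it set, it knows the consumer is inside wait().

```java
import java.util.ArrayDeque;
import java.util.Queue;

public class StolenWakeupDemo {

    // Returns what the consumer took after its wakeup was "stolen".
    // useWhile = true re-checks the condition in a loop; false checks it once with if.
    public static String consumeAfterStolenWakeup(boolean useWhile) throws InterruptedException {
        Queue<String> broker = new ArrayDeque<>();
        boolean[] waiting = {false}; // guarded by broker's monitor
        String[] taken = new String[1];

        Thread consumer = new Thread(() -> {
            synchronized (broker) {
                try {
                    if (useWhile) {
                        while (broker.isEmpty()) { // re-check after every wakeup
                            waiting[0] = true;
                            broker.wait();
                        }
                    } else if (broker.isEmpty()) { // check only once
                        waiting[0] = true;
                        broker.wait();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                taken[0] = broker.poll(); // poll() returns null on an empty queue
            }
        });
        consumer.start();

        // Holding the lock and seeing waiting == true means the consumer is inside wait().
        while (true) {
            synchronized (broker) {
                if (waiting[0]) {
                    broker.add("x");
                    broker.notifyAll(); // wake the consumer...
                    broker.remove();    // ...but take the item before it can reacquire the lock
                    break;
                }
            }
            Thread.sleep(1);
        }

        if (useWhile) {
            // The looping consumer will find the broker empty again, so hand it a real item.
            synchronized (broker) {
                broker.add("y");
                broker.notifyAll();
            }
        }

        consumer.join();
        return taken[0];
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("if:    " + consumeAfterStolenWakeup(false)); // null
        System.out.println("while: " + consumeAfterStolenWakeup(true));  // y
    }
}
```

With a single if check, the consumer goes straight to poll() and gets null (with the article’s broker.remove() it would throw NoSuchElementException instead). With the while loop it goes back to waiting and ends up with y.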


The next thing you may notice is that I’m using notifyAll even though only two threads lock on the same monitor. This is just good practice; there’s no compelling reason to use notify here.

The difference between notify and notifyAll is that notify wakes up a single thread waiting on the monitor, chosen arbitrarily by the JVM (don’t count on it being random or fair), while notifyAll wakes up all threads waiting on the monitor, which then compete for the lock.

In this particular example notify works just fine: at most one other thread can be waiting on the monitor, so that thread is necessarily the one that gets woken up.

But when you’re unsure, it’s simply better practice to use notifyAll.
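The risk with notify shows up once several threads wait on the same monitor. In this sketch (names are hypothetical), three waiters block until a permit is available; the main thread makes three permits available at once and then calls either notify or notifyAll a single time. With notify, only one waiter wakes up, and the other two stay parked even though a permit is waiting for each of them.

```java
import java.util.ArrayList;
import java.util.List;

public class NotifyVsNotifyAll {
    private final Object lock = new Object();
    private int permits = 0; // guarded by lock
    private int parked = 0;  // guarded by lock: waiters that reached the wait loop
    private int woken = 0;   // guarded by lock: waiters that got a permit

    // Starts n waiters, makes n permits available, then calls notify() or notifyAll() once.
    // Returns how many waiters got through, allowing graceMillis per thread.
    public int countWoken(int n, boolean useNotifyAll, long graceMillis) throws InterruptedException {
        List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            Thread t = new Thread(() -> {
                synchronized (lock) {
                    parked++;
                    try {
                        while (permits == 0) {
                            lock.wait();
                        }
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return; // interrupted during cleanup: give up
                    }
                    permits--;
                    woken++;
                }
            });
            threads.add(t);
            t.start();
        }

        // Seeing parked == n while holding the lock means every waiter is inside wait().
        while (true) {
            synchronized (lock) {
                if (parked == n) {
                    permits = n;          // the condition is now true for everyone
                    if (useNotifyAll) {
                        lock.notifyAll(); // wakes every waiter
                    } else {
                        lock.notify();    // wakes just one, chosen by the JVM
                    }
                    break;
                }
            }
            Thread.sleep(1);
        }

        for (Thread t : threads) {
            t.join(graceMillis);
        }
        int result;
        synchronized (lock) {
            result = woken;
        }
        for (Thread t : threads) {
            t.interrupt(); // release any waiter that was never notified
            t.join();
        }
        return result;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("notifyAll: " + new NotifyVsNotifyAll().countWoken(3, true, 500));
        System.out.println("notify:    " + new NotifyVsNotifyAll().countWoken(3, false, 500));
    }
}
```

Run it and notifyAll should report 3 while notify reports 1. The grace period and the interrupts only exist so the demo terminates; treat it as an illustration rather than a precise measurement.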


And lastly, let’s go through how the code actually runs, step by step; it makes it a lot easier to understand.

Let’s assume that we run the code and the Producer wins the race to run first.

I’ll paste the code here again, with only the important bits left.

// PRODUCER
while(true){
    synchronized (broker) {
        while(broker.size() == MAX_SIZE) {
            broker.wait();
        }

        broker.add(uuid);
        broker.notifyAll();
    }
}

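I removed all the println calls, try/catch blocks, sleep, and so on: stuff that just gets in the way of clarity. The line numbers below start counting at while(true), not at the // PRODUCER comment.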

On line 1 we’re running in an endless loop so that the Producer keeps producing.

On line 2 we enter the sync block. If another thread has already acquired the lock, any other thread that wants the same lock is in the BLOCKED state.

On line 3 we check, in a loop, whether the queue size has reached MAX_SIZE, which in this case is 2. On the first run it obviously hasn’t, so we skip to line 7.

On line 7 we add the string that would have been produced somehow.

On line 8 we call notifyAll, but currently we have no WAITING threads, so nothing happens.

On line 9 we exit the sync block and release the lock.

Now, at this point we have two threads (Producer and Consumer) competing to acquire the same lock.

Let’s assume the Consumer now acquires the lock.

// CONSUMER
while(true){
    synchronized (broker) {
        while(broker.isEmpty()) {
            broker.wait();
        }

        broker.remove();
        broker.notifyAll();
    }
}

On line 1, the same as the Producer, we’re running in a loop.

On line 2, Consumer wins the race and acquires the lock on the monitor.

On line 3, it checks whether the broker is empty. It isn’t, so it moves on.

On line 7, we take one item from the broker queue.

On line 8, we use notifyAll but there are no waiting threads so nothing happens.

On line 9, we exit the sync block and release the lock.

Once past line 9, the two threads race again to win the lock on broker.

Now, let’s switch things up a bit. Let’s assume the Producer wins the next two races.

So now the broker queue has 2 elements in it. The next time the Producer gets the lock, it finds the broker full, calls wait, and is left in the WAITING state. Remember, calling wait releases the lock on the object wait was called on; any other locks the thread holds are kept.

The Consumer then acquires the lock and checks isEmpty, which is false, so it moves on to line 7 and takes one element out, then calls notifyAll on the monitor on line 8. At this point the Producer moves from WAITING to BLOCKED: the Consumer hasn’t exited the sync block yet, so the Producer is trying to reacquire a lock that hasn’t been released. Once the Consumer exits the sync block, the threads race for the lock again, and either of them can win.

Then this whole process repeats again.
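The WAITING and BLOCKED states used in this walkthrough are actual Thread.State values, and you can observe them with Thread.getState(). This sketch (names are hypothetical) polls getState() until it sees the expected state: one thread parked in lock.wait(), and one trying to enter synchronized (lock) while the main thread holds it. Polling getState() is fine for a demo, but it isn’t a synchronization mechanism.

```java
public class ThreadStateDemo {
    private static final Object lock = new Object();

    // Polls t.getState() until it matches `expected` or the timeout expires; returns the last state seen.
    static Thread.State awaitState(Thread t, Thread.State expected, long timeoutMillis) throws InterruptedException {
        long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
        Thread.State state = t.getState();
        while (state != expected && System.nanoTime() < deadline) {
            Thread.sleep(1);
            state = t.getState();
        }
        return state;
    }

    // A thread parked in lock.wait() reports WAITING.
    public static Thread.State stateOfWaiter() throws InterruptedException {
        boolean[] released = {false}; // guarded by lock
        Thread waiter = new Thread(() -> {
            synchronized (lock) {
                try {
                    while (!released[0]) { // loop, as discussed above
                        lock.wait();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });
        waiter.start();
        Thread.State seen = awaitState(waiter, Thread.State.WAITING, 5_000);
        synchronized (lock) {
            released[0] = true;
            lock.notifyAll();
        }
        waiter.join();
        return seen;
    }

    // A thread trying to enter synchronized (lock) while another thread holds it reports BLOCKED.
    public static Thread.State stateOfContender() throws InterruptedException {
        Thread contender = new Thread(() -> {
            synchronized (lock) {
                // getting in is all we care about
            }
        });
        Thread.State seen;
        synchronized (lock) { // this thread holds the monitor (sleep inside awaitState keeps it)...
            contender.start();
            seen = awaitState(contender, Thread.State.BLOCKED, 5_000); // ...so the contender blocks
        }
        contender.join();
        return seen;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("waiter:    " + stateOfWaiter());
        System.out.println("contender: " + stateOfContender());
    }
}
```

It should print WAITING for the waiter and BLOCKED for the contender.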


