Towards Ada Style Thread Communication in Java

or How to implement Thread Method Invocation

Jens Riboe

September 2002

 

Table of Contents

Introduction

Basic Thread Communication

Problem 1 – Why you need to use synchronized

Analysis

Solution

Bug warning 1

Bug warning 2

Bug warning 3

Problem 2 – Why you need to use wait and notify

Analysis

Solution

About the concept monitor

Problem 3 – Why you should use notifyAll instead of notify

Analysis

Solution

The final mailbox

Problem 4 – Why you should encapsulate a mailbox on the receiving side

Finding prime numbers

Buffered Thread Communication

Problem 5 – Why you should use bounded queues

Analysis

Solution

Problem 6 – Why you sometimes can’t suspend the sender, when the queue is full

A small class library for thread message queues

TMI – Thread Method Invocation

Problem 7 – Why you should consider Thread Method Invocation (TMI)

Solution

Non-Deterministic Message Acceptance

Problem 8 – Why you sometimes must combine TMI with non-determinism

Analysis

Solution

Problem 9 – Why you might need guards when you are using non-determinism

Conclusions

 


 

Introduction

The Java platform has introduced threads as a standard tool for solving day-to-day programming tasks. Before the Java era, thread programming was something for the specialists lurking around with POSIX or Win32 threads in C or C++.

However, thread-based programs may suffer from a completely new category of bugs, essentially different from traditional bugs. Many of these bugs originate from misconceptions about how threads communicate.

In this article, I will show you step by step how to build a robust and reliable library for thread communication. The presentation is problem-driven: I formulate and demonstrate a bug, then analyse it, and finally present a solution.

The article starts with the very basics: how to send some data from one thread to another. It then introduces the concept of rendezvous, taken from the Ada programming language, and renames it Thread Method Invocation (TMI). Finally, it discusses the idea of non-deterministic message acceptance, represented by the SELECT statement in Ada. All discussions lead to the definition of a small class library for thread communication, which provides both basic primitives and more complex ones.

Basic Thread Communication

This section discusses and motivates the minimum amount of code needed to transfer a data message from one thread to another. It ends with how to properly use the keyword synchronized and the methods wait, notify and notifyAll. If you are already very familiar with these, you might want to skim this section and proceed to the next.

When two (or more) threads want to exchange data, they need to agree on a common place where one thread can put data and another can get it. This place is often referred to as a critical region, which, loosely speaking, means that it is critical to the correctness of the program execution. To understand the characteristics of such a region, let’s dive into a model problem that illustrates the problem and how to solve it.

Problem 1 – Why you need to use synchronized

Consider a single bank account object containing an integer balance value. What happens if many actors update the account concurrently? Here is the class declaration for the Account class.

class Account {
    private int     balance = 0;

    public Account() {}

    public void updateBalance(int amount) {
        // balance += amount;
        int b = getBalance();  //READ
        b = b + amount;        //MODIFY
        setBalance(b);         //WRITE
    }

    public  int  getBalance()      {return balance;}
    private void setBalance(int b) {balance = b;}
}

As you can see above, I have decomposed the update statement into three statements to make it clear exactly what is going on. To update the account, we need to read, modify and write data. These three steps are part of every kind of update operation, independent of context.

In addition to the Account class, we need an updater thread. A thread can loosely be viewed as a separate program, which means it has its own program counter (statement pointer) and its own function call stack. Threads consist of a little more than that, but we leave that out for now.

The clearest way to define a thread in Java is to extend the java.lang.Thread class. A thread object in Java behaves like an ordinary object, plus it can execute its run method concurrently with other threads. Let’s skip the details of threads and focus on the significant code in the run method of the thread class Updater. Here it is:

int amount = 100;
for (int i = 0; i < numTransactions; i++) {
    account.updateBalance(amount);
}
for (int i = 0; i < numTransactions; i++) {
    account.updateBalance(-amount);
}
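The sketch below shows one way the Updater thread and a driver could look, for readers who want to run the fragment above. It assumes a plain, unsynchronized account. The names SimpleAccount, Updater and UpdaterDemo are hypothetical and are not taken from the article's downloadable source. With a single updater there is no concurrency, so the balance is guaranteed to end at zero.

```java
// Hypothetical names; an unsynchronized account, as in Problem 1.
class SimpleAccount {
    private int balance = 0;

    public void updateBalance(int amount) {
        int b = balance;   // READ
        b = b + amount;    // MODIFY
        balance = b;       // WRITE
    }

    public int getBalance() { return balance; }
}

// One updater thread: deposits numTransactions times, then withdraws as often.
class Updater extends Thread {
    private final SimpleAccount account;
    private final int numTransactions;

    Updater(SimpleAccount account, int numTransactions) {
        this.account = account;
        this.numTransactions = numTransactions;
    }

    @Override
    public void run() {
        int amount = 100;
        for (int i = 0; i < numTransactions; i++) {
            account.updateBalance(amount);
        }
        for (int i = 0; i < numTransactions; i++) {
            account.updateBalance(-amount);
        }
    }
}

class UpdaterDemo {
    // Starts numUpdaters threads on one shared account and waits until all have finished.
    static int run(int numUpdaters, int numTransactions) {
        SimpleAccount account = new SimpleAccount();
        Updater[] updaters = new Updater[numUpdaters];
        for (int i = 0; i < numUpdaters; i++) {
            updaters[i] = new Updater(account, numTransactions);
            updaters[i].start();          // run() now executes concurrently
        }
        try {
            for (Updater u : updaters) {
                u.join();                 // block until this updater has terminated
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        return account.getBalance();
    }

    public static void main(String[] args) {
        System.out.println("Final balance = " + run(5, 10000));
    }
}
```

Calling start rather than run is what makes the loop execute in a new thread. The join calls make the driver wait for every updater before it reads the balance.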

As you can see above, an updater thread first increments and then decrements the balance of the account, so the net effect after completion is that nothing has changed, i.e. the balance is still zero. You can view the complete source code if you scroll to the end of this article. At the end, you can also find a zip file containing all source code for this article, plus an Ant build script and the needed jar files.

Now, let’s run the program and inspect the final balance. I have simplified the command line and the outputs for clarity. You can find detailed run instructions in the zip file.

>java AccountUpdater
NumUpdaters    : 5
NumTransactions: 10000
----------------
Final balance = 0

>java AccountUpdater -u 10 -t 100000
NumUpdaters    : 10
NumTransactions: 100000
----------------
Final balance = 0

It seems to work perfectly; end of story. Eh… not quite. Let’s run it again, but this time with a substantially higher load.

>java AccountUpdater -u 10 -t 10000000
NumUpdaters    : 10
NumTransactions: 10000000
----------------
Final balance = 1154011500

>java AccountUpdater -u 10 -t 10000000
NumUpdaters    : 10
NumTransactions: 10000000
----------------
Final balance = -1652878200

As you can see above, we clearly have a bug. When the number of transactions is high, the final balance ends up with an unpredictable value. Why?

Analysis

To understand the nature of the problem, we need to understand how threads execute and to recap the read/modify/write operation. A thread executes as a small program, and it proceeds uninterrupted until something forces it to stop. Besides the trivial reason that the program has ended, it also pauses when it waits for a disk I/O operation to complete, for example. However, that doesn’t explain our program, because we don’t have such operations in it.

Most modern operating systems and corresponding Java implementations use so-called pre-emptive scheduling, which means that each thread executes for at most a specified amount of time (a time slice). Then it is forced to take a break (suspend) to allow other threads to execute. Eventually, the thread resumes its execution for another time slice. Either it hits the end of the slice or it performs a suspending operation, like a write operation to disk. In both cases, it is suspended again. The end-of-slice suspension is triggered by a timer interrupt. An interrupt can occur at any time, for example in the middle of a read/modify/write operation. On a multiprocessor machine, two threads can even execute the same read/modify/write sequence truly in parallel, with the same effect.

Let’s trace the execution of the updateBalance method. I have expanded the method body into the caller’s for loop, so you can see what’s going on.

for (int i = 0; i < numTransactions; i++) {
    // balance += amount;
    int b = getBalance();  //READ
    b = b + amount;        //MODIFY
    setBalance(b);         //WRITE
}

Pretend that we have only two updater threads. Thread 1 executes the loop, say 10,000 times, until it reaches the end of its time slice, and then thread 2 starts executing the loop. The end of a slice might occur after the modify statement but before the write statement. If that happens, the member variable balance has the value 1,000,000, but the local variable b has the value 1,000,100.

The execution now proceeds with thread 2, which increases the balance to, say, 2,000,000. When thread 1 resumes its execution, it overwrites the balance with the value in its local variable b. Be aware that local variables are local to a thread, i.e. each thread has its own b. Instead of continuing from 2,000,000, thread 1 now continues from 1,000,100, and all the updates made by thread 2 are lost.
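The interleaving just described can be replayed deterministically in a single thread. The sketch below models the two threads by two local copies of b. The class name LostUpdateReplay is hypothetical. Nothing in it runs concurrently; it only reproduces the order of the reads and writes.

```java
// Single-threaded replay of the interleaving described above (hypothetical class name).
class LostUpdateReplay {
    static int replay() {
        int balance = 0;
        int amount = 100;

        // Thread 1 completes 10,000 updates: balance == 1,000,000.
        for (int i = 0; i < 10_000; i++) {
            balance = balance + amount;
        }

        // Thread 1 starts one more update; its slice ends after MODIFY, before WRITE.
        int b1 = balance;   // READ   -> 1,000,000
        b1 = b1 + amount;   // MODIFY -> 1,000,100, held only in thread 1's local b

        // Thread 2 runs 10,000 complete updates: balance == 2,000,000.
        for (int i = 0; i < 10_000; i++) {
            int b2 = balance;   // READ
            b2 = b2 + amount;   // MODIFY
            balance = b2;       // WRITE
        }

        // Thread 1 resumes and performs its pending WRITE with a stale value.
        balance = b1;       // 1,000,100: thread 2's 10,000 updates are lost
        return balance;
    }

    public static void main(String[] args) {
        System.out.println("after replay: " + replay() + ", expected: " + (2_000_000 + 100));
    }
}
```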

One single interrupt at the “wrong” place completely spoils the result of the program. The likelihood that the end of a time slice falls after the read but before the write is quite low, because the window between the read and the write is only a small fraction of all the statements the thread executes. Therefore, we need to increase the number of transactions to more than a million before we discover any problem.

The read/modify/write sequence is traditionally called a critical (code) section, because it is critical for the correctness of the program result. The result becomes incorrect only when a timer interrupt occurs between the read and the write; all interrupts outside the critical section are harmless.

Solution

Java has a proper solution to the problem. Declaring the updateBalance method as synchronized solves it. This keyword guarantees that only one thread at a time can execute inside the synchronized methods of a given object. If the section is occupied, all other threads must wait in a queue outside the section.

I have added a subclass of Account. Here it is:

class SafeAccount extends Account {
    public synchronized void updateBalance(int amount) {
        super.updateBalance(amount);
    }
}

Running the demo once more with the latest arguments and the new SafeAccount class gives the output below.

>java AccountUpdater -u 10 -t 10000000 +s
NumUpdaters    : 10
NumTransactions: 10000000
UseSynchronized: true
----------------
Final balance = 0

Well, it’s not a proof, but you have to trust me – it is the solution to the problem. Run the program with a large number of transactions and threads, and convince yourself.
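If you would rather have an automated check than trust the author, the sketch below starts several threads on one account whose updateBalance method is synchronized. It then verifies that the final balance is zero. The class names SyncAccount and SyncAccountDemo are hypothetical. The getter is also synchronized here, which deviates slightly from SafeAccount above.

```java
// Hypothetical names; the updater method is synchronized as in SafeAccount.
class SyncAccount {
    private int balance = 0;

    // At most one thread at a time can run a synchronized method of a given SyncAccount.
    public synchronized void updateBalance(int amount) {
        int b = balance;   // READ
        b = b + amount;    // MODIFY
        balance = b;       // WRITE
    }

    public synchronized int getBalance() { return balance; }
}

class SyncAccountDemo {
    static int run(int numUpdaters, int numTransactions) {
        SyncAccount account = new SyncAccount();
        Thread[] threads = new Thread[numUpdaters];
        for (int t = 0; t < numUpdaters; t++) {
            threads[t] = new Thread(() -> {
                for (int i = 0; i < numTransactions; i++) account.updateBalance(100);
                for (int i = 0; i < numTransactions; i++) account.updateBalance(-100);
            });
            threads[t].start();
        }
        try {
            for (Thread th : threads) th.join();   // wait for all updaters
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        return account.getBalance();
    }

    public static void main(String[] args) {
        System.out.println("Final balance = " + run(10, 1_000_000));
    }
}
```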

Bear in mind that the execution time will be substantially longer than before. The reason is that we now have many more thread context switches. Every time a thread completes an updateBalance operation and leaves the critical section for the next turn in the loop, the queue is often non-empty, which causes the thread to wait in the queue while another thread enters the section.

Instead of marking a method as synchronized, you can use a synchronized block. Such a block takes an object argument, which is then used for synchronization. Here is an equivalent of the synchronized method above, using a synchronized block instead:

public void updateBalance(int amount) {
    synchronized (this) {
        super.updateBalance(amount);
    }
}

You can use an arbitrary object as the synchronization object, for example

public void updateBalance(int amount) {
    synchronized (System.out) {
        super.updateBalance(amount);
    }
}

Sometimes you must use a synchronized block, but usually you should stick to a synchronized method, because it makes life a little simpler. Using a single global synchronization object, such as System.out, means that all such blocks share one lock. In effect, you have exactly one critical section in your whole program, which is seldom what you want.
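A common middle ground between one global lock and locking on this is a private lock object per instance. This is one idiom among several, not the article's method. In the sketch below, LockedAccount and LockedAccountDemo are hypothetical names. Threads updating different accounts use different locks and therefore never wait for each other, while updates to the same account stay atomic.

```java
// Hypothetical names; each account guards its balance with its own private lock.
class LockedAccount {
    private final Object lock = new Object();  // one lock per account instance
    private int balance = 0;                   // guarded by lock

    public void updateBalance(int amount) {
        synchronized (lock) {
            balance += amount;                 // read/modify/write, atomic per account
        }
    }

    public int getBalance() {
        synchronized (lock) {
            return balance;
        }
    }
}

class LockedAccountDemo {
    // Two threads per account; threads on different accounts never contend for a lock.
    static int[] run(int numTransactions) {
        LockedAccount a = new LockedAccount();
        LockedAccount b = new LockedAccount();
        Runnable onA = () -> { for (int i = 0; i < numTransactions; i++) a.updateBalance(1); };
        Runnable onB = () -> { for (int i = 0; i < numTransactions; i++) b.updateBalance(2); };
        Thread[] threads = { new Thread(onA), new Thread(onA), new Thread(onB), new Thread(onB) };
        for (Thread t : threads) t.start();
        try {
            for (Thread t : threads) t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        return new int[] { a.getBalance(), b.getBalance() };
    }
}
```

Keeping the lock private also means no outside code can synchronize on it by accident. That cannot be ruled out when the lock is this or a public object such as System.out.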

Bug warning 1

Using the keyword this as the argument to a synchronized block may or may not be correct, depending on the context. Can you see why the usage below is wrong?

class Data {public int  value=0;}

class Updater extends Thread {
    private Data  d;
    public Updater(Data d) {this.d = d;}
    public void    run() {
        while (true) {
            synchronized (this) {d.value += 100;}
        }
    }
}

Every updater thread is using its own synchronization object instead of a common one. This kind of bug easily slips through a code review and can be very difficult to spot later, when the application has been shipped to the customers. You can easily correct the error by changing the synchronized line to

            synchronized (d) {d.value += 100;}

However, the best solution is to change the Data class to use synchronized methods instead.
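Here is a sketch of the corrected updater from Bug warning 1. The endless loop is replaced by a fixed number of iterations so that the demo terminates. SharedLockUpdater, SharedLockDemo and the iteration count are assumptions made for illustration. Because every thread synchronizes on the shared Data object, no increment is lost.

```java
// Data is unchanged from Bug warning 1.
class Data { public int value = 0; }

// Hypothetical names; a bounded loop replaces while (true) so the demo terminates.
class SharedLockUpdater extends Thread {
    private final Data d;
    private final int iterations;

    SharedLockUpdater(Data d, int iterations) {
        this.d = d;
        this.iterations = iterations;
    }

    @Override
    public void run() {
        for (int i = 0; i < iterations; i++) {
            synchronized (d) {       // every updater locks the same Data object
                d.value += 100;
            }
        }
    }
}

class SharedLockDemo {
    static int run(int numThreads, int iterations) {
        Data d = new Data();
        SharedLockUpdater[] updaters = new SharedLockUpdater[numThreads];
        for (int i = 0; i < numThreads; i++) {
            updaters[i] = new SharedLockUpdater(d, iterations);
            updaters[i].start();
        }
        try {
            for (SharedLockUpdater u : updaters) u.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        synchronized (d) {           // read under the same lock as the writers
            return d.value;
        }
    }
}
```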

Bug warning 2

Using synchronized methods is not a silver bullet. You must also use them properly. For example, suppose you have a class with synchronized public methods and you add a new public method, but forget to declare it as synchronized. Now you have created a backdoor into the critical section.

This bug can be very hard to find, because a class, in general, has many methods, and not all of them must be synchronized. Therefore, you can’t simply say: make all methods synchronized. You might, however, say: make all public methods synchronized.

Bug warning 3

I’m sorry to tell you that it doesn’t stop there either. Take a look at this class, which has all of its methods declared synchronized.

class Account {
    private int    balance=0;
    public synchronized void   setBalance(int b) {balance=b;}
    public synchronized int    getBalance()      {return balance;}
}

The class declaration looks flawless. However, what do you say about its usage?

void  performUpdate(Account a) {
    a.setBalance(a.getBalance() + 100);
}

The two methods are synchronized, but not the composite operation read/modify/write. In fact, this is a reincarnation of our original problem. Therefore, you must always analyse how a synchronized class should be used. The easy answer is to skip setters and stick to synchronized getters and updaters. An updater method takes an argument and performs the complete read/modify/write operation.
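The sketch below follows the advice above. The account has no setter, only a synchronized getter and a synchronized updater method that performs the whole read/modify/write. The names UpdaterAccount, add and UpdaterAccountDemo are hypothetical.

```java
// Hypothetical names: no setter, only a synchronized getter and a synchronized updater.
class UpdaterAccount {
    private int balance = 0;

    public synchronized int getBalance() {
        return balance;
    }

    // The complete read/modify/write runs while holding this object's lock.
    public synchronized int add(int amount) {
        balance = balance + amount;
        return balance;               // the new balance, read under the same lock
    }
}

class UpdaterAccountDemo {
    // Safe replacement for a.setBalance(a.getBalance() + 100).
    static void performUpdate(UpdaterAccount a) {
        a.add(100);
    }

    static int run(int numThreads, int updatesPerThread) {
        UpdaterAccount account = new UpdaterAccount();
        Thread[] threads = new Thread[numThreads];
        for (int t = 0; t < numThreads; t++) {
            threads[t] = new Thread(() -> {
                for (int i = 0; i < updatesPerThread; i++) performUpdate(account);
            });
            threads[t].start();
        }
        try {
            for (Thread th : threads) th.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        return account.getBalance();
    }
}
```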

Now you know a lot about the usage of synchronized. However, we have not yet reached our original objective: how to send data from one thread to another. I started this section with the observation that two threads that want to communicate must agree on a common place where one can put data and the other can get data. The Account class is a typical common place, and we can use it as the basis for our next class: a mailbox.

Problem 2 – Why you need to use wait and notify

Consider two threads, Producer and Consumer, which want to communicate using a common place (a mailbox). Let’s rename the last version of class Account and change the name of the member variable. Here is the new class, called MailBox:

class MailBox {
    private int    msg = 0;
    public synchronized void put(int msg) {this.msg=msg;}
    public synchronized int  get()        {return msg;}
}

The producer thread sends a stream of 1’s to the mailbox by calling the method put, and the consumer receives these 1’s by calling the method get on the mailbox object.

Below you can find the class declarations for both the producer and the consumer:

class Producer extends Thread {
    private MailBox   out;
    private int       numMessages;

    public Producer(int numMessages, MailBox out) {
        this.out         = out;
        this.numMessages = numMessages;
    }

    public void run() {
        for (int i = 0; i < numMessages; i++) {
            out.put(+1);
        }
        out.put(-1);
        System.out.println("num = "+numMessages);
    }
}

class Consumer extends Thread {
    private MailBox    in;

    public Consumer(MailBox in) {
        this.in = in;
    }

    public void run() {
        int  sum=0, cnt=0, n;
        while ((n = in.get()) >= 0) {
            sum += n;
            cnt++;
        }
        System.out.println("sum = " + sum);
        System.out.println("cnt = " + cnt);
    }
}

If the producer sends 1000 1’s, you would probably expect the values of sum and cnt in the consumer to be 1000. Right? So, let’s run the program and check. I have modified the output for clarity. You can find a link to the source code at the end of this article, where you can also find a build script and run instructions.

>java NumberCounter
NumMessages   : 1000
----------------
Producer: num = 1000
Consumer: sum = 0
Consumer: cnt = 527361

>java NumberCounter -m 10000
NumMessages   : 10000
----------------
Producer: num = 10000
Consumer: sum = 0
Consumer: cnt = 0

>java NumberCounter -m 1000000
NumMessages   : 1000000
----------------
Producer: num = 1000000
Consumer: sum = 899622
Consumer: cnt = 1424308

Interesting, eh…? In none of the runs does the sum of the 1’s match the number of 1’s sent, and the number of turns in the consumer’s loop bears no relation to either. Strange, to say the least.

Analysis

Look at the program code once more. The msg member variable is initialised to 0. If the consumer starts executing first, it reads the value 0 from the mailbox several times. The sum value remains zero, but the cnt value increases. Eventually, the consumer’s time slice ends and the producer starts executing. For a moderate number of messages, the producer can complete its for loop during one time slice and, just before terminating, put the value –1 in the mailbox. The consumer then resumes, picks up the –1 and breaks out of its while loop.

If the producer starts executing first, it is likely to complete its for loop and leave a –1 in the mailbox, which causes the consumer to break out of its while loop immediately.

If the number of messages is large, the two threads switch a couple of times, which results in a positive but still incorrect value of the sum variable.

Solution

Java has, of course, a solution to this problem. Every class inherits directly or indirectly from the class java.lang.Object. The Object class contains, among others, two methods of importance in this context. The wait method suspends the calling thread. The notify method resumes another thread that has already been suspended by a previous call to wait on the same object. These methods must be called from inside a synchronized block (or method) that holds the lock of the object they are invoked on; otherwise, an IllegalMonitorStateException is thrown. Nevertheless, how can this solve our problem?

The root cause of the incorrect result is that the two threads don’t wait for each other. When the producer has sent one message, it should wait until the consumer has received it. Moreover, the consumer must wait for the producer to send the next message. This is generally called event synchronization: a thread might wait for an event (or condition) to occur, and another thread might cause the event to occur (notify).
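Event synchronization in its simplest form can be sketched as a one-shot event object: one thread waits until the event has occurred, and another makes it occur. The class Event and its methods await and signal are hypothetical names, not part of the Java library. signal uses notifyAll so that every waiting thread is released if there is more than one.

```java
// Hypothetical one-shot event; await/signal are illustrative names, not a library API.
class Event {
    private boolean occurred = false;   // the condition, guarded by this object's lock

    // Suspends the calling thread until signal() has been called at least once.
    public synchronized void await() throws InterruptedException {
        while (!occurred) {   // re-check the condition after every wake-up
            wait();           // releases the lock while suspended
        }
    }

    // Makes the condition true and wakes up every thread waiting on this object.
    public synchronized void signal() {
        occurred = true;
        notifyAll();
    }
}

class EventDemo {
    static boolean run() {
        Event ready = new Event();
        boolean[] resumed = { false };
        Thread waiter = new Thread(() -> {
            try {
                ready.await();          // wait for the event
                resumed[0] = true;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        waiter.start();
        ready.signal();                 // may run before or after the waiter calls await()
        try {
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        return resumed[0];              // join() makes the waiter's write visible here
    }
}
```

The waiter re-checks the flag in a loop, so it does not matter whether signal runs before or after the waiter has started to wait.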

Let’s apply this insight and implement a subclass of MailBox that has event synchronization. The conditions to wait for are “the mailbox is not full” (for the producer) and “the mailbox is not empty” (for the consumer). The producer calls put and suspends itself if the mailbox is already full, and the consumer suspends itself if the mailbox is empty when it calls the method get. Here is the subclass:

class SafeMailBox extends MailBox {
    private boolean isFull = false;

    public synchronized void put(int msg) {
        while (isFull)   // wait until the consumer has emptied the mailbox
            try{ wait(); }catch(InterruptedException x) {}
        super.put(msg);
        isFull = true;
        notify();        // wake up the consumer if it is waiting
    }

    public synchronized int get() {
        while (!isFull)  // wait until the producer has filled the mailbox
            try{ wait(); }catch(InterruptedException x) {}
        int msg = super.get();
        isFull = false;
        notify();        // wake up the producer if it is waiting
        return msg;
    }
}

At the end of each method, notify is called to tell the other side that its waiting condition has changed. When a thread resumes after a wait, it rechecks the condition to be sure. This is a good habit and should always be applied instead of using a simpler if-statement, because a thread can wake up while its condition is still false (for example, through a spurious wakeup or because another thread got there first). If we re-run the program now with the new mailbox implementation, it works as expected. Try it for yourself! You can download the source code if you scroll to the end of this article. Here is the output:

>java NumberCounter -m 1000 +s
NumMessages   : 1000
UseWait&Notify: true
----------------
Producer: num = 1000
Consumer: sum = 1000
Consumer: cnt = 1000

The new mailbox contains the minimum amount of code needed for two threads to safely exchange messages. There are variations on the theme, as we will see later in this article, but no shortcuts. Every attempt to leave out wait/notify will cause a bug.
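Here is a self-contained sketch of the same experiment, with the safe mailbox logic folded into one class. IntMailBox and MailBoxDemo are hypothetical names. Unlike the article's version, put and get propagate InterruptedException instead of silently swallowing it; that is a design choice of this sketch. With exactly one producer and one consumer, notify is sufficient. The mailbox is always either full or empty, so at most one of the two threads can be waiting.

```java
// Hypothetical names; the SafeMailBox logic in one class, for one producer and one consumer.
class IntMailBox {
    private int msg = 0;
    private boolean isFull = false;

    public synchronized void put(int m) throws InterruptedException {
        while (isFull) {
            wait();            // wait until the consumer has emptied the box
        }
        msg = m;
        isFull = true;
        notify();              // wake the consumer if it is waiting
    }

    public synchronized int get() throws InterruptedException {
        while (!isFull) {
            wait();            // wait until the producer has filled the box
        }
        isFull = false;
        notify();              // wake the producer if it is waiting
        return msg;
    }
}

class MailBoxDemo {
    // Returns {sum, cnt} as computed by the consumer.
    static int[] run(int numMessages) {
        IntMailBox box = new IntMailBox();
        int[] result = new int[2];
        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < numMessages; i++) box.put(+1);
                box.put(-1);   // end-of-stream marker
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        Thread consumer = new Thread(() -> {
            try {
                int sum = 0, cnt = 0, n;
                while ((n = box.get()) >= 0) {
                    sum += n;
                    cnt++;
                }
                result[0] = sum;
                result[1] = cnt;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        consumer.start();
        try {
            producer.join();
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        return result;
    }
}
```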

About the concept monitor

In concurrency theory, an object using synchronized and wait/notify is usually referred to as a monitor. A monitor consists of (shared) data and a set of access methods. The only way to access the data is to call one of the access methods. Because the access methods are mutually exclusive, only one thread at a time can be executing inside a monitor. To better understand the semantics of a monitor, look at the illustration below.

A monitor (synchronized object) uses two queues and a lock to realize its semantics. When a thread calls one of the access methods, the lock is inspected. If it is unlocked, the thread becomes the lock owner, and the monitor stays locked until the thread leaves the monitor. If the monitor is already locked, the thread is suspended and inserted into the first queue (enterQ).

A thread executing inside a monitor might want to check a synchronization event (condition) and, if needed, suspend itself until the event occurs. This is done with the wait method in Java, and the thread is inserted into the second queue (waitQ). The monitor is also unlocked, to allow another thread in enterQ to acquire the monitor.

Eventually, a thread inside the monitor calls the method notify, which moves one thread from waitQ to the front of enterQ. The notifying thread then leaves the monitor, and the awakened thread gets to run, for example because the notifier reached the end of its time slice or performed a suspending operation. The awakened thread then resumes its execution after the wait statement, with the monitor re-acquired. The explanation above contains some simplifications, but serves as a good model for monitors.

Java implements a so-called resume&continue monitor (also known as signal-and-continue). This means that the notifying thread wakes up a thread from waitQ but continues executing inside the monitor. The alternative is a resume&wait monitor, where the notifying thread becomes suspended and the awoken thread starts executing inside the monitor. A third alternative is a so-called implicit resume monitor. This variant lacks wait and notify primitives. Instead, it has entrance conditions (called guards) and re-checks them every time a thread leaves the monitor. This variant is an important concurrency building block in the real-time programming language Ada95.
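Java’s resume&continue behaviour can be observed directly. In the sketch below, ResumeContinueDemo is a hypothetical name. The main thread notifies a waiting thread and then writes a log entry while still inside the monitor. The awakened thread can only log after the notifier has left the synchronized block. The extra waiting flag only ensures that the waiter has released the lock by calling wait before notify is called.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical name; shows that the notifying thread keeps running inside the monitor.
class ResumeContinueDemo {
    private final Object monitor = new Object();
    private final List<String> log = new ArrayList<>();  // guarded by monitor
    private boolean waiting = false;                      // guarded by monitor
    private boolean go = false;                           // guarded by monitor

    List<String> run() {
        Thread waiter = new Thread(() -> {
            synchronized (monitor) {
                waiting = true;
                monitor.notifyAll();            // tell the main thread we are about to wait
                try {
                    while (!go) {
                        monitor.wait();         // releases the lock while suspended
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                log.add("waiter resumed");      // runs only after re-acquiring the lock
            }
        });
        waiter.start();

        synchronized (monitor) {
            try {
                while (!waiting) {
                    monitor.wait();             // past this loop, the waiter has released the lock in wait()
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
            go = true;
            monitor.notify();                   // resume the waiter...
            log.add("notifier continues");      // ...but keep executing inside the monitor
        }                                       // the lock is released here

        try {
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        synchronized (monitor) {
            return new ArrayList<>(log);
        }
    }

    public static void main(String[] args) {
        System.out.println(new ResumeContinueDemo().run());
        // prints [notifier continues, waiter resumed]
    }
}
```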

Now you know how to set up a communication link between two threads, but not yet between more than two! Read on to the next problem to understand why.

Problem 3 – Why you should use notifyAll instead of notify

Let’s modify our latest program a little, so that we allow more than one producer and let each of them send the sequence 1, 2, …, n. The (single) consumer adds all numbers and prints out the result when the program terminates. With p producer threads, the result should be p * n * (n+1) / 2.
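The expected result follows from the arithmetic series 1 + 2 + … + n, summed once per producer. As a worked check, assume that the value shown as Max in the run below is n. Then two producers sending 1 to 10 each should give a consumer sum of 110.

```latex
\sum_{k=1}^{n} k = \frac{n(n+1)}{2}
\quad\Longrightarrow\quad
S_{\text{expected}} = p \cdot \frac{n(n+1)}{2},
\qquad
p = 2,\ n = 10:\quad S_{\text{expected}} = 2 \cdot \frac{10 \cdot 11}{2} = 110
```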

I’m using the same mailbox as before, slightly modified. Here it is.

class MailBox {
    protected int       data   = 0;
    protected boolean   isFull = false;

    public synchronized void put(int msg) {
        while (isFull)
            try{ wait(); }catch(InterruptedException x) {}
        data   = msg;
        isFull = true;
        doNotify();
    }

    public synchronized int get() {
        while (!isFull)
            try{ wait(); }catch(InterruptedException x) {}
        int msg = data;
        isFull  = false;
        doNotify();
        return msg;
    }

    // Hook: a subclass can override this to change the notification strategy.
    protected void doNotify() {notify();}
}

Here are the producer and consumer thread classes. I have simplified the code for clarity. You can find a link to the complete source code at the end of this article.

class Producer extends Thread {
    private MailBox out;
    private int numMessages;

    public Producer(int numMessages, MailBox out) {
        this.out         = out;
        this.numMessages = numMessages;
    }

    public void run() {
        for (int i = 1; i <= numMessages; i++) {
            out.put(i);
        }
    }
}

class Consumer extends Thread {
    private MailBox     in;
    private int         sum = 0;

    public Consumer(MailBox in) {
        this.in = in;
    }

    public void run() {
        int n;
        while ((n = in.get()) >= 0) {
            sum += n;
        }
        System.out.println("sum = " + sum);
    }
}

The main program is responsible for creating a mailbox, the consumer, and a number of producers. It then waits for the completion of all producers, sends a –1 via the mailbox to the consumer and waits for the completion of the consumer. Now, let’s run the program with two producers and see what happens.

1)    >java NumberSequencer +v -p 2

2)    NumProducers: 2

3)    Max         : 10

4)    UseNotifyAll: false

5)    Consumer: Started

6)    Consumer: Mailbox<data=0, isFull=false> get(): enter

7)    Consumer: Mailbox<data=0, isFull=false> get(): before wait

8)    Producer-1: Started