Towards Ada Style Thread Communication in Java
or How to implement Thread Method Invocation
Jens Riboe
September 2002
Table of Contents
Problem 1 – Why you need to use synchronized
Problem 2 – Why you need to use wait and notify
Problem 3 – Why you should use notifyAll instead of notify
Problem 4 – Why you should encapsulate a mailbox on the receiving side
Problem 5 – Why you should use bounded queues
Problem 6 – Why you sometimes can’t suspend the sender, when the queue is full
A small class library for thread message queues
TMI – Thread Method Invocation
Problem 7 – Why you should consider Thread Method Invocation (TMI)
Non-Deterministic Message Acceptance
Problem 8 – Why you sometimes must combine TMI with non-determinism
Problem 9 – Why you might need guards when you are using non-determinism
The Java platform has introduced threads as a standard tool for solving day-to-day programming tasks. Before the Java era, thread programming was something for specialists working with POSIX or Win32 threads in C or C++.
However, thread-based programs may suffer from a completely new category of bugs, essentially different from traditional bugs. Many of these bugs originate from misconceptions about how threads communicate.
In this article, I will show you step by step how to build a robust and reliable library for thread communication. The presentation is problem driven, which means I will formulate and demonstrate a bug, then analyse it and finally present a solution.
The article starts with the very basics: how to send some data from one thread to another. It then introduces the concept of rendezvous, taken from the Ada programming language, and renames it Thread Method Invocation (TMI). Finally, it discusses the idea of non-deterministic message acceptance, represented by the SELECT statement in Ada. All discussions lead to the definition of a small class library for thread communication, which provides both basic primitives and more complex ones.
This section discusses and motivates the minimum amount of code needed to transfer a data message from one thread to another. It ends with how to properly use the keyword synchronized and the methods wait, notify and notifyAll. If you are already very familiar with these, you might want to skim this section and proceed to the next.
When two (or more) threads want to exchange data, they need to agree on a common place where one thread can put data and another can get it. This place is often referred to as a critical region, which loosely speaking means it’s critical to the correctness of the program execution. To understand the characteristics of such a region, let’s dive into a model problem that illustrates the difficulty and then see how to solve it.
Consider a
single bank account object containing an integer balance value. What happens if
many actors update the account concurrently? Here is the class declaration for
the Account class.
class Account {
    private int balance = 0;
    public Account() {}
    public void updateBalance(int amount) {
        // balance += amount;
        int b = getBalance(); //READ
        b = b + amount;       //MODIFY
        setBalance(b);        //WRITE
    }
    public int getBalance() {return balance;}
    private void setBalance(int b) {balance = b;}
}
As you can
see above, I have decomposed the update statement into three statements to make
it clear exactly what is going on. To perform an update account operation we
need to read, modify and write data. These three operations are part of every
kind of update operation, independent of context.

In addition to the Account class, we need an updater thread. A thread can loosely be viewed as a separate program, which means it has its own program counter and its own function call stack. Threads consist of a little more than that, but we leave that out for now.
The clearest way to define a thread in Java is to extend the java.lang.Thread class. A thread object in Java behaves like an ordinary object, plus it can execute its run method concurrently with other threads. Let’s skip the details of threads and focus on the significant code in the run method of the thread class Updater. Here it is:
int amount = 100;
for (int i = 0; i < numTransactions; i++) {
    account.updateBalance(amount);
}
for (int i = 0; i < numTransactions; i++) {
    account.updateBalance(-amount);
}
As you can
see above, an updater thread increments and decrements the balance of the
account, but the net effect, after completion, is that nothing has changed,
i.e. the balance is still zero. You can view the complete source code, if you
scroll to the end of this article. At the end, you can also find a zip file
containing all source code for this article, plus an Ant build script and
needed jar files.
Now, let’s
run the program and inspect the final balance. I have simplified the command
line and the outputs for clarity. You can find detailed run instructions in the
zip file.
>java AccountUpdater
NumUpdaters : 5
NumTransactions: 10000
----------------
Final balance = 0
>java AccountUpdater -u 10 -t 100000
NumUpdaters : 10
NumTransactions: 100000
----------------
Final balance = 0
It seems to be working perfectly; end of story. Eh… not quite. Let’s run it again, but this time with a substantially heavier load.
>java AccountUpdater -u 10 -t 10000000
NumUpdaters : 10
NumTransactions: 10000000
----------------
Final balance = 1154011500
>java AccountUpdater -u 10 -t 10000000
NumUpdaters : 10
NumTransactions: 10000000
----------------
Final balance = -1652878200
As you can
see above, we clearly have a bug. When the number of transactions is high, the
final balance has a random value. Why?
To understand the nature of the problem, we need to understand how threads execute and to recap the read/modify/write operation. A thread executes as a small program and proceeds uninterrupted until something forces it to stop. Besides the trivial reason that the program has ended, it also takes a break when, for example, it waits for a disk I/O operation to complete. However, that doesn’t explain our program, because we don’t have such operations in it.
Most modern operating systems and the corresponding Java implementations use so-called pre-emptive scheduling, which means that each thread executes for, at most, a specified amount of time (a time slice). Then it is forced to take a break (it is suspended) to allow other threads to execute. Eventually, the thread resumes its execution for another time slice. Either it hits the end of the slice or it performs a suspending operation, like a write operation to disk. In both cases, it is suspended again. The end-of-slice suspension is triggered by a timer interrupt. An interrupt can occur at any time, for example inside a read/modify/write operation.
Let’s trace
the execution of the updateBalance method. I have expanded the method body into
the caller’s for loop, so you can see what’s going on.
for (int i = 0; i < numTransactions; i++) {
    // balance += amount;
    int b = getBalance(); //READ
    b = b + amount;       //MODIFY
    setBalance(b);        //WRITE
}
Pretend
that we have only two updater threads. Thread 1 executes the loop, for example
10,000 times, until it reaches the end of its time slice and thread 2 starts
executing the loop. The end of a slice might appear after the modify statement,
but before the write statement. If that happens, the member variable balance
has the value 1,000,000 but the local variable b has the value 1,000,100.
The execution now proceeds with thread 2, which increases the balance to, say, 2,000,000. When thread 1 resumes its execution, it overwrites the balance with the value in the local variable b. Be aware that all local variables are local to a thread, i.e., we have one b for each thread. Instead of continuing from 2,000,000, thread 1 now continues from 1,000,100.
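The interleaving traced above can be replayed deterministically in a single thread, which makes the lost update easy to see. The sketch below is not part of the article's source code; the class name LostUpdateReplay and the variables b1 and b2 (standing in for each thread's private copy of b) are assumptions made for illustration.

```java
// Single-threaded replay of the interleaving described above.
// The two "threads" are simulated by their local variables b1 and b2,
// so the outcome is the same on every run. All names are illustrative.
class LostUpdateReplay {
    static int balance;

    static int replay() {
        balance = 1_000_000;          // state when thread 1 gets interrupted
        int b1 = balance;             // thread 1: READ
        b1 = b1 + 100;                // thread 1: MODIFY (b1 is now 1,000,100)
        // --- end of time slice: thread 2 runs 10,000 complete updates ---
        for (int i = 0; i < 10_000; i++) {
            int b2 = balance;         // thread 2: READ
            b2 = b2 + 100;            // thread 2: MODIFY
            balance = b2;             // thread 2: WRITE
        }
        // balance is now 2,000,000
        balance = b1;                 // thread 1 resumes: WRITE of its stale value
        return balance;
    }

    public static void main(String[] args) {
        System.out.println("balance after replay = " + replay());
    }
}
```

The 10,000 updates made by the simulated thread 2 vanish completely, because the stale value held by thread 1 is written last.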
One single
interrupt at the “wrong” place completely spoils the result of the program. The
likelihood that an end of a time slice appears after read but before write
is quite low, because there are more statements to execute than the three in
the loop body. Therefore, we need to increase the number of transactions to
more than a million before we can discover any problem.
The read/modify/write sequence is traditionally called a critical (code) section, because it’s critical for the correctness of the program result. The result ends up incorrect because of a timer interrupt between read and write; all interrupts outside the critical section are harmless.
Java has a proper solution to the problem: declare the updateBalance method as synchronized. This keyword guarantees that only one thread at a time can execute inside the synchronized methods of a given object. If the section is occupied, all other threads must wait in a queue outside the section.
I have added a subclass of Account. Here it is:
class SafeAccount extends Account {
    public synchronized void updateBalance(int amount) {
        super.updateBalance(amount);
    }
}
Running the demo once more with the same arguments and the new Account class gives the output below.
>java AccountUpdater -u 10 -t 10000000 +s
NumUpdaters : 10
NumTransactions: 10000000
UseSynchronized: true
----------------
Final balance = 0
Well, it’s
not a proof, but you have to trust me – it is the solution to the problem. Run
the program with a large number of transactions and threads, and convince
yourself.
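If you don't have the zip file at hand, a stripped-down harness along the following lines can be used for the experiment. It is only a sketch, not the article's AccountUpdater program: the class names SyncAccount and SyncAccountDemo and the run method are assumptions, and the command-line handling is left out.

```java
// Hypothetical stand-in for the AccountUpdater demo (not the original source).
class SyncAccount {
    private int balance = 0;
    // The complete read/modify/write is protected by the object's lock.
    public synchronized void updateBalance(int amount) { balance += amount; }
    public synchronized int getBalance() { return balance; }
}

class SyncAccountDemo {
    // Starts numUpdaters threads. Each one adds 100 numTransactions times
    // and then subtracts 100 numTransactions times. Returns the final balance.
    static int run(int numUpdaters, int numTransactions) {
        SyncAccount account = new SyncAccount();
        Thread[] updaters = new Thread[numUpdaters];
        for (int t = 0; t < numUpdaters; t++) {
            updaters[t] = new Thread(() -> {
                for (int i = 0; i < numTransactions; i++) account.updateBalance(100);
                for (int i = 0; i < numTransactions; i++) account.updateBalance(-100);
            });
            updaters[t].start();
        }
        for (Thread u : updaters) {
            try {
                u.join();                       // wait for every updater to finish
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return account.getBalance();
    }

    public static void main(String[] args) {
        System.out.println("Final balance = " + run(10, 100000));
    }
}
```

Removing the synchronized keyword from updateBalance brings the original bug back, although it may take many transactions before it shows.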
Bear in
mind that the execution time will be substantially longer than before. The
reason is that we now have many more thread context switches. Every time a
thread completes an updateBalance operation and leaves the critical section for
the next turn in the loop, the queue is non-empty, which causes the thread to
wait in the queue instead and another thread to enter the section.
Instead of marking a method as synchronized, it’s possible to use a synchronized block. Such a block takes an object argument, which is then used for synchronization. Here is an equivalent to the synchronized method above, using a synchronized block:
public void updateBalance(int amount) {
    synchronized (this) {
        super.updateBalance(amount);
    }
}
You can use
an arbitrary object as the synchronization object, for example
public void updateBalance(int amount) {
    synchronized (System.out) {
        super.updateBalance(amount);
    }
}
Sometimes you must use a synchronized block, but usually you should stick to a synchronized method, because it makes life a little bit simpler. Using a single global synchronization object, such as System.out, means you have exactly one critical section in your program, which is seldom the case.
Using the keyword this as the argument to a synchronized block may or may not be correct, depending on the context. Can you see why the usage below is wrong?
class Data {public int value=0;}
class Updater extends Thread {
    private Data d;
    public Updater(Data d) {this.d = d;}
    public void run() {
        while (true) {
            synchronized (this) {d.value += 100;}
        }
    }
}
Every updater thread uses itself as the synchronization object, instead of a common one. This is the kind of bug that easily slips through a code review and can be very difficult to spot later, when the application has been shipped to customers. You can easily correct the error by changing to
synchronized (d) {d.value += 100;}
However,
the best solution is to change the Data class to use synchronized methods
instead.
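A minimal sketch of that last suggestion could look as follows. The names SharedData, DataUpdater and SharedDataDemo are assumptions, and the endless while loop has been replaced by a fixed number of rounds so the result can be checked.

```java
// The data class owns its synchronization: callers can no longer
// pick the wrong lock object. All names here are illustrative.
class SharedData {
    private int value = 0;
    public synchronized void add(int amount) { value += amount; }
    public synchronized int get() { return value; }
}

class DataUpdater extends Thread {
    private final SharedData d;
    private final int rounds;
    DataUpdater(SharedData d, int rounds) { this.d = d; this.rounds = rounds; }
    @Override public void run() {
        for (int i = 0; i < rounds; i++) d.add(100);   // no synchronized block needed here
    }
}

class SharedDataDemo {
    // Runs numThreads updaters against one SharedData and returns its final value.
    static int run(int numThreads, int rounds) {
        SharedData d = new SharedData();
        Thread[] threads = new Thread[numThreads];
        for (int i = 0; i < numThreads; i++) {
            threads[i] = new DataUpdater(d, rounds);
            threads[i].start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return d.get();
    }

    public static void main(String[] args) {
        System.out.println("value = " + run(4, 10000));
    }
}
```

With 4 threads and 10,000 rounds the value ends at 4,000,000 on every run, since each add is a complete read/modify/write under the lock of the shared object.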
Using synchronized methods is not a silver bullet. You must also know what proper usage is. For example, suppose you have a class with synchronized public methods and add a new public method, but forget to declare it as synchronized. Now you have created a backdoor into the critical section.
This bug can be very hard to find, because a class, in general, has many methods, and not all of them necessarily need to be synchronized. Therefore, you can’t just say “make all methods synchronized”, although you might want to say “make all public methods synchronized”.
I’m sorry
to tell you that it doesn’t stop there either. Take a look at this class, which
has all of its methods declared synchronized.
class Account {
    private int balance=0;
    public synchronized void setBalance(int b) {balance=b;}
    public synchronized int getBalance() {return balance;}
}
The class declaration looks flawless. However, what do you say about its usage?
void performUpdate(Account a) {
    a.setBalance(a.getBalance() + 100);
}
The two
methods are synchronized, but not the composite operation read/modify/write.
In fact, this is a reincarnation of our original problem. Therefore, you must
always analyse how a synchronized class should be used. The easy answer is to
skip setters and stick to synchronized getters and updaters. An updater method
takes an argument and performs the complete read/modify/write operation.
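If a class with a synchronized getter and setter cannot be changed, one possible workaround is to let the caller hold the object's lock across the whole composite operation. This relies on two facts about Java: a synchronized method locks on this, and Java locks are re-entrant. The sketch below is an assumption-laden illustration (LegacyAccount and CompositeUpdateDemo are made-up names), and the updater method recommended above remains the simpler design.

```java
// Same shape as the Account class above: synchronized getter and setter only.
class LegacyAccount {
    private int balance = 0;
    public synchronized void setBalance(int b) { balance = b; }
    public synchronized int getBalance() { return balance; }
}

class CompositeUpdateDemo {
    // Client-side locking: the caller takes the account's own lock, so
    // READ, MODIFY and WRITE happen as one critical section. The nested
    // synchronized methods re-enter the lock the thread already holds.
    static void performUpdate(LegacyAccount a) {
        synchronized (a) {
            a.setBalance(a.getBalance() + 100);
        }
    }

    // Runs numThreads threads, each calling performUpdate rounds times.
    static int run(int numThreads, int rounds) {
        LegacyAccount a = new LegacyAccount();
        Thread[] threads = new Thread[numThreads];
        for (int i = 0; i < numThreads; i++) {
            threads[i] = new Thread(() -> {
                for (int r = 0; r < rounds; r++) performUpdate(a);
            });
            threads[i].start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return a.getBalance();
    }

    public static void main(String[] args) {
        System.out.println("balance = " + run(4, 10000));
    }
}
```

The weakness of this approach is that every caller must remember to take the lock, which is exactly the kind of rule that gets forgotten.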
Now you know a lot about the usage of synchronized. However, we have not yet reached our original objective: how to send data from one thread to another. I started this section with the observation that two threads that want to communicate must agree on a common place where one can put data and the other can get data. The Account class is a typical common place, and we can use it as the basis for our next class, a mailbox.
Consider
two threads, Producer and Consumer, which want to communicate using a common
place (mailbox). Let’s rename the last version of class Account and change the
name of the member variable. Here is the new class, called a mailbox
class MailBox {
    private int msg = 0;
    public synchronized void put(int msg) {this.msg=msg;}
    public synchronized int get() {return msg;}
}
The
producer thread sends a stream of 1’s to the mailbox, by calling the method put
and the consumer is receiving these 1’s by calling the method get on the
mailbox object.

Below you
can find the class declarations for both the producer and the consumer
class Producer extends Thread {
    private MailBox out;
    private int numMessages;
    public Producer(int numMessages, MailBox out) {
        this.out = out;
        this.numMessages = numMessages;
    }
    public void run() {
        for (int i = 0; i < numMessages; i++) {
            out.put(+1);
        }
        out.put(-1);
        System.out.println("num = " + numMessages);
    }
}
class Consumer extends Thread {
    private MailBox in;
    public Consumer(MailBox in) {
        this.in = in;
    }
    public void run() {
        int sum=0, cnt=0, n;
        while ((n = in.get()) >= 0) {
            sum += n;
            cnt++;
        }
        System.out.println("sum = " + sum);
        System.out.println("cnt = " + cnt);
    }
}
If the producer sends 1000 1’s, I guess you will expect the values of sum and cnt in the consumer to be 1000. Right? So, let’s run the program and check. I have modified the output for clarity. You can find a link to the source code at the end of this article, where you can also find a build script and run instructions.
>java NumberCounter
NumMessages : 1000
----------------
Producer: num = 1000
Consumer: sum = 0
Consumer: cnt = 527361
>java NumberCounter -m 10000
NumMessages : 10000
----------------
Producer: num = 10000
Consumer: sum = 0
Consumer: cnt = 0
>java NumberCounter -m 1000000
NumMessages : 1000000
----------------
Producer: num = 1000000
Consumer: sum = 899622
Consumer: cnt = 1424308
Interesting,
eh…? The sum of 1’s is always different from the number of 1’s sent and the
number of turns in the consumer’s loop is always different from the sum of the
1’s. Strange, to say the least.
Look at the program code once more. The msg member variable is initialised to 0. If the consumer starts executing first, it will read the value 0 from the mailbox many times. The sum value remains zero, but the cnt value increases. Eventually, the consumer’s time slice ends and the producer starts executing. For a moderate number of messages, the producer can complete its for loop during one time slice and, just before termination, put the value –1 in the mailbox. The consumer will then resume, pick up the –1 and break out of its while loop.
If the producer starts executing first, it is likely that it can complete its for loop and leave a –1 in the mailbox, which then causes the consumer to break out of its while loop immediately.
If the number of messages is large, the two threads will switch a couple of times, which results in a positive value of the sum variable.
Java has, of course, a solution to this problem. Every class inherits directly or indirectly from the class java.lang.Object. The Object class contains, among others, two methods of importance in this context. The wait method suspends the calling thread, and the notify method resumes another thread that has been suspended by a previous wait. These methods must be called inside a synchronized block (or method) on the same object; otherwise an IllegalMonitorStateException is thrown. Nevertheless, how can this solve our problem?
The root cause of the incorrect result is that the two threads don’t wait for each other. When the producer has sent one message, it should wait until the consumer has received it. Likewise, the consumer must wait for the producer to send the next message. This is generally called event synchronization. A thread might wait for an event (or condition) to occur and another thread might cause the event to occur (notify).
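Stripped of everything else, event synchronization can be sketched as a single boolean flag guarded by wait and notify. The classes SimpleEvent and EventDemo below are made up for illustration and are not part of the article's library. The sketch also checks the rule mentioned earlier, that calling notify without holding the object's lock throws an IllegalMonitorStateException.

```java
// A one-shot event: awaitEvent() suspends the caller until signal() is called.
// Class and method names are illustrative assumptions.
class SimpleEvent {
    private boolean occurred = false;

    public synchronized void awaitEvent() throws InterruptedException {
        while (!occurred) wait();     // re-check the condition after every wake-up
    }

    public synchronized void signal() {
        occurred = true;              // record the event first ...
        notify();                     // ... then wake a waiting thread, if any
    }
}

class EventDemo {
    // Returns true if notify() outside a synchronized block is rejected.
    static boolean notifyWithoutLockFails() {
        Object o = new Object();
        try {
            o.notify();               // the calling thread does not own o's lock
            return false;
        } catch (IllegalMonitorStateException e) {
            return true;
        }
    }

    // One thread waits for the event, the main thread causes it.
    static String run() {
        SimpleEvent event = new SimpleEvent();
        String[] log = { "" };
        Thread waiter = new Thread(() -> {
            try {
                event.awaitEvent();
                log[0] = "event seen";
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        waiter.start();
        event.signal();
        try {
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return log[0];
    }

    public static void main(String[] args) {
        System.out.println(run());
        System.out.println("notify without lock fails: " + notifyWithoutLockFails());
    }
}
```

Because the flag is set before notify is called and tested before wait is called, the sketch works regardless of which thread happens to run first.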
Let’s apply this insight and implement a subclass of MailBox that has event synchronization. The events (or conditions) to wait for are that the mailbox is full and that it is empty, respectively. The producer calls put and suspends itself if the mailbox is already full, and the consumer suspends itself if the mailbox is empty when it calls the method get. Here is the subclass:
class SafeMailBox extends MailBox {
    private boolean isFull = false;
    public synchronized void put(int msg) {
        while (isFull)
            try{ wait(); }catch(InterruptedException x) {}
        super.put(msg);
        isFull = true;
        notify();
    }
    public synchronized int get() {
        while (!isFull)
            try{ wait(); }catch(InterruptedException x) {}
        int msg = super.get();
        isFull = false;
        notify();
        return msg;
    }
}
At the end of each method, notify is called to tell the other side that its waiting condition has changed. When a thread resumes after a wait, it rechecks the condition to be sure. This is a good habit and should always be applied instead of using a simpler if-statement. If we re-run the program now with the new mailbox implementation, it works as expected. Try it for yourself! You can download the source code, if you scroll to the end of this article. Here is the output:
>java NumberCounter -m 1000 +s
NumMessages : 1000
UseWait&Notify: true
----------------
Producer: num = 1000
Consumer: sum = 1000
Consumer: cnt = 1000
The new
mailbox contains the minimum amount of code needed for two threads to safely
interchange messages. There are variations on the theme, as we will see later
in this article, but no short cuts. Every attempt to leave out wait/notify will
cause a bug.
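For readers who want to experiment without the downloadable sources, here is a self-contained sketch of the same experiment with one producer and one consumer. It is not the article's NumberCounter program: OneSlotMailBox and MailBoxDemo are assumed names, and unlike SafeMailBox the methods pass InterruptedException on to the caller instead of swallowing it.

```java
// One-slot mailbox with event synchronization, following the logic of
// SafeMailBox but written as a standalone class. Names are illustrative.
class OneSlotMailBox {
    private int msg = 0;
    private boolean isFull = false;

    public synchronized void put(int msg) throws InterruptedException {
        while (isFull) wait();        // wait until the consumer has emptied the slot
        this.msg = msg;
        isFull = true;
        notify();                     // wake the consumer, if it is waiting
    }

    public synchronized int get() throws InterruptedException {
        while (!isFull) wait();       // wait until the producer has filled the slot
        isFull = false;
        notify();                     // wake the producer, if it is waiting
        return msg;
    }
}

class MailBoxDemo {
    // Sends numMessages 1's followed by -1. Returns {sum, cnt} as seen by the consumer.
    static int[] run(int numMessages) {
        OneSlotMailBox box = new OneSlotMailBox();
        int[] result = new int[2];
        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < numMessages; i++) box.put(1);
                box.put(-1);          // end-of-stream marker
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        Thread consumer = new Thread(() -> {
            try {
                int n;
                while ((n = box.get()) >= 0) {
                    result[0] += n;   // sum
                    result[1]++;      // cnt
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        producer.start();
        try {
            producer.join();
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return result;
    }

    public static void main(String[] args) {
        int[] r = run(1000);
        System.out.println("sum = " + r[0] + ", cnt = " + r[1]);
    }
}
```

Note that this sketch is only meant for exactly one producer and one consumer.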
An object using synchronized and wait/notify is usually referred to as a monitor in concurrency theory. A monitor consists of (shared) data and a set of access methods. The only way to access the data is to call one of the access methods. This guarantees that only one thread at a time can be executing inside a monitor. To better understand the semantics of a monitor, look at the illustration below.

A monitor (synchronized object) uses two queues and a lock to realize its semantics. When a thread calls one of the access methods, the lock is inspected. If it is unlocked, the thread becomes the lock owner and the monitor is locked until the thread leaves the monitor. If the monitor is already locked, the thread is suspended and inserted into the first queue (enterQ).
A thread executing inside a monitor might want to check a synchronization event (condition) and, if needed, suspend itself until the event occurs. This is performed by the wait method in Java, and the thread is inserted into the second queue (waitQ). The monitor is also unlocked, to allow another thread in enterQ to acquire the monitor.
Eventually, a thread inside the monitor calls the method notify, which moves one thread from waitQ to the front of enterQ. When the notifying thread has left the monitor and becomes suspended, because of an end-of-time-slice interrupt or some kind of suspending operation, the awoken thread resumes its execution after the wait statement, with the monitor re-acquired. The explanation above contains some simplifications, but serves as a good model for monitors.
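One detail of the model, that wait unlocks the monitor while the caller sits in waitQ, can be observed with a small experiment. The sketch below uses made-up names (WaitReleasesLockDemo, monitor, released) and uses Thread.getState only to detect when the other thread has reached wait.

```java
// Shows that a thread suspended in wait() does not hold the monitor:
// the main thread can enter a synchronized block on the same object.
// All names are illustrative assumptions.
class WaitReleasesLockDemo {
    static boolean run() {
        Object monitor = new Object();
        boolean[] released = { false };       // the condition the waiter waits for

        Thread waiter = new Thread(() -> {
            synchronized (monitor) {
                while (!released[0]) {
                    try {
                        monitor.wait();       // unlocks monitor, thread goes to waitQ
                    } catch (InterruptedException e) {
                        return;
                    }
                }
            }
        });
        waiter.start();

        // Spin until the waiter is suspended inside wait().
        while (waiter.getState() != Thread.State.WAITING) Thread.yield();

        boolean entered = false;
        synchronized (monitor) {              // would block forever if wait() kept the lock
            entered = true;
            released[0] = true;
            monitor.notify();                 // move the waiter back towards the monitor
        }
        try {
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return entered && !waiter.isAlive();
    }

    public static void main(String[] args) {
        System.out.println("entered monitor while other thread waited: " + run());
    }
}
```

If wait did not release the lock, the main thread would hang at its synchronized block and the program would never finish.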
Java implements a so-called resume&continue monitor, which means that the notifying thread wakes up a thread from waitQ but continues executing inside the monitor. The alternative is a resume&wait monitor, where the notifying thread becomes suspended and the awoken thread starts executing inside the monitor. A third alternative is a so-called implicit resume monitor. This variant lacks wait and notify primitives. Instead, it has entrance conditions (called guards) and re-checks them every time a thread leaves the monitor. This variant is an important concurrency building block in the real-time programming language Ada95.
Now, you
know how to set up a communication link between two threads, but not between
more than two! Follow on to the next problem to understand why.
Let’s modify our latest program a little bit, so that we allow more than one producer and let each of them send the sequence 1, 2, …, n. The (single) consumer adds all numbers and prints out the result when the program terminates. If we have p producer threads, the result should be p * n * (n+1) / 2.
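The expected value is easy to compute up front, which gives us something to compare the consumer's sum against. The helper class ExpectedSum below is an assumption made for illustration, not part of the article's code; it evaluates the formula and cross-checks it by plain summation.

```java
// Expected consumer result when p producers each send 1, 2, ..., n.
// Class and method names are illustrative.
class ExpectedSum {
    // Closed form: p * n * (n+1) / 2, computed in long to avoid overflow.
    static long expected(int p, int n) {
        return (long) p * n * (n + 1) / 2;
    }

    // Brute-force cross-check: add up what every producer would send.
    static long bruteForce(int p, int n) {
        long sum = 0;
        for (int producer = 0; producer < p; producer++) {
            for (int i = 1; i <= n; i++) sum += i;
        }
        return sum;
    }

    public static void main(String[] args) {
        System.out.println("expected sum for p=2, n=10: " + expected(2, 10));
    }
}
```

For two producers that each send 1 to 10, the consumer should end up with 2 * 10 * 11 / 2 = 110.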
I’m using
the same mailbox as before, slightly modified. Here it is.
class MailBox {
    protected int data = 0;
    protected boolean isFull = false;
    public synchronized void put(int msg) {
        while (isFull)
            try{ wait(); }catch(InterruptedException x) {}
        data = msg;
        isFull = true;
        doNotify();
    }
    public synchronized int get() {
        while (!isFull)
            try{ wait(); }catch(InterruptedException x) {}
        int msg = data;
        isFull = false;
        doNotify();
        return msg;
    }
    protected void doNotify() {notify();}
}
Here are the producer and consumer thread classes. I have simplified the code for clarity. You can find a link to the complete source code at the end of this article.
class Producer extends Thread {
    private MailBox out;
    private int numMessages;
    public Producer(int numMessages, MailBox out) {
        this.out = out;
        this.numMessages = numMessages;
    }
    public void run() {
        for (int i = 1; i <= numMessages; i++) {
            out.put(i);
        }
    }
}
class Consumer extends Thread {
    private MailBox in;
    private int sum = 0;
    public Consumer(MailBox in) {
        this.in = in;
    }
    public void run() {
        int n;
        while ((n = in.get()) >= 0) {
            sum += n;
        }
        System.out.println("sum = " + sum);
    }
}
The main
program is responsible for creating a mailbox, the consumer, and a number of
producers. It then waits for the completion of all producers, sends a –1 via
the mailbox to the consumer and waits for the completion of the consumer. Now,
let’s run the program with two producers and see what happens.
1) >java NumberSequencer +v -p 2
2) NumProducers: 2
3) Max : 10
4) UseNotifyAll: false
5) Consumer: Started
6) Consumer: Mailbox<data=0, isFull=false> get(): enter
7) Consumer: Mailbox<data=0, isFull=false> get(): before wait
8) Producer-1: Started