Java 线程调度之 wait 和 notify

wait(),notify(),notifyAll() 是属于Object对象的jvm级native方法,用于线程调度;

wait()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
/**
* Causes the current thread to wait until either another thread invokes the
* {@link java.lang.Object#notify()} method or the
* {@link java.lang.Object#notifyAll()} method for this object, or a
* specified amount of time has elapsed.
* The current thread must own this object's monitor.
*
* This method causes the current thread to place itself
* in the wait set for this object and then to relinquish(放弃、让出)
* any and all synchronization claims(夺走,主张) on this object. Thread
* becomes disabled for thread scheduling purposes and lies dormant
* until one of four things happens:
*
* 1. Some other thread invokes the {@code notify} method for this
* object and thread happens to be arbitrarily chosen as
* the thread to be awakened.
* 2. Some other thread invokes the {@code notifyAll} method for this
* object.
* 3. Some other thread {Thread#interrupt() interrupts}
* 4. The specified amount of real time has elapsed, more or less.
*
* The thread is then removed from the wait set for this
* object and re-enabled for thread scheduling. It then competes in the
* usual manner with other threads for the right to synchronize on the
* object; once it has gained control of the object, all its
* synchronization claims on the object are restored to the status
*
* Thread <var>T</var> then returns from the
* invocation of the {@code wait} method. Thus, on return from the
* {@code wait} method, the synchronization state of the object and of
* thread {@code T} is exactly as it was when the {@code wait} method
* was invoked.
*
* /
public final native void wait() throws InterruptedException;

调用对象的该方法的将使线程进入等待状态,直到另外一个线程调用 notify() 或者 notifyAll() 后重新唤起该线程,该方法只能在获取到对象的内置锁之后,才能进行调用,wait() 方法调用之后,线程将放弃掉对象的内置锁,在线程再次被唤起之前,线程将重新获取到该内置锁。

可以给一个处于等待的线程可以发送 interrupt() 消息来中断等待。

A waiting thread can be sent {@code interrupt()} to cause it to prematurely stop waiting, so {@code wait} should be called in a loop to check that the condition that has been waited for has been met before continuing.

interrupt

interrupt()方法的工作仅仅是改变中断状态,并不是直接中断正在运行的线程。中断的真正原理是当线程被Object.wait(),Thread.join()或sleep()方法阻塞时,调用interrupt()方法后改变中断状态,而wait/join/sleep这些方法内部会不断地检查线程的中断状态值,当发现中断状态值改变时则抛出InterruptedException异常;对于没有阻塞的线程,调用interrupt()方法是没有任何作用。

notify()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
/**
* Wakes up a single thread that is waiting on this object's
* monitor. If any threads are waiting on this object, one of them
* is chosen to be awakened. The choice is arbitrary(任意的).
* A thread waits on an object's monitor by calling {@code wait} methods.
*
* The awakened thread will not be able to proceed until the current
* thread relinquishes(放弃,交出) the lock on this object. The awakened thread
* will compete in the usual manner with any other threads that might be
* actively competing to synchronize on this object; for example, the
* awakened thread enjoys no reliable privilege or disadvantage in being
* the next thread to lock this object.
*
* This method should only be called by a thread that is the owner
* of this object's monitor. A thread becomes the owner of the
* object's monitor in one of three ways:
*
* 1. By executing a synchronized instance method of that object.
* 2. By executing the body of a {@code synchronized} statement
* that synchronizes on the object.
* 3. For objects of type {@code Class,} by executing a
* synchronized static method of that class.
*
* Only one thread at a time can own an object's monitor.
*
* @exception IllegalMonitorStateException if the current thread is not
* the owner of this object's monitor.
*/
public final native void notify();

调用对象的notify()方法,将会唤醒调用了该对象的wait()方法导致进入等待状态的线程;如果超过一个线程处于等待状态,JVM将会选择一个线程来唤醒(无序),被选择的线程不会立即执行,当前线程需要先释放掉锁之后,并且,被选择的线程还需要和其它尝试锁定该对象的线程竞争;

下面的一个对象的两个方法,比如线程 A 在调用getInt1之后线程进入休眠状态,在某一时刻notify方法被调用,线程 A 被JVM选择作为被唤醒的线程,但是,与此同时,线程 B 又调用了getInt2方法,此时线程 A 还需要和其它同样使用到了锁对象的线程(线程 B)竞争系统资源,只有线程 A 得到系统资源之后,才能继续向下执行;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public int getInt2() {
synchronized(this) {
Thread.sleep(1000);
}
return 2;
}
public int getInt1() {
synchronized(this) {
while(flag == 0) {
wait();
}
}
return 1;
}

notifyAll()

notify()类似,但它是唤醒所有的处于wait状态的线程;

  • 调用了wait方法使持有该对象的线程把该对象的控制权交出去,然后处于等待状态。
  • 调用了notify方法通知某个正在等待这个对象的控制权的线程可以继续运行。
  • 调用了notifyAll方法通知所有等待这个对象控制权的线程继续运行。

Example1

  • WaitRunnable,线程开启之后,将 sleep 指定的时间,然后重置 flag,再 notifyAll() 唤醒所有的等待线程;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public class WaitRunnable implements Runnable {
private Object obj;
private int flag = 0;
public long sleepTime = 1000;
public void run() {
try {
Thread.sleep(sleepTime);
obj = new Object();
flag = 1;
} catch (InterruptedException e) {
flag = 1; // 线程被interrupt,更改flag
}
synchronized (this) {
notifyAll();
}
}
public Object getData() {
synchronized (this) {
while (flag == 0) {
try {
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
return obj;
}
}
  • main
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
public static void main(String[] args) {
ExecutorService executor = Executors.newFixedThreadPool(4);
WaitRunnable r1 = new WaitRunnable();
executor.submit(r1);
WaitRunnable r2 = new WaitRunnable();
executor.submit(r2);
final WaitRunnable r3 = new WaitRunnable();
r3.sleepTime = 2000;
// r3的sleepTime被指定为2000,r3通过Thread调用,该Thread在1500ms之后interrupt;
final Thread t3 = new Thread() {
@Override
public void run() {
r3.run();
print("getO3 = " + System.currentTimeMillis());
Object o3 = r3.getData();
print("o3 = " + o3);
}
};
t3.start();
Runnable interruptRuna = new Runnable() {
public void run() {
try {
Thread.sleep(1500);
} catch (InterruptedException e) {
e.printStackTrace();
}
t3.interrupt(); // 中断线程
}
};
executor.submit(interruptRuna);
print("getO1 = " + System.currentTimeMillis());
Object o1 = r1.getData();
print("o1 = " + o1);
executor.shutdown();
}
//
getO1 = 1516497874842
getO3 = 1516497876347
o3 = null
o1 = java.lang.Object@1d44bcfa

Example2

生产者与消费者实例:

1
2
3
4
5
6
7
8
9
public static void main(String[] args) {
Plate lz = new Plate();
for (int i = 0; i < 10; i++) {
new Thread(new Plate.GetRuna(lz)).start();
}
for (int i = 0; i < 10; i++) {
new Thread(new Plate.AddRuna(lz)).start();
}
}

Plate是一个篮子,可以往里面放egg,同一时刻,篮子里面只能有一个或者零个egg,放入一个,必须被取走之后才能继续往里面放,取了之后,等待再放入之后再重新取;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
public class Plate {
private int egg = 0;
private Object o1 = new Object();
private Object o2 = new Object();
public void getEgg() {
synchronized (o1) {
while (egg <= 0)
try {
o1.wait();
} catch (InterruptedException e) {
}
}
synchronized (o2) {
egg--;
o2.notify();
System.out.println("get - " + egg);
}
}
public synchronized void addEgg() {
synchronized (o2) {
while (egg >= 1)
try {
o2.wait();
} catch (InterruptedException e) {
}
}
synchronized (o1) {
egg++;
o1.notify();
System.out.println("add - " + egg);
}
}
public static class AddRuna implements Runnable {
private final Plate lz;
public AddRuna(Plate lz) {
this.lz = lz;
}
public void run() {
try {
Thread.sleep(400);
} catch (InterruptedException e) {
e.printStackTrace();
}
lz.addEgg();
}
}
public static class GetRuna implements Runnable {
private final Plate lz;
public GetRuna(Plate lz) {
this.lz = lz;
}
public void run() {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
lz.getEgg();
}
}
}

注意 addEgg 和 getEgg 方法中,notify和wait时,锁对象是不同的!

  • wait()、notify()方法都需要在获取到对象的锁(monitor)之后,才能够进行调用,否则将抛异常。获取锁通过synchronized关键字获取,锁对象与调用wait()、notify()的对象必须是同一个对象。
  • synchronized方法或代码块在执行的时候获取到对象锁,调用wait()方法当前线程会进入等待状态,并且会释放掉锁,之后其它线程也可以获取该对象的锁,即其它线程可以继续调用wait()方法,notify()方法。
  • notify()方法是无序的,notify()之后,其它等待状态中的线程(调用wait()方法进入等待状态)或者尝试获取锁对象的线程(synchronized关键字修饰的代码块或方法)根据CPU的调度随机获取到锁。
  • notifyAll()方法用于激活所有进入等待状态的线程。

其它链接