Java Condition: Inter-Thread Communication

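Lock is commonly used for thread synchronization in concurrent code, but it can also be used for inter-thread communication. A Lock cannot do this on its own, however; it relies on Condition.
According to the Condition interface, Condition provides methods such as await, signal, and signalAll, which correspond to the wait, notify, and notifyAll methods of Object.
Object's wait and related methods must be called inside a synchronized method or block, that is, the caller must already hold the object's monitor lock. Likewise, Condition's await and related methods require the caller to hold the lock first. The only difference is how the lock is acquired: through a Lock rather than through synchronized.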
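To make the correspondence concrete, here is a minimal sketch of the same one-shot "gate" written twice: once with synchronized plus wait/notifyAll, and once with Lock plus Condition. The names GateDemo, MonitorGate, and ConditionGate are invented for this illustration and do not come from the JDK.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class GateDemo {
    /** Gate built on the intrinsic monitor: wait/notifyAll inside synchronized. */
    static class MonitorGate {
        private boolean isOpen;

        synchronized void awaitOpen() throws InterruptedException {
            while (!isOpen) {   // loop guards against spurious wakeups
                wait();         // releases the monitor while waiting
            }
        }

        synchronized void open() {
            isOpen = true;
            notifyAll();
        }
    }

    /** The same gate built on Lock + Condition: await/signalAll between lock and unlock. */
    static class ConditionGate {
        private final Lock lock = new ReentrantLock();
        private final Condition opened = lock.newCondition();
        private boolean isOpen;

        void awaitOpen() throws InterruptedException {
            lock.lock();
            try {
                while (!isOpen) {
                    opened.await();   // releases the lock while waiting
                }
            } finally {
                lock.unlock();
            }
        }

        void open() {
            lock.lock();
            try {
                isOpen = true;
                opened.signalAll();
            } finally {
                lock.unlock();
            }
        }
    }

    /** Opens both gates from the calling thread while a second thread waits on them. */
    static String run() throws InterruptedException {
        MonitorGate monitorGate = new MonitorGate();
        ConditionGate conditionGate = new ConditionGate();
        String[] result = {"waiter did not finish"};
        Thread waiter = new Thread(() -> {
            try {
                monitorGate.awaitOpen();
                conditionGate.awaitOpen();
                result[0] = "both gates passed";
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        waiter.start();
        monitorGate.open();
        conditionGate.open();
        waiter.join();   // join() makes the write to result[0] visible here
        return result[0];
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run());   // prints "both gates passed"
    }
}
```

The two classes have the same shape. The only structural difference is that the Condition version spells out lock() and unlock() explicitly, which is what later allows several Condition objects to share one Lock.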

Condition.java

/**
 * Conditions provide a means for one thread to suspend execution until
 * notified by another thread that some state condition may now be true.
 * The key property that waiting for a condition provides
 * is that it <em>atomically</em> releases the associated lock and
 * suspends the current thread, just like {@code Object.wait}.
 *
 * Suppose we have a bounded buffer that supports put and take. A thread
 * that calls take on an empty buffer is suspended until an item arrives,
 * and a thread that calls put on a full buffer is suspended until space
 * becomes available.
 * We would like to keep waiting {@code put} threads and {@code take}
 * threads in separate wait-sets so that we can use the optimization of
 * only notifying a single thread at a time when items or spaces become
 * available in the buffer. This can be achieved using two
 * {@link Condition} instances.
 *
 * class BoundedBuffer {
 *     final Lock lock = new ReentrantLock();
 *     final Condition notFull = lock.newCondition();  // buffer is not full
 *     final Condition notEmpty = lock.newCondition(); // buffer is not empty
 *
 *     final Object[] items = new Object[100];
 *     int putptr, takeptr, count;
 *
 *     public void put(Object x) throws InterruptedException {
 *         lock.lock();
 *         try {
 *             while (count == items.length)
 *                 notFull.await();
 *             items[putptr] = x;
 *             if (++putptr == items.length) putptr = 0;
 *             ++count;
 *             notEmpty.signal();
 *         } finally {
 *             lock.unlock();
 *         }
 *     }
 *
 *     public Object take() throws InterruptedException {
 *         lock.lock();
 *         try {
 *             while (count == 0)
 *                 notEmpty.await();
 *             Object x = items[takeptr];
 *             if (++takeptr == items.length) takeptr = 0;
 *             --count;
 *             notFull.signal();
 *             return x;
 *         } finally {
 *             lock.unlock();
 *         }
 *     }
 * }
 *
 * Unlike the Object monitor methods, a Condition implementation can offer
 * different semantics, such as guaranteed ordering for notifications.
 */
public interface Condition {
    /**
     * Causes the current thread to wait until it is signalled or interrupted.
     *
     * After await is called, the associated lock is released atomically and the
     * current thread is disabled for scheduling until one of four things happens:
     * 1. Some other thread calls signal and the current thread is chosen as
     *    the thread to be awakened;
     * 2. Some other thread calls signalAll;
     * 3. Some other thread interrupts the current thread;
     * 4. A spurious wakeup occurs.
     *
     * In all cases, before this method can return the current thread must
     * re-acquire the lock associated with this condition. When the
     * thread returns it is <em>guaranteed</em> to hold this lock.
     *
     * <p>An implementation can favor responding to an interrupt over normal
     * method return in response to a signal. In that case the implementation
     * must ensure that the signal is redirected to another waiting thread, if
     * there is one.
     */
    void await() throws InterruptedException;
    void awaitUninterruptibly();
    long awaitNanos(long nanosTimeout) throws InterruptedException;
    boolean await(long time, TimeUnit unit) throws InterruptedException;
    boolean awaitUntil(Date deadline) throws InterruptedException;
    void signal();
    void signalAll();
}
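Of the methods listed above, awaitNanos is the one whose return value is easiest to misuse: it returns an estimate of the time still remaining, which should be fed back into the next wait. The following is a small sketch of that idiom, assuming a hypothetical TimedFlag class written only for this example.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class TimedFlag {
    private final Lock lock = new ReentrantLock();
    private final Condition raised = lock.newCondition();
    private boolean flag;   // guarded by lock

    /** Waits up to the given timeout for the flag; returns false on timeout. */
    public boolean awaitRaised(long timeout, TimeUnit unit) throws InterruptedException {
        long nanosRemaining = unit.toNanos(timeout);
        lock.lock();
        try {
            while (!flag) {
                if (nanosRemaining <= 0L) {
                    return false;   // time budget used up, flag still not raised
                }
                // awaitNanos returns an estimate of the time left, so a spurious
                // or early wakeup does not restart the full timeout.
                nanosRemaining = raised.awaitNanos(nanosRemaining);
            }
            return true;
        } finally {
            lock.unlock();
        }
    }

    /** Raises the flag and wakes every waiting thread. */
    public void raise() {
        lock.lock();
        try {
            flag = true;
            raised.signalAll();
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        TimedFlag f = new TimedFlag();
        System.out.println(f.awaitRaised(50, TimeUnit.MILLISECONDS)); // nobody raised it: false
        f.raise();
        System.out.println(f.awaitRaised(50, TimeUnit.MILLISECONDS)); // already raised: true
    }
}
```

The while loop around the wait matters as much as the timeout bookkeeping: the predicate is rechecked after every wakeup, so the method stays correct whether the thread was signalled, timed out, or woke spuriously.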

AbstractQueuedSynchronizer#ConditionObject.java

public class ConditionObject implements Condition, java.io.Serializable {
    /** First node of condition queue. */
    private transient Node firstWaiter;
    /** Last node of condition queue. */
    private transient Node lastWaiter;
    /**
     * Adds a new waiter node to the condition queue.
     */
    private Node addConditionWaiter() {
        Node t = lastWaiter;
        // If lastWaiter is cancelled, clean out the queue.
        if (t != null && t.waitStatus != Node.CONDITION) {
            // Unlink all cancelled waiters, then re-read the tail.
            unlinkCancelledWaiters();
            t = lastWaiter;
        }
        Node node = new Node(Node.CONDITION);
        if (t == null)
            firstWaiter = node;
        else
            t.nextWaiter = node;
        lastWaiter = node;
        return node;
    }
    /**
     * Removes and transfers nodes until it hits a non-cancelled one or
     * runs out of nodes (null).
     * Split out from signal in part to encourage compilers
     * to inline the case of no waiters.
     * @param first (non-null) the first node on condition queue
     */
    private void doSignal(Node first) {
        do {
            if ( (firstWaiter = first.nextWaiter) == null)
                lastWaiter = null;
            first.nextWaiter = null;
        } while (!transferForSignal(first) &&
                 (first = firstWaiter) != null);
    }
    /*
     * Transfers a node from a condition queue onto sync queue.
     * @return true if successfully transferred (else the node was
     * cancelled before signal)
     */ // method of AQS
    final boolean transferForSignal(Node node) {
        // If cannot change waitStatus, the node has been cancelled.
        if (!node.compareAndSetWaitStatus(Node.CONDITION, 0))
            return false;
        /*
         * Splice onto queue and try to set waitStatus of predecessor to
         * indicate that thread is (probably) waiting. If cancelled or
         * attempt to set waitStatus fails, wake up to resync (in which
         * case the waitStatus can be transiently and harmlessly wrong).
         */
        // Inserts node into the sync queue; enq returns the node's predecessor.
        Node p = enq(node);
        int ws = p.waitStatus;
        // If the predecessor is cancelled (ws > 0), or its waitStatus cannot
        // be set to SIGNAL, wake the thread directly so it can resync.
        if (ws > 0 || !p.compareAndSetWaitStatus(ws, Node.SIGNAL))
            LockSupport.unpark(node.thread); // wake the thread
        return true;
    }
    /*
     * Removes and transfers all nodes.
     * @param first (non-null) the first node on condition queue
     */
    private void doSignalAll(Node first) {
        lastWaiter = firstWaiter = null;
        do {
            Node next = first.nextWaiter;
            first.nextWaiter = null;
            transferForSignal(first);
            first = next;
        } while (first != null);
    }
    /**
     * Unlinks cancelled waiter nodes from the condition queue.
     * Called only while holding the lock, in two situations:
     * 1. when cancellation occurred during a condition wait;
     * 2. upon insertion of a new waiter when lastWaiter is seen to
     *    have been cancelled.
     * It traverses all nodes rather than stopping at a
     * particular target to unlink all pointers to garbage nodes
     * without requiring many re-traversals during cancellation
     * storms.
     */
    private void unlinkCancelledWaiters() {
        Node t = firstWaiter;
        Node trail = null;
        while (t != null) {
            Node next = t.nextWaiter;
            if (t.waitStatus != Node.CONDITION) {
                t.nextWaiter = null;
                if (trail == null)
                    firstWaiter = next;
                else
                    trail.nextWaiter = next;
                if (next == null)
                    lastWaiter = trail;
            }
            else
                trail = t;
            t = next;
        }
    }
    // public methods
    /**
     * Wakes the thread that has been waiting on this condition the
     * longest, if any thread is waiting.
     */
    public final void signal() {
        if (!isHeldExclusively())
            throw new IllegalMonitorStateException();
        Node first = firstWaiter;
        if (first != null)
            doSignal(first);
    }
    /**
     * Wakes all threads waiting on this condition.
     */
    public final void signalAll() {
        if (!isHeldExclusively())
            throw new IllegalMonitorStateException();
        Node first = firstWaiter;
        if (first != null)
            doSignalAll(first);
    }
    /**
     * Uninterruptible condition wait.
     * 1. Save the lock state returned by getState.
     * 2. Invoke release with the saved state as argument, throwing
     *    IllegalMonitorStateException if it fails.
     * 3. Block until signalled.
     * 4. Re-acquire the lock by calling acquireQueued with the saved state.
     */
    public final void awaitUninterruptibly() {
        Node node = addConditionWaiter();
        int savedState = fullyRelease(node);
        boolean interrupted = false;
        while (!isOnSyncQueue(node)) {
            LockSupport.park(this);
            if (Thread.interrupted())
                interrupted = true;
        }
        if (acquireQueued(node, savedState) || interrupted)
            selfInterrupt();
    }
    /**
     * Interruptible condition wait.
     * 1. If the current thread is interrupted, throw InterruptedException.
     * 2. Save the lock state returned by getState.
     * 3. Invoke release with the saved state as argument, throwing
     *    IllegalMonitorStateException if it fails.
     * 4. Block until signalled or interrupted.
     * 5. After the signal, re-acquire the lock (acquireQueued with the
     *    saved state).
     */
    public final void await() throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        // Enqueue this thread on the condition queue.
        Node node = addConditionWaiter();
        // Fully release the lock and remember the state for re-acquisition.
        int savedState = fullyRelease(node);
        int interruptMode = 0;
        while (!isOnSyncQueue(node)) {
            LockSupport.park(this);
            // Stop waiting if the thread was interrupted while parked.
            if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                break;
        }
        // Re-acquire the lock.
        if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
            interruptMode = REINTERRUPT;
        if (node.nextWaiter != null) // clean up if cancelled
            unlinkCancelledWaiters();
        if (interruptMode != 0)
            reportInterruptAfterWait(interruptMode);
    }
    /** Mode meaning to reinterrupt on exit from wait */
    private static final int REINTERRUPT = 1;
    /** Mode meaning to throw InterruptedException on exit from wait */
    private static final int THROW_IE = -1;
    /**
     * Checks for interrupt, returning THROW_IE if interrupted
     * before signalled, REINTERRUPT if after signalled, or
     * 0 if not interrupted.
     */
    private int checkInterruptWhileWaiting(Node node) {
        return Thread.interrupted() ?
            (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
            0;
    }
    /*
     * Transfers node, if necessary, to sync queue after a cancelled wait.
     * Returns true if the thread was cancelled before being signalled.
     */
    final boolean transferAfterCancelledWait(Node node) {
        // (0 for normal sync nodes)
        if (node.compareAndSetWaitStatus(Node.CONDITION, 0)) {
            enq(node);
            return true;
        }
        /*
         * If we lost out to a signal(), then we can't proceed
         * until it finishes its enq(). Cancelling during an
         * incomplete transfer is both rare and transient, so just
         * spin.
         */
        while (!isOnSyncQueue(node))
            Thread.yield();
        return false;
    }
    /*
     * Returns true if a node that was initially placed on a condition
     * queue is now on the sync queue, waiting to re-acquire the lock.
     */
    final boolean isOnSyncQueue(Node node) {
        // A node whose waitStatus is still CONDITION, or whose prev is
        // null, is not on the sync queue.
        if (node.waitStatus == Node.CONDITION || node.prev == null)
            return false;
        if (node.next != null) // If has successor, it must be on queue
            return true;
        /*
         * node.prev can be non-null, but not yet on queue because
         * the CAS to place it on queue can fail. So we have to
         * traverse from tail to make sure it actually made it. It
         * will always be near the tail in calls to this method, and
         * unless the CAS failed (which is unlikely), it will be
         * there, so we hardly ever traverse much.
         */
        return findNodeFromTail(node);
    }
    /**
     * Throws InterruptedException, reinterrupts current thread, or
     * does nothing, depending on mode.
     */
    private void reportInterruptAfterWait(int interruptMode)
        throws InterruptedException {
        if (interruptMode == THROW_IE)
            throw new InterruptedException();
        else if (interruptMode == REINTERRUPT)
            selfInterrupt(); // restore the thread's interrupt status
    }
    /**
     * Timed condition wait.
     * 1. If the current thread is interrupted, throw InterruptedException.
     * 2. Save the lock state returned by getState.
     * 3. Invoke release with the saved state as argument, throwing
     *    IllegalMonitorStateException if it fails.
     * 4. Block until signalled, interrupted, or timed out.
     * 5. Re-acquire the lock by calling acquireQueued with the saved state.
     * If interrupted while blocked in step 4, throw InterruptedException.
     */
    public final long awaitNanos(long nanosTimeout)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        // We don't check for nanosTimeout <= 0L here, to allow
        // awaitNanos(0) as a way to "yield the lock".
        final long deadline = System.nanoTime() + nanosTimeout;
        long initialNanos = nanosTimeout;
        Node node = addConditionWaiter();
        int savedState = fullyRelease(node);
        int interruptMode = 0;
        while (!isOnSyncQueue(node)) {
            if (nanosTimeout <= 0L) {
                // Timed out: move the node to the sync queue ourselves.
                transferAfterCancelledWait(node);
                break;
            }
            if (nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD)
                LockSupport.parkNanos(this, nanosTimeout);
            // A non-zero mode means the thread was interrupted.
            if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                break;
            nanosTimeout = deadline - System.nanoTime();
        }
        if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
            interruptMode = REINTERRUPT;
        if (node.nextWaiter != null)
            unlinkCancelledWaiters();
        if (interruptMode != 0)
            reportInterruptAfterWait(interruptMode);
        long remaining = deadline - System.nanoTime(); // avoid overflow
        return (remaining <= initialNanos) ? remaining : Long.MIN_VALUE;
    }
    /**
     * Absolute timed condition wait (waits until the given deadline).
     * Steps 1-3 and 5 are the same as in awaitNanos.
     * 4. Block until signalled, interrupted, or timed out.
     * If interrupted while blocked in step 4, throw InterruptedException.
     * If timed out while blocked in step 4, return false, else true.
     */
    public final boolean awaitUntil(Date deadline)
            throws InterruptedException {
        long abstime = deadline.getTime();
        if (Thread.interrupted())
            throw new InterruptedException();
        Node node = addConditionWaiter();
        int savedState = fullyRelease(node);
        boolean timedout = false;
        int interruptMode = 0;
        while (!isOnSyncQueue(node)) {
            if (System.currentTimeMillis() >= abstime) {
                timedout = transferAfterCancelledWait(node);
                break;
            }
            LockSupport.parkUntil(this, abstime);
            if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                break;
        }
        if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
            interruptMode = REINTERRUPT;
        if (node.nextWaiter != null)
            unlinkCancelledWaiters();
        if (interruptMode != 0)
            reportInterruptAfterWait(interruptMode);
        return !timedout;
    }
    /**
     * Timed condition wait with a relative timeout.
     * The steps are the same as for the other timed waits.
     */
    public final boolean await(long time, TimeUnit unit)
            throws InterruptedException {
        long nanosTimeout = unit.toNanos(time);
        if (Thread.interrupted())
            throw new InterruptedException();
        // We don't check for nanosTimeout <= 0L here, to allow
        // await(0, unit) as a way to "yield the lock".
        final long deadline = System.nanoTime() + nanosTimeout;
        Node node = addConditionWaiter();
        int savedState = fullyRelease(node);
        boolean timedout = false;
        int interruptMode = 0;
        while (!isOnSyncQueue(node)) {
            if (nanosTimeout <= 0L) {
                timedout = transferAfterCancelledWait(node);
                break;
            }
            if (nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD)
                LockSupport.parkNanos(this, nanosTimeout);
            if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                break;
            nanosTimeout = deadline - System.nanoTime();
        }
        if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
            interruptMode = REINTERRUPT;
        if (node.nextWaiter != null)
            unlinkCancelledWaiters();
        if (interruptMode != 0)
            reportInterruptAfterWait(interruptMode);
        return !timedout;
    }
    /**
     * Returns true if this condition was created by the given
     * synchronization object.
     */
    final boolean isOwnedBy(AbstractQueuedSynchronizer sync) {
        return sync == AbstractQueuedSynchronizer.this;
    }
    /**
     * Queries whether any threads are waiting on this condition.
     * Implements {@link AbstractQueuedSynchronizer#hasWaiters(ConditionObject)}.
     * @return {@code true} if there are any waiting threads
     */
    protected final boolean hasWaiters() {
        if (!isHeldExclusively())
            throw new IllegalMonitorStateException();
        for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
            if (w.waitStatus == Node.CONDITION)
                return true;
        }
        return false;
    }
    /**
     * Returns an estimate of the number of threads waiting on this condition.
     * Implements {@link AbstractQueuedSynchronizer#getWaitQueueLength(ConditionObject)}.
     */
    protected final int getWaitQueueLength() {
        if (!isHeldExclusively())
            throw new IllegalMonitorStateException();
        int n = 0;
        for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
            if (w.waitStatus == Node.CONDITION)
                ++n;
        }
        return n;
    }
    /**
     * Returns a collection containing the threads that may be waiting
     * on this condition.
     * Implements {@link AbstractQueuedSynchronizer#getWaitingThreads(ConditionObject)}.
     */
    protected final Collection<Thread> getWaitingThreads() {
        if (!isHeldExclusively())
            throw new IllegalMonitorStateException();
        ArrayList<Thread> list = new ArrayList<>();
        for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
            if (w.waitStatus == Node.CONDITION) {
                Thread t = w.thread;
                if (t != null)
                    list.add(t);
            }
        }
        return list;
    }
}
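The two interrupt modes in the listing (THROW_IE and REINTERRUPT) can be observed from ordinary application code. The sketch below is an illustration only: InterruptModes and its run method are hypothetical names, and the outcomes described assume an AQS-based lock such as ReentrantLock behaving as in the source above. It interrupts a waiting thread either before or after signalling it, and uses ReentrantLock.hasWaiters to make sure the waiter is already on the condition queue.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class InterruptModes {
    /** Runs one waiter; if signalFirst is true it is signalled before being interrupted. */
    static String run(boolean signalFirst) throws InterruptedException {
        ReentrantLock lock = new ReentrantLock();
        Condition ready = lock.newCondition();
        boolean[] flag = {false};             // guarded by lock
        String[] outcome = {"not finished"};  // written by the waiter, read after join()

        Thread waiter = new Thread(() -> {
            lock.lock();
            try {
                while (!flag[0]) {
                    ready.await();
                }
                outcome[0] = Thread.currentThread().isInterrupted()
                        ? "returned normally, interrupt flag set"
                        : "returned normally";
            } catch (InterruptedException e) {
                outcome[0] = "InterruptedException";
            } finally {
                lock.unlock();   // await re-acquires the lock even when it throws
            }
        });
        waiter.start();

        // Spin until the waiter is on the condition queue.
        // hasWaiters must be called while holding the lock.
        boolean queued = false;
        while (!queued) {
            lock.lock();
            try {
                queued = lock.hasWaiters(ready);
            } finally {
                lock.unlock();
            }
            Thread.yield();
        }

        lock.lock();
        try {
            if (signalFirst) {
                flag[0] = true;
                ready.signal();   // the node moves to the sync queue before the interrupt
            }
            waiter.interrupt();
        } finally {
            lock.unlock();
        }
        waiter.join();
        return outcome[0];
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("interrupt before signal: " + run(false));
        System.out.println("signal, then interrupt:  " + run(true));
    }
}
```

Following the logic of checkInterruptWhileWaiting, the first run should end with InterruptedException (the THROW_IE path, because the interrupt wins the CAS on waitStatus), while the second should return normally with the interrupt flag set (the REINTERRUPT path, because signal had already moved the node).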

Execution flow (Thread0 calls await, then Thread1 calls signal):

Thread0: t0BeforeLock...
Thread0: compareAndSetState: 1 - true // state updated successfully, lock acquired
Thread0: t0AfterLock...
Thread0: await... // start waiting on the Condition
Thread0: addConditionWaiter // add a new Node to the condition queue and return it
Thread0: release: 1 // release the lock
Thread0: setState: 0 // set state to 0
Thread0: await>>savedState: 1 // state before the release was 1; it is used later to re-acquire the lock
Thread0: await>>node.waitStatus: -2 // the new node's waitStatus is CONDITION (-2)
Thread0: isOnSyncQueue = -2 // waitStatus is -2, so the node is not on the
// sync queue and the thread goes on to wait
Thread0: LockSupport.park // the thread parks (enters the waiting state)
Thread1: t1BeforeLock...
Thread1: compareAndSetState: 1 - true // state updated successfully, lock acquired
Thread1: t1AfterLock...
Thread1: doSignal>>first.waitStatus: -2 // signal; the first waiter's waitStatus is -2
Thread1: transferForSignal // change the node's waitStatus from -2 to 0
Thread1: compareAndSetWaitStatus: 0
Thread1: enq // add the node to the sync queue and return its predecessor
Thread1: transferForsignal>>p.waitStatus: 0
Thread1: compareAndSetWaitStatus: -1 // set the predecessor's waitStatus to -1 (SIGNAL)
Thread1: t1BeforeUnlock...
Thread1: release: 1 // release the lock
Thread1: setState: 0
Thread1: unparkSuccessor>>node.waitStatus: -1 // wake the successor thread; waitStatus is changed from -1 to 0
Thread1: compareAndSetWaitStatus: 0
Thread1: s: $node@814013
Thread1: t1AfterUnlock...
Thread0: checkInterruptWhileWaiting // check whether the thread was interrupted
Thread0: isOnSyncQueue = 0 // the wait was not cancelled and the node is on the sync queue, so isOnSyncQueue returns true
Thread0: compareAndSetState: 1 - true // re-acquire the lock
Thread0: t0BeforeUnLock...
Thread0: release: 1 // work finished, release the lock
Thread0: setState: 0
Thread0: t0AfterUnLock...
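The internal lines of this trace (compareAndSetState, enq, and so on) come from instrumented JDK code and cannot be reproduced from application code. The application-level part of the scenario can be, though. Below is a sketch under that assumption: AwaitSignalTrace is a made-up class, and only events whose order is fixed by the lock are recorded, so the two "AfterUnlock" events, which may interleave freely, are left out.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class AwaitSignalTrace {
    static List<String> run() throws InterruptedException {
        ReentrantLock lock = new ReentrantLock();
        Condition condition = lock.newCondition();
        boolean[] signalled = {false};   // guarded by lock
        List<String> events = Collections.synchronizedList(new ArrayList<>());

        Thread t0 = new Thread(() -> {
            events.add("Thread0: t0BeforeLock");
            lock.lock();
            try {
                events.add("Thread0: t0AfterLock");
                events.add("Thread0: await");
                while (!signalled[0]) {
                    condition.await();   // releases the lock, parks, re-acquires on wakeup
                }
                events.add("Thread0: t0BeforeUnlock");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                lock.unlock();
            }
        }, "Thread0");

        Thread t1 = new Thread(() -> {
            events.add("Thread1: t1BeforeLock");
            lock.lock();
            try {
                events.add("Thread1: t1AfterLock");
                signalled[0] = true;
                condition.signal();      // moves Thread0's node to the sync queue
                events.add("Thread1: signal");
                events.add("Thread1: t1BeforeUnlock");
            } finally {
                lock.unlock();           // Thread0 can now re-acquire the lock
            }
        }, "Thread1");

        t0.start();
        // Start Thread1 only once Thread0 is on the condition queue.
        boolean waiting = false;
        while (!waiting) {
            lock.lock();
            try {
                waiting = lock.hasWaiters(condition);
            } finally {
                lock.unlock();
            }
            Thread.yield();
        }
        t1.start();
        t0.join();
        t1.join();
        return events;
    }

    public static void main(String[] args) throws InterruptedException {
        run().forEach(System.out::println);
    }
}
```

The recorded order mirrors the trace: Thread0 locks and awaits, Thread1 locks, signals, and reaches its unlock, and only then does Thread0 return from await and reach its own unlock. Thread0 cannot resume at the moment of signal because it still has to win the lock back from Thread1.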