Java ReentrantLock 多线程同步锁

Java 锁对象 ReentrantLock,用于处理线程同步,和synchronized方法或者代码块类似,获取到锁对象的线程,将独享代码块,其它线程不能访问;
ReentrantLock实现了Lock接口(提供了lockunlocknewCondition等方法),实际是真正处理加锁,释放锁的是它的内部类Sync,而Sync又继承自 AbstractQueuedSynchronizer

ReentrantLock.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/**
* 具有和隐式监视器锁{**synchronized**}方法和语句基本相同的可重入互斥锁,但我们可以扩展该锁。
* 锁对象被最后一个lock成功但未unlock的线程占有,在线程中调用lock,如果锁对象未被其它线程
* 所占有,并且成功的获取到了锁,lock方法会立即返回。
* 如果当前线程已经获取到了该锁对象,lock方法也会立即返回。
* This can be checked using methods isHeldByCurrentThread or getHoldCount.
*
* 构造函数接收参数fairness,表示是否公平锁,如果设置为true,则在线程间争夺锁时,
* 等待时间最长的那个锁将获取到锁对象,否则,锁对象不对谁获取到锁做任何的优先级顺序,
* 公平锁 vs 非公平锁:有许多个线程的情况下,使用公平锁会导致吞吐量比较低,而非公平锁则相反,
* 但是公平锁在获取锁和保证缺乏饥饿的时间上有较小的差异。
* 记注:公平锁不保证公平的线程调度!!!that fairness of locks does not guarantee
* fairness of thread scheduling.
* 1. Thus, one of many threads using a fair lock may obtain it multiple times
* in succession while other active threads are not progressing and not
* currently holding the lock.
* 2. Also note that the untimed {@link #tryLock()} method does not
* honor the fairness setting. It will succeed if the lock
* is available even if other threads are waiting.
* /

Sync

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
public class ReentrantLock implements Lock, java.io.Serializable {
/** Synchronizer providing all implementation mechanics */
private final Sync sync;
/**
* synchronization control 基类,有公平和非公平两个子类,Uses AQS state to
* represent the number of holds on the lock.
*/
abstract static class Sync extends AbstractQueuedSynchronizer {
/**
* Performs {@link Lock#lock}. The main reason for subclassing
* is to allow fast path for nonfair version.
*/
abstract void lock();
// 非公平的acquire
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
// 如果state为0,则cas操作state为1,如果成功,返回true。
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 如果当前线程是当前获取到独享该锁对象的线程,则增加state;
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
// acquire失败,返回结果false;
return false;
}
// 尝试解锁,如果state为0表示该锁已经被线程释放,此时设置exclusiveThread为null
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
protected final boolean isHeldExclusively() {
return getExclusiveOwnerThread() == Thread.currentThread();
}
final ConditionObject newCondition() {
return new ConditionObject();
}
final Thread getOwner() {
return getState() == 0 ? null : getExclusiveOwnerThread();
}
final int getHoldCount() {
return isHeldExclusively() ? getState() : 0;
}
final boolean isLocked() {
return getState() != 0;
}
}

NonfainSync & FairSync

NonfairSyncFairSync 都继承自 Sync,这两种锁对 lock 方法和 tryAcquire 方法,有不同的处理逻辑,以区分公平与非公平锁的处理逻辑;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
static final class NonfairSync extends Sync {
/**
* Try immediate barge, backing up to normal acquire on failure.
*/
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1); //
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
// 公平锁
static final class FairSync extends Sync {
final void lock() {
acquire(1);
}
/**
* Fair version of tryAcquire. Don't grant(授予) access unless
* recursive(循环) call or no waiters or is first.
*/
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// 相比于nonfairTryAcquire,多了{hasQueuedPredecessors}的判断
// 用于查询是否有线程在获取锁时等待了更长的时间
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}

hasQueuedPredecessors

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
/* 用于查询是否有线程已经在获取锁对象时等待了更长的时间.
* Queries whether any threads have been waiting to acquire longer
* than the current thread.
*
* An invocation of this method is equivalent(相等) to (but may be
* more efficient(有效率的) than):
* {@code
* getFirstQueuedThread() != Thread.currentThread() &&
* hasQueuedThreads()}
*
* <p>Note that because cancellations due to interrupts and
* timeouts may occur at any time, a {@code true} return does not
* guarantee that some other thread will acquire before the current
* thread. Likewise, it is possible for another thread to win a
* race to enqueue after this method has returned {@code false},
* due to the queue being empty.
*
* <p>This method is designed to be used by a fair synchronizer to
* avoid <a href="AbstractQueuedSynchronizer#barging">barging</a>.
* Such a synchronizer's {@link #tryAcquire} method should return
* {@code false}, and its {@link #tryAcquireShared} method should
* return a negative value, if this method returns {@code true}
* (unless this is a reentrant acquire).
* For example, the {@code tryAcquire} method for a fair, reentrant,
* exclusive mode synchronizer might look like this:
*
* <pre>
* protected boolean tryAcquire(int arg) {
* // 当前线程已经获取到锁
* if (isHeldExclusively()) {
* // A reentrant acquire; increment hold count
* return true;
* } else if (hasQueuedPredecessors()) {
* // 有等待了更长时间的线程
* return false;
* } else {
* // 正常的acquire
* }
* }}</pre>
*/
public final boolean hasQueuedPredecessors() {
// The correctness of this depends on head being initialized
// before tail and on head.next being accurate if the current
// thread is first in queue.
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
// head != tail
// (head.next为空?? 或者 head.next的thread不为当前线程)
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}

为什么head.next为空也表示有已经等待了更长时间的thread?

AbstractQueuedSynchronizer.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
/**
* 提供一个依赖FIFO队列实现的阻塞锁和同步器框架,该类依赖一个atomicInteger的state来表示
* 状态,子类必须定义方法来改变state,并且定义state所表示的不同状态,当acquired和
* released的时候。
*
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
/* 等待队列的头节点, 只能通过setHead来修改. 如果head不为空,它的waitStatus
* 得需要保证不为CANCELLED
*/
private transient volatile Node head;
/* 等待队列的尾节点,只有通过enq添加节点的时候才能修改
*/
private transient volatile Node tail;
// 同步状态
private volatile int state;
protected final int getState() {
return state;
}
protected final void setState(int newState) {
state = newState;
}
// 修改state,如果成功,返回true
protected final boolean compareAndSetState(int expect, int update) {
return U.compareAndSwapInt(this, STATE, expect, update);
}
/**
* 插入节点到queue,如果队列为空的话,先初始化队列。
* @param node the node to insert
* @return 返回节点的上一任节点
*/
private Node enq(Node node) {
for (;;) {
Node oldTail = tail;
if (oldTail != null) {
// 设置node的前一个节点为oldTail
U.putObject(node, Node.PREV, oldTail);
// 将node设置为新的tail
if (compareAndSetTail(oldTail, node)) {
oldTail.next = node;
return oldTail;
}
} else {
initializeSyncQueue();
}
}
}
/**
* 创建新节点并添加到队列中,并返回新的节点;
* @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
*/
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
for (;;) {
Node oldTail = tail;
if (oldTail != null) {
// 设置node的前一个节点为oldTail
U.putObject(node, Node.PREV, oldTail);
// 将新建的node设置为新的tail
if (compareAndSetTail(oldTail, node)) {
oldTail.next = node;
return node;
}
} else {
// 如果队列为空,则初始化队列,此时head和tail相同;
initializeSyncQueue();
}
}
}
/* 设置头节点,所以会出列,清空队列;Called only by acquire methods.
* @param node the node
*/
private void setHead(Node node) {
head = node;
node.thread = null;
node.prev = null;
}
public final void acquire(int arg) {
// tryAcquire,如果失败,则添加一个Node到Waiter链中
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
/* 用于 独享不可打断模式 下,已经存在于队列中的线程的acquire;
* 用于Condition的等待方法和acquire方法.
* Acquires in exclusive uninterruptible mode for thread already in
* queue. Used by condition wait methods as well as acquire.
*
* @return {@code true} if interrupted while waiting
*/
// @ReservedStackAccess
final boolean acquireQueued(final Node node, int arg) {
try {
boolean interrupted = false;
for (;;) {
// 获取node的前一个节点;
final Node p = node.predecessor();
// 如果前一个节点是head,则再次尝试: tryAqcuire,
// 如果成功,则将head设置成为新的node;
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
return interrupted;
}
// Checks and updates status for a node that failed to acquire.
// 此方法被执行两次,p的初始waitStatus为0,
// shouldParkAfterFailedAcquire先将waitStatus设置为-1,表示节点
// 处于SIGNAL状态,需要等待release信号来进行释放(此时线程还未park)
// 返回false,则后面的parkAndCheckInterrupt方法不会被执行。
// 所以该循环会再次执行,shouldParkAfterFailedAcquire返回true后,
// 将调用parkAndCheckInterrupt将线程park;
if (shouldParkAfterFailedAcquire(p, node)
&& parkAndCheckInterrupt())
interrupted = true;
}
} catch (Throwable t) {
cancelAcquire(node);
throw t;
}
}
/** 检察并更新节点在acquire失败之后,线程是否应该挂起,如果返回true表示线程应该被挂起。
* 这是所有的acquire锁的循环中主要的信号控制方法。
* 方法的两个参数,要求pred是node节点的前一个节点.Requires pred == node.prev.
* @return {@code true} if thread should block
*/
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
// waitStatus是SIGNAL,表示在等待release信号
if (ws == Node.SIGNAL)
// 节点已经设置了SIGNAL(-1)状态,处于等待release信号中,可以安全的挂起。
return true;
if (ws > 0) {
/* 上一个节点被取消了,寻找上一个未取消掉的Node节点,
* 并将参数中的node节点作为寻找到的节点的next节点;
* /
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/* waitStatus必须是0或者PROPAGATE(-2),表示需要一个信号,但又还未暂停;
* 调用者需要在park之前重新调用该方法以确保不能acquire;
* 见{acquireQueued}方法中的循环;*/
pred.compareAndSetWaitStatus(ws, Node.SIGNAL);
}
return false;
}
/**
* Convenience method to park and then check if interrupted.
* @return {@code true} if interrupted
*/
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
// 挂起线程
return Thread.interrupted();
}
/**
* Releases in exclusive mode. Implemented by unblocking one or
* more threads if {@link #tryRelease} returns true.
* This method can be used to implement method {@link Lock#unlock}
*/
/// @ReservedStackAccess
public final boolean release(int arg) {
// 如果release成功
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
/**
* 唤醒node的successor,如果存在的情况下
*/
private void unparkSuccessor(Node node) {
/* If status is negative (possibly needing signal) try
* to clear in anticipation(期待) of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
// head: waitStatus = -1;
// head.next: waitStatus = -1;
// head.next.next: waitStatus = 0;(head.next.next.next为null)
int ws = node.waitStatus; // ws通常是-1
if (ws < 0)
node.compareAndSetWaitStatus(ws, 0);
/*
* 待unpark的thread, 通常是next节点中的thread
* Thread to unpark is held in successor, which is normally
* just the next node.
* 如果node已经取消或者为空,则从后向前寻找实际上未被取消的successor
*/
Node s = node.next;
// 如果node.next为空,或者next的waitStatus大于0,则从后向前查找。
if (s == null || s.waitStatus > 0) {
s = null;
for (Node p = tail; p != node && p != null; p = p.prev)
if (p.waitStatus <= 0)
s = p;
}
if (s != null)
LockSupport.unpark(s.thread);
}
//
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
// 如果release后状态已经是0了,则release成功。
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
}

AbstractOwnableSynchronizer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
/**
* A synchronizer that may be exclusively owned by a thread. This
* class provides a basis for creating locks and related synchronizers
* that may entail a notion of ownership. The
* {@code AbstractOwnableSynchronizer} class itself does not manage or
* use this information. However, subclasses and tools may use
* appropriately maintained values to help control and monitor access
* and provide diagnostics.
* 一个同步器,仅仅能被一个线程所持有,该类提供了基本的创建lock的准则,以及可能涉及到所有权的相关的同步器。本身并不管理这些信息,由子类去管理;
*/
public abstract class AbstractOwnableSynchronizer
implements java.io.Serializable {
/**
* The current owner of exclusive mode synchronization.
* 独占模式同步的当前所有者。
*/
private transient Thread exclusiveOwnerThread;
/**
* Set the thread that currently owns exclusive access. A
* {@code null} argument indicates that no thread owns access.
* This method does not otherwise impose any synchronization or
* {@code volatile} field accesses.
* 设置当前占有控制权的线程
*/
protected final void setExclusiveOwnerThread(Thread t) {
exclusiveOwnerThread = t;
}
/**
* This method does not otherwise impose any synchronization
* or {@code volatile} field accesses.
*/
protected final Thread getExclusiveOwnerThread() {
return exclusiveOwnerThread;
}
}

Example1

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
public static void main(String[] args) throws InterruptedException {
final ReentrantLock lock = new ReentrantLock();
Thread t1 = new Thread("A") {
@Override
public void run() {
print("t1-pre");
lock.lock();
lock.lock();
printThread(lock);
// holderCount = 2,因为上面调用了两次lock()方法;
print("HoldCount = " + lock.getHoldCount());
print("t1");
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
print("t1 - end");
lock.unlock();
lock.unlock();
print("t1 - end2");
}
};
t1.start();
Thread t2 = new Thread("B") {
@Override
public void run() {
print("t2-pre");
lock.lock();
printThread(lock);
print("HoldCount = " + lock.getHoldCount()); // holderCount = 1
print("t2");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
print("t2 - end");
lock.unlock();
print("t2 - end2");
}
};
t2.start();
}
public static void printThread(ReentrantLock lock) {
try {
Field sync = lock.getClass().getDeclaredField("sync");
sync.setAccessible(true);
Object o = sync.get(lock);
print("lock.sync = " + o);
Field f = AbstractOwnableSynchronizer.class.
getDeclaredField("exclusiveOwnerThread");
f.setAccessible(true);
Object o2 = f.get(o);
print("lock.sync.exclusiveOwnerThread = " + o2);
} catch (Exception e) {
}
}
  • 运行结果:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
t1-pre
t2-pre
lock.sync = java.util.concurrent.locks.ReentrantLock$NonfairSync@758916e9[State = 2, nonempty queue]
lock.sync.exclusiveOwnerThread = Thread[A,5,main]
HoldCount = 2
t1
t1 - end
t1 - end2
lock.sync = java.util.concurrent.locks.ReentrantLock$NonfairSync@758916e9[State = 1, empty queue]
lock.sync.exclusiveOwnerThread = Thread[B,5,main]
HoldCount = 1
t2
t2 - end
t2 - end2

Example2

  • 假设有3个线程,线程运行的时候,都先获取锁,然后在耗时操作(15s)之后再释放锁。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
new Thread(name) {
@Override
public void run() {
Log.e("beforeLock...", Thread.currentThread().getName());
lock.lock();
Log.e("afterLock...", Thread.currentThread().getName());
try {
Thread.sleep(15 * 1000);
} catch (InterruptedException e) {
}
Log.e("willUnLock...", Thread.currentThread().getName());
lock.unlock();
Log.e("afterUnLock...", Thread.currentThread().getName());
}
}.start();
  • lock/unlock过程:

Thread1:

1. compareAndSetState => true
2. setExclusiveOwnerThread
3. // TODO
4. unlock -> tryRelease => true
5. unparkSuccessor // 此时等待队列中已经有等待的线程,所以会执行该方法
   // 该方法将head的waitStatus设置为0
6. LockSupport.unpark(thread) // 该thread就是Thread2

Thread2:

1. compareAndSetState => false
2. acquire
3. tryAcquire
4. nonfairTryAcquire => false
5. addWaiter => node,此时node.waitStatus为0
6. acquireQueued(node, 1)
7. shouldParkAfterFailedAcquire // 执行两次,修改node.prev.waitStatus为-1
8. parkAndCheckInterrupt() // 线程挂起,等待Thread1#unpark之后激活线程
9. setHead() // 将Thread1的node设置为head
10. lock => 成功
11. // TODO
12. unlock -> tryRelease => true
13. unparkSuccessor // 此时等待队列中已经有等待的线程,所以会执行该方法,
    // 该方法将head的waitStatus设置为0
14. LockSupport.unpark(thread) // 该thread就是Thread3

Thread3:

1. compareAndSetState => false
2. acquire
3. tryAcquire
4. nonfairTryAcquire => false
5. addWaiter => node,此时node.waitStatus为0
6. acquireQueued(node, 1)
7. shouldParkAfterFailedAcquire // 执行两次,修改node.prev.waitStatus为-1
8. parkAndCheckInterrupt() // 线程挂起,等待Thread2#unpark之后激活线程
9. setHead() // 将Thread2的node设置为head
10. lock => 成功
11. // TODO
12. unlock -> tryRelease => true
13. h.waitStatus = 0; // 不执行unparkSuccessor
  • 执行结果
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
E/beforeLock...: Thread-1-1517233082922
E/afterLock...: Thread-1-1517233082922
E/beforeLock...: Thread-2-1517233083501
E/beforeLock...: Thread-3-1517233084150
E/willUnLock...: Thread-1-1517233097923
E/afterUnLock...: Thread-1-1517233097924
E/afterLock...: Thread-2-1517233097924
E/willUnLock...: Thread-2-1517233112926
E/afterUnLock...: Thread-2-1517233112926
E/afterLock...: Thread-3-1517233112927
E/willUnLock...: Thread-3-1517233127929
E/afterUnLock...: Thread-3-1517233127929