* 提供一个依赖FIFO队列实现的阻塞锁和同步器框架,该类依赖一个atomicInteger的state来表示
* 状态,子类必须定义方法来改变state,并且定义state所表示的不同状态,当acquired和
* released的时候。
*
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
/* 等待队列的头节点, 只能通过setHead来修改. 如果head不为空,它的waitStatus
* 得需要保证不为CANCELLED
*/
private transient volatile Node head;
*/
private transient volatile Node tail;
private volatile int state;
protected final int getState() {
return state;
}
protected final void setState(int newState) {
state = newState;
}
protected final boolean compareAndSetState(int expect, int update) {
return U.compareAndSwapInt(this, STATE, expect, update);
}
* 插入节点到queue,如果队列为空的话,先初始化队列。
* @param node the node to insert
* @return 返回节点的上一任节点
*/
private Node enq(Node node) {
for (;;) {
Node oldTail = tail;
if (oldTail != null) {
U.putObject(node, Node.PREV, oldTail);
if (compareAndSetTail(oldTail, node)) {
oldTail.next = node;
return oldTail;
}
} else {
initializeSyncQueue();
}
}
}
* 创建新节点并添加到队列中,并返回新的节点;
* @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
*/
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
for (;;) {
Node oldTail = tail;
if (oldTail != null) {
U.putObject(node, Node.PREV, oldTail);
if (compareAndSetTail(oldTail, node)) {
oldTail.next = node;
return node;
}
} else {
initializeSyncQueue();
}
}
}
* @param node the node
*/
private void setHead(Node node) {
head = node;
node.thread = null;
node.prev = null;
}
public final void acquire(int arg) {
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
* 用于Condition的等待方法和acquire方法.
* Acquires in exclusive uninterruptible mode for thread already in
* queue. Used by condition wait methods as well as acquire.
*
* @return {@code true} if interrupted while waiting
*/
final boolean acquireQueued(final Node node, int arg) {
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node)
&& parkAndCheckInterrupt())
interrupted = true;
}
} catch (Throwable t) {
cancelAcquire(node);
throw t;
}
}
* 这是所有的acquire锁的循环中主要的信号控制方法。
* 方法的两个参数,要求pred是node节点的前一个节点.Requires pred == node.prev.
* @return {@code true} if thread should block
*/
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
return true;
if (ws > 0) {
* 并将参数中的node节点作为寻找到的节点的next节点;
* /
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/* waitStatus必须是0或者PROPAGATE(-2),表示需要一个信号,但又还未暂停;
* 调用者需要在park之前重新调用该方法以确保不能acquire;
* 见{acquireQueued}方法中的循环;*/
pred.compareAndSetWaitStatus(ws, Node.SIGNAL);
}
return false;
}
* Convenience method to park and then check if interrupted.
* @return {@code true} if interrupted
*/
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
* Releases in exclusive mode. Implemented by unblocking one or
* more threads if {@link #tryRelease} returns true.
* This method can be used to implement method {@link Lock#unlock}
*/
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
* 唤醒node的successor,如果存在的情况下
*/
private void unparkSuccessor(Node node) {
* to clear in anticipation(期待) of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
node.compareAndSetWaitStatus(ws, 0);
* 待unpark的thread, 通常是next节点中的thread
* Thread to unpark is held in successor, which is normally
* just the next node.
* 如果node已经取消或者为空,则从后向前寻找实际上未被取消的successor
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node p = tail; p != node && p != null; p = p.prev)
if (p.waitStatus <= 0)
s = p;
}
if (s != null)
LockSupport.unpark(s.thread);
}
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
}