synchronized的线程等待唤醒机制是通过锁对象Monitor的 notify/notifyAll/wait实现,Monitor中的等待队列只有一条所以无法实现分组的等待唤醒。显示锁的等待唤醒方法可以通过一个或多个Condition来处理等待唤醒,一个Condition就是AQS中的一条等待队列。
AQS中的等待队列
ReentrantLock的条件变量Condition就是一个ConditionObject对象。1
2
3final ConditionObject newCondition() {
return new ConditionObject();
}
ConditionObject是AbstractQueuedSynchronizer的内部类,一个ConditionObject是一条等待队列。
重要属性
1 | /** 等待队列的头节点 */ |
Node属性状态
Node的waitStatus表示节点状态。
CONDITON: 节点在等待队列中,节点线程等Condition唤醒
CANCELLED: 因为超时或者中断,节点会被设置为取消状态,被取消的节点时不会参与到竞争中的,他会一直保持取消状态不会转变为其他状态
SIGNAL: 当前节点释放锁的时候,需要唤醒下一个节点
PROPAGATE: 表示下一次共享式同步状态获取将会无条件地传播下去
await方法源码解析
这个await的过程是将同步队列的首节点(即获取锁的节点)移到同步队列的尾节点。
先判断中断标志是否为true,抛出中断异常返回。否继续执行
1
2if (Thread.interrupted())
throw new InterruptedException();通过addConditionWaiter() 方法将当前线程封装成Node节点,并追加到队列尾。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16private Node addConditionWaiter() {
Node t = lastWaiter;
//尾节点被取消,重新指定尾节点
if (t != null && t.waitStatus != Node.CONDITION) {
//清除状态不是Node.CONDITION的节点。
unlinkCancelledWaiters();
t = lastWaiter;
}
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}释放锁,返回同步状态。释放锁失败的话,节点的waitStatus为CANCELLED。
1
int savedState = fullyRelease(node);
判断当前节点是否在同步队列中,不在说明是等待队列中的节点用LockSupport.park进行阻塞。当节点被中断时设置
中断模式interruptMode1
2
3
4
5while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}被唤醒会再次尝试获取锁。acquireQueued获取锁需要前驱节点是同步队列的头节点。可以看到当前Node是没有设置prev,prev在什么地方设置。
1
2
3
4
5
6if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // 清除等待队列中取消状态的节点
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
signal方法源码
唤醒等待队列的头节点。
1
2
3
4
5
6
7public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}将firstWaiter设置为现在头节点的下一个节点。并将当前节点的nextWaiter设置为null。transferForSignal唤醒线程失败(Node可能已取消导致失败)后,会唤醒下一个节点,直到有一个节点被唤醒。
1
2
3
4
5
6
7
8private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}transferForSignal方法会通过cas更新Node的waitStatus属性为0。再将当前节点追加同步队列的尾部(这里可以解释await方法被唤醒时为什么有前驱节点)。再设置Node.waitStatus=Node.SIGNAL。唤醒线程。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15final boolean transferForSignal(Node node) {
/*
* 节点被取消时,cas更新失败
*/
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
//将节点追加同步队列队尾
Node p = enq(node);
int ws = p.waitStatus;
//只要节点状态不是取消,将节点状态设置成SIGNAL(作用可以看下CLH队列)
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}