JUC之AQS的Condition等待队列

synchronized的线程等待唤醒机制是通过锁对象Monitor的 notify/notifyAll/wait实现,Monitor中的等待队列只有一条所以无法实现分组的等待唤醒。显示锁的等待唤醒方法可以通过一个或多个Condition来处理等待唤醒,一个Condition就是AQS中的一条等待队列。

AQS中的等待队列

ReentrantLock的条件变量Condition就是一个ConditionObject对象。

1
2
3
final ConditionObject newCondition() {
return new ConditionObject();
}

ConditionObject是AbstractQueuedSynchronizer的内部类,一个ConditionObject是一条等待队列。

image

重要属性

1
2
3
4
/** 等待队列的头节点 */
private transient Node firstWaiter;
/** 等待队列的尾节点 */
private transient Node lastWaiter;

Node属性状态

Node的waitStatus表示节点状态。

CONDITON: 节点在等待队列中,节点线程等Condition唤醒

CANCELLED: 因为超时或者中断,节点会被设置为取消状态,被取消的节点时不会参与到竞争中的,他会一直保持取消状态不会转变为其他状态

SIGNAL: 当前节点释放锁的时候,需要唤醒下一个节点

PROPAGATE: 表示下一次共享式同步状态获取将会无条件地传播下去

await方法源码解析

这个await的过程是将同步队列的首节点(即获取锁的节点)移到同步队列的尾节点。

  1. 先判断中断标志是否为true,抛出中断异常返回。否继续执行

    1
    2
    if (Thread.interrupted())
    throw new InterruptedException();
  2. 通过addConditionWaiter() 方法将当前线程封装成Node节点,并追加到队列尾。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    private Node addConditionWaiter() {
    Node t = lastWaiter;
    //尾节点被取消,重新指定尾节点
    if (t != null && t.waitStatus != Node.CONDITION) {
    //清除状态不是Node.CONDITION的节点。
    unlinkCancelledWaiters();
    t = lastWaiter;
    }
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
    if (t == null)
    firstWaiter = node;
    else
    t.nextWaiter = node;
    lastWaiter = node;
    return node;
    }
  3. 释放锁,返回同步状态。释放锁失败的话,节点的waitStatus为CANCELLED。

    1
    int savedState = fullyRelease(node);
  4. 判断当前节点是否在同步队列中,不在说明是等待队列中的节点用LockSupport.park进行阻塞。当节点被中断时设置
    中断模式interruptMode

    1
    2
    3
    4
    5
    while (!isOnSyncQueue(node)) {
    LockSupport.park(this);
    if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
    break;
    }
  5. 被唤醒会再次尝试获取锁。acquireQueued获取锁需要前驱节点是同步队列的头节点。可以看到当前Node是没有设置prev,prev在什么地方设置。

    1
    2
    3
    4
    5
    6
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
    interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // 清除等待队列中取消状态的节点
    unlinkCancelledWaiters();
    if (interruptMode != 0)
    reportInterruptAfterWait(interruptMode);

signal方法源码

  1. 唤醒等待队列的头节点。

    1
    2
    3
    4
    5
    6
    7
    public final void signal() {
    if (!isHeldExclusively())
    throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
    doSignal(first);
    }
  2. 将firstWaiter设置为现在头节点的下一个节点。并将当前节点的nextWaiter设置为null。transferForSignal唤醒线程失败(Node可能已取消导致失败)后,会唤醒下一个节点,直到有一个节点被唤醒。

    1
    2
    3
    4
    5
    6
    7
    8
     private void doSignal(Node first) {
    do {
    if ( (firstWaiter = first.nextWaiter) == null)
    lastWaiter = null;
    first.nextWaiter = null;
    } while (!transferForSignal(first) &&
    (first = firstWaiter) != null);
    }
  3. transferForSignal方法会通过cas更新Node的waitStatus属性为0。再将当前节点追加同步队列的尾部(这里可以解释await方法被唤醒时为什么有前驱节点)。再设置Node.waitStatus=Node.SIGNAL。唤醒线程。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    final boolean transferForSignal(Node node) {
    /*
    * 节点被取消时,cas更新失败
    */
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
    return false;

    //将节点追加同步队列队尾
    Node p = enq(node);
    int ws = p.waitStatus;
    //只要节点状态不是取消,将节点状态设置成SIGNAL(作用可以看下CLH队列)
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
    LockSupport.unpark(node.thread);
    return true;
    }