JUC之AQS

AbstractQueuedSynchronizer简称 AQS。AQS是jdk提供的一个用于实现阻塞锁和依赖于先进先出等待队列的相关同步器(信号量,事件等)。AQS是concurrent包的基石。

AQS 状态。

它依赖一个int类型的原子变量来表示状态。

1
private volatile int state;//共享变量,使用volatile修饰保证线程可见性

对state操作的基本方法

1
2
3
4
5
protected final int getState()

protected final void setState(int newState)

protected final boolean compareAndSetState(int expect, int update)

子类必须实现其protected的方法用于改变这个状态和根据该状态的获取和释放定义状态的含义。

AQS实现同步器

AQS支持两种独占共享两种同步方式。独占的实现有ReentrantLock,共享的实现有CountDownLatch,Semaphore。读写锁的读锁用了共享锁,写锁用来独占锁。AQS是通过重写以下方法来实现同步。

  1. 共享锁是满足一定条件的都可以获取锁,具体实现逻辑在tryAcquireShared中实现。
  2. 独占锁是同时只能有一个线程占用锁。
通过AQS实现锁需要重写的方法

AQS用了模板设计模式,我们不用关心获取资源失败,线程排队,线程阻塞/唤醒等一系列复杂的实现,这些都在AQS中为我们处理好了。我们只需要负责好自己的那个环节就好,也就是获取/释放共享资源state。用户可以根据自己的要求很轻松的扩展AQS。

实现独占锁要重写的方法

1
2
3
4
5
6
7
//独占式获取同步状态,试着获取,成功返回true,反之为false
protected boolean tryRelease(int arg)
//释放锁方法,返回true代表成功释放
protected boolean tryAcquire(int arg)

是否在独占模式下被线程占用,true为被占用。
protected boolean isHeldExclusively()

实现共享锁要重写的方法

1
2
3
4
5

//返回值大于0则是成功占用锁,小于0则是阻塞
protected int tryAcquireShared(int arg)
//释放锁方法,返回true代表成功释放
protected boolean tryReleaseShared(int arg)

使用

独占和共享方式都有3个占用锁方法和一个释放锁方法。内部调用了上面实现的tryAcquire和tryRelease。

独占锁使用方法

1
2
3
4
5
6
7
8
9
10
11
12

public final void acquire(int arg)

//通过首先检查中断状态,如果中断,中止
public final void acquireInterruptibly(int arg)
throws InterruptedException
//阻塞时长超出设置事件则跳出
public final boolean tryAcquireNanos(int arg,
long nanosTimeout)
throws InterruptedException

public final boolean release(int arg)

共享锁使用方法

1
2
3
4
5
6
7
8
9
10
11
12
13
//占用共享锁方法,最终都是调用了实现的tryAcquireShared方法。
public final void acquireShared(int arg)

//通过首先检查中断状态,如果中断,中止
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException
//阻塞时长超出设置事件则跳出。
public final boolean tryAcquireSharedNanos(int arg,
long nanosTimeout)
throws InterruptedException

//释放锁方法
public final boolean releaseShared(int arg)

独占锁实现

JDK文档中带的一个独占锁例子.

这例子实现的是不可重入的独占锁,只允许一个线程持有锁。

这里state=0时表示锁还没被占用,state=1时表示锁处于占用状态。这样一个线程占用锁时,通过cas把state置为1,通过setExclusiveOwnerThread方法设置当前线程为独占线程。其他线程再通过cas时,因为旧值不为0,所以cas失败,返回false表示占用锁失败。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46


import java.util.concurrent.locks.AbstractQueuedSynchronizer;


public class Mutex implements java.io.Serializable {
//实现的独占锁
private static class Sync extends AbstractQueuedSynchronizer {
//是否处于占用状态
protected boolean isHeldExclusively() {
return getState() == 1;
}
//这里获取锁,通过cas将state设置为1,并设置独占线程。
public boolean tryAcquire(int acquires) {
if (compareAndSetState(0, 1)) {
//设置当前拥有独占访问权限的线程
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
//释放锁,将同步状态置为0
protected boolean tryRelease(int releases) {
if (getState() == 0) throw new IllegalMonitorStateException();
setExclusiveOwnerThread(null);
setState(0);
return true;
}
}
//同步对象完成一系列复杂的操作,我们仅需指向它即可
private final Sync sync = new Sync();
//加锁操作,代理到acquire(模板方法)上就行,acquire会调用我们重写的tryAcquire方法
public void lock() {
sync.acquire(1);
}
public boolean tryLock() {
return sync.tryAcquire(1);
}
//释放锁,代理到release(模板方法)上就行,release会调用我们重写的tryRelease方法。
public void unlock() {
sync.release(1);
}
public boolean isLocked() {
return sync.isHeldExclusively();
}
}

共享锁实现 CountDownLatch

CountDownLatch就是通过AQS实现的共享锁,实现简单,容易理解。

基于AQS实现的共享锁

CountDownLatch内部类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;

//CountDownLatch构造函数中会将count传过来,构建Sync锁。
Sync(int count) {
setState(count);
}

int getCount() {
return getState();
}

//count==0的时候才能成功占用锁。
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}

//countDown方法会调用这里的逻辑,每次count-1,直到0才成功释放。
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}

CountDownLatch.await()

1
2
3
4
5
6
 /**
这里调用了sync的acquireSharedInterruptibly(1),里面最终会调用重写的tryAcquireShared方法。可以看到只有state==0的时候才能占用成功,不然会阻塞。这里传入的参数1并没有作用。
*/
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}

CountDownLatch.countDown()

1
2
3
4
5
6
/**
这里调用了AQS的releaseShared,会使用重写的tryReleaseShared实现。每次都会使state-1,直到state==0成功释放锁。
*/
public void countDown() {
sync.releaseShared(1);
}