程序猿的博客


  • 首页

  • 标签

  • 分类

  • 归档

JUC之AQS的Condition等待队列

发表于 2018-08-17 | 分类于 多线程

synchronized的线程等待唤醒机制是通过锁对象Monitor的 notify/notifyAll/wait实现,Monitor中的等待队列只有一条所以无法实现分组的等待唤醒。显示锁的等待唤醒方法可以通过一个或多个Condition来处理等待唤醒,一个Condition就是AQS中的一条等待队列。

AQS中的等待队列

ReentrantLock的条件变量Condition就是一个ConditionObject对象。

1
2
3
final ConditionObject newCondition() {
return new ConditionObject();
}

ConditionObject是AbstractQueuedSynchronizer的内部类,一个ConditionObject是一条等待队列。

image

重要属性

1
2
3
4
/** 等待队列的头节点 */
private transient Node firstWaiter;
/** 等待队列的尾节点 */
private transient Node lastWaiter;

Node属性状态

Node的waitStatus表示节点状态。

CONDITON: 节点在等待队列中,节点线程等Condition唤醒

CANCELLED: 因为超时或者中断,节点会被设置为取消状态,被取消的节点时不会参与到竞争中的,他会一直保持取消状态不会转变为其他状态

SIGNAL: 当前节点释放锁的时候,需要唤醒下一个节点

PROPAGATE: 表示下一次共享式同步状态获取将会无条件地传播下去

await方法源码解析

这个await的过程是将同步队列的首节点(即获取锁的节点)移到同步队列的尾节点。

  1. 先判断中断标志是否为true,抛出中断异常返回。否继续执行

    1
    2
    if (Thread.interrupted())
    throw new InterruptedException();
  2. 通过addConditionWaiter() 方法将当前线程封装成Node节点,并追加到队列尾。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    private Node addConditionWaiter() {
    Node t = lastWaiter;
    //尾节点被取消,重新指定尾节点
    if (t != null && t.waitStatus != Node.CONDITION) {
    //清除状态不是Node.CONDITION的节点。
    unlinkCancelledWaiters();
    t = lastWaiter;
    }
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
    if (t == null)
    firstWaiter = node;
    else
    t.nextWaiter = node;
    lastWaiter = node;
    return node;
    }
  3. 释放锁,返回同步状态。释放锁失败的话,节点的waitStatus为CANCELLED。

    1
    int savedState = fullyRelease(node);
  4. 判断当前节点是否在同步队列中,不在说明是等待队列中的节点用LockSupport.park进行阻塞。当节点被中断时设置
    中断模式interruptMode

    1
    2
    3
    4
    5
    while (!isOnSyncQueue(node)) {
    LockSupport.park(this);
    if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
    break;
    }
  5. 被唤醒会再次尝试获取锁。acquireQueued获取锁需要前驱节点是同步队列的头节点。可以看到当前Node是没有设置prev,prev在什么地方设置。

    1
    2
    3
    4
    5
    6
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
    interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // 清除等待队列中取消状态的节点
    unlinkCancelledWaiters();
    if (interruptMode != 0)
    reportInterruptAfterWait(interruptMode);

signal方法源码

  1. 唤醒等待队列的头节点。

    1
    2
    3
    4
    5
    6
    7
    public final void signal() {
    if (!isHeldExclusively())
    throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
    doSignal(first);
    }
  2. 将firstWaiter设置为现在头节点的下一个节点。并将当前节点的nextWaiter设置为null。transferForSignal唤醒线程失败(Node可能已取消导致失败)后,会唤醒下一个节点,直到有一个节点被唤醒。

    1
    2
    3
    4
    5
    6
    7
    8
     private void doSignal(Node first) {
    do {
    if ( (firstWaiter = first.nextWaiter) == null)
    lastWaiter = null;
    first.nextWaiter = null;
    } while (!transferForSignal(first) &&
    (first = firstWaiter) != null);
    }
  3. transferForSignal方法会通过cas更新Node的waitStatus属性为0。再将当前节点追加同步队列的尾部(这里可以解释await方法被唤醒时为什么有前驱节点)。再设置Node.waitStatus=Node.SIGNAL。唤醒线程。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    final boolean transferForSignal(Node node) {
    /*
    * 节点被取消时,cas更新失败
    */
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
    return false;

    //将节点追加同步队列队尾
    Node p = enq(node);
    int ws = p.waitStatus;
    //只要节点状态不是取消,将节点状态设置成SIGNAL(作用可以看下CLH队列)
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
    LockSupport.unpark(node.thread);
    return true;
    }

JUC之AQS中的CLH队列

发表于 2018-08-16 | 分类于 多线程

CLH队列

AQS内部维护着一个FIFO的队列,即CLH队列。AQS的同步机制就是依靠CLH队列实现的。CLH队列是FIFO的双端双向队列,实现公平锁。头节点是一个获取同步状态成功的节点。线程通过AQS获取锁失败,就会将线程封装成一个Node节点,插入队列尾。当有线程释放锁时,后唤醒头节点的next节点(第二个节点)尝试占用锁。

CLH队列结构

image

Node类

CLH队列由Node对象组成,Node是AQS中的内部类。

重要属性
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
//用于标识共享锁
static final Node SHARED = new Node();

//用于标识独占锁
static final Node EXCLUSIVE = null;

/**
* 因为超时或者中断,节点会被设置为取消状态,被取消的节点时不会参与到竞争中的,他会一直保持取消状态不会转变为其他状态;
*/
static final int CANCELLED = 1;

/**
* 当前节点释放锁的时候,需要唤醒下一个节点
*/
static final int SIGNAL = -1;

/**
* 节点在等待队列中,节点线程等待Condition唤醒
*/
static final int CONDITION = -2;

/**
* 表示下一次共享式同步状态获取将会无条件地传播下去
*/
static final int PROPAGATE = -3;

/** 等待状态,值为上面的状态常量CANCELLED,SIGNAL,CONDITION,PROPAGATE。 */
volatile int waitStatus;

/** 前驱节点 */
volatile Node prev;

/** 后继节点 */
volatile Node next;

/** 节点线程 */
volatile Thread thread;

//
Node nextWaiter;

CLH队列源码执行顺序

  1. 线程调用acquire方法获取锁。tryAcquire返回true,获取锁成功。返回false,获取失败则会通过addWaiter方法追加到CLH队列队尾。
    1
    2
    3
    4
    5
    public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
    acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
    selfInterrupt();
    }
  1. addWaiter(Node.EXCLUSIVE)方法会将当前线程封装成Node节点,追加在队尾。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // 获取原队尾
    Node pred = tail;
    if (pred != null) {
    node.prev = pred;
    //用cas更新 ,pred是原来队尾,作为预期值,node作为新值
    if (compareAndSetTail(pred, node)) {
    pred.next = node;
    return node;
    }
    }
    //前面cas更新失败后,再enq方法中循环用cas更新直到成功
    enq(node);
    return node;
    }
  1. acquireQueued方法中会使线程自旋阻塞,直到获取到锁。
    队列中节点获取占用锁机会的条件
    1、前驱节点是头节点
    2、前驱节点的waitStatus=Node.SIGNAL
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
    boolean interrupted = false;
    for (;;) {
    //1. 拿到当前节点的前驱节点
    final Node p = node.predecessor();

    //2. 如果当前节点的前驱节点是头节点的话,就再次尝试获取锁
    if (p == head && tryAcquire(arg)) {
    //成功获取锁后,将节点设置为头节点
    setHead(node);
    p.next = null; // help GC
    failed = false;
    return interrupted;
    }
    /**
    更改当前节点前驱节点的waitStatus,只有前驱节点的waitStatus=Node.SIGNAL,当前节点才有可能被唤醒。如果前驱节点的waitStatus>0(即取消),则跳过取更前面的节点。
    */
    if (shouldParkAfterFailedAcquire(p, node) &&
    //通过Unsafe.park来阻塞线程
    parkAndCheckInterrupt())
    interrupted = true;
    }
    } finally {
    if (failed)
    cancelAcquire(node);
    }
    }
  1. 线程释放锁,从前面可以知道,获取到锁的线程会设置为CLH队列的头部。这里如果tryRelease返回true,且head的waitStatus!=0。就会更新head的waitStatus为0(设回初始值)并且
    唤醒线程head.next节点的线程。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    public final boolean release(int arg) { 
    //判断是否可以释放锁。
    if (tryRelease(arg)) {
    Node h = head;
    if (h != null && h.waitStatus != 0)
    unparkSuccessor(h);
    return true;
    }
    return false;
    }
  1. 更新head的waitStatus为0并且唤醒线程head.next节点的线程。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    private void unparkSuccessor(Node node) {

    int ws = node.waitStatus;
    //waitStatus不是取消状态,就设置成0
    if (ws < 0)
    compareAndSetWaitStatus(node, ws, 0);


    //获取下个waitStatus不为取消的Node
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
    s = null;
    for (Node t = tail; t != null && t != node; t = t.prev)
    if (t.waitStatus <= 0)
    s = t;
    }
    //LockSupport.unpark是调用了Unsafe.unpark,唤醒线程。
    if (s != null)
    LockSupport.unpark(s.thread);
    }

JUC之AQS

发表于 2018-08-15 | 分类于 多线程

AbstractQueuedSynchronizer简称 AQS。AQS是jdk提供的一个用于实现阻塞锁和依赖于先进先出等待队列的相关同步器(信号量,事件等)。AQS是concurrent包的基石。

AQS 状态。

它依赖一个int类型的原子变量来表示状态。

1
private volatile int state;//共享变量,使用volatile修饰保证线程可见性

对state操作的基本方法

1
2
3
4
5
protected final int getState()

protected final void setState(int newState)

protected final boolean compareAndSetState(int expect, int update)

子类必须实现其protected的方法用于改变这个状态和根据该状态的获取和释放定义状态的含义。

AQS实现同步器

AQS支持两种独占和共享两种同步方式。独占的实现有ReentrantLock,共享的实现有CountDownLatch,Semaphore。读写锁的读锁用了共享锁,写锁用来独占锁。AQS是通过重写以下方法来实现同步。

  1. 共享锁是满足一定条件的都可以获取锁,具体实现逻辑在tryAcquireShared中实现。
  2. 独占锁是同时只能有一个线程占用锁。
通过AQS实现锁需要重写的方法

AQS用了模板设计模式,我们不用关心获取资源失败,线程排队,线程阻塞/唤醒等一系列复杂的实现,这些都在AQS中为我们处理好了。我们只需要负责好自己的那个环节就好,也就是获取/释放共享资源state。用户可以根据自己的要求很轻松的扩展AQS。

实现独占锁要重写的方法

1
2
3
4
5
6
7
//独占式获取同步状态,试着获取,成功返回true,反之为false
protected boolean tryRelease(int arg)
//释放锁方法,返回true代表成功释放
protected boolean tryAcquire(int arg)

是否在独占模式下被线程占用,true为被占用。
protected boolean isHeldExclusively()

实现共享锁要重写的方法

1
2
3
4
5

//返回值大于0则是成功占用锁,小于0则是阻塞
protected int tryAcquireShared(int arg)
//释放锁方法,返回true代表成功释放
protected boolean tryReleaseShared(int arg)

使用

独占和共享方式都有3个占用锁方法和一个释放锁方法。内部调用了上面实现的tryAcquire和tryRelease。

独占锁使用方法

1
2
3
4
5
6
7
8
9
10
11
12

public final void acquire(int arg)

//通过首先检查中断状态,如果中断,中止
public final void acquireInterruptibly(int arg)
throws InterruptedException
//阻塞时长超出设置事件则跳出
public final boolean tryAcquireNanos(int arg,
long nanosTimeout)
throws InterruptedException

public final boolean release(int arg)

共享锁使用方法

1
2
3
4
5
6
7
8
9
10
11
12
13
//占用共享锁方法,最终都是调用了实现的tryAcquireShared方法。
public final void acquireShared(int arg)

//通过首先检查中断状态,如果中断,中止
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException
//阻塞时长超出设置事件则跳出。
public final boolean tryAcquireSharedNanos(int arg,
long nanosTimeout)
throws InterruptedException

//释放锁方法
public final boolean releaseShared(int arg)

独占锁实现

JDK文档中带的一个独占锁例子.

这例子实现的是不可重入的独占锁,只允许一个线程持有锁。

这里state=0时表示锁还没被占用,state=1时表示锁处于占用状态。这样一个线程占用锁时,通过cas把state置为1,通过setExclusiveOwnerThread方法设置当前线程为独占线程。其他线程再通过cas时,因为旧值不为0,所以cas失败,返回false表示占用锁失败。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46


import java.util.concurrent.locks.AbstractQueuedSynchronizer;


public class Mutex implements java.io.Serializable {
//实现的独占锁
private static class Sync extends AbstractQueuedSynchronizer {
//是否处于占用状态
protected boolean isHeldExclusively() {
return getState() == 1;
}
//这里获取锁,通过cas将state设置为1,并设置独占线程。
public boolean tryAcquire(int acquires) {
if (compareAndSetState(0, 1)) {
//设置当前拥有独占访问权限的线程
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
//释放锁,将同步状态置为0
protected boolean tryRelease(int releases) {
if (getState() == 0) throw new IllegalMonitorStateException();
setExclusiveOwnerThread(null);
setState(0);
return true;
}
}
//同步对象完成一系列复杂的操作,我们仅需指向它即可
private final Sync sync = new Sync();
//加锁操作,代理到acquire(模板方法)上就行,acquire会调用我们重写的tryAcquire方法
public void lock() {
sync.acquire(1);
}
public boolean tryLock() {
return sync.tryAcquire(1);
}
//释放锁,代理到release(模板方法)上就行,release会调用我们重写的tryRelease方法。
public void unlock() {
sync.release(1);
}
public boolean isLocked() {
return sync.isHeldExclusively();
}
}

共享锁实现 CountDownLatch

CountDownLatch就是通过AQS实现的共享锁,实现简单,容易理解。

基于AQS实现的共享锁

CountDownLatch内部类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;

//CountDownLatch构造函数中会将count传过来,构建Sync锁。
Sync(int count) {
setState(count);
}

int getCount() {
return getState();
}

//count==0的时候才能成功占用锁。
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}

//countDown方法会调用这里的逻辑,每次count-1,直到0才成功释放。
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}

CountDownLatch.await()

1
2
3
4
5
6
 /**
这里调用了sync的acquireSharedInterruptibly(1),里面最终会调用重写的tryAcquireShared方法。可以看到只有state==0的时候才能占用成功,不然会阻塞。这里传入的参数1并没有作用。
*/
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}

CountDownLatch.countDown()

1
2
3
4
5
6
/**
这里调用了AQS的releaseShared,会使用重写的tryReleaseShared实现。每次都会使state-1,直到state==0成功释放锁。
*/
public void countDown() {
sync.releaseShared(1);
}

JUC之Semaphore

发表于 2018-08-14 | 分类于 多线程

流量控制 Semaphore

Semaphore是一个计数的信号量。初始化时分配一个配额permits。在访问前需要用acquire()方法申请一个配额,访问结束后调用release()释放配置。申请配额时如果配额不足将会阻塞。

使用例子

例子:有2个足球,4个人射门练习。每个人射门后都要把球拿回来,后面的人才能射球。这里足球就是配额。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public class Player implements  Runnable {

private final Semaphore semaphore;

public Player(Semaphore semaphore){
this.semaphore=semaphore;
}
public void run() {
try {
semaphore.acquire();
//踢完还剩余的球
System.out.println("踢球。。。。"+semaphore.availablePermits());
int millis = new Random().nextInt(4)+1;
Thread.sleep(millis*1000);
System.out.println(String.format("捡球花时间%d秒",millis));
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
semaphore.release();
}
}
}


public static void main(String[] args){

Semaphore semaphore = new Semaphore(2);

for(int i=0;i<4;i++){
new Thread(new Player(semaphore)).start();
}

try {
Thread.sleep(1000000);
} catch (InterruptedException e) {
e.printStackTrace();
}

}

结果

1
2
3
4
5
6
7
8
踢球。。。。1
踢球。。。。0
捡球花时间4秒
捡球花时间4秒
踢球。。。。0
踢球。。。。0
捡球花时间1秒
捡球花时间2秒

常用API

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
void acquire() 
从该信号量获取许可证,阻止直到可用,或线程为 interrupted 。
void acquire(int permits)
从该信号量获取给定数量的许可证,阻止直到所有可用,否则线程为 interrupted 。
void acquireUninterruptibly()
从这个信号灯获取许可证,阻止一个可用的。
void acquireUninterruptibly(int permits)
从该信号量获取给定数量的许可证,阻止直到所有可用。
int availablePermits()
返回此信号量中当前可用的许可数。
int drainPermits()
获取并返回所有可立即获得的许可证。
protected Collection<Thread> getQueuedThreads()
返回一个包含可能正在等待获取的线程的集合。
int getQueueLength()
返回等待获取的线程数的估计。
boolean hasQueuedThreads()
查询任何线程是否等待获取。
boolean isFair()
如果此信号量的公平设置为真,则返回 true 。
protected void reducePermits(int reduction)
缩小可用许可证的数量。
void release()
释放许可证,将其返回到信号量。
void release(int permits)
释放给定数量的许可证,将其返回到信号量。
String toString()
返回一个标识此信号量的字符串及其状态。
boolean tryAcquire()
从这个信号量获得许可证,只有在调用时可以使用该许可证。
boolean tryAcquire(int permits)
从这个信号量获取给定数量的许可证,只有在调用时全部可用。
boolean tryAcquire(int permits, long timeout, TimeUnit unit)
从该信号量获取给定数量的许可证,如果在给定的等待时间内全部可用,并且当前线程尚未 interrupted 。
boolean tryAcquire(long timeout, TimeUnit unit)
如果在给定的等待时间内可用,并且当前线程尚未 到达 interrupted,则从该信号量获取许可。

总结

  1. Semaphore内部是由共享锁实现的。
  2. Semaphore支持公平锁和非公平锁。默认是非公平锁。可以通过构造函数设置。

JUC之CyclicBarrier

发表于 2018-08-13 | 分类于 多线程

CyclicBarrier

CyclicBarrier和CountDownLatch功能相似。CyclicBarrier是所有参与线程互相等待对方执行到某点,再一起执行后面程序。

与CountDownLatch一样,CyclicBarrier的构造函数也需要一个int类型的参数
parties,表示参与者数量。每一个线程调用cyclicBarrier对象的await方法就会进入等待。直到参与线程数量到达parties时,唤醒所有线程。

使用例子

场景:所有参与者到位等待,一起开始执行。(比赛参赛选手都到位准备好了,裁判再鸣枪)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

final CyclicBarrier cyclicBarrier = new CyclicBarrier(5);
for(int i=0 ;i<5;i++){
final int c=i;
new Thread(){
@Override
public void run() {
try {
System.out.println(String.format("参与者%d等待",c));
cyclicBarrier.await();
System.out.println(String.format("参与者%d执行",c));
} catch (Exception e) {
e.printStackTrace();
}
}
}.start();
}

执行结果

1
2
3
4
5
6
7
8
9
10
参与者3等待
参与者1等待
参与者2等待
参与者4等待
参与者0等待
参与者3执行
参与者0执行
参与者1执行
参与者2执行
参与者4执行

CyclicBarrier和CountDownLatch的区别

  1. CyclicBarrier内部用条件变量Condition实现,会产生上下文切换。CountDownLatch使用共享锁实现。
  2. CyclicBarrier当parties=0时,在唤醒所有线程同时,还会重置parties。
    CountDownLatch不能重置回滚,只能使用一次。

CyclicBarrier使用场景

  1. 迭代算法并发化。迭代中,由多个工作线程完成工作。使用await等待所有工作线程完成工作,再将结果作为下一轮迭代的输入。

  2. 模拟高并发。保证线程同时开始其操作来模拟高并发测试。

JUC之CountDownLatch

发表于 2018-08-12 | 分类于 多线程

CountDownLatch简介

一个线程需要等到其他线程进行某操作后在运行,可以使用CountDownLatch。

CountDownLatch构造方法,带有一个int类型的参数。

1
public CountDownLatch(int count)

当一个线程调用countDownLatch.await()时,线程会等待。直到其他线程执行
countDownLatch.countDown()。每执行一个countDown方法,初始化的count会减一,直到count=0时,等待线程才被唤醒。

使用

假设场景顾客餐厅吃饭,需要等待做饭,炒菜,上汤都完成了,才开始吃饭。
代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
//顾客线程
public class Customer implements Runnable {

private final CountDownLatch countDownLatch;

public Customer(CountDownLatch countDownLatch){
this.countDownLatch=countDownLatch;
}
public void run() {
System.out.println("点完菜,等待开吃");
try {
this.countDownLatch.await();
System.out.println("开饭了。。。。。。。。");
} catch (InterruptedException e) {
e.printStackTrace();
}


}

public static void main(String[] args) throws InterruptedException {
final CountDownLatch countDownLatch = new CountDownLatch(3);

Thread customer = new Thread(new Customer(countDownLatch));
//顾客进餐厅点菜
customer.start();

new Thread(){
@Override
public void run() {
try {
System.out.println("做饭");
}finally {
countDownLatch.countDown();
}
}
}.start();
new Thread(){
@Override
public void run() {
try {
System.out.println("炒菜");
}finally {
countDownLatch.countDown();
}
}
}.start();

new Thread(){
@Override
public void run() {
try {
System.out.println("上汤");
}finally {
countDownLatch.countDown();
}
}
}.start();

customer.join();
}

输出:

1
2
3
4
5
点完菜,等待开吃
做饭
炒菜
上汤
开饭了。。。。。。。。

结果正确,等待做饭,炒菜,上汤都完成了,才开始吃饭。

总结

  1. countDownLatch内部通过共享锁实现
  2. countDown方法最好放在finally代码块中执行,防止线程永远等待
  3. CountDownLatch再 count减到0之后,再次执行countDown不会有影响,count不变。
  4. await方法有重载方法
    public boolean await(long timeout, TimeUnit unit),等待一段时间后恢复。
  5. countDownLatch只能使用一次,count=0时,不能回滚再次使用。

线程同步-CAS与原子变量

发表于 2018-08-11 | 分类于 多线程

CAS简介

锁的开销极大。在某些场景,如保证一个变量的 read-modify-write操作的原子性。这种场景可以通过使用CAS解决而不需要用到锁。

CAS,Compare and swap 比较并交换,是一种乐观锁的实现方式。是一个中由处理器保证原子性的if-then-act操作。它通过提供一个变量内存位置,预期值(旧值)和新值。将预期值和变量的当前值进行比较,如果相等即证明变量并没有被改变,将该变量修改成新值。如果不相等则进行重试(预期值会重新加载),直到成功。

Unsafe类中通过CAS修改int类型变量源码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/**
*var1 AtomicInteger对象
*var2 内存偏移量
*var4 增加的值
*var5 获取的变量原值,保存在var5用于当预期值
*compareAndSwapInt会比较内存值和var5相等的话就会改变内存值(即AtomicIntege * r对象中的变量)。
*/
public final int getAndAddInt(Object var1, long var2, int var4) {
int var5;
do {
//获取内存的当前值
var5 = this.getIntVolatile(var1, var2);
} while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));

return var5;
}

原子变量

以上代码是原子遍历AtomicInteger自增代码的实现片段。JDK基于CAS提供了保证共享变量read-modify-write操作原子性的类。

分组 类名
基本类型 AtomicInteger,AtomicLong,AtomicBoolean
数组类型 AtomicIntegerArray,AtomicLongArray,AtomicReferenceArray
字段更新 AtomicIntegerFieldUpdater,AtomicLongFieldUpdater,AtomicReferenceFieldUpdater
引用型 AtomicReference,AtomicStampedReference,AtomicMarkableReference
AtomicInteger
方法 作用
int accumulateAndGet(int x, IntBinaryOperator accumulatorFunction) 使用将给定函数应用于当前值和给定值的结果原子更新当前值,返回更新后的值。
int addAndGet(int delta) 将给定的值原子地添加到当前值。
boolean compareAndSet(int expect, int update) 如果当前值 ==为预期值,则将该值原子设置为给定的更新值。
int decrementAndGet() 原子减1当前值。
double doubleValue() 返回此值 AtomicInteger为 double一个宽元转换后。
float floatValue() 返回此值 AtomicInteger为 float一个宽元转换后。
int get() 获取当前值。
int getAndAccumulate(int x, IntBinaryOperator accumulatorFunction) 使用给定函数应用给当前值和给定值的结果原子更新当前值,返回上一个值。
int getAndAdd(int delta) 将给定的值原子地添加到当前值。
int getAndDecrement() 原子减1当前值。
int getAndIncrement() 原子上增加一个当前值。
int getAndSet(int newValue) 将原子设置为给定值并返回旧值。
int getAndUpdate(IntUnaryOperator updateFunction) 用应用给定函数的结果原子更新当前值,返回上一个值。
int incrementAndGet() 原子上增加一个当前值。
int intValue() 将 AtomicInteger的值作为 int 。
void lazySet(int newValue) 最终设定为给定值。
long longValue() 返回此值 AtomicInteger为 long一个宽元转换后。
void set(int newValue) 设置为给定值。
String toString() 返回当前值的String表示形式。
int updateAndGet(IntUnaryOperator updateFunction) 使用给定函数的结果原子更新当前值,返回更新的值。
boolean weakCompareAndSet(int expect, int update) 如果当前值 ==为预期值,则将值设置为给定更新值。

ABA问题

以上说到,CAS是将预期值和内存当前值比较,通过比较结果来判断其他线程是否修改过该变量。但是如果存在其他线程修改变量后又改回原值(即预期值),在某些场景就会存在问题。

ABA问题例子

银行账户 500元(共享变量)

  1. 要取出50元,机器故障发送了2个请求A,B,此时两个请求的期望值都是500,新值450
  2. A请求执行完后,内存值变成450。所以第二个请求是不会成功的。
  3. 但是如果在B请求执行前,C又往账户存了50块。这时银行帐号变成500。B请求预期值满足提交成功,银行账户最终存款为 450。

这种情况下,存款少了50块。

ABA解决方案。

ABA问题可以通过版本号来解决,每次修改操作都添加一个版本号。例如刚才的取款操作加个版本号 1,在存款操作执行后版本号+1,变为2。取款的第二次请求执行时就会判断版本号不是1,执行失败。

原子变量AtomicStampedReference,AtomicMarkableReference 中处理了ABA问题。

注意

  1. CAS只能保证一个共享变量的操作的原子性(原子性操作+原子性操作≠原子操作),如果要保持多个共享变量的操作的原子性,就必须使用锁。
  2. 如果变量更新多次失败,循环时间长开销大。
  3. ABA问题

线程间协作-等待唤醒机制

发表于 2018-08-09 | 分类于 多线程

线程间协作

多线程开发中,线程往往都不是孤立的。一个线程往往需要多线程协作完成其待执行的任务。等待唤醒机制就是用来协调线程间的协作。例如:街边的小吃店都是生产一份等销售完再生产,这是典型的生产者消费者模式。下面用代码实现这个场景。

等待唤醒机制的好处:
节省cpu。线程间通讯也可以通过轮询的方式来检查条件进行协作,但是会消耗大量cpu。用生产者/消费者模式举例。在生产者生产的时候,消费者并不需要执行。如果不用等待唤醒机制,消费者只能轮询监控生存者是否完成生产,消耗cpu。通过等待唤醒机制,可以在生产这生产时,消费者进入等待状态,不消耗cpu。待生存者生产完成后再唤醒消费者。

wait/notify

内部锁是通过wait/notify/notifyAll这三个方法实现等待唤醒。wait方法会是一个线程进入等待状态,notify会随机唤醒一个等待状态的线程,notifyAll会唤醒所有等待的线程。

1
2
3
4
5
6
7
8
9
10
void notify() 
唤醒在此对象监视器上等待的单个线程。
void notifyAll()
唤醒在此对象监视器上等待的所有线程。
void wait()
在其他线程调用此对象的 notify() 方法或 notifyAll() 方法前,导致当前线程等待。
void wait(long timeout)
在其他线程调用此对象的 notify() 方法或 notifyAll() 方法,或者超过指定的时间量前,导致当前线程等待。
void wait(long timeout, int nanos)
在其他线程调用此对象的 notify() 方法或 notifyAll() 方法,或者其他某个线程中断当前线程,或者已超过某个实际时间量前,导致当前线程等待。

注意:

  1. wait/notify/notifyAll只能在内部锁作用范围内调用。
  2. wait/notify/notifyAll都是通过该内部锁的锁对象调用。
  3. wait会释放锁。
生存者消费者模式代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
package Demo1;

public class Product {

private int id;

private boolean flag;
public int getId() {
return id;
}

public void setId(int id) {
this.id = id;
}

public synchronized void product() throws InterruptedException {
while (id<100) {

if (flag) {
wait();
}
//用id的变化模拟生产一个物品
id++;
System.out.println("product............id:" + id);
//物品生产好后flag变成true并唤醒消费者
flag = true;
notify();
}
}

public synchronized void comsume() throws InterruptedException {
while (id<100) {
if (!flag) {
wait();
}
System.out.println("comsume............id:" + id);
flag = false;
notify();
}
}


}

public class Consumer implements Runnable {

private Product p;
Consumer(Product p){
this.p=p;
}
public void run(){
try {
p.comsume();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}


public class Producer implements Runnable {

private Product p;
Producer(Product p){
this.p=p;
}
public void run() {
try {
p.product();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

public class Main {
public static void main(String[] args) throws InterruptedException {

Product product = new Product();
Producer producer = new Producer(product);
Consumer consumer = new Consumer(product);
Thread thread = new Thread(producer);
thread.start();
new Thread(consumer).start();
thread.join();
}
}

结果如下,生产一个产品,再消费一个产品。

1
2
3
4
5
6
7
8
9
10
product............id:1
comsume............id:1
product............id:2
comsume............id:2
product............id:3
comsume............id:3
product............id:4
comsume............id:4
product............id:5
comsume............id:5

wait/notify 存在的问题
  1. 过早唤醒。因为notify唤醒具有随机性。在不确保一定能唤醒想要唤醒的线程时,必须使用notifyAll。notifyAll会将所有线程唤醒,有些线程过早被唤醒,浪费资源。例如:
    如果个多生产者多消费者模式,由于notify唤醒线程的随机性,有可能唤醒的都是生产者或消费者线程。这时需要notifyAll来实现唤醒。notifyAll存在的弊端就是会把所有等待线程唤醒,无论是生产者还是消费者。这个问题可以通过显示锁的Condition来解决。

显示锁的等待唤醒机制 await/signal

jdk1.5加入的Lock可以通过创建多个Condition对象来实现分组的等待和唤醒。Condition对象只会唤醒用该Condition对象调用等待的对象。

Condition的方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
void await() 
造成当前线程在接到信号或被中断之前一直处于等待状态。
boolean await(long time, TimeUnit unit)
造成当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态。
long awaitNanos(long nanosTimeout)
造成当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态。
void awaitUninterruptibly()
造成当前线程在接到信号之前一直处于等待状态。
boolean awaitUntil(Date deadline)
造成当前线程在接到信号、被中断或到达指定最后期限之前一直处于等待状态。
void signal()
唤醒一个等待线程。
void signalAll()
唤醒所有等待线程。
Condtion版本生存者/消费者模式

以下Product2类,其他类和上面的一致。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
package Demo2;

import Demo1.Product;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class Product2 {

private int id;
private boolean flag;
private final Lock lock = new ReentrantLock();
private final Condition pro = lock.newCondition();
private final Condition coms = lock.newCondition();
private int size;

public int getId() {
return id;
}

public void setId(int id) {
this.id = id;
}

public void product() throws InterruptedException {
try {
lock.lock();
while (id < 100) {
if (flag) {
//已生产一个产品,使用生产者的Condtion对象使生产者线程等待
pro.await();
}
System.out.println(String.format("product............id:%d" , ++id));
flag = true;
//使用消费者的Condition是消费者线程唤醒
coms.signalAll();
}
} finally {
lock.unlock();
}
}

public void comsume() throws InterruptedException {
try {
lock.lock();
while (id < 100) {
if (!flag) {
coms.await();
}
System.out.println(String.format("consumer............id:%d" , id));
flag = false;
pro.signalAll();
}
}finally {
lock.unlock();
}
}
}

线程同步-volatile关键字

发表于 2018-08-08 | 分类于 多线程

volatile作用

一旦一个共享变量(类的成员变量、类的静态成员变量)被volatile修饰之后,那么就具备了两层语义:

  1. 保证了不同线程对这个变量进行操作时的可见性,即一个线程修改了某个变量的值,这新值对其他线程来说是可见的。

  2. 禁止进行指令重排序。

例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
private  boolean stop=true;
@Test
public void test() throws Exception{

Runnable run1 = new Runnable(){

public void run() {
// TODO Auto-generated method stub
while(stop){
//处理业务

}
System.out.println("2......end");
}

};
Thread thread=new Thread(run1);
thread.start();
Thread.sleep(1000);
stop=false;
System.out.println("1......end");

//等待程序运行
thread.join();
}

这段Demo,给线程run1设置了一个结束标志stop,主线程等待一秒后会将介素标志stop设置为false,理论上线程run1就因为stop=false而run方法结束。但是以上代码运行并没有结束。说明stop的修改对线程run1是不可见的。

实际上是因为stop字段没有用volatile修饰,JIT编译器并不知道它是多线程共享的变量。所以为了提高代码效率,会对代码进行优化成等同以下结果:

run方法优化

1
2
3
while(true){
//处理业务
}

所以run1线程一直循环。如果用volatile修饰stop变量即可,解决。

原理

内存屏障

内存屏障(Memory Barrier)是一种CPU指令,内存屏障用于插入两个指令之间使用,其作用是禁止重排序和刷新处理器缓存和冲刷处理器缓存来保证可见性。Java编译器也会根据内存屏障的规则禁止重排序。

JMM种定义了这四类存储屏障

  1. LoadLoad屏障:对于这样的语句Load1; LoadLoad; Load2,在Load2及后续读取操作要读取的数据被访问前,保证Load1要读取的数据被读取完毕。

  2. StoreStore屏障:对于这样的语句Store1; StoreStore; Store2,在Store2及后续写入操作执行前,保证Store1的写入操作对其它处理器可见。

  3. LoadStore屏障:对于这样的语句Load1; LoadStore; Store2,在Store2及后续写入操作被刷出前,保证Load1要读取的数据被读取完毕。

  4. StoreLoad屏障:它的开销是四种屏障中最大的。 在大多数处理器的实现中,这个屏障是个万能屏障,兼具其它三种内存屏障的功能。保障了屏障前的所有内存访问指令(存储和装载)完成之后,才执行改屏障之后的内存访问指令。

总结:java内存屏障由store和load两两组合,store和load的拼接顺序也表示着这个内存屏障的功能。例如LoadStore,前面的读操作Load会在后面的写操作Store执行前执行完毕。
执行完毕,意味中处理结果对后面的操作可见。

volatile中的内存屏障

读操作

1
2
3
4
5
volatile读操作
⬇
LoadLoad
⬇
LoadStore

volatile变量读操作后加入LoadLoad和LoadStore,保障volatile变量读操作后的所有普通读写操作都不能和volatile读操作重排序。

写操作

1
2
3
4
5
StoreStore
⬇
volatile写操作
⬇
StoreLoad

volatile变量写操作之前加StoreStore屏障,禁止上面的普通写操作不能和volatile写操作重排序

volatile变量写操作之后加StoreLoad屏障,防止上面的volatile写和下面可能存在的volatile读/写重排序

JSR-133对volatile语言进行了增强。旧内存模型允许volatile变量与普通变量重排序。JSR-133后,只要volatile变量
与普通变量之间的重排序有可能破坏volatile的内存语义,这种重排序就会被禁止。

使用场景

  1. 作为状态变量。如上面Demo,用于线程结束的标志。可以通知线程结束。
  2. 某些场景代替锁。如多个线程共享一组可变变量,要保证这些变量更新的原子性。可以将这些变量封装成对象,用volatile修饰。对这些变量更新操作可以是创建一个对象并赋予引用,volatile保证了可见性和有序性。

Brian Goetz大神的《正确使用 Volatile 变量》
详细介绍了volatile的应用场景

线程同步-锁

发表于 2018-08-07 | 分类于 多线程

锁概述

锁是解决线程安全问题的最基本的解决方案。

加锁的代码块,只允许持有锁的线程通过,其他线程会进入同步队列阻塞。持有锁的线程执行完代码块后释放锁。才会唤醒同步队列的线程抢夺锁。通过加锁,同一时间只允许一个线程(持有锁的线程)执行加锁的代码。使得会发生线程安全问题的代码单线程串行。从而解决线程安全问题。

image

锁的分类

公平锁和非公平锁

公平锁是按照锁申请的顺序来分配锁资源。

非公平锁是允许插队的,可能后申请的线程比先申请的线程优先获取锁。

1
2
3
java中锁一般默认都是非公平锁。因为公平锁为了保障公平往往会增加线程的唤醒和暂停。例如一个
运行中的线程要获取锁必须先检查有没有其他排队的线程,有就需要一次暂停。而非公平锁,在一个运行中的线程申请锁时有可能直接获取锁。所以公平锁的开销比
公平锁大。公平锁适用于锁持有时间相对长或者线程申请锁平均间隔时间相对大的情况。

独占锁和共享锁

独占锁,顾名思义,独占锁只允许一个线程持有。

共享锁,共享锁允许多个线程持有。读写锁ReadWriteLock中的读锁就是共享锁

注意:当有线程持有写锁时,不允许其他线程获取读锁。

内部锁 synchronized

内部锁是一种排他锁,可以保证原子性,可见性和有序性。

通过synchronized关键字使用。有三种用法:
  1. 同步代码快
1
2
3
4
5
6
7
8
9
10
   private int i=0;
private final Object o=new Object();
public void test(){
//可以用一任意对象作为锁。用同个对象锁的synchronized代码块,只有获取到该对象锁的才能进去。
synchronized (o) {
while(i<100){
i++;
}
}
}

2.同步方法

1
2
3
4
5
6
7
8
   private int i=0;
//同步方法的锁对象用的是this
public synchronized void test1(){

while(i<100){
i++;
}
}

3.同步静态方法

1
2
3
4
5
6
7
8
   private static int i=0;
//静态同步方法的锁对象是该类字节码对象,XX.class
public synchronized static void test2(){

while(i<100){
i++;
}
}

显示锁 Lock

显示锁是1.5加入jdk的。作用与内部锁相同,用于作为线程同步机制。它提供了内部锁没有的特效,但并不能替代内部锁。

Lock使用

一个Lock接口的实例就是一个显示锁。通过lock方法加锁,和unlock方法释放锁。类java.util.concurrent.locks.ReentrantLock是Lock的实现。Lock既支持公平锁,也支持非公平锁。默认使用非公平锁。可以通过构造函数设置 new ReentrantLock(boolean isFair)

Lock API
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public interface Lock {

//获取锁
void lock();

//如果线程未被中断,则获取锁
void lockInterruptibly() throws InterruptedException;

//锁没被占用才能获取锁
boolean tryLock();

//在给定时间内获取空闲锁
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;

//释放锁
void unlock();

//获取监视器
Condition newCondition();
}
Lock使用

注意:锁的释放一定要在finally里释放,防止锁泄露。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
   //新建一个锁对象 
Lock lock =new ReentrantLock();
private static int i=0;
@Test
public void test01(){

Runnable runnable = new Runnable(){

public void run() {
//获取锁
lock.lock();
try{

while(i<100){
System.out.println(i++);
}
}finally{
//释放锁
lock.unlock();
}
};

};
Thread thread = new Thread(runnable);
Thread thread2 = new Thread(runnable);
thread.start();
thread2.start();
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}

读写锁

读写锁是一种改进型的排他锁。将读和写的锁分离。可以多个线程进行读操作,但是读操作时不允许写操作。一个线程进行写操作时,其他线程不能进行读和写操作。

读写锁分为读锁和写锁

  1. 读锁:一个线程持有读锁不会妨碍其他线程获取读锁。
  2. 写锁:一个线程持有写锁,其他线程无法获取读锁和写锁。
读写锁使用
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
private final ReadWriteLock readWriteLock= new ReentrantReadWriteLock();
private final Lock readLock = readWriteLock.readLock();
private final Lock writeLock = readWriteLock.writeLock();
public void read(){
//获取读锁
readLock.lock();
try{
//读取共享变量
}finally{
readLock.unlock();
}
}

public void write(){
//获取写锁
writeLock.lock();
try{
//写共享变量
}finally{
writeLock.unlock();
}
}

读写锁适用于读操作比写操作频繁且读操作持有锁时间比较长的情况。

内部锁和显示锁的区别

  1. 内部锁基于代码块的锁,没有灵活性可言。显示锁可以灵活使用,但也意味容易出错。例如显示锁容易导致锁泄露(即锁没有释放),内部锁不存在锁泄露。
  2. 内部锁获取锁只能阻塞(相当于显示锁的lock)。显示锁可以有tryLock方法。有闲置的锁就可以获取锁放回true,否则返回false,不会造成阻塞。
  3. 内部锁的线程通信 notify/notifyAll/wait。显示锁可以通过一个或多个Condition来处理等待唤醒,更加灵活。

锁适用情况

多线程共享一组数据,一个线程有以下操作时

  1. check-then-act,读取共享数据判断下个操作是什么。
  2. read-modify-write,读取共享数据,修改再写回。如:i++
  3. 多个线程对多个共享数据更新,如果多个共享数据是有关联的。如:服务器ip,端口。就需要加锁保持原子性。
1…345

wujiazhen

42 日志
18 分类
27 标签
GitHub
© 2019 wujiazhen
由 Hexo 强力驱动
|
主题 — NexT.Muse v5.1.4