程序猿的博客


  • 首页

  • 标签

  • 分类

  • 归档

线程池

发表于 2018-09-25 | 分类于 多线程

为什么要使用线程池

线程的创建和销毁会带来系统的开销。通过线程池进行线程的管理,可以进行线程的复用,避免线程频繁的创建和消耗。

《java并发编程的艺术》 合理利用线程池能够带来三个好处

  1. 降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
  2. 提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。
  3. 提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。但是要做到合理的利用线程池,必须对其原理了如指掌。

线程池的属性

1
2
3
4
5
6
7
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)

线程池的大小

线程池的大小有三种。

  1. 当前线程池大小
    当前线程池中的线程数量

  2. 核心线程池大小 corePoolSize
    线程池提交一个任务。如果当前线程数量<corePoolSize,即使存在空闲线程可以执行任务也会创建一个新的线程。prestartAllCoreThreads方法可以提前创建并启动所有核心线程。

  3. 最大线程池大小 maxumumPoolSize
    线程池允许创建的最大线程数。如果任务队列满了,且当前的线程数小于最大线程池大小,则线程池会再创建新的线程执行任务。值得注意的是如果使用了无界的任务队列这个参数就没什么效果。

线程存活时间 keepAliveTime

线程池的工作线程空闲后,保持存活的时间。所以如果任务很多,并且每个任务执行的时间比较短,可以调大这个时间,提高线程的利用率。

工作队列 workQueue

用于保存等待执行的任务的阻塞队列。 可以选择以下几个阻塞队列。
ArrayBlockingQueue:是一个基于数组结构的有界阻塞队列,此队列按 FIFO(先进先出)原则对元素进行排序。
LinkedBlockingQueue:一个基于链表结构的阻塞队列,此队列按FIFO (先进先出) 排序元素,吞吐量通常要高于ArrayBlockingQueue。静态工厂方法Executors.newFixedThreadPool()使用了这个队列。
SynchronousQueue:一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQueue,静态工厂方法Executors.newCachedThreadPool使用了这个队列。
PriorityBlockingQueue:一个具有优先级的无限阻塞队列。

线程工厂 threadFactory

用于创建线程的工厂

拒绝策略 handler

工作队列满了和线程池大小已经达到最大线程池大小时,说明线程池已经饱和,任务无法处理。这时通过拒绝策列处理任务。
JDK1.5提供的四种策略。
AbortPolicy:直接抛出异常。
CallerRunsPolicy:用调用者所在线程来运行任务。
DiscardOldestPolicy:丢弃队列里最近的一个任务,并执行当前任务。
DiscardPolicy:丢弃当前任务。
可以根据应用场景需要来实现RejectedExecutionHandler接口自定义策略

线程池执行流程

任务提交流程

  1. 向线程池提交任务
  2. 判断当前线程池大小是否大于核心线程池大小。如果当前线程池大小大于核心线程池大小则执行步骤3,否则创建线程执行任务。
  3. 判断工作队列是否已满。已满则继续执行步骤4,否则将任务加入工作队列,等待线程池中线程读取队列,执行任务。
  4. 判断当前线程池大小是否大于最大线程池大小。如果当前线程池大小大于最大线程池大小则执行步骤5,否则创建线程执行任务。
  5. 线程池饱和,执行拒绝策列RejectedExecutionHandler.rejectedExecution。

源码解析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
if (command == null)
throw new NullPointerException();
int c = ctl.get();

if (workerCountOf(c) < corePoolSize) {
/**
*当前线程池大小小于核心线程池大小,通过addWorker方法
*new一个Worker(一个Worker相当于一个线程)对象执行
*/
if (addWorker(command, true))
return;
c = ctl.get();
}
/**
* 当前线程池大小不小于核心线程池大小,则将任务加入workQueue
*/
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//如果任务加入workQueue失败(可能任务队列满了),则尝试addWorker执行。还是失败就执行拒绝策略
else if (!addWorker(command, false))
reject(command);

Worker.runWorker()方法,执行任务,执行完创建Worker时的第一个任务后。会通过getTask()获取workerQueue中的任务
执行,getTask()方法阻塞。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}

关闭线程池 shutdown和shutdownNow

shutdown:设置线程池状态为SHUTDOWN,中断所有没有执行任务的线程。此时,则不能再往线程池中添加任何任务,否则将会抛出RejectedExecutionException异常。但是,此时线程池不会立刻退出,直到添加到线程池中的任务都已经处理完成,才会退出。
shutdownNow:设置线程池状态为STOP,尝试停止所有正在执行任务或暂停任务的线程(通过interrupt)。shutdownNow并不代表线程池就一定立即就能退出,(因为任务如果没有处于阻塞等待状态,interrupt无法中断)它可能必须要等待所有正在执行的任务都执行完成了才能退出。

Executor框架

Thread即使工作单元也是执行机制。从jdk5起把工作单元和执行机制分开。工作单元是Runnable,Callable,执行机制由Executor框架提供。

Executor框架组成

  1. 任务 实现Runnable和Callable接口
  2. 执行机制 实现Executor接口。主要实现类由ThreadPoolExecutor和ScheduledThreadPoolExecutor。
  3. 异步计算结果 实现Future接口
  4. Executors工具类

ThreadPoolExecutor

ThreadPoolExecutor是线程池的实现。构造函数的参数前面已经介绍。

1
2
3
4
5
6
7
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)

Executors工具类提供创建ThreadPoolExecutor的方法。

CachedThreadPool

CachedThreadPool是一个可缓存的线程池。可以从ThreadPoolExecutor的构造函数参数看出它的功能。核心线程池大小0,最大线程池大小Integer.MAX_VALUE,使用不存储元素的阻塞队列SynchronousQueue。所以CachedThreadPool的线程池大小几乎无限大,提交任务时如果没有空闲的线程(没线程读取SynchronousQueue)就会创建一个线程。工作线程闲置60s,将会被终止。

适用:适合于大量执行时间短的任务。

1
2
3
4
5
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}

FixedThreadPool

FixedThreadPool,固定大小的线程池。核心线程池大小和最大线程池大小一致。线程存活时间为0意味着空闲线程池不会被终止。提交任务时,都会创建一个线程。直到线程数达到指定的线程数nThreads,才会将任务放入队列。LinkedBlockingQueue时有界的阻塞队列,默认大小时Integer.MAX_VALUE。

适用:执行长期的任务,负载比较重的服务器。

1
2
3
4
5
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}

SingleThreadExecutor

SingleThreadExecutor,单线程的线程池。线程池大小只能是1.

适用:按顺序执行任务,且任何时间的不会有多个线程是活动的情况。

1
2
3
4
5
6
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}

执行任务

线程池执行任务提供了2个方法 execute和submit

execute方法执行任务,没返回值。

1
2
3
4
5
6
7
 ExecutorService executorService = Executors.newSingleThreadExecutor();
executorService.execute(new Runnable() {
@Override
public void run() {
System.out.println("run");
}
});

submit方法有返回值Future,通过Future对象可以获取返回值。

1
2
3
4
5
6
7
8
9
ExecutorService executorService = Executors.newSingleThreadExecutor();
Future<String> submit = executorService.submit(new Callable<String>() {
@Override
public String call() throws Exception {
return "run";
}
});
//阻塞获取返回值
System.out.println(submit.get());

ScheduledThreadPoolExecutor

ScheduledThreadPoolExecutor继承了ThreadPoolExecutor和实现了ScheduledExecutorService接口。是用于执行延时任务和周期任务的线程池。功能与Timer类似。Timer是单线程,ScheduledThreadPoolExecutor是多线程。

ScheduledExecutorService API

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
/**
* 延迟执行Runnable任务
*/
public ScheduledFuture<?> schedule(Runnable command,
long delay, TimeUnit unit);

/**
* 延迟执行Callable任务
*/
public <V> ScheduledFuture<V> schedule(Callable<V> callable,
long delay, TimeUnit unit);

/**
* 按固定的频率period执行任务。initialDelay是第一次运行时的延迟时间。执行时间是 initialDelay+n*period. n是第几次执行。
*/
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit);

/**
* 每次任务执行完后延迟时间delay后再次执行,执行时间是initalDelay+n*(任务执行时间+delay)
*/
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit);

使用

1
2
3
4
5
6
7
8
9
10
11
12
//通过Executors创建
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(10);
//一秒执行一次
ScheduledFuture<?> scheduledFuture = scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
System.out.println(new SimpleDateFormat("HH:mm:ss").format(new Date()));
}
}, 0, 1, TimeUnit.SECONDS);
Thread.sleep(10000);
//取消任务
scheduledFuture.cancel(true);

Future

用于操作任务的计算结果和任务的状态。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
/**
* 取消任务
* @Param mayInterruptIfRunning 是否中断执行的任务
*/
boolean cancel(boolean mayInterruptIfRunning);

/**
* 是否已经取消
*/
boolean isCancelled();

/**
* 是否已经执行
*/
boolean isDone();

/**
* 阻塞获取执行结果
*/
V get() throws InterruptedException, ExecutionException;

/**
* 阻塞获取执行结果,直到超时退出阻塞状态。
*/
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;

fork/join框架

发表于 2018-09-20 | 分类于 多线程

Fork/Join框架简介

fork/join是java7提供的任务并行执行的框架。fork就是将大任务分解成若干子任务,join就是将子任务的执行结果聚合成整个任务的结果。

Fork/Join框架通过两个类实现

  1. ForkJoinTask
    ForkJoinTask负责分解任务和合并结果。通过实现compute方法,把任务分成多个子任务的ForkJoinTask,调用子任务的fork方法执行任务,再调用子任务的join方法等待子任务执行完。

通常不会直接继承ForkJoinTask而是使用其子类。
RecursiveTask: 用于有返回结果的任务
RecursiveAction: 用于没返回结果的任务

  1. ForkJoinPool
    ForkJoinPool用于执行ForkJoinTask。任务分割出的子任务会添加到当前工作线程所维护的双端队列中,进入队列的头部。当一个工作线程的队列里暂时没有任务时,它会随机从其他工作线程的队列的尾部获取一个任务。
1
2
工作窃取算法是指某个线程从其他队列里窃取任务来执行。假如我们需要做一个比较大的任务,我们可以把这个任务分割为若干互不依赖的子任
务,为了减少线程间的竞争,把这些子任务分别放到不同的队列里,并为每个队列创建一个单独的线程来执行队列里的任务,线程和队列一一对应,比如A线程负责处理A队列里的任务。但是有的线程会先把自己队列里的任务干完,而其他线程对应的队列里还有任务等待处理。干完活的线程与其等着,不如去帮其他线程干活,于是它就去其他线程的队列里窃取一个任务来执行。而在这时它们会访问同一个队列,所以为了减少窃取任务线程和被窃取任务线程之间的竞争,通常会使用双端队列,被窃取任务线程永远从双端队列的头部拿任务执行,而窃取任务的线程永远从双端队列的尾部拿任务执行。

使用

计算start-end之间所有数字的和。例子来自《java并发编程的艺术》

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
static class CountTask extends RecursiveTask<Integer>{
private static final int THRESHOLD=2;
private int start;
private int end;
public CountTask(int start,int end){
this.start=start;
this.end=end;
}
@Override
protected Integer compute() {
int sum=0;
//分解的最小任务,达到最小任务就开始计算
if(end-start<THRESHOLD){
for(int i=start;i<end+1;i++){
sum+=i;
}
//否则分解任务
}else{
//二分法分解任务
int middle=(start+end)/2;
CountTask leftTask = new CountTask(start, middle);
CountTask rightTask = new CountTask(middle+1, end);
//子任务fork开始执行,会再次调用子任务的compute,如果达不到最小任务会继续分解。(递归)
leftTask.fork();
rightTask.fork();
//等待子任务完成,读取结果
sum+=leftTask.join();
sum+=rightTask.join();
}
return sum;
}
}

public static void main(String[] args) throws ExecutionException, InterruptedException {
CountTask countTask = new CountTask(1,10);
ForkJoinPool pool=new ForkJoinPool();
//通过ForkJoinPool提交任务
ForkJoinTask<Integer> submit = pool.submit(countTask);
Integer integer = submit.get();
System.out.println(integer);
}

java中的阻塞队列

发表于 2018-09-18 | 分类于 多线程

阻塞队列是指支持阻塞添加和阻塞移除两种操作的队列.
java中提供了7种阻塞队列:
ArrayBlockingQueue:一个由数组结构组成的有界阻塞队列。
LinkedBlockingQueue:一个由链表结构组成的有界阻塞队列。
PriorityBlockingQueue:一个支持优先级排序的无界阻塞队列。
DelayQueue:一个使用优先级队列实现的无界阻塞队列。
SynchronousQueue:一个不存储元素的阻塞队列。
LinkedTransferQueue:一个由链表结构组成的无界阻塞队列。
LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列。

阻塞队列的增删查方法

处理方式/方法 抛出异常 返回值 阻塞 超时退出
插入 add(e) offer(e) put(e) offer(e,time,unit)
移除 remove() poll() take() poll(time,unit)
检查 element() peek()

队列简介

ArrayBlockingQueue

ArrayBlockingQueue是一个由数组结构组成的有界阻塞队列。支持公平队列和非公平队列。会按照线程阻塞的顺序来让线程访问队列。默认是非公平队列,可以通过构造函数设置。

1
2
ArrayBlockingQueue(int capacity);
ArrayBlockingQueue(int capacity, boolean fair);

LinkedBlockingQueue

LinkedBlockingQueue是一个由链表结构组成的有界阻塞队列。默认长度和最大长度都是Integer.MAX_VALUE,可以通过构造函数设置长度。

PriorityBlockingQueue

PrioriityBlockingQueue是一个使用优先级队列实现的无界阻塞队列。队列中元素默认按照自然顺序的升序排序。相等的元素不能保证其顺序。
队列中的元素必须可以比较(即满足下面条件的其中之一)

  1. 构造队列是传递Comparator,作为元素的比较器

    1
    2
    PriorityBlockingQueue(int initialCapacity,
    Comparator<? super E> comparator)
  2. 元素自身实现Comparable接口

DelayQueue

DelayQueue支持延迟获取元素的无界阻塞队列,依据优先级队列PriorityQueue实现。

1
DelayQueue<E extends Delayed>

DelayQueue中的元素必须实现Delayed。

实现Delayed接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
class DelayedElement implements Delayed{
private long time;
private String id;
//1.构造函数设置延迟多久运行
public DelayedElement(String id,long time,TimeUnit unit){
this.id=id;
this.time=System.nanoTime()+TimeUnit.NANOSECONDS.convert(time,unit);
}
//2.获取还需要延迟多久
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(time-System.nanoTime(),TimeUnit.NANOSECONDS);
}
//3. 比较,按time升序排序
@Override
public int compareTo(Delayed o) {
long diff = getDelay(TimeUnit.NANOSECONDS) - o.getDelay(TimeUnit.NANOSECONDS);
return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
}

@Override
public String toString(){
//纳秒转成毫秒输出
return String.format("id:%s,seconds:%d",id,TimeUnit.SECONDS.convert(time,TimeUnit.NANOSECONDS));
}
}

使用

1
2
3
4
5
6
7
8
9
10
11
DelayQueue delayQueue=new DelayQueue();
//延迟5秒执行
delayQueue.add(new DelayedElement("1",5,TimeUnit.SECONDS));
//延迟2秒执行
delayQueue.add(new DelayedElement("2",2,TimeUnit.SECONDS));
//延迟1秒执行
delayQueue.add(new DelayedElement("3",1,TimeUnit.SECONDS));
//阻塞等待
System.out.println(delayQueue.take());
System.out.println(delayQueue.take());
System.out.println(delayQueue.take());

运行结果:
id=3的最先运行,id=2的在id=3运行后1秒运行,id=1的在id=3运行后4秒运行。和预想结果一直。

1
2
3
4

id:3,seconds:78285
id:2,seconds:78286
id:1,seconds:78289

ScheduledThreadPoolExecutor实现延迟执行和固定周期执行依靠的DelayedWorkQueue与DelayQueue类似。ScheduledFutureTask是Delayed的实现。

SynchronousQueue

SynchronousQueue:一个不存储元素的阻塞队列。SynchronousQueue内部没有容器存储数据,每一个put操作,都需要等待一个take操作。否则不能再添加元素。
支持公平和非公平模式。

1
SynchronousQueue(boolean fair)

适用于生产者消费者模式的数据传递。

LinkedTransferQueue

LinkedTransferQueue是一个由链表结构组成的无界阻塞队列。
LinkedTransferQueue实现了一个重要的接口TransferQueue,该接口含有下面几个重要方法:

  1. transfer(E e):若当前存在一个正在等待获取的消费者线程,即立刻移交之;否则,会插入当前元素e到队列部,并且等待进入阻塞状态,到有消费者线程取走该元素。
  2. tryTransfer(E e):若当前存在一个正在等待获取的消费者线程(使用take()或者poll()函数),使用该方法即刻转移/传输对象元素e;若不存在,则返回false,并且不进入队列。这是一个不阻塞的操作。
  3. tryTransfer(E e, long timeout, TimeUnit unit):若当前存在一个正在等待获取的消费者线程,会立即传给它;否则将插入元素e到队列尾部,并且等待被消费者线程获取消费掉;若在指定的时间内元素e无法被消费者线程取,则返回false,同时该元素被移除。
  4. hasWaitingConsumer():判断是否存在消费者线程。
  5. getWaitingConsumerCount():获取所有等待获取元素的消费线程数量。

LinkedBlockingDeque

LinkedBlockingDeque是一个由链表结构组成的双向阻塞队列。可以从队列的两端添加或者移除元素。因为有两个操作入口,也减少多线程入队时的竞争。增加了以First和Last结尾的操作函数,用于从头操作队列或者从尾部操作队列。

EventListenerSupport实现事件监听

发表于 2018-09-18 | 分类于 工具类

EventListenerSupport是apache lang3包提供用于处理事件监听的解决方案。

pom

1
2
3
4
5
 <dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.4</version>
</dependency>

使用

  1. 创建监听器接口

    1
    2
    3
    4
    public interface IListener {

    void onListener();
    }
  2. 通过接口创建EventListenerSupport

    1
    EventListenerSupport<IListener> eventListener = EventListenerSupport.create(IListener.class);
  3. 实现监听器

    1
    2
    3
    4
    5
    6
    public class TestListener implements IListener {

    public void onListener() {
    System.out.println("test");
    }
    }
  4. 向EventListenerSupport注册监听器

    1
    eventListener.addListener(new TestListener());
  5. 事件发生时,触发监听器.fire方法会返回IListener的代理类.运行IListener的方法,会通过代理运行所有注册监听器的相同方法.

    1
    eventListener.fire().onListener();

ConcurrentHashMap源码解析(jdk1.8)

发表于 2018-09-14 | 分类于 jdk源码

jdk8的ConcurrentHashMap改动非常大。放弃了之前segment锁,改用cas+synchronized来实现同步。

常量含义

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
/**
* table数组最大容量。
*/
private stactic final int MAXIMUM_CAPACITY = 1 << 30;

/**
* 默认初始化容量,是2的次幕
*/
private static final int DEFAULT_CAPACITY = 16;

/**
* The largest possible (non-power of two) array size.
* Needed by toArray and related methods.
*/
static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;

/**
* The default concurrency level for this table. Unused but
* defined for compatibility with previous versions of this class.
*/
private static final int DEFAULT_CONCURRENCY_LEVEL = 16;

/**
* 加载因子,用于判断是否需要扩容, 当哈希表中的条目数超出了加载因子与当前容量的乘积时.table长度扩容为原* 来的两倍。
*/
private static final float LOAD_FACTOR = 0.75f;

/**
* 链表转红黑树的阈值,链表长度超过这个值自动转为红黑树
*/
static final int TREEIFY_THRESHOLD = 8;

/**
* 红黑树元素个数少于这个值,转回链表
*/
static final int UNTREEIFY_THRESHOLD = 6;

/**
* 当桶中的bin被树化时最小的hash表容量。这个MIN_TREEIFY_CAPACITY的值至少是TREEIFY_THRESHOLD的
* 4倍。
*/
static final int MIN_TREEIFY_CAPACITY = 64;

/**
* 扩容线程每次最少要迁移16个hash桶
*/
private static final int MIN_TRANSFER_STRIDE = 16;

/**
* The number of bits used for generation stamp in sizeCtl.
* Must be at least 6 for 32bit arrays.
*/
private static int RESIZE_STAMP_BITS = 16;

/**
* 最多多少线程帮助扩容
*/
private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1;

/**
* The bit shift for recording size stamp in sizeCtl.
*/
private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;
/*
* 以下是节点和hash值
*/
static final int MOVED = -1; // forwarding nodes的hash值
static final int TREEBIN = -2; // 红黑树根节点的hash值
static final int RESERVED = -3; // transient reservations的hash值
static final int HASH_BITS = 0x7fffffff; // 正常节点的最大hash值

重要变量

table
用来存放Node节点数据的数组,默认为null,默认大小为16,每次扩容时大小总是2的幂次方;下文的table都是指这个属性

nextTable
扩容时新表,数组为table的两倍,扩容完毕会赋给table

baseCount
map的大小

sizeCtl
控制标识符,用来控制table初始化和扩容操作的,在不同的地方有不同的用途,其值也不同,所代表的含义也不同
负数代表正在进行初始化或扩容操作
-1代表正在初始化
-N 表示有N-1个线程正在进行扩容操作
正数或0代表hash表还没有被初始化,这个数值表示初始化或下一次进行扩容的大小

transferIndex
表示已经分配给扩容线程的table数组索引位置。主要用来协调多个线程扩容。transferIndex初始化是指向table.length。当开始扩容时,首先要将transferIndex右移(以cas的方式修改 transferIndex=transferIndex-stride(要迁移hash桶的个数)),获取迁移任务。每个扩容线程都会通过for循环+CAS的方式设置transferIndex,因此可以确保多线程扩容的并发安全。

image

内部类

Node
节点,保存key-value的数据结构;value字段和next用volatile修饰,保障可见性。读数据时不需要加锁。可以看到Node不支持setValue。修改值直接node.val=xxx修改。

1
2
3
4
5
6
7
8
9
10
static class Node<K,V> implements Map.Entry<K,V> {
final int hash;
final K key;
volatile V val;
volatile Node<K,V> next;
...
public final V setValue(V value) {
throw new UnsupportedOperationException();
}
}

TreeNode
红黑树节点

1
2
3
4
5
6
7
8
static final class TreeNode<K,V> extends Node<K,V> {
TreeNode<K,V> parent; // red-black tree links
TreeNode<K,V> left;
TreeNode<K,V> right;
TreeNode<K,V> prev; // needed to unlink next upon deletion
boolean red;
...
}

TreeBin
用于封装红黑树,持有红黑树根节点的引用。包含一个读写锁用于写线程等待读线程完成,再tree重新构造之前。

1
2
3
4
5
6
7
8
9
10
static final class TreeBin<K,V> extends Node<K,V> {
TreeNode<K,V> root;
volatile TreeNode<K,V> first;
volatile Thread waiter;
volatile int lockState;
// 表示锁状态
static final int WRITER = 1; // 持有写锁
static final int WAITER = 2; // 等待写锁
static final int READER = 4; // 持有读锁
}

ForwardingNode
一个特殊的Node节点,hash值为-1(MOVED常量),其中存储nextTable的引用。只有table发生扩容的时候,ForwardingNode才会发挥作用,作为一个占位符放在table中表示当前节点为null或则已经被移动

1
2
3
4
5
6
  final Node<K,V>[] nextTable;
ForwardingNode(Node<K,V>[] tab) {
super(MOVED, null, null, null);
this.nextTable = tab;
}
....

主要方法解析

put方法

put方法调用了putVal方法。

1
2
3
public V put(K key, V value) {
return putVal(key, value, false);
}

putVal方法关键代码解析

  1. 计算hash值
    spread方法通过高16位异或低16位来散列。因为后面计算table坐标时是采用 hash&(length-1)的公式来计算。
    也就是说只会保留低位,这样大大加大了出现hash冲突的概率。这里用高位^低位,增加低位的随机性,减少hash冲突的次数。

    1
    2
    3
    static final int spread(int h) {
    return (h ^ (h >>> 16)) & HASH_BITS;
    }
  2. tab==null时,先进行table的初始化。

    1
    2
    if (tab == null || (n = tab.length) == 0)
    tab = initTable();
  3. 通过 (table.length-1)&hash算出脚标i,如果table脚标i的元素为null。说明不存在hash冲突。将key,val封装成Node,通过cas直接设置进table,跳出循环。cas失败说明其他线程修改了该脚标节点,重新开始循环。

    1
    2
    3
    4
    5
    else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
    if (casTabAt(tab, i, null,
    new Node<K,V>(hash, key, value, null)))
    break; // no lock when adding to empty bin
    }
  4. 节点f是table中脚标为i,如果该节点hash==MOVE,说明该节点是ForwardingNode且其他线程正在对这个map进行扩容。当前线程也协助扩容。

    1
    2
    else if ((fh = f.hash) == MOVED)
    tab = helpTransfer(tab, f);
  5. 不是前面几种情况的话,就是说明存在hash冲突。将新节点加入以f为根节点的链表或红黑树。这里只需要锁住根节点,相比以前的分段锁粒度更小。红黑树用TreeBin对象封装,hash=-2。hash大于0即是链表。节点加入链表会记录一个链表长度binCount,如果binCount>=TREEIFY_THRESHOLD,链表会向红黑树转化。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    else{
    V oldVal = null;
    synchronized (f) {
    if (tabAt(tab, i) == f) {
    if (fh >= 0) {
    binCount = 1;
    for (Node<K,V> e = f;; ++binCount) {
    K ek;
    if (e.hash == hash &&
    ((ek = e.key) == key ||
    (ek != null && key.equals(ek)))) {
    oldVal = e.val;
    if (!onlyIfAbsent)
    e.val = value;
    break;
    }
    Node<K,V> pred = e;
    if ((e = e.next) == null) {
    pred.next = new Node<K,V>(hash, key,
    value, null);
    break;
    }
    }
    }
    else if (f instanceof TreeBin) {
    Node<K,V> p;
    binCount = 2;
    if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
    value)) != null) {
    oldVal = p.val;
    if (!onlyIfAbsent)
    p.val = value;
    }
    }
    }
    }
    if (binCount != 0) {
    if (binCount >= TREEIFY_THRESHOLD)
    treeifyBin(tab, i);
    if (oldVal != null)
    return oldVal;
    break;
    }
    }
  6. addCount方法分两部分,一、更新baseCount,二、判断是否扩容

    1
    addCount(1L, binCount);
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
private final void addCount(long x, int check) {}
/*
*CounterCell是多线程的情况下辅助计算baseCount;
*多线程添加时,通过随机在CounterCell数组中选一个来记录添加的map大小,减少多线程的竞争。
*最后通过baseCount加上所有的CounterCell.value得出最终的baseCount。
*/
CounterCell[] as; long b, s;
/*baseCount的更新
* counterCells==null,通过cas更新baseCount。成功,更新完成。失败则进入if块处理。
* counterCells!=null,直接进入if块处理
*/
if ((as = counterCells) != null ||
!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
CounterCell a; long v; int m;
boolean uncontended = true;
//cas失败进入fullAddCount方法循环cas
if (as == null || (m = as.length - 1) < 0 ||
(a = as[ThreadLocalRandom.getProbe() & m]) == null ||
!(uncontended =
U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
fullAddCount(x, uncontended);
return;
}
if (check <= 1)
return;
//将CounterCell数组记录的值加入baseCount中
s = sumCount();
}
//判断扩容
if (check >= 0) {
Node<K,V>[] tab, nt; int n, sc;
//s是上面计算的容量,s>sizeCtl是扩容。sizeCtl一般是0.75*table.length,表示扩容阈值。
while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
(n = tab.length) < MAXIMUM_CAPACITY) {
int rs = resizeStamp(n);
/*sizeCtl<0
*代表正在进行初始化或扩容操作
*-1代表正在初始化
*-N 表示有N-1个线程正在进行扩容操作
*/
if (sc < 0) {
//扩容任务已经全部分配或者扩容已经完成,则当前线程不需要再扩容。
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
//如果已经有其他线程在执行扩容操作,sizeCtl+1,参与扩容
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
//当前线程是唯一的或是第一个发起扩容的线程 此时nextTable=null
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
//扩容方法
transfer(tab, null);
s = sumCount();
}
}
}

扩容transfer

transfer是可以多线程进行的。通过给每个线程分配迁移(将table中的hash桶迁移到新的nextTab中)的hash桶,每个线程最少迁移16个hash桶,用transferIndex来同步。一个map扩容,要迁移的hash桶是原来的table长度。分配时从后面的分配起。从下图看出,每个线程分配的hash桶脚标范围是transferIndex-stride(线程迁移的hash桶个数)到transferIndex-1。分配完一个线程后,通过cas将transferIndex设置为transferIndex-stride。

image
线程迁移的hash桶个数最少是16,图中为了方便没画这么多。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
int n = tab.length, stride;
//设置stride,stride是 分配给该线程迁移的hash桶个数,最小值是MIN_TRANSFER_STRIDE
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // subdivide range
//nextTab为null时,这个第一个扩容的线程,初始化nextTab为原来table的2倍
if (nextTab == null) { // initiating
try {
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
}
nextTable = nextTab;
//初始transferIndex
transferIndex = n;
}
int nextn = nextTab.length;
//初始ForwardingNode节点,指向扩容的nextTab
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
boolean advance = true;
boolean finishing = false; // to ensure sweep before committing nextTab
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
while (advance) {
int nextIndex, nextBound;
//
if (--i >= bound || finishing)
advance = false;
//如果transferIndex<0,说明要迁移的hash桶都分配给线程执行了。
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
advance = false;
}
//cas修改,transferIndex。分配迁移hash桶任务。该线程负责迁移的hash桶脚标范围 bound-i
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {

//该线程要迁移的hash桶的最小脚标
bound = nextBound;
//该线程要迁移的hash桶的最大脚标
i = nextIndex - 1;
advance = false;
}
}
//
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
if (finishing) {
nextTable = null;
table = nextTab;
sizeCtl = (n << 1) - (n >>> 1);
return;
}
//
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
//所有线程都完成任务
finishing = advance = true;
i = n; // recheck before commit
}
}
//如果table[i]==mull,设置ForwardingNode节点,用于占位。
else if ((f = tabAt(tab, i)) == null)
advance = casTabAt(tab, i, null, fwd);
else if ((fh = f.hash) == MOVED)
advance = true; // already processed
else {
//迁移操作,锁住要迁移的hash桶
synchronized (f) {
/**
* hash桶中的节点rehash有两种情况。因为节点的key相同,索引hash值也相同。算脚标是hash&
* (length-1),也就是只取hash值的前几位。扩容后length是之前的2倍。算脚标时,会取多一位。
* 所以根据hash&(length-1)的结果,
* 1.如果最高位是0,则脚本和之前一样,移到原来位置。
* 2.如果最高位是1,新的脚标位置就是 length+原来脚标。
* 下面链表和红黑树都是这样将hash桶分成两份,设置在新表的对应位置。
*/
if (tabAt(tab, i) == f) {
Node<K,V> ln, hn;
//hash桶是链表结构
if (fh >= 0) {
int runBit = fh & n;
Node<K,V> lastRun = f;
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
if (runBit == 0) {
ln = lastRun;
hn = null;
}
else {
hn = lastRun;
ln = null;
}
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
//hash桶是红黑树
else if (f instanceof TreeBin) {
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> lo = null, loTail = null;
TreeNode<K,V> hi = null, hiTail = null;
int lc = 0, hc = 0;
for (Node<K,V> e = t.first; e != null; e = e.next) {
int h = e.hash;
TreeNode<K,V> p = new TreeNode<K,V>
(h, e.key, e.val, null, null);
if ((h & n) == 0) {
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
}
else {
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
}
}
}
}
}

get方法

get方法不需要加锁。因为Node的val是volatile修饰的,所以其他线程的修改对当前线程是可见的。

  1. 先获取key对应的脚标i,table[i]==null,返回null。table[i]节点的key是否和key是否相等。相等则返回val。
  2. table[i].hash<0,则说明节点是TreeBin或者ForwardingNode节点,通过该节点的find方法找出key对应的节点。
  3. 不是以上情况则是链表,遍历链表找到key对应的节点值。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
     public V get(Object key) {
    Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
    int h = spread(key.hashCode());
    if ((tab = table) != null && (n = tab.length) > 0 &&
    (e = tabAt(tab, (n - 1) & h)) != null) {
    if ((eh = e.hash) == h) {
    if ((ek = e.key) == key || (ek != null && key.equals(ek)))
    return e.val;
    }
    else if (eh < 0)
    return (p = e.find(h, key)) != null ? p.val : null;
    while ((e = e.next) != null) {
    if (e.hash == h &&
    ((ek = e.key) == key || (ek != null && key.equals(ek))))
    return e.val;
    }
    }
    return null;
    }

死锁

发表于 2018-09-06 | 分类于 多线程

死锁是常见的线程活性问题。多个线程互相等待对方而导致永远暂停称为死锁。常见的死锁q情况是线程A持有锁L1的情况下申请锁L2,线程B持有锁L2申请锁L1。线程A等待锁L2释放,线程B等待锁L1释放,从而互相等待永远暂停。

死锁的必要条件

  1. 资源互斥,资源同时只能被一个线程占用。
  2. 资源不可抢夺,资源被一个线程占用时,其他线程无法抢夺。
  3. 占用并等待资源,线程持有资源,并申请另外的资源而进入等待时。不会释放现有资源。
  4. 循环等待资源,线程A持有锁L1的情况下申请锁L2,线程B持有锁L2申请锁L1。线程A等待锁L2释放,线程B等待锁L1释放。A持有L1等L2,B持有L2等L1 这就是循环等待。

哲学家用餐问题

哲学家用餐问题是经典的死锁问题。一群哲学家围着一个大圆桌坐下,每个哲学家面前都有一个碗和一根筷子。哲学家要么思考要么吃饭。哲学家吃饭时总是先拿起左手边的筷子再拿起右手边的筷子。只有拿到2根筷子的哲学家可以吃饭。哲学家吃饭吃着会放下筷子,再次思考。

将问题简化,假设只有2个哲学家。哲学家p1想吃饭,先拿起他左边的筷子c1。这时哲学家p2也想吃饭,拿起他左边的筷子c2。哲学家相当于线程,筷子相当于锁。这样就进入死锁状态。
image

代码模拟

哲学家抽象类

定义哲学家的吃饭,思考动作和 身份标识id,左右筷子属性。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
public abstract class AbstractPhilosopher  extends Thread {
protected int id;
protected Chopsticks left;
protected Chopsticks right;
public AbstractPhilosopher(int id,Chopsticks left,Chopsticks right){
this.id=id;
this.left=left;
this.right=right;
}

@Override
public void run() {
//哲学家不是在思考,就是在吃饭
while (true){
think();
eat();
}
}
public abstract void eat();
public void think(){
try {
System.out.println(String.format("%s thinking",this.toString()));
Thread.sleep(new Random().nextInt(2)*1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}

public void doEat(){
try {
System.out.println(String.format("%s eating",this.toString()));
Thread.sleep(new Random().nextInt(2)*1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}

@Override
public String toString() {
return String.format("Philosopher%d",this.id);
}
}

筷子类

筷子类充当锁对象,获取锁相当于拿起筷子,释放锁相当于放下筷子。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class Chopsticks {
private int id;
public Chopsticks(int id){
this.id=id;
}

public int getId() {
return id;
}
@Override
public String toString() {
return String.format("Chopsticks%d",id);
}
}

哲学家实现类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class DeadLockPhilosopher extends AbstractPhilosopher {
public DeadLockPhilosopher(int id, Chopsticks left, Chopsticks right) {
super(id, left, right);
}
@Override
public void eat() {
//拿起左边筷子
synchronized (this.left){
System.out.println(String.format("%s拿起%s",this.toString(),this.left.toString()));

//为了使死锁发生频率高点,在这里等下哲学家2获取锁
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (this.right){
//拿起右边筷子
System.out.println(String.format("%s拿起%s",this.toString(),this.right.toString()));
this.doEat();
}
}
}
}

模拟

1
2
3
4
5
6
7
8
9
10
11
12
13
 public static void main(String[] args) throws InterruptedException {

Chopsticks c1 = new Chopsticks(1);
Chopsticks c2 = new Chopsticks(2);
//筷子c1是哲学家p1的左筷子,c2是p1的右筷子
DeadLockPhilosopher p1 = new DeadLockPhilosopher(1,c1,c2);
//筷子c2是哲学家p2的左筷子,c1是p2的右筷子
DeadLockPhilosopher p2 = new DeadLockPhilosopher(2,c2,c1);
p1.start();
p2.start();

p1.join();
}

运行结果

1
2
3
4
Philosopher1  thinking
Philosopher2 thinking
Philosopher2拿起Chopsticks2
Philosopher1拿起Chopsticks1

哲学家p2拿起了筷子c2,哲学家p1拿起了筷子c1。产生死锁。

避免死锁的解决方法

死锁的解决方法可以从死锁的产生条件入手。只要消除死锁四个必要条件之一就可以避免死锁。

粗粒锁法

通过一个大的锁来替代多个细粒度的锁。由于只有一个锁,死锁的必要条件占用并等待资源和循环等待资源都不成立。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class BigLogPhilosopher extends AbstractPhilosopher {
private final static Object GLOBAL_LOCK=new Object();
public BigLogPhilosopher(int id, Chopsticks left, Chopsticks right) {
super(id, left, right);
}

@Override
public void eat() {
synchronized (GLOBAL_LOCK){
//拿起左边
System.out.println(String.format("%s拿起%s",this.toString(),this.left.toString()));
//拿起右边筷子
System.out.println(String.format("%s拿起%s",this.toString(),this.right.toString()));
this.doEat();
}
}
}

粗粒锁的缺点是降低了并发,浪费资源。如果有4个哲学家的情况下,一个哲学家吃饭,还剩下2根筷子可以给一个哲学家同时用餐。
但是用粗粒锁的方法同时只能一个哲学家吃饭。

锁排序法

锁排序是给锁进行排序,所有线程申请锁都按排好的顺序。从而消除循环等待条件。

如哲学家问题中,给筷子排序,每次哲学家不按照左右顺序来拿筷子。而是都拿id小的筷子再拿大的。
所以哲学家p1,p2都会先拿筷子c1。当哲学家p1拿到筷子c1后,哲学家p2获取不到锁进入阻塞。死锁不会发送。

1
2
3
4
5
6
7
8
9
10
public class SortLockPhilosopher extends DeadLockPhilosopher {
public SortLockPhilosopher(int id, Chopsticks left, Chopsticks right) {
super(id, left, right);
if(left.getId()>right.getId()){
this.left=right;
this.right=left;
}
}

}

1
2
3
4
5
6
7
8
9
10
public static void main(String[] args) throws InterruptedException {
Chopsticks c1 = new Chopsticks(1);
Chopsticks c2 = new Chopsticks(2);
DeadLockPhilosopher p1 = new SortLockPhilosopher(1,c1,c2);
DeadLockPhilosopher p2 = new SortLockPhilosopher(2,c2,c1);
p1.start();
p2.start();

p1.join();
}

用tryLock(long,TimeUnit)

第三种方法是通过tryLock(long,TimeUnit),固定时间内获取不了锁就获取失败返回false。哲学家问题中,可以在超时获取不到锁的情况下,将本来就持有的锁放下,即可消除占有并等待资源条件,避免死锁。

懒加载中的DCL

发表于 2018-09-06 | 分类于 多线程

java中经常会使用延迟一些高开销对象的初始化过程,等到使用再加载,称之为懒加载。

单例中懒汉式

下面时单例中的懒汉式代码。

1
2
3
4
5
6
public static Instance getInstance(){
if(instance==null){ //1
instance=new Instance(); //2
}
retrun instance; //3
}

这代码子是典型的 check-then-act的模式,在多线程情况下存在多线程安全问题。例如线程1执行完步骤1还未执行2时,让出执行权。线程2进来会将instance初始化。线程1唤醒时会再次初始化对象,破坏了单例。

DCL实现懒汉式

为了解决线程安全问题,getInstance方法要加锁。但是根据getInstance方法的作用。锁再instance对象初始化完毕后就没作用。 所以采用DCL (Double-Checked Locking)双层检查锁的方式。通过第一次检查如果instance初始化了就直接跳过加锁代码,返回instance。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public Instance{
private Instance(){};
private static Instance instance;
public static Instance getInstance(){
if(instance==null){ //1
synchronized(lock){
if(instance==null){ //2
instance=new Instance(); //3
}

}
}
return instance; //4
}
}

DCL实现懒汉式存在问题

这种写法看上去解决了线程安全,也兼顾了效率。其实不然,这种代码会导致getInstanceh返回一个没有初始化完成的Instance对象。

问题出现在第三步,instance=new Instance()这行代码可以用下面三行伪代码表示

1
2
3
menory=allocate();    //1.分配内存
ctorInstance(menory); //2.初始化
instance=menory; //3.赋值给instance

intra-thread semantics(线程内语义)允许那些在单线程内,不会改变单线程程序执行结果的重排序。重排序后的顺序:

1
2
3
menory=allocate();    //1.分配内存
instance=menory; //2.赋值给instance
ctorInstance(menory); //3.初始化

对单线程来说只要Instance初始化在该线程访问Instance对象之前都是不会改变执行结果。

注意:

1
happens-before的锁定规则是“一个unLock操作先行发生于后面对同一个锁的lock操作”并不会禁止锁内部的重排序。

在这种情况下,如果线程1执行步骤2 instance=menory后,线程2获取执行权判断if(instance==null)时,instance不为null,线程2将会返回一个未初始化完成的instance对象。

执行顺序 线程1 线程2
1 分配内存
2 设置instance指向内存空间
3 判断instance是否为null
4 初次访问Instance对象
5 初始化Instance对象
6 初次访问Instance对象

解决方法

使用volatile+DCL

该问题产生的原因是2和3的重排序导致的。

1
2
3
menory=allocate();    //1.分配内存
instance=menory; //2.赋值给instance
ctorInstance(menory); //3.初始化

那么解决方案可以是禁止2和3的重排序。

通过将instance变量设置为volatile变量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public Instance{
private Instance(){};
private volatile static Instance instance;
public static Instance getInstance(){
if(instance==null){
synchronized(lock){
if(instance==null){
instance=new Instance();
}

}
}
return instance;
}
}

volatile变量会通过内存屏障来禁止重排序。

通过类加载实现

jvm类加载后,执行类初始化时,会去获取锁同步多线程初始化类。

1
2
3
4
5
6
7
8
class InstanceFactory(){
private InstanceHolder(){
public final static Instance instance= new Instance();
}
public Instance getInstance(){
return InstanceHolder.instance;
}
}

执行图

image

这里初始化过程虽然也发生了重排序,但是对于线程2来说这个操作是原子性的。线程2只能看到操作未开始或者已经结束之后。

FileAlterationMonitor实现文件监听

发表于 2018-09-06 | 分类于 工具类

文件监听组件

Apacha common.io2.0提供了监听文件变化的功能。

功能由三个组件组成。

  1. 监听器 FileAlterationListener

    用于实现文件改变时触发的行为。

  2. 观察者 FileAlterationObserver

    用于观察文件的改变,通知注册的监听器执行相应的事件。

  3. 监视器 FileAlterationMonitor

    通过一线程,每间隔一段时间调用一次注册的观察者检查文件。

maven依赖

1
2
3
4
5
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.4</version>
</dependency>

FileAlterationListener

通过继承FileAlterationListenerAdaptor,覆盖相应事件方法。这里只重写了文件改变。还有其他事件可以查看FileAlterationListener接口看下。

1
2
3
4
5
6
7
    public class FileAlterationReload extends FileAlterationListenerAdaptor {

@Override
public void onFileChange(File file) {
System.out.println("文件改变");
}
}

程序实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class Demo {

public static void main(String[] args) throws Exception {


//检查classpath下properties文件夹下的properties文件。
FileAlterationObserver fileAlterationObserver = new FileAlterationObserver(Demo.class.getClassLoader().getResource("properties").getPath(),new PropertiesFileFilter());

FileAlterationListener fileAlterationListener =new FileAlterationReload();
//注册监听器
fileAlterationObserver.addListener(fileAlterationListener);
FileAlterationMonitor fileAlterationMonitor = new FileAlterationMonitor();
//注册观察者
fileAlterationMonitor.addObserver(fileAlterationObserver);
//启动监听
fileAlterationMonitor.start();
//让主线程别这么快结束。
Thread.sleep(1000000);

}
}

修改properties文件夹下的properties文件时,会输出 文件改变。

应用场景

可以用于实现配置文件的热部署

synchronized原理

发表于 2018-08-20 | 分类于 多线程

synchronized关键字是jdk提供用于实现内部锁。

synchronized实现机制

synchronized代码块通过加锁的方式,只有获取到锁的线程才可以进去该代码块。使多个线程排队运行该代码块,保证代码块的原子性(对其他线程来说。只能看到别的线程还未执行或执行完毕的状态)。

monitor
  1. Monitor对象是jvm实现的,c++中的对象。
  2. 每个对象内部都有一个唯一的monitor,monitor的获取是互斥的。
  3. synchronized通过monitor来实现互斥。
  4. monitor的本质是依赖于底层操作系统的Mutex Lock实现,操作系统实现线程之间的切换需要从用户态到内核态的切换,切换成本非常高。
javap反汇编分析

通过javap将下列代码反汇编

1
2
3
4
5
6
7
8
9
10
11
12
13
public class SynchronizedDemo {
private int i=1;

public void test1(){
synchronized (this){
i++;
}
}
public synchronized void test2(){
i++;
}

}

test1用synchronized代码块,通过monitorenter获取锁,执行完后用monitorexit释放锁。
image

test2是synchronized方法,jvm通过ACC_SYNCHRONIZED标志,内部实现。
image

无论是monitorenter/monitorexit还是ACC_SYNCHRONIZED,最终都是通过获取锁对象的monitor实现互斥。进入synchronized代码块前会通过monitorenter来获取monitor对象。获取失败的线程进入同步队列中等待。获取monitor对象的线程执行完后通过monitorexit指令来释放锁。释放锁会通知同步队列中阻塞的线程出队列再次申请锁。
image

Synchronized 1.6的优化

锁的强度分四种级别,依次是:无锁状态、偏向锁状态、轻量级锁状态、重量级锁状态。他们会随着竞争的激烈而逐渐升级。

锁的相关信息存在对象的对象头的Mark Word:
Mark Word是hotSpot虚拟机的对象的对象头中的一部分,用于存储对象滋生运行时数据,如hashcode,GC分代年龄、锁状态标志,线程持有锁,偏向线程id,偏向时间戳等信息。

Mark Word结构
image

适应自旋锁

线程的阻塞和唤醒需要cpu从用户态切换成内核态,给cpu造成很大的负担。而往往一个线程从阻塞到唤醒只经历很短的一段时间,所以引入自旋锁。通过无意义的循环进行等待锁释放,而不会立刻进入阻塞,这就是自旋。因为自旋是会消耗cpu的,所以要有个自旋次数限制,达到自旋次数还未获取锁就进入阻塞。适应自旋锁会根据自旋获取锁的成功率来调整自旋次数,如果获取锁成功率高会调高自旋次数,否则反之。

锁消除

为了保证数据的完整性,我们在进行操作时需要对这部分操作进行同步控制,但是在有些情况下,JVM检测到不可能存在共享数据竞争,这是JVM会对这些同步锁进行锁消除。锁消除的依据是JIT编译器借助逃逸分析技术分析锁对象是否只能给一个线程访问而没发布到其他线程,锁消除就是在JIT生成动态字节码时消除moniterenter(申请锁)和moniterexit(释放锁)两个字节码指令。

注意:锁消除并不意味着可以随意加锁,JIT只会对执行频率足够多的地方进行优化。

锁粗化

使用锁时,我们会尽力将锁的范围缩小,只在操作共享变量时同步以减小锁竞争的范围。但是如果一系列的连续加锁解锁操作,频繁的获取释放锁可能会导致不必要的性能损耗,所以就是将多个连续的加锁、解锁操作(锁对象相同)连接在一起,扩展成一个范围更大的锁,这就是锁粗化。

偏向锁

大部分锁并没有被争用,且在其生命周期内也许至多被一个线程持有。所以一个内部锁第一次被获取时,会将Mark Word的偏向线程设为获取锁的线程,是否偏向锁标志设为1。线程只要判断偏向线程是否是当前线程,是则说明当前线程获取锁了。否则会通过cas设置Mark work。失败的话,则说明存在争用,撤销偏向锁,锁升级为轻量锁

偏向锁获取和撤销流程
  1. 获取对象头的Mark Word;
  2. 判断Mark Word偏向锁标志位是否为1,锁标志位为 01。否,则cas竞争锁。是的话,该锁是偏向锁进入(3);
  3. 判断Mark Work中的线程ID是否设置,没设置则进入步骤(4);如果指向当前线程,则执行同步代码块;如果指向其它线程,进入步骤(5);
  4. 通过CAS原子指令设置Mark Word的线程ID为当前线程ID,如果执行CAS成功,则执行同步代码块,否则进入步骤(5);
  5. 如果执行CAS失败,表示当前存在多个线程竞争锁,当达到全局安全点(safepoint),获得偏向锁的线程被挂起,撤销偏向锁,并升级为轻量级,升级完成后被阻塞在安全点的线程继续执行同步代码块;(偏向锁会一直被持有Mark Word中的ThreadId一直指向获取锁的线程,直到其他线程来竞争锁)

image

轻量级锁

引入轻量级锁的主要目的是在没有多线程竞争的前提下,通过cas减少重量锁的使用。轻量锁依据是大部分锁再同步周期内不存在竞争。

轻量锁获取和撤销流程
  1. jvm会在进入同步块前会在当前线程的栈帧创建用于存储锁记录的空间。
  2. 轻量锁进入同步代码块前会将mark work复制到锁记录中,通过cas将对象头中的Mark Word替换为指向锁记录的指针。cas成功则获取锁,将锁标志位改成00表示轻量锁。失败则尝试自旋获取锁。
  3. 在执行完代码块释放锁时会通过cas将Mark Word替换回对象头。cas成功则释放锁。如果失败则说明同步周期内存在竞争,锁升级为重量锁。

因为自旋会消耗cpu,所以轻量锁一旦升级为重量锁就不会恢复。
image

重量锁

重量锁就是通过Monitor实现的互斥锁。monitor的本质是依赖于底层操作系统的Mutex Lock实现,操作系统实现线程之间的切换需要从用户态到内核态的切换,切换成本非常高。

ReentrantLock中的公平锁和非公平锁的原理

发表于 2018-08-18 | 分类于 多线程

ReentrantLock

ReentrantLock内部是通过AQS实现锁的功能,有公平锁和非公平锁两种实现。

  1. 公平锁,即锁的获取顺序按线程申请锁的先后顺序。
  2. 非公平锁,当一个线程t1申请锁时,锁刚好释放。即使已有其他线程在t1之前申请锁排队,线程t1还是会获取锁。这样减少了线程的等待唤醒的可能,减少上下文切换带来的开销。因为获取锁的顺序和申请顺序可能不一致所以叫非公平锁。

前置技能(先了解前置技能才好看懂)

  1. AQS
  2. CLH

ReentrantLock中的Sync

Sync是个抽象类,非公平锁和公平锁都基于这个类实现。这里实现了非公平的占用锁方法。非公平锁为了减少线程的等待唤醒。在锁释放的情况下,新的线程会直接占用锁而不管等待队列中有没有线程。这样减少了新线程的等待唤醒。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
abstract static class Sync extends AbstractQueuedSynchronizer {
...
/**
*非公平锁的占用方法
*/
final boolean nonfairTryAcquire(int acquires) {
//获取当前的线程
final Thread current = Thread.currentThread();
//获取当前的锁占用状态
int c = getState();
//state==0则说明没有线程占用锁
if (c == 0) {
//此时会直接把锁给当前线程,而不去判断CLH队列中的是否已有等待线程。
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
//这里是重入锁的处理,当前线程重入锁,state+1
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}

...
}

非公平锁

ReentrantLock中的非公平锁实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;


final void lock() {
//当前线程进来先直接用cas尝试占用锁,失败再调用acquire
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}

//acquire会调用tryAcquire判断占用锁是否成功,这里直接调用了Sync的非公平锁处理方法
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}

公平锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;

final void lock() {
acquire(1);
}

/**
*公平锁的处理
*
*/
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
//会判断队列中是否有等待线程,有则获取锁失败,进入队列等待。
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}

总结

  1. 非公平锁减少了线程发生等待唤醒的可能,节省了上下文切换的开销。
  2. 公平锁适合锁持有事件较长或者线程申请锁的间隔事件相对长的情况。
  3. 总的来所,公平锁的开销比非公平锁大,所以ReentrantLock默认支持的是非公平锁。
12345

wujiazhen

42 日志
18 分类
27 标签
GitHub
© 2019 wujiazhen
由 Hexo 强力驱动
|
主题 — NexT.Muse v5.1.4