线程池

为什么要使用线程池

线程的创建和销毁会带来系统的开销。通过线程池进行线程的管理,可以进行线程的复用,避免线程频繁的创建和消耗。

《java并发编程的艺术》 合理利用线程池能够带来三个好处

  1. 降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
  2. 提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。
  3. 提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。但是要做到合理的利用线程池,必须对其原理了如指掌。

线程池的属性

1
2
3
4
5
6
7
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)

线程池的大小

线程池的大小有三种。

  1. 当前线程池大小
    当前线程池中的线程数量

  2. 核心线程池大小 corePoolSize
    线程池提交一个任务。如果当前线程数量<corePoolSize,即使存在空闲线程可以执行任务也会创建一个新的线程。prestartAllCoreThreads方法可以提前创建并启动所有核心线程。

  3. 最大线程池大小 maxumumPoolSize
    线程池允许创建的最大线程数。如果任务队列满了,且当前的线程数小于最大线程池大小,则线程池会再创建新的线程执行任务。值得注意的是如果使用了无界的任务队列这个参数就没什么效果。

线程存活时间 keepAliveTime

线程池的工作线程空闲后,保持存活的时间。所以如果任务很多,并且每个任务执行的时间比较短,可以调大这个时间,提高线程的利用率。

工作队列 workQueue

用于保存等待执行的任务的阻塞队列。 可以选择以下几个阻塞队列。
ArrayBlockingQueue:是一个基于数组结构的有界阻塞队列,此队列按 FIFO(先进先出)原则对元素进行排序。
LinkedBlockingQueue:一个基于链表结构的阻塞队列,此队列按FIFO (先进先出) 排序元素,吞吐量通常要高于ArrayBlockingQueue。静态工厂方法Executors.newFixedThreadPool()使用了这个队列。
SynchronousQueue:一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQueue,静态工厂方法Executors.newCachedThreadPool使用了这个队列。
PriorityBlockingQueue:一个具有优先级的无限阻塞队列。

线程工厂 threadFactory

用于创建线程的工厂

拒绝策略 handler

工作队列满了和线程池大小已经达到最大线程池大小时,说明线程池已经饱和,任务无法处理。这时通过拒绝策列处理任务。
JDK1.5提供的四种策略。
AbortPolicy:直接抛出异常。
CallerRunsPolicy:用调用者所在线程来运行任务。
DiscardOldestPolicy:丢弃队列里最近的一个任务,并执行当前任务。
DiscardPolicy:丢弃当前任务。
可以根据应用场景需要来实现RejectedExecutionHandler接口自定义策略

线程池执行流程

任务提交流程

  1. 向线程池提交任务
  2. 判断当前线程池大小是否大于核心线程池大小。如果当前线程池大小大于核心线程池大小则执行步骤3,否则创建线程执行任务。
  3. 判断工作队列是否已满。已满则继续执行步骤4,否则将任务加入工作队列,等待线程池中线程读取队列,执行任务。
  4. 判断当前线程池大小是否大于最大线程池大小。如果当前线程池大小大于最大线程池大小则执行步骤5,否则创建线程执行任务。
  5. 线程池饱和,执行拒绝策列RejectedExecutionHandler.rejectedExecution。

源码解析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
if (command == null)
throw new NullPointerException();
int c = ctl.get();

if (workerCountOf(c) < corePoolSize) {
/**
*当前线程池大小小于核心线程池大小,通过addWorker方法
*new一个Worker(一个Worker相当于一个线程)对象执行
*/
if (addWorker(command, true))
return;
c = ctl.get();
}
/**
* 当前线程池大小不小于核心线程池大小,则将任务加入workQueue
*/
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//如果任务加入workQueue失败(可能任务队列满了),则尝试addWorker执行。还是失败就执行拒绝策略
else if (!addWorker(command, false))
reject(command);

Worker.runWorker()方法,执行任务,执行完创建Worker时的第一个任务后。会通过getTask()获取workerQueue中的任务
执行,getTask()方法阻塞。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}

关闭线程池 shutdown和shutdownNow

shutdown:设置线程池状态为SHUTDOWN,中断所有没有执行任务的线程。此时,则不能再往线程池中添加任何任务,否则将会抛出RejectedExecutionException异常。但是,此时线程池不会立刻退出,直到添加到线程池中的任务都已经处理完成,才会退出。
shutdownNow:设置线程池状态为STOP,尝试停止所有正在执行任务或暂停任务的线程(通过interrupt)。shutdownNow并不代表线程池就一定立即就能退出,(因为任务如果没有处于阻塞等待状态,interrupt无法中断)它可能必须要等待所有正在执行的任务都执行完成了才能退出。

Executor框架

Thread即使工作单元也是执行机制。从jdk5起把工作单元和执行机制分开。工作单元是Runnable,Callable,执行机制由Executor框架提供。

Executor框架组成

  1. 任务 实现Runnable和Callable接口
  2. 执行机制 实现Executor接口。主要实现类由ThreadPoolExecutor和ScheduledThreadPoolExecutor。
  3. 异步计算结果 实现Future接口
  4. Executors工具类

ThreadPoolExecutor

ThreadPoolExecutor是线程池的实现。构造函数的参数前面已经介绍。

1
2
3
4
5
6
7
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)

Executors工具类提供创建ThreadPoolExecutor的方法。

CachedThreadPool

CachedThreadPool是一个可缓存的线程池。可以从ThreadPoolExecutor的构造函数参数看出它的功能。核心线程池大小0,最大线程池大小Integer.MAX_VALUE,使用不存储元素的阻塞队列SynchronousQueue。所以CachedThreadPool的线程池大小几乎无限大,提交任务时如果没有空闲的线程(没线程读取SynchronousQueue)就会创建一个线程。工作线程闲置60s,将会被终止。

适用:适合于大量执行时间短的任务。

1
2
3
4
5
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}

FixedThreadPool

FixedThreadPool,固定大小的线程池。核心线程池大小和最大线程池大小一致。线程存活时间为0意味着空闲线程池不会被终止。提交任务时,都会创建一个线程。直到线程数达到指定的线程数nThreads,才会将任务放入队列。LinkedBlockingQueue时有界的阻塞队列,默认大小时Integer.MAX_VALUE。

适用:执行长期的任务,负载比较重的服务器。

1
2
3
4
5
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}

SingleThreadExecutor

SingleThreadExecutor,单线程的线程池。线程池大小只能是1.

适用:按顺序执行任务,且任何时间的不会有多个线程是活动的情况。

1
2
3
4
5
6
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}

执行任务

线程池执行任务提供了2个方法 execute和submit

execute方法执行任务,没返回值。

1
2
3
4
5
6
7
 ExecutorService executorService = Executors.newSingleThreadExecutor();
executorService.execute(new Runnable() {
@Override
public void run() {
System.out.println("run");
}
});

submit方法有返回值Future,通过Future对象可以获取返回值。

1
2
3
4
5
6
7
8
9
ExecutorService executorService = Executors.newSingleThreadExecutor();
Future<String> submit = executorService.submit(new Callable<String>() {
@Override
public String call() throws Exception {
return "run";
}
});
//阻塞获取返回值
System.out.println(submit.get());

ScheduledThreadPoolExecutor

ScheduledThreadPoolExecutor继承了ThreadPoolExecutor和实现了ScheduledExecutorService接口。是用于执行延时任务和周期任务的线程池。功能与Timer类似。Timer是单线程,ScheduledThreadPoolExecutor是多线程。

ScheduledExecutorService API

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
/**
* 延迟执行Runnable任务
*/
public ScheduledFuture<?> schedule(Runnable command,
long delay, TimeUnit unit);

/**
* 延迟执行Callable任务
*/
public <V> ScheduledFuture<V> schedule(Callable<V> callable,
long delay, TimeUnit unit);

/**
* 按固定的频率period执行任务。initialDelay是第一次运行时的延迟时间。执行时间是 initialDelay+n*period. n是第几次执行。
*/
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit);

/**
* 每次任务执行完后延迟时间delay后再次执行,执行时间是initalDelay+n*(任务执行时间+delay)
*/
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit);

使用

1
2
3
4
5
6
7
8
9
10
11
12
//通过Executors创建
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(10);
//一秒执行一次
ScheduledFuture<?> scheduledFuture = scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
System.out.println(new SimpleDateFormat("HH:mm:ss").format(new Date()));
}
}, 0, 1, TimeUnit.SECONDS);
Thread.sleep(10000);
//取消任务
scheduledFuture.cancel(true);

Future

用于操作任务的计算结果和任务的状态。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
/**
* 取消任务
* @Param mayInterruptIfRunning 是否中断执行的任务
*/
boolean cancel(boolean mayInterruptIfRunning);

/**
* 是否已经取消
*/
boolean isCancelled();

/**
* 是否已经执行
*/
boolean isDone();

/**
* 阻塞获取执行结果
*/
V get() throws InterruptedException, ExecutionException;

/**
* 阻塞获取执行结果,直到超时退出阻塞状态。
*/
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;