ScheduledThreadPoolExecutor源码解析

ScheduledThreadPoolExecutor继承了ThreadPoolExecutor和实现了ScheduledExecutorService接口。是用于执行延时任务和周期任务的线程池。

ScheduledThreadPoolExecutor的延时执行的实现是通过队列DelayedWorkQueue和ScheduledFutureTask来实现。

核心内部类

ScheduledFutureTask

ScheduledFutureTask用于封装定期任务和获取任务结果。

1
2
3
4
5
6
 ScheduledFutureTask(Runnable r, V result, long ns, long period) {
super(r, result);
this.time = ns;
this.period = period;
this.sequenceNumber = sequencer.getAndIncrement();
}

ScheduledFutureTask的继承关系

ScheduledFutureTask的继承关系

看继承关系可以看到,父接口有一个Delayed接口。Delayed接口继承了Comparable接口。这两个方法非常重要。DelayedWorkQueue队列会根据compareTo的排序规则给队列元素排序,将执行时间早的任务放在队头。getDelay方法用于判断任务是否到了执行时间。下面是实现方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
//还需要延迟多久
public long getDelay(TimeUnit unit) {
return unit.convert(time - now(), NANOSECONDS);
}
//按预定的执行时间排序
public int compareTo(Delayed other) {
if (other == this) // compare zero if same object
return 0;
if (other instanceof ScheduledFutureTask) {
ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
long diff = time - x.time;
if (diff < 0)
return -1;
else if (diff > 0)
return 1;
else if (sequenceNumber < x.sequenceNumber)
return -1;
else
return 1;
}
long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
}

DelayedWorkQueue

用于存储延迟执行的任务的阻塞队列。内部用数组实现,初始容量16。容量不足时会扩容50%。
queue数组表示一个二叉堆。

当父节点的键值总是大于或等于任何一个子节点的键值时为最大堆。 当父节点的键值
总是小于或等于任何一个子节点的键值时为最小堆

1
2
private RunnableScheduledFuture<?>[] queue =
new RunnableScheduledFuture<?>[INITIAL_CAPACITY];

入队

入队的任务是ScheduledFutureTask对象,会通过ScheduledFutureTask的compareTo进行任务的比较。将数组queue排列成最小堆。最早执行的任务(getDelay最小的)是根节点queue[0]。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
 public boolean offer(Runnable x) {
if (x == null)
throw new NullPointerException();
RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
final ReentrantLock lock = this.lock;
lock.lock();
try {
int i = size;
//1. 判断队列容量是否大于等于数组容量,是则需要扩容
if (i >= queue.length)
//2. 扩容,每次扩容50%
grow();
//3. 队列容量+1
size = i + 1;
//4. 队列中还没有元素
if (i == 0) {
//5. 加入第一个元素
queue[0] = e;
setIndex(e, 0);
} else {
//6. 队列中已经有元素就需要进行排序。
siftUp(i, e);
}
//7. 如果队头元素等于新的元素e,说明e执行时间比队列中其他元素早,唤醒消费线程,消费线程判断元素e是否达到执行时间。
if (queue[0] == e) {
leader = null;
available.signal();
}
} finally {
lock.unlock();
}
return true;
}

数组queue是一个二叉堆,新加入的元素通过堆排序找到合适的位置插入。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
 private void siftUp(int k, RunnableScheduledFuture<?> key) {
while (k > 0) {
//父节点
int parent = (k - 1) >>> 1;
RunnableScheduledFuture<?> e = queue[parent];
//如果比父节点大,确定位置
if (key.compareTo(e) >= 0)
break;
//如果比父节点小,和父节点交换位置,再和父节点的父节点比较。
queue[k] = e;
setIndex(e, k);
k = parent;
}
queue[k] = key;
setIndex(key, k);
}

出队

queue数组表示二叉堆,queue[0]元素是根节点。判断根节点是否到了执行时间。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
public RunnableScheduledFuture<?> take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
//1. 获取队头节点
RunnableScheduledFuture<?> first = queue[0];
//2. 队头为空,没元素。进行等待。
if (first == null)
available.await();
else {

long delay = first.getDelay(NANOSECONDS);
//3.判断队头节点是否到了执行时间
if (delay <= 0)
return finishPoll(first); //4. 返回队头节点,queue堆取掉头节点,进行调整

//4.队头节点没到执行时间,进入等待
first = null;

//这里是leader-follower模式的变种。为了减少不必要的等待。
//不是leader的线程会进行永久的等待直到被唤醒。leader线程只会等待到下个个延迟。
if (leader != null)
available.await();
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
available.awaitNanos(delay);
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
if (leader == null && queue[0] != null)
available.signal();
lock.unlock();
}
}

ScheduledThreadPoolExecutor任务执行流程

任务提交

ScheduledThreadPoolExecutor的schedule方法很多,都差不多。以scheduleAtFiexedRate为例子。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
 public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (period <= 0)
throw new IllegalArgumentException();
//1. 将任务封装成ScheduledFutureTask
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,
null,
//triggerTime方法会算出触发的具体时间,now()+initalDelay
triggerTime(initialDelay, unit),
unit.toNanos(period));
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
//2.用outerTask保存当前任务。用于周期执行时,再次将任务加入队列。
sft.outerTask = t;
//3.延迟执行,将任务加入到DelayedWorkQueue队列中。
delayedExecute(t);
return t;
}

任务执行

执行任务时调用ThreadPoolExecutor的runWorker方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock();
boolean completedAbruptly = true;
try {
//1.通过getTask方法从DelayedWorkQueue队列中拿出一个任务。
//没有达到执行时间的任务时,会阻塞
while (task != null || (task = getTask()) != null) {
w.lock();
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
//2. 调用任务的run方法
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}

ScheduledFutureTask的run方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public void run() {
//是否周期任务,period>0
boolean periodic = isPeriodic();
if (!canRunInCurrentRunState(periodic))
cancel(false);
else if (!periodic)
ScheduledFutureTask.super.run();
//执行任务并重置任务状态
else if (ScheduledFutureTask.super.runAndReset()) {
//周期任务,设置下次执行的时间
setNextRunTime();
//再将outerTask(任务提交时将自身赋值给了outerTask)加入任务队列。
reExecutePeriodic(outerTask);
}
}