池化技术(三)详解线程池ThreadPoolExecutor

一、继承关系&构建参数

跟其他池化技术一样,线程池的目的也是为了重复利用资源,节省开销,提升程序运行速度,java提供的线程池位于juc包中,这是它的继承树:

图1

主要的实现逻辑都在ThreadPoolExecutor里,本文也会围绕这个类展开。

这是初始化线程池的几个参数:

1
2
3
4
5
6
7
int corePoolSize;					// 核心线程数,保证至少有这么多个线程在提供服务
int maximumPoolSize; // 最大线程数,当任务队列被撑爆时,线程池便会扩充线程,扩充的线程数不会超过这个数
long keepAliveTime; // 线程存活时间,非核心线程最大空闲时间,超过这个时间没拿到可执行的任务就会被回收掉
TimeUnit unit; // 前面时间的单位
BlockingQueue<Runnable> workQueue; // 任务队列,线程池提交的任务(除各个线程第一个任务外)首先会被推入该队列内,工作线程也是从这个队列里消费任务的,当该队列溢出时,才会触发扩容条件,增加新的非核心线程来均摊压力
ThreadFactory threadFactory; // 创建线程的工厂,用来定制一些线程的特性,可不传
RejectedExecutionHandler handler; // 当无法再接收任务时的拒绝策略,可不传,也可自定义,默认的处理方式是直接抛RejectedExecutionException,线程池提供了一些可选策略,下面会列出来

拒绝策略可自行实现RejectedExecutionHandler来定制,下面是一些线程池自带的拒绝策略:

策略类 是否默认 解释
AbortPolicy 直接抛RejectedExecutionException
DiscardPolicy 不做任何处理,丢弃任务,且业务无感知
DiscardOldestPolicy 将队列中最早放进去的任务顶掉,让新任务挤进去,业务无感知
CallerRunsPolicy 让任务不再加入线程池执行,而是直接在当前线程里执行掉

其实正常情况下使用默认的策略就比较合适,这样业务方可以catch住这个异常,然后自行做处理。

二、源码解析

2.1:ctl

开始前,我们得先了解下线程池是怎么标记状态和当前线程数的,这是几个关键的字段和方法:

代码块1
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); // 状态+线程数,初始状态为RUNNING,线程数为0
private static final int COUNT_BITS = Integer.SIZE - 3; // 后29位标记工作线程(worker)的个数

// COUNT MASK,结果:00011111111111111111111111111111,拿着这个数跟ctl做下与运算,得到的就是线程数,因为前三位与一下没了,只有后面的29位数字了
private static final int COUNT_MASK = (1 << COUNT_BITS) - 1;

// 终结,结果:01100000000000000000000000000000
private static final int TERMINATED = 3 << COUNT_BITS;

// 整理中,结果:01000000000000000000000000000000
private static final int TIDYING = 2 << COUNT_BITS;

// 停止,结果:00100000000000000000000000000000
private static final int STOP = 1 << COUNT_BITS;

// 关闭,结果:00000000000000000000000000000000
private static final int SHUTDOWN = 0 << COUNT_BITS;

// 运行中,结果:11100000000000000000000000000000
private static final int RUNNING = -1 << COUNT_BITS;

上面的ctl变量既表示了线程池的运行状态,又表示了当前线程池内线程的数量,前三个bit位用来表示状态,后面的29个bit位表示线程数,这是针对每个线程池状态的解释:

状态 前三位bit值 解释
TERMINATED 011 终结,hook方法terminated执行完毕
TIDYING 010 整理中,所有任务已终结,线程数为0,触发hook方法terminated
STOP 001 停止,不再接收新任务,也不再执行队列中的任务,中断所有执行中的任务
SHUTDOWN 000 关闭,不再接收新任务,但会执行队列中的任务
RUNNING 111 运行中,大部分时间所处的状态,可接收新任务,也会执行队列中的任务

这是线程池状态跃迁图:

图2

2.2:操作ctl的相关方法

基于上面的了解,来看下下面几个基于ctl的关键函数(这些函数在后续的线程池状态判断、修改状态值和累加线程数时起着很重要的作用):

代码块2
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
// 当ctl的值小于SHUTDOWN(也就是0)时,就说明其处于运行中的状态(RUNING的第一位是1,表示它是个负数,最小)
private static boolean isRunning(int c) {
return c < SHUTDOWN;
}

// 获取线程数,跟COUNT_MASK做与运算,即可获得后29位的数字
private static int workerCountOf(int c) {
return c & COUNT_MASK;
}

// 利用cas增加线程数,成功创建线程worker后触发该方法
private boolean compareAndIncrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect + 1);
}

// 利用cas减少线程数,成功回收线程worker后触发该方法
private boolean compareAndDecrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect - 1);
}

// 拼接状态值和线程数,rs表示目标状态值,高3位有值,后29位为0,wc则相反,俩数做或运算,就拼接到了一起,主要用来刷新ctl的值
private static int ctlOf(int rs, int wc) {
return rs | wc;
}

// c:当前ctl的值,s:参与比较的目标状态值,当ctl的值小于目标状态值时,返回true
private static boolean runStateLessThan(int c, int s) {
return c < s;
}

// c和s释义同上,当ctl的值大于等于目标状态值时,返回true
private static boolean runStateAtLeast(int c, int s) {
return c >= s;
}

这些方法以及对ctl的理解都非常重要,后面的流程会涉及很多对于ctl的判断和操作。

2.3:execute

下面开始进入核心流程,我们以最基本的execute方法为切入点进行详细解析,这是execute的源码(注:worker就是线程池里对Thread的封装,所以worker数量等同于线程数量):

代码块3
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public void execute(Runnable command) {
if (command == null) {
throw new NullPointerException(); // 提交的任务为空直接抛npe
}
int c = ctl.get(); // 获取ctl的值,根据这个值可拿到池状态(前3位)和worker数量(后29位)
if (workerCountOf(c) < corePoolSize) { // workers总数还没超过核心线程数
if (addWorker(command, true)) { // 直接添加核心worker
return;
}
c = ctl.get(); // 如果添加失败,则继续进行下面的逻辑,刷新下当前的ctl(应尽量保持ctl是最新值)
}

// 核心线程数添加失败、当前线程数已经等于核心线程数,都会触发下方逻辑
if (isRunning(c) && workQueue.offer(command)) { // 如果池状态没问题,会直接把新任务放到队列里
int recheck = ctl.get(); // 提交任务到队列后进行二次检查
if (!isRunning(recheck) && remove(command)) { // 如果池状态不正常,就移除掉刚加到队列里的任务
reject(command); // 然后拒绝掉本次任务
} else if (workerCountOf(recheck) == 0) {
// 池状态正常,或者移除任务失败,就进行这一步判断,如果worker数量为0,那就扩展一个worker
// 什么情况会触发这个条件呢?当你配了corePoolSize是0的时候,就很可能触发本段逻辑,
// 提交了任务没worker执行可不行,所以这里才需要判断一下workcount是否是0,若是,即刻扩展一个worker投入使用
addWorker(null, false);
}
} else if (!addWorker(command, false)) { // 当queue被撑爆,或者池状态不正常,则无脑添加一个worker,这是为了缓解当前压力,多一个worker意味着可以更快执行完队列里积压的任务
reject(command); // 当然,如果连worker都无法扩展,那就直接拒绝掉这次任务吧~
}
}

这是execute的流程图:

图3

说明:图中带背景颜色的均为后续要介绍的方法模块。

如果我们只考虑正常情况,即不考虑非RUNNING状态,corePoolSize都大于0的情况下,上面的逻辑就可以简化成下面这样:

图4

一般来说,线程池绝大部分时间都处于RUNNING状态,如果只考虑大部分情况,那么流程图就得到了大幅度简化,理解起来也容易多了:当池内worker数还没达到corePoolSize时,直接扩展核心worker,否则就把任务加到队列里,等待核心worker空闲下来后执行,当队列也满了,就会按照maximumPoolSize扩展非核心worker来执行溢出的任务(执行完溢出的任务后会拉取队列中其他任务执行,缓解核心worker的压力)。

2.4:addWorker

addWorker主要用来创建worker,这是非常重要的一个方法,这是它的源码:

代码块4
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (int c = ctl.get(); ; ) { // 外层死循环
// Check if queue empty only if necessary.
// 下面这个复杂的判断等价于 rs >= SHUTDOWN && (rs != SHUTDOWN || firstTask != null || workQueue.isEmpty)
// 概括为以下3个条件:
// 条件1:线程池不在RUNNING状态,且状态是STOP、TIDYING或TERMINATED中的任意一种状态
// 条件2:线程池不在RUNNING状态,线程池接受了新的任务
// 条件3:线程池不在RUNNING状态,阻塞队列为空。
// 上面3个条件中满足任意一个,就拒绝执行任务和添加新的worker
if (runStateAtLeast(c, SHUTDOWN)
&& (runStateAtLeast(c, STOP)
|| firstTask != null
|| workQueue.isEmpty())) {
return false;
}

for (; ; ) {
// 如果线程池线程数量达到上限,就直接拒绝添加新的worker
if (workerCountOf(c) >= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK)) {
return false;
}
if (compareAndIncrementWorkerCount(c)) { // 为线程数原子+1(cas更新)
break retry; // cas更新成功,直接跳出整个循环(内外循环均终止)
}

c = ctl.get(); // 如果cas更新线程数没有成功,那就再刷新下ctl的值
if (runStateAtLeast(c, SHUTDOWN)) { // 然后拿着新ctl的值查看状态是否正常
continue retry; // 如果状态不是RUNNING,那就终止内循环,再从外循环开始执行
}
// 如果走到这里,说明现在线程池状态依旧正常,且cas更新线程数失败,那就继续进行内部的死循环,直到cas成功跳出整个循环
}
}

boolean workerStarted = false;
boolean workerAdded = false;
ThreadPoolExecutor.Worker w = null; // worker是线程池的一个内部类,后面会详细介绍,现在只需要知道每个worker内都持有一个线程即可
try {
w = new ThreadPoolExecutor.Worker(firstTask); // 初始化Worker,并传入用户提交的任务,作为该线程的第一个任务执行
final Thread t = w.thread; // 取出其内部的线程
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock(); // 池锁,因为下面涉及一些线程不安全的操作
try {
int c = ctl.get(); // 同样获取最新的ctl值

// 当下面的条件满足任意一个时,才会真正将这个worker放入线程池中
// 条件1:当线程池是正常的RUNNING状态
// 条件2:是SHUTDOWN状态且提交的任务为空(此时说明任务已经入队列,且没有任何worker可以执行它,
// SHUTDOWN显然是希望worker入池执行队列里剩余任务的)
if (isRunning(c) || (runStateLessThan(c, STOP) && firstTask == null)) {
if (t.isAlive()) // 如果当前新建的线程已经处于运行状态了,就抛异常(因为现在的线程还没start,这个alive正常情况下应该是false)
throw new IllegalThreadStateException();
workers.add(w); // 入池,本质是个HashSet
int s = workers.size();
if (s > largestPoolSize) {
largestPoolSize = s; // 刷新历史worker数量的峰值
}
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start(); // 如果worker被加成功了,那就正式启动里面的Thread
workerStarted = true;
}
}
} finally {
if (!workerStarted)
addWorkerFailed(w); // worker未入池成功,或者启动失败,此时要触发addWorkerFailed做一些善后工作
}
return workerStarted; // 将是否入池成功作为结果返回
}

这是流程图:

图5

这个流程相当复杂,但对于线程池来说这是值得的,因为这一步很关键,但这样并不利于我们理解,所以跟上面一样,这里假定没有特殊情况发生(即不考虑非RUNNING状态),正常情况下这块的逻辑走向如下:

图6

所以正常情况下,逻辑很清晰明了,首先判断池子有没有满,满了就返回添加失败,没满就cas更新线程数,更新成功了就创建Worker对象,将这个对象加到workers集合(入池),加成功后启动内部的线程,这个线程就算是投入使用了,至于worker是怎么工作的?接下来介绍。

2.5:Worker类

worker是线程池对线程本身的一层包装,代码如下:

代码块5
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
// 这个类继承了AQS,实现了Runnable接口
private final class Worker extends AbstractQueuedSynchronizer implements Runnable
{
// 尽管这个类并不会被序列化,但这里仍然提供了serialVersionUID,主要用来抑制编译告警
private static final long serialVersionUID = 6138294804551838833L;

// 内置Thread,利用ThreadFactory产生,ThreadFactory可自定义
final Thread thread;

// 该worker要执行的第一个任务,允许为空,为空就直接从队列里拿要执行的任务
Runnable firstTask;

// 该线程一共执行了多少个任务
volatile long completedTasks;

// 构造器
Worker(Runnable firstTask) {
// 设置AQS的初始state为-1,这样别的线程就无法获取到该worker的锁
setState(-1);
// 初始化第一个要执行的任务
this.firstTask = firstTask;
// 利用ThreadFactory创建一个线程,将自己传进去(worker实现了Runnable接口)
this.thread = getThreadFactory().newThread(this);
}

// worker的run方法很简单,直接将该worker委托给了线程池的runWorker方法
public void run() {
runWorker(this);
}

protected boolean isHeldExclusively() {
return getState() != 0;
}

protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}

protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}

public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }

void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}

runWorker方法的代码如下:

代码块6
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
// 这个方法属于ThreadPoolExecutor,Worker的主要逻辑都在这里
final void runWorker(ThreadPoolExecutor.Worker w) {
Thread wt = Thread.currentThread(); // 拿到worker对应的线程
Runnable task = w.firstTask; // 拿到默认要执行的任务
w.firstTask = null; // 清空默认任务
w.unlock(); // 这一步会让Worker lock的state变成0,从而可以使其他线程获得worker的锁
boolean completedAbruptly = true; // 标记worker是否遇到了意外终止了任务的执行,如果没遇到什么意外,程序终止时这个值应该是false
try {
// 死循环+阻塞的方式获取阻塞队列里的任务,当死循环结束时,worker终止
while (task != null || (task = getTask()) != null) {

w.lock(); // 拿到worker的锁(一般情况下,执行runWorker的线程都是worker对应的线程,所以锁一般都不会发生竞争,这里主要是标记自己为“忙碌”状态)

// 情况1. 如果线程池已经处于STOP状态并且当前线程没有被中断,中断线程(大部分情况下都是命中情况1)
// 情况2. 如果线程池还处于RUNNING/SHUTDOWN状态,并且当前线程已经被中断了,
// 重新检查一下线程池状态,如果处于STOP状态并且没有被中断,那么中断线程
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task); // 任务运行前置处理逻辑
try {
task.run(); // 任务运行(到这一步,你提交的任务就被这个worker给运行了)
afterExecute(task, null); // 任务运行后置处理逻辑
} catch (Throwable ex) {
afterExecute(task, ex);
throw ex;
}
} finally {
task = null;
w.completedTasks++; // 当前worker运行的任务数+1
w.unlock(); // 释放Worker的锁
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly); // 一个worker结束后触发的逻辑
}
}

结合前面的代码,可以整理得出以下的流程图:

图7

到这里,我们就已经基本梳理清楚了线程池大致的工作流程,线程池产生worker,worker们竞争消费queue里的任务,直到没有任务时陷入阻塞。下面就进入getTask方法,来看看具体是怎么从queue里取task的。

2.6:getTask

来看下worker是怎么获取任务的(需结合代码块6理解):

代码块7
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
private Runnable getTask() {
boolean timedOut = false; // 一个标记,用来标记当前任务获取是否超时,需要结合下面的逻辑来理解

for (;;) { // 死循环
int c = ctl.get(); // 取ctl

// 当状态处于STOP/TIDYING/TERMINATED时,直接ctl-1并返回null终止当前worker
// 当状态处于SHUTDOWN且队列里任务为空时,也可以ctl-1,并返回null终止当前worker
// 因为SHUTDOWN状态下还需要处理队列里的任务,因此当队列里还有任务时,就不能直接终止worker
// 不得不吐槽一下,线程池里的条件判断可读性并不好(至少对于我来说是这样的。。)
if (runStateAtLeast(c, SHUTDOWN)
&& (runStateAtLeast(c, STOP) || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}

int wc = workerCountOf(c); // 根据ctl获取worker count

// 是否启用按时间废弃worker策略,如果指定了allowCoreThreadTimeOut为true,那么所有的worker都会按时间废除,
// 否则只废除超出核心worker数的部分
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

// 条件1:worker count已经超出最大线程数,或者开启了按时间废弃worker策略并且已经阻塞超时
// 条件2:并且线程数超过1个或者队列里没有待执行的任务
// 当两个条件同时满足时,用cas减少worker count,死循环更新,直到成功
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}

try {
// 如果不出意外,程序会走到这里,当启用了按时间废弃worker时,获取任务时就阻塞keepAliveTime的时间,否则将一直阻塞
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true; // 如果阻塞了keepAliveTime后没有拿到任何任务,就标记为获取超时,下次死循环时该状态会和timed一起作用生效
} catch (InterruptedException retry) {
timedOut = false;
}
}
}

这是对应的流程图(需结合图7理解):

图8

结合代码和流程图,可知在通常情况下,如果池内的worker数量还没超出corePoolSize,那么在workQueue取任务时会采用take长阻塞的方式,反之会阻塞keepAliveTime的时间,超出这个时间必定结束阻塞,在结束阻塞后若还没拿到任务,那么该worker就很可能在下次循环中被废弃,线程池正是通过这个机制来回收那些超出corePoolSize部分的worker的。

2.7:processWorkerExit

这是源码:

代码块8
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
private void processWorkerExit(ThreadPoolExecutor.Worker w, boolean completedAbruptly) {
if (completedAbruptly) { // 如果非正常结束,意味着getTask里对ctl-1失败,那么这里要补一次ctl-1的操作
decrementWorkerCount();
}

final ReentrantLock mainLock = this.mainLock; // 线程池锁
mainLock.lock();
try {
completedTaskCount += w.completedTasks; // 将被回收的worker里完成的任务数归入线程池整体完成的任务数
workers.remove(w); // 直接从池子里移除该worker
} finally {
mainLock.unlock(); // 解锁
}

tryTerminate(); // 触发tryTerminate(后面会细说这个方法是干嘛的)

// 下面这段代码是在判断是否需要再拓展一个worker
int c = ctl.get();
if (runStateLessThan(c, STOP)) { // 如果当前池状态处于RUNNING或SHUTDOWN

if (!completedAbruptly) { // worker正常结束,需要进行下面的判断来决定是否补充worker

// 下面的条件概括一下就是:当worker count小于corePoolSize时,或者队列里还有任务要执行,
// 但当前worker数量为0时,就扩展一个worker,否则直接return
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && !workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return;
}
// 如果worker非正常结束,直接追加一个worker来弥补本次回收的损失;或者worker正常结束,但满足上面的条件需要扩容时,也会执行到这里
addWorker(null, false);
}
}

流程图如下:

图9

这里面的逻辑非常简单,就是将worker从池子里移除,不过这里面有个函数的调用也要详细说下,它在executeaddWorker里都曾出现过,它就是tryTerminate.

2.8:tryTerminate

这个方法用来终止整个线程池,当线程池处于RUNNING状态时,它不会做任何操作,这是它的源码:

代码块9
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
final void tryTerminate() {
for (; ; ) {
int c = ctl.get();
// 条件1:线程池正在运行,正在运行,不允许终止逻辑执行
// 条件2:线程池状态处于TIDYING/TERMINATED,说明线程池已经终止过了
// 条件3:线程池处于SHUTDOWN且workerQueue里还有待执行的任务,SHUTDOWN是需要继续执行queue里的任务的,所以这个条件下也不能走终止逻辑
// 以上条件命中任意一个,都直接return,不作任何终止处理
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateLessThan(c, STOP) && !workQueue.isEmpty())) {
return;
}

// 当达到终止要求,且池内还有worker时,给一个worker的thread打上interrupt的标记
if (workerCountOf(c) != 0) {
interruptIdleWorkers(ONLY_ONE); // 这个方法在代码块的下方
return;
}

// 能走到这里,说明池子里已经没有worker了
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 接着利用cas将池状态改成TIDYING状态
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
// 如果cas成功,就再触发一下terminated方法(这个方法触发时机就是这里,
// ThreadPoolExecutor本身并没有对这个方法进行实现,可以重写该方法自行拓展逻辑)
terminated();
} finally {
ctl.set(ctlOf(TERMINATED, 0)); // 池子的最终状态一定是TERMINATED
termination.signalAll();
}
return;
}
// cas失败会进入下一次循环
} finally {
mainLock.unlock();
}
}
}
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock(); // 池锁
try {
for (ThreadPoolExecutor.Worker w : workers) {
Thread t = w.thread;
// 如果worker并非interrupt状态,且本线程获得了它的锁,即可认为该worker闲置中,直接标记成interrupt即可
// 通过runWorker我们知道,如果worker处于运行中,那么它的锁其他线程是拿不到的,如果这里tryLock拿到了,说明其处于闲置(阻塞)状态
// 通过getTask我们知道,如果这里中断了worker里的线程,那么queue阻塞获取任务那里会直接抛Interrupt异常,重试后会发现池状态不对,
// 返回null,从而将这个worker从workers剔除,然后就又会触发一次tryTerminate,然后继续走到这里终止其他worker,
// 所以虽然这里onlyOne传的是true,每次只终止一个worker线程,但可以循环往复的触发
if (!t.isInterrupted() && w.tryLock()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne) {
// onlyOne为true的话只进行一次循环就结束,当线程池触发shutdown后,
// 会全部中断一次worker,但总有些worker还在执行任务,onlyOne模式就是用来收底的,
// 能保证这些被落下的worker也能被慢慢回收
break;
}
}
} finally {
mainLock.unlock();
}
}

这是流程图:

图10

最后,再来看看shutdown

2.9:shutdown

shutdown逻辑特别简单,如下:

代码块10
1
2
3
4
5
6
7
8
9
10
11
12
13
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess(); // 权限检查
advanceRunState(SHUTDOWN); // 死循环+cas将池状态更新成SHUTDOWN
interruptIdleWorkers(); // 这个直接触发interruptIdleWorkers(false),会把所有空闲的worker全部中断
onShutdown(); // 给ScheduledThreadPoolExecutor用的,ThreadPoolExecutor里该方法为空
} finally {
mainLock.unlock();
}
tryTerminate(); // 此时触发tryTerminate,尝试将池状态改成TIDYING/TERMINATED
}

当shutdown后,首先会将池状态标记为SHUTDOWN,然后将池内闲置的worker给终止掉,之后这批被interrupt的worker便会走processWorkerExit流程,移除worker,如果此时池内还有没被回收的worker,就继续终止worker、走processWorkerExit流程,以此类推,直到最后queue里没任务,且workers里没worker时,才会将池标记为TIDYING / TERMINATED状态。

三、Executors类

本质上是个工具类,可以快速帮我们创建一些具有独特属性的池,它为我们提供了下面几个方法:

3.1:newSingleThreadExecutor

代码:

代码块11
1
2
3
4
5
6
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}

通过传给线程池的参数,不难猜出,这是个单线程处理的线程池,尽管maximumPoolSize配置了1,但队列采用没有边界限制的LinkedBlockingQueue,容量可达Integer.MAX_VALUE,所以正常情况下,该池有且仅有一个线程在工作。

3.2:newFixedThreadPool

代码:

代码块12
1
2
3
4
5
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}

跟newSingleThreadExecutor类似,唯一不同的是这里可以传线程数,正常情况下,该池有且仅有nThreads个线程在工作。

3.3:newCachedThreadPool

代码:

代码块13
1
2
3
4
5
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}

核心线程数为0,maximumPoolSize为Integer.MAX_VALUE,利用SynchronousQueue做队列,SynchronousQueue队列无容器,当一个任务被推入时,只要该任务没被消费,那其他线程就无法再推新的任务进去,结合参数设置以及对execute流程的理解,可以推断出CachedThreadPool就是提供了一个可以趋近于无限扩容的线程池。

3.4:newWorkStealingPool & newScheduledThreadPool

代码:

代码块14
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public static ExecutorService newWorkStealingPool() {
return new ForkJoinPool
(Runtime.getRuntime().availableProcessors(),
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}

public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory) {
return new DelegatedScheduledExecutorService
(new ScheduledThreadPoolExecutor(1, threadFactory));
}

这是为创建ForkJoinPool、ScheduledThreadPoolExecutor提供的快捷方法,不在本文章讨论范围内,暂时忽略。