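1. Inheritance hierarchy & constructor parameters
Like other pooling techniques, a thread pool exists to reuse resources, cut overhead, and speed the program up. Java's thread pools live in the java.util.concurrent (JUC) package. This is the inheritance tree (Executor → ExecutorService → AbstractExecutorService → ThreadPoolExecutor):
Most of the implementation logic lives in ThreadPoolExecutor, and this article revolves around that class.
These are the parameters used to initialize a thread pool: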
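    int corePoolSize;                    // number of core threads
    int maximumPoolSize;                 // upper bound on the number of threads
    long keepAliveTime;                  // how long an idle thread waits for work before it may exit
    TimeUnit unit;                       // unit of keepAliveTime
    BlockingQueue<Runnable> workQueue;   // holds tasks that are waiting to run
    ThreadFactory threadFactory;         // creates the worker threads
    RejectedExecutionHandler handler;    // invoked when a task cannot be accepted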
The rejection policy can be customized by implementing RejectedExecutionHandler yourself. These are the policies that ship with the thread pool:
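| Policy class | Default? | Description |
| --- | --- | --- |
| AbortPolicy | Yes | Throws RejectedExecutionException |
| DiscardPolicy | No | Does nothing and drops the task; the caller is not notified |
| DiscardOldestPolicy | No | Evicts the oldest task in the queue so that the new task can get in; the caller is not notified |
| CallerRunsPolicy | No | Does not hand the task to the pool; runs it directly in the calling thread |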
In practice the default policy is usually the right choice: the caller can catch the exception and decide how to handle it.
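To make the "catch it and handle it yourself" advice concrete, here is a minimal sketch. It assumes a deliberately tiny pool (one thread plus a one-slot ArrayBlockingQueue) so that rejection is easy to trigger; the class RejectionDemo and its helper methods are hypothetical names used only for illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class RejectionDemo {

    /** Submits {@code tasks} blocking jobs to a tiny pool and returns how many were rejected. */
    static int countRejections(int tasks) {
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1,                                      // corePoolSize
                1,                                      // maximumPoolSize
                0L, TimeUnit.MILLISECONDS,              // keepAliveTime + unit
                new ArrayBlockingQueue<>(1),            // workQueue: room for one waiting task
                Executors.defaultThreadFactory(),       // threadFactory
                new ThreadPoolExecutor.AbortPolicy());  // handler: the default, spelled out
        int rejected = 0;
        try {
            for (int i = 0; i < tasks; i++) {
                try {
                    pool.execute(() -> awaitQuietly(release));
                } catch (RejectedExecutionException e) {
                    rejected++; // the caller decides here: retry, degrade, log, ...
                }
            }
        } finally {
            release.countDown(); // let the accepted tasks finish
            pool.shutdown();
        }
        return rejected;
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println("rejected = " + countRejections(5));
    }
}
```

With five blocking submissions, one task occupies the only worker, one waits in the queue, and the remaining three are rejected.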
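2. Source code walkthrough
2.1: ctl
Before we start, we need to understand how the pool records its state and its current thread count. These are the key fields: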
Code block 1
    // run state (high 3 bits) and worker count (low 29 bits) packed into one int
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    private static final int COUNT_BITS = Integer.SIZE - 3;
    private static final int COUNT_MASK = (1 << COUNT_BITS) - 1;

    // run states, stored in the high-order bits
    private static final int TERMINATED =  3 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int RUNNING    = -1 << COUNT_BITS;
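The ctl variable above encodes both the run state of the pool and the number of threads currently in it: the high 3 bits hold the state and the low 29 bits hold the thread count. Each state is explained below: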
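| State | High 3 bits | Description |
| --- | --- | --- |
| TERMINATED | 011 | Terminated; the terminated hook method has completed |
| TIDYING | 010 | Tidying up; all tasks have finished and the thread count is 0; the terminated hook method is invoked |
| STOP | 001 | Stopped; accepts no new tasks, does not run queued tasks, and interrupts all tasks in progress |
| SHUTDOWN | 000 | Shut down; accepts no new tasks but still runs queued tasks |
| RUNNING | 111 | Running, the state the pool is in most of the time; accepts new tasks and runs queued tasks |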
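This is the state transition diagram of the pool: RUNNING → SHUTDOWN on shutdown(); RUNNING or SHUTDOWN → STOP on shutdownNow(); SHUTDOWN → TIDYING once both the queue and the pool are empty; STOP → TIDYING once the pool is empty; TIDYING → TERMINATED once the terminated() hook has completed.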
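2.2: Methods that operate on ctl
With that in mind, look at the following key functions built on ctl. They play an important role later on when the pool checks its state, changes its state, and increments the thread count: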
Code block 2
    // RUNNING is the only negative state, so "less than SHUTDOWN" means running
    private static boolean isRunning(int c) { return c < SHUTDOWN; }

    // mask off the state bits to get the worker count
    private static int workerCountOf(int c) { return c & COUNT_MASK; }

    // CAS the worker count up or down by one
    private boolean compareAndIncrementWorkerCount(int expect) { return ctl.compareAndSet(expect, expect + 1); }
    private boolean compareAndDecrementWorkerCount(int expect) { return ctl.compareAndSet(expect, expect - 1); }

    // pack a run state and a worker count into one ctl value
    private static int ctlOf(int rs, int wc) { return rs | wc; }

    // the state sits in the high bits, so states can be compared on the whole ctl value
    private static boolean runStateLessThan(int c, int s) { return c < s; }
    private static boolean runStateAtLeast(int c, int s) { return c >= s; }
These methods, and a solid understanding of ctl, are very important: the flows below involve many checks and operations on ctl.
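The packing scheme is easier to trust once you can poke at it. The following standalone sketch copies the constants and helpers into a hypothetical class, CtlDemo, because the real ones are private to ThreadPoolExecutor. The stateBits helper is an addition of this sketch, and runStateOf is written here only to show the inverse of workerCountOf.

```java
class CtlDemo {
    static final int COUNT_BITS = Integer.SIZE - 3;        // 29
    static final int COUNT_MASK = (1 << COUNT_BITS) - 1;   // low 29 bits set

    static final int RUNNING    = -1 << COUNT_BITS;
    static final int SHUTDOWN   =  0 << COUNT_BITS;
    static final int STOP       =  1 << COUNT_BITS;
    static final int TIDYING    =  2 << COUNT_BITS;
    static final int TERMINATED =  3 << COUNT_BITS;

    static int ctlOf(int rs, int wc)  { return rs | wc; }
    static int runStateOf(int c)      { return c & ~COUNT_MASK; }
    static int workerCountOf(int c)   { return c & COUNT_MASK; }
    static boolean isRunning(int c)   { return c < SHUTDOWN; }

    /** High three bits of c, left-padded with zeros, e.g. "111" for RUNNING. */
    static String stateBits(int c) {
        String bits = Integer.toBinaryString(c >>> COUNT_BITS);
        return "000".substring(bits.length()) + bits;
    }

    public static void main(String[] args) {
        int c = ctlOf(RUNNING, 5);
        System.out.println(stateBits(c) + " workers=" + workerCountOf(c) + " running=" + isRunning(c));

        // Adding a worker is a plain +1 on the packed value; the state bits are untouched.
        int next = c + 1;
        System.out.println(stateBits(next) + " workers=" + workerCountOf(next));

        // States are ordered numerically, which is why c < s and c >= s work on the whole int.
        System.out.println(RUNNING < SHUTDOWN && SHUTDOWN < STOP && STOP < TIDYING && TIDYING < TERMINATED);
    }
}
```

Because the state occupies the most significant bits, any ctl value in a "later" state compares greater than any value in an "earlier" one, whatever the worker count is.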
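2.3: execute
Now for the core flow. We use the most basic method, execute, as the entry point for a detailed walkthrough. This is its source (note: a worker is the pool's wrapper around a Thread, so the number of workers equals the number of threads):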
Code block 3
    public void execute(Runnable command) {
        if (command == null) {
            throw new NullPointerException();
        }
        int c = ctl.get();
        // 1. fewer than corePoolSize workers: try to start a core worker with this task
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true)) {
                return;
            }
            c = ctl.get();
        }
        // 2. otherwise try to queue the task, then re-check the state
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (!isRunning(recheck) && remove(command)) {
                reject(command);
            } else if (workerCountOf(recheck) == 0) {
                addWorker(null, false);
            }
        // 3. queue full (or pool not running): try a non-core worker, else reject
        } else if (!addWorker(command, false)) {
            reject(command);
        }
    }
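This is the flowchart of execute:
Note: the boxes with a background color in the figure are methods that are introduced in later sections.
If we consider only the normal case, that is, the pool is RUNNING and corePoolSize is greater than 0, the logic above can be simplified as follows:
A thread pool spends the vast majority of its time in the RUNNING state, so restricting ourselves to that case simplifies the flowchart considerably and makes it much easier to follow. While the number of workers is below corePoolSize, a new core worker is created directly. Otherwise the task is put on the queue, where it waits for a core worker to become free. Once the queue is full as well, non-core workers are created, up to maximumPoolSize, to run the overflowing tasks; after finishing its overflow task, such a worker pulls further tasks from the queue, easing the load on the core workers. If the pool is already at maximumPoolSize, the task is rejected.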
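The core → queue → non-core order can be observed from the outside. The sketch below assumes a pool with corePoolSize 1, maximumPoolSize 2, and a one-slot queue, and submits three tasks that block on a latch so that no worker becomes free in between. ExecuteFlowDemo and trace are hypothetical names.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class ExecuteFlowDemo {

    /** Returns "poolSize/queueSize" snapshots taken after each of three submissions. */
    static String trace() {
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 30L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));
        StringBuilder out = new StringBuilder();
        try {
            for (int i = 0; i < 3; i++) {
                pool.execute(() -> awaitQuietly(release));
                out.append(pool.getPoolSize())
                   .append('/')
                   .append(pool.getQueue().size())
                   .append(' ');
            }
        } finally {
            release.countDown(); // unblock every task so the pool can wind down
            pool.shutdown();
        }
        return out.toString().trim();
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println(trace()); // 1/0 1/1 2/1
    }
}
```

The first task starts a core worker (1/0), the second is queued because the core worker is busy (1/1), and the third finds the queue full and therefore starts a non-core worker (2/1).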
2.4: addWorker
addWorker creates workers and is one of the most important methods in the pool. This is its source:
Code block 4
    private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (int c = ctl.get(); ; ) {
            // refuse once shut down, unless this is a task-less worker for a non-empty queue
            if (runStateAtLeast(c, SHUTDOWN)
                    && (runStateAtLeast(c, STOP) || firstTask != null || workQueue.isEmpty())) {
                return false;
            }

            for (; ; ) {
                // pool already at its bound (core or maximum, depending on the flag)
                if (workerCountOf(c) >= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK)) {
                    return false;
                }
                // reserve a slot by CAS; on success leave both loops
                if (compareAndIncrementWorkerCount(c)) {
                    break retry;
                }

                c = ctl.get();
                // state changed: restart the outer loop, otherwise retry the CAS
                if (runStateAtLeast(c, SHUTDOWN)) {
                    continue retry;
                }
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        ThreadPoolExecutor.Worker w = null;
        try {
            w = new ThreadPoolExecutor.Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // re-check the state while holding the lock
                    int c = ctl.get();

                    if (isRunning(c) || (runStateLessThan(c, STOP) && firstTask == null)) {
                        if (t.isAlive())
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize) {
                            largestPoolSize = s;
                        }
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (!workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }
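This is the flowchart:
The flow is quite involved. That is justified for a step this critical to the pool, but it does not make the code easy to follow. So, as above, assume nothing unusual happens (that is, ignore the non-RUNNING states); the normal path then looks like this:
In the normal case the logic is clear. First check whether the pool is full, against corePoolSize or maximumPoolSize depending on the core flag, and return false if it is. Otherwise update the thread count with a CAS. Once the CAS succeeds, create a Worker object, add it to the workers set (it is now in the pool), and then start its internal thread, at which point the thread is in service. How a worker actually does its work is covered next.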
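The "check the bound, then CAS, retry on contention" step is a general pattern and can be shown in isolation. The sketch below is a simplification, not the JDK code: it uses a plain AtomicInteger without packed state bits, and SlotCounter, tryReserve, and race are hypothetical names.

```java
import java.util.concurrent.atomic.AtomicInteger;

class SlotCounter {
    private final AtomicInteger count = new AtomicInteger();
    private final int limit;

    SlotCounter(int limit) {
        this.limit = limit;
    }

    /** Mirrors the inner loop of addWorker: check the bound, CAS the count, retry if the CAS loses. */
    boolean tryReserve() {
        for (;;) {
            int c = count.get();
            if (c >= limit) {
                return false;               // pool "full": give up
            }
            if (count.compareAndSet(c, c + 1)) {
                return true;                // slot reserved
            }
            // another thread changed the count first: re-read and try again
        }
    }

    /** Lets several threads race for slots and returns how many reservations succeeded. */
    static int race(int limit, int threads, int attemptsPerThread) {
        SlotCounter counter = new SlotCounter(limit);
        AtomicInteger won = new AtomicInteger();
        Thread[] racers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            racers[i] = new Thread(() -> {
                for (int k = 0; k < attemptsPerThread; k++) {
                    if (counter.tryReserve()) {
                        won.incrementAndGet();
                    }
                }
            });
            racers[i].start();
        }
        for (Thread t : racers) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return won.get();
    }

    public static void main(String[] args) {
        System.out.println(race(4, 8, 100)); // never more than the limit
    }
}
```

Because the bound check and the increment are tied together by the CAS on the value that was checked, the count cannot overshoot the limit no matter how many threads race.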
2.5: The Worker class
A worker is the pool's wrapper around a thread. Its code is as follows:
Code block 5
    private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
        private static final long serialVersionUID = 6138294804551838833L;

        // thread this worker runs in
        final Thread thread;
        // initial task to run, may be null
        Runnable firstTask;
        // per-worker count of completed tasks
        volatile long completedTasks;

        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }

        public void run() {
            runWorker(this);
        }

        // lock state: 0 = unlocked, 1 = locked
        protected boolean isHeldExclusively() {
            return getState() != 0;
        }

        protected boolean tryAcquire(int unused) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        protected boolean tryRelease(int unused) {
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        public void lock() { acquire(1); }
        public boolean tryLock() { return tryAcquire(1); }
        public void unlock() { release(1); }
        public boolean isLocked() { return isHeldExclusively(); }

        void interruptIfStarted() {
            Thread t;
            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                }
            }
        }
    }
The code of the runWorker method is as follows:
Code block 6
    final void runWorker(ThreadPoolExecutor.Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // state -1 -> 0: interrupts are allowed from here on
        boolean completedAbruptly = true;
        try {
            // run the first task, then keep pulling tasks until getTask returns null
            while (task != null || (task = getTask()) != null) {

                w.lock();

                // if the pool is stopping, make sure the thread is interrupted;
                // otherwise make sure it is not
                if ((runStateAtLeast(ctl.get(), STOP)
                        || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP)))
                        && !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    try {
                        task.run();
                        afterExecute(task, null);
                    } catch (Throwable ex) {
                        afterExecute(task, ex);
                        throw ex;
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }
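Combining this with the earlier code gives the following flowchart:
At this point we have a reasonably clear picture of how the pool works: the pool creates workers, and the workers compete to consume tasks from the queue, blocking when there are none left. Next we go into getTask to see exactly how a task is taken from the queue.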
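The beforeExecute and afterExecute calls in runWorker are protected hooks, so the loop can be observed by subclassing. The sketch below counts hook invocations; HookedPool and its fields are hypothetical names, and the no-op uncaught-exception handler is only there to keep the failing demo tasks from printing stack traces.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class HookedPool extends ThreadPoolExecutor {
    final AtomicInteger started = new AtomicInteger();
    final AtomicInteger finished = new AtomicInteger();
    final AtomicInteger failed = new AtomicInteger();

    HookedPool(int threads) {
        super(threads, threads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        setThreadFactory(runnable -> {
            Thread thread = new Thread(runnable);
            thread.setUncaughtExceptionHandler((t, e) -> { }); // silence demo failures
            return thread;
        });
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        started.incrementAndGet();
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        finished.incrementAndGet();
        if (t != null) {
            failed.incrementAndGet(); // the task threw; runWorker rethrows after this hook
        }
    }

    /** Runs ok succeeding and bad throwing tasks; returns {started, finished, failed}. */
    static int[] run(int ok, int bad) {
        HookedPool pool = new HookedPool(2);
        for (int i = 0; i < ok; i++) {
            pool.execute(() -> { });
        }
        for (int i = 0; i < bad; i++) {
            pool.execute(() -> { throw new IllegalStateException("boom"); });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new int[] { pool.started.get(), pool.finished.get(), pool.failed.get() };
    }

    public static void main(String[] args) {
        int[] r = run(3, 2);
        System.out.println(r[0] + " started, " + r[1] + " finished, " + r[2] + " failed");
    }
}
```

A task that throws kills its worker thread (completedAbruptly stays true), yet the remaining queued tasks still run, because processWorkerExit adds a replacement worker.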
2.6: getTask
Let's see how a worker obtains a task (read this together with code block 6):
Code block 7
    private Runnable getTask() {
        boolean timedOut = false; // did the last poll() time out?

        for (;;) {
            int c = ctl.get();

            // STOP or later, or SHUTDOWN with an empty queue: this worker should exit
            if (runStateAtLeast(c, SHUTDOWN)
                    && (runStateAtLeast(c, STOP) || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }

            int wc = workerCountOf(c);

            // is this worker subject to the keepAliveTime limit?
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

            // too many workers, or this one idled past keepAliveTime: retire it
            if ((wc > maximumPoolSize || (timed && timedOut))
                    && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
                Runnable r = timed ?
                        workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                        workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }
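This is the corresponding flowchart (read it together with Figure 7):
From the code and the flowchart we can see that, in the usual case (allowCoreThreadTimeOut is false), a worker takes from workQueue with take, which blocks indefinitely, as long as the number of workers does not exceed corePoolSize. Otherwise it uses poll and blocks for at most keepAliveTime. If it still has no task when the wait ends, the worker will most likely be discarded on the next loop iteration. This is the mechanism by which the pool reclaims workers beyond corePoolSize.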
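The reclaiming behavior can be watched from the outside. The sketch below assumes a pool with corePoolSize 1, maximumPoolSize 3, a 50 ms keepAliveTime, and a SynchronousQueue so that every busy-time submission forces a new worker. It then polls getPoolSize for up to five seconds, since the exact moment a worker exits is timing-dependent. KeepAliveDemo is a hypothetical name.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class KeepAliveDemo {

    /** Returns {peak pool size, pool size after the surplus workers have timed out}. */
    static int[] peakAndSettled() {
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 3, 50L, TimeUnit.MILLISECONDS, new SynchronousQueue<>());
        try {
            for (int i = 0; i < 3; i++) {
                pool.execute(() -> awaitQuietly(release));
            }
            int peak = pool.getPoolSize();

            release.countDown(); // all three workers become idle and enter getTask

            // Surplus workers use poll(keepAliveTime) and exit; the last one switches to take().
            long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
            while (pool.getPoolSize() > 1 && System.nanoTime() < deadline) {
                try {
                    Thread.sleep(20);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
            return new int[] { peak, pool.getPoolSize() };
        } finally {
            pool.shutdown();
        }
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        int[] r = peakAndSettled();
        System.out.println("peak=" + r[0] + " settled=" + r[1]);
    }
}
```

The pool does not track which worker is "the" core worker: whichever one is still around when the count drops to corePoolSize stops timing out and blocks in take.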
2.7: processWorkerExit
This is the source:
Code block 8
    private void processWorkerExit(ThreadPoolExecutor.Worker w, boolean completedAbruptly) {
        // on abrupt exit the count was not adjusted by getTask, so do it here
        if (completedAbruptly) {
            decrementWorkerCount();
        }

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            completedTaskCount += w.completedTasks;
            workers.remove(w);
        } finally {
            mainLock.unlock();
        }

        tryTerminate();

        int c = ctl.get();
        if (runStateLessThan(c, STOP)) {
            if (!completedAbruptly) {
                // keep at least min workers; at least one if tasks are still queued
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                if (min == 0 && !workQueue.isEmpty())
                    min = 1;
                if (workerCountOf(c) >= min)
                    return; // enough workers left, no replacement needed
            }
            addWorker(null, false);
        }
    }
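The flowchart is as follows:
The logic here is simple: the worker is removed from the pool, and a replacement is added if the pool would otherwise be short of workers. One call in here deserves a closer look, though. It is also reached from execute (through remove) and from addWorker (through addWorkerFailed): tryTerminate.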
2.8: tryTerminate
This method attempts to terminate the whole pool. While the pool is RUNNING it does nothing. This is its source:
Code block 9
    final void tryTerminate() {
        for (; ; ) {
            int c = ctl.get();
            // nothing to do while RUNNING, once TIDYING or TERMINATED,
            // or in SHUTDOWN with tasks still queued
            if (isRunning(c)
                    || runStateAtLeast(c, TIDYING)
                    || (runStateLessThan(c, STOP) && !workQueue.isEmpty())) {
                return;
            }

            // workers remain: interrupt one idle worker to propagate the shutdown signal
            if (workerCountOf(c) != 0) {
                interruptIdleWorkers(ONLY_ONE);
                return;
            }

            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                    try {
                        terminated();
                    } finally {
                        ctl.set(ctlOf(TERMINATED, 0));
                        termination.signalAll();
                    }
                    return;
                }
            } finally {
                mainLock.unlock();
            }
            // CAS failed: loop and retry
        }
    }

    private void interruptIdleWorkers(boolean onlyOne) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (ThreadPoolExecutor.Worker w : workers) {
                Thread t = w.thread;
                // tryLock succeeds only if the worker is not running a task
                if (!t.isInterrupted() && w.tryLock()) {
                    try {
                        t.interrupt();
                    } catch (SecurityException ignore) {
                    } finally {
                        w.unlock();
                    }
                }
                if (onlyOne) {
                    break;
                }
            }
        } finally {
            mainLock.unlock();
        }
    }
This is the flowchart:
Finally, let's look at shutdown.
2.9: shutdown
The shutdown logic is very simple:
Code block 10
    public void shutdown() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            advanceRunState(SHUTDOWN);  // move the pool state to SHUTDOWN
            interruptIdleWorkers();     // wake up workers blocked waiting for tasks
            onShutdown();               // hook for ScheduledThreadPoolExecutor
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
    }
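After shutdown is called, the pool state is first set to SHUTDOWN and the idle workers are interrupted. The interrupted workers then go through processWorkerExit, which removes them. If workers still remain in the pool at that point, the cycle continues: more workers are interrupted and go through processWorkerExit in turn, and so on. Only when the queue holds no tasks and workers is empty is the pool moved to TIDYING and then TERMINATED.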
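The shutdown sequence can be traced through the public API. The sketch below assumes a single-thread pool whose only worker is held busy by a latch while two more tasks wait in the queue. It overrides the terminated hook to count how often tryTerminate reaches it. ShutdownDemo and trace are hypothetical names.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class ShutdownDemo {

    static String trace() {
        AtomicInteger ran = new AtomicInteger();
        AtomicInteger hookCalls = new AtomicInteger();
        CountDownLatch gate = new CountDownLatch(1);

        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>()) {
            @Override
            protected void terminated() {
                hookCalls.incrementAndGet(); // invoked on the TIDYING -> TERMINATED step
            }
        };

        pool.execute(() -> {          // keeps the only worker busy
            awaitQuietly(gate);
            ran.incrementAndGet();
        });
        pool.execute(ran::incrementAndGet); // these two wait in the queue
        pool.execute(ran::incrementAndGet);

        pool.shutdown();              // state becomes SHUTDOWN

        boolean rejected = false;
        try {
            pool.execute(ran::incrementAndGet);
        } catch (RejectedExecutionException e) {
            rejected = true;          // no new tasks after shutdown
        }
        boolean terminatedEarly = pool.isTerminated(); // a worker and queued tasks remain

        gate.countDown();
        boolean terminated = false;
        try {
            terminated = pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "rejected=" + rejected + " early=" + terminatedEarly
                + " ran=" + ran.get() + " hook=" + hookCalls.get() + " terminated=" + terminated;
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println(trace());
    }
}
```

The task submitted after shutdown is rejected, the three tasks accepted earlier all run, and the terminated hook fires once, only after the queue has drained and the last worker has exited.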
3. The Executors class
This is essentially a utility class that quickly creates pools with particular characteristics. It provides the following methods:
3.1: newSingleThreadExecutor
Code:
Code block 11
    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
                (new ThreadPoolExecutor(1, 1,
                        0L, TimeUnit.MILLISECONDS,
                        new LinkedBlockingQueue<Runnable>()));
    }
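From the arguments passed to the pool it is easy to see that this is a single-threaded pool: corePoolSize and maximumPoolSize are both 1, and the queue is a LinkedBlockingQueue created without an explicit bound, so its capacity is Integer.MAX_VALUE. Every task beyond the first therefore waits in the queue instead of triggering a new thread, and under normal conditions exactly one thread does the work.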
3.2: newFixedThreadPool
Code:
Code block 12
    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>());
    }
This is similar to newSingleThreadExecutor; the only difference is that the thread count can be passed in. Under normal conditions exactly nThreads threads do the work.
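A quick way to confirm the "exactly nThreads" behavior is to overload a fixed pool and inspect it. The sketch below relies on an implementation detail: newFixedThreadPool returns a ThreadPoolExecutor, as the source above shows, so the cast works. The wrapper returned by newSingleThreadExecutor cannot be cast this way. FixedPoolDemo and underLoad are hypothetical names.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

class FixedPoolDemo {

    /** Submits 2 * nThreads blocking tasks and reports the pool's shape. */
    static String underLoad(int nThreads) {
        CountDownLatch release = new CountDownLatch(1);
        ExecutorService service = Executors.newFixedThreadPool(nThreads);
        // Implementation detail: the returned object is a ThreadPoolExecutor.
        ThreadPoolExecutor pool = (ThreadPoolExecutor) service;
        try {
            for (int i = 0; i < 2 * nThreads; i++) {
                service.execute(() -> awaitQuietly(release));
            }
            return "core=" + pool.getCorePoolSize()
                    + " max=" + pool.getMaximumPoolSize()
                    + " threads=" + pool.getPoolSize()
                    + " queued=" + pool.getQueue().size();
        } finally {
            release.countDown();
            service.shutdown();
        }
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println(underLoad(3)); // core=3 max=3 threads=3 queued=3
    }
}
```

The surplus tasks pile up in the unbounded queue rather than causing more threads or rejections, which is also why a fixed pool fed faster than it drains can consume memory without limit.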
3.3: newCachedThreadPool
Code:
Code block 13
    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                60L, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>());
    }
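The core thread count is 0 and maximumPoolSize is Integer.MAX_VALUE, with a SynchronousQueue as the work queue. A SynchronousQueue has no capacity: an offer succeeds only if a consumer thread is already waiting to take the element, and fails immediately otherwise. Combining these settings with the execute flow, a task is handed to an idle worker if one is waiting, and otherwise a new worker is created. CachedThreadPool is therefore a pool that can grow almost without limit.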
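The growth behavior is easy to reproduce. The sketch below submits tasks that all block on a latch, so no worker is ever idle when the next task arrives and each submission has to create a thread. As with the fixed pool, the cast relies on newCachedThreadPool returning a ThreadPoolExecutor; CachedPoolDemo and threadsFor are hypothetical names.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

class CachedPoolDemo {

    /** Returns the pool size after submitting the given number of simultaneously blocking tasks. */
    static int threadsFor(int concurrentTasks) {
        CountDownLatch release = new CountDownLatch(1);
        // Implementation detail: the returned object is a ThreadPoolExecutor.
        ThreadPoolExecutor pool = (ThreadPoolExecutor) Executors.newCachedThreadPool();
        try {
            for (int i = 0; i < concurrentTasks; i++) {
                // offer() on the SynchronousQueue fails because no worker is waiting,
                // so execute falls through to addWorker(command, false)
                pool.execute(() -> awaitQuietly(release));
            }
            return pool.getPoolSize();
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println(threadsFor(20)); // one thread per concurrently running task
    }
}
```

One thread per concurrent task is convenient for short bursts, but it also means nothing in the pool itself limits the thread count under sustained load.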
3.4: newWorkStealingPool & newScheduledThreadPool
Code:
Code block 14
    public static ExecutorService newWorkStealingPool() {
        return new ForkJoinPool
                (Runtime.getRuntime().availableProcessors(),
                        ForkJoinPool.defaultForkJoinWorkerThreadFactory,
                        null, true);
    }

    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }

    public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory) {
        return new DelegatedScheduledExecutorService
                (new ScheduledThreadPoolExecutor(1, threadFactory));
    }
These are shortcut methods for creating ForkJoinPool and ScheduledThreadPoolExecutor instances. They are outside the scope of this article and are skipped for now.