J.U.C源码阅读笔记(五)线程池

线程池是存放线程的容器,内部维护了若干个线程。通过利用线程池可以避免频繁创建线程,销毁线程带来的系统内耗,提高吞吐量。在Java中用Thread对线程做了抽象,线程池的实现类是ThreadPoolExecutor。但是线程之间的切换需要系统调用进内核,一旦线程池中线程的数量比较多,线程切换带来的内耗会制约系统吞吐量。协程(在Windows上称为纤程)本质上是用户态的线程,协程的调度不需要进内核,在用户态即可完成,所以相对线程,协程更加轻量。在Java中QuasarLoom库中实现了协程。

Java线程池类结构

下图是Java中关于线程池的类的组织架构。顶层接口是Executor,里面只有一个execute(Runnable command)方法,ExecutorService扩展了顶层接口,添加了关闭线程池等方法,AbstractExecutorService 实现了ExecutorService中的部分方法。而真正要用到的类是ThreadPoolExecutor

线程池类的结构

ThreadPoolExecutor部分源码解读

属性

ThreadPoolExecutor主要属性包括线程池的状态以及线程池中工作线程的数量。ThreadPoolExecutor构造函数中包含了如下变量:

  • corePoolSize 核心线程数量
  • maximumPoolSize 线程池所允许的最大线程数量
  • keepAliveTime 线程存活时间(一般指corePoolSizemaximumPoolSize这些线程的存活时间,当调用allowCoreThreadTimeout()后,核心线程也可以超时退出)
  • TimeUnit 超时单位(毫秒、秒等)
  • BlockingQueue 阻塞队列用于存放任务
  • ThreadFactory 创建线程的工厂,可以用来根据业务用来设置线程的名字

线程池状态属性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
// 工具数字,29。后面会用它进行位运算,创建线程池的状态
private static final int COUNT_BITS = Integer.SIZE - 3;
// 工具数字,后面利用它计算线程池的状态和线程池中工作线程数量。
// 线程池中最大可以创建的线程数量(2^29 - 1):00011111 11111111 11111111 11111111
private static final int CAPACITY = (1 << COUNT_BITS) - 1;

/* 用int的高 3 位表示[线程池]的状态,低 29 位存放线程池中工作线程数量 */

// 正在运行:允许接收任务,处理任务队列中的任务
// 11100000 00000000 000000000 00000000
private static final int RUNNING = -1 << COUNT_BITS;

// 关闭状态:不再接收新的任务,但会处理任务队列中的任务
// 00000000 00000000 000000000 00000000
private static final int SHUTDOWN = 0 << COUNT_BITS;

// 停止状态:不再接收新的任务,也不会处理任务队列中的任务,并且会中断正在执行任务的线程
// 00100000 00000000 000000000 00000000
private static final int STOP = 1 << COUNT_BITS;

// 正在整理中:所有的任务都已经终止,工作线程数量为 0,然后就会执行钩子函数 terminated(),转到终止状态,可以重写钩子函数。
// 01000000 00000000 000000000 00000000
private static final int TIDYING = 2 << COUNT_BITS;

// 终止状态:线程池彻底终止,当处于TIDYING状态线程池执行完钩子函数terminated()就会转换成这个状态
// 01100000 00000000 000000000 00000000
private static final int TERMINATED = 3 << COUNT_BITS;

// ctl高 3 位存储线程池的状态,低 29 位存储工作线程数量,ctl初始值为 -1。内部大量用到了ctl变量。
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static int ctlOf(int rs, int wc) { return rs | wc; }

// 将整数 c 的低 29 位修改为 0,就得到了线程池的状态(~CAPACITY = 11100000 00000000 00000000 00000000)
private static int runStateOf(int c) { return c & ~CAPACITY; }
// 将整数 c 的高 3 为修改为 0,就得到了线程池中的线程数(CAPACITY = 00011111 11111111 11111111 11111111)
private static int workerCountOf(int c) { return c & CAPACITY; }

线程池状态转换

因为在创建工作线程以及任务执行过程中需要不断地的判断线程池状态,所以有必要了解:)

  • RUNNING -> SHUTDOWN:On invocation of shutdown(), perhaps implicitly in finalize()

  • (RUNNING or SHUTDOWN) -> STOP:On invocation of shutdownNow()

  • SHUTDOWN -> TIDYING:When both queue and pool are empty

  • STOP -> TIDYING:When pool is empty

    因为 STOP 状态不会在去处理队列中的任务,所以只需要考虑线程池中的线程数量

  • TIDYING -> TERMINATED:When the terminated() hook method has completed

执行任务源码

public void execute(Runnable command)

在将来某个时间执行给定的任务(将来的意思是任务可能会放到任务队列中),可能创建线程执行也可能利用现有的线程执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
public void execute(Runnable command) {
// 预处理
if (command == null)
throw new NullPointerException();
/*
* 线程池处理任务分为 3 个步骤:
*
* 步骤 1.
* 如果当前工作线程数量少于核心线程数,会尝试通过 addWorker(Runnable fisrtTask, boolean core) 创建线程执行 "给定的任务"
* 此时 firstTask 为给定的任务,core 为 true 代表工作线程数量边界为 "核心线程数量"
* 但是由于意外情况,addWorker 可能会失败: 比如执行 addWorker 时线程池状态变为 Shutdown、Stop 或者由于系统资源不* 够(当前用户进程 limit 受限 ulimit -u),导致 new Thread() 直接失败(抛 can not create native thread 异常)
* 所以如果 addworker 方法返回 false 说明存在意外情况导致创建失败,直接返回。
*
* 步骤 2.
* 当步骤 1 不能处理时,说明当前线程数量 >= 核心线程数量
* 如果处于 RUNNING 状态,并且任务成功入队后,会二次判断线程池状态
* 当处于未运行状态时会将刚才入队的任务出队,同时执行拒绝策略
* 当工作线程数量为 0 时(线程超时退出、异常退出),会尝试创建新的工作线程, 最终就是要保证,如果线程池处于正常状态,就务必要成功执行 addWorker()
*
* 步骤 3.
* 当步骤 2 不能处理时,说明线程池没有运行或者队列已经满了,此时会尝试创建新的工作线程
* 通过将 core 设置为 false,此时工作线程数量的边界为最大线程数量
* 如果创建工作线程失败,代表线程池处于未运行状态或者已经饱和,会尝试执行拒绝策略
*/
int c = ctl.get();
// 对应上述步骤 1
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
// 对应上述步骤 2
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 如果线程池已经关闭,需要移除之前提交到队列中的任务,并执行拒绝策略
if (! isRunning(recheck) && remove(command))
reject(command);
// 防止线程池没有关闭,但是工作线程数量为0(因为线程有存活时间或者出现异常情况)
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 对应步骤 3
else if (!addWorker(command, false))
reject(command);
}

private boolean addWorker(Runnable firstTask, boolean core)

创建工作线程,每个 Worker 内部维护一个Thread,也可以想成每个 Worker 是对 Thread 的抽象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
// 获取线程池状态
int c = ctl.get();
int rs = runStateOf(c);
/*
* 步骤1. 检查线程池的状态
* 如果处于SHUTDOWN、STOP、TIDYING、TERMINATED状态时,一般就不再创建新的线程了,但是要处理一种特殊情况:
* 当线程池状态处于 SHUTDOWN 时,尽管不需要处理新来的任务,但是需要处理完阻塞队列中的任务
* 所以,当线程池状态为 SHUTDOWN && firstTask == null && !workQueue.isEmpty(),还会允许添加 Worker 执行队列中的任务
*/
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
/*
* 步骤2. 判断当前线程数量和边界值,修改 worker 值
*/
for (;;) {
int wc = workerCountOf(c);
// 比较工作线程数量和边界CAPACITY,CAPACITY根据core的值动态调整
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// 尝试将c中保存的工作线程数量加 1,
// 如果修改成功,会通过break,跳出外层循环到retry。
// 如果修改失败,说明有并发操作,此时会比较当前线程池的状态和进入这个方法时线程池的状态
//    如果状态不一致,说明线程池状态发生的变化,会通过continue重新进入外层for循环,重新走一遍逻辑,避免在线程池关闭之类的状态下执行后续的操作
// 如果状态一致,会重新在内层for循环中尝试修改ctl
if (compareAndIncrementWorkerCount(c))
break retry;
// 判断当前线程池的状态和进入这个方法时线程池的状态是否一致
c = ctl.get();
if (runStateOf(c) != rs)
continue retry;
}
}
/*
* 步骤3. 正式创建工作线程并启动它。
* Thread会被封装到Worker中
*/
// 工作线程是否启动
boolean workerStarted = false;
// 工作线程是否已经创建
boolean workerAdded = false;
Worker w = null;
try {
// 创建工作线程
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
// 获取到锁以后才尝试创建线程,因为在关闭线程池时也需要获取到这把锁
// 为了防止 [当关闭线程池以后并且也释放了这把锁],当获取到锁后还会重新检测线程池的状态,拿到锁以后,也就避免此时不会关闭线程池
mainLock.lock();
try {
int rs = runStateOf(ctl.get());
// 只有线程池正在运行 或者 处于前面说的特殊情况
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
// 将包装的Worker放到集合中
workers.add(w);
// 记录线程池中线程数量的峰值,统计用。
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
// 成功创建worker以后,会将worker中包装的Thread启动
if (workerAdded) {
// 启动Woker类里的Thread对象,因为Worker实现了Runnable
// 此时新创建的线程就会执行Worker中的run()方法。
t.start();
workerStarted = true;
}
}
} finally {
// 启动线程出现意外情况,执行相应的处理方法
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}

private void addWorkerFailed(Worker w)

处理 [创建工作线程出现的失败情况],会将之前放到集合中的Worker对象移除,并且将ctl变量中保存的工作线程数量减1

1
2
3
4
5
6
7
8
9
10
11
12
13
private void addWorkerFailed(Worker w) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (w != null)
workers.remove(w);
decrementWorkerCount();
// 添加线程失败,线程池可能处于异常状态,尝试中断线程
tryTerminate();
} finally {
mainLock.unlock();
}
}

public void run()

创建的线程会从run方法开始执行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
// 作为runWorker(Worker w)的代理方法,在Worker中实现
public void run() {
runWorker(this);
}

// 在ThreadPoolExecutor中实现
// 整体逻辑就是不断地的从任务队列中取任务执行
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
// 因为线程会被复用,将firstTask置为空避免重复执行
w.firstTask = null;
// 允许被中断(当调用shutdownnow时,需要获取锁 w ,然后中断 w 封装的线程)
w.unlock();
// 用于判断线程是否是在执行用户任务过程中出现的异常退出的
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// 如果线程池处于未运行状态,需要确保当前线程被中断
// 否则,需要确保当前线程不能被中断
// 重复调用 runStateAtLeast(ctl.get(), STOP) 是为了避免边界条件:
// 第一次runStateAtLeast(ctl.get(), STOP)返回false,当执行 || 右侧的Thread.interrupted()时,线程池停止了,但还没中断当前线程
// 此时需要线程自我中断
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
// beforeExecute(wt, task); 和 afterExecute(task, thrown); 都是钩子方法
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 正式执行用户提交的任务
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
// 记录完成的任务数量
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
// 当线程在执行任务中抛了异常,会转到这里
// 此时当前线程结束,如果线程池没有关闭,会重新创建一个线程代替当前线程
processWorkerExit(w, completedAbruptly);
}
}

private Runnable getTask()

线程会不断地尝试获取任务,当取任务超时后,会在符合预先条件的情况下结束当前线程:
此时getTask()会返回null,线程结束在runWorker(Worker w)中的while循环。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
private Runnable getTask() {
// 标识线程是否超时
boolean timedOut = false; // Did the last poll() time out?
// 直到超时、线程池关闭或者拿到任务才会退出
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);

// 判断线程池是否关闭,如果关闭了,符合特殊情况:(SHUTDOWN并且工作队列为空)或 (线程池状态是STOP及以上)
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
// 减少线程数
decrementWorkerCount();
// 返回null以后,这个线程会正常结束
return null;
}

int wc = workerCountOf(c);

// 判断是否允许[所有]线程超时退出
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
// 如果用户调用了setMaximumPoolSize()会导致wc > maximumPoolSize
// 如果 (允许所有线程超时 或者 当前线程数量已经超过corePoolSize) 并且当前线程已经超时了
// 此时需要结束当前线程(通过返回null)
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}

try {
// 阻塞获取任务,如果在指定时间段内没有获取到,会返回null
// timed表示是否应该超时获取,注意此处也是 keepAliveTime 起作用的地方
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
// 未超时的情况下获取到了任务,直接返回
if (r != null)
return r;
// 超时了,下次进入循环中时会返回null
timedOut = true;
} catch (InterruptedException retry) {
// 被中断了会重试:可能调用setMaximumPoolSize方法,setMaximumPoolSize执行会中断线程
timedOut = false;
}
}
}

public interface RejectedExecutionHandler

ThreadPoolExecutor基于RejectedExecutionHandler提供了四种拒绝策略,自己也可以基于RejectedExecutionHandler定制拒绝策略。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
// 让提交任务的线程去执行
public static class CallerRunsPolicy implements RejectedExecutionHandler {
public CallerRunsPolicy() { }

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
}

// 直接拒绝,并且抛出异常
public static class AbortPolicy implements RejectedExecutionHandler {

public AbortPolicy() { }

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}

// 直接丢弃,不抛出异常
public static class DiscardPolicy implements RejectedExecutionHandler {

public DiscardPolicy() { }


public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
}

// 丢弃任务队列中最老的任务,也就是队列中队头,然后重新尝试将当前任务入队。
public static class DiscardOldestPolicy implements RejectedExecutionHandler {

public DiscardOldestPolicy() { }


public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
}

关闭线程池

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 检查调用 shutdown 的线程是否权限关闭线程池(针对里面的每个worker)
checkShutdownAccess();
// 死循环修改线程池状态为 SHUTDOWN
advanceRunState(SHUTDOWN);
// 中断线程
interruptIdleWorkers();
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
tryTerminate();
}

ScheduledThreadPoolExecutor 源码解读

ScheduledThreadPoolExecutor 为定时线程池, 可以周期性的执行任务,文档:

Q&A

线程池工作流程?

线程池工作流

线程池的种类?

线程池的种类也就是线程池工具类 Executors中提供的线程池,实际上也就是通过调整ThreadPoolExecutor以及ScheduledThreadPoolExecutor构造函数中的参数。不过阿里巴巴手册中不建议用这个工具类,所以就简单看一下在ThreadPoolExectuor的构造函数中如何“搭配”变量。

  • 固定数量的线程池:核心线程和最大线程数量一样。

    1
    return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
  • 单线程线程池:核心线程和最大线程数量都为1

    1
    return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()));
  • 缓存线程池:最大线程数量为Integer.MAX_VALUE也可以认为是无界的,尽管会提高性能,但一般不会用这个,因为会创建大量的线程执行。

    1
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
  • 定时线程池

    1
    return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);

线程在执行任务中出现异常会怎么办?

参照runWorker()中,当线程在执行任务时抛出了异常,会触发processWorkerExit(w, completedAbruptly);此时当前线程结束,如果线程池没有关闭,会重新创建一个线程代替当前线程

执行拒绝策略的时刻?

  • 达到边界条件:任务队列满了,工作线程数量达到最大线程数量。
  • 非边界条件:当工作线程数量达到了核心线程数量,此时如果入队成功 => 如果线程池已经关了,会尝试移除刚才入队的任务。如果移除成功 => 会执行拒绝策略

拒绝策略使用注意事项?

注意在使用 Future 同时使用 Discard 策略时,注意设置超时时间,因为当执行了这个策略后,后续又执行了 Future.get(),此时会卡住当前线程,所以需要加上超时时间或者使用其他拒绝策略

参考