Java 线程池实现原理

提交任务流程

提交任务 submit() 流程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
1.newTaskFor() 将任务包装成 FutureTask
2.execute(task)交给executor执行任务
执行任务
1.若当前线程数少于核心线程数,则直接创建并添加一个 worker 来执行任务.
创建成功直接返回,否则往下走
addWorker()方法
创建一个新的线程,并把当前任务 command 作为这个线程的第一个任务
addWorker()
一.循环CAS操作,将线程池中的线程数+1
0.外层循环
1.获取当前线程状态
2.若当线程池处于 SHUTDOWN 的时候,不允许提交任务,但是已有的任务继续执行.返回false
也就是只有
1.线程是Running状态
或2.线程是SHUTDOWN状态,但任务队列中还有任务.
允许走下面的创建线程流程.否则返回false
内层循环
3.若线程数超出给定上界,返回
CAS增加ctl数量,在线程池中为将要添加的线程留出空间.
(1)若成功则表示线程创建成功,结束外层循环.
(2)这里失败的话,说明有其他线程也在尝试往线程池中创建线程.若获取失败继续往下.
(3)重新获取线程状态,若和之前不一样了,则再进行下一次外层循环.到0
二.创建 worker线程,加入到线程池 workers 中
4.通过传入Runnable类型task,创建worker线程
通过线程工厂创建线程
加mainLock全局锁.该锁保证加锁期间,该线程不会被关闭
获取线程状态
若是 RUNNING 状态 或 (是SHUTDOWN状态,不接受新的任务,但是会继续执行等待队列中的任务)
将worker添加到workers列表中
记录线程池大小最大值 largestPoolSize
记录 workerAdded(添加 work 到 works 中成功) 标识为 true
mainLock解锁
若 workerAdded 标识为 true,,则启动worker中的线程
记录 workerStarted(worker已启动) 标识为 true
最终,若线程没能启动成功,走addWorkerFailed()清理流程
恢复clt的值
任务提交成功就能直接返回了
到这里说明上面一步执行时:
1.当前线程数大于等于核心线程数
要么 2.刚刚 addWorker 失败了
若线程池处于 RUNNING 状态,把这个任务添加到任务队列 workQueue 中
若添加任务到队列中成功
如果线程池已不处于 RUNNING 状态,那么移除已经入队的这个任务,并且执行拒绝策略
如果线程池还是 RUNNING 的,并且线程数为 0,那么开启新的线程
如果 workQueue 队列满了,那么进入到这个分支.
以maximumPoolSize为上界创建线程.
第二个参数false表示 以 maximumPoolSize 为界创建新的 worker,
若创建失败,说明当前线程数已经达到 maximumPoolSize,执行拒绝策略
3.返回task

线程run()逻辑

1
2
3
4
5
6
7
8
9
10
11
12
13
线程是ThreadPoolExecutor$Worker.thread 实例
runWorker()
获取当前worker中的第一个任务
若任务不为空 或 获取任务不为空
worker 加锁
前置处理钩子方法 beforeExecute()
执行任务 task.run()
捕获异常
后置处理钩子方法,将task和异常作为参数
任务置为空,下次循环调用 getTask() 获取任务
// 参见 getTask()获取任务逻辑
累加完成的任务数
释放掉 worker 的独占锁

getTask()获取任务逻辑

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
初始化超时标识为false
循环
1.若线程池处于shutdown状态 且 2.线程池处于stop或者terminate状态 或 队列为空
CAS 减少工作线程数
返回 null,外面调用处退出循环
获取线程数量
设置 线程是否能被超时回收标识(超时获取不到任务则回收线程).
根据 1.是否允许核心线程数内的线程被回收,或 2.当前线程数超过了核心线程数 来判断.
这两种情况则有可能发生超时关闭
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize
若 (工作线程数超过最大线程数 或 允许超时并且超时了) 并且 (线程数 > 1 或 虽然线程数小于1但是队列是空的)
(wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())
cas减少线程数,1.若成功则返回null;2.若不成功进行下一次循环
根据timed(是否允许超时回收标识判断)
若允许超时回收,则最多等待keepAliveTime时间到 workQueue 中取任务,取不到也返回
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS)
若不允许超时回收,则一直阻塞取任务,直到取到任务才返回.
workQueue.take()
1.若取到任务不为空,返回取到的任务.2.若为空,说明 取不到任务超时了,timedOut = true,进行下一次循环处理