# 《并发设计模式》第39章-线程池模式-线程池执行任务源码深度解析

作者:冰河
星球:http://m6z.cn/6aeFbs (opens new window)
博客:https://binghe.gitcode.host (opens new window)
文章汇总:https://binghe.gitcode.host/md/all/all.html (opens new window)
源码获取地址:https://t.zsxq.com/0dhvFs5oR (opens new window)

沉淀,成长,突破,帮助他人,成就自我。

  • 本章难度:★★☆☆☆
  • 本章重点:了解线程池的应用场景,重点理解线程池模式的核心原理和应用,以及掌握线程池的提交任务的原理和源码执行流程,并能够结合自身项目实际场景思考如何将线程池模式灵活应用到自身实际项目中。

大家好,我是冰河~~

线程池执行任务时,是具备一定流程的:向线程池提交任务时,首先会判断线程池中的线程数是否已经达到corePoolSize,如果线程池中的线程数未达到corePoolSize,则直接创建新线程执行任务。否则,判断线程池中的工作队列是否已满,如果线程池中的工作队列未满,则将任务添加到队列中等待执行。否则,判断线程池中的线程数是否已经达到maximumPoolSize,如果线程池中的线程数未达到maximumPoolSize,则直接创建新线程执行任务。否则,执行拒绝策略。

# 一、故事背景

上回说到:基于自定义线程池优化了社区电商系统中优惠券服务推送消息的功能,至此,已经解决了社区电商系统优惠券服务推送消息时,出现的内存占用高和CPU占用高的问题。并且小菜也在老王耐心的指导下,彻底将线程池的核心参数研究透彻。这次,他又将目光瞄准了线程池的执行流程,没错,小菜想从源码级别彻底吃透线程池执行任务的流程。说干就干,于是小菜开始不断调试和研究线程池执行任务的源码流程。

# 二、核心流程概述

从线程池的总体设计上来看,ThreadPoolExecutor类是线程池中提供的最核心的类之一,它能够保证线程池按照正常的业务逻辑执行任务,并且能够保证修改线程池每个阶段状态的原子性。

ThreadPoolExecutor类中定义了一个workers工作线程集合,代码如下所示。

private final HashSet<Worker> workers = new HashSet<Worker>();
1

用户可以向线程池中添加需要执行的任务,workers集合中的工作线程可以直接执行任务,或者从任务队列中获取任务后执行。ThreadPoolExecutor类中提供了整个线程池从创建到执行任务,再到消亡的整个流程方法。

在ThreadPoolExecutor类中,线程池的逻辑主要体现在execute(Runnable)方法,addWorker(Runnable, boolean)方法,addWorkerFailed(Worker)方法和拒绝策略上,接下来,就深入分析这几个核心方法。

# 三、execute()方法解析

execute(Runnable)方法的作用是提交Runnable类型的任务到线程池中,源码如下所示。

public void execute(Runnable command) {
	//如果提交的任务为空,则抛出空指针异常
	if (command == null)
		throw new NullPointerException();
	//获取线程池的状态和线程池中线程的数量
	int c = ctl.get();
	//线程池中的线程数量小于corePoolSize的值
	if (workerCountOf(c) < corePoolSize) {
		//重新开启线程执行任务
		if (addWorker(command, true))
			return;
		c = ctl.get();
	}
	//如果线程池处于RUNNING状态,则将任务添加到阻塞队列中
	if (isRunning(c) && workQueue.offer(command)) {
		//再次获取线程池的状态和线程池中线程的数量,用于二次检查
		int recheck = ctl.get();
		//如果线程池没有未处于RUNNING状态,从队列中删除任务
		if (!isRunning(recheck) && remove(command))
			//执行拒绝策略
			reject(command);
		//如果线程池为空,则向线程池中添加一个线程
		else if (workerCountOf(recheck) == 0)
			addWorker(null, false);
	}
	//任务队列已满,则新增worker线程,如果新增线程失败,则执行拒绝策略
	else if (!addWorker(command, false))
		reject(command);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29

为了能够更好的理解execute(Runnable)方法实现的源码细节。接下来,拆解下execute(Runnable)方法,具体分析execute(Runnable)方法的执行逻辑。

(1)线程池中的线程数是否小于corePoolSize核心线程数,如果小于corePoolSize核心线程数,则向workers工作线程集合中添加一个核心线程执行任务。源码如下所示。

//线程池中的线程数量小于corePoolSize的值
if (workerCountOf(c) < corePoolSize) {
	//重新开启线程执行任务
	if (addWorker(command, true))
		return;
	c = ctl.get();
}
1
2
3
4
5
6
7

(2)如果线程池中的线程数量大于corePoolSize核心线程数,则判断当前线程池是否处于RUNNING状态,如果处于RUNNING状态,则添加任务到待执行的任务队列中。源码如下所示。

if (isRunning(c) && workQueue.offer(command))
1

注意:这里向任务队列添加任务时,需要判断线程池是否处于RUNNING状态,只有线程池处于RUNNING状态时,才能向任务队列添加新任务。否则,会执行拒绝策略。

(3)向任务队列中添加任务成功,由于其他线程可能会修改线程池的状态,所以这里需要对线程池进行二次检查,如果当前线程池的状态不再是RUNNING状态,则需要将添加的任务从任务队列中移除,执行后续的拒绝策略。如果当前线程池仍然处于RUNNING状态,则判断线程池是否为空,如果线程池中不存在任何线程,则新建一个线程添加到线程池中,源码如下所示。

# 查看全文

加入冰河技术 (opens new window)知识星球,解锁完整技术文章与完整代码