原以为线程池还挺简单的(平时常用,也分析过原理),这次是想自己动手写一个线程池来更加深入的了解它;但在动手写的过程中落地到细节时发现并没想的那么容易。结合源码对比后确实不得不佩服 Doug Lea 。
我觉得大部分人直接去看 java.util.concurrent.ThreadPoolExecutor 的源码时都是看一个大概,因为其中涉及到了许多细节处理,还有部分 AQS 的内容,所以想要理清楚具体细节并不是那么容易。
与其挨个分析源码不如自己实现一个简版,当然简版并不意味着功能缺失,需要保证核心逻辑一致。
所以也是本篇文章的目的:
自己动手写一个五脏俱全的线程池,同时会了解到线程池的工作原理,以及如何在工作中合理的利用线程池。
再开始之前建议对线程池不是很熟悉的朋友看看这几篇:
这里我截取了部分内容,也许可以埋个伏笔(坑)。
具体请看这两个链接。
如何优雅的使用和理解线程池
线程池中你不容错过的一些细节
由于篇幅限制,本次可能会分为上下两篇。
创建线程池现在进入正题,新建了一个 CustomThreadPool 类,它的工作原理如下:
简单来说就是往线程池里边丢任务,丢的任务会缓冲到队列里;线程池里存储的其实就是一个个的 Thread ,他们会一直不停的从刚才缓冲的队列里获取任务执行。
流程还是挺简单。
先来看看我们这个自创的线程池的效果如何吧:
初始化了一个核心为3、最大线程数为5、队列大小为 4 的线程池。
先往其中丢了 10 个任务,由于阻塞队列的大小为 4 ,最大线程数为 5 ,所以由于队列里缓冲不了最终会创建 5 个线程(上限)。
过段时间没有任务提交后(sleep)则会自动缩容到三个线程(保证不会小于核心线程数)。
构造函数来看看具体是如何实现的。
下面则是这个线程池的构造函数:
会有以下几个核心参数:
miniSize 最小线程数,等效于 ThreadPool 中的核心线程数。
maxSize 最大线程数。
keepAliveTime 线程保活时间。
workQueue 阻塞队列。
notify 通知接口。
大致上都和 ThreadPool 中的参数相同,并且作用也是类似的。
需要注意的是其中初始化了一个 workers 成员变量:
/** * 存放线程池 */ private volatile Set<Worker> workers; public CustomThreadPool(int miniSize, int maxSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, Notify notify) { workers = new ConcurrentHashSet<>(); }workers 是最终存放线程池中运行的线程,在 j.u.c 源码中是一个 HashSet 所以对他所有的操作都是需要加锁。
我这里为了简便起见就自己定义了一个线程安全的 Set 称为 ConcurrentHashSet。
其实原理也非常简单,和 HashSet 类似也是借助于 HashMap 来存放数据,利用其 key 不可重复的特性来实现 set ,只是这里的 HashMap 是用并发安全的 ConcurrentHashMap 来实现的。
这样就能保证对它的写入、删除都是线程安全的。
不过由于 ConcurrentHashMap 的 size() 函数并不准确,所以我这里单独利用了一个 AtomicInteger 来统计容器大小。
创建核心线程往线程池中丢一个任务的时候其实要做的事情还蛮多的,最重要的事情莫过于创建线程存放到线程池中了。
当然我们不能无限制的创建线程,不然拿线程池来就没任何意义了。于是 miniSize maxSize 这两个参数就有了它的意义。
但这两个参数再哪一步的时候才起到作用呢?这就是首先需要明确的。
从这个流程图可以看出第一步是需要判断是否大于核心线程数,如果没有则创建。
结合代码可以发现在执行任务的时候会判断是否大于核心线程数,从而创建线程。
worker.startTask() 执行任务部分放到后面分析。
这里的 miniSize 由于会在多线程场景下使用,所以也用 volatile 关键字来保证可见性。
队列缓冲结合上面的流程图,第二步自然是要判断队列是否可以存放任务(是否已满)。
优先会往队列里存放。
上至封顶一旦写入失败则会判断当前线程池的大小是否大于最大线程数,如果没有则继续创建线程执行。
不然则执行会尝试阻塞写入队列(j.u.c 会在这里执行拒绝策略)
以上的步骤和刚才那张流程图是一样的,这样大家是否有看出什么坑嘛?
时刻小心从上面流程图的这两步可以看出会直接创建新的线程。
这个过程相对于中间直接写入阻塞队列的开销是非常大的,主要有以下两个原因:
创建线程会加锁,虽说最终用的是 ConcurrentHashMap 的写入函数,但依然存在加锁的可能。
会创建新的线程,创建线程还需要调用操作系统的 API 开销较大。
所以理想情况下我们应该避免这两步,尽量让丢入线程池中的任务进入阻塞队列中。
执行任务任务是添加进来了,那是如何执行的?