五一假期大雄看了一本《java并发编程艺术》,了解了线程池的基本工作流程,竟然发现线程池工作原理和互联网公司运作模式十分相似。
线程池处理流程 原理解析 互联网公司与线程池的关系这里用一个比喻来描述一下线程池,中间有一些名词你可能不是太清楚,后边源码解析的部分会讲到。
你可以把线程池看作是一个研发部门,研发部门有很多程序员(Worker), 他们在一个大办公室里(HashSet workers)。程序员干不完的需求(Runnable/Callable)放在需求池(workQueue)里排队。每个研发部都配置有骨干程序员数量(corePoolSize)和最大能容纳的程序员数量(maximumPoolSize)。具体要做的任务就是产品的需求。
new 一个线程池相当于创建了一个研发部,创建研发部时需要指定骨干程序员数量,最大能容纳的程序员数量,需求池用哪种(BlockingQueue),如果忙不过来的需求怎么给产品回复(拒绝策略)等等内容。刚开始这个研发部一个程序员也没有。
当产品给这个研发部提一个需求时(当然肯定不会只提一个,他们会不断的提需求。这里以提一个需求为例)
首先会看骨干程序员招聘满了没。
如果没满,会招聘一个骨干程序员,招聘进来就让他不停的工作(很残酷啊),干完刚派过来的任务他会主动在需求池找下一个需求来做(好员工),如果需求池没有需求了,他就停止工作了,然后研发部会把他裁掉,如果裁掉后发现骨干程序员数量不够了,就会再招聘一个程序员。裁掉后,要是骨干程序员数量还够就不招聘了。
如果骨干程序员数量满了,就看需求池满没满,如果需求池没满,就把需求扔进需求池里;如果需求池满了,就看程序员数量有没有达到上限,如果达到了,就对产品说,这个需求我们做不了,没资源;如果没达到,就招聘一个程序员,招聘进来就让他不停的工作,干完刚派过来的需求他会主动到需求池找下一个任务来做,如果需求池没有任务了,他就停止工作了,然后研发部会把他裁掉,如果裁掉后发现骨干程序员数量不够了,就会再招聘一个程序员。裁掉后,要是骨干程序员数量还够就不招聘了。
源码解析首先是worker(程序员)
Worker被装在一个HashSet(workers)里边, 他是用来执行任务的,他们的职责就是不断的从workQueue里边取任务,然后执行。当workQueue(需求池)里边拿不到任务,或者线程池达到特定状态,worker就会从workers里边移走(被裁)。
下边是Worker源码,移除了非关键的东西
private final class Worker extends AbstractQueuedSynchronizer implements Runnable { // 标识这个任务是在哪个线程运行 final Thread thread; Runnable firstTask; // 完成了几个任务 volatile long completedTasks; Worker(Runnable firstTask) { // 阻止中断,知道runWorker执行 setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; // 直接用你提供的线程工厂搞个线程出来 this.thread = getThreadFactory().newThread(this); } // 调用ThreadPoolExecutor里边的runWorker方法 public void run() { runWorker(this); } // 以下这些是AQS相关的东西 // 0代表没有加锁 // 1代表加锁了 protected boolean isHeldExclusively() { return getState() != 0; } protected boolean tryAcquire(int unused) { if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } protected boolean tryRelease(int unused) { setExclusiveOwnerThread(null); setState(0); return true; } public void lock() { acquire(1); } public boolean tryLock() { return tryAcquire(1); } public void unlock() { release(1); } public boolean isLocked() { return isHeldExclusively(); } void interruptIfStarted() { Thread t; if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { try { t.interrupt(); } catch (SecurityException ignore) { } } } }Worker实现了Runnable接口,所以他是个任务,有run方法;同时有继承了AQS,所以他也是一把锁。
下边是提交任务的过程
提交任务有submit和execute, submit就是首先将Callable或者Runnable包装成FutureTask,然后调用execute, 所以核心是分析execute
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); // 这个c里边有两个信息,一个是现在有多少worker, 另一个是现在线程池的状态是啥 // workerCountOf方法就是从里边提取 worker的数量的 int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { // 当前worker的数量比需要的核心线程数少 // 加worker去执行,加成功就完事了,也就是说只要worker比核心线程数少,就会创建worker // 不管现在核心线程是否在工作,也不管workQueue是不是满的 // addWorker的第二个参数表示是不是要加核心线程(或者叫核心worker) if (addWorker(command, true)) return; c = ctl.get(); } // 当前worker达到或超过了核心线程数,或者加worker失败了,才会走下边的流程 // worker已经比核心线程数多了 // 如果 线程池没有shutdown的话 // 就尝试将任务加到workQueue里边,工作队列入队成功的话再往里边走 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) // 再次检查状态如果线程池要停了,那么就拒绝任务,并且把worker从工作队列扔掉 reject(command); else if (workerCountOf(recheck) == 0) // 如果没有worker的话(说明没加进去,这种场景我没想到是什么情况),加一个worker addWorker(null, false); // 其他情况,丢到工作队列就不用管了,等着worker去处理 } // 如果队列满了加失败了,或者线程池状态不满足了,就尝试加普通worker(非核心线程) else if (!addWorker(command, false)) // 加失败了就拒绝任务 // 失败一方面可能是worker数量已经达到你的给的maximumPoolSize // 另一方面,可能是检查到线程池的状态不对了 reject(command); }