深入浅出Java线程池:源码篇 (6)

前面已经介绍 runWorker() 了方法,这个方法的主要任务就是让worker动起来,不断去队列获取任务。而当获取任务的时候返回了null,则表示该worker可以结束了,最后会调用 processWorkerExit() 方法,如下:

final void runWorker(Worker w) { ... try { ... } finally { // 如果worker退出,那么需要执行后续的善后工作 processWorkerExit(w, completedAbruptly); } }

processWorkerExit() 会完成worker退出的善后工作。具体的内容是:

把完成的任务数合并到总的任务数,移除worker,尝试设置线程池的状态为terminated:

private void processWorkerExit(Worker w, boolean completedAbruptly) { // 如果不是经过getTask方法返回null正常退出的,那么需要让线程总数-1 // 这个参数前面一直让你们注意一下不知道你们还记不记得 // 如果是在正常情况下退出,那么在getTask() 方法中就会执行decrementWorkerCount()了 // 而如果出现一些特殊的情况突然结束了,并不是通过在getTask返回null结束 // Abruptly就是突然的意思,那么completedAbruptly就为true,正常情况下在runWorker方法中会被设置为false // 那什么叫突然结束?用户的任务抛出了异常,这个时候线程就突然结束了,没有经过getTask方法 // 这里就需要让线程总数-1 if (completedAbruptly) decrementWorkerCount(); // 获取锁,并累加完成的任务总数,从set中移除worker final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { completedTaskCount += w.completedTasks; workers.remove(w); } finally { mainLock.unlock(); } // 尝试设置线程池的状态为terminated // 这个方法前面我们addWorker失败的时候提到过,后面再展开 tryTerminate(); ... }

移除worker之后,如果线程池还没有被stop,那么最后必须保证队列任务至少有一个线程在执行队列中的任务:

private void processWorkerExit(Worker w, boolean completedAbruptly) { ... int c = ctl.get(); // stop及以上的状态不需要执行剩下的任务 if (runStateLessThan(c, STOP)) { // 如果线程是突然终止的,那肯定需要重新创建一个 // 否则进行判断是否要保留一个线程 if (!completedAbruptly) { int min = allowCoreThreadTimeOut ? 0 : corePoolSize; if (min == 0 && ! workQueue.isEmpty()) min = 1; if (workerCountOf(c) >= min) return; } // 如果此时线程数<=核心线程数,或者当核心线程可被销毁时,线程数==0且队列不为空 // 那么需要创建一个线程来执行任务 addWorker(null, false); } }

代码虽然看起来很多,但是具体的逻辑内容还是比较简单的。前面一直提到一个方法 tryTerminate() 但一直没有展开解释,下面来介绍一下。

尝试终止线程池:tryTerminate()

这个方法出现在任何可能让线程池进入终止状态的地方。如添加worker失败时,那么这个时候可能线程池已经处于stop状态,且已经没有任何正在执行的worker了,那么此时可以进入terminated状态;再如worker被销毁的时候,可能这是最后一个被销毁的worker,那么此时线程池需要进入terminated状态。

根据这个方法的使用情况其实就已经差不多可以推断出这个方法的内容:判断当前线程池的状态,如果符合条件则设置线程池的状态为terminated 。如果此时不能转换为terminated状态,则什么也不做,直接返回。

首先判断当前线程池状态是否符合转化为terminated。如果处于运行状态或者tidying以上状态,则肯定不需要进行状态转换。因为running需要先进入stop状态,而tidying其实已经是准备进入terminated状态了。如果处于shutdown状态且队列不为空,那么需要执行完队列中的任务,所以也不适合状态转换:

final void tryTerminate() { for (;;) { int c = ctl.get(); // 如果处于运行状态或者tidying以上状态时,直接返回,不需要修改状态 // 如果处于stop以下状态且队列不为空,那么需要等队列中的任务执行完成,直接返回 if (isRunning(c) || runStateAtLeast(c, TIDYING) || (runStateLessThan(c, STOP) && ! workQueue.isEmpty())) return; // 到这里说明线程池肯定处于stop状态 // 线程的数量不等于0,尝试中断一个空闲的worker线程 // 这里他只中断workerSet中的其中一个,当其中的一个线程停止时,会再次调用tryTerminate // 然后又会再去中断workerSet中的一个worker,不断循环下去直到剩下最后一个,workercount==0 // 这就是 链式反应 。 if (workerCountOf(c) != 0) { interruptIdleWorkers(ONLY_ONE); return; } // 设置状态为terminated逻辑 ... } }

经过上面的判断,能到第二部分逻辑,线程池肯定是具备进入terminated状态的条件了。剩下的代码就是把线程池的状态设置为terminated:

final void tryTerminate() { for (;;) { // 上一部分逻辑 ... // 首先获取全局锁 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 尝试把线程池的状态从stop修改为tidying // 如果修改失败,说明状态已经被修改了,那么外层循环再跑一个 if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { try { // 这个方法是一个空实现,需要子类继承重写 terminated(); } finally { // 最后再设置状态为terminated ctl.set(ctlOf(TERMINATED, 0)); // 唤醒所有等待终止锁的线程 termination.signalAll(); } return; } } finally { // 释放锁 mainLock.unlock(); } // CAS修改线程池的状态失败,重新进行判断 } }

当线程池被标记为terminated状态时,那么这个线程池就彻底地终止了。

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wpyzjf.html