拜托,不要再问我线程池啦! (4)

如果某个工作线程完成,线程池内部会判断是否需要重新启动一个:

//判断线程池状态 if (runStateLessThan(c, STOP)) { if (!completedAbruptly) { //获取最小工作线程数量 int min = allowCoreThreadTimeOut ? 0 : corePoolSize; //如果最小工作线程数量为0,但是workQueue中还有任务,那重置最小工作线程数量1 if (min == 0 && ! workQueue.isEmpty()) min = 1; //如果当前工作线程数数量大于或等于最小工作线程数量,则不需要启动新的工作线程 if (workerCountOf(c) >= min) return; // replacement not needed } //启动一个新的工作线程 addWorker(null, false); }

工作线程完成后有两种处理策略:

对于异常完成的工作线程,直接启动一个新的替换

对于正常完成的工作线程,判断当前工作线程是否足够,如果足够则不需要新启动工作线程

注意:这里的完成,表示工作线程的任务执行完成,workQueue中也没有任务可以获取了。

线程池的关闭

关闭线程池有可以通过shutdown方法:

public void shutdown() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); advanceRunState(SHUTDOWN); interruptIdleWorkers(); onShutdown(); // hook for ScheduledThreadPoolExecutor } finally { mainLock.unlock(); } tryTerminate(); }

shutdown方法,第一步就是先改变线程池的状态,调用advanceRunState(SHUTDOWN)方法,将线程池当前状态更改为SHUTDOWN,advanceRunState代码如下:

private void advanceRunState(int targetState) { for (;;) { int c = ctl.get(); if (runStateAtLeast(c, targetState) || ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c)))) break; } }

然后立即调用interruptIdleWorkers()方法,interruptIdleWorkers()内部会调用它的重载方法interruptIdleWorkers(boolean onlyOne)同时onlyOne参数传递的false来关闭空闲的线程:

private void interruptIdleWorkers() { interruptIdleWorkers(false); } private void interruptIdleWorkers(boolean onlyOne) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { for (Worker w : workers) { Thread t = w.thread; if (!t.isInterrupted() && w.tryLock()) { try { t.interrupt(); } catch (SecurityException ignore) { } finally { w.unlock(); } } if (onlyOne) break; } } finally { mainLock.unlock(); } }

以上代码会遍历workers中的Worker实例,然后调用线程的interrupt()方法。

什么样的线程才是空闲工作线程?

前面提到过在getTask()中,线程从workQueue中获取任务时会阻塞,被阻塞的线程就是空闲的

再次回到getTask()的代码中:

private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out? for (;;) { // Check if queue empty only if necessary. if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; } ... int wc = workerCountOf(c); // Are workers subject to culling? boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; ... try { Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } }

再次分析getTask()中的代码中有一段捕获InterruptedException的代码块,interruptIdleWorkers方法中断线程后,getTask()会捕获中断异常,因为外面是一个for循环,随后代码走到判断线程池状态的地方:

if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; }

上面的代码的会判断当前线程池状态,如果状态大于STOP或者状态等于SHUTDOWN并且workQueue为空时则返回null,getTask()返回空那么在runWorker中循环就会退出,当前工作线程的任务就完成了,可以退出了:

final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { while (task != null || (task = getTask()) != null) { task.run(); } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } } shutdownNow

除了shutdown方法能关闭线程池,还有shutdownNow也可以关闭线程池。它两的区别在于:

shutdownNow会清空workQueue中的任务

shutdownNow还会中止当前正在运行的任务

shutdownNow会使线程进入STOP状态,而shutdown()是SHUTDOWN状态

public List<Runnable> shutdownNow() { List<Runnable> tasks; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); advanceRunState(STOP); interruptWorkers(); tasks = drainQueue(); } finally { mainLock.unlock(); } tryTerminate(); return tasks; }

上面代码基本流程:

advanceRunState(STOP): 使线程池进行STOP状态,与shutdown()中的一致 ,只是使用的状态码是STOP

interruptWorkers(): 与shutdown()中的一致

drainQueue(): 清空队列

任务是中止执行还是继续执行?

调用shutdownNow()后线程池处于STOP状态,紧接着所有的工作线程都会被调用interrupt方法,如果此时runWorker还在运行会发生什么?

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/zyjypy.html