拜托,不要再问我线程池啦! (3)

task其实就是通过调用execute方法传递进来的Runnable实例,也就是你的任务。只不过它可能保存在Worker.firstTask中,或者在workQueue中,保存在哪里在前面的任务添加顺序中已经说明。

从workQueue中获取任务

试想一下如果每个任务执行完成,就关闭掉一个线程那有多浪费资源,这样使用线程池也没有多大的意义。所以线程的主要的功能就是线程复用,一旦任务执行完成直接去获取下一个任务,或者挂起线程等待下一个提交的任务,然后等待一段时间后还是没有任务提交,然后才考虑是否关闭部分空闲的线程。

runWorker中会循环的获取任务:

while (task != null || (task = getTask()) != null) { ... task.run(); ... }

上面的代码getTask()就是从workQueue中获取任务:

private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out? for (;;) { ... int wc = workerCountOf(c); // Are workers subject to culling? boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; ... try { Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } }

获取任务的时候会有两种方式:

超时等待获取任务

一直等待任务,直到有新任务

如果allowCoreThreadTimeOut为true,corePoolSize指定的核心线程数量会被忽略,直接使用 workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) 获取任务,否则的话会根据当前工作线程的数量,如果wc > corePoolSize为false则当前会被认为是核心线程,调用workQueue.take()一直等待任务。

工作线程的关闭

还是在runWorker方法中:

final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { while (task != null || (task = getTask()) != null) { task.run(); } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } }

completedAbruptly变量:标记当前工作线程是正常执行完成,还是异常完成的。completedAbruptly为false可以确定线程池中没有可执行的任务了。

上面代码是简洁后的代码,一个while循环保证不间断的获取任务,没有任务可以执行(task为null)退出循环,最后再才会调用processWorkerExit方法:

private void processWorkerExit(Worker w, boolean completedAbruptly) { if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted decrementWorkerCount(); final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { completedTaskCount += w.completedTasks; workers.remove(w); } finally { mainLock.unlock(); } tryTerminate(); int c = ctl.get(); if (runStateLessThan(c, STOP)) { if (!completedAbruptly) { int min = allowCoreThreadTimeOut ? 0 : corePoolSize; if (min == 0 && ! workQueue.isEmpty()) min = 1; if (workerCountOf(c) >= min) return; // replacement not needed } addWorker(null, false); } }

processWorkerExit接收一个Worker实例与completedAbruptly变量。processWorkerExit的大致工作流程:

判断当前工作线程是否异常完成,如果是直接减少工作线程的数量,简单的说就是校正一下工作线程的数量。

增加完成的任务数量,将Worker从workers中移除

tryTerminate() 检查线程池状态,因为线程池可以延迟关闭,如果你调用shutdown方法后不会立即关闭,要等待所有的任务执行完成,所以这里调用tryTerminate()方法,尝试去调用terminated方法。

工作线程完成策略

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/zyjypy.html