这样做的原因可以保证Core Worker中的任务执行完成后,能立即从workQueue获取下一个任务,而不需要启动别的工作线程,用最少的工作线程办更多的事。
创建工作线程在execute方法中,有三个地方调用了addWorker。addWorker方法可以分为二部分:
增加工作线程数量
启动工作线程
addWorker的方法签名如下:
private boolean addWorker(Runnable firstTask, boolean core)firstTask:第一个运行的任务,可以为空。如果为空任务会从workQueue中获取。
core: 是否是核心工作线程
增加工作线程数量 retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); .... for (;;) { int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } }上面代码省略了一部分代码,主要代码都在for循环中,利用CAS锁,安全的完成线程池状态的检查与增加工作线程的数量。其中的compareAndIncrementWorkerCount(c)调用就是将工作线程数量+1。
启动工作线程增加工作线程的数量后,紧接着就会启动工作线程:
boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { t.start(); workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); }启动工作线程的流程:
创建一个Worker实例, Worker构造方法会使用ThreadFactory创建一个线程
w = new Worker(firstTask); final Thread t = w.thread;就不说Worker类的实现了,直接给出构造方法来细品:
Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); }如果线程池状态是在运行中,或者已经关闭,但工作线程要从workQueue中获取任务,才能添加工作线程
if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; }注意::当线程池处于SHUTDOWN状态时,它不能接收新的任务,但是可以继续执行未完成的任务。任务是否从workQueue中获取,是根据firstTask判断,每个Worker实例都有一个firstTask 属性,如果这个值为null,工作线程启动的时候就会从workQueue中获取任务,否则会执行firstTask 。
启动线程
调用线程的start方法,启动线程。
if (workerAdded) { t.start(); workerStarted = true; } 执行任务回过头来看一个Worker类的定义:
private final class Worker extends AbstractQueuedSynchronizer implements Runnable{ Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); } ... }Worker类实现了Runnable接口,同时在构造方法中会将this传递给线程,到这里你就知道了Worker实例中有run方法,它会在线程启动后执行:
public void run() { runWorker(this); }run方法内部接着调用runWorker方法运行任务,在这里才是真正的开始运行任务了:
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { while (task != null || (task = getTask()) != null) { w.lock(); // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } }获取任务
首先将firstTask传递给task临时变量:
Runnable task = w.firstTask;然后循环检查task或者从workQueue中获取任务:
while (task != null || (task = getTask()) != null) { ... }getTask()稍后再做分析。
运行任务
去掉一些状态检查、异常捕获、和勾子方法调用后,保留最重要的调用task.run():
while (task != null || (task = getTask()) != null) { ... task.run(); ... }