可以发现execute方法就是完成了上边说的“线程池处理流程”这个图里描述的过程。 大雄看到这里还有几个疑问,一个是Woker是如何创建并加入workers的,一个是worker是如何启动的,再就是worker是如何运行的
生活还要继续
private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); // 做一些校验,线程池的状态要满足一定条件 // 而且得提交任务过来,再就是workQueue不能是空的 if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { int wc = workerCountOf(c); // 看你是要创建核心worker还是普通worker // 核心看超没超过corePoolSize, 普通看超没超过maximumPoolSize if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; if (compareAndIncrementWorkerCount(c)) // 增加worker数量失败就在来 break retry; c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) // 中途线程池状态发生变化了 continue retry; } } boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { // worker就是这么创建的 w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; // 加worker是要加全局锁的 mainLock.lock(); try { int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { // worker是在这里启动的 t.start(); workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; }这段代码解决了 Woker是如何创建并加入workers的以及worker是如何启动的的问题。
addWorker做的核心工作就是,创建worker, 启动worker, 在创建之前还会做一些校验。调用了worker里边线程的start后就要等待cpu调度执行worker的run方法了。
public void run() { runWorker(this); } final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { // task是创建worker带进去的任务,会先执行他,然后从workQueue里边取 // 如果没有的话跳出去 while (task != null || (task = getTask()) != null) { w.lock(); // 首先加锁,如果不加锁,可能几个线程提交的任务同时进来了,会导致一些共享状态出问题 // 做一些状态的校验 if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { // 执行任务前调用一下beforeExecute, 默认是空的 beforeExecute(wt, task); Throwable thrown = null; try { // 这个跟我们平时理解的Runnable还不一样,可以体会下,他这个run就是一个普通的方法 // 他直接调run是要执行任务,线程的start只是把worker里边的那个run跑起来了 task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { // 执行完了调一下,里边可以拿到异常 afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { // 从while跳出来表明没有任务可以执行了 processWorkerExit(w, completedAbruptly); } }