不知道读者对于源码中的retry: 有没有疑惑,毕竟平时很少用到。他的作用是标记一个循环,这样我们在内层的循环就可以跳转到任意一个外层的循环。这里的retry只是一个名字,改成 repeat: 甚至 a: 都是可以的。他的本质就是:一个循环的标记 。
创建worker对象,并调用其内部线程的start()方法来启动线程:
private boolean addWorker(Runnable firstTask, boolean core) { ... boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { // 创建一个新的worker // 创建的过程中内部会创建一个线程 w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { // 获得全局锁并加锁 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 获取锁之后,需要再次检查状态 int c = ctl.get(); // 只有运行状态或者shutDown&&task==null才会被执行 if (isRunning(c) || (runStateLessThan(c, STOP) && firstTask == null)) { // 如果这个线程不是刚创建的,则抛出异常 if (t.getState() != Thread.State.NEW) throw new IllegalThreadStateException(); // 添加到workerSet中 workers.add(w); workerAdded = true; int s = workers.size(); // 跟踪线程池到达的最多线程数量 if (s > largestPoolSize) largestPoolSize = s; } } finally { // 释放锁 mainLock.unlock(); } // 如果添加成功,启动线程 if (workerAdded) { t.start(); workerStarted = true; } } } finally { // 如果线程没有启动,表示添加worker失败,可能在添加的过程中线程池被关闭了 if (! workerStarted) // 把worker从workerSet中移除 addWorkerFailed(w); } return workerStarted; }经过前面两步,如果没有出现异常,则创建worker成功。最后还涉及到一个方法: addWorkerFailed(w) ,他的内容比较简答,顺便提一下吧:
// 添加worker失败 private void addWorkerFailed(Worker w) { final ReentrantLock mainLock = this.mainLock; // 加锁 mainLock.lock(); try { if (w != null) workers.remove(w); // 这里会让线程总数-1 decrementWorkerCount(); // 尝试设置线程池的状态为terminad // 因为添加失败有可能是线程池在添加worker的过程中被shutdown // 那么这个时候如果没有任务正在执行就需要设置状态为terminad // 这个方法后面会详细讲 tryTerminate(); } finally { mainLock.unlock(); } }那么到这里,execute()方法中的一些调用方法就分析完了。阻塞队列相关的方法不属于本文的范畴,就不展开了。那么还有一个问题:worker是如何工作的呢?worker内部有一个线程,当线程启动时,初始化线程的runnable对象的run方法会被调用,那么这个runnable对象是什么?我直接来看worker。
打工人:Worker首先我们看到他的构造方法:
Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); }源码很简单,把传进来的任务设置给内部变量firstTask,然后把自己传给线程工厂去创建一个线程。所以线程启动时,Worker本身的run方法会被调用,那么我们看到Worker的 run()方法。
public void run() { runWorker(this); }Worker是ThreadPoolExecutor的内部类,这里直接调用到了ThreadPoolExecutor的方法: runWorker()来开始执行。那么接下来,我们就看到这个方法。
启动worker:runWorker()这个方法是worker执行的方法,在线程被销毁前他会一直执行,类似于Handler的looper,不断去队列获取消息来执行:
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); // 获取worker初始化时设置的任务,可以为null。如果为null则表示仅仅创建线程 Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // 这个参数的作用后面解释,需要结合其他的源码 boolean completedAbruptly = true; try { // 如果自身的task不为null,那么会执行自身的task // 否则调用getTask去队列获取一个task来执行 // 这个getTask最终会去调用队列的方法来获取任务 // 而队列如果为空他的获取方法会进行阻塞,这里也就阻塞了,后面深入讲 while (task != null || (task = getTask()) != null) { try{ // 执行任务 ... } finally { // 任务执行完成,把task设置为null task = null; // 任务总数+1 w.completedTasks++; // 释放锁 w.unlock(); } } // 这里设置为false,先记住他 completedAbruptly = false; } finally { // 如果worker退出,那么需要执行后续的善后工作 processWorkerExit(w, completedAbruptly); } }