死磕 java线程系列之线程池深入解析——普通任务执行流程 (2)

threadpool_task

任务转的过程我们知道了,但是任务是在哪里执行的呢?继续往下看。

addWorker()方法

这个方法主要用来创建一个工作线程,并启动之,其中会做线程池状态、工作线程数量等各种检测。

private boolean addWorker(Runnable firstTask, boolean core) { // 判断有没有资格创建新的工作线程 // 主要是一些状态/数量的检查等等 // 这段代码比较复杂,可以先跳过 retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); // 线程池状态检查 if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; // 工作线程数量检查 for (;;) { int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; // 数量加1并跳出循环 if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } // 如果上面的条件满足,则会把工作线程数量加1,然后执行下面创建线程的动作 boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { // 创建工作线程 w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 再次检查线程池的状态 int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); // 添加到工作线程队列 workers.add(w); // 还在池子中的线程数量(只能在mainLock中使用) int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; // 标记线程添加成功 workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { // 线程添加成功之后启动线程 t.start(); workerStarted = true; } } } finally { // 线程启动失败,执行失败方法(线程数量减1,执行tryTerminate()方法等) if (! workerStarted) addWorkerFailed(w); } return workerStarted; }

这里其实还没到任务执行的地方,上面我们可以看到线程是包含在Worker这个类中的,那么,我们就跟踪到这个类中看看。

Worker内部类

Worker内部类可以看作是对工作线程的包装,一般地,我们说工作线程就是指Worker,但实际上是指其维护的Thread实例。

// Worker继承自AQS,自带锁的属性 private final class Worker extends AbstractQueuedSynchronizer implements Runnable { // 真正工作的线程 final Thread thread; // 第一个任务,从构造方法传进来 Runnable firstTask; // 完成任务数 volatile long completedTasks; // 构造方法// 【本文由公从号“彤哥读源码”原创】 Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; // 使用线程工厂生成一个线程 // 注意,这里把Worker本身作为Runnable传给线程 this.thread = getThreadFactory().newThread(this); } // 实现Runnable的run()方法 public void run() { // 调用ThreadPoolExecutor的runWorker()方法 runWorker(this); } // 省略锁的部分 }

这里要能够看出来工作线程Thread启动的时候实际是调用的Worker的run()方法,进而调用的是ThreadPoolExecutor的runWorker()方法。

runWorker()方法

runWorker()方法是真正执行任务的地方。

final void runWorker(Worker w) { // 工作线程 Thread wt = Thread.currentThread(); // 任务 Runnable task = w.firstTask; w.firstTask = null; // 强制释放锁(shutdown()里面有加锁) // 这里相当于无视那边的中断标记 w.unlock(); // allow interrupts boolean completedAbruptly = true; try { // 取任务,如果有第一个任务,这里先执行第一个任务 // 只要能取到任务,这就是个死循环 // 正常来说getTask()返回的任务是不可能为空的,因为前面execute()方法是有空判断的 // 那么,getTask()什么时候才会返回空任务呢? while (task != null || (task = getTask()) != null) { w.lock(); // 检查线程池的状态 if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { // 钩子方法,方便子类在任务执行前做一些处理 beforeExecute(wt, task); Throwable thrown = null; try { // 真正任务执行的地方 task.run(); // 异常处理 } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { // 钩子方法,方便子类在任务执行后做一些处理 afterExecute(task, thrown); } } finally { // task置为空,重新从队列中取 task = null; // 完成任务数加1 w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { // 到这里肯定是上面的while循环退出了 processWorkerExit(w, completedAbruptly); } }

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wpzfgw.html