接下来看看流程图来理解一下上面代码的一个执行流程
3.addWorkerFailed(Worker w)
addWorker方法添加worker失败,并且没有成功启动任务的时候,就会调用此方法,将任务从workers中移除,并且workerCount做-1操作。
private void addWorkerFailed(Worker w) { //重入锁 final ReentrantLock mainLock = this.mainLock; //获取锁 mainLock.lock(); try { //如果worker不为null if (w != null) //workers移除worker workers.remove(w); //通过CAS操作,workerCount-1 decrementWorkerCount(); tryTerminate(); } finally { //释放锁 mainLock.unlock(); } }4.tryTerminate()
当对线程池执行了非正常成功逻辑的操作时,都会需要执行tryTerminate尝试终止线程池
final void tryTerminate() { //死循环 for (;;) { //获取线程池控制状态 int c = ctl.get(); /* *线程池处于RUNNING状态 *线程池状态最小大于TIDYING *线程池==SHUTDOWN并且workQUeue不为空 *直接return,不能终止 */ if (isRunning(c) || runStateAtLeast(c, TIDYING) || (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty())) return; //如果workerCount不为0 if (workerCountOf(c) != 0) { // Eligible to terminate interruptIdleWorkers(ONLY_ONE); return; } //获取线程池的锁 final ReentrantLock mainLock = this.mainLock; //获取锁 mainLock.lock(); try { //通过CAS操作,设置线程池状态为TIDYING if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { try { terminated(); } finally { //设置线程池的状态为TERMINATED ctl.set(ctlOf(TERMINATED, 0)); //发送释放信号给在termination条件上等待的线程 termination.signalAll(); } return; } } finally { //释放锁 mainLock.unlock(); } // else retry on failed CAS } }5.runWorker(Worker w)
该方法的作用就是去执行任务
final void runWorker(Worker w) { //获取当前线程 Thread wt = Thread.currentThread(); //获取worker里的任务 Runnable task = w.firstTask; //将worker实例的任务赋值为null w.firstTask = null; /* *unlock方法会调用AQS的release方法 *release方法会调用具体实现类也就是Worker的tryRelease方法 *也就是将AQS状态置为0,允许中断 */ w.unlock(); // allow interrupts //是否突然完成 boolean completedAbruptly = true; try { //worker实例的task不为空,或者通过getTask获取的不为空 while (task != null || (task = getTask()) != null) { //获取锁 w.lock(); // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt /* *获取线程池的控制状态,至少要大于STOP状态 *如果状态不对,检查当前线程是否中断并清除中断状态,并且再次检查线程池状态是否大于STOP *如果上述满足,检查该对象是否处于中断状态,不清除中断标记 */ if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) //中断改对象 wt.interrupt(); try { //执行前的方法,由子类具体实现 beforeExecute(wt, task); Throwable thrown = null; try { //执行任务 task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { //执行完后调用的方法,也是由子类具体实现 afterExecute(task, thrown); } } finally {//执行完后 //task设置为null task = null; //已完成任务数+1 w.completedTasks++; //释放锁 w.unlock(); } } completedAbruptly = false; } finally { //处理并退出当前worker processWorkerExit(w, completedAbruptly); } }接下来我们用文字来说明一下执行任务这个方法的具体逻辑和流程。
首先在方法一进来,就执行了w.unlock(),这是为了将AQS的状态改为0,因为只有getState() >= 0的时候,线程才可以被中断;
判断firstTask是否为空,为空则通过getTask()获取任务,不为空接着往下执行
判断是否符合中断状态,符合的话设置中断标记
执行beforeExecute(),task.run(),afterExecute()方法
任何一个出异常都会导致任务执行的终止;进入processWorkerExit来退出任务
正常执行的话会接着回到步骤2
附上一副简单的流程图:
6.getTask()