Java并发之线程池ThreadPoolExecutor源码分析学习 (6)

在上面的runWorker方法当中我们可以看出,当firstTask为空的时候,会通过该方法来接着获取任务去执行,那我们就看看获取任务这个方法到底是怎么样的?

private Runnable getTask() { //标志是否获取任务超时 boolean timedOut = false; // Did the last poll() time out? ​ //死循环 for (;;) { //获取线程池的控制状态 int c = ctl.get(); //获取线程池的runState int rs = runStateOf(c); ​ // Check if queue empty only if necessary. /* *判断线程池的状态,出现以下两种情况 *1、runState大于等于SHUTDOWN状态 *2、runState大于等于STOP或者阻塞队列为空 *将会通过CAS操作,进行workerCount-1并返回null */ if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; } ​ //获取线程池的workerCount int wc = workerCountOf(c); ​ // Are workers subject to culling? /* *allowCoreThreadTimeOut:是否允许core Thread超时,默认false *workerCount是否大于核心核心线程池 */ boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; ​ /* *1、wc大于maximumPoolSize或者已超时 *2、队列不为空时保证至少有一个任务 */ if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { /* *通过CAS操作,workerCount-1 *能进行-1操作,证明wc大于maximumPoolSize或者已经超时 */ if (compareAndDecrementWorkerCount(c)) //-1操作成功,返回null return null; //-1操作失败,继续循环 continue; } ​ try { /* *wc大于核心线程池 *执行poll方法 *小于核心线程池 *执行take方法 */ Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); //判断任务不为空返回任务 if (r != null) return r; //获取一段时间没有获取到,获取超时 timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } }

还是文字解说一下上面的代码逻辑和流程:

获取线程池控制状态和runState,判断线程池是否已经关闭或者正在关闭,是的话则workerCount-1操作返回null

获取workerCount判断是否大于核心线程池

判断workerCount是否大于最大线程池数目或者已经超时,是的话workerCount-1,-1成功则返回null,不成功则回到步骤1重新继续

判断workerCount是否大于核心线程池,大于则用poll方法从队列获取任务,否则用take方法从队列获取任务

判断任务是否为空,不为空则返回获取的任务,否则回到步骤1重新继续

接下来依然有一副流程图:

7.processWorkerExit

明显的,在执行任务当中,会去获取任务进行执行,那既然是执行任务,肯定就会有执行完或者出现异常中断执行的时候,那这时候肯定也会有相对应的操作,至于具体操作是怎么样的,我们还是直接去看源码最实际。

private void processWorkerExit(Worker w, boolean completedAbruptly) { /* *completedAbruptly:在runWorker出现,代表是否突然完成的意思 *也就是在执行任务过程当中出现异常,就会突然完成,传true * *如果是突然完成,需要通过CAS操作,workerCount-1 *不是突然完成,则不需要-1,因为getTask方法当中已经-1 * *下面的代码注释貌似与代码意思相反了 */ if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted decrementWorkerCount(); ​ //生成重入锁 final ReentrantLock mainLock = this.mainLock; //获取锁 mainLock.lock(); try { //线程池统计的完成任务数completedTaskCount加上worker当中完成的任务数 completedTaskCount += w.completedTasks; //从HashSet<Worker>中移除 workers.remove(w); } finally { //释放锁 mainLock.unlock(); } ​ //因为上述操作是释放任务或线程,所以会判断线程池状态,尝试终止线程池 tryTerminate(); ​ //获取线程池的控制状态 int c = ctl.get(); //判断runState是否小鱼STOP,即是RUNNING或者SHUTDOWN //如果是RUNNING或者SHUTDOWN,代表没有成功终止线程池 if (runStateLessThan(c, STOP)) { /* *是否突然完成 *如若不是,代表已经没有任务可获取完成,因为getTask当中是while循环 */ if (!completedAbruptly) { /* *allowCoreThreadTimeOut:是否允许core thread超时,默认false *min-默认是corePoolSize */ int min = allowCoreThreadTimeOut ? 0 : corePoolSize; //允许core thread超时并且队列不为空 //min为0,即允许core thread超时,这样就不需要维护核心核心线程池了 //如果workQueue不为空,则至少保持一个线程存活 if (min == 0 && ! workQueue.isEmpty()) min = 1; //如果workerCount大于min,则表示满足所需,可以直接返回 if (workerCountOf(c) >= min) return; // replacement not needed } //如果是突然完成,添加一个空任务的worker线程--这里我也不太理解 addWorker(null, false); } }

首先判断线程是否突然终止,如果是突然终止,通过CAS,workerCount-1

统计线程池完成任务数,并将worker从workers当中移除

判断线程池状态,尝试终止线程池

线程池没有成功终止

判断是否突然完成任务,不是则进行下一步,是则进行第三步

如允许核心线程超时,队列不为空,则至少保证一个线程存活

添加一个空任务的worker线程

Worker内部类

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wpxgfx.html