深入浅出Java线程池:源码篇 (5)

可以看到这个方法的整体框架还是比较简单的,核心就在于 while (task != null || (task = getTask()) != null) 这个循环中,如果 getTask() 返回null,则表示线程该结束了,这和Handler机制也是一样的。

上面的源码省略了具体执行任务的逻辑,他的逻辑也是很简单:判断状态+运行任务。我们来看一下:

final void runWorker(Worker w) { ...; try { while (task != null || (task = getTask()) != null) { w.lock(); // 如果线程池已经设置为stop状态,那么保证线程是interrupted标志 // 如果线程池没有在stop状态,那么保证线程不是interrupted标志 if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { // 回调方法,这个方法是一个空实现 beforeExecute(wt, task); try { // 运行任务 task.run(); // 回调方法,也是一个空实现 afterExecute(task, null); } catch (Throwable ex) { afterExecute(task, ex); throw ex; } } ... } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } }

在获取到一个任务后,就会去执行该任务的run方法,然后再回去继续获取新的任务。

我们会发现其中有很多的空实现方法,他是给子类去实现的,有点类似于Activity的生命周期,子类需要重写这些方法,在具体的情况做一些工作。当然,一般的使用是不需要去重写这些方法。接下来需要来看看 getTask() 是如何获取任务的。

获取任务:getTask()

这个方法的内容可以分为两个部分:判断当前线程池的状态+阻塞地从队列中获取一个任务。

第一部分是判断当前线程池的状况,如果处于关闭状态那么直接返回null来让worker结束,否则需要判断当前线程是否超时或者超出最大限制的线程数:

private Runnable getTask() { boolean timedOut = false; // 内部使用了CAS,这里需要有一个循环来不断尝试 for (;;) { int c = ctl.get(); // 如果处于shutdown状态而且队列为空,或者处于stop状态,返回null // 这和前面我们讨论到不同的线程池的状态的不同行为一致 if (runStateAtLeast(c, SHUTDOWN) && (runStateAtLeast(c, STOP) || workQueue.isEmpty())) { // 这里表示让线程总数-1,记住他,后面会继续聊到 decrementWorkerCount(); return null; } // 获取目前的线程总数 int wc = workerCountOf(c); // 判断该线程在空闲情况是否可以被销毁:允许核心线程为null或者当前线程超出核心线程数 // 可以看到这里并没有去区分具体的线程是核心还是非核心,只有线程数量处于核心范围还是非核心范围 boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; // 超出最大线程数或者已经超时; // 这里可能是用户通过 setMaximumPoolSize 改动了数据才会导致这里超出最大线程数 // 同时还必须保证当前线程数量大于1或者队列已经没有任务了 // 这样就确保了当有任务存在时,一定至少有一个线程在执行任务 if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { // 使用CAS尝试让当前线程总数-1,失败则从来一次上面的逻辑 if (compareAndDecrementWorkerCount(c)) return null; continue; } // 获取任务逻辑 ... } }

第二部分是获取一个任务并执行。获取任务使用的是阻塞队列的方法,如果队列中没有任务,则会被阻塞:

private Runnable getTask() { boolean timedOut = false; // 内部使用了CAS,这里需要有一个循环来不断尝试 for (;;) { // 判断线程池状态逻辑 ... try { // 获取一个任务 // poll方法等待具体时间之后如果没有获取到对象,会返回null // take方法会一直等到获取新对象,除非被interrupt Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; // r==null,说明超时了,重新循环 timedOut = true; } catch (InterruptedException retry) { // 被interrupt,说明可能线程池被关闭了,重新判断情况 timedOut = false; } } }

这里需要重点关注的是阻塞队列的 poll() 和 take() 方法,他们都会去队列中获取一个任务;但是,poll() 方法会阻塞指定时间后返回,而 take() 则是无限期阻塞。这里对应的就是有存活时间的线程和不会被销毁的核心线程。

同时注意 timedOut = true 是在这一部分被赋值的,当赋值为true之后需要再执行一次循环,在上面的判断中就会被拦截下来并返回false,这在第一部分逻辑介绍了。而如果线程在等待的时候被 interrupt 了,说明线程池被关闭了,此时也会重走一次上面判断状态的逻辑。

到这里关于执行的逻辑就讲得差不多了,下面聊一聊线程池关闭以及worker结束的相关逻辑。

worker退出工作:processWorkerExit

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wpyzjf.html