线程池源码分析 (3)

根据上文,不难发现,在线程池中线程往往以 Worker 对象的方式存在,那么这个 Worker 又是何方神圣?

private final class Worker extends AbstractQueuedSynchronizer implements Runnable { // 工作线程 final Thread thread; // 要执行的任务 Runnable firstTask; // 线程执行过的任务数 volatile long completedTasks; // 通过线程工厂创建工作线程 Worker(Runnable firstTask) { setState(-1); this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); } // 执行任务 public void run() { runWorker(this); } ... ... }

这个 Worker 类继承了 AQS,也就是说,他本身就相当于一个同步队列,结合他的成员变量 thread 和 firstTask,可以知道他实际上就是我们线程池中所说的“线程”。除了父类 AQS 本身提供的独占锁以外,Worker 还提供了一些检查任务线程运行状态以及中断线程相关的方法。

此外,线程池中还有一个工作队列 workers,用于保存当前全部的 Worker

private final HashSet<Worker> workers = new HashSet<Worker>(); 3.任务的启动

当调用 Worker.run()的时候,其实调用的是 runWorker()方法。

runWorker()方法实际上就是调用线程执行任务的方法,他的逻辑大题是这样的:

拿到入参的新 Worker,一直循环获取 Worker 里的任务;

加锁然后执行任务;

如果执行完任务流程,并且没有发生异常导致 Worker 挂掉,就直接复用 Worker(在获取任务的方法 getTask()中循环等待任务);

如果执行完任务流程后发现发生异常导致 Worker 挂掉,就从工作队列中移除当前 Worker,并且补充一个新的;

如果整个流程执行完毕,就删除当前的 Worker。

final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // 新创建的Worker默认state为-1,AQS的unlock方法会将其改为0,此后允许使用interruptIfStarted()方法进行中断 // 完成任务以后是否需要移除当前Worker,即当前任务是否意外退出 boolean completedAbruptly = true; try { // 循环获取任务 while (task != null || (task = getTask()) != null) { // 加锁,防止 shundown 时中断正在运行的任务 w.lock(); // 如果线程池状态为 STOP 或更后面的状态,中断线程任务 if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { // 钩子方法,默认空实现,需要自己提供 beforeExecute(wt, task); Throwable thrown = null; try { // 执行任务 task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { // 钩子方法 afterExecute(task, thrown); } } finally { task = null; // 任务执行完毕 w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { // 根据completedAbruptly决定是否要移除意外退出的Worker,并补充新的Worker // 也就是说,如果上述过程顺利完成,工作线程没有挂掉,就不删除,下次继续用,否则就干掉它再补充一个。 processWorkerExit(w, completedAbruptly); } } 4.任务的获取与超时处理

在 runWorker()方法中,通过 getTask()方法去获取任务。值得注意的是,超时处理也在此处,简单的来说,整套流程是这样的:

判断线程池是否关闭,工作队列是否为空,如果是说明没任务了,直接返回null,否则接着往下判断;

判断当前是否存在非核心线程,如果是说明需要进行超时处理;

获取任务,如果不需要超时处理,则直接从任务队列获取任务,否则根据 keepaliveTime 阻塞一段时间后获取任务,如果获取不到,说明非核心线程超时,返回 null 交给 runWorker()中的processWorkerExit()方法去删除;

换句话说,runWorker()方法一旦执行完毕,必然会删除当前的 Worker,而通过 getTask()拿任务的 Worker,在线程池正常运行的状态下,核心线程只会一直在 for 循环中等待直到拿到任务,而非核心线程超时以后拿不到任务就会返回一个 null,然后回到 runWorker()中走完processWorkerExit()方法被删除。

private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out? for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. // 如果线程池关闭了,并且工作队列里的任务都完成了,或者线程池直接进入了 STOP 或更进一步的状态,就不返回新任务 if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; } // 获取当前工作线程 int wc = workerCountOf(c); // 核心线程是否超时(默认false)或当前是否存在非核心线程,即判断当前当前是否需要进行超时控制 boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; // 判断线程是否超过最大线程数或存在非核心线程 if ((wc > maximumPoolSize || (timed && timedOut)) // 并且除非任务队列为空,否则池中最少有一个线程 && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; } try { // 获取任务 Runnable r = timed ? // 阻塞 keepaliveTime 以获取任务,如果在 keepaliveTime 时间内没有获取到任务,则返回 null. workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; // 如果获取不到任务,说明非核心线程超时了,下一轮判断确认是否退出循环。 timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } } 四、线程池的中断

image-20210211171605477

线程池的中断方法分为三种:

shutdown():中断线程池,不再添加新任务,同时等待当前进行和队列中的任务完成

shutdownNow():立即中断线程池,不再添加新任务,同时中断所有工作中的任务,不再处理任务队列中任务

1.shutdown

shutdown 是有序关闭。主要干了三件事:

改变当前线程池状态为 SHUTDOWN;

将当前工作队列中的全部线程标记为中断;

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wpdwgg.html