前面获取状态的时候调用了 ctlOf()方法,根据前面,我们可以知道,CAPACITY实际上是 29 位,而线程状态用的是 32 - 30 共 3 位,也就是说,ctl 共 32 位,高3 位用于表示线程池状态,而低 29 位表示工作线程的数量。
这样上述三个方法就很好理解了:
ctlOf():获取 ctl。
将工作线程数量与运行状态进行于运算,假如我们处于 RUNNING,并且有 1 个工作线程,那么 ctl = 111....000 | 000.... 001,最终得到 111 ..... 001;
runStateOf():获取运行状态。
继续根据上文的数据,~CAPACITY 取反即为 111....000,与运行状态 111...0000 与运算,最终得到 111....000,相当于低位掩码,消去低 29 位;
workerCountOf():获取工作线程数。
同理,c & CAPACITY里的 CAPACITY 相当于高位掩码,用于消去高 3 位,最终得到 00...001,即工作线程数。
同理,如果要增加工作线程数,就直接通过 CAS 去递增 ctl,比如新建线程中使用的公共方法:
private boolean compareAndIncrementWorkerCount(int expect) { // 通过 CAS 递增 ctl return ctl.compareAndSet(expect, expect + 1); }要改变线程池状态,就根据当前工作线程和要改变的状态去合成新的 ctl,然后 CAS 改变 ctl,比如 shutdown()中涉及的相关代码:
private void advanceRunState(int targetState) { for (;;) { int c = ctl.get(); if (runStateAtLeast(c, targetState) || // 通过 CAS 改变 ctl ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c)))) break; } } 三、任务的创建与执行线程池任务提交方法是 execute(),根据代码可知,当一个任务进来时,分四种情况:
当前工作线程数小于核心线程数,启动新线程;
当前工作线程数大于核心线程数,但是未大于最大线程数,尝试添加到工作队列;
当前线程池核心线程和队列都满了,尝试创建新非核心线程。
非核心线程创建失败,说明线程池彻底满了,执行拒绝策略。
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); // 1.当前工作线程数小于核心线程数,启动新线程 if (workerCountOf(c) < corePoolSize) { // 添加任务 if (addWorker(command, true)) return; c = ctl.get(); } // 2. 当前工作线程数大于核心线程数,但是未大于最大线程数,尝试添加到工作队列 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); // 如果当前线程处于非运行态,并且移除当前任务成功,则拒绝任务(防止添加到一半就shutdown) if (! isRunning(recheck) && remove(command)) reject(command); // 如果当前没有工作线程了,就启动新线程 else if (workerCountOf(recheck) == 0) addWorker(null, false); } // 3.当前线程池核心线程和队列都满了,尝试创建新非核心线程 else if (!addWorker(command, false)) // 4.线程池彻底满了,执行拒绝策略 reject(command); } 1.添加任务添加任务依靠 addWorker()方法,这个方法很长,但是主要就干了两件事:
CAS 让 ctl 的工作线程数 +1;
启动新的线程;
private boolean addWorker(Runnable firstTask, boolean core) { retry: // 1.改变 ctl 使工作线程+1 for (;;) { int c = ctl.get(); int rs = runStateOf(c); // 如果当前不处于运行状态,传入任务为空,并且任务队列为空的时候拒绝添加新任务 // 即线程池 shutdown 时不让添加新任务,但是运行继续跑完任务队列里的任务。 if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { int wc = workerCountOf(c); // 线程不允许超过最大线程数,核心线程不允许超过最大核心线程数 if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; // CAS 递增工作线程数 if (compareAndIncrementWorkerCount(c)) // 失败了就重新回到上面的retry处继续往下执行 break retry; // 更新 ctl c = ctl.get(); // 如果运行状态改变了就全部从来 if (runStateOf(c) != rs) continue retry; } } // 2.启动新线程 boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { // 创建新线程 w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { // 加锁 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { int rs = runStateOf(ctl.get()); // 如果线程池处于运行状态,或者没有新任务的SHUTDOWN状态(即SHUTDOW以后还在消费工作队列里的任务) if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { // 线程是否在未启动前就已经启动了 if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); workers.add(w); int s = workers.size(); // 如果集合中的工作线程数大于最大线程数,则将池中最大线程数改为当前工作线程数 if (s > largestPoolSize) largestPoolSize = s; // 线程创建完成 workerAdded = true; } } finally { mainLock.unlock(); } // 如果线程成功创建,就启动线程,并且更改启动状态为成功 if (workerAdded) { t.start(); workerStarted = true; } } } finally { // 如果线程启动不成功,就执行失败策略 if (! workerStarted) // 启动失败策略,从当前工作线程队列移除当前启动失败的线程,递减工作线程数,然后尝试关闭线程池(如果当前任务就是线程池最后一个任务) addWorkerFailed(w); } return workerStarted; } 2. 任务对象Worker