Java并发之线程池ThreadPoolExecutor源码分析学习 (4)

失败的话再接着获取线程池控制状态,因为只有状态变了才会失败,所以重新获取
3、判断线程池是否处于运行状态,是的话则添加command到阻塞队列,加入时也会再次获取状态并且检测
​ 状态是否不处于运行状态,不处于的话则将command从阻塞队列移除,并且拒绝任务
4、如果线程池里没有了线程,则创建新的线程去执行获取阻塞队列的任务执行
5、如果以上都没执行成功,则需要开启最大线程池里的线程来执行任务,失败的话就丢弃

有时候再多的文字也不如一个流程图来的明白,所以还是画了个execute的流程图给大家方便理解。

2.addWorker(Runnable firstTask, boolean core)

private boolean addWorker(Runnable firstTask, boolean core) { //外部循环标记 retry: //外层死循环 for (;;) { //获取线程池控制状态 int c = ctl.get(); //获取runState int rs = runStateOf(c); ​ // Check if queue empty only if necessary. /** *1.如果线程池runState至少已经是SHUTDOWN *2\. 有一个是false则addWorker失败,看false的情况 * - runState==SHUTDOWN,即状态已经大于SHUTDOWN了 * - firstTask为null,即传进来的任务为空,结合上面就是runState是SHUTDOWN,但是 * firstTask不为空,代表线程池已经关闭了还在传任务进来 * - 队列为空,既然任务已经为空,队列为空,就不需要往线程池添加任务了 */ if (rs >= SHUTDOWN && //runState大于等于SHUTDOWN,初始位RUNNING ! (rs == SHUTDOWN && //runState等于SHUTDOWN firstTask == null && //firstTask为null ! workQueue.isEmpty())) //workQueue队列不为空 return false; ​ //内层死循环 for (;;) { //获取线程池的workerCount数量 int wc = workerCountOf(c); //如果workerCount超出最大值或者大于corePoolSize/maximumPoolSize //返回false if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; //通过CAS操作,使workerCount数量+1,成功则跳出循环,回到retry标记 if (compareAndIncrementWorkerCount(c)) break retry; //CAS操作失败,再次获取线程池的控制状态 c = ctl.get(); // Re-read ctl //如果当前runState不等于刚开始获取的runState,则跳出内层循环,继续外层循环 if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop //CAS由于更改workerCount而失败,继续内层循环 } } ​ //通过以上循环,能执行到这是workerCount成功+1了 //worker开始标记 boolean workerStarted = false; //worker添加标记 boolean workerAdded = false; //初始化worker为null Worker w = null; try { //初始化一个当前Runnable对象的worker对象 w = new Worker(firstTask); //获取该worker对应的线程 final Thread t = w.thread; //如果线程不为null if (t != null) { //初始线程池的锁 final ReentrantLock mainLock = this.mainLock; //获取锁 mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. //获取锁后再次检查,获取线程池runState int rs = runStateOf(ctl.get()); ​ //当runState小于SHUTDOWN或者runState等于SHUTDOWN并且firstTask为null if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { //线程已存活 if (t.isAlive()) // precheck that t is startable //线程未启动就存活,抛出IllegalThreadStateException异常 throw new IllegalThreadStateException(); //将worker对象添加到workers集合当中 workers.add(w); //获取workers集合的大小 int s = workers.size(); //如果大小超过largestPoolSize if (s > largestPoolSize) //重新设置largestPoolSize largestPoolSize = s; //标记worker已经被添加 workerAdded = true; } } finally { //释放锁 mainLock.unlock(); } //如果worker添加成功 if (workerAdded) { //启动线程 t.start(); //标记worker已经启动 workerStarted = true; } } } finally { //如果worker没有启动成功 if (! workerStarted) //workerCount-1的操作 addWorkerFailed(w); } //返回worker是否启动的标记 return workerStarted; }

我们也简单说一下这个代码的流程吧,还真的是挺难的,博主写的时候都停了好多次,想砸键盘的说:

1、获取线程池的控制状态,进行判断,不符合则返回false,符合则下一步
2、死循环,判断workerCount是否大于上限,或者大于corePoolSize/maximumPoolSize,没有的话则对workerCount+1操作,
3、如果不符合上述判断或+1操作失败,再次获取线程池的控制状态,获取runState与刚开始获取的runState相比,不一致则跳出内层循环继续外层循环,否则继续内层循环
4、+1操作成功后,使用重入锁ReentrantLock来保证往workers当中添加worker实例,添加成功就启动该实例。

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wpxgfx.html