ForkJoinPool大型图文现场(一阅到底 vs 直接收藏) (9)

Flag1.1 : 有个细节需要说一下,我们在 Java AQS队列同步器以及ReentrantLock的应用 时提到过使用锁的范式以及为什么要这样用,ForkJoinPool 这里同样遵循这种范式

Lock lock = new ReentrantLock(); lock.lock(); try{ ... }finally{ lock.unlock(); }

Flag1.2: 简单描述这个过程,就是根据不同的并行度来初始化不同大小的 WorkQueue[]数组,数组大小要求是 2 的 n 次幂,所以给大家个表格直观理解一下并行度和队列容量的关系:

并行度p 容量
1,2   4  
3,4   8  
5 ~ 8   16  
9 ~ 16   32  

Flag 1,2,3: 如果你理解了上面这个方法,很显然,第一次执行这个方法内部的逻辑顺序应该是 Flag1——> Flag3——>Flag2

externalSubmit 如果任务成功提交,就会调用 signalWork 方法了

signalWork

前面铺垫的知识要大规模派上用场(一大波僵尸来袭),are you ready?

ForkJoinPool大型图文现场(一阅到底 vs 直接收藏)

如果 ForkJoinPool 的 ctl 成员变量的作用已经忘了,赶紧向上翻重新记忆一下

//常量值 static final int SS_SEQ = 1 << 16; // version count final void signalWork(WorkQueue[] ws, WorkQueue q) { long c; int sp, i; WorkQueue v; Thread p; // ctl 小于零,说明活动的线程数 AC 不够 while ((c = ctl) < 0L) { // too few active // 取ctl的低32位,如果为0,说明没有等待的线程 if ((sp = (int)c) == 0) { // no idle workers // 取TC的高位,如果不等于0,则说明目前的工作着还没有达到并行度 if ((c & ADD_WORKER) != 0L) // too few workers //添加 Worker,也就是说要创建线程了 tryAddWorker(c); break; } //未开始或者已停止,直接跳出 if (ws == null) // unstarted/terminated break; //i=空闲线程栈顶端所属的工作队列索引 if (ws.length <= (i = sp & SMASK)) // terminated break; if ((v = ws[i]) == null) // terminating break; //程序执行到这里,说明有空闲线程,计算下一个scanState,增加了版本号,并且调整为 active 状态 int vs = (sp + SS_SEQ) & ~INACTIVE; // next scanState int d = sp - v.scanState; // screen CAS //计算下一个ctl的值,活动线程数 AC + 1,通过stackPred取得前一个WorkQueue的索引,重新设置回sp,行程最终的ctl值 long nc = (UC_MASK & (c + AC_UNIT)) | (SP_MASK & v.stackPred); //更新 ctl 的值 if (d == 0 && U.compareAndSwapLong(this, CTL, c, nc)) { v.scanState = vs; // activate v //如果有线程阻塞,则调用unpark唤醒即可 if ((p = v.parker) != null) U.unpark(p); break; } //没有任务,直接跳出 if (q != null && q.base == q.top) // no more work break; } }

假设程序刚开始执行,那么活动线程数以及总线程数肯定都没达到并行度要求,这时就会调用 tryAddWorker 方法了

tryAddWorker

tryAddWorker 的逻辑就非常简单了,因为是操作线程池,同样会用到 lockRunState/unlockRunState 的锁控制

private void tryAddWorker(long c) { //初始化添加worker表识 boolean add = false; do { //因为要添加Worker,所以AC和TC都要加一 long nc = ((AC_MASK & (c + AC_UNIT)) | (TC_MASK & (c + TC_UNIT))); //ctl还没被改变 if (ctl == c) { int rs, stop; // check if terminating if ((stop = (rs = lockRunState()) & STOP) == 0) //更新ctl 的值, add = U.compareAndSwapLong(this, CTL, c, nc); unlockRunState(rs, rs & ~RSLOCK); if (stop != 0) break; //ctl值更新成功,开始真正的创建Worker if (add) { createWorker(); break; } } // 重新获取ctl,并且没有达到最大线程数,并且没有空闲的线程 } while (((c = ctl) & ADD_WORKER) != 0L && (int)c == 0); }

一切顺利,就要调用 createWorker 方法来创建真正的 Worker 了,形势逐渐明朗

createWorker

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wpgysx.html