ForkJoinPool大型图文现场(一阅到底 vs 直接收藏) (8)

该方法上的注释也写的很清楚,具体请参考代码注释

/** * Tries to add the given task to a submission queue at * submitter's current queue. Only the (vastly) most common path * is directly handled in this method, while screening for need * for externalSubmit. * * @param task the task. Caller must ensure non-null. */ final void externalPush(ForkJoinTask<?> task) { WorkQueue[] ws; WorkQueue q; int m; //Flag1: 通过ThreadLocalRandom产生随机数,用于下面计算槽位索引 int r = ThreadLocalRandom.getProbe(); int rs = runState; //初始状态为0 //Flag2: 如果ws,即ForkJoinPool中的WorkQueue数组已经完成初始化,且根据随机数定位的index存在workQueue,且cas的方式加锁成功 if ((ws = workQueues) != null && (m = (ws.length - 1)) >= 0 && (q = ws[m & r & SQMASK]) != null && r != 0 && rs > 0 && //对WorkQueue操作加锁 U.compareAndSwapInt(q, QLOCK, 0, 1)) { ForkJoinTask<?>[] a; int am, n, s; //WorkQueue中的任务数组不为空 if ((a = q.array) != null && (am = a.length - 1) > (n = (s = q.top) - q.base)) { //组长度大于任务个数,不需要扩容 int j = ((am & s) << ASHIFT) + ABASE; //WorkQueue中的任务数组不为空 U.putOrderedObject(a, j, task); //向Queue中放入任务 U.putOrderedInt(q, QTOP, s + 1);//top值加一 U.putIntVolatile(q, QLOCK, 0); //对WorkQueue操作解锁 //任务个数小于等于1,那么此槽位上的线程有可能等待,如果大家都没任务,可能都在等待,新任务来了,唤醒,起来干活了 if (n <= 1) //唤醒可能存在等待的线程 signalWork(ws, q); return; } //任务入队失败,前面加锁了,这里也要解锁 U.compareAndSwapInt(q, QLOCK, 1, 0); } //Flag3: 不满足上述条件,也就是说上面的这些 WorkQueue[]等都不存在,就要通过这个方法一切从头开始创建 externalSubmit(task); }

上面加了三处 Flag,为了让大家更好的理解代码还是有必要做进一步说明的:

Flag1: ThreadLocalRandom 是 ThreadLocal 的衍生物,每个线程默认的 probe 是 0,当线程调用ThreadLocalRandom.current()时,会初始化 seed 和 probe,维护在线程内部,这里就知道是生成一个随机数就好,具体细节还是值得大家自行看一下

Flag2: 这里包含的信息还是非常多的

// 二进制为:0000 0000 0000 0000 0000 0000 0111 1110 static final int SQMASK = 0x007e; // max 64 (even) slots

m 的值代表 WorkQueue 数组的最大下表

m & r 会保证随机数 r 大于 m 的部分不可用

m & r & SQMASK 因为 SQMASK 最后一位是 0,最终的结果就会是偶数

r != 0 说明当前线程已经初始化过一些内容

rs > 0 说明 ForkJoinPool 的 runState 也已经被初始化过

Flag3: 看过 flag2 的描述,你也就很好理解 Flag 3 了,如果是第一次提交任务,必走 Flag 3 的 externalSubmit 方法

externalSubmit

这个方法很长,但没超过 80 行,具体请看方法注释

//初始化所需要的一切 private void externalSubmit(ForkJoinTask<?> task) { int r; // initialize caller's probe //生成随机数 if ((r = ThreadLocalRandom.getProbe()) == 0) { ThreadLocalRandom.localInit(); r = ThreadLocalRandom.getProbe(); } for (;;) { WorkQueue[] ws; WorkQueue q; int rs, m, k; boolean move = false; // 如果线程池的状态为终止状态,则帮助终止 if ((rs = runState) < 0) { tryTerminate(false, false); // help terminate throw new RejectedExecutionException(); } //Flag1: 再判断一次状态是否为初始化,因为在lockRunState过程中有可能状态被别的线程更改了 else if ((rs & STARTED) == 0 || // initialize ((ws = workQueues) == null || (m = ws.length - 1) < 0)) { int ns = 0; //Flag1.1: 加锁 rs = lockRunState(); try { if ((rs & STARTED) == 0) { // 初始化stealcounter的值(任务窃取计数器,原子变量) U.compareAndSwapObject(this, STEALCOUNTER, null, new AtomicLong()); // create workQueues array with size a power of two //取config的低16位(确切说是低15位),获取并行度 int p = config & SMASK; // ensure at least 2 slots //Flag1.2: 如果你看过HashMap 的源码,这个就很好理解了,获取2次幂大小 int n = (p > 1) ? p - 1 : 1; n |= n >>> 1; n |= n >>> 2; n |= n >>> 4; n |= n >>> 8; n |= n >>> 16; n = (n + 1) << 1; //初始化 WorkQueue 数组 workQueues = new WorkQueue[n]; // 标记初始化完成 ns = STARTED; } } finally { // 解锁 unlockRunState(rs, (rs & ~RSLOCK) | ns); } } //Flag2 上面分析过,取偶数位槽位,将任务放进偶数槽位 else if ((q = ws[k = r & m & SQMASK]) != null) { // 对 WorkQueue 加锁 if (q.qlock == 0 && U.compareAndSwapInt(q, QLOCK, 0, 1)) { ForkJoinTask<?>[] a = q.array; int s = q.top; // 初始化任务提交标识 boolean submitted = false; // initial submission or resizing try { // locked version of push //计算内存偏移量,放任务,更新top值 if ((a != null && a.length > s + 1 - q.base) || (a = q.growArray()) != null) { int j = (((a.length - 1) & s) << ASHIFT) + ABASE; U.putOrderedObject(a, j, task); U.putOrderedInt(q, QTOP, s + 1); //提交任务成功 submitted = true; } } finally { //WorkQueue解锁 U.compareAndSwapInt(q, QLOCK, 1, 0); } // 任务提交成功了 if (submitted) { //自然要唤醒可能存在等待的线程来处理任务了 signalWork(ws, q); return; } } //任务提交没成功,可以重新计算随机数,再走一次流程 move = true; // move on failure } //Flag3: 接Flag2,如果找到的槽位是空,则要初始化一个WorkQueue else if (((rs = runState) & RSLOCK) == 0) { // create new queue q = new WorkQueue(this, null); // 设置工作队列的窃取线索值 q.hint = r; // 如上面 WorkQueue 中config 的介绍,记录当前WorkQueue在WorkQueue[]数组中的值,和队列模式 q.config = k | SHARED_QUEUE; // 初始化为 inactive 状态 q.scanState = INACTIVE; //加锁 rs = lockRunState(); // publish index if (rs > 0 && (ws = workQueues) != null && k < ws.length && ws[k] == null) ws[k] = q; // else terminated //解锁 unlockRunState(rs, rs & ~RSLOCK); } else move = true; // move if busy if (move) r = ThreadLocalRandom.advanceProbe(r); } }

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wpgysx.html