ForkJoinPool大型图文现场(一阅到底 vs 直接收藏) (12)

马上就接近真相了,steal 到任务了,就干点正事吧

final void runTask(ForkJoinTask<?> task) { if (task != null) { scanState &= ~SCANNING; // mark as busy //Flag1: 记录当前的任务是偷来的,至于如何执行task,是我们写在compute方法中的,我们一会看doExec() 方法 (currentSteal = task).doExec(); U.putOrderedObject(this, QCURRENTSTEAL, null); // release for GC execLocalTasks(); ForkJoinWorkerThread thread = owner; //累加偷来的数量,亲兄弟明算帐啊,虽然算完也没啥实际意义 if (++nsteals < 0) // collect on overflow transferStealCount(pool); //任务执行完后,就重新更新scanState为SCANNING scanState |= SCANNING; if (thread != null) thread.afterTopLevelExec(); } }

Flag1: doExec 方法才是真正执行任务的关键,它是链接我们自定义 compute 方法的核心,来看 doExec 方法

doExec

形势一片大好,挺住,揭开 exec 的面纱,就看到本质了

//ForkJoinTask中的抽象方法,RecursiveTask 和 RecursiveAction 都重写了它 protected abstract boolean exec(); final int doExec() { int s; boolean completed; if ((s = status) >= 0) { try { completed = exec(); } catch (Throwable rex) { return setExceptionalCompletion(rex); } if (completed) s = setCompletion(NORMAL); } return s; } //RecursiveTask重写的内容,终于看到我们文章开头 demo 中的compute 了 protected final boolean exec() { result = compute(); return true; }

到这里,我们已经看到本质了,绕了这么一大圈,终于和我们自己重写的compute方法联系到了一起,真是不容易,但是 runWorker 三部曲还差最后一曲 awaitWork 没谱,我们来看看

awaitWork

上面说的是 scan 到了任务,要是没有scan到任务,那就得将当前线程阻塞一下,具体标注在注释中,可以简单了解一下

private boolean awaitWork(WorkQueue w, int r) { if (w == null || w.qlock < 0) // w is terminating return false; for (int pred = w.stackPred, spins = SPINS, ss;;) { if ((ss = w.scanState) >= 0) break; else if (spins > 0) { r ^= r << 6; r ^= r >>> 21; r ^= r << 7; if (r >= 0 && --spins == 0) { // randomize spins WorkQueue v; WorkQueue[] ws; int s, j; AtomicLong sc; if (pred != 0 && (ws = workQueues) != null && (j = pred & SMASK) < ws.length && //前驱任务队列还在 (v = ws[j]) != null && // see if pred parking //并且工作队列已经激活,说明任务来了了 (v.parker == null || v.scanState >= 0)) //继续自旋等一会,别返回false spins = SPINS; // continue spinning } } //自旋之后,再次检查工作队列是否终止,若是,退出扫描 else if (w.qlock < 0) // recheck after spins return false; else if (!Thread.interrupted()) { long c, prevctl, parkTime, deadline; int ac = (int)((c = ctl) >> AC_SHIFT) + (config & SMASK); if ((ac <= 0 && tryTerminate(false, false)) || (runState & STOP) != 0) // pool terminating return false; if (ac <= 0 && ss == (int)c) { // is last waiter prevctl = (UC_MASK & (c + AC_UNIT)) | (SP_MASK & pred); int t = (short)(c >>> TC_SHIFT); // shrink excess spares if (t > 2 && U.compareAndSwapLong(this, CTL, c, prevctl)) return false; // else use timed wait parkTime = IDLE_TIMEOUT * ((t >= 0) ? 1 : 1 - t); deadline = System.nanoTime() + parkTime - TIMEOUT_SLOP; } else prevctl = parkTime = deadline = 0L; Thread wt = Thread.currentThread(); U.putObject(wt, PARKBLOCKER, this); // emulate LockSupport w.parker = wt; if (w.scanState < 0 && ctl == c) // recheck before park U.park(false, parkTime); U.putOrderedObject(w, QPARKER, null); U.putObject(wt, PARKBLOCKER, null); if (w.scanState >= 0) break; if (parkTime != 0L && ctl == c && deadline - System.nanoTime() <= 0L && U.compareAndSwapLong(this, CTL, c, prevctl)) return false; // shrink pool } } return true; }

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wpgysx.html