到这里,ForkJoinPool 的完整流程算是有个基本了解了,但是我们前面讲的这些内容都是从 submission task 作为切入点的。刚刚聊到的 compute 方法,我们按照分治算法范式写了自己的逻辑,具体请回看文中开头的demo,很关键的一点是,我们在 compute 中调用了 fork 方法,这就给我们了解 worker task 的机会了,继续来看 fork 方法
forkFork 方法的逻辑很简单,如果当前线程是 ForkJoinWorkerThread 类型,也就是说已经通过上文注册的 Worker,那么直接调用 push 方法将 task 放到当前线程拥有的 WorkQueue 中,否则就再调用 externalPush 重走我们已上说的所有逻辑(你敢再走一遍吗?)
public final ForkJoinTask<V> fork() { Thread t; if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ((ForkJoinWorkerThread)t).workQueue.push(this); else ForkJoinPool.common.externalPush(this); return this; } //push 方法很简单,这里就不再过多解释了 final void push(ForkJoinTask<?> task) { ForkJoinTask<?>[] a; ForkJoinPool p; int b = base, s = top, n; if ((a = array) != null) { // ignore if queue removed int m = a.length - 1; // fenced write for task visibility U.putOrderedObject(a, ((m & s) << ASHIFT) + ABASE, task); U.putOrderedInt(this, QTOP, s + 1); if ((n = s - b) <= 1) { if ((p = pool) != null) p.signalWork(p.workQueues, this); } else if (n >= m) growArray(); } }有 fork 就有 join,继续看一下 join 方法()
joinjoin 的核心调用在 doJoin,但是看到这么多级联三元运算符,我慌了
public final V join() { int s; if ((s = doJoin() & DONE_MASK) != NORMAL) reportException(s); return getRawResult(); } private int doJoin() { int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w; //status,task 的运行状态 return (s = status) < 0 ? s : ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ? (w = (wt = (ForkJoinWorkerThread)t).workQueue). tryUnpush(this) && (s = doExec()) < 0 ? s : wt.pool.awaitJoin(w, this, 0L) : externalAwaitDone(); }我们将 doJoin 方法用我们最熟悉的 if/else 做个改动,是不是就豁然开朗了
private int doJoin() { int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w; if((s = status) < 0) { // 有结果,直接返回 return s; }else { if((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) { // 如果是 ForkJoinWorkerThread Worker if((w = (wt = (ForkJoinWorkerThread) t).workQueue).tryUnpush(this) // 类似上面提到的 scan,但是是专项尝试从本工作队列里取出等待的任务 // 取出了任务,就去执行它,并返回结果 && (s = doExec()) < 0) { return s; }else { // 也有可能别的线程把这个任务偷走了,那就执行内部等待方法 return wt.pool.awaitJoin(w, this, 0L); } }else { // 如果不是 ForkJoinWorkerThread,执行外部等待方法 return externalAwaitDone(); } } }其中 awaitJoin 和 externalAwaitDone 都用到了 Helper(帮助) 和 Compensating(补偿) 两种策略,这两种策略大家完全可以自行阅读了,尤其是 awaitJoin 方法,强烈推荐大家自行阅读,其中 pop 的过程在这里,这里不再展开
到这里,有关 ForkJoinPool 相关的内容就算是结束了,为了让大家有个更好的理解 fork/join 机制,我们还是画几张图解释一下
Fork/Join 图解假设我们的大任务是 Task(8), 最终被分治成可执行的最小单元是 Task(1)
按照分治思想拆分任务的整体目标就是这样滴:
从外部先提交一个大的 Task(8),将其放在偶数槽位中(请注意颜色对应)
不满足并行度,会创建 Worker 1 来扫描,并从 base 端窃取到任务 task(8),执行到 compute, fork
出两个 task(4), 并 push到 WorkQueue 中