deregisterWorker 线程解释清楚了是为了帮助大家完整理解流程,但 registerWorker 成功后的流程还没走完,咱得继续,有了 Worker,那就调用 wt.start() 干活吧
runForkJoinWorkerThread 继承自Thread,调用start() 方法后,自然要调用自己重写的 run() 方法
public void run() { if (workQueue.array == null) { // only run once Throwable exception = null; try { onStart(); //Work开始工作,处理workQueue中的任务 pool.runWorker(workQueue); } catch (Throwable ex) { exception = ex; } finally { try { onTermination(exception); } catch (Throwable ex) { if (exception == null) exception = ex; } finally { pool.deregisterWorker(this, exception); } } } }方法的重点自然是进入到 runWorker
runWorkerrunWorker 是很常规的三部曲操作:
scan: 通过扫描获取任务
runTask:执行扫描到的任务
awaitWork:没任务进入等待
具体请看注释
final void runWorker(WorkQueue w) { //初始化队列,并根据需要是否扩容为原来的2倍 w.growArray(); // allocate queue int seed = w.hint; // initially holds randomization hint int r = (seed == 0) ? 1 : seed; // avoid 0 for xorShift //死循环更新偏移r,为扫描任务作准备 for (ForkJoinTask<?> t;;) { //扫描任务 if ((t = scan(w, r)) != null) //扫描到就执行任务 w.runTask(t); //没扫描到就等待,如果等也等不到任务,那就跳出循环别死等了 else if (!awaitWork(w, r)) break; r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift } }先来看 scan 方法
scanForkJoinPool 的任务窃取机制要来了,如何 steal 的,就藏在scan 方法中
private ForkJoinTask<?> scan(WorkQueue w, int r) { WorkQueue[] ws; int m; //再次验证workQueue[]数组的初始化情况 if ((ws = workQueues) != null && (m = ws.length - 1) > 0 && w != null) { //获取当前扫描状态 int ss = w.scanState; // initially non-negative //又一个死循环,注意到出口位置就好 //和前面逻辑类似,随机一个起始位置,并赋值给k for (int origin = r & m, k = origin, oldSum = 0, checkSum = 0;;) { WorkQueue q; ForkJoinTask<?>[] a; ForkJoinTask<?> t; int b, n; long c; //如果k槽位不为空 if ((q = ws[k]) != null) { //base-top小于零,并且任务q不为空 if ((n = (b = q.base) - q.top) < 0 && (a = q.array) != null) { // non-empty //获取base的偏移量,赋值给i long i = (((a.length - 1) & b) << ASHIFT) + ABASE; //从base端获取任务,和前文的描述的steal搭配上了,是从base端steal if ((t = ((ForkJoinTask<?>) U.getObjectVolatile(a, i))) != null && q.base == b) { //是active状态 if (ss >= 0) { //更新WorkQueue中数组i索引位置为空,并且更新base的值 if (U.compareAndSwapObject(a, i, t, null)) { q.base = b + 1; //n<-1,说明当前队列还有剩余任务,继续唤醒可能存在的其他线程 if (n < -1) // signal others signalWork(ws, q); //直接返回任务 return t; } } else if (oldSum == 0 && // try to activate w.scanState < 0) tryRelease(c = ctl, ws[m & (int)c], AC_UNIT); } //如果获取任务失败,则准备换位置扫描 if (ss < 0) // refresh ss = w.scanState; r ^= r << 1; r ^= r >>> 3; r ^= r << 10; origin = k = r & m; // move and rescan oldSum = checkSum = 0; continue; } checkSum += b; } //k一直在变,扫描到最后,如果等于origin,说明已经扫描了一圈还没扫描到任务 if ((k = (k + 1) & m) == origin) { // continue until stable if ((ss >= 0 || (ss == (ss = w.scanState))) && oldSum == (oldSum = checkSum)) { if (ss < 0 || w.qlock < 0) // already inactive break; //准备inactive当前工作队列 int ns = ss | INACTIVE; // try to inactivate //活动线程数AC减1 long nc = ((SP_MASK & ns) | (UC_MASK & ((c = ctl) - AC_UNIT))); w.stackPred = (int)c; // hold prev stack top U.putInt(w, QSCANSTATE, ns); if (U.compareAndSwapLong(this, CTL, c, nc)) ss = ns; else w.scanState = ss; // back out } checkSum = 0; } } } return null; }如果顺利扫描到任务,那就要调用 runTask 方法来真正的运行这个任务了
runTask