介绍过了 WorkerQueue 和 ForkJoinTask,上文说的三个重要角色中的最后一个 ForkJoinWorkerThread 终于登场了
private boolean createWorker() { ForkJoinWorkerThreadFactory fac = factory; Throwable ex = null; ForkJoinWorkerThread wt = null; try { //如果工厂已经存在了,就用factory来创建线程,会去注册线程,这里的this就是ForkJoinPool对象 if (fac != null && (wt = fac.newThread(this)) != null) { //启动线程 wt.start(); return true; } } catch (Throwable rex) { ex = rex; } //如果创建线程失败,就要逆向注销线程,包括前面对ctl等的操作 deregisterWorker(wt, ex); return false; }Worker 线程是如何与 WorkQueue 对应的,就藏在 fac.newThread(this) 这个方法里面,下面这点代码展示一下调用过程
public ForkJoinWorkerThread newThread(ForkJoinPool pool); static final class DefaultForkJoinWorkerThreadFactory implements ForkJoinWorkerThreadFactory { public final ForkJoinWorkerThread newThread(ForkJoinPool pool) { return new ForkJoinWorkerThread(pool); } } protected ForkJoinWorkerThread(ForkJoinPool pool) { // Use a placeholder until a useful name can be set in registerWorker super("aForkJoinWorkerThread"); this.pool = pool; this.workQueue = pool.registerWorker(this); }很显然核心内容在 registerWorker 方法里面了
registerWorker WorkQueue(ForkJoinPool pool, ForkJoinWorkerThread owner) { this.pool = pool; this.owner = owner; // Place indices in the center of array (that is not yet allocated) base = top = INITIAL_QUEUE_CAPACITY >>> 1; } final WorkQueue registerWorker(ForkJoinWorkerThread wt) { UncaughtExceptionHandler handler; //这里线程被设置为守护线程,因为,当只剩下守护线程时,JVM就会推出 wt.setDaemon(true); // configure thread //填补处理异常的handler if ((handler = ueh) != null) wt.setUncaughtExceptionHandler(handler); //创建一个WorkQueue,并且设置当前WorkQueue的owner是当前线程 WorkQueue w = new WorkQueue(this, wt); int i = 0; // assign a pool index //又用到了config的知识,提取出我们期望的WorkQueue模式 int mode = config & MODE_MASK; //加锁 int rs = lockRunState(); try { WorkQueue[] ws; int n; // skip if no array //判断ForkJoinPool的WorkQueue[]都初始化完全 if ((ws = workQueues) != null && (n = ws.length) > 0) { //一种魔数计算方式,用以减少冲突 int s = indexSeed += SEED_INCREMENT; // unlikely to collide //假设WorkQueue的初始长度是16,那这里的m就是15,最终目的就是为了得到一个奇数 int m = n - 1; //和得到偶数的计算方式一样,得到一个小于m的奇数i i = ((s << 1) | 1) & m; // odd-numbered indices //如果这个槽位不为空,说明已经被其他线程初始化过了,也就是有冲突,选取别的槽位 if (ws[i] != null) { // collision int probes = 0; // step by approx half n //步长加2,也就保证step还是奇数 int step = (n <= 4) ? 2 : ((n >>> 1) & EVENMASK) + 2; //一直遍历,直到找到空槽位,如果都遍历了一遍,那就需要对WorkQueue[]扩容了 while (ws[i = (i + step) & m] != null) { if (++probes >= n) { workQueues = ws = Arrays.copyOf(ws, n <<= 1); m = n - 1; probes = 0; } } } //初始化一个随机数 w.hint = s; // use as random seed //如文章前面所说,config记录索引值和模式 w.config = i | mode; //扫描状态也记录为索引值,如文章前面所说,奇数表示为scanning状态 w.scanState = i; // publication fence //把初始化好的WorkQueue放到ForkJoinPool的WorkQueue[]数组中 ws[i] = w; } } finally { //解锁 unlockRunState(rs, rs & ~RSLOCK); } //设置worker的前缀名,用于业务区分 wt.setName(workerNamePrefix.concat(Integer.toString(i >>> 1))); //返回当前线程创建的WorkQueue,回到上一层调用栈,也就将WorkQueue注册到ForkJoinWorkerThread里面了 return w; }到这里线程是顺利创建成功了,可是如果线程没有创建成功,就需要 deregisterWorker来做善后工作了
deregisterWorkerderegisterWorker 方法接收刚刚创建的线程引用和异常作为参数,来做善后工作,将 registerWorker 相关工作撤销回来
final void deregisterWorker(ForkJoinWorkerThread wt, Throwable ex) { WorkQueue w = null; if (wt != null && (w = wt.workQueue) != null) { WorkQueue[] ws; // remove index from array //获取当前线程注册的索引值 int idx = w.config & SMASK; //加锁 int rs = lockRunState(); //如果奇数槽位都不为空,则清空内容 if ((ws = workQueues) != null && ws.length > idx && ws[idx] == w) ws[idx] = null; //解锁 unlockRunState(rs, rs & ~RSLOCK); } long c; // decrement counts //死循环式CAS更改ctl的值,将前面AC和TC加1的值再减1,ctl就在那里,不增不减 do {} while (!U.compareAndSwapLong (this, CTL, c = ctl, ((AC_MASK & (c - AC_UNIT)) | (TC_MASK & (c - TC_UNIT)) | (SP_MASK & c)))); //清空WorkQueue,将其中的task取消掉 if (w != null) { w.qlock = -1; // ensure set w.transferStealCount(this); w.cancelAll(); // cancel remaining tasks } //可能的替换操作 for (;;) { // possibly replace WorkQueue[] ws; int m, sp; //如果线程池终止了,那就跳出循环即可 if (tryTerminate(false, false) || w == null || w.array == null || (runState & STOP) != 0 || (ws = workQueues) == null || (m = ws.length - 1) < 0) // already terminating break; //当前线程创建失败,通过sp判断,如果还存在空闲线程,则调用tryRelease来唤醒这个线程,然后跳出 if ((sp = (int)(c = ctl)) != 0) { // wake up replacement if (tryRelease(c, ws[sp & m], AC_UNIT)) break; } //如果没空闲线程,并且还没有达到满足并行度的条件,那就得再次尝试创建一个线程,弥补刚刚的失败 else if (ex != null && (c & ADD_WORKER) != 0L) { tryAddWorker(c); // create replacement break; } else // don't need replacement break; } if (ex == null) // help clean on way out //处理异常 ForkJoinTask.helpExpungeStaleExceptions(); else // rethrow ForkJoinTask.rethrow(ex); }总之 deregisterWorker 方法从线程池里注销线程,清空WorkQueue,同时更新ctl,最后做可能的替换,根据线程池的状态决定是否找一个自己的替代者:
有空闲线程,则唤醒一个
没有空闲线程,再次尝试创建一个新的工作线程