ForkJoinPool大型图文现场(一阅到底 vs 直接收藏) (7)

这两个方法贯穿着后续代码分析的始终,多注意 unlockRunState 的入参即可,另外你也看到了通知都是用的 notifyAll,而不是 notify,这个问题我们之前重点说明过,你还记得为什么吗?如果不记得,打开并发编程之等待通知机制 回忆一下吧

ForkJoinPool大型图文现场(一阅到底 vs 直接收藏)

第一层知识铺垫已经差不多了,前进

invoke/submit/execute

回到本文最开始带有 main 函数的 demo,我们向 ForkJoinPool 提交任务调用的是 invoke 方法, 其实 ForkJoinPool 还支持 submit 和 execute 两种方式来提交任务。并发的玩法非常类似,这三类方法的作业也很好区分:

invoke:提交任务,并等待返回执行结果

submit:提交并立刻返回任务,ForkJoinTask实现了Future,可以充分利用 Future 的特性

execute:只提交任务

在这三大类基础上又重载了几个更细粒度的方法,这里不一一列举:

public <T> T invoke(ForkJoinTask<T> task) { if (task == null) throw new NullPointerException(); externalPush(task); return task.join(); } public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) { if (task == null) throw new NullPointerException(); externalPush(task); return task; } public void execute(ForkJoinTask<?> task) { if (task == null) throw new NullPointerException(); externalPush(task); }

相信你已经发现了,提交任务的方法都会调用 externalPush(task) 这个用法,源码的主角终于要登场了

ForkJoinPool大型图文现场(一阅到底 vs 直接收藏)

但是......

如果你看 externalPush 代码,第一行就是声明一个 WorkQueue 数组变量,为了后续流程更加丝滑,咱还得铺垫一点 WorkQueue 的知识(又要铺垫)

WorkQueue

一看这么多成员变量,还是很慌的,不过,我们只需要把我几个主要的就足够了

ForkJoinPool大型图文现场(一阅到底 vs 直接收藏)

//初始队列容量 static final int INITIAL_QUEUE_CAPACITY = 1 << 13; //最大队列容量 static final int MAXIMUM_QUEUE_CAPACITY = 1 << 26; // 64M // Instance fields volatile int scanState; // versioned, <0: inactive; odd:scanning int stackPred; // pool stack (ctl) predecessor 前任池(WorkQueue[])索引,由此构成一个栈 int nsteals; // number of steals 偷取的任务个数 int hint; // randomization and stealer index hint 记录偷取者的索引,方便后面顺藤摸瓜 int config; // pool index and mode volatile int qlock; // 1: locked, < 0: terminate; else 0 volatile int base; // index of next slot for poll int top; // index of next slot for push ForkJoinTask<?>[] array; // the elements (initially unallocated) 任务数组 final ForkJoinPool pool; // the containing pool (may be null) final ForkJoinWorkerThread owner; // owning thread or null if shared 当前工作队列的工作线程,共享模式下为null volatile Thread parker; // == owner during call to park; else null 调用park阻塞期间为owner,其他情况为null volatile ForkJoinTask<?> currentJoin; // task being joined in awaitJoin 记录当前join来的任务 volatile ForkJoinTask<?> currentSteal; // mainly used by helpStealer 记录从其他工作队列偷取过来的任务

我们上面说了,WorkQueue 是一个双端队列,线程池有 runState,WorkQueue 有 scanState

小于零:inactive (未激活状态)

奇数:scanning (扫描状态)

偶数:running (运行状态)

操作线程池需要锁,操作队列也是需要锁的,qlock 就派上用场了

1: 锁定

0:未锁定

小于零:终止状态

WorkQueue 中也有个 config,但是和 ForkJoinPool 中的是不一样的,WorkQueue 中的config 记录了该 WorkQueue 在 WorkQueue[] 数组的下标以及 mode

其他字段的含义我们就写在代码注释中吧,主角重新登场,这次是真的

externalPush

文章前面说过,task 会细分成 submission task 和 worker task,worker task 是 fork 出来的,那从这个入口进入的,自然也就是 submission task 了,也就是说:

通过invoke() | submit() | execute() 等方法提交的 task, 是 submission task,会放到 WorkQueue 数组的偶数索引位置

调用 fork() 方法生成出的任务,叫 worker task,会放到 WorkQueue 数组的奇数索引位置

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wpgysx.html