MasterDemo master = new MasterDemo(new WorkerDemo(), 10);
for (int i = 0; i < 100; i++) {
TaskDemo task = new TaskDemo();
task.setId(i);
task.setName("任务" + i);
task.setPrice(new Random().nextInt(10000));
master.submit(task);
}
master.execute();
while (true) {
if (master.isComplete()) {
System.out.println("执行的结果为: " + master.getResult());
break;
}
}
该线程池是jdk7之后引入的一个并行执行任务的框架, 其核心思想也是将任务分割为子任务,
有可能子任务还是很大, 还需要进一步拆解, 最终得到足够小的任务.
将分割出来的子任务放入双端队列中, 然后几个启动线程从双端队列中获取任务执行.
子任务执行的结果放到一个队列里, 另起线程从队列中获取数据, 合并结果.
假设我们的场景需要计算从0到20000000L的累加求和. CountTask继承自RecursiveTask, 可以携带返回值.
每次分解大任务, 简单的将任务划分为100个等规模的小任务, 并使用fork()提交子任务.
在子任务中通过THRESHOLD设置子任务分解的阈值, 如果当前需要求和的总数大于THRESHOLD, 则子任务需要再次分解,
如果子任务可以直接执行, 则进行求和操作, 返回结果. 最终等待所有的子任务执行完毕, 对所有结果求和.
public class CountTask extends RecursiveTask<Long>{
// 任务分解的阈值
private static final int THRESHOLD = 10000;
private long start;
private long end;
public CountTask(long start, long end) {
this.start = start;
this.end = end;
}
public Long compute() {
long sum = 0;
boolean canCompute = (end - start) < THRESHOLD;
if (canCompute) {
for (long i = start; i <= end; i++) {
sum += i;
}
} else {
// 分成100个小任务
long step = (start + end) / 100;
ArrayList<CountTask> subTasks = new ArrayList<CountTask>();
long pos = start;
for (int i = 0; i < 100; i++) {
long lastOne = pos + step;
if (lastOne > end) {
lastOne = end;
}
CountTask subTask = new CountTask(pos, lastOne);
pos += step + 1;
// 将子任务推向线程池
subTasks.add(subTask);
subTask.fork();
}