我们知道,单个线程计算是串行的,只有等上一个任务结束之后,才能执行下一个任务,所以执行效率是比较低的。
那么,如果用多线程执行任务,就可以在单位时间内执行更多的任务,而Master-Worker就是多线程并行计算的一种实现方式。
它的思想是,启动两个进程协同工作:Master和Worker进程。
Master负责任务的接收和分配,Worker负责具体的子任务执行。每个Worker执行完任务之后把结果返回给Master,最后由Master汇总结果。(其实也是一种分而治之的思想,和forkjoin计算框架有相似之处,参看:并行任务计算框架forkjoin)
Master-Worker工作示意图如下:
下面用Master-Worker实现计算1-100的平方和,思路如下:
定义一个Task类用于存储每个任务的数据。
Master生产固定个数的Worker,把所有worker存放在workers变量(map)中,Master需要存储所有任务的队列workqueue(ConcurrentLinkedQueue)和所有子任务返回的结果集resultMap(ConcurrentHashMap)。
每个Worker执行自己的子任务,然后把结果存放在resultMap中。
Master汇总resultMap中的数据,然后返回给Client客户端。
为了扩展Worker的功能,用一个MyWorker继承Worker重写任务处理的具体方法。
Task类:
package com.thread.masterworker; public class Task { private int id; private String name; private int num; public int getId() { return id; } public void setId(int id) { this.id = id; } public String getName() { return name; } public void setName(String name) { this.name = name; } public int getNum() { return num; } public void setNum(int num) { this.num = num; } }Master实现:
package com.thread.masterworker; import java.util.HashMap; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; public class Master { //所有任务的队列 private ConcurrentLinkedQueue<Task> workerQueue = new ConcurrentLinkedQueue<Task>(); //所有worker private HashMap<String,Thread> workers = new HashMap<String,Thread>(); //共享变量,worker返回的结果 private ConcurrentHashMap<String,Object> resultMap = new ConcurrentHashMap<String,Object>(); //构造方法,初始化所有worker public Master(Worker worker,int workerCount){ worker.setWorkerQueue(this.workerQueue); worker.setResultMap(this.resultMap); for (int i = 0; i < workerCount; i++) { Thread t = new Thread(worker); this.workers.put("worker-"+i,t); } } //任务的提交 public void submit(Task task){ this.workerQueue.add(task); } //执行任务 public int execute(){ for (Map.Entry<String, Thread> entry : workers.entrySet()) { entry.getValue().start(); } //一直循环,直到结果返回 while (true){ if(isComplete()){ return getResult(); } } } //判断是否所有线程都已经执行完毕 public boolean isComplete(){ for (Map.Entry<String, Thread> entry : workers.entrySet()) { //只要有任意一个线程没有结束,就返回false if(entry.getValue().getState() != Thread.State.TERMINATED){ return false; } } return true; } //处理结果集返回最终结果 public int getResult(){ int res = 0; for (Map.Entry<String,Object> entry : resultMap.entrySet()) { res += (Integer) entry.getValue(); } return res; } }父类Worker:
package com.thread.masterworker; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; public class Worker implements Runnable { private ConcurrentLinkedQueue<Task> workerQueue; private ConcurrentHashMap<String,Object> resultMap; public void setWorkerQueue(ConcurrentLinkedQueue<Task> workerQueue) { this.workerQueue = workerQueue; } public void setResultMap(ConcurrentHashMap<String, Object> resultMap) { this.resultMap = resultMap; } @Override public void run() { while(true){ //从任务队列中取出一个任务 Task task = workerQueue.poll(); if(task == null) break; //处理具体的任务 Object res = doTask(task); //把每次处理的结果放到结果集里面,此处直接把num值作为结果 resultMap.put(String.valueOf(task.getId()),res); } } public Object doTask(Task task) { return null; } }子类MyWorker继承父类Worker,重写doTask方法实现具体的逻辑:
package com.thread.masterworker; public class MyWorker extends Worker { @Override public Object doTask(Task task) { //暂停0.5秒,模拟任务处理 try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } //计算数字的平方 int num = task.getNum(); return num * num; } }客户端Client:
package com.thread.masterworker; import java.util.Random; public class Client { public static void main(String[] args) { Master master = new Master(new MyWorker(), 10); //提交n个任务到任务队列里 for (int i = 0; i < 100; i++) { Task task = new Task(); task.setId(i); task.setName("任务"+i); task.setNum(i+1); master.submit(task); } //执行任务 long start = System.currentTimeMillis(); int res = master.execute(); long time = System.currentTimeMillis() - start; System.out.println("结果:"+res+",耗时:"+time); } }以上,我们用10个线程去执行子任务,最终由Master做计算求和(1-100的平方和)。每个线程暂停500ms,计算数字的平方值。