一、Executor执行器
1.Executor接口,Java线程池框架中的顶层接口,提供一个execute方法来执行任务
import java.util.concurrent.Executor; public class T01_MyExecutor implements Executor { public static void main(String[] args) { new T01_MyExecutor().execute(()->System.out.println("hello executor")); } @Override public void execute(Runnable command) { //new Thread(command).run();//另起线程方法调用 command.run();//方法直接调用 } }
2.任务接口
1)callable接口:提供一个call方法,具有返回值可以抛出异常
2)runnable接口:提供一个run方法,无返回值不可抛出异常
3.ExecutorService接口(Executor的一个子接口)
1)ExecutorService可以理解为服务,执行execute方法可以向服务器中添加runnable任务无返回值,而执行submit方法可以向服务中添加callable与runnable任务,且有返回值
2)Executor与ExecutorService的简单理解:可以想象许多的执行器Executor在等待着执行任务,而service服务可通过调用execute或submit方法来添加不同的任务供执行器执行
3)submit方法具有一个future类型的返回值,与callable,Executors一起使用
public class T06_Future { public static void main(String[] args) throws InterruptedException, ExecutionException { FutureTask<Integer> task = new FutureTask<>(()->{ TimeUnit.MILLISECONDS.sleep(500); return 1000; }); //new Callable () { Integer call();} new Thread(task).start(); System.out.println(task.get()); //阻塞 //******************************* ExecutorService service = Executors.newFixedThreadPool(5); Future<Integer> f = service.submit(()->{ TimeUnit.MILLISECONDS.sleep(500); return 1; }); System.out.println(f.get()); System.out.println(f.isDone()); } }
运行结果:
4.Executors(操作Executor的一个工具类、工厂类)
二、ThreadPool线程池1.线程池将任务分配给池中线程,当线程池中线程执行不过来时,任务会进入等待队列,队列由BlockingQueue实现,当有线程空闲时再来任务时会被分配给空闲线程,一个线程同时维护着两个队列(结束队列与等待队列)
下面一个程序帮助理解线程池的概念
public class T05_ThreadPool { public static void main(String[] args) throws InterruptedException { ExecutorService service = Executors.newFixedThreadPool(5); //execute submit for (int i = 0; i < 6; i++) { service.execute(() -> { try { TimeUnit.MILLISECONDS.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName()); }); } System.out.println(service); service.shutdown();//正常关闭,等待所有任务执行完毕,关闭线程池 System.out.println(service.isTerminated());//判断所有任务是否都执行完了 System.out.println(service.isShutdown());//判断线程池是否被关闭了 System.out.println(service); TimeUnit.SECONDS.sleep(5); System.out.println(service.isTerminated()); System.out.println(service.isShutdown()); System.out.println(service); } }
运行结果:
2.六种线程池
1)FixedThreadPool 固定线程数量的线程池
2)cachedThreadPool 假定刚开始一个线程都没有,来一个任务开启一个线程,如果再来一个任务,线程池中有空闲线程就将任务分配给空闲线程,如此往复直到起到电脑能支撑限度为止,默认一个线程空闲超过一分钟就自动销毁
3)singleThreadPool 线程池中只有一个线程
4)scheduledThreadPool 定时器线程池
5)workStealingPool 工作窃取线程池,当前线程池有线程空闲,会自动去寻找任务执行,默认根据CPU核数启动默认线程数目线程,是精灵(demon)线程(守护线程、后台线程),主函数不阻塞的话看不到输出,本质上是ForkJoinPool实现的
6)ForkJoinPool 分叉合并线程,大任务可以切分成许多的小任务,如果小任务还是太大的话还可以继续分,分的可以了就将小任务合并,最后产生一个总的结果(有点像归并排序)
ForkJoinTask从RecursiveAction和RecursiveTask继承
a.RecursiveAction 无返回值
/** * 对数组中所有数求和 * @author zhangqi * */ public class T12_ForkJoinPool { static int[] nums = new int[1000000]; static final int MAX_NUM = 50000; static Random r = new Random(); static { for(int i=0; i<nums.length; i++) { nums[i] = r.nextInt(100); } System.out.println(Arrays.stream(nums).sum()); //stream api } static class AddTask extends RecursiveAction { int start, end; AddTask(int s, int e) { start = s; end = e; } @Override protected void compute() { if(end-start <= MAX_NUM) { long sum = 0L; for(int i=start; i<end; i++) sum += nums[i]; System.out.println("from:" + start + " to:" + end + " = " + sum); } else { int middle = start + (end-start)/2; AddTask subTask1 = new AddTask(start, middle); AddTask subTask2 = new AddTask(middle, end); subTask1.fork(); subTask2.fork(); } } } /*static class AddTask extends RecursiveTask<Long> { int start, end; AddTask(int s, int e) { start = s; end = e; } @Override protected Long compute() { if(end-start <= MAX_NUM) { long sum = 0L; for(int i=start; i<end; i++) sum += nums[i]; return sum; } int middle = start + (end-start)/2; AddTask subTask1 = new AddTask(start, middle); AddTask subTask2 = new AddTask(middle, end); subTask1.fork(); subTask2.fork(); return subTask1.join() + subTask2.join(); } }*/ public static void main(String[] args) throws IOException { ForkJoinPool fjp = new ForkJoinPool(); AddTask task = new AddTask(0, nums.length); fjp.execute(task); //long result = task.join(); //System.out.println(result); System.in.read(); } }
运行结果:
b.RescursiveTask 有返回值的递归任务
/** * 对数组中所有数求和 * @author zhangqi * */ public class T12_ForkJoinPool { static int[] nums = new int[1000000]; static final int MAX_NUM = 50000; static Random r = new Random(); static { for(int i=0; i<nums.length; i++) { nums[i] = r.nextInt(100); } System.out.println(Arrays.stream(nums).sum()); //stream api } /* static class AddTask extends RecursiveAction { int start, end; AddTask(int s, int e) { start = s; end = e; } @Override protected void compute() { if(end-start <= MAX_NUM) { long sum = 0L; for(int i=start; i<end; i++) sum += nums[i]; System.out.println("from:" + start + " to:" + end + " = " + sum); } else { int middle = start + (end-start)/2; AddTask subTask1 = new AddTask(start, middle); AddTask subTask2 = new AddTask(middle, end); subTask1.fork(); subTask2.fork(); } } }*/ static class AddTask extends RecursiveTask<Long> { int start, end; AddTask(int s, int e) { start = s; end = e; } @Override protected Long compute() { if(end-start <= MAX_NUM) { long sum = 0L; for(int i=start; i<end; i++) sum += nums[i]; return sum; } int middle = start + (end-start)/2; AddTask subTask1 = new AddTask(start, middle); AddTask subTask2 = new AddTask(middle, end); subTask1.fork(); subTask2.fork(); return subTask1.join() + subTask2.join(); } } public static void main(String[] args) throws IOException { ForkJoinPool fjp = new ForkJoinPool(); AddTask task = new AddTask(0, nums.length); fjp.execute(task); long result = task.join(); System.out.println(result); //System.in.read(); } }
运行结果:
补充:ThreadPoolExecutor线程池执行器(自定义线程池,许多线程池的底层实现,ForkJoinPool不是由它实现)