线程池ThreadPoolExecutor类的使用

1.使用线程池的好处?

第一:降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。

第二:提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。

第三:提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。

可以先看下线程池的类图:

线程池ThreadPoolExecutor类的使用

2.ThreadPoolExecutor的使用

线程池的状态:

线程池ThreadPoolExecutor类的使用

A.线程池的创建

我们可以通过java.util.concurrent.ThreadPoolExecutor来创建一个线程池。

创建线程池需要的参数介绍:

corePoolSize(线程池的基本大小):当提交一个任务到线程池时,线程池会创建一个线程来执行任务,即使其他空闲的基本线程能够执行新任务也会创建线程,等到需要执行的任务数大于线程池基本大小时就不再创建。如果调用了线程池的prestartAllCoreThreads方法,线程池会提前创建并启动所有基本线程。

runnableTaskQueue(任务队列):用于保存等待执行的任务的阻塞队列。 可以选择以下几个阻塞队列。

ArrayBlockingQueue:是一个基于数组结构的有界阻塞队列,此队列按 FIFO(先进先出)原则对元素进行排序。

LinkedBlockingQueue:一个基于链表结构的阻塞队列,此队列按FIFO (先进先出) 排序元素,吞吐量通常要高于ArrayBlockingQueue。静态工厂方法Executors.newFixedThreadPool()使用了这个队列。

SynchronousQueue:一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQueue,静态工厂方法Executors.newCachedThreadPool使用了这个队列。

PriorityBlockingQueue:一个具有优先级的无限阻塞队列。

maximumPoolSize(线程池最大大小):线程池允许创建的最大线程数。如果队列满了,并且已创建的线程数小于最大线程数,则线程池会再创建新的线程执行任务。值得注意的是如果使用了无界的任务队列这个参数就没什么效果。

ThreadFactory:用于设置创建线程的工厂,可以通过线程工厂给每个创建出来的线程设置更有意义的名字。

RejectedExecutionHandler(饱和策略):当队列和线程池都满了,说明线程池处于饱和状态,那么必须采取一种策略处理提交的新任务。这个策略默认情况下是AbortPolicy,表示无法处理新任务时抛出异常。以下是JDK1.5提供的四种策略。

AbortPolicy:直接抛出异常。

CallerRunsPolicy:只用调用者所在线程来运行任务。

DiscardOldestPolicy:丢弃队列里最近的一个任务,并执行当前任务。

DiscardPolicy:不处理,丢弃掉。

当然也可以根据应用场景需要来实现RejectedExecutionHandler接口自定义策略。如记录日志或持久化不能处理的任务。

keepAliveTime(线程活动保持时间):线程池的工作线程空闲后,保持存活的时间。所以如果任务很多,并且每个任务执行的时间比较短,可以调大这个时间,提高线程的利用率。

TimeUnit(线程活动保持时间的单位):可选的单位有天(DAYS),小时(HOURS),分钟(MINUTES),毫秒(MILLISECONDS),微秒(MICROSECONDS, 千分之一毫秒)和毫微秒(NANOSECONDS, 千分之一微秒)。

B.向线程池提交任务

  提交任务有execute()和submit()两个方法,下面看看他俩的区别:

  ①接收参数不同

  execute()的参数是Runnable,submit()参数可以是Runnable,也可以是Cable。

  ②返回值不同

  execute()没有返回值,submit()有返回值Future。通过Future可以获取各个线程的完成情况,是否有异常,还能试图取消任务的执行。详见》》》》》》》》

  execute()很好理解,下面看个使用submit()获取返回值的例子,假设我有很多更新各种数据的task,我希望如果其中一个task失败,其它的task就不需要执行了。那我就需要catch Future.get抛出的异常,然后终止其它task的执行,代码如下:

1 public class SubmitTest { 2 3 public static void main(String[] args) { 4 ExecutorService executorService = Executors.newCachedThreadPool(); 5 List<Future<String>> futureList = new ArrayList<>(); 6 // 创建10个任务并执行 7 for (int i = 0; i < 10; i++) { 8 // 使用ExecutorService执行Callable类型的任务,并将结果保存在future变量中 9 Future<String> future = executorService.submit(new TaskRunn(i)); 10 // 将任务执行结果存储到List中 11 futureList.add(future); 12 } 13 // 正常关闭线程池 14 executorService.shutdown(); 15 // 遍历任务的结果 16 for (Future<String> future : futureList) { 17 try { 18 System.out.println(future.get()); 19 } catch (InterruptedException e) { 20 e.printStackTrace(); 21 } catch (ExecutionException e) { 22 // 出错了停止所有的线程 23 executorService.shutdownNow(); 24 e.printStackTrace(); 25 return; 26 } 27 } 28 } 29 } 30 31 class TaskRunn implements Callable<String>{ 32 33 private int id; 34 public TaskRunn(int id) { 35 this.id = id; 36 } 37 38 /** 39 * 任务的具体过程,一旦任务传给ExecutorService的submit方法,则该方法自动在一个线程上执行 40 */ 41 @Override 42 public String call() throws Exception { 43 System.out.println("call() begin..."+id+"//"+Thread.currentThread().getName()); 44 if (new Random().nextInt(10) > 5) { 45 throw new TaskException("task err:"+id+"//"+Thread.currentThread().getName()); 46 } 47 // 模拟业务耗时 48 for (int i = 0; i < 10; i++) { 49 Thread.sleep(1000); 50 } 51 return "result:"+id+"//" +Thread.currentThread().getName(); 52 } 53 } 54 55 // 定义自己的异常 56 class TaskException extends Exception{ 57 public TaskException(String mess) { 58 super(mess); 59 } 60 }

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/zzsjpz.html