2.FutureTask对象的run方法会存储返回的结果或者异常。调用方可以根据FutureTask获取任务的执行结果。
//省略了部分代码 public void run() { Callable<V> c = callable; if (c != null && state == NEW) { V result; boolean ran; try { //执行任务 result = c.call(); ran = true; } catch (Throwable ex) { result = null; ran = false; //存储异常 setException(ex); } if (ran) //存储返回值 set(result); } 线程池的关闭 shutdownshutdown将线程池的状态设置成SHUTDOWN,同时拒绝提交新的任务,但是已提交的任务会正常执行
shutdownNowshutdownNow将线程池的状态设置成STOP,该状态下拒绝提交新的任务 & 丢弃工作队列中的任务& 中断当前活跃的线程(尝试停止正在执行的任务)
需要注意的是shutdownNow对于正在执行的任务只是尝试停止,不保证成功(取决于任务是否监听处理中断位)
ScheduledThreadPoolExecutor 定时调度原理ScheduledThreadPoolExecutor在ThreadPoolExecutor之上扩展实现了定时调度的能力
1.实例化时工作队列使用延时队列(DelayedWorkQueue)--- 本质是个小顶堆
public ScheduledThreadPoolExecutor(int corePoolSize, RejectedExecutionHandler handler) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue(), handler); }2.提交的任务装饰成ScheduledFutureTask类型,并把任务加入到工作队列(不直接调用execute)
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) { if (command == null || unit == null) throw new NullPointerException(); //装饰 RunnableScheduledFuture<?> t = decorateTask(command, new ScheduledFutureTask<Void>(command, null, triggerTime(delay, unit))); //任务加入工作队列 delayedExecute(t); return t; }3.ScheduledFutureTask实现Delayed和Comparable接口
所以提交到工作队列中的任务是按照任务执行时间排序的(最早执行的任务在头部),因为工作队列是个小顶堆。
public long getDelay(TimeUnit unit) { return unit.convert(time - now(), NANOSECONDS); } public int compareTo(Delayed other) { if (other == this) // compare zero if same object return 0; if (other instanceof ScheduledFutureTask) { ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other; long diff = time - x.time; if (diff < 0) return -1; else if (diff > 0) return 1; else if (sequenceNumber < x.sequenceNumber) return -1; else return 1; } long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS); return (diff < 0) ? -1 : (diff > 0) ? 1 : 0; }4.只能从工作队列中获取已到执行时间的任务
public RunnableScheduledFuture<?> poll() { final ReentrantLock lock = this.lock; lock.lock(); try { RunnableScheduledFuture<?> first = queue[0]; //如果头部的任务还没有到执行时间, 直接返回null if (first == null || first.getDelay(NANOSECONDS) > 0) return null; else return finishPoll(first); } finally { lock.unlock(); } } 线程池配置假设:CPU核心数是N,每个任务的执行时间是T,任务的超时时间是timeout,核心线程数是corePoolSize,工作队列大小是workQueue, 最大线程数是 maxPoolSize, 任务最大并发数为maxTasks
核心线程数配置
对于CPU密集型任务:corePoolSize 大小设置成和CPU核心数接近,如N+1 或者 N+2
对于IO密集型任务:corePoolSize可以设置的比较大一些,如2N~3N;也可以通过如下逻辑进行估算
假设80%的时间是IO操作,那么每个任务需要占用CPU时间大概是0.2T, 每秒每个CPU核心最大可以执行的任务数为 = (1/0.2T) = 5/T;所以理论上 80%IO的情况下corePoolSize可以设置为 5N (一个cpu可以对应5个工作线程)
工作队列大小配置工作队列的大小取决于任务的超时时间 & 核心线程池的吞吐量
则 workQueue = corePoolSize * (1/T) * timeout = (corePoolSize * timeout) / T
需要注意的是: 工作队列不能使用无界队列。(无界队列异常情况下可能耗尽系统资源,造成服务不可用)
最大线程数配置最大线程数的大小取决于最大的任务并发数 & 工作队列的大小 & 任务的执行时间
则 maxPoolSize = (maxTasks - workQueue) / T
拒绝策略配置对于无关紧要的任务,我们可以直接丢弃;对于一些重要的任务需要对任务进行持久化,以便后续进行补偿和恢复。
线程池监控我们可以有个定时脚本将线程池的最大线程数、工作队列大小、已经执行的任务数、已经拒绝的任务数等数据推送到监控系统
这样我们可以根据这些数据对线程池进行调优,也可以即使感知线上业务异常。