Java调度线程池ScheduledThreadPoolExecutor源码分析

最近新接手的项目里大量使用了ScheduledThreadPoolExecutor类去执行一些定时任务,之前一直没有机会研究这个类的源码,这次趁着机会好好研读一下。

该类主要还是基于ThreadPoolExecutor类进行二次开发,所以对Java线程池执行过程还不了解的同学建议先看看我之前的文章。
当面试官问Java线程池时,你应该知道些什么?

一、执行流程

与ThreadPoolExecutor不同,向ScheduledThreadPoolExecutor中提交任务的时候,任务被包装成ScheduledFutureTask对象加入延迟队列并启动一个woker线程。

用户提交的任务加入延迟队列时,会按照执行时间进行排列,也就是说队列头的任务是需要最早执行的。而woker线程会从延迟队列中获取任务,如果已经到了任务的执行时间,则开始执行。否则阻塞等待剩余延迟时间后再尝试获取任务。

任务执行完成以后,如果该任务是一个需要周期性反复执行的任务,则计算好下次执行的时间后会重新加入到延迟队列中。

二、源码深入分析

首先看下ScheduledThreadPoolExecutor类的几个构造函数:

public ScheduledThreadPoolExecutor(int corePoolSize) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue()); } public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue(), threadFactory); } public ScheduledThreadPoolExecutor(int corePoolSize, RejectedExecutionHandler handler) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue(), handler); } public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory, RejectedExecutionHandler handler) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue(), threadFactory, handler); }

注:这里构造函数都是使用super,其实就是ThreadPoolExecutor的构造函数
这里有三点需要注意:

使用DelayedWorkQueue作为阻塞队列,并没有像ThreadPoolExecutor类一样开放给用户进行自定义设置。该队列是ScheduledThreadPoolExecutor类的核心组件,后面详细介绍。

这里没有向用户开放maximumPoolSize的设置,原因是DelayedWorkQueue中的元素在大于初始容量16时,会进行扩容,也就是说队列不会装满,maximumPoolSize参数即使设置了也不会生效。

worker线程没有回收时间,原因跟第2点一样,因为不会触发回收操作。所以这里的线程存活时间都设置为0。

再次说明:上面三点的理解需要先了解ThreadPoolExecutor的知识点。

当我们创建出一个调度线程池以后,就可以开始提交任务了。这里依次分析一下三个常用API的源码:

首先是schedule方法,该方法是指任务在指定延迟时间到达后触发,只会执行一次。

public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) { //参数校验 if (command == null || unit == null) throw new NullPointerException(); //这里是一个嵌套结构,首先把用户提交的任务包装成ScheduledFutureTask //然后在调用decorateTask进行包装,该方法是留给用户去扩展的,默认是个空方法 RunnableScheduledFuture<?> t = decorateTask(command, new ScheduledFutureTask<Void>(command, null, triggerTime(delay, unit))); //包装好任务以后,就进行提交了 delayedExecute(t); return t; }

重点看一下提交任务的源码:

private void delayedExecute(RunnableScheduledFuture<?> task) { //如果线程池已经关闭,则使用拒绝策略把提交任务拒绝掉 if (isShutdown()) reject(task); else { //与ThreadPoolExecutor不同,这里直接把任务加入延迟队列 super.getQueue().add(task); //如果当前状态无法执行任务,则取消 if (isShutdown() && !canRunInCurrentRunState(task.isPeriodic()) && remove(task)) task.cancel(false); else //这里是增加一个worker线程,避免提交的任务没有worker去执行 //原因就是该类没有像ThreadPoolExecutor一样,woker满了才放入队列 ensurePrestart(); } }

这里的关键点其实就是super.getQueue().add(task)行代码,ScheduledThreadPoolExecutor类在内部自己实现了一个基于堆数据结构的延迟队列。add方法最终会落到offer方法中,一起看下:

public boolean offer(Runnable x) { //参数校验 if (x == null) throw new NullPointerException(); RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x; final ReentrantLock lock = this.lock; lock.lock(); try { //查看当前元素数量,如果大于队列长度则进行扩容 int i = size; if (i >= queue.length) grow(); //元素数量加1 size = i + 1; //如果当前队列还没有元素,则直接加入头部 if (i == 0) { queue[0] = e; //记录索引 setIndex(e, 0); } else { //把任务加入堆中,并调整堆结构,这里就会根据任务的触发时间排列 //把需要最早执行的任务放在前面 siftUp(i, e); } //如果新加入的元素就是队列头,这里有两种情况 //1.这是用户提交的第一个任务 //2.新任务进行堆调整以后,排在队列头 if (queue[0] == e) { //这个变量起优化作用,后面说 leader = null; //加入元素以后,唤醒worker线程 available.signal(); } } finally { lock.unlock(); } return true; }

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/f25cb4ae62cc2a190b0d605d076d456e.html