Java并发编程:Java线程池核心ThreadPoolExecutor的使用和原理分析 (2)

workQueue:阻塞队列,用于存储等待执行的任务,并且只能存储调用execute 方法提交的任务。常用的有三种队列,SynchronousQueue,LinkedBlockingDeque,ArrayBlockingQueue。

keepAliveTime:线程池中线程的最大空闲时间,这种情况一般是线程数目大于任务的数量导致。

unit:keepAliveTime的时间单位,TimeUnit是一个枚举类型,位于java.util.concurrent包下。

threadFactory:线程工厂,用于创建线程。

handler:拒绝策略,当任务太多来不及处理时所采用的处理策略。

重要的变量

看完了构造函数,我们来看下ThreadPoolExecutor类中几个重要的成员变量:

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); private static final int COUNT_BITS = Integer.SIZE - 3; private static final int CAPACITY = (1 << COUNT_BITS) - 1; // runState is stored in the high-order bits private static final int RUNNING = -1 << COUNT_BITS; private static final int SHUTDOWN = 0 << COUNT_BITS; private static final int STOP = 1 << COUNT_BITS; private static final int TIDYING = 2 << COUNT_BITS; private static final int TERMINATED = 3 << COUNT_BITS; // Packing and unpacking ctl private static int runStateOf(int c) { return c & ~CAPACITY; } private static int workerCountOf(int c) { return c & CAPACITY; } private static int ctlOf(int rs, int wc) { return rs | wc; }

ctl:控制线程运行状态的一个字段。同时,根据下面的几个方法runStateOf,workerCountOf,ctlOf可以看出,该字段还包含了两部分的信息:线程池的运行状态 (runState) 和线程池内有效线程的数量 (workerCount),并且使用的是Integar类型,高3位保存runState,低29位保存workerCount。

COUNT_BITS:值为29的常量,在字段CAPACITY被引用计算。

CAPACITY:表示有效线程数量(workerCount)的上限,大小为 (1<<29) - 1。

下面5个变量表示的是线程的运行状态,分别是:

RUNNING :接受新提交的任务,并且能处理阻塞队列中的任务;

SHUTDOWN:不接受新的任务,但会执行队列中的任务。

STOP:不接受新任务,也不处理队列中的任务,同时中断正在处理任务的线程。

TIDYING:如果所有的任务都已终止了,workerCount (有效线程数) 为0,线程池进入该状态后会调用 terminated() 方法进入TERMINATED 状态。

TERMINATED:terminated( ) 方法执行完毕。

用一个状态转换图表示大概如下 (图片来源于https://www.cnblogs.com/liuzhihu/p/8177371.html):

Java并发编程:Java线程池核心ThreadPoolExecutor的使用和原理分析

构造函数和基本参数都了解后,接下来就是对类中重要方法的研究了。

线程池执行流程

execute方法

ThreadPoolExecutor类的核心调度方法是execute(),通过调用这个方法可以向线程池提交一个任务,交由线程池去执行。而ThreadPoolExecutor的工作逻辑也可以藉由这个方法来一步步理清。这是方法的源码:

public void execute(Runnable command) { if (command == null) throw new NullPointerException(); //获取ctl的值,前面说了,该值记录着runState和workerCount int c = ctl.get(); /* * 调用workerCountOf得到当前活动的线程数; * 当前活动线程数小于corePoolSize,新建一个线程放入线程池中; * addWorker(): 把任务添加到该线程中。 */ if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; //如果上面的添加线程操作失败,重新获取ctl值 c = ctl.get(); } //如果当前线程池是运行状态,并且往工作队列中添加该任务 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); /* * 如果当前线程不是运行状态,把任务从队列中移除 * 调用reject(内部调用handler)拒绝接受任务 */ if (! isRunning(recheck) && remove(command)) reject(command); //获取线程池中的有效线程数,如果为0,则执行addWorker创建一个新线程 else if (workerCountOf(recheck) == 0) addWorker(null, false); } /* * 如果执行到这里,有两种情况: * 1. 线程池已经不是RUNNING状态; * 2. 线程池是RUNNING状态,但workerCount >= corePoolSize并且workQueue已满。 * 这时,再次调用addWorker方法,但第二个参数传入为false,将线程池的有限线程数量的上限设置为maximumPoolSize; * 如果失败则拒绝该任务 */ else if (!addWorker(command, false)) reject(command); }

简单概括一下代码的逻辑,大概是这样:

1、判断当前运行中的线程数是否小于corePoolSize,是的话则调用addWorker创建线程执行任务。

2、不满足1的条件,就把任务放入工作队列workQueue中。

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wpxdgz.html