深入浅出Java线程池:源码篇 (2)

在源码中这几个状态分别对应:

// runState is stored in the high-order bits private static final int RUNNING = -1 << COUNT_BITS; private static final int SHUTDOWN = 0 << COUNT_BITS; private static final int STOP = 1 << COUNT_BITS; private static final int TIDYING = 2 << COUNT_BITS; private static final int TERMINATED = 3 << COUNT_BITS;

上面的位操作不够直观,转化后如下:

private static final int RUNNING = 111 00000 00000000 00000000 00000000; private static final int SHUTDOWN = 000 00000 00000000 00000000 00000000; private static final int STOP = 001 00000 00000000 00000000 00000000; private static final int TIDYING = 010 00000 00000000 00000000 00000000; private static final int TERMINATED = 011 00000 00000000 00000000 00000000;

可以看到除了running是负数,其他的状态都是正数,且状态越靠后,数值越大。因此我们可以通过判断 ctl&COUNT_MASK > SHUTDOWN 来判断状态是否处于 stop、tidying、terminated之一。后续源码中会有很多的这样的判断,举其中的一个方法:

// 这里来判断线程池的状态 if(runStateAtLeast(ctl,SHUTDOWN)) { ... } // 这里执行逻辑,直接判断两个数的大小 private static boolean runStateAtLeast(int c, int s) { return c >= s; } ps:这里怎么没有使用掩码COUNT_MASK ?因为状态是处于高位,低位的数值不影响高位的大小判断。当然如果要判断相等,就还是需要使用掩码COUNT_MASK的。

接下来是ThreadPoolExecutor内部的三个关键角色对象:

// 阻塞队列 private final BlockingQueue<Runnable> workQueue; // 存储worker的hashSet,worker被创建之后会被存储到这里 private final HashSet<Worker> workers = new HashSet<>(); // RejectedExecutionHandler默认的实现是AbortPolicy private volatile RejectedExecutionHandler handler; private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();

内部使用的锁对象:

// 这里是两个锁。ThreadPoolExecutor内部并没有使用Synchronize关键字来保持同步 // 而是使用Lock;和Synchronize的区别就是他是应用层的锁,而synchronize是jvm层的锁 private final ReentrantLock mainLock = new ReentrantLock(); private final Condition termination = mainLock.newCondition();

最后是内部一些参数的配置,前面都介绍过,把源码贴出来再回顾一下:

// 线程池历史达到的最大线程数 private int largestPoolSize; // 线程池完成的任务数。 // 该数并不是实时更新的,在获取线程池完成的任务数时,需要去统计每个worker完成的任务并累加起来 // 当一个worker被销毁之后,他的任务数就会被累加到这个数据中 private long completedTaskCount; // 线程工厂,用于创建线程 private volatile ThreadFactory threadFactory; // 空闲线程存储的时间 private volatile long keepAliveTime; // 是否允许核心线程被回收 private volatile boolean allowCoreThreadTimeOut; // 核心线程数限额 private volatile int corePoolSize; // 线程总数限额 private volatile int maximumPoolSize;

不是吧sir?源码还没看到魂呢,整出来这么无聊的变量?
咳咳,别急嘛,源码解析马上来。这些变量会贯穿整个源码过程始终,先对他们有个印象,后续阅读源码就会轻松畅通很多。

关键方法:execute()

这个方法的主要任务就是根据线程池的当前状态,选择任务的执行策略。该方法的核心逻辑思路是:

在线程数没有达到核心线程数时,会创建一个核心线程来执行任务

public void execute(Runnable command) { // 不能传入空任务 if (command == null) throw new NullPointerException(); // 获取ctl变量,就是上面我们讲的将状态和线程数合在一起的一个变量 int c = ctl.get(); // 判断核心线程数是否超过限额,否则创建一个核心线程来执行任务 if (workerCountOf(c) < corePoolSize) { // addWorker方法是创建一个worker,也就是创建一个线程,参数true表示这是一个核心线程 // 如果添加成功则直接返回 // 否则意味着中间有其他的worker被添加了,导致超出核心线程数;或者线程池被关闭了等其他情况 // 需要进入下一步继续判断 if (addWorker(command, true)) return; c = ctl.get(); } ... }

当线程数达到核心线程数时,新任务会被放入到等待队列中等待被执行

当等待队列已经满了之后,如果线程数没有到达总的线程数上限,那么会创建一个非核心线程来执行任务

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wpyzjf.html