java.util.concurrent.ThreadPoolExecutor#addWorker:
retry: for (;;) { int c = ctl.get(); // 获取线程池当前运行状态 int rs = runStateOf(c); // 如果rs大于SHUTDOWN,则说明此时线程池不在接受新任务了 // 如果rs等于SHUTDOWN,同时满足firstTask为空,且阻塞队列如果有任务,则继续执行任务 // 也就说明了如果线程池处于SHUTDOWN状态时,可以继续执行阻塞队列中的任务,但不能继续往线程池中添加任务了 if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { // 获取有效线程数量 int wc = workerCountOf(c); // 如果有效线程数大于等于线程池所容纳的最大线程数(基本不可能发生),不能添加任务 // 或者有效线程数大于等于当前限制的线程数,也不能添加任务 // 限制线程数量有任务是否要核心线程执行决定,core=true使用核心线程执行任务 if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; // 使用AQS增加有效线程数量 if (compareAndIncrementWorkerCount(c)) break retry; // 如果再次获取ctl变量值 c = ctl.get(); // Re-read ctl // 再次对比运行状态,如果不一致,再次循环执行 if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } }这里特别强调,firstTask是开启线程执行的首个任务,之后常驻在线程池中的线程执行的任务都是从阻塞队列中取出的,需要注意。
以上for循环代码主要作用是判断ctl变量当前的状态是否可以添加任务,特别说明了如果线程池处于SHUTDOWN状态时,可以继续执行阻塞队列中的任务,但不能继续往线程池中添加任务了;同时增加工作线程数量使用了AQS作同步,如果同步失败,则继续循环执行。
// 任务是否已执行 boolean workerStarted = false; // 任务是否已添加 boolean workerAdded = false; // 任务包装类,我们的任务都需要添加到Worker中 Worker w = null; try { // 创建一个Worker w = new Worker(firstTask); // 获取Worker中的Thread值 final Thread t = w.thread; if (t != null) { // 操作workers HashSet 数据结构需要同步加锁 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. // 获取当前线程池的运行状态 int rs = runStateOf(ctl.get()); // rs < SHUTDOWN表示是RUNNING状态; // 如果rs是RUNNING状态或者rs是SHUTDOWN状态并且firstTask为null,向线程池中添加线程。 // 因为在SHUTDOWN时不会在添加新的任务,但还是会执行workQueue中的任务 // rs是RUNNING状态时,直接创建线程执行任务 // 当rs等于SHUTDOWN时,并且firstTask为空,也可以创建线程执行任务,也说说明了SHUTDOWN状态时不再接受新任务 if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } // 启动线程执行任务 if (workerAdded) { t.start(); workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; }以上源码主要的作用是创建一个Worker对象,并将新的任务装进Worker中,开启同步将Worker添加进workers中,这里需要注意workers的数据结构为HashSet,非线程安全,所以操作workers需要加同步锁。添加步骤做完后就启动线程来执行任务了,继续往下看。
(4)前置和后置钩子如果需要在任务执行前后插入逻辑,你可以实现ThreadPoolExecutor以下两个方法:
protected void beforeExecute(Thread t, Runnable r) { } protected void afterExecute(Runnable r, Throwable t) { }这样一来,就可以对任务的执行进行实时监控。
5、线程池总结线程池原理关键技术:锁(lock,cas)、阻塞队列、hashSet(资源池)