深入浅出Java线程池:源码篇 (3)

当线程数已经到达总的线程数限制时,新的任务会被拒绝策略者处理,线程池无法执行该任务。

public void execute(Runnable command) { ... // 如果线程池还在运行,则尝试添加任务到队列中 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); // 再次检查如果线程池被关闭了,那么把任务移出队列 // 如果移除成功则拒绝本次任务 // 这里主要是判断在插入队列的过程中,线程池有没有被关闭了 if (! isRunning(recheck) && remove(command)) reject(command); // 否则再次检查线程数是否为0,如果是,则创建一个没有任务的非主线程worker // 这里对应核心线程为0的情况,指定任务为null,worker会去队列拿任务来执行 // 这里表示线程池至少有一个线程来执行队列中的任务 else if (workerCountOf(recheck) == 0) addWorker(null, false); } // 如果上面添加到队列中失败,则尝试创建一个非核心线程来执行任务 // 如果创建失败,则拒绝任务 else if (!addWorker(command, false)) reject(command); }

源码中还设计到两个关键方法:addWorker创建一个新的worker,也就是创建一个线程;reject拒绝一个任务。后者比较简单我们先看一下。

拒绝任务:reject() // 拒绝任务,调用rejectedExecutionHandler来处理 final void reject(Runnable command) { handler.rejectedExecution(command, this); }

默认的实现类有4个,我们依次来看一下:

AbortPolicy是默认实现,会抛出一个RejectedExecutionException异常:

public static class AbortPolicy implements RejectedExecutionHandler { public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString()); } }

DiscardPolicy最简单,就是:什么都不做,直接抛弃任务。(这是非常渣男不负责任的行为,咱们不能学他,所以也不要用它 [此处狗头] )

public static class DiscardPolicy implements RejectedExecutionHandler { public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { } }

DiscardOldestPolicy会删除队列头的一个任务,然后再次执行自己(挤掉原位,自己上位,绿茶行为?)

public static class DiscardOldestPolicy implements RejectedExecutionHandler { public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { e.getQueue().poll(); e.execute(r); } } }

CallerRunsPolicy最猛,他干脆在自己的线程执行run方法,不依靠线程池了,自己动手丰衣足食。

public static class CallerRunsPolicy implements RejectedExecutionHandler { public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { r.run(); } } }

上面4个ThreadPoolExecutor已经帮我们实现了,他的静态内部类,在创建ThreadPoolExecutor的时候我们可以直接拿来用。也可以自己继承接口实现自己的逻辑。具体选择哪个需要根据实际的业务需求来决定。

那么接下来看创建worker的方法。

创建worker:addWorker()

方法的目的很简单:创建一个worker。前面我们讲到,worker内部创建了一个线程,每一个worker则代表了一个线程,非常类似android中的looper。looper的loop()方法会不断地去MessageQueue获取message,而Worker的run()方法会不断地去阻塞队列获取任务,这个我们后面讲。

addWorker() 方法的逻辑整体上分为两个部分:

检查线程状态线程数是否满足条件:

// 第一个参数是创建的线程首次要执行的任务,可以是null,则表示初始化一个线程 // 第二参数表示是否是一个核心线程 private boolean addWorker(Runnable firstTask, boolean core) { retry: for (int c = ctl.get();;) { // 还记不记得我们前面讲到线程池的状态控制? // runStateAtLeast(c, SHUTDOWN)表示状态至少为shutdown,后面类同 // 如果线程池处于stop及以上,不会再创建worker // 如果线程池状态在shutdown时,如果队列不为空或者任务!=null,则还会创建worker if (runStateAtLeast(c, SHUTDOWN) && (runStateAtLeast(c, STOP) || firstTask != null || workQueue.isEmpty())) // 其他情况返回false,表示拒绝创建worker return false; // 这里采用CAS轮询,也就是循环锁的策略来让线程总数+1 for (;;) { // 检查是否超出线程数限制 // 这里根据core参数判断是核心线程还是非核心线程 if (workerCountOf(c) >= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK)) return false; // 利用CAS让ctl变量自增,表示worker+1 // 如果CAS失败,则表示发生了竞争,则再来一次 if (compareAndIncrementWorkerCount(c)) // 成功则跳出最外层循环 break retry; // 如果这个期间ctl被改变了,则获取ctl,再尝试一次 c = ctl.get(); // 如果线程池被shutdown了,那么重复最外层的循环,重新判断状态是否可以创建worker if (runStateAtLeast(c, SHUTDOWN)) // 继续最外层循环 continue retry; } } // 创建worker逻辑 ... }

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wpyzjf.html