线程池 BlockingQueue synchronized volatile
本章从线程池到阻塞队列BlockingQueue。从BlockingQueue到synchronized 和 volatile关键字。用wait,notify线程之间的通讯实现BlockingQueue队列。将这些零碎的知识整合在一起。如下图所示,都是本章知识点。之所以写这篇博客,是因为前段时间看了一篇关于"一名3年工作经验的程序员应该具备的技能"文章,倍受打击。
学习流程图:
技术:Executors,BlockingQueue,synchronized,volatile,wait,notify
说明:文章学习思路:线程池---->队列---->关键字---->死锁---->线程池实战
源码:https://github.com/ITDragonBlog/daydayup/tree/master/ThreadBase 线程池
线程池,顾名思义存放线程的池子,可以类比数据库的连接池。因为频繁地创建和销毁线程会给服务器带来很大的压力。若能将创建的线程不再销毁而是存放在池中等待下一个任务使用,可以不仅减少了创建和销毁线程所用的时间,提高了性能,同时还减轻了服务器的压力。
线程池的使用初始化线程池有五个核心参数,分别是 corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue。还有两个默认参数 threadFactory, handler
corePoolSize:线程池初始核心线程数。初始化线程池的时候,池内是没有线程,只有在执行任务的时会创建线程。
maximumPoolSize:线程池允许存在的最大线程数。若超过该数字,默认提示RejectedExecutionException异常
keepAliveTime:当前线程数大于核心线程时,该参数生效,其目的是终止多余的空闲线程等待新任务的最长时间。即指定时间内将还未接收任务的线程销毁。
unit:keepAliveTime 的时间单位
workQueue:缓存任务的的队列,一般采用LinkedBlockingQueue。
threadFactory:执行程序创建新线程时使用的工厂,一般采用默认值。
handler:超出线程范围和队列容量而使执行被阻塞时所使用的处理程序,一般采用默认值。
开始,游泳馆来了一名学员,于是馆主安排一个教练负责培训这名学员;
然后,游泳馆来了六名学员,可馆主只招了五名教练,于是有一名学员被安排到休息室等待;
后来,游泳馆来了十六名学员,休息室已经满了,馆主核算了开支,预计最多可招十名教练;
最后,游泳馆只来了十名学员,馆主对教练说,如果半天内接不到学员的教练就可以走了;
结果,游泳馆没有学员,关闭了。
在接收任务前,线程池内是没有线程。只有当任务来了才开始新建线程。当任务数大于核心线程数时,任务进入队列中等待。若队列满了,则线程池新增线程直到最大线程数。再超过则会执行拒绝策略。
shutdown: 线程池不再接收任务,等待线程池中所有任务完成后,关闭线程池。常用
shutdownNow: 线程池不再接收任务,忽略队列中的任务,尝试中断正在执行的任务,返回未执行任务列表,关闭线程池。慎用
awaitTermination: 线程池可以继续接收任务,当任务都完成后,或者超过设置的时间后,关闭线程池。方法是阻塞的,考虑使用
1 newSingleThreadExecutor() 单线程线程池
初始线程数和允许最大线程数都是一,keepAliveTime 也就失效了,队列是无界阻塞队列。该线程池的主要作用是负责缓存任务。
2 newFixedThreadPool(n) 固定大小线程池
初始线程数和允许最大线程数相同,且大小自定义,keepAliveTime 也就失效了,队列是无界阻塞队列。符合大部分业务要求,常用。
3 newCachedThreadPool() 无缓存无界线程池
初始线程数为零,最大线程数为无穷大,keepAliveTime 60秒类终止空闲线程,队列是无缓存无界队列。适合任务数不多的场景,慎用。
import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; /** * 线程池 * 优势,类比数据库的连接池 * 1. 频繁的创建和销毁线程会给服务器带来很大的压力 * 2. 若创建的线程不销毁而是留在线程池中等待下次使用,则会很大地提高效率也减轻了服务器的压力 * * 三种workQueue策略 * 直接提交 SynchronousQueue * 无界队列 LinkedBlockingQueue * 有界队列 ArrayBlockingQueue * * 四种拒绝策略 * AbortPolicy : JDK默认,超出 MAXIMUM_POOL_SIZE 放弃任务抛异常 RejectedExecutionException * CallerRunsPolicy : 尝试直接调用被拒绝的任务,若线程池被关闭,则丢弃任务 * DiscardOldestPolicy : 放弃队列最前面的任务,然后重新尝试执被拒绝的任务。若线程池被关闭,则丢弃任务 * DiscardPolicy : 放弃不能执行的任务但不抛异常 */ public class ThreadPoolExecutorStu { // 线程池中初始线程个数 private final static Integer CORE_POOL_SIZE = 3; // 线程池中允许的最大线程数 private final static Integer MAXIMUM_POOL_SIZE = 8; // 当线程数大于初始线程时。终止多余的空闲线程等待新任务的最长时间 private final static Long KEEP_ALIVE_TIME = 10L; // 任务缓存队列 ,即线程数大于初始线程数时先进入队列中等待,此数字可以稍微设置大点,避免线程数超过最大线程数时报错。或者直接用无界队列 private final static ArrayBlockingQueue<Runnable> WORK_QUEUE = new ArrayBlockingQueue<Runnable>(5); public static void main(String[] args) { Long start = System.currentTimeMillis(); /** * ITDragonThreadPoolExecutor 耗时 1503 * ITDragonFixedThreadPool 耗时 505 * ITDragonSingleThreadExecutor 语法问题报错, * ITDragonCachedThreadPool 耗时506 * 推荐使用自定义线程池,或newFixedThreadPool(n) */ ThreadPoolExecutor threadPoolExecutor = ITDragonThreadPoolExecutor(); for (int i = 0; i < 8; i++) { // 执行8个任务,若超过MAXIMUM_POOL_SIZE则会报错 RejectedExecutionException MyRunnableTest myRunnable = new MyRunnableTest(i); threadPoolExecutor.execute(myRunnable); System.out.println("线程池中现在的线程数目是:"+threadPoolExecutor.getPoolSize()+", 队列中正在等待执行的任务数量为:"+ threadPoolExecutor.getQueue().size()); } // 关掉线程池 ,并不会立即停止(停止接收外部的submit任务,等待内部任务完成后才停止),推荐使用。 与之对应的是shutdownNow,不推荐使用 threadPoolExecutor.shutdown(); try { // 阻塞等待30秒关掉线程池,返回true表示已经关闭。和shutdown不同,它可以接收外部任务,并且还阻塞。这里为了方便统计时间,所以选择阻塞等待关闭。 threadPoolExecutor.awaitTermination(30, TimeUnit.SECONDS); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("耗时 : " + (System.currentTimeMillis() - start)); } // 自定义线程池,开发推荐使用 public static ThreadPoolExecutor ITDragonThreadPoolExecutor() { // 构建一个,初始线程数量为3,最大线程数据为8,等待时间10分钟 ,队列长度为5 的线程池 ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor( CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE_TIME, TimeUnit.MINUTES, WORK_QUEUE); return threadPoolExecutor; } /** * 固定大小线程池 * corePoolSize初始线程数和maximumPoolSize最大线程数一样,keepAliveTime参数不起作用,workQueue用的是无界阻塞队列 */ public static ThreadPoolExecutor ITDragonFixedThreadPool() { ExecutorService executor = Executors.newFixedThreadPool(8); ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) executor; return threadPoolExecutor; } /** * 单线程线程池 * 等价与Executors.newFixedThreadPool(1); */ public static ThreadPoolExecutor ITDragonSingleThreadExecutor() { ExecutorService executor = Executors.newSingleThreadExecutor(); ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) executor; return threadPoolExecutor; } /** * 无界线程池 * corePoolSize 初始线程数为零 * maximumPoolSize 最大线程数无穷大 * keepAliveTime 60秒类将没有被用到的线程终止 * workQueue SynchronousQueue 队列,无容量,来任务就直接新增线程 * 不推荐使用 */ public static ThreadPoolExecutor ITDragonCachedThreadPool() { ExecutorService executor = Executors.newCachedThreadPool(); ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) executor; return threadPoolExecutor; } } class MyRunnableTest implements Runnable { private Integer num; // 正在执行的任务数 public MyRunnableTest(Integer num) { this.num = num; } public void run() { System.out.println("正在执行的MyRunnable " + num); try { Thread.sleep(500);// 模拟执行事务需要耗时 } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("MyRunnable " + num + "执行完毕"); } }