Java高并发之线程池详解

在业务场景中,如果一个对象创建销毁开销比较大, 那么此时建议池化对象进行管理,例如线程,jdbc连接等等,在高并发场景中,如果可以复用之前销毁的对象,那么系统效率将大大提升。另外一个好处是可以设定池化对象的上限,例如预防创建线程数量过多导致系统崩溃的场景。

jdk中的线程池

下文主要从以下几个角度讲解:

创建线程池

提交任务

潜在宕机风险

线程池大小配置

自定义阻塞队列BlockingQueue

回调接口

自定义拒绝策略

自定义ThreadFactory

关闭线程池

创建线程池

我们可以通过自定义ThreadPoolExecutor或者jdk内置的Executors来创建一系列的线程池

newFixedThreadPool: 创建固定线程数量的线程池

newSingleThreadExecutor: 创建单一线程的池

newCachedThreadPool: 创建线程数量自动扩容, 自动销毁的线程池

newScheduledThreadPool: 创建支持计划任务的线程池

上述几种都是通过new ThreadPoolExecutor()来实现的, 构造函数源码如下: 

/**
    * @param corePoolSize 池内核心线程数量, 超出数量的线程会进入阻塞队列
    * @param maximumPoolSize 最大可创建线程数量
    * @param keepAliveTime 线程存活时间
    * @param unit 存活时间的单位
    * @param workQueue 线程溢出后的阻塞队列
    */
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler);
    }

public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
    }

public static ExecutorService newSingleThreadExecutor() {
        return new Executors.FinalizableDelegatedExecutorService
                (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()));
    }

public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
    }

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }

public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS, new ScheduledThreadPoolExecutor.DelayedWorkQueue());
    }

提交任务

直接调用executorService.execute(runnable)或者submit(runnable)即可,

execute和submit的区别在于submit会返回Future来获取任何执行的结果.

我们看下newScheduledThreadPool的使用示例.

public class SchedulePoolDemo {

public static void main(String[] args){
        ScheduledExecutorService service = Executors.newScheduledThreadPool(10);
        // 如果前面的任务没有完成, 调度也不会启动
        service.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(2000);
                    // 每两秒打印一次.
                    System.out.println(System.currentTimeMillis()/1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }, 0, 2, TimeUnit.SECONDS);
    }
}

潜在宕机风险

使用Executors来创建要注意潜在宕机风险.其返回的线程池对象的弊端如下:

FixedThreadPool和SingleThreadPoolPool : 允许的请求队列长度为 Integer.MAX_VALUE,可能会堆积大量的请求,从而导致 OOM.

CachedThreadPool和ScheduledThreadPool : 允许的创建线程数量为 Integer.MAX_VALUE,可能会创建大量的线程,从而导致 OOM.

综上所述, 在可能有大量请求的线程池场景中, 更推荐自定义ThreadPoolExecutor来创建线程池, 具体构造函数配置见下文.

线程池大小配置

一般根据任务类型进行区分, 假设CPU为N核

CPU密集型任务需要减少线程数量, 降低线程之间切换造成的开销, 可配置线程池大小为N + 1.

IO密集型任务则可以加大线程数量, 可配置线程池大小为 N * 2.

混合型任务则可以拆分为CPU密集型与IO密集型, 独立配置.

自定义阻塞队列BlockingQueue

主要存放等待执行的线程, ThreadPoolExecutor中支持自定义该队列来实现不同的排队队列.

ArrayBlockingQueue:先进先出队列,创建时指定大小, 有界;

LinkedBlockingQueue:使用链表实现的先进先出队列,默认大小为Integer.MAX_VALUE;

SynchronousQueue:不保存提交的任务, 数据也不会缓存到队列中, 用于生产者和消费者互等对方, 一起离开.

PriorityBlockingQueue: 支持优先级的队列

回调接口

线程池提供了一些回调方法, 具体使用如下所示.

ExecutorService service = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingDeque<Runnable>()) {

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/c672bb3ceae89238c5b266979a88a7f8.html