一时技痒,撸了个动态线程池,源码放Github了

线程池在日常工作中用的还挺多,当需要异步,批量处理一些任务的时候我们会定义一个线程池来处理。

在使用线程池的过程中有一些问题,下面简单介绍下之前遇到的一些问题。

场景一:实现一些批量处理数据的功能,刚开始线程池的核心线程数设的比较小,然后想调整下,只能改完后重启应用。

场景二:有一个任务处理的应用,会接收 MQ 的消息进行任务的处理,线程池的队列也允许缓存一定数量的任务。当任务处理的很慢的时候,想看看到底有多少没有处理完不是很方便。当时为了快速方便,就直接启动了一个线程去循环打印线程池队列的大小。

正好之前在我公众号有转发过美团的一篇线程池应用的文章(https://mp.weixin.qq.com/s/tIWAocevZThfbrfWoJGa9w),觉得他们的思路非常好,就是没有开放源码,所以自己就抽时间在我的开源项目 Kitty 中增加了一个动态线程池的组件,支持了 Cat 监控,动态变更核心参数,任务堆积告警等。今天就给大家分享一下实现的方式。

项目源代码地址:https://github.com/yinjihuan/kitty

使用方式 添加依赖

依赖线程池的组件,目前 Kitty 未发布,需要自己下载源码 install 本地或者私有仓库。

<dependency> <groupId>com.cxytiandi</groupId> <artifactId>kitty-spring-cloud-starter-dynamic-thread-pool</artifactId> </dependency> 添加配置

然后在 Nacos 配置线程池的信息,我的这个整合了 Nacos。推荐一个应用创建一个单独的线程池配置文件,比如我们这个叫 dataId 为 kitty-cloud-thread-pool.properties,group 为 BIZ_GROUP。

内容如下:

kitty.threadpools.nacosDataId=kitty-cloud-thread-pool.properties kitty.threadpools.nacosGroup=BIZ_GROUP kitty.threadpools.accessToken=ae6eb1e9e6964d686d2f2e8127d0ce5b31097ba23deee6e4f833bc0a77d5b71d kitty.threadpools.secret=SEC6ec6e31d1aa1bdb2f7fd5eb5934504ce09b65f6bdc398d00ba73a9857372de00 kitty.threadpools.owner=尹吉欢 kitty.threadpools.executors[0].threadPoolName=TestThreadPoolExecutor kitty.threadpools.executors[0].corePoolSize=4 kitty.threadpools.executors[0].maximumPoolSize=4 kitty.threadpools.executors[0].queueCapacity=5 kitty.threadpools.executors[0].queueCapacityThreshold=5 kitty.threadpools.executors[1].threadPoolName=TestThreadPoolExecutor2 kitty.threadpools.executors[1].corePoolSize=2 kitty.threadpools.executors[1].maximumPoolSize=4

nacosDataId,nacosGroup

监听配置修改的时候需要知道监听哪个 DataId,值就是当前配置的 DataId。

accessToken,secret

钉钉机器人的验证信息,用于告警。

owner

这个应用的负责人,告警的消息中会显示。

threadPoolName

线程池的名称,使用的时候需要关注。

剩下的配置就不一一介绍了,跟线程池内部的参数一致,还有一些可以查看源码得知。

注入使用 @Autowired private DynamicThreadPoolManager dynamicThreadPoolManager; dynamicThreadPoolManager.getThreadPoolExecutor("TestThreadPoolExecutor").execute(() -> { log.info("线程池的使用"); try { Thread.sleep(30000); } catch (InterruptedException e) { e.printStackTrace(); } }, "getArticle");

通过 DynamicThreadPoolManager 的 getThreadPoolExecutor 方法获取线程池对象,然后传入 Runnable,Callable 等。第二个参数是这个任务的名称,之所以要扩展一个参数是因为如果任务没有标识,那么无法区分任务。

这个线程池组件默认集成了 Cat 打点,设置了名称可以在 Cat 上查看这个任务相关的监控数据。

扩展功能 任务执行情况监控

在 Cat 的 Transaction 报表中会以线程池的名称为类型显示。

一时技痒,撸了个动态线程池,源码放Github了

详情中会以任务的名称显示。

一时技痒,撸了个动态线程池,源码放Github了

核心参数动态修改

核心参数目前只支持 corePoolSize,maximumPoolSize,queueCapacity(队列类型为 LinkedBlockingDeque 才可以修改),rejectedExecutionType,keepAliveTime,unit 这些参数的修改。

一般 corePoolSize,maximumPoolSize,queueCapacity 是最常要动态改变的。

需要改动的话直接在 Nacos 中将对应的配置值修改即可,客户端会监听配置的修改,然后同步修改先线程池的参数。

队列容量告警

queueCapacityThreshold 是队列容量告警的阀值,如果队列中的任务数量超过了 queueCapacityThreshold 就会告警。

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wpgfyy.html