一时技痒,撸了个动态线程池,源码放Github了 (2)

一时技痒,撸了个动态线程池,源码放Github了

拒绝次数告警

当队列容量满了后,新进来的任务会根据用户设置的拒绝策略去选择对应的处理方式。如果是采用 AbortPolicy 策略,也会进行告警。相当于消费者已经超负荷了。

一时技痒,撸了个动态线程池,源码放Github了

线程池运行情况

底层对接了 Cat,所以将线程的运行数据上报给了 Cat。我们可以在 Cat 中查看这些信息。

一时技痒,撸了个动态线程池,源码放Github了

如果你想在自己的平台去展示,我这边暴露了/actuator/thread-pool 端点,你可以自行拉取数据。

{ threadPools: [{ threadPoolName: "TestThreadPoolExecutor", activeCount: 0, keepAliveTime: 0, largestPoolSize: 4, fair: false, queueCapacity: 5, queueCapacityThreshold: 2, rejectCount: 0, waitTaskCount: 0, taskCount: 5, unit: "MILLISECONDS", rejectedExecutionType: "AbortPolicy", corePoolSize: 4, queueType: "LinkedBlockingQueue", completedTaskCount: 5, maximumPoolSize: 4 }, { threadPoolName: "TestThreadPoolExecutor2", activeCount: 0, keepAliveTime: 0, largestPoolSize: 0, fair: false, queueCapacity: 2147483647, queueCapacityThreshold: 2147483647, rejectCount: 0, waitTaskCount: 0, taskCount: 0, unit: "MILLISECONDS", rejectedExecutionType: "AbortPolicy", corePoolSize: 2, queueType: "LinkedBlockingQueue", completedTaskCount: 0, maximumPoolSize: 4 }] } 自定义拒绝策略

平时我们使用代码创建线程池可以自定义拒绝策略,在构造线程池对象的时候传入即可。这里由于创建线程池都被封装好了,我们只能在 Nacos 配置拒绝策略的名称来使用对应的策略。默认是可以配置 JDK 自带的 CallerRunsPolicy,AbortPolicy,DiscardPolicy,DiscardOldestPolicy 这四种。

如果你想自定义的话也是支持的,定义方式跟以前一样,如下:

@Slf4j public class MyRejectedExecutionHandler implements RejectedExecutionHandler { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { log.info("进来了。。。。。。。。。"); } }

要让这个策略生效的话使用的是 SPI 的方式,需要在 resources 下面创建一个 META-INF 的文件夹,然后创建一个 services 的文件夹,再创建一个 java.util.concurrent.RejectedExecutionHandler 的文件,内容为你定义的类全路径。

一时技痒,撸了个动态线程池,源码放Github了

自定义告警方式

默认是内部集成了钉钉机器人的告警方式,如果你不想用也可以将其关闭。或者将告警信息对接到你的监控平台去。

如果没有告警平台也可以在项目中实现新的告警方式,比如短信等。

只需要实现 ThreadPoolAlarmNotify 这个类即可。

/** * 自定义短信告警通知 * * @作者 尹吉欢 * @个人微信 jihuan900 * @微信公众号 猿天地 * @GitHub https://github.com/yinjihuan * @作者介绍 * @时间 2020-05-27 22:26 */ @Slf4j @Component public class ThreadPoolSmsAlarmNotify implements ThreadPoolAlarmNotify { @Override public void alarmNotify(AlarmMessage alarmMessage) { log.info(alarmMessage.toString()); } } 代码实现

具体的就不讲的很细了,源码在https://github.com/yinjihuan/kitty/tree/master/kitty-dynamic-thread-pool,大家自己去看,并不复杂。

创建线程池

根据配置创建线程池,ThreadPoolExecutor 是自定义的,因为需要做 Cat 埋点。

/** * 创建线程池 * @param threadPoolProperties */ private void createThreadPoolExecutor(DynamicThreadPoolProperties threadPoolProperties) { threadPoolProperties.getExecutors().forEach(executor -> { KittyThreadPoolExecutor threadPoolExecutor = new KittyThreadPoolExecutor( executor.getCorePoolSize(), executor.getMaximumPoolSize(), executor.getKeepAliveTime(), executor.getUnit(), getBlockingQueue(executor.getQueueType(), executor.getQueueCapacity(), executor.isFair()), new KittyThreadFactory(executor.getThreadPoolName()), getRejectedExecutionHandler(executor.getRejectedExecutionType(), executor.getThreadPoolName()), executor.getThreadPoolName()); threadPoolExecutorMap.put(executor.getThreadPoolName(), threadPoolExecutor); }); } 刷新线程池

首先需要监听 Nacos 的修改。

/** * 监听配置修改,spring-cloud-alibaba 2.1.0版本不支持@NacosConfigListener的监听 */ public void initConfigUpdateListener(DynamicThreadPoolProperties dynamicThreadPoolProperties) { ConfigService configService = nacosConfigProperties.configServiceInstance(); try { configService.addListener(dynamicThreadPoolProperties.getNacosDataId(), dynamicThreadPoolProperties.getNacosGroup(), new AbstractListener() { @Override public void receiveConfigInfo(String configInfo) { new Thread(() -> refreshThreadPoolExecutor()).start(); log.info("线程池配置有变化,刷新完成"); } }); } catch (NacosException e) { log.error("Nacos配置监听异常", e); } }

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wpgfyy.html