一时技痒,撸了个动态线程池,源码放Github了 (3)

然后再刷新线程池的参数信息,由于监听事件触发的时候,这个时候配置其实还没刷新,所以我就等待了 1 秒钟,让配置完成刷新然后直接从配置类取值。

虽然有点挫还是可以用,其实更好的方式是解析 receiveConfigInfo 那个 configInfo,configInfo 就是改变之后的整个配置内容。因为不太好解析成属性文件,就没做,后面再改吧。

/** * 刷新线程池 */ private void refreshThreadPoolExecutor() { try { // 等待配置刷新完成 Thread.sleep(1000); } catch (InterruptedException e) { } dynamicThreadPoolProperties.getExecutors().forEach(executor -> { ThreadPoolExecutor threadPoolExecutor = threadPoolExecutorMap.get(executor.getThreadPoolName()); threadPoolExecutor.setCorePoolSize(executor.getCorePoolSize()); threadPoolExecutor.setMaximumPoolSize(executor.getMaximumPoolSize()); threadPoolExecutor.setKeepAliveTime(executor.getKeepAliveTime(), executor.getUnit()); threadPoolExecutor.setRejectedExecutionHandler(getRejectedExecutionHandler(executor.getRejectedExecutionType(), executor.getThreadPoolName())); BlockingQueue<Runnable> queue = threadPoolExecutor.getQueue(); if (queue instanceof ResizableCapacityLinkedBlockIngQueue) { ((ResizableCapacityLinkedBlockIngQueue<Runnable>) queue).setCapacity(executor.getQueueCapacity()); } }); }

其他的刷新都是线程池自带的,需要注意的是线程池队列大小的刷新,目前只支持 LinkedBlockingQueue 队列,由于 LinkedBlockingQueue 的大小是不允许修改的,所以按照美团那篇文章提供的思路,自定义了一个可以修改的队列,其实就是把 LinkedBlockingQueue 的代码复制了一份,改一下就可以。

往 Cat 上报运行信息

往 Cat 的 Heartbeat 报表上传数据的代码如下,主要还是 Cat 本身提供了扩展的能力。只需要定时去调用下面的方式上报数据即可。

public void registerStatusExtension(ThreadPoolProperties prop, KittyThreadPoolExecutor executor) { StatusExtensionRegister.getInstance().register(new StatusExtension() { @Override public String getId() { return "thread.pool.info." + prop.getThreadPoolName(); } @Override public String getDescription() { return "线程池监控"; } @Override public Map<String, String> getProperties() { AtomicLong rejectCount = getRejectCount(prop.getThreadPoolName()); Map<String, String> pool = new HashMap<>(); pool.put("activeCount", String.valueOf(executor.getActiveCount())); pool.put("completedTaskCount", String.valueOf(executor.getCompletedTaskCount())); pool.put("largestPoolSize", String.valueOf(executor.getLargestPoolSize())); pool.put("taskCount", String.valueOf(executor.getTaskCount())); pool.put("rejectCount", String.valueOf(rejectCount == null ? 0 : rejectCount.get())); pool.put("waitTaskCount", String.valueOf(executor.getQueue().size())); return pool; } }); } 定义线程池端点

通过自定义端点来暴露线程池的配置和运行的情况,可以让外部的监控系统拉取数据做对应的处理。

@Endpoint(id = "thread-pool") public class ThreadPoolEndpoint { @Autowired private DynamicThreadPoolManager dynamicThreadPoolManager; @Autowired private DynamicThreadPoolProperties dynamicThreadPoolProperties; @ReadOperation public Map<String, Object> threadPools() { Map<String, Object> data = new HashMap<>(); List<Map> threadPools = new ArrayList<>(); dynamicThreadPoolProperties.getExecutors().forEach(prop -> { KittyThreadPoolExecutor executor = dynamicThreadPoolManager.getThreadPoolExecutor(prop.getThreadPoolName()); AtomicLong rejectCount = dynamicThreadPoolManager.getRejectCount(prop.getThreadPoolName()); Map<String, Object> pool = new HashMap<>(); Map config = JSONObject.parseObject(JSONObject.toJSONString(prop), Map.class); pool.putAll(config); pool.put("activeCount", executor.getActiveCount()); pool.put("completedTaskCount", executor.getCompletedTaskCount()); pool.put("largestPoolSize", executor.getLargestPoolSize()); pool.put("taskCount", executor.getTaskCount()); pool.put("rejectCount", rejectCount == null ? 0 : rejectCount.get()); pool.put("waitTaskCount", executor.getQueue().size()); threadPools.add(pool); }); data.put("threadPools", threadPools); return data; } } Cat 监控线程池中线程的执行时间

本来是将监控放在 KittyThreadPoolExecutor 的 execute,submit 方法里的。后面测试下来发现有问题,数据在 Cat 上确实有了,但是执行时间都是 1 毫秒,也就是没生效。

不说想必大家也知道,因为线程是后面单独去执行的,所以再添加任务的地方埋点没任务意义。

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wpgfyy.html