从一次生产消费者的bug看看线程池如何增加线程

某个闲来无事的下午,看到旧有的项目中,有个任务调度的地方都是同步的操作,就是流程A的完成依赖流程B,流程B的完成依赖流程C,按此类推。

作为一名垃圾代码生产者,QA的噩梦、故障报告枪手的我来说,发掘可以“优化”的空间,是我的分内之事。

因为是在一个工程内,并且本身工程组件没有使用到任何消息队列的软件(例如kafka、rocketMQ),如果只是要因为这个功能而贸然引用,对其进行维护的成本就比较高,我的技术组长大人是万万不会同意的。没办法,自己来吧。很快的,我完成了下面几个类的编写

整体的设计很简单,就是传统的生产消费者,只是利用了阻塞队列,作为缓冲。

在生产者内部有个定时执行的线程,将队列中的消息转发给消费者。生产者会单独占用一个线程

每个消费者自己也有一个阻塞队列,用来接收生产者产生的消息,消费者们因为可能不是所有的topic每时每刻都会有消息的产生,因此利用线程池即可。

1 代码实现 public interface IEvent { String getTopic(); } // 消息实体 public class Event<T> implements IEvent{ /** * 产生的时间戳 */ private long ts = System.currentTimeMillis(); /** * 携带的实体数据 */ private T entity; /** * topic */ private String topic; // setter getter 省略 } // 如何处理消息 public interface ConfigListener { String ALL = "all"; /** * 提供给监听器处理 * * @param event */ void handler(IEvent event); /** * 优先级顺序 * @return */ int getOrder(); /** * * @return */ String getTopic(); } // 创建4个消息处理的类,这里省略了,只展示一个 public class RandomSleepConfigListener implements ConfigListener { @Override public void handler(IEvent event) { logger.info("execute " + this.getClass().getSimpleName()); // 20ms - 50ms long t = (long) (Math.random() * 5) + 5L; try { TimeUnit.MILLISECONDS.sleep(t); } catch (InterruptedException e) { e.printStackTrace(); } } } // 线程池类 public class ScheduleThreadPool { private static final AtomicInteger atomic = new AtomicInteger(); // 被生产者单独使用的线程 public static final ExecutorService EVENT_POOL = Executors.newFixedThreadPool(1, r -> new Thread(r, "EVENT-PRODUCER-" + atomic.incrementAndGet())); /** * 常驻线程2个,最大8个,最多接受任务128个,超过则由提交线程来处理 */ public static final ExecutorService EVENT_CONSUMER_POOL = new ThreadPoolExecutor(2, 8, 50L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(128), r -> new Thread(r, "EVENT-CONSUMER-" + atomic.incrementAndGet()), new ThreadPoolExecutor.CallerRunsPolicy()); } // ############################### 以上的准备工作完成,下面就是编写生产者和消费者 ########################################### public class Producer { private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); /** * 外部提交的消息体会被送入到这个队列当中 */ private static final ArrayBlockingQueue<IEvent> blockingQueue = new ArrayBlockingQueue<>(128); /** * topic, consumer */ private static Map<String, Consumer> topic2ConsumerMap = Maps.newHashMap(); // 一些初始化的工作 static { logger.info("Producer init start..."); // SPI方式插件式加载,这里可以改为你熟悉的加载类的方式 Iterator<ConfigListener> configListenerIterator = ServiceBootstrap.loadAll(ConfigListener.class); // 整体遍历一遍,不同的listener分散到不同的地方去 while (configListenerIterator.hasNext()) { ConfigListener configListener = configListenerIterator.next(); String topic = configListener.getTopic(); // 没有明确topic的,我们不进行处理 if (null == topic) { continue; } logger.info("we init {} topic", topic); if (topic2ConsumerMap.containsKey(topic)) { topic2ConsumerMap.get(topic).addListener(configListener); } else { topic2ConsumerMap.put(topic, new Consumer(topic).addListener(configListener)); } } // 如果有定义对全部都适用的事件处理,需要加入到每个topic的listener的队列中去 if (topic2ConsumerMap.containsKey(ConfigListener.ALL)) { Consumer consumer = topic2ConsumerMap.get(ConfigListener.ALL); topic2ConsumerMap.remove(ConfigListener.ALL); for (Map.Entry<String, Consumer> entry : topic2ConsumerMap.entrySet()) { entry.getValue().addAllListener(consumer.getPriorityList()); } } // 启动监听线程 ScheduleThreadPool.EVENT_POOL.execute(() -> { //noinspection InfiniteLoopStatement int i = 0; while (true) { try { // 从队列获取需要处理的任务,没有会进行阻塞 IEvent iEvent = blockingQueue.take(); logger.info("from producer queue take a message {} {}", iEvent.getTopic(), (i++)); topic2ConsumerMap.get(iEvent.getTopic()).addEvent(iEvent); } catch (InterruptedException e) { // } } }); logger.info("Producer init end..."); } /** * 阻塞队列添加要处理的事件 * @param iEvent * @return true 添加成功 */ public static void publish(IEvent iEvent) throws InterruptedException { logger.info("publish start..."); // 当队列满时,这个方法会被阻塞 blockingQueue.put(iEvent); logger.info("publish over..."); } } public class Consumer { private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); /** * 排序好的列表 */ private List<ConfigListener> priorityList = Lists.newArrayListWithCapacity(16); /** * 降序排列 */ private Comparator<ConfigListener> comparator = (o1, o2) -> o2.getOrder() - o1.getOrder(); /** * 等待被处理的事件 */ private LinkedBlockingQueue<IEvent> waitEvent = new LinkedBlockingQueue<>(32); /** * 统计已经完成的任务数 */ private AtomicInteger count = new AtomicInteger(); /** * 处理哪种topic */ private String topic; // //CODE-B 这块代码是后来产生问题的代码,也是因为这个代码引起了我对线程池创建过程的好奇 // { // logger.info("non-static invoke--------"); // // 创建任务提交 // ScheduleThreadPool.EVENT_CONSUMER_POOL.execute(() -> { // // 注意这里有个循环 // for (;;) { // try { // logger.info("take event"); // IEvent take = waitEvent.take(); // priorityList.forEach(c -> c.handler(take)); // int t = count.incrementAndGet(); // logger.info("TOPIC[{}] size {}, remainingCapacity {} finish {} ", // topic, waitEvent.size(), waitEvent.remainingCapacity(), t); // } catch (InterruptedException e) { // // 记录错误失败 // } // } // }); // } public Consumer(String topic) { this.topic = topic; } public List<ConfigListener> getPriorityList() { return priorityList; } public Consumer addListener(ConfigListener listener) { priorityList.add(listener); priorityList.sort(comparator); return this; } public void addAllListener(Collection<? extends ConfigListener> c) { priorityList.addAll(c); priorityList.sort(comparator); } public void addEvent(IEvent iEvent) { try { logger.info(" topic {} queueSize {} finish {}", this.topic, waitEvent.size(), count.get()); waitEvent.put(iEvent); } catch (InterruptedException e) { // } // CODE-A ScheduleThreadPool.EVENT_CONSUMER_POOL.execute(() -> { // 注意这里和分发的producer不一样,不使用循环 try { logger.info("take event"); IEvent take = waitEvent.take(); priorityList.forEach(c -> c.handler(take)); int t = count.incrementAndGet(); logger.info("TOPIC[{}] size {}, remainingCapacity {} finish {} ", topic, waitEvent.size(), waitEvent.remainingCapacity(), t); } catch (InterruptedException e) { // 记录错误失败 } }); } } // 测试类 public class ProductTest{ // 这里我自己创建了4个消息处理的类,对应的topic分别如下 String[] topics = {"random1","random2","random3","random4"}; @Test(timeout = 30000L) public void publish() throws InterruptedException { for (int i = 0; i < 720; i++) { int j = i & 0x3; System.out.println(i); Producer.publish(new Event<String>("hello", topics[j])); } TimeUnit.SECONDS.sleep(60L); } } 2 开搞

代码都准备好了以后,我们就开始了,debug出来的结果和设想的符合预期

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wsxpfs.html