从一次生产消费者的bug看看线程池如何增加线程 (2)

4个topic,720个任务,每个处理掉180个

2021-01-17 16:27:56.210 [EVENT-CONSUMER-3] INFO - TOPIC[random1] size 0, remainingCapacity 32 finish 180 2021-01-17 16:27:56.210 [EVENT-CONSUMER-2] INFO - TOPIC[random4] size 1, remainingCapacity 31 finish 179 2021-01-17 16:27:56.210 [EVENT-CONSUMER-3] INFO - take event 2021-01-17 16:27:56.210 [EVENT-CONSUMER-2] INFO - take event 2021-01-17 16:27:56.210 [EVENT-CONSUMER-3] INFO - execute RandomSleepConfigListener2 2021-01-17 16:27:56.210 [EVENT-CONSUMER-2] INFO - execute RandomSleepConfigListener3 2021-01-17 16:27:56.215 [EVENT-CONSUMER-3] INFO - TOPIC[random2] size 0, remainingCapacity 32 finish 180 2021-01-17 16:27:56.215 [EVENT-CONSUMER-3] INFO - take event 2021-01-17 16:27:56.215 [EVENT-CONSUMER-3] INFO - execute RandomSleepConfigListener4 2021-01-17 16:27:56.217 [EVENT-CONSUMER-2] INFO - TOPIC[random3] size 0, remainingCapacity 32 finish 180 2021-01-17 16:27:56.221 [EVENT-CONSUMER-3] INFO - TOPIC[random4] size 0, remainingCapacity 32 finish 180

嗯,目前为止觉得很完美,然后看consumer类,觉得每次任务被推入阻塞队列,然后执行线程去从阻塞队列中去拉取消息出来,这不符合我作死的风格,改。
然后就变为了CODE-B的模样,线程池创建出来后,一直循环来拉取即可

{ logger.info("non-static invoke--------"); // 创建任务提交 ScheduleThreadPool.EVENT_CONSUMER_POOL.execute(() -> { // 注意这里有个循环 for (;;) { try { logger.info("take event"); IEvent take = waitEvent.take(); priorityList.forEach(c -> c.handler(take)); int t = count.incrementAndGet(); logger.info("TOPIC[{}] size {}, remainingCapacity {} finish {} ", topic, waitEvent.size(), waitEvent.remainingCapacity(), t); } catch (InterruptedException e) { // 记录错误失败 } } }); }

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wsxpfs.html