一个线程罢工的诡异事件

一个线程罢工的诡异事件

背景

事情(事故)是这样的,突然收到报警,线上某个应用里业务逻辑没有执行,导致的结果是数据库里的某些数据没有更新。

虽然是前人写的代码,但作为 Bug maker&killer 只能咬着牙上了。

因为之前没有接触过出问题这块的逻辑,所以简单理了下如图:

一个线程罢工的诡异事件

有一个生产线程一直源源不断的往队列写数据。

消费线程也一直不停的取出数据后写入后续的业务线程池。

业务线程池里的线程会对每个任务进行入库操作。

整个过程还是比较清晰的,就是一个典型的生产者消费者模型。

尝试定位

接下来便是尝试定位这个问题,首先例行检查了以下几项:

是否内存有内存溢出?

应用 GC 是否有异常?

通过日志以及监控发现以上两项都是正常的。

紧接着便 dump 了线程快照查看业务线程池中的线程都在干啥。

一个线程罢工的诡异事件

结果发现所有业务线程池都处于 waiting 状态,队列也是空的。

同时生产者使用的队列却已经满了,没有任何消费迹象。

结合上面的流程图不难发现应该是消费队列的 Consumer 出问题了,导致上游的队列不能消费,下有的业务线程池没事可做。

review 代码

于是查看了消费代码的业务逻辑,同时也发现消费线程是一个单线程

一个线程罢工的诡异事件

结合之前的线程快照,我发现这个消费线程也是处于 waiting 状态,和后面的业务线程池一模一样。

他做的事情基本上就是对消息解析,之后丢到后面的业务线程池中,没有发现什么特别的地方。

但是由于里面的分支特别多(switch case),看着有点头疼;所以我与写这个业务代码的同学沟通后他告诉我确实也只是入口处解析了一下数据,后续所有的业务逻辑都是丢到线程池中处理的,于是我便带着这个前提去排查了(埋下了伏笔)。

因为这里消费的队列其实是一个 disruptor 队列;它和我们常用的 BlockQueue 不太一样,不是由开发者自定义一个消费逻辑进行处理的;而是在初始化队列时直接丢一个线程池进去,它会在内部使用这个线程池进行消费,同时回调一个方法,在这个方法里我们写自己的消费逻辑。

所以对于开发者而言,这个消费逻辑其实是一个黑盒。

于是在我反复 review 了消费代码中的数据解析逻辑发现不太可能出现问题后,便开始疯狂怀疑是不是 disruptor 自身的问题导致这个消费线程罢工了。

再翻了一阵 disruptor 的源码后依旧没发现什么问题后我咨询对 disruptor 较熟的@咖啡拿铁,在他的帮助下在本地模拟出来和生产一样的情况。

本地模拟

一个线程罢工的诡异事件


一个线程罢工的诡异事件

本地也是创建了一个单线程的线程池,分别执行了两个任务。

第一个任务没啥好说的,就是简单的打印。

第二个任务会对一个数进行累加,加到 10 之后就抛出一个未捕获的异常。

接着我们来运行一下。

一个线程罢工的诡异事件


一个线程罢工的诡异事件

发现当任务中抛出一个没有捕获的异常时,线程池中的线程就会处于 waiting 状态,同时所有的堆栈都和生产相符。

细心的朋友会发现正常运行的线程名称和异常后处于 waiting 状态的线程名称是不一样的,这个后续分析。

解决问题

一个线程罢工的诡异事件

当加入异常捕获后又如何呢?

一个线程罢工的诡异事件

程序肯定会正常运行。

同时会发现所有的任务都是由一个线程完成的。

虽说就是加了一行代码,但我们还是要搞清楚这里面的门门道道。

源码分析

于是只有直接 debug 线程池的源码最快了;

一个线程罢工的诡异事件

一个线程罢工的诡异事件

通过刚才的异常堆栈我们进入到 ThreadPoolExecutor.java:1142 处。

发现线程池已经帮我们做了异常捕获,但依然会往上抛。

在 finally 块中会执行 processWorkerExit(w, completedAbruptly) 方法。

一个线程罢工的诡异事件

看过之前《如何优雅的使用和理解线程池》的朋友应该还会有印象。

线程池中的任务都会被包装为一个内部 Worker 对象执行。

processWorkerExit 可以简单的理解为是把当前运行的线程销毁(workers.remove(w))、同时新增(addWorker())一个 Worker 对象接着处理;

就像是哪个零件坏掉后重新换了一个新的接着工作,但是旧零件负责的任务就没有了。

接下来看看 addWorker() 做了什么事情:

一个线程罢工的诡异事件

只看这次比较关心的部分;添加成功后会直接执行他的 start() 的方法。

一个线程罢工的诡异事件

由于 Worker 实现了 Runnable 接口,所以本质上就是调用了 runWorker() 方法。

在 runWorker() 其实就是上文 ThreadPoolExecutor 抛出异常时的那个方法。

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/zyyxzx.html