读Kafka Consumer源码 (2)

读Kafka Consumer源码

部分代码实现解读 ConsumerRecords<k, v=""> poll(long timeout)

poll应该是Consumer的核心接口了,因为到这里才真正执行了和获取消息相关的逻辑。

读Kafka Consumer源码

首先是校验逻辑,在poll之前如果没有进行topic的订阅或分区的分配,poll操作将抛出异常。

接着是poll的核心逻辑:

在一个循环体中执行获取数据的逻辑,跳出循环的条件是超时或者获取到数据

从代码中可以看出pollOnce应该是真正的执行一次获取消息的操作。而代码中注释的部分是poll的核心:

fetcher#sendFetches方法给有需要的Server节点发送获取消息的请求

这么做的目的是在用户下一次进行poll操作之前先将获取消息的请求发送出去

这样网络操作和就可以和用户处理消息的逻辑并行,降低延迟

client#hasPendingRequests判断是否还有未从客户端发送出去的请求

client#pollNoWakeup执行网络真正的网络IO操作

从这段注释和代码中可以看出,poll时如果拿到数据了,会将剩余的请求发送出去来实现pipelining的目的。

所以对应的pollOnce内的逻辑必然有从缓存中(即上一次poll请求中获取的数据)获取数据的操作。

读Kafka Consumer源码

pollOnce对目标分区执行一路poll请求,大致流程如下:

coordinator#poll确保Consumer在Coordinator的管理之中

ensure coordinator

ensure active group(将Consumer加入到group中)

发送heartbeat

更新positions

从fetcher中获取消息,如果已经拿到消息则返回结果,调用结束

对分区执行poll请求

阻塞等待至少一个fetch操作完成

判断是否操作期间元数据进行了变更,如果变更了,丢弃获取的数据

返回获取结果

读上面的代码,第一个感觉就是可读性比较差,比较难懂。

比如pollOnce中,fetcher#sendFetches从字面上看会理解成发送fetch请求:

如果是同步的,那么应该获取它的结果

如果是异步的,应该通过Future获取最终的结果

而实际上fetcher#sendFetches只是去构建了请求,并且将请求保存在NetworkClient中(NetworkClient会有数据结构保存每个Node对应的请求:类似这样的数据结构Map<Node,Queue<Request>>)。

在client#poll中才将通过fetcher构造的请求真正的写出去,并且阻塞的等待fetch的结果,从实现上感觉将代码变的复杂了。

NetworkClient提供了异步的网络操作,且是非线程安全的。

NetworkClient只有poll会真正的去执行IO操作,而其中的send只是将send数据保存在channel上,直到执行poll时将它写到网络中。

总结

在读完Kafka Consumer部分的源码后,稍稍有些失望:

只提供了poll模式,没有提供给用户更多的选择,比如push模式

openmessaging在这块分别提供了PullConsumer和PushConsumer接口

而我们自己的项目则是提供了ListenConsumer、StreamConsumer等(Listen模式用户只提供回调接口,我们管理线程,而Stream模式将消费线程交给用户自己管理),继续还会提供基础的PullConsumer等

Consumer接口的灵活性由于,易用性不足

暴露了太多的接口,对于一个指向简单获取消息处理的使用方来说心智负担太重

代码的实现上复杂化了,比如提供了Fetcher和NetworkClient的实现非常复杂

总体上Consumer的代码有一些乱,比如下面是Kafka源码中Consumer部分的包组织和我自己读源码使对它的整理:

读Kafka Consumer源码

右边是Kafka源码Consumer部分的包结构,所有的类分了两块,内部的在internals中。右边是自己读源码时根据各个模块对Consumer的类进行划分。

私以为将各个类按照不同的模块分开会更加清晰,读起来也会更加舒服。

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wspxpj.html