读Kafka Consumer源码

最近一直在关注阿里的一个开源项目:OpenMessaging

OpenMessaging, which includes the establishment of industry guidelines and messaging, streaming specifications to provide a common framework for finance, e-commerce, IoT and big-data area. The design principles are the cloud-oriented, simplicity, flexibility, and language independent in distributed heterogeneous environments. Conformance to these specifications will make it possible to develop a heterogeneous messaging applications across all major platforms and operating systems.

这是OpenMessaging-Java项目GitHub上的一段介绍,大致是说OpenMessaging项目致力于建立MQ领域的标准。

看了OpenMessaging-Java项目的源码,定义了:

Message接口

Producer接口

Consumer接口

消费方式:Pull、Push

各种异常

确实是在朝着建立一套MQ的接口标准。

这引发了我的一个思考:MQ目前确实没有一套标准的接口,如果我们尝试从更高的层次看自己的项目,即我们希望它成为行业标准,那么现在项目中接口的定义合适吗?是否够通用、简洁、易用、合理?

带着这样的疑问,最近把Kafka Consumer部分的源码读了一遍,因为:

Kafka应该是业界最著名的一个开源MQ了(RocketMQ最初也是参考了Kafka去实现的)

希望通过读Kafka源码能找到一些定义MQ接口的想法

但是在读完Kafka Consumer部分的源码后稍稍有一些失望,因为它并没有给我代码我想要的,反而在读完后觉得接口设计和源码实现上相对于Kafka的盛名有一些名不副实的感觉。

接口定义

Kafka在消费部分只提供了一个接口,即Consumer接口。

Consumer接口如下:

Set<TopicPartition> assignment();

Set<String> subscription();

void subscribe(Collection<String> topics);

void subscribe(Collection<String> topics, ConsumerRebalanceListener callback);

void assign(Collection<TopicPartition> partitions);

void subscribe(Pattern pattern, ConsumerRebalanceListener callback);

void subscribe(Pattern pattern);

void unsubscribe();

ConsumerRecords<k, v=""> poll(long timeout);

void commitSync();

void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets);

void commitAsync();

void commitAsync(OffsetCommitCallback callback);

void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback);

void seek(TopicPartition partition, long offset);

void seekToBeginning(Collection<TopicPartition> partitions);

void seekToEnd(Collection<TopicPartition> partitions);

long position(TopicPartition partition);

OffsetAndMetadata committed(TopicPartition partition);

Map<MetricName, ? extends Metric> metrics();

List<PartitionInfo> partitionsFor(String topic);

Map<String, List> listTopics();

Set<TopicPartition> paused();

void pause(Collection<TopicPartition> partitions);

void resume(Collection<TopicPartition> partitions);

Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch);

Map<TopicPartition, Long> beginningOffsets(Collection partitions);

Map<TopicPartition, Long> endOffsets(Collection partitions);

...

(读源码时光看完这部分接口我就已经晕了)

上面的方法大致可以分为四类:

订阅相关:subscribe、unsubscribe

消费相关:assign、poll、commit

元数据相关:搜索、设置、获取offset信息;partition信息

生命周期相关:pause、resume、close等

看完这个接口的第一个感觉就是灵活有余易用不足。

Kafka几乎暴露了所有的操作API,这样的好处是足够灵活,但是带来的问题就是易用性下降,哪怕用户只是希望简单的获取消息并处理也需要关心offset的提交和管理以及commit等等。

另外功能上也并没有提供用户更多的选择,比如只提供了poll模式去获取消息,而没有提供类似push的模式。

线程模型部分

看完接口之后,第二步看了Kafka Consumer部分的线程模型,即尝试将Consumer部分的线程模型梳理清楚:Consumer部分有哪些线程,线程间的交互等。

Consumer部分包含以下几个模块:

Consuming

Consumer、ConsumerConfig、ConsumerProtocol

Fetcher

分布式协调

AbstractCoordinator、ConsumerCoordinator

分区分配和负载均衡

Assignor

ReblanceListener

网络组件

NetClient

Future

FutureListener

异常

NoAvailableBrokersException、CommitFailedExceptin、...

元数据和数据

ConsumerRecord、ConsumerRecords

TopicPartition

统计及其他

通过分布式系统组件及分区分配策略,每个Consumer可以拿到自己消费的分区。之后通过Fetcher来执行获取消息的操作,而底层通过网络组件NetworkClient和Broker完成交互。

通过阅读源码和注释发现,Kafka Consumer并没有去管理线程,而是所有的操作都在用户线程中完成。

所以线程模型就非常简单,Consumer非线程安全,同时只能有一个线程执行操作,且所有的操作都在用户的线程中执行。

Consumer通过一个AtomicLong的CAS操作来保证只能有一个线程操作(多线程的情况下会报出异常)

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wspxpj.html