以前老的sarama版本不支持消费者组的消费方式,所以大多数人都用sarama-cluster。
后来sarama支持了消费者组的消费方式,sarama-cluster也停止维护了,但网上关于sarama的消费者组的解析很少,且官方的样例很简单,所以这里分析一下。
一、官方样例官方样例比较简单:
1、通过 sarama.NewConfig 创建一个配置
2、通过 NewConsumerGroup 创建一个消费者组
3、通过 Consume 创建消费者组的会话,该函数的第三个参数即为该会话三个阶段的回调: Setup Cleanup 和 ConsumeClaim ,分别在创建会话之前、会话结束之后 和 会话生存中(主要就是在此阶段进行消息读取)进行调用。
二、问题1、当指定的topic在kafka中不存的时候,kafka会新建该topic,如果只想让用户消费已存在的topic,那么该如何获取kafka中已经存在的topic?
2、 setup 和 Cleanup 的调用流程是怎样的?会在哪些情况下被调用?
3、既然是消费者组,那如何查看组里某个消费者拥有哪些topic和partition?
4、如何使用指定的 offset 来消费某个 topic ?
5、如何实现消费的 Exactly-once?
注:以上测试使用的示例代码是自己写的样例代码的部分内容,完整的样例代码见文章最后
三、分析1、在 sarama 中,获取 topic 的接口在 Client interface 中,所以需要先通过 NewClient 接口创建一个 client,然后就可以通过该 client 的 Topics 接口获取到 kafka 中所有的 topic。但消费者组使用的类型是 ConsumerGroup ,那该如何获取该类型呢?sarama 中提供 NewConsumerGroupFromClient 接口,可以从一个现存的 client 创建一个 ConsumerGroup ,所以,修改后的流程,由原先的 NewConsumerGroup 直接创建,变成:
a、使用 NewClient 创建一个 client
b、使用 NewConsumerGroupFromClient 创建 ConsumerGroup 。
具体代码实现如下:
// 创建client newClient, err := sarama.NewClient(brokers, config) if err != nil { log.Fatal(err) } // 获取所有的topic topics, err := newClient.Topics() if err != nil { log.Fatal(err) } log.Info("topics: ", topics) // 根据client创建consumerGroup client, err := sarama.NewConsumerGroupFromClient(k.group, newClient) if err != nil { log.Fatalf("Error creating consumer group client: %v", err) }这么做的好处就是:可以使用 client 的接口,获取一些信息,例如 kafka 的当前配置有哪些,controller 有哪些,brokers 有哪些,topic 总共有哪些,特定的 topic 有哪些 partitions,partition 当前的 offset 是多少 等等,具体功能可查看 Client interface :
type Client interface { // Config returns the Config struct of the client. This struct should not be // altered after it has been created. Config() *Config // Controller returns the cluster controller broker. It will return a // locally cached value if it's available. You can call RefreshController // to update the cached value. Requires Kafka 0.10 or higher. Controller() (*Broker, error) // RefreshController retrieves the cluster controller from fresh metadata // and stores it in the local cache. Requires Kafka 0.10 or higher. RefreshController() (*Broker, error) // Brokers returns the current set of active brokers as retrieved from cluster metadata. Brokers() []*Broker ...... }2、 setup 、 Cleanup 和 ConsumeClaim 是 s.handler.ConsumeClaim 的三个接口,需要用户自己实现。可以简单理解为:当需要创建一个会话时,先运行 setup ,然后在 ConsumerClaim 中处理消息,最后运行 Cleanup 。
setup 会在一个新会话开始之前运行,且也在 ConsumerClaim 接口之前运行。调用流程为: Consume —> newSession —> newConsumerGroupSession —> handler.Setup 。
在调用了 Setup 之后,后面会创建一个协程,该协程里面其实调用的就是 ConsumeClaim 接口,所以我们实现的 ConsumerClaim 其实是一个单独的协程,其调用流程为: Consume —> newSession —> newConsumerGroupSession —> consume —> s.handler.ConsumeClaim 。
Cleanup 会在一个会话结束的时候运行。调用流程为:Consume —>release —> s.handler.Cleanup 。
了解了调用流程之后,哪些情况又会调用到他们呢?—> 1、新建consumeGroup的时候。2、发生rebalance的时候。
我们可以在setup和cleanup中加一个打印:
func (k *Kafka) Setup(session sarama.ConsumerGroupSession) error { log.Info("setup") close(k.ready) return nil } func (k *Kafka) Cleanup(sarama.ConsumerGroupSession) error { log.Info("cleanup") return nil }然后启动一个consumer,可以观察到打印:
INFO[0000] setup然后按 Ctrl + C 关闭 consumer,可以观察到打印:
INFO[0101] cleanup说明新建consumer然后退出时,会调用 setup 和 cleanup。
我们再试一下发生rebalance的情况:先启动一个consumer,然后再启动一个同一组的consumer,可以看到打印为:
第一个启动的 consumer 打印为: INFO[0000] setup INFO[0006] cleanup INFO[0006] setup 第二个启动的 consumer 打印为: INFO[0002] setup说明在发生 reblance 的时候,会先关闭原先的会话,并调用 cleanup,然后再调用 setup,最后生成一个新的会话。