alpakka-kafka-consumer的功能描述很简单:向kafka订阅某些topic然后把读到的消息传给akka-streams做业务处理。在kafka-consumer的实现细节上,为了达到高可用、高吞吐的目的,topic又可用划分出多个分区partition。分区是分布在kafka集群节点broker上的。由于一个topic可能有多个partition,对应topic就会有多个consumer,形成一个consumer组,共用统一的groupid。一个partition只能对应一个consumer、而一个consumer负责从多个partition甚至多个topic读取消息。kafka会根据实际情况将某个partition分配给某个consumer,即partition-assignment。所以一般来说我们会把topic订阅与consumer-group挂钩。这个可以在典型的ConsumerSettings证实:
val system = ActorSystem("kafka-sys") val config = system.settings.config.getConfig("akka.kafka.consumer") val bootstrapServers = "localhost:9092" val consumerSettings = ConsumerSettings(config, new StringDeserializer, new ByteArrayDeserializer) .withBootstrapServers(bootstrapServers) .withGroupId("group1") .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")