消息并不会立即发送,而是先进行序列化后,发送给Partitioner,也就是上面提到的hash函数,由Partitioner确定目标分区后,发送到一块内存缓冲区中(发送队列)。Producer的另一个工作线程(即Sender线程),则负责实时地从该缓冲区中提取出准备好的消息封装到一个批次内,统一发送到对应的broker中。其过程大致是这样的:
图片来自123archu
Consumer每个Consumer都划归到一个逻辑Consumer Group中,一个partition只能被同一个Consumer Group中的一个Consumer消费,但可以被不同的Consumer Group消费。
若 topic 的 partition 数量为 p,Consumer Group 中订阅此 topic 的 consumer 数量为 c; 则:
p < c: 会有 c - p 个 consumer闲置,造成浪费 p > c: 一个 consumer 对应多个 partition p = c: 一个 consumer 对应一个 partition应该合理分配consumer和partition的数量,避免造成资源倾斜,最好partiton数目是consumer数目的整数倍。
如何将Partition分配给Consumer生产过程中broker要分配partition,消费过程这里,也要分配partition给消费者。类似broker中选了一个controller出来,消费也要从broker中选一个coordinator,用于分配partition。
当 partition 或 consumer 数量发生变化时,比如 增加 consumer, 减少 consumer(主动或被动),增加 partition,都会进行 rebalance。
其过程如下:
consumer 给 coordinator 发送 JoinGroupRequest 请求。这时其他consumer 发 heartbeat 请求过来时,coordinator 会告诉他们,要 rebalance了。其他 consumer 也发送 JoinGroupRequest 请求。
coordinator在consumer中选出一个leader,其他作为 follower,通知给各个 consumer,对于leader,还会把 follower 的 metadata 带给它。
consumer leader 根据 consumer metadata 重新分配 partition
consumer向coordinator发送SyncGroupRequest,其中leader的SyncGroupRequest会包含分配的情况。coordinator回包,把分配的情况告诉consumer,包括leader。
Consumer Fetch MessageConsumer 采用"拉模式"消费消息,这样 consumer 可以自行决定消费的行为。
Consumer 调用 poll(duration) 从服务器拉取消息。拉取消息的具体行为由下面的配置项决定:
#consumer.properties #消费者最多 poll 多少个 record max.poll.records=500 #消费者 poll 时 partition 返回的最大数据量 max.partition.fetch.bytes=1048576 #Consumer 最大 poll 间隔 #超过此值服务器会认为此 consumer failed #并将此 consumer 踢出对应的 consumer group max.poll.interval.ms=300000在 partition 中,每个消息都有一个 offset。新消息会被写到 partition 末尾(最新的一个 segment 文件末尾), 每个 partition 上的消息是顺序消费的,不同的 partition 之间消息的消费顺序是不确定的。
若一个 consumer 消费多个 partition, 则各个 partition 之前消费顺序是不确定的,但在每个 partition 上是顺序消费。
若来自不同 consumer group 的多个 consumer 消费同一个 partition,则各个 consumer 之间的消费互不影响,每个 Consumer 都会有自己的 offset。
Consumer A 和 Consumer B 属于不同的 Consumer Group。Cosumer A 读取到 offset = 9, Consumer B 读取到 offset = 11,这个值表示下次读取的位置。也就是说 Consumer A 已经读取了 offset 为 0 ~ 8 的消息,Consumer B 已经读取了 offset 为 0 ~ 10 的消息。
下次从 offset = 9 开始读取的 Consumer 并不一定还是 Consumer A 因为可能发生 rebalance
offset的保存Consumer 消费 partition 时,需要保存 offset 记录当前消费位置。
offset 可以选择自动提交或调用 Consumer 的 commitSync() 或 commitAsync() 手动提交,相关配置为:
#是否自动提交 offset enable.auto.commit=true #自动提交间隔。 enable.auto.commit=true 时有效 auto.commit.interval.ms=5000offset 保存在名叫 __consumeroffsets 的 topic 中。写消息的 key 由 groupid、topic、partition 组成,value 是 offset。
一般情况下,每个 key 的 offset 都是缓存在内存中,查询的时候不用遍历partition,如果没有缓存,第一次就会遍历 partition 建立缓存,然后查询返回。
__consumeroffsets 的 partition 数量由下面的 server 配置决定:
offsets.topic.num.partitions=50offset 保存在哪个分区上,即 __consumeroffsets 的分区机制,可以表示为:
groupId.hashCode() mode groupMetadataTopicPartitionCountgroupMetadataTopicPartitionCount 是上面配置的分区数。
因为一个 partition 只能被同一个 Consumer Group 的一个 consumer 消费,因此可以用 groupId 表示此 consumer 消费 offeset 所在分区
消息系统可能遇到那些问题kafka支持3种消息投递语义
at most once:最多一次,消息可能会丢失,但不会重复
获取数据 -> commit offset -> 业务处理
at least once:最少一次,消息不会丢失,可能会重复
获取数据 -> 业务处理 -> commit offset。
exactly once:只且一次,消息不丢失不重复,只且消费一次(0.11中实现,仅限于下游也是kafka)
如何保证消息不被重复消费?(消息的幂等性)