Kafka 依赖 Zookeeper 来维护集群成员的信息:
Kafka 使用 Zookeeper 的临时节点来选举 controller
Zookeeper 在 broker 加入集群或退出集群时通知 controller
controller 负责在 broker 加入或离开集群时进行分区 leader 选举
broker 管理每个 broker 都有一个唯一标识符 ID,这个标识符可以在配置文件里指定,也可以自动生成。
在 broker 启动的时候,它通过在 Zookeeper 的 /brokers/ids 路径上创建临时节点,把自己的 ID 注册 Zookeeper。
Kafka 组件会订阅 Zookeeper 的 /brokers/ids 路径,当有 broker 加入集群或退出集群时,这些组件就可以获得通知。
在 broker 停机、出现网络分区或长时间垃圾回收停顿时,会导致其 Zookeeper 会话失效,导致其在启动时创建的临时节点会自动被移除。
监听 broker 列表的 Kafka 组件会被告知该 broker 已移除,然后处理 broker 崩溃的后续事宜。
在完全关闭一个 broker 之后,如果使用相同的 ID 启动另一个全新的 broker,它会立即加入集群,并拥有与旧 broker 相同的分区和主题。
controller 选举controller 其实就是一个 broker,它除了具有一般 broker 的功能之外,还负责分区 leader 的选举。
为了在整个集群中指定一个唯一的 controller,broker 集群需要进行选举,该过程依赖以下两个 Zookeeper 节点:
// 临时节点 controller(保存最新的 controller 节点信息,保证唯一性) object ControllerZNode { def path = "/controller" def encode(brokerId: Int, timestamp: Long): Array[Byte] = { Json.encodeAsBytes(Map("version" -> 1, "brokerid" -> brokerId, "timestamp" -> timestamp.toString).asJava) } def decode(bytes: Array[Byte]): Option[Int] = Json.parseBytes(bytes).map { js => js.asJsonObject("brokerid").to[Int] } } // 永久节点 controller_epoch(保存最新 controller 对应的任期号,用于避免脑裂) object ControllerEpochZNode { def path = "/controller_epoch" def encode(epoch: Int): Array[Byte] = epoch.toString.getBytes(UTF_8) def decode(bytes: Array[Byte]): Int = new String(bytes, UTF_8).toInt }broker 启动后会发起一轮选举,选举通过 Zookeeper 提供的创建节点功能来实现:
所有 broker 启动后都会尝试抢占创建临时节点 /controller,创建成功的 broker 将成为 controller。
新选出的 controller 会同时递增 /controller_epoch 中的任期号,其他 broker 可以根据任期号忽略已过期 controller 的消息。
抢占失败的 broker 会收到一个 NODEEXISTS 响应,转而在节点上创建Watcher实时监控/controller节点。
当 controller 被关闭或者断开连接,Zookeeper 上的临时节点就会消失,集群里的其他 broker 会接收到通知并发起一轮新的选举。