One thing to note: the storage I am using is CephFS, mounted at /opt/ops_ceph_data on every K8S node, which is why the PVs created earlier use the local storage type.
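For context, a local PV served from that mount point might be sketched roughly as follows; the PV name, capacity, and access mode are assumptions rather than the exact manifest used earlier, and only the storage path and the node label come from this article:

```yaml
# Rough sketch of a local PV backed by the cephfs mount point.
# ASSUMPTIONS: the PV name, capacity, and access mode are illustrative.
apiVersion: v1
kind: PersistentVolume
metadata:
  name: kafka-data-pv
spec:
  capacity:
    storage: 100Gi
  accessModes:
    - ReadWriteMany
  local:
    path: /opt/ops_ceph_data            # the cephfs mount point on every node
  nodeAffinity:                         # a local PV must be pinned to nodes
    required:
      nodeSelectorTerms:
        - matchExpressions:
            - key: kafka-cluster        # the label added in the next step
              operator: In
              values: ["true"]
```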
7. Create Labels

Because the PV created above uses the local storage type, it can only be scheduled onto nodes that carry the specified label, so add a label to every node in the cluster:
```shell
for i in 1 2 3 4 5; do kubectl label nodes k8s-node${i} kafka-cluster=true; done
```

8. Create the Zookeeper Cluster

1. Create the Services

Create the Services used for communication between Zookeeper and the other nodes. The yaml file contents are as follows:
```yaml
---
apiVersion: v1
kind: Service
metadata:
  name: zk-inner-service
  namespace: ns-kafka
  labels:
    app: zk
spec:
  selector:
    app: zk
  clusterIP: None
  ports:
    - name: server
      port: 2888
    - name: leader-election
      port: 3888
---
apiVersion: v1
kind: Service
metadata:
  name: zk-client-service
  namespace: ns-kafka
  labels:
    app: zk
spec:
  selector:
    app: zk
  type: NodePort
  ports:
    - name: client
      port: 2181
      nodePort: 31811
```

2. Create the StatefulSet

Zookeeper is a stateful service, so it must be deployed with a StatefulSet. The yaml file contents are as follows:
```yaml
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: zk
  namespace: ns-kafka
spec:
  selector:
    matchLabels:
      app: zk
  serviceName: "zk-inner-service"
  replicas: 3
  updateStrategy:
    type: RollingUpdate
  podManagementPolicy: Parallel
  template:
    metadata:
      labels:
        app: zk
    spec:
      containers:
        - name: zk
          imagePullPolicy: Always
          image: 10.16.12.204/ops/zookeeper:custom-v3.6.2
          resources:
            requests:
              memory: "500Mi"
              cpu: "0.5"
          ports:
            - containerPort: 2181
              name: client
            - containerPort: 2888
              name: server
            - containerPort: 3888
              name: leader-election
          env:
            - name: SERVERS           # SERVERS must match the replica count
              value: "3"
            - name: ZOO_CONF_DIR      # configuration file directory
              value: /opt/conf
            - name: ZOO_DATA_DIR      # data directory
              value: /opt/data
            - name: ZOO_DATA_LOG_DIR  # data log directory
              value: /opt/data_log
          volumeMounts:               # directories that need persistent storage
            - name: zookeeper-data
              mountPath: /opt/data
              subPath: zookeeper-cluster-data/data
            - name: zookeeper-data
              mountPath: /opt/data_log
              subPath: zookeeper-cluster-data/data_log
            - name: data-conf
              mountPath: /etc/localtime
      imagePullSecrets:
        - name: harbor-secret
      volumes:
        - name: zookeeper-data
          persistentVolumeClaim:
            claimName: kafka-data-pvc
        - name: data-conf
          hostPath:
            path: /usr/share/zoneinfo/Asia/Shanghai
```

3. Verify the Cluster State

Once the cluster is up, check the current state of each zookeeper node with the following command:
```shell
[@k8s-master1 /]# for i in 0 1 2; do kubectl exec -it zk-$i -n ns-kafka -- zkServer.sh --config /opt/conf/zk-$i status; done
ZooKeeper JMX enabled by default
Using config: /opt/conf/zk-0/zoo.cfg
Client port found: 2181. Client address: localhost. Client SSL: false.
Mode: follower
ZooKeeper JMX enabled by default
Using config: /opt/conf/zk-1/zoo.cfg
Client port found: 2181. Client address: localhost. Client SSL: false.
Mode: leader
ZooKeeper JMX enabled by default
Using config: /opt/conf/zk-2/zoo.cfg
Client port found: 2181. Client address: localhost. Client SSL: false.
Mode: follower
```

The cluster currently has one leader and two followers. Next, verify message synchronization between the nodes by first creating a znode on zk-0:
```shell
[@k8s-master1 /]# kubectl exec -it zk-0 -n ns-kafka -- zkCli.sh
[zk: localhost:2181(CONNECTED) 0] create /testMessage Hello
Created /testMessage
```

Then read this znode on the other two nodes:
```shell
[@k8s-master1 /]# kubectl exec -it zk-1 -n ns-kafka -- zkCli.sh
[zk: localhost:2181(CONNECTED) 0] get /testMessage
Hello

[@k8s-master1 /]# kubectl exec -it zk-2 -n ns-kafka -- zkCli.sh
[zk: localhost:2181(CONNECTED) 0] get /testMessage
Hello
```

The message is visible on both nodes, which indicates the cluster is running correctly.
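An aside on the SERVERS variable set above: each ZooKeeper server needs a unique myid, and with StatefulSet hostnames like zk-0/zk-1/zk-2 the image's entrypoint can derive it from the pod ordinal. The snippet below is a hypothetical sketch of that derivation; the script actually baked into the custom image may differ:

```shell
# Hypothetical sketch: derive a unique ZooKeeper myid from the StatefulSet
# pod ordinal (zk-0 -> myid 1, zk-1 -> myid 2, zk-2 -> myid 3).
HOST="zk-1"            # inside the pod this would be "$(hostname)"
ORDINAL="${HOST##*-}"  # strip everything up to the last '-'
MYID=$((ORDINAL + 1))  # myid must be >= 1, so shift the 0-based ordinal
echo "$MYID"           # prints 2 for zk-1
```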
9. Create the Kafka Cluster

1. Create the Service

Create the Service used for Kafka communication. The yaml file contents are as follows:
```yaml
---
apiVersion: v1
kind: Service
metadata:
  name: kafka-service
  namespace: ns-kafka
  labels:
    app: kafka
spec:
  ports:
    - port: 9092
      name: server
  clusterIP: None
  selector:
    app: kafka
```

2. Create the StatefulSet

Kafka is also a stateful service, so it is deployed with a StatefulSet as well. The yaml file contents are as follows:
```yaml
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: kafka
  namespace: ns-kafka
spec:
  selector:
    matchLabels:
      app: kafka
  serviceName: "kafka-service"
  replicas: 3
  updateStrategy:
    type: RollingUpdate
  podManagementPolicy: Parallel
  template:
    metadata:
      labels:
        app: kafka
    spec:
      imagePullSecrets:
        - name: harbor-secret
      containers:
        - name: kafka
          imagePullPolicy: Always
          image: 10.16.12.204/ops/kafka:custom-v2.13-2.6.0
          resources:
            requests:
              memory: "500Mi"
              cpu: "0.5"
          env:
            - name: SERVERS                  # SERVERS must match the replica count
              value: "3"
            - name: KAFKA_LISTENERS
              value: "PLAINTEXT://:9092"
            - name: KAFKA_ZOOKEEPER_CONNECT  # Zookeeper connection address
              value: "zk-inner-service.ns-kafka.svc.cluster.local:2181"
            - name: KAFKA_PORT
              value: "9092"
            - name: KAFKA_MESSAGE_MAX_BYTES
              value: "20000000"
            - name: BROKER_ID_COMMAND        # generates a broker id inside the container
              value: "hostname | awk -F'-' '{print $NF}'"
          volumeMounts:
            - name: kafka-log                # only the kafka log directory needs persistence
              mountPath: /kafka
              subPath: kafka-cluster-log
            - name: data-conf
              mountPath: /etc/localtime
      volumes:
        - name: kafka-log
          persistentVolumeClaim:
            claimName: kafka-data-pvc
        - name: data-conf
          hostPath:
            path: /usr/share/zoneinfo/Asia/Shanghai
```
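BROKER_ID_COMMAND is evaluated inside each container. With StatefulSet pod hostnames kafka-0, kafka-1, and kafka-2, taking the last '-'-separated field yields a stable, unique broker.id per pod. This can be simulated outside the cluster with a fixed hostname:

```shell
# Simulate BROKER_ID_COMMAND for a pod named kafka-2
# (inside the real container, "$(hostname)" would be used instead).
HOSTNAME="kafka-2"
echo "$HOSTNAME" | awk -F'-' '{print $NF}'   # prints 2
```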
3. Verify the Cluster State

3.1 Check the brokers in Zookeeper

```shell
[@k8s-master1 ~]# kubectl exec -it zk-0 -n ns-kafka -- zkCli.sh
Connecting to localhost:2181
[zk: localhost:2181(CONNECTED) 0] ls /
[admin, brokers, cluster, config, consumers, controller, controller_epoch, isr_change_notification, latest_producer_id_block, log_dir_event_notification, zookeeper]
[zk: localhost:2181(CONNECTED) 1] ls /brokers
[ids, seqid, topics]
[zk: localhost:2181(CONNECTED) 2] ls /brokers/ids
[0, 1, 2]
[zk: localhost:2181(CONNECTED) 3] get /brokers/ids/0
{"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://kafka-0.kafka-service.ns-kafka.svc.cluster.local:9092"],"jmx_port":-1,"port":9092,"host":"kafka-0.kafka-service.ns-kafka.svc.cluster.local","version":4,"timestamp":"1604644074102"}
[zk: localhost:2181(CONNECTED) 4] get /brokers/ids/1
{"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://kafka-1.kafka-service.ns-kafka.svc.cluster.local:9092"],"jmx_port":-1,"port":9092,"host":"kafka-1.kafka-service.ns-kafka.svc.cluster.local","version":4,"timestamp":"1604644074079"}
[zk: localhost:2181(CONNECTED) 5] get /brokers/ids/2
{"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://kafka-2.kafka-service.ns-kafka.svc.cluster.local:9092"],"jmx_port":-1,"port":9092,"host":"kafka-2.kafka-service.ns-kafka.svc.cluster.local","version":4,"timestamp":"1604644074009"}
```

All 3 brokers have registered themselves in zookeeper.
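The zkCli.sh session above is interactive; the same registration data can also be checked from a script. As a sketch, the advertised endpoint can be pulled out of a broker's registration JSON (here pasted from the output above) with a simple grep:

```shell
# Extract the advertised PLAINTEXT endpoint from a broker registration record.
# The JSON below is copied from the zkCli.sh output for /brokers/ids/0.
json='{"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://kafka-0.kafka-service.ns-kafka.svc.cluster.local:9092"],"jmx_port":-1,"port":9092,"host":"kafka-0.kafka-service.ns-kafka.svc.cluster.local","version":4,"timestamp":"1604644074102"}'
printf '%s\n' "$json" | grep -o 'PLAINTEXT://[^"]*'
# prints PLAINTEXT://kafka-0.kafka-service.ns-kafka.svc.cluster.local:9092
```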
3.2 Create a Topic