Building a Kafka:2.13-2.6.0 and Zookeeper:3.6.2 Cluster on K8S (2)

This image uses the start-kafka.sh script to initialize the Kafka configuration and start the broker, but some of its behaviour does not suit a deployment on K8S, so the script is modified.
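Concretely, start-kafka.sh maps KAFKA_*-prefixed environment variables (apart from a few excluded ones) onto entries in server.properties: the KAFKA_ prefix is stripped, the rest is lower-cased, and underscores become dots. A minimal sketch of that mapping, using an assumed ZooKeeper address purely for illustration:

# Minimal sketch of the env-to-config mapping performed by start-kafka.sh
# (the variable value below is an assumption for illustration only).
export KAFKA_ZOOKEEPER_CONNECT="zk-0.zk-headless:2181"

env_var="KAFKA_ZOOKEEPER_CONNECT"
# Strip the KAFKA_ prefix, lower-case, and turn underscores into dots,
# exactly as the updateConfig loop in the script does.
kafka_name=$(echo "$env_var" | cut -d_ -f2- | tr '[:upper:]' '[:lower:]' | tr _ .)
echo "$kafka_name=${!env_var}"
# -> zookeeper.connect=zk-0.zk-headless:2181   (the line the script writes to server.properties)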

1. Modify the start-kafka.sh script

The original start-kafka.sh script can be found at https://github.com/wurstmeister/kafka-docker. The modified version is as follows:

#!/bin/bash -e

# Allow specific kafka versions to perform any unique bootstrap operations
OVERRIDE_FILE="/opt/overrides/${KAFKA_VERSION}.sh"
if [[ -x "$OVERRIDE_FILE" ]]; then
    echo "Executing override file $OVERRIDE_FILE"
    eval "$OVERRIDE_FILE"
fi

# Store original IFS config, so we can restore it at various stages
ORIG_IFS=$IFS

# Set the ZooKeeper connection address; error out if this variable is not specified
if [[ -z "$KAFKA_ZOOKEEPER_CONNECT" ]]; then
    echo "ERROR: missing mandatory config: KAFKA_ZOOKEEPER_CONNECT"
    exit 1
fi

# Set the Kafka port; fall back to the default port if none is specified
if [[ -z "$KAFKA_PORT" ]]; then
    export KAFKA_PORT=9092
fi

# Create topics automatically after Kafka starts; nothing is created if KAFKA_CREATE_TOPICS is not specified
create-topics.sh &
unset KAFKA_CREATE_TOPICS

# If KAFKA_BROKER_ID is not set directly, generate the broker id from the command held in
# BROKER_ID_COMMAND, which keeps the broker id unique and monotonically increasing
if [[ -z "$KAFKA_BROKER_ID" ]]; then
    if [[ -n "$BROKER_ID_COMMAND" ]]; then
        KAFKA_BROKER_ID=$(eval "$BROKER_ID_COMMAND")
        export KAFKA_BROKER_ID
    else
        export KAFKA_BROKER_ID=-1
    fi
fi

# If no Kafka log directory is specified, use the default path, whose name includes the current hostname
if [[ -z "$KAFKA_LOG_DIRS" ]]; then
    export KAFKA_LOG_DIRS="/kafka/kafka-logs-$HOSTNAME"
fi

# If KAFKA_HEAP_OPTS is specified, write it into the kafka-server-start.sh script
if [[ -n "$KAFKA_HEAP_OPTS" ]]; then
    sed -r -i 's/(export KAFKA_HEAP_OPTS)="(.*)"/\1="'"$KAFKA_HEAP_OPTS"'"/g' "$KAFKA_HOME/bin/kafka-server-start.sh"
    unset KAFKA_HEAP_OPTS
fi

# To have the container use the output of a given command as its hostname, assign that command to
# HOSTNAME_COMMAND; eval runs the command and the result is stored in HOSTNAME_VALUE
if [[ -n "$HOSTNAME_COMMAND" ]]; then
    HOSTNAME_VALUE=$(eval "$HOSTNAME_COMMAND")

    # Replace any occurrences of _{HOSTNAME_COMMAND} with the value
    IFS=$'\n'
    for VAR in $(env); do
        if [[ $VAR =~ ^KAFKA_ && "$VAR" =~ "_{HOSTNAME_COMMAND}" ]]; then
            eval "export ${VAR//_\{HOSTNAME_COMMAND\}/$HOSTNAME_VALUE}"
        fi
    done
    IFS=$ORIG_IFS
fi

# To have the container use the output of a given command as its port, assign that command to
# PORT_COMMAND; eval runs the command and the result is stored in PORT_VALUE
if [[ -n "$PORT_COMMAND" ]]; then
    PORT_VALUE=$(eval "$PORT_COMMAND")

    # Replace any occurrences of _{PORT_COMMAND} with the value
    IFS=$'\n'
    for VAR in $(env); do
        if [[ $VAR =~ ^KAFKA_ && "$VAR" =~ "_{PORT_COMMAND}" ]]; then
            eval "export ${VAR//_\{PORT_COMMAND\}/$PORT_VALUE}"
        fi
    done
    IFS=$ORIG_IFS
fi

if [[ -n "$RACK_COMMAND" && -z "$KAFKA_BROKER_RACK" ]]; then
    KAFKA_BROKER_RACK=$(eval "$RACK_COMMAND")
    export KAFKA_BROKER_RACK
fi

# Check whether KAFKA_LISTENERS is set; it is usually set to PLAINTEXT://:9092
if [[ -z "$KAFKA_ADVERTISED_HOST_NAME$KAFKA_LISTENERS" ]]; then
    if [[ -n "$KAFKA_ADVERTISED_LISTENERS" ]]; then
        echo "ERROR: Missing environment variable KAFKA_LISTENERS. Must be specified when using KAFKA_ADVERTISED_LISTENERS"
        exit 1
    elif [[ -z "$HOSTNAME_VALUE" ]]; then
        echo "ERROR: No listener or advertised hostname configuration provided in environment."
        echo "       Please define KAFKA_LISTENERS / (deprecated) KAFKA_ADVERTISED_HOST_NAME"
        exit 1
    fi

    # Maintain existing behaviour
    # If HOSTNAME_COMMAND is provided, set that to the advertised.host.name value if listeners are not defined.
    export KAFKA_ADVERTISED_HOST_NAME="$HOSTNAME_VALUE"
fi

# Issue newline to config file in case there is not one already
echo "" >> "$KAFKA_HOME/config/server.properties"

(
    function updateConfig() {
        key=$1
        value=$2
        file=$3

        # Omit $value here, in case there is sensitive information
        echo "[Configuring] '$key' in '$file'"

        # If config exists in file, replace it. Otherwise, append to file.
        if grep -E -q "^#?$key=" "$file"; then
            sed -r -i "s@^#?$key=.*@$key=$value@g" "$file" # note that no config values may contain an '@' char
        else
            echo "$key=$value" >> "$file"
        fi
    }

    # KAFKA_VERSION + KAFKA_HOME + grep -rohe KAFKA[A-Z0-9_]* /opt/kafka/bin | sort | uniq | tr '\n' '|'
    # Settings excluded from initialization; they are already handled elsewhere and must not be
    # changed or appended to the broker config
    EXCLUSIONS="|KAFKA_VERSION|KAFKA_HOME|KAFKA_DEBUG|KAFKA_GC_LOG_OPTS|KAFKA_HEAP_OPTS|KAFKA_JMX_OPTS|KAFKA_JVM_PERFORMANCE_OPTS|KAFKA_LOG|KAFKA_OPTS|"

    IFS=$'\n'
    for VAR in $(env)
    do
        env_var=$(echo "$VAR" | cut -d= -f1)
        if [[ "$EXCLUSIONS" = *"|$env_var|"* ]]; then
            echo "Excluding $env_var from broker config"
            continue
        fi

        if [[ $env_var =~ ^KAFKA_ ]]; then
            kafka_name=$(echo "$env_var" | cut -d_ -f2- | tr '[:upper:]' '[:lower:]' | tr _ .)
            updateConfig "$kafka_name" "${!env_var}" "$KAFKA_HOME/config/server.properties"
        fi

        if [[ $env_var =~ ^LOG4J_ ]]; then
            log4j_name=$(echo "$env_var" | tr '[:upper:]' '[:lower:]' | tr _ .)
            updateConfig "$log4j_name" "${!env_var}" "$KAFKA_HOME/config/log4j.properties"
        fi
    done

    # The main addition: based on the value of SERVERS, build the BOOTSTRAP_SERVERS address list
    # and write it into the consumer and producer config files
    PODNAME=$(hostname -s | awk -F'-' 'OFS="-"{$NF="";print}' | sed 's/-$//g')
    for ((i=0;i<$SERVERS;i++))
    do
        BOOTSTRAP_SERVERS+="$PODNAME-$i.$(hostname -d):${KAFKA_PORT},"
    done
    BOOTSTRAP_SERVERS=${BOOTSTRAP_SERVERS%?}
    echo ${BOOTSTRAP_SERVERS} > /opt/log.txt
    sed -i "s/bootstrap.servers.*$/bootstrap.servers=$BOOTSTRAP_SERVERS/g" $KAFKA_HOME/config/consumer.properties
    sed -i "s/bootstrap.servers.*$/bootstrap.servers=$BOOTSTRAP_SERVERS/g" $KAFKA_HOME/config/producer.properties
)

# If an additional custom init script is defined, run it
if [[ -n "$CUSTOM_INIT_SCRIPT" ]] ; then
    eval "$CUSTOM_INIT_SCRIPT"
fi

exec "$KAFKA_HOME/bin/kafka-server-start.sh" "$KAFKA_HOME/config/server.properties"
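To make the bootstrap.servers logic added at the end of the script easier to follow, here is a worked example of the address list it produces. Every concrete value below is an assumption for illustration: a StatefulSet named kafka with three replicas (SERVERS=3) behind a headless Service kafka-headless in the default namespace, and KAFKA_PORT=9092.

# Worked example of the BOOTSTRAP_SERVERS derivation (all names and values are assumptions).
HOST_S="kafka-1"                                     # sample output of `hostname -s` inside pod kafka-1
HOST_D="kafka-headless.default.svc.cluster.local"    # sample output of `hostname -d`
SERVERS=3
KAFKA_PORT=9092

# Strip the ordinal suffix from the pod name: kafka-1 -> kafka
PODNAME=$(echo "$HOST_S" | awk -F'-' 'OFS="-"{$NF="";print}' | sed 's/-$//g')

BOOTSTRAP_SERVERS=""
for ((i=0;i<SERVERS;i++))
do
    BOOTSTRAP_SERVERS+="$PODNAME-$i.$HOST_D:${KAFKA_PORT},"
done
BOOTSTRAP_SERVERS=${BOOTSTRAP_SERVERS%?}             # drop the trailing comma

echo "$BOOTSTRAP_SERVERS"
# kafka-0.kafka-headless.default.svc.cluster.local:9092,kafka-1.kafka-headless.default.svc.cluster.local:9092,kafka-2.kafka-headless.default.svc.cluster.local:9092

Because every pod derives the same list, each broker ends up with an identical bootstrap.servers entry in consumer.properties and producer.properties, so the command-line tools shipped in the image can reach the whole cluster from any pod.

2. Modify the Dockerfile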
