Kafka Cluster Deployment and Configuration Manual

The deployment spans three host nodes:
server1: 192.168.10.1
server2: 192.168.10.2
server3: 192.168.10.3
 
1. Install and configure JDK 7u80
 
rpm -ivh jdk-7u80-linux-x64.rpm
 
Configure the environment variables in /etc/profile:

more /etc/profile
JAVA_HOME=/usr/java/jdk1.7.0_80
PATH=$JAVA_HOME/bin:$PATH:.
CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
export JAVA_HOME
export PATH
export CLASSPATH

Note: older JDK releases have known bugs when running Kafka, which is why JDK 7u80 is used here.
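To confirm the installation and the environment variables took effect, run a quick check on each node:

# Reload the profile in the current shell, then verify the JDK
source /etc/profile
java -version     # should report java version "1.7.0_80"
echo $JAVA_HOME   # should print /usr/java/jdk1.7.0_80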
 
2. In the system iptables firewall, open the following ports to the LAN segment (2888/3888 carry ZooKeeper peer replication and leader-election traffic, 2181 serves ZooKeeper clients, and 9092 serves the Kafka brokers):

-A INPUT -s 192.168.10.0/24 -m state --state NEW -m tcp -p tcp --dport 2888 -j ACCEPT
-A INPUT -s 192.168.10.0/24 -m state --state NEW -m tcp -p tcp --dport 3888 -j ACCEPT
-A INPUT -s 192.168.10.0/24 -m state --state NEW -m tcp -p tcp --dport 9092 -j ACCEPT
-A INPUT -s 192.168.10.0/24 -m state --state NEW -m tcp -p tcp --dport 2181 -j ACCEPT
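These rules are written in the format of /etc/sysconfig/iptables. Assuming a RHEL/CentOS-style iptables service (consistent with the RPM-based JDK install above), reload the firewall after editing that file:

service iptables restart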

3. Configure hostname resolution between the cluster nodes

more /etc/hosts 
192.168.10.1    server1 
192.168.10.2    server2 
192.168.10.3    server3 
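A quick sanity check that name resolution works from every node:

# Each hostname should resolve and answer one ping
for h in server1 server2 server3; do ping -c 1 $h; done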

4. Deploy Kafka on all three host nodes as follows (kafka_2.11-0.10.0.0 is the Scala 2.11 build of Kafka 0.10.0.0):

cd /data
unzip kafka_2.11-0.10.0.0.zip
mv kafka_2.11-0.10.0.0/ kafka

5. Configure the ZooKeeper ensemble
Note: unless a step explicitly names the node it applies to, make the following changes on all three host nodes.

Because this ZooKeeper ensemble serves Kafka exclusively, its data directory is placed directly under /data/kafka/zookeeper for easier ongoing management.
mkdir -p /data/kafka/zookeeper
 
Edit the ZooKeeper configuration file:

cd /data/kafka
vi config/zookeeper.properties

tickTime=2000
dataDir=/data/kafka/zookeeper
clientPort=2181
# 0 removes the per-IP limit on client connections
maxClientCnxns=0
initLimit=15
syncLimit=5
# server.N=host:peerPort:electionPort; N must match the node's myid
server.1=192.168.10.1:2888:3888
server.2=192.168.10.2:2888:3888
server.3=192.168.10.3:2888:3888

Create the server ID file on each node:
On server1: echo "1" > /data/kafka/zookeeper/myid
On server2: echo "2" > /data/kafka/zookeeper/myid
On server3: echo "3" > /data/kafka/zookeeper/myid
Note: each myid value must match the N of the corresponding server.N entry in zookeeper.properties.
 
Make the management scripts executable:
cd /data/kafka/bin
chmod +x zookeeper-server-start.sh zookeeper-server-stop.sh kafka-run-class.sh
 
Modify the ZooKeeper startup script as follows so it can be managed without arguments; the edited script always starts ZooKeeper as a daemon with the bundled configuration file:

more zookeeper-server-start.sh
#!/bin/bash
#if [ $# -lt 1 ];
#then
#    echo "USAGE: $0 [-daemon] zookeeper.properties"
#    exit 1
#fi
base_dir=$(dirname $0)
if [ "x$KAFKA_LOG4J_OPTS" = "x" ]; then
    export KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:$base_dir/../config/log4j.properties"
fi
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
    export KAFKA_HEAP_OPTS="-Xmx512M -Xms512M"
fi
EXTRA_ARGS="-name zookeeper -loggc"
#COMMAND=$1
# Hard-code daemon mode instead of reading it from the command line
COMMAND="-daemon"
case $COMMAND in
  -daemon)
    EXTRA_ARGS="-daemon "$EXTRA_ARGS
    shift
    ;;
 *)
    ;;
esac
#exec $base_dir/kafka-run-class.sh $EXTRA_ARGS org.apache.zookeeper.server.quorum.QuorumPeerMain "$@"
# Resolve the config path relative to the script so this works from any directory
exec $base_dir/kafka-run-class.sh $EXTRA_ARGS org.apache.zookeeper.server.quorum.QuorumPeerMain "$base_dir/../config/zookeeper.properties"

To start and stop ZooKeeper:

cd /data/kafka/bin
./zookeeper-server-start.sh
./zookeeper-server-stop.sh
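After ZooKeeper is started on all three nodes, the ensemble state can be checked with ZooKeeper's built-in four-letter commands, assuming nc (netcat) is installed:

# Each node should answer "imok"; exactly one node should report Mode: leader
echo ruok | nc 192.168.10.1 2181
echo srvr | nc 192.168.10.1 2181 | grep Mode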

6. Configure the Kafka cluster
Configure /data/kafka/config/server.properties as follows. The file is identical on all three nodes except for the first few parameters (broker.id, listeners, host.name); their per-node values are summarized after the listing.

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=1
############################# Socket Server Settings #############################
listeners=PLAINTEXT://192.168.10.1:9092
port=9092
host.name=192.168.10.1
# The number of threads handling network requests
num.network.threads=8
# The number of threads doing disk I/O
num.io.threads=8
# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=1048576
# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=1048576
# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600
# The number of queued requests allowed before blocking the network threads
queued.max.requests=100
# The purge interval (in number of requests) of the fetch request purgatory
fetch.purgatory.purge.interval.requests=200
# The purge interval (in number of requests) of the producer request purgatory
producer.purgatory.purge.interval.requests=200

############################# Log Basics #############################
# A comma separated list of directories under which to store log files
log.dirs=/data/kafka/kafka-logs
# The default number of log partitions per topic.
num.partitions=24
# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
num.recovery.threads.per.data.dir=2
# The maximum size of message that the server can receive
message.max.bytes=1000000
# Enable auto creation of topic on the server
auto.create.topics.enable=true
# The interval with which we add an entry to the offset index
log.index.interval.bytes=4096
# The maximum size in bytes of the offset index
log.index.size.max.bytes=10485760
# Allow to delete topics
delete.topic.enable=true
############################# Log Flush Policy #############################
# The number of messages to accept before forcing a flush of data to disk
log.flush.interval.messages=20000
# The maximum amount of time a message can sit in a log before we force a flush
log.flush.interval.ms=10000
# The frequency in ms that the log flusher checks whether any log needs to be flushed to disk
log.flush.scheduler.interval.ms=2000
############################# Log Retention Policy #############################
# The minimum age of a log file to be eligible for deletion
log.retention.hours=168
# A size-based retention policy for logs.
log.retention.bytes=1073741824
# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824
# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000
# The maximum time before a new log segment is rolled out (in hours)
log.roll.hours=168
############################# Zookeeper #############################
# Zookeeper connection string (see zookeeper docs for details).
zookeeper.connect=192.168.10.1:2181,192.168.10.2:2181,192.168.10.3:2181
# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000
# How far a ZK follower can be behind a ZK leader
zookeeper.sync.time.ms=2000
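
For reference, the per-node values of the parameters that differ, following the addressing plan above:

On server1: broker.id=1  listeners=PLAINTEXT://192.168.10.1:9092  host.name=192.168.10.1
On server2: broker.id=2  listeners=PLAINTEXT://192.168.10.2:9092  host.name=192.168.10.2
On server3: broker.id=3  listeners=PLAINTEXT://192.168.10.3:9092  host.name=192.168.10.3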

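With ZooKeeper running and server.properties in place on every node, the brokers can be started and the cluster smoke-tested. A minimal sketch using the stock Kafka 0.10 scripts ("test" is an arbitrary topic name chosen for illustration):

# On each node: start the broker in the background
cd /data/kafka
bin/kafka-server-start.sh -daemon config/server.properties

# From any one node: create a replicated topic and verify that
# all three brokers appear in its replica assignment
bin/kafka-topics.sh --create --zookeeper 192.168.10.1:2181 \
  --replication-factor 3 --partitions 3 --topic test
bin/kafka-topics.sh --describe --zookeeper 192.168.10.1:2181 --topic test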