CDH中Kafka基本指令总结——topic使用与测试producer产生数据、consumer消费数据

然后正常kafka的指令是 :  ./bin/kafka-topics.sh --zookeeper hadoop300:2181 .......

但是使用CDH安装的kafka则不需要全写出此  ./bin/kafka-topics.sh  部分。只许直接写 kafka-topics 即可,这是很重要的一个区别,使用CDH安装的kafka时候要特别注意一下。

具体有哪些指令可以看此路径下:

/opt/cloudera/parcels/KAFKA-4.1.0-1.4.1.0.p0.4/bin

CDH中Kafka基本指令总结——topic使用与测试producer产生数据、consumer消费数据

二、topic主题使用

CDH中Kafka基本指令总结——topic使用与测试producer产生数据、consumer消费数据

所以我们使用topic的指令格式应该都类似:

kafka-topics --zookeeper hadoop300:2181/kafka ......

创建一个名为test 的主题(Topic):

kafka-topics --zookeeper hadoop300:2181/kafka --create -replication-factor 1 --partitions 3 --topic test

若是上述中的 ZooKeeper Root 的Kafka服务范围为: " "。则这里的创建主题指令改为:

kafka-topics --zookeeper hadoop300:2181--create -replication-factor 1 --partitions 3 --topic test

删除创建的topic

kafka-topics --zookeeper hadoop300:2181/kafka --delete --topic test

这里如果直接删除,则会输出    Topic *** is marked for deletion  如果我们topic中消息堆积的太多,或者kafka所在磁盘空间满了等等,则会需要彻底清理一下kafka topic。

方法一:修改kafaka配置文件server.properties, 添加 delete.topic.enable=true,重启kafka,之后通过kafka命令行就可以直接删除topic。

方法二:通过命令行删除topic:  ./bin/kafka-topics.sh --delete --zookeeper {zookeeper server} --topic {topic name}

因为kafaka配置文件中server.properties没有配delete.topic.enable=true,此时的删除并不是真正的删除,只是把topic标记为:marked for deletion     你可以通过命令:./bin/kafka-topics --zookeeper {zookeeper server} --list 来查看所有topic

方法三:若需要真正删除它

需要登录zookeeper客户端:zookeeper-client

找到topic所在的目录:ls /kafka/brokers/topics

执行命令,即可,此时topic被彻底删除:rmr /kafka/brokers/topics/{topic name}

修改topic的分区数

kafka-topics --zookeeper hadoop300:2181/kafka --alter --topic test \ partitions 5

接着启动消费者

kafka-console-consumer --bootstrap-server hadoop300:9092 --topic test --from-beginning

查看消费数据后的偏移量 kafka-run-class

(1)查看每个Partition的最新偏移量 kafka-run-class kafka.tools.GetOffsetShell --broker-list hadoop:9092 --topic yourTopic --time -1 (2)查看每个Partition的最早的偏移量 kafka-run-class kafka.tools.GetOffsetShell --broker-list hadoop:9092 --topic yourTopic --time -2 (3)查看consumer组内消费的offset kafka-run-class kafka.tools.ConsumerOffsetChecker --zookeeper hadoop:2181 --topic yourTopic

获取topic消费组的偏移量

kafka-consumer-offset-checker --zookeeper=hadoop300:2181 --topic=mytopic --group=my_consumer_group

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/zzdpgf.html