然后正常kafka的指令是 : ./bin/kafka-topics.sh --zookeeper hadoop300:2181 .......
但是使用CDH安装的kafka则不需要全写出此 ./bin/kafka-topics.sh 部分。只许直接写 kafka-topics 即可,这是很重要的一个区别,使用CDH安装的kafka时候要特别注意一下。
具体有哪些指令可以看此路径下:
所以我们使用topic的指令格式应该都类似:
kafka-topics --zookeeper hadoop300:2181/kafka ......
创建一个名为test 的主题(Topic):
kafka-topics --zookeeper hadoop300:2181/kafka --create -replication-factor 1 --partitions 3 --topic test若是上述中的 ZooKeeper Root 的Kafka服务范围为: " / "。则这里的创建主题指令改为:
kafka-topics --zookeeper hadoop300:2181--create -replication-factor 1 --partitions 3 --topic test
删除创建的topic
kafka-topics --zookeeper hadoop300:2181/kafka --delete --topic test这里如果直接删除,则会输出 Topic *** is marked for deletion 如果我们topic中消息堆积的太多,或者kafka所在磁盘空间满了等等,则会需要彻底清理一下kafka topic。
方法一:修改kafaka配置文件server.properties, 添加 delete.topic.enable=true,重启kafka,之后通过kafka命令行就可以直接删除topic。
方法二:通过命令行删除topic: ./bin/kafka-topics.sh --delete --zookeeper {zookeeper server} --topic {topic name}
因为kafaka配置文件中server.properties没有配delete.topic.enable=true,此时的删除并不是真正的删除,只是把topic标记为:marked for deletion 你可以通过命令:./bin/kafka-topics --zookeeper {zookeeper server} --list 来查看所有topic
方法三:若需要真正删除它
需要登录zookeeper客户端:zookeeper-client
找到topic所在的目录:ls /kafka/brokers/topics
执行命令,即可,此时topic被彻底删除:rmr /kafka/brokers/topics/{topic name}
修改topic的分区数
kafka-topics --zookeeper hadoop300:2181/kafka --alter --topic test \ partitions 5接着启动消费者
kafka-console-consumer --bootstrap-server hadoop300:9092 --topic test --from-beginning查看消费数据后的偏移量 kafka-run-class
(1)查看每个Partition的最新偏移量 kafka-run-class kafka.tools.GetOffsetShell --broker-list hadoop:9092 --topic yourTopic --time -1 (2)查看每个Partition的最早的偏移量 kafka-run-class kafka.tools.GetOffsetShell --broker-list hadoop:9092 --topic yourTopic --time -2 (3)查看consumer组内消费的offset kafka-run-class kafka.tools.ConsumerOffsetChecker --zookeeper hadoop:2181 --topic yourTopic获取topic消费组的偏移量
kafka-consumer-offset-checker --zookeeper=hadoop300:2181 --topic=mytopic --group=my_consumer_group