KafkaCenter是一个针对Kafka的一站式,解决方案。用于Kafka集群的维护与管理,生产者和消费者的监控,以及Kafka部分生态组件的使用。
对于Kafka的平台化,一直缺少一个成熟的解决方案,之前比较流行的kafka监控方案,如kafka-manager提供了集群管理与topic管理等等功能。但是对于生产者、消费者的监控,以及Kafka的新生态,如Connect,KSQL还缺少响应的支持。Confluent Control Center功能要完整一些,但却是非开源收费的。
对于Kafka的使用,一直都是一个让人头疼的问题,由于实时系统的强运维特性,我们不得不投入大量的时间用于集群的维护,kafka的运维,比如:
人工创建topic,特别费力
相关kafka运维,监控孤岛化
现有消费监控工具监控不准确
无法拿到Kafka 集群的summay信息
无法快速知晓集群健康状态
无法知晓业务对team kafka使用情况
kafka管理,监控工具稀少,没有一个好的工具我们直接可以使用
无法快速查询topic消息
功能模块介绍
Home-> 查看平台管理的Kafka Cluster集群信息及监控信息
Topic-> 用户可以在此模块查看自己的Topic,发起申请新建Topic,同时可以对Topic进行生产消费测试。
Monitor-> 用户可以在此模块中可以查看Topic的生产以及消费情况,同时可以针对消费延迟情况设置预警信息。
Connect-> 实现用户快速创建自己的Connect Job,并对自己的Connect进行维护。
KSQL-> 实现用户快速创建自己的KSQL Job,并对自己的Job进行维护。
Approve-> 此模块主要用于当普通用户申请创建Topic,管理员进行审批操作。
Setting-> 此模块主要功能为管理员维护User、Team以及kafka cluster信息
Kafka Manager-> 此模块用于管理员对集群的正常维护操作。
系统截图:
安装与入门安装需要依赖 mysql es email server
组件 是否必须 功能mysql 必须 配置信息存在mysql
elasticsearch(7.0+) 可选 各种监控信息的存储
email server 可选 Apply, approval, warning e-mail alert
1、初始化
在MySQL中执行sql建表
-- Dumping database structure for kafka_center CREATE DATABASE IF NOT EXISTS `kafka_center` /*!40100 DEFAULT CHARACTER SET utf8 COLLATE utf8_bin */; USE `kafka_center`; -- Dumping structure for table kafka_center.alert_group CREATE TABLE IF NOT EXISTS `alert_group` ( `id` int(11) NOT NULL AUTO_INCREMENT, `cluster_id` int(11) NOT NULL, `topic_name` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT '', `consummer_group` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT '', `consummer_api` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT '', `threshold` int(11) DEFAULT NULL, `dispause` int(11) DEFAULT NULL, `mail_to` varchar(1000) COLLATE utf8_bin NOT NULL DEFAULT '', `webhook` varchar(1000) COLLATE utf8_bin NOT NULL DEFAULT '', `create_date` datetime DEFAULT NULL, `owner_id` int(11) DEFAULT NULL, `team_id` int(11) DEFAULT NULL, `disable_alerta` tinyint(1) DEFAULT 0, `enable` tinyint(1) NOT NULL DEFAULT 1, PRIMARY KEY (`id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin; -- Data exporting was unselected. -- Dumping structure for table kafka_center.cluster_info CREATE TABLE IF NOT EXISTS `cluster_info` ( `id` int(11) NOT NULL AUTO_INCREMENT, `name` varchar(255) COLLATE utf8_bin NOT NULL, `zk_address` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT '', `broker` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT '', `create_time` datetime DEFAULT NULL, `comments` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT '', `enable` int(11) DEFAULT NULL, `broker_size` int(4) DEFAULT 0, `kafka_version` varchar(10) COLLATE utf8_bin DEFAULT '', `location` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT '', `graf_addr` varchar(255) COLLATE utf8_bin DEFAULT '', PRIMARY KEY (`id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin; -- Data exporting was unselected. -- Dumping structure for table kafka_center.ksql_info CREATE TABLE IF NOT EXISTS `ksql_info` ( `id` int(11) NOT NULL AUTO_INCREMENT, `cluster_id` int(11) DEFAULT NULL, `cluster_name` varchar(255) DEFAULT NULL, `ksql_url` varchar(255) DEFAULT NULL, `ksql_serverId` varchar(255) DEFAULT NULL, `version` varchar(255) DEFAULT NULL, PRIMARY KEY (`id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8; -- Data exporting was unselected. -- Dumping structure for table kafka_center.task_info CREATE TABLE IF NOT EXISTS `task_info` ( `id` int(11) NOT NULL AUTO_INCREMENT, `cluster_ids` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT '', `location` varchar(20) COLLATE utf8_bin NOT NULL DEFAULT '', `topic_name` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT '', `partition` int(11) DEFAULT NULL, `replication` int(11) DEFAULT NULL, `message_rate` int(50) DEFAULT NULL, `ttl` int(11) DEFAULT NULL, `owner_id` int(11) DEFAULT NULL, `team_id` int(11) DEFAULT NULL, `comments` varchar(1000) COLLATE utf8_bin NOT NULL DEFAULT '', `create_time` datetime DEFAULT NULL, `approved` int(11) DEFAULT NULL, `approved_id` int(11) DEFAULT NULL, `approved_time` datetime DEFAULT NULL, `approval_opinions` varchar(1000) COLLATE utf8_bin DEFAULT '', PRIMARY KEY (`id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin; -- Data exporting was unselected. -- Dumping structure for table kafka_center.team_info CREATE TABLE IF NOT EXISTS `team_info` ( `id` int(11) NOT NULL AUTO_INCREMENT, `name` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT '', `own` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT '', PRIMARY KEY (`id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin; -- Data exporting was unselected. -- Dumping structure for table kafka_center.topic_collection CREATE TABLE IF NOT EXISTS `topic_collection` ( `id` int(11) unsigned NOT NULL AUTO_INCREMENT, `cluster_id` int(11) NOT NULL, `user_id` int(11) NOT NULL, `name` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT '', `type` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT '', PRIMARY KEY (`id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin; -- Data exporting was unselected. -- Dumping structure for table kafka_center.topic_info CREATE TABLE IF NOT EXISTS `topic_info` ( `id` int(11) NOT NULL AUTO_INCREMENT, `cluster_id` int(11) NOT NULL, `topic_name` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT '', `partition` int(11) DEFAULT NULL, `replication` int(11) DEFAULT NULL, `ttl` bigint(11) DEFAULT NULL, `config` varchar(512) COLLATE utf8_bin DEFAULT NULL, `owner_id` int(11) DEFAULT NULL, `team_id` int(11) DEFAULT NULL, `comments` varchar(1000) COLLATE utf8_bin NOT NULL DEFAULT '', `create_time` datetime DEFAULT NULL, PRIMARY KEY (`id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin; -- Data exporting was unselected. -- Dumping structure for table kafka_center.user_info CREATE TABLE IF NOT EXISTS `user_info` ( `id` int(11) NOT NULL AUTO_INCREMENT, `name` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT '', `real_name` varchar(255) COLLATE utf8_bin DEFAULT '', `email` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT '', `role` varchar(255) COLLATE utf8_bin NOT NULL DEFAULT '100', `create_time` datetime DEFAULT NULL, `password` varchar(255) COLLATE utf8_bin DEFAULT '', PRIMARY KEY (`id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin; -- Data exporting was unselected. -- Dumping structure for table kafka_center.user_team CREATE TABLE IF NOT EXISTS `user_team` ( `id` int(11) NOT NULL AUTO_INCREMENT, `user_id` int(11) DEFAULT NULL, `team_id` int(11) DEFAULT NULL, PRIMARY KEY (`id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin; 2、配置相关配置位于application.properties