Maxwell is an application that reads MySQL binary logs (binlog) in real time, turns the changes into JSON messages, and, acting as a producer, sends them to Kafka, Kinesis, RabbitMQ, Redis, Google Cloud Pub/Sub, files, or other platforms. Typical use cases include ETL, cache maintenance, collecting table-level DML metrics, incremental feeds to search engines, migrating data across partitions, and binlog-based rollback when switching databases. See the official website and GitHub (https://github.com/zendesk/maxwell).
Maxwell's main features:
Full-data initialization via SELECT * FROM table (bootstrapping)
Automatic recovery of the binlog position (GTID) after a failover on the primary
Partitioning of the output to avoid data skew: data sent to Kafka can be partitioned at the database, table, or column level, among others (see the config sketch after this list)
It works by impersonating a replica: it receives binlog events and assembles them using the schema information, and it can handle DDL, XID, row, and other event types
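For example, when the Kafka producer is used, partitioning is controlled from the configuration file. A minimal config.properties sketch, with option names as given in the Maxwell documentation; the broker address, topic name, and column are placeholders for this setup:

# config.properties (sketch)
producer=kafka
kafka.bootstrap.servers=localhost:9092
kafka_topic=maxwell
# partition Kafka messages by database, table, primary_key, or a column
producer_partition_by=column
producer_partition_columns=id
# used when a row does not contain the partition column
producer_partition_by_fallback=database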
Besides Maxwell, the MySQL binlog parsing tools in common use are Alibaba's canal and mysql_streamer. Comparing canal and Maxwell:
canal is written in Java and is split into a server and a client. It has many derivative applications, stable performance, and rich functionality, but you have to write your own client to consume the data canal parses.
Maxwell's advantage over canal is simplicity: it outputs data changes directly as JSON strings, so there is no client to write.
Quick start
First, binlog must be enabled in MySQL. For background on what the MySQL binlog is, see the article 《MySQL Binlog 介绍》.
$ vi my.cnf

[mysqld]
server_id=1
log-bin=master
binlog_format=row
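After restarting MySQL, you can confirm that the binlog is enabled and uses the row format; these are plain MySQL statements, nothing Maxwell-specific:

SHOW VARIABLES LIKE 'log_bin';
SHOW VARIABLES LIKE 'binlog_format';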
Create the Maxwell user and grant it the required privileges on the maxwell database:

CREATE USER 'maxwell'@'%' IDENTIFIED BY '123456';
GRANT ALL ON maxwell.* TO 'maxwell'@'%';
GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE ON *.* TO 'maxwell'@'%';

Kafka has to be running before Maxwell is started.
# Download and unpack Kafka 2.11-2.1.0
wget
tar -xzf kafka_2.11-2.1.0.tgz
cd kafka_2.11-2.1.0
# Start Zookeeper
bin/zookeeper-server-start.sh config/zookeeper.properties

Before starting Kafka on a single machine, adjust its configuration: open config/server.properties (vi config/server.properties) and add an advertised.host.name entry at the end, set to the IP address of the machine Kafka runs on:
advertised.host.name=10.100.97.246

Otherwise, starting Maxwell through Docker later will fail with an exception like the following (hadoop2 is my hostname):
17:45:21,446 DEBUG NetworkClient - [Producer clientId=producer-1] Error connecting to node hadoop2:9092 (id: 0 rack: null)
java.io.IOException: Can't resolve address: hadoop2:9092
	at org.apache.kafka.common.network.Selector.connect(Selector.java:217) ~[kafka-clients-1.0.0.jar:?]
	at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:793) [kafka-clients-1.0.0.jar:?]
	at org.apache.kafka.clients.NetworkClient.ready(NetworkClient.java:230) [kafka-clients-1.0.0.jar:?]
	at org.apache.kafka.clients.producer.internals.Sender.sendProducerData(Sender.java:263) [kafka-clients-1.0.0.jar:?]
	at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:238) [kafka-clients-1.0.0.jar:?]
	at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:176) [kafka-clients-1.0.0.jar:?]
	at java.lang.Thread.run(Thread.java:748) [?:1.8.0_181]
Caused by: java.nio.channels.UnresolvedAddressException
	at sun.nio.ch.Net.checkAddress(Net.java:101) ~[?:1.8.0_181]
	at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:622) ~[?:1.8.0_181]
	at org.apache.kafka.common.network.Selector.connect(Selector.java:214) ~[kafka-clients-1.0.0.jar:?]
	... 6 more

Kafka can now be started:
bin/kafka-server-start.sh config/server.properties

Test Kafka:
# Create a topic
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
# List all topics
bin/kafka-topics.sh --list --zookeeper localhost:2181
# Start a producer and send a few messages
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
This is a message
This is another message
# In another terminal, start a consumer and check that the messages arrive
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
This is a message
This is another message
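Maxwell's default Kafka topic is named maxwell; if automatic topic creation is disabled on the broker, it can be created up front the same way as the test topic above (same single-broker settings assumed):

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic maxwell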
Install and run Maxwell quickly through Docker (Docker itself needs to be installed first):

# Pull the image
docker pull zendesk/maxwell
# Start Maxwell and print the parsed binlog to the console
docker run -ti --rm zendesk/maxwell bin/maxwell --user='maxwell' --password='123456' --host='10.100.97.246' --producer=stdout
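To send the parsed binlog to Kafka instead of the console, switch the producer and point Maxwell at the broker. A sketch using the same credentials as above; the flags follow the Maxwell documentation and the topic name assumes the default:

docker run -ti --rm zendesk/maxwell bin/maxwell --user='maxwell' --password='123456' --host='10.100.97.246' \
    --producer=kafka --kafka.bootstrap.servers='10.100.97.246:9092' --kafka_topic=maxwell
# Watch the messages from another terminal
bin/kafka-console-consumer.sh --bootstrap-server 10.100.97.246:9092 --topic maxwell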
Test Maxwell: first create a simple table, then insert, update, and delete a row:

CREATE TABLE `test` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `age` int(11) DEFAULT NULL,
  `name` varchar(255) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

insert into test values(1,22,"小旋锋");
update test set name="whirly" where id=1;
delete from test where id=1;

Watch the Docker console output; the log lines show the JSON format of the binlog events Maxwell parses:
{"database":"test","table":"test","type":"insert","ts":1552153502,"xid":832,"commit":true,"data":{"id":1,"age":22,"name":"小旋锋"}} {"database":"test","table":"test","type":"update","ts":1552153502,"xid":833,"commit":true,"data":{"id":1,"age":22,"name":"whirly"},"old":{"name":"小旋锋"}} {"database":"test","table":"test","type":"delete","ts":1552153502,"xid":834,"commit":true,"data":{"id":1,"age":22,"name":"whirly"}}