instance.properties一般指一个数据库实例的配置,Canal架构支持一个Canal服务实例,处理多个数据库实例的binlog异步解析。instance.properties需要修改的配置项主要包括:
canal.instance.mysql.slaveId需要配置一个和Master节点的服务ID完全不同的值,这里笔者配置为654321。
配置数据源实例,包括地址、用户、密码和目标数据库:
canal.instance.master.address,这里指定为127.0.0.1:3306。
canal.instance.dbUsername,这里指定为canal。
canal.instance.dbPassword,这里指定为QWqw12!@。
新增canal.instance.defaultDatabaseName,这里指定为test(需要在MySQL中建立一个test数据库,见前面的流程)。
Kafka相关配置,这里暂时使用静态topic和单个partition:
canal.mq.topic,这里指定为test,也就是解析完的binlog结构化数据会发送到Kafka的命名为test的topic中。
canal.mq.partition,这里指定为0。
配置工作做好之后,可以启动Canal服务:
sh /data/canal/bin/startup.sh # 查看服务日志 tail -100f /data/canal/logs/canal/canal # 查看实例日志 -- 一般情况下,关注实例日志即可 tail -100f /data/canal/logs/example/example.log启动正常后,见实例日志如下:
在test数据库创建一个订单表,并且执行几个简单的DML:
use `test`; CREATE TABLE `order` ( id BIGINT UNIQUE PRIMARY KEY AUTO_INCREMENT COMMENT \'主键\', order_id VARCHAR(64) NOT NULL COMMENT \'订单ID\', amount DECIMAL(10, 2) NOT NULL DEFAULT 0 COMMENT \'订单金额\', create_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT \'创建时间\', UNIQUE uniq_order_id (`order_id`) ) COMMENT \'订单表\'; INSERT INTO `order`(order_id, amount) VALUES (\'10086\', 999); UPDATE `order` SET amount = 10087 WHERE order_id = \'10086\'; DELETE FROM `order` WHERE order_id = \'10086\';这个时候,可以利用Kafka的kafka-console-consumer或者Kafka Tools查看test这个topic的数据:
sh /data/kafka/kafka_2.13-2.4.0/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --from-beginning --topic test具体的数据如下:
// test数据库建库脚本 {"data":null,"database":"`test`","es":1583143732000,"id":1,"isDdl":false,"mysqlType":null,"old":null,"pkNames":null,"sql":"CREATE DATABASE `test` CHARSET `utf8mb4` COLLATE `utf8mb4_unicode_ci`","sqlType":null,"table":"","ts":1583143930177,"type":"QUERY"} // order表建表DDL {"data":null,"database":"test","es":1583143957000,"id":2,"isDdl":true,"mysqlType":null,"old":null,"pkNames":null,"sql":"CREATE TABLE `order`\n(\n id BIGINT UNIQUE PRIMARY KEY AUTO_INCREMENT COMMENT \'主键\',\n order_id VARCHAR(64) NOT NULL COMMENT \'订单ID\',\n amount DECIMAL(10, 2) NOT NULL DEFAULT 0 COMMENT \'订单金额\',\n create_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT \'创建时间\',\n UNIQUE uniq_order_id (`order_id`)\n) COMMENT \'订单表\'","sqlType":null,"table":"order","ts":1583143958045,"type":"CREATE"} // INSERT {"data":[{"id":"1","order_id":"10086","amount":"999.0","create_time":"2020-03-02 05:12:49"}],"database":"test","es":1583143969000,"id":3,"isDdl":false,"mysqlType":{"id":"BIGINT","order_id":"VARCHAR(64)","amount":"DECIMAL(10,2)","create_time":"DATETIME"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":-5,"order_id":12,"amount":3,"create_time":93},"table":"order","ts":1583143969460,"type":"INSERT"} // UPDATE {"data":[{"id":"1","order_id":"10086","amount":"10087.0","create_time":"2020-03-02 05:12:49"}],"database":"test","es":1583143974000,"id":4,"isDdl":false,"mysqlType":{"id":"BIGINT","order_id":"VARCHAR(64)","amount":"DECIMAL(10,2)","create_time":"DATETIME"},"old":[{"amount":"999.0"}],"pkNames":["id"],"sql":"","sqlType":{"id":-5,"order_id":12,"amount":3,"create_time":93},"table":"order","ts":1583143974870,"type":"UPDATE"} // DELETE {"data":[{"id":"1","order_id":"10086","amount":"10087.0","create_time":"2020-03-02 05:12:49"}],"database":"test","es":1583143980000,"id":5,"isDdl":false,"mysqlType":{"id":"BIGINT","order_id":"VARCHAR(64)","amount":"DECIMAL(10,2)","create_time":"DATETIME"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":-5,"order_id":12,"amount":3,"create_time":93},"table":"order","ts":1583143981091,"type":"DELETE"}可见Kafka的名为test的topic已经写入了对应的结构化binlog事件数据,可以编写消费者监听Kafka对应的topic然后对获取到的数据进行后续处理。
小结