基于Canal和Kafka实现MySQL的Binlog近实时同步 (3)

操作完成之后,就可以使用root用户远程访问此虚拟机上的MySQL服务。最后确认是否开启了binlog(注意一点是MySQL8.x默认开启binlog)SHOW VARIABLES LIKE \'%bin%\';:

基于Canal和Kafka实现MySQL的Binlog近实时同步

最后在MySQL的Shell执行下面的命令,新建一个用户名canal密码为QWqw12!@的新用户,赋予REPLICATION SLAVE和 REPLICATION CLIENT权限:

CREATE USER canal IDENTIFIED BY \'QWqw12!@\'; GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO \'canal\'@\'%\'; FLUSH PRIVILEGES; ALTER USER \'canal\'@\'%\' IDENTIFIED WITH mysql_native_password BY \'QWqw12!@\';

切换回去root用户,创建一个数据库test:

CREATE DATABASE `test` CHARSET `utf8mb4` COLLATE `utf8mb4_unicode_ci`; 安装Zookeeper

Canal和Kafka集群都依赖于Zookeeper做服务协调,为了方便管理,一般会独立部署Zookeeper服务或者Zookeeper集群。笔者这里选用2020-03-04发布的3.6.0版本:

midkr /data/zk # 创建数据目录 midkr /data/zk/data cd /data/zk wget tar -zxvf apache-zookeeper-3.6.0-bin.tar.gz cd apache-zookeeper-3.6.0-bin/conf cp zoo_sample.cfg zoo.cfg && vim zoo.cfg

把zoo.cfg文件中的dataDir设置为/data/zk/data,然后启动Zookeeper:

[root@localhost conf]# sh /data/zk/apache-zookeeper-3.6.0-bin/bin/zkServer.sh start /usr/bin/java ZooKeeper JMX enabled by default Using config: /data/zk/apache-zookeeper-3.6.0-bin/bin/../conf/zoo.cfg Starting zookeeper ... STARTED

这里注意一点,要启动此版本的Zookeeper服务必须本地安装好JDK8+,这一点需要自行处理。启动的默认端口是2181,启动成功后的日志如下:

基于Canal和Kafka实现MySQL的Binlog近实时同步

安装Kafka

Kafka是一个高性能分布式消息队列中间件,它的部署依赖于Zookeeper。笔者在此选用2.4.0并且Scala版本为2.13的安装包:

mkdir /data/kafka mkdir /data/kafka/data wget tar -zxvf kafka_2.13-2.4.0.tgz

由于解压后/data/kafka/kafka_2.13-2.4.0/config/server.properties配置中对应的zookeeper.connect=localhost:2181已经符合需要,不必修改,需要修改日志文件的目录log.dirs为/data/kafka/data。然后启动Kafka服务:

sh /data/kafka/kafka_2.13-2.4.0/bin/kafka-server-start.sh /data/kafka/kafka_2.13-2.4.0/config/server.properties

基于Canal和Kafka实现MySQL的Binlog近实时同步

这样启动一旦退出控制台就会结束Kafka进程,可以添加-daemon参数用于控制Kafka进程后台不挂断运行。

sh /data/kafka/kafka_2.13-2.4.0/bin/kafka-server-start.sh -daemon /data/kafka/kafka_2.13-2.4.0/config/server.properties 安装和使用Canal

终于到了主角登场,这里选用Canal的v1.1.4稳定发布版,只需要下载deployer模块:

mkdir /data/canal cd /data/canal # 这里注意一点,Github在国内被墙,下载速度极慢,可以先用其他下载工具下载完再上传到服务器中 wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.deployer-1.1.4.tar.gz tar -zxvf canal.deployer-1.1.4.tar.gz

解压后的目录如下:

- bin # 运维脚本 - conf # 配置文件 canal_local.properties # canal本地配置,一般不需要动 canal.properties # canal服务配置 logback.xml # logback日志配置 metrics # 度量统计配置 spring # spring-实例配置,主要和binlog位置计算、一些策略配置相关,可以在canal.properties选用其中的任意一个配置文件 example # 实例配置文件夹,一般认为单个数据库对应一个独立的实例配置文件夹 instance.properties # 实例配置,一般指单个数据库的配置 - lib # 服务依赖包 - logs # 日志文件输出目录

在开发和测试环境建议把logback.xml的日志级别修改为DEBUG方便定位问题。这里需要关注canal.properties和instance.properties两个配置文件。canal.properties文件中,需要修改:

去掉canal.instance.parser.parallelThreadSize = 16这个配置项的注释,也就是启用此配置项,和实例解析器的线程数相关,不配置会表现为阻塞或者不进行解析。

canal.serverMode配置项指定为kafka,可选值有tcp、kafka和rocketmq(master分支或者最新的的v1.1.5-alpha-1版本,可以选用rabbitmq),默认是kafka。

canal.mq.servers配置需要指定为Kafka服务或者集群Broker的地址,这里配置为127.0.0.1:9092。

canal.mq.servers在不同的canal.serverMode有不同的意义。
kafka模式下,指Kafka服务或者集群Broker的地址,也就是bootstrap.servers
rocketmq模式下,指NameServer列表
rabbitmq模式下,指RabbitMQ服务的Host和Port

其他配置项可以参考下面两个官方Wiki的链接:

Canal-Kafka-RocketMQ-QuickStart

AdminGuide

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/zyspdz.html