大数据时代,数据实时同步解决方案的思考—最全的数据同步总结 (2)

大数据时代,数据实时同步解决方案的思考—最全的数据同步总结

A、 canal模拟mysql slave的交互协议,伪装自己为mysql slave,向mysql master发送dump协议

 

B、 mysql master收到dump请求,开始推送binary log给slave(也就是canal)

C、 canal解析binary log对象(原始为byte流)

另外canal 在设计时,特别设计了 client-server 模式,交互协议使用 protobuf 3.0 , client 端可采用不同语言实现不同的消费逻辑。

canal java 客户端: https://github.com/alibaba/canal/wiki/ClientExample

canal c# 客户端: https://github.com/dotnetcore/CanalSharp

canal go客户端: https://github.com/CanalClient/canal-go

canal php客户端: https://github.com/xingwenge/canal-php、

github的地址:https://github.com/alibaba/canal/

D、在使用canal时,mysql需要开启binlog,并且binlog-format必须为row,可以在mysql的my.cnf文件中增加如下配置

log-bin=E:/mysql5.5/bin_log/mysql-bin.log

binlog-format=ROW

server-id=123、

E、 部署canal的服务端,配置canal.properties文件,然后 启动 bin/startup.sh 或bin/startup.bat

#设置要监听的mysql服务器的地址和端口

canal.instance.master.address = 127.0.0.1:3306

#设置一个可访问mysql的用户名和密码并具有相应的权限,本示例用户名、密码都为canal

canal.instance.dbUsername = canal

canal.instance.dbPassword = canal

#连接的数据库

canal.instance.defaultDatabaseName =test

#订阅实例中所有的数据库和表

canal.instance.filter.regex = .*\\..*

#连接canal的端口

canal.port= 11111

#监听到的数据变更发送的队列

canal.destinations= example

F、 客户端开发,在maven中引入canal的依赖

   <dependency>          <groupId>com.alibaba.otter</groupId>           <artifactId>canal.client</artifactId>           <version>1.0.21</version>       </dependency>

代码示例:

 

package com.example; import com.alibaba.otter.canal.client.CanalConnector; import com.alibaba.otter.canal.client.CanalConnectors; import com.alibaba.otter.canal.common.utils.AddressUtils; import com.alibaba.otter.canal.protocol.CanalEntry; import com.alibaba.otter.canal.protocol.Message; import com.google.protobuf.InvalidProtocolBufferException; import java.net.InetSocketAddress; import java.util.HashMap; import java.util.List; import java.util.Map; public class CanalClientExample { public static void main(String[] args) { while (true) { //连接canal CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(), 11111), "example", "canal", "canal"); connector.connect(); //订阅 监控的 数据库.表 connector.subscribe("demo_db.user_tab"); //一次取10条 Message msg = connector.getWithoutAck(10); long batchId = msg.getId(); int size = msg.getEntries().size(); if (batchId < 0 || size == 0) { System.out.println("没有消息,休眠5秒"); try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } } else { // CanalEntry.RowChange row = null; for (CanalEntry.Entry entry : msg.getEntries()) { try { row = CanalEntry.RowChange.parseFrom(entry.getStoreValue()); List<CanalEntry.RowData> rowDatasList = row.getRowDatasList(); for (CanalEntry.RowData rowdata : rowDatasList) { List<CanalEntry.Column> afterColumnsList = rowdata.getAfterColumnsList(); Map<String, Object> dataMap = transforListToMap(afterColumnsList); if (row.getEventType() == CanalEntry.EventType.INSERT) { //具体业务操作 System.out.println(dataMap); } else if (row.getEventType() == CanalEntry.EventType.UPDATE) { //具体业务操作 System.out.println(dataMap); } else if (row.getEventType() == CanalEntry.EventType.DELETE) { List<CanalEntry.Column> beforeColumnsList = rowdata.getBeforeColumnsList(); for (CanalEntry.Column column : beforeColumnsList) { if ("id".equals(column.getName())) { //具体业务操作 System.out.println("删除的id:" + column.getValue()); } } } else { System.out.println("其他操作类型不做处理"); } } } catch (InvalidProtocolBufferException e) { e.printStackTrace(); } } //确认消息 connector.ack(batchId); } } } public static Map<String, Object> transforListToMap(List<CanalEntry.Column> afterColumnsList) { Map map = new HashMap(); if (afterColumnsList != null && afterColumnsList.size() > 0) { for (CanalEntry.Column column : afterColumnsList) { map.put(column.getName(), column.getValue()); } } return map; } }

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wpgxdp.html