Canal:同步mysql增量数据工具,一篇详解核心知识点 (2)

缺点:由于它是记录的执行语句,所以,为了让这些语句在 slave 端也能正确执行,那他还必须记录每条语句在执行过程中的一些相关信息,也就是上下文信息,以保证所有语句在 slave 端被执行的时候能够得到和在 master 端执行时候相同的结果。

但目前例如 step()函数在有些版本中就不能被正确复制,在存储过程中使用了 last-insert-id()函数,可能会使 slave 和 master 上得到不一致的 id,就是会出现数据不一致的情况,ROW 模式下就没有。

MIXED 模式

以上两种模式都使用。

Canal 实时同步

首先我们要配置环境,在 conf/example/instance.properties 下:

 ## mysql serverId
 canal.instance.mysql.slaveId = 1234
 #position info,需要修改成自己的数据库信息
 canal.instance.master.address = 127.0.0.1:3306
 canal.instance.master.journal.name =
 canal.instance.master.position =
 canal.instance.master.timestamp =
 #canal.instance.standby.address =
 #canal.instance.standby.journal.name =
 #canal.instance.standby.position =
 #canal.instance.standby.timestamp =
 #username/password,需要修改成自己的数据库信息
 canal.instance.dbUsername = canal
 canal.instance.dbPassword = canal
 canal.instance.defaultDatabaseName =
 canal.instance.connectionCharset = UTF-8
 #table regex
 canal.instance.filter.regex = .\*\\\\..\*

其中,canal.instance.connectionCharset 代表数据库的编码方式对应到 java 中的编码类型,比如 UTF-8,GBK,ISO-8859-1。

配置完后,就要启动了

 sh bin/startup.sh
 关闭使用 bin/stop.sh

观察日志

一般使用 cat 查看 canal/canal.log、example/example.log

启动客户端

在 IDEA 中业务代码,mysql 中如果有增量数据就拉取过来,在 IDEA 控制台打印出来

在 pom.xml 文件中添加:

 <dependency>
   <groupId>com.alibaba.otter</groupId>
   <artifactId>canal.client</artifactId>
   <version>1.0.12</version>
 </dependency>

添加客户端代码:

public class Demo {
 public static void main(String[] args) {
     //创建连接
     CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("hadoop03", 11111),
             "example""""");
     connector.connect();
     //订阅
     connector.subscribe();
     connector.rollback();
     int batchSize = 1000;
     int emptyCount = 0;
     int totalEmptyCount = 100;
     while (totalEmptyCount > emptyCount) {
         Message msg = connector.getWithoutAck(batchSize);
         long id = msg.getId();
         List<CanalEntry.Entry> entries = msg.getEntries();
         if(id == -1 || entries.size() == 0){
             emptyCount++;
             System.out.println("emptyCount : " + emptyCount);
             try {
                 Thread.sleep(3000);
             } catch (InterruptedException e) {
                 e.printStackTrace();
             }
         }else{
             emptyCount = 0;
             printEntry(entries);
         }
         connector.ack(id);
     }
 }
 // batch -> entries -> rowchange - rowdata -> cols
 private static void printEntry(List<CanalEntry.Entry> entries) {
     for (CanalEntry.Entry entry : entries){
         if(entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN ||
                 entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND){
             continue;
         }
         CanalEntry.RowChange rowChange = null;
         try {
             rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
         } catch (InvalidProtocolBufferException e) {
             e.printStackTrace();
         }
         CanalEntry.EventType eventType = rowChange.getEventType();
         System.out.println(entry.getHeader().getLogfileName()+" __ " +
                 entry.getHeader().getSchemaName() + " __ " + eventType);
         List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();
         for(CanalEntry.RowData rowData : rowDatasList){
             for(CanalEntry.Column column: rowData.getAfterColumnsList()){
                 System.out.println(column.getName() + " - " +
                         column.getValue() + " - " +
                         column.getUpdated());
             }
         }
     }
 }
}

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/zyyyjg.html