mysql数据实时同步到Elasticsearch (3)

从以上内容可以看出,mysqldump导出的sql文件包含create table, drop table以及插入数据的sql语句,但是不包含create database建库语句。

使用go-mysql-elasticsearch开源工具同步数据到ES

go-mysql-elasticsearch是用于同步mysql数据到ES集群的一个开源工具,项目github地址:https://github.com/siddontang/go-mysql-elasticsearch

go-mysql-elasticsearch的基本原理是:如果是第一次启动该程序,首先使用mysqldump工具对源mysql数据库进行一次全量同步,通过elasticsearch client执行操作写入数据到ES;然后实现了一个mysql client,作为slave连接到源mysql,源mysql作为master会将所有数据的更新操作通过binlog event同步给slave, 通过解析binlog event就可以获取到数据的更新内容,之后写入到ES.

另外,该工具还提供了操作统计的功能,每当有数据增删改操作时,会将对应操作的计数加1,程序启动时会开启一个http服务,通过调用http接口可以查看增删改操作的次数。

使用限制: 1. mysql binlog必须是ROW模式 2. 要同步的mysql数据表必须包含主键,否则直接忽略,这是因为如果数据表没有主键,UPDATE和DELETE操作就会因为在ES中找不到对应的document而无法进行同步 3. 不支持程序运行过程中修改表结构 4. 要赋予用于连接mysql的账户RELOAD权限以及REPLICATION权限, SUPER权限: GRANT REPLICATION SLAVE ON *.* TO \'elastic\'@\'172.16.32.44\'; GRANT RELOAD ON *.* TO \'elastic\'@\'172.16.32.44\'; UPDATE mysql.user SET Super_Priv=\'Y\' WHERE user=\'elastic\' AND host=\'172.16.32.44\'; 使用方式:

git clone https://github.com/siddontang/go-mysql-elasticsearch

cd go-mysql-elasticsearch/src/github.com/siddontang/go-mysql-elasticsearch

vi etc/river.toml, 修改配置文件,同步172.16.0.101:3306数据库中的webservice.building表到ES集群172.16.32.64:9200的building index(更详细的配置文件说明可以参考项目文档)

# MySQL address, user and password # user must have replication privilege in MySQL. my_addr = "172.16.0.101:3306" my_user = "bellen" my_pass = "Elastic_123" my_charset = "utf8" <span># Set true when elasticsearch use https</span> <span>#es_https = false</span> <span># Elasticsearch address</span> es_addr = <span>"172.16.32.64:9200"</span> <span># Elasticsearch user and password, maybe set by shield, nginx, or x-pack</span> es_user = <span>""</span> es_pass = <span>""</span> <span># Path to store data, like master.info, if not set or empty,</span> <span># we must use this to support breakpoint resume syncing.</span> <span># <span>TODO:</span> support other storage, like etcd.</span> data_dir = <span>"./var"</span> <span># Inner Http status address</span> stat_addr = <span>"127.0.0.1:12800"</span> <span># pseudo server id like a slave</span> server_id = 1001 <span># mysql or mariadb</span> flavor = <span>"mariadb"</span> <span># mysqldump execution path</span> <span># if not set or empty, ignore mysqldump.</span> mysqldump = <span>"mysqldump"</span> <span># if we have no privilege to use mysqldump with --master-data,</span> <span># we must skip it.</span> <span>#skip_master_data = false</span> <span># minimal items to be inserted in one bulk</span> bulk_size = 128 <span># force flush the pending requests if we don\'t have enough items &gt;= bulk_size</span> flush_bulk_time = <span>"200ms"</span> <span># Ignore table without primary key</span> skip_no_pk_table = <span>false</span> <span># MySQL data source</span> [[<span>source</span>]] schema = <span>"webservice"</span> tables = [<span>"building"</span>] [[rule]] schema = <span>"webservice"</span> table = <span>"building"</span> index = <span>"building"</span> <span>type</span> = <span>"buildingtype"</span>

在ES集群中创建building index, 因为该工具并没有使用ES的auto create index功能,如果index不存在会报错

执行命令:./bin/go-mysql-elasticsearch -config=./etc/river.toml

控制台输出结果:

2018/06/02 16:13:21 INFO create BinlogSyncer with config {1001 mariadb 172.16.0.101 3306 bellen utf8 false false <nil> false false 0 0s 0s 0} 2018/06/02 16:13:21 INFO run status http server 127.0.0.1:12800 2018/06/02 16:13:21 INFO skip dump, use last binlog replication pos (mysql-bin.000001, 120) or GTID %!s(<nil>) 2018/06/02 16:13:21 INFO begin to sync binlog from position (mysql-bin.000001, 120) 2018/06/02 16:13:21 INFO register slave for master server 172.16.0.101:3306 2018/06/02 16:13:21 INFO start sync binlog at binlog file (mysql-bin.000001, 120) 2018/06/02 16:13:21 INFO rotate to (mysql-bin.000001, 120) 2018/06/02 16:13:21 INFO rotate binlog to (mysql-bin.000001, 120) 2018/06/02 16:13:21 INFO save position (mysql-bin.000001, 120)

测试:向mysql中插入、修改、删除数据,都可以反映到ES中

使用体验

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/zwfzyd.html