go-mysql-elasticsearch完成了最基本的mysql实时同步数据到ES的功能,业务如果需要更深层次的功能如允许运行中修改mysql表结构,可以进行自行定制化开发。
异常处理不足,解析binlog event失败直接抛出异常
据作者描述,该项目并没有被其应用于生产环境中,所以使用过程中建议通读源码,知其利弊。
使用mypipe同步数据到ES集群mypipe是一个mysql binlog同步工具,在设计之初是为了能够将binlog event发送到kafka, 当前版本可根据业务的需要也可以自定以将数据同步到任意的存储介质,项目github地址 https://github.com/mardambey/mypipe.
使用限制 1. mysql binlog必须是ROW模式 2. 要赋予用于连接mysql的账户REPLICATION权限 GRANT REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO \'elastic\'@\'%\' IDENTIFIED BY \'Elastic_123\' 3. mypipe只是将binlog日志内容解析后编码成Avro格式推送到kafka broker, 并不是将数据推送到kafka,如果需要同步到ES集群,可以从kafka消费数据后,再写入ES 4. 消费kafka中的消息(mysql insert, update, delete操作及具体的数据),需要对消息内容进行Avro解析,获取到对应的数据操作内容,进行下一步处理;mypipe封装了一个KafkaGenericMutationAvroConsumer类,可以直接继承该类使用,或者自行解析 5. mypipe只支持binlog同步,不支持存量数据同步,也即mypipe程序启动后无法对mysql中已经存在的数据进行同步 使用方式git clone https://github.com/mardambey/mypipe.git
./sbt package
配置mypipe-runner/src/main/resources/application.conf
mypipe { Avro schema repository client class nameschema-repo-client = "mypipe.avro.schema.SchemaRepo"
consumers represent sources for mysql binary logsconsumers {
localhost { <span># database <span>"host:port:user:pass"</span> array</span> source = <span>"172.16.0.101:3306:elastic:Elastic_123"</span> }}
data producers export data out (stdout, other stores, external services, etc.)producers {
kafka-generic { class = <span>"mypipe.kafka.producer.KafkaMutationGenericAvroProducer"</span> }}
pipes join consumers and producerspipes {
kafka-generic { enabled = <span>true</span> consumers = [<span>"localhost"</span>] producer { kafka-generic { metadata-brokers = <span>"172.16.16.22:9092"</span> } } binlog-position-repo { # saved to a file, <span>this</span> is the <span>default</span> <span>if</span> unspecified class = <span>"mypipe.api.repo.ConfigurableFileBasedBinaryLogPositionRepository"</span> config { file-prefix = <span>"stdout-00"</span> # required <span>if</span> binlog-position-repo is specifiec data-dir = <span>"/tmp/mypipe/data"</span> # defaults to mypipe.data-dir <span>if</span> <span>not</span> present } } }}
}