mysql数据实时同步到Elasticsearch (5)

配置mypipe-api/src/main/resources/reference.conf,修改include-event-condition选项,指定需要同步的database及table

include-event-condition = """ db == "webservice" && table =="building" """

在kafka broker端创建topic: webservice_building_generic, 默认情况下mypipe以"${db}_${table}_generic"为topic名,向该topic发送数据

执行:./sbt "project runner" "runMain mypipe.runner.PipeRunner"

测试:向mysql building表中插入数据,写一个简单的consumer消费mypipe推送到kafka中的消息

消费到没有经过解析的数据如下:

ConsumerRecord(topic=u\'webservice_building_generic\', partition=0, offset=2, timestamp=None, timestamp_type=None, key=None, value=\'\x00\x01\x00\x00\x14webservice\x10building\xcc\x01\x02\x91,\xae\xa3fc\x11\xe8\xa1\xaaRT\x00Z\xf9\xab\x00\x00\x04\x18BuildingName\x06xxx\x14BuildingId\nId-10\x00\x02\x04Id\xd4%\x00\', checksum=128384379, serialized_key_size=-1, serialized_value_size=88) 使用体验

mypipe相比go-mysql-elasticsearch更成熟,支持运行时ALTER TABLE,同时解析binlog异常发生时,可通过配置不同的策略处理异常

mypipe不能同步存量数据,如果需要同步存量数据可通过其它方式先全量同步后,再使用mypipe进行增量同步

mypipe只同步binlog, 需要同步数据到ES需要另行开发

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/zwfzyd.html