在OGG的命令行下执行:
GGSCI (10.0.0.2) 4> edit params r2kafka REPLICAT r2kafka sourcedefs /data/gg/dirdef/tcloud.t_ogg TARGETDB LIBFILE libggjava.so SET property=dirprm/r2kafka.props REPORTCOUNT EVERY 1 MINUTES, RATE GROUPTRANSOPS 10000 MAP tcloud.t_ogg, TARGET tcloud.t_ogg;replicate进程和导入到HDFS的配置类似,差异是调用不同的配置dirprm/r2kafka.props。这个配置的主要配置如下:
gg.handlerlist = kafkahandler //handler类型 gg.handler.kafkahandler.type = kafka gg.handler.kafkahandler.KafkaProducerConfigFile=custom_kafka_producer.properties //kafka相关配置 gg.handler.kafkahandler.TopicName =ggtopic //kafka的topic名称,无需手动创建 gg.handler.kafkahandler.format =json //传输文件的格式,支持json,xml等 gg.handler.kafkahandler.mode =op //OGG for Big Data中传输模式,即op为一次SQL传输一次,tx为一次事务传输一次 gg.classpath=dirprm/:/usr/hdp/2.2.0.0-2041/kafka/libs/*:/data/gg/:/data/gg/lib/* //相关库文件的引用r2kafka.props引用的custom_kafka_producer.properties定义了Kafka的相关配置如下:
bootstrap.servers=10.0.0.62:6667 //kafkabroker的地址 acks=1 compression.type=gzip //压缩类型 reconnect.backoff.ms=1000 //重连延时 value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer batch.size=102400 linger.ms=10000以上配置以及其他可配置项可参考:
以上配置完成后,在OGG命令行下添加trail文件到replicate进程并启动导入到Kafka的replicate进程
GGSCI (10.0.0.2) 5> add replicat r2kafka exttrail /data/gg/dirdat/tc,checkpointtable tcloud.checkpoint REPLICAT added. GGSCI (10.0.0.2) 6> start r2kafka Sending START request to MANAGER ... REPLICAT R2KAFKA starting GGSCI (10.0.0.2) 10> info r2kafka REPLICAT R2KAFKA Last Started 2016-11-09 17:59 Status RUNNING Checkpoint Lag 00:00:00 (updated 00:00:09 ago) Process ID 5236 Log Read Checkpoint File /data/gg/dirdat/tc000000 2016-11-09 17:05:25.067082 RBA 1217检查实时同步到kafka的效果,在Oracle源端更新表的同时,使用kafka客户端自带的脚本去查看这里配置的ggtopic这个kafkatopic下的消息:
SQL> insert into t_ogg values(2,'test2'); 1 row created. SQL> commit; Commit complete.目标端Kafka的同步情况:
[root@10 kafka]# bin/kafka-console-consumer.sh --zookeeper 10.0.0.223:2181 -- from-beginning --topic ggtopic {"table":"TCLOUD.T_OGG","op_type":"I","op_ts":"2016-11-09 09:05:25.067082","current_ts":"2016-11- 09T17:59:20.943000","pos":"00000000000000001080","after": {"ID":"1","TEXT_NAME":"test"}} {"table":"TCLOUD.T_OGG","op_type":"I","op_ts":"2016-11-09 10:02:06.827204","current_ts":"2016-11- 09T18:02:12.323000","pos":"00000000000000001217","after": {"ID":"2","TEXT_NAME":"test2"}}显然,Oracle的数据已准实时同步到Kafka。从头开始消费这个topic发现之前的同步信息也存在。架构上可以直接接Storm,SparkStreaming等直接消费kafka消息进行业务逻辑的处理。
从Oracle实时同步到其他的Hadoop集群中,官方最新版本提供了HDFS,Hbase,Flume和Kafka,相关配置可参考官网给出的例子配置即可。
参考文档: