用canal同步binlog到kafka,spark streaming消费kafka topic乱码问题

canal 1.1.1版本之后, 默认支持将canal server接收到的binlog数据直接投递到MQ, 目前默认支持的MQ系统有kafka和RocketMQ。

在投递的时候我们使用的是非压平的消息模式(canal.mq.flatMessage =false //是否为flat json格式对象),然后消费topic的时候就一直无法正常显示和序列化,通过kafka-console-consumer.sh命令收到的消息如下图

用canal同步binlog到kafka,spark streaming消费kafka topic乱码问题

 

在github上也能找到相关问题

canal-kafka 数据同步到kafka之后,kafka topic乱码:https://github.com/alibaba/canal/issues/898

canal.kafka 用bin/kafka-console-consumer.sh命令收到乱码:https://github.com/alibaba/canal/issues/1013

 在非flatmessage模式下向kafka数据投递传输的是数据包,收到数据后还要解包成对应的message,可参考canal client中的kafka实现, github地址为 https://github.com/alibaba/canal/tree/master/client/src/main/java/com/alibaba/otter/canal/client/kafka

用canal同步binlog到kafka,spark streaming消费kafka topic乱码问题

打开连接后 kafkaConsumer = new KafkaConsumer<String, Message>(properties);

参考这种操作只是简单的kafka能够收消息,结合spark streaming收消息也差不多。

在kafkaparam中设置key和value的反序列化方式

"key.deserializer" -> classOf[StringDeserializer].getName
"value.deserializer" -> classOf[MessageDeserializer].getName

 在拉取消息的时候设置接受格式为Array[Byte]

val messages = KafkaUtils.createDirectStream[String, Array[Byte], StringDecoder, DefaultDecoder](ssc, kafkaParams, topics)

在处理每个RDD的时候再对内容进行反序列化:

val parData = rdd.mapPartitions(t => {
val mesDesc = new MessageDeserializer
var list = List[consumerUser]()
while (t.hasNext) {
try {
val value = t.next()._2
val message = mesDesc.deserialize("", value)
//val listMaps = CanalParse.parseData(message)
//逻辑
} catch {
case e: Exception => log.error(e)
}
}
list.iterator
})

这样就拿到了message对象。

依赖jar包

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka_2.11</artifactId>
<version>1.6.3</version>
</dependency> <dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>0.8.2.1</version>
</dependency>
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.protocol</artifactId>
<version>1.1.0</version>
</dependency>
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.kafka.client</artifactId>
<version>1.1.0</version>
</dependency>

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wpspjd.html