Spark Streaming自定义Receiver

Spark社区为Spark Streaming提供了很多数据源接口,但一些比较冷门的数据源并没有被覆盖。由于公司技术栈的选择,我们使用了阿里云的MQ服务ONS;要满足实时处理的需求,就需要自己编写Receiver。

二 技术实现

1.官网的例子已经比较详细,但真正落地实践时仍需要慢慢调试,具体可参考官方文档(Spark Streaming Custom Receivers)。

2.实现代码由三部分组成:receiver、inputstream、util。

3.receiver代码

import java.io.Serializable
import java.util.Properties

import scala.util.control.NonFatal

import com.aliyun.openservices.ons.api._
import com.aliyun.openservices.ons.api.impl.ONSFactoryImpl
import org.apache.spark.internal.Logging
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

/**
 * Spark Streaming receiver that subscribes to an Aliyun ONS topic and stores
 * every received message, converted to bytes by `func`, into Spark.
 *
 * Received blocks are stored with `StorageLevel.MEMORY_AND_DISK_2`.
 *
 * @param cid       value set for `PropertyKeyConst.ConsumerId`
 * @param accessKey value set for `PropertyKeyConst.AccessKey`
 * @param secretKey value set for `PropertyKeyConst.SecretKey`
 * @param addr      value set for `PropertyKeyConst.ONSAddr`
 * @param topic     topic passed to `Consumer.subscribe`
 * @param tag       tag expression passed to `Consumer.subscribe`
 * @param func      converts an ONS `Message` into the byte array handed to `store`
 */
class OnsReceiver(
    cid: String,
    accessKey: String,
    secretKey: String,
    addr: String,
    topic: String,
    tag: String,
    func: Message => Array[Byte])
  extends Receiver[Array[Byte]](StorageLevel.MEMORY_AND_DISK_2) with Serializable with Logging {
  receiver =>

  // Runtime-only handles: they are assigned by the worker thread and read by the
  // thread calling onStop(), hence @volatile; they are never meant to travel with
  // the serialized receiver, hence @transient.
  @transient @volatile private var consumer: Consumer = null
  @transient @volatile private var workerThread: Thread = null

  /**
   * Starts a daemon thread that creates the ONS consumer, subscribes it to
   * `topic`/`tag` and starts it. Any non-fatal failure during that setup is
   * reported through `restart` instead of silently killing the thread.
   */
  override def onStart(): Unit = {
    val worker = new Thread(new Runnable {
      override def run(): Unit = {
        try {
          val created = new ONSFactoryImpl().createConsumer(consumerProperties())
          // Publish the handle before subscribing/starting so onStop() can shut it down.
          consumer = created
          created.subscribe(topic, tag, messageListener())
          created.start()
        } catch {
          case NonFatal(e) =>
            receiver.restart("Failed to start Aliyun ONS consumer", e)
        }
      }
    })
    worker.setName(s"Aliyun ONS Receiver $streamId")
    worker.setDaemon(true)
    workerThread = worker
    worker.start()
  }

  /**
   * Shuts down the consumer (if one was created), waits for the worker thread
   * to finish and clears both handles.
   */
  override def onStop(): Unit = {
    val worker = workerThread
    if (worker != null) {
      shutdownConsumer()
      worker.join()
      // The worker may have created the consumer after the first check above;
      // look again once it has finished so that consumer is not left running.
      shutdownConsumer()
      workerThread = null
      logInfo(s"Stopped receiver for streamId $streamId")
    }
  }

  /** Builds the configuration handed to `ONSFactoryImpl.createConsumer`. */
  private def consumerProperties(): Properties = {
    val properties = new Properties
    properties.put(PropertyKeyConst.ConsumerId, cid)
    properties.put(PropertyKeyConst.AccessKey, accessKey)
    properties.put(PropertyKeyConst.SecretKey, secretKey)
    properties.put(PropertyKeyConst.ONSAddr, addr)
    properties.put(PropertyKeyConst.MessageModel, "CLUSTERING")
    properties.put(PropertyKeyConst.ConsumeThreadNums, "50")
    properties
  }

  /**
   * Listener that stores each converted message and commits it; when conversion
   * or storing fails it logs the error and answers `Action.ReconsumeLater`.
   */
  private def messageListener(): MessageListener = new MessageListener {
    override def consume(message: Message, context: ConsumeContext): Action = {
      try {
        receiver.store(func(message))
        Action.CommitMessage
      } catch {
        case NonFatal(e) =>
          logError(s"Failed to store ONS message from topic $topic, will reconsume later", e)
          Action.ReconsumeLater
        case e: InterruptedException =>
          // NonFatal does not match InterruptedException: keep the interrupt flag
          // set for the calling thread and do not commit the message.
          Thread.currentThread().interrupt()
          logWarning(s"Interrupted while storing ONS message from topic $topic", e)
          Action.ReconsumeLater
      }
    }
  }

  /** Shuts down the current consumer, if any, and clears the handle. */
  private def shutdownConsumer(): Unit = {
    val current = consumer
    if (current != null) {
      consumer = null
      current.shutdown()
    }
  }
}

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/zyygjg.html