Flume直接对接SaprkStreaming的两种方式

一、flume对接sparkStreaming的两种方式:

Push推送的方式

Poll拉取的方式

第一种Push方式:

代码如下:

package cn.itcast.spark.day5 import org.apache.spark.SparkConf import org.apache.spark.streaming.flume.FlumeUtils import org.apache.spark.streaming.{Seconds, StreamingContext} /** * . */ object FlumePushWordCount { def main(args: Array[String]) { val host = args(0) val port = args(1).toInt LoggerLevels.setStreamingLogLevels() val conf = new SparkConf().setAppName("FlumeWordCount")//.setMaster("local[2]") val ssc = new StreamingContext(conf, Seconds(5)) //推送方式: flume向spark发送数据 val flumeStream = FlumeUtils.createStream(ssc, host, port) //flume中的数据通过event.getBody()才能拿到真正的内容 val words = flumeStream.flatMap(x => new String(x.event.getBody().array()).split(" ")).map((_, 1)) val results = words.reduceByKey(_ + _) results.print() ssc.start() ssc.awaitTermination() } }

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wpzpzs.html