实时统计pv、uv是再常见不过的大数据统计需求了,前面出过一篇SparkStreaming实时统计pv,uv的案例,这里用Flink实时计算pv,uv。
我们需要统计不同数据类型每天的pv,uv情况,并且有如下要求.
每秒钟要输出最新的统计结果;
程序永远跑着不会停,所以要定期清理内存里的过时数据;
收到的消息里的时间字段并不是按照顺序严格递增的,所以要有一定的容错机制;
访问uv并不一定每秒钟都会变化,重复输出对IO是巨大的浪费,所以要在uv变更时在一秒内输出结果,未变更时不输出;
Flink数据流上的类型和操作DataStream是flink流处理最核心的数据结构,其它的各种流都可以直接或者间接通过DataStream来完成相互转换,一些常用的流直接的转换关系如图:
可以看出,DataStream可以与KeyedStream相互转换,KeyedStream可以转换为WindowedStream,DataStream不能直接转换为WindowedStream,WindowedStream可以直接转换为DataStream。各种流之间虽然不能相互直接转换,但是都可以通过先转换为DataStream,再转换为其它流的方法来实现。
在这个计算pv,uv的需求中就主要用到DataStream、KeyedStream以及WindowedStream这些数据结构。
这里需要用到window和watermark,使用窗口把数据按天分割,使用watermark可以通过“水位”来定期清理窗口外的迟到数据,起到清理内存的作用。
业务代码我们的数据是json类型的,含有date,helperversion,guid这3个字段,在实时统计pv,uv这个功能中,其它字段可以直接丢掉,当然了在离线数据仓库中,所有有含义的业务字段都是要保留到hive当中的。
其它相关概念就不说了,会专门介绍,这里直接上代码吧。
主要代码,主要使用scala开发:
package com.ddxygq.bigdata.flink.streaming.pvuv import java.util.Properties import com.alibaba.fastjson.JSON import org.apache.flink.runtime.state.filesystem.FsStateBackend import org.apache.flink.streaming.api.CheckpointingMode import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment} import org.apache.flink.streaming.api.windowing.time.Time import org.apache.flink.streaming.api.windowing.triggers.ContinuousProcessingTimeTrigger import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010 import org.apache.flink.streaming.util.serialization.SimpleStringSchema import org.apache.flink.streaming.api.scala.extensions._ import org.apache.flink.api.scala._ /** * @ Author: keguang * @ Date: 2019/3/18 17:34 * @ version: v1.0.0 * @ description: */ object PvUvCount { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment // 容错 env.enableCheckpointing(5000) env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE) env.setStateBackend(new FsStateBackend("file:///D:/space/IJ/bigdata/src/main/scala/com/ddxygq/bigdata/flink/checkpoint/flink/tagApp")) // kafka 配置 val ZOOKEEPER_HOST = "hadoop01:2181,hadoop02:2181,hadoop03:2181" val KAFKA_BROKERS = "hadoop01:9092,hadoop02:9092,hadoop03:9092" val TRANSACTION_GROUP = "flink-count" val TOPIC_NAME = "flink" val kafkaProps = new Properties() kafkaProps.setProperty("zookeeper.connect", ZOOKEEPER_HOST) kafkaProps.setProperty("bootstrap.servers", KAFKA_BROKERS) kafkaProps.setProperty("group.id", TRANSACTION_GROUP) // watrmark 允许数据延迟时间 val MaxOutOfOrderness = 86400 * 1000L // 消费kafka数据 val streamData: DataStream[(String, String, String)] = env.addSource( new FlinkKafkaConsumer010[String](TOPIC_NAME, new SimpleStringSchema(), kafkaProps) ).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[String](Time.milliseconds(MaxOutOfOrderness)) { override def extractTimestamp(element: String): Long = { val t = JSON.parseObject(element) val time = JSON.parseObject(JSON.parseObject(t.getString("message")).getString("decrypted_data")).getString("time") time.toLong } }).map(x => { var date = "error" var guid = "error" var helperversion = "error" try { val messageJsonObject = JSON.parseObject(JSON.parseObject(x).getString("message")) val datetime = messageJsonObject.getString("time") date = datetime.split(" ")(0) // hour = datetime.split(" ")(1).substring(0, 2) val decrypted_data_string = messageJsonObject.getString("decrypted_data") if (!"".equals(decrypted_data_string)) { val decrypted_data = JSON.parseObject(decrypted_data_string) guid = decrypted_data.getString("guid").trim helperversion = decrypted_data.getString("helperversion") } } catch { case e: Exception => { println(e) } } (date, helperversion, guid) }) // 这上面是设置watermark并解析json部分 // 聚合窗口中的数据,可以研究下applyWith这个方法和OnWindowedStream这个类 val resultStream = streamData.keyBy(x => { x._1 + x._2 }).timeWindow(Time.days(1)) .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(1))) .applyWith(("", List.empty[Int], Set.empty[Int], 0L, 0L))( foldFunction = { case ((_, list, set, _, 0), item) => { val date = item._1 val helperversion = item._2 val guid = item._3 (date + "_" + helperversion, guid.hashCode +: list, set + guid.hashCode, 0L, 0L) } } , windowFunction = { case (key, window, result) => { result.map { case (leixing, list, set, _, _) => { (leixing, list.size, set.size, window.getStart, window.getEnd) } } } } ).keyBy(0) .flatMapWithState[(String, Int, Int, Long, Long),(Int, Int)]{ case ((key, numpv, numuv, begin, end), curr) => curr match { case Some(numCurr) if numCurr == (numuv, numpv) => (Seq.empty, Some((numuv, numpv))) //如果之前已经有相同的数据,则返回空结果 case _ => (Seq((key, numpv, numuv, begin, end)), Some((numuv, numpv))) } } // 最终结果 val resultedStream = resultStream.map(x => { val keys = x._1.split("_") val date = keys(0) val helperversion = keys(1) (date, helperversion, x._2, x._3) }) resultedStream.print() env.execute("PvUvCount") } }