通过在hbase中添加辅助输出,结果如下
on Tue Apr 21 18:30:41 CST 2020, put Output(714114118412528160|,001,张三,1587471839) to hbase invoke on Tue Apr 21 18:30:42 CST 2020, put Output(714114118412528160|,001,张三,1587471839) to hbase invoke on Tue Apr 21 18:30:44 CST 2020, put Output(714114118412528160|,001,张三,1587471839) to hbase invoke on Tue Apr 21 18:30:45 CST 2020, put Output(714114118412528160|,001,张三,1587471839) to hbase invoke on Tue Apr 21 18:30:47 CST 2020, put Output(714114118412528160|,001,张三,1587471839) to hbase invoke . . . on Tue Apr 21 18:30:45 CST 2020, put Output(714114118412528160|,001,张三,1587471839) to hbase invoke on Tue Apr 21 18:30:47 CST 2020, put Output(714114118412528160|,001,张三,1587471839) to hbase invoke //并没有到success这一步如果数据流d1进入了标记点:2(输出null);
那么后续的数据流d2进入标记点:1(正常输出) ,此时在web页面task-manager stdout的中出现d2在输出标记:1 和输出标记:2(没有输出2的部分)无限循环。
输出标记:2 没有执行 说明没有写hbase。加上错误产生的条件为要有数据进入标记点:2,初步分析是这个null的返回值影响到了后面hbase的操作。
写hbase前过滤掉null的值
val result: DataStream[Output] = stream.map(s => { var rowkey = "" s.id.split(",").map(id => rowkey += s"$id${9999999999l - s.ts}|") if (rowkey.equals("")) { null } else { Output(rowkey, s.sid, s.sn, s.ts + "") } }).filter(_!=null)//过滤null经过测试,此方法无效。
有效的手段将二次过滤放到一次过滤的位置
source.filter(s => (!s.equals(null)) && (s.contains("\"type\":\"type1\"") || s.contains("\"type\":\"type2\"")) && (s.contains("\"act\":2"))//提前过滤act=2问题解决,但是因为业务的问题,act不是通用条件,不具备通用性。当然可以进行了;进行两次filter,但是过于繁琐并且会产生多条数据流。
将标记点2的null改成默认值,然后通过二次过滤,去除默认值
type match { case "type1" => { if (data.getInteger("act") == 2) { //二次过滤 if (data.getJSONArray("ids").toArray().length > 0) id = recode.getString("id") + "," + data.getJSONArray("ids").toArray().mkString(",") else id = recode.getString("id") Input( id.reverse, data.getString("sid"), data.getString("sn"), recode.getLong("time"), recode.getLong("time") * 1000)//正常输出----标记点:1 } else Input("id","sid", "sn", 0l)//非目标输出 默认值--标记点:2 } case "type2" => { if (data.getInteger("act") == 2) { //二次过滤 id = recode.getString("id") Input(id.reverse, data.getString("sid"), data.getString("sn"), recode.getLong("time"), recode.getLong("time") * 1000)//正常输出----标记点:1 } else Input("id","sid", "sn", 0l) //非目标输出 默认值--标记点:2 } }问题解决,但是从整体数据量来看,标记点1的数量仅为标记点2数量的六分之一到五分之一之间,此处会做很多无用的json解析。在大数据量的时候还是会对效率的些许影响
采用侧输出进行数据分流,将一次过滤的通过侧输出拆分,对拆分后的出具进行特定条件的二次过滤,然后进行对应的解析。
/** * 数据流处理 * * @param source * @return */ def deal(source: DataStream[String]) = { println("数据流处理") //拆分数据流 val splitData: DataStream[String] = splitSource(source) //解析minkTV的 val type1: DataStream[Input] = getMkc(splitData) //解析type2 val type2: DataStream[Input] = getMss(splitData) //合并数据流 val stream: DataStream[Input] = type1.union(type2) //拼接rowkey val result: DataStream[Output] = stream.map(s => { var rowkey = "" s.id.split(",").map(id => rowkey += s"$id${9999999999l - s.ts}|") if (rowkey.equals("")) { null } else { Output(rowkey, s.prdct_cd, s.sid, s.sn, s.ts + "") } }) //将结果写入hbase result.addSink(new CustomSinkToHbase("habse_table", "cf", proInstance)).name("write to hbase").setParallelism(1) env.execute("test") } /** * 从侧输出中获取type1的数据,过滤开始演唱数据 .filter(_.contains("\"act\":2")) 进行解析 * @param splitData * @return */ def getMkc(splitData: DataStream[String]): DataStream[Input] = { splitData.getSideOutput(new OutputTag[String]("type1")) .filter(_.contains("\"act\":2")) .map(str => { try { val recode: JSONObject = JSON.parseObject(str) val dataStr: String = recode.getString("data") val data = JSON.parseObject(dataStr) var id: String = "" if (data.getJSONArray("ids").toArray().length > 0) id = recode.getString("id") + "," + data.getJSONArray("ids").toArray().mkString(",") else id = recode.getString("id") Input( id.reverse, data.getString("sid"), data.getString("sn"), recode.getLong("time") * 1000) } catch { case e => { e.printStackTrace() println("解析json失败: ", str) Input("id","sid", "sn", 0l) } } } ) } /** * 从侧输出中获取type2的数据,过滤开始演唱数据 .filter(_.contains("\"act\":2")) 进行解析 * @param splitData * @return */ def getMss(splitData: DataStream[String]): DataStream[Input] = { splitData.getSideOutput(new OutputTag[String]("type2")) .filter(_.contains("\"act\":2")) .map(str => { try { val recode: JSONObject = JSON.parseObject(str) val dataStr: String = recode.getString("data") val data = JSON.parseObject(dataStr) var id: String = "" id = recode.getString("id") Input(id.reverse, data.getString("sid"), data.getString("sn"), recode.getLong("time") * 1000) } catch { case e => { e.printStackTrace() println("解析json失败: ", str) Input("id","sid", "sn", 0l) } } } ) } /** * 使用侧输出切分数据流 * @param source * @return */ def splitSource(source: DataStream[String]) = { source.process(new ProcessFunction[String, String] { override def processElement(value: String, ctx: ProcessFunction[String, String]#Context, out: Collector[String]): Unit = { value match { case value if value.contains("\"type\":\"type1\"") => ctx.output(new OutputTag[String]("type1"), value) case value if value.contains("\"type\":\"type2\"") => ctx.output(new OutputTag[String]("type2"), value) case _ => out.collect(value) } } }) }问题解决,对比1的好处是,侧输出的时候,数据流还是只有一个,只是给数据打了一个标签,并且对可后期业务的扩展很友好。
总结