Flink version used: 1.9.1
Problem description

Requirement:
1. Read a stream of data from Kafka.
2. Use filter for a first pass that keeps only the records of interest.
3. Use map to apply a further condition and parse each record; this step may return either null or the target output outData (see the flatMap sketch after this list).
4. Finally, write outData to HBase through a custom sink.
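For step 3, the awkward part is that map must emit either a valid Input or null. For comparison only, here is a minimal sketch of the same step written with flatMap and Option, so that non-target records are simply dropped instead of being emitted as null. The simplified Input case class, the parseRecord helper, and the placeholder socket source are assumptions made for illustration; they are not the code actually used in this job.

import com.alibaba.fastjson.{JSON, JSONObject}
import org.apache.flink.streaming.api.scala._

object FlatMapSketch {

  // Assumed, simplified shape of the target record; the real Input class may differ.
  case class Input(id: String, sid: String, sn: String, time: Long, ts: Long)

  // Hypothetical helper: Some(Input) for target records, None for everything else.
  def parseRecord(json: String): Option[Input] =
    try {
      val recode: JSONObject = JSON.parseObject(json)
      val data: JSONObject = JSON.parseObject(recode.getString("data"))
      if (data.getInteger("act") == 2) {                 // second-pass filter
        Some(Input(recode.getString("id").reverse,
          data.getString("sid"), data.getString("sn"),
          recode.getLong("time"), recode.getLong("time") * 1000))
      } else None                                        // non-target record: simply dropped
    } catch {
      case e: Exception => e.printStackTrace(); None     // parse failure: also dropped
    }

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Placeholder source just for the sketch; the real job reads from Kafka.
    val source: DataStream[String] = env.socketTextStream("localhost", 9999)

    // flatMap drops None, so no null element ever reaches the chained operators.
    val stream: DataStream[Input] = source
      .filter(s => s != null && (s.contains("\"type\":\"type1\"") || s.contains("\"type\":\"type2\"")))
      .flatMap(json => parseRecord(json).toList)

    stream.print()
    env.execute("flatmap-sketch")
  }
}

With this shape, the later map that builds the rowkey would only ever see real Input records, and no null would reach the HBase sink.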
Core transformation code:

val stream: DataStream[Input] = source
  .filter(s => (!s.equals(null)) && (s.contains("\"type\":\"type1\"") || s.contains("\"type\":\"type2\""))) // first-pass filter
  .map(json => {
    try {
      val recode: JSONObject = JSON.parseObject(json)
      val dataStr: String = recode.getString("data")
      val `type` = recode.getString("type")
      val data = JSON.parseObject(dataStr)
      var id: String = ""
      `type` match {
        case "type1" => {
          if (data.getInteger("act") == 2) { // second-pass filter
            if (data.getJSONArray("ids").toArray().length > 0)
              id = recode.getString("id") + "," + data.getJSONArray("ids").toArray().mkString(",")
            else
              id = recode.getString("id")
            Input(id.reverse, data.getString("sid"), data.getString("sn"),
              recode.getLong("time"), recode.getLong("time") * 1000) // normal output ---- marker 1
          } else null // non-target output; this is what causes the problem. Returning any non-null placeholder here avoids the error, but then downstream operators have to filter it out again ---- marker 2
        }
        case "type2" => {
          if (data.getInteger("act") == 2) { // second-pass filter
            id = recode.getString("id")
            Input(id.reverse, data.getString("sid"), data.getString("sn"),
              recode.getLong("time"), recode.getLong("time") * 1000) // normal output ---- marker 1
          } else null // non-target output; same problem as above ---- marker 2
        }
      }
    } catch {
      case e => {
        e.printStackTrace()
        println("failed to parse json: ", json)
        Input("id", "sid", "sn", 0L)
      }
    }
  })

val result: DataStream[Output] = stream.map(s => {
  var rowkey = ""
  s.id.split(",").map(id => rowkey += s"$id${9999999999L - s.ts}|")
  if (rowkey.equals("")) {
    null
  } else {
    Output(rowkey, s.sid, s.sn, s.ts + "")
  }
})

result.addSink(new CustomSinkToHbase("habse_table", "cf", proInstance))
  .name("write to hbase")
  .setParallelism(1)

Core code of the custom sink:

override def invoke(value: Output, context: SinkFunction.Context[_]): Unit = {
  println(s"on ${new Date}, put $value to hbase invoke ") // output marker 1
  try {
    init()
    val puts = new util.ArrayList[Put]()
    value.rowkey.split("\\|").map(s => {
      val rowkey = s
      val put: Put = new Put(Bytes.toBytes(rowkey))
      put.addColumn(Bytes.toBytes(cf), Bytes.toBytes("sid"), Bytes.toBytes(value.sid))
      put.addColumn(Bytes.toBytes(cf), Bytes.toBytes("sn"), Bytes.toBytes(value.sn))
      put.addColumn(Bytes.toBytes(cf), Bytes.toBytes("ts"), Bytes.toBytes(value.ts))
      puts.add(put)
    })
    table.put(puts)
    println(s"on ${new Date}, put $value to hbase success ") // output marker 2
  } catch {
    case e => {
      e.printStackTrace()
      if (table != null) table.close()
      if (conn != null) conn.close()
    }
  }
}

Execution behavior

After the program starts, the result differs depending on the data that flows in:
If no record ever reaches marker 2, everything works fine.
If any record reaches marker 2, meaning the map returns null, the program immediately fails with ExceptionInChainedOperatorException, subsequent records also fail to be processed, and the program gets stuck in an endless loop.
The concrete behavior is as follows:
java.lang.Exception: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.checkThrowSourceExecutionException(SourceStreamTask.java:217)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.processInput(SourceStreamTask.java:133)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:301)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:406)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:654)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:310)
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:409)
    at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:398)
    at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.emitRecord(KafkaFetcher.java:185)
    at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:150)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:715)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:203)

Problem tracing

After the error occurs, the taskmanager log keeps repeating the same error in an endless loop, and in the web UI the job's start time keeps being reset.
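The repeating error and the resetting start time are what a restart strategy produces: every time the chain fails on the bad element, the whole job is restarted and consumes the same data again. If checkpointing is enabled and no restart strategy is configured, Flink defaults to a fixed-delay strategy with a very large number of attempts, which would match the endless loop described above. A minimal sketch of capping the number of restarts; the attempt count, delay, and placeholder pipeline are illustrative only and not this job's real configuration:

import java.util.concurrent.TimeUnit

import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.api.common.time.Time
import org.apache.flink.streaming.api.scala._

object RestartStrategySketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Cap the restarts so a poison record eventually fails the job
    // instead of looping forever; the values here are illustrative.
    env.setRestartStrategy(
      RestartStrategies.fixedDelayRestart(3, Time.of(10, TimeUnit.SECONDS)))

    // Placeholder pipeline; in the real job this is Kafka -> filter -> map -> HBase sink.
    env.fromElements("placeholder").print()

    env.execute("job-with-bounded-restarts")
  }
}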
Auxiliary output to pinpoint where the program fails