Flink UDF (2)

   默认情况下TableFunction返回值类型是由flink类型抽取工具决定。对于基础类型及简单的POJOS是足够的,但是更复杂的类型,自定义类型,组合类型,会报错。这种情况下,返回值类型的TypeInformation,需要手动指定,方法是重载TableFunction#getResultType()。

下面的例子,我们通过复写TableFunction#getResultType()方法使得表返回类型是RowTypeInfo(String, Integer)。

public class CustomTypeSplit extends TableFunction<Row> { public void eval(String str) { for (String s : str.split(" ")) { Row row = new Row(2); row.setField(0, s); row.setField(1, s.length); collect(row); } } @Override public TypeInformation<Row> getResultType() { return Types.ROW(Types.STRING(), Types.INT()); } } 3.Aggregation Functions 聚合函数

   用户自定义聚合函数聚合一张表(一行或者多行,一行有一个或者多个属性)为一个标量的值。
[图片上传失败...(image-f5e972-1542542047386)]
上图中是讲的一张饮料的表这个表有是那个字段五行数据,现在要做的是求出所有饮料的最高价。

   聚合函数需要继承AggregateFunction。聚合函数工作方式如下:

首先,需要一个accumulator,这个是保存聚合中间结果的数据结构。调用AggregateFunction函数的createAccumulator()方法来创建一个空accumulator.

随后,每个输入行都会调用accumulate()方法来更新accumulator。一旦所有的行被处理了,getValue()方法就会被调用,计算和返回最终的结果。

对于每个AggregateFunction,下面三个方法都是比不可少的:

createAccumulator() accumulate() getValue()

   flink的类型抽取机制不能识别复杂的数据类型,比如,数据类型不是基础类型或者简单的pojos类型。所以,类似于ScalarFunction 和TableFunction,AggregateFunction提供了方法去指定返回结果类型的TypeInformation,用的是AggregateFunction#getResultType()。Accumulator类型用的是AggregateFunction#getAccumulatorType()。

   除了上面的方法,还有一些可选的方法。有些方法是让系统更加高效的执行查询,另外的一些在特定的场景下是必须的。例如,merge()方法在会话组窗口(session group window)上下文中是必须的。当一行数据是被视为跟两个回话窗口相关的时候,两个会话窗口的accumulators需要被join。

AggregateFunction的下面几个方法,根据使用场景的不同需要被实现:

retract():在bounded OVER窗口的聚合方法中是需要实现的。

merge():在很多batch 聚合和会话窗口聚合是必须的。

resetAccumulator(): 在大多数batch聚合是必须的。

AggregateFunction的所有方法都是需要被声明为public,而不是static。定义聚合函数需要实现org.apache.flink.table.functions.AggregateFunction同时需要实现一个或者多个accumulate方法。该方法可以被重载为不同的数据类型,并且支持变参。

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/zwdjxf.html