Flink UDF

AggregateFunction

    用户自定义函数是非常重要的一个特征,因为他极大地扩展了查询的表达能力。本文除了介绍这三种udf之外,最后会介绍一个redis作为交互数据源的udf案例。

注册用户自定义函数

   在大多数场景下,用户自定义函数在使用之前是必须要注册的。对于Scala的Table API,udf是不需要注册的。

   调用TableEnvironment的registerFunction()方法来实现注册。Udf注册成功之后,会被插入TableEnvironment的function catalog,这样table API和sql就能解析他了。

1.Scalar Functions 标量函数

   标量函数,是指返回一个值的函数。标量函数是实现将0,1,或者多个标量值转化为一个新值。

   实现一个标量函数需要继承ScalarFunction,并且实现一个或者多个evaluation方法。标量函数的行为就是通过evaluation方法来实现的。evaluation方法必须定义为public,命名为eval。evaluation方法的输入参数类型和返回值类型决定着标量函数的输入参数类型和返回值类型。evaluation方法也可以被重载实现多个eval。同时evaluation方法支持变参数,例如:eval(String... strs)。

下面给出一个标量函数的例子。例子实现的是一个hashcode方法。

public class HashCode extends ScalarFunction { private int factor = 12; public HashCode(int factor) { this.factor = factor; } public int eval(String s) { return s.hashCode() * factor; } } BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env); // register the function tableEnv.registerFunction("hashCode", new HashCode(10)); // use the function in Java Table API myTable.select("string, string.hashCode(), hashCode(string)"); // use the function in SQL API tableEnv.sqlQuery("SELECT string, HASHCODE(string) FROM MyTable");

   默认情况下evaluation方法的返回值类型是由flink类型抽取工具决定。对于基础类型及简单的POJOS是足够的,但是更复杂的类型,自定义类型,组合类型,会报错。这种情况下,返回值类型的TypeInformation,需要手动指定,方法是重载ScalarFunction#getResultType()。

   下面给一个例子,通过复写ScalarFunction#getResultType(),将long型的返回值在代码生成的时候翻译成Types.TIMESTAMP。

public static class TimestampModifier extends ScalarFunction { public long eval(long t) { return t % 1000; } public TypeInformation<?> getResultType(signature: Class<?>[]) { return Types.TIMESTAMP; } } 2.Table Functions 表函数

   与标量函数相似之处是输入可以0,1,或者多个参数,但是不同之处可以输出任意数目的行数。返回的行也可以包含一个或者多个列。

   为了自定义表函数,需要继承TableFunction,实现一个或者多个evaluation方法。表函数的行为定义在这些evaluation方法内部,函数名为eval并且必须是public。TableFunction可以重载多个eval方法。Evaluation方法的输入参数类型,决定着表函数的输入类型。Evaluation方法也支持变参,例如:eval(String... strs)。返回表的类型取决于TableFunction的基本类型。Evaluation方法使用collect(T)发射输出rows。

   在Table API中,表函数在scala语言中使用方法如下:.join(Expression) 或者 .leftOuterJoin(Expression),在java语言中使用方法如下:.join(String) 或者.leftOuterJoin(String)。

Join操作算子会使用表函数(操作算子右边的表)产生的所有行进行(cross) join 外部表(操作算子左边的表)的每一行。

leftOuterJoin操作算子会使用表函数(操作算子右边的表)产生的所有行进行(cross) join 外部表(操作算子左边的表)的每一行,并且在表函数返回一个空表的情况下会保留所有的outer rows。

在sql语法中稍微有点区别:

cross join用法是LATERAL TABLE(<TableFunction>)。

LEFT JOIN用法是在join条件中加入ON TRUE。

下面的例子讲的是如何使用表值函数。

// The generic type "Tuple2<String, Integer>" determines the schema of the returned table as (String, Integer). public class Split extends TableFunction<Tuple2<String, Integer>> { private String separator = " "; public Split(String separator) { this.separator = separator; } public void eval(String str) { for (String s : str.split(separator)) { // use collect(...) to emit a row collect(new Tuple2<String, Integer>(s, s.length())); } } } BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env); Table myTable = ... // table schema: [a: String] // Register the function. tableEnv.registerFunction("split", new Split("#")); // Use the table function in the Java Table API. "as" specifies the field names of the table. myTable.join("split(a) as (word, length)").select("a, word, length"); myTable.leftOuterJoin("split(a) as (word, length)").select("a, word, length"); // Use the table function in SQL with LATERAL and TABLE keywords. // CROSS JOIN a table function (equivalent to "join" in Table API). tableEnv.sqlQuery("SELECT a, word, length FROM MyTable, LATERAL TABLE(split(a)) as T(word, length)"); // LEFT JOIN a table function (equivalent to "leftOuterJoin" in Table API). tableEnv.sqlQuery("SELECT a, word, length FROM MyTable LEFT JOIN LATERAL TABLE(split(a)) as T(word, length) ON TRUE");

   需要注意的是PROJO类型不需要一个确定的字段顺序。意味着你不能使用as修改表函数返回的pojo的字段的名字。

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/zwdjxf.html