Flink UDF




   在大多数场景下,用户自定义函数在使用之前是必须要注册的。对于Scala的Table API,udf是不需要注册的。

   调用TableEnvironment的registerFunction()方法来实现注册。Udf注册成功之后,会被插入TableEnvironment的function catalog,这样table API和sql就能解析他了。

1.Scalar Functions 标量函数


   实现一个标量函数需要继承ScalarFunction,并且实现一个或者多个evaluation方法。标量函数的行为就是通过evaluation方法来实现的。evaluation方法必须定义为public,命名为eval。evaluation方法的输入参数类型和返回值类型决定着标量函数的输入参数类型和返回值类型。evaluation方法也可以被重载实现多个eval。同时evaluation方法支持变参数,例如:eval(String... strs)。


public class HashCode extends ScalarFunction { private int factor = 12; public HashCode(int factor) { this.factor = factor; } public int eval(String s) { return s.hashCode() * factor; } } BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env); // register the function tableEnv.registerFunction("hashCode", new HashCode(10)); // use the function in Java Table API myTable.select("string, string.hashCode(), hashCode(string)"); // use the function in SQL API tableEnv.sqlQuery("SELECT string, HASHCODE(string) FROM MyTable");



public static class TimestampModifier extends ScalarFunction { public long eval(long t) { return t % 1000; } public TypeInformation<?> getResultType(signature: Class<?>[]) { return Types.TIMESTAMP; } } 2.Table Functions 表函数


   为了自定义表函数,需要继承TableFunction,实现一个或者多个evaluation方法。表函数的行为定义在这些evaluation方法内部,函数名为eval并且必须是public。TableFunction可以重载多个eval方法。Evaluation方法的输入参数类型,决定着表函数的输入类型。Evaluation方法也支持变参,例如:eval(String... strs)。返回表的类型取决于TableFunction的基本类型。Evaluation方法使用collect(T)发射输出rows。

   在Table API中,表函数在scala语言中使用方法如下:.join(Expression) 或者 .leftOuterJoin(Expression),在java语言中使用方法如下:.join(String) 或者.leftOuterJoin(String)。

Join操作算子会使用表函数(操作算子右边的表)产生的所有行进行(cross) join 外部表(操作算子左边的表)的每一行。

leftOuterJoin操作算子会使用表函数(操作算子右边的表)产生的所有行进行(cross) join 外部表(操作算子左边的表)的每一行,并且在表函数返回一个空表的情况下会保留所有的outer rows。


cross join用法是LATERAL TABLE(<TableFunction>)。

LEFT JOIN用法是在join条件中加入ON TRUE。


// The generic type "Tuple2<String, Integer>" determines the schema of the returned table as (String, Integer). public class Split extends TableFunction<Tuple2<String, Integer>> { private String separator = " "; public Split(String separator) { this.separator = separator; } public void eval(String str) { for (String s : str.split(separator)) { // use collect(...) to emit a row collect(new Tuple2<String, Integer>(s, s.length())); } } } BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env); Table myTable = ... // table schema: [a: String] // Register the function. tableEnv.registerFunction("split", new Split("#")); // Use the table function in the Java Table API. "as" specifies the field names of the table. myTable.join("split(a) as (word, length)").select("a, word, length"); myTable.leftOuterJoin("split(a) as (word, length)").select("a, word, length"); // Use the table function in SQL with LATERAL and TABLE keywords. // CROSS JOIN a table function (equivalent to "join" in Table API). tableEnv.sqlQuery("SELECT a, word, length FROM MyTable, LATERAL TABLE(split(a)) as T(word, length)"); // LEFT JOIN a table function (equivalent to "leftOuterJoin" in Table API). tableEnv.sqlQuery("SELECT a, word, length FROM MyTable LEFT JOIN LATERAL TABLE(split(a)) as T(word, length) ON TRUE");


