Flink提供了像表一样处理的API和像执行SQL语句一样把结果集进行执行。这样很方便的让大家进行数据处理了。比如执行一些查询,在***数据和批处理的任务上,然后将这些按一定的格式进行输出,很方便的让大家像执行SQL一样简单。
今天主要写的东西分为如下几个方面,然后遵循着下边几个方面进行展开:
1. Flink的不同API的层级梗概。
2. FlinkSQL的编程的步骤。
3. Flink编程的例子。
一、 Flink有着不同级别的API,不同级别的API方便不同用户进行处理。普通用户使用Datastream以及Dataset进行程序编写,我们可以在其更高的基础上使用Table API以及SQL,这也是Flink的强大之处,可以像使用处理表一样处理数据。如果想研究的更高可以看更底层的东西。
SQL High-level LanguageTable API Declarative DSL
Datastream / Dataset API Core API
Stateful Stream Processing
Low-level building block
(streams, state, [event] time)
二、 Flink的Table API 和 SQL编程步骤如下:
1) 创建一个TableEnvironment表环境用于后续使用。TableEnvironment是 SQL 和 Table API的核心概念,它用于设置执行所需要的数据属性,和ExecutionEnvironment类似,它主要负责:
a) 注册表数据源,从内部或者外部来源。
b) 执行相应的SQL语句。
c) 注册自定义集数。
d 将结果集进行扫描和写入到目标数据源。
e) 相同的environment可以执行相应的join unin操作。
2)接下来,咱们看一下如何注册数据源,注意不同的Flink版本有不同的实现,但是核心的内容是不变的:
a) 可以直接从数据集里进行注册。比如 tableEnvironment.registerDataSet()。
b) 在一个已经存在的Table中直接执行scan或者select,那么会生成一个新的Table,也就是数据可以从已有的Table中再次获取,Table t = tableEnv.scan("x").select("a, b,c")。
c) 可以是TableSource, 也就是从不同的文件、数据库、消息系统进行读取。 比如csv文件,TableSource csvSource = new CsvTableSource("path/to/file")。
3)读取完数据后进行处理,处理完之后要存储起来,那么需要Sink(存储)到文件或者数据库、消息系统等。
a) 比如Sink到CSV文件。 TableSink csvSink = new TableCSVSink("path/to/sink", ..)。
b) Sink为指定字段句和类型到CSV文件中。
指定表字段: String[] fieldNames = {"fild1", "filed2", "field3"};
指定字段类型: TypeInformation[] fieldTypes = {Types.INT, Types.STRING, Types.LONG};
指定表名和csv文件:tableEnv.registerTableSink("CsvSinkTable", fieldNames, fieldTypes, csvSink);
三、接下来,看一下真实的例子。
1)从给定的单词和单词的个数中统计一下,每个单词出现的数据,使用SQL语句进行实现查询统计。完整的样例如下(注意,不同的FLink版本实现上有稍微的差异):
package myflink.sql; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.java.BatchTableEnvironment; public class WordCountSQL { public static void main(String[] args) throws Exception { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); BatchTableEnvironment tEnv = BatchTableEnvironment.getTableEnvironment(env); DataSet<WC> input = env.fromElements( WC.of("hello", 1), WC.of("hqs", 1), WC.of("world", 1), WC.of("hello", 1) ); //注册数据集 tEnv.registerDataSet("WordCount", input, "word, frequency"); //执行SQL,并结果集做为一个新表 Table table = tEnv.sqlQuery("SELECT word, SUM(frequency) as frequency FROM WordCount GROUP BY word"); DataSet<WC> result = tEnv.toDataSet(table, WC.class); result.print(); } public static class WC { public String word; //hello public long frequency; //创建构造方法,让flink进行实例化 public WC() {} public static WC of(String word, long frequency) { WC wc = new WC(); wc.word = word; wc.frequency = frequency; return wc; } @Override public String toString() { return "WC " + word + " " + frequency; } } }