Flink的DataSource三部曲之一:直接API

欢迎访问我的GitHub

https://github.com/zq2599/blog_demos

内容:所有原创文章分类汇总及配套源码,涉及Java、Docker、Kubernetes、DevOPS等;

本文是《Flink的DataSource三部曲》系列的第一篇,该系列旨在通过实战学习和了解Flink的DataSource,为以后的深入学习打好基础,由以下三部分组成:

直接API:即本篇,除了准备环境和工程,还学习了StreamExecutionEnvironment提供的用来创建数据来的API;

内置connector:StreamExecutionEnvironment的addSource方法,入参可以是flink内置的connector,例如kafka、RabbitMQ等;

自定义:StreamExecutionEnvironment的addSource方法,入参可以是自定义的SourceFunction实现类;

Flink的DataSource三部曲文章链接

《Flink的DataSource三部曲之一:直接API》

《Flink的DataSource三部曲之二:内置connector》

《Flink的DataSource三部曲之三:自定义》

关于Flink的DataSource

官方对DataSource的解释:Sources are where your program reads its input from,即DataSource是应用的数据来源,如下图的两个红框所示:

在这里插入图片描述

DataSource类型

对于常见的文本读入、kafka、RabbitMQ等数据来源,可以直接使用Flink提供的API或者connector,如果这些满足不了需求,还可以自己开发,下图是我按照自己的理解梳理的:

在这里插入图片描述

环境和版本

熟练掌握内置DataSource的最好办法就是实战,本次实战的环境和版本如下:

JDK:1.8.0_211

Flink:1.9.2

Maven:3.6.0

操作系统:macOS Catalina 10.15.3 (MacBook Pro 13-inch, 2018)

IDEA:2018.3.5 (Ultimate Edition)

源码下载

如果您不想写代码,整个系列的源码可在GitHub下载到,地址和链接信息如下表所示(https://github.com/zq2599/blog_demos):

名称 链接 备注
项目主页   https://github.com/zq2599/blog_demos   该项目在GitHub上的主页  
git仓库地址(https)   https://github.com/zq2599/blog_demos.git   该项目源码的仓库地址,https协议  
git仓库地址(ssh)   git@github.com:zq2599/blog_demos.git   该项目源码的仓库地址,ssh协议  

这个git项目中有多个文件夹,本章的应用在flinkdatasourcedemo文件夹下,如下图红框所示:

在这里插入图片描述

环境和版本

本次实战的环境和版本如下:

JDK:1.8.0_211

Flink:1.9.2

Maven:3.6.0

操作系统:macOS Catalina 10.15.3 (MacBook Pro 13-inch, 2018)

IDEA:2018.3.5 (Ultimate Edition)

创建工程

在控制台执行以下命令就会进入创建flink应用的交互模式,按提示输入gourpId和artifactId,就会创建一个flink应用(我输入的groupId是com.bolingcavalry,artifactId是flinkdatasourcedemo):

mvn \ archetype:generate \ -DarchetypeGroupId=org.apache.flink \ -DarchetypeArtifactId=flink-quickstart-java \ -DarchetypeVersion=1.9.2

现在maven工程已生成,用IDEA导入这个工程,如下图:

在这里插入图片描述

以maven的类型导入:

在这里插入图片描述

导入成功的样子:

在这里插入图片描述

项目创建成功,可以开始写代码实战了;

辅助类Splitter

实战中有个功能常用到:将字符串用空格分割,转成Tuple2类型的集合,这里将此算子做成一个公共类Splitter.java,代码如下:

package com.bolingcavalry; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.util.Collector; import org.apache.flink.util.StringUtils; public class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> { @Override public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception { if(StringUtils.isNullOrWhitespaceOnly(s)) { System.out.println("invalid line"); return; } for(String word : s.split(" ")) { collector.collect(new Tuple2<String, Integer>(word, 1)); } } }

准备完毕,可以开始实战了,先从最简单的Socket开始。

Socket DataSource

Socket DataSource的功能是监听指定IP的指定端口,读取网络数据;

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/zyyxzf.html