flink1.7自定义source实现

flink读取source data

数据的来源是flink程序从中读取输入的地方。我们可以使用StreamExecutionEnvironment.addSource(sourceFunction)将源添加到程序中。
flink附带大量预先实现好的各种读取数据源的函数,也可以通过为非并行源去实现SourceFunction接口或者为并行源实现ParallelSourceFunction接口或扩展RichParallelSourceFunction来编写满足自己业务需要的定制源。

flink预先实现好数据源

下面有几个预定义的流源可以从StreamExecutionEnvironment访问

基于文件

readTextFile(path): 读取文本文件,该文件要符合TextInputFormat规范,逐行读取并作为字符串返回。
readFile(fileInputFormat,path): 根据指定的文件输入格式指定读取文件。
readFile(fileInputFormat,path,watchType,interval,pathFilter,typeInfo): 这是前两个方法在内部调用的方法。它根据给定的fileInputFormat读取路径中的文件。根据提供的watchType,该源可能会定期监视(每间隔ms)该路径下来到的新数据(FileProcessingMode.PROCESS_CONTINUOUSLY),或者处理当前路径中的数据后并退出(FileProcessingMode.PROCESS_ONCE)。使用pathFilter,用户可以进一步排除文件的处理。

基于套接字

socketTextStream : 从套接字读取。元素可以用分隔符分隔。

基于集合

fromCollection(Collection) : 从Java Java.util.Collection创建一个数据流。集合中的所有元素必须是相同的类型。
fromCollection(Iterator,Class) :从迭代器创建数据流。该类要指定迭代器返回的元素的数据类型。
fromElements(T ...) :根据给定的对象序列创建数据流。所有对象必须是相同的类型。
fromParallelCollection(SplittableIterator,Class) : 并行地从迭代器创建数据流。该类指定迭代器返回的元素的数据类型。
generateSequence(from,to) : 在给定的区间内并行生成数字序列 。

自定义数据原 package com.intsmaze.flink.streaming.source; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple5; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.source.SourceFunction; import java.util.Random; /** * @Description: 自定义数据源的模板 * @Author: intsmaze * @Date: 2019/1/4 */ public class CustomSource { private static final int BOUND = 100; public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStream<Tuple2<Integer, Integer>> inputStream= env.addSource(new RandomFibonacciSource()); inputStream.map(new InputMap()).print(); env.execute("Intsmaze Custom Source"); } /** * @Description: * @Author: intsmaze * @Date: 2019/1/5 */ private static class RandomFibonacciSource implements SourceFunction<Tuple2<Integer, Integer>> { private static final long serialVersionUID = 1L; private Random rnd = new Random(); private volatile boolean isRunning = true; private int counter = 0; /** * @Description: * @Param: * @return: * @Author: intsmaze * @Date: 2019/1/5 */ @Override public void run(SourceContext<Tuple2<Integer, Integer>> ctx) throws Exception { while (isRunning && counter < BOUND) { int first = rnd.nextInt(BOUND / 2 - 1) + 1; int second = rnd.nextInt(BOUND / 2 - 1) + 1; ctx.collect(new Tuple2<>(first, second)); counter++; Thread.sleep(50L); } } @Override public void cancel() { isRunning = false; } } /** * @Description: * @Param: * @return: * @Author: intsmaze * @Date: 2019/1/5 */ public static class InputMap implements MapFunction<Tuple2<Integer, Integer>, Tuple5<Integer, Integer, Integer, Integer, Integer>> { private static final long serialVersionUID = 1L; @Override public Tuple5<Integer, Integer, Integer, Integer, Integer> map(Tuple2<Integer, Integer> value) throws Exception { return new Tuple5<>(value.f0, value.f1, value.f0, value.f1, 0); } } }

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wpgxsj.html