运行结果如下:
下面的ReadTextFile类会读取绝对路径的文本文件,并对内容做单词统计:
package com.bolingcavalry.api; import com.bolingcavalry.Splitter; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; public class ReadTextFile { public static void main(String[] args) throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //设置并行度为1 env.setParallelism(1); //用txt文件作为数据源 DataStream<String> textDataStream = env.readTextFile("file:///Users/zhaoqin/temp/202003/14/README.txt", "UTF-8"); //统计单词数量并打印出来 textDataStream .flatMap(new Splitter()) .keyBy(0) .sum(1) .print(); env.execute("API DataSource demo : readTextFile"); } }请确保代码中的绝对路径下存在名为README.txt文件,运行结果如下:
3. 打开StreamExecutionEnvironment.java源码,看一下刚才使用的readTextFile方法实现如下,原来是调用了另一个同名方法,该方法的第三个参数确定了文本文件是一次性读取完毕,还是周期性扫描内容变更,而第四个参数就是周期性扫描的间隔时间: public DataStreamSource<String> readTextFile(String filePath, String charsetName) { Preconditions.checkArgument(!StringUtils.isNullOrWhitespaceOnly(filePath), "The file path must not be null or blank."); TextInputFormat format = new TextInputFormat(new Path(filePath)); format.setFilesFilter(FilePathFilter.createDefaultFilter()); TypeInformation<String> typeInfo = BasicTypeInfo.STRING_TYPE_INFO; format.setCharsetName(charsetName); return readFile(format, filePath, FileProcessingMode.PROCESS_ONCE, -1, typeInfo); }
上面的FileProcessingMode是个枚举,源码如下:
@PublicEvolving public enum FileProcessingMode { /** Processes the current contents of the path and exits. */ PROCESS_ONCE, /** Periodically scans the path for new data. */ PROCESS_CONTINUOUSLY }另外请关注readTextFile方法的filePath参数,这是个URI类型的字符串,除了本地文件路径,还可以是HDFS的地址:hdfs://host:port/file/path
至此,通过直接API创建DataSource的实战就完成了,后面的章节我们继续学习内置connector方式的DataSource;
欢迎关注公众号:程序员欣宸微信搜索「程序员欣宸」,我是欣宸,期待与您一同畅游Java世界...
https://github.com/zq2599/blog_demos