In the project created earlier, create a class named Socket.java:
    package com.bolingcavalry.api;

    import com.bolingcavalry.Splitter;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.windowing.time.Time;

    public class Socket {
        public static void main(String[] args) throws Exception {
            final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // Listen on local port 9999 and read strings
            DataStream<String> socketDataStream = env.socketTextStream("localhost", 9999);

            // Every five seconds, split all strings received in that window by spaces,
            // count the words, and print the result
            socketDataStream
                    .flatMap(new Splitter())
                    .keyBy(0)
                    .timeWindow(Time.seconds(5))
                    .sum(1)
                    .print();

            env.execute("API DataSource demo : socket");
        }
    }

As the code above shows, StreamExecutionEnvironment.socketTextStream is all it takes to create a socket-based DataSource. Run the command nc -lk 9999 in a terminal to enter interactive mode; from then on, any string you type followed by Enter is sent to port 9999 on the local machine.
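Run the Socket class in IDEA. Once it has started, go back to the terminal where nc -lk 9999 is running, type some strings and press Enter. The printed word counts show that the socket source is working.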
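To make the per-window computation concrete, here is a minimal plain-Java sketch (no Flink dependency) of what the pipeline does with the lines collected in one five-second window: split each line by spaces and count every word. The class name WindowWordCountSketch and the method countWords are hypothetical. The sketch also assumes that Splitter emits one (word, 1) pair per space-separated token, which is inferred from the code comments because the source of Splitter is not shown in this section.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class WindowWordCountSketch {

    // Counts space-separated words across all lines that arrived in one window.
    // Assumption: this mirrors flatMap(new Splitter()) followed by keyBy(0).sum(1).
    public static Map<String, Integer> countWords(List<String> linesInWindow) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String line : linesInWindow) {
            for (String word : line.split(" ")) {
                if (word.isEmpty()) {
                    continue; // skip empty tokens produced by repeated spaces
                }
                counts.merge(word, 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        // Two lines typed into nc within the same window (made-up sample input)
        System.out.println(countWords(Arrays.asList("aaa bbb aaa", "bbb ccc")));
    }
}
```

In the real job each window is evaluated independently, so a word typed in two different five-second windows is counted separately in each of them.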
Collection DataSource (generateSequence)

1. The APIs for collection-based DataSources are shown in the figure below:
2. Start with the simplest one, generateSequence, which creates a numeric DataSource over a given range:

    package com.bolingcavalry.api;

    import org.apache.flink.api.common.functions.FilterFunction;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class GenerateSequence {
        public static void main(String[] args) throws Exception {
            final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // Use a parallelism of 1
            env.setParallelism(1);

            // generateSequence yields a DataSource of type Long
            DataStream<Long> dataStream = env.generateSequence(1, 10);

            // Filter once, keeping only the even numbers, then print
            dataStream.filter(new FilterFunction<Long>() {
                @Override
                public boolean filter(Long value) throws Exception {
                    return value % 2L == 0L;
                }
            }).print();

            env.execute("API DataSource demo : collection");
        }
    }
When run, it prints the even numbers in the range 1 to 10.
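For readers who want to check the expected values without starting a Flink job, the same range-then-filter logic can be sketched with the JDK's LongStream. This is only an illustration of the arithmetic, not Flink code, and the class name EvenSequenceSketch and the method evens are hypothetical.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.LongStream;

public class EvenSequenceSketch {

    // Generates the inclusive range [from, to] and keeps only the even values,
    // using the same predicate as the FilterFunction in GenerateSequence.
    public static List<Long> evens(long from, long to) {
        return LongStream.rangeClosed(from, to)
                .filter(value -> value % 2L == 0L)
                .boxed()
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Prints 2, 4, 6, 8 and 10, one value per line
        evens(1, 10).forEach(System.out::println);
    }
}
```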
Collection DataSource (fromElements + fromCollection)

fromElements and fromCollection can be tried together in one class. Create a class named FromCollection that uses both APIs:
    package com.bolingcavalry.api;

    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    import java.util.ArrayList;
    import java.util.List;

    public class FromCollection {
        public static void main(String[] args) throws Exception {
            final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // Use a parallelism of 1
            env.setParallelism(1);

            // Create a List holding two Tuple2 elements
            // (Tuple2.of avoids the raw-type warnings of new Tuple2(...))
            List<Tuple2<String, Integer>> list = new ArrayList<>();
            list.add(Tuple2.of("aaa", 1));
            list.add(Tuple2.of("bbb", 1));

            // Create a DataStream from the List
            DataStream<Tuple2<String, Integer>> fromCollectionDataStream = env.fromCollection(list);

            // Create a DataStream from several Tuple2 elements
            DataStream<Tuple2<String, Integer>> fromElementDataStream = env.fromElements(
                    Tuple2.of("ccc", 1),
                    Tuple2.of("ddd", 1),
                    Tuple2.of("aaa", 1)
            );

            // Merge the two DataStreams into one with union
            DataStream<Tuple2<String, Integer>> unionDataStream = fromCollectionDataStream.union(fromElementDataStream);

            // Count the occurrences of each word
            unionDataStream
                    .keyBy(0)
                    .sum(1)
                    .print();

            env.execute("API DataSource demo : collection");
        }
    }
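Unlike the Socket example, this job has no window, so keyBy(0).sum(1) behaves as a rolling aggregation: in the default streaming mode an updated total is emitted for every incoming element rather than one final count per word. The plain-Java sketch below (no Flink dependency) imitates that behaviour. The class name RollingSumSketch and the method rollingSums are hypothetical, and the arrival order of the five elements is an assumption, since union does not promise a particular interleaving of its inputs.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RollingSumSketch {

    // For each incoming word, updates its running total and records the
    // updated (word,total) pair, imitating a keyed rolling sum.
    public static List<String> rollingSums(List<String> words) {
        Map<String, Integer> totals = new HashMap<>();
        List<String> emitted = new ArrayList<>();
        for (String word : words) {
            int total = totals.merge(word, 1, Integer::sum);
            emitted.add("(" + word + "," + total + ")");
        }
        return emitted;
    }

    public static void main(String[] args) {
        // Assumed arrival order: the two List elements first, then the three fromElements values
        rollingSums(Arrays.asList("aaa", "bbb", "ccc", "ddd", "aaa"))
                .forEach(System.out::println);
    }
}
```

Under that assumed order, the second occurrence of aaa yields (aaa,2) while the earlier (aaa,1) has already been emitted, which is why a word that appears more than once shows up on more than one output line.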