创建任务类StudentSink.java,用来创建一个flink任务,里面通过ArrayList创建了一个数据集,然后直接addSink,为了看清DAG,调用disableChaining方法取消了operator chain:
package com.bolingcavalry.customize; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import java.util.ArrayList; import java.util.List; public class StudentSink { public static void main(String[] args) throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //并行度为1 env.setParallelism(1); List<Student> list = new ArrayList<>(); list.add(new Student("aaa", 11)); list.add(new Student("bbb", 12)); list.add(new Student("ccc", 13)); list.add(new Student("ddd", 14)); list.add(new Student("eee", 15)); list.add(new Student("fff", 16)); env.fromCollection(list) .addSink(new MySQLSinkFunction()) .disableChaining(); env.execute("sink demo : customize mysql obj"); } }在flink web页面提交任务,并设置任务类:
任务完成后,DAG图显示任务和记录数都符合预期:
去检查数据库,发现数据已写入:
至此,自定义sink的实战已经完成,希望本文能给您一些参考;
欢迎关注公众号:程序员欣宸微信搜索「程序员欣宸」,我是欣宸,期待与您一同畅游Java世界...
https://github.com/zq2599/blog_demos