Flink的sink实战之四:自定义 (2)

创建任务类StudentSink.java,用来创建一个flink任务,里面通过ArrayList创建了一个数据集,然后直接addSink,为了看清DAG,调用disableChaining方法取消了operator chain:

package com.bolingcavalry.customize; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import java.util.ArrayList; import java.util.List; public class StudentSink { public static void main(String[] args) throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //并行度为1 env.setParallelism(1); List<Student> list = new ArrayList<>(); list.add(new Student("aaa", 11)); list.add(new Student("bbb", 12)); list.add(new Student("ccc", 13)); list.add(new Student("ddd", 14)); list.add(new Student("eee", 15)); list.add(new Student("fff", 16)); env.fromCollection(list) .addSink(new MySQLSinkFunction()) .disableChaining(); env.execute("sink demo : customize mysql obj"); } }

在flink web页面提交任务,并设置任务类:

在这里插入图片描述

任务完成后,DAG图显示任务和记录数都符合预期:

在这里插入图片描述

去检查数据库,发现数据已写入:

在这里插入图片描述

至此,自定义sink的实战已经完成,希望本文能给您一些参考;

欢迎关注公众号:程序员欣宸

微信搜索「程序员欣宸」,我是欣宸,期待与您一同畅游Java世界...
https://github.com/zq2599/blog_demos

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wpsxfx.html