Flink的sink实战之四:自定义

欢迎访问我的GitHub

https://github.com/zq2599/blog_demos

内容:所有原创文章分类汇总及配套源码,涉及Java、Docker、Kubernetes、DevOPS等;

本篇概览

Flink官方提供的sink服务可能满足不了我们的需要,此时可以开发自定义的sink,文本就来一起实战;

全系列链接

《Flink的sink实战之一:初探》

《Flink的sink实战之二:kafka》

《Flink的sink实战之三:cassandra3》

《Flink的sink实战之四:自定义

继承关系

在正式编码前,要先弄清楚对sink能力是如何实现的,前面我们实战过的print、kafka、cassandra等sink操作,核心类的继承关系如下图所示:

在这里插入图片描述

可见实现sink能力的关键,是实现RichFunction和SinkFunction接口,前者用于资源控制(如open、close等操作),后者负责sink的具体操作,来看看最简单的PrintSinkFunction类是如何实现SinkFunction接口的invoke方法:

@Override public void invoke(IN record) { writer.write(record); }

现在对sink的基本逻辑已经清楚了,可以开始编码实战了;

内容和版本

本次实战很简单:自定义sink,用于将数据写入MySQL,涉及的版本信息如下:

jdk:1.8.0_191

flink:1.9.2

maven:3.6.0

flink所在操作系统:CentOS Linux release 7.7.1908

MySQL:5.7.29

IDEA:2018.3.5 (Ultimate Edition)

源码下载

如果您不想写代码,整个系列的源码可在GitHub下载到,地址和链接信息如下表所示(https://github.com/zq2599/blog_demos):

名称 链接 备注
项目主页   https://github.com/zq2599/blog_demos   该项目在GitHub上的主页  
git仓库地址(https)   https://github.com/zq2599/blog_demos.git   该项目源码的仓库地址,https协议  
git仓库地址(ssh)   git@github.com:zq2599/blog_demos.git   该项目源码的仓库地址,ssh协议  

这个git项目中有多个文件夹,本章的应用在flinksinkdemo文件夹下,如下图红框所示:

在这里插入图片描述

数据库准备

请您将MySQL准备好,并执行以下sql,用于创建数据库flinkdemo和表student:

create database if not exists flinkdemo; USE flinkdemo; DROP TABLE IF EXISTS `student`; CREATE TABLE `student` ( `id` int(11) unsigned NOT NULL AUTO_INCREMENT, `name` varchar(25) COLLATE utf8_bin DEFAULT NULL, `age` int(10) DEFAULT NULL, PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=5 DEFAULT CHARSET=utf8 COLLATE=utf8_bin; 编码

使用《Flink的sink实战之二:kafka》中创建的flinksinkdemo工程;

在pom.xml中增加mysql的依赖:

<dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>8.0.11</version> </dependency>

创建和数据库的student表对应的实体类Student.java:

package com.bolingcavalry.customize; public class Student { private int id; private String name; private int age; public int getId() { return id; } public void setId(int id) { this.id = id; } public String getName() { return name; } public void setName(String name) { this.name = name; } public int getAge() { return age; } public void setAge(int age) { this.age = age; } public Student(String name, int age) { this.name = name; this.age = age; } }

创建自定义sink类MySQLSinkFunction.java,这是本文的核心,有关数据库的连接、断开、写入数据都集中在此:

package com.bolingcavalry.customize; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; import java.sql.Connection; import java.sql.DriverManager; import java.sql.PreparedStatement; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantLock; public class MySQLSinkFunction extends RichSinkFunction<Student> { PreparedStatement preparedStatement; private Connection connection; private ReentrantLock reentrantLock = new ReentrantLock(); @Override public void open(Configuration parameters) throws Exception { super.open(parameters); //准备数据库相关实例 buildPreparedStatement(); } @Override public void close() throws Exception { super.close(); try{ if(null!=preparedStatement) { preparedStatement.close(); preparedStatement = null; } } catch(Exception e) { e.printStackTrace(); } try{ if(null!=connection) { connection.close(); connection = null; } } catch(Exception e) { e.printStackTrace(); } } @Override public void invoke(Student value, Context context) throws Exception { preparedStatement.setString(1, value.getName()); preparedStatement.setInt(2, value.getAge()); preparedStatement.executeUpdate(); } /** * 准备好connection和preparedStatement * 获取mysql连接实例,考虑多线程同步, * 不用synchronize是因为获取数据库连接是远程操作,耗时不确定 * @return */ private void buildPreparedStatement() { if(null==connection) { boolean hasLock = false; try { hasLock = reentrantLock.tryLock(10, TimeUnit.SECONDS); if(hasLock) { Class.forName("com.mysql.cj.jdbc.Driver"); connection = DriverManager.getConnection("jdbc:mysql://192.168.50.43:3306/flinkdemo?useUnicode=true&characterEncoding=UTF-8&useSSL=false&serverTimezone=UTC", "root", "123456"); } if(null!=connection) { preparedStatement = connection.prepareStatement("insert into student (name, age) values (?, ?)"); } } catch (Exception e) { //生产环境慎用 e.printStackTrace(); } finally { if(hasLock) { reentrantLock.unlock(); } } } } }

上述代码很简单,只需要注意在创建连接的时候用到了锁来控制多线程同步,以及高版本mysql驱动对应的driver和uri的写法与以前5.x版本的区别;

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wpsxfx.html