Flink1.9整合Kafka

file

本文基于Flink1.9版本简述如何连接Kafka。

流式连接器

file

我们知道可以自己来开发Source 和 Sink ,但是一些比较基本的 Source 和 Sink 已经内置在 Flink 里。

预定义的source支持从文件、目录、socket,以及 collections 和 iterators 中读取数据。

预定义的sink支持把数据写入文件、标准输出(stdout)、标准错误输出(stderr)和 socket。

连接器可以和多种多样的第三方系统进行交互。目前支持以下系统:

Apache Kafka

Apache Cassandra(sink)

Amazon Kinesis Streams(source/sink)

Elasticsearch(sink)

Hadoop FileSystem (sink)

RabbitMQ(source/sink)

Apache NiFi(source/sink)

Twitter Streaming API(source)

请记住,在使用一种连接器时,通常需要额外的第三方组件,比如:数据存储服务器或者消息队列。

Apache Bahir 中定义了其他一些连接器

Apache ActiveMQ(source/sink)

Apache Flume(sink)

Redis(sink)

Akka (sink)

Netty (source)

使用connector并不是唯一可以使数据进入或者流出Flink的方式。一种常见的模式是从外部数据库或者 Web 服务查询数据得到初始数据流,然后通过 Map 或者 FlatMap 对初始数据流进行丰富和增强,这里要使用Flink的异步IO。

而向外部存储推送大量数据时会导致 I/O 瓶颈问题出现。在这种场景下,如果对数据的读操作远少于写操作,可以让外部应用从 Flink 拉取所需的数据,需要用到Flink的可查询状态接口。

本文重点介绍Apache Kafka Connector

Kafka连接器

此连接器提供对Apache Kafka提供的事件流的访问。

Flink提供特殊的Kafka连接器,用于从/向Kafka主题读取和写入数据。Flink Kafka Consumer集成了Flink的检查点机制,可提供一次性处理语义。为实现这一目标,Flink并不完全依赖Kafka 的消费者组的偏移量,而是在内部跟踪和检查这些偏移。

下表为不同版本的kafka与Flink Kafka Consumer的对应关系。

Maven Dependency Supported since Consumer and Producer Class name Kafka version
flink-connector-kafka-0.8_2.11   1.0.0   FlinkKafkaConsumer08 FlinkKafkaProducer08   0.8.x  
flink-connector-kafka-0.9_2.11   1.0.0   FlinkKafkaConsumer09 FlinkKafkaProducer09   0.9.x  
flink-connector-kafka-0.10_2.11   1.2.0   FlinkKafkaConsumer010 FlinkKafkaProducer010   0.10.x  
flink-connector-kafka-0.11_2.11   1.4.0   FlinkKafkaConsumer011 FlinkKafkaProducer011   0.11.x  
flink-connector-kafka_2.11   1.7.0   FlinkKafkaConsumer FlinkKafkaProducer   >= 1.0.0  

而从最新的Flink1.9.0版本开始,使用Kafka 2.2.0客户端。

下面简述使用步骤。

导入maven依赖: <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka_2.11</artifactId> <version>1.9.0</version> </dependency> 安装Kafka:

可以参照 Kafka入门宝典(详细截图版)

兼容性:

从Flink 1.7开始,它不跟踪特定的Kafka主要版本。相反,它在Flink发布时跟踪最新版本的Kafka。如果您的Kafka代理版本是1.0.0或更高版本,则应使用此Kafka连接器。如果使用旧版本的Kafka(0.11,0.10,0.9或0.8),则应使用与代理版本对应的连接器。

升级Connect要注意Flink升级作业,同时

在整个过程中使用Flink 1.9或更新版本。

不要同时升级Flink和运营商。

确保您作业中使用的Kafka Consumer和/或Kafka Producer分配了唯一标识符(uid)。

使用stop with savepoint功能获取保存点(例如,使用stop --withSavepoint)。

用法:

引入依赖后,实例化新的source(FlinkKafkaConsumer)和sink(FlinkKafkaProducer)。

Kafka Consumer

先分步骤介绍构建过程,文末附Flink1.9连接Kafka完整代码。

Kafka consumer 根据版本分别叫做FlinkKafkaConsumer08 FlinkKafkaConsumer09等等
Kafka >= 1.0.0 的版本就叫FlinkKafkaConsumer。

构建FlinkKafkaConsumer

java示例代码如下:

Properties properties = new Properties(); properties.setProperty("bootstrap.servers", "localhost:9092"); // only required for Kafka 0.8 properties.setProperty("zookeeper.connect", "localhost:2181"); properties.setProperty("group.id", "test"); DataStream<String> stream = env .addSource(new FlinkKafkaConsumer<>("topic", new SimpleStringSchema(), properties));

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wpjyfj.html