使用Apache Flink和Kafka进行大数据流处理(2)

env.execute();
}
创建一个新StreamExecutionEnvironment对象,这是使用Flink应用程序的起点
DataStream在应用程序环境中创建一个新的SimpleStringGenerator,该类实现 SourceFunction Flink中所有流数据源的基本接口。
将FlinkKafkaProducer09添加到主题中。
消费者只需从flink-demo主题中读取消息,然后将其打印到控制台中。

public static void main(String[] args) throws Exception {
// create execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", “localhost:9092");
properties.setProperty("group.id", "flink_consumer");

DataStream<String> stream = env.addSource(new FlinkKafkaConsumer09<>(
"flink-demo", new SimpleStringSchema(), properties) );

stream.map(new MapFunction<String, String>() {
private static final long serialVersionUID = -6867736771747690202L;

@Override
  public String map(String value) throws Exception {
    return "Stream Value: " + value;
  }}).print();

env.execute();
}

用消费者信息创建一组属性,在这个应用程序中我们只能设置消费者group.id。使用FlinkKafkaConsumer09来获取主题中的消息flink-demo。

Linux公社的RSS地址:https://www.linuxidc.com/rssFeed.aspx

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/184e758f5e2cfe45b4e35fd053920ece.html