env.execute();
}
创建一个新StreamExecutionEnvironment对象,这是使用Flink应用程序的起点
DataStream在应用程序环境中创建一个新的SimpleStringGenerator,该类实现 SourceFunction Flink中所有流数据源的基本接口。
将FlinkKafkaProducer09添加到主题中。
消费者只需从flink-demo主题中读取消息,然后将其打印到控制台中。
public static void main(String[] args) throws Exception {
// create execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", “localhost:9092");
properties.setProperty("group.id", "flink_consumer");
DataStream<String> stream = env.addSource(new FlinkKafkaConsumer09<>(
"flink-demo", new SimpleStringSchema(), properties) );
stream.map(new MapFunction<String, String>() {
private static final long serialVersionUID = -6867736771747690202L;
@Override
public String map(String value) throws Exception {
return "Stream Value: " + value;
}}).print();
env.execute();
}
用消费者信息创建一组属性,在这个应用程序中我们只能设置消费者group.id。使用FlinkKafkaConsumer09来获取主题中的消息flink-demo。
Linux公社的RSS地址:https://www.linuxidc.com/rssFeed.aspx