1.FlinkRunner在实战中是显式指定的,如果想设置参数怎么使用呢?其实还有另外一种写法,例如以下代码:
//FlinkPipelineOptions options =PipelineOptionsFactory.as(FlinkPipelineOptions.class); //options.setStreaming(true); //options.setAppName("app_test"); //options.setJobName("flinkjob"); //options.setFlinkMaster("localhost:6123"); //options.setParallelism(10);//设置flink 的并行度 //显式指定PipelineRunner:FlinkRunner,必须指定,如果不指定则为本地 options.setRunner(FlinkRunner.class);2.Kafka 有三种数据读取类型,分别是 “earliest ”,“latest ”,“none ”,分别的意思代表是:
earliest
当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费 。
latest
当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据 。
none
topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常。
.updateConsumerProperties(ImmutableMap.<String,Object>of("auto.offset.reset", "earliest")));3.实战中我自己想把Kafka的数据写入,key不想写入,所以出现了Kafka的key项为空,而values才是真正发送的数据。所以开始和结尾要设置个.values(),如果不加上就会报错。
KafkaIO.<Void, String>write() .values() // 只需要在此写入默认的key就行了,默认为null值 八.小结随着AI和loT的时代的到来,各个公司不同结构、不同类型、不同来源的数据进行整合的成本越来越高。Apache Beam 技术的统一模型和大数据计算平台特性优雅地解决了这一问题,相信在loT万亿市场中,Apache Beam将会发挥越来越重要的角色。
作者介绍张海涛,目前就职于海康威视云基础平台,负责云计算大数据的基础架构设计和中间件的开发,专注云计算大数据方向。Apache Beam 中文社区发起人之一。
Linux公社的RSS地址:https://www.linuxidc.com/rssFeed.aspx