SpringBoot整合Kafka和Storm (2)

成功添加了相关依赖之后,这里我们再来添加相应的配置。
application.properties中添加如下配置:

# log logging.config=classpath:logback.xml ## mysql spring.datasource.url=jdbc:mysql://localhost:3306/springBoot2?useUnicode=true&characterEncoding=utf8&allowMultiQueries=true spring.datasource.username=root spring.datasource.password=123456 spring.datasource.driverClassName=com.mysql.jdbc.Driver ## kafka kafka.servers = 192.169.0.23\:9092,192.169.0.24\:9092,192.169.0.25\:9092 kafka.topicName = USER_TOPIC kafka.autoCommit = false kafka.maxPollRecords = 100 kafka.groupId = groupA kafka.commitRule = earliest

注:上述的配置只是一部分,完整的配置可以在我的github中找到。

数据库脚本:

-- springBoot2库的脚本 CREATE TABLE `t_user` ( `id` int(11) NOT NULL AUTO_INCREMENT COMMENT '自增id', `name` varchar(10) DEFAULT NULL COMMENT '姓名', `age` int(2) DEFAULT NULL COMMENT '年龄', PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=15 DEFAULT CHARSET=utf8

注:因为这里我们只是简单的模拟一下业务场景,所以只是建立一张简单的表。

代码编写

说明:这里我只对几个关键的类进行说明,完整的项目工程链接可以在博客底部找到。

在使用SpringBoot整合kafka和storm之前,我们可以先对kfaka和storm的相关代码编写,然后在进行整合。

首先是数据源的获取,也就是使用storm中的spout从kafka中拉取数据。

在之前的storm入门中,讲过storm的运行流程,其中spout是storm获取数据的一个组件,其中我们主要实现nextTuple方法,编写从kafka中获取数据的代码就可以在storm启动后进行数据的获取。

spout类的主要代码如下:

@Override public void nextTuple() { for (;;) { try { msgList = consumer.poll(100); if (null != msgList && !msgList.isEmpty()) { String msg = ""; List<User> list=new ArrayList<User>(); for (ConsumerRecord<String, String> record : msgList) { // 原始数据 msg = record.value(); if (null == msg || "".equals(msg.trim())) { continue; } try{ list.add(JSON.parseObject(msg, User.class)); }catch(Exception e){ logger.error("数据格式不符!数据:{}",msg); continue; } } logger.info("Spout发射的数据:"+list); //发送到bolt中 this.collector.emit(new Values(JSON.toJSONString(list))); consumer.commitAsync(); }else{ TimeUnit.SECONDS.sleep(3); logger.info("未拉取到数据..."); } } catch (Exception e) { logger.error("消息队列处理异常!", e); try { TimeUnit.SECONDS.sleep(10); } catch (InterruptedException e1) { logger.error("暂停失败!",e1); } } } }

注:如果spout在发送数据的时候发送失败,是会重发的!

上述spout类中主要是将从kafka获取的数据传输传输到bolt中,然后再由bolt类处理该数据,处理成功之后,写入数据库,然后给与sqout响应,避免重发。

bolt类主要处理业务逻辑的方法是execute,我们主要实现的方法也是写在这里。需要注意的是这里只用了一个bolt,因此也不用定义Field进行再次的转发。
代码的实现类如下:

@Override public void execute(Tuple tuple) { String msg=tuple.getStringByField(Constants.FIELD); try{ List<User> listUser =JSON.parseArray(msg,User.class); //移除age小于10的数据 if(listUser!=null&&listUser.size()>0){ Iterator<User> iterator = listUser.iterator(); while (iterator.hasNext()) { User user = iterator.next(); if (user.getAge()<10) { logger.warn("Bolt移除的数据:{}",user); iterator.remove(); } } if(listUser!=null&&listUser.size()>0){ userService.insertBatch(listUser); } } }catch(Exception e){ logger.error("Bolt的数据处理失败!数据:{}",msg,e); } }

编写完了spout和bolt之后,我们再来编写storm的主类。

storm的主类主要是对Topology(拓步)进行提交,提交Topology的时候,需要对spout和bolt进行相应的设置。Topology的运行的模式有两种:

一种是本地模式,利用本地storm的jar模拟环境进行运行。

LocalCluster cluster = new LocalCluster(); cluster.submitTopology("TopologyApp", conf,builder.createTopology());

另一种是远程模式,也就是在storm集群进行运行。

StormSubmitter.submitTopology(args[0], conf, builder.createTopology());

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wpggzj.html