成功添加了相关依赖之后,这里我们再来添加相应的配置。
在application.properties中添加如下配置:
注:上述的配置只是一部分,完整的配置可以在我的github中找到。
数据库脚本:
-- springBoot2库的脚本 CREATE TABLE `t_user` ( `id` int(11) NOT NULL AUTO_INCREMENT COMMENT '自增id', `name` varchar(10) DEFAULT NULL COMMENT '姓名', `age` int(2) DEFAULT NULL COMMENT '年龄', PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=15 DEFAULT CHARSET=utf8注:因为这里我们只是简单的模拟一下业务场景,所以只是建立一张简单的表。
代码编写说明:这里我只对几个关键的类进行说明,完整的项目工程链接可以在博客底部找到。
在使用SpringBoot整合kafka和storm之前,我们可以先对kfaka和storm的相关代码编写,然后在进行整合。
首先是数据源的获取,也就是使用storm中的spout从kafka中拉取数据。
在之前的storm入门中,讲过storm的运行流程,其中spout是storm获取数据的一个组件,其中我们主要实现nextTuple方法,编写从kafka中获取数据的代码就可以在storm启动后进行数据的获取。
spout类的主要代码如下:
@Override public void nextTuple() { for (;;) { try { msgList = consumer.poll(100); if (null != msgList && !msgList.isEmpty()) { String msg = ""; List<User> list=new ArrayList<User>(); for (ConsumerRecord<String, String> record : msgList) { // 原始数据 msg = record.value(); if (null == msg || "".equals(msg.trim())) { continue; } try{ list.add(JSON.parseObject(msg, User.class)); }catch(Exception e){ logger.error("数据格式不符!数据:{}",msg); continue; } } logger.info("Spout发射的数据:"+list); //发送到bolt中 this.collector.emit(new Values(JSON.toJSONString(list))); consumer.commitAsync(); }else{ TimeUnit.SECONDS.sleep(3); logger.info("未拉取到数据..."); } } catch (Exception e) { logger.error("消息队列处理异常!", e); try { TimeUnit.SECONDS.sleep(10); } catch (InterruptedException e1) { logger.error("暂停失败!",e1); } } } }注:如果spout在发送数据的时候发送失败,是会重发的!
上述spout类中主要是将从kafka获取的数据传输传输到bolt中,然后再由bolt类处理该数据,处理成功之后,写入数据库,然后给与sqout响应,避免重发。
bolt类主要处理业务逻辑的方法是execute,我们主要实现的方法也是写在这里。需要注意的是这里只用了一个bolt,因此也不用定义Field进行再次的转发。
代码的实现类如下:
编写完了spout和bolt之后,我们再来编写storm的主类。
storm的主类主要是对Topology(拓步)进行提交,提交Topology的时候,需要对spout和bolt进行相应的设置。Topology的运行的模式有两种:
一种是本地模式,利用本地storm的jar模拟环境进行运行。
LocalCluster cluster = new LocalCluster(); cluster.submitTopology("TopologyApp", conf,builder.createTopology());
另一种是远程模式,也就是在storm集群进行运行。
StormSubmitter.submitTopology(args[0], conf, builder.createTopology());