SpringBoot整合Kafka和Storm

kafka 和 storm的环境安装

地址:

kafka的相关使用

地址:

storm的相关使用

地址:

SpringBoot整合kafka和storm 为什么使用SpringBoot整合kafka和storm

一般而言,使用kafka整合storm可以应付大多数需求。但是在扩展性上来说,可能就不太好。目前主流的微服务框架SpringCloud是基于SpringBoot的,所以使用SpringBoot对kafka和storm进行整合,可以进行统一配置,扩展性会更好。

使用SpringBoot整合kafka和storm做什么

一般来说,kafka和storm的整合,使用kafka进行数据的传输,然后使用storm实时的处理kafka中的数据。

在这里我们加入SpringBoot之后,也是做这些,只不过是由SpringBoot对kafka和storm进行统一的管理。

如果还是不好理解的话,可以通过下面这个简单的业务场景了解下:

在数据库中有一批大量的用户数据,其中这些用户数据中有很多是不需要的,也就是脏数据,我们需要对这些用户数据进行清洗,然后重新存入数据库中,但是要求实时、延时低,并且便于管理。

所以这里我们就可以使用SpringBoot+kafka+storm来进行相应的开发。

开发准备

在进行代码开发前,我们要明确开发什么。
在上述的业务场景中,需要大量的数据,但是我们这里只是简单的进行开发,也就是写个简单的demo出来,能够简单的实现这些功能,所以我们只需满足如下条件就可以了:

提供一个将用户数据写入kafka的接口;

使用storm的spout获取kafka的数据并发送给bolt;

在bolt移除年龄小于10岁的用户的数据,并写入mysql;

那么根据上述要求我们进行SpringBoot、kafka和storm的整合。
首先需要相应jar包,所以maven的依赖如下:

<properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <java.version>1.8</java.version> <springboot.version>1.5.9.RELEASE</springboot.version> <mybatis-spring-boot>1.2.0</mybatis-spring-boot> <mysql-connector>5.1.44</mysql-connector> <slf4j.version>1.7.25</slf4j.version> <logback.version>1.2.3</logback.version> <kafka.version>1.0.0</kafka.version> <storm.version>1.2.1</storm.version> <fastjson.version>1.2.41</fastjson.version> <druid>1.1.8</druid> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> <version>${springboot.version}</version> </dependency> <!-- Spring Boot Mybatis 依赖 --> <dependency> <groupId>org.mybatis.spring.boot</groupId> <artifactId>mybatis-spring-boot-starter</artifactId> <version>${mybatis-spring-boot}</version> </dependency> <!-- MySQL 连接驱动依赖 --> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>${mysql-connector}</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> <version>${slf4j.version}</version> </dependency> <dependency> <groupId>ch.qos.logback</groupId> <artifactId>logback-classic</artifactId> <version>${logback.version}</version> </dependency> <dependency> <groupId>ch.qos.logback</groupId> <artifactId>logback-core</artifactId> <version>${logback.version}</version> </dependency> <!-- kafka --> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.12</artifactId> <version>${kafka.version}</version> <exclusions> <exclusion> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> </exclusion> <exclusion> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> </exclusion> <exclusion> <groupId>log4j</groupId> <artifactId>log4j</artifactId> </exclusion> </exclusions> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>${kafka.version}</version> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-streams</artifactId> <version>${kafka.version}</version> </dependency> <!--storm相关jar --> <dependency> <groupId>org.apache.storm</groupId> <artifactId>storm-core</artifactId> <version>${storm.version}</version> <!--排除相关依赖 --> <exclusions> <exclusion> <groupId>org.apache.logging.log4j</groupId> <artifactId>log4j-slf4j-impl</artifactId> </exclusion> <exclusion> <groupId>org.apache.logging.log4j</groupId> <artifactId>log4j-1.2-api</artifactId> </exclusion> <exclusion> <groupId>org.apache.logging.log4j</groupId> <artifactId>log4j-web</artifactId> </exclusion> <exclusion> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> </exclusion> <exclusion> <artifactId>ring-cors</artifactId> <groupId>ring-cors</groupId> </exclusion> </exclusions> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.storm</groupId> <artifactId>storm-kafka</artifactId> <version>${storm.version}</version> </dependency> <!--fastjson 相关jar --> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>${fastjson.version}</version> </dependency> <!-- Druid 数据连接池依赖 --> <dependency> <groupId>com.alibaba</groupId> <artifactId>druid</artifactId> <version>${druid}</version> </dependency> </dependencies>

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wpggzj.html