使用Flume消费Kafka数据到HDFS

对于数据的转发,Kafka是一个不错的选择。Kafka能够装载数据到消息队列,然后等待其他业务场景去消费这些数据,Kafka的应用接口API非常的丰富,支持各种存储介质,例如HDFS、HBase等。如果不想使用Kafka API编写代码去消费Kafka Topic,也是有组件可以去集成消费的。下面笔者将为大家介绍如何使用Flume快速消费Kafka Topic数据,然后将消费后的数据转发到HDFS上。

2.内容

在实现这套方案之间,可以先来看看整个数据的流向,如下图所示:

使用Flume消费Kafka数据到HDFS

业务数据实时存储到Kafka集群,然后通过Flume Source组件实时去消费Kafka业务Topic获取数据,将消费后的数据通过Flume Sink组件发送到HDFS进行存储。

2.1 准备基础环境

按照上图所示数据流向方案,需要准备好Kafka、Flume、Hadoop(HDFS可用)等组件。

2.1.1 启动Kafka集群并创建Topic

Kafka目前来说,并没有一个批量的管理脚本,不过我们可以对kafka-server-start.sh脚本和kafka-server-stop.sh脚本进行二次封装。代码如下所示:

#! /bin/bash # Kafka代理节点地址, 如果节点较多可以用一个文件来存储 hosts=(dn1 dn2 dn3) # 打印启动分布式脚本信息 mill=`date "+%N"` tdate=`date "+%Y-%m-%d %H:%M:%S,${mill:0:3}"` echo [$tdate] INFO [Kafka Cluster] begins to execute the $1 operation. # 执行分布式开启命令 function start() { for i in ${hosts[@]} do smill=`date "+%N"` stdate=`date "+%Y-%m-%d %H:%M:%S,${smill:0:3}"` ssh hadoop@$i "source /etc/profile;echo [$stdate] INFO [Kafka Broker $i] begins to execute the startup operation.;kafka-server-start.sh $KAFKA_HOME/config/server.properties>/dev/null" & sleep 1 done } # 执行分布式关闭命令 function stop() { for i in ${hosts[@]} do smill=`date "+%N"` stdate=`date "+%Y-%m-%d %H:%M:%S,${smill:0:3}"` ssh hadoop@$i "source /etc/profile;echo [$stdate] INFO [Kafka Broker $i] begins to execute the shutdown operation.;kafka-server-stop.sh>/dev/null;" & sleep 1 done } # 查看Kafka代理节点状态 function status() { for i in ${hosts[@]} do smill=`date "+%N"` stdate=`date "+%Y-%m-%d %H:%M:%S,${smill:0:3}"` ssh hadoop@$i "source /etc/profile;echo [$stdate] INFO [Kafka Broker $i] status message is :;jps | grep Kafka;" & sleep 1 done } # 判断输入的Kafka命令参数是否有效 case "$1" in start) start ;; stop) stop ;; status) status ;; *) echo "Usage: $0 {start|stop|status}" RETVAL=1 esac

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wpfygz.html