实践:使用 Apache Hadoop 处理日志(2)

练习 3. 编写一个简单的 MapReduce 应用程序

按照 用 Hadoop 进行分布式数据处理,第 3 部分:应用程序开发() 中的演示,编写一个单词数映射和缩减应用程序非常简单。使用本文中演示的 Ruby 示例,开发一个 Python 映射和缩减应用程序,并在样例数据集上运行它们。回想一下,Hadoop 对映射的输出进行了排序,所以词语很可能是连续的,这为缩减程序 (reducer) 提供了一种有用的优化。

练习 4. 编写一个简单的 Pig 查询

正如您在 使用 Apache Pig 处理数据 () 中所看到的,Pig 允许您构建一个简单的、可转换成 MapReduce 应用程序的脚本。在本练习中,您提取了所有的日志条目(来自 /var/log/messages),这些日志条目中包含单词 kernel: 和单词 terminating。

创建一个根据预定义的标准提取所有日志行的脚本。

练习 5. 编写一个聚合 Pig 查询

日志消息由 Linux 内核(比如 kernel 或 dhclient)中的各种来源生成。在本例中,您需要找出生成日志消息的各种来源,以及每个来源生成的日志消息数量。

创建一个对每个日志来源生成的日志消息数进行计数的脚本。

练习解决方案

具体的输出取决于特定的 Hadoop 安装和配置。

练习 1 的解决方案:建立一个简单的 Hadoop 环境并运行它

在 练习 1 中,您在 HDFS 上执行了一个 ls 命令。清单 1 展示了恰当的解决方案。

清单 1. 在 HDFS 上执行一个 ls 操作
    
$ hadoop dfs -ls /
drwxrwxrwx    - hue       supergroup           0 2011-12-10 06:56 /tmp
drwxr-xr-x    - hue       supergroup           0 2011-12-08 05:20 /user
drwxr-xr-x    - mapred    supergroup           0 2011-12-08 10:06 /var
$

所显示文件的多少取决于具体的使用。

练习 2 的解决方案:与 HDFS 进行交互

在 练习 2 中,您在 HDFS 内创建了一个子目录,并将一个文件复制到这个子目录中。请注意,您是通过将内核消息缓冲器移动到某个文件中来创建测试数据的。对于额外的练习,请使用 cat 命令(参见 清单 2)查看 HDFS 内的文件。


清单 2. 操作 HDFS
    
$ dmesg > kerndata
$ hadoop dfs -mkdir /test
$ hadoop dfs -ls /test
$ hadoop dfs -copyFromLocal kerndata /test/mydata
$ hadoop dfs -cat /test/mydata
Linux version 2.6.18-274-7.1.el5 (mockbuild@builder10.CentOS.org)...
...
e1000: eth0 NIC Link is Up 1000 Mbps Full Duplex, Flow Control: RX
$

练习 3 的解决方案:编写一个简单的 MapReduce 应用程序

在 练习 3 中,您用 Python 语言创建了一个简单的单词计数 MapReduce 应用程序。Python 实际上是实现单词计数示例的一种极好的语言。您可以在 Writing a Hadoop MapReduce Program in Python(由 Michael G. Noll 编著)中发现一个关于 Python MapReduce 的有用的整理文章。

本示例假设您执行了练习 2 的步骤(将数据摄入 HDFS)。清单 3 提供了映射应用程序。


清单 3. 用 Python 编写的映射应用程序
    
#!/usr/bin/env python

import sys

for line in sys.stdin:
    line = line.strip()
    words = line.split()
    for word in words:
        print '%s\t1' % word
 

清单 4 提供了缩减应用程序。


清单 4. 用 Python 编写的缩减应用程序
    
#!/usr/bin/env python

from operator import itemgetter
import sys

last_word = None
last_count = 0
cur_word = None

for line in sys.stdin:
    line = line.strip()

    cur_word, count = line.split('\t', 1)

    count = int(count)

    if last_word == cur_word:
        last_count += count
    else:
        if last_word:
           print '%s\t%s' % (last_word, last_count)
           last_count = count
        last_word = cur_word

if last_word == cur_word:
    print '%s\t%s' % (last_word, last_count)

清单 5 说明了在 Hadoop 中调用 Python MapReduce 示例的流程。


清单 5. 使用 Hadoop 测试 Python MapReduce
    
$ hadoop jar /usr/lib/hadoop-0.20/contrib/streaming/hadoop-streaming-0.20.2-cdh3u2.jar \
  -file pymap.py -mapper pymap.py -file pyreduce.py -reducer pyreduce.py \
  -input /test/mydata -output /test/output
...
$ hadoop dfs -cat /test/output/part-00000
...
write 3
write-combining 2
wrong. 1
your 2
zone: 2
zonelists. 1

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:http://www.heiqu.com/0ff105f51ee5c601f9ef4bdcf5beb688.html