通过已经获得的在 流 实用工具基本理解上的经验,您已经准备编写一个简单的 Ruby map 和 reduce 应用程序并查看如何在 Hadoop 框架中使用过程。虽然此处的示例伴随着规范的 MapReduce 应用程序,但是稍后您将看到其他的应用程序(取决于您将如何用 map 和 reduce 格式实现它们)。
首选是 mapper。此脚本从 stdin 提取文本输入,首先标记它,然后将一系列键值对发送到 stdout。像大多数面向对象的脚本语言一样,这个任务几乎太简单了。如清单 4 中所示的 mapper 脚本(通过一些注释和空白区域可给与其大一点的大小)。此程序使用一个迭代器来从 stdin 中读取一行,同时另一个迭代器将该行分割成单个的标记。使用为 1 的相关值(通过选项卡分隔)将每一个标记(单词)发送到 stdout。
清单 4. Ruby map 脚本(map.rb)
#!/usr/bin/env ruby # Our input comes from STDIN STDIN.each_line do |line| # Iterate over the line, splitting the words from the line and emitting # as the word with a count of 1. line.split.each do |word| puts "#{word}\t1" end end
下一步,查看 reduce 应用程序。虽然此应用程序稍微有些复杂,但是使用 Ruby hash(关联阵列)可简化 reduce 操作(请参考清单 5)。此脚本可通过来自 stdin (通过 流 实用工具传递)的输入数据再次工作且将该行分割成一个单词或值。而后该 hash 会检查该单词;如果发现,则将计数添加到元素。否则,您需要在该单词的 hash 中创建新的条目,然后加载计数(应该是来自 mapper 过程的 1)。在所有输入都被处理以后,通过 hash 可简单迭代且将键值对发送到 stdout。
清单 5. Ruby reduce 脚本(reduce.rb)
#!/usr/bin/env ruby # Create an empty word hash wordhash = {} # Our input comes from STDIN, operating on each line STDIN.each_line do |line| # Each line will represent a word and count word, count = line.strip.split # If we have the word in the hash, add the count to it, otherwise # create a new one. if wordhash.has_key?(word) wordhash[word] += count.to_i else wordhash[word] = count.to_i end end # Iterate through and emit the word counters wordhash.each {|record, count| puts "#{record}\t#{count}"}
随着 map 和 reduce 脚本的完成,需从命令行测试它们。记得要使用 chmod +x 将这些文件更改为可执行。通过生成输入文件来启动,如清单 6 所示。
清单 6. 生成输入文件
# echo "Hadoop is an implementation of the map reduce framework for " "distributed processing of large data sets." > input #
通过单词输入,现在您可以测试您的 mapper 脚本,如清单 7 所示。回想此脚本简单地将输入标记到键值对,此处每个值都将是 1(非惟一输入)。
清单 7. 测试 mapper 脚本
# cat input | ruby map.rb Hadoop 1 is 1 an 1 implementation 1 of 1 the 1 map 1 reduce 1 framework 1 for 1 distributed 1 processing 1 of 1 large 1 data 1 sets. 1 #
到目前为止,一切都很顺利。现在,在原始流格式中将整个应用程序一起调出。在清单 8 中,通过 map 脚本传递您的输入、排序输出(可选步骤)、然后通过 reduce 脚本传递由此产生的中间数据。
清单 8. 使用 Linux 管道的简单 MapReduce
# cat input | ruby map.rb | sort | ruby reduce.rb large 1 of 2 framework 1 distributed 1 data 1 an 1 the 1 reduce 1 map 1 sets. 1 Hadoop 1 implementation 1 for 1 processing 1 is 1 #