使 Couchbase Server 能够与 Hadoop 连接器通信
Couchbase Server 使用 Sqoop 连接器与您的 Hadoop 集群通信。Sqoop 提供了一个连接在 Hadoop 与 Couchbase Server 之间批量传输数据。
从技术上讲,Sqoop 是一个设计用于在结构化数据库与 Hadoop 之间转换信息的应用程序。Sqoop 这个名称实际上来源于 SQL 和 Hadoop。
安装 Sqoop
如果使用 CDH3 安装,您可使用报管理器来安装 Sqoop:$ sudo apt-get install sqoop。
这将把 Sqoop 安装在 /usr/lib/sqoop 中。
注意:Sqoop 中一个最新的 bug 表明它有时会尝试传输uowu的数据集。修补程序包含在 Sqoop 1.4.2 版中。如果遇到问题,请尝试使用 V1.4.2 或更高的版本。
安装 Couchbase Hadoop Connector
Couchbase Hadoop Connector 是一个支持 Sqoop 与 Couchbase 之间的连接的 Java jar 文件集合。从 Couchbase 网站下载 Hadoop 连接器(参阅 参考资料)。该文件封装为一个 zip 文件。解压它,然后运行其中的 install.sh 脚本,提供 Sqoop 系统的位置。例如:$ sudo bash install.sh /usr/lib/sqoop。
这将安装所有必要的库和配置文件。现在我们可以开始在两个系统之间交换信息了。
将数据从 Couchbase Server 导入 Hadoop
尽管该场景不是我们这里将直接处理的场景,但需要注意我们可从 Couchbase Server 将数据导入 Hadoop。如果您在 Couchbase Server 中加载了大量数据,并希望利用 Hadoop 来处理和简化它,这可能很有用。为此,您可以使用以下命令,从 Couchbase Server 将整个数据集加载到 HDFS 中的一个 Hadoop 文件中:$ sqoop import --connect :8091/pools --table cbdata。
这里提供的 URL 是 Couchbase Server 桶池 (bucket pool) 的位置。这里指定的表实际上是 HDFS 中将存储数据的目录的名称。
数据本身被存储为来自 Couchbase Server 的信息的一种键/值转储形式。在 Couchbase Server 2.0 中,这意味着数据是使用惟一文档 ID 写出的,包含记录的 JSON 值。
将 JSON 数据写入 Hadoop MapReduce
要在 Hadoop 与 Couchbase Server 之间交换信息,需要使用一种通用语言来表达这些信息,在本例中使用的是 JSON(参见 清单 5)。
清单 5. 在 Hadoop MapReduce 中输出 JSON
package org.mcslp; import java.io.IOException; import java.util.*; import org.apache.hadoop.fs.Path; import org.apache.hadoop.conf.*; import org.apache.hadoop.io.*; import org.apache.hadoop.mapred.*; import org.apache.hadoop.util.*; import com.google.gson.*; public class WordCount { public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> { private final static IntWritable one = new IntWritable(1); private Text word = new Text(); public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException { String line = value.toString(); StringTokenizer tokenizer = new StringTokenizer(line); while (tokenizer.hasMoreTokens()) { word.set(tokenizer.nextToken()); output.collect(word, one); } } } public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, Text> { class wordRecord { private String word; private int count; wordRecord() { } } public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, Text> output, Reporter reporter) throws IOException { int sum = 0; while (values.hasNext()) { sum += values.next().get(); } wordRecord word = new wordRecord(); word.word = key.toString();; word.count = sum; Gson json = new Gson(); System.out.println(json.toJson(word)); output.collect(key, new Text(json.toJson(word))); } } public static void main(String[] args) throws Exception { JobConf conf = new JobConf(WordCount.class); conf.setJobName("wordcount"); conf.setOutputKeyClass(Text.class); conf.setOutputValueClass(IntWritable.class); conf.setMapperClass(Map.class); conf.setReducerClass(Reduce.class); conf.setInputFormat(TextInputFormat.class); conf.setOutputFormat(TextOutputFormat.class); FileInputFormat.setInputPaths(conf, new Path(args[0])); FileOutputFormat.setOutputPath(conf, new Path(args[1])); JobClient.runJob(conf); } }该代码是 Hadoop 发行版所提供的字数示例的修改版。
此版本使用 Google Gson 库从处理过程的精减阶段写入 JSON 信息。为了方便起见,我们使用了一个新类 (wordRecord),它由 Gson 转换为一条 JSON 记录,这种记录是 Couchbase Server 逐个文档地处理和解析内容所需的格式。
请注意,我们没有为 Hadoop 定义一个 Combiner 类。这将阻止 Hadoop 尝试重新精减该信息,该操作在当前的代码中会失败,因为我们的精减阶段仅接收该单词和一位数,并输出一个 JSON 值。对于辅助的精减/组合阶段,我们需要解析 JSON 输入或定义一个新 Combiner 类,以便输出信息的 JSON 版本。这稍微简化了定义。
要在 Hadoop 中使用此代码,首先需要将 Google Gson 库复制到 Hadoop 目录中 (/usr/lib/hadoop/lib)。然后重新启动 Hadoop,以确保 Hadoop 已经正确识别出该库。
接下来,将您的代码编译到一个目录中: $ javac -classpath ${HADOOP_HOME}/hadoop-${HADOOP_VERSION}-core.jar:./google-gson-2.2.1/gson-2.2.1.jar -d wordcount_classes WordCount.java 。
现在为您的库创建一个 jar 文件: $ jar -cvf wordcount.jar -C wordcount_classes/。
完成此过程后,您可以将一些文本文件复制到某个目录中,然后使用此 jar 文件将这些文本文件处理为许多独立的单词,创建一条 JSON 记录来包含每个单词和计数。例如,要在一些 Project Gutenberg 文本上处理此数据: $ hadoop jar wordcount.jar org.mcslp.WordCount /user/mc/gutenberg /user/mc/gutenberg-output。
这将在我们的目录中生成已由 Hadoop 内的 MapReduce 函数统计的单词列表。
将数据从 Hadoop 导出到 Couchbase Server
要从 Hadoop 取回数据并导入 Couchbase Server 中,则需要使用 Sqoop 导出该数据: $ sqoop export --connect :8091/pools --table ignored --export-dir gutenberg-output。
此示例中忽略了 --table 参数,但 --export-dir 是要导出的信息所在的目录的名称。