Hadoop的环境搭建和编写一个简单的hadoop job

0hadoop的简要介绍

google之所以能够成功,一个重要的技术就是map-reduce。map-reduce是google为大规模的、分布式数据进行处理的一种编程模式。

而本文介绍的hadoop是apache的开源map-reduce实现。本文不过多的介绍map-reduce,主要精力放在hadoop的配置和编写

一个简单的haoop程序上

hadoop服务器的安装:
hadoop是一个分布式的处理框架,本文先介绍的是一个简单的伪分布式hadoop(安装在一个linux机器上)

配置环境是Ubuntu
创建一个新文件/etc/sources.list.d/cloudera.list
把下边的内容复制到新文件:
 
deb intrepid-cdh3 contrib
deb-src intrepid-cdh3 contrib
 
然后打开teminal输入下边的命令:
$ curl -s | \
sudo apt-key add - sudo apt-get update

然后,安装采用伪分布式配置的 Hadoop(所有 Hadoop 守护进程在同一个主机上运行):

$ sudo apt-get install hadoop-0.20-conf-pseudo
 

确保系统已经安装了sshd(如果没有,请先安装)。
 设置不需要密码的ssh:
    
$ sudo su -
# ssh-keygen -t dsa -P '' -f ~/.ssh/id_dsa
# cat ~/.ssh/id_dsa.pub >> ~/.ssh/authorized_keys

启动hadoop:
首先对namenode进行格式化:
# hadoop-0.20 namenode -format

Hadoop 提供一些简化启动的辅助工具。这些工具分为启动(比如 start-dfs)和停止(比如 stop-dfs)两类。下面的简单脚本说明如何启动 Hadoop 节点:

# /usr/lib/hadoop-0.20/bin/start-dfs.sh
# /usr/lib/hadoop-0.20/bin/start-mapred.sh
#
 
输入命令jps可以查看守护进程是否正在运行;

编写一个hadoop程序:
作为联系,我们从网上下载一个cvs格式的数据文件:

cvs是以逗号进行列分割的数据文件。
使用opencvs可以很方便的处理cvs格式的数据。
opencvs可以从sourceforge上下载。
opencvs可以把一个string以逗号进行分割成一个string数组
只扩展 Hadoop 的 Mapper 类。然后我可以使用泛型来为传出键和值指定显式类。类型子句也指定了传入键和值,这对于读取文件分别是字节数和文本行数。

EarthQuakesPerDateMapper 类扩展了 Hadoop 的 Mapper 对象。它显式地将其输出键指定为一个 Text 对象,将其值指定为一个 IntWritable,这是一个 Hadoop 特定类,实质上是一个整数。还要注意,class 子句的前两个类型是 LongWritable 和 Text,分别是字节数和文本行数。

由于类定义中的类型子句,我将传入 map 方法的参数类型设置为在 context.write 子句内带有该方法的输出。如果我想指定其他内容,将会出现一个编译器问题,或 Hadoop 将输出一个错误消息,描述类型不匹配的消息。

一个mapper的实现:

public class EarthQuakesPerDateMapper extends
  Mapper<LongWritable, Text, Text, IntWritable> {
 @Override
 protected void map(LongWritable key, Text value, Context context)
   throws IOException, InterruptedException {

if (key.get() > 0) {
   try {
    CSVParser parser = new CSVParser();
    String[] lines = parser.parseLine(value.toString());
    lines = new CSVParser().parseLine(lines[0]);
    SimpleDateFormat formatter = new SimpleDateFormat("yyyyMMddHHmm");
    Date dt = formatter.parse(lines[0]);
    formatter.applyPattern("dd-MM-yyyy");

String dtstr = formatter.format(dt);
    context.write(new Text(dtstr), new IntWritable(1));
   } catch (java.text.ParseException e) {
    // TODO Auto-generated catch block
    //e.printStackTrace();
   }
  }
 }
}
reduce 实现如下 所示。与 Hadoop 的 Mapper 一样,Reducer 被参数化了:前两个参数是传入的键类型(Text)和值类型(IntWritable),后两个参数是输出类型:键和值,这在本例中是相同的。


public class EarthQuakesPerDateReducer extends
  Reducer<Text, IntWritable, Text, IntWritable> {
 @Override
 protected void reduce(Text key, Iterable<IntWritable> values,
   Context context) throws IOException, InterruptedException {
  int count = 0;
  for (IntWritable value : values) {
   count++;
  }
  context.write(key, new IntWritable(count));
 }
}

写好mapper和reducer之后,就可以定义一个hadoop job了。

public class EarthQuakesPerDayJob {
 public static void main(String[] args) throws Throwable {
  Job job = new Job();
  job.setJarByClass(EarthQuakesPerDayJob.class);
  FileInputFormat.addInputPath(job, new Path(args[0]));
  FileOutputFormat.setOutputPath(job, new Path(args[1]));

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:http://127.0.0.1/wyygfs.html