Win7中使用Eclipse连接虚拟机中的Ubuntu中的Hadoop2.(3)

经过前面的学习,基本上可以小试牛刀编写一些小程序玩一玩了,在此之前做几项准备工作

明确我要用Hadoop干什么

大体学习一下mapreduce

Ubuntu重启后,再启动hadoop会报连接异常的问题

答:

数据提炼、探索数据、挖掘数据

map=切碎,reduce=合并

重启后会清空tmp文件夹,默认namenode会存在这里,需要在core-site.xml文件中增加(别忘了创建文件夹,没权限的话,需要用root创建并把权限改成777):

<property>
    <name>hadoop.tmp.dir</name>
    <value>/usr/local/hadoop/tmp</value>
</property>

大数据,我的第一反应是现有关系型数据库中的数据怎么跟hadoop结合使用,网上搜了一些资料,使用的是DBInputFormat,那就简单编写一个从数据库读取数据,然后经过处理后,生成文件的小例子吧

数据库弄的简单一点吧,id是数值整型、test是字符串型,需求很简单,统计TEST字段出现的数量

Win7中使用Eclipse连接虚拟机中的Ubuntu中的Hadoop2.4

数据读取类:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;

public class DBRecoder implements Writable, DBWritable{
 String test;
 int id;
 @Override
 public void write(DataOutput out) throws IOException {
  out.writeUTF(test);
  out.writeInt(id);
 }
 @Override
 public void readFields(DataInput in) throws IOException {
  test = in.readUTF();
  id = in.readInt();
 }
 @Override
 public void readFields(ResultSet arg0) throws SQLException {
  test = arg0.getString("test");
  id = arg0.getInt("id");
 }
 @Override
 public void write(PreparedStatement arg0) throws SQLException {
  arg0.setString(1, test);
  arg0.setInt(2, id);
 }
}

mapreduce操作类

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class DataCountTest {
 public static class TokenizerMapper extends Mapper<LongWritable, DBRecoder, Text, IntWritable> {
  public void map(LongWritable key, DBRecoder value, Context context) throws IOException, InterruptedException {
   context.write(new Text(value.test), new IntWritable(1));
  }
 }

public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
  private IntWritable result = new IntWritable();

public void reduce(Text key, Iterable<IntWritable> values,
    Context context) throws IOException, InterruptedException {
   int sum = 0;
   for (IntWritable val : values) {
    sum += val.get();
   }
   result.set(sum);
   context.write(key, result);
  }
 }

public static void main(String[] args) throws Exception {
  args = new String[1];
  args[0] = "hdfs://192.168.203.137:9000/user/chenph/output1111221";

Configuration conf = new Configuration();
 
        DBConfiguration.configureDB(conf, "Oracle.jdbc.driver.OracleDriver", 
                "jdbc:oracle:thin:@192.168.101.179:1521:orcl", "chenph", "chenph"); 
 
  String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();

Job job = new Job(conf, "DB count");
 
  job.setJarByClass(DataCountTest.class);
  job.setMapperClass(TokenizerMapper.class);
  job.setReducerClass(IntSumReducer.class);
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(IntWritable.class);
  job.setMapOutputKeyClass(Text.class); 
  job.setMapOutputValueClass(IntWritable.class); 
        String[] fields1 = { "id", "test"}; 
        DBInputFormat.setInput(job, DBRecoder.class, "t1", null, "id",  fields1); 

FileOutputFormat.setOutputPath(job, new Path(otherArgs[0]));
 
  System.exit(job.waitForCompletion(true) ? 0 : 1);
 }
}

开发过程中遇到的问题:

Job被标记为已作废,那应该用什么我还没有查到

乱码问题,hadoop默认是utf8格式的,如果读取的是gbk的需要进行处理

这类例子网上挺少的,有也是老版的,新版的资料没有,我完全是拼凑出来的,很多地方还不甚了解,需要进一步学习官方资料

搜索资料时,有资料说不建议采用这种方式处理实际的大数据问题,原因就是并发过高,会瞬间秒杀掉数据库,一般都会采用导成文本文件的形式

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/521b71caad7be3ac372a8b96ed95f8eb.html