Hadoop 中利用 MapReduce 读写 MySQL 数据(2)

@Override
  public void readFields(ResultSet result) throws SQLException {
   this.id = result.getInt(1);
   this.name = result.getString(2);
  }

@Override
  public void write(DataOutput out) throws IOException {
   out.writeInt(this.id);
   Text.writeString(out, this.name);
  }
 }

// 记住此处是静态内部类,要不然你自己实现无参构造器,或者等着抛异常:
 // Caused by: java.lang.NoSuchMethodException: DBInputMapper.<init>()
 //
 // 网上脑残式的转帖,没见到一个写对的。。。
 public static class DBInputMapper extends MapReduceBase implements
   Mapper<LongWritable, StudentinfoRecord, LongWritable, Text> {
  public void map(LongWritable key, StudentinfoRecord value,
    OutputCollector<LongWritable, Text> collector, Reporter reporter) throws IOException {
   collector.collect(new LongWritable(value.id), new Text(value.toString()));
  }
 }

public static class MyReducer extends MapReduceBase implements
   Reducer<LongWritable, Text, StudentinfoRecord, Text> {
  @Override
  public void reduce(LongWritable key, Iterator<Text> values,
    OutputCollector<StudentinfoRecord, Text> output, Reporter reporter) throws IOException {
   String[] splits = values.next().toString().split(" ");
   StudentinfoRecord r = new StudentinfoRecord();
   r.id = Integer.parseInt(splits[0]);
   r.name = splits[1];
   output.collect(r, new Text(r.name));
  }
 }

/**
 * Job driver: reads (id, name) rows from MySQL table "t" via DBInputFormat,
 * maps them to (id, toString()) pairs, and writes them back into table "t2"
 * via DBOutputFormat, using the legacy org.apache.hadoop.mapred API.
 */
public static void main(String[] args) throws IOException {
  JobConf conf = new JobConf(Mysql2Mr.class);
  // Ship the MySQL JDBC driver jar to the task nodes' classpath.
  DistributedCache.addFileToClassPath(new Path("/tmp/mysql-connector-java-5.0.8-bin.jar"), conf);

conf.setMapOutputKeyClass(LongWritable.class);
  conf.setMapOutputValueClass(Text.class);
  conf.setOutputKeyClass(LongWritable.class);
  conf.setOutputValueClass(Text.class);

conf.setOutputFormat(DBOutputFormat.class);
  conf.setInputFormat(DBInputFormat.class);
  // Alternative wiring kept for reference: dump MySQL rows straight to HDFS.
  // // mysql to hdfs
  // conf.setReducerClass(IdentityReducer.class);
  // Path outPath = new Path("/tmp/1");
  // FileSystem.get(conf).delete(outPath, true);
  // FileOutputFormat.setOutputPath(conf, outPath);

// NOTE(review): connection credentials are hard-coded in source — move them
// to configuration or a credential store before real use.
DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver", "jdbc:mysql://192.168.1.101:3306/test",
    "root", "root");
  String[] fields = { "id", "name" };
  // Read the listed columns from table "t", ordered by "id".
  DBInputFormat.setInput(conf, StudentinfoRecord.class, "t", null, "id", fields);
  // Write the reduce output into table "t2" (columns id, name).
  DBOutputFormat.setOutput(conf, "t2", "id", "name");
  // conf.setMapperClass(org.apache.hadoop.mapred.lib.IdentityMapper.class);
  conf.setMapperClass(DBInputMapper.class);
  conf.setReducerClass(MyReducer.class);

JobClient.runJob(conf);
 }
}

接下来请看第二页:

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:http://www.heiqu.com/8e57f0f9a972926ebb02b843fce5b2d6.html