Hadoop: Basic Server Environment Setup (8)

A single address can host multiple companies. This case study uses two types of input file, an address table and a company table, to perform a one-to-many join and produce the association between address names (e.g. Beijing) and company names (e.g. Beijing JD, Beijing Red Star).
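To make the join concrete, here is a minimal sketch of the two kinds of input files. The record formats (address: "id:addressName"; company: "companyName:addressId") follow the examples in this article, but the exact file names and record contents shown here are illustrative assumptions:

#address table file, e.g. input/address1 (assumed name)
1:Beijing
2:Guangzhou
3:Shenzhen

#company table file, e.g. input/company1 (assumed name)
Beijing Red Star:1
Beijing JD:1
Guangzhou Honda:2
Shenzhen Thunder:3

Joining on the shared address id pairs each company with its address name, e.g. <Beijing Red Star, Beijing>.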

Development Environment

Hardware environment: 4 CentOS 6.5 servers (one Master node, three Slave nodes)
Software environment: Java 1.7.0_45, Hadoop-1.2.1

1. Map Process

First, the default TextInputFormat class processes the input files, turning each line of text into a <key, value> pair of its byte offset and its content, e.g. <0, "1:Beijing">. The map function then handles each record according to which input file it came from: for the address file it transforms the value ("1:Beijing") into <"1", "address:Beijing">, and for the company file it transforms the value ("Beijing Red Star:1") into <"1", "company:Beijing Red Star">, as shown in the figure below:

[Figure: Map-phase processing of the company and address input files]

The core Map-side code is shown below; for the complete source, see CompanyJoinAddress\src\com\zonesion\tablejoin\CompanyJoinAddress.java.

public static class MapClass extends Mapper<LongWritable, Text, Text, Text> {

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        Text addressId = new Text();
        Text info = new Text();
        // Split each input line on ":"
        String[] line = value.toString().split(":");
        String path = ((FileSplit) context.getInputSplit()).getPath().toString();
        if (line.length < 2) {
            return;
        }
        if (path.indexOf("company") >= 0) {
            // Handle a company record, e.g. "Beijing Red Star:1"
            addressId.set(line[1]);               // "1"
            info.set("company" + ":" + line[0]);  // "company:Beijing Red Star"
            context.write(addressId, info);       // <"1","company:Beijing Red Star">
        } else if (path.indexOf("address") >= 0) {
            // Handle an address record, e.g. "1:Beijing"
            addressId.set(line[0]);               // "1"
            info.set("address" + ":" + line[1]);  // "address:Beijing"
            context.write(addressId, info);       // <"1","address:Beijing">
        }
    }
}

2. Reduce Process

The reduce function receives <key, values> pairs such as <"1", ["company:Beijing Red Star", "company:Beijing JD", "address:Beijing"]>. It iterates over the values, extracting each one (e.g. "company:Beijing Red Star") and, based on its tag (company or address), adds the company name to a company list or the address name to an address list. Finally, it computes the Cartesian product of the two lists to obtain the company-address pairs and writes them to the output, as shown in the figure below.

[Figure: Reduce-phase grouping of tagged values and Cartesian product]

The core Reduce-side code is shown below; for the complete source, see CompanyJoinAddress\src\com\zonesion\tablejoin\CompanyJoinAddress.java.

public static class ReduceClass extends Reducer<Text, Text, Text, Text> {

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        List<String> companys = new ArrayList<String>();
        List<String> addresses = new ArrayList<String>();
        // values, e.g. ["company:Beijing Red Star","company:Beijing JD","address:Beijing"]
        Iterator<Text> it = values.iterator();
        while (it.hasNext()) {
            String value = it.next().toString();  // "company:Beijing Red Star"
            String[] result = value.split(":");
            if (result.length >= 2) {
                if (result[0].equals("company")) {
                    companys.add(result[1]);
                } else if (result[0].equals("address")) {
                    addresses.add(result[1]);
                }
            }
        }
        // Compute the Cartesian product of the two lists
        if (0 != companys.size() && 0 != addresses.size()) {
            for (int i = 0; i < companys.size(); i++) {
                for (int j = 0; j < addresses.size(); j++) {
                    // <key,value> -- <"Beijing JD","Beijing">
                    context.write(new Text(companys.get(i)), new Text(addresses.get(j)));
                }
            }
        }
    }
}

3. Driver Implementation

The core driver code is shown below; for the complete source, see CompanyJoinAddress\src\com\zonesion\tablejoin\CompanyJoinAddress.java.

public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length != 3) {
        System.err.println("Usage: company Join address <companyTableDir> <addressTableDir> <out>");
        System.exit(2);
    }
    Job job = new Job(conf, "company Join address");
    // Set the job's main class
    job.setJarByClass(CompanyJoinAddress.class);
    // Set the Mapper and Reducer classes
    job.setMapperClass(MapClass.class);
    job.setReducerClass(ReduceClass.class);
    // Set the output key/value types
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);
    // Set the input and output directories
    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));   // companyTableDir
    FileInputFormat.addInputPath(job, new Path(otherArgs[1]));   // addressTableDir
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[2])); // out
    System.exit(job.waitForCompletion(true) ? 0 : 1);
}

4. Deployment and Running

1) Start the Hadoop cluster

[hadoop@K-Master ~]$ start-dfs.sh
[hadoop@K-Master ~]$ start-mapred.sh
[hadoop@K-Master ~]$ jps
5283 SecondaryNameNode
5445 JobTracker
5578 Jps
5109 NameNode

2) Deploy the source code

#Set up the working directory
[hadoop@K-Master ~]$ mkdir -p /usr/hadoop/workspace/MapReduce
#Deploy the source code
Copy the CompanyJoinAddress folder to the /usr/hadoop/workspace/MapReduce/ directory;

… You can also download CompanyJoinAddress directly.
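Based on the compile and upload commands that follow, the deployed project folder is assumed to be laid out roughly like this:

/usr/hadoop/workspace/MapReduce/CompanyJoinAddress/
    src/com/zonesion/tablejoin/CompanyJoinAddress.java    #source file
    bin/                                                  #compiled classes (created by javac -d bin)
    input/company*                                        #local company table files
    input/address*                                        #local address table files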

3) Compile the source files

#Switch to the working directory
[hadoop@K-Master ~]$ cd /usr/hadoop/workspace/MapReduce/CompanyJoinAddress
#Compile the source file
[hadoop@K-Master CompanyJoinAddress]$ javac -classpath /usr/hadoop/hadoop-core-1.2.1.jar:/usr/hadoop/lib/commons-cli-1.2.jar -d bin src/com/zonesion/tablejoin/CompanyJoinAddress.java
[hadoop@K-Master CompanyJoinAddress]$ ls bin/com/zonesion/tablejoin/* -la
-rw-rw-r-- 1 hadoop hadoop 1909 Aug  1 10:29 bin/com/zonesion/tablejoin/CompanyJoinAddress.class
-rw-rw-r-- 1 hadoop hadoop 2199 Aug  1 10:29 bin/com/zonesion/tablejoin/CompanyJoinAddress$MapClass.class
-rw-rw-r-- 1 hadoop hadoop 2242 Aug  1 10:29 bin/com/zonesion/tablejoin/CompanyJoinAddress$ReduceClass.class

4) Package the jar file

[hadoop@K-Master CompanyJoinAddress]$ jar -cvf CompanyJoinAddress.jar -C bin/ .
added manifest
adding: com/(in = 0) (out= 0)(stored 0%)
adding: com/zonesion/(in = 0) (out= 0)(stored 0%)
adding: com/zonesion/tablejoin/(in = 0) (out= 0)(stored 0%)
adding: com/zonesion/tablejoin/CompanyJoinAddress$MapClass.class(in = 2273) (out= 951)(deflated 58%)
adding: com/zonesion/tablejoin/CompanyJoinAddress$ReduceClass.class(in = 2242) (out= 1029)(deflated 54%)
adding: com/zonesion/tablejoin/CompanyJoinAddress.class(in = 1909) (out= 983)(deflated 48%)

5) Upload the input files

#Create the company input directory
[hadoop@K-Master CompanyJoinAddress]$ hadoop fs -mkdir CompanyJoinAddress/input/company/
#Create the address input directory
[hadoop@K-Master CompanyJoinAddress]$ hadoop fs -mkdir CompanyJoinAddress/input/address/
#Upload files to the company input directory
[hadoop@K-Master CompanyJoinAddress]$ hadoop fs -put input/company* CompanyJoinAddress/input/company/
#Upload files to the address input directory
[hadoop@K-Master CompanyJoinAddress]$ hadoop fs -put input/address* CompanyJoinAddress/input/address/

6) Run the jar file

[hadoop@K-Master CompanyJoinAddress]$ hadoop jar CompanyJoinAddress.jar com.zonesion.tablejoin.CompanyJoinAddress CompanyJoinAddress/input/company/ CompanyJoinAddress/input/address/ CompanyJoinAddress/output
14/08/01 10:50:05 INFO input.FileInputFormat: Total input paths to process : 4
14/08/01 10:50:05 INFO util.NativeCodeLoader: Loaded the native-hadoop library
14/08/01 10:50:05 WARN snappy.LoadSnappy: Snappy native library not loaded
14/08/01 10:50:05 INFO mapred.JobClient: Running job: job_201408010921_0008
14/08/01 10:50:06 INFO mapred.JobClient:  map 0% reduce 0%
14/08/01 10:50:09 INFO mapred.JobClient:  map 50% reduce 0%
14/08/01 10:50:10 INFO mapred.JobClient:  map 100% reduce 0%
14/08/01 10:50:17 INFO mapred.JobClient:  map 100% reduce 100%
14/08/01 10:50:17 INFO mapred.JobClient: Job complete: job_201408010921_0008
14/08/01 10:50:17 INFO mapred.JobClient: Counters: 29
......

7) View the output

[hadoop@K-Master CompanyJoinAddress]$ hadoop fs -ls CompanyJoinAddress/output
Found 3 items
-rw-r--r--   1 hadoop supergroup     0 2014-08-01 10:50 /user/hadoop/CompanyJoinAddress/output/_SUCCESS
drwxr-xr-x   - hadoop supergroup     0 2014-08-01 10:50 /user/hadoop/CompanyJoinAddress/output/_logs
-rw-r--r--   1 hadoop supergroup   241 2014-08-01 10:50 /user/hadoop/CompanyJoinAddress/output/part-r-00000
[hadoop@K-Master CompanyJoinAddress]$ hadoop fs -cat CompanyJoinAddress/output/part-r-00000
Beijing Red Star    Beijing
Beijing Rising    Beijing
Back of Beijing    Beijing
Beijing JD    Beijing
xiaomi    Beijing
Guangzhou Honda    Guangzhou
Guangzhou Development Bank    Guangzhou
Shenzhen Thunder    Shenzhen
Tencent    Shenzhen
aiplay    hangzhou
huawei    wuhan
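One practical note: FileOutputFormat requires that the output directory does not exist when the job starts, so rerunning the job against the same output path will fail until the old results are removed. A minimal cleanup sketch, using the Hadoop 1.x shell syntax:

#Remove the previous output directory before rerunning the job
[hadoop@K-Master CompanyJoinAddress]$ hadoop fs -rmr CompanyJoinAddress/output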
