用户提交给Hadoop client的command,指定了输入路径,输出路径,如下所示:
cmd="${HADOOP_HOME}/bin/hadoop bistreaming \
-input ${LINK_PATH}/part-* \
-input ${PATCH_PATH}/* \
-output ${UNI_PATH} \
-inputformat org.apache.hadoop.mapred.SequenceFileAsBinaryInputFormat \
-outputformat org.apache.hadoop.mapred.SequenceFileAsBinaryOutputFormat \
-partitioner com.baidu.sos.mapred.lib.IntHashPartitioner \
-mapper 'sh dlb_merge1/dlb_merge1.sh' \
-reducer 'sh dlb_merge1/dlb_merge1.sh' \
-mapdebug 'sh dlb_merge1/dlb_debug.sh' \
-reducedebug 'sh dlb_merge1/dlb_debug.sh' \
-file ${HADOOP_COMMON_CONF} \
-cacheArchive '${ARCHIVE_PATH}/dlb_merge1.tar.gz#dlb_merge1' \
-jobconf mapred.compress.map.output=true \
-jobconf mapred.map.output.compression.codec=org.apache.hadoop.io.compress.LzoCodec \
-jobconf mapred.output.compress=true \
-jobconf mapred.output.compression.type=BLOCK \
-jobconf mapred.output.compression.codec=org.apache.hadoop.io.compress.LzoCodec \
-jobconf mapred.min.split.size=$dlb_saver_merge1_split_size \
-jobconf dfs.block.size=$dlb_saver_merge1_block_size \
-jobconf mapred.map.max.attempts=$dlb_saver_merge1_map_max_attempts \
-jobconf mapred.reduce.max.attempts=$dlb_saver_merge1_reduce_max_attempts \
-jobconf mapred.job.name=dlb_merge1.${start_time} \
-jobconf mapred.reduce.tasks=${dlb_saver_merge1_reduce_num}"
hadoop client会解析你的command,从而知道:输入从哪找,输出存到哪里,map程序,reduce程序,等等。
client会将这些信息提交namenode。namenode里存放了目录,它知道输入的数据,实际存在哪些机器(datanode)上,另外2个备份存在哪里。同时,namenode还会将要执行的任务告诉jobtracker,然后由jobtracker控制这些datanode。
工作过程:
map阶段
namenode首先将输入的源进行切分,按照任务配置的处理能力和集群整体处理能力(槽位)的不同(以低者计数),数据源会被切分成不同的份数,每一份会被同时发往3个机器的tasktracker,3台机器同时跑同一个任务,总控的jobtracker会按时轮询该任务的运行情况,当这三台机器有一台数据明显偏慢时,在某个轮询周期内,该机器的任务会被kill,同时会有个百分比,在有一台机器该任务完成到一定百分比的时候,跑的慢的其它两台机器的相同任务也会被kill,产出会被保存在各个机器的临时目录内。
reduce阶段
然后jobtracker会按照一定规则重组所有表示成功完成任务的tasktracker中的数据,称为reduce阶段,重组后的数据作为hadoop的该任务的输出。
reduce的过程中有一个最近原则,就是某台机器上map的输入结果,如果空间允许,会在本机进行reduce,如果空间不足,会将数据传到最近的机器上进行reduce。
当然,不是所有的job都要有reduce阶段。