Hadoop mapreduce核心功能描述(2)

Reducer的输出是没有排序的。

需要多少个Reduce?
Reduce的数目建议是0.95或1.75乘以 (<no. of nodes> * mapred.tasktracker.reduce.tasks.maximum)。

用0.95,所有reduce可以在maps一完成时就立刻启动,开始传输map的输出结果。用1.75,速度快的节点可以在完成第一轮reduce任务后,可以开始第二轮,这样可以得到比较好的负载均衡的效果。

增加reduce的数目会增加整个框架的开销,但可以改善负载均衡,降低由于执行失败带来的负面影响。

上述比例因子比整体数目稍小一些是为了给框架中的推测性任务(speculative-tasks) 或失败的任务预留一些reduce的资源。

无Reducer
如果没有归约要进行,那么设置reduce任务的数目为零是合法的。

这种情况下,map任务的输出会直接被写入由 setOutputPath(Path)指定的输出路径。框架在把它们写入FileSystem之前没有对它们进行排序。

Partitioner
Partitioner用于划分键值空间(key space)。

Partitioner负责控制map输出结果key的分割。Key(或者一个key子集)被用于产生分区,通常使用的是Hash函数。分区的数目与一个作业的reduce任务的数目是一样的。因此,它控制将中间过程的key(也就是这条记录)应该发送给m个reduce任务中的哪一个来进行reduce操作。

HashPartitioner是默认的 Partitioner。

Reporter
Reporter是用于Map/Reduce应用程序报告进度,设定应用级别的状态消息, 更新Counters(计数器)的机制。

Mapper和Reducer的实现可以利用Reporter 来报告进度,或者仅是表明自己运行正常。在那种应用程序需要花很长时间处理个别键值对的场景中,这种机制是很关键的,因为框架可能会以为这个任务超时了,从而将它强行杀死。另一个避免这种情况发生的方式是,将配置参数mapred.task.timeout设置为一个足够高的值(或者干脆设置为零,则没有超时限制了)。

应用程序可以用Reporter来更新Counter(计数器)。

OutputCollector
OutputCollector是一个Map/Reduce框架提供的用于收集 Mapper或Reducer输出数据的通用机制 (包括中间输出结果和作业的输出结果)。

Hadoop Map/Reduce框架附带了一个包含许多实用型的mapper、reducer和partitioner 的类库。

作业配置
JobConf代表一个Map/Reduce作业的配置。

JobConf是用户向Hadoop框架描述一个Map/Reduce作业如何执行的主要接口。框架会按照JobConf描述的信息忠实地去尝试完成这个作业,然而:

一些参数可能会被管理者标记为 final,这意味它们不能被更改。
一些作业的参数可以被直截了当地进行设置(例如: setNumReduceTasks(int)),而另一些参数则与框架或者作业的其他参数之间微妙地相互影响,并且设置起来比较复杂(例如: setNumMapTasks(int))。
通常,JobConf会指明Mapper、Combiner(如果有的话)、 Partitioner、Reducer、InputFormat和 OutputFormat的具体实现。JobConf还能指定一组输入文件 (setInputPaths(JobConf, Path...) /addInputPath(JobConf, Path)) 和(setInputPaths(JobConf, String) /addInputPaths(JobConf, String)) 以及输出文件应该写在哪儿 (setOutputPath(Path))。

JobConf可选择地对作业设置一些高级选项,例如:设置Comparator; 放到DistributedCache上的文件;中间结果或者作业输出结果是否需要压缩以及怎么压缩; 利用用户提供的脚本(setMapDebugScript(String)/setReduceDebugScript(String)) 进行调试;作业是否允许预防性(speculative)任务的执行 (setMapSpeculativeExecution(boolean))/(setReduceSpeculativeExecution(boolean)) ;每个任务最大的尝试次数 (setMaxMapAttempts(int)/setMaxReduceAttempts(int)) ;一个作业能容忍的任务失败的百分比 (setMaxMapTaskFailuresPercent(int)/setMaxReduceTaskFailuresPercent(int)) ;等等。

当然,用户能使用 set(String, String)/get(String, String) 来设置或者取得应用程序需要的任意参数。然而,DistributedCache的使用是面向大规模只读数据的。

任务的执行和环境
TaskTracker是在一个单独的jvm上以子进程的形式执行 Mapper/Reducer任务(Task)的。

子任务会继承父TaskTracker的环境。用户可以通过JobConf中的 mapred.child.java.opts配置参数来设定子jvm上的附加选项,例如: 通过-Djava.library.path=<> 将一个非标准路径设为运行时的链接用以搜索共享库,等等。如果mapred.child.java.opts包含一个符号@taskid@, 它会被替换成map/reduce的taskid的值。

下面是一个包含多个参数和替换的例子,其中包括:记录jvm GC日志; JVM JMX代理程序以无密码的方式启动,这样它就能连接到jconsole上,从而可以查看子进程的内存和线程,得到线程的dump;还把子jvm的最大堆尺寸设置为512MB, 并为子jvm的java.library.path添加了一个附加路径。

<property>
  <name>mapred.child.java.opts</name>
  <value>
     -Xmx512M -Djava.library.path=/home/mycompany/lib -verbose:gc -Xloggc:/tmp/@taskid@.gc
     -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false
  </value>
</property>

用户或管理员也可以使用mapred.child.ulimit设定运行的子任务的最大虚拟内存。mapred.child.ulimit的值以(KB)为单位,并且必须大于或等于-Xmx参数传给JavaVM的值,否则VM会无法启动。

注意:mapred.child.java.opts只用于设置task tracker启动的子任务。为守护进程设置内存选项请查看 cluster_setup.html

${mapred.local.dir}/taskTracker/是task tracker的本地目录, 用于创建本地缓存和job。它可以指定多个目录(跨越多个磁盘),文件会半随机的保存到本地路径下的某个目录。当job启动时,task tracker根据配置文档创建本地job目录,目录结构如以下所示:

${mapred.local.dir}/taskTracker/archive/ :分布式缓存。这个目录保存本地的分布式缓存。因此本地分布式缓存是在所有task和job间共享的。
${mapred.local.dir}/taskTracker/jobcache/$jobid/ : 本地job目录。
${mapred.local.dir}/taskTracker/jobcache/$jobid/work/: job指定的共享目录。各个任务可以使用这个空间做为暂存空间,用于它们之间共享文件。这个目录通过job.local.dir 参数暴露给用户。这个路径可以通过API JobConf.getJobLocalDir()来访问。它也可以被做为系统属性获得。因此,用户(比如运行streaming)可以调用System.getProperty("job.local.dir")获得该目录。
${mapred.local.dir}/taskTracker/jobcache/$jobid/jars/: 存放jar包的路径,用于存放作业的jar文件和展开的jar。job.jar是应用程序的jar文件,它会被自动分发到各台机器,在task启动前会被自动展开。使用api JobConf.getJar() 函数可以得到job.jar的位置。使用JobConf.getJar().getParent()可以访问存放展开的jar包的目录。
${mapred.local.dir}/taskTracker/jobcache/$jobid/job.xml: 一个job.xml文件,本地的通用的作业配置文件。
${mapred.local.dir}/taskTracker/jobcache/$jobid/$taskid: 每个任务有一个目录task-id,它里面有如下的目录结构:
${mapred.local.dir}/taskTracker/jobcache/$jobid/$taskid/job.xml: 一个job.xml文件,本地化的任务作业配置文件。任务本地化是指为该task设定特定的属性值。这些值会在下面具体说明。
${mapred.local.dir}/taskTracker/jobcache/$jobid/$taskid/output 一个存放中间过程的输出文件的目录。它保存了由framwork产生的临时map reduce数据,比如map的输出文件等。
${mapred.local.dir}/taskTracker/jobcache/$jobid/$taskid/work: task的当前工作目录。
${mapred.local.dir}/taskTracker/jobcache/$jobid/$taskid/work/tmp: task的临时目录。(用户可以设定属性mapred.child.tmp 来为map和reduce task设定临时目录。缺省值是./tmp。如果这个值不是绝对路径, 它会把task的工作路径加到该路径前面作为task的临时文件路径。如果这个值是绝对路径则直接使用这个值。 如果指定的目录不存在,会自动创建该目录。之后,按照选项 -Djava.io.tmpdir='临时文件的绝对路径'执行java子任务。 pipes和streaming的临时文件路径是通过环境变量TMPDIR='the absolute path of the tmp dir'设定的)。 如果mapred.child.tmp有./tmp值,这个目录会被创建。
下面的属性是为每个task执行时使用的本地参数,它们保存在本地化的任务作业配置文件里:

名称 类型 描述
mapred.job.id String job id
mapred.jar String job目录下job.jar的位置
job.local.dir String job指定的共享存储空间
mapred.tip.id String task id
mapred.task.id String task尝试id
mapred.task.is.map boolean 是否是map task
mapred.task.partition int task在job中的id
map.input.file String map读取的文件名
map.input.start long map输入的数据块的起始位置偏移
map.input.length long map输入的数据块的字节数
mapred.work.output.dir String task临时输出目录

task的标准输出和错误输出流会被读到TaskTracker中,并且记录到 ${HADOOP_LOG_DIR}/userlogs

DistributedCache 可用于map或reduce task中分发jar包和本地库。子jvm总是把 当前工作目录 加到 java.library.path 和 LD_LIBRARY_PATH。 因此,可以通过 System.loadLibrary或 System.load装载缓存的库。有关使用分布式缓存加载共享库的细节请参考 native_libraries.html

作业的提交与监控
JobClient是用户提交的作业与JobTracker交互的主要接口。

JobClient 提供提交作业,追踪进程,访问子任务的日志记录,获得Map/Reduce集群状态信息等功能。

作业提交过程包括:

检查作业输入输出样式细节
为作业计算InputSplit值。
如果需要的话,为作业的DistributedCache建立必须的统计信息。
拷贝作业的jar包和配置文件到FileSystem上的Map/Reduce系统目录下。
提交作业到JobTracker并且监控它的状态。
作业的历史文件记录到指定目录的"_logs/history/"子目录下。这个指定目录由hadoop.job.history.user.location设定,默认是作业输出的目录。因此默认情况下,文件会存放在mapred.output.dir/_logs/history目录下。用户可以设置hadoop.job.history.user.location为none来停止日志记录。

用户使用下面的命令可以看到在指定目录下的历史日志记录的摘要。
$ bin/hadoop job -history output-dir
这个命令会打印出作业的细节,以及失败的和被杀死的任务细节。
要查看有关作业的更多细节例如成功的任务、每个任务尝试的次数(task attempt)等,可以使用下面的命令
$ bin/hadoop job -history all output-dir


用户可以使用 OutputLogFilter 从输出目录列表中筛选日志文件。

一般情况,用户利用JobConf创建应用程序并配置作业属性, 然后用 JobClient 提交作业并监视它的进程。

作业的控制
有时候,用一个单独的Map/Reduce作业并不能完成一个复杂的任务,用户也许要链接多个Map/Reduce作业才行。这是容易实现的,因为作业通常输出到分布式文件系统上的,所以可以把这个作业的输出作为下一个作业的输入实现串联。

然而,这也意味着,确保每一作业完成(成功或失败)的责任就直接落在了客户身上。在这种情况下,可以用的控制作业的选项有:

runJob(JobConf):提交作业,仅当作业完成时返回。
submitJob(JobConf):只提交作业,之后需要你轮询它返回的 RunningJob句柄的状态,并根据情况调度。
JobConf.setJobEndNotificationURI(String):设置一个作业完成通知,可避免轮询。
作业的输入
InputFormat 为Map/Reduce作业描述输入的细节规范。

Map/Reduce框架根据作业的InputFormat来:

检查作业输入的有效性。
把输入文件切分成多个逻辑InputSplit实例, 并把每一实例分别分发给一个 Mapper。
提供RecordReader的实现,这个RecordReader从逻辑InputSplit中获得输入记录, 这些记录将由Mapper处理。
基于文件的InputFormat实现(通常是 FileInputFormat的子类) 默认行为是按照输入文件的字节大小,把输入数据切分成逻辑分块(logical InputSplit )。 其中输入文件所在的FileSystem的数据块尺寸是分块大小的上限。下限可以设置mapred.min.split.size 的值。

考虑到边界情况,对于很多应用程序来说,很明显按照文件大小进行逻辑分割是不能满足需求的。 在这种情况下,应用程序需要实现一个RecordReader来处理记录的边界并为每个任务提供一个逻辑分块的面向记录的视图。

TextInputFormat 是默认的InputFormat。

如果一个作业的Inputformat是TextInputFormat, 并且框架检测到输入文件的后缀是.gz和.lzo,就会使用对应的CompressionCodec自动解压缩这些文件。 但是需要注意,上述带后缀的压缩文件不会被切分,并且整个压缩文件会分给一个mapper来处理。

InputSplit
InputSplit 是一个单独的Mapper要处理的数据块。

一般的InputSplit 是字节样式输入,然后由RecordReader处理并转化成记录样式。

FileSplit 是默认的InputSplit。 它把 map.input.file 设定为输入文件的路径,输入文件是逻辑分块文件。

RecordReader
RecordReader 从InputSlit读入<key, value>对。

一般的,RecordReader 把由InputSplit 提供的字节样式的输入文件,转化成由Mapper处理的记录样式的文件。 因此RecordReader负责处理记录的边界情况和把数据表示成keys/values对形式。

作业的输出
OutputFormat 描述Map/Reduce作业的输出样式。

Map/Reduce框架根据作业的OutputFormat来:

检验作业的输出,例如检查输出路径是否已经存在。
提供一个RecordWriter的实现,用来输出作业结果。 输出文件保存在FileSystem上。
TextOutputFormat是默认的 OutputFormat。

任务的Side-Effect File
在一些应用程序中,子任务需要产生一些side-file,这些文件与作业实际输出结果的文件不同。

在这种情况下,同一个Mapper或者Reducer的两个实例(比如预防性任务)同时打开或者写 FileSystem上的同一文件就会产生冲突。因此应用程序在写文件的时候需要为每次任务尝试(不仅仅是每次任务,每个任务可以尝试执行很多次)选取一个独一无二的文件名(使用attemptid,例如task_200709221812_0001_m_000000_0)。

为了避免冲突,Map/Reduce框架为每次尝试执行任务都建立和维护一个特殊的 ${mapred.output.dir}/_temporary/_${taskid}子目录,这个目录位于本次尝试执行任务输出结果所在的FileSystem上,可以通过 ${mapred.work.output.dir}来访问这个子目录。 对于成功完成的任务尝试,只有${mapred.output.dir}/_temporary/_${taskid}下的文件会移动到${mapred.output.dir}。当然,框架会丢弃那些失败的任务尝试的子目录。这种处理过程对于应用程序来说是完全透明的。

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:http://www.heiqu.com/d5555a73b236a588081e9d48c05f2d28.html