Hadoop mapreduce核心功能描述(3)

在任务执行期间,应用程序在写文件时可以利用这个特性,比如 通过 FileOutputFormat.getWorkOutputPath()获得${mapred.work.output.dir}目录, 并在其下创建任意任务执行时所需的side-file,框架在任务尝试成功时会马上移动这些文件,因此不需要在程序内为每次任务尝试选取一个独一无二的名字。

注意:在每次任务尝试执行期间,${mapred.work.output.dir} 的值实际上是 ${mapred.output.dir}/_temporary/_{$taskid},这个值是Map/Reduce框架创建的。 所以使用这个特性的方法是,在 FileOutputFormat.getWorkOutputPath() 路径下创建side-file即可。

对于只使用map不使用reduce的作业,这个结论也成立。这种情况下,map的输出结果直接生成到HDFS上。

RecordWriter
RecordWriter 生成<key, value> 对到输出文件。

RecordWriter的实现把作业的输出结果写到 FileSystem。

其他有用的特性
Counters
Counters 是多个由Map/Reduce框架或者应用程序定义的全局计数器。 每一个Counter可以是任何一种 Enum类型。同一特定Enum类型的Counter可以汇集到一个组,其类型为Counters.Group。

应用程序可以定义任意(Enum类型)的Counters并且可以通过 map 或者 reduce方法中的 Reporter.incrCounter(Enum, long)或者 Reporter.incrCounter(String, String, long) 更新。之后框架会汇总这些全局counters。

DistributedCache
DistributedCache 可将具体应用相关的、大尺寸的、只读的文件有效地分布放置。

DistributedCache 是Map/Reduce框架提供的功能,能够缓存应用程序所需的文件 (包括文本,档案文件,jar文件等)。

应用程序在JobConf中通过url(hdfs://)指定需要被缓存的文件。 DistributedCache假定由hdfs://格式url指定的文件已经在 FileSystem上了。

Map-Redcue框架在作业所有任务执行之前会把必要的文件拷贝到slave节点上。 它运行高效是因为每个作业的文件只拷贝一次并且为那些没有文档的slave节点缓存文档。

DistributedCache 根据缓存文档修改的时间戳进行追踪。 在作业执行期间,当前应用程序或者外部程序不能修改缓存文件。

distributedCache可以分发简单的只读数据或文本文件,也可以分发复杂类型的文件例如归档文件和jar文件。归档文件(zip,tar,tgz和tar.gz文件)在slave节点上会被解档(un-archived)。 这些文件可以设置执行权限。

用户可以通过设置mapred.cache.{files|archives}来分发文件。 如果要分发多个文件,可以使用逗号分隔文件所在路径。也可以利用API来设置该属性: DistributedCache.addCacheFile(URI,conf)/ DistributedCache.addCacheArchive(URI,conf) and DistributedCache.setCacheFiles(URIs,conf)/ DistributedCache.setCacheArchives(URIs,conf) 其中URI的形式是 hdfs://host:port/absolute-path#link-name 在Streaming程序中,可以通过命令行选项 -cacheFile/-cacheArchive 分发文件。

用户可以通过 DistributedCache.createSymlink(Configuration)方法让DistributedCache 在当前工作目录下创建到缓存文件的符号链接。 或者通过设置配置文件属性mapred.create.symlink为yes。 分布式缓存会截取URI的片段作为链接的名字。 例如,URI是 hdfs://namenode:port/lib.so.1#lib.so, 则在task当前工作目录会有名为lib.so的链接, 它会链接分布式缓存中的lib.so.1。

DistributedCache可在map/reduce任务中作为 一种基础软件分发机制使用。它可以被用于分发jar包和本地库(native libraries)。 DistributedCache.addArchiveToClassPath(Path, Configuration)和 DistributedCache.addFileToClassPath(Path, Configuration) API能够被用于 缓存文件和jar包,并把它们加入子jvm的classpath。也可以通过设置配置文档里的属性 mapred.job.classpath.{files|archives}达到相同的效果。缓存文件可用于分发和装载本地库。

Tool
Tool 接口支持处理常用的Hadoop命令行选项。

Tool 是Map/Reduce工具或应用的标准。应用程序应只处理其定制参数, 要把标准命令行选项通过 ToolRunner.run(Tool, String[]) 委托给 GenericOptionsParser处理。

Hadoop命令行的常用选项有:
-conf <configuration file>
-D <property=value>
-fs <local|namenode:port>
-jt <local|jobtracker:port>

IsolationRunner
IsolationRunner 是帮助调试Map/Reduce程序的工具。

使用IsolationRunner的方法是,首先设置 keep.failed.tasks.files属性为true (同时参考keep.tasks.files.pattern)。

然后,登录到任务运行失败的节点上,进入 TaskTracker的本地路径运行 IsolationRunner:
$ cd <local path>/taskTracker/${taskid}/work
$ bin/hadoop org.apache.hadoop.mapred.IsolationRunner ../job.xml

IsolationRunner会把失败的任务放在单独的一个能够调试的jvm上运行,并且采用和之前完全一样的输入数据。

Profiling
Profiling是一个工具,它使用内置的java profiler工具进行分析获得(2-3个)map或reduce样例运行分析报告。

用户可以通过设置属性mapred.task.profile指定系统是否采集profiler信息。 利用api JobConf.setProfileEnabled(boolean)可以修改属性值。如果设为true, 则开启profiling功能。profiler信息保存在用户日志目录下。缺省情况,profiling功能是关闭的。

如果用户设定使用profiling功能,可以使用配置文档里的属性 mapred.task.profile.{maps|reduces} 设置要profile map/reduce task的范围。设置该属性值的api是 JobConf.setProfileTaskRange(boolean,String)。 范围的缺省值是0-2。

用户可以通过设定配置文档里的属性mapred.task.profile.params 来指定profiler配置参数。修改属性要使用api JobConf.setProfileParams(String)。当运行task时,如果字符串包含%s。 它会被替换成profileing的输出文件名。这些参数会在命令行里传递到子JVM中。缺省的profiling 参数是 -agentlib:hprof=cpu=samples,heap=sites,force=n,thread=y,verbose=n,file=%s。

调试
Map/Reduce框架能够运行用户提供的用于调试的脚本程序。 当map/reduce任务失败时,用户可以通过运行脚本在任务日志(例如任务的标准输出、标准错误、系统日志以及作业配置文件)上做后续处理工作。用户提供的调试脚本程序的标准输出和标准错误会输出为诊断文件。如果需要的话这些输出结果也可以打印在用户界面上。

在接下来的章节,我们讨论如何与作业一起提交调试脚本。为了提交调试脚本, 首先要把这个脚本分发出去,而且还要在配置文件里设置。

如何分发脚本文件:
用户要用 DistributedCache 机制来分发和链接脚本文件

如何提交脚本:
一个快速提交调试脚本的方法是分别为需要调试的map任务和reduce任务设置 "mapred.map.task.debug.script" 和 "mapred.reduce.task.debug.script" 属性的值。这些属性也可以通过 JobConf.setMapDebugScript(String) 和 JobConf.setReduceDebugScript(String) API来设置。对于streaming, 可以分别为需要调试的map任务和reduce任务使用命令行选项-mapdebug 和 -reducedegug来提交调试脚本。

脚本的参数是任务的标准输出、标准错误、系统日志以及作业配置文件。在运行map/reduce失败的节点上运行调试命令是:
$script $stdout $stderr $syslog $jobconf

Pipes 程序根据第五个参数获得c++程序名。 因此调试pipes程序的命令是
$script $stdout $stderr $syslog $jobconf $program

默认行为
对于pipes,默认的脚本会用gdb处理core dump, 打印 stack trace并且给出正在运行线程的信息。

JobControl
JobControl是一个工具,它封装了一组Map/Reduce作业以及他们之间的依赖关系。

数据压缩
Hadoop Map/Reduce框架为应用程序的写入文件操作提供压缩工具,这些工具可以为map输出的中间数据和作业最终输出数据(例如reduce的输出)提供支持。它还附带了一些 CompressionCodec的实现,比如实现了 zlib和lzo压缩算法。 Hadoop同样支持gzip文件格式。

考虑到性能问题(zlib)以及Java类库的缺失(lzo)等因素,Hadoop也为上述压缩解压算法提供本地库的实现。更多的细节请参考 这里。

中间输出
应用程序可以通过 JobConf.setCompressMapOutput(boolean)api控制map输出的中间结果,并且可以通过 JobConf.setMapOutputCompressorClass(Class)api指定 CompressionCodec。

作业输出
应用程序可以通过 FileOutputFormat.setCompressOutput(JobConf, boolean) api控制输出是否需要压缩并且可以使用 FileOutputFormat.setOutputCompressorClass(JobConf, Class)api指定CompressionCodec。

如果作业输出要保存成 SequenceFileOutputFormat格式,需要使用 SequenceFileOutputFormat.setOutputCompressionType(JobConf, SequenceFile.CompressionType)api,来设定 SequenceFile.CompressionType(i.e. RECORD / BLOCK - 默认是RECORD)。

更多Hadoop相关信息见Hadoop 专题页面 ?tid=13

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:http://www.heiqu.com/d5555a73b236a588081e9d48c05f2d28.html