mapreduce过程详解(基于hadoop2.x架构) (3)

mapreduce过程详解(基于hadoop2.x架构)

1.切片

在FileInputFormat中,计算切片大小的逻辑:Math.max(minSize, Math.min(maxSize, blockSize))

minSize的默认值是1,而maxSize的默认值是long类型的最大值,即可得切片的默认大小是blockSize(128M)

maxSize参数如果调得比blocksize小,则会让切片变小,而且就等于配置的这个参数的值

minSize参数调的比blockSize大,则可以让切片变得比blocksize还大

hadoop为每个分片构建一个map任务,可以并行处理多个分片上的数据,整个数据的处理过程将得到很好的负载均衡,因为一台性能较强的计算机能处理更多的数据分片.

分片也不能切得太小,否则多个map和reduce间数据的传输时间,管理分片,构建多个map任务的时间将决定整个作业的执行时间.(大部分时间都不在计算上)

如果文件大小小于128M,则该文件不会被切片,不管文件多小都会是一个单独的切片,交给一个maptask处理.如果有大量的小文件,将导致产生大量的maptask,大大降低集群性能.

大量小文件的优化策略:

(1) 在数据处理的前端就将小文件整合成大文件,再上传到hdfs上,即避免了hdfs不适合存储小文件的缺点,又避免了后期使用mapreduce处理大量小文件的问题。(最提倡的做法)

(2)小文件已经存在hdfs上了,可以使用另一种inputformat来做切片(CombineFileInputFormat),它的切片逻辑和FileInputFormat(默认)不同,它可以将多个小文件在逻辑上规划到一个切片上,交给一个maptask处理。

2.环形缓存区

经过map函数的逻辑处理后的数据输出之后,会通过OutPutCollector收集器将数据收集到环形缓存区保存。

环形缓存区的大小默认为100M,当保存的数据达到80%时,就将缓存区的数据溢出到磁盘上保存。

3.溢出

环形缓存区的数据达到其容量的80%时就会溢出到磁盘上进行保存,在此过程中,程序会对数据进行分区(默认HashPartition)和排序(默认根据key进行快排)

缓存区不断溢出的数据形成多个小文件

4.合并

溢出的多个小文件各个区合并在一起(0区和0区合并成一个0区),形成大文件

通过归并排序保证区内的数据有序

5.shuffle

从过程2到过程7之间,即map任务和reduce任务之间的数据流称为shuffle(混洗),而过程5最能体现出混洗这一概念。一般情况下,一个reduce任务的输入数据来自与多个map任务,多个reduce任务的情况下就会出现如过程5所示的,每个reduce任务从map的输出数据中获取属于自己的那个分区的数据。

6.合并

运行reducetask的节点通过过程5,将来自多个map任务的属于自己的分区数据下载到本地磁盘工作目录。这多个分区文件通过归并排序合并成大文件,并根据key值分好组(key值相同的,value值会以迭代器的形式组在一起)。

7.reducetask

reducetask从本地工作目录获取已经分好组并且排好序的数据,将数据进行reduce函数中的逻辑处理。

8.输出

每个reducetask输出一个结果文件。

partition(分区)

数据从环形缓存区溢出到文件的过程中会根据用户自定义的partition函数进行分区,如果用户没有自定义该函数,程序会用默认的partitioner通过哈希函数来分区,hash partition 的好处是比较弹性,跟数据类型无关,实现简单,只需要设置reducetask的个数。分区的目的是将整个大数据块分成多个数据块,通过多个reducetask处理后,输出多个文件。通常在输出数据需要有所区分的情况下使用自定义分区,如在上述的流量统计的案例里,如果需要最后的输出数据再根据手机号码的省份分成几个文件来存储,则需要自定义partition函数,并在驱动程序里设置reduce任务数等于分区数(job.setNumReduceTasks(5);)和指明自己定义的partition(job.setPartitionerClass(ProvincePartitioner.class))。在需要获取统一的输出结果的情况下,不需要自定义partition也不用设置reducetask的数量(默认1个)。

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wppygp.html