MapReduce详解

本文遵循BY-SA版权协议,转载请附上原文出处链接。


本文作者: 黑伴白

本文链接: http://heibanbai.com.cn/posts/37198e3b/

MapReduce详解

MapReduce概述

MapReduce是Hadoop生态圈的一部分,也是最核心的一部分。

MapReduce 是一个分布式运算程序的编程框架,是用户开发“基于hadoop的数据分析应用”的核心框架,其核心功能是将用户编写的业务逻辑代码和自带默认组件整合成一个完整的分布式运算程序,并发运行在一个hadoop集群上。

为什么需要MapReduce?

  • 海量数据在单机上处理因为硬件资源限制,无法胜任。
  • 而一旦将单机版程序扩展到集群来分布式运行,将极大增加程序的复杂度和开发难度。
  • 引入MapReduce框架后,开发人员可以将绝大部分工作集中在业务逻辑的开发上,而将分布式计算中的复杂性交由框架来处理。

MapReduce工作机制

详细工作流程

img

img

上面的流程是整个MapReduce最全工作流程,但是Shuffle过程只是从第7步开始到第16步结束,具体Shuffle过程详解,如下:

1)MapTask收集我们的map()方法输出的kv对,放到内存缓冲区中

2)从内存缓冲区不断溢出本地磁盘文件,可能会溢出多个文件

3)多个溢出文件会被合并成大的溢出文件

4)在溢出过程及合并的过程中,都要调用Partitioner进行分区和针对key进行排序

5)ReduceTask根据自己的分区号,去各个MapTask机器上取相应的结果分区数据

6)ReduceTask会取到同一个分区的来自不同MapTask的结果文件,ReduceTask会将这些文件再进行合并(归并排序)

7)合并成大文件后,Shuffle的过程也就结束了,后面进入ReduceTask的逻辑运算过程(从文件中取出一个一个的键值对Group,调用用户自定义的reduce()方法)

注意:

Shuffle中的缓冲区大小会影响到MapReduce程序的执行效率,原则上说,缓冲区越大,磁盘io的次数越少,执行速度就越快。

缓冲区的大小可以通过参数调整,参数:io.sort.mb默认100M。

概要工作流程

MapReduce概要工作流程

在上图中, MapReduce 的工作流程大致可以分为5步,具体如下:

  1. 分片、格式化数据源

输入 Map 阶段的数据源,必须经过分片和格式化操作。

  • 分片操作:指的是将源文件划分为大小相等的小数据块( Hadoop 2.x 中默认 128MB ),也就是分片( split ), Hadoop 会为每一个分片构建一个 Map 任务,并由该任务运行自定义的 map() 函数,从而处理分片里的每一条记录;
  • 格式化操作:将划分好的分片( split )格式化为键值对<key,value>形式的数据,其中,key 代表偏移量,value 代表每一行内容。
  1. 执行 MapTask

每个 Map 任务都有一个内存缓冲区(缓冲区大小 100MB ),输入的分片( split )数据经过 Map 任务处理后的中间结果会写入内存缓冲区中。 如果写人的数据达到内存缓冲的阈值( 80MB ),会启动一个线程将内存中的溢出数据写入磁盘,同时不影响 Map 中间结果继续写入缓冲区。 在溢写过程中, MapReduce 框架会对 key 进行排序,如果中间结果比较大,会形成多个溢写文件,最后的缓冲区数据也会全部溢写入磁盘形成一个溢写文件,如果是多个溢写文件,则最后合并所有的溢写文件为一个文件。

  1. 执行 Shuffle 过程

MapReduce 工作过程中, Map 阶段处理的数据如何传递给 Reduce 阶段,这是 MapReduce 框架中关键的一个过程,这个过程叫作 Shuffle 。 Shuffle 会将 MapTask 输出的处理结果数据分发给 ReduceTask ,并在分发的过程中,对数据按 key 进行分区和排序。

  1. 执行 ReduceTask

输入 ReduceTask 的数据流是<key, {value list}>形式,用户可以自定义 reduce()方法进行逻辑处理,最终以<key, value>的形式输出。

  1. 写入文件

MapReduce 框架会自动把 ReduceTask 生成的<key, value>传入 OutputFormat 的 write 方法,实现文件的写入操作。

Shuffle机制

img

key,value从map()方法输出后,被outputcollector收集通过HashPartitioner类的getpartitioner()方法获取分区号,进入环形缓冲区。默认情况下,环形缓冲区为100MB。

当环形环形缓冲区存储达到80%,开始执行溢写过程,溢写过程中如果有其他数据进入,那么由剩余的20%反向写入。溢写过程会根据key,value

先根据分区进行排序,再根据key值进行排序,后生成一个溢写文件(再对应的分区下根据key值排序),maptask将溢写文件归并排序后落入本地磁盘,reduceTask阶段将多个mapTask下相同分区的数据copy到指定的reduceTask中进行归并排序、分组后一次读取一组数据给reduce()函数

MapTask工作机制

MapTask工作详细图示

MapTask详细图

MapTask工作概要图示

MapTask概要图

MapTask工作流程说明

  1. Read阶段:MapTask通过用户编写的RecordReader,从输入InputSplit中解析出一个个key/value。

  2. Map阶段:该节点主要是将解析出的key/value交给用户编写map()函数处理,并产生一系列新的key/value。

  3. Collect收集阶段:在用户编写map()函数中,当数据处理完成后,一般会调用OutputCollector.collect()输出结果。在该函数内部,它会将生成的key/value分区(调用Partitioner),并写入一个环形内存缓冲区中。

  4. Spill阶段:即“溢写”,当环形缓冲区满后(默认达到缓冲区大小的 80 %),MapReduce会将数据写到本地磁盘上,生成一个临时文件。

    将数据写入本地磁盘前,先要对数据进行一次本地排序,并在必要时对数据进行合并、压缩等操作。
    写入磁盘之前,线程会根据 ReduceTask 的数量,将数据分区,一个 Reduce 任务对应一个分区的数据。
    这样做的目的是为了避免有些 Reduce 任务分配到大量数据,而有些 Reduce 任务分到很少的数据,甚至没有分到数据的尴尬局面。
    如果此时设置了 Combiner ,将排序后的结果进行 Combine 操作,这样做的目的是尽可能少地执行数据写入磁盘的操作。

  5. Combine 阶段:当所有数据处理完成以后, MapTask 会对所有临时文件进行一次合并,以确保最终只会生成一个数据文件

    合并的过程中会不断地进行排序和 Combine 操作,
    其目的有两个:一是尽量减少每次写人磁盘的数据量;二是尽量减少下一复制阶段网络传输的数据量。
    最后合并成了一个已分区且已排序的文件。

ReduceTask工作机制

ReduceTask工作详细图示

ReduceTask工作详细图示

ReduceTask工作概要图示

ReduceTask工作概要图示

ReduceTask工作流程说明

  1. Copy阶段:ReduceTask从各个MapTask上远程拷贝一片数据,并针对某一片数据,如果其大小超过一定阈值,则写到磁盘上,否则直接放到内存中。

  2. Merge阶段:在远程拷贝数据的同时,ReduceTask启动了两个后台线程对内存和磁盘上的文件进行合并,以防止内存使用过多或磁盘上文件过多。

  3. Sort阶段:按照MapReduce语义,用户编写reduce()函数输入数据是按key进行聚集的一组数据。

    为了将key相同的数据聚在一起,Hadoop采用了基于排序的策略。

    由于各个MapTask已经实现对自己的处理结果进行了局部排序,因此,ReduceTask只需对所有数据进行一次归并排序即可。

  4. Reduce阶段:对排序后的键值对调用 reduce() 方法,键相等的键值对调用一次 reduce()方法,每次调用会产生零个或者多个键值对,最后把这些输出的键值对写入到 HDFS 中

  5. Write 阶段: reduce() 函数将计算结果写到 HDFS 上。

    合并的过程中会产生许多的中间文件(写入磁盘了),但 MapReduce 会让写入磁盘的数据尽可能地少,并且最后一次合并的结果并没有写入磁盘,而是直接输入到 Reduce 函数。


蚂蚁再小也是肉🥩!


MapReduce详解
http://heibanbai.com.cn/posts/37198e3b/
作者
黑伴白
发布于
2022年9月11日
许可协议

“您的支持,我的动力!觉得不错的话,给点打赏吧 ୧(๑•̀⌄•́๑)૭”

微信二维码

微信支付

支付宝二维码

支付宝支付