Yarn详解
Yarn详解
Yarn概述
YARN(Yet Another Resource Negotiator)是Hadoop生态系统的一个关键组件,用于资源管理和作业调度。
Yarn是Hadoop 2.0新引入的资源管理系统,直接从MRv1演化而来的;核心思想是将MRv1中JobTracker的资源管理和任务调度两个功能分开,分别由ResourceManager和ApplicationMaster进程实现:
ResourceManager:负责整个集群的资源管理和调度
ApplicationMaster:负责应用程序相关的事务,比如任务调度、任务监控和容错等
YARN的引入,使得多个计算可运行在一个集群中,每个job对应一个ApplicationMaster。目前多个计算框架可以运行在YARN上,比如MapReduce、Spark、Storm等。
Yarn就相当于一个分布式的操作系统平台,而MapReduce、Spark等运算程序则相当于运行在操作系统之上的应用程序。
YARN 的整体调度来说,采用双层资源调度模型:
第一层:ResourceManager 中的资源调度器将资源分配给各个 ApplicationMaster,由 YARN 决定。
第二层:ApplicationMaster 再进一步将资源分配给它内部的各个任务 Task,由用户应用程序 ApplicationMaster 决定。
由此可见,YARN 是一个统一的资源调度系统,只要满足 YARN 的调度规范的分布式应用程序,都可以运行在 YARN 中,调度规范:定义一个ApplicatoinMaster,问 RM 申请资源,AM 自己来完成 Container 到 Task 的分配。 YARN 采用拉模型实现异步资源分配,资源调度器将资源分配给应用程序之后,暂存于缓冲区中,等待 ApplicationMaster 通过心跳来进行获取。
大概流程如下:
Yarn基本架构
YARN主要由ResourceManager、NodeManager、ApplicationMaster和Container等组件构成。
Yarn工作机制
(1)MR程序提交到客户端所在的节点。
(2)YarnRunner向ResourceManager申请一个Application。
(3)RM将该应用程序的资源路径返回给YarnRunner。
(4)该程序将运行所需资源提交到HDFS上。
(5)程序资源提交完毕后,申请运行mrAppMaster。
(6)RM将用户的请求初始化成一个Task。
(7)其中一个NodeManager领取到Task任务。
(8)该NodeManager创建容器Container,并产生MRAppmaster。
(9)Container从HDFS上拷贝资源到本地。
(10)MRAppmaster向RM 申请运行MapTask资源。
(11)RM将运行MapTask任务分配给另外两个NodeManager,另两个NodeManager分别领取任务并创建容器。
(12)MR向两个接收到任务的NodeManager发送程序启动脚本,这两个NodeManager分别启动MapTask,MapTask对数据分区排序。
(13)MrAppMaster等待所有MapTask运行完毕后,向RM申请容器,运行ReduceTask。
(14)ReduceTask向MapTask获取相应分区的数据。
(15)程序运行完毕后,MR会向RM申请注销自己。
作业提交过程之Yarn
参考Yarn工作机制图示
- 作业提交
第1步:Client调用job.waitForCompletion方法,向整个集群提交MapReduce作业。
第2步:Client向RM申请一个作业id。
第3步:RM给Client返回该job资源的提交路径和作业id。
第4步:Client提交jar包、切片信息和配置文件到指定的资源提交路径。
第5步:Client提交完资源后,向RM申请运行MrAppMaster。
- 作业初始化
第6步:当RM收到Client的请求后,将该job添加到容量调度器中。
第7步:某一个空闲的NM领取到该Job。
第8步:该NM创建Container,并产生MRAppmaster。
第9步:下载Client提交的资源到本地。
- 任务分配
第10步:MrAppMaster向RM申请运行多个MapTask任务资源。
第11步:RM将运行MapTask任务分配给另外两个NodeManager,另两个NodeManager分别领取任务并创建容器。
- 任务运行
第12步:MR向两个接收到任务的NodeManager发送程序启动脚本,这两个NodeManager分别启动MapTask,MapTask对数据分区排序。
第13步:MrAppMaster等待所有MapTask运行完毕后,向RM申请容器,运行ReduceTask。
第14步:ReduceTask向MapTask获取相应分区的数据。
第15步:程序运行完毕后,MR会向RM申请注销自己。
- 进度和状态更新
YARN中的任务将其进度和状态(包括counter)返回给应用管理器, 客户端每秒(通过mapreduce.client.progressmonitor.pollinterval设置)向应用管理器请求进度更新, 展示给用户。
- 作业完成
除了向应用管理器请求作业进度外, 客户端每5秒都会通过调用waitForCompletion()来检查作业是否完成。时间间隔可以通过mapreduce.client.completion.pollinterval来设置。作业完成之后, 应用管理器和Container会清理工作状态。作业的信息会被作业历史服务器存储以备之后用户核查。
作业提交过程之MapReduce
- HDFS上传待处理的文本: 首先,待处理的文本数据被上传到HDFS,这样它可以分布式存储在Hadoop集群的不同节点上,以便后续的处理。
- Map阶段读取数据: MapReduce作业的第一阶段是Map阶段。在这个阶段,Map任务会从HDFS中读取数据,通常使用默认的TextInputFormat进行数据读取。数据会被切分成多个输入分片(input splits),每个分片由一个Map任务处理。
- Map任务处理数据: 每个Map任务会对其分配的输入分片进行处理,经过程序逻辑运算后,生成一系列<key, value>对。这些<key, value>对会被写入一个环形缓冲区。
- 环形缓冲区存储数据和索引: 环形缓冲区不仅存储数据本身,还包括用于索引和管理数据的信息,以及数据的键(key)和值(value)。
- 环形缓冲区数据写入和排序: 当环形缓冲区内存占用率达到一定阈值(通常为80%)时,会触发数据的写入和排序过程。数据会根据键(key)进行hash分区,确保相同键的数据被分配到同一Reduce任务。然后,在每个分区内,数据会根据键(key)进行一次快速排序,以确保分区内的数据有序。
- Combiner阶段: 在Map阶段后,可以应用Combiner函数进行一次简单的合并操作,以减少在Shuffle阶段传输到Reduce任务的数据量。Combiner函数通常用于在Map端执行一些局部合并操作,使具有相同键的数据在一起,以减少Shuffle和Sort的工作负载。
- Shuffle和Sort阶段: 在Reduce阶段之前,Shuffle和Sort阶段发生。在这个阶段,Reduce任务会从Map端拉取数据,这些数据会被写入Reduce任务的本地磁盘,并根据键进行排序和分组。这确保了Reduce任务可以高效地处理相同键的数据。
- Reduce任务处理和写回: 最后,在Reduce阶段,Reduce任务会从本地磁盘读取数据,对数据进行最终的合并和处理,然后将结果写回到HDFS中。处理好的数据会被写回到指定的HDFS输出路径。
Yarn资源调度器
Yarn 的资源调度器是可以配置的,默认实现有三种 FIFO
、CapacityScheduler
、FairScheduler
。
FIFO
FIFO 调度器(First In First Out):单队列,根据提交作业的先后顺序,先来先服务。在进行资源分配的时候,先给队列中最头上的应用进行分配资源,待最头上的应用需求满足后再给下一个分配,以此类推。
优点:调度算法简单,JobTracker(job提交任务后发送得地方)工作负担轻。
缺点:忽略了不同作业的需求差异。例如如果类似对海量数据进行统计分析的作业长期占据计算资源,那么在其后提交的交互型作业有可能迟迟得不到处理,从而影响到用户的体验。
Capacity Scheduler
容量调度器CapacityScheduler是Yahoo开发,Hadoop 3.x 中的默认调度策略。
多队列:每个队列内部先进先出, 同一时间队列中只有一个任务在执行, 队列的并行度为队列的个数。
1.多队列支持,每个队列采用FIFO
2.为了防止同一个用户的作业独占队列中的资源,该调度器会对同一个用户提交多的作业所占资源量进行限定
3.首先,计算每个队列中正在运行的任务数与其应该分得的计算资源之间的比值,选择一个该比值最小的队列
4.其次,根据作业的优先级和提交时间顺序,同时考虑用户资源量限制和内存限制对队列内任务排序
5.三个队列同时按照任务的先后顺序依次执行,比如,job11,job21和job31分别排在队列最前面,是最先运行,也是同时运行
该调度默认情况下不支持优先级,但是可以在配置文件中开启此选项,如果支持优先级,调度算法就是带有优先级的FIFO。
不支持优先级抢占,一旦一个作业开始执行,在执行完之前它的资源不会被高优先级作业所抢占。
对队列中同一用户提交的作业能够获得的资源百分比进行了限制以使同属于一用户的作业不能出现独占资源的情况。
Fair Scheduler
Fair Scheduler 是 Facebook 开发的多用户调度器。设计目标是为所有的应用分配「公平」的资源(对公平的定义可以通过参数来设置)。公平不仅可以在队列中的应用体现,也可以在多个队列之间工作。
在 Fair 调度器中,我们不需要预先占用一定的系统资源,Fair 调度器会为所有运行的 job 动态的调整系统资源。如下图所示,当第一个大 job 提交时,只有这一个 job 在运行,此时它获得了所有集群资源;当第二个小任务提交后,Fair 调度器会分配一半资源给这个小任务,让这两个任务公平的共享集群资源。
公平调度器有如下特点:
- 支持多队列多作业,每个队列可以单独配置;
- 同一队列的作业按照器优先级分享整个队列的资源,并发执行;
- 每个作业可以设置最小资源值,调度器会保证作业获得其以上资源;
- 设计目标是在时间尺度上,所有作业获得公平的资源。某一时刻一个作业应获资源和实际获取资源的差距叫“缺额”;
- 调度器会优先为缺额大的作业分配资源。
Yarn集群资源管理
集群总资源
要想知道YARN集群上一共有多少资源很容易,我们通过YARN的web ui就可以直接查看到。
通过查看Cluster Metrics,可以看到总共的内存为24GB、虚拟CPU核为24个。我们也可以看到每个NodeManager的资源。很明显,YARN集群中总共能使用的内存就是每个NodeManager的可用内存加载一起,VCORE也是一样。
NodeManger总资源
NodeManager的可用内存、可用CPU分别是8G、和8Core。这个资源和Linux系统是不一致的。我们通过free -g来查看下Linux操作系统的总计内存、和CPU核。
第一个节点(总计内存是10G,空闲的是8G)
1 |
|
第二个节点(总计内存是7G,空闲是不到6G)
1 |
|
第三个节点(和第二个节点一样)
1 |
|
这说明了,NodeManager的可用内存和操作系统总计内存是没有直接关系的!
那NodeManager的可用内存是如何确定的呢?
在yarn-default.xml中有一项配置为:yarn.nodemanager.resource.memory-mb,它的默认值为:-1(hadoop 3.1.4)。我们来看下Hadoop官方解释:
1 |
|
这个配置是表示NodeManager总共能够使用的物理内存,这也是可以给container使用的物理内存。如果配置为-1,且yarn.nodemanager.resource.detect-hardware-capabilities配置为true,那么它会根据操作的物理内存自动计算。而yarn.nodemanager.resource.detect-hardware-capabilities默认为false,所以,此处默认NodeManager就是8G。这就是解释了为什么每个NM的可用内存是8G。
还有一个重要的配置:yarn.nodemanager.vmem-pmem-ratio,它的默认配置是2.1
1 |
|
这个配置是针对NodeManager上的container,如果说某个Container的物理内存不足时,可以使用虚拟内存,能够使用的虚拟内存默认为物理内存的2.1倍。
针对虚拟CPU核数,也有一个配置yarn.nodemanager.resource.cpu-vcores配置,它的默认配置也为-1。看一下Hadoop官方的解释:
1 |
|
与内存类似,它也有一个默认值:就是8。
这就解释了为什么每个NodeManager的总计资源是8G和8个虚拟CPU核了。
Scheduler调度资源
通过YARN的webui,点击scheduler,我们可以看到的调度策略、最小和最大资源分配。
通过web ui,我们可以看到当前YARN的调度策略为容量调度。调度资源的单位是基于MB的内存、和Vcore(虚拟CPU核)。最小的一次资源分配是:1024M(1G)和1个VCORE。最大的一次分配是:4096M(4G)和4个VCORE。注意:内存资源和VCORE都是以Container承载的。
yarn-default.xml的配置:
配置项 | 默认 | 说明 |
---|---|---|
yarn.scheduler.minimum-allocation-mb | 1024 | 该配置表示每个容器的最小分配。因为RM是使用scheduler来进行资源调度的,如果请求的资源小于1G,也会设置为1G。这表示,如果我们请求一个256M的container,也会分配1G。 |
yarn.scheduler.maximum-allocation-mb | 8192 | 最大分配的内存,如果比这个内存高,就会抛出InvalidResourceRequestException异常。这里也就意味着,最大请求的内存不要超过8G。上述截图显示是4G,是因为我在yarn-site.xml中配置了最大分配4G。 |
yarn.scheduler.minimum-allocation-vcores | 1 | 同内存的最小分配 |
yarn.scheduler.maximum-allocation-vcores | 4 | 同内存的最大分配 |
Container总资源
在YARN中,资源都是通过Container来进行调度的,程序也是运行在Container中。Container能够使用的最大资源,是由scheduler决定的。如果按照Hadoop默认配置,一个container最多能够申请8G的内存、4个虚拟核。例如:我们请求一个Container,内存为3G、VCORE为2,是OK的。考虑一个问题:如果当前NM机器上剩余可用内存不到3G,怎么办?此时,就会使用虚拟内存。不过,虚拟内存,最多为内存的2.1倍,如果物理内存 + 虚拟内存仍然不足3G,将会给container分配资源失败。
根据上述分析,如果我们申请的container内存为1G、1个VCORE。那么NodeManager最多可以运行8个Container。如果我们申请的container内存为4G、4个vcore,那么NodeManager最多可以运行2个Container。
Container是一个JVM进程吗
这个问题估计有很多天天在使用Hadoop的人都不一定知道。当向RM请求资源后,会在NodeManager上创建Container。问题是:Container是不是有自己独立运行的JVM进程呢?还是说,NodeManager上可以运行多个Container?Container和JVM的关系是什么?
此处,明确一下,每一个Container就是一个独立的JVM实例。(此处,咱们不讨论Uber模式)。每一个任务都是在Container中独立运行,例如:MapTask、ReduceTask。当scheduler调度时,它会根据任务运行需要来申请Container,而每个任务其实就是一个独立的JVM。
为了验证此观点,我们来跑一个MapReduce程序。然后我们在一个NodeManager上使用JPS查看一下进程:(这是我处理过的,不然太长了,我们主要是看一下内存使用量就可以了)
1 |
|
我们看到了有MRAppMaster、YarnChild这样的一些Java进程。这就表示,每一个Container都是一个独立运行的JVM,它们彼此之间是独立的。
Spark on YARN资源管理
通常,生产环境中,我们是把Spark程序在YARN中执行。而Spark程序在YARN中运行有两种模式,一种是Cluster模式、一种是Client模式。这两种模式的关键区别就在于Spark的driver是运行在什么地方。如果运行模式是Cluster模式,Driver运行在Application Master里面的。如果是Client模式,Driver就运行在提交spark程序的地方。Spark Driver是需要不断与任务运行的Container交互的,所以运行Driver的client是必须在网络中可用的,直到应用程序结束。
如下图所示,留意一下Driver的位置
通过上面的分析,我们可以明确,如果是Client模式,Driver和ApplicationMaster运行在不同的地方。ApplicationMaster运行在Container中,而Driver运行在提交任务的client所在的机器上。
因为如果是Standalone集群,整个资源管理、任务执行是由Master和Worker来完成的。而当运行在YARN的时候,就没有这两个概念了。资源管理遵循YARN的资源调度方式。之前在Standalone集群种类,一个worker上可以运行多个executor,现在对应的就是一个NodeManager上可以运行多个container,executor的数量跟container是一致的。可以直接把executor理解为container。
我们再来看看spark-submit的一些参数配置。
1 |
|
配置选项中,有一个是公共配置,还有一些针对spark-submit运行在不同的集群,参数是不一样的。
公共的配置:
–driver-memory、–executor-memory,这是我们可以指定spark driver以及executor运行所需的配置。executor其实就是指定container的内存,而driver如果是cluster模式,就是application master的内置,否则就是client运行的那台机器上申请的内存。
如果运行在Cluster模式,可以指定driver所需的cpu core。
如果运行在Spark Standalone,–total-executor-cores表示一共要运行多少个executor。
如果运行在Standalone集群或者YARN集群,–executor-cores表示每个executor所需的cpu core。
如果运行在yum上,–num-executors表示要启动多少个executor,其实就是要启动多少个container。
总结
如果你认真看完了,很轻易地就能回答下面的问题:
- Container是以什么形式运行的?是单独的JVM进程吗?
是的,每一个Container就是一个单独的JVM进程。
- YARN的vcore和本机的CPU核数关系?
没关系。默认都是手动在yarn-default.xml中配置的,默认每个NodeManager是8个vcore,所有的NodeManager上的vcore加在一起就是整个YARN所有的vcore。
- 每个Container能够使用的物理内存和虚拟内存是多少?
scheduler分配给container多少内存就是最大能够使用的物理内存,但如果超出该物理内存,可以使用虚拟内存。虚拟内存默认是物理内存的2.1倍。
- 一个NodeManager可以分配多少个Container?
这个得看Container的内存大小和vcore数量。用NM上最大的可用Mem和Vcore相除就知道了。
- 一个Container可以分配的最小内存是多少?最大内存内存是多少?以及最小、最大的VCore是多少?
根据scheduler分配的最小/最大内存、最小/最大vcore来定。
- 当将Spark程序部署在YARN上, AM与Driver的关系是什么?
有两种模式,cluster模式,Driver就运行在AM上。如果是client模式,没关系。
- Spark on YARN,一个Container可以运行几个executor?executor设置的内存和container的关系是什么?
一个container对应一个executor。executor设置的内存就是AM申请的container内存,如果container最小分配单位是1G,而executor设置的内置是512M,按照container最小单位分配。
Yarn资源管理部分转自让你彻底搞明白YARN资源分配 - 知乎 (zhihu.com)
蚂蚁🐜再小也是肉🥩!
“您的支持,我的动力!觉得不错的话,给点打赏吧 ୧(๑•̀⌄•́๑)૭”
微信支付
支付宝支付