深入理解 YarnFairSchedule 中的饥饿抢占 阳光穿透心脏的1/2处 2022-12-22 15:23 78阅读 0赞 ### 文章目录 ### * * 饥饿 * * 饥饿的两种类型 * * FairShare 饥饿 * MinShare 饥饿 * 抢占 * * 要抢占的Container * Code ## 饥饿 ## 由于 FairSchedule 具有弹性功能,因此对列中运行的应用程序可能使得其他应用程序(在同一队列或不同队列)处于饥饿状态; 在如下例子中,假设仅 tenant1 和 tenant2 队列处于active状态,分别使用了33.3%和66.6%的资源。随后tenant3也变为active状态,则 队列的 Instantaneous FairShare 将分别变为 25%、50%、25%。但此时,tenant3 队列中的应用程序必须等待 tenant1 或 tenant2 中的应用程序释放资源。在此之前,tenant3 将以未满足资源需求或饥饿的形式等待container。 ![4e61120a4855d72923df811a38bfacee.png][] 通过抢占,可以以可预测的方式调整这种不平衡,它允许从超过其FairShare的队列中回收资源,而不用等待队列释放资源。 ### 饥饿的两种类型 ### * FairShare 饥饿 * MinShare 饥饿 #### FairShare 饥饿 #### 当满足以下所有条件时,一个作业可能将遭受 FairShare 饥饿 1. 该作业有未满足的资源需求 2. 该作业资源使用量低于其 Instantaneous Fair Share 3. 该作业的资源使用量低于其 Instantaneous Fair Share 的 公平共享抢占阈值(FairShare Preemption Threshold) 持续时间超过公平共享超时(FairShare Preemption Timeout). 公平共享抢占阈值(FairShare Preemption Threshold) 默认0.5,公平共享超时(FairShare Preemption Timeout)默认情况下未设置;需要明确设置它,以使应用程序被视为饥饿。这两个参数可以在全局或队列级别设置。 > 例如:假设队列的 FairShare 抢占超时为5秒,FairShare 请战阈值为0.8,如果启用了抢占并且满足上述条件的1和2,当作业在5秒内未获得其Instantaneous FairShare 的80%,FairScheduler将认为该作业处于饥饿状态。 **注意:开启抢占并不能保证一个作业或队列能获得其完整的Instantaneous FairShare,只保证一个作业或队列能够获得足够的资源,不再别视为饥饿。此外,此保证还取决于Resource Manager找到要抢占的容器** 对于高优先级队列,可以通过将抢占的阈值设置为较高的值,抢占的超时时间设置较小的时间,并将其标记为不可抢占,来积极配置饥饿。 #### MinShare 饥饿 #### 当一个队列满足以下所以条件时,将产生 MinShare 饥饿 1. 队列中的一个或多个应用程序有未满足的资源需求 2. 队列的资源使用量低于其MinShare 3. 队列的资源使用量低于其 MinShare 的时间超过 MinShare 抢占超时时间 默认情况下未设置MinShare抢占超时时间,需要明确的设置它,以使队列被认为是饥饿的。该参数可以全局设置或在队列级别设置。 MinShare 只能指定在队列级别,不像FairShare,因为作业不会产生因无法满足MinShare而产生饥饿 当队列因饥饿而获取资源后,按需求对队列中的应用程序进行排序。因为即使把队列获得的所有资源都分配给队列中的作业,也可能导致某些作业还是处于饥饿的状态。 > 例如:假设配置的队列的最小内存,并且该内存比其最小内存低6G, ## 抢占 ## 抢占的目的: 抢占的弊端:会降低集群的执行效率,因为抢占终止的container会被重新执行 同时满足以下两个条件时,将启用抢占功能: 1. yarn.scheduler.fair.preemption = true。默认为false. 2. 整个集群的资源利用率超过指定的阀值,利用率=已使用资源/集群总资源(yarn.scheduler.fair.preemption.cluster-utilization-threshold),默认0.8f 集群利用率是抢占的先决条件,在利用率低的集群上抢占容器会引起不必要的容器搅动,从而影响性能。集群利用率由 内存或 vCore 利用率的较大者表示。 > 注意:如果有必要,可以将队列标记为不可抢占,例如,因为它是最高优先级队列,因此永远不能抢占队列中的资源。 \[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-MQWd9l1q-1605867049112)(https://i.loli.net/2020/11/18/GWwZ132Jx9jBsAd.png)\] ### 要抢占的Container ### 通过抢占一个或多个节点上运行的container,可以满足应用程序的资源需求。当多个节点上的容器可以满足此资源请求时,会优先考虑AM最少的节点,此外,只有满足以下两个条件时,才能抢占容器: * 容器的应用程序被置为可抢占 * 杀死容器不会使作业资源低于FairShare。(换句话说,抢占后最终使用的资源少于其FairShare的作业不能被抢占) 在决定要抢占哪个容器时不使用MinShare 。MinShare仅用于配置饥饿。这是非常重要的,因为不管是否配置了FairShare或MinShare饥饿,都只能抢占超过FairShare的作业,如下表所示: ![Screen-Shot-2018-06-05-at-12.12.17-PM.png][] ## Code ## 每隔updateInterval调用 > updateInterval 可通过参数 `yarn.scheduler.fair.update-interval-ms` 指定,默认值为500毫秒 private class UpdateThread extends Thread { @Override public void run() { while (!Thread.currentThread().isInterrupted()) { try { Thread.sleep(updateInterval); long start = getClock().getTime(); update(); preemptTasksIfNecessary(); long duration = getClock().getTime() - start; fsOpDurations.addUpdateThreadRunDuration(duration); } catch (InterruptedException ie) { LOG.warn("Update thread interrupted. Exiting."); return; } catch (Exception e) { LOG.error("Exception in fair scheduler UpdateThread", e); } } } } 在 preemptTasksIfNecessary 方法中有抢占相关的逻辑,首先会判断当前,抢占是否可用, protected synchronized void preemptTasksIfNecessary() { if (!shouldAttemptPreemption()) { return; } long curTime = getClock().getTime(); // 判断当前抢占时机是否已到 if (curTime - lastPreemptCheckTime < preemptionInterval) { return; } lastPreemptCheckTime = curTime; // 初始抢占的资源,默认值为none Resource resToPreempt = Resources.clone(Resources.none()); for (FSLeafQueue sched : queueMgr.getLeafQueues()) { // 计算请战资源的总和 Resources.addTo(resToPreempt, resToPreempt(sched, curTime)); } if (Resources.greaterThan(RESOURCE_CALCULATOR, clusterResource, resToPreempt, Resources.none())) { // 抢占动作 preemptResources(resToPreempt); } } 在preemptTasksIfNecessary() 方法中,会先判断请战是否可用, private boolean shouldAttemptPreemption() { if (preemptionEnabled) { // yarn.scheduler.fair.preemption是否为true //集群利用率(内存、CPU二者利用率的最大值,fairSchedule是 single-resource,这里为什么又考虑到Vcore有点不理解)是否超过给定的阈值, //由 yarn.scheduler.fair.preemption.cluster-utilization-threshold 设置 return (preemptionUtilizationThreshold < Math.max( (float) rootMetrics.getAllocatedMB() / clusterResource.getMemory(), (float) rootMetrics.getAllocatedVirtualCores() / clusterResource.getVirtualCores())); } return false; } 当抢占开启并可以进行抢占,切抢占时机已到时,首先会计算要抢占的资源,通过resToPreempt()方法会计算每个Schedule要抢占的资源,然后添加到总的抢占资源里。 在resToPreempt() 方法中,会计算当前队列允许抢占其他队列的资源大小,然后返回该资源 * 如果这个队列使用的资源低于其 min Share 的时间超过minSharePreemptionTimeout,则应该抢占的资源量在它当前fair share和它的 min share之间的差额。 * 如果队列使用的资源低于它的fair share的时间超过了fairSharePreemptionTimeout,则它应该进行抢占的资源是满足其fair share的资源量, * 如果两者都发生,则抢占的值为以上二者的最大值 **minSharePreemptionTimeout**: 表示如果超过该指定时间,Scheduler还没有获得minShare的资源,则进行抢占 **fairSharePreemptionTimeout**: 表示如果超过该指定时间,Scheduler还没有获得fairShare的资源,则进行抢占 > 对代码中一个命令的解释 resToPreempt --> resourceToPreempt,resDueToFairShare,resDueToFairShare同理 protected Resource resToPreempt(FSLeafQueue sched, long curTime) { long minShareTimeout = sched.getMinSharePreemptionTimeout(); long fairShareTimeout = sched.getFairSharePreemptionTimeout(); Resource resDueToMinShare = Resources.none(); Resource resDueToFairShare = Resources.none(); // minSharePreemptionTimeout if (curTime - sched.getLastTimeAtMinShare() > minShareTimeout) { // minShare,demand 之间的最小值作为target Resource target = Resources.min(RESOURCE_CALCULATOR, clusterResource, sched.getMinShare(), sched.getDemand()); // target 与 ResourceUsage 之间的最大值即为resDueToMinShare (由于minShare 超时需要获取的资源) resDueToMinShare = Resources.max(RESOURCE_CALCULATOR, clusterResource, Resources.none(), Resources.subtract(target, sched.getResourceUsage())); } // fairSharePreemptionTimeout if (curTime - sched.getLastTimeAtFairShareThreshold() > fairShareTimeout) { Resource target = Resources.min(RESOURCE_CALCULATOR, clusterResource, sched.getFairShare(), sched.getDemand()); resDueToFairShare = Resources.max(RESOURCE_CALCULATOR, clusterResource, Resources.none(), Resources.subtract(target, sched.getResourceUsage())); } Resource resToPreempt = Resources.max(RESOURCE_CALCULATOR, clusterResource, resDueToMinShare, resDueToFairShare); if (Resources.greaterThan(RESOURCE_CALCULATOR, clusterResource, resToPreempt, Resources.none())) { String message = "Should preempt " + resToPreempt + " res for queue " + sched.getName() + ": resDueToMinShare = " + resDueToMinShare + ", resDueToFairShare = " + resDueToFairShare; LOG.info(message); } return resToPreempt; } 再次回到在preemptTasksIfNecessary() 方法中,拿到了要抢占的资源后 if (Resources.greaterThan(RESOURCE_CALCULATOR, clusterResource, resToPreempt, Resources.none())) { preemptResources(resToPreempt); } 如果要抢占的资源不为none,则进行资源抢占资源 protected void preemptResources(Resource toPreempt) { long start = getClock().getTime(); if (Resources.equals(toPreempt, Resources.none())) { return; } // Scan down the list of containers we've already warned and kill them // if we need to. Remove any containers from the list that we don't need // or that are no longer running. Iterator<RMContainer> warnedIter = warnedContainers.iterator(); while (warnedIter.hasNext()) { RMContainer container = warnedIter.next(); if ((container.getState() == RMContainerState.RUNNING || container.getState() == RMContainerState.ALLOCATED) && Resources.greaterThan(RESOURCE_CALCULATOR, clusterResource, toPreempt, Resources.none())) { warnOrKillContainer(container); Resources.subtractFrom(toPreempt, container.getContainer().getResource()); } else { warnedIter.remove(); } } try { // Reset preemptedResource for each app for (FSLeafQueue queue : getQueueManager().getLeafQueues()) { queue.resetPreemptedResources(); } while (Resources.greaterThan(RESOURCE_CALCULATOR, clusterResource, toPreempt, Resources.none())) { RMContainer container = getQueueManager().getRootQueue().preemptContainer(); if (container == null) { break; } else { warnOrKillContainer(container); warnedContainers.add(container); Resources.subtractFrom( toPreempt, container.getContainer().getResource()); } } } finally { // Clear preemptedResources for each app for (FSLeafQueue queue : getQueueManager().getLeafQueues()) { queue.clearPreemptedResources(); } } long duration = getClock().getTime() - start; fsOpDurations.addPreemptCallDuration(duration); } [4e61120a4855d72923df811a38bfacee.png]: /images/20221120/00d0c81d65c2448cae547aead8ab0cbe.png [Screen-Shot-2018-06-05-at-12.12.17-PM.png]: /images/20221120/65e65b853edf4c9d99337f9ee13bcff0.png
相关 理解和解决Java中的并发饥饿问题 在Java中,并发饥饿问题主要是指多个线程同时访问资源,但由于某种原因(如调度算法不合理),导致某些线程总是得不到足够的运行时间,从而出现饥饿现象。 解决并发饥饿问题的方法主 ゝ一纸荒年。/ 2024年10月10日 19:18/ 0 赞/ 49 阅读
相关 理解并解决Java中并发控制的'饥饿问题' 在Java编程中,"饥饿问题"主要是指多线程环境下的一种资源竞争现象。 当一个线程因为某种原因长时间占用资源(比如CPU时间、锁等),而其他等待获取资源的线程却无法及时获得, 拼搏现实的明天。/ 2024年10月08日 07:57/ 0 赞/ 54 阅读
相关 理解和调试Java中的并发饥饿现象 在Java中,并发饥饿(Hungry Thread)是一种常见的问题,它出现在多个线程争抢同一个资源,而这个资源的容量有限,导致某些线程无法及时获取和使用,从而饿死。 要理解 悠悠/ 2024年10月08日 03:36/ 0 赞/ 61 阅读
相关 理解和解决Java中的并发饥饿问题 Java中的并发饥饿问题通常是指多个线程在同一资源上无限期等待,导致其他线程无法继续执行的情况。 解决饥饿问题的方法主要有以下几种: 1. **公平锁**:保证对资源的访问 亦凉/ 2024年10月07日 13:57/ 0 赞/ 58 阅读
相关 理解并解决Java并发中的CPU饥饿问题 CPU饥饿问题在Java并发编程中是一种常见的性能瓶颈现象。当一个线程(或者任务)长时间得不到CPU执行机会时,我们就说这个线程遇到了CPU饥饿。 解决CPU饥饿问题的方法主 绝地灬酷狼/ 2024年09月19日 04:48/ 0 赞/ 60 阅读
相关 理解和避免Java中的并发饥饿问题 Java中的并发饥饿问题是指多个线程在同一资源上持续等待,导致这些线程无法继续执行的问题。 理解和避免饥饿问题主要需要以下几个方面的考虑: 1. **公平调度**:Java 傷城~/ 2024年09月16日 01:15/ 0 赞/ 66 阅读
相关 理解和避免Java中的并发饥饿问题 Java中的并发饥饿问题主要发生在多线程环境下,多个线程对共享资源的争抢,如果处理不当,可能会导致某个线程一直等待资源,无法执行其他操作,这种情况就被称为饥饿。 理解和避免饥 绝地灬酷狼/ 2024年09月10日 16:00/ 0 赞/ 75 阅读
相关 深入理解 YarnFairSchedule 中的饥饿抢占 文章目录 饥饿 饥饿的两种类型 FairShare 饥饿 MinShare 饥饿 阳光穿透心脏的1/2处/ 2022年12月22日 15:23/ 0 赞/ 79 阅读
相关 非抢占式实时操作系统_操作系统中抢占式和非抢占式的区别 ![867cc7921a6f99a5c715b8855440065d.png][] 非抢占式实时操作系统 Here you will learn about differe 客官°小女子只卖身不卖艺/ 2022年12月07日 14:00/ 0 赞/ 220 阅读
还没有评论,来说两句吧...