Spark 的 Capacity 调度策略实现

Standalone 调度

在 standalone模式下面,也就是 spark 自身能够支持两周调度策略

  • FIFO 这个是默认策略,Job 谁先提交谁的优先级高
  • FAIR 这个策略可以划分资源 Pool,不同的 Pool 有不同的优先级,可以指定 Job 提交到相应的 Pool 中。

目前依靠 Spark 自身提供的调度策略,无法解决长任务对于短任务的阻塞问题。我们知道 CPU 在调度的时候,都会优先运行短的任务,因为这样等待的总时间是少的。同样的道理,我们希望在 Spark 上面也是如此,不希望一个长时间执行的任务,例如数据的 ETL 计算占据 Spark Cluster 的全部资源,导致一个简单的查询都迟迟得不到结果。 导致调度存在问题的有以下几点原因

  • Spark 的调度,包括 Yarn 和 Mesos 都是在一个 Task 结束后,有资源释放出来之后,调度才会起作用,不存在挂起任务的操作。
  • 一个 JVM 只能启动一个 Spark Context。也就是一个应用只能对应一个Spark 的 App。

因此如果长任务先于短任务提交,短任务必然会被阻塞住。

Capacity 调度

由于上述的原因,长任务必然会对短任务的阻塞。既然分配了资源后,没有办法主动收回,那么为什么不限制分配的资源呢,也就是 Capacity 调度策略。

  • 将整个分布式系统的计算资源进行切割,并分配到不同的资源池中。
  • 每个资源池是固定大小,相互不能借用。
  • 一个计算任务必须指定提交的资源池。对应资源池没有资源后,任务便会等待。

这里有个小小的问题,关于资源池大小固定,这一点比如导致资源的严重浪费。例如,夜晚完全没有任务短计算任务,那么短任务的资源池就空置浪费了。为什么不能借用短任务的资源池呢?主要原因还是必须优先短任务。一旦资源借用出去,就出现无法收回的情况了,只能苦苦等待任务主动还回资源。当然有优化的办法,在下面动态 Capacity 中详解。

在 Yarn 中直接支持 Capacity 调度策略。不过 Yarn 的 Capacity 策略更多是为了多租户的情况下资源分配的问题。目前 Spider 引擎只会在Spark Cluster创建单个 App,因此我们不需要面对多租户的资源问题。

简单实现

既然任务提交之后便没有办法控制任务的执行,那么只能在提交的时候做文章。在[Spark Scheduler in Standalone Mode 分析]一文中分析了一个任务从 RDD 到 Executor上面执行的过程。

Capacity 策略是在 FAIR基础上扩展而来的,在 FAIR 基础上增加一个对于 Cores 的控制,和 TaskCore的限制。例如,下面的配置中,会指定每个Pool的cores,当已经提交的任务占满当前的配置值的话,其它任务就会处于等待状态。而 TaskCores 是保证在资源池中的一个 TaskSet 最多能够获的资源。

<allocations>
    <pool name="Long">
        <minShare>2</minShare>
        <weight>1</weight>
        <schedulingMode>FIFO</schedulingMode>
        <cores>2</cores>
        <taskCores>2</taskCores>
    </pool>
    <pool name="Short">
        <minShare>3</minShare>
        <weight>1</weight>
        <schedulingMode>FIFO</schedulingMode>
        <cores>1</cores>
        <taskCores>1</taskCores>
</allocations>

动态 Capacity

正如上面说指出的。夜晚没有用户访问,把短任务的资源池借给长任务是一种合理并且高效的做法。但是问题在于,借出去的资源没法主动收回,甚至影响到短任务的执行。 针对这个问题,我们做了以下优化。

  • 依然保持资源池的容量是一个定值,并增加最低容量。
  • Spark driver 端提供一个功能接口,可以随时改变资源池容量设置,但是必须大于最低容量。
  • 增加资源池任务执行监控。

资源池任务监控

  • 只要资源池中有任务,就认为资源池处于工作状态,
  • 资源池中没有任务,就认为资源池是空闲的。
  • 资源池中有任务等待,那么认为资源池处于满载状态
  • 如果资源池保持了N时间的空闲,那么认为任务资源池可用。
  • 资源池保持可用状态越久,那么可提供的资源越多。
  • 一旦资源池变为工作状态,立刻将资源池是大小恢复。

这里依然没有办法让占用的资源立刻被回收,也没有办法做到充分的资源利用。只能在大量浪费系统资源与短任务快速计算中间进行一个折中的选择。

最近的文章

分布式锁的问题

在review导入表数据和结构处的重构时候,看到下面的逻辑,偶然发现了一个问题。 表A加写锁,加锁失败就等待。 写入数据,涉及大量的数据导入 释放锁和异常处理一个正常的锁,但是当时看到一个很老的注释,大概意思是尝试捕获OOM的异常,将表删除和锁释放掉。于是发现这里如果因为GC等待时间过程,被Server认为锁失效了那岂不是锁非常不安全了?而这是带有超时时间锁的通病。查找了一下资料发现这个问题很早就讨论过。首先简单解释一下通用的分布式锁模型。 Client A 向锁服务提供方(Ser...…

继续阅读
更早的文章

Spark Scheduler in Standalone Mode 分析

文中代码基于Spark 版本2.3.0目前Spark 在 Standalone Mode 下,所支持的 Task 调度策略有 FIFO 和 FAIR 两种策略。RDD 执行流程在介绍Capacity Scheduler之前,粗略说一下 RDD 如何执行的。 RDD 对象中包括 dependency 属性,表明当前 RDD 与父 RDD 的依赖关系。 DAGScheduler 利用dependency依赖关系将 RDD 按照宽依赖,也就是需要进行 shuffle的依赖作为边界,划分成为一...…

继续阅读