Spark 窗口函数内存溢出

[流水]Spark 窗口函数内存溢出

场景

系统的 ETL 中有步操作,是给原表增加一列排序内容。利用spark rank 函数。实际计算过程发现,数据量大的情况,稳定会出现内存溢出。

内存溢出代码

根据上面溢出位置可以发现一下代码

内存溢出代码 其中Utils.copyStream(input, out) 的注释也明确说明这里有内存溢出的风险。下面我们分析一下这里为什么会内存溢出。

问题分析

这里不去深究窗口函数的算法细节(详见:窗口函数计算)。这里我们将重点放在对于内存溢出的内部原因和如何快速定位问题。

分析溢出处逻辑

从上图中的代码中,可以看出这块属于一个环境检查,检查数据是否完整。而在检查的过程中,放入内存的数据量过大,从而导致内存溢出。到这里我们可以得到一个有用信息,获取 shuffle 后的数据过大,导致内存溢出

顺流而上

得知 shuffle 数据过大从而导致内存溢出后。我们就开始寻找导致 shuffle 的文件数过大的原因。走到这里,如果想继续下去,您必须要对 shuffle 的算法逻辑有充分的认识,才能够得出一些合理的猜想。shuffle操作的目的,在于将数据按照一定都逻辑重新排列,这里的排列逻辑多是将同一类数据聚集到某一块中。后续的计算会通过 shuffle fetch 对象,去每台机器上获取自身所属于的块数据。我们出错的地方是在取块数据的地方,而错误原因是由于给的块数据过大。那么很容易联想到是不是 shuffle 的时候,算出来的块太大了? 通过简单的调试,发现 shuffleMapTask 任务中的 partition数量为1,这意味着子节点N 块数据会被shuffle成为1块数据,至少在 shuffle fetch阶段会占据的内存大小为,子节点 N 块数据中最大块的大小。 这里可以断定问题的原因是由于 shuffle 数量太小导致的。

乘胜追击

顺着代码继续调试,就会发现 shuffle 数量为1的原因,调试过程不多赘述。下图中的就是导致后续 shuffle 数量为1的地方。而问题的关键就是这个 AllTruples这个意思是全部的行数据。也就是说这个 etl 操作落到 spark 计算逻辑上是全部的表数据,而全部的表数据带来的问题就是 shuffle 的输出数量为1。 内存溢出代码

补充

这里又有个小地方需要知晓。就是 spark sql 在生成 spark plan 的时候,需要进行一个数据块的保证工作,说白了就是上一步计算完的数据块数据量,满不满足我自身计算的要求(详解 spark#Distribution )。如果上一步计算结果不满足我的数据要求,就进行一步 shuffle 计算。 内存溢出代码 而上图中就是进行判断的地方。这里就是当窗口函数没有 partition by 的情况下,对于块的要求数据是1,因此这里会增加一个 shuffle 操作,确保进行窗口函数计算前,数据块为1.

解决

  • 换个算法实现 rank功能
  • 不做数据验证

总结

对于 Spark 来说,数据优先放在内存中计算。那么内存的控制至关重要,一不小心就会导致内存溢出。Spark 对于内存的控制异常优秀,能够很好的避免内存溢出的情况。 这里问题并不是窗口函数的问题,而是shuffle输出的 partition 过小,后续计算取 shuffle 后结果,做验证的数据又全部加载内存,导致内存溢出。 从解决顺序上来看,对于整体计算结构的熟悉程度至关重要。没有整理的了解很难有一个明确的方向,更不要说进行调试并确认问题呢。

最近的文章

Spark 整体感知

为了能够深入使用 Spark ,那么必须对 Spark 有更为深入的理解。整体的结构的把握会显得非常重要。整体结构在脑海中成型后,别如同脑海中有了一个 Spark 的地图,后续深入了解的每个模块都能在整体上找到对应的位置,开发对应功能也会更加了然于胸。网上的结构图非常丰富,但是我觉得都太过抽象,缺少一些细节。我这里尝试做的就是帮助 Spark 新手读者能构建一个 Spark 的直观感受,同时也是我自己知识的一个整理。小Demo下面是一段简单的 demo 程序。首先从数据源中取数,这里是从文...…

继续阅读
更早的文章

Spark的 Shuffle 计算

Spark的 Shuffle 计算概述由于分布式计算是数据分块,多机器同时计算的架构。但是当一个计算依赖的数据不是某一块数据,而是全局数据的时候。这类计算变难以分块计算,例如 join 操作。为了充分发挥集群机器的算力,能够并行计算。我们就会在这些依赖全局数据的计算前,增加一个 shuffle 操作。目的是按照某种逻辑将同一类数据汇集到一块中,这样变将原有无法并行计算的结构,转为可以并行计算的逻辑了。Spark 中同样如此。所有需要进行 shuffle 操作的算子,都会加入到一个叫Shuf...…

继续阅读