Spark SparkOutOfMemoryError Unable to acquire异常

异常信息如下

Caused by: org.apache.spark.memory.SparkOutOfMemoryError: Unable to acquire 28 bytes of memory, got 0
	at org.apache.spark.memory.MemoryConsumer.throwOom(MemoryConsumer.java:157)
	at org.apache.spark.memory.MemoryConsumer.allocatePage(MemoryConsumer.java:119)
	at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.acquireNewPageIfNecessary(UnsafeExternalSorter.java:383)
	at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.insertRecord(UnsafeExternalSorter.java:407)
	at org.apache.spark.sql.execution.UnsafeExternalRowSorter.insertRow(UnsafeExternalRowSorter.java:135)

这是一个内存溢出的问题,但不是严格意义上 JVM 无法分配的内存不足,而是 Spark 计算得出的内存不足。 目前在一个 shuffle 场景会遇到这个异常。具体场景如下,数据经过shuffle 后的partition很多,然后后续调用了一个剧烈的Coalesce操作,比如Coalesce(1)。

解决这个问题需要对UnsafeExternalSorter和Spark内存管理有一定了解。我们知道UnsafeExternalSorter对象会在内存不足的时候主动 spill 数据到磁盘来释放内存,但是在这里仿佛失效了。实际上 spill 操作并没有失效,而是由于 spark 避免其它调用者对数据操作,而保留了一个内存页。

public long spill() throws IOException {
      synchronized (this) {
       ...
          for (MemoryBlock page : allocatedPages) {
            if (!loaded || page.pageNumber !=
                    ((UnsafeInMemorySorter.SortedIterator)upstream).getCurrentPageNumber()) {
              released += page.size();
              freePage(page);
            } else {
              lastPage = page;
            }
          }
          allocatedPages.clear();
        }
        ...
      }
    }

可以看到在 spill 操作中,如果当前数据已经加载同时当前页号和 iterator 的数据来源页号是一样,那么就将这个页设为最后页,这个页并不会释放。 在UnsafeExternalSorter对象构造函数中注册了一个 clean 方法

// Register a cleanup task with TaskContext to ensure that memory is guaranteed to be freed at
// the end of the task. This is necessary to avoid memory leaks in when the downstream operator
// does not fully consume the sorter's output (e.g. sort followed by limit).
taskContext.addTaskCompletionListener(context -> {
  cleanupResources();
});

这意味着如果不主动调用 cleanupResource 方法,那么sort对象中的最后保留的 page 内存页将不会被释放。

没有Coalesce操作的时候,每个 partition 是一个task来计算,此时内存并不会有很大问题,假设 JVM 中32个 Cores 在计算, Page 默认大小为128M的情况下,内存损耗顶峰也不到4G,这个数量并不算多。但是Coalesce(1)彻底不同了,会让 N 块 partitions 放到一个 Task 中来计算。那导致内存顶峰大小为 N*128M,而 shuffle 中的 N 值默认的情况为200,实际可能更大,那么就很容易内存溢出了。

问题解决

首先从计算逻辑上分析,Coalesce(1)这个操作是否为必要操作?如果不是,那么尽快减少使用,Coalesce不会把数据块做 shuffle 操作,但是会让数据块在一个 Task 执行,同时后续的计算成为单块的任务了。 这里使用Coalesce(1)是因为数据量不大,但是 shuffle 后数据量太多,导致 task 计算次数过多,所有使用 Coalesce 将数据拉成1块,但是没想到还有这么一茬。目前我们去掉了Coalesce,并从 join 逻辑上面改变 join 方式,彻底解决了这个问题。

最后,我们可以通过下面的配置方法来解决,但是还是存在性能损耗,并且不能根治

  1. 减小 Page 默认大小,参数为 spark.buffer.pageSize,比如设为32M,这样减少内存峰值。
  2. 减少 partition 数量,比如 shuffle 的数量,或者从数据源入手减少数量等等。
最近的文章

SparkSQL Join优化笔记

Spark SQL 数据Join优化笔记问题描述从开始定位SparkOutOfMemoryError这一异常之前,就有两个问题困扰着我。一个就是有 Spill 机制的存在,为什么还会 OOM 呢?这个问题在之前的文字中已经解释了根本原因。还有一个问题是数据量不大的情况为什么会进行 sort merge join 呢?这样我在小数据量的情况下,shuffle 操作带来了很多不必要的损耗。代码分析在SparkStrategy中的JoinSelection类中找到将关联 logical 转换成...…

继续阅读
更早的文章

Alluxio异常排查二

Alluxio异常排查二接着上篇,保险客户的问题。问题分析负责这个客户的研发,突然有一天找我说,客户卡死的问题需要看一下。好在客户可以 dump 线程状态出来的。从客户那边 dump 了5个线程快照,每个间隔2秒。可是从 dump 的线程状态上,并没有发现有卡死的线程存在,每个线程做的事情在下一个 dump 文件都会变化。无从下手,于是我再次尝试还原问题,可是这次没有成功。尝试多次都以失败告终。但是,有一点可以确定的,肯定还是线程锁定问题,因为从 dump 中看到快照中多数线程在等待锁。因...…

继续阅读