SparkSQL Join优化笔记

Spark SQL 数据Join优化笔记

问题描述

从开始定位SparkOutOfMemoryError这一异常之前,就有两个问题困扰着我。一个就是有 Spill 机制的存在,为什么还会 OOM 呢?这个问题在之前的文字中已经解释了根本原因。还有一个问题是数据量不大的情况为什么会进行 sort merge join 呢?这样我在小数据量的情况下,shuffle 操作带来了很多不必要的损耗。

代码分析

在SparkStrategy中的JoinSelection类中找到将关联 logical 转换成具体 Sort merge join的代码

  case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right)
        if RowOrdering.isOrderable(leftKeys) =>
        joins.SortMergeJoinExec(
          leftKeys, rightKeys, joinType, condition, planLater(left), planLater(right)) :: Nil

有疑问的代码并不在这里,而是下面这部分代码

      case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right)
        if canBroadcastBySizes(joinType, left, right) =>
        val buildSide = broadcastSideBySizes(joinType, left, right)
        Seq(joins.BroadcastHashJoinExec(
          leftKeys, rightKeys, joinType, buildSide, condition, planLater(left), planLater(right)))

其中最为关键的是 canBroadcastBySizes(joinType, left, right)这个调用返回的是 false!首先我数据量并不大,设置可以说是非常小,这里竟然返回是 false!

接着往这个函数内部跟踪,发现我们提供的 RDD 的统计数据的计算竟然是 Int 最大值,换句话说我的数据量非常大。为什么 spark 采用 sort merge join 的原因找到了,但是为什么我们提供的 RDD 的统计值这么大呢?

这里我们要从 Dataset 的构造开始找起,由于我们的数据是在内存中的,而且有一个可用RDD 封装,于是我们调用的构造函数是

Dataset<Row> data = sparkSession.createDataFrame(rdd, schema);

因为传入 dataset 中的数据和操作,最后都会被转换为Logical Plan,包括 SQL 语句(当然最终构造会成为QueryExecution传入 Dataset 构造函数)。那么我的跟踪目标就是锁定在了 RDD 转换的 Logical Plan 上面。于是发现了一个 Logical RDD 对于统计数据计算的实现。

override def computeStats(): Statistics = Statistics(
    sizeInBytes = BigInt(session.sessionState.conf.defaultSizeInBytes)
  )

也就是说 RDD 获得的 Logical Plan 的统计值估算的是一个定值。那么当我用这个 Logical Plan 做 join 操作,并转换成为一个 Spark Plan 时,必然会被转换成为sort merge join 了。 在查看 dataset 构造方法并确认了相关函数后,我们采用了下面的构造方法。

Dataset<Row> data = sparkSession.createDataFrame(sparkRowList, schema);

函数名是一样的,但是第一个参数sparkRowList,是原始数据,不再是 RDD。这个对于的 logical plan 是 LocalRelation,其computeStats方法是依据数据量来计算的,join 也会顺利的采用了BroadcastHashJoin。

总结

Spark 内部逻辑复杂,一个参数类型的不同就会带来截然不同的性能。具我同事所说,前期也遇到过类似问题,同一个方法,参数数据本质一样,只是改变参数类型,效果完全不同。因此可见深入了解 Spark 内部实现的必要性。

最近的文章

SparkSQL Adaptive Execution详解

SparkSQL Adaptive Execution详解原有问题在使用 Spark SQL 时,会通过 spark.sql.shuffle.partitions 指定 Shuffle 时 Partition 个数。通过数据量的大小,估计一个合理的Partition 个数。Partition大小的最佳实际是保持一块100M 左右(Adaptive Execution 功能中块大小默认64M),所以一般会用表的数据量均量和单块大小相除,得到一个Partition 的数量值,在这个基础上微调。...…

继续阅读
更早的文章

Spark SparkOutOfMemoryError Unable to acquire异常

异常信息如下Caused by: org.apache.spark.memory.SparkOutOfMemoryError: Unable to acquire 28 bytes of memory, got 0 at org.apache.spark.memory.MemoryConsumer.throwOom(MemoryConsumer.java:157) at org.apache.spark.memory.MemoryConsumer.allocatePage(MemoryCo...…

继续阅读