Spark BroadcastHashJoin Timeout异常简单分析笔记

异常信息如下

java.util.concurrent.TimeoutException: Futures timed out after [300 seconds]
	at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
	at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:201)
	at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.doExecuteBroadcast(BroadcastExchangeExec.scala:136)

这个问题在很久之前也出现过,但是当时并没有把超时当回事,因为这个出现的时候发生了OOM的异常,于是简单认为这里的超时是因为GC导致的。直到一个客户频繁发生超时异常,并且无OOM发生,于是感觉这个问题不是这么简单的。

问题分析

处理这样的问题必须深入代码了解具体做什么。因为这里BroadcastHashJoin的中异步计算抛出的异常,而其中的计算是对于小表进行broadcast时候超时的。 在relationFuture的实现中,发现了下面的代码

      val beforeCollect = System.nanoTime()
          // Use executeCollect/executeCollectIterator to avoid conversion to Scala types
          val (numRows, input) = child.executeCollectIterator()
          if (numRows >= 512000000) {
            throw new SparkException(
              s"Cannot broadcast the table with more than 512 millions rows: $numRows rows")
          }
          

代码中的child.executeCollectIterator()的非常扎眼,原来这里还有一次需要提交任务的计算。那如果此时spark系统繁忙的话,此任务迟迟得不到计算资源,因此非常容易超时。

解决方案

超时的操作是发生在join执行计划的构造过程中,也就是实际的join计算还没有发生呢。同时BroadcastHashJoin对于小表本身就是有大小限制的,因此可以理解为什么这里会设置一个超时限制。但是对于系统繁忙的时候这里存在一定超时的可能性。 因此最简单处理方式就是直接增加超时的时间,让这里的任务有足够时间去获得计算资源并完成。

最近的文章

Map中的Hash问题

今天是祖国70大庆,没有出游而是选择了工作。因为这个里面涉及到一个接下来工作的核心,用Int替代String作为Map的key。我必须要充分的论证。关于Hash的问题一直萦绕在我心头。在数据库计算,所有关乎Map的计算都会是一个耗时的计算,比如分组比如HashJoin等。因此可以肯定得说想加快计算速度,首先得搞定Map。所有Map中理论速度最快必然是Hash Map,get操作时间为产量时间。所以Hash Map必然是首选。Hash Map分组查询这里不深究其实现,只关注于其使用。Equa...…

继续阅读
更早的文章

编码 笔记

端午节假期宅在家中闲来无事,翻出了《编码》这本书。里面偏软件的内容我略过没看,但是偏向硬件的真是解答了我的很多疑惑。逻辑门高中应该是数学课上学过简单的逻辑运算,那个时候就曾疑惑过,逻辑门凭什么可以在输出端为0的情况下输出端为1?是怎样的黑魔法可以在断开的情况下还能让灯泡亮呢?这个疑惑就如同发现永动机,明知不对还不得不相信就是这样。当时老师给的答案一如既往的简洁,规定就是这样。这个答案是正确的,逻辑上的确如此,可是我更像知道现实中是如何做到这看似不可能的事情,而这本书告诉了我真实的实现方式。...…

继续阅读