自定义SparkSQL数据源

问题描述

需要对两张行和列都很多的表做 join ,并将 join 表持久化,生成一张中间表。下面用 A 表,B 表和 A-B 表来说明,其中A 表是10亿行200列的事实表,和 B 表是2亿行300列的维度表,A-B 为最终的left join 后持久化的中间表 首先我们来梳理一下问题

  • A,B表数据行多列多,而且A,B数据分布在不同的机器
  • A-B表数据太多,而且 A-B表的 A 部分数据与 A 表原始数据相同,可以省去。
  • shuffle 过程中数据量太多,内存压力大。而且为了不重复存储 A 数据,需要进行第二次 shuffle

实现

为了更好的说明,我们先简化场景。假设我们的事实表 (A 表)和维度表(B表) 为下图所示,其中 A 表已经去除了其它列,只留了关联列。数据分为两块,并且在两台不同的机器

首先,我先将 A 表的关联字段 boardcast 到全部 B 表所在的机器上。

接着,将A 表的关联列全部数据与 B 表的一个 partition 数据,进行join 操作。我们可以得到下图所示的数据

这个时候,我们已经拿到了最终 join 的结果,只是 B 表的数据还是分散的。下面我们准备将分散合并起来。如下图所示,将最终同在一个 parition 内的 B 表数据划分为一组,准备做合并操作

从上图可以发现,合并操作只需要简单的顺序读取即可。具体步骤如下

  1. 初始化每个 B 表的 partition 的 reader
  2. 将 merge 操作分配到数据存储节点上
  3. 读取第一个 partition 数据的行数据
  4. 如果是空,那么读取下一个 partition 的一行数据
  5. 如果不是空,那么持久化数据,并且终止当前行读取,跳到下一行读取,重复步骤3 按照上述算法步骤,即可得到最终A-B表的结果

总结

此方法是受Broadcast join 算法启发。其中维度和事实表很大是一个重要的前提,否则利用 spark 提供的join 同样可以很好的得出结果。

该方法有如下优点

  • 网络开销低。不需要 shuffle 计算,也不要拉取其它多余数据。
  • 在做 join的过程中,数据都在本地,支持 seek 操作。
  • merge 的过程中,数据流不需要seek 操作
最近的文章

Spark driver 调用 toLocalIterator 方法导致 OOM分析

Spark driver 调用 toLocalIterator 方法导致 OOM分析目前由于计算结构的问题,导致 spark 的计算结果需要返回 driver 进行落盘,导致数据量一大,很容易导致 OOM。下面分析一下问题的原因和处理办法。首先看一下toLocalIterator 的代码实现。 /** * Return an iterator that contains all of the elements in this RDD. * * The iterator will...…

继续阅读
更早的文章

SparkSQL做Join问题汇总

问题描述需要对两张行和列都很多的表做 join ,并将 join 表持久化,生成一张中间表。下面用 A 表,B 表和 A-B 表来说明,其中A 表是10亿行200列的事实表,和 B 表是2亿行300列的维度表,A-B 为最终的left join 后持久化的中间表首先我们来梳理一下问题 A,B表数据行多列多,而且A,B数据分布在不同的机器 A-B表数据太多,而且 A-B表的 A 部分数据与 A 表原始数据相同,可以省去。 shuffle 过程中数据量太多,内存压力大。而且为了不重复存储...…

继续阅读