在数据分布在不同机器的场景下,join 计算一直是一个难点,因为 join 涉及到大量数据传输。从已有的 join 算法来看,如何减少数据传输是加快 join 的核心。sort merge join,hash join和broadcast join是目前常用的 join 方法。 这些算法适用于不同的场景,能很大程度上加快 join 的速度。但是对于列多的大表情况下,生成中间表并持久化就显得力不从心。
场景描述
需要对两张行和列都很多的表做 join ,并将 join 表持久化,生成一张中间表。下面用 A 表,B 表和 A-B 表来说明,其中A 表是10亿行200列的事实表,和 B 表是2亿行300列的维度表,A-B 为最终的left join 后的持久化表
解决方式
直接 join
通过 spark sql 使用默认的 sort merge join 算法进行两张大表的 join。这种方法实现起来非常简单,但是问题也很多。首先 A 表和 B 表的全部数据都进行shuffle,容易造成 executor 的 OOM和心跳超时 。此外如果进行多表 join ,结果表持久化到磁盘,会占用巨大的磁盘空间。
索引方法一
为了解决直接 join 带来的两个问题,一是 shuffle 数据量过大,二是占用大量磁盘空间。我们必须从数据本身入手,首先我们发现由于A-B 表是由于 A left join B 表得到的,那么 A-B 表中的 A 表部分的数据和 A 表自身是完全一样的。那么A-B join 表如果只存 B 表的数据,最终数据与 A 表原始数据进行行号对齐,那么可以省去存储 A-B 表中的 A 表数据,同时也减少了 A 表的shuffle操作 为了实现上述改进方法,首先需要给 A 增加一个索引或者叫标记列,这并不是真正意义上的索引,该索引列的用途是使A-B join 的顺序与 A 表原始数据一致,这样就可以通过行号让A-B 表中 B 表数据与 A 表原始数据对齐,从而二次利用A 表的原始数据。同时在计算过程中, A 表实际参与计算只有索引和关联列两列,shuffle 数据量也大大减少。 这么做也带来了另外一个问题。为了让 A-B的结果与 A 表数据一直,在索引列上又进行了一次 shuffle。最终 A-B 表持久化的数据中,只需存储关联后 B 表的数据,而不存 A 表。这是一个非常有效的方法,能够有效减少计算和持久化的数据量。该方法可以通过 spark sql 的 join 方法来实现,实现上稍微有些麻烦。
索引方法二
索引方法一并没有彻底解决问题,还是存在一个弊端。B 表的数据是全部参与 join 计算的,并且在关联后被持久化了。那么B 表很大的情况下,例如当前例子中的 B 表,2亿300列的数据,这依然是难以忽略的数据量。在做 join 计算和持久化的过程中,B 表的数据量会严重影响最后成功与否。 依据索引方法一增加索引列的方法,很容易想到给 B 表也增加索引替代原始数据。这里的索引是真正意义上的索引,最终需要通过索引来查找 B 表的实际数据,进而将最终的 A-B join 表落盘持久化。这里没有减少磁盘占用量,只是减少了 shuffle 数量,加快了join 的计算 实现该方法并不复杂,而且生成join 的速度有明显改善。可是,该方法并不实用。下面详细说一下该方法的问题 如果B 表的数据分布在不同机器的情况下,极端一点的情况,依据索引生成 A-B表的时候,可能会把 B 表全部数据加载到一台机器。将设我们在某一个计算节点需要进行 A-B的持久化,该节点有着 A-B 表中的第一块数据。我们首先取出第一行数据,顺利拿到其中 B 表的索引。有了该索引,我们进而可以获得 B表这一行的数据。可问题就在这里,如果这一行数据是被存在集群中其它机器上的话怎么办呢?我们会将该行数据拉取到计算节点来,而分布式数据多是以块为单位,所以我们需要把包含第一行数据的全部数据块拉到本机来。那如果A-B 表第二行的索引指向的数据又在集群中其它某台计算上,我们又需要拉取一块过来。以此类推,计算节点可能会加载全部 B 表数据,而且无法丢弃,因为可能下一行很快就用到该块数据。实际情况是一台机器很难容的下这么大的数据,而且这么多数据都要走网络,耗时可想而知。虽然场景有点极端,但是这个情况基本宣判该办法的死亡。这里说下为什么一定要把数据块加载到本机,直接远程RPC读取数据可以吗?理论上是可以的,但是实际情况是不可以的。因为这里的索引是一个位置索引。位置索引意味着数据流要支持很好 seek 操作,而实际情况就是通过 RPC 调用来指定位置读取速度极其缓慢。建立连接不说,远端的服务甚至需要频繁的在数据块中频繁切换,这些操作都异常耗时。因此将数据加载到本地,跳过底层存储直接进行系统级别的mmap是最佳的选择。
索引方法二改善
既然计算节点无法存的下这么多的数据,那么可以想办法减少数据。因此肯定会想到按需取数。的确按列生成是一个不错的办法。需要 A-B 中 B 某些字段,那么就只取这几个字段持久化。而且按列计算的话,一列计算完成,就可以丢掉该列数据的临时数据。由于是按需取数,那么必须提取知道分析用到哪些列才可以。因此这个改善需要产品功能的支持才行。
最终方法
很不巧我们的产品觉得,让用户来指定计算列很不友好。那么不指定生成列,转而在分析过程,第一次拿到分析列的时候,实时计算关联。产品有觉得这样速度太慢。总之就是必须在导入数据的时候,把关联表给生成完成。 大表 join 优化