Hive数据倾斜

本文遵循BY-SA版权协议,转载请附上原文出处链接。


本文作者: 黑伴白

本文链接: http://heibanbai.com.cn/posts/ea7c0c47/

数据倾斜问题剖析

数据倾斜是分布式系统不可避免的问题,任何分布式系统都有几率发生数据倾斜,但在平时有时感知并不是太明显,主要是和数据量有关系,如果数据量比较小,就算数据全部在一台服务器上,执行起来也是毫无压力的,所以感知不到,但如果数据量达到一个量级,一台服务器应对不了这么多的数据时,如果存在数据倾斜,那么就很难算出结果。

在map和reduce两个阶段中,最容易出现数据倾斜的是reduce阶段,因为map到reduce会经过shuffle阶段,在shuffle中默认会按照key进行hash,如果相同的key过多,那么hash的结果就是大量相同的key进入到同一个reduce中,导致数据倾斜。

但在map阶段也是有可能发生数据倾斜的。一个任务中,数据文件在进入map阶段之前会进行切分,默认是128M一个数据块,但是如果当对文件使用gzip压缩等不支持文件分割操作的压缩方式时,MR任务读取压缩后的文件时,是对它切分不了的,该压缩文件只会被一个任务读取,如果有一个超大的不可分割的压缩文件被一个map读取,就是发生map阶段的数据倾斜。

所以从本质上来说,发送数据倾斜的原因有两种:

  1. 任务中需要处理大量相同的key的数据
  2. 任务读取不可分割的大文件

数据倾斜解决方案

MapReduce和Spark中的数据倾斜解决方案原理都是类似的,以下讨论Hive使用MapReduce引擎引发的数据倾斜,Spark数据倾斜也可以此参照。

空值引发的数据倾斜

实际业务中有些大量的null值或者一些无意义的数据参与到计算作业中,表中有大量的null值,如果表之间进行join操作,就会有shuffle产生,这样所有的null值都会被分配到一个reduce中,必然产生数据倾斜。

问:如果A、B两表join操作,假如A中需要join的字段为null,但B中join的字段不为null,这两个字段根本就join不上,为什么会放到一个reduce中?

答:明确一个概念,数据放到一个reduce中不是因为字段能不能join上,而是因为shuffle阶段的hash操作,只要key的hash结果是一样的,他们就会被拉到同一个reduce中。

解决方案:

  • 不让null值参与join操作,即不让null值有shuffle阶段

    1
    2
    3
    4
    5
    6
    7
    8
    9
    SELECT *
    FROM log a
    JOIN users b
    ON a.user_id IS NOT NULL
    AND a.user_id = b.user_id
    UNION ALL
    SELECT *
    FROM log a
    WHERE a.user_id IS NULL;
  • 因为null值参与shuffle时的hash结果是一样的,那么我们可以给null值随机赋值,这样他们的hash结果就不一样了,就会进到不同的reduce中

    1
    2
    3
    4
    5
    6
    SELECT *
    FROM log a
    LEFT JOIN users b ON CASE
    WHEN a.user_id IS NULL THEN concat('hive_', rand())
    ELSE a.user_id
    END = b.user_id;

不同数据类型引发的数据倾斜

对于两个表join,表a中需要join的字段key为int,表b中key字段既有string类型也有int类型。当按照key进行两个表的join操作时,默认的Hash操作会按int型的id来进行分配,这样所有的string类型都被分配成同一个id,结果就是所有的string类型的字段进入到一个reduce中,引发数据倾斜。

解决方案:

如果key字段既有string类型也有int类型,默认的hash就都会按int类型来分配,那么我们直接把int类型转为string就好了,这样key字段都为string,hash时就按照string类型分配了:

1
2
3
SELECT *
FROM users a
LEFT JOIN logs b ON a.usr_id = CAST(b.user_id AS string);

不可拆分大文件引发的数据倾斜

当集群的数量增长到一定规模,有些数据需要归档或者转储,这时候往往会对数据进行压缩;当对文件进行gzip压缩等不支持文件分割操作的压缩方式,在日后有作业涉及读取压缩后的文件时,该压缩文件只会被一个任务所读取。如果该压缩文件很大,则处理该文件的Map需要花费的时间会远多于读取普通文件的Map时间,该Map任务会成为作业运行的瓶颈。这种情况也就是Map读取文件的数据倾斜。

解决方案:

这种数据倾斜问题没有什么好的解决方案,只能将使用gzip压缩等不支持文件分割的文件转为bzip和zip等支持文件分割的压缩方式。

所以,我们在对文件进行压缩时,为避免因不可拆分大文件而引发数据读取的倾斜,在数据压缩的时候可以采用bzip2和zip等支持文件分割的压缩算法。

数据膨胀引发的数据倾斜

在多为聚合计算时,如果分组聚合的字段过多,如下:

1
2
3
select a, b, c, count(1)
from log
group by a, b, c with rollup;

with rollup是用来在分组统计数据的基础上再进行统计汇总,即用来得到group by的汇总信息

如果上面的log表的数据量很大,并且Map端的聚合不能很好的起到数据压缩的情况下,会导致Map端产生的数据急速膨胀,正在情况容易导致作业内存溢出的异常。如果log表含有数据倾斜key,会加剧shuffle过程的数据倾斜。

解决方案:

可以拆分上面的sql,将with rollup拆分成如下几个sql:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
SELECT a, b, c, COUNT(1)
FROM log
GROUP BY a, b, c;

SELECT a, b, NULL, COUNT(1)
FROM log
GROUP BY a, b;

SELECT a, NULL, NULL, COUNT(1)
FROM log
GROUP BY a;

SELECT NULL, NULL, NULL, COUNT(1)
FROM log;

但是,上面这种方式不太好,因为现在是对3个字段进行分组聚合,那如果是5个或者10个字段呢,那么需要拆解的SQL语句会更多。

在Hive中可以通过参数 hive.new.job.grouping.set.cardinality 配置的方式自动控制作业的拆解,该参数默认值是30。表示针对grouping sets/rollups/cubes这类多维聚合的操作,如果最后拆解的键组合大于该值,会启用新的任务去处理大于该值之外的组合。如果在处理数据时,某个分组聚合的列有较大的倾斜,可以适当调小该值。


蚂蚁🐜再小也是肉🥩!


Hive数据倾斜
http://heibanbai.com.cn/posts/ea7c0c47/
作者
黑伴白
发布于
2022年5月31日
许可协议

“您的支持,我的动力!觉得不错的话,给点打赏吧 ୧(๑•̀⌄•́๑)૭”

微信二维码

微信支付

支付宝二维码

支付宝支付