Hive数据倾斜
数据倾斜问题剖析
数据倾斜是分布式系统不可避免的问题,任何分布式系统都有几率发生数据倾斜,但在平时有时感知并不是太明显,主要是和数据量有关系,如果数据量比较小,就算数据全部在一台服务器上,执行起来也是毫无压力的,所以感知不到,但如果数据量达到一个量级,一台服务器应对不了这么多的数据时,如果存在数据倾斜,那么就很难算出结果。
在map和reduce两个阶段中,最容易出现数据倾斜的是reduce阶段,因为map到reduce会经过shuffle阶段,在shuffle中默认会按照key进行hash,如果相同的key过多,那么hash的结果就是大量相同的key进入到同一个reduce中,导致数据倾斜。
但在map阶段也是有可能发生数据倾斜的。一个任务中,数据文件在进入map阶段之前会进行切分,默认是128M一个数据块,但是如果当对文件使用gzip压缩等不支持文件分割操作的压缩方式时,MR任务读取压缩后的文件时,是对它切分不了的,该压缩文件只会被一个任务读取,如果有一个超大的不可分割的压缩文件被一个map读取,就是发生map阶段的数据倾斜。
所以从本质上来说,发送数据倾斜的原因有两种:
- 任务中需要处理大量相同的key的数据
- 任务读取不可分割的大文件
数据倾斜解决方案
MapReduce和Spark中的数据倾斜解决方案原理都是类似的,以下讨论Hive使用MapReduce引擎引发的数据倾斜,Spark数据倾斜也可以此参照。
空值引发的数据倾斜
实际业务中有些大量的null值或者一些无意义的数据参与到计算作业中,表中有大量的null值,如果表之间进行join操作,就会有shuffle产生,这样所有的null值都会被分配到一个reduce中,必然产生数据倾斜。
问:如果A、B两表join操作,假如A中需要join的字段为null,但B中join的字段不为null,这两个字段根本就join不上,为什么会放到一个reduce中?
答:明确一个概念,数据放到一个reduce中不是因为字段能不能join上,而是因为shuffle阶段的hash操作,只要key的hash结果是一样的,他们就会被拉到同一个reduce中。
解决方案:
不让null值参与join操作,即不让null值有shuffle阶段
1
2
3
4
5
6
7
8
9SELECT *
FROM log a
JOIN users b
ON a.user_id IS NOT NULL
AND a.user_id = b.user_id
UNION ALL
SELECT *
FROM log a
WHERE a.user_id IS NULL;因为null值参与shuffle时的hash结果是一样的,那么我们可以给null值随机赋值,这样他们的hash结果就不一样了,就会进到不同的reduce中
1
2
3
4
5
6SELECT *
FROM log a
LEFT JOIN users b ON CASE
WHEN a.user_id IS NULL THEN concat('hive_', rand())
ELSE a.user_id
END = b.user_id;
不同数据类型引发的数据倾斜
对于两个表join,表a中需要join的字段key为int,表b中key字段既有string类型也有int类型。当按照key进行两个表的join操作时,默认的Hash操作会按int型的id来进行分配,这样所有的string类型都被分配成同一个id,结果就是所有的string类型的字段进入到一个reduce中,引发数据倾斜。
解决方案:
如果key字段既有string类型也有int类型,默认的hash就都会按int类型来分配,那么我们直接把int类型转为string就好了,这样key字段都为string,hash时就按照string类型分配了:
1 |
|
不可拆分大文件引发的数据倾斜
当集群的数量增长到一定规模,有些数据需要归档或者转储,这时候往往会对数据进行压缩;当对文件进行gzip压缩等不支持文件分割操作的压缩方式,在日后有作业涉及读取压缩后的文件时,该压缩文件只会被一个任务所读取。如果该压缩文件很大,则处理该文件的Map需要花费的时间会远多于读取普通文件的Map时间,该Map任务会成为作业运行的瓶颈。这种情况也就是Map读取文件的数据倾斜。
解决方案:
这种数据倾斜问题没有什么好的解决方案,只能将使用gzip压缩等不支持文件分割的文件转为bzip和zip等支持文件分割的压缩方式。
所以,我们在对文件进行压缩时,为避免因不可拆分大文件而引发数据读取的倾斜,在数据压缩的时候可以采用bzip2和zip等支持文件分割的压缩算法。
数据膨胀引发的数据倾斜
在多为聚合计算时,如果分组聚合的字段过多,如下:
1 |
|
with rollup是用来在分组统计数据的基础上再进行统计汇总,即用来得到group by的汇总信息
如果上面的log表的数据量很大,并且Map端的聚合不能很好的起到数据压缩的情况下,会导致Map端产生的数据急速膨胀,正在情况容易导致作业内存溢出的异常。如果log表含有数据倾斜key,会加剧shuffle过程的数据倾斜。
解决方案:
可以拆分上面的sql,将with rollup
拆分成如下几个sql:
1 |
|
但是,上面这种方式不太好,因为现在是对3个字段进行分组聚合,那如果是5个或者10个字段呢,那么需要拆解的SQL语句会更多。
在Hive中可以通过参数 hive.new.job.grouping.set.cardinality
配置的方式自动控制作业的拆解,该参数默认值是30。表示针对grouping sets/rollups/cubes这类多维聚合的操作,如果最后拆解的键组合大于该值,会启用新的任务去处理大于该值之外的组合。如果在处理数据时,某个分组聚合的列有较大的倾斜,可以适当调小该值。
蚂蚁🐜再小也是肉🥩!
“您的支持,我的动力!觉得不错的话,给点打赏吧 ୧(๑•̀⌄•́๑)૭”
微信支付
支付宝支付