目前hive的版本支持multi-distinct的特性,这个在用起来比较方便,但是在此特性下面无法开启防数据倾斜的开关(set hive.groupby.skewindata=true),防止数据倾斜的参数只在单distinct情况下会通过一个job来防止数据的倾斜。multi-distinct使用起来方便的同时也可能会带来性能的不优化,如日志中常常统计pv,Uv,独立ip数,独立session数,这些都要去重统计,如下面统计各个浏览器占比的SQL,这个sql可能需要运行20到30分钟(这个和集群和日志数据量相关),browser_core只有10个数值,其reduce压力很大,优化后会有50%-70%的提升
1. 原SQL
Select
browser_core,
count(1) as pv,
count(distinct uniq_id) as uv,
count(distinct client_ip) as ip_cnt,
count(distinct session_id) as session_cnt,
count(distinct apay_aid) as apay_aid_cnt,
count(distinct apay_uid) as apay_uid_cnt
From dw_log
wheredt=20120101
andpage_type='page'
andagent is not null
andagent <> '-'
group bybrowser_core
2. 改进SQL如下:
步骤(1):首先进行初步去重汇总
Create table tmp_browser_core_ds_1 as
Select
Browser_core,
uniq_id,
client_ip,
session_id,
apay_aid,
apay_uid,
count(1) as pv
from dw_log
wheredt=20120101
andpage_type='page'
andagent is not null
andagent <> '-'
group bybrowser_core,uniq_id,client_ip,session_id,apay_aid,apay_uid;
步骤(2):最关键的一步,相当于用空间来换时间。借用union all的把数据根据distinct的字段扩充起来,假如有8个distinct,相当于数据扩充8倍,用rownumber=1来达到间接去重的目的,如果这里不计算整体pv的话,可以直接进行Group by效果一样。这里的unionall只走一个job,不会因为job多拖后腿(Hadoop不怕数据量大【一定范围内】,就怕job多和数据倾斜)。
setmapred.reduce.tasks=300;
Create table tmp_browser_core_ds_2 as
select
type,
browser_core,
type_value,
pv,
rownumber(type,type_value,browser_core) as rn
from (
select
type,
browser_core,
type_value,
pv
from (
select
'client_ip'as type,browser_core,client_ip as type_value,pv
from tmp_browser_core_ds_1
union all
select
'uniq_id'as type,browser_core,uniq_id as type_value,pv
from tmp_browser_core_ds_1
union all
select
'session_id'as type,browser_core,session_id as type_value,pv
from tmp_st_log_browser_core_ds_1
union all
select
'apay_aid'as type,browser_core,apay_aid as type_value,pv
from tmp_browser_core_ds_1
union all
select
'apay_uid'as type,browser_core,apay_uid as type_value,pv
from tmp_browser_core_ds_1
) t
distribute by type,type_value,browser_core
sort by type,type_value,browser_core
) t1;
步骤(3): 得到最终结果,没有一个distinct,全部走的是普通sum,可以在mapper端提前聚合,会很快
select
browser_core,
sum(case when type='uniq_id' then pv else cast(0 as bigint) end) as pv,
sum(case when type='client_ip' and rn=1 then 1else 0 end) ip_cnt,
sum(case when type='uniq_id' and rn=1 then 1 else 0 end) as uv,
sum(case when type='session_id' and rn=1 then 1 else 0 end) as session_cnt,
sum(case when type='apay_aid' and rn=1 then 1 else 0 end) as apay_aid_cnt,
sum(case when type='apay_uid' and rn=1 then 1 else 0 end) as apay_uid_cnt
fromtmp_st_log_browser_core_ds_2
group bybrowser_core
改进SQL虽然整体job数为3个,较原sql多2个job,但整体运行时间不超过10分钟。基本思路,通过多job来进行multi-distinct,也可以看到rownumber的妙用,充分发挥hadoop的partition和sort的优势。如果某个sql的multi-distinct本身很快,就不要这么麻烦。