Hyperloglog与大数据统计
目录
这几天在做一些数据统计相关的工作,涉及的东西挺多的,在此顺便做一下笔记以备忘。
大数据统计
大数据应用场景中最常见的一个问题便是基数估算,而在进行基数估算时遇到的问题主要是内存需求以及后期数据合并处理等。
举个例子,假设我们的网站对每一次请求都做如下简单的日志记录:
date=20151104123059&province=广东&network=电信&uid=1087012…… date=20151104123101&province=北京&network=联通&uid=2319802…… date=20151104123108&province=浙江&network=移动&uid=1825023……
现在,如果我们需要统计一天内的独立用户数,我们可以用最简单的方法:
def count_user(): …… uniq_users = set() for u in uids: uniq_users.add(u) total = len(uniq_users) ……
然而,假如每天的访问量量级在千万或者亿级别,那么这样的方法显然会占用极大的内存,我们不得不寻找其他的基数计算方法。
Bitmap计数
参考:解读Cardinality Estimation算法(第一部分:基本概念)
bitmap的原理很简单,它的实质就是用一个bit数组来标记用户,比如对于{3, 6, 10}三个用户,我们可以使用0000001000100100(这里用了2byte的bitmap)来标记用户。
对于bitmap,我们可以很容易的进行and、or操作,而且效率也是非常高的。但是,对于bitmap的内存占用则是一个比较大的问题,它取决于基数的上限而非元素的个数。比如你的基数是1000W,那么你将需要分配1.2MB左右的内存空间给bitmap而不管你是否仅存了一个元素。
为此,对于大数据基数估算,一般采用概率统计算法,常见的算法有Linear Counting、LogLog Counting、HyperLogLog Counting等几种。
Liner Counting
参考:解读Cardinality Estimation算法(第二部分:Linear Counting)
Liner Counting的基本思路是:设有一哈希函数H,其哈希结果空间有m个值(最小值0,最大值m-1),并且哈希结果服从均匀分布。使用一个长度为m的bitmap,每个bit为一个桶,均初始化为0,设一个集合的基数为n,此集合所有元素通过H哈希到bitmap中,如果某一个元素被哈希到第k个比特并且第k个比特为0,则将其置为1。当集合所有元素哈希完成后,设bitmap中还有u个bit为0。则: n̂ = -mlog(u/m) 为n的一个估计,且为最大似然估计(MLE)。
LC的空间复杂度与简单bitmap方法是一样的,但是有个常数项级别的降低,不过实际应用仍然比较少单独使用(作为Adaptive Counting的一部分)。
当然,关于误差问题可以参考原作者的分析……
Loglog Counting
参考:解读Cardinality Estimation算法(第三部分:LogLog Counting)
LLC的空间复杂度仅有O(log2(log2(Nmax)))。
LLC基本思路:选取一个哈希函数H应用于所有元素,并且满足:哈希结果等长,哈希碰撞可忽略不计,哈希结果基本服从均匀分布;
对于哈希后的结果a,假设其长度为L比特位,由上面哈希函数要求哈希结果基本服从均匀分布,故对哈希结果的每一位也都应服从:P(x=1)=0.5,P(x=0)=0.5;
设ρ(a)为a的比特串中第一个“1”出现的位置,显然1≤ρ(a)≤L;如果我们遍历集合中所有元素的比特串,取ρmax为所有ρ(a)的最大值。
此时我们可以将2**ρmax作为基数的一个粗糙估计,即:n̂ =2**ρmax
至于为什么可以这样估算,参考原作者的分析……
当然,直接用上面的估计方式会由于偶然性而存在较大误差。因此,LLC采用了分桶平均的思想来消减误差。具体来说,就是将哈希空间平均分成m份,每份称之为一个桶(bucket)。对于每一个元素,其哈希值的前k比特作为桶编号,其中2**k=m,而后L-k个比特作为真正用于基数估计的比特串。桶编号相同的元素被分配到同一个桶,在进行基数估计时,首先计算每个桶内元素最大的第一个“1”的位置,设为M[i],然后对这m个值取平均后再进行估计,即:n̂ = 2**(∑M[i]/m)
Hyperloglog
参考:解读Cardinality Estimation算法(第四部分:HyperLogLog Counting及Adaptive Counting)
HLLC的基本思想是在LLC的基础上做改进,其中一个改进就是使用调和平均数替代几何平均数。
而目前在Redis 2.8.9中就已经实现了Hyperloglog,官方提供了三个操作大大简化了我们的工作:
$ pfadd key element [element ...] $ pfcount key [key ...] $ pfmerge destkey sourcekey [sourcekey ...]
对于单独一个key, 这三个操作的时间复杂度均为O(1); 对于N个key, 时间复杂度为O(N).
在最开始的问题中,应用hyperloglog我们便可以非常方便地对独立用户数进行统计。
交集问题
为了实现各种组合条件下的统计需求,比如我们需要统计“广东电信”的独立用户数,就需要对hyperloglog进行求交集运算。而redis的hyperloglog默认操作只有pfadd,pfcount,pfmerge(并集)。
通过以上对loglog counting和hyperloglog counting原理的分析以及redis的官方说明文档,redis中每个hyperloglog key占用了12K的内存用于标记基数。
测试中我们发现,pfadd命令并不会一次性分配12k内存,而是随着基数的增加而逐渐增加内存分配;而pfmerge操作则会将sourcekey合并后存储在12k大小的key中,这由hyperloglog合并操作的原理(两个hyperloglog合并时需要单独比较每个桶的值)可以很容易理解。
因此,在进行一次pfmerge操作保证每个key的长度均为12k后,我们尝试使用bitop进行and、or操作。
对于hyperloglog类型的key:hyll1,hyll2,对应set类型的key:set1,set2:
$ sunionstore set_union set1 set2 $ sinterstore set_inter set1 set2 $ pfmerge hyll_merge hyll1 hyll2 $ bitop and hyll_and hyll1 hyll2 $ bitop or hyll_or hyll1 hyll2 $ pfcount hyll_and $ pfcount hyll_or # 测试的结果, ~=(约等于) set_union ~= hyll_merge == hyll_or # hyll_and与set_inter误差很大 set_inter <> hyll_and
在测试数据中,set1,set2的数量在1W左右,set_inter在5000左右,而hyll_and则在4300,计算的误差在10%~20%.
德·摩根律
既然这种方法不可靠,我们便需要寻找将交集运算转换为并集运算的方法。
根据德·摩根律:
在这里,我们可以将交集转换为并集与补集的操作。我们继续把上面的例子简化,假设我们存储了6个hyperloglog key:
# redis-cli $ keys uv:total uv:total $ keys uv:province:* uv:province:广东 uv:province:北京 uv:province:浙江 $ keys uv:network:* uv:network:移动 uv:network:联通 uv:network:电信
那么: “广东n电信” = not(not广东 U not电信) = not((北京U浙江)U(联通U移动)) = TOTAL – (北京U浙江)U(联通U移动)
当然这里存在一个隐藏条件:那就是province内的每一个key都必须是独立,否则not广东 = 北京U浙江 就不会成立了;同理network。
PS:这里或许有人会说:广东n电信 = 广东 + 电信 – 广东U电信,即 A n B = A + B – A u B。是的,这属于容斥原理中最简单的一种情况,但假如有N个条件呢,比如“广东n电信n男性 n Android n 4G网络 n 手机系统版本号 n ……”,那么应用德·摩根律仍不失为一个不错的方法。
容斥原理
但是假设我们需要统计“12~18时n广东n电信”的独立用户呢?假如我们存储了24个hyerloglog key用来表示每个小时的独立用户数:
$ keys uv:hour:* uv:hour:00 uv:hour:01 uv:hour:02 uv:hour:03 uv:hour:04 …… uv:hour:21 uv:hour:22 uv:hour:23
这时候德·摩根律就无法使用了,因为 not(uv:hour:12~18) != (uv:hour:00~11)U(uv:hour:19~23)。此时,我们则必须采用容斥原理:
对于其代码实现,可以采用递归方式:
假如有集合 A, B, C, D, 那么根据容斥原理, 我们首先需要求出各种组合:
n=1: A, B, C, D
n=2: AnB, AnC, AnD, BnC, BnD, CnD
n=3: AnBnC, AnBnD, AnCnD, BnCnD
所以: AuBuCuD = A + B + C + D – (AnB + AnC + AnD + BnC + BnD + CnD) + (AnBnC + AnBnD + AnCnD + BnCnD) – AnBnCnD, 这样就可以算出 AnBnCnD 了
"""容斥原理, 递归求交集 其中, ckeys=[A, B, C, D], 当然, 为了减少重复计算量, 这里可以加入一个 lru_cache() 的修饰器 """ # @lru_cache def recurse_intersect(ckeys): if len(ckeys) == 1: result = r.execute_command('pfcount', ckeys[0]) elif len(ckeys) == 2: A = r.execute_command('pfcount', ckeys[0]) B = r.execute_command('pfcount', ckeys[1]) AuB = r.execute_command('pfcount', *ckeys) result = A + B - AuB else: result = 0 for i in range(len(ckeys))[1:]: # 这里对元素求组合 combines = recurse_combine(ckeys, i) total = 0 for combine in combines: total += recurse_intersect(combine) result += total if i % 2 == 1 else -total union = r.execute_command('pfcount', *ckeys) if (i+1) % 2 == 1: result = union - result else: result = result - union return result """递归求组合 其中, keys=list, n为组合元素个数, i为递归参数用 例如, keys=[A, B, C, D], n=1, result=[[A],[B],[C],[D]] n=2, result=[[A,B],[A,C],[A,D],[B,C], [B,D],[C,D]] n=3, result=[[A,B,C], [A,B,D],[A,C,D], [B,C,D]] """ def recurse_combine(keys, n, i=0): result = [] for val in keys[i:]: if len(keys) - keys.index(val) < n: break if n == 1: result.append([val]) else: children = recurse_combine(keys, n-1, keys.index(val)+1) for child in children: child.append(val) # 由于list是有序的, 这里进行排序可以减少后面重复计算 child.sort() result.append(child) return result
至此,我们的问题也就解决啦。
在后续的测试中我们发现,通过利用容斥原理来计算多个hyperloglog的交集,其计算耗时主要在 recurse_intersect 上。Hyperloglog操作的复杂度基本都是O(1),即使merge多个key耗时也只是在ms级别。
性能测试: 容斥原理性能分析
另外有一个值得注意的问题,那就是在用户基数比较小的情况下,hyperloglog通过容斥原理进行交集计算有可能产生比较大的误差。在我自己的几次测试中发现,一般情况下误差在1~2%,然而当基数比较少,而组合条件又比较复杂的时候,统计出来的用户数(量级1W)误差最大竟然有18%。关于这个问题,可以阅读下面MinHash的两篇文章,其中便提到了关于容斥原理的极端误差问题。
MinHash
当然,除了容斥原理,还有一些其他的方法统计hyperloglog交集,其中有一种是通过计算Jaccard系数的MinHash算法。有兴趣的话可以参考:
扩展阅读:
评论