海量数据处理算法

海量数据类处理问题,是面试中非常高频的一类问题,本文归纳总结了面试中常用的海量数据处理算法。主要有以下内容:

  • 最高频问题
  • 布隆过滤器
  • 外排序算法
  • 概率类的大数据问题

海量数据处理问题的考点

首先来看一道简单的例子:求数组A和数据B的交集
三种不同的解法:
方法一:暴力求解,对于数据B中的每一个元素,在A中查找是否存在,如果存在,添加到结果集中,注意去重(排序或hash),时间复杂度 O(m*n)
方法二:Hash,将A中每个元素添加到hash中,遍历B中的元素,如果命中,添加到结果集中,注意去重(hash),时间复杂度O(m + n)
方法三:二分,对A进行排序,遍历B,在A中二分查找元素是否存在,如果存在,添加到结果集中,注意去重(排序),时间复杂度O(mlog(m) + nlog(n))

海量数据处理的问题:求两个超大文件中 URLs 的交集,内存不足以放下所有 URLs。
首先我们来看一下常见的考点:
算法方面:

  1. 外排序算法(External Sorting)
  2. Map Reduce
  3. 非精确算法
  4. 概率算法
  5. 哈希算法与哈希函数(Hash Function)

数据结构方面:

  1. 哈希表(Hash Table)
  2. 堆(Heap)
  3. 布隆过滤器(BloomFliter)
  4. 位图(Bitmap)
  5. Trie树(Trie)

最高频K项问题

问题:找到一个大文件或者数据流中出现频率最高的 K 项
这个问题的难点在于,如果条件不同,解决的方法是完全不一样的:

  1. 是否需要精确的Top K结果?即,是否允许小概率出错。
  2. 数据是离线的还是在线的?即是一个大文件的形式计算一次得到一个结果,还是数据流的形式返回结果。

整数数组前k大

问题:在一个整数数组中,找最大的 K 个整数

  1. 离线算法:QuickSelect,时间复杂度O(N)
  2. 在线算法:Heap,小顶堆,时间复杂度O(Nlogk)

最高频 K 项的离线算法

伪代码:

1
2
3
4
5
6
7
8
# 1. 用一个Hash表统计单词出现的次数
# 2. 循环每一个出现的项,用最大K项的方法,获得最大的K项
hash = {}
for e in elements:
hash[e] = hash.get(e, 0) + 1
for element, frequency in hash:
用最小堆统计<element, frequency>的前K项

问:时空复杂度:
答:第一步的时间复杂度O(N),空间复杂度O(N),第二步时间复杂度O(NlogK),空间复杂度O(K),总的时间复杂度O(NlogK),空间复杂度O(N)

Follow up 1:还有办法提速吗?
如果是上T的数据,扫描一次会非常的耗时,算法时间复杂度O(NlogK)看起来是理论上的下限,但有没有办法再提速?
问题描述:这里假设你有若干个小文件(如果是一个很大的文件,可以先进行 split,分割为若干个小文件),他们加起来有 1T 那么大。每个文件中用空格隔开若干单词,你需要统计出现次数最多的 K 个单词。

Map Reduce解决最高频K项问题:

  1. 通过Map步骤,将每个文件中的单词一个一个取出,组成这样的key-value二元组,作为Map的输出;
  2. 通过Reduce步骤,每个Reducer将收到的key的value加起来,这样就得到了每个key出现的次数,每个Reducer维护一个小顶堆,如果大于小顶堆的堆顶,就删除堆顶元素,将元素插入堆。通过Reducer处理完成之后,输出得到的Top K;
  3. 可能有多个Reducer,因此我们会得到多个Top K,之后扫描这些Top K,得到最终的Top K。

Follow up 2:内存不够怎么办?
问题描述:上面这个算法中,我们花费了 O(N) 的空间耗费,也就是需要把所有的,比如说单词,都放到内存里。
在现实场景中,这很可能是不行的,因为 N 可能非常的大。即便我们使用上了 Follow up 1 中的 Map Reduce 的办法,那么比如说就算你用了很多台机器,但是分到每台机器上的时候,仍然无法全部加载到内存中,也是有可能的。
假设现在只有一台机器,内存为 1G,你有一个 1T 大小的文件,需要统计里面最高频的 K 个单词。


最高频K项的在线算法

问题描述:数据流中不断流过一些单词,提供一个接口,返回当前出现过的单词中,频次最高的Top K个单词。
即:给定的K,比如100,实现两个接口:

1
2
3
4
5
add(word)
# 添加一个单词
topk()
# 返回集合中的Top K的高频词

标准在线算法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
class TopKAnalyzer:
# 构造函数,初始化一个top k分析器,指定k的大小
def __init__(self, k):
self.k = k
# 初始化一个哈希堆(HashHeap)
self.hashheap = HashHeap()
# 初始化一个哈希表,用于单词计数
self.hash = Hash()
def add(word):
# 单词计数 +1
self.hash[word] += 1
if word in self.hashheap:
# 调整word在hashheap中的位置
self.hashheap.adjust(word, self.hash[word])
reutrn
# 如果没满,就接着往hashheap里面放
if self.hashheap.size() < self.k
self.hashheap.push(word, self.hash[word])
# 和hash heap里出现频次最低的单词相比
# 如果当前单词更大,就踢掉hashheap里频次最低的单词
if self.hash[word] > self.hashheap.top().value:
self.hashheap.pop()
self.hashheap.push(word, self.hash[word])
def topk():
# 从hashheap中导出Top K的单词
return self.hashheap.toList()

复杂度分析:
这算法的时间复杂度要从两个维度讨论:

  1. add的时间复杂度是O(logK),因此最坏情况下,就是pop掉一个单词,push进去一个新单词。由于hashheap的大小最多是k,那么复杂度为O(logK)
  2. topk的时间复杂度是O(KlogK)

Follow up:内存不够怎么办?
在上述标准的在线算法中,哈希表的耗费是最多的,如果内存无法存的下哈希表,该如何解决?
有哪些算法可以用精度换空间?
有很多科研学者都研究过这个问题,一些比较成熟的算法有:

  1. Lossy Counting
  2. Sticky Sample
  3. Space Saving
  4. Efficent Count
  5. Hash Count

这里介绍一个比较容易掌握的,和标准在线算法相比之需要很少改动的一个算法:Hash Count

1
2
3
4
5
6
7
8
9
10
11
12
13
14
class TopKAnalyzer:
def __init__(self, k):
self.k = k
self.hashheap = HashHeap()
self.hashcount = 开一个整数数组,内存能让你开多大就开多大
def add(self, word):
index = hashfunc(word) % self.hashcount.size
self.hashcount[index] += 1
wordCount = self.hashcount[index]
# 其他部分和标准在线算法一样
def topk(self):
# 和标准在线算法一样

上述的算法对精度的影响有多大呢?事实上,根据“长尾效应”,在实际数据的统计中,由于Top K的K相对整个数据流集合中的不同数据项个数N的关系是K远远小于N,而TopK的这些数据项的计数远远大于其他的数据项。因此,Top K的hashcode % hashcount.size扎堆的可能性是非常非常小的,因此这个算法的精确度也就不会太差。


布隆过滤器(Bloom Filter)

标准布隆过滤器

标准布隆过滤器的作用相当于一个 HashSet,即提供了这样一个数据结构,他支持如下操作:

  1. 在集合中加入一个元素
  2. 判断一个元素是否在集合中(允许 False Positive)

实现步骤

  1. 初始化:开一个足够大的 boolean 数组,初始值都是 false。
  2. 插入一个元素:通过 k 个哈希函数,计算出元素的 k 个哈希值,对 boolean 数组的长度取模之后,标记对应的 k 个位置上的值为 true。
  3. 查询一个元素:通过同样的 k 哈希函数,在 boolean 数组中取出对应的 k 个位置上的值。如果都是 true,则表示该元素可能存在,如果有一个 false,则表示一定不存在。
    伪代码:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    class StandardBloomFilter:
    def __init__(self, capacity, hash_functions):
    # capacity is the initial size of the SBF
    # it should be as big as possible to contains all
    # of the keys
    self.capacity = capacity
    self.bitset = [False] * capacity
    # k hash functions
    self.hash_functions = hash_functions
    def add(self, key):
    for func in self.hash_functions:
    position = func(key) % self.capacity
    self.bitset[position] = True
    def contains(self, key):
    for func in self.hash_functions:
    position = func(key) % self.capacity
    if self.bitset[position] is False:
    return False
    return True

Q & A
Q: 如果空间不够了怎么办呢?一开始开的 boolean 数组不够的话,如果全部都被赋为 true 了,contains 不就每次都返回 true 了么?
A: 实际运用中,我们通常需要进行预估,也就是估算一下大概需要用到多少的空间,开多大比较合适。另外一个解决办法,是采用 Extended Bloom Filter。具体的解决方案是,当一个 BloomFilter 满了的时候,开一个新的,capacity 更大的(两倍) BloomFilter。原来的 Bloom Filter 依旧保留,这样插入的时候,总是插入到新的 BloomFilter 里,而查询的时候,所有的 BloomFilter 都要查一遍。

Q: 如何定义一个 BloomFilter 是不是满了?
A: 如果哈希函数用4个的话,boolean 数组的大小和实际能够存储的元素个数之间的比例,在 40: 1 比较合适。这是一个经验值。


计数布隆过滤器

基于标准的 BloomFilter 稍加改动,把存储所用的 boolean 数组改为 int 数组,就成为了可以计数的 BloomFilter -> Counting Bloom Filter(简写为CBF)。这种数据结构类似 Java 中的 HashMap,但只能用作计数。提供如下的几种操作:

  1. O(1)时间内,在集合中加入一个元素
  2. O(1)时间内,统计某个元素在该集合中出现的次数 - 但是可能会比实际出现次数要大一些
    实现步骤
  3. 初始化:开一个足够大的 int 数组,初始值都是 0。
  4. 插入一个元素:通过 k 个哈希函数,计算出元素的 k 个哈希值,对 int 数组的长度取模之后,将对应的 k 个位置上的值都加一。
  5. 查询一个元素的出现次数:通过同样的 k 哈希函数,在 int 数组中取出对应的 k 个位置上的值。并取其中的最小值来作为该元素的出现次数预估。
    伪代码:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    class CountingBloomFilter:
    def __init__(self, capacity, hash_functions):
    # capacity is the initial size of the SBF
    # it should be as big as possible to contains all
    # of the keys
    self.capacity = capacity
    self.bitset = [0] * capacity
    # k hash functions
    self.hash_functions = hash_functions
    def add(self, key):
    for func in self.hash_functions:
    position = func(key) % self.capacity
    self.bitset[position] += 1
    def contains(self, key):
    count = sys.maxint
    for func in self.hash_functions:
    position = func(key) % self.capacity
    count = min(count, self.bitset[position])
    return count

Q & A
Q: 为什么要取最小值?
A: 比如我们使用两个哈希函数,key1 算出来的两个下标是 0, 1, key2 算出来的 两个下标是 1, 2。这里 counts[1] 会等于 2,他被 2个 key 都影响到了。所以取最小值,能够让这个计数尽可能的毕竟真实计数。

Q: 为什么说是预估的出现次数?而不是精确的出现次数?
A: 承接上面问题的解答,如果还有一个 key3 算出来的下标是 0 和 2。那么 counts[0~2] 都会是 2,无论对 key1~3 的任何一个 key 取计数,都得到的是 2,要比实际的出现次数大。

Q: CBF算出来的计数,有可能比实际出现次数小么?
A: 不可能


外排序算法

外排序算法分为两个基本步骤:

  1. 将大文件切分为若干个个小文件,并分别使用内存排好序
  2. 使用K路归并算法将若干个排好序的小文件合并到一个大文件中
    第一步:文件拆分
    根据内存的大小,尽可能多的分批次的将数据 Load 到内存中,并使用系统自带的内存排序函数(或者自己写个快速排序算法),将其排好序,并输出到一个个小文件中。比如一个文件有1T,内存有1G,那么我们就这个大文件中的内容按照 1G 的大小,分批次的导入内存,排序之后输出得到 1024 个 1G 的小文件。
    第二步:K路归并算法
    我们将 K 个文件中的第一个元素加入到堆里,假设数据是从小到大排序的话,那么这个堆是一个最小堆(Min Heap)。每次从堆中选出最小的元素,输出到目标结果文件中,然后如果这个元素来自第 x 个文件,则从第 x 个文件中继续读入一个新的数进来放到堆里,并重复上述操作,直到所有元素都被输出到目标结果文件中。
    Follow up: 一个个从文件中读入数据,一个个输出到目标文件中操作很慢,如何优化?
    如果我们每个文件只读入1个元素并放入堆里的话,总共只用到了 1024 个元素,这很小,没有充分的利用好内存。另外,单个读入和单个输出的方式也不是磁盘的高效使用方式。因此我们可以为输入和输出都分别加入一个缓冲(Buffer)。假如一个元素有10个字节大小的话,1024 个元素一共 10K,1G的内存可以支持约 100K 组这样的数据,那么我们就为每个文件设置一个 100K 大小的 Buffer,每次需要从某个文件中读数据,都将这个 Buffer 装满。当然 Buffer 中的数据都用完的时候,再批量的从文件中读入。输出同理,设置一个 Buffer 来避免单个输出带来的效率缓慢。

问题描述
给定A、B两个文件,各存放50亿个URLs,每个 URL 各占 64 字节,内存限制是 4G,让你找出A、B文件共同的 URLs?
问题分析
假设文件没有重复。
方法1:文件拆分 Sharding(也可以叫 Partitioning)
50亿,每个 URLs 64 字节,也就是每个文件 320G 的大小。很显然我们不能直接全部 Load 到内存中去处理。这种内存不够的问题,通常我们的解决方法都可以是使用 hash function 来将大文件拆分为若干个小文件。比如按照hashfunc(url) % 200进行拆分的话,可以拆分成为,200 个小文件 —— 也就是如果 hashfunc(url) % 200 = 1 就把这个 url 放到 1 号文件里。每个小文件理想状况下,大小约是 1.6 G,完全可以 Load 到内存里。
这种方法的好处在于,因为我们的目标是要去重,那么那些A和B中重复的 URLs,会被hashfunc(url) % 200映射到同一个文件中。这样在这个小文件中,来自 A 和 B 的 URls 在理想状况下一共 3.2G,可以全部导入内存进入重复判断筛选出有重复的 URLs。
Q: 刚才一直在说理想情况下,那么不理想情况下是什么样的?该怎么处理?
A: 不理想的情况下,如果 hashfunc(url) % 200 的结果比较集中,那么有可能会造成不同的 URLs 在同一个文件中扎堆的情况。这种情况下,有一些文件的大小可能会超过 4G。对于这种情况,处理的办法是进行二次拆分,把这些仍然比较大的小文件,用一个新的 hashfunc 进行拆分:hashfunc’(url) % X。这里再拆成多少个文件,可以根据文件的实际大小来定。如果二次拆分之后还是存在很大的文件,就进行三次拆分。直到每个小文件都小于 4G。

方法2:BloomFilter
我们可以使用一个 4G 的 Bloom Filter,它大概包含 320 亿 个 bit。把 A 文件的 50亿 个 URLs 丢入 BF 中,然后查询 B 文件的 每个 URL 是否在 BF 里。这种方法的缺点在于,320 亿个 bit 的 BF 里存 50 亿个 URLs 实在是太满了(要考虑到BF可能会用4个哈希函数),错误率会很高。因此仍然还需需要方法1中的文件拆分来分批处理。

方法3:外排序算法
将A,B文件分别拆分为80个小文件,每个小文件4G。每个文件在拆分的时候,每4G的数据在内存中做快速排序并将有序的URLs输出到小文件中。
用多路归并算法,将这160个小文件进行归并,在归并的过程中,即可知道哪些是重复的 URLs。只需将重复的 URLs 记录下来即可。

Follow up: A,B各自有重复的 URLs
当 A, B 各自有重复的 URLs 的时候,比如最坏情况下,A里的50亿个URLs 全部一样。B里也是。这样采用方法1这种比较容易想到的 Sharding 方法,是不奏效的,因为所有 URLs 的 hashcode 都一样,就算换不同的 hashfunc 也一样。这种情况下,需要先对两个文件进行单独的去重,方法是每 4G 的数据,放到内存中用简单的哈希表进行去重。这样,在最坏情况下,总共 320G 的数据里,一个 URLs 最多重复 8次,则不会出现太严重的扎堆情况了。算法上唯一需要稍微改动的地方是,由于 A 存在多个重复的 URLs,所以当和 B 的 URLs 被sharding 到同一个文件里的时候,需要标记一下这个 URLs 来自哪个文件,这样才能知道是否在A和B中同时出现过。


概率类的大数据问题

面试题:等概率挑出文件中的一行
Amazon: 一个文件中有很多行,不能全部放到内存中,如何等概率的随机挑出其中的一行?
问题解答
先将第一行设为候选的被选中的那一行,然后一行一行的扫描文件。假如现在是第 K 行,那么第 K 行被选中踢掉现在的候选行成为新的候选行的概率为 1/K。用一个随机函数看一下是否命中这个概率即可。命中了,就替换掉现在的候选行然后继续,没有命中就继续看下一行。


问题描述
给你一个 Google 搜索日志记录,存有上亿挑搜索记录(Query)。这些搜索记录包含不同的语言。随机挑选出其中的 100 万条中文搜索记录。假设判断一条 Query 是不是中文的工具已经写好了。
问题解答
假设你一共要挑选 N 个 Queries,设置一个 N 的 Buffer,用于存放你选中的 Queries。对于每一条飞驰而过的
Query,按照如下步骤执行你的算法:
1.如果非中文,直接跳过
2.如果 Buffer 不满,将这条 Query 直接加入 Buffer 中
3.如果 Buffer 满了,假设当前一共出了过 M 条中文 Queries,用一个随机函数,以 N / M 的概率来决定这条 Query 是否能被选中留下。
3.1 如果没有选中,则跳过该 Query,继续处理下一条 Query
3.2 如果选中了,则用一个随机函数,以 1 / N 的概率从 Buffer 中随机挑选一个 Query 来丢掉,让当前的 Query 放进去。
证明
我们用 5 条 Queries 里挑 3 条来作为例子证明每条 Query 被挑中的概率都是 3/5。

  • 依次处理每条 Query,前 3 条 Queries 直接进入 Buffer => [1,2,3],此时前 3 条 Queries 被选中的概率 100%
  • 第 4 条 Query 处理时,有 3/4 的概率被留下,那么第 4 条 Query 被选中的概率此时就是 3/4。
  • 第 4 条 Query 处理时,如果留下之后,会从 Buffer 中以 1/3 的概率踢走一条 Query。那么这些在 Buffer 中留下的概率是`1/4 + 2/3 * 3/4 = 3/4)。其中 1/4 是第 4 条 Query 没有被选中的概率。
  • 第 5 条 Query 处理时,有 3/5 的概率被留下,那么第 5 条 Query 被选中的概率此时就是 3/5。
  • 第 5 条 Query 处理时,如果留下之后,会从 Buffer 中以 1/3 的概率踢走一条 Query。前4条Query能够顺利进入Buffer并被留下的概率是:3/4 (2/5 + 2/3 3/5) = 3/5。其中 2/5 是第 5 条 Query 没有被选中的概率。3/4 是前3条 Queries 在处理完第4条 Query 之后,进入 Buffer的概率,2/3 * 3/5 第5条Query被选中之后但是没有踢走自己的概率。