终其一生,我们只不过在寻找自己

0%

5003总结2 stream&并行计算-例子

Streaming是处理数据流的API。
下面直接通过例子,感受streaming的机制。


例1 从键盘输入文本,统计词频

例2.1 从文件输入,统计词频

例2.2 对词的感情值排序

例2.3 stage,统计所有词频

上面的例子2.1 仅仅是统计每一个输入部分的词频,意义不大
通过updateStateByKey()可以实现对所有数据的统计。

updateStateByKey()需要定义一个函数,函数的输入是上次的和这次的rdd,这里的streaming的精髓,需要好好理解。


算法应用1:蓄水池抽样Reservoir Sampling

算法过程

蓄水池采样算法(Reservoir Sampling)了。先说一下算法的过程:

  • 假设数据序列的规模为 𝑛,需要采样的数量的为 𝑘。
  • 首先构建一个可容纳 𝑘 个元素的数组,将序列的前 𝑘 个元素放入数组中。
  • 然后从第 𝑘+1 个元素开始,以 𝑘/𝑛 的概率来决定该元素是否被替换到数组中(数组中的元素被替换的概率是相同的)。 当遍历完所有元素之后,数组中剩下的元素即为所需采取的样本。

    这样就可以对任意长度的数据抽样,可以实现streaming. 不需要考虑文件长度。

简单证明:

完整证明需要用到FIsher-Yates shuffle。

算法应用2:O(n)时间,O(1)空间,寻找Majority的元素(n>N/2)

算法描述:
只用空间复杂度O(1),去寻找数组内有没有出现次数大于一半的元素。

算法过程:

遍历所有元素:记录新出现元素的次数,如果下一个与记录相同,就+1,不同就-1,遍历完成,会保留一个候选项
遍历所有元素:统计刚才候选项出现的次数n以及N,验证n>N/2则验证成功,如果n<=N/2,则无Majority

算法应用3:GM算法 Heavy hitters

Misra-Gries (MG) algorithm finds up to k items that occur more than 1/k fraction of the time in a stream.
找到频率超过1/k的元素.
算法:

初始化k个候选项,对之后的每个元素:

  • 如果元素已经被记录了,count++
  • 如果没有记录,如果候选项<k,那就记录他
  • 如果没有记录,如果候选项=k,那就对每个候选项count—

遍历完成,返回候选项

算法应用4:并行计算TOPn, 时间复杂度满足O(n/p*log k)

原题:

Given an RDD storing a list of n integers (unordered), design a divide-and-conquer algorithm to find the k largest integers in the RDD. All workers must run in parallel. For full marks, each worker should run in time O(n/p * log k) time, where p is the number of workers.
Hint: Use a priority queue at each worker for maximum efficiency.

当看到log K和priority queue就知道要使用堆排序了。
分成p个worker,每个worker有O(n/p)的值,每一个work维护一个最大K堆,时间复杂度是O(log k).

老师给的答案:

Partition the integers evenly so that each worker receives O(n/p) values. On each worker, maintain a min-heap of size k. Inserting an element to the heap costs O(log k) time.
For each value, insert it to the heap if it is larger than the current min of the heap. Pop the minimum item if there are more than k items in the heap. By making one pass over all the values in each worker, we get the k largest integers out of the values of this worker. This can be done in O(n/p log k) time.
Then we merge the results by sending the top k of each partition to the same worker, which is at most kp values. The worker finds the largest k integers among these kp values and report it.

代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import heapq  # 引入最大堆的包

rdd = sc.parallelize(xrange(0,1000,2)) #生成rdd
k = 10

def topk(it): #对于每一个分区的n/p个值
h = []
for i in it: #遍历每一个值
# check if we should insert i
if (not h) or (i > h[0]): #如果h为空或者新来的大于h里面最大的
heapq.heappush(h,i) #把这个元素插入到h
if len(h) > k: #长度大于k
heapq.heappop(h) #弹出h中最小的
for i in xrange(len(h)): #遍历每一个值
yield heapq.heappop(h) #从小到大依次弹出返回到上一层

print(list(topk(rdd.mapPartitions(topk).collect()))) #对于分区使用,再整体使用


用实例总结了几个steaming和并行计算的基础知识点,路漫漫兮。

-------------    你的留言  是我更新的动力😊    -------------