从整数stream查找运行中位数

可能重复:
C中的滚动中值algorithm

鉴于整数是从数据stream中读取的。 查找以有效方式读取的元素的中位数。

解决方法我读过:我们可以使用左侧的最大堆来表示小于有效中值的元素,右侧的最小堆表示大于有效中值的元素。

在处理一个传入元素之后,堆中元素的数量最多相差1个元素。 当两个堆包含相同数量的元素时,我们将堆的根数据的平均值作为有效中值。 当堆不平衡时,我们从包含更多元素的堆的根部中select有效中位数。

但是,我们将如何构build一个最大堆和最小堆,即我们如何知道这里的有效中位数? 我认为我们会在max-heap中插入1个元素,然后在min-heap中插入下一个元素,以此类推所有元素。 纠正我如果我在这里错了。

从stream式数据中find运行中位数有许多不同的解决scheme,我将在答案的最后简要地讨论它们。

问题是关于具体解决scheme的细节(最大堆/分堆解决scheme),以及基于堆的解决scheme的工作原理如下:

对于前两个元素,在左边的maxHeap上加一个较小的值,在右边的minHeap上加大一个。 然后逐一处理stream数据,

Step 1: Add next item to one of the heaps if next item is smaller than maxHeap root add it to maxHeap, else add it to minHeap Step 2: Balance the heaps (after this step heaps will be either balanced or one of them will contain 1 more item) if number of elements in one of the heaps is greater than the other by more than 1, remove the root element from the one containing more elements and add to the other one 

然后在任何时候你可以计算这样的中位数:

  If the heaps contain equal amount of elements; median = (root of maxHeap + root of minHeap)/2 Else median = root of the heap with more elements 

现在我将在答案开始时谈论一般问题。 从数据stream中查找运行中值是一个棘手的问题,在一般情况下,find一个有效的内存约束的精确解决scheme可能是不可能的。 另一方面,如果数据具有某些特点我们可以利用,我们可以开发高效的专业解决scheme。 例如,如果我们知道数据是一个整数types,那么我们可以使用计数sorting ,这可以给你一个恒定的记忆常量时间algorithm。 基于堆的解决scheme是更通用的解决scheme,因为它也可以用于其他数据types(双打)。 最后,如果确切的中位数不是必需的,并且近似值足够,那么可以尝试估计数据的概率密度函数,并使用它来估计中值。

如果你不能一次把所有的东西放在内存中,这个问题变得更加困难。 堆解决scheme要求您将所有元素一次保存在内存中。 在这个问题的大多数现实应用中,这是不可能的。

相反,当你看到数字时,logging你看到每个整数的次数。 假设4个字节的整数,即2 ^ 32个桶,或者最多2 ^ 33个整数(每个int的键和数),即2 ^ 35字节或32GB。 它可能会比这less得多,因为你不需要存储键或计数为0的条目(即像Python中的defaultdict)。 这需要花费不变的时间来插入每个新的整数。

然后在任何时候,要find中位数,只需使用计数来确定哪个整数是中间元素。 这需要不变的时间(虽然是一个大的常数,但不变)。

如果input的方差在统计上是分布的(例如,正态,对数正态等),那么油藏采样是从任意长的数字stream中估计百分位/中位数的合理方法。

 int n = 0; // Running count of elements observed so far #define SIZE 10000 int reservoir[SIZE]; while(streamHasData()) { int x = readNumberFromStream(); if (n < SIZE) { reservoir[n++] = x; } else { int p = random(++n); // Choose a random number 0 >= p < n if (p < SIZE) { reservoir[p] = x; } } } 

那么“水库”是一个运行,统一(公平),所有input的样本 – 不论大小。 find中位数(或任何百分位数)是sorting水库和投票有趣点的一个直接的问题。

由于水库的大小是固定的,因此可以认为这种sorting是有效的(1) – 而且这种方法在恒定的时间和内存消耗下运行。

计算我发现的stream的百分点的最有效的方法是P 2algorithm: Raj Jain,Imrich Chlamtac:用于dynamic计算量子和直方图而不存储观测的P 2algorithm。 COMMUN。 ACM 28(10):1076-1085(1985)

该algorithm直接实现,工作得非常好。 然而,这是一个估计,所以记住这一点。 摘要:

提出了一种启发式algorithm,用于中位数和其他分位数的dynamic计算。 估计值是在观察值生成时dynamic生成的。 观测值不存储; 因此,无论观察次数如何,该algorithm都具有非常小且固定的存储需求。 这使得它适用于可用于工业控制器和logging器的分位数芯片。 该algorithm进一步扩展到直方图绘图。 分析algorithm的准确性。

这个问题有一个确切的解决scheme,只需要将最近看到的n个元素保存在内存中。 速度快,规模好。

一个可索引的跳过列表支持O(ln n)插入,删除和任意元素的索引search,同时保持sorting顺序。 当与一个跟踪第n个最旧的入口的FIFO队列结合使用时,解决scheme很简单:

 class RunningMedian: 'Fast running median with O(lg n) updates where n is the window size' def __init__(self, n, iterable): self.it = iter(iterable) self.queue = deque(islice(self.it, n)) self.skiplist = IndexableSkiplist(n) for elem in self.queue: self.skiplist.insert(elem) def __iter__(self): queue = self.queue skiplist = self.skiplist midpoint = len(queue) // 2 yield skiplist[midpoint] for newelem in self.it: oldelem = queue.popleft() skiplist.remove(oldelem) queue.append(newelem) skiplist.insert(newelem) yield skiplist[midpoint] 

这里是链接到完整的工作代码(一个易于理解的类版本和优化生成器版本的可索引跳过列表代码内联):

一个直观的方式来思考这个问题,如果你有一个完全平衡的二叉树,那么根将是中位数元素,因为那里会有相同数量的更小和更大的元素。 现在,如果树不满,这将不会是相当的情况,因为会有最后一级失踪的元素。

所以我们可以做的是有中位数和两个平衡二叉树,一个是小于中位数的元素,另一个是大于中位数的元素。 两棵树必须保持相同的大小。

当我们从数据stream中得到一个新的整数时,我们将它与中位数进行比较。 如果它大于中位数,我们将它添加到右边的树。 如果两个树的大小相差超过1,我们删除右边树的最小元素,使它成为新的中间值,并把旧的中间值放在左边的树中。 同样对于较小的。

有效率是一个取决于上下文的词。 这个问题的解决scheme取决于相对于插入量执行的查询的数量。 假设你正在插入N个数字和K次,最后你对中位数感兴趣。 基于堆的algorithm的复杂性将是O(N log N + K)。

考虑下面的select。 在数组中排列数字,并为每个查询运行线性selectalgorithm(使用快速sorting数据库)。 现在你有一个运行时间O(KN)的algorithm。

现在如果K足够小(不频繁的查询),后面的algorithm实际上更有效,反之亦然。

你不能只用一堆呢? 更新:不。 见评论。

不变式:在读取2*ninput之后,最小堆保持它们中最大的n

循环:读取2个input。 将它们添加到堆中,并删除堆的最小值。 这重build了不variables。

所以当读取2ninput时,堆的最小值是第n个最大值。 平均位置周围的两个元素的平均值需要有一点额外的复杂性,并在奇数个input之后处理查询。