高效循环缓冲区?

我想在python中创build一个高效的循环缓冲区 (目标是获取缓冲区中整数值的平均值)。

这是使用列表来收集值的有效方法吗?

def add_to_buffer( self, num ): self.mylist.pop( 0 ) self.mylist.append( num ) 

什么会更有效率(以及为什么)?

我会用一个maxlen arg来使用collections.deque

 >>> import collections >>> d = collections.deque(maxlen=10) >>> d deque([], maxlen=10) >>> for i in xrange(20): ... d.append(i) ... >>> d deque([10, 11, 12, 13, 14, 15, 16, 17, 18, 19], maxlen=10) 

在文档中有一个与你想要的类似的配方 。 我认为这是最有效率的说法完全取决于这个事实,即它是由一个令人难以置信的熟练的工作人员在C语言中实现的,这个工作人员习惯于开发出顶尖的代码。

从列表的顶部popup导致整个列表被复制,因此是低效的

您应该使用固定大小的列表/数组以及在添加/删除项目时通过缓冲区移动的索引

可以使用deque类,但对于问题的重要性(平均),这是我的解决scheme:

 >>> from collections import deque >>> class CircularBuffer(deque): ... def __init__(self, size=0): ... super(CircularBuffer, self).__init__(maxlen=size) ... @property ... def average(self): # TODO: Make type check for integer or floats ... return sum(self)/len(self) ... >>> >>> cb = CircularBuffer(size=10) >>> for i in range(20): ... cb.append(i) ... print "@%s, Average: %s" % (cb, cb.average) ... @deque([0], maxlen=10), Average: 0 @deque([0, 1], maxlen=10), Average: 0 @deque([0, 1, 2], maxlen=10), Average: 1 @deque([0, 1, 2, 3], maxlen=10), Average: 1 @deque([0, 1, 2, 3, 4], maxlen=10), Average: 2 @deque([0, 1, 2, 3, 4, 5], maxlen=10), Average: 2 @deque([0, 1, 2, 3, 4, 5, 6], maxlen=10), Average: 3 @deque([0, 1, 2, 3, 4, 5, 6, 7], maxlen=10), Average: 3 @deque([0, 1, 2, 3, 4, 5, 6, 7, 8], maxlen=10), Average: 4 @deque([0, 1, 2, 3, 4, 5, 6, 7, 8, 9], maxlen=10), Average: 4 @deque([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], maxlen=10), Average: 5 @deque([2, 3, 4, 5, 6, 7, 8, 9, 10, 11], maxlen=10), Average: 6 @deque([3, 4, 5, 6, 7, 8, 9, 10, 11, 12], maxlen=10), Average: 7 @deque([4, 5, 6, 7, 8, 9, 10, 11, 12, 13], maxlen=10), Average: 8 @deque([5, 6, 7, 8, 9, 10, 11, 12, 13, 14], maxlen=10), Average: 9 @deque([6, 7, 8, 9, 10, 11, 12, 13, 14, 15], maxlen=10), Average: 10 @deque([7, 8, 9, 10, 11, 12, 13, 14, 15, 16], maxlen=10), Average: 11 @deque([8, 9, 10, 11, 12, 13, 14, 15, 16, 17], maxlen=10), Average: 12 @deque([9, 10, 11, 12, 13, 14, 15, 16, 17, 18], maxlen=10), Average: 13 @deque([10, 11, 12, 13, 14, 15, 16, 17, 18, 19], maxlen=10), Average: 14 

Python的deque很慢。 您也可以使用numpy.roll而不是如何旋转形状(n,)或(n,1)的numpy数组中的数字?

在这个基准中,deque是448ms。 Numpy.roll是29ms http://scimusing.wordpress.com/2013/10/25/ring-buffers-in-pythonnumpy/

根据MoonCactus的回答 ,这里是一个circularlist类。 与他的版本不同的是,这里c[0]将总是给出最老的元素, c[-1]是最新添加的元素, c[-2]是倒数第二…这对于应用程序来说更自然。

 c = circularlist(4) c.append(1); print c, c[0], c[-1] #[1] (1 items) first 1, last 1 c.append(2); print c, c[0], c[-1] #[1, 2] (2 items) first 1, last 2 c.append(3); print c, c[0], c[-1] #[1, 2, 3] (3 items) first 1, last 3 c.append(8); print c, c[0], c[-1] #[1, 2, 3, 8] (4 items) first 1, last 8 c.append(10); print c, c[0], c[-1] #[10, 2, 3, 8] (4 items) first 2, last 10 c.append(11); print c, c[0], c[-1] #[10, 11, 3, 8] (4 items) first 3, last 11 

类:

 class circularlist(object): def __init__(self, size): """Initialization""" self.index = 0 self.size = size self._data = [] def append(self, value): """Append an element""" if len(self._data) == self.size: self._data[self.index] = value else: self._data.append(value) self.index = (self.index + 1) % self.size def __getitem__(self, key): """Get element by index, relative to the current index""" if len(self._data) == self.size: return(self._data[(key + self.index) % self.size]) else: return(self._data[key]) def __repr__(self): """Return string representation""" return self._data.__repr__() + ' (' + str(len(self._data))+' items)' 

你也可以看到这个很老的Python配方 。

这是我自己的NumPy数组版本:

 #!/usr/bin/env python import numpy as np class RingBuffer(object): def __init__(self, size_max, default_value=0.0, dtype=float): """initialization""" self.size_max = size_max self._data = np.empty(size_max, dtype=dtype) self._data.fill(default_value) self.size = 0 def append(self, value): """append an element""" self._data = np.roll(self._data, 1) self._data[0] = value self.size += 1 if self.size == self.size_max: self.__class__ = RingBufferFull def get_all(self): """return a list of elements from the oldest to the newest""" return(self._data) def get_partial(self): return(self.get_all()[0:self.size]) def __getitem__(self, key): """get element""" return(self._data[key]) def __repr__(self): """return string representation""" s = self._data.__repr__() s = s + '\t' + str(self.size) s = s + '\t' + self.get_all()[::-1].__repr__() s = s + '\t' + self.get_partial()[::-1].__repr__() return(s) class RingBufferFull(RingBuffer): def append(self, value): """append an element when buffer is full""" self._data = np.roll(self._data, 1) self._data[0] = value 

在进行串行编程之前,我遇到了这个问题。 就在一年前的时候,我也找不到任何有效的实现,所以我最终写了一个C扩展名 ,它也可以在 MIT许可证下的pypi上使用。 它是超级基本的,只处理8位有符号字符的缓冲区,但是长度是灵活的,所以如果你需要字符以外的其他东西,你可以使用Struct或者其他东西。 我现在看到谷歌search现在有几个选项,所以你也可以看看这些。

这个不需要任何库。 它增长一个列表,然后通过索引循环。

足迹是非常小的(没有图书馆),它至less跑出两倍的速度。 这确实是计算移动平均值的好方法,但请注意,这些项目并不按上述年龄进行sorting。

 class CircularBuffer(object): def __init__(self, size): """initialization""" self.index= 0 self.size= size self._data = [] def record(self, value): """append an element""" if len(self._data) == self.size: self._data[self.index]= value else: self._data.append(value) self.index= (self.index + 1) % self.size def __getitem__(self, key): """get element by index like a regular array""" return(self._data[key]) def __repr__(self): """return string representation""" return self._data.__repr__() + ' (' + str(len(self._data))+' items)' def get_all(self): """return a list of all the elements""" return(self._data) 

要获得平均值,例如:

 q= CircularBuffer(1000000); for i in range(40000): q.record(i); print "capacity=", q.size print "stored=", len(q.get_all()) print "average=", sum(q.get_all()) / len(q.get_all()) 

结果是:

 capacity= 1000000 stored= 40000 average= 19999 real 0m0.024s user 0m0.020s sys 0m0.000s 

这大约相当于出队时间的三分之一。

原来的问题是:“ 高效 ”的循环缓冲区。 根据这个效率要求,从aaronasterling的答案似乎是明确无误的。 使用Python中编写的专用类,并将时间处理与collections.deque进行比较,显示了使用deque的x5.2倍加速! 这里是testing这个非常简单的代码:

 class cb: def __init__(self, size): self.b = [0]*size self.i = 0 self.sz = size def append(self, v): self.b[self.i] = v self.i = (self.i + 1) % self.sz b = cb(1000) for i in range(10000): b.append(i) # called 200 times, this lasts 1.097 second on my laptop from collections import deque b = deque( [], 1000 ) for i in range(10000): b.append(i) # called 200 times, this lasts 0.211 second on my laptop 

要将一个双端转换为列表,只需使用:

 my_list = [v for v in my_deque] 

随后您将获得O(1)随机访问deque项目。 当然,这只有在设置了一次后才需要进行多次随机访问才有价值。

你回答不对。 循环缓冲区主要有两个原则( https://en.wikipedia.org/wiki/Circular_buffer

  1. 缓冲区的长度设置;
  2. 先进先出;
  3. 添加或删除项目时,其他项目不应移动其位置

你的代码如下:

 def add_to_buffer( self, num ): self.mylist.pop( 0 ) self.mylist.append( num ) 

让我们考虑一下列表已满的情况,通过使用你的代码:

 self.mylist = [1, 2, 3, 4, 5] 

现在我们追加6,这个清单变成了

 self.mylist = [2, 3, 4, 5, 6] 

预计列表中的项目1已经改变了它们的位置

你的代码是一个队列,而不是一个圆形的缓冲区。

巴斯的答案,我认为是最有效的。

顺便说一下,一个圆形缓冲区可以提高操作的性能来添加一个项目。