在Python中滚动或滑动窗口迭代器

我需要一个在序列/迭代器/生成器上迭代的滚动窗口(又称滑动窗口)。 默认的Python迭代可以被认为是一个特殊情况,窗口长度是1.我目前使用下面的代码。 有没有人有这样做更Pythonic,不详细,或更有效的方法?

def rolling_window(seq, window_size): it = iter(seq) win = [it.next() for cnt in xrange(window_size)] # First window yield win for e in it: # Subsequent windows win[:-1] = win[1:] win[-1] = e yield win if __name__=="__main__": for w in rolling_window(xrange(6), 3): print w """Example output: [0, 1, 2] [1, 2, 3] [2, 3, 4] [3, 4, 5] """ 

在Python文档的旧版本中有一个itertools示例 :

 from itertools import islice def window(seq, n=2): "Returns a sliding window (of width n) over data from the iterable" " s -> (s0,s1,...s[n-1]), (s1,s2,...,sn), ... " it = iter(seq) result = tuple(islice(it, n)) if len(result) == n: yield result for elem in it: result = result[1:] + (elem,) yield result 

来自文档的一个更简洁一点,使用itertools更大的效果,我想象。

这似乎是量身定做的一个collections.deque因为你基本上有一个FIFO(添加到一端,从另一个删除)。 但是,即使你使用了一个list你也不应该切片两次。 相反,你应该从列表中pop(0)append()新的项目。

这里是一个基于deque的优化实现,

 from collections import deque def window(seq, n=2): it = iter(seq) win = deque((next(it, None) for _ in xrange(n)), maxlen=n) yield win append = win.append for e in it: append(e) yield win 

在我的testing中,它大部分时间都能轻松击败所有其他东西,尽pipepillmuncher的tee版本在大型迭代和小窗口中击败了它。 在更大的窗户上, deque以原始速度再次拉开。

在列表中访问单个项目可能比列表或元组更快或更慢。 (如果你使用的是否定索引,附近的项目会更快,或者项目接近尾声。)我在循环体中加上了一个sum(w) 。 这会发挥德克的力量(从一个项目迭代到下一个项目是快速的,所以这个循环跑了比下一个最快的方法,pillmuncher快了20%)。 当我改变它来单独查找和添加项目在十个窗口中,表转身, tee方法快了20%。 加上最后五个项的负指数,我能够恢复一些速度,但是tee速度还是比较快。 总的来说,我估计任何一个对于大多数用途来说都足够快,如果你需要更多的性能,configuration文件,并select最好的。

我喜欢tee()

 from itertools import tee, izip def window(iterable, size): iters = tee(iterable, size) for i in xrange(1, size): for each in iters[i:]: next(each, None) return izip(*iters) for each in window(xrange(6), 3): print list(each) 

得到:

 [0, 1, 2] [1, 2, 3] [2, 3, 4] [3, 4, 5] 

这是一个泛化,增加了对fillvalue参数的支持:

 from collections import deque from itertools import islice def sliding_window(iterable, size=2, step=1, fillvalue=None): if size < 0 or step < 1: raise ValueError it = iter(iterable) q = deque(islice(it, size), maxlen=size) if not q: return # empty iterable or size == 0 q.extend(fillvalue for _ in range(size - len(q))) # pad to size while True: yield iter(q) # iter() to avoid accidental outside modifications try: q.append(next(it)) except StopIteration: # Python 3.5 pep 479 support return q.extend(next(it, fillvalue) for _ in range(step - 1)) 

如果需要,每次迭代在一个时间滚动step位置产生块size项目,每个块填充填充值。 size=4, step=3, fillvalue='*'

  [abcd]efghijklmnopqrstuvwxy z abc[defg]hijklmnopqrstuvwxyz abcdef[ghij]klmnopqrstuvwxyz abcdefghi[jklm]nopqrstuvwxyz abcdefghijkl[mnop]qrstuvwxyz abcdefghijklmno[pqrs]tuvwxyz abcdefghijklmnopqr[stuv]wxyz abcdefghijklmnopqrstu[vwxy]z abcdefghijklmnopqrstu vwx[yz * *] 

有关step参数用例的示例,请参阅使用python高效处理大型.txt文件 。

只是一个快速的贡献。

由于当前的python文档在itertool示例(即http://docs.python.org/library/itertools.html底部)中没有“窗口”,下面是一个基于grouper代码的代码段是给出的例子之一:;

 import itertools as it def window(iterable, size): shiftedStarts = [it.islice(iterable, s, None) for s in xrange(size)] return it.izip(*shiftedStarts) 

基本上,我们创build了一系列切片迭代器,每个迭代器都有一个起点向前一点。 然后,我们一起拉链。 注意,这个函数返回一个生成器(它不是直接生成器本身)。

就像上面的附加元素和推进迭代器版本一样,性能(即最好)随列表大小和窗口大小而变化。 我喜欢这个,因为它是一个双线(可能是一个class轮,但我更喜欢命名的概念)。

事实certificate,上面的代码是错误的 。 如果传递给iterable的参数是一个序列,但是它不是一个迭代器,那么它就可以工作。 如果它是一个迭代器,在islice调用中共享同一个迭代器(但不是tee'd),这会使事情变得糟糕。

这是一些固定的代码:

 import itertools as it def window(iterable, size): itrs = it.tee(iterable, size) shiftedStarts = [it.islice(anItr, s, None) for s, anItr in enumerate(itrs)] return it.izip(*shiftedStarts) 

此外,还有一个版本的书籍。 而不是复制一个迭代器,然后多次推进副本,这个版本在我们向前移动起始位置时,使每个迭代器的成对副本。 因此,迭代器t提供了“完整的”迭代器,其起始点在t处,也是创build迭代器t + 1的基础:

 import itertools as it def window4(iterable, size): complete_itr, incomplete_itr = it.tee(iterable, 2) iters = [complete_itr] for i in xrange(1, size): incomplete_itr.next() complete_itr, incomplete_itr = it.tee(incomplete_itr, 2) iters.append(complete_itr) return it.izip(*iters) 

为了展示如何结合itertools食谱 ,我将使用consume食谱的pairwise食谱尽可能直接地扩展到window食谱中:

 def consume(iterator, n): "Advance the iterator n-steps ahead. If n is none, consume entirely." # Use functions that consume iterators at C speed. if n is None: # feed the entire iterator into a zero-length deque collections.deque(iterator, maxlen=0) else: # advance to the empty slice starting at position n next(islice(iterator, n, n), None) def window(iterable, n=2): "s -> (s0, ...,s(n-1)), (s1, ...,sn), (s2, ..., s(n+1)), ..." iters = tee(iterable, n) # Could use enumerate(islice(iters, 1, None), 1) to avoid consume(it, 0), but that's # slower for larger window sizes, while saving only small fixed "noop" cost for i, it in enumerate(iters): consume(it, i) return zip(*iters) 

window配方与pairwise相同,只是用第n - 1迭代器中的单个元素“消耗”,而逐渐增加n - 1迭代器的消耗。 使用consume而不是在islice中包装每个迭代器的islice要稍微快一些(对于足够大的迭代器),因为在consume阶段只支付islice包装开销,而不是在提取每个窗口值的过程中(所以它的界限是n iterable的项目数量)。

在性能方面,与其他一些解决scheme相比,这是相当不错的(比我testing的任何其他解决scheme都更好)。 在Python 3.5.0,Linux x86-64上testing,使用ipython %timeit magic。

kindall的deque解决scheme ,通过使用islice而不是home-rolled生成器expression式来testing性能/正确性,并testing结果长度,以便在迭代器比窗口短时不会产生结果,并且传递maxlen deque地位,而不是关键字(使小input令人惊讶的区别):

 >>> %timeit -r5 deque(windowkindall(range(10), 3), 0) 100000 loops, best of 5: 1.87 μs per loop >>> %timeit -r5 deque(windowkindall(range(1000), 3), 0) 10000 loops, best of 5: 72.6 μs per loop >>> %timeit -r5 deque(windowkindall(range(1000), 30), 0) 1000 loops, best of 5: 71.6 μs per loop 

与以前改进的类似的解决scheme相同,但是每个yield win变成yield tuple(win)所以存储结果来自生成器,而不是所有存储的结果真的是最近结果的视图(在这种情况下所有其他合理的解决scheme都是安全的) ,并将tuple=tuple添加到函数定义中,将tuple使用从LEGBB LEGBL

 >>> %timeit -r5 deque(windowkindalltupled(range(10), 3), 0) 100000 loops, best of 5: 3.05 μs per loop >>> %timeit -r5 deque(windowkindalltupled(range(1000), 3), 0) 10000 loops, best of 5: 207 μs per loop >>> %timeit -r5 deque(windowkindalltupled(range(1000), 30), 0) 1000 loops, best of 5: 348 μs per loop 

上面显示的基于consume的解决scheme:

 >>> %timeit -r5 deque(windowconsume(range(10), 3), 0) 100000 loops, best of 5: 3.92 μs per loop >>> %timeit -r5 deque(windowconsume(range(1000), 3), 0) 10000 loops, best of 5: 42.8 μs per loop >>> %timeit -r5 deque(windowconsume(range(1000), 30), 0) 1000 loops, best of 5: 232 μs per loop 

consume相同,但内联consume情况以避免函数调用,并且n is Nonetesting以减less运行时间,尤其是对于那些设置开销是工作的有意义的部分的小input:

 >>> %timeit -r5 deque(windowinlineconsume(range(10), 3), 0) 100000 loops, best of 5: 3.57 μs per loop >>> %timeit -r5 deque(windowinlineconsume(range(1000), 3), 0) 10000 loops, best of 5: 40.9 μs per loop >>> %timeit -r5 deque(windowinlineconsume(range(1000), 30), 0) 1000 loops, best of 5: 211 μs per loop 

(注意: pairwise使用默认参数为2的tee变体来创build嵌套的tee对象,所以任何给定的迭代器只会被提前一次,而不会独立地消耗越来越多的时间,类似于MrDrFenner的答案类似于非内联consume并且比所有testing中的内联consume要慢,所以为了简洁,我省略了这些结果)。

正如你所看到的, 如果你不关心调用者需要存储结果的可能性,那么我的优化版本的kindall的解决scheme大部分时间都会赢,除了“大的迭代,小窗口大小的情况” (内联consume胜); 随着可迭代大小的增加,它会迅速降级,而随着窗口大小的增加,所有这些都不会退化(对于可迭代大小的增加,其他解决scheme的降解速度会更慢,但窗口大小的增加也会降低)。 它甚至可以通过包装在map(tuple, ...)来适应“需要元组”的情况,该map(tuple, ...)运行的速度比将函数放在一个稍微慢一些的地图上,但是它是微不足道的(需要1-5%的时间)当您可以容忍重复返回相同的值时,可以让您保持更快的运行灵活性。

如果您需要安全性来防止存储的退货,则内联consume赢得除最小input大小之外的所有input (非内联consume稍微慢一点但缩放类似)。 由于安装成本较低,基于双层架构的解决scheme仅赢得最小的投入,而且增益较小; 随着迭代时间的延长,它会严重恶化。

为了logging,我使用的kindall解决scheme的修改版本是:

 def windowkindalltupled(iterable, n=2, tuple=tuple): it = iter(iterable) win = deque(islice(it, n), n) if len(win) < n: return append = win.append yield tuple(win) for e in it: append(e) yield tuple(win) 

在函数定义行中删除tuple的caching,并在每个yield使用tuple以获得更快但安全性较低的版本。

我使用下面的代码作为一个简单的滑动窗口,使用生成器来大幅提高可读性。 根据我的经验,它的速度到目前为止足以用于生物信息学序列分析。

我在这里包括它,因为我没有看到这个方法使用。 再一次,我没有声称它的比较performance。

 def slidingWindow(sequence,winSize,step=1): """Returns a generator that will iterate through the defined chunks of input sequence. Input sequence must be sliceable.""" # Verify the inputs if not ((type(winSize) == type(0)) and (type(step) == type(0))): raise Exception("**ERROR** type(winSize) and type(step) must be int.") if step > winSize: raise Exception("**ERROR** step must not be larger than winSize.") if winSize > len(sequence): raise Exception("**ERROR** winSize must not be larger than sequence length.") # Pre-compute number of chunks to emit numOfChunks = ((len(sequence)-winSize)/step)+1 # Do the work for i in range(0,numOfChunks*step,step): yield sequence[i:i+winSize] 

稍微修改一下deque窗口的版本,使其成为一个真正的滚动窗口。 所以它开始只填充一个元素,然后增长到它的最大窗口大小,然后收缩,因为它的左边缘接近尾声:

 from collections import deque def window(seq, n=2): it = iter(seq) win = deque((next(it, None) for _ in xrange(1)), maxlen=n) yield win append = win.append for e in it: append(e) yield win for _ in xrange(len(win)-1): win.popleft() yield win for wnd in window(range(5), n=3): print(list(wnd)) 

这给了

 [0] [0, 1] [0, 1, 2] [1, 2, 3] [2, 3, 4] [3, 4] [4] 

多个迭代器!

 def window(seq, size, step=1): # initialize iterators iters = [iter(seq) for i in range(size)] # stagger iterators (without yielding) [next(iters[i]) for j in range(size) for i in range(-1, -j-1, -1)] while(True): yield [next(i) for i in iters] # next line does nothing for step = 1 (skips iterations for step > 1) [next(i) for i in iters for j in range(step-1)] 

next(it)在序列完成时引发StopIteration ,除了我之外的一些很酷的原因,yield语句除了它和函数返回外,忽略了不构成完整窗口的剩余值。

无论如何,这是最简单的解决scheme,但其唯一的要求是seq实现__iter____getitem__ ,并不依赖itertoolscollections除了@ dansalmo的解决scheme:)

 def rolling_window(list, degree): for i in range(len(list)-degree+1): yield [list[i+o] for o in range(degree)] 

这是一个滚动平均function

为什么不

 def pairwise(iterable): "s -> (s0,s1), (s1,s2), (s2, s3), ..." a, b = tee(iterable) next(b, None) return zip(a, b) 

它在Python 文档中有logging 。 您可以轻松地将其扩展到更宽的窗口。

 def GetShiftingWindows(thelist, size): return [ thelist[x:x+size] for x in range( len(thelist) - size + 1 ) ] >> a = [1, 2, 3, 4, 5] >> GetShiftingWindows(a, 3) [ [1, 2, 3], [2, 3, 4], [3, 4, 5] ] 
 >>> n, m = 6, 3 >>> k = n - m+1 >>> print ('{}\n'*(k)).format(*[range(i, i+m) for i in xrange(k)]) [0, 1, 2] [1, 2, 3] [2, 3, 4] [3, 4, 5] 

如何使用以下内容:

 mylist = [1, 2, 3, 4, 5, 6, 7] def sliding_window(l, window_size=2): if window_size > len(l): raise ValueError("Window size must be smaller or equal to the number of elements in the list.") t = [] for i in xrange(0, window_size): t.append(l[i:]) return zip(*t) print sliding_window(mylist, 3) 

输出:

 [(1, 2, 3), (2, 3, 4), (3, 4, 5), (4, 5, 6), (5, 6, 7)] 

这是一个古老的问题,但对于那些仍然感兴趣的是在这个页面中使用生成器的一个窗口滑块很好的实现(阿德里安·罗斯布鲁克)。

这是OpenCV的实现,但是您可以轻松地将其用于任何其他目的。 对于渴望的人,我会粘贴这里的代码,但要更好地理解它,我build议访问原始页面。

 def sliding_window(image, stepSize, windowSize): # slide a window across the image for y in xrange(0, image.shape[0], stepSize): for x in xrange(0, image.shape[1], stepSize): # yield the current window yield (x, y, image[y:y + windowSize[1], x:x + windowSize[0]]) 

提示:您可以在迭代生成器时检查窗口的.shape ,以放弃那些不符合要求的窗口

干杯

toolz包有一个sliding_window函数。

有一个图书馆,正是你所需要的:

 import more_itertools list(more_itertools.windowed([1,2,3,4,5,6,7,8,9,10,11,12,13,14,15],n=3, step=3)) Out: [(1, 2, 3), (4, 5, 6), (7, 8, 9), (10, 11, 12), (13, 14, 15)]