在Python中滚动或滑动窗口迭代器
我需要一个在序列/迭代器/生成器上迭代的滚动窗口(又称滑动窗口)。 默认的Python迭代可以被认为是一个特殊情况,窗口长度是1.我目前使用下面的代码。 有没有人有这样做更Pythonic,不详细,或更有效的方法?
def rolling_window(seq, window_size): it = iter(seq) win = [it.next() for cnt in xrange(window_size)] # First window yield win for e in it: # Subsequent windows win[:-1] = win[1:] win[-1] = e yield win if __name__=="__main__": for w in rolling_window(xrange(6), 3): print w """Example output: [0, 1, 2] [1, 2, 3] [2, 3, 4] [3, 4, 5] """
在Python文档的旧版本中有一个itertools
示例 :
from itertools import islice def window(seq, n=2): "Returns a sliding window (of width n) over data from the iterable" " s -> (s0,s1,...s[n-1]), (s1,s2,...,sn), ... " it = iter(seq) result = tuple(islice(it, n)) if len(result) == n: yield result for elem in it: result = result[1:] + (elem,) yield result
来自文档的一个更简洁一点,使用itertools
更大的效果,我想象。
这似乎是量身定做的一个collections.deque
因为你基本上有一个FIFO(添加到一端,从另一个删除)。 但是,即使你使用了一个list
你也不应该切片两次。 相反,你应该从列表中pop(0)
并append()
新的项目。
这里是一个基于deque的优化实现,
from collections import deque def window(seq, n=2): it = iter(seq) win = deque((next(it, None) for _ in xrange(n)), maxlen=n) yield win append = win.append for e in it: append(e) yield win
在我的testing中,它大部分时间都能轻松击败所有其他东西,尽pipepillmuncher的tee
版本在大型迭代和小窗口中击败了它。 在更大的窗户上, deque
以原始速度再次拉开。
在列表中访问单个项目可能比列表或元组更快或更慢。 (如果你使用的是否定索引,附近的项目会更快,或者项目接近尾声。)我在循环体中加上了一个sum(w)
。 这会发挥德克的力量(从一个项目迭代到下一个项目是快速的,所以这个循环跑了比下一个最快的方法,pillmuncher快了20%)。 当我改变它来单独查找和添加项目在十个窗口中,表转身, tee
方法快了20%。 加上最后五个项的负指数,我能够恢复一些速度,但是tee
速度还是比较快。 总的来说,我估计任何一个对于大多数用途来说都足够快,如果你需要更多的性能,configuration文件,并select最好的。
我喜欢tee()
:
from itertools import tee, izip def window(iterable, size): iters = tee(iterable, size) for i in xrange(1, size): for each in iters[i:]: next(each, None) return izip(*iters) for each in window(xrange(6), 3): print list(each)
得到:
[0, 1, 2] [1, 2, 3] [2, 3, 4] [3, 4, 5]
这是一个泛化,增加了对fillvalue
参数的支持:
from collections import deque from itertools import islice def sliding_window(iterable, size=2, step=1, fillvalue=None): if size < 0 or step < 1: raise ValueError it = iter(iterable) q = deque(islice(it, size), maxlen=size) if not q: return # empty iterable or size == 0 q.extend(fillvalue for _ in range(size - len(q))) # pad to size while True: yield iter(q) # iter() to avoid accidental outside modifications try: q.append(next(it)) except StopIteration: # Python 3.5 pep 479 support return q.extend(next(it, fillvalue) for _ in range(step - 1))
如果需要,每次迭代在一个时间滚动step
位置产生块size
项目,每个块填充填充值。 size=4, step=3, fillvalue='*'
:
[abcd]efghijklmnopqrstuvwxy z abc[defg]hijklmnopqrstuvwxyz abcdef[ghij]klmnopqrstuvwxyz abcdefghi[jklm]nopqrstuvwxyz abcdefghijkl[mnop]qrstuvwxyz abcdefghijklmno[pqrs]tuvwxyz abcdefghijklmnopqr[stuv]wxyz abcdefghijklmnopqrstu[vwxy]z abcdefghijklmnopqrstu vwx[yz * *]
有关step
参数用例的示例,请参阅使用python高效处理大型.txt文件 。
只是一个快速的贡献。
由于当前的python文档在itertool示例(即http://docs.python.org/library/itertools.html底部)中没有“窗口”,下面是一个基于grouper代码的代码段是给出的例子之一:;
import itertools as it def window(iterable, size): shiftedStarts = [it.islice(iterable, s, None) for s in xrange(size)] return it.izip(*shiftedStarts)
基本上,我们创build了一系列切片迭代器,每个迭代器都有一个起点向前一点。 然后,我们一起拉链。 注意,这个函数返回一个生成器(它不是直接生成器本身)。
就像上面的附加元素和推进迭代器版本一样,性能(即最好)随列表大小和窗口大小而变化。 我喜欢这个,因为它是一个双线(可能是一个class轮,但我更喜欢命名的概念)。
事实certificate,上面的代码是错误的 。 如果传递给iterable的参数是一个序列,但是它不是一个迭代器,那么它就可以工作。 如果它是一个迭代器,在islice调用中共享同一个迭代器(但不是tee'd),这会使事情变得糟糕。
这是一些固定的代码:
import itertools as it def window(iterable, size): itrs = it.tee(iterable, size) shiftedStarts = [it.islice(anItr, s, None) for s, anItr in enumerate(itrs)] return it.izip(*shiftedStarts)
此外,还有一个版本的书籍。 而不是复制一个迭代器,然后多次推进副本,这个版本在我们向前移动起始位置时,使每个迭代器的成对副本。 因此,迭代器t提供了“完整的”迭代器,其起始点在t处,也是创build迭代器t + 1的基础:
import itertools as it def window4(iterable, size): complete_itr, incomplete_itr = it.tee(iterable, 2) iters = [complete_itr] for i in xrange(1, size): incomplete_itr.next() complete_itr, incomplete_itr = it.tee(incomplete_itr, 2) iters.append(complete_itr) return it.izip(*iters)
为了展示如何结合itertools
食谱 ,我将使用consume
食谱的pairwise
食谱尽可能直接地扩展到window
食谱中:
def consume(iterator, n): "Advance the iterator n-steps ahead. If n is none, consume entirely." # Use functions that consume iterators at C speed. if n is None: # feed the entire iterator into a zero-length deque collections.deque(iterator, maxlen=0) else: # advance to the empty slice starting at position n next(islice(iterator, n, n), None) def window(iterable, n=2): "s -> (s0, ...,s(n-1)), (s1, ...,sn), (s2, ..., s(n+1)), ..." iters = tee(iterable, n) # Could use enumerate(islice(iters, 1, None), 1) to avoid consume(it, 0), but that's # slower for larger window sizes, while saving only small fixed "noop" cost for i, it in enumerate(iters): consume(it, i) return zip(*iters)
window
配方与pairwise
相同,只是用第n - 1
迭代器中的单个元素“消耗”,而逐渐增加n - 1
迭代器的消耗。 使用consume
而不是在islice
中包装每个迭代器的islice
要稍微快一些(对于足够大的迭代器),因为在consume
阶段只支付islice
包装开销,而不是在提取每个窗口值的过程中(所以它的界限是n
iterable
的项目数量)。
在性能方面,与其他一些解决scheme相比,这是相当不错的(比我testing的任何其他解决scheme都更好)。 在Python 3.5.0,Linux x86-64上testing,使用ipython
%timeit
magic。
kindall的deque
解决scheme ,通过使用islice
而不是home-rolled生成器expression式来testing性能/正确性,并testing结果长度,以便在迭代器比窗口短时不会产生结果,并且传递maxlen
deque
地位,而不是关键字(使小input令人惊讶的区别):
>>> %timeit -r5 deque(windowkindall(range(10), 3), 0) 100000 loops, best of 5: 1.87 μs per loop >>> %timeit -r5 deque(windowkindall(range(1000), 3), 0) 10000 loops, best of 5: 72.6 μs per loop >>> %timeit -r5 deque(windowkindall(range(1000), 30), 0) 1000 loops, best of 5: 71.6 μs per loop
与以前改进的类似的解决scheme相同,但是每个yield win
变成yield tuple(win)
所以存储结果来自生成器,而不是所有存储的结果真的是最近结果的视图(在这种情况下所有其他合理的解决scheme都是安全的) ,并将tuple=tuple
添加到函数定义中,将tuple
使用从LEGB
的B
LEGB
到L
:
>>> %timeit -r5 deque(windowkindalltupled(range(10), 3), 0) 100000 loops, best of 5: 3.05 μs per loop >>> %timeit -r5 deque(windowkindalltupled(range(1000), 3), 0) 10000 loops, best of 5: 207 μs per loop >>> %timeit -r5 deque(windowkindalltupled(range(1000), 30), 0) 1000 loops, best of 5: 348 μs per loop
上面显示的基于consume
的解决scheme:
>>> %timeit -r5 deque(windowconsume(range(10), 3), 0) 100000 loops, best of 5: 3.92 μs per loop >>> %timeit -r5 deque(windowconsume(range(1000), 3), 0) 10000 loops, best of 5: 42.8 μs per loop >>> %timeit -r5 deque(windowconsume(range(1000), 30), 0) 1000 loops, best of 5: 232 μs per loop
与consume
相同,但内联consume
情况以避免函数调用,并且n is None
testing以减less运行时间,尤其是对于那些设置开销是工作的有意义的部分的小input:
>>> %timeit -r5 deque(windowinlineconsume(range(10), 3), 0) 100000 loops, best of 5: 3.57 μs per loop >>> %timeit -r5 deque(windowinlineconsume(range(1000), 3), 0) 10000 loops, best of 5: 40.9 μs per loop >>> %timeit -r5 deque(windowinlineconsume(range(1000), 30), 0) 1000 loops, best of 5: 211 μs per loop
(注意: pairwise
使用默认参数为2的tee
变体来创build嵌套的tee
对象,所以任何给定的迭代器只会被提前一次,而不会独立地消耗越来越多的时间,类似于MrDrFenner的答案类似于非内联consume
并且比所有testing中的内联consume
要慢,所以为了简洁,我省略了这些结果)。
正如你所看到的, 如果你不关心调用者需要存储结果的可能性,那么我的优化版本的kindall的解决scheme大部分时间都会赢,除了“大的迭代,小窗口大小的情况” (内联consume
胜); 随着可迭代大小的增加,它会迅速降级,而随着窗口大小的增加,所有这些都不会退化(对于可迭代大小的增加,其他解决scheme的降解速度会更慢,但窗口大小的增加也会降低)。 它甚至可以通过包装在map(tuple, ...)
来适应“需要元组”的情况,该map(tuple, ...)
运行的速度比将函数放在一个稍微慢一些的地图上,但是它是微不足道的(需要1-5%的时间)当您可以容忍重复返回相同的值时,可以让您保持更快的运行灵活性。
如果您需要安全性来防止存储的退货,则内联consume
赢得除最小input大小之外的所有input (非内联consume
稍微慢一点但缩放类似)。 由于安装成本较低,基于双层架构的解决scheme仅赢得最小的投入,而且增益较小; 随着迭代时间的延长,它会严重恶化。
为了logging,我使用的kindall解决scheme的修改版本是:
def windowkindalltupled(iterable, n=2, tuple=tuple): it = iter(iterable) win = deque(islice(it, n), n) if len(win) < n: return append = win.append yield tuple(win) for e in it: append(e) yield tuple(win)
在函数定义行中删除tuple
的caching,并在每个yield
使用tuple
以获得更快但安全性较低的版本。
我使用下面的代码作为一个简单的滑动窗口,使用生成器来大幅提高可读性。 根据我的经验,它的速度到目前为止足以用于生物信息学序列分析。
我在这里包括它,因为我没有看到这个方法使用。 再一次,我没有声称它的比较performance。
def slidingWindow(sequence,winSize,step=1): """Returns a generator that will iterate through the defined chunks of input sequence. Input sequence must be sliceable.""" # Verify the inputs if not ((type(winSize) == type(0)) and (type(step) == type(0))): raise Exception("**ERROR** type(winSize) and type(step) must be int.") if step > winSize: raise Exception("**ERROR** step must not be larger than winSize.") if winSize > len(sequence): raise Exception("**ERROR** winSize must not be larger than sequence length.") # Pre-compute number of chunks to emit numOfChunks = ((len(sequence)-winSize)/step)+1 # Do the work for i in range(0,numOfChunks*step,step): yield sequence[i:i+winSize]
稍微修改一下deque窗口的版本,使其成为一个真正的滚动窗口。 所以它开始只填充一个元素,然后增长到它的最大窗口大小,然后收缩,因为它的左边缘接近尾声:
from collections import deque def window(seq, n=2): it = iter(seq) win = deque((next(it, None) for _ in xrange(1)), maxlen=n) yield win append = win.append for e in it: append(e) yield win for _ in xrange(len(win)-1): win.popleft() yield win for wnd in window(range(5), n=3): print(list(wnd))
这给了
[0] [0, 1] [0, 1, 2] [1, 2, 3] [2, 3, 4] [3, 4] [4]
多个迭代器!
def window(seq, size, step=1): # initialize iterators iters = [iter(seq) for i in range(size)] # stagger iterators (without yielding) [next(iters[i]) for j in range(size) for i in range(-1, -j-1, -1)] while(True): yield [next(i) for i in iters] # next line does nothing for step = 1 (skips iterations for step > 1) [next(i) for i in iters for j in range(step-1)]
next(it)
在序列完成时引发StopIteration
,除了我之外的一些很酷的原因,yield语句除了它和函数返回外,忽略了不构成完整窗口的剩余值。
无论如何,这是最简单的解决scheme,但其唯一的要求是seq
实现__iter__
或__getitem__
,并不依赖itertools
或collections
除了@ dansalmo的解决scheme:)
def rolling_window(list, degree): for i in range(len(list)-degree+1): yield [list[i+o] for o in range(degree)]
这是一个滚动平均function
为什么不
def pairwise(iterable): "s -> (s0,s1), (s1,s2), (s2, s3), ..." a, b = tee(iterable) next(b, None) return zip(a, b)
它在Python 文档中有logging 。 您可以轻松地将其扩展到更宽的窗口。
def GetShiftingWindows(thelist, size): return [ thelist[x:x+size] for x in range( len(thelist) - size + 1 ) ] >> a = [1, 2, 3, 4, 5] >> GetShiftingWindows(a, 3) [ [1, 2, 3], [2, 3, 4], [3, 4, 5] ]
>>> n, m = 6, 3 >>> k = n - m+1 >>> print ('{}\n'*(k)).format(*[range(i, i+m) for i in xrange(k)]) [0, 1, 2] [1, 2, 3] [2, 3, 4] [3, 4, 5]
如何使用以下内容:
mylist = [1, 2, 3, 4, 5, 6, 7] def sliding_window(l, window_size=2): if window_size > len(l): raise ValueError("Window size must be smaller or equal to the number of elements in the list.") t = [] for i in xrange(0, window_size): t.append(l[i:]) return zip(*t) print sliding_window(mylist, 3)
输出:
[(1, 2, 3), (2, 3, 4), (3, 4, 5), (4, 5, 6), (5, 6, 7)]
这是一个古老的问题,但对于那些仍然感兴趣的是在这个页面中使用生成器的一个窗口滑块很好的实现(阿德里安·罗斯布鲁克)。
这是OpenCV的实现,但是您可以轻松地将其用于任何其他目的。 对于渴望的人,我会粘贴这里的代码,但要更好地理解它,我build议访问原始页面。
def sliding_window(image, stepSize, windowSize): # slide a window across the image for y in xrange(0, image.shape[0], stepSize): for x in xrange(0, image.shape[1], stepSize): # yield the current window yield (x, y, image[y:y + windowSize[1], x:x + windowSize[0]])
提示:您可以在迭代生成器时检查窗口的.shape
,以放弃那些不符合要求的窗口
干杯
toolz包有一个sliding_window函数。
有一个图书馆,正是你所需要的:
import more_itertools list(more_itertools.windowed([1,2,3,4,5,6,7,8,9,10,11,12,13,14,15],n=3, step=3)) Out: [(1, 2, 3), (4, 5, 6), (7, 8, 9), (10, 11, 12), (13, 14, 15)]