移动平均线或平均线
是否有一个pyipy函数或numpy函数或模块的python,计算给定一个特定的窗口一维数组的运行平均值?
/ M
对于一个简单快速的解决scheme来说,在一个循环中完成整个任务,而不依赖于下面的代码。
mylist = [1, 2, 3, 4, 5, 6, 7] N = 3 cumsum, moving_aves = [0], [] for i, x in enumerate(mylist, 1): cumsum.append(cumsum[i-1] + x) if i>=N: moving_ave = (cumsum[i] - cumsum[iN])/N #can do stuff with moving_ave here moving_aves.append(moving_ave)
UPD: Alleo和jasaarim已经提出了更有效的解决scheme。
你可以使用np.convolve
:
np.convolve(x, np.ones((N,))/N, mode='valid')
说明
运行均值是卷积运算的一个例子。 对于运行平均值,沿input滑动窗口并计算窗口内容的平均值。 对于离散的一维信号,卷积是同样的事情,除了计算一个任意的线性组合的意思之外,即将每个元素乘以一个相应的系数并合计结果。 这些系数,一个用于窗口中的每个位置,有时被称为卷积核 。 现在,N值的算术平均值是(x_1 + x_2 + ... + x_N) / N
,所以相应的内核是(1/N, 1/N, ..., 1/N)
,这正是我们通过使用np.ones((N,))/N
。
边缘
np.convolve
的mode
参数指定如何处理边缘。 我在这里select了valid
模式,因为我认为这就是大多数人所期望的运行方式的工作原理,但是您可能还有其他的优先级。 这是一个图解说明模式之间的区别:
import numpy as np import matplotlib.pyplot as plt modes = ['full', 'same', 'valid'] for m in modes: plt.plot(np.convolve(np.ones((200,)), np.ones((50,))/50, mode=m)); plt.axis([-10, 251, -.1, 1.1]); plt.legend(modes, loc='lower center'); plt.show()
高效的解决scheme
卷积比简单的方法好得多,但是(我猜)它使用FFT,因此很慢。 但是,特别是对于计算运行均值,以下方法正常工作
def running_mean(x, N): cumsum = numpy.cumsum(numpy.insert(x, 0, 0)) return (cumsum[N:] - cumsum[:-N]) / float(N)
要检查的代码
In[3]: x = numpy.random.random(100000) In[4]: N = 1000 In[5]: %timeit result1 = numpy.convolve(x, numpy.ones((N,))/N, mode='valid') 10 loops, best of 3: 41.4 ms per loop In[6]: %timeit result2 = running_mean(x, N) 1000 loops, best of 3: 1.04 ms per loop
注意numpy.allclose(result1, result2)
是True
,两个方法是等价的。 N越大,时间差别越大。
你可以用下面的方法计算运行平均值
import numpy as np def runningMean(x, N): y = np.zeros((len(x),)) for ctr in range(len(x)): y[ctr] = np.sum(x[ctr:(ctr+N)]) return y/N
但速度很慢。
幸运的是,numpy包含了一个可以用来加快速度的函数。 运行均值相当于用N
长的向量卷积x
,所有成员等于1/N
convolve的numpy实现包括启动瞬态,所以你必须删除第一个N-1点:
def runningMeanFast(x, N): return np.convolve(x, np.ones((N,))/N)[(N-1):]
在我的机器上,快速版本的速度要快20-30倍,具体取决于inputvector的长度和平均窗口的大小。
请注意,convolve确实包含了一个'same'
模式,它似乎应该解决起始瞬态问题,但是它将它在开始和结束之间分开。
pandas比NumPy或SciPy更适合这个。 其functionrolling_mean方便地完成工作。 当input是一个数组时,它也返回一个NumPy数组。
使用任何自定义的纯Python实现很难击败rolling_mean
。 以下是两个build议的解决scheme的示例性能:
In [1]: import numpy as np In [2]: import pandas as pd In [3]: def running_mean(x, N): ...: cumsum = np.cumsum(np.insert(x, 0, 0)) ...: return (cumsum[N:] - cumsum[:-N]) / N ...: In [4]: x = np.random.random(100000) In [5]: N = 1000 In [6]: %timeit np.convolve(x, np.ones((N,))/N, mode='valid') 10 loops, best of 3: 172 ms per loop In [7]: %timeit running_mean(x, N) 100 loops, best of 3: 6.72 ms per loop In [8]: %timeit pd.rolling_mean(x, N)[N-1:] 100 loops, best of 3: 4.74 ms per loop In [9]: np.allclose(pd.rolling_mean(x, N)[N-1:], running_mean(x, N)) Out[9]: True
对于如何处理边缘值也有不错的select。
有关即用型解决scheme,请参阅http://www.scipy.org/Cookbook/SignalSmooth 。 它提供flat
窗口types的运行平均值。 请注意,这比简单的自制卷积方法稍微复杂一些,因为它试图通过反映数据的开始和结束来处理这些问题(这可能或可能不适用于您的情况。 ..)。
首先,您可以尝试:
a = np.random.random(100) plt.plot(a) b = smooth(a, window='flat') plt.plot(b)
或计算python的模块
在Tradewave.net的testing中TA-lib总是赢:
import talib as ta import numpy as np import pandas as pd import scipy from scipy import signal import time as t PAIR = info.primary_pair PERIOD = 30 def initialize(): storage.reset() storage.elapsed = storage.get('elapsed', [0,0,0,0,0,0]) def cumsum_sma(array, period): ret = np.cumsum(array, dtype=float) ret[period:] = ret[period:] - ret[:-period] return ret[period - 1:] / period def pandas_sma(array, period): return pd.rolling_mean(array, period) def api_sma(array, period): # this method is native to Tradewave and does NOT return an array return (data[PAIR].ma(PERIOD)) def talib_sma(array, period): return ta.MA(array, period) def convolve_sma(array, period): return np.convolve(array, np.ones((period,))/period, mode='valid') def fftconvolve_sma(array, period): return scipy.signal.fftconvolve( array, np.ones((period,))/period, mode='valid') def tick(): close = data[PAIR].warmup_period('close') t1 = t.time() sma_api = api_sma(close, PERIOD) t2 = t.time() sma_cumsum = cumsum_sma(close, PERIOD) t3 = t.time() sma_pandas = pandas_sma(close, PERIOD) t4 = t.time() sma_talib = talib_sma(close, PERIOD) t5 = t.time() sma_convolve = convolve_sma(close, PERIOD) t6 = t.time() sma_fftconvolve = fftconvolve_sma(close, PERIOD) t7 = t.time() storage.elapsed[-1] = storage.elapsed[-1] + t2-t1 storage.elapsed[-2] = storage.elapsed[-2] + t3-t2 storage.elapsed[-3] = storage.elapsed[-3] + t4-t3 storage.elapsed[-4] = storage.elapsed[-4] + t5-t4 storage.elapsed[-5] = storage.elapsed[-5] + t6-t5 storage.elapsed[-6] = storage.elapsed[-6] + t7-t6 plot('sma_api', sma_api) plot('sma_cumsum', sma_cumsum[-5]) plot('sma_pandas', sma_pandas[-10]) plot('sma_talib', sma_talib[-15]) plot('sma_convolve', sma_convolve[-20]) plot('sma_fftconvolve', sma_fftconvolve[-25]) def stop(): log('ticks....: %s' % info.max_ticks) log('api......: %.5f' % storage.elapsed[-1]) log('cumsum...: %.5f' % storage.elapsed[-2]) log('pandas...: %.5f' % storage.elapsed[-3]) log('talib....: %.5f' % storage.elapsed[-4]) log('convolve.: %.5f' % storage.elapsed[-5]) log('fft......: %.5f' % storage.elapsed[-6])
结果:
[2015-01-31 23:00:00] ticks....: 744 [2015-01-31 23:00:00] api......: 0.16445 [2015-01-31 23:00:00] cumsum...: 0.03189 [2015-01-31 23:00:00] pandas...: 0.03677 [2015-01-31 23:00:00] talib....: 0.00700 # <<< Winner! [2015-01-31 23:00:00] convolve.: 0.04871 [2015-01-31 23:00:00] fft......: 0.22306
我知道这是一个老问题,但是这是一个不使用任何额外的数据结构或库的解决scheme。 它与input列表的元素数量成线性关系,我想不出任何其他方式来提高效率(实际上,如果有人知道更好的方法来分配结果,请让我知道)。
注意:使用numpy数组而不是列表会更快,但是我想消除所有的依赖关系。 也可以通过multithreading执行来提高性能
该函数假定input列表是一维的,所以要小心。
### Running mean/Moving average def running_mean(l, N): sum = 0 result = list( 0 for x in l) for i in range( 0, N ): sum = sum + l[i] result[i] = sum / (i+1) for i in range( N, len(l) ): sum = sum - l[iN] + l[i] result[i] = sum / N return result
我还没有检查这是多快,但你可以尝试:
from collections import deque cache = deque() # keep track of seen values n = 10 # window size A = xrange(100) # some dummy iterable cum_sum = 0 # initialize cumulative sum for t, val in enumerate(A, 1): cache.append(val) cum_sum += val if t < n: avg = cum_sum / float(t) else: # if window is saturated, cum_sum -= cache.popleft() # subtract oldest value avg = cum_sum / float(n)
晚会有点晚了,但是我做了一个我自己的小function,它不会用零来包裹,然后用零来find平均值。 作为进一步的处理,它也在线性间隔点重新采样信号。 自定义代码来获得其他function。
该方法是用归一化的高斯内核进行简单的matrix乘法。
def running_mean(y_in, x_in, N_out=101, sigma=1): ''' Returns running mean as a Bell-curve weighted average at evenly spaced points. Does NOT wrap signal around, or pad with zeros. Arguments: y_in -- y values, the values to be smoothed and re-sampled x_in -- x values for array Keyword arguments: N_out -- NoOf elements in resampled array. sigma -- 'Width' of Bell-curve in units of param x . ''' N_in = size(y_in) # Gaussian kernel x_out = np.linspace(np.min(x_in), np.max(x_in), N_out) x_in_mesh, x_out_mesh = np.meshgrid(x_in, x_out) gauss_kernel = np.exp(-np.square(x_in_mesh - x_out_mesh) / (2 * sigma**2)) # Normalize kernel, such that the sum is one along axis 1 normalization = np.tile(np.reshape(sum(gauss_kernel, axis=1), (N_out, 1)), (1, N_in)) gauss_kernel_normalized = gauss_kernel / normalization # Perform running average as a linear operation y_out = gauss_kernel_normalized @ y_in return y_out, x_out
正弦信号的简单用法,增加了正态分布噪声:
这个问题现在比上个月NeXuS写的还要老 ,但我喜欢他的代码如何处理边缘情况。 然而,由于这是一个“简单的移动平均数”,其结果落后于他们所应用的数据。 我认为,通过对基于convolution()
的方法应用类似的方法,可以以比NumPy的模式valid
, same
和full
更令人满意的方式处理边缘情况。
我的贡献使用中央运行平均值来调整其结果与其数据。 当要使用的全尺寸窗口的可用点太less时,将从数组边缘的连续较小的窗口计算运行平均值。 [其实,从更大的窗口,但这是一个实施细节。]
import numpy as np def running_mean(l, N): # Also works for the(strictly invalid) cases when N is even. if (N//2)*2 == N: N = N - 1 front = np.zeros(N//2) back = np.zeros(N//2) for i in range(1, (N//2)*2, 2): front[i//2] = np.convolve(l[:i], np.ones((i,))/i, mode = 'valid') for i in range(1, (N//2)*2, 2): back[i//2] = np.convolve(l[-i:], np.ones((i,))/i, mode = 'valid') return np.concatenate([front, np.convolve(l, np.ones((N,))/N, mode = 'valid'), back[::-1]])
由于它使用了convolve()
,所以速度相对较慢,而且可能会被一个真正的Pythonista进行很多修改,但是我相信这个想法是站得住脚的。
另一种方法是在不使用numpy的情况下寻找移动平均值的pandas
import itertools sample = [2, 6, 10, 8, 11, 10] list(itertools.starmap(lambda a,b: b/a, enumerate(itertools.accumulate(sample), 1)))
将打印[2.0,4.0,6.0,6.5,7.4,7.833333333333333]
如果保持input的大小很重要(而不是限制输出到卷积的'valid'
区域),那么可以使用scipy.ndimage.filters.uniform_filter1d :
import numpy as np from scipy.ndimage.filters import uniform_filter1d N = 1000 x = np.random.random(100000) y = uniform_filter1d(x, size=N) y.shape == x.shape >>> True
uniform_filter1d
允许多种方式来处理'reflect'
是默认的边界,但在我的情况下,我宁愿'nearest'
。
它也相当快(比np.convolve
快近50倍):
%timeit y1 = np.convolve(x, np.ones((N,))/N, mode='same') 100 loops, best of 3: 9.28 ms per loop %timeit y2 = uniform_filter1d(x, size=N) 10000 loops, best of 3: 191 µs per loop
如果您select自己推出,而不是使用现有的库,请注意浮点错误并尽量减less其影响:
class SumAccumulator: def __init__(self): self.values = [0] self.count = 0 def add( self, val ): self.values.append( val ) self.count = self.count + 1 i = self.count while i & 0x01: i = i >> 1 v0 = self.values.pop() v1 = self.values.pop() self.values.append( v0 + v1 ) def get_total(self): return sum( reversed(self.values) ) def get_size( self ): return self.count
如果所有的值大致相同,那么这将有助于保持精度,通常总是增加大致相似的值。