计算局域峰值

11064阅读 0评论2012-09-23 HyryStudio
分类:Python/Ruby

我们经常会将一个频率扫描波输入到滤波器之中,然后根据滤波器输出的峰值变化计算滤波器的频率响应。例如下面的程序:

import numpy as np
import pylab as pl
from scipy import signal

rate = 44100.0
t = np.arange(0, 10, 1/rate)
in_data = signal.chirp(t, 20, 10, 20000, method="logarithmic")
freq = np.logspace(np.log10(20), np.log10(20000), len(t), base=10)
b, a = signal.butter(4, 0.1)
out_data = signal.lfilter(b, a, in_data)
pl.semilogx(freq, out_data)

使用signal.chrip()创建一个从20Hz到20kHz、10秒、频率呈对数变化频率扫描波。计算频率扫描波对应的频率。设计一个4阶的低通滤波器,截止频率为0.1(正规化频率)。调用signal.lfilter()计算滤波器的输出。

程序的输出如下图所示:

out_data数组是一个振幅和频率都不断变化的频率扫描波,只需要计算每个峰值的大小和时间就可以得到滤波器的频率响应。

maximum_filter1d

可以使用移动峰值的函数计算局域峰值。移动峰值的计算方法如下:

  1. 用一个长度为N的窗口和取样值数组进行计算。
  2. 计算窗口所对应的取样值中的最大值,并用此最大值取代窗口中心的取样值。
  3. 窗口不断向右移动,并重复计算步骤2。

这种看似和信号处理相关的函数在SciPy中却放到了ndimage图像处理模块中:scipy.ndimage.maximum_filter1d()。找到每个取样值窗口所对应的峰值之后,将峰值与原始取样数据进行比较,若峰值与原始数据相同,则说明原始数据为峰值。

下面的程序演示这一算法:

from scipy.ndimage import maximum_filter1d
t = np.linspace(-10, 10, 200)
x = np.sin(5*t)*(50-t**2)
mx = maximum_filter1d(x, 11)
pos = np.where(x==mx)[0]
pl.plot(x)
pl.plot(mx, "r")
pl.plot(pos, x[pos], "x")

输出为:

图中蓝色曲线为原取样数据,红色曲线是窗口长度为11时的局域峰值曲线。可以看到蓝色曲线和红色曲线重合的位置就为峰值出现的位置。图中采用绿色”x”表示。

通过峰值计算频率响应

下面我们用上述的方法计算频率扫描波的峰值,并和滤波器的理论频率响应进行比较:

from scipy.ndimage import maximum_filter1d
window = 201

max_data = maximum_filter1d(out_data, window)
peak_index = np.where(max_data == out_data)[0]
pl.semilogx(freq[peak_index], 20*np.log10(out_data[peak_index]), lw=2)

#使用freqz计算滤波器的频率响应
w, p = signal.freqz(b, a)
freq2 = w/np.pi*rate*0.5
pl.semilogx(freq2, 20*np.log10(np.abs(p)), "r")
pl.xlim(20, 20000)
pl.ylim(-120, 20)
pl.savefig("lpf_response.png", dpi=100)

程序的输出如下图所示,其中蓝色曲线为频率扫描波计算的频率响应,红色曲线为理论频率响应:

提高运算速度

下面让我们考虑以下maximum_filter1d()是如何实现的。一个最简单的方法就是做双重循环,外层循环N次,内层循环K次,其中N为数据点数,K为窗口的大小。每次内层循环都找出窗口中的最大值。这个算法的复杂度为::math:O( N \cdot K) 。

实际上有更快捷的算法,这个算法在一个名为bottleneck的扩展库中。下面让我们先比较它和maximum_filter1d()的运算速度,然后再介绍其算法。

bottleneck的文档

from scipy.ndimage import maximum_filter1d
import bottleneck as bn
import numpy as np
data = np.array([1, 4, 3, 8, 2, 5, 0, 2, 3, 5, 8, 2, 4, 10, 12, 2])
print maximum_filter1d(data, 5)
print bn.move_max(data, 5)

程序的输出为:

[ 4  8  8  8  8  8  5  5  8  8  8 10 12 12 12 12]
[ nan nan nan nan 8. 8. 8. 8. 5. 5. 8. 8. 8. 10. 12. 12.]

bottleneck中的move_max()和maximum_filter1d()对窗口的处理稍微有些不同,在move_max()中当前处理的元素处于窗口的最右端,而maximum_filter1d()中的当前元素处于窗口的正中间。

为了让move_max()的结果和maximum_filter1d()的一致,我们需要将move_max()的结果左移N//2+1个元素,这个操作可以通过numpy.roll()实现:

>>> print np.roll(bn.move_max(data, 5), -5//2+1)
[ nan nan 8. 8. 8. 8. 5. 5. 8. 8. 8. 10. 12. 12. nan nan]

这样除了前后有N-1个NAN,其余的元素都合maximum_filter1d()相同。

下面我们比较二者的运算速度:

from scipy.ndimage import maximum_filter1d
import bottleneck as bn
import numpy as np
data = np.random.randn(10000)
%timeit maximum_filter1d(data, 200)
%timeit bn.move_max(data, 200)

其中timeit是IPython的命令,它用来评价程序的运算时间。程序的输出为:

100 loops, best of 3: 7.01 ms per loop
1000 loops, best of 3: 465 us per loop

move_max()比maximum_filter1d()快15倍,显然它们所采用的算法应该是不同量级的。

更高效的算法

bottleneck的move_max()算法可以用下面的程序表示:

from collections import deque

def move_max(data, k):
    dq = deque([0])
    result = []
    for i in xrange(len(data)):
        x = data[i]
        idx = dq[0]
        if i == idx + k: # 如果当前峰值超出窗口范围
            dq.popleft() # 则将其移出队列,队列中的下一个值将成为峰值
        if x >= data[idx]: # 发现更大的峰值
            dq.clear()     # 将队列清空
            dq.append(i) # 只保存最大峰值
        else:
            while dq and data[dq[-1]] <= x: # 当前值没有峰值大
                dq.pop() # 则按照从大到小的顺序将其插入队列,队列中其后的元素可删除
            dq.append(i)
        result.append(data[dq[0]])
    result[:k-1] = [None]*(k-1)
    return result

其基本思想就是使用一个队列保存所有候选的峰值。队列中的峰值按照从大到小排列,根据当前元素的值按照如下规则更新队列:

>>> move_max([1, 4, 3, 8, 2, 5, 0, 2, 3, 5, 8, 2, 4, 10, 12, 2], 5)
[None, None, None, None, 8, 8, 8, 8, 5, 5, 8, 8, 8, 10, 12, 12]
上一篇:增加Spyder的编辑范围
下一篇:继承对属性访问速度的影响