我们经常会将一个频率扫描波输入到滤波器之中,然后根据滤波器输出的峰值变化计算滤波器的频率响应。例如下面的程序:
import numpy as np
import pylab as pl
from scipy import signal
rate = 44100.0
t = np.arange(0, 10, 1/rate)
in_data = signal.chirp(t, 20, 10, 20000, method="logarithmic") ❶
freq = np.logspace(np.log10(20), np.log10(20000), len(t), base=10) ❷
b, a = signal.butter(4, 0.1) ❸
out_data = signal.lfilter(b, a, in_data) ❹
pl.semilogx(freq, out_data)
❶使用signal.chrip()创建一个从20Hz到20kHz、10秒、频率呈对数变化频率扫描波。❷计算频率扫描波对应的频率。❸设计一个4阶的低通滤波器,截止频率为0.1(正规化频率)。❹调用signal.lfilter()计算滤波器的输出。
程序的输出如下图所示:
out_data数组是一个振幅和频率都不断变化的频率扫描波,只需要计算每个峰值的大小和时间就可以得到滤波器的频率响应。
maximum_filter1d
可以使用移动峰值的函数计算局域峰值。移动峰值的计算方法如下:
- 用一个长度为N的窗口和取样值数组进行计算。
- 计算窗口所对应的取样值中的最大值,并用此最大值取代窗口中心的取样值。
- 窗口不断向右移动,并重复计算步骤2。
这种看似和信号处理相关的函数在SciPy中却放到了ndimage图像处理模块中:scipy.ndimage.maximum_filter1d()。找到每个取样值窗口所对应的峰值之后,将峰值与原始取样数据进行比较,若峰值与原始数据相同,则说明原始数据为峰值。
下面的程序演示这一算法:
from scipy.ndimage import maximum_filter1d
t = np.linspace(-10, 10, 200)
x = np.sin(5*t)*(50-t**2)
mx = maximum_filter1d(x, 11)
pos = np.where(x==mx)[0]
pl.plot(x)
pl.plot(mx, "r")
pl.plot(pos, x[pos], "x")
输出为:
图中蓝色曲线为原取样数据,红色曲线是窗口长度为11时的局域峰值曲线。可以看到蓝色曲线和红色曲线重合的位置就为峰值出现的位置。图中采用绿色”x”表示。
通过峰值计算频率响应
下面我们用上述的方法计算频率扫描波的峰值,并和滤波器的理论频率响应进行比较:
from scipy.ndimage import maximum_filter1d
window = 201
max_data = maximum_filter1d(out_data, window)
peak_index = np.where(max_data == out_data)[0]
pl.semilogx(freq[peak_index], 20*np.log10(out_data[peak_index]), lw=2)
#使用freqz计算滤波器的频率响应
w, p = signal.freqz(b, a)
freq2 = w/np.pi*rate*0.5
pl.semilogx(freq2, 20*np.log10(np.abs(p)), "r")
pl.xlim(20, 20000)
pl.ylim(-120, 20)
pl.savefig("lpf_response.png", dpi=100)
程序的输出如下图所示,其中蓝色曲线为频率扫描波计算的频率响应,红色曲线为理论频率响应:
提高运算速度
下面让我们考虑以下maximum_filter1d()是如何实现的。一个最简单的方法就是做双重循环,外层循环N次,内层循环K次,其中N为数据点数,K为窗口的大小。每次内层循环都找出窗口中的最大值。这个算法的复杂度为::math: 。
实际上有更快捷的算法,这个算法在一个名为bottleneck的扩展库中。下面让我们先比较它和maximum_filter1d()的运算速度,然后再介绍其算法。
bottleneck的文档
from scipy.ndimage import maximum_filter1d
import bottleneck as bn
import numpy as np
data = np.array([1, 4, 3, 8, 2, 5, 0, 2, 3, 5, 8, 2, 4, 10, 12, 2])
print maximum_filter1d(data, 5)
print bn.move_max(data, 5)
程序的输出为:
[ 4 8 8 8 8 8 5 5 8 8 8 10 12 12 12 12]
[ nan nan nan nan 8. 8. 8. 8. 5. 5. 8. 8. 8. 10. 12. 12.]
bottleneck中的move_max()和maximum_filter1d()对窗口的处理稍微有些不同,在move_max()中当前处理的元素处于窗口的最右端,而maximum_filter1d()中的当前元素处于窗口的正中间。
为了让move_max()的结果和maximum_filter1d()的一致,我们需要将move_max()的结果左移N//2+1个元素,这个操作可以通过numpy.roll()实现:
>>> print np.roll(bn.move_max(data, 5), -5//2+1)
[ nan nan 8. 8. 8. 8. 5. 5. 8. 8. 8. 10. 12. 12. nan nan]
这样除了前后有N-1个NAN,其余的元素都合maximum_filter1d()相同。
下面我们比较二者的运算速度:
from scipy.ndimage import maximum_filter1d
import bottleneck as bn
import numpy as np
data = np.random.randn(10000)
%timeit maximum_filter1d(data, 200)
%timeit bn.move_max(data, 200)
其中timeit是IPython的命令,它用来评价程序的运算时间。程序的输出为:
100 loops, best of 3: 7.01 ms per loop
1000 loops, best of 3: 465 us per loop
move_max()比maximum_filter1d()快15倍,显然它们所采用的算法应该是不同量级的。
更高效的算法
bottleneck的move_max()算法可以用下面的程序表示:
from collections import deque
def move_max(data, k):
dq = deque([0])
result = []
for i in xrange(len(data)):
x = data[i]
idx = dq[0]
if i == idx + k: # 如果当前峰值超出窗口范围
dq.popleft() # 则将其移出队列,队列中的下一个值将成为峰值
if x >= data[idx]: # 发现更大的峰值
dq.clear() # 将队列清空
dq.append(i) # 只保存最大峰值
else:
while dq and data[dq[-1]] <= x: # 当前值没有峰值大
dq.pop() # 则按照从大到小的顺序将其插入队列,队列中其后的元素可删除
dq.append(i)
result.append(data[dq[0]])
result[:k-1] = [None]*(k-1)
return result
其基本思想就是使用一个队列保存所有候选的峰值。队列中的峰值按照从大到小排列,根据当前元素的值按照如下规则更新队列:
- 如果队列中的第一个元素(最大值)超出窗口作用范围,则将其从队列中移除。
- 如果当前元素大于队列的第一个元素,则清空队列,并让当前元素成为队列中的唯一元素。
- 否则将当前值插入队列,并且将队列中小于它的元素全部删除。
>>> move_max([1, 4, 3, 8, 2, 5, 0, 2, 3, 5, 8, 2, 4, 10, 12, 2], 5)
[None, None, None, None, 8, 8, 8, 8, 5, 5, 8, 8, 8, 10, 12, 12]