import numpy as np
import sys
from scipy.signal import medfilt, filtfilt, butter

from OnsetPlotProc import onset_plot, plot_on

class PeakPicker(object):
    '''Separate peak picker implementation that can be used by all plugins identically.'''

    def __init__(self):
        '''Initialise the PeakPicker; for now we just define a container class for the parameters.'''
        class Params(object):
            '''A small, efficient object for storing the parameters.'''
            __slots__ = ['alpha', 'delta', 'QuadThresh_a', 'QuadThresh_b', 'QuadThresh_c', 'aCoeffs', 'bCoeffs',
                         'preWin', 'postWin', 'LP_on', 'Medfilt_on', 'Polyfit_on', 'isMedianPositive', 'rawSensitivity', 'gt']
        self.params = Params()

    def process(self, onset_df, calcu_env=False, prune=False):
        '''Smooth and peak-pick the detection function.'''
        smoothed_df = self.smooth_df(onset_df)
        # min_old = min(smoothed_df)
        # range_old = max(smoothed_df) - min_old
        # smoothed_df = [((n - min_old) / range_old) for n in smoothed_df]

        onsets = self.quadEval(smoothed_df, prune)
        return smoothed_df, onsets

    def envFollower(self, sig):
        '''Return the peak positions of the signal envelope.'''
        env = []
        if not len(sig): return env
        i = 1
        while i < len(sig):
            pos = 1
            while (i + pos) < len(sig):
                if sig[i+pos] < sig[i+pos-1]: break
                pos += 1
            if sig[i+pos-1] > sig[i+pos-2]: env.append(i+pos-1)
            i += pos

        env = sorted(env)

        if not len(env): return env

        if env[-1] == len(sig):
            env = env[:-1]
        return env

    def quadEval(self, smoothed_df, prune=False):
        '''Assess onset locations using the parameters of a quadratic function (or simple thresholding).'''
        onsets = []
        x_array = np.arange(-2, 3, dtype=np.float32)
        max_index = []

        # if prune:
        #     smoothed_df = [abs(i) for i in smoothed_df]
        # Find maxima in the smoothed function.
        # NOTE: could do this later with scipy.signal.argrelmax (requires scipy >= 0.11).
        for i in xrange(2, len(smoothed_df)-2):
            if smoothed_df[i] > smoothed_df[i-1] and smoothed_df[i] > smoothed_df[i+1] and smoothed_df[i] > 0:
                max_index.append(i)
        # In case the last local maximum, with an incomplete peak shape, is missed:
        last = len(smoothed_df) - 1
        if smoothed_df[last] >= max(smoothed_df[last-2:last]):
            max_index.append(last)
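        # A possible vectorised alternative to the maxima search above, expanding on the
        # NOTE about scipy.signal.argrelmax (available from scipy 0.11). Untested sketch;
        # it keeps only the positive, non-edge maxima exactly as the loop does:
        #   from scipy.signal import argrelmax
        #   cand = argrelmax(np.asarray(smoothed_df))[0]
        #   max_index = [int(i) for i in cand
        #                if 2 <= i < len(smoothed_df) - 2 and smoothed_df[i] > 0]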

        # if len(max_index) == 0:
        #     return onsets
        if plot_on: onset_plot.add_markers(max_index, symbol="r1", subplot=4)

        # If polynomial fitting is not on, just return the detected peaks above a threshold derived
        # from (100 - rawSensitivity), applied to the range spanned by the smallest and largest peaks;
        # e.g. rawSensitivity = 75 gives threshold = min_df + 0.25 * (max_df - min_df).
        if not self.params.Polyfit_on:
            if not max_index:
                return onsets
            max_index = np.array(max_index)
            smoothed_df = np.array(smoothed_df)
            smoothed_df_peaks = smoothed_df[max_index]
            min_df, max_df = smoothed_df_peaks.min(), smoothed_df_peaks.max()
            range_df = max_df - min_df
            sensitivity = (100 - self.params.rawSensitivity) / 100.0
            threshold = min_df + sensitivity * range_df
            return max_index[smoothed_df[max_index] >= threshold]

        # NOTE: GF: The order of the polynomial coefficients is reversed in the C++ implementation,
        # but it is numerically equivalent and accurate (checked by printing the results from the C++ code).
        for j in xrange(len(max_index)):
            if max_index[j] + 2 >= len(smoothed_df):
                onsets.append(max_index[j])
            else:
                y_array = list()
                for k in xrange(-2, 3):
                    selMax = smoothed_df[max_index[j] + k]
                    y_array.append(selMax)
                coeffs = np.polyfit(x_array, np.array(y_array), 2)
                # print coeffs

                if coeffs[0] < -self.params.QuadThresh_a or coeffs[2] > self.params.QuadThresh_c:
                    onsets.append(max_index[j])
                    # print max_index[j]

        # If prune is set, remove onset candidates that have spurious peaks on both sides
        # of their neighbourhood (roughly 1.5-2 s).
        if prune:
            remove = []
            step = 50
            onsets.sort()
            for idx in xrange(1, len(onsets) - 1):
                if onsets[idx+1] - onsets[idx] < step and onsets[idx] - onsets[idx-1] < step:
                    remove.append(onsets[idx])
            onsets = [i for i in onsets if i not in remove]
            print 'remove', remove, onsets
        return onsets

    def smooth_df(self, onset_df):
        '''Smooth the detection function by 1) removing DC and normalising, 2) zero-phase low-pass filtering,
        and 3) adaptive thresholding using a moving median filter with separately configurable pre/post window sizes.
        '''
        if plot_on: onset_plot.add_plot(onset_df, title="Raw novelty curve")

        out_df = self.removeDCNormalize(onset_df)

        if self.params.LP_on:
            # Now we get exactly the same filtered function produced by the QM-Vamp plugin:
            out_df = filtfilt(self.params.bCoeffs, self.params.aCoeffs, out_df)
            if plot_on: onset_plot.add_plot(out_df, title="Novelty curve after low-pass filtering")

        if self.params.Medfilt_on:
            out_df = self.movingMedianFilter(out_df)

        return out_df

    def movingMedianFilter(self, onset_df):
        '''Adaptive thresholding implementation using a moving median filter with configurable pre/post windows.'''
        # TODO: Simplify and vectorise this where appropriate; the while loops could be replaced
        # with for loops if faster, or a C extension module, Theano or similar could be used...
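        # NOTE: when preWin == postWin, the scratch baseline computed below is (up to boundary
        # handling) a centred rolling median, which could be obtained directly with the
        # already-imported scipy.signal.medfilt. Untested sketch:
        #   baseline = medfilt(np.asarray(onset_df, dtype=np.float64), kernel_size=2 * preWin + 1)
        #   out = np.asarray(onset_df) - baseline - delta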
        # Work on a float array so the arithmetic below also accepts plain lists.
        onset_df = np.asarray(onset_df, dtype=np.float64)
        length = len(onset_df)
        isMedianPositive = self.params.isMedianPositive
        preWin = int(self.params.preWin)
        postWin = int(self.params.postWin)
        index = 0
        delta = self.params.delta / 10.0
        y = np.zeros(postWin + preWin + 1, dtype=np.float64)
        scratch = np.zeros(length, dtype=np.float64)
        output = np.zeros(length, dtype=np.float64)

        # Growing window at the start: output i (i < preWin) uses samples [0, i+postWin].
        for i in xrange(preWin):
            if index >= length: break
            k = i + postWin + 1
            for j in xrange(k):
                if j < length: y[j] = onset_df[j]
            scratch[index] = np.median(y[:k])
            index += 1

        # Full window in the middle: output t uses samples [t-preWin, t+postWin].
        i = 0
        while True:
            if i + preWin + postWin >= length: break
            if index >= length: break
            l = 0
            j = i
            while True:
                if j >= (i + preWin + postWin + 1): break
                y[l] = onset_df[j]
                l += 1
                j += 1
            i += 1
            scratch[index] = np.median(y[:preWin+postWin+1])
            index += 1

        # Shrinking window at the end.
        i = max(length - postWin, 1)
        while True:
            if i >= length: break
            if index >= length: break
            k = max(i - preWin, 1)
            l = 0
            j = k
            while True:
                if j >= length: break
                y[l] = onset_df[j]
                j += 1
                l += 1
            scratch[index] = np.median(y[:l])
            index += 1
            i += 1

        # onset_plot.add_plot(scratch, title="median filter output", subplot=1)
        if plot_on: onset_plot.add_plot(onset_df - scratch - delta, title="Novelty curve after adaptive thresholding")

        for i in xrange(length):
            value = onset_df[i] - scratch[i] - delta
            output[i] = value
            if isMedianPositive and value < 0.0:
                output[i] = 0.0

        if plot_on: onset_plot.add_plot(onset_df - scratch - delta, title="Smoothed novelty curve and detected boundaries")

        return output.tolist()

    def removeDCNormalize(self, onset_df):
        '''Remove constant offset (DC) and regularise the scale of the detection function.'''
        DFmin, DFmax = self.getFrameMinMax(onset_df)
        DFAlphaNorm = self.getAlphaNorm(onset_df, self.params.alpha)
        for i, v in enumerate(onset_df):
            onset_df[i] = (v - DFmin) / DFAlphaNorm
        return onset_df

    def getAlphaNorm(self, onset_df, alpha):
        '''Calculate the alpha norm of the detection function: (mean(|x| ** alpha)) ** (1 / alpha).'''
        # a = 0.0
        # for i in onset_df:
        #     a += pow(fabs(i), alpha)
        a = sum(np.power(np.fabs(onset_df), alpha))
        a /= len(onset_df)
        a = pow(a, 1.0 / alpha)
        return a

    def getFrameMinMax(self, onset_df):
        '''Just return the min/max of the detection function.'''
        return min(onset_df), max(onset_df)
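

# ----------------------------------------------------------------------------
# Minimal usage sketch (illustrative only): the parameter values below are
# assumptions chosen for demonstration, not defaults taken from the calling
# plugins, and the detection function is synthetic.
# ----------------------------------------------------------------------------
if __name__ == '__main__':
    picker = PeakPicker()
    p = picker.params
    p.alpha = 9.0                                   # norm order for scale regularisation
    p.delta = 0.0                                   # adaptive-threshold offset (divided by 10 internally)
    p.QuadThresh_a, p.QuadThresh_b, p.QuadThresh_c = 7.5e-7, 0.0, 6.0e-6  # assumed quadratic-fit thresholds
    p.preWin, p.postWin = 13, 14                    # moving-median window sizes (frames)
    p.LP_on, p.Medfilt_on, p.Polyfit_on = True, True, False
    p.isMedianPositive = False
    p.rawSensitivity = 75                           # keep peaks in the top 25% of the peak range
    p.bCoeffs, p.aCoeffs = butter(2, 0.1)           # low-pass coefficients; the cut-off is an assumption

    # Synthetic detection function: low-level noise with a few clear peaks.
    rng = np.random.RandomState(0)
    df = (rng.rand(500) * 0.1).tolist()
    for n in (50, 180, 330, 470):
        df[n] += 1.0

    smoothed, onsets = picker.process(df)
    print 'detected onset frames:', list(onsets)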