annotate utils/PeakPickerUtil.py @ 19:890cfe424f4a tip

added annotations
author mitian
date Fri, 11 Dec 2015 09:47:40 +0000
parents b4bf37f94e92
children
rev   line source
mi@0 1 import numpy as np
mi@0 2 from numpy import *
mi@0 3 import sys
mi@0 4 from scipy.signal import medfilt, filtfilt, butter
mi@0 5
mi@0 6 from OnsetPlotProc import onset_plot, plot_on
mi@0 7
mi@0 8 class PeakPicker(object):
mi@0 9 '''Separate Peak Picker implementation that can be used in all plugins identically.'''
mi@0 10
mi@0 11 def __init__(self):
mi@0 12 '''Initialise the PeakPicker, but for now, we just define a class for the parameters.'''
mi@0 13 class Params(object):
mi@0 14 '''Just create a small efficient object for storing the parameters.'''
mi@0 15 __slots__ = ['alpha','delta','QuadThresh_a','QuadThresh_b','QuadThresh_c','aCoeffs','bCoeffs',\
mitian@18 16 'preWin','postWin','LP_on','Medfilt_on','Polyfit_on','isMedianPositive','rawSensitivity', 'gt']
mi@0 17 self.params = Params()
mitian@18 18
mi@0 19
mi@0 20 def process(self, onset_df, calcu_env=False, prune = False):
mi@0 21 '''Smooth and peak pick the detection function.'''
mi@0 22
mi@0 23 smoothed_df = self.smooth_df(onset_df)
mi@0 24 # min_old = min(smoothed_df)
mi@0 25 # range_old = max(smoothed_df) - min_old
mi@0 26 # smoothed_df = [((n - min_old) / range_old ) for n in smoothed_df]
mi@0 27
mi@0 28 onsets = self.quadEval(smoothed_df, prune)
mi@0 29 return smoothed_df, onsets
mi@0 30
mi@0 31 def envFollower(self, sig):
mi@0 32 '''Peak position of the signal envelope.'''
mi@0 33 env = []
mi@0 34 if not len(sig): return env
mi@0 35 i = 1
mi@0 36 while i < len(sig):
mi@0 37 pos = 1
mi@0 38 while (i+pos) < len(sig):
mi@0 39 if sig[i+pos] < sig[i+pos-1]: break
mi@0 40 pos += 1
mi@0 41 if sig[i+pos-1] > sig[i+pos-2]: env.append(i+pos-1)
mi@0 42 i += pos
mi@0 43
mi@0 44 env = list(sort(env))
mi@0 45
mi@0 46 if not len(env): return env
mi@0 47
mi@0 48 if env[-1] == len(sig):
mi@0 49 env = env[:-1]
mi@0 50 return env
mi@0 51
mi@0 52 def quadEval(self, smoothed_df, prune=False):
mi@0 53 '''Assess onset locations using the paramaters of a quadratic fucntion (or simple thresholding).'''
mi@0 54 onsets = []
mi@0 55 x_array = np.array(xrange(-2,3),dtype=np.float32)
mi@0 56 max_index = []
mi@0 57
mi@0 58 # if prune:
mi@0 59 # smoothed_df = [abs(i) for i in smoothed_df]
mi@0 60 # find maxima in the smoothed function, NOTE: could do this later with scipy.signal.argrelmax (using ver > .0.11)
mi@0 61 for i in xrange(2,len(smoothed_df)-2) :
mi@0 62 if smoothed_df[i] > smoothed_df[i-1] and smoothed_df[i] > smoothed_df[i+1] and smoothed_df[i] > 0 :
mi@0 63 max_index.append(i)
mi@0 64 # in case the last local maxima with an incomplete peak shape is missed
mi@0 65 last = len(smoothed_df)-1
mi@0 66 if smoothed_df[last] >= max(smoothed_df[i] for i in xrange(last-2, last-1)):
mi@0 67 max_index.append(last)
mi@0 68
mi@0 69 # if len(max_index) == 0 :
mi@0 70 # return onsets
mi@0 71 if plot_on : onset_plot.add_markers(max_index,symbol="r1",subplot=4)
mi@0 72
mi@0 73 # if the polynomial fitting is not on, just return the detected peaks above a threshold
mi@0 74 # calculated using 100-rawSensitivity value considering the smallest and largest peaks
mi@0 75 if not self.params.Polyfit_on :
mi@0 76 if not max_index :
mi@0 77 return onsets
mi@0 78 max_index = np.array(max_index)
mi@0 79 smoothed_df = np.array(smoothed_df)
mi@0 80 smoothed_df_peaks = smoothed_df[max_index]
mi@0 81 min_df, max_df = smoothed_df_peaks.min(), smoothed_df_peaks.max()
mi@0 82 range_df = max_df-min_df
mi@0 83 sensitivity = (100-self.params.rawSensitivity) / 100.0
mi@0 84 threshold = min_df + sensitivity * range_df
mi@0 85 return max_index[smoothed_df[max_index]>=threshold]
mi@0 86
mi@0 87 # NOTE: GF: The order of the polynomial coefficients is reversed in the C++ implementation!
mi@0 88 # But it is numerically equivalent and accurate (checked by printing the results from the C++ code).
mi@0 89 for j in xrange(len(max_index)) :
mi@0 90 if max_index[j] + 2 > len(smoothed_df) :
mi@0 91 onsets.append(max_index[j])
mi@0 92 else :
mi@0 93 y_array = list()
mi@0 94 for k in xrange(-2,3) :
mi@0 95 selMax = smoothed_df[max_index[j] + k]
mi@0 96 y_array.append(selMax)
mi@0 97 coeffs = polyfit(x_array,np.array(y_array),2)
mi@0 98 # print coeffs
mi@0 99
mi@0 100 if coeffs[0] < -self.params.QuadThresh_a or coeffs[2] > self.params.QuadThresh_c :
mi@0 101 onsets.append(max_index[j])
mi@0 102 # print max_index[j]
mi@0 103
mi@0 104 # If the arg prune is on, remove onset candidates that have spurious peaks on its both side neighbourhood (1.5-2s)
mi@0 105 if prune :
mi@0 106 remove = []
mi@0 107 step = 50
mi@0 108 onsets.sort()
mi@0 109 for idx in xrange(1, len(onsets) - 1):
mi@0 110 if onsets[idx+1] - onsets[idx] < step and onsets[idx] - onsets[idx-1] < step :
mi@0 111 remove.append(onsets[idx])
mi@0 112 onsets = [i for i in onsets if not i in remove]
mi@0 113 print 'remove', remove, onsets
mi@0 114 return onsets
mi@0 115
mi@0 116
mi@0 117 def smooth_df(self, onset_df):
mi@0 118 '''Smooth the detection function by 1) removing DC and normalising, 2) zero-phase low-pass filtering,
mi@0 119 and 3) adaptive thresholding using a moving median filter with separately configurable pre/post window sizes.
mi@0 120 '''
mi@0 121
mitian@18 122 if plot_on : onset_plot.add_plot(onset_df, title="Raw novelty curve")
mi@0 123
mi@0 124 out_df = self.removeDCNormalize(onset_df)
mi@0 125
mi@0 126 if self.params.LP_on :
mi@0 127 # Now we get the exact same filtered function produced by the QM-Vamp plugin:
mi@0 128 out_df = filtfilt(self.params.bCoeffs, self.params.aCoeffs, out_df)
mitian@18 129 onset_plot.add_plot(out_df, title = "Novelty curve after low-pass filtering")
mi@0 130
mi@0 131 if self.params.Medfilt_on :
mi@0 132 out_df = self.movingMedianFilter(out_df)
mi@0 133
mi@0 134 return out_df
mi@0 135
mi@0 136
mi@0 137
mi@0 138 def movingMedianFilter(self, onset_df):
mi@0 139 '''Adaptive thresholding implementation using a moving median filter with configurable pre/post windows. '''
mi@0 140 # TODO: Simplify and vectorise this where appropriate, may replace While loops with For if faster,
mi@0 141 # perhaps use C extension module, Theano or similar...
mi@0 142 length = len(onset_df)
mi@0 143 isMedianPositive = self.params.isMedianPositive
mi@0 144 preWin = int(self.params.preWin)
mi@0 145 postWin = int(self.params.postWin)
mi@0 146 index = 0
mi@0 147 delta = self.params.delta / 10.0
mi@0 148 y = np.zeros(postWin+preWin+1, dtype=np.float64)
mi@0 149 scratch = np.zeros(length, dtype=np.float64)
mi@0 150 output = np.zeros(length, dtype=np.float64)
mi@0 151
mi@0 152 for i in xrange(preWin) :
mi@0 153 if index >= length : break
mi@0 154 k = i + postWin + 1;
mi@0 155 for j in xrange(k) :
mi@0 156 if j < length: y[j] = onset_df[j]
mi@0 157 scratch[index] = np.median(y[:k])
mi@0 158 index += 1
mi@0 159
mi@0 160 i = 0
mi@0 161 while True :
mi@0 162 if i+preWin+postWin >= length : break
mi@0 163 if index >= length : break
mi@0 164 l = 0
mi@0 165 j = i
mi@0 166 while True:
mi@0 167 if j >= ( i+preWin+postWin+1) : break
mi@0 168 y[l] = onset_df[j]
mi@0 169 l += 1
mi@0 170 j += 1
mi@0 171 i += 1
mi@0 172 scratch[index] = np.median(y[:preWin+postWin+1])
mi@0 173 index += 1
mi@0 174
mi@0 175 i = max(length-postWin, 1)
mi@0 176 while True :
mi@0 177 if i >= length : break
mi@0 178 if index >= length : break
mi@0 179 k = max(i-preWin, 1)
mi@0 180 l = 0
mi@0 181 j = k
mi@0 182 while True :
mi@0 183 if j >= length : break
mi@0 184 y[l] = onset_df[j]
mi@0 185 j += 1
mi@0 186 l += 1
mi@0 187 scratch[index] = np.median(y[:l])
mi@0 188 index += 1
mi@0 189 i += 1
mi@0 190
mi@0 191 # onset_plot.add_plot(scratch,title = "median filter output", subplot = 1)
mitian@18 192 onset_plot.add_plot(onset_df-scratch-delta,title = "Novelty curve after adaptive thresholding")
mi@0 193
mi@0 194 for i in xrange(length) :
mi@0 195 value = onset_df[i] - scratch[i] - delta
mi@0 196 output[i] = value
mi@0 197 if isMedianPositive and value < 0.0 :
mi@0 198 output[i] = 0.0
mi@0 199
mitian@18 200 if plot_on : onset_plot.add_plot(onset_df-scratch-delta,title = "Smoothed novelty curve and detected boundaries")
mi@0 201
mi@0 202 return output.tolist()
mi@0 203
mi@0 204 def removeDCNormalize(self,onset_df):
mi@0 205 '''Remove constant offset (DC) and regularise the scale of the detection function.'''
mi@0 206 DFmin,DFmax = self.getFrameMinMax(onset_df)
mi@0 207 DFAlphaNorm = self.getAlphaNorm(onset_df,self.params.alpha)
mi@0 208 for i,v in enumerate(onset_df) :
mi@0 209 onset_df[i] = (v - DFmin) / DFAlphaNorm
mi@0 210 return onset_df
mi@0 211
mi@0 212 def getAlphaNorm(self, onset_df, alpha):
mi@0 213 '''Calculate the alpha norm of the detecion function'''
mi@0 214 # TODO: Simplify or vectorise this.
mi@0 215 # a = 0.0
mi@0 216 # for i in onset_df :
mi@0 217 # a += pow(fabs(i), alpha)
mi@0 218 a = sum( np.power(fabs(onset_df), alpha) )
mi@0 219 a /= len(onset_df)
mi@0 220 a = pow(a, (1.0 / alpha))
mi@0 221 return a
mi@0 222
mi@0 223 def getFrameMinMax(self, onset_df):
mi@0 224 '''Just return the min/max of the detecion function'''
mi@0 225 return min(onset_df),max(onset_df)