mi@0: import numpy as np mi@0: from numpy import * mi@0: import sys mi@0: from scipy.signal import medfilt, filtfilt, butter mi@0: mi@0: from OnsetPlotProc import onset_plot, plot_on mi@0: mi@0: class PeakPicker(object): mi@0: '''Separate Peak Picker implementation that can be used in all plugins identically.''' mi@0: mi@0: def __init__(self): mi@0: '''Initialise the PeakPicker, but for now, we just define a class for the parameters.''' mi@0: class Params(object): mi@0: '''Just create a small efficient object for storing the parameters.''' mi@0: __slots__ = ['alpha','delta','QuadThresh_a','QuadThresh_b','QuadThresh_c','aCoeffs','bCoeffs',\ mitian@18: 'preWin','postWin','LP_on','Medfilt_on','Polyfit_on','isMedianPositive','rawSensitivity', 'gt'] mi@0: self.params = Params() mitian@18: mi@0: mi@0: def process(self, onset_df, calcu_env=False, prune = False): mi@0: '''Smooth and peak pick the detection function.''' mi@0: mi@0: smoothed_df = self.smooth_df(onset_df) mi@0: # min_old = min(smoothed_df) mi@0: # range_old = max(smoothed_df) - min_old mi@0: # smoothed_df = [((n - min_old) / range_old ) for n in smoothed_df] mi@0: mi@0: onsets = self.quadEval(smoothed_df, prune) mi@0: return smoothed_df, onsets mi@0: mi@0: def envFollower(self, sig): mi@0: '''Peak position of the signal envelope.''' mi@0: env = [] mi@0: if not len(sig): return env mi@0: i = 1 mi@0: while i < len(sig): mi@0: pos = 1 mi@0: while (i+pos) < len(sig): mi@0: if sig[i+pos] < sig[i+pos-1]: break mi@0: pos += 1 mi@0: if sig[i+pos-1] > sig[i+pos-2]: env.append(i+pos-1) mi@0: i += pos mi@0: mi@0: env = list(sort(env)) mi@0: mi@0: if not len(env): return env mi@0: mi@0: if env[-1] == len(sig): mi@0: env = env[:-1] mi@0: return env mi@0: mi@0: def quadEval(self, smoothed_df, prune=False): mi@0: '''Assess onset locations using the paramaters of a quadratic fucntion (or simple thresholding).''' mi@0: onsets = [] mi@0: x_array = np.array(xrange(-2,3),dtype=np.float32) mi@0: max_index = [] mi@0: mi@0: # if prune: mi@0: # smoothed_df = [abs(i) for i in smoothed_df] mi@0: # find maxima in the smoothed function, NOTE: could do this later with scipy.signal.argrelmax (using ver > .0.11) mi@0: for i in xrange(2,len(smoothed_df)-2) : mi@0: if smoothed_df[i] > smoothed_df[i-1] and smoothed_df[i] > smoothed_df[i+1] and smoothed_df[i] > 0 : mi@0: max_index.append(i) mi@0: # in case the last local maxima with an incomplete peak shape is missed mi@0: last = len(smoothed_df)-1 mi@0: if smoothed_df[last] >= max(smoothed_df[i] for i in xrange(last-2, last-1)): mi@0: max_index.append(last) mi@0: mi@0: # if len(max_index) == 0 : mi@0: # return onsets mi@0: if plot_on : onset_plot.add_markers(max_index,symbol="r1",subplot=4) mi@0: mi@0: # if the polynomial fitting is not on, just return the detected peaks above a threshold mi@0: # calculated using 100-rawSensitivity value considering the smallest and largest peaks mi@0: if not self.params.Polyfit_on : mi@0: if not max_index : mi@0: return onsets mi@0: max_index = np.array(max_index) mi@0: smoothed_df = np.array(smoothed_df) mi@0: smoothed_df_peaks = smoothed_df[max_index] mi@0: min_df, max_df = smoothed_df_peaks.min(), smoothed_df_peaks.max() mi@0: range_df = max_df-min_df mi@0: sensitivity = (100-self.params.rawSensitivity) / 100.0 mi@0: threshold = min_df + sensitivity * range_df mi@0: return max_index[smoothed_df[max_index]>=threshold] mi@0: mi@0: # NOTE: GF: The order of the polynomial coefficients is reversed in the C++ implementation! mi@0: # But it is numerically equivalent and accurate (checked by printing the results from the C++ code). mi@0: for j in xrange(len(max_index)) : mi@0: if max_index[j] + 2 > len(smoothed_df) : mi@0: onsets.append(max_index[j]) mi@0: else : mi@0: y_array = list() mi@0: for k in xrange(-2,3) : mi@0: selMax = smoothed_df[max_index[j] + k] mi@0: y_array.append(selMax) mi@0: coeffs = polyfit(x_array,np.array(y_array),2) mi@0: # print coeffs mi@0: mi@0: if coeffs[0] < -self.params.QuadThresh_a or coeffs[2] > self.params.QuadThresh_c : mi@0: onsets.append(max_index[j]) mi@0: # print max_index[j] mi@0: mi@0: # If the arg prune is on, remove onset candidates that have spurious peaks on its both side neighbourhood (1.5-2s) mi@0: if prune : mi@0: remove = [] mi@0: step = 50 mi@0: onsets.sort() mi@0: for idx in xrange(1, len(onsets) - 1): mi@0: if onsets[idx+1] - onsets[idx] < step and onsets[idx] - onsets[idx-1] < step : mi@0: remove.append(onsets[idx]) mi@0: onsets = [i for i in onsets if not i in remove] mi@0: print 'remove', remove, onsets mi@0: return onsets mi@0: mi@0: mi@0: def smooth_df(self, onset_df): mi@0: '''Smooth the detection function by 1) removing DC and normalising, 2) zero-phase low-pass filtering, mi@0: and 3) adaptive thresholding using a moving median filter with separately configurable pre/post window sizes. mi@0: ''' mi@0: mitian@18: if plot_on : onset_plot.add_plot(onset_df, title="Raw novelty curve") mi@0: mi@0: out_df = self.removeDCNormalize(onset_df) mi@0: mi@0: if self.params.LP_on : mi@0: # Now we get the exact same filtered function produced by the QM-Vamp plugin: mi@0: out_df = filtfilt(self.params.bCoeffs, self.params.aCoeffs, out_df) mitian@18: onset_plot.add_plot(out_df, title = "Novelty curve after low-pass filtering") mi@0: mi@0: if self.params.Medfilt_on : mi@0: out_df = self.movingMedianFilter(out_df) mi@0: mi@0: return out_df mi@0: mi@0: mi@0: mi@0: def movingMedianFilter(self, onset_df): mi@0: '''Adaptive thresholding implementation using a moving median filter with configurable pre/post windows. ''' mi@0: # TODO: Simplify and vectorise this where appropriate, may replace While loops with For if faster, mi@0: # perhaps use C extension module, Theano or similar... mi@0: length = len(onset_df) mi@0: isMedianPositive = self.params.isMedianPositive mi@0: preWin = int(self.params.preWin) mi@0: postWin = int(self.params.postWin) mi@0: index = 0 mi@0: delta = self.params.delta / 10.0 mi@0: y = np.zeros(postWin+preWin+1, dtype=np.float64) mi@0: scratch = np.zeros(length, dtype=np.float64) mi@0: output = np.zeros(length, dtype=np.float64) mi@0: mi@0: for i in xrange(preWin) : mi@0: if index >= length : break mi@0: k = i + postWin + 1; mi@0: for j in xrange(k) : mi@0: if j < length: y[j] = onset_df[j] mi@0: scratch[index] = np.median(y[:k]) mi@0: index += 1 mi@0: mi@0: i = 0 mi@0: while True : mi@0: if i+preWin+postWin >= length : break mi@0: if index >= length : break mi@0: l = 0 mi@0: j = i mi@0: while True: mi@0: if j >= ( i+preWin+postWin+1) : break mi@0: y[l] = onset_df[j] mi@0: l += 1 mi@0: j += 1 mi@0: i += 1 mi@0: scratch[index] = np.median(y[:preWin+postWin+1]) mi@0: index += 1 mi@0: mi@0: i = max(length-postWin, 1) mi@0: while True : mi@0: if i >= length : break mi@0: if index >= length : break mi@0: k = max(i-preWin, 1) mi@0: l = 0 mi@0: j = k mi@0: while True : mi@0: if j >= length : break mi@0: y[l] = onset_df[j] mi@0: j += 1 mi@0: l += 1 mi@0: scratch[index] = np.median(y[:l]) mi@0: index += 1 mi@0: i += 1 mi@0: mi@0: # onset_plot.add_plot(scratch,title = "median filter output", subplot = 1) mitian@18: onset_plot.add_plot(onset_df-scratch-delta,title = "Novelty curve after adaptive thresholding") mi@0: mi@0: for i in xrange(length) : mi@0: value = onset_df[i] - scratch[i] - delta mi@0: output[i] = value mi@0: if isMedianPositive and value < 0.0 : mi@0: output[i] = 0.0 mi@0: mitian@18: if plot_on : onset_plot.add_plot(onset_df-scratch-delta,title = "Smoothed novelty curve and detected boundaries") mi@0: mi@0: return output.tolist() mi@0: mi@0: def removeDCNormalize(self,onset_df): mi@0: '''Remove constant offset (DC) and regularise the scale of the detection function.''' mi@0: DFmin,DFmax = self.getFrameMinMax(onset_df) mi@0: DFAlphaNorm = self.getAlphaNorm(onset_df,self.params.alpha) mi@0: for i,v in enumerate(onset_df) : mi@0: onset_df[i] = (v - DFmin) / DFAlphaNorm mi@0: return onset_df mi@0: mi@0: def getAlphaNorm(self, onset_df, alpha): mi@0: '''Calculate the alpha norm of the detecion function''' mi@0: # TODO: Simplify or vectorise this. mi@0: # a = 0.0 mi@0: # for i in onset_df : mi@0: # a += pow(fabs(i), alpha) mi@0: a = sum( np.power(fabs(onset_df), alpha) ) mi@0: a /= len(onset_df) mi@0: a = pow(a, (1.0 / alpha)) mi@0: return a mi@0: mi@0: def getFrameMinMax(self, onset_df): mi@0: '''Just return the min/max of the detecion function''' mi@0: return min(onset_df),max(onset_df)