utils/SegUtil.py @ 0:26838b1f560f

initial commit of a segmenter project

author:   mi tian
date:     Thu, 02 Apr 2015 18:09:27 +0100
parents:  (none)
children: c11ea9e0357f
1 """ | |
2 Useful functions that are quite common for music segmentation | |
3 """ | |
4 ''' | |
5 Modified and more funcs added. | |
6 Mi Tian, April 2015. | |
7 ''' | |
8 | |
9 __author__ = "Oriol Nieto" | |
10 __copyright__ = "Copyright 2014, Music and Audio Research Lab (MARL)" | |
11 __license__ = "GPL" | |
12 __version__ = "1.0" | |
13 __email__ = "oriol@nyu.edu" | |
14 | |
15 import copy | |
16 import numpy as np | |
17 import os | |
18 import scipy | |
19 from scipy.spatial import distance | |
20 from scipy.ndimage import filters, zoom | |
21 from scipy import signal | |
22 import pylab as plt | |
23 from scipy.spatial.distance import squareform, pdist | |
24 | |
25 | |
26 def lognormalize_chroma(C): | |
27 """Log-normalizes chroma such that each vector is between -80 to 0.""" | |
28 C += np.abs(C.min()) + 0.1 | |
29 C = C/C.max(axis=0) | |
30 C = 80 * np.log10(C) # Normalize from -80 to 0 | |
31 return C | |
32 | |
33 | |
34 def normalize_matrix(X): | |
35 """Nomalizes a matrix such that it's maximum value is 1 and minimum is 0.""" | |
36 X += np.abs(X.min()) | |
37 X /= X.max() | |
38 return X | |
39 | |
40 | |
41 def ensure_dir(directory): | |
42 """Makes sure that the given directory exists.""" | |
43 if not os.path.exists(directory): | |
44 os.makedirs(directory) | |
45 | |
46 | |
47 def median_filter(X, M=8): | |
48 """Median filter along the first axis of the feature matrix X.""" | |
49 for i in xrange(X.shape[1]): | |
50 X[:, i] = filters.median_filter(X[:, i], size=M) | |
51 return X | |
52 | |
53 | |
54 def compute_gaussian_krnl(M): | |
55 """Creates a gaussian kernel following Foote's paper.""" | |
56 g = signal.gaussian(M, M / 3., sym=True) | |
57 G = np.dot(g.reshape(-1, 1), g.reshape(1, -1)) | |
58 G[M / 2:, :M / 2] = -G[M / 2:, :M / 2] | |
59 G[:M / 2, M / 2:] = -G[:M / 2, M / 2:] | |
60 return G | |
61 | |
62 | |
63 def compute_ssm(X, metric="seuclidean"): | |
64 """Computes the self-similarity matrix of X.""" | |
65 D = distance.pdist(X, metric=metric) | |
66 D = distance.squareform(D) | |
67 D /= D.max() | |
68 return 1 - D | |
69 | |
70 | |
71 def compute_nc(X, G): | |
72 """Computes the novelty curve from the self-similarity matrix X and | |
73 the gaussian kernel G.""" | |
74 N = X.shape[0] | |
75 M = G.shape[0] | |
76 nc = np.zeros(N) | |
77 | |
78 for i in xrange(M / 2, N - M / 2 + 1): | |
79 nc[i] = np.sum(X[i - M / 2:i + M / 2, i - M / 2:i + M / 2] * G) | |
80 | |
81 # Normalize | |
82 nc += nc.min() | |
83 nc /= nc.max() | |
84 return nc | |
85 | |
86 | |
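
# Usage sketch (illustrative, not from the original project): the helpers
# above compose into Foote's novelty-based boundary detector. `F` is assumed
# to be a (frames x dims) feature matrix from any extractor.
#
#     F = median_filter(F, M=8)        # smooth the features over time
#     S = compute_ssm(F)               # self-similarity matrix
#     G = compute_gaussian_krnl(32)    # checkerboard kernel (even M)
#     nc = compute_nc(S, G)            # novelty curve
#     bounds = pick_peaks(nc, L=16)    # candidate segment boundaries
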
def resample_mx(X, incolpos, outcolpos):
    """
    Method from Librosa
    Y = resample_mx(X, incolpos, outcolpos)
    X is taken as a set of columns, each starting at 'time'
    colpos, and continuing until the start of the next column.
    Y is a similar matrix, with time boundaries defined by
    outcolpos. Each column of Y is a duration-weighted average of
    the overlapping columns of X.
    2010-04-14 Dan Ellis dpwe@ee.columbia.edu based on samplemx/beatavg
    -> python: TBM, 2011-11-05, TESTED
    """
    noutcols = len(outcolpos)
    Y = np.zeros((X.shape[0], noutcols))
    # Assign 'end times' to final columns
    if outcolpos.max() > incolpos.max():
        incolpos = np.concatenate([incolpos, [outcolpos.max()]])
        X = np.concatenate([X, X[:, -1].reshape(X.shape[0], 1)], axis=1)
    outcolpos = np.concatenate([outcolpos, [outcolpos[-1]]])
    # Durations (default weights) of input columns
    incoldurs = np.concatenate([np.diff(incolpos), [1]])

    for c in range(noutcols):
        firstincol = np.where(incolpos <= outcolpos[c])[0][-1]
        firstincolnext = np.where(incolpos < outcolpos[c + 1])[0][-1]
        lastincol = max(firstincol, firstincolnext)
        # Default weights
        wts = copy.deepcopy(incoldurs[firstincol:lastincol + 1])
        # Now fix up by partial overlap at ends
        if len(wts) > 1:
            wts[0] = wts[0] - (outcolpos[c] - incolpos[firstincol])
            wts[-1] = wts[-1] - (incolpos[lastincol + 1] - outcolpos[c + 1])
        wts = wts * 1. / sum(wts)
        Y[:, c] = np.dot(X[:, firstincol:lastincol + 1], wts)
    # Done
    return Y
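
# Usage sketch (illustrative): a typical use of resample_mx() is
# beat-synchronising features. `X` is assumed to be a (dims x frames)
# matrix, `frame_times` the start time of each frame and `beat_times`
# the beat onsets, both 1-D arrays in seconds.
#
#     X_beat = resample_mx(X, frame_times, beat_times)
#     # each column of X_beat averages the frames spanned by one beat,
#     # weighted by how long each frame overlaps the beat interval
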
def chroma_to_tonnetz(C):
    """Transforms chromagram to Tonnetz (Harte, Sandler, 2006)."""
    N = C.shape[0]
    T = np.zeros((N, 6))

    r1 = 1    # Fifths
    r2 = 1    # Minor
    r3 = 0.5  # Major

    # Generate transformation matrix
    phi = np.zeros((6, 12))
    for i in range(6):
        for j in range(12):
            if i % 2 == 0:
                fun = np.sin
            else:
                fun = np.cos

            if i < 2:
                phi[i, j] = r1 * fun(j * 7 * np.pi / 6.)
            elif i >= 2 and i < 4:
                phi[i, j] = r2 * fun(j * 3 * np.pi / 2.)
            else:
                phi[i, j] = r3 * fun(j * 2 * np.pi / 3.)

    # Do the transform to Tonnetz
    for i in range(N):
        for d in range(6):
            denom = float(C[i, :].sum())
            if denom == 0:
                T[i, d] = 0
            else:
                T[i, d] = 1 / denom * (phi[d, :] * C[i, :]).sum()

    return T
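
# Usage sketch (illustrative): `chroma` is assumed to be an (N x 12)
# chromagram with one 12-bin pitch-class vector per frame.
#
#     T = chroma_to_tonnetz(chroma)    # (N x 6) Tonnetz coordinates
#     # column pairs hold the fifths / minor-third / major-third circles
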
def most_frequent(x):
    """Returns the most frequent value in x."""
    return np.argmax(np.bincount(x))


def pick_peaks(nc, L=16, plot=False):
    """Obtain peaks from a novelty curve using an adaptive threshold."""
    offset = nc.mean() / 3
    th = filters.median_filter(nc, size=L) + offset
    peaks = []
    for i in range(1, nc.shape[0] - 1):
        # Is it a local maximum?
        if nc[i - 1] < nc[i] and nc[i] > nc[i + 1]:
            # Is it above the adaptive threshold?
            if nc[i] > th[i]:
                peaks.append(i)
    if plot:
        plt.plot(nc)
        plt.plot(th)
        for peak in peaks:
            plt.axvline(peak, color="m")
        plt.show()
    return peaks


def recurrence_matrix(data, k=None, width=1, metric='sqeuclidean', sym=False):
    '''
    Note: Copied from librosa

    Compute the binary recurrence matrix from a time-series.

    ``rec[i,j] == True`` <=> (``data[:,i]``, ``data[:,j]``) are
    k-nearest-neighbors and ``|i-j| >= width``

    :usage:
        >>> mfcc = librosa.feature.mfcc(y=y, sr=sr)
        >>> R = librosa.segment.recurrence_matrix(mfcc)

        >>> # Or fix the number of nearest neighbors to 5
        >>> R = librosa.segment.recurrence_matrix(mfcc, k=5)

        >>> # Suppress neighbors within +- 7 samples
        >>> R = librosa.segment.recurrence_matrix(mfcc, width=7)

        >>> # Use cosine similarity instead of Euclidean distance
        >>> R = librosa.segment.recurrence_matrix(mfcc, metric='cosine')

        >>> # Require mutual nearest neighbors
        >>> R = librosa.segment.recurrence_matrix(mfcc, sym=True)

    :parameters:
      - data : np.ndarray
          feature matrix (d-by-t)

      - k : int > 0 or None
          the number of nearest-neighbors for each sample

          Default: ``k = 2 * ceil(sqrt(t - 2 * width + 1))``,
          or ``k = 2`` if ``t <= 2 * width + 1``

      - width : int > 0
          only link neighbors ``(data[:, i], data[:, j])``
          if ``|i-j| >= width``

      - metric : str
          Distance metric to use for nearest-neighbor calculation.

          See ``scipy.spatial.distance.cdist()`` for details.

      - sym : bool
          set ``sym=True`` to only link mutual nearest-neighbors

    :returns:
      - rec : np.ndarray, shape=(t,t), dtype=bool
          Binary recurrence matrix
    '''
    t = data.shape[1]

    if k is None:
        if t > 2 * width + 1:
            k = 2 * np.ceil(np.sqrt(t - 2 * width + 1))
        else:
            k = 2

    k = int(k)

    def _band_infinite():
        '''Suppress the diagonal +- width of a distance matrix.'''
        band = np.empty((t, t))
        band.fill(np.inf)
        band[np.triu_indices_from(band, width)] = 0
        band[np.tril_indices_from(band, -width)] = 0

        return band

    # Build the distance matrix
    D = scipy.spatial.distance.cdist(data.T, data.T, metric=metric)

    # Max out the diagonal band
    D = D + _band_infinite()

    # Build the recurrence plot
    rec = np.zeros((t, t), dtype=bool)

    # Get the k nearest neighbors for each point
    for i in range(t):
        for j in np.argsort(D[i])[:k]:
            rec[i, j] = True

    # Symmetrize
    if sym:
        rec = rec * rec.T

    return rec


def getMean(feature, winlen, stepsize):
    '''Windowed mean of a (frames x dims) feature matrix.'''
    means = []
    steps = int((feature.shape[0] - winlen + stepsize) / stepsize)
    for i in range(steps):
        means.append(np.mean(feature[i * stepsize:(i * stepsize + winlen), :], axis=0))
    return np.array(means)


def getStd(feature, winlen, stepsize):
    '''Windowed standard deviation of a (frames x dims) feature matrix.'''
    std = []
    steps = int((feature.shape[0] - winlen + stepsize) / stepsize)
    for i in range(steps):
        std.append(np.std(feature[i * stepsize:(i * stepsize + winlen), :], axis=0))
    return np.array(std)


def getDelta(feature):
    '''First-order difference along the frame axis, zero-padded to keep shape.'''
    delta_feature = np.vstack((np.zeros((1, feature.shape[1])), np.diff(feature, axis=0)))
    return delta_feature
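
# Usage sketch (illustrative): summarising a (frames x dims) feature matrix
# `F` over sliding windows, e.g. a 20-frame window moved in 10-frame steps.
#
#     mu = getMean(F, winlen=20, stepsize=10)   # (steps x dims)
#     sd = getStd(F, winlen=20, stepsize=10)    # (steps x dims)
#     dF = getDelta(F)                          # same shape as F
#     summary = np.hstack((mu, sd))             # windowed summary features
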
def getSSM(feature_array, metric='cosine', norm='simple', reduce=False):
    '''Compute the SSM of the given input feature array.
    args: norm: ['simple', 'remove_noise']
    '''
    dm = pairwise_distances(feature_array, metric=metric)
    dm = np.nan_to_num(dm)
    if norm == 'simple':
        ssm = 1 - (dm - np.min(dm)) / (np.max(dm) - np.min(dm))
    else:
        # Only the 'simple' normalisation is implemented; fail loudly
        # rather than returning an undefined ssm.
        raise NotImplementedError("norm=%r is not supported" % norm)
    if reduce:
        ssm = reduceSSM(ssm)
    return ssm
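
# Usage sketch (illustrative): a cosine SSM from a (frames x dims) feature
# matrix `F`, optionally denoised with the local-Otsu reduction below.
#
#     ssm = getSSM(F, metric='cosine', norm='simple')
#     ssm_clean = getSSM(F, metric='cosine', norm='simple', reduce=True)
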
def reduceSSM(ssm, maxfilter_size=2, remove_size=50):
    '''Suppress noise in the SSM by thresholding and subtracting a local Otsu map.'''
    reduced_ssm = ssm
    reduced_ssm[reduced_ssm < 0.75] = 0
    # Alternatives:
    # reduced_ssm = maximum_filter(reduced_ssm, size=maxfilter_size)
    # reduced_ssm = morphology.remove_small_objects(reduced_ssm.astype(bool), min_size=remove_size)
    # NB: skimage's rank otsu expects an integer image; cast/scale if necessary.
    local_otsu = otsu(reduced_ssm, disk(5))
    local_otsu = (local_otsu.astype(float) - np.min(local_otsu)) / (np.max(local_otsu) - np.min(local_otsu))
    reduced_ssm = reduced_ssm - 0.6 * local_otsu
    return reduced_ssm


def upSample(feature_array, step):
    '''Resample downsized tempogram features; the tempo window should be in
    accordance with the input features.'''
    sampleRate = 44100
    stepSize = 1024.0
    # step = np.ceil(sampleRate / stepSize / 5.0)
    feature_array = zoom(feature_array, (step, 1))
    return feature_array


def normaliseFeature(feature_array):
    '''Normalise each feature vector (row-wise, along the last axis) to [0, 1].'''
    feature_array[np.isnan(feature_array)] = 0.0
    feature_array[np.isinf(feature_array)] = 0.0
    mins = np.min(feature_array, axis=-1)[:, np.newaxis]
    maxs = np.max(feature_array, axis=-1)[:, np.newaxis]
    feature_array = (feature_array - mins) / (maxs - mins)
    feature_array[np.isnan(feature_array)] = 0.0
    feature_array[np.isinf(feature_array)] = 0.0

    return feature_array


def verifyPeaks(peak_candidates, dev_list):
    '''Verify peaks from the first-round detection by applying adaptive
    thresholding to the deviation list.'''
    final_peaks = copy.copy(peak_candidates)
    dev_list = np.array([np.mean(x) for x in dev_list])  # average the deviations of the different features
    med_dev = filters.median_filter(dev_list, size=5)  # smoothed deviations (currently unused)
    dev = dev_list - np.percentile(dev_list, 50)
    for i, x in enumerate(dev):
        if x < 0:
            final_peaks.remove(peak_candidates[i])
    return final_peaks


def envelopeFollower(xc, AT, RT, prevG, scaler=1):
    '''Follows the amplitude envelope of the input signal xc.'''
    g = np.zeros_like(xc)
    length = len(xc)

    for i in range(length):
        xSquared = xc[i] ** 2
        # If the input is less than the previous output use the attack
        # coefficient, otherwise use the release coefficient.
        if xSquared < prevG:
            coeff = AT
        else:
            coeff = RT
        g[i] = (xSquared - prevG) * coeff + prevG
        g[i] *= scaler
        prevG = g[i]

    return g
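
# Usage sketch (illustrative): a common way to derive the attack/release
# coefficients is from time constants and the sample rate, e.g.
# coeff = 1 - exp(-1 / (tau * fs)). The time constants below are
# assumptions, not values taken from this project.
#
#     fs = 44100.0
#     AT = 1 - np.exp(-1 / (0.010 * fs))    # ~10 ms attack
#     RT = 1 - np.exp(-1 / (0.100 * fs))    # ~100 ms release
#     env = envelopeFollower(x, AT, RT, prevG=0.0)   # x: 1-D signal array
#     # NB: the follower tracks the squared signal, i.e. an energy envelope
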
def getEnvPeaks(sig, sig_env, size=1):
    '''Finds peaks in the signal envelope.
    args: sig (1d array): original input signal
          sig_env (list): indices of the signal envelope samples within sig.
          size: range used to locate local maxima in the envelope as peaks.
    '''
    envelope = sig[sig_env]
    peaks = []
    if len(envelope) > 1 and envelope[0] > envelope[1]:
        peaks.append(sig_env[0])
    for i in range(size, len(envelope) - size - 1):
        if envelope[i] > np.max(envelope[i - size:i]) and envelope[i] > np.max(envelope[i + 1:i + size + 1]):
            peaks.append(sig_env[i])
    return peaks
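
# Usage sketch (illustrative): `sig_env` must index the envelope samples
# within `sig`; one plausible source (an assumption, not taken from this
# project) is the set of local maxima of the signal.
#
#     from scipy.signal import argrelmax
#     sig_env = argrelmax(sig)[0]
#     peaks = getEnvPeaks(sig, sig_env, size=2)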