diff utils/SegUtil.py @ 0:26838b1f560f
initial commit of a segmenter project
author | mi tian
---|---
date | Thu, 02 Apr 2015 18:09:27 +0100
parents |
children | c11ea9e0357f
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/utils/SegUtil.py	Thu Apr 02 18:09:27 2015 +0100
@@ -0,0 +1,395 @@
"""
Useful functions that are quite common for music segmentation.
"""
'''
Modified and more functions added.
Mi Tian, April 2015.
'''

__author__ = "Oriol Nieto"
__copyright__ = "Copyright 2014, Music and Audio Research Lab (MARL)"
__license__ = "GPL"
__version__ = "1.0"
__email__ = "oriol@nyu.edu"

import copy
import numpy as np
import os
import scipy
from scipy.spatial import distance
from scipy.ndimage import filters, zoom
from scipy import signal
import pylab as plt
from scipy.spatial.distance import squareform, pdist
from sklearn.metrics.pairwise import pairwise_distances  # needed by getSSM below
from skimage.filters.rank import otsu  # needed by reduceSSM below
from skimage.morphology import disk  # needed by reduceSSM below


def lognormalize_chroma(C):
    """Log-normalizes chroma such that each vector is between -80 to 0."""
    C += np.abs(C.min()) + 0.1
    C = C / C.max(axis=0)
    C = 80 * np.log10(C)  # Normalize from -80 to 0
    return C


def normalize_matrix(X):
    """Normalizes a matrix such that its maximum value is 1 and minimum is 0."""
    X += np.abs(X.min())
    X /= X.max()
    return X


def ensure_dir(directory):
    """Makes sure that the given directory exists."""
    if not os.path.exists(directory):
        os.makedirs(directory)


def median_filter(X, M=8):
    """Median filter along the first axis of the feature matrix X."""
    for i in xrange(X.shape[1]):
        X[:, i] = filters.median_filter(X[:, i], size=M)
    return X


def compute_gaussian_krnl(M):
    """Creates a gaussian kernel following Foote's paper."""
    g = signal.gaussian(M, M / 3., sym=True)
    G = np.dot(g.reshape(-1, 1), g.reshape(1, -1))
    G[M / 2:, :M / 2] = -G[M / 2:, :M / 2]
    G[:M / 2, M / 2:] = -G[:M / 2, M / 2:]
    return G


def compute_ssm(X, metric="seuclidean"):
    """Computes the self-similarity matrix of X."""
    D = distance.pdist(X, metric=metric)
    D = distance.squareform(D)
    D /= D.max()
    return 1 - D


def compute_nc(X, G):
    """Computes the novelty curve from the self-similarity matrix X and
    the gaussian kernel G."""
    N = X.shape[0]
    M = G.shape[0]
    nc = np.zeros(N)

    for i in xrange(M / 2, N - M / 2 + 1):
        nc[i] = np.sum(X[i - M / 2:i + M / 2, i - M / 2:i + M / 2] * G)

    # Normalize to [0, 1]: shift the minimum to zero, then scale
    nc -= nc.min()
    nc /= nc.max()
    return nc
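
# --- Illustrative usage (editor's sketch, not part of the original commit) ---
# The helpers above implement Foote-style novelty segmentation: build a
# self-similarity matrix from a (frames x dims) feature matrix, slide a
# checkerboard Gaussian kernel along its diagonal, and treat peaks of the
# resulting novelty curve as candidate segment boundaries. Synthetic input
# here; `pick_peaks` is defined further down in this module.
def _demo_novelty_segmentation(n_frames=200, n_dims=12, kernel_size=32):
    X = np.random.RandomState(0).rand(n_frames, n_dims)  # stand-in feature matrix
    S = compute_ssm(X)                       # (n_frames, n_frames), in [0, 1]
    G = compute_gaussian_krnl(kernel_size)   # checkerboard kernel
    nc = compute_nc(S, G)                    # novelty curve, normalized to [0, 1]
    return pick_peaks(nc, L=16)              # candidate boundary frame indices
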

def resample_mx(X, incolpos, outcolpos):
    """
    Method from Librosa
    Y = resample_mx(X, incolpos, outcolpos)
    X is taken as a set of columns, each starting at 'time'
    colpos, and continuing until the start of the next column.
    Y is a similar matrix, with time boundaries defined by
    outcolpos. Each column of Y is a duration-weighted average of
    the overlapping columns of X.

    2010-04-14 Dan Ellis dpwe@ee.columbia.edu based on samplemx/beatavg
    -> python: TBM, 2011-11-05, TESTED
    """
    noutcols = len(outcolpos)
    Y = np.zeros((X.shape[0], noutcols))
    # assign 'end times' to final columns
    if outcolpos.max() > incolpos.max():
        incolpos = np.concatenate([incolpos, [outcolpos.max()]])
        X = np.concatenate([X, X[:, -1].reshape(X.shape[0], 1)], axis=1)
    outcolpos = np.concatenate([outcolpos, [outcolpos[-1]]])
    # durations (default weights) of input columns
    incoldurs = np.concatenate([np.diff(incolpos), [1]])

    for c in range(noutcols):
        firstincol = np.where(incolpos <= outcolpos[c])[0][-1]
        firstincolnext = np.where(incolpos < outcolpos[c + 1])[0][-1]
        lastincol = max(firstincol, firstincolnext)
        # default weights
        wts = copy.deepcopy(incoldurs[firstincol:lastincol + 1])
        # now fix up by partial overlap at ends
        if len(wts) > 1:
            wts[0] = wts[0] - (outcolpos[c] - incolpos[firstincol])
            wts[-1] = wts[-1] - (incolpos[lastincol + 1] - outcolpos[c + 1])
        wts = wts * 1. / sum(wts)
        Y[:, c] = np.dot(X[:, firstincol:lastincol + 1], wts)
    # done
    return Y


def chroma_to_tonnetz(C):
    """Transforms chromagram to Tonnetz (Harte, Sandler, 2006)."""
    N = C.shape[0]
    T = np.zeros((N, 6))

    r1 = 1    # Fifths
    r2 = 1    # Minor
    r3 = 0.5  # Major

    # Generate transformation matrix
    phi = np.zeros((6, 12))
    for i in range(6):
        for j in range(12):
            if i % 2 == 0:
                fun = np.sin
            else:
                fun = np.cos

            if i < 2:
                phi[i, j] = r1 * fun(j * 7 * np.pi / 6.)
            elif i >= 2 and i < 4:
                phi[i, j] = r2 * fun(j * 3 * np.pi / 2.)
            else:
                phi[i, j] = r3 * fun(j * 2 * np.pi / 3.)

    # Do the transform to tonnetz
    for i in range(N):
        for d in range(6):
            denom = float(C[i, :].sum())
            if denom == 0:
                T[i, d] = 0
            else:
                T[i, d] = 1 / denom * (phi[d, :] * C[i, :]).sum()

    return T


def most_frequent(x):
    """Returns the most frequent value in x."""
    return np.argmax(np.bincount(x))


def pick_peaks(nc, L=16, plot=False):
    """Obtain peaks from a novelty curve using an adaptive threshold."""
    offset = nc.mean() / 3
    th = filters.median_filter(nc, size=L) + offset
    peaks = []
    for i in xrange(1, nc.shape[0] - 1):
        # is it a peak?
        if nc[i - 1] < nc[i] and nc[i] > nc[i + 1]:
            # is it above the threshold?
            if nc[i] > th[i]:
                peaks.append(i)
    if plot:
        plt.plot(nc)
        plt.plot(th)
        for peak in peaks:
            plt.axvline(peak, color="m")
        plt.show()
    return peaks
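
# --- Illustrative usage (editor's sketch, not part of the original commit) ---
# resample_mx is typically used to beat-synchronize frame-level features:
# each output column is the duration-weighted average of the input columns
# falling between consecutive beat times. The feature matrix and time grids
# below are synthetic stand-ins.
def _demo_beat_sync(n_dims=12, n_frames=100):
    F = np.random.RandomState(1).rand(n_dims, n_frames)  # d-by-t feature matrix
    frame_times = np.arange(n_frames) * 0.1               # one column per 100 ms
    beat_times = np.arange(0.0, n_frames * 0.1, 0.5)      # hypothetical beat grid
    return resample_mx(F, frame_times, beat_times)        # one column per beat
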

def recurrence_matrix(data, k=None, width=1, metric='sqeuclidean', sym=False):
    '''
    Note: Copied from librosa

    Compute the binary recurrence matrix from a time-series.

    ``rec[i,j] == True`` <=> (``data[:,i]``, ``data[:,j]``) are
    k-nearest-neighbors and ``|i-j| >= width``

    :usage:
        >>> mfcc = librosa.feature.mfcc(y=y, sr=sr)
        >>> R = librosa.segment.recurrence_matrix(mfcc)

        >>> # Or fix the number of nearest neighbors to 5
        >>> R = librosa.segment.recurrence_matrix(mfcc, k=5)

        >>> # Suppress neighbors within +- 7 samples
        >>> R = librosa.segment.recurrence_matrix(mfcc, width=7)

        >>> # Use cosine similarity instead of Euclidean distance
        >>> R = librosa.segment.recurrence_matrix(mfcc, metric='cosine')

        >>> # Require mutual nearest neighbors
        >>> R = librosa.segment.recurrence_matrix(mfcc, sym=True)

    :parameters:
      - data : np.ndarray
          feature matrix (d-by-t)

      - k : int > 0 or None
          the number of nearest-neighbors for each sample

          Default: ``k = 2 * ceil(sqrt(t - 2 * width + 1))``,
          or ``k = 2`` if ``t <= 2 * width + 1``

      - width : int > 0
          only link neighbors ``(data[:, i], data[:, j])``
          if ``|i-j| >= width``

      - metric : str
          Distance metric to use for nearest-neighbor calculation.

          See ``scipy.spatial.distance.cdist()`` for details.

      - sym : bool
          set ``sym=True`` to only link mutual nearest-neighbors

    :returns:
      - rec : np.ndarray, shape=(t,t), dtype=bool
          Binary recurrence matrix
    '''

    t = data.shape[1]

    if k is None:
        if t > 2 * width + 1:
            k = 2 * np.ceil(np.sqrt(t - 2 * width + 1))
        else:
            k = 2

    k = int(k)

    def _band_infinite():
        '''Suppress the +- width band around the diagonal of a distance matrix.'''
        band = np.empty((t, t))
        band.fill(np.inf)
        band[np.triu_indices_from(band, width)] = 0
        band[np.tril_indices_from(band, -width)] = 0
        return band

    # Build the distance matrix
    D = scipy.spatial.distance.cdist(data.T, data.T, metric=metric)

    # Max out the diagonal band
    D = D + _band_infinite()

    # build the recurrence plot
    rec = np.zeros((t, t), dtype=bool)

    # get the k nearest neighbors for each point
    for i in range(t):
        for j in np.argsort(D[i])[:k]:
            rec[i, j] = True

    # symmetrize
    if sym:
        rec = rec * rec.T

    return rec


def getMean(feature, winlen, stepsize):
    '''Windowed mean of the feature matrix along the first (time) axis.'''
    means = []
    steps = int((feature.shape[0] - winlen + stepsize) / stepsize)
    for i in xrange(steps):
        means.append(np.mean(feature[i * stepsize:(i * stepsize + winlen), :], axis=0))
    return np.array(means)


def getStd(feature, winlen, stepsize):
    '''Windowed standard deviation of the feature matrix along the first (time) axis.'''
    std = []
    steps = int((feature.shape[0] - winlen + stepsize) / stepsize)
    for i in xrange(steps):
        std.append(np.std(feature[i * stepsize:(i * stepsize + winlen), :], axis=0))
    return np.array(std)


def getDelta(feature):
    '''First-order difference of the features, zero-padded at the start.'''
    delta_feature = np.vstack((np.zeros((1, feature.shape[1])), np.diff(feature, axis=0)))
    return delta_feature
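
# --- Illustrative usage (editor's sketch, not part of the original commit) ---
# getMean/getStd summarize a (frames x dims) feature matrix over sliding
# windows, and getDelta appends first-order dynamics; stacking the three is
# a common pre-processing step before computing an SSM. Window length and
# step size are in frames; the values here are arbitrary.
def _demo_window_stats(winlen=20, stepsize=10):
    feature = np.random.RandomState(2).rand(100, 13)  # stand-in frame-level features
    stacked = np.hstack((getMean(feature, winlen, stepsize),
                         getStd(feature, winlen, stepsize)))
    return np.hstack((stacked, getDelta(stacked)))
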

def getSSM(feature_array, metric='cosine', norm='simple', reduce=False):
    '''Compute SSM given input feature array.
    args: norm: ['simple', 'remove_noise']
    '''
    dm = pairwise_distances(feature_array, metric=metric)
    dm = np.nan_to_num(dm)
    if norm == 'simple':
        ssm = 1 - (dm - np.min(dm)) / (np.max(dm) - np.min(dm))
    else:
        # only the 'simple' normalisation is implemented in this revision
        raise NotImplementedError("norm=%r is not supported" % norm)
    if reduce:
        ssm = reduceSSM(ssm)
    return ssm


def reduceSSM(ssm, maxfilter_size=2, remove_size=50):
    '''Suppress noise in the SSM by thresholding and subtracting a local Otsu estimate.'''
    reduced_ssm = ssm
    reduced_ssm[reduced_ssm < 0.75] = 0
    # reduced_ssm = maximum_filter(reduced_ssm, size=maxfilter_size)
    # reduced_ssm = morphology.remove_small_objects(reduced_ssm.astype(bool), min_size=remove_size)
    local_otsu = otsu(reduced_ssm, disk(5))
    local_otsu = (local_otsu.astype(float) - np.min(local_otsu)) / (np.max(local_otsu) - np.min(local_otsu))
    reduced_ssm = reduced_ssm - 0.6 * local_otsu
    return reduced_ssm


def upSample(feature_array, step):
    '''Resample downsized tempogram features; the tempo window should be in accordance with the input features.'''
    # print feature_array.shape
    sampleRate = 44100
    stepSize = 1024.0
    # step = np.ceil(sampleRate/stepSize/5.0)
    feature_array = zoom(feature_array, (step, 1))
    # print 'resampled', feature_array.shape
    return feature_array


def normaliseFeature(feature_array):
    '''Normalise each row of the feature array to the [0, 1] range.'''
    feature_array[np.isnan(feature_array)] = 0.0
    feature_array[np.isinf(feature_array)] = 0.0
    feature_array = (feature_array - np.min(feature_array, axis=-1)[:, np.newaxis]) / \
        (np.max(feature_array, axis=-1) - np.min(feature_array, axis=-1))[:, np.newaxis]
    feature_array[np.isnan(feature_array)] = 0.0
    feature_array[np.isinf(feature_array)] = 0.0

    return feature_array


def verifyPeaks(peak_candidates, dev_list):
    '''Verify peaks from the 1st-round detection by applying adaptive thresholding to the deviation list.'''

    final_peaks = copy.copy(peak_candidates)
    dev_list = np.array([np.mean(x) for x in dev_list])  # average the devs of different features
    med_dev = filters.median_filter(dev_list, size=5)
    # print dev_list, np.min(dev_list), np.median(dev_list), np.mean(dev_list), np.std(dev_list)
    dev = dev_list - np.percentile(dev_list, 50)
    # print dev
    for i, x in enumerate(dev):
        if x < 0:
            final_peaks.remove(peak_candidates[i])
    return final_peaks


def envelopeFollower(xc, AT, RT, prevG, scaler=1):
    '''Follows the amplitude envelope of the input signal xc.'''

    g = np.zeros_like(xc)
    length = len(xc)

    for i in xrange(length):
        xSquared = xc[i] ** 2
        # if the input is less than the previous output use the attack, otherwise the release
        if xSquared < prevG:
            coeff = AT
        else:
            coeff = RT
        g[i] = (xSquared - prevG) * coeff + prevG
        g[i] *= scaler
        prevG = g[i]

    return g


def getEnvPeaks(sig, sig_env, size=1):
    '''Finds peaks in the signal envelope.
    args: sig (1d array): original input signal.
          sig_env (list): positions (sample indices) of the signal envelope.
          size: range used to locate local maxima in the envelope as peaks.
    '''
    envelope = sig[sig_env]
    peaks = []
    if len(envelope) > 1 and envelope[0] > envelope[1]:
        peaks.append(sig_env[0])
    for i in xrange(size, len(envelope) - size - 1):
        if envelope[i] > np.max(envelope[i - size:i]) and envelope[i] > np.max(envelope[i + 1:i + size + 1]):
            peaks.append(sig_env[i])
    return peaks
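
# --- Illustrative usage (editor's sketch, not part of the original commit) ---
# envelopeFollower smooths the squared signal with separate attack/release
# coefficients; getEnvPeaks then keeps local maxima of the envelope. The
# coefficient values and the way envelope positions are chosen below are
# assumptions made purely for demonstration.
def _demo_envelope_peaks():
    t = np.linspace(0.0, 1.0, 1000)
    sig = np.sin(2 * np.pi * 5.0 * t) * np.exp(-3.0 * t)  # decaying tone
    env = envelopeFollower(sig, AT=0.3, RT=0.01, prevG=0.0)
    env_idx = sorted(np.argsort(env)[-20:])  # hypothetical envelope positions
    return getEnvPeaks(sig, env_idx, size=1)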