mi@0: """ mi@0: Useful functions that are quite common for music segmentation mi@0: """ mi@0: ''' mi@0: Modified and more funcs added. mi@0: Mi Tian, April 2015. mi@0: ''' mi@0: mi@0: __author__ = "Oriol Nieto" mi@0: __copyright__ = "Copyright 2014, Music and Audio Research Lab (MARL)" mi@0: __license__ = "GPL" mi@0: __version__ = "1.0" mi@0: __email__ = "oriol@nyu.edu" mi@0: mi@0: import copy mi@0: import numpy as np mitian@1: import os, sys mi@0: import scipy mi@0: from scipy.spatial import distance mi@0: from scipy.ndimage import filters, zoom mi@0: from scipy import signal mitian@14: from scipy.signal import correlate2d, convolve2d, filtfilt, resample, butter mitian@17: # import pylab as plt mi@0: from scipy.spatial.distance import squareform, pdist mitian@14: from scipy.ndimage.filters import maximum_filter, minimum_filter, percentile_filter, uniform_filter mitian@17: from scipy.ndimage.filters import median_filter as med_filter # median_filter is a user defined function in this script mitian@4: from sklearn.metrics.pairwise import pairwise_distances mi@0: mitian@17: from GmmMetrics import GmmDistance mi@0: mi@0: def lognormalize_chroma(C): mi@0: """Log-normalizes chroma such that each vector is between -80 to 0.""" mi@0: C += np.abs(C.min()) + 0.1 mi@0: C = C/C.max(axis=0) mi@0: C = 80 * np.log10(C) # Normalize from -80 to 0 mi@0: return C mi@0: mi@0: mi@0: def normalize_matrix(X): mi@0: """Nomalizes a matrix such that it's maximum value is 1 and minimum is 0.""" mi@0: X += np.abs(X.min()) mi@0: X /= X.max() mi@0: return X mi@0: mi@0: mi@0: def ensure_dir(directory): mi@0: """Makes sure that the given directory exists.""" mi@0: if not os.path.exists(directory): mi@0: os.makedirs(directory) mi@0: mitian@17: mi@0: def median_filter(X, M=8): mi@0: """Median filter along the first axis of the feature matrix X.""" mi@0: for i in xrange(X.shape[1]): mi@0: X[:, i] = filters.median_filter(X[:, i], size=M) mi@0: return X mi@0: mi@0: mi@0: def compute_gaussian_krnl(M): mi@0: """Creates a gaussian kernel following Foote's paper.""" mi@0: g = signal.gaussian(M, M / 3., sym=True) mi@0: G = np.dot(g.reshape(-1, 1), g.reshape(1, -1)) mi@0: G[M / 2:, :M / 2] = -G[M / 2:, :M / 2] mi@0: G[:M / 2, M / 2:] = -G[:M / 2, M / 2:] mi@0: return G mi@0: mi@0: mi@0: def compute_ssm(X, metric="seuclidean"): mi@0: """Computes the self-similarity matrix of X.""" mi@0: D = distance.pdist(X, metric=metric) mi@0: D = distance.squareform(D) mi@0: D /= D.max() mi@0: return 1 - D mi@0: mi@0: mi@0: def compute_nc(X, G): mi@0: """Computes the novelty curve from the self-similarity matrix X and mi@0: the gaussian kernel G.""" mi@0: N = X.shape[0] mi@0: M = G.shape[0] mi@0: nc = np.zeros(N) mi@0: mi@0: for i in xrange(M / 2, N - M / 2 + 1): mi@0: nc[i] = np.sum(X[i - M / 2:i + M / 2, i - M / 2:i + M / 2] * G) mi@0: mi@0: # Normalize mi@0: nc += nc.min() mi@0: nc /= nc.max() mi@0: return nc mi@0: mi@0: mi@0: def resample_mx(X, incolpos, outcolpos): mi@0: """ mi@0: Method from Librosa mi@0: Y = resample_mx(X, incolpos, outcolpos) mi@0: X is taken as a set of columns, each starting at 'time' mi@0: colpos, and continuing until the start of the next column. mi@0: Y is a similar matrix, with time boundaries defined by mi@0: outcolpos. Each column of Y is a duration-weighted average of mi@0: the overlapping columns of X. mi@0: 2010-04-14 Dan Ellis dpwe@ee.columbia.edu based on samplemx/beatavg mi@0: -> python: TBM, 2011-11-05, TESTED mi@0: """ mi@0: noutcols = len(outcolpos) mi@0: Y = np.zeros((X.shape[0], noutcols)) mi@0: # assign 'end times' to final columns mi@0: if outcolpos.max() > incolpos.max(): mi@0: incolpos = np.concatenate([incolpos,[outcolpos.max()]]) mi@0: X = np.concatenate([X, X[:,-1].reshape(X.shape[0],1)], axis=1) mi@0: outcolpos = np.concatenate([outcolpos, [outcolpos[-1]]]) mi@0: # durations (default weights) of input columns) mi@0: incoldurs = np.concatenate([np.diff(incolpos), [1]]) mi@0: mi@0: for c in range(noutcols): mi@0: firstincol = np.where(incolpos <= outcolpos[c])[0][-1] mi@0: firstincolnext = np.where(incolpos < outcolpos[c+1])[0][-1] mi@0: lastincol = max(firstincol,firstincolnext) mi@0: # default weights mi@0: wts = copy.deepcopy(incoldurs[firstincol:lastincol+1]) mi@0: # now fix up by partial overlap at ends mi@0: if len(wts) > 1: mi@0: wts[0] = wts[0] - (outcolpos[c] - incolpos[firstincol]) mi@0: wts[-1] = wts[-1] - (incolpos[lastincol+1] - outcolpos[c+1]) mi@0: wts = wts * 1. /sum(wts) mi@0: Y[:,c] = np.dot(X[:,firstincol:lastincol+1], wts) mi@0: # done mi@0: return Y mi@0: mi@0: mi@0: def chroma_to_tonnetz(C): mi@0: """Transforms chromagram to Tonnetz (Harte, Sandler, 2006).""" mi@0: N = C.shape[0] mi@0: T = np.zeros((N, 6)) mi@0: mi@0: r1 = 1 # Fifths mi@0: r2 = 1 # Minor mi@0: r3 = 0.5 # Major mi@0: mi@0: # Generate Transformation matrix mi@0: phi = np.zeros((6, 12)) mi@0: for i in range(6): mi@0: for j in range(12): mi@0: if i % 2 == 0: mi@0: fun = np.sin mi@0: else: mi@0: fun = np.cos mi@0: mi@0: if i < 2: mi@0: phi[i, j] = r1 * fun(j * 7 * np.pi / 6.) mi@0: elif i >= 2 and i < 4: mi@0: phi[i, j] = r2 * fun(j * 3 * np.pi / 2.) mi@0: else: mi@0: phi[i, j] = r3 * fun(j * 2 * np.pi / 3.) mi@0: mi@0: # Do the transform to tonnetz mi@0: for i in range(N): mi@0: for d in range(6): mi@0: denom = float(C[i, :].sum()) mi@0: if denom == 0: mi@0: T[i, d] = 0 mi@0: else: mi@0: T[i, d] = 1 / denom * (phi[d, :] * C[i, :]).sum() mi@0: mi@0: return T mi@0: mi@0: mi@0: def most_frequent(x): mi@0: """Returns the most frequent value in x.""" mi@0: return np.argmax(np.bincount(x)) mi@0: mi@0: mi@0: def pick_peaks(nc, L=16, plot=False): mi@0: """Obtain peaks from a novelty curve using an adaptive threshold.""" mi@0: offset = nc.mean() / 3 mi@0: th = filters.median_filter(nc, size=L) + offset mi@0: peaks = [] mi@0: for i in xrange(1, nc.shape[0] - 1): mi@0: # is it a peak? mi@0: if nc[i - 1] < nc[i] and nc[i] > nc[i + 1]: mi@0: # is it above the threshold? mi@0: if nc[i] > th[i]: mi@0: peaks.append(i) mi@0: if plot: mi@0: plt.plot(nc) mi@0: plt.plot(th) mi@0: for peak in peaks: mi@0: plt.axvline(peak, color="m") mi@0: plt.show() mi@0: return peaks mi@0: mi@0: mi@0: def recurrence_matrix(data, k=None, width=1, metric='sqeuclidean', sym=False): mi@0: ''' mi@0: Note: Copied from librosa mi@0: mi@0: Compute the binary recurrence matrix from a time-series. mi@0: mi@0: ``rec[i,j] == True`` <=> (``data[:,i]``, ``data[:,j]``) are mi@0: k-nearest-neighbors and ``|i-j| >= width`` mi@0: mi@0: :usage: mi@0: >>> mfcc = librosa.feature.mfcc(y=y, sr=sr) mi@0: >>> R = librosa.segment.recurrence_matrix(mfcc) mi@0: mi@0: >>> # Or fix the number of nearest neighbors to 5 mi@0: >>> R = librosa.segment.recurrence_matrix(mfcc, k=5) mi@0: mi@0: >>> # Suppress neighbors within +- 7 samples mi@0: >>> R = librosa.segment.recurrence_matrix(mfcc, width=7) mi@0: mi@0: >>> # Use cosine similarity instead of Euclidean distance mi@0: >>> R = librosa.segment.recurrence_matrix(mfcc, metric='cosine') mi@0: mi@0: >>> # Require mutual nearest neighbors mi@0: >>> R = librosa.segment.recurrence_matrix(mfcc, sym=True) mi@0: mi@0: :parameters: mi@0: - data : np.ndarray mi@0: feature matrix (d-by-t) mi@0: mi@0: - k : int > 0 or None mi@0: the number of nearest-neighbors for each sample mi@0: mi@0: Default: ``k = 2 * ceil(sqrt(t - 2 * width + 1))``, mi@0: or ``k = 2`` if ``t <= 2 * width + 1`` mi@0: mi@0: - width : int > 0 mi@0: only link neighbors ``(data[:, i], data[:, j])`` mi@0: if ``|i-j| >= width`` mi@0: mi@0: - metric : str mi@0: Distance metric to use for nearest-neighbor calculation. mi@0: mi@0: See ``scipy.spatial.distance.cdist()`` for details. mi@0: mi@0: - sym : bool mi@0: set ``sym=True`` to only link mutual nearest-neighbors mi@0: mi@0: :returns: mi@0: - rec : np.ndarray, shape=(t,t), dtype=bool mi@0: Binary recurrence matrix mi@0: ''' mi@0: mi@0: t = data.shape[1] mi@0: mi@0: if k is None: mi@0: if t > 2 * width + 1: mi@0: k = 2 * np.ceil(np.sqrt(t - 2 * width + 1)) mi@0: else: mi@0: k = 2 mi@0: mi@0: k = int(k) mi@0: mi@0: def _band_infinite(): mi@0: '''Suppress the diagonal+- of a distance matrix''' mi@0: mi@0: band = np.empty((t, t)) mi@0: band.fill(np.inf) mi@0: band[np.triu_indices_from(band, width)] = 0 mi@0: band[np.tril_indices_from(band, -width)] = 0 mi@0: mi@0: return band mi@0: mi@0: # Build the distance matrix mi@0: D = scipy.spatial.distance.cdist(data.T, data.T, metric=metric) mi@0: mi@0: # Max out the diagonal band mi@0: D = D + _band_infinite() mi@0: mi@0: # build the recurrence plot mi@0: rec = np.zeros((t, t), dtype=bool) mi@0: mi@0: # get the k nearest neighbors for each point mi@0: for i in range(t): mi@0: for j in np.argsort(D[i])[:k]: mi@0: rec[i, j] = True mi@0: mi@0: # symmetrize mi@0: if sym: mi@0: rec = rec * rec.T mi@0: mi@0: return rec mi@0: mitian@14: def lp(signal, fc=0.34, axis=-1): mitian@14: '''Low pass filter function mitian@14: signal: Raw signal to be smoothed. mitian@14: fc: Cutoff frequency of the butterworth filter. Normalized from 0 to 1, where 1 is the Nyquist frequency. mitian@14: axis: The axis of x to which the filter is applied. Default is -1.''' mitian@14: bCoeffs, aCoeffs = butter(2, fc) mitian@14: lp_smoothed_signal = filtfilt(bCoeffs, aCoeffs, signal, axis) mitian@14: return lp_smoothed_signal mitian@14: mitian@17: mitian@14: def hp(signal, fc=0.34, axis=-1): mitian@14: '''Low pass filter function mitian@14: signal: Raw signal to be smoothed. mitian@14: fc: Cutoff frequency of the butterworth filter. mitian@14: axis: The axis of x to which the filter is applied. Default is -1.''' mitian@14: bCoeffs, aCoeffs = butter(2, fc, 'highpass') mitian@14: hp_smoothed_signal = filtfilt(bCoeffs, aCoeffs, signal, axis) mitian@14: return hp_smoothed_signal mitian@17: mitian@17: mi@0: def getMean(feature, winlen, stepsize): mi@0: means = [] mi@0: steps = int((feature.shape[0] - winlen + stepsize) / stepsize) mi@0: for i in xrange(steps): mi@0: means.append(np.mean(feature[i*stepsize:(i*stepsize+winlen), :], axis=0)) mi@0: return np.array(means) mi@0: mi@0: mi@0: def getStd(feature, winlen, stepsize): mi@0: std = [] mi@0: steps = int((feature.shape[0] - winlen + stepsize) / stepsize) mi@0: for i in xrange(steps): mi@0: std.append(np.std(feature[i*stepsize:(i*stepsize+winlen), :], axis=0)) mi@0: return np.array(std) mi@0: mi@0: mi@0: def getDelta(feature): mi@0: delta_feature = np.vstack((np.zeros((1, feature.shape[1])), np.diff(feature, axis=0))) mi@0: return delta_feature mi@0: mi@0: mitian@14: def getSSM(feature_array, metric='cosine', norm='exp', reduce=False): mi@0: '''Compute SSM given input feature array. mi@0: args: norm: ['simple', 'remove_noise'] mi@0: ''' mi@0: dm = pairwise_distances(feature_array, metric=metric) mi@0: dm = np.nan_to_num(dm) mi@0: if norm == 'simple': mi@0: ssm = 1 - (dm - np.min(dm)) / (np.max(dm) - np.min(dm)) mitian@14: if norm == 'exp': # Use with cosine metric only mitian@17: ssm = 1 - np.exp(dm - 1) mi@0: if reduce: mi@0: ssm = reduceSSM(ssm) mi@0: return ssm mi@0: mitian@17: mitian@14: def enhanceSSM(ssm, fc=0.34, med_size=(5,5), max_size=(5,5), min_size=(5,5), filter_type='min', axis=-1): mitian@14: '''A series of filtering for SSM enhancement mitian@14: fc: cutoff frequency for LP filtering. mitian@14: med_size: Median filter window size. mitian@14: int or tuple. If using an integer for a 2d input, axis must be specified. mitian@17: filter_type: Select either to use maximum filter or minimum filter according to the distance metric with which the SSM was computed. mitian@14: float ['min', 'max', None] mitian@14: max_size: Maximum filter window size. mitian@14: int or tuple. If using an integer for a 2d input, axis must be specified. mitian@14: Use this when homogeneity in the SSM is expressed by LARGE value. mitian@14: min_size: Mininum filter window size. mitian@14: int or tuple. If using an integer for a 2d input, axis must be specified. mitian@14: Use this when homogeneity in the SSM is expressed by SMALL value. mitian@14: (eg. When cosine metric and exp normalization and used for distance computation.)''' mi@0: mitian@16: ssm_lp = lp(ssm, fc=fc) mitian@14: mitian@14: # Use scipy.ndimage.filters.median_filter instead mitian@14: ssm_med = med_filter(ssm_lp, size=med_size) mitian@14: mitian@14: if filter_type == 'min': mitian@14: enhanced_ssm = minimum_filter(ssm_med, size=min_size) mitian@14: elif filter_type == 'max': mitian@14: enhanced_ssm = maximum_filter(ssm_med, size=max_size) mitian@14: else: mitian@14: enhanced_ssm = ssm_med mitian@14: return enhanced_ssm mitian@17: mitian@17: mi@0: def reduceSSM(ssm, maxfilter_size = 2, remove_size=50): mitian@14: '''Adaptive thresholding using OTSU method mitian@17: Required package: skimage (0.10+) -- NOT installed on ignis, do NOT call this function!''' mitian@14: mitian@14: from skimage.morphology import disk mitian@14: # from skimage.filters import threshold_otsu, rank #skimage 0.12 mitian@14: from skimage.filter.rank import otsu #skimage 0.10 mitian@14: from skimage.filter import threshold_otsu mitian@17: mitian@14: reduced_ssm = copy(ssm) mi@0: reduced_ssm[reduced_ssm<0.75] = 0 mi@0: # # reduced_ssm = maximum_filter(reduced_ssm,size=maxfilter_size) mi@0: # # reduced_ssm = morphology.remove_small_objects(reduced_ssm.astype(bool), min_size=remove_size) mi@0: local_otsu = otsu(reduced_ssm, disk(5)) mi@0: local_otsu = (local_otsu.astype(float) - np.min(local_otsu)) / (np.max(local_otsu) - np.min(local_otsu)) mi@0: reduced_ssm = reduced_ssm - 0.6*local_otsu mi@0: return reduced_ssm mi@0: mi@0: mi@0: def upSample(feature_array, step): mi@0: '''Resample downsized tempogram features, tempoWindo should be in accordance with input features''' mi@0: # print feature_array.shape mi@0: sampleRate = 44100 mi@0: stepSize = 1024.0 mi@0: # step = np.ceil(sampleRate/stepSize/5.0) mi@0: feature_array = zoom(feature_array, (step,1)) mi@0: # print 'resampled', feature_array.shape mi@0: return feature_array mi@0: mi@0: mi@0: def normaliseFeature(feature_array): mitian@13: '''Normalise features column wisely. Ensure numerical stability by adding a small constant.''' mi@0: feature_array[np.isnan(feature_array)] = 0.0 mi@0: feature_array[np.isinf(feature_array)] = 0.0 mitian@13: feature_array = (feature_array - np.min(feature_array, axis=-1)[:,np.newaxis]) / (np.max(feature_array, axis=-1) - np.min(feature_array, axis=-1) + 0.005)[:,np.newaxis] mi@0: feature_array[np.isnan(feature_array)] = 0.0 mi@0: feature_array[np.isinf(feature_array)] = 0.0 mi@0: mi@0: return feature_array mi@0: mitian@17: def normaliseArray(X): mitian@17: '''Normalise 1d array.''' mitian@17: if np.max(X) == np.min(X): mitian@17: return None mitian@17: return (X - np.min(X)) / (np.max(X) - np.min(X)) mitian@17: mitian@17: def pairwiseSKL(self, gmm_list): mitian@17: '''Compute pairwise symmetrised KL divergence of a list of GMMs.''' mitian@17: n_GMMs = len(gmm_list) mitian@17: distance_matrix = np.zeros((n_GMMs, n_GMMs)) mitian@17: for i in xrange(n_GMMs): mitian@17: for j in xrange(i, n_GMMs): mitian@17: distance_matrix[i][j] = gmm_list[i].skl_distance_full(gmm_list[j]) mitian@17: distance_matrix[j][i] = distance_matrix[i][j] mitian@17: mitian@17: np.fill_diagonal(distance_matrix, 0.0) mitian@17: distance_matrix[np.isinf(distance_matrix)] = np.finfo(np.float64).max mitian@17: mitian@17: return distance_matrix mi@0: mitian@12: def getRolloff(data, tpower, filterbank, thresh=0.9): mitian@12: nFrames = data.shape[0] mitian@12: nFilters = len(filterbank) mitian@12: rolloff = np.zeros(nFrames) mitian@12: for i in xrange(nFrames): mitian@12: rolloffE = thresh * tpower[i] mitian@12: temp = 0.0 mitian@12: tempE = 0.0 mitian@12: for band in xrange(nFilters): mitian@12: temp += data[i][band] mitian@12: if temp > rolloffE: break mitian@12: rolloff[i] = filterbank[nFilters-band-1] mitian@12: mitian@12: return rolloff mitian@12: mitian@12: mi@0: def verifyPeaks(peak_canditates, dev_list): mi@0: '''Verify peaks from the 1st round detection by applying adaptive thresholding to the deviation list.''' mi@0: mi@0: final_peaks = copy(peak_canditates) mi@0: dev_list = np.array([np.mean(x) for x in dev_list]) # get average of devs of different features mi@0: med_dev = median_filter(dev_list, size=5) mi@0: # print dev_list, np.min(dev_list), np.median(dev_list), np.mean(dev_list), np.std(dev_list) mi@0: dev = dev_list - np.percentile(dev_list, 50) mi@0: # print dev mi@0: for i, x in enumerate(dev): mi@0: if x < 0: mi@0: final_peaks.remove(peak_canditates[i]) mi@0: return final_peaks mi@0: mi@0: mi@0: def envelopeFollower(xc, AT, RT, prevG, scaler=1): mi@0: '''Follows the amplitude envelope of input signal xc.''' mi@0: mi@0: g = np.zeros_like(xc) mi@0: length = len(xc) mi@0: mi@0: for i in xrange(length): mi@0: xSquared = xc[i] ** 2 mi@0: # if input is less than the previous output use attack, otherwise use the release mi@0: if xSquared < prevG: mi@0: coeff = AT mi@0: else: mi@0: coeff = RT mi@0: g[i] = (xSquared - prevG)*coeff + prevG mi@0: g[i] *= scaler mi@0: prevG = g[i] mi@0: mi@0: return g mi@0: mi@0: mi@0: def getEnvPeaks(sig, sig_env, size=1): mi@0: '''Finds peaks in the signal envelope. mi@0: args: sig (1d array): orignal input signal mi@0: sig_env (list): position of the signal envelope. mi@0: size: ranges to locate local maxima in the envelope as peaks. mi@0: ''' mi@0: envelope = sig[sig_env] mi@0: peaks = [] mi@0: if len(envelope) > 1 and envelope[0] > envelope[1]: mi@0: peaks.append(sig_env[0]) mi@0: for i in xrange(size, len(envelope)-size-1): mi@0: if envelope[i] > np.max(envelope[i-size:i]) and envelope[i] > np.max(envelope[i+1:i+size+1]): mi@0: peaks.append(sig_env[i]) mitian@13: return peaks mitian@13: mitian@13: mitian@13: def deltaFeature(self, feature_array, step=1, axis=-1): mitian@13: '''Return delta of a feature array''' mitian@13: delta = np.zeros_like(feature_array) mitian@13: delta[:, step:] = np.diff(feature_array, axis=axis) mitian@13: return delta mitian@13: mitian@13: mitian@13: def plotCurve(self, yp, yr, yf, x, labels): mitian@13: '''Plot performance curve. mitian@13: x axis: distance threshold for feature selection; y axis: f measure''' mitian@13: mitian@13: f = plt.figure() mitian@13: ax = f.add_axes([0.1, 0.1, 0.7, 0.7]) mitian@13: l1, l2, l3 = ax.plot(x, yp, 'rs-', x, yr, 'go-', x, yf, 'k^-') mitian@13: f.legend((l1, l2, l3), ('Precision', 'Recall', 'F-measure'), 'upper left') mitian@13: for i, label in enumerate(labels): mitian@13: ax.annotate(label, (x[i], yf[i])) mitian@13: plt.show() mitian@13: plt.savefig('performance.pdf', format='pdf') mitian@13: mitian@13: return None mitian@13: