view utils/SegUtil.py @ 13:cc8ceb270e79

add some gmm ipynbs
author mitian
date Fri, 05 Jun 2015 18:02:05 +0100
parents c23658e8ae38
children 6dae41887406
line wrap: on
line source
"""
Useful functions that are quite common for music segmentation
"""
'''
Modified and more funcs added.
Mi Tian, April 2015.
'''

__author__ = "Oriol Nieto"
__copyright__ = "Copyright 2014, Music and Audio Research Lab (MARL)"
__license__ = "GPL"
__version__ = "1.0"
__email__ = "oriol@nyu.edu"

import copy
import numpy as np
import os, sys
import scipy
from scipy.spatial import distance
from scipy.ndimage import filters, zoom
from scipy import signal
import pylab as plt
from scipy.spatial.distance import squareform, pdist
from scipy.ndimage.filters import *
from sklearn.metrics.pairwise import pairwise_distances


def lognormalize_chroma(C):
	"""Log-normalizes chroma such that each vector is between -80 to 0."""
	C += np.abs(C.min()) + 0.1
	C = C/C.max(axis=0)
	C = 80 * np.log10(C)  # Normalize from -80 to 0
	return C


def normalize_matrix(X):
	"""Nomalizes a matrix such that it's maximum value is 1 and minimum is 0."""
	X += np.abs(X.min())
	X /= X.max()
	return X


def ensure_dir(directory):
	"""Makes sure that the given directory exists."""
	if not os.path.exists(directory):
		os.makedirs(directory)


def median_filter(X, M=8):
	"""Median filter along the first axis of the feature matrix X."""
	for i in xrange(X.shape[1]):
		X[:, i] = filters.median_filter(X[:, i], size=M)
	return X


def compute_gaussian_krnl(M):
	"""Creates a gaussian kernel following Foote's paper."""
	g = signal.gaussian(M, M / 3., sym=True)
	G = np.dot(g.reshape(-1, 1), g.reshape(1, -1))
	G[M / 2:, :M / 2] = -G[M / 2:, :M / 2]
	G[:M / 2, M / 2:] = -G[:M / 2, M / 2:]
	return G


def compute_ssm(X, metric="seuclidean"):
	"""Computes the self-similarity matrix of X."""
	D = distance.pdist(X, metric=metric)
	D = distance.squareform(D)
	D /= D.max()
	return 1 - D


def compute_nc(X, G):
	"""Computes the novelty curve from the self-similarity matrix X and
		the gaussian kernel G."""
	N = X.shape[0]
	M = G.shape[0]
	nc = np.zeros(N)

	for i in xrange(M / 2, N - M / 2 + 1):
		nc[i] = np.sum(X[i - M / 2:i + M / 2, i - M / 2:i + M / 2] * G)

	# Normalize
	nc += nc.min()
	nc /= nc.max()
	return nc


def resample_mx(X, incolpos, outcolpos):
	"""
	Method from Librosa
	Y = resample_mx(X, incolpos, outcolpos)
	X is taken as a set of columns, each starting at 'time'
	colpos, and continuing until the start of the next column.
	Y is a similar matrix, with time boundaries defined by
	outcolpos.	Each column of Y is a duration-weighted average of
	the overlapping columns of X.
	2010-04-14 Dan Ellis dpwe@ee.columbia.edu  based on samplemx/beatavg
	-> python: TBM, 2011-11-05, TESTED
	"""
	noutcols = len(outcolpos)
	Y = np.zeros((X.shape[0], noutcols))
	# assign 'end times' to final columns
	if outcolpos.max() > incolpos.max():
		incolpos = np.concatenate([incolpos,[outcolpos.max()]])
		X = np.concatenate([X, X[:,-1].reshape(X.shape[0],1)], axis=1)
	outcolpos = np.concatenate([outcolpos, [outcolpos[-1]]])
	# durations (default weights) of input columns)
	incoldurs = np.concatenate([np.diff(incolpos), [1]])

	for c in range(noutcols):
		firstincol = np.where(incolpos <= outcolpos[c])[0][-1]
		firstincolnext = np.where(incolpos < outcolpos[c+1])[0][-1]
		lastincol = max(firstincol,firstincolnext)
		# default weights
		wts = copy.deepcopy(incoldurs[firstincol:lastincol+1])
		# now fix up by partial overlap at ends
		if len(wts) > 1:
			wts[0] = wts[0] - (outcolpos[c] - incolpos[firstincol])
			wts[-1] = wts[-1] - (incolpos[lastincol+1] - outcolpos[c+1])
		wts = wts * 1. /sum(wts)
		Y[:,c] = np.dot(X[:,firstincol:lastincol+1], wts)
	# done
	return Y


def chroma_to_tonnetz(C):
	"""Transforms chromagram to Tonnetz (Harte, Sandler, 2006)."""
	N = C.shape[0]
	T = np.zeros((N, 6))

	r1 = 1		# Fifths
	r2 = 1		# Minor
	r3 = 0.5	# Major

	# Generate Transformation matrix
	phi = np.zeros((6, 12))
	for i in range(6):
		for j in range(12):
			if i % 2 == 0:
				fun = np.sin
			else:
				fun = np.cos

			if i < 2:
				phi[i, j] = r1 * fun(j * 7 * np.pi / 6.)
			elif i >= 2 and i < 4:
				phi[i, j] = r2 * fun(j * 3 * np.pi / 2.)
			else:
				phi[i, j] = r3 * fun(j * 2 * np.pi / 3.)

	# Do the transform to tonnetz
	for i in range(N):
		for d in range(6):
			denom = float(C[i, :].sum())
			if denom == 0:
				T[i, d] = 0
			else:
				T[i, d] = 1 / denom * (phi[d, :] * C[i, :]).sum()

	return T


def most_frequent(x):
	"""Returns the most frequent value in x."""
	return np.argmax(np.bincount(x))


def pick_peaks(nc, L=16, plot=False):
	"""Obtain peaks from a novelty curve using an adaptive threshold."""
	offset = nc.mean() / 3
	th = filters.median_filter(nc, size=L) + offset
	peaks = []
	for i in xrange(1, nc.shape[0] - 1):
		# is it a peak?
		if nc[i - 1] < nc[i] and nc[i] > nc[i + 1]:
			# is it above the threshold?
			if nc[i] > th[i]:
				peaks.append(i)
	if plot:
		plt.plot(nc)
		plt.plot(th)
		for peak in peaks:
			plt.axvline(peak, color="m")
		plt.show()
	return peaks


def recurrence_matrix(data, k=None, width=1, metric='sqeuclidean', sym=False):
	'''
	Note: Copied from librosa

	Compute the binary recurrence matrix from a time-series.

	``rec[i,j] == True`` <=> (``data[:,i]``, ``data[:,j]``) are
	k-nearest-neighbors and ``|i-j| >= width``

	:usage:
		>>> mfcc	= librosa.feature.mfcc(y=y, sr=sr)
		>>> R		= librosa.segment.recurrence_matrix(mfcc)

		>>> # Or fix the number of nearest neighbors to 5
		>>> R		= librosa.segment.recurrence_matrix(mfcc, k=5)

		>>> # Suppress neighbors within +- 7 samples
		>>> R		= librosa.segment.recurrence_matrix(mfcc, width=7)

		>>> # Use cosine similarity instead of Euclidean distance
		>>> R		= librosa.segment.recurrence_matrix(mfcc, metric='cosine')

		>>> # Require mutual nearest neighbors
		>>> R		= librosa.segment.recurrence_matrix(mfcc, sym=True)

	:parameters:
	  - data : np.ndarray
		  feature matrix (d-by-t)

	  - k : int > 0 or None
		  the number of nearest-neighbors for each sample

		  Default: ``k = 2 * ceil(sqrt(t - 2 * width + 1))``,
		  or ``k = 2`` if ``t <= 2 * width + 1``

	  - width : int > 0
		  only link neighbors ``(data[:, i], data[:, j])``
		  if ``|i-j| >= width``

	  - metric : str
		  Distance metric to use for nearest-neighbor calculation.

		  See ``scipy.spatial.distance.cdist()`` for details.

	  - sym : bool
		  set ``sym=True`` to only link mutual nearest-neighbors

	:returns:
	  - rec : np.ndarray, shape=(t,t), dtype=bool
		  Binary recurrence matrix
	'''

	t = data.shape[1]

	if k is None:
		if t > 2 * width + 1:
			k = 2 * np.ceil(np.sqrt(t - 2 * width + 1))
		else:
			k = 2

	k = int(k)

	def _band_infinite():
		'''Suppress the diagonal+- of a distance matrix'''

		band = np.empty((t, t))
		band.fill(np.inf)
		band[np.triu_indices_from(band, width)] = 0
		band[np.tril_indices_from(band, -width)] = 0

		return band

	# Build the distance matrix
	D = scipy.spatial.distance.cdist(data.T, data.T, metric=metric)

	# Max out the diagonal band
	D = D + _band_infinite()

	# build the recurrence plot
	rec = np.zeros((t, t), dtype=bool)

	# get the k nearest neighbors for each point
	for i in range(t):
		for j in np.argsort(D[i])[:k]:
			rec[i, j] = True

	# symmetrize
	if sym:
		rec = rec * rec.T

	return rec

def finiteMax(X):
	'''Return the smallest finite value in the array'''
	if not (np.isnan(X).any() or np.isposinf(X).any()):
		return np.max(X)
	data = np.sort(np.ndarray.flatten(X))
	pos = np.where(data == np.inf)[0]
	fMax = 0
	return fMax

def finiteMin(feature):
	'''Return the smallest finite value in the array'''
	if not (np.isnan(X).any() or np.isinf(X).any()):
		return np.min(X)
	fMin = 0
	return fMin
	
def getMean(feature, winlen, stepsize):
	means = []
	steps = int((feature.shape[0] - winlen + stepsize) / stepsize)
	for i in xrange(steps):
		means.append(np.mean(feature[i*stepsize:(i*stepsize+winlen), :], axis=0))
	return np.array(means)


def getStd(feature, winlen, stepsize):
	std = []
	steps = int((feature.shape[0] - winlen + stepsize) / stepsize)
	for i in xrange(steps):
		std.append(np.std(feature[i*stepsize:(i*stepsize+winlen), :], axis=0))
	return np.array(std)


def getDelta(feature):
	delta_feature = np.vstack((np.zeros((1, feature.shape[1])), np.diff(feature, axis=0)))
	return delta_feature


def getSSM(feature_array, metric='cosine', norm='simple', reduce=False):
	'''Compute SSM given input feature array. 
	args: norm: ['simple', 'remove_noise']
	'''
	dm = pairwise_distances(feature_array, metric=metric)
	dm = np.nan_to_num(dm)
	if norm == 'simple':
		ssm = 1 - (dm - np.min(dm)) / (np.max(dm) - np.min(dm))
	if reduce:
		ssm = reduceSSM(ssm)
	return ssm


def reduceSSM(ssm, maxfilter_size = 2, remove_size=50):
	reduced_ssm = ssm
	reduced_ssm[reduced_ssm<0.75] = 0
	# # reduced_ssm = maximum_filter(reduced_ssm,size=maxfilter_size)
	# # reduced_ssm = morphology.remove_small_objects(reduced_ssm.astype(bool), min_size=remove_size)
	local_otsu = otsu(reduced_ssm, disk(5))
	local_otsu = (local_otsu.astype(float) - np.min(local_otsu)) / (np.max(local_otsu) - np.min(local_otsu))
	reduced_ssm = reduced_ssm - 0.6*local_otsu
	return reduced_ssm	


def upSample(feature_array, step):
	'''Resample downsized tempogram features, tempoWindo should be in accordance with input features'''
	# print feature_array.shape
	sampleRate = 44100
	stepSize = 1024.0
	# step = np.ceil(sampleRate/stepSize/5.0)
	feature_array = zoom(feature_array, (step,1))
	# print 'resampled', feature_array.shape	
	return feature_array


def normaliseFeature(feature_array):
	'''Normalise features column wisely. Ensure numerical stability by adding a small constant.'''
	feature_array[np.isnan(feature_array)] = 0.0
	feature_array[np.isinf(feature_array)] = 0.0	
	feature_array = (feature_array - np.min(feature_array, axis=-1)[:,np.newaxis]) / (np.max(feature_array, axis=-1) - np.min(feature_array, axis=-1) + 0.005)[:,np.newaxis]
	feature_array[np.isnan(feature_array)] = 0.0
	feature_array[np.isinf(feature_array)] = 0.0

	return feature_array		


def getRolloff(data, tpower, filterbank, thresh=0.9):
	nFrames = data.shape[0]
	nFilters = len(filterbank)
	rolloff = np.zeros(nFrames)
	for i in xrange(nFrames):
		rolloffE = thresh * tpower[i]
		temp = 0.0
		tempE = 0.0
		for band in xrange(nFilters):
			temp += data[i][band]
			if temp > rolloffE: break
		rolloff[i] = filterbank[nFilters-band-1]
	
	return rolloff


def verifyPeaks(peak_canditates, dev_list):
	'''Verify peaks from the 1st round detection by applying adaptive thresholding to the deviation list.'''
	
	final_peaks = copy(peak_canditates)
	dev_list = np.array([np.mean(x) for x in dev_list]) # get average of devs of different features
	med_dev = median_filter(dev_list, size=5)
	# print dev_list, np.min(dev_list), np.median(dev_list), np.mean(dev_list), np.std(dev_list)
	dev = dev_list - np.percentile(dev_list, 50)
	# print dev
	for i, x in enumerate(dev):
		if x < 0:
			final_peaks.remove(peak_canditates[i])
	return final_peaks


def envelopeFollower(xc, AT, RT, prevG, scaler=1):
	'''Follows the amplitude envelope of input signal xc.'''
	
	g = np.zeros_like(xc)
	length = len(xc)
	
	for i in xrange(length):
		xSquared = xc[i] ** 2
		# if input is less than the previous output use attack, otherwise use the release
		if xSquared < prevG:
			coeff = AT
		else:
			coeff = RT
		g[i] = (xSquared - prevG)*coeff + prevG
		g[i] *= scaler
		prevG = g[i]
		
	return g

	
def getEnvPeaks(sig, sig_env, size=1):
	'''Finds peaks in the signal envelope.
	args: sig (1d array): orignal input signal
		  sig_env (list): position of the signal envelope. 
		  size: ranges to locate local maxima in the envelope as peaks. 
	'''
	envelope = sig[sig_env]
	peaks = []
	if len(envelope) > 1 and envelope[0] > envelope[1]:
		peaks.append(sig_env[0])
	for i in xrange(size, len(envelope)-size-1):
		if envelope[i] > np.max(envelope[i-size:i]) and envelope[i] > np.max(envelope[i+1:i+size+1]):
			peaks.append(sig_env[i])
	return peaks


def deltaFeature(self, feature_array, step=1, axis=-1):
	'''Return delta of a feature array'''
	delta = np.zeros_like(feature_array)
	delta[:, step:] = np.diff(feature_array, axis=axis)
	return delta
	

def plotCurve(self, yp, yr, yf, x, labels):
	'''Plot performance curve.
	x axis: distance threshold for feature selection; y axis: f measure'''

	f = plt.figure()
	ax = f.add_axes([0.1, 0.1, 0.7, 0.7])
	l1, l2, l3 = ax.plot(x, yp, 'rs-', x, yr, 'go-', x, yf, 'k^-')
	f.legend((l1, l2, l3), ('Precision', 'Recall', 'F-measure'), 'upper left')
	for i, label in enumerate(labels):
		ax.annotate(label, (x[i], yf[i]))
	plt.show()
	plt.savefig('performance.pdf', format='pdf')

	return None