view utils/SegUtil.py @ 19:890cfe424f4a tip

added annotations
author mitian
date Fri, 11 Dec 2015 09:47:40 +0000
parents c01fcb752221
children
line wrap: on
line source
"""
Useful functions that are quite common for music segmentation
"""
'''
Modified and more funcs added.
Mi Tian, April 2015.
'''

__author__ = "Oriol Nieto"
__copyright__ = "Copyright 2014, Music and Audio Research Lab (MARL)"
__license__ = "GPL"
__version__ = "1.0"
__email__ = "oriol@nyu.edu"

import copy
import numpy as np
import os, sys
import scipy
from scipy.spatial import distance
from scipy.ndimage import filters, zoom
from scipy import signal
from scipy.signal import correlate2d, convolve2d, filtfilt, resample, butter
# import pylab as plt
from scipy.spatial.distance import squareform, pdist
from scipy.ndimage.filters import maximum_filter, minimum_filter, percentile_filter, uniform_filter
from scipy.ndimage.filters import median_filter as med_filter # median_filter is a user defined function in this script
from sklearn.metrics.pairwise import pairwise_distances

from GmmMetrics import GmmDistance

def lognormalize_chroma(C):
	"""Log-normalizes chroma such that each vector is between -80 to 0."""
	C += np.abs(C.min()) + 0.1
	C = C/C.max(axis=0)
	C = 80 * np.log10(C)  # Normalize from -80 to 0
	return C


def normalize_matrix(X):
	"""Nomalizes a matrix such that it's maximum value is 1 and minimum is 0."""
	X += np.abs(X.min())
	X /= X.max()
	return X


def ensure_dir(directory):
	"""Makes sure that the given directory exists."""
	if not os.path.exists(directory):
		os.makedirs(directory)


def median_filter(X, M=8):
	"""Median filter along the first axis of the feature matrix X."""
	for i in xrange(X.shape[1]):
		X[:, i] = filters.median_filter(X[:, i], size=M)
	return X


def compute_gaussian_krnl(M):
	"""Creates a gaussian kernel following Foote's paper."""
	g = signal.gaussian(M, M / 3., sym=True)
	G = np.dot(g.reshape(-1, 1), g.reshape(1, -1))
	G[M / 2:, :M / 2] = -G[M / 2:, :M / 2]
	G[:M / 2, M / 2:] = -G[:M / 2, M / 2:]
	return G


def compute_ssm(X, metric="seuclidean"):
	"""Computes the self-similarity matrix of X."""
	D = distance.pdist(X, metric=metric)
	D = distance.squareform(D)
	D /= D.max()
	return 1 - D


def compute_nc(X, G):
	"""Computes the novelty curve from the self-similarity matrix X and
		the gaussian kernel G."""
	N = X.shape[0]
	M = G.shape[0]
	nc = np.zeros(N)

	for i in xrange(M / 2, N - M / 2 + 1):
		nc[i] = np.sum(X[i - M / 2:i + M / 2, i - M / 2:i + M / 2] * G)

	# Normalize
	nc += nc.min()
	nc /= nc.max()
	return nc


def resample_mx(X, incolpos, outcolpos):
	"""
	Method from Librosa
	Y = resample_mx(X, incolpos, outcolpos)
	X is taken as a set of columns, each starting at 'time'
	colpos, and continuing until the start of the next column.
	Y is a similar matrix, with time boundaries defined by
	outcolpos.	Each column of Y is a duration-weighted average of
	the overlapping columns of X.
	2010-04-14 Dan Ellis dpwe@ee.columbia.edu  based on samplemx/beatavg
	-> python: TBM, 2011-11-05, TESTED
	"""
	noutcols = len(outcolpos)
	Y = np.zeros((X.shape[0], noutcols))
	# assign 'end times' to final columns
	if outcolpos.max() > incolpos.max():
		incolpos = np.concatenate([incolpos,[outcolpos.max()]])
		X = np.concatenate([X, X[:,-1].reshape(X.shape[0],1)], axis=1)
	outcolpos = np.concatenate([outcolpos, [outcolpos[-1]]])
	# durations (default weights) of input columns)
	incoldurs = np.concatenate([np.diff(incolpos), [1]])

	for c in range(noutcols):
		firstincol = np.where(incolpos <= outcolpos[c])[0][-1]
		firstincolnext = np.where(incolpos < outcolpos[c+1])[0][-1]
		lastincol = max(firstincol,firstincolnext)
		# default weights
		wts = copy.deepcopy(incoldurs[firstincol:lastincol+1])
		# now fix up by partial overlap at ends
		if len(wts) > 1:
			wts[0] = wts[0] - (outcolpos[c] - incolpos[firstincol])
			wts[-1] = wts[-1] - (incolpos[lastincol+1] - outcolpos[c+1])
		wts = wts * 1. /sum(wts)
		Y[:,c] = np.dot(X[:,firstincol:lastincol+1], wts)
	# done
	return Y


def chroma_to_tonnetz(C):
	"""Transforms chromagram to Tonnetz (Harte, Sandler, 2006)."""
	N = C.shape[0]
	T = np.zeros((N, 6))

	r1 = 1		# Fifths
	r2 = 1		# Minor
	r3 = 0.5	# Major

	# Generate Transformation matrix
	phi = np.zeros((6, 12))
	for i in range(6):
		for j in range(12):
			if i % 2 == 0:
				fun = np.sin
			else:
				fun = np.cos

			if i < 2:
				phi[i, j] = r1 * fun(j * 7 * np.pi / 6.)
			elif i >= 2 and i < 4:
				phi[i, j] = r2 * fun(j * 3 * np.pi / 2.)
			else:
				phi[i, j] = r3 * fun(j * 2 * np.pi / 3.)

	# Do the transform to tonnetz
	for i in range(N):
		for d in range(6):
			denom = float(C[i, :].sum())
			if denom == 0:
				T[i, d] = 0
			else:
				T[i, d] = 1 / denom * (phi[d, :] * C[i, :]).sum()

	return T


def most_frequent(x):
	"""Returns the most frequent value in x."""
	return np.argmax(np.bincount(x))


def pick_peaks(nc, L=16, plot=False):
	"""Obtain peaks from a novelty curve using an adaptive threshold."""
	offset = nc.mean() / 3
	th = filters.median_filter(nc, size=L) + offset
	peaks = []
	for i in xrange(1, nc.shape[0] - 1):
		# is it a peak?
		if nc[i - 1] < nc[i] and nc[i] > nc[i + 1]:
			# is it above the threshold?
			if nc[i] > th[i]:
				peaks.append(i)
	if plot:
		plt.plot(nc)
		plt.plot(th)
		for peak in peaks:
			plt.axvline(peak, color="m")
		plt.show()
	return peaks


def recurrence_matrix(data, k=None, width=1, metric='sqeuclidean', sym=False):
	'''
	Note: Copied from librosa

	Compute the binary recurrence matrix from a time-series.

	``rec[i,j] == True`` <=> (``data[:,i]``, ``data[:,j]``) are
	k-nearest-neighbors and ``|i-j| >= width``

	:usage:
		>>> mfcc	= librosa.feature.mfcc(y=y, sr=sr)
		>>> R		= librosa.segment.recurrence_matrix(mfcc)

		>>> # Or fix the number of nearest neighbors to 5
		>>> R		= librosa.segment.recurrence_matrix(mfcc, k=5)

		>>> # Suppress neighbors within +- 7 samples
		>>> R		= librosa.segment.recurrence_matrix(mfcc, width=7)

		>>> # Use cosine similarity instead of Euclidean distance
		>>> R		= librosa.segment.recurrence_matrix(mfcc, metric='cosine')

		>>> # Require mutual nearest neighbors
		>>> R		= librosa.segment.recurrence_matrix(mfcc, sym=True)

	:parameters:
	  - data : np.ndarray
		  feature matrix (d-by-t)

	  - k : int > 0 or None
		  the number of nearest-neighbors for each sample

		  Default: ``k = 2 * ceil(sqrt(t - 2 * width + 1))``,
		  or ``k = 2`` if ``t <= 2 * width + 1``

	  - width : int > 0
		  only link neighbors ``(data[:, i], data[:, j])``
		  if ``|i-j| >= width``

	  - metric : str
		  Distance metric to use for nearest-neighbor calculation.

		  See ``scipy.spatial.distance.cdist()`` for details.

	  - sym : bool
		  set ``sym=True`` to only link mutual nearest-neighbors

	:returns:
	  - rec : np.ndarray, shape=(t,t), dtype=bool
		  Binary recurrence matrix
	'''

	t = data.shape[1]

	if k is None:
		if t > 2 * width + 1:
			k = 2 * np.ceil(np.sqrt(t - 2 * width + 1))
		else:
			k = 2

	k = int(k)

	def _band_infinite():
		'''Suppress the diagonal+- of a distance matrix'''

		band = np.empty((t, t))
		band.fill(np.inf)
		band[np.triu_indices_from(band, width)] = 0
		band[np.tril_indices_from(band, -width)] = 0

		return band

	# Build the distance matrix
	D = scipy.spatial.distance.cdist(data.T, data.T, metric=metric)

	# Max out the diagonal band
	D = D + _band_infinite()

	# build the recurrence plot
	rec = np.zeros((t, t), dtype=bool)

	# get the k nearest neighbors for each point
	for i in range(t):
		for j in np.argsort(D[i])[:k]:
			rec[i, j] = True

	# symmetrize
	if sym:
		rec = rec * rec.T

	return rec

def lp(signal, fc=0.34, axis=-1):
	'''Low pass filter function
	signal: Raw signal to be smoothed.
	fc: Cutoff frequency of the butterworth filter. Normalized from 0 to 1, where 1 is the Nyquist frequency.
	axis: The axis of x to which the filter is applied. Default is -1.'''
	bCoeffs, aCoeffs = butter(2, fc)
	lp_smoothed_signal = filtfilt(bCoeffs, aCoeffs, signal, axis)
	return lp_smoothed_signal


def hp(signal, fc=0.34, axis=-1):
	'''Low pass filter function
	signal: Raw signal to be smoothed.
	fc: Cutoff frequency of the butterworth filter.
	axis: The axis of x to which the filter is applied. Default is -1.'''
	bCoeffs, aCoeffs = butter(2, fc, 'highpass')
	hp_smoothed_signal = filtfilt(bCoeffs, aCoeffs, signal, axis)
	return hp_smoothed_signal


def getMean(feature, winlen, stepsize):
	means = []
	steps = int((feature.shape[0] - winlen + stepsize) / stepsize)
	for i in xrange(steps):
		means.append(np.mean(feature[i*stepsize:(i*stepsize+winlen), :], axis=0))
	return np.array(means)


def getStd(feature, winlen, stepsize):
	std = []
	steps = int((feature.shape[0] - winlen + stepsize) / stepsize)
	for i in xrange(steps):
		std.append(np.std(feature[i*stepsize:(i*stepsize+winlen), :], axis=0))
	return np.array(std)


def getDelta(feature):
	delta_feature = np.vstack((np.zeros((1, feature.shape[1])), np.diff(feature, axis=0)))
	return delta_feature


def getSSM(feature_array, metric='cosine', norm='exp', reduce=False):
	'''Compute SSM given input feature array. 
	args: norm: ['simple', 'remove_noise']
	'''
	dm = pairwise_distances(feature_array, metric=metric)
	dm = np.nan_to_num(dm)
	if norm == 'simple':
		ssm = 1 - (dm - np.min(dm)) / (np.max(dm) - np.min(dm))
	if norm == 'exp': # Use with cosine metric only
		ssm = 1 - np.exp(dm - 1)
	if reduce:
		ssm = reduceSSM(ssm)
	return ssm


def enhanceSSM(ssm, fc=0.34, med_size=(5,5), max_size=(5,5), min_size=(5,5), filter_type='min', axis=-1):
	'''A series of filtering for SSM enhancement
	fc: cutoff frequency for LP filtering.
	med_size: Median filter window size.
			  int or tuple. If using an integer for a 2d input, axis must be specified.
	filter_type: Select either to use maximum filter or minimum filter according to the distance metric with which the SSM was computed.
			float ['min', 'max', None]
	max_size: Maximum filter window size.
			  int or tuple. If using an integer for a 2d input, axis must be specified.
			  Use this when homogeneity in the SSM is expressed by LARGE value.
	min_size: Mininum filter window size.
			  int or tuple. If using an integer for a 2d input, axis must be specified.
			  Use this when homogeneity in the SSM is expressed by SMALL value. 
			  (eg. When cosine metric and exp normalization and used for distance computation.)'''

	ssm_lp = lp(ssm, fc=fc)
	
	# Use scipy.ndimage.filters.median_filter instead
	ssm_med = med_filter(ssm_lp, size=med_size)
	
	if filter_type == 'min':
		enhanced_ssm = minimum_filter(ssm_med, size=min_size)
	elif filter_type == 'max':
		enhanced_ssm = maximum_filter(ssm_med, size=max_size)	
	else:
		enhanced_ssm = ssm_med
	return enhanced_ssm


def reduceSSM(ssm, maxfilter_size = 2, remove_size=50):
	'''Adaptive thresholding using OTSU method
	Required package: skimage (0.10+) -- NOT installed on ignis, do NOT call this function!'''
	
	from skimage.morphology import disk
	# from skimage.filters import threshold_otsu, rank #skimage 0.12
	from skimage.filter.rank import otsu #skimage 0.10
	from skimage.filter import threshold_otsu

	reduced_ssm = copy(ssm)
	reduced_ssm[reduced_ssm<0.75] = 0
	# # reduced_ssm = maximum_filter(reduced_ssm,size=maxfilter_size)
	# # reduced_ssm = morphology.remove_small_objects(reduced_ssm.astype(bool), min_size=remove_size)
	local_otsu = otsu(reduced_ssm, disk(5))
	local_otsu = (local_otsu.astype(float) - np.min(local_otsu)) / (np.max(local_otsu) - np.min(local_otsu))
	reduced_ssm = reduced_ssm - 0.6*local_otsu
	return reduced_ssm	


def upSample(feature_array, step):
	'''Resample downsized tempogram features, tempoWindo should be in accordance with input features'''
	# print feature_array.shape
	sampleRate = 44100
	stepSize = 1024.0
	# step = np.ceil(sampleRate/stepSize/5.0)
	feature_array = zoom(feature_array, (step,1))
	# print 'resampled', feature_array.shape	
	return feature_array


def normaliseFeature(feature_array):
	'''Normalise features column wisely. Ensure numerical stability by adding a small constant.'''
	feature_array[np.isnan(feature_array)] = 0.0
	feature_array[np.isinf(feature_array)] = 0.0	
	feature_array = (feature_array - np.min(feature_array, axis=-1)[:,np.newaxis]) / (np.max(feature_array, axis=-1) - np.min(feature_array, axis=-1) + 0.005)[:,np.newaxis]
	feature_array[np.isnan(feature_array)] = 0.0
	feature_array[np.isinf(feature_array)] = 0.0

	return feature_array		

def normaliseArray(X):
	'''Normalise 1d array.'''
	if np.max(X) == np.min(X):
		return None
	return (X - np.min(X)) / (np.max(X) - np.min(X))
	
def pairwiseSKL(self, gmm_list):
	'''Compute pairwise symmetrised KL divergence of a list of GMMs.'''
	n_GMMs = len(gmm_list)
	distance_matrix = np.zeros((n_GMMs, n_GMMs))
	for i in xrange(n_GMMs):
		for j in xrange(i, n_GMMs):
			distance_matrix[i][j] = gmm_list[i].skl_distance_full(gmm_list[j])
			distance_matrix[j][i] = distance_matrix[i][j]

	np.fill_diagonal(distance_matrix, 0.0)
	distance_matrix[np.isinf(distance_matrix)] = np.finfo(np.float64).max
	
	return distance_matrix

def getRolloff(data, tpower, filterbank, thresh=0.9):
	nFrames = data.shape[0]
	nFilters = len(filterbank)
	rolloff = np.zeros(nFrames)
	for i in xrange(nFrames):
		rolloffE = thresh * tpower[i]
		temp = 0.0
		tempE = 0.0
		for band in xrange(nFilters):
			temp += data[i][band]
			if temp > rolloffE: break
		rolloff[i] = filterbank[nFilters-band-1]
	
	return rolloff


def verifyPeaks(peak_canditates, dev_list):
	'''Verify peaks from the 1st round detection by applying adaptive thresholding to the deviation list.'''
	
	final_peaks = copy(peak_canditates)
	dev_list = np.array([np.mean(x) for x in dev_list]) # get average of devs of different features
	med_dev = median_filter(dev_list, size=5)
	# print dev_list, np.min(dev_list), np.median(dev_list), np.mean(dev_list), np.std(dev_list)
	dev = dev_list - np.percentile(dev_list, 50)
	# print dev
	for i, x in enumerate(dev):
		if x < 0:
			final_peaks.remove(peak_canditates[i])
	return final_peaks


def envelopeFollower(xc, AT, RT, prevG, scaler=1):
	'''Follows the amplitude envelope of input signal xc.'''
	
	g = np.zeros_like(xc)
	length = len(xc)
	
	for i in xrange(length):
		xSquared = xc[i] ** 2
		# if input is less than the previous output use attack, otherwise use the release
		if xSquared < prevG:
			coeff = AT
		else:
			coeff = RT
		g[i] = (xSquared - prevG)*coeff + prevG
		g[i] *= scaler
		prevG = g[i]
		
	return g

	
def getEnvPeaks(sig, sig_env, size=1):
	'''Finds peaks in the signal envelope.
	args: sig (1d array): orignal input signal
		  sig_env (list): position of the signal envelope. 
		  size: ranges to locate local maxima in the envelope as peaks. 
	'''
	envelope = sig[sig_env]
	peaks = []
	if len(envelope) > 1 and envelope[0] > envelope[1]:
		peaks.append(sig_env[0])
	for i in xrange(size, len(envelope)-size-1):
		if envelope[i] > np.max(envelope[i-size:i]) and envelope[i] > np.max(envelope[i+1:i+size+1]):
			peaks.append(sig_env[i])
	return peaks


def deltaFeature(self, feature_array, step=1, axis=-1):
	'''Return delta of a feature array'''
	delta = np.zeros_like(feature_array)
	delta[:, step:] = np.diff(feature_array, axis=axis)
	return delta
	

def plotCurve(self, yp, yr, yf, x, labels):
	'''Plot performance curve.
	x axis: distance threshold for feature selection; y axis: f measure'''

	f = plt.figure()
	ax = f.add_axes([0.1, 0.1, 0.7, 0.7])
	l1, l2, l3 = ax.plot(x, yp, 'rs-', x, yr, 'go-', x, yf, 'k^-')
	f.legend((l1, l2, l3), ('Precision', 'Recall', 'F-measure'), 'upper left')
	for i, label in enumerate(labels):
		ax.annotate(label, (x[i], yf[i]))
	plt.show()
	plt.savefig('performance.pdf', format='pdf')

	return None