#!/usr/bin/env python
# encoding: utf-8
"""
SegProperties.py
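
Segment-property utilities for a segmenter: Gaussian modelling of analysis
windows and segment candidates, plus fused peak selection across feature types.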

Created by mi tian on 2015-04-02.
Copyright (c) 2015 __MyCompanyName__. All rights reserved.
"""

import sys
import os
from math import ceil, floor
from copy import copy
from itertools import combinations

import numpy as np
from sklearn.metrics import pairwise_distances

# NB: pairwise_distances is assumed here to be scikit-learn's implementation;
# GaussianFeature and GmmDistance are Gaussian/GMM wrapper classes defined
# elsewhere in this project and must be importable for this module to run.

class FeatureGMM(object):
	'''Represent segment candidates using single GMMs and compute pairwise distances.'''
	def getGaussianParams(self, length, featureRate, timeWindow):
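		'''Derive analysis window parameters from the feature rate and a time window given in seconds.
		Returns (stepsize, num_win, win_len, gaussian_rate): the hop size and (odd) window
		length in feature frames, the number of windows, and the resulting window rate.'''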

		win_len = round(timeWindow * featureRate)
		win_len = win_len + (win_len % 2) - 1

		# a 50% overlap between windows
		stepsize = ceil(win_len * 0.5)
		num_win = int(floor( (length) / stepsize))
		gaussian_rate = featureRate / stepsize

		return stepsize, num_win, win_len, gaussian_rate

	def GaussianDistance(self, feature, featureRate, timeWindow):
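		'''Model each analysis window with a single Gaussian and return the pairwise
		distance matrix between windows, plus the corresponding window timestamps.
		NB: relies on self.timestamp being set by a subclass or caller to map
		feature frame indices to times.'''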

		stepsize, num_win, win_len, gr = self.getGaussianParams(feature.shape[0], featureRate, timeWindow)
		print 'stepsize, num_win, feature shape, feature rate, time window:', stepsize, num_win, feature.shape, featureRate, timeWindow
		gaussian_list = []
		gaussian_timestamps = []
		tsi = 0

		# f = open('/Users/mitian/Documents/experiments/features.txt','w')
		# print 'divergence computing..'
		for num in xrange(num_win): 
			# print num, num * stepsize , (num * stepsize) + win_len
			gf = GaussianFeature(feature[int(num * stepsize) : int((num * stepsize) + win_len), :], 2)
			# f.write("\n%s" %str(gf))
			gaussian_list.append(gf)
			tsi = int(floor(num * stepsize + 1))
			gaussian_timestamps.append(self.timestamp[tsi])

		# f.close()

		# print 'gaussian_list', len(gaussian_list), len(gaussian_timestamps)
		dm = np.zeros((len(gaussian_list), len(gaussian_list)))

		# iterate over index pairs directly: looking positions up with list.index()
		# is slow and breaks if two windows yield identical Gaussian models
		for i, j in combinations(xrange(len(gaussian_list)), 2):
			v1, v2 = gaussian_list[i], gaussian_list[j]
			dm[i, j] = v1.distance(v2)
			dm[j, i] = v2.distance(v1)
			# print 'dm[i,j]',dm[i,j]
		# sio.savemat("/Users/mitian/Documents/experiments/dm-from-segmenter.mat",{"dm":dm})
		return dm, gaussian_timestamps

	def getGMMs(self, feature, segment_boundaries):
		'''Return GMMs for located segments'''
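		# NB: only segments up to the last boundary are modelled; any trailing
		# frames after the last boundary are not included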
		gmm_list = []
		gmm_list.append(GmmDistance(feature[: segment_boundaries[0], :], components = 1))
		for i in xrange(1, len(segment_boundaries)):
			gmm_list.append(GmmDistance(feature[segment_boundaries[i-1] : segment_boundaries[i], :], components = 1))
		return gmm_list


class FusedPeakSelection(object):
	'''Peak selection from fusion of individual results.'''  
	def getFusedPeaks(self, combined_thresh, individual_thresh, individual_tol, combined_tol, w1=None, w2=None, w3=None, w4=None):
		'''Return a list of peak positions and the corresponding confidence values.'''
		confidence_array = np.zeros_like(w1)
		conf1 = np.zeros_like(w1)
		len_arr = len(w1)
		
		# keep peaks retrieved by a single feature if their confidence is above individual_thresh
		w1_keep = np.where(w1>=individual_thresh)[0]
		w2_keep = np.where(w2>=individual_thresh)[0]
		w3_keep = np.where(w3>=individual_thresh)[0]
		w4_keep = np.where(w4>=individual_thresh)[0]
		confidence_array[w1_keep] += w1[w1_keep]
		confidence_array[w2_keep] += w2[w2_keep]
		confidence_array[w3_keep] += w3[w3_keep]
		confidence_array[w4_keep] += w4[w4_keep]
		
		confidence_array[confidence_array>1] = 1
		
		# deal first with peaks picked by individual features with high confidence
		i = 0
		while i < len_arr:
			if confidence_array[i] > 0:
				temp = [confidence_array[i]]
				pos = [i]
				i += 1
		
				# start searching neighborhood for local maximum
				while (i+individual_tol < len_arr and np.max(confidence_array[i:i+individual_tol]) > 0):
					temp += [confidence_array[i+delta] for delta in xrange(individual_tol) if confidence_array[i+delta]>0]
					pos += [i+delta for delta in xrange(individual_tol) if confidence_array[i+delta]>0]
					i += individual_tol
					
				if len(temp) == 1:
					conf1[pos[0]] = temp[0]
				else:
					# p = int(np.rint(np.sum(np.multiply(pos,temp))/ np.sum(temp)))
					# conf1[p] = 1
					p = int(np.mean(pos))
					conf1[p] = np.mean(temp)
			else:	
				i += 1
		conf1[conf1>1] = 1
		
		# Process peaks with low confidence but located by multiple features in the same neighborhood
		# conf2 = copy(conf1)
		conf2 = np.zeros_like(conf1)
		weight1, weight2, weight3, weight4 = copy(w1), copy(w2), copy(w3), copy(w4)
		weight1[weight1>individual_thresh] = 0.0
		weight2[weight2>individual_thresh] = 0.0
		weight3[weight3>individual_thresh] = 0.0
		weight4[weight4>individual_thresh] = 0.0
		combined = weight1 + weight2 + weight3 + weight4
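		# min-max normalise the combined low-confidence weights to [0, 1]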
		combined = (combined - np.min(combined)) / (np.max(combined) - np.min(combined))
		if combined[0]>0.3: combined[0] = 0.8
		
		i = 0	
		while i < len_arr:
			if combined[i] > 0:
				temp = [combined[i]]
				pos = [i]
				i += 1
				
				# start searching neighborhood for local maximum
				while (i+combined_tol < len_arr and np.max(combined[i:i+combined_tol]) > 0):
					temp += [combined[i+delta] for delta in xrange(combined_tol) if combined[i+delta]>0]
					pos += [i+delta for delta in xrange(combined_tol) if combined[i+delta]>0]
					i += combined_tol
					
				if len(temp) == 1:
					conf2[pos[0]] += temp[0]
				else:
					p = int(np.rint(np.sum(np.multiply(pos,temp))/ np.sum(temp)))
					conf2[p] += np.sum(np.multiply(pos,temp)) / p 
			else:	
				i += 1
		
		conf2[conf2<combined_thresh] = 0
		conf2[conf2>1] = 1
		
		combined_conf = conf1 + conf2	
		combined_conf[combined_conf>1] = 1
		conf = np.zeros_like(combined_conf)
		# Combine selections from the above two steps
		i = 0
		while i < len_arr:
			if combined_conf[i] > 0.3:
				temp = [combined_conf[i]]
				pos = [i]
				i += 1
		
				# start searching neighborhood for local maximum
				while (i+individual_tol < len_arr and np.max(combined_conf[i:i+individual_tol]) > 0.5):
					temp += [combined_conf[i+delta] for delta in xrange(individual_tol) if combined_conf[i+delta]>0.5]
					pos += [i+delta for delta in xrange(individual_tol) if combined_conf[i+delta]>0.5]
					i += individual_tol
					
				if len(temp) == 1:
					conf[pos[0]] = combined_conf[pos[0]]
				elif (np.max(temp)== 1 and np.sort(temp)[-2] < combined_thresh):
					p = pos[np.argmax(temp)]
					conf[p] = np.max(temp)
				else:
					p = int(np.rint(np.sum(np.multiply(pos,temp))/ np.sum(temp)))
					conf[p] = np.mean(np.multiply(pos,temp)) / p 
			else:	
				i += 1

		peaks = list(np.where(conf>combined_thresh)[0])
		return peaks, conf1, conf2, conf
		
	def getPeakWeights(self, sdf, peak_list):
		'''Compute peak confidence.
		Return: array with confidence values at peak positions and zeros otherwise'''
		mask = np.zeros_like(sdf)
		mask[peak_list] = 1.0
		return sdf * mask
		
	def selectPeak(self, peak_candidates, featureset, winlen=5):
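		'''For each feature type, return the peak candidate that maximises the summed
		pairwise distance between the feature windows before and after it.'''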
		feature_types = len(featureset)
		gt_dist, hm_dist, tb_dist, tp_dist = [], [], [], []

		# NB: assumes each candidate lies at least winlen frames from both ends of the feature arrays
		for x in peak_candidates:
			prev_features = tuple([featureset[i][x-winlen:x, :] for i in xrange(feature_types)])
			post_features = tuple([featureset[i][x:x+winlen, :] for i in xrange(feature_types)])
			gt_dist.append(np.sum(pairwise_distances(prev_features[0], post_features[0])))
			hm_dist.append(np.sum(pairwise_distances(prev_features[1], post_features[1])))
			tb_dist.append(np.sum(pairwise_distances(prev_features[2], post_features[2])))
			tp_dist.append(np.sum(pairwise_distances(prev_features[3], post_features[3])))
		
		return peak_candidates[np.argmax(gt_dist)], peak_candidates[np.argmax(hm_dist)], peak_candidates[np.argmax(tb_dist)], peak_candidates[np.argmax(tp_dist)]
		
	def getPeakFeatures(self, peak_candidates, featureset, winlen):
		'''
		args:
			peak_candidates: peak positions (feature frame indices) from the 1st round detection.
			featureset: a list of audio feature arrays for measuring the dissimilarity.
			winlen: intended length of the feature window before and after an investigated peak
				(currently unused; the windows span the segments between consecutive peaks instead).

		return:
			prev_features, post_features: lists of feature tuples for the windows
			before and after each peak.
		'''
		prev_features = []
		post_features = []
		feature_types = len(featureset)

		# print peak_candidates[-1], winlen, featureset[0].shape
		# if peak_candidates[-1] + winlen > featureset[0].shape[0]:
		#	peak_candidates = peak_candidates[:-1]
		# for x in peak_candidates:
		#	prev_features.append(tuple([featureset[i][x-winlen:x, :] for i in xrange(feature_types)]))
		#	post_features.append(tuple([featureset[i][x:x+winlen, :] for i in xrange(feature_types)]))
		prev_features.append(tuple([featureset[i][:peak_candidates[0], :] for i in xrange(feature_types)]))
		post_features.append(tuple([featureset[i][peak_candidates[0]:peak_candidates[1], :] for i in xrange(feature_types)]))
		for idx in xrange(1, len(peak_candidates)-1):
			prev_features.append(tuple([featureset[i][peak_candidates[idx-1]:peak_candidates[idx], :] for i in xrange(feature_types)]))
			post_features.append(tuple([featureset[i][peak_candidates[idx]:peak_candidates[idx+1], :] for i in xrange(feature_types)]))
		prev_features.append(tuple([featureset[i][peak_candidates[-2]:peak_candidates[-1], :] for i in xrange(feature_types)]))
		post_features.append(tuple([featureset[i][peak_candidates[-1]:, :] for i in xrange(feature_types)]))
		return prev_features, post_features
	
	def segStats(self, feature_array, boundary_list):
		'''Return the per-dimension standard deviation of the features between consecutive boundaries.'''
		feature_stats = []
		for i in xrange(1, len(boundary_list)):
			feature_stats.append(np.std(feature_array[boundary_list[i-1]:boundary_list[i]], axis=0))
		return feature_stats
			
	def segmentDev(self, prev_features, post_features, metric='kl'):
		'''Measure deviations between the parts before and after each peak, for each given feature type.
		prev_features, post_features: lists of feature tuples, as returned by getPeakFeatures.
		metric: 'kl' for the (symmetrised) KL divergence between single-component GMMs,
			'euclidean' for pairwise Euclidean distance matrices.
		'''
		dev_list = []
		n_peaks = len(prev_features)
		n_features = len(prev_features[0])
		# print 'n_peaks, n_features', n_peaks, n_features
		if metric == 'kl':
			for x in xrange(n_peaks):
				f1, f2 = prev_features[x], post_features[x]
				dev_list.append(tuple([GmmDistance(f1[i], components=1).skl_distance_full(GmmDistance(f2[i], components=1)) for i in xrange(n_features)]))
		elif metric == 'euclidean':
			for x in xrange(n_peaks):
				f1, f2 = prev_features[x], post_features[x]
				dev_list.append(tuple([pairwise_distances(f1[i], f2[i]) for i in xrange(n_features)]))
		return dev_list
	
def main():
	pass
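	# A minimal usage sketch (commented out), assuming a hypothetical 'feature'
	# array of shape (n_frames, n_dims), its frame rate 'featureRate' in frames
	# per second, and the project-local GaussianFeature class:
	#
	#   fg = FeatureGMM()
	#   fg.timestamp = np.arange(feature.shape[0]) / float(featureRate)
	#   dm, timestamps = fg.GaussianDistance(feature, featureRate, timeWindow=4.0)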


if __name__ == '__main__':
	main()