Mercurial > hg > segmentation
comparison utils/SegProperties.py @ 0:26838b1f560f
initial commit of a segmenter project
author | mi tian |
---|---|
date | Thu, 02 Apr 2015 18:09:27 +0100 |
parents | |
children | c11ea9e0357f |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:26838b1f560f |
---|---|
1 #!/usr/bin/env python | |
2 # encoding: utf-8 | |
3 """ | |
4 SegProperties.py | |
5 | |
6 Created by mi tian on 2015-04-02. | |
7 Copyright (c) 2015 __MyCompanyName__. All rights reserved. | |
8 """ | |
9 | |
10 import sys | |
11 import os | |
12 | |
13 class FeatureGMM(object): | |
14 '''Represent segment candidates using single GMMs and compute pairwise distances.''' | |
15 def getGaussianParams(self, length, featureRate, timeWindow): | |
16 | |
17 win_len = round(timeWindow * featureRate) | |
18 win_len = win_len + (win_len % 2) - 1 | |
19 | |
20 # a 50% overlap between windows | |
21 stepsize = ceil(win_len * 0.5) | |
22 num_win = int(floor( (length) / stepsize)) | |
23 gaussian_rate = featureRate / stepsize | |
24 | |
25 return stepsize, num_win, win_len, gaussian_rate | |
26 | |
27 def GaussianDistance(self, feature, featureRate, timeWindow): | |
28 | |
29 stepsize, num_win, win_len, gr = self.getGaussianParams(feature.shape[0], featureRate, timeWindow) | |
30 print 'stepsize, num_win, feature', stepsize, num_win, feature.shape, featureRate, timeWindow | |
31 gaussian_list = [] | |
32 gaussian_timestamps = [] | |
33 tsi = 0 | |
34 | |
35 # f = open('/Users/mitian/Documents/experiments/features.txt','w') | |
36 # print 'divergence computing..' | |
37 for num in xrange(num_win): | |
38 # print num, num * stepsize , (num * stepsize) + win_len | |
39 gf=GaussianFeature(feature[int(num * stepsize) : int((num * stepsize) + win_len), :],2) | |
40 # f.write("\n%s" %str(gf)) | |
41 gaussian_list.append(gf) | |
42 tsi = int(floor( num * stepsize + 1)) | |
43 gaussian_timestamps.append(self.timestamp[tsi]) | |
44 | |
45 # f.close() | |
46 | |
47 # print 'gaussian_list', len(gaussian_list), len(gaussian_timestamps) | |
48 dm = np.zeros((len(gaussian_list), len(gaussian_list))) | |
49 | |
50 for v1, v2 in combinations(gaussian_list, 2): | |
51 i, j = gaussian_list.index(v1), gaussian_list.index(v2) | |
52 dm[i, j] = v1.distance(v2) | |
53 dm[j, i] = v2.distance(v1) | |
54 # print 'dm[i,j]',dm[i,j] | |
55 # sio.savemat("/Users/mitian/Documents/experiments/dm-from-segmenter.mat",{"dm":dm}) | |
56 return dm, gaussian_timestamps | |
57 | |
58 def getGMMs(self, feature, segment_boundaries): | |
59 '''Return GMMs for located segments''' | |
60 gmm_list = [] | |
61 gmm_list.append(GmmDistance(feature[: segment_boundaries[0], :], components = 1)) | |
62 for i in xrange(1, len(segment_boundaries)): | |
63 gmm_list.append(GmmDistance(feature[segment_boundaries[i-1] : segment_boundaries[i], :], components = 1)) | |
64 return gmm_list | |
65 | |
66 | |
67 class FusedPeakSelection(object): | |
68 '''Peak selection from fusion of individual results.''' | |
69 def getFusedPeaks(self, combined_thresh, individual_thresh, individual_tol, combined_tol, w1=None, w2=None, w3=None, w4=None): | |
70 '''Return a list a peak position and the corresponding confidence.''' | |
71 confidence_array = np.zeros_like(w1) | |
72 conf1 = np.zeros_like(w1) | |
73 len_arr = len(w1) | |
74 | |
75 # keep peaks retrieved by single feature if its confidence is above individual_thresh | |
76 w1_keep = np.where(w1>=individual_thresh)[0] | |
77 w2_keep = np.where(w2>=individual_thresh)[0] | |
78 w3_keep = np.where(w3>=individual_thresh)[0] | |
79 w4_keep = np.where(w4>=individual_thresh)[0] | |
80 confidence_array[w1_keep] += w1[w1_keep] | |
81 confidence_array[w2_keep] += w2[w2_keep] | |
82 confidence_array[w3_keep] += w3[w3_keep] | |
83 confidence_array[w4_keep] += w4[w4_keep] | |
84 | |
85 confidence_array[confidence_array>1] = 1 | |
86 | |
87 # deal with peaks picked individual features with high confidence first | |
88 i=0 | |
89 while i < len_arr: | |
90 if confidence_array[i] > 0: | |
91 temp = [confidence_array[i]] | |
92 pos = [i] | |
93 i += 1 | |
94 | |
95 # start searching neighborhood for local maximum | |
96 while (i+individual_tol < len_arr and np.max(confidence_array[i:i+individual_tol]) > 0): | |
97 temp += [confidence_array[i+delta] for delta in xrange(individual_tol) if confidence_array[i+delta]>0] | |
98 pos += [i+delta for delta in xrange(individual_tol) if confidence_array[i+delta]>0] | |
99 i += individual_tol | |
100 | |
101 if len(temp) == 1: | |
102 conf1[pos[0]] = temp[0] | |
103 else: | |
104 # p = int(np.rint(np.sum(np.multiply(pos,temp))/ np.sum(temp))) | |
105 # conf1[p] = 1 | |
106 p = int(np.mean(pos)) | |
107 conf1[p] = np.mean(temp) | |
108 else: | |
109 i += 1 | |
110 conf1[conf1>1] = 1 | |
111 | |
112 # Process peaks with low confidence but located by multiple features in the same neighborhood | |
113 # conf2 = copy(conf1) | |
114 conf2 = np.zeros_like(conf1) | |
115 weight1, weight2, weight3, weight4 = copy(w1), copy(w2), copy(w3), copy(w4) | |
116 weight1[weight1>individual_thresh] = 0.0 | |
117 weight2[weight2>individual_thresh] = 0.0 | |
118 weight3[weight3>individual_thresh] = 0.0 | |
119 weight4[weight4>individual_thresh] = 0.0 | |
120 combined = weight1 + weight2 + weight3 + weight4 | |
121 combined = (combined - np.min(combined)) / (np.max(combined) - np.min(combined)) | |
122 if combined[0]>0.3: combined[0] = 0.8 | |
123 | |
124 i = 0 | |
125 while i < len_arr: | |
126 if combined[i] > 0: | |
127 temp = [combined[i]] | |
128 pos = [i] | |
129 i += 1 | |
130 | |
131 # start searching neighborhood for local maximum | |
132 while (i+combined_tol < len_arr and np.max(combined[i:i+combined_tol]) > 0): | |
133 temp += [combined[i+delta] for delta in xrange(combined_tol) if combined[i+delta]>0] | |
134 pos += [i+delta for delta in xrange(combined_tol) if combined[i+delta]>0] | |
135 i += combined_tol | |
136 | |
137 if len(temp) == 1: | |
138 conf2[pos[0]] += temp[0] | |
139 else: | |
140 p = int(np.rint(np.sum(np.multiply(pos,temp))/ np.sum(temp))) | |
141 conf2[p] += np.sum(np.multiply(pos,temp)) / p | |
142 else: | |
143 i += 1 | |
144 | |
145 conf2[conf2<combined_thresh] = 0 | |
146 conf2[conf2>1] = 1 | |
147 | |
148 combined_conf = conf1 + conf2 | |
149 combined_conf[combined_conf>1] = 1 | |
150 conf = np.zeros_like(combined_conf) | |
151 # Combine selections from the obove two steps | |
152 i=0 | |
153 while i < len_arr: | |
154 if combined_conf[i] > 0.3: | |
155 temp = [combined_conf[i]] | |
156 pos = [i] | |
157 i += 1 | |
158 | |
159 # start searching neighborhood for local maximum | |
160 while (i+individual_tol < len_arr and np.max(combined_conf[i:i+individual_tol]) > 0.5): | |
161 temp += [combined_conf[i+delta] for delta in xrange(individual_tol) if combined_conf[i+delta]>0.5] | |
162 pos += [i+delta for delta in xrange(individual_tol) if combined_conf[i+delta]>0.5] | |
163 i += individual_tol | |
164 | |
165 if len(temp) == 1: | |
166 conf[pos[0]] = combined_conf[pos[0]] | |
167 elif (np.max(temp)== 1 and np.sort(temp)[-2] < combined_thresh): | |
168 p = pos[np.argmax(temp)] | |
169 conf[p] = np.max(temp) | |
170 else: | |
171 p = int(np.rint(np.sum(np.multiply(pos,temp))/ np.sum(temp))) | |
172 conf[p] = np.mean(np.multiply(pos,temp)) / p | |
173 else: | |
174 i += 1 | |
175 | |
176 peaks = list(np.where(conf>combined_thresh)[0]) | |
177 return peaks, conf1, conf2, conf | |
178 | |
179 def getPeakWeights(self, sdf, peak_list): | |
180 '''Compute peak confidence. | |
181 Return: array with confidence values at peak positions and zeros otherwise''' | |
182 mask = np.zeros_like(sdf) | |
183 mask[peak_list] = 1.0 | |
184 return sdf * mask | |
185 | |
186 def selectPeak(self, peak_candidates, featureset, winlen=5): | |
187 dist_list = [] | |
188 feature_types = len(featureset) | |
189 gt_dist, hm_dist, tb_dist, tp_dist = [], [], [], [] | |
190 | |
191 for idx, x in enumerate(peak_candidates): | |
192 prev_features = tuple([featureset[i][x-winlen:x, :] for i in xrange(feature_types)]) | |
193 post_features = tuple([featureset[i][x:x+winlen, :] for i in xrange(feature_types)]) | |
194 gt_dist.append(np.sum(pairwise_distances(prev_features[0], post_features[0]))) | |
195 hm_dist.append(np.sum(pairwise_distances(prev_features[1], post_features[1]))) | |
196 tb_dist.append(np.sum(pairwise_distances(prev_features[2], post_features[2]))) | |
197 tp_dist.append(np.sum(pairwise_distances(prev_features[3], post_features[3]))) | |
198 | |
199 return peak_candidates[np.argmax(gt_dist)], peak_candidates[np.argmax(hm_dist)], peak_candidates[np.argmax(tb_dist)], peak_candidates[np.argmax(tp_dist)] | |
200 | |
201 def getPeakFeatures(self, peak_candidates, featureset, winlen): | |
202 ''' | |
203 args: winlen: length of feature window before and after an investigated peak | |
204 featureset: A list of audio features for measuring the dissimilarity. | |
205 | |
206 return: peak_features | |
207 A list of tuples of features for windows before and after each peak. | |
208 ''' | |
209 prev_features = [] | |
210 post_features = [] | |
211 feature_types = len(featureset) | |
212 | |
213 # print peak_candidates[-1], winlen, featureset[0].shape | |
214 # if peak_candidates[-1] + winlen > featureset[0].shape[0]: | |
215 # peak_candidates = peak_candidates[:-1] | |
216 # for x in peak_candidates: | |
217 # prev_features.append(tuple([featureset[i][x-winlen:x, :] for i in xrange(feature_types)])) | |
218 # post_features.append(tuple([featureset[i][x:x+winlen, :] for i in xrange(feature_types)])) | |
219 prev_features.append(tuple([featureset[i][:peak_candidates[0], :] for i in xrange(feature_types)])) | |
220 post_features.append(tuple([featureset[i][peak_candidates[0]:peak_candidates[1], :] for i in xrange(feature_types)])) | |
221 for idx in xrange(1, len(peak_candidates)-1): | |
222 prev_features.append(tuple([featureset[i][peak_candidates[idx-1]:peak_candidates[idx], :] for i in xrange(feature_types)])) | |
223 post_features.append(tuple([featureset[i][peak_candidates[idx]:peak_candidates[idx+1], :] for i in xrange(feature_types)])) | |
224 prev_features.append(tuple([featureset[i][peak_candidates[-2]:peak_candidates[-1], :] for i in xrange(feature_types)])) | |
225 post_features.append(tuple([featureset[i][peak_candidates[-1]:, :] for i in xrange(feature_types)])) | |
226 return prev_features, post_features | |
227 | |
228 def segStats(self, feature_array, boundary_list): | |
229 '''Return some basic stats of features associated with two boundaries.''' | |
230 feature_stats = [] | |
231 for i in xrange(1, len(boundary_list)): | |
232 feature_stats.append(np.std(feature_array[boundary_list[i-1]:boundary_list[i]], axis=0)) | |
233 return feature_stats | |
234 | |
235 def segmentDev(self, prev_features, post_features, metric='kl'): | |
236 '''Deviations are measured for each given feature type. | |
237 peak_candidates: peaks from the 1st round detection | |
238 peak_features: Features for measuring the dissimilarity for parts before and after each peak. | |
239 dtype: tuple. | |
240 ''' | |
241 dev_list = [] | |
242 n_peaks = len(prev_features) | |
243 n_features = len(prev_features[0]) | |
244 # print 'n_peaks, n_features', n_peaks, n_features | |
245 if metric == 'kl': | |
246 for x in xrange(n_peaks): | |
247 f1, f2 = prev_features[x], post_features[x] | |
248 dev_list.append(tuple([GmmDistance(f1[i], components=1).skl_distance_full(GmmDistance(f2[i], components=1)) for i in xrange(n_features)])) | |
249 elif metric == 'euclidean': | |
250 for x in xrange(n_peaks): | |
251 f1, f2 = prev_features[x], post_features[x] | |
252 dev_list.append(tuple([pairwise_distances(f1[i], f2[i]) for i in xrange(n_features)])) | |
253 return dev_list | |
254 | |
255 def main(): | |
256 pass | |
257 | |
258 | |
259 if __name__ == '__main__': | |
260 main() | |
261 |