Mercurial > hg > segmentation
view utils/xmeans.py @ 0:26838b1f560f
initial commit of a segmenter project
author | mi tian |
---|---|
date | Thu, 02 Apr 2015 18:09:27 +0100 |
parents | |
children | b4bf37f94e92 |
line wrap: on
line source
#!/usr/bin/env python """Class that implements X-means.""" import argparse import numpy as np import logging import time import pylab as plt import scipy.cluster.vq as vq from scipy.spatial import distance class XMeans: def __init__(self, X, init_K=2, plot=False): self.X = X self.init_K = init_K self.plot = plot def estimate_K_xmeans(self, th=0.2, maxK = 10): """Estimates K running X-means algorithm (Pelleg & Moore, 2000).""" # Run initial K-means means, labels = self.run_kmeans(self.X, self.init_K) # Run X-means algorithm stop = False curr_K = self.init_K while not stop: stop = True final_means = [] for k in xrange(curr_K): # Find the data that corresponds to the k-th cluster D = self.get_clustered_data(self.X, labels, k) if len(D) == 0 or D.shape[0] == 1: continue # Whiten and find whitened mean stdD = np.std(D, axis=0) #D = vq.whiten(D) D /= stdD # Same as line above mean = D.mean(axis=0) # Cluster this subspace by half (K=2) half_means, half_labels = self.run_kmeans(D, K=2) # Compute BICs bic1 = self.compute_bic(D, [mean], K=1, labels=np.zeros(D.shape[0]), R=D.shape[0]) bic2 = self.compute_bic(D, half_means, K=2, labels=half_labels, R=D.shape[0]) # Split or not max_bic = np.max([np.abs(bic1), np.abs(bic2)]) norm_bic1 = bic1 / max_bic norm_bic2 = bic2 / max_bic diff_bic = np.abs(norm_bic1 - norm_bic2) # Split! print "diff_bic", diff_bic if diff_bic > th: final_means.append(half_means[0] * stdD) final_means.append(half_means[1] * stdD) curr_K += 1 stop = False # Don't split else: final_means.append(mean * stdD) final_means = np.asarray(final_means) print "Estimated K: ", curr_K if self.plot: plt.scatter(self.X[:, 0], self.X[:, 1]) plt.scatter(final_means[:, 0], final_means[:, 1], color="y") plt.show() if curr_K >= maxK or self.X.shape[-1] != final_means.shape[-1]: stop = True else: labels, dist = vq.vq(self.X, final_means) return curr_K def estimate_K_knee(self, th=.015, maxK=12): """Estimates the K using K-means and BIC, by sweeping various K and choosing the optimal BIC.""" # Sweep K-means if self.X.shape[0] < maxK: maxK = self.X.shape[0] if maxK < 2: maxK = 2 K = np.arange(1, maxK) bics = [] for k in K: means, labels = self.run_kmeans(self.X, k) bic = self.compute_bic(self.X, means, labels, K=k, R=self.X.shape[0]) bics.append(bic) diff_bics = np.diff(bics) finalK = K[-1] if len(bics) == 1: finalK = 2 else: # Normalize bics = np.asarray(bics) bics -= bics.min() #bics /= bics.max() diff_bics -= diff_bics.min() #diff_bics /= diff_bics.max() #print bics, diff_bics # Find optimum K for i in xrange(len(K[:-1])): #if bics[i] > diff_bics[i]: if diff_bics[i] < th and K[i] != 1: finalK = K[i] break logging.info("Estimated Unique Number of Segments: %d" % finalK) if self.plot: plt.subplot(2, 1, 1) plt.plot(K, bics, label="BIC") plt.plot(K[:-1], diff_bics, label="BIC diff") plt.legend(loc=2) plt.subplot(2, 1, 2) plt.scatter(self.X[:, 0], self.X[:, 1]) plt.show() return finalK def get_clustered_data(self, X, labels, label_index): """Returns the data with a specific label_index, using the previously learned labels.""" D = X[np.argwhere(labels == label_index)] return D.reshape((D.shape[0], D.shape[-1])) def run_kmeans(self, X, K): """Runs k-means and returns the labels assigned to the data.""" wX = vq.whiten(X) means, dist = vq.kmeans(wX, K, iter=100) labels, dist = vq.vq(wX, means) return means, labels def compute_bic(self, D, means, labels, K, R): """Computes the Bayesian Information Criterion.""" D = vq.whiten(D) Rn = D.shape[0] M = D.shape[1] if R == K: return 1 # Maximum likelihood estimate (MLE) mle_var = 0 for k in xrange(len(means)): X = D[np.argwhere(labels == k)] X = X.reshape((X.shape[0], X.shape[-1])) for x in X: mle_var += distance.euclidean(x, means[k]) #print x, means[k], mle_var mle_var /= (float(R - K)) # Log-likelihood of the data l_D = - Rn/2. * np.log(2*np.pi) - (Rn * M)/2. * np.log(mle_var) - \ (Rn - K) / 2. + Rn * np.log(Rn) - Rn * np.log(R) # Params of BIC p = (K-1) + M * K + mle_var #print "BIC:", l_D, p, R, K # Return the bic return l_D - p/2. * np.log(R) @classmethod def generate_2d_data(self, N=100, K=5): """Generates N*K 2D data points with K means and N data points for each mean.""" # Seed the random np.random.seed(seed=int(time.time())) # Amount of spread of the centroids spread = 30 # Generate random data X = np.empty((0, 2)) for i in xrange(K): mean = np.array([np.random.random()*spread, np.random.random()*spread]) x = np.random.normal(0.0, scale=1.0, size=(N, 2)) + mean X = np.append(X, x, axis=0) return X def test_kmeans(K=5): """Test k-means with the synthetic data.""" X = XMeans.generate_2d_data(K=4) wX = vq.whiten(X) dic, dist = vq.kmeans(wX, K, iter=100) plt.scatter(wX[:, 0], wX[:, 1]) plt.scatter(dic[:, 0], dic[:, 1], color="m") plt.show() def main(args): #test_kmeans(6) X = XMeans.generate_2d_data(K=args.k) xmeans = XMeans(X, init_K=2, plot=args.plot) est_K = xmeans.estimate_K_xmeans() est_K_knee = xmeans.estimate_K_knee() print "Estimated x-means K:", est_K print "Estimated Knee Point Detection K:", est_K_knee if __name__ == '__main__': parser = argparse.ArgumentParser( description="Runs x-means") parser.add_argument("k", metavar="k", type=int, help="Number of clusters to estimate.") parser.add_argument("-p", action="store_true", default=False, dest="plot", help="Plot the results") main(parser.parse_args())