view utils/xmeans.py @ 19:890cfe424f4a tip

added annotations
author mitian
date Fri, 11 Dec 2015 09:47:40 +0000
parents b4bf37f94e92
#!/usr/bin/env python
"""X-means clustering: estimates the number of clusters K in the data
(Pelleg & Moore, 2000)."""

import sys, os
import argparse
import numpy as np
import logging
import time
import pylab as plt
import scipy.cluster.vq as vq
from scipy.spatial import distance


class XMeans(object):
	def __init__(self, X, init_K=2, plot=False):
		"""X: data matrix (one observation per row); init_K: initial number
			of clusters; plot: whether to plot intermediate results."""
		self.X = X
		self.init_K = init_K
		self.plot = plot

	def estimate_K_xmeans(self, th=0.2, maxK=10):
		"""Estimates K running X-means algorithm (Pelleg & Moore, 2000)."""

		# Run initial K-means
		means, labels = self.run_kmeans(self.X, self.init_K)

		# Run X-means algorithm
		stop = False
		curr_K = self.init_K
		while not stop:
			stop = True
			final_means = []
			for k in xrange(curr_K):
				# Find the data that corresponds to the k-th cluster
				D = self.get_clustered_data(self.X, labels, k)
				if len(D) == 0 or D.shape[0] == 1:
					continue

				# Whiten and find whitened mean
				stdD = np.std(D, axis=0)
				#D = vq.whiten(D)
				D /= stdD  # Same as line above
				mean = D.mean(axis=0)

				# Cluster this subspace by half (K=2)
				half_means, half_labels = self.run_kmeans(D, K=2)

				# Compute BICs
				bic1 = self.compute_bic(D, [mean], K=1,
										labels=np.zeros(D.shape[0]),
										R=D.shape[0])
				bic2 = self.compute_bic(D, half_means, K=2,
										labels=half_labels, R=D.shape[0])

				# Split or not
				max_bic = np.max([np.abs(bic1), np.abs(bic2)])
				norm_bic1 = bic1 / max_bic
				norm_bic2 = bic2 / max_bic
				diff_bic = np.abs(norm_bic1 - norm_bic2)

				# Split!
				logging.debug("diff_bic: %f" % diff_bic)
				if diff_bic > th:
					final_means.append(half_means[0] * stdD)
					final_means.append(half_means[1] * stdD)
					curr_K += 1
					stop = False
				# Don't split
				else:
					final_means.append(mean * stdD)

			final_means = np.asarray(final_means)

			logging.info("Estimated K: %d" % curr_K)
			if self.plot:
				plt.scatter(self.X[:, 0], self.X[:, 1])
				plt.scatter(final_means[:, 0], final_means[:, 1], color="y")
				plt.show()

			# Stop if maxK is reached or no cluster produced usable means
			# (final_means is empty, so its last dimension cannot match X's)
			if curr_K >= maxK or self.X.shape[-1] != final_means.shape[-1]:
				stop = True
			else:
				labels, dist = vq.vq(self.X, final_means)

		return curr_K

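	# Illustrative usage of the X-means estimator (a sketch; generate_2d_data
	# below is just one convenient way to build test input):
	#
	#   X = XMeans.generate_2d_data(N=100, K=4)
	#   est_K = XMeans(X, init_K=2).estimate_K_xmeans(th=0.2, maxK=10)
	#
	# On each pass every current cluster is tentatively split in two, and the
	# split is kept only when the normalized BICs of the one- and two-cluster
	# models differ by more than `th`.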
	def estimate_K_knee(self, th=0.015, maxK=12):
		"""Estimates K by sweeping k-means over a range of K values and
			picking the knee of the resulting BIC curve."""
		# Sweep K-means
		if self.X.shape[0] < maxK:
			maxK = self.X.shape[0]
		if maxK < 2:
			maxK = 2
		K = np.arange(1, maxK)
		bics = []
		for k in K:
			means, labels = self.run_kmeans(self.X, k)
			logging.debug("means: %s, labels: %d" % (means.shape, len(labels)))
			bic = self.compute_bic(self.X, means, labels, K=k,
								   R=self.X.shape[0])
			bics.append(bic)
		diff_bics = np.diff(bics)
		finalK = K[-1]

		if len(bics) == 1:
			finalK = 2
		else:
			# Normalize
			bics = np.asarray(bics)
			bics -= bics.min()
			#bics /= bics.max()
			diff_bics -= diff_bics.min()
			#diff_bics /= diff_bics.max()

			#print bics, diff_bics

			# Find optimum K
			for i in xrange(len(K[:-1])):
				#if bics[i] > diff_bics[i]:
				if diff_bics[i] < th and K[i] != 1:
					finalK = K[i]
					break

		logging.info("Estimated Unique Number of Segments: %d" % finalK)
		if self.plot:
			plt.subplot(2, 1, 1)
			plt.plot(K, bics, label="BIC")
			plt.plot(K[:-1], diff_bics, label="BIC diff")
			plt.legend(loc=2)
			plt.subplot(2, 1, 2)
			plt.scatter(self.X[:, 0], self.X[:, 1])
			plt.show()

		return finalK

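	# Illustrative usage of the knee-based estimator (a sketch, reusing the
	# synthetic data generator below; th and maxK keep their defaults):
	#
	#   X = XMeans.generate_2d_data(N=100, K=4)
	#   est_K = XMeans(X).estimate_K_knee(th=0.015, maxK=12)
	#
	# The BIC is computed for K = 1..maxK-1 and the first K > 1 whose
	# (min-shifted) BIC increment falls below `th` is returned as the knee.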
	def get_clustered_data(self, X, labels, label_index):
		"""Returns the subset of X assigned to cluster label_index, using the
		 previously learned labels."""
		D = X[np.argwhere(labels == label_index)]
		return D.reshape((D.shape[0], D.shape[-1]))

	def run_kmeans(self, X, K):
		"""Runs k-means on the whitened data and returns the cluster means
		 and the labels assigned to the data."""
		wX = vq.whiten(X)
		means, dist = vq.kmeans(wX, K, iter=100)
		labels, dist = vq.vq(wX, means)
		return means, labels

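	# Minimal sketch of the scipy.cluster.vq calls used above (shapes shown
	# are illustrative): whiten() rescales each feature to unit variance,
	# kmeans() returns a codebook of cluster means (possibly fewer than K),
	# and vq() assigns each row to its nearest code.
	#
	#   X = np.random.rand(50, 2)
	#   wX = vq.whiten(X)              # (50, 2), unit variance per column
	#   means, _ = vq.kmeans(wX, 3)    # (<=3, 2) codebook
	#   labels, _ = vq.vq(wX, means)   # (50,) index of the nearest mean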
	def compute_bic(self, D, means, labels, K, R):
		"""Computes the Bayesian Information Criterion (BIC) of clustering D
		 into K clusters with the given means and labels; R is the total
		 number of data points."""
		D = vq.whiten(D)
		Rn = D.shape[0]
		M = D.shape[1]

		if R == K:
			return 1

		# Maximum likelihood estimate (MLE)
		mle_var = 0
		for k in xrange(len(means)):
			X = D[np.argwhere(labels == k)]
			X = X.reshape((X.shape[0], X.shape[-1]))
			for x in X:
				# Squared distance to the assigned centroid (the variance MLE
				# sums squared distances)
				mle_var += distance.sqeuclidean(x, means[k])
				#print x, means[k], mle_var
		mle_var /= (float(R - K))

		# Log-likelihood of the data
		l_D = - Rn/2. * np.log(2*np.pi) - (Rn * M)/2. * np.log(mle_var) - \
			(Rn - K) / 2. + Rn * np.log(Rn) - Rn * np.log(R)

		# Number of free parameters: (K-1) class probabilities, M*K centroid
		# coordinates, and one shared variance estimate
		p = (K - 1) + M * K + 1

		#print "BIC:", l_D, p, R, K

		# Return the bic
		return l_D - p/2. * np.log(R)

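	# The BIC above follows Pelleg & Moore (2000) for K spherical Gaussians
	# sharing a single variance:
	#
	#   sigma^2 = 1/(R - K) * sum_i ||x_i - mean_(i)||^2
	#   l(D)    = -Rn/2 * log(2*pi) - Rn*M/2 * log(sigma^2) - (Rn - K)/2
	#             + Rn*log(Rn) - Rn*log(R)
	#   BIC     = l(D) - p/2 * log(R),  with p = (K - 1) + M*K + 1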
	@classmethod
	def generate_2d_data(cls, N=100, K=5):
		"""Generates N*K 2D data points with K means and N data points
			for each mean."""
		# Seed the random
		np.random.seed(seed=int(time.time()))

		# Amount of spread of the centroids
		spread = 30

		# Generate random data
		X = np.empty((0, 2))
		for i in xrange(K):
			mean = np.array([np.random.random()*spread,
							 np.random.random()*spread])
			x = np.random.normal(0.0, scale=1.0, size=(N, 2)) + mean
			X = np.append(X, x, axis=0)

		return X

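	# For example (a sketch), XMeans.generate_2d_data(N=100, K=5) returns a
	# (500, 2) array: 100 points drawn from a unit-variance Gaussian around
	# each of 5 centroids placed uniformly at random in a 30 x 30 square.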

def test_kmeans(K=5):
	"""Sanity-check plain k-means on synthetic data (4 generated clusters,
		K fitted centroids) and plot the result."""
	X = XMeans.generate_2d_data(K=4)
	wX = vq.whiten(X)
	dic, dist = vq.kmeans(wX, K, iter=100)

	plt.scatter(wX[:, 0], wX[:, 1])
	plt.scatter(dic[:, 0], dic[:, 1], color="m")
	plt.show()


def main(args):
	#test_kmeans(6)
	X = XMeans.generate_2d_data(K=args.k)
	xmeans = XMeans(X, init_K=2, plot=args.plot)
	est_K = xmeans.estimate_K_xmeans()
	est_K_knee = xmeans.estimate_K_knee()
	print "Estimated x-means K:", est_K
	print "Estimated Knee Point Detection K:", est_K_knee

if __name__ == '__main__':
	parser = argparse.ArgumentParser(
		description="Runs x-means")
	parser.add_argument("k",
						metavar="k", type=int,
						help="Number of clusters to estimate.")
	parser.add_argument("-p", action="store_true", default=False,
						dest="plot", help="Plot the results")
	main(parser.parse_args())
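# Example invocation (illustrative):
#   python xmeans.py 5 -p
# generates data around 5 random centroids, runs both estimators, and plots
# the results.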