#!/usr/bin/python
#
# Copyright (C) Christian Thurau, 2010.
# Licensed under the GNU General Public License (GPL).
# http://www.gnu.org/licenses/gpl.txt
"""
PyMF Convex Matrix Factorization [1]

CNMF(NMF) : Class for convex matrix factorization

[1] Ding, C., Li, T. and Jordan, M.. Convex and Semi-Nonnegative Matrix Factorizations.
IEEE Trans. on Pattern Analysis and Machine Intelligence 32(1), 45-55.
"""


import numpy as np
import logging
from nmf import NMF
from kmeans import Kmeans


__all__ = ["CNMF"]

class CNMF(NMF):
    """
    CNMF(data, num_bases=4)


    Convex NMF. Factorize a data matrix into two matrices s.t.
    F = | data - W*H | = | data - data*beta*H| is minimal. H and beta
    are restricted to convexity (beta >=0, sum(beta, axis=1) = [1 .. 1]).

    Parameters
    ----------
    data : array_like, shape (_data_dimension, _num_samples)
        the input data
    num_bases: int, optional
        Number of bases to compute (column rank of W and row rank of H).
        4 (default)

    Attributes
    ----------
    W : "data_dimension x num_bases" matrix of basis vectors
    H : "num bases x num_samples" matrix of coefficients
    ferr : frobenius norm (after calling .factorize())

    Example
    -------
    Applying CNMF to some rather stupid data set:

    >>> import numpy as np
    >>> from cnmf import CNMF
    >>> data = np.array([[1.0, 0.0, 2.0], [0.0, 1.0, 1.0]])
    >>> cnmf_mdl = CNMF(data, num_bases=2)
    >>> cnmf_mdl.factorize(niter=10)

    The basis vectors are now stored in cnmf_mdl.W, the coefficients in cnmf_mdl.H.
    To compute coefficients for an existing set of basis vectors simply copy W
    to cnmf_mdl.W, and set compute_w to False:

    >>> data = np.array([[1.5, 1.3], [1.2, 0.3]])
    >>> W = [[1.0, 0.0], [0.0, 1.0]]
    >>> cnmf_mdl = CNMF(data, num_bases=2)
    >>> cnmf_mdl.W = W
    >>> cnmf_mdl.factorize(compute_w=False, niter=1)

    The result is a set of coefficients acnmf_mdl.H, s.t. data = W * cnmf_mdl.H.
    """

    # The W and H updates are interleaved inside .factorize() because they
    # share iteration-invariant products -> a proper decoupling into the
    # per-matrix hooks inherited from NMF is not possible; keep them no-ops.
    def update_w(self):
        pass

    def update_h(self):
        pass

    def init_h(self):
        """ Initialize H, the convex-combination matrix G, and W = data*G
        from a k-means clustering of the data columns.

        NOTE(review): if self.H already exists but self.G does not, the G
        branch below references 'assign' and 'num_i' before assignment and
        raises NameError -- callers should provide H and G together (or
        neither).  Left as-is to preserve the documented init contract.
        """
        if not hasattr(self, 'H'):
            # init basic matrices
            self.H = np.zeros((self._num_bases, self._num_samples))

            # initialize using k-means
            km = Kmeans(self.data[:,:], num_bases=self._num_bases)
            km.factorize(niter=10)
            assign = km.assigned

            # cluster sizes, needed below to normalize G per cluster
            num_i = np.zeros(self._num_bases)
            for i in range(self._num_bases):
                num_i[i] = len(np.where(assign == i)[0])

            # H starts as the cluster-indicator matrix plus 0.2 smoothing
            self.H.T[range(len(assign)), assign] = 1.0
            self.H += 0.2*np.ones((self._num_bases, self._num_samples))

        if not hasattr(self, 'G'):
            self.G = np.zeros((self._num_samples, self._num_bases))

            # G: smoothed indicator matrix, scaled by 1/|cluster| per row
            self.G[range(len(assign)), assign] = 1.0
            self.G += 0.01
            self.G /= np.tile(np.reshape(num_i[assign],(-1,1)), self.G.shape[1])

        if not hasattr(self,'W'):
            # basis vectors are convex combinations of the data: W = X*G
            self.W = np.dot(self.data[:,:], self.G)

    def init_w(self):
        # W is derived from G inside init_h(); nothing to initialize here.
        pass

    def factorize(self, niter=10, compute_w=True, compute_h=True,
                  compute_err=True, show_progress=False):
        """ Factorize s.t. WH = data

        Parameters
        ----------
        niter : int
            number of iterations.
        show_progress : bool
            print some extra information to stdout.
        compute_h : bool
            iteratively update values for H.
        compute_w : bool
            iteratively update values for W.
        compute_err : bool
            compute Frobenius norm |data-WH| after each update and store
            it to .ferr[k].

        Updated Values
        --------------
        .W : updated values for W.
        .H : updated values for H.
        .ferr : Frobenius norm |data-WH| for each iteration.
        """

        if not hasattr(self,'W'):
            self.init_w()

        if not hasattr(self,'H'):
            self.init_h()

        # m+/m- split of a matrix into its positive and negative parts,
        # used by the semi-NMF style multiplicative updates of [1]
        def separate_positive(m):
            return (np.abs(m) + m)/2.0

        def separate_negative(m):
            return (np.abs(m) - m)/2.0

        if show_progress:
            self._logger.setLevel(logging.INFO)
        else:
            self._logger.setLevel(logging.ERROR)

        # X^T X is iteration-invariant -> compute its +/- parts once
        XtX = np.dot(self.data[:,:].T, self.data[:,:])
        XtX_pos = separate_positive(XtX)
        XtX_neg = separate_negative(XtX)

        self.ferr = np.zeros(niter)
        # iterate over W and H

        # BUGFIX: xrange() does not exist in Python 3; range() is
        # behaviorally identical here on both Python 2 and 3.
        for i in range(niter):
            # update H
            XtX_neg_x_W = np.dot(XtX_neg, self.G)
            XtX_pos_x_W = np.dot(XtX_pos, self.G)

            if compute_h:
                H_x_WT = np.dot(self.H.T, self.G.T)
                ha = XtX_pos_x_W + np.dot(H_x_WT, XtX_neg_x_W)
                # 10**-9 keeps the elementwise division well defined
                hb = XtX_neg_x_W + np.dot(H_x_WT, XtX_pos_x_W) + 10**-9
                self.H = (self.H.T*np.sqrt(ha/hb)).T

            # update W (indirectly, via the convex-combination matrix G;
            # W is then recomputed as X*G)
            if compute_w:
                HT_x_H = np.dot(self.H, self.H.T)
                wa = np.dot(XtX_pos, self.H.T) + np.dot(XtX_neg_x_W, HT_x_H)
                wb = np.dot(XtX_neg, self.H.T) + np.dot(XtX_pos_x_W, HT_x_H) + 10**-9

                self.G *= np.sqrt(wa/wb)
                self.W = np.dot(self.data[:,:], self.G)

            if compute_err:
                self.ferr[i] = self.frobenius_norm()
                self._logger.info('Iteration ' + str(i+1) + '/' + str(niter) +
                                  ' FN:' + str(self.ferr[i]))
            else:
                self._logger.info('Iteration ' + str(i+1) + '/' + str(niter))

            # stop early once the reconstruction error has converged
            if i > 1 and compute_err:
                if self.converged(i):
                    self.ferr = self.ferr[:i]
                    break
if __name__ == "__main__":
    # Run the usage examples embedded in the docstrings above as doctests.
    import doctest
    doctest.testmod()