#!/usr/bin/python
#
# Copyright (C) Christian Thurau, 2010.
# Licensed under the GNU General Public License (GPL).
# http://www.gnu.org/licenses/gpl.txt
"""
PyMF Convex Matrix Factorization [1]

CNMF(NMF) : Class for convex matrix factorization

[1] Ding, C., Li, T. and Jordan, M. I. Convex and Semi-Nonnegative Matrix
Factorizations. IEEE Trans. on Pattern Analysis and Machine Intelligence
32(1), 45-55, 2010.
"""


import numpy as np
import logging
from nmf import NMF
from kmeans import Kmeans


__all__ = ["CNMF"]

class CNMF(NMF):
    """
    CNMF(data, num_bases=4)

    Convex NMF. Factorize a data matrix into two matrices s.t.
    F = | data - W*H | = | data - data*beta*H | is minimal. H and beta
    are restricted to convexity (beta >= 0, sum(beta, axis=1) = [1 .. 1]).

    Parameters
    ----------
    data : array_like, shape (_data_dimension, _num_samples)
        the input data
    num_bases : int, optional
        Number of bases to compute (column rank of W and row rank of H).
        4 (default)

    Attributes
    ----------
    W : "data_dimension x num_bases" matrix of basis vectors
    H : "num_bases x num_samples" matrix of coefficients
    ferr : frobenius norm (after calling .factorize())

    Example
    -------
    Applying CNMF to some rather simple data set:

    >>> import numpy as np
    >>> from cnmf import CNMF
    >>> data = np.array([[1.0, 0.0, 2.0], [0.0, 1.0, 1.0]])
    >>> cnmf_mdl = CNMF(data, num_bases=2)
    >>> cnmf_mdl.factorize(niter=10)

    The basis vectors are now stored in cnmf_mdl.W, the coefficients in cnmf_mdl.H.
    To compute coefficients for an existing set of basis vectors simply copy W
    to cnmf_mdl.W, and set compute_w to False:

    >>> data = np.array([[1.5, 1.3], [1.2, 0.3]])
    >>> W = [[1.0, 0.0], [0.0, 1.0]]
    >>> cnmf_mdl = CNMF(data, num_bases=2)
    >>> cnmf_mdl.W = W
    >>> cnmf_mdl.factorize(compute_w=False, niter=1)

    The result is a set of coefficients cnmf_mdl.H, s.t. data = W * cnmf_mdl.H.
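
    For illustration, the approximation of the data can then be reconstructed
    explicitly from the quantities of the example above:

    >>> rec = np.dot(W, cnmf_mdl.H)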
    """

    # see .factorize() for the update of W and H
    # -> proper decoupling of W/H not possible ...
    def update_w(self):
        pass

    def update_h(self):
        pass

    def init_h(self):
        if not hasattr(self, 'H'):
            # init basic matrices
            self.H = np.zeros((self._num_bases, self._num_samples))

            # initialize using k-means
            km = Kmeans(self.data[:,:], num_bases=self._num_bases)
            km.factorize(niter=10)
            assign = km.assigned

            # cluster sizes
            num_i = np.zeros(self._num_bases)
            for i in range(self._num_bases):
                num_i[i] = len(np.where(assign == i)[0])

            # H starts as a (smoothed) cluster indicator matrix
            self.H.T[range(len(assign)), assign] = 1.0
            self.H += 0.2*np.ones((self._num_bases, self._num_samples))

        if not hasattr(self, 'G'):
            self.G = np.zeros((self._num_samples, self._num_bases))

            # G is the cluster indicator matrix normalized by cluster size,
            # so that W = data*G roughly corresponds to the cluster centroids
            self.G[range(len(assign)), assign] = 1.0
            self.G += 0.01
            self.G /= np.tile(np.reshape(num_i[assign],(-1,1)), self.G.shape[1])

        if not hasattr(self, 'W'):
            self.W = np.dot(self.data[:,:], self.G)

    def init_w(self):
        pass

    def factorize(self, niter=10, compute_w=True, compute_h=True,
                  compute_err=True, show_progress=False):
        """ Factorize s.t. WH = data

        Parameters
        ----------
        niter : int
            number of iterations.
        show_progress : bool
            print some extra information to stdout.
        compute_h : bool
            iteratively update values for H.
        compute_w : bool
            iteratively update values for W.
        compute_err : bool
            compute Frobenius norm |data-WH| after each update and store
            it to .ferr[k].

        Updated Values
        --------------
        .W : updated values for W.
        .H : updated values for H.
        .ferr : Frobenius norm |data-WH| for each iteration.
        """

        if not hasattr(self, 'W'):
            self.init_w()

        if not hasattr(self, 'H'):
            self.init_h()

        def separate_positive(m):
            return (np.abs(m) + m)/2.0

        def separate_negative(m):
            return (np.abs(m) - m)/2.0

        if show_progress:
            self._logger.setLevel(logging.INFO)
        else:
            self._logger.setLevel(logging.ERROR)

        XtX = np.dot(self.data[:,:].T, self.data[:,:])
        XtX_pos = separate_positive(XtX)
        XtX_neg = separate_negative(XtX)

        self.ferr = np.zeros(niter)
        # iterate over W and H

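        # The loop below applies the multiplicative update rules for convex
        # NMF (cf. Ding et al. [1]): XtX is split into the elementwise
        # nonnegative parts XtX_pos - XtX_neg, and H and G are rescaled by
        # square roots of ratios built from these parts, which keeps both
        # factors nonnegative. Here self.G holds the convex combination
        # weights (so that W = data * G) and self.H the coefficients.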
        for i in xrange(niter):
            # update H
            XtX_neg_x_W = np.dot(XtX_neg, self.G)
            XtX_pos_x_W = np.dot(XtX_pos, self.G)

            if compute_h:
                H_x_WT = np.dot(self.H.T, self.G.T)
                ha = XtX_pos_x_W + np.dot(H_x_WT, XtX_neg_x_W)
                hb = XtX_neg_x_W + np.dot(H_x_WT, XtX_pos_x_W) + 10**-9
                self.H = (self.H.T*np.sqrt(ha/hb)).T

            # update W
            if compute_w:
                HT_x_H = np.dot(self.H, self.H.T)
                wa = np.dot(XtX_pos, self.H.T) + np.dot(XtX_neg_x_W, HT_x_H)
                wb = np.dot(XtX_neg, self.H.T) + np.dot(XtX_pos_x_W, HT_x_H) + 10**-9

                self.G *= np.sqrt(wa/wb)
                self.W = np.dot(self.data[:,:], self.G)

            if compute_err:
                self.ferr[i] = self.frobenius_norm()
                self._logger.info('Iteration ' + str(i+1) + '/' + str(niter) +
                                  ' FN:' + str(self.ferr[i]))
            else:
                self._logger.info('Iteration ' + str(i+1) + '/' + str(niter))

            if i > 1 and compute_err:
                if self.converged(i):
                    self.ferr = self.ferr[:i]
                    break

if __name__ == "__main__":
    import doctest
    doctest.testmod()