#!/usr/bin/python
#
# Copyright (C) Christian Thurau, 2010.
# Licensed under the GNU General Public License (GPL).
# http://www.gnu.org/licenses/gpl.txt
"""
PyMF Non-negative Matrix Factorization.

    NMF: Class for Non-negative Matrix Factorization

[1] Lee, D. D. and Seung, H. S. (1999), Learning the Parts of Objects by Non-negative
Matrix Factorization, Nature 401(6755), 788-799.
"""


import numpy as np
import logging
import logging.config
import scipy.sparse

__all__ = ["NMF"]

class NMF():
    """
    NMF(data, num_bases=4)

    Non-negative Matrix Factorization. Factorize a data matrix into two
    matrices s.t. F = | data - W*H | is minimal. W and H are restricted to
    non-negative values. Uses the classical multiplicative update rule.

    Parameters
    ----------
    data : array_like, shape (_data_dimension, _num_samples)
        the input data
    num_bases : int, optional
        Number of bases to compute (column rank of W and row rank of H).
        4 (default)

    Attributes
    ----------
    W : "data_dimension x num_bases" matrix of basis vectors
    H : "num_bases x num_samples" matrix of coefficients
    ferr : frobenius norm (after calling .factorize())

    Example
    -------
    Applying NMF to a rather simple data set:

    >>> import numpy as np
    >>> data = np.array([[1.0, 0.0, 2.0], [0.0, 1.0, 1.0]])
    >>> nmf_mdl = NMF(data, num_bases=2)
    >>> nmf_mdl.factorize(niter=10)

    The basis vectors are now stored in nmf_mdl.W, the coefficients in
    nmf_mdl.H. To compute coefficients for an existing set of basis vectors
    simply copy W to nmf_mdl.W, and set compute_w to False:

    >>> data = np.array([[1.5], [1.2]])
    >>> W = np.array([[1.0, 0.0], [0.0, 1.0]])
    >>> nmf_mdl = NMF(data, num_bases=2)
    >>> nmf_mdl.W = W
    >>> nmf_mdl.factorize(niter=20, compute_w=False)

    The result is a set of coefficients nmf_mdl.H, s.t. data = W * nmf_mdl.H.
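
    The multiplicative update rules implemented by update_w() and update_h()
    are the standard Lee/Seung updates [1]; a small constant is added to the
    denominators for numerical stability:

        H <- H * (W^T data) / (W^T W H)
        W <- W * (data H^T) / (W H H^T)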
mi@0: """ mi@0: mi@0: # some small value mi@0: _EPS = 10**-8 mi@0: mi@0: def __init__(self, data, num_bases=4): mi@0: mi@0: def setup_logging(): mi@0: # create logger mi@0: self._logger = logging.getLogger("pymf") mi@0: mi@0: # add ch to logger mi@0: if len(self._logger.handlers) < 1: mi@0: # create console handler and set level to debug mi@0: ch = logging.StreamHandler() mi@0: ch.setLevel(logging.DEBUG) mi@0: # create formatter mi@0: formatter = logging.Formatter("%(asctime)s [%(levelname)s] %(message)s") mi@0: mi@0: # add formatter to ch mi@0: ch.setFormatter(formatter) mi@0: mi@0: self._logger.addHandler(ch) mi@0: mi@0: setup_logging() mi@0: mi@0: # set variables mi@0: self.data = data mi@0: self._num_bases = num_bases mi@0: mi@0: # initialize H and W to random values mi@0: (self._data_dimension, self._num_samples) = self.data.shape mi@0: mi@0: mi@0: def frobenius_norm(self): mi@0: """ Frobenius norm (||data - WH||) of a data matrix and a low rank mi@0: approximation given by WH mi@0: mi@0: Returns: mi@0: frobenius norm: F = ||data - WH|| mi@0: """ mi@0: mi@0: # check if W and H exist mi@0: if hasattr(self,'H') and hasattr(self,'W') and not scipy.sparse.issparse(self.data): mi@0: err = np.sqrt( np.sum((self.data[:,:] - np.dot(self.W, self.H))**2 )) mi@0: else: mi@0: err = -123456 mi@0: mi@0: return err mi@0: mi@0: def init_w(self): mi@0: self.W = np.random.random((self._data_dimension, self._num_bases)) mi@0: mi@0: def init_h(self): mi@0: self.H = np.random.random((self._num_bases, self._num_samples)) mi@0: mi@0: def update_h(self): mi@0: # pre init H1, and H2 (necessary for storing matrices on disk) mi@0: H2 = np.dot(np.dot(self.W.T, self.W), self.H) + 10**-9 mi@0: self.H *= np.dot(self.W.T, self.data[:,:]) mi@0: self.H /= H2 mi@0: mi@0: def update_w(self): mi@0: # pre init W1, and W2 (necessary for storing matrices on disk) mi@0: W2 = np.dot(np.dot(self.W, self.H), self.H.T) + 10**-9 mi@0: self.W *= np.dot(self.data[:,:], self.H.T) mi@0: self.W /= W2 mi@0: mi@0: def converged(self, i): mi@0: derr = np.abs(self.ferr[i] - self.ferr[i-1])/self._num_samples mi@0: if derr < self._EPS: mi@0: return True mi@0: else: mi@0: return False mi@0: mi@0: def factorize(self, niter=1, show_progress=False, mi@0: compute_w=True, compute_h=True, compute_err=True): mi@0: """ Factorize s.t. WH = data mi@0: mi@0: Parameters mi@0: ---------- mi@0: niter : int mi@0: number of iterations. mi@0: show_progress : bool mi@0: print some extra information to stdout. mi@0: compute_h : bool mi@0: iteratively update values for H. mi@0: compute_w : bool mi@0: iteratively update values for W. mi@0: compute_err : bool mi@0: compute Frobenius norm |data-WH| after each update and store mi@0: it to .ferr[k]. mi@0: mi@0: Updated Values mi@0: -------------- mi@0: .W : updated values for W. mi@0: .H : updated values for H. mi@0: .ferr : Frobenius norm |data-WH| for each iteration. 
mi@0: """ mi@0: mi@0: if show_progress: mi@0: self._logger.setLevel(logging.INFO) mi@0: else: mi@0: self._logger.setLevel(logging.ERROR) mi@0: mi@0: # create W and H if they don't already exist mi@0: # -> any custom initialization to W,H should be done before mi@0: if not hasattr(self,'W'): mi@0: self.init_w() mi@0: mi@0: if not hasattr(self,'H'): mi@0: self.init_h() mi@0: mi@0: if compute_err: mi@0: self.ferr = np.zeros(niter) mi@0: mi@0: for i in xrange(niter): mi@0: if compute_w: mi@0: self.update_w() mi@0: mi@0: if compute_h: mi@0: self.update_h() mi@0: mi@0: if compute_err: mi@0: self.ferr[i] = self.frobenius_norm() mi@0: self._logger.info('Iteration ' + str(i+1) + '/' + str(niter) + mi@0: ' FN:' + str(self.ferr[i])) mi@0: else: mi@0: self._logger.info('Iteration ' + str(i+1) + '/' + str(niter)) mi@0: mi@0: mi@0: # check if the err is not changing anymore mi@0: if i > 1 and compute_err: mi@0: if self.converged(i): mi@0: # adjust the error measure mi@0: self.ferr = self.ferr[:i] mi@0: break mi@0: mi@0: if __name__ == "__main__": mi@0: import doctest mi@0: doctest.testmod()