#!/usr/bin/python
#
# Copyright (C) Christian Thurau, 2010.
# Licensed under the GNU General Public License (GPL).
# http://www.gnu.org/licenses/gpl.txt
"""
PyMF Convex Hull Non-negative Matrix Factorization [1]

    CHNMF(AA) : Class for Convex-hull NMF
    quickhull : Function for finding the convex hull in 2D

[1] C. Thurau, K. Kersting, and C. Bauckhage. Convex Non-Negative Matrix
Factorization in the Wild. ICDM 2009.
"""


import numpy as np

from itertools import combinations
from dist import vq
from pca import PCA
from aa import AA

__all__ = ["CHNMF"]


def quickhull(sample):
    """ Find data points on the convex hull of a supplied data set.

    Args:
        sample: data points as row vectors in an n x d matrix
            n - number of samples
            d - data dimension (should be two)

    Returns:
        a k x d matrix containing the convex hull data points
    """

    link = lambda a, b: np.concatenate((a, b[1:]))
    edge = lambda a, b: np.concatenate(([a], [b]))

    def dome(sample, base):
        h, t = base
        # perpendicular offset (unnormalized signed distance) of each point
        # from the oriented edge h -> t
        dists = np.dot(sample - h, np.dot(((0, -1), (1, 0)), (t - h)))
        # keep only the points lying on the outer side of the edge
        outer = np.repeat(sample, dists > 0, axis=0)

        if len(outer):
            pivot = sample[np.argmax(dists)]
            return link(dome(outer, edge(h, pivot)),
                        dome(outer, edge(pivot, t)))
        else:
            return base

    if len(sample) > 2:
        axis = sample[:, 0]
        base = np.take(sample, [np.argmin(axis), np.argmax(axis)], axis=0)
        return link(dome(sample, base),
                    dome(sample, base[::-1]))
    else:
        return sample


class CHNMF(AA):
    """
    CHNMF(data, num_bases=4)

    Convex Hull Non-negative Matrix Factorization. Factorize a data matrix
    into two matrices s.t. F = | data - W*H | is minimal. H is restricted to
    convexity (H >= 0, each column of H sums to 1) and W resides on actual
    data points. The factorization is solved via alternating least squares
    using the quadratic programming solver from cvxopt. The results are
    usually equivalent to Archetypal Analysis (pymf.AA), but CHNMF also works
    for very large data sets.

    Parameters
    ----------
    data : array_like, shape (_data_dimension, _num_samples)
        the input data
    num_bases : int, optional
        Number of bases to compute (column rank of W and row rank of H).
        4 (default)
    base_sel : int, optional
        Number of dimensions used for the pairwise 2D projections in which
        convex hull points are selected. Should be < rank(data). Computation
        time scales exponentially with this value; low values (3-10) are
        usually sufficient.

    Attributes
    ----------
    W : "data_dimension x num_bases" matrix of basis vectors
    H : "num_bases x num_samples" matrix of coefficients
    ferr : Frobenius norm (after calling .factorize())

    Example
    -------
    Applying CHNMF to a small example data set:

    >>> import numpy as np
    >>> from chnmf import CHNMF
    >>> data = np.array([[1.0, 0.0, 2.0], [0.0, 1.0, 1.0]])

    Use 2 basis vectors -> W has shape (data_dimension, 2).

    >>> chnmf_mdl = CHNMF(data, num_bases=2)

    And start computing the factorization.

    >>> chnmf_mdl.factorize()

    The basis vectors are now stored in chnmf_mdl.W, the coefficients in
    chnmf_mdl.H.
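
    The product W*H approximates the input; the residual can, for example,
    be inspected directly via its Frobenius norm:

    >>> rec_err = np.linalg.norm(data - np.dot(chnmf_mdl.W, chnmf_mdl.H))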

    To compute coefficients for an existing set of basis vectors, simply copy
    W to chnmf_mdl.W and set compute_w to False:

    >>> data = np.array([[1.5, 2.0], [1.2, 1.8]])
    >>> W = np.array([[1.0, 0.0], [0.0, 1.0]])
    >>> chnmf_mdl = CHNMF(data, num_bases=2)
    >>> chnmf_mdl.W = W
    >>> chnmf_mdl.factorize(compute_w=False)

    The result is a set of coefficients chnmf_mdl.H, s.t. data = W * chnmf_mdl.H.
    """

    def __init__(self, data, num_bases=4, base_sel=3):

        # call inherited method
        AA.__init__(self, data, num_bases=num_bases)

        # base_sel should never be larger than the actual data dimension
        self._base_sel = base_sel
        if base_sel > self.data.shape[0]:
            self._base_sel = self.data.shape[0]

    def init_h(self):
        self.H = np.zeros((self._num_bases, self._num_samples))

    def init_w(self):
        self.W = np.zeros((self._data_dimension, self._num_bases))

    def _map_w_to_data(self):
        """ Map the basis vectors W to the data points that are most similar
        to them (stored in self.Wmapped).
        """

        # assign each column of W to the nearest data sample
        self._Wmapped_index = vq(self.data, self.W)
        self.Wmapped = np.zeros(self.W.shape)

        # do not assign directly, i.e. Wdist = self.data[:, sel],
        # as sel might be unsorted (in non-ascending order)
        # -> sorting sel would break the matching to W if
        # self.data is stored as a hdf5 table (see h5py)
        for i, s in enumerate(self._Wmapped_index):
            self.Wmapped[:, i] = self.data[:, s]

    def update_w(self):
        """ compute new W """
        def select_hull_points(data, n=3):
            """ select data points on the convex hulls of the pairwise
            projections of the first n dimensions """

            # indices of the selected hull points
            idx = np.array([])

            # iterate over the pairwise combinations of dimensions
            for i in combinations(range(n), 2):
                # compute convex hull points in the 2D projection
                convex_hull_d = quickhull(data[i, :].T)

                # get indices for the convex hull data points
                idx = np.append(idx, vq(data[i, :], convex_hull_d.T))
                idx = np.unique(idx)

            return np.int32(idx)

        # determine convex hull data points using either PCA or random
        # projections
        method = 'randomprojection'
        if method == 'pca':
            pcamodel = PCA(self.data)
            pcamodel.factorize(show_progress=False)
            proj = pcamodel.H
        else:
            R = np.random.randn(self._base_sel, self._data_dimension)
            proj = np.dot(R, self.data)

        self._hull_idx = select_hull_points(proj, n=self._base_sel)
        aa_mdl = AA(self.data[:, self._hull_idx], num_bases=self._num_bases)

        # determine W by running Archetypal Analysis on the hull points only
        aa_mdl.factorize(niter=50, compute_h=True, compute_w=True,
                         compute_err=True, show_progress=False)

        self.W = aa_mdl.W
        self._map_w_to_data()

    def factorize(self, show_progress=False, compute_w=True, compute_h=True,
                  compute_err=True, niter=1):
        """ Factorize s.t. WH = data

        Parameters
        ----------
        show_progress : bool
            print some extra information to stdout.
        compute_h : bool
            iteratively update values for H.
        compute_w : bool
            iteratively update values for W.
        compute_err : bool
            compute the Frobenius norm |data-WH| after each update and store
            it in .ferr[k].

        Updated Values
        --------------
        .W : updated values for W.
        .H : updated values for H.
        .ferr : Frobenius norm |data-WH|.
        """

        # a single outer iteration suffices here: update_w() already runs a
        # full AA optimization on the selected convex hull points
        AA.factorize(self, niter=1, show_progress=show_progress,
                     compute_w=compute_w, compute_h=compute_h,
                     compute_err=compute_err)


if __name__ == "__main__":
    import doctest
    doctest.testmod()
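
    # Illustrative check of quickhull (names below are chosen for this sketch
    # only): the hull of a unit square plus one interior point contains only
    # the four corners; the interior point (0.5, 0.5) is dropped.
    square = np.array([[0.0, 0.0], [1.0, 0.0], [1.0, 1.0],
                       [0.0, 1.0], [0.5, 0.5]])
    hull = quickhull(square)

    # Illustrative usage sketch (assumes cvxopt is available, as required by
    # the AA solver): factorize a small random matrix; the reconstruction
    # error is afterwards available in mdl.ferr.
    rnd_data = np.random.rand(3, 100)
    mdl = CHNMF(rnd_data, num_bases=4, base_sel=3)
    mdl.factorize()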