import sys
import scipy.spatial as ss
from scipy.special import digamma, gamma
from math import log, pi
import numpy.random as nr
import numpy as np
import random
from sklearn.metrics.pairwise import pairwise_distances
from scipy.stats import ttest_ind, ttest_rel, pearsonr, norm
from scipy.linalg import eigh, cholesky


def mi(x, y, k=3, base=2):
    """ Mutual information of x and y.
        x, y should be lists of vectors, e.g. x = [[1.3], [3.7], [5.1], [2.4]]
        if x is one-dimensional and we have four samples.
    """
    assert len(x) == len(y), "Lists should have same length"
    assert k <= len(x) - 1, "Set k smaller than num. samples - 1"
    intens = 1e-10  # small noise to break degeneracy, see doc.

    x = [list(p + intens * nr.rand(len(x[0]))) for p in x]
    y = [list(p + intens * nr.rand(len(y[0]))) for p in y]
    points = zip2(x, y)
    # Find nearest neighbors in the joint space; p=inf means max-norm.
    tree = ss.cKDTree(points)
    dvec = [tree.query(point, k + 1, p=float('inf'))[0][k] for point in points]

    # Kraskov estimator: psi(k) + psi(N) - <psi(n_x) + psi(n_y)>, where n_x and
    # n_y are the marginal neighbor counts (see avgdigamma), converted from
    # nats to the requested log base.
    a, b, c, d = avgdigamma(x, dvec), avgdigamma(y, dvec), digamma(k), digamma(len(x))

    return (-a - b + c + d) / log(base)


def mi2(x, y, k=3, base=2):
    """ Mutual information of x and y.
        Unlike mi(), x and y should be one-dimensional numpy arrays of scalar
        samples; the degeneracy-breaking noise is added to them in place.
    """
    assert len(x) == len(y), "Arrays should have same length"
    assert k <= len(x) - 1, "Set k smaller than num. samples - 1"
    intens = 1e-10  # small noise to break degeneracy, see doc.

    x += intens * nr.rand(len(x))
    y += intens * nr.rand(len(y))
    points = np.array([x, y]).T

    # Find nearest neighbors in the joint space; p=inf means max-norm.
    tree = ss.cKDTree(points)
    dvec = [tree.query(point, k + 1, p=float('inf'))[0][k] for point in points]
    a, b, c, d = (avgdigamma(x[np.newaxis, :].T, dvec),
                  avgdigamma(y[np.newaxis, :].T, dvec),
                  digamma(k), digamma(len(x)))

    mi_est = (-a - b + c + d) / log(base)
    if mi_est < 0:
        return 0.0
    return mi_est


def mi3(x, y, k=3, base=2):
    """ Mutual information of x and y.
        Like mi2(), x and y should be one-dimensional numpy arrays; for large
        inputs the estimate is averaged over random subsamples.
    """
    if len(x) < 1000:
        return mi2(x, y, k, base)

    intens = 1e-10  # small noise to break degeneracy, see doc.
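
    # For large inputs a direct Kraskov estimate over all points is expensive
    # (the k-NN queries dominate), so the estimate below is averaged over
    # num_iter random subsamples of sampleSize points each.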
    sampleSize = 500
    c = digamma(k)
    d = digamma(sampleSize)
    num_iter = 1 + int(len(x) / 1000)

    mi_mean = np.zeros(num_iter, dtype=np.float64)
    for i in range(num_iter):
        ix = np.random.randint(low=0, high=len(x), size=sampleSize)

        xs = x[ix]
        ys = y[ix]
        xs += intens * nr.rand(len(xs))
        ys += intens * nr.rand(len(ys))
        points = np.array([xs, ys]).T

        # Find nearest neighbors in the joint space; p=inf means max-norm.
        tree = ss.cKDTree(points)
        dvec = [tree.query(point, k + 1, p=float('inf'))[0][k] for point in points]
        a, b = avgdigamma(xs[np.newaxis, :].T, dvec), avgdigamma(ys[np.newaxis, :].T, dvec)

        mi_mean[i] = (-a - b + c + d) / log(base)

    mi_est = np.mean(mi_mean)
    if mi_est < 0:
        return 0.0
    return mi_est


def mic(xs, ys, intens, s, k):
    """ Marginal digamma averages for one (sub)sample of size s; note that the
        degeneracy-breaking noise is added to xs and ys in place.
    """
    xs += intens * nr.rand(s)
    ys += intens * nr.rand(s)
    points = np.array([xs, ys]).T
    tree = ss.cKDTree(points)
    dvec = [tree.query(point, k + 1, p=float('inf'))[0][k] for point in points]
    return avgdigamma(xs[np.newaxis, :].T, dvec), avgdigamma(ys[np.newaxis, :].T, dvec)


def dmi(x, y, k=3, base=2):
    """ Mutual information distance between x and y: 1 minus the MI estimate
        normalized by its maximum (the value obtained for identical samples).
    """
    if np.array_equal(x, y):
        return 0.0
    intens = 1e-10  # small noise to break degeneracy
    c = digamma(k)
    s = len(x)
    lb = 1.0 / log(base)

    # For small samples calculate MI directly.
    if s < 1000:
        a, b = mic(x, y, intens, s, k)
        d = digamma(s)
        mx = (-c + d) * lb
        nmi = (-a - b + c + d) * lb / mx
        if nmi > 1:
            nmi = 1.0  # handle the case when MI of correlated samples is overestimated
        if nmi < 0:
            nmi = 0.0  # handle estimation error resulting in small negative values
        return 1.0 - nmi

    # For large samples average the estimate over random subsamples, as in mi3().
    sampleSize = 500
    num_iter = 1 + int(s / 1000)
    d = digamma(sampleSize)

    mi_mean = np.zeros(num_iter, dtype=np.float64)
    for i in range(num_iter):
        ix = np.random.randint(low=0, high=s, size=sampleSize)
        a, b = mic(x[ix], y[ix], intens, sampleSize, k)
        mi_mean[i] = (-a - b + c + d) * lb

    mmi = np.mean(mi_mean)
    mx = (-c + d) * lb
    nmi = mmi / mx
    print(mmi, mx, nmi)

    if nmi > 1:
        nmi = 1.0  # handle the case when MI of correlated samples is overestimated
    if nmi < 0:
        nmi = 0.0  # handle estimation error resulting in small negative values
    return 1.0 - nmi


def avgdigamma(points, dvec):
    # Finds the number of neighbors within a given radius in the marginal space
    # and returns the expectation value of digamma over those neighbor counts.
    N = len(points)
    tree = ss.cKDTree(points)
    avg = 0.
    for i in range(N):
        dist = dvec[i]
        # Subtlety: we don't include the boundary point, but we are implicitly
        # adding 1 to the Kraskov definition because the center point is included.
        num_points = len(tree.query_ball_point(points[i], dist - 1e-15, p=float('inf')))
        avg += digamma(num_points) / N
    return avg


def zip2(*args):
    # zip2(x,y) takes lists of vectors and makes a list of vectors in the joint space.
    # E.g. zip2([[1],[2],[3]], [[4],[5],[6]]) = [[1,4],[2,5],[3,6]]
    return [sum(sublist, []) for sublist in zip(*args)]


def test_mi(num_samples=9000):
    ''' Generate correlated multivariate random variables and compare the MI
        estimates for identical, correlated and uncorrelated pairs.
    '''
    # num_samples = 9000

    # Generate samples from three independent normally distributed random
    # variables (with mean 0 and std. dev. 1).
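    # Below, these i.i.d. samples are multiplied by a matrix c satisfying
    # c c^T = r, which gives the transformed samples covariance r; Y1 is then
    # correlated with X[0], while Y2 is an independent draw used as a control.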
    X = norm.rvs(size=(3, num_samples))

    # The desired covariance matrix.
    r = np.array([
        [ 3.40, -2.75, -2.00],
        [-2.75,  5.50,  1.50],
        [-2.00,  1.50,  1.25]
    ])

    # Choice of Cholesky or eigenvector method.
    method = 'cholesky'
    # method = 'eigenvectors'

    if method == 'cholesky':
        # Compute the Cholesky decomposition.
        c = cholesky(r, lower=True)
    else:
        # Compute the eigenvalues and eigenvectors.
        evals, evecs = eigh(r)
        # Construct c, so that c*c^T = r.
        c = np.dot(evecs, np.diag(np.sqrt(evals)))

    # Convert the data to correlated random variables.
    Y1 = np.dot(c, X)[2, :]
    Y2 = norm.rvs(size=(3, num_samples))[0, :]
    X = X[0, :]

    xx = mi2(X, X)
    xy1 = mi2(X, Y1)
    xy2 = mi2(X, Y2)
    print('identical', xx)
    print('correlated', xy1)
    print('uncorrelated', xy2)

    xx = mi3(X, X)
    xy1 = mi3(X, Y1)
    xy2 = mi3(X, Y2)
    print('identical', xx)
    print('correlated', xy1)
    print('uncorrelated', xy2)

    xx = dmi(X, X)
    xy1 = dmi(X, Y1)
    xy2 = dmi(X, Y2)
    print('identical', xx)
    print('correlated', xy1)
    print('uncorrelated', xy2)


def print_progress(counter="", message=""):
    sys.stdout.write("%(counter)s: %(message)s" % vars())
    sys.stdout.flush()
    sys.stdout.write("\r\r")


def test_direct(num_samples):
    X = norm.rvs(size=(1, num_samples))[0, :]
    return mi2(X, X)


def main():
    test_mi()
    raise SystemExit

    # The plotting experiment below is unreachable because of the early exit
    # above; it is kept for reference.
    import matplotlib.pyplot as plt
    figure = plt.figure()
    axis = figure.add_subplot(111)
    series = np.linspace(100, 25000, 20)
    # series = np.linspace(10, 250, 20)

    # result = [test_direct(int(x)) for x in series]
    result = []
    for i, x in enumerate(series):
        print_progress(i)
        result.append(test_direct(int(x)))
    axis.plot(series, result)
    plt.show()
    # test_direct(1500)


if __name__ == '__main__':
    main()
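

# A minimal usage sketch (the module name `mi` in the import below is an
# assumption; adjust it to wherever this file lives):
#
#   import numpy as np
#   from mi import mi2, mi3, dmi
#
#   x = np.random.randn(5000)
#   y = x + 0.5 * np.random.randn(5000)   # correlated with x
#   print(mi2(x.copy(), y.copy()))        # direct Kraskov estimate, in bits
#   print(mi3(x.copy(), y.copy()))        # subsampled estimate for large inputs
#   print(dmi(x.copy(), y.copy()))        # normalized MI distance in [0, 1]
#
# The .copy() calls guard against the in-place noise that mi2() (and dmi() on
# small inputs) adds to its arguments.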