nikcleju@1: """ nikcleju@1: #=#=#=#=#=#=#=#=#=#=#=#=#=#=#=#=#=#=#=#=#=#=#=#=#=#=# nikcleju@1: # Bob L. Sturm 20111018 nikcleju@1: # Department of Architecture, Design and Media Technology nikcleju@1: # Aalborg University Copenhagen nikcleju@1: # Lautrupvang 15, 2750 Ballerup, Denmark nikcleju@1: #=#=#=#=#=#=#=#=#=#=#=#=#=#=#=#=#=#=#=#=#=#=#=#=#=#=# nikcleju@1: """ nikcleju@1: nikcleju@1: import unittest nikcleju@1: nikcleju@1: import numpy as np nikcleju@1: from sklearn.utils import check_random_state nikcleju@1: import time nikcleju@1: nikcleju@1: from omp_sk_bugfix import orthogonal_mp nikcleju@1: from omp_QR import greed_omp_qr nikcleju@1: from omp_QR import omp_qr nikcleju@1: nikcleju@1: """ nikcleju@1: Run a problem suite involving sparse vectors in nikcleju@1: ambientDimension dimensional space, with a resolution nikcleju@1: in the phase plane of numGradations x numGradations, nikcleju@1: and at each indeterminacy and sparsity pair run nikcleju@1: numTrials independent trials. nikcleju@1: nikcleju@1: Outputs a text file denoting successes at each phase point. nikcleju@1: For more on phase transitions, see: nikcleju@1: D. L. Donoho and J. Tanner, "Precise undersampling theorems," nikcleju@1: Proc. IEEE, vol. 98, no. 6, pp. 913-924, June 2010. nikcleju@1: """ nikcleju@1: nikcleju@1: class CompareResults(unittest.TestCase): nikcleju@1: nikcleju@1: def testCompareResults(self): nikcleju@1: """OMP results should be almost the same with all implementations""" nikcleju@1: ambientDimension = 400 nikcleju@1: numGradations = 30 nikcleju@1: numTrials = 1 nikcleju@1: runProblemSuite(ambientDimension,numGradations,numTrials, verbose=False) nikcleju@1: nikcleju@1: nikcleju@1: nikcleju@1: def runProblemSuite(ambientDimension,numGradations,numTrials, verbose): nikcleju@1: nikcleju@1: idx = np.arange(ambientDimension) nikcleju@1: phaseDelta = np.linspace(0.05,1,numGradations) nikcleju@1: phaseRho = np.linspace(0.05,1,numGradations) nikcleju@1: success = np.zeros((numGradations, numGradations)) nikcleju@1: nikcleju@1: #Nic: init timers nikcleju@1: t1all = 0 nikcleju@1: t2all = 0 nikcleju@1: t3all = 0 nikcleju@1: nikcleju@1: deltaCounter = 0 nikcleju@1: # delta is number of measurements/ nikcleju@1: for delta in phaseDelta[:17]: nikcleju@1: rhoCounter = 0 nikcleju@1: for rho in phaseRho: nikcleju@1: if verbose: nikcleju@1: print(deltaCounter,rhoCounter) nikcleju@1: nikcleju@1: numMeasurements = int(delta*ambientDimension) nikcleju@1: sparsity = int(rho*numMeasurements) nikcleju@1: # how do I set the following to be random each time? nikcleju@1: generator = check_random_state(100) nikcleju@1: # create unit norm dictionary nikcleju@1: D = generator.randn(numMeasurements, ambientDimension) nikcleju@1: D /= np.sqrt(np.sum((D ** 2), axis=0)) nikcleju@1: # compute Gramian (for efficiency) nikcleju@1: DTD = np.dot(D.T,D) nikcleju@1: nikcleju@1: successCounter = 0 nikcleju@1: trial = numTrials nikcleju@1: while trial > 0: nikcleju@1: # generate sparse signal with a minimum non-zero value nikcleju@1: x = np.zeros((ambientDimension, 1)) nikcleju@1: idx2 = idx nikcleju@1: generator.shuffle(idx2) nikcleju@1: idx3 = idx2[:sparsity] nikcleju@1: while np.min(np.abs(x[idx3,0])) < 1e-10 : nikcleju@1: x[idx3,0] = generator.randn(sparsity) nikcleju@1: # sense sparse signal nikcleju@1: y = np.dot(D, x) nikcleju@1: nikcleju@1: # Nic: Use sparsify OMP function (translated from Matlab) nikcleju@1: ompopts = dict({'stopCrit':'M', 'stopTol':2*sparsity}) nikcleju@1: starttime = time.time() # start timer nikcleju@1: x_r2, errs, times = greed_omp_qr(y.squeeze().copy(), D.copy(), D.shape[1], ompopts) nikcleju@1: t2all = t2all + time.time() - starttime # stop timer nikcleju@1: idx_r2 = np.nonzero(x_r2)[0] nikcleju@1: nikcleju@1: # run to two times expected sparsity, or tolerance nikcleju@1: # why? Often times, OMP can retrieve the correct solution nikcleju@1: # when it is run for more than the expected sparsity nikcleju@1: #x_r, idx_r = omp_qr(y,D,DTD,2*sparsity,1e-5) nikcleju@1: # Nic: adjust tolerance to match with other function nikcleju@1: starttime = time.time() # start timer nikcleju@1: x_r, idx_r = omp_qr(y.copy(),D.copy(),DTD.copy(),2*sparsity,numMeasurements*1e-14/np.vdot(y,y)) nikcleju@1: t1all = t1all + time.time() - starttime # stop timer nikcleju@1: nikcleju@1: # Nic: test sklearn omp nikcleju@1: starttime = time.time() # start timer nikcleju@1: x_r3 = orthogonal_mp(D.copy(), y.copy(), 2*sparsity, tol=numMeasurements*1e-14, precompute_gram=False, copy_X=True) nikcleju@1: idx_r3 = np.nonzero(x_r3)[0] nikcleju@1: t3all = t3all + time.time() - starttime # stop timer nikcleju@1: nikcleju@1: # Nic: compare results nikcleju@1: if verbose: nikcleju@1: print 'diff1 = ',np.linalg.norm(x_r.squeeze() - x_r2.squeeze()) nikcleju@1: print 'diff2 = ',np.linalg.norm(x_r.squeeze() - x_r3.squeeze()) nikcleju@1: print 'diff3 = ',np.linalg.norm(x_r2.squeeze() - x_r3.squeeze()) nikcleju@1: print "Bob's total time = ", t1all nikcleju@1: print "Nic's total time = ", t2all nikcleju@1: print "Skl's total time = ", t3all nikcleju@1: if np.linalg.norm(x_r.squeeze() - x_r2.squeeze()) > 1e-6 or \ nikcleju@1: np.linalg.norm(x_r.squeeze() - x_r3.squeeze()) > 1e-6 or \ nikcleju@1: np.linalg.norm(x_r2.squeeze() - x_r3.squeeze()) > 1e-6: nikcleju@1: if verbose: nikcleju@1: print "STOP: Different results" nikcleju@1: print "Bob's residual: ||y - D x_r ||_2 = ",np.linalg.norm(y.squeeze() - np.dot(D,x_r).squeeze()) nikcleju@1: print "Nic's residual: ||y - D x_r ||_2 = ",np.linalg.norm(y.squeeze() - np.dot(D,x_r2).squeeze()) nikcleju@1: print "Skl's residual: ||y - D x_r ||_2 = ",np.linalg.norm(y.squeeze() - np.dot(D,x_r3).squeeze()) nikcleju@1: raise ValueError("Different results") nikcleju@1: nikcleju@1: # debais to remove small entries nikcleju@1: for nn in idx_r: nikcleju@1: if abs(x_r[nn]) < 1e-10: nikcleju@1: x_r[nn] = 0 nikcleju@1: nikcleju@1: # exact recovery condition using support nikcleju@1: #if sorted(np.flatnonzero(x_r)) == sorted(np.flatnonzero(x)): nikcleju@1: # successCounter += 1 nikcleju@1: # exact recovery condition using error in solution nikcleju@1: error = x - x_r nikcleju@1: """ the following is the exact recovery condition in: A. Maleki nikcleju@1: and D. L. Donoho, "Optimally tuned iterative reconstruction nikcleju@1: algorithms for compressed sensing," IEEE J. Selected Topics nikcleju@1: in Signal Process., vol. 4, pp. 330-341, Apr. 2010. """ nikcleju@1: if np.vdot(error,error) < np.vdot(x,x)*1e-4: nikcleju@1: successCounter += 1 nikcleju@1: trial -= 1 nikcleju@1: nikcleju@1: success[rhoCounter,deltaCounter] = successCounter nikcleju@1: if successCounter == 0: nikcleju@1: break nikcleju@1: nikcleju@1: rhoCounter += 1 nikcleju@1: #np.savetxt('test.txt',success,fmt='#2.1d',delimiter=',') nikcleju@1: deltaCounter += 1 nikcleju@1: nikcleju@1: if __name__ == "__main__": nikcleju@1: nikcleju@1: unittest.main(verbosity=2) nikcleju@1: #suite = unittest.TestLoader().loadTestsFromTestCase(CompareResults) nikcleju@1: #unittest.TextTestRunner(verbosity=2).run(suite)