Mercurial > hg > pycsalgos
comparison tests/omp_test.py @ 2:735a0e24575c
Organized folders: added tests, apps, matlab, docs folders. Added __init__.py files
author | nikcleju |
---|---|
date | Fri, 21 Oct 2011 13:53:49 +0000 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
1:2a2abf5092f8 | 2:735a0e24575c |
---|---|
1 """ | |
2 #=#=#=#=#=#=#=#=#=#=#=#=#=#=#=#=#=#=#=#=#=#=#=#=#=#=# | |
3 # Bob L. Sturm <bst@create.aau.dk> 20111018 | |
4 # Department of Architecture, Design and Media Technology | |
5 # Aalborg University Copenhagen | |
6 # Lautrupvang 15, 2750 Ballerup, Denmark | |
7 #=#=#=#=#=#=#=#=#=#=#=#=#=#=#=#=#=#=#=#=#=#=#=#=#=#=# | |
8 """ | |
9 | |
10 import unittest | |
11 | |
12 import numpy as np | |
13 from sklearn.utils import check_random_state | |
14 import time | |
15 | |
16 from omp_sk_bugfix import orthogonal_mp | |
17 from omp_QR import greed_omp_qr | |
18 from omp_QR import omp_qr | |
19 | |
20 """ | |
21 Run a problem suite involving sparse vectors in | |
22 ambientDimension dimensional space, with a resolution | |
23 in the phase plane of numGradations x numGradations, | |
24 and at each indeterminacy and sparsity pair run | |
25 numTrials independent trials. | |
26 | |
27 Outputs a text file denoting successes at each phase point. | |
28 For more on phase transitions, see: | |
29 D. L. Donoho and J. Tanner, "Precise undersampling theorems," | |
30 Proc. IEEE, vol. 98, no. 6, pp. 913-924, June 2010. | |
31 """ | |
32 | |
33 class CompareResults(unittest.TestCase): | |
34 | |
35 def testCompareResults(self): | |
36 """OMP results should be almost the same with all implementations""" | |
37 ambientDimension = 400 | |
38 numGradations = 30 | |
39 numTrials = 1 | |
40 runProblemSuite(ambientDimension,numGradations,numTrials, verbose=False) | |
41 | |
42 | |
43 | |
44 def runProblemSuite(ambientDimension,numGradations,numTrials, verbose): | |
45 | |
46 idx = np.arange(ambientDimension) | |
47 phaseDelta = np.linspace(0.05,1,numGradations) | |
48 phaseRho = np.linspace(0.05,1,numGradations) | |
49 success = np.zeros((numGradations, numGradations)) | |
50 | |
51 #Nic: init timers | |
52 t1all = 0 | |
53 t2all = 0 | |
54 t3all = 0 | |
55 | |
56 deltaCounter = 0 | |
57 # delta is number of measurements/ | |
58 for delta in phaseDelta[:17]: | |
59 rhoCounter = 0 | |
60 for rho in phaseRho: | |
61 if verbose: | |
62 print(deltaCounter,rhoCounter) | |
63 | |
64 numMeasurements = int(delta*ambientDimension) | |
65 sparsity = int(rho*numMeasurements) | |
66 # how do I set the following to be random each time? | |
67 generator = check_random_state(100) | |
68 # create unit norm dictionary | |
69 D = generator.randn(numMeasurements, ambientDimension) | |
70 D /= np.sqrt(np.sum((D ** 2), axis=0)) | |
71 # compute Gramian (for efficiency) | |
72 DTD = np.dot(D.T,D) | |
73 | |
74 successCounter = 0 | |
75 trial = numTrials | |
76 while trial > 0: | |
77 # generate sparse signal with a minimum non-zero value | |
78 x = np.zeros((ambientDimension, 1)) | |
79 idx2 = idx | |
80 generator.shuffle(idx2) | |
81 idx3 = idx2[:sparsity] | |
82 while np.min(np.abs(x[idx3,0])) < 1e-10 : | |
83 x[idx3,0] = generator.randn(sparsity) | |
84 # sense sparse signal | |
85 y = np.dot(D, x) | |
86 | |
87 # Nic: Use sparsify OMP function (translated from Matlab) | |
88 ompopts = dict({'stopCrit':'M', 'stopTol':2*sparsity}) | |
89 starttime = time.time() # start timer | |
90 x_r2, errs, times = greed_omp_qr(y.squeeze().copy(), D.copy(), D.shape[1], ompopts) | |
91 t2all = t2all + time.time() - starttime # stop timer | |
92 idx_r2 = np.nonzero(x_r2)[0] | |
93 | |
94 # run to two times expected sparsity, or tolerance | |
95 # why? Often times, OMP can retrieve the correct solution | |
96 # when it is run for more than the expected sparsity | |
97 #x_r, idx_r = omp_qr(y,D,DTD,2*sparsity,1e-5) | |
98 # Nic: adjust tolerance to match with other function | |
99 starttime = time.time() # start timer | |
100 x_r, idx_r = omp_qr(y.copy(),D.copy(),DTD.copy(),2*sparsity,numMeasurements*1e-14/np.vdot(y,y)) | |
101 t1all = t1all + time.time() - starttime # stop timer | |
102 | |
103 # Nic: test sklearn omp | |
104 starttime = time.time() # start timer | |
105 x_r3 = orthogonal_mp(D.copy(), y.copy(), 2*sparsity, tol=numMeasurements*1e-14, precompute_gram=False, copy_X=True) | |
106 idx_r3 = np.nonzero(x_r3)[0] | |
107 t3all = t3all + time.time() - starttime # stop timer | |
108 | |
109 # Nic: compare results | |
110 if verbose: | |
111 print 'diff1 = ',np.linalg.norm(x_r.squeeze() - x_r2.squeeze()) | |
112 print 'diff2 = ',np.linalg.norm(x_r.squeeze() - x_r3.squeeze()) | |
113 print 'diff3 = ',np.linalg.norm(x_r2.squeeze() - x_r3.squeeze()) | |
114 print "Bob's total time = ", t1all | |
115 print "Nic's total time = ", t2all | |
116 print "Skl's total time = ", t3all | |
117 if np.linalg.norm(x_r.squeeze() - x_r2.squeeze()) > 1e-6 or \ | |
118 np.linalg.norm(x_r.squeeze() - x_r3.squeeze()) > 1e-6 or \ | |
119 np.linalg.norm(x_r2.squeeze() - x_r3.squeeze()) > 1e-6: | |
120 if verbose: | |
121 print "STOP: Different results" | |
122 print "Bob's residual: ||y - D x_r ||_2 = ",np.linalg.norm(y.squeeze() - np.dot(D,x_r).squeeze()) | |
123 print "Nic's residual: ||y - D x_r ||_2 = ",np.linalg.norm(y.squeeze() - np.dot(D,x_r2).squeeze()) | |
124 print "Skl's residual: ||y - D x_r ||_2 = ",np.linalg.norm(y.squeeze() - np.dot(D,x_r3).squeeze()) | |
125 raise ValueError("Different results") | |
126 | |
127 # debais to remove small entries | |
128 for nn in idx_r: | |
129 if abs(x_r[nn]) < 1e-10: | |
130 x_r[nn] = 0 | |
131 | |
132 # exact recovery condition using support | |
133 #if sorted(np.flatnonzero(x_r)) == sorted(np.flatnonzero(x)): | |
134 # successCounter += 1 | |
135 # exact recovery condition using error in solution | |
136 error = x - x_r | |
137 """ the following is the exact recovery condition in: A. Maleki | |
138 and D. L. Donoho, "Optimally tuned iterative reconstruction | |
139 algorithms for compressed sensing," IEEE J. Selected Topics | |
140 in Signal Process., vol. 4, pp. 330-341, Apr. 2010. """ | |
141 if np.vdot(error,error) < np.vdot(x,x)*1e-4: | |
142 successCounter += 1 | |
143 trial -= 1 | |
144 | |
145 success[rhoCounter,deltaCounter] = successCounter | |
146 if successCounter == 0: | |
147 break | |
148 | |
149 rhoCounter += 1 | |
150 #np.savetxt('test.txt',success,fmt='#2.1d',delimiter=',') | |
151 deltaCounter += 1 | |
152 | |
153 if __name__ == "__main__": | |
154 | |
155 unittest.main(verbosity=2) | |
156 #suite = unittest.TestLoader().loadTestsFromTestCase(CompareResults) | |
157 #unittest.TextTestRunner(verbosity=2).run(suite) |