absrec: runbatch.py @ 21:d395461b92ae (tip)
Lots and lots of modifications. Approximate recovery script working.
author:   Nic Cleju <nikcleju@gmail.com>
date:     Mon, 23 Apr 2012 10:54:57 +0300
parents:  7fdf964f4edd
children: (none)
# -*- coding: utf-8 -*-
"""
Trying to write a common framework for simulations
Not finished yet, better be ignored

@author: Nic
"""

import numpy
import scipy.io
import math
import os
import time
import multiprocessing
import sys

currmodule = sys.modules[__name__]

# Lock for printing in a thread-safe way
printLock = None

import stdparams_exact
import stdparams_approx
import AnalysisGenerate

# For exceptions
import pyCSalgos.BP.l1eq_pd
import pyCSalgos.BP.l1qec
import pyCSalgos.NESTA.NESTA


class RunBatch():
    """
    Class to run a batch of simulation tasks, either single-threaded or in
    parallel with the "multiprocessing" package.
    """

    def __init__(self, paramfunc, immedResultsFunc, processResultsFunc):
        self.paramfunc = paramfunc
        self.immedResultsFunc = immedResultsFunc
        self.processResultsFunc = processResultsFunc

    def initProcess(self, share, njobs, printLock):
        """
        Pool initializer function (multiprocessing).
        Needed to pass the shared variables to the worker processes.
        The variables must be global in the module in order to be seen later
        in run_once_tuple(), see
        http://stackoverflow.com/questions/1675766/how-to-combine-pool-map-with-array-shared-memory-in-python-multiprocessing
        A commented sketch of this pattern follows run() below.
        """
        currmodule = sys.modules[__name__]
        currmodule.proccount = share
        currmodule.njobs = njobs
        currmodule._printLock = printLock

    def run(self, globalparams):
        print "This is RunBatch.run() by Nic"

        ncpus = globalparams['ncpus']
        savedataname = globalparams['savedataname']

        # Decide between single-threaded and parallel execution
        if ncpus is None:
            print " Running in parallel with default", multiprocessing.cpu_count(), "threads using \"multiprocessing\" package"
            doparallel = (multiprocessing.cpu_count() > 1)
        elif ncpus > 1:
            print " Running in parallel with", ncpus, "threads using \"multiprocessing\" package"
            doparallel = True
        elif ncpus == 1:
            print "Running single thread"
            doparallel = False
        else:
            print "Wrong number of threads, exiting"
            return

        # Prepare parameters
        #taskparams = generateTaskParams(params)
        taskparams, refparams = self.paramfunc(globalparams)

        # Store global variables
        currmodule.ntasks = len(taskparams)

        # Run
        taskresults = []
        if doparallel:
            currmodule.printLock = multiprocessing.Lock()
            # Shared counter of finished tasks, incremented in run_once_tuple()
            currmodule.proccount = multiprocessing.Value('i', 0)
            pool = multiprocessing.Pool(ncpus,
                                        initializer=self.initProcess,
                                        initargs=(currmodule.proccount, currmodule.ntasks, currmodule.printLock))
            # pool.map() passes a single argument, so pack everything into a tuple
            taskresults = pool.map(self.run_once_tuple,
                                   [(globalparams, taskparam, refparam)
                                    for taskparam, refparam in zip(taskparams, refparams)])
        else:
            for taskparam, refparam in zip(taskparams, refparams):
                #taskresults.append(self.run_once_tuple(globalparams,taskparam))
                taskresults.append(self.run_once(globalparams, taskparam, refparam))

        # Process results
        procresults = self.processResultsFunc(globalparams, taskresults)
        #procresults = []
        #for taskresult in taskresults:
        #    procresults.append(self.processResultsFunc(globalparams, taskresult))

        # Save
        saveSim(globalparams, procresults)
        print "Finished."
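
    # A minimal, self-contained sketch of the Pool-initializer pattern that
    # initProcess() relies on: a shared counter and a print lock are handed to
    # the worker processes through the pool's initializer. The names
    # _example_init and _example_work are hypothetical, used only for
    # illustration, and the sketch is shown as a comment block:
    #
    #   import multiprocessing
    #
    #   def _example_init(counter, lock):
    #       # Store the shared objects as module globals so the worker
    #       # function can see them later.
    #       global _counter, _lock
    #       _counter, _lock = counter, lock
    #
    #   def _example_work(i):
    #       with _lock:                      # process-safe printing
    #           _counter.value += 1
    #           print "Finished task", _counter.value
    #       return i * i
    #
    #   if __name__ == "__main__":
    #       counter = multiprocessing.Value('i', 0)   # shared integer counter
    #       lock = multiprocessing.Lock()
    #       pool = multiprocessing.Pool(2, initializer=_example_init,
    #                                   initargs=(counter, lock))
    #       print pool.map(_example_work, range(8))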
    def run_once_tuple(self, t):
        # Unpack the tuple prepared in run() and delegate to run_once()
        results = self.run_once(*t)
        import sys
        currmodule = sys.modules[__name__]
        currmodule.proccount.value = currmodule.proccount.value + 1
        print "================================"
        print "Finished task", currmodule.proccount.value, "of", currmodule.ntasks
        print "================================"
        return results

    def run_once(self, globalparams, algoparams, refparam):

        d = globalparams['d']

        #xrec = dict()
        #err = dict()
        #relerr = dict()
        #elapsed = dict()

        # Prepare storage variables for algorithms non-Lambda
        #for algo in algosN:
        #    xrec[algo[1]] = numpy.zeros((d, y.shape[1]))
        #    err[algo[1]] = numpy.zeros(y.shape[1])
        #    relerr[algo[1]] = numpy.zeros(y.shape[1])
        #    elapsed[algo[1]] = 0
        # Prepare storage variables for algorithms with Lambda
        #for algo in algosL:
        #    xrec[algo[1]] = numpy.zeros((lambdas.size, d, y.shape[1]))
        #    err[algo[1]] = numpy.zeros((lambdas.size, y.shape[1]))
        #    relerr[algo[1]] = numpy.zeros((lambdas.size, y.shape[1]))
        #    elapsed[algo[1]] = numpy.zeros(lambdas.size)

        taskresult = []
        rawresults = None
        for algoparam in algoparams:
            # algoparam = (algo function, arguments it requires...)
            try:
                #timestart = time.time()
                #xrec[strname][:,iy] = algoparams[0](*algoparams[1:])
                rawresults = algoparam[0](*algoparam[1:])
                #elapsed[strname] = elapsed[strname] + (time.time() - timestart)
            except pyCSalgos.BP.l1qec.l1qecInputValueError as e:
                print "Caught exception when running algorithm", algoparam[0].__name__, " :", e.message
            except pyCSalgos.NESTA.NESTA.NestaError as e:
                #print "Caught exception when running algorithm",strname," :",e.message
                print "Caught error! :", e.message
            #err[strname][iy] = numpy.linalg.norm(x0[:,iy] - xrec[strname][:,iy])
            #relerr[strname][iy] = err[strname][iy] / numpy.linalg.norm(x0[:,iy])
            #taskresult.append(immediateProcessResults(rawresults))
            taskresult.append(self.immedResultsFunc(rawresults, refparam))
        return taskresult

        # # Run algorithms with Lambda
        # for ilbd,lbd in zip(numpy.arange(lambdas.size),lambdas):
        #     for iy in numpy.arange(y.shape[1]):
        #         D = numpy.linalg.pinv(Omega)
        #         #U,S,Vt = numpy.linalg.svd(D)
        #         for algofunc,strname in algosL:
        #             epsilon = 1.1 * numpy.linalg.norm(realnoise[:,iy])
        #             try:
        #                 timestart = time.time()
        #                 #gamma = algofunc(y[:,iy],M,Omega,D,U,S,Vt,epsilon,lbd)
        #                 gamma = algofunc(y[:,iy],M,Omega,epsilon,lbd)
        #                 elapsed[strname][ilbd] = elapsed[strname][ilbd] + (time.time() - timestart)
        #             except pyCSalgos.BP.l1qc.l1qcInputValueError as e:
        #                 print "Caught exception when running algorithm",strname," :",e.message
        #             xrec[strname][ilbd,:,iy] = numpy.dot(D,gamma)
        #             err[strname][ilbd,iy] = numpy.linalg.norm(x0[:,iy] - xrec[strname][ilbd,:,iy])
        #             relerr[strname][ilbd,iy] = err[strname][ilbd,iy] / numpy.linalg.norm(x0[:,iy])
        #     print 'Lambda = ',lbd,' :'
        #     for algofunc,strname in algosL:
        #         print ' ',strname,' : avg relative error = ',numpy.mean(relerr[strname][ilbd,:])

        # # Prepare results
        # mrelerrN = dict()
        # for algotuple in algosN:
        #     mrelerrN[algotuple[1]] = numpy.mean(relerr[algotuple[1]])
        # mrelerrL = dict()
        # for algotuple in algosL:
        #     mrelerrL[algotuple[1]] = numpy.mean(relerr[algotuple[1]],1)
        #
        # return mrelerrN,mrelerrL,elapsed


def generateTaskParams(globalparams):
    """
    Generate a list of task parameters
    """
    taskparams = []

    SNRdb = globalparams['SNRdb']
    sigma = globalparams['sigma']
    d = globalparams['d']
    deltas = globalparams['deltas']
    rhos = globalparams['rhos']
    numvects = globalparams['numvects']
    algosN = globalparams['algosN']
    algosL = globalparams['algosL']
    lambdas = globalparams['lambdas']

    # Process parameters
    # SNR (in dB) -> noise level: noiselevel = 10**(-SNRdb/10)
    noiselevel = 1.0 / (10.0**(SNRdb/10.0))

    for delta in deltas:
        for rho in rhos:
            p = round(sigma*d)
            m = round(delta*d)
            l = round(d - rho*m)

            # Generate Omega and data based on parameters
            Omega = AnalysisGenerate.Generate_Analysis_Operator(d, p)
            D = numpy.linalg.pinv(Omega)

            # Generate data
            x0,y,M,Lambda,realnoise = AnalysisGenerate.Generate_Data_Known_Omega(Omega, d,p,m,l,noiselevel, numvects,'l0')

            boxparams = []
            refparams = []

            # Prepare algos and params to run
            for iy in numpy.arange(y.shape[1]):
                for algofunc,strname in algosN:
                    epsilon = 1.1 * numpy.linalg.norm(realnoise[:,iy])
                    boxparams.append((algofunc,y[:,iy],M,Omega,epsilon))
                    refparams.append(x0[:,iy])
            for ilbd,lbd in zip(numpy.arange(lambdas.size),lambdas):
                for iy in numpy.arange(y.shape[1]):
                    for algofunc,strname in algosL:
                        epsilon = 1.1 * numpy.linalg.norm(realnoise[:,iy])
                        boxparams.append((algofunc,y[:,iy],M,Omega,epsilon,lbd))
                        refparams.append(x0[:,iy])

            # algoparams = algo function + parameters it requires
            taskparams.append(boxparams)

    return taskparams, refparams


def immedResults(xrec, x0):
    if xrec is None:
        return None
    err = numpy.linalg.norm(x0 - xrec)
    relerr = err / numpy.linalg.norm(x0)
    return err, relerr


def processResults(globalparams, taskresults):

    deltas = globalparams['deltas']
    rhos = globalparams['rhos']
    algosN = globalparams['algosN']
    algosL = globalparams['algosL']
    lambdas = globalparams['lambdas']

    meanmatrix = dict()
    elapsed = dict()
    for algo in algosN:
        meanmatrix[algo[1]] = numpy.zeros((rhos.size, deltas.size))
        elapsed[algo[1]] = 0
    for algo in algosL:
        meanmatrix[algo[1]] = numpy.zeros((lambdas.size, rhos.size, deltas.size))
        elapsed[algo[1]] = numpy.zeros(lambdas.size)

    # To fix this
    idx = 0
    for idelta,delta in zip(numpy.arange(deltas.size),deltas):
        for irho,rho in zip(numpy.arange(rhos.size),rhos):
            immedResults = taskresults[idx]
            for iy in numpy.arange(y.shape[1]):
                for algofunc,strname in algosN:
                    epsilon = 1.1 * numpy.linalg.norm(realnoise[:,iy])
                    boxparams.append((algofunc,y[:,iy],M,Omega,epsilon))
                    refparams.append(x0[:,iy])
            for ilbd,lbd in zip(numpy.arange(lambdas.size),lambdas):
                for iy in numpy.arange(y.shape[1]):
                    for algofunc,strname in algosL:
                        epsilon = 1.1 * numpy.linalg.norm(realnoise[:,iy])
                        boxparams.append((algofunc,y[:,iy],M,Omega,epsilon,lbd))
                        refparams.append(x0[:,iy])

    # Process results
    idx = 0
    for idelta,delta in zip(numpy.arange(deltas.size),deltas):
        for irho,rho in zip(numpy.arange(rhos.size),rhos):
            #mrelerrN,mrelerrL,addelapsed = taskresults[idx]
            mrelerrN,mrelerrL = taskresults[idx]
            idx = idx+1
            for algotuple in algosN:
                meanmatrix[algotuple[1]][irho,idelta] = 1 - mrelerrN[algotuple[1]]
                if meanmatrix[algotuple[1]][irho,idelta] < 0 or math.isnan(meanmatrix[algotuple[1]][irho,idelta]):
                    meanmatrix[algotuple[1]][irho,idelta] = 0
                elapsed[algotuple[1]] = elapsed[algotuple[1]] + addelapsed[algotuple[1]]
            for algotuple in algosL:
                for ilbd in numpy.arange(lambdas.size):
                    meanmatrix[algotuple[1]][ilbd,irho,idelta] = 1 - mrelerrL[algotuple[1]][ilbd]
                    if meanmatrix[algotuple[1]][ilbd,irho,idelta] < 0 or math.isnan(meanmatrix[algotuple[1]][ilbd,irho,idelta]):
                        meanmatrix[algotuple[1]][ilbd,irho,idelta] = 0
                    elapsed[algotuple[1]][ilbd] = elapsed[algotuple[1]][ilbd] + addelapsed[algotuple[1]][ilbd]

    procresults = dict()
    procresults['meanmatrix'] = meanmatrix
    procresults['elapsed'] = elapsed
    return procresults


# Script main
if __name__ == "__main__":
    stdparams_approx.paramstest['ncpus'] = 1
    rb = RunBatch(generateTaskParams, immedResults, processResults)
    rb.run(stdparams_approx.paramstest)
    print "Finished"
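
# A minimal sketch of the parameter dictionary that run() and
# generateTaskParams() expect, as an alternative to stdparams_approx.paramstest.
# The concrete values and the dummy solver are hypothetical and only illustrate
# the expected shapes: 'algosN'/'algosL' hold (function, name) tuples,
# 'deltas'/'rhos'/'lambdas' are numpy arrays, and run() also reads 'ncpus' and
# 'savedataname'.
#
#   def dummy_solver(y, M, Omega, epsilon):
#       # Stand-in for a real solver such as pyCSalgos.BP.l1eq_pd; returns a
#       # d-dimensional estimate (assuming Omega is p-by-d).
#       return numpy.zeros(Omega.shape[1])
#
#   exampleparams = {
#       'ncpus': 1, 'savedataname': 'example.mat',
#       'SNRdb': 20.0, 'sigma': 2.0, 'd': 50, 'numvects': 10,
#       'deltas': numpy.array([0.5]), 'rhos': numpy.array([0.5]),
#       'lambdas': numpy.array([1.0]),
#       'algosN': [(dummy_solver, 'dummy')], 'algosL': [],
#   }
#   RunBatch(generateTaskParams, immedResults, processResults).run(exampleparams)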