view runbatch.py @ 21:d395461b92ae tip

Lots and lots of modifications. Approximate recovery script working.
author Nic Cleju <nikcleju@gmail.com>
date Mon, 23 Apr 2012 10:54:57 +0300
parents 7fdf964f4edd
children
line wrap: on
line source
# -*- coding: utf-8 -*-
"""
Trying to write a common framework for simulations
Not finished yet, better be ignored

@author: Nic
"""

import numpy
import scipy.io
import math
import os
import time

import multiprocessing
import sys
currmodule = sys.modules[__name__]
# Lock for printing in a thread-safe way
printLock = None

import stdparams_exact
import stdparams_approx
import AnalysisGenerate

# For exceptions
import pyCSalgos.BP.l1eq_pd
import pyCSalgos.NESTA.NESTA

class RunBatch():
    """
    Class to run batch
    """
    
    def __init__(self, paramfunc, immedResultsFunc, processResultsFunc):
        self.paramfunc = paramfunc
        self.immedResultsFunc = immedResultsFunc
        self.processResultsFunc = processResultsFunc
        
    def initProcess(self, share, njobs, printLock):
        """
        Pool initializer function (multiprocessing)
        Needed to pass the shared variable to the worker processes
        The variables must be global in the module in order to be seen later in run_once_tuple()
        see http://stackoverflow.com/questions/1675766/how-to-combine-pool-map-with-array-shared-memory-in-python-multiprocessing
        """
        currmodule = sys.modules[__name__]
        currmodule.proccount = share
        currmodule.njobs = njobs
        currmodule._printLock = printLock
       
    def run(self, globalparams):

      print "This is RunBatch.run() by Nic"
   
      ncpus = globalparams['ncpus']
      savedataname = globalparams['savedataname']
      
      if ncpus is None:
          print "  Running in parallel with default",multiprocessing.cpu_count(),"threads using \"multiprocessing\" package"
          if multiprocessing.cpu_count() == 1:
              doparallel = False
          else:
              doparallel = True
      elif ncpus > 1:
          print "  Running in parallel with",ncpus,"threads using \"multiprocessing\" package"
          doparallel = True
      elif ncpus == 1:
          print "Running single thread"
          doparallel = False
      else:
          print "Wrong number of threads, exiting"
          return  
      
      # Prepare parameters
      #taskparams = generateTaskParams(params)
      taskparams, refparams = self.paramfunc(globalparams)
      
      # Store global variables
      currmodule.ntasks = len(taskparams)
      
      # Run
      taskresults = []
      if doparallel:
        currmodule.printLock = multiprocessing.Lock()
        pool = multiprocessing.Pool(ncpus,initializer=initProcess,initargs=(currmodule.proccount,currmodule.ntasks,currmodule.printLock))
        taskresults = pool.map(self.run_once_tuple, globalparams,taskparams)
      else:
        for taskparam,refparam in zip(taskparams, refparams):
          #taskresults.append(self.run_once_tuple(globalparams,taskparam))
          taskresults.append(self.run_once(globalparams,taskparam,refparam))
    
      # Process results
      procresults = processResults(params, taskresults)
      #procresults = []
      #for taskresult in taskresults
      #    procresults.append(self.processResultsFunc(globalparams, taskresult))
    
      # Save
      saveSim(globalparams, procresults)
          
      print "Finished."

    def run_once_tuple(self,globalparams,taskparam):
      results = self.run_once(*t)
      import sys
      currmodule = sys.modules[__name__]  
      currmodule.proccount.value = currmodule.proccount.value + 1
      print "================================"
      print "Finished task",currmodule.proccount.value,"of",currmodule.ntasks
      print "================================"
      return results
      
    def run_once(self, globalparams, algoparams, refparam):

      d = globalparams['d']  
      
      #xrec = dict()
      #err = dict()
      #relerr = dict()
      #elapsed = dict()
    
      # Prepare storage variables for algorithms non-Lambda
      #for algo in algosN:
      #  xrec[algo[1]]    = numpy.zeros((d, y.shape[1]))
      #  err[algo[1]]     = numpy.zeros(y.shape[1])
      #  relerr[algo[1]]  = numpy.zeros(y.shape[1])
      #  elapsed[algo[1]] = 0
      # Prepare storage variables for algorithms with Lambda    
      #for algo in algosL:
      #  xrec[algo[1]]    = numpy.zeros((lambdas.size, d, y.shape[1]))
      #  err[algo[1]]     = numpy.zeros((lambdas.size, y.shape[1]))
      #  relerr[algo[1]]  = numpy.zeros((lambdas.size, y.shape[1]))
      #  elapsed[algo[1]] = numpy.zeros(lambdas.size)

      taskresult = []
      rawresults = None
      for algoparam in algoparams:
        try:
          #timestart = time.time()
          #xrec[strname][:,iy] = algoparams[0](*algoparams[1:])
          rawresults = algoparam[0](*algoparam[1:])
          #elapsed[strname] = elapsed[strname] + (time.time() - timestart)
        except pyCSalgos.BP.l1qec.l1qecInputValueError as e:
          print "Caught exception when running algorithm",strname," :",e.message
        except pyCSalgos.NESTA.NESTA.NestaError as e:
          #print "Caught exception when running algorithm",strname," :",e.message
          print "Caught error! :",e.message
        #err[strname][iy]    = numpy.linalg.norm(x0[:,iy] - xrec[strname][:,iy])
        #relerr[strname][iy] = err[strname][iy] / numpy.linalg.norm(x0[:,iy])
        #taskresult.append(immediateProcessResults(rawresults))
        taskresult.append(self.immedResultsFunc(rawresults, refparam))
      
      return taskresult
        
    
#      # Run algorithms with Lambda
#      for ilbd,lbd in zip(numpy.arange(lambdas.size),lambdas):
#        for iy in numpy.arange(y.shape[1]):
#          D = numpy.linalg.pinv(Omega)
#          #U,S,Vt = numpy.linalg.svd(D)
#          for algofunc,strname in algosL:
#            epsilon = 1.1 * numpy.linalg.norm(realnoise[:,iy])
#            try:
#              timestart = time.time()
#              #gamma = algofunc(y[:,iy],M,Omega,D,U,S,Vt,epsilon,lbd)
#              gamma = algofunc(y[:,iy],M,Omega,epsilon,lbd)
#              elapsed[strname][ilbd] = elapsed[strname][ilbd] + (time.time() - timestart)
#            except pyCSalgos.BP.l1qc.l1qcInputValueError as e:
#              print "Caught exception when running algorithm",strname," :",e.message
#            xrec[strname][ilbd,:,iy] = numpy.dot(D,gamma)
#            err[strname][ilbd,iy]    = numpy.linalg.norm(x0[:,iy] - xrec[strname][ilbd,:,iy])
#            relerr[strname][ilbd,iy] = err[strname][ilbd,iy] / numpy.linalg.norm(x0[:,iy])
#        print 'Lambda = ',lbd,' :'
#        for algofunc,strname in algosL:
#          print '   ',strname,' : avg relative error = ',numpy.mean(relerr[strname][ilbd,:])
    
#      # Prepare results
#      mrelerrN = dict()
#      for algotuple in algosN:
#        mrelerrN[algotuple[1]] = numpy.mean(relerr[algotuple[1]])
#      mrelerrL = dict()
#      for algotuple in algosL:
#        mrelerrL[algotuple[1]] = numpy.mean(relerr[algotuple[1]],1)
#      
#      return mrelerrN,mrelerrL,elapsed 
      
    

def generateTaskParams(globalparams):
  """
  Generate a list of task parameters
  """
  taskparams = []
  SNRdb = globalparams['SNRdb'] 
  sigma = globalparams['sigma'] 
  d = globalparams['d'] 
  deltas = globalparams['deltas'] 
  rhos = globalparams['rhos'] 
  numvects = globalparams['numvects']
  algosN = globalparams['algosN']
  algosL = globalparams['algosL']
  lambdas = globalparams['lambdas']
  
  # Process parameters
  noiselevel = 1.0 / (10.0**(SNRdb/10.0));
  
  for delta in deltas:
      for rho in rhos:
          p = round(sigma*d);
          m = round(delta*d);
          l = round(d - rho*m);
  
          # Generate Omega and data based on parameters
          Omega = AnalysisGenerate.Generate_Analysis_Operator(d, p);
          D = numpy.linalg.pinv(Omega)
        
          # Generate data
          x0,y,M,Lambda,realnoise = AnalysisGenerate.Generate_Data_Known_Omega(Omega, d,p,m,l,noiselevel, numvects,'l0')

          boxparams = []
          refparams = []
          # Prepare algos and params to run
          for iy in numpy.arange(y.shape[1]):
            for algofunc,strname in algosN:
              epsilon = 1.1 * numpy.linalg.norm(realnoise[:,iy])
              boxparams.append((algofunc,y[:,iy],M,Omega,epsilon))
              refparams.append(x0[:,iy])
          for ilbd,lbd in zip(numpy.arange(lambdas.size),lambdas):
            for iy in numpy.arange(y.shape[1]):
              for algofunc,strname in algosL:
                epsilon = 1.1 * numpy.linalg.norm(realnoise[:,iy])
                boxparams.append((algofunc,y[:,iy],M,Omega,epsilon,lbd))
                refparams.append(x0[:,iy])

          # algoparams = algo function + parameters it requires
          taskparams.append(boxparams)
 
  return taskparams, refparams
      
      
def immedResults(xrec,x0):
    if xrec == None:
        return None
    err = numpy.linalg.norm(x0 - xrec)
    relerr = err / numpy.linalg.norm(x0)
    
    return err,relerr
      
def processResults(globalparams, taskresults):
  deltas = globalparams['deltas']
  rhos = globalparams['rhos']
  algosN = globalparams['algosN']
  algosL = globalparams['algosL']
  lambdas = globalparams['lambdas']
  
  meanmatrix = dict()
  elapsed = dict()
  for algo in algosN:
    meanmatrix[algo[1]]   = numpy.zeros((rhos.size, deltas.size))
    elapsed[algo[1]] = 0
  for algo in algosL:
    meanmatrix[algo[1]]   = numpy.zeros((lambdas.size, rhos.size, deltas.size))
    elapsed[algo[1]] = numpy.zeros(lambdas.size)

  # To fix this
  idx = 0
  for idelta,delta in zip(numpy.arange(deltas.size),deltas):
    for irho,rho in zip(numpy.arange(rhos.size),rhos):
        immedResults = taskresults[idx]
        for iy in numpy.arange(y.shape[1]):
          for algofunc,strname in algosN:
            epsilon = 1.1 * numpy.linalg.norm(realnoise[:,iy])
            boxparams.append((algofunc,y[:,iy],M,Omega,epsilon))
            refparams.append(x0[:,iy])
        for ilbd,lbd in zip(numpy.arange(lambdas.size),lambdas):
          for iy in numpy.arange(y.shape[1]):
            for algofunc,strname in algosL:
              epsilon = 1.1 * numpy.linalg.norm(realnoise[:,iy])
              boxparams.append((algofunc,y[:,iy],M,Omega,epsilon,lbd))
              refparams.append(x0[:,iy])        
 
  
  # Process results  
  idx = 0
  for idelta,delta in zip(numpy.arange(deltas.size),deltas):
    for irho,rho in zip(numpy.arange(rhos.size),rhos):
      #mrelerrN,mrelerrL,addelapsed = taskresults[idx]
      mrelerrN,mrelerrL = taskresults[idx]
      idx = idx+1
      for algotuple in algosN: 
        meanmatrix[algotuple[1]][irho,idelta] = 1 - mrelerrN[algotuple[1]]
        if meanmatrix[algotuple[1]][irho,idelta] < 0 or math.isnan(meanmatrix[algotuple[1]][irho,idelta]):
          meanmatrix[algotuple[1]][irho,idelta] = 0
        elapsed[algotuple[1]] = elapsed[algotuple[1]] + addelapsed[algotuple[1]]
      for algotuple in algosL:
        for ilbd in numpy.arange(lambdas.size):
          meanmatrix[algotuple[1]][ilbd,irho,idelta] = 1 - mrelerrL[algotuple[1]][ilbd]
          if meanmatrix[algotuple[1]][ilbd,irho,idelta] < 0 or math.isnan(meanmatrix[algotuple[1]][ilbd,irho,idelta]):
            meanmatrix[algotuple[1]][ilbd,irho,idelta] = 0
          elapsed[algotuple[1]][ilbd] = elapsed[algotuple[1]][ilbd] + addelapsed[algotuple[1]][ilbd]

        
  procresults = dict()
  procresults['meanmatrix'] = meanmatrix
  procresults['elapsed'] = elapsed
  return procresults          
      
# Script main
if __name__ == "__main__":

  stdparams_approx.paramstest['ncpus'] = 1
  rb = RunBatch(generateTaskParams, immedResults, processResults)
  rb.run(stdparams_approx.paramstest)
 
  print "Finished"