view runbatch.py @ 15:a27cfe83fe12

Changing, changing, trying to get a common framework for batch jobs
author Nic Cleju <nikcleju@gmail.com>
date Tue, 20 Mar 2012 17:18:23 +0200
parents
children 23e9b536ba71
line wrap: on
line source
# -*- coding: utf-8 -*-
"""
Created on Sat Nov 05 18:08:40 2011

@author: Nic
"""

import numpy
import scipy.io
import math
import os
import time

import multiprocessing
import sys
currmodule = sys.modules[__name__]  # handle to this module, used to set/read module-level globals
# Lock for printing in a thread-safe way; created in RunBatch.run() when
# running in parallel, otherwise stays None
printLock = None

import stdparams_exact
import AnalysisGenerate

# For exceptions
import pyCSalgos.BP.l1eq_pd
import pyCSalgos.NESTA.NESTA

class RunBatch():
    """
    Class to run batch
    """
    
    def __init__(self, xs, ys, prerunfunc, paramfunc, runfunc, postrunfunc):
        """
        Set up a batch run.

        xs, ys       -- data arrays for the batch (stored for later use;
                        the original accepted them but silently dropped them)
        prerunfunc   -- callable run before the batch
        paramfunc    -- callable producing per-task parameters
        runfunc      -- callable executing a single task
        postrunfunc  -- callable run after the batch
        """
        self.xs = xs
        self.ys = ys
        self.prerunfunc = prerunfunc
        self.paramfunc = paramfunc
        self.runfunc = runfunc
        self.postrunfunc = postrunfunc
        
    def initProcess(self, share, njobs, printLock):
        """
        Pool initializer function (multiprocessing).
        Needed to pass the shared variables to the worker processes.
        The variables must be global in the module in order to be seen later
        in run_once_tuple().
        see http://stackoverflow.com/questions/1675766/how-to-combine-pool-map-with-array-shared-memory-in-python-multiprocessing

        share     -- multiprocessing.Value shared counter of finished tasks
        njobs     -- total number of tasks in the batch
        printLock -- multiprocessing.Lock for thread-safe printing
        """
        currmodule = sys.modules[__name__]
        currmodule.proccount = share
        # run_once_tuple() reads 'currmodule.ntasks'; the original stored the
        # value only under 'njobs', which nothing reads. Keep 'njobs' too for
        # backward compatibility.
        currmodule.ntasks = njobs
        currmodule.njobs = njobs
        # The module-level global (and the name set by run()) is 'printLock';
        # the original stored '_printLock', which nothing reads.
        currmodule.printLock = printLock
     
    def generateTaskParams(self,globalparams):
      """
      Generate a list of task parameter tuples, one per (delta, rho) grid point.

      globalparams -- dict with keys: 'SNRdb', 'sigma', 'd', 'deltas', 'rhos',
                      'numvects', 'algosN', 'algosL', 'lambdas'
      Returns a list of tuples
      (algosN, algosL, Omega, y, lambdas, realnoise, M, x0).
      """
      taskparams = []
      SNRdb = globalparams['SNRdb']
      sigma = globalparams['sigma']
      d = globalparams['d']
      deltas = globalparams['deltas']
      rhos = globalparams['rhos']
      numvects = globalparams['numvects']
      algosN = globalparams['algosN']
      algosL = globalparams['algosL']
      lambdas = globalparams['lambdas']

      # Convert SNR in dB to a linear noise level
      noiselevel = 1.0 / (10.0**(SNRdb/10.0));

      for delta in deltas:
          for rho in rhos:
              # Problem sizes derived from the phase-transition grid point
              p = round(sigma*d);
              m = round(delta*d);
              l = round(d - rho*m);

              # Generate Omega and data based on parameters
              Omega = AnalysisGenerate.Generate_Analysis_Operator(d, p);
              # Optionally make Omega more coherent
              #U,S,Vt = numpy.linalg.svd(Omega);
              #Sdnew = S * (1+numpy.arange(S.size)) # Make D coherent, not Omega!
              #Snew = numpy.vstack((numpy.diag(Sdnew), numpy.zeros((Omega.shape[0] - Omega.shape[1], Omega.shape[1]))))
              #Omega = numpy.dot(U , numpy.dot(Snew,Vt))

              # Generate data
              x0,y,M,Lambda,realnoise = AnalysisGenerate.Generate_Data_Known_Omega(Omega, d,p,m,l,noiselevel, numvects,'l0')

              # BUGFIX: list.append() takes a single argument; the original
              # passed 7 arguments, which raises TypeError at runtime.
              # NOTE(review): 'algoparams' is built but never used — presumably
              # left over from a refactor; confirm whether it can be removed.
              algoparams = []
              for algo in algosN:
                  algoparams.append((algo[0],algo[1],Omega,y,realnoise,M,x0))
              taskparams.append((algosN,algosL, Omega,y,lambdas,realnoise,M,x0))

      return taskparams
       
    def run(self, params):

      print "This is RunBatch.run() by Nic"
   
      ncpus = params['ncpus']
      savedataname = params['savedataname']
      
      if ncpus is None:
          print "  Running in parallel with default",multiprocessing.cpu_count(),"threads using \"multiprocessing\" package"
          if multiprocessing.cpu_count() == 1:
              doparallel = False
          else:
              doparallel = True
      elif ncpus > 1:
          print "  Running in parallel with",ncpus,"threads using \"multiprocessing\" package"
          doparallel = True
      elif ncpus == 1:
          print "Running single thread"
          doparallel = False
      else:
          print "Wrong number of threads, exiting"
          return  
      
      # Prepare parameters
      taskparams = generateTaskParams(params)
      
      # Store global variables
      currmodule.ntasks = len(taskparams)
      
      # Run
      taskresults = []
      if doparallel:
        currmodule.printLock = multiprocessing.Lock()
        pool = multiprocessing.Pool(ncpus,initializer=initProcess,initargs=(currmodule.proccount,currmodule.ntasks,currmodule.printLock))
        taskresults = pool.map(run_once_tuple, taskparams)
      else:
        for taskparam in taskparams:
          taskresults.append(run_once_tuple(taskparam))
    
      # Process results
      procresults = processResults(params, taskresults)
    
      # Save
      saveSim(params, procresults)
          
      print "Finished."

    def run_once_tuple(self,t):
      """
      Run a single task: unpack the parameter tuple, execute it, bump the
      shared finished-task counter and print progress.

      t -- task parameter tuple as produced by generateTaskParams()
      Returns whatever run_once() returns.
      """
      # NOTE(review): run_once is not defined in this class and is not visible
      # here — presumably a module-level function defined elsewhere in the
      # file; confirm it exists and matches the tuple layout of taskparams.
      results = run_once(*t)
      # Re-fetch the module handle: in a pool worker this runs in a separate
      # process, where the module-level globals were set by initProcess().
      import sys
      currmodule = sys.modules[__name__]  
      # proccount is a multiprocessing.Value shared counter; incremented once
      # per finished task (NOTE(review): increment is not lock-protected, so
      # the printed count may be approximate under parallelism — confirm).
      currmodule.proccount.value = currmodule.proccount.value + 1
      print "================================"
      print "Finished task",currmodule.proccount.value,"of",currmodule.ntasks
      print "================================"
      return results