view runbatch.py @ 15:a27cfe83fe12
Changing, changing, trying to get a common framework for batch jobs
author   | Nic Cleju <nikcleju@gmail.com>
date     | Tue, 20 Mar 2012 17:18:23 +0200
parents  |
children | 23e9b536ba71
line source
# -*- coding: utf-8 -*-
"""
Created on Sat Nov 05 18:08:40 2011

@author: Nic
"""

import numpy
import scipy.io
import math
import os
import time
import multiprocessing
import sys

currmodule = sys.modules[__name__]

# Lock for printing in a thread-safe way
printLock = None

import stdparams_exact
import AnalysisGenerate

# For exceptions
import pyCSalgos.BP.l1eq_pd
import pyCSalgos.NESTA.NESTA


class RunBatch():
    """
    Class to run batch jobs
    """

    def __init__(self, xs, ys, prerunfunc, paramfunc, runfunc, postrunfunc):
        # xs and ys are accepted but not stored in this revision
        self.prerunfunc = prerunfunc
        self.paramfunc = paramfunc
        self.runfunc = runfunc
        self.postrunfunc = postrunfunc

    def initProcess(self, share, njobs, printLock):
        """
        Pool initializer function (multiprocessing).
        Needed to pass the shared variables to the worker processes.
        The variables must be globals of the module in order to be seen later in run_once_tuple(),
        see http://stackoverflow.com/questions/1675766/how-to-combine-pool-map-with-array-shared-memory-in-python-multiprocessing
        """
        currmodule = sys.modules[__name__]
        currmodule.proccount = share
        currmodule.njobs = njobs
        currmodule._printLock = printLock

    def generateTaskParams(self, globalparams):
        """
        Generate a list of task parameters
        """
        taskparams = []

        SNRdb = globalparams['SNRdb']
        sigma = globalparams['sigma']
        d = globalparams['d']
        deltas = globalparams['deltas']
        rhos = globalparams['rhos']
        numvects = globalparams['numvects']
        algosN = globalparams['algosN']
        algosL = globalparams['algosL']
        lambdas = globalparams['lambdas']

        # Process parameters
        noiselevel = 1.0 / (10.0**(SNRdb/10.0))

        for delta in deltas:
            for rho in rhos:
                p = round(sigma*d)
                m = round(delta*d)
                l = round(d - rho*m)

                # Generate Omega and data based on parameters
                Omega = AnalysisGenerate.Generate_Analysis_Operator(d, p)
                # Optionally make Omega more coherent
                #U,S,Vt = numpy.linalg.svd(Omega)
                #Sdnew = S * (1+numpy.arange(S.size))  # Make D coherent, not Omega!
                #Snew = numpy.vstack((numpy.diag(Sdnew), numpy.zeros((Omega.shape[0] - Omega.shape[1], Omega.shape[1]))))
                #Omega = numpy.dot(U, numpy.dot(Snew, Vt))

                # Generate data
                x0, y, M, Lambda, realnoise = AnalysisGenerate.Generate_Data_Known_Omega(Omega, d, p, m, l, noiselevel, numvects, 'l0')

                # Append task params
                # (algoparams is built but not yet used in this revision)
                algoparams = []
                for algo in algosN:
                    algoparams.append((algo[0], algo[1], Omega, y, realnoise, M, x0))
                taskparams.append((algosN, algosL, Omega, y, lambdas, realnoise, M, x0))

        return taskparams

    def run(self, params):

        print "This is RunBatch.run() by Nic"

        ncpus = params['ncpus']
        savedataname = params['savedataname']

        if ncpus is None:
            print " Running in parallel with default", multiprocessing.cpu_count(), "threads using \"multiprocessing\" package"
            if multiprocessing.cpu_count() == 1:
                doparallel = False
            else:
                doparallel = True
        elif ncpus > 1:
            print " Running in parallel with", ncpus, "threads using \"multiprocessing\" package"
            doparallel = True
        elif ncpus == 1:
            print "Running single thread"
            doparallel = False
        else:
            print "Wrong number of threads, exiting"
            return

        # Prepare parameters
        taskparams = self.generateTaskParams(params)

        # Store global variables
        currmodule.ntasks = len(taskparams)
        # Shared counter of finished tasks, incremented in run_once_tuple()
        currmodule.proccount = multiprocessing.Value('I', 0)

        # Run
        taskresults = []
        if doparallel:
            currmodule.printLock = multiprocessing.Lock()
            pool = multiprocessing.Pool(ncpus, initializer=self.initProcess,
                                        initargs=(currmodule.proccount, currmodule.ntasks, currmodule.printLock))
            taskresults = pool.map(run_once_tuple, taskparams)
        else:
            for taskparam in taskparams:
                taskresults.append(run_once_tuple(taskparam))

        # Process results
        # (processResults() and saveSim() are not defined in this file)
        procresults = processResults(params, taskresults)

        # Save
        saveSim(params, procresults)

        print "Finished."


# Module-level function so that it can be passed to Pool.map()
def run_once_tuple(t):
    # run_once() is not defined in this file
    results = run_once(*t)
    currmodule = sys.modules[__name__]
    currmodule.proccount.value = currmodule.proccount.value + 1
    print "================================"
    print "Finished task", currmodule.proccount.value, "of", currmodule.ntasks
    print "================================"
    return results
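The listing does not show how RunBatch is driven. The sketch below is a hypothetical driver, assuming only the dictionary keys that generateTaskParams() and run() read; the concrete values, the empty algorithm lists and the 'results.mat' name are placeholders, and helpers such as run_once(), processResults() and saveSim() must be supplied elsewhere in the project for the call to complete.

import numpy

# All values below are illustrative placeholders, not taken from the repository
params = {
    'SNRdb': 20.0,                 # signal-to-noise ratio in dB
    'sigma': 1.2,                  # oversampling factor: p = round(sigma * d)
    'd': 50,                       # signal dimension
    'deltas': numpy.array([0.5]),  # undersampling ratios: m = round(delta * d)
    'rhos': numpy.array([0.5]),    # sparsity ratios: l = round(d - rho * m)
    'numvects': 10,                # number of test vectors per (delta, rho) point
    'algosN': [],                  # algorithm entries indexed as algo[0], algo[1] in generateTaskParams()
    'algosL': [],                  # algorithms used together with the 'lambdas' values
    'lambdas': [1.0],
    'ncpus': 1,                    # 1 = single thread, >1 or None = multiprocessing pool
    'savedataname': 'results.mat',
}

# The six constructor arguments (xs, ys and the four callbacks) are not yet used in this revision
batch = RunBatch(None, None, None, None, None, None)
batch.run(params)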
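The progress reporting in initProcess() and run_once_tuple() follows the shared-memory pattern from the Stack Overflow link in the docstring: a multiprocessing.Value is handed to each worker through the Pool initializer and stored as a module-level global so the worker function can see it. The standalone sketch below illustrates that pattern with hypothetical names (init_worker, worker, _counter); it is not code from this repository. Unlike run_once_tuple(), it takes the Value's lock around the increment, which keeps the counter consistent when several workers finish at the same time.

import multiprocessing
import sys

# Module-level globals, filled in by init_worker() inside every worker process
_counter = None
_ntasks = 0

def init_worker(shared_counter, ntasks):
    # Store the shared objects as module globals so worker functions can see them
    currmodule = sys.modules[__name__]
    currmodule._counter = shared_counter
    currmodule._ntasks = ntasks

def worker(x):
    result = x * x  # stand-in for the real per-task computation
    with _counter.get_lock():  # the Value's own lock makes the increment atomic
        _counter.value += 1
        print("Finished task %d of %d" % (_counter.value, _ntasks))
    return result

if __name__ == '__main__':
    tasks = range(8)
    counter = multiprocessing.Value('I', 0)
    pool = multiprocessing.Pool(2, initializer=init_worker, initargs=(counter, len(tasks)))
    print(pool.map(worker, tasks))
    pool.close()
    pool.join()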