# Part of DML (Digital Music Laboratory)
# Copyright 2014-2015 Daniel Wolff, City University

# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA

#!/usr/local/spark-1.0.0-bin-hadoop2/bin/spark-submit
# -*- coding: utf-8 -*-
__author__ = "wolffd"
__date__ = "$11-Jul-2014 15:31:01$"
import sys
import time
import os
import hashlib
from IPython.parallel import Client

# this is the main routine to be submitted as a spark job
#
#
# Running python applications through ./bin/pyspark is deprecated as of Spark 1.0.
# Use ./bin/spark-submit --py-files sonic_annotator_vamp.py
# you can also provide a zip of all necessary python files

# File name suffixes that mark a file as audio input during the directory scan.
AUDIO_EXTENSIONS = (".wav", ".mp3", ".flac")

# File name suffix that marks a transform description file.
TRANSFORM_EXTENSION = ".n3"

# Number of bytes read per chunk while hashing a transform file.
HASH_BLOCKSIZE = 65536


def _find_audio_files(audiopath):
    """Return the paths of all audio files found below `audiopath`.

    The tree is walked recursively; a file counts as audio when its name
    ends with one of AUDIO_EXTENSIONS. Backslashes in the resulting paths
    are replaced by forward slashes. A running count of inspected files is
    written to stdout while scanning.
    """
    found = []
    checked = 0
    for dirpath, _dirnames, filenames in os.walk(audiopath):
        for filename in filenames:
            checked += 1
            sys.stdout.write('\rChecked %d files' % checked)
            if filename.endswith(AUDIO_EXTENSIONS):
                found.append(os.path.join(dirpath, filename).replace('\\', '/'))
    if checked:
        # terminate the in-place progress line
        sys.stdout.write('\n')
    sys.stdout.flush()
    return found


def _list_transform_files(transform_path):
    """Return the transform files designated by `transform_path`.

    A path ending in ".n3" is taken as a single transform file. Anything
    else is treated as a directory whose ".n3" entries are returned in
    sorted order.
    """
    if transform_path.endswith(TRANSFORM_EXTENSION):
        return [transform_path]
    # os.path.join instead of string concatenation, so a directory given
    # without a trailing separator still yields valid paths
    return [os.path.join(transform_path, name)
            for name in sorted(os.listdir(transform_path))
            if name.endswith(TRANSFORM_EXTENSION)]


def _sha1_of_file(path):
    """Return the hexadecimal SHA-1 digest of the contents of `path`."""
    hasher = hashlib.sha1()
    with open(path, 'rb') as afile:
        # read in chunks so large files are never held in memory at once
        for buf in iter(lambda: afile.read(HASH_BLOCKSIZE), b''):
            hasher.update(buf)
    return str(hasher.hexdigest())


def _wait_with_progress(rc, ar):
    """Poll `ar` once per second until it is ready, printing progress.

    `rc` is the cluster client whose `outstanding` set is compared against
    the message ids of `ar` to count finished and pending tasks. Returns
    the elapsed wall-clock time in seconds.
    """
    tic = time.time()
    while True:
        toc = time.time() - tic

        # split this job's messages into finished and still outstanding
        msgset = set(ar.msg_ids)
        completed = len(msgset.difference(rc.outstanding))
        pending = len(msgset.intersection(rc.outstanding))

        if completed > 0:
            # remaining time extrapolated from the mean time per finished task
            timerem = ((toc / completed) * pending) / 3600.0
            percent = 100.0 * completed / (completed + pending)
            sys.stdout.write(
                '\rRunning %3.2f hrs: %3.2f percent. %d done, %d pending, approx %3.2f hrs'
                % (toc / 3600.0, percent, completed, pending, timerem))
            sys.stdout.flush()

        if ar.ready():
            print('\n')
            break
        time.sleep(1)

    return time.time() - tic


def main(audiopath, transform_path, out_path=''):
    """Run Vamp transforms over a folder of audio files on an IPython cluster.

    @param string audiopath root of the folder structure to be traversed
    @param string transform_path path to the .n3 turtle file describing the
           transform, or a directory containing several .n3 files which are
           then applied one after another
    @param string out_path passed through unchanged to
           sonic_annotator_vamp.transform as the last element of each task

    Every task handed to sonic_annotator_vamp.transform is a tuple
    (audio file, transform file, SHA-1 of the transform file, out_path).
    Progress is printed to stdout; nothing is returned.
    """
    print("iPCluster implementation for Vamp processing")

    # ---
    # initialise ipcluster
    # ---
    rc = Client()
    lview = rc.load_balanced_view()
    lview.block = False  # asynchronous: map() returns without waiting
    dview = rc[:]
    dview.block = True

    # import libraries
    # NOTE: these imports also bind `sys` and `os` as local names of this
    # function, so neither may be used in main() above this point.
    with dview.sync_imports():
        import sys
        import os
        import sonic_annotator_vamp

    # here traverse the file structure
    data = _find_audio_files(audiopath)
    njobs = len(data)
    if not data:
        # nothing to submit; avoids polling the result of an empty map()
        print("No audio files found in " + audiopath)
        return

    transform_files = _list_transform_files(transform_path)
    if not transform_files:
        print("No transform files found in " + transform_path)
        return

    for transform_file in transform_files:
        # the hash identifies the exact transform settings used
        transform_hash = _sha1_of_file(transform_file)

        # create action containing data and parameter file
        action = [(x, transform_file, transform_hash, out_path) for x in data]

        # output the current task
        print("Using " + os.path.basename(transform_file)
              + " on " + str(njobs) + " files")

        # ---
        # do the work!
        # ---
        ar = lview.map(sonic_annotator_vamp.transform, action)

        # asynch process output
        elapsed = _wait_with_progress(rc, ar)
        print('\rProcessed %d files in %3.2f hours.' % (njobs, elapsed / 3600.0))
        print('\n')


if __name__ == "__main__":
    if len(sys.argv) >= 3:
        # audiopath, transform_path and, when given, out_path
        main(*sys.argv[1:4])
    else:
        main(audiopath='/audio', transform_path='dml_processing/sonic_annotator/vamp_plugins/bbc_speechmusic.n3', out_path='./')