view ipcluster/test_sonic_annotator_notimeside.py @ 0:e34cf1b6fe09 tip

commit
author Daniel Wolff
date Sat, 20 Feb 2016 18:14:24 +0100
parents
children
line wrap: on
line source
# Part of DML (Digital Music Laboratory)
# Copyright 2014-2015 Daniel Wolff, City University
 
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
# 
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
# 
# You should have received a copy of the GNU General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA

#!/usr/local/spark-1.0.0-bin-hadoop2/bin/spark-submit
# -*- coding: utf-8 -*-
__author__="wolffd"
__date__ ="$11-Jul-2014 15:31:01$"
import sys
import time
import os
import hashlib
from IPython.parallel import Client

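# This is the main routine, submitted as an IPython parallel (ipcluster) job.
# NOTE: the Spark notes below appear to be left over from a Spark-based
# variant of this script (see the spark-submit line above):
#
# Running python applications through ./bin/pyspark is deprecated as of Spark 1.0.
# Use ./bin/spark-submit <python file> --py-files sonic_annotator_vamp.py
# You can also provide a zip of all necessary python files.
#
# @param string audiopath root of the folder structure to be traversed
# @param string transform_path path to a single .n3 turtle file describing the
#               transform, or to a directory whose .n3 files are all applied
# @param string out_path output path handed on to sonic_annotator_vamp.transform
#def main(audiopath = './',
#         transform_file = 'silvet_settings.n3',
#         masterip = '0.0.0.0'):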

def _find_audio_files(audiopath, extensions=('.wav', '.mp3', '.flac')):
    """Walk *audiopath* recursively and return the audio files found.

    A running "Checked N files" counter is printed while walking.

    @param string audiopath root of the folder structure to be traversed
    @param tuple extensions case-sensitive file-name suffixes to accept
    @return list of matching file paths, with '\\' replaced by '/'
    """
    data = []
    count = 0
    for dirpath, _dirnames, filenames in os.walk(audiopath):
        for filename in filenames:
            count += 1
            # '\r' rewrites the same console line; flush so it shows up now
            sys.stdout.write('\rChecked %d files' % count)
            sys.stdout.flush()
            if filename.endswith(extensions):
                # normalise Windows-style separators to '/'
                data.append(os.path.join(dirpath, filename).replace('\\', '/'))
    # finish the progress line so later output starts on a fresh line
    sys.stdout.write('\n')
    return data


def _find_transform_files(transform_path):
    """Return the .n3 transform files designated by *transform_path*.

    @param string transform_path either a single .n3 file, or a directory whose
           (non-recursive) .n3 entries are all returned, sorted by name
    @return list of transform file paths
    """
    if transform_path.endswith(".n3"):
        return [transform_path]
    # os.path.join works whether or not transform_path ends with a separator;
    # sorting makes the processing order deterministic
    return sorted(os.path.join(transform_path, name)
                  for name in os.listdir(transform_path)
                  if name.endswith(".n3"))


def _sha1_file(path, blocksize=65536):
    """Return the SHA-1 hex digest of the file at *path*, read in chunks."""
    hasher = hashlib.sha1()
    with open(path, 'rb') as afile:
        buf = afile.read(blocksize)
        while buf:
            hasher.update(buf)
            buf = afile.read(blocksize)
    return str(hasher.hexdigest())


def _wait_for(rc, ar, poll_interval=1):
    """Poll the asynchronous result *ar* until it is ready, printing progress.

    @param rc the IPython.parallel Client the tasks were submitted through
    @param ar the AsyncResult returned by the submission
    @param poll_interval seconds to sleep between progress updates
    @return elapsed wall-clock time in seconds
    """
    tic = time.time()
    msg_ids = set(ar.msg_ids)
    total = len(msg_ids)
    while True:
        elapsed = time.time() - tic

        # ids still listed in rc.outstanding are counted as pending
        pending = len(msg_ids.intersection(rc.outstanding))
        completed = total - pending

        if completed > 0:
            # naive estimate: remaining tasks take the average time so far
            timerem = ((elapsed / completed) * pending) / 3600.0
            sys.stdout.write('\rRunning %3.2f hrs: %3.2f percent. %d done, %d pending, approx %3.2f hrs'
                             % (elapsed / 3600.0, completed * 100.0 / total,
                                completed, pending, timerem))
            sys.stdout.flush()

        if ar.ready():
            sys.stdout.write('\n')
            return time.time() - tic
        time.sleep(poll_interval)


def main(audiopath, transform_path, out_path = ''):
    """Apply Vamp transform(s) to every audio file below *audiopath* on an ipcluster.

    For each .n3 transform file, one task per audio file is mapped onto a
    load-balanced view; each task receives the tuple
    (audio_path, transform_file, sha1_of_transform_file, out_path) and is
    executed by sonic_annotator_vamp.transform on the engines.

    @param string audiopath root of the folder structure to be traversed
    @param string transform_path a single .n3 transform file, or a directory
           whose .n3 files are all applied in turn
    @param string out_path passed unchanged to sonic_annotator_vamp.transform
    """
    # NOTE: use print(<single argument>) or the helpers here, not sys/os:
    # the imports inside sync_imports() below make 'sys' and 'os' local to
    # this function, so touching them before that block raises
    # UnboundLocalError.
    print("iPCluster implementation for Vamp processing")

    # ---
    # initialise ipcluster
    # ---
    rc = Client()
    lview = rc.load_balanced_view()
    lview.block = False  # asynchronous: map() returns immediately
    dview = rc[:]
    dview.block = True

    # import libraries locally and on every engine; sys/os are needed
    # remotely even though this function does not use them afterwards
    with dview.sync_imports():
        import sys
        import os
        import sonic_annotator_vamp

    data = _find_audio_files(audiopath)
    njobs = len(data)
    if not data:
        print("No audio files found below " + audiopath)
        return

    transform_files = _find_transform_files(transform_path)
    if not transform_files:
        print("No .n3 transform files found at " + transform_path)
        return

    for transform_file in transform_files:
        transform_hash = _sha1_file(transform_file)

        # one task per audio file: data plus parameter file
        action = [(x, transform_file, transform_hash, out_path) for x in data]

        print("Using " + os.path.basename(transform_file) + " on " + str(njobs) + " files")

        # ---
        # do the work!
        # ---
        ar = lview.map(sonic_annotator_vamp.transform, action)
        elapsed = _wait_for(rc, ar)

        print('Processed %d files in %3.2f hours.' % (njobs, elapsed / 3600.0))
        # previously engine-side exceptions were silently dropped
        if not ar.successful():
            print('Warning: at least one task raised an exception; '
                  'call get() on the AsyncResult for details.')

    # TODO(review): results are not collected here; presumably
    # sonic_annotator_vamp.transform writes its own output using out_path --
    # confirm against that module.

if __name__ == "__main__":
    # Usage: <script> AUDIOPATH TRANSFORM_PATH [OUT_PATH]
    if len(sys.argv) >= 3:
        # forward the optional third argument as out_path (previously ignored)
        main(*sys.argv[1:4])
    else:
        main(audiopath = '/audio', transform_path = 'dml_processing/sonic_annotator/vamp_plugins/bbc_speechmusic.n3', out_path = './')