Daniel@0
|
1 # Part of DML (Digital Music Laboratory)
|
Daniel@0
|
2 # Copyright 2014-2015 Daniel Wolff, City University
|
Daniel@0
|
3
|
Daniel@0
|
4 # This program is free software; you can redistribute it and/or
|
Daniel@0
|
5 # modify it under the terms of the GNU General Public License
|
Daniel@0
|
6 # as published by the Free Software Foundation; either version 2
|
Daniel@0
|
7 # of the License, or (at your option) any later version.
|
Daniel@0
|
8 #
|
Daniel@0
|
9 # This program is distributed in the hope that it will be useful,
|
Daniel@0
|
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
|
Daniel@0
|
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
Daniel@0
|
12 # GNU General Public License for more details.
|
Daniel@0
|
13 #
|
Daniel@0
|
14 # You should have received a copy of the GNU General Public
|
Daniel@0
|
15 # License along with this library; if not, write to the Free Software
|
Daniel@0
|
16 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
|
Daniel@0
|
17
|
Daniel@0
|
18 #!/usr/local/spark-1.0.0-bin-hadoop2/bin/spark-submit
|
Daniel@0
|
19 # -*- coding: utf-8 -*-
|
Daniel@0
|
20 __author__="wolffd"
|
Daniel@0
|
21 __date__ ="$11-Jul-2014 15:31:01$"
|
Daniel@0
|
22 import sys
|
Daniel@0
|
23 import time
|
Daniel@0
|
24 import os
|
Daniel@0
|
25 import hashlib
|
Daniel@0
|
26 from IPython.parallel import Client
|
Daniel@0
|
27
|
Daniel@0
|
28 # this is the main routine to be submitted as a spark job
|
Daniel@0
|
29 #
|
Daniel@0
|
30 #
|
Daniel@0
|
31 # Running python applications through ./bin/pyspark is deprecated as of Spark 1.0.
|
Daniel@0
|
32 # Use ./bin/spark-submit <python file> --py-files sonic_annotator_vamp.py
|
Daniel@0
|
33 # you can also provide a zip of all necessary python files
|
Daniel@0
|
34 #
|
Daniel@0
|
35 # @param string audiopath root of the folder structure to be traversed
|
Daniel@0
|
36 # @param string transform_file path to the .n3 turtle file describing the transform
|
Daniel@0
|
37 #def main(audiopath = './',
|
Daniel@0
|
38 # transform_file = 'silvet_settings.n3',
|
Daniel@0
|
39 # masterip = '0.0.0.0):
|
Daniel@0
|
40
|
Daniel@0
|
# File name suffixes (case-sensitive) that mark a file as audio input.
_AUDIO_EXTENSIONS = (".wav", ".mp3", ".flac")

# Chunk size in bytes used when hashing a transform file.
_HASH_BLOCKSIZE = 65536


def _find_audio_files(audiopath):
    """Return all audio files below audiopath, using '/' as separator."""
    found = []
    checked = 0
    for dirpath, _dirnames, filenames in os.walk(audiopath):
        for name in filenames:
            sys.stdout.write('\rChecked %d files' % checked)
            checked += 1
            if name.endswith(_AUDIO_EXTENSIONS):
                found.append(os.path.join(dirpath, name).replace('\\', '/'))
        # flush once per directory so the '\r' progress line is visible
        # without paying for a flush on every single file
        sys.stdout.flush()
    if checked:
        # terminate the progress line so later output starts on a new line
        sys.stdout.write('\n')
    return found


def _list_transform_files(transform_path):
    """Return transform_path itself if it is a .n3 file, else the .n3 files
    found directly inside the directory transform_path."""
    if transform_path.endswith(".n3"):
        return [transform_path]
    # os.path.join inserts the separator when the directory is given
    # without a trailing slash (plain concatenation produced a wrong path)
    return [os.path.join(transform_path, name)
            for name in os.listdir(transform_path)
            if name.endswith(".n3")]


def _sha1_of_file(path):
    """Return the hex SHA-1 digest of the file at path, read in chunks."""
    hasher = hashlib.sha1()
    with open(path, 'rb') as handle:
        for chunk in iter(lambda: handle.read(_HASH_BLOCKSIZE), b''):
            hasher.update(chunk)
    return str(hasher.hexdigest())


def main(audiopath, transform_path, out_path = ''):
    """Run Vamp transforms over a folder of audio files on an ipcluster.

    Every .wav/.mp3/.flac file found below audiopath is combined with each
    transform file into a task tuple
    (audio file, transform file, sha1 of transform file, out_path),
    and the tasks are mapped onto sonic_annotator_vamp.transform through a
    load-balanced view. Progress is reported on stdout once per second
    until all tasks of a transform are done.

    @param string audiopath root of the folder structure to be traversed
    @param string transform_path path to a .n3 transform file, or to a
           directory whose .n3 files are all applied one after another
    @param string out_path passed through unchanged to every task
    """
    print("iPCluster implementation for Vamp processing")

    # ---
    # initialise ipcluster
    # ---
    rc = Client()
    lview = rc.load_balanced_view()
    lview.block = False  # tasks are submitted asynchronously
    dview = rc[:]
    dview.block = True

    # import libraries on the engines as well as locally
    # (these imports make sys/os local names of main from here on)
    with dview.sync_imports():
        import sys
        import os
        import sonic_annotator_vamp

    # collect the audio files to process
    data = _find_audio_files(audiopath)
    njobs = len(data)
    if not data:
        # Nothing to schedule: skip submission rather than mapping over an
        # empty task list.
        print("No audio files found in %s" % audiopath)
        return

    # transform_path may be a single .n3 file or a directory of them
    for transform_file in _list_transform_files(transform_path):
        transform_hash = _sha1_of_file(transform_file)

        # one task per audio file for the current transform
        action = [(x, transform_file, transform_hash, out_path) for x in data]

        # output the current task
        print("Using " + os.path.basename(transform_file)
              + " on " + str(njobs) + " files")

        # ---
        # do the work!
        # ---
        ar = lview.map(sonic_annotator_vamp.transform, action)

        # poll the asynchronous result and report progress
        tic = time.time()
        while True:
            toc = time.time() - tic

            # tasks of this map that the client no longer lists as
            # outstanding are counted as completed
            msgset = set(ar.msg_ids)
            completed = len(msgset.difference(rc.outstanding))
            pending = len(msgset.intersection(rc.outstanding))

            if completed > 0:
                # remaining time: mean time per completed task times the
                # number of pending tasks, in hours
                timerem = ((toc / completed) * pending) / 3600.0
                sys.stdout.write(
                    '\rRunning %3.2f hrs: %3.2f percent. %d done, %d pending, approx %3.2f hrs'
                    % (toc / 3600.0,
                       100.0 * completed / (pending + completed),
                       completed, pending, timerem))
                # no newline is written, so flush to make the line visible
                sys.stdout.flush()

            if ar.ready():
                print('\n')
                break
            time.sleep(1)

        toc = time.time() - tic
        print('\rProcessed %d files in %3.2f hours.' % (njobs, toc / 3600.0))
        print('\n')
|
Daniel@0
|
138
|
Daniel@0
|
if __name__ == "__main__":
    # Usage: <script> audiopath transform_path [out_path]
    if len(sys.argv) >= 3:
        # Forward the optional third argument as out_path; with exactly two
        # arguments main() uses its own default. Further arguments are ignored.
        main(*sys.argv[1:4])
    else:
        # Fewer than two arguments given: run with the built-in defaults.
        main(audiopath = '/audio', transform_path = 'dml_processing/sonic_annotator/vamp_plugins/bbc_speechmusic.n3', out_path = './')
|
Daniel@0
|
144 |