Mercurial > hg > dml-open-backendtools
comparison ipcluster/test_sonic_annotator_notimeside.py @ 0:e34cf1b6fe09 tip
commit
author | Daniel Wolff |
---|---|
date | Sat, 20 Feb 2016 18:14:24 +0100 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:e34cf1b6fe09 |
---|---|
1 # Part of DML (Digital Music Laboratory) | |
2 # Copyright 2014-2015 Daniel Wolff, City University | |
3 | |
4 # This program is free software; you can redistribute it and/or | |
5 # modify it under the terms of the GNU General Public License | |
6 # as published by the Free Software Foundation; either version 2 | |
7 # of the License, or (at your option) any later version. | |
8 # | |
9 # This program is distributed in the hope that it will be useful, | |
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of | |
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
12 # GNU General Public License for more details. | |
13 # | |
14 # You should have received a copy of the GNU General Public | |
15 # License along with this library; if not, write to the Free Software | |
16 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA | |
17 | |
18 #!/usr/local/spark-1.0.0-bin-hadoop2/bin/spark-submit | |
19 # -*- coding: utf-8 -*- | |
20 __author__="wolffd" | |
21 __date__ ="$11-Jul-2014 15:31:01$" | |
22 import sys | |
23 import time | |
24 import os | |
25 import hashlib | |
26 from IPython.parallel import Client | |
27 | |
28 # this is the main routine to be submitted as a spark job | |
29 # | |
30 # | |
31 # Running python applications through ./bin/pyspark is deprecated as of Spark 1.0. | |
32 # Use ./bin/spark-submit <python file> --py-files sonic_annotator_vamp.py | |
33 # you can also provide a zip of all necessary python files | |
34 # | |
35 # @param string audiopath root of the folder structure to be traversed | |
36 # @param string transform_file path to the .n3 turtle file describing the transform | |
37 #def main(audiopath = './', | |
38 # transform_file = 'silvet_settings.n3', | |
39 # masterip = '0.0.0.0): | |
40 | |
def _find_audio_files(audiopath):
    """Return the paths of all .wav/.mp3/.flac files found below *audiopath*.

    Paths are normalised to forward slashes. A running count of the files
    inspected so far is written to stdout while the tree is traversed.
    """
    found = []
    checked = 0
    for dirpath, _dirnames, filenames in os.walk(audiopath):
        for filename in filenames:
            checked += 1
            sys.stdout.write('\rChecked %d files' % checked)
            if filename.endswith(('.wav', '.mp3', '.flac')):
                found.append(os.path.join(dirpath, filename).replace('\\', '/'))
    # finish the progress line so later output starts on a fresh line
    sys.stdout.write('\n')
    sys.stdout.flush()
    return found


def _list_transform_files(transform_path):
    """Return the .n3 transform files designated by *transform_path*.

    *transform_path* is either a single .n3 file or a directory whose .n3
    files are all used.
    """
    if transform_path.endswith('.n3'):
        return [transform_path]
    # os.path.join inserts the separator when the directory is given
    # without a trailing slash (plain concatenation produced a wrong path)
    return [os.path.join(transform_path, name)
            for name in os.listdir(transform_path)
            if name.endswith('.n3')]


def _sha1_of_file(path, blocksize=65536):
    """Return the hex SHA-1 digest of the file at *path*, read in chunks."""
    hasher = hashlib.sha1()
    with open(path, 'rb') as handle:
        for chunk in iter(lambda: handle.read(blocksize), b''):
            hasher.update(chunk)
    return hasher.hexdigest()


def main(audiopath, transform_path, out_path=''):
    """Run Vamp transforms over an audio collection on an ipcluster.

    Every .wav/.mp3/.flac file below *audiopath* is submitted, once per
    transform, to ``sonic_annotator_vamp.transform`` through a load-balanced
    view. Progress is reported on stdout once per second until all tasks of
    the current transform are done.

    @param string audiopath root of the folder structure to be traversed
    @param string transform_path path to a .n3 turtle file describing the
           transform, or to a directory containing several .n3 files
    @param string out_path passed through unchanged to each transform task
    """
    print("iPCluster implementation for Vamp processing")

    # ---
    # initialise ipcluster
    # ---
    rc = Client()
    lview = rc.load_balanced_view()
    lview.block = False  # asynchronous: we poll for progress below
    dview = rc[:]
    dview.block = True

    # import libraries locally and on all engines
    with dview.sync_imports():
        import sys
        import os
        import sonic_annotator_vamp

    # traverse the file structure and count jobs
    data = _find_audio_files(audiopath)
    njobs = len(data)
    if not data:
        print("No audio files found under " + audiopath)
        return

    # a single .n3 file or a directory of .n3 files is accepted
    for transform_file in _list_transform_files(transform_path):
        # the transform hash is handed to every task along with the file
        transform_hash = _sha1_of_file(transform_file)

        # create action containing data and parameter file
        action = [(audio_file, transform_file, transform_hash, out_path)
                  for audio_file in data]

        # output the current task
        print("Using %s on %d files"
              % (os.path.basename(transform_file), njobs))

        # ---
        # do the work!
        # ---
        ar = lview.map(sonic_annotator_vamp.transform, action)

        # asynchronous progress output
        tic = time.time()
        while True:
            # update time used
            toc = time.time() - tic

            # update progress
            msgset = set(ar.msg_ids)
            completed = len(msgset.difference(rc.outstanding))
            pending = len(msgset.intersection(rc.outstanding))

            if completed > 0:
                # linear extrapolation from the average time per finished task
                timerem = ((toc / completed) * pending) / 3600.0
                percent = 100.0 * completed / (completed + pending)
                sys.stdout.write(
                    '\rRunning %3.2f hrs: %3.2f percent. %d done, %d pending, approx %3.2f hrs'
                    % (toc / 3600.0, percent, completed, pending, timerem))
                # flush so the single-line progress display actually updates
                sys.stdout.flush()

            if ar.ready():
                print('\n')
                break
            time.sleep(1)

        toc = time.time() - tic
        print('\rProcessed %d files in %3.2f hours.' % (njobs, toc / 3600.0))
        print('\n')
131 | |
132 # output | |
133 #print(result) | |
134 #thefile = open(audiopath + tpath[1] + '.txt', 'w') | |
135 #for item in result: | |
136 # thefile.write("%s\n" % item) | |
137 #close(thefile) | |
138 | |
if __name__ == "__main__":
    # usage: <script> AUDIOPATH TRANSFORM_PATH [OUT_PATH]
    if len(sys.argv) >= 3:
        # forward the optional third argument as out_path; with exactly two
        # arguments main() falls back to its own default as before
        main(*sys.argv[1:4])
    else:
        # no (or incomplete) arguments: run with the built-in defaults
        main(audiopath='/audio',
             transform_path='dml_processing/sonic_annotator/vamp_plugins/bbc_speechmusic.n3',
             out_path='./')
144 |