Mercurial > hg > dml-open-backendtools
comparison pyspark/ilm/spark_feat_extract.py @ 0:e34cf1b6fe09 tip
commit
author | Daniel Wolff |
---|---|
date | Sat, 20 Feb 2016 18:14:24 +0100 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:e34cf1b6fe09 |
---|---|
1 # Part of DML (Digital Music Laboratory) | |
2 # | |
3 # This program is free software; you can redistribute it and/or | |
4 # modify it under the terms of the GNU General Public License | |
5 # as published by the Free Software Foundation; either version 2 | |
6 # of the License, or (at your option) any later version. | |
7 # | |
8 # This program is distributed in the hope that it will be useful, | |
9 # but WITHOUT ANY WARRANTY; without even the implied warranty of | |
10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
11 # GNU General Public License for more details. | |
12 # | |
13 # You should have received a copy of the GNU General Public | |
14 # License along with this library; if not, write to the Free Software | |
15 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA | |
16 | |
17 #!/usr/local/spark-1.0.0-bin-hadoop2/bin/spark-submit | |
18 # -*- coding: utf-8 -*- | |
19 __author__="hargreavess" | |
20 | |
21 from assetDB import assetDB | |
22 from pyspark import SparkConf, SparkContext | |
23 import ConfigParser | |
24 import logging | |
25 from transform import * | |
26 import os | |
27 import time | |
28 import shutil | |
29 | |
def main():
    """Run a batched Spark feature-extraction job over audio assets.

    Reads settings from ``server.cfg``, builds a timestamped output
    directory, then pages through the asset database in steps of
    ``array-step-size``, extracting Vamp features from each batch of
    audio files via a local Spark context. Results and a log file are
    written under the per-run output directory.
    """
    start_complete = time.time()

    # --- configuration ----------------------------------------------------
    config = ConfigParser.ConfigParser()
    config.read('server.cfg')

    vamp_transform_list = config.get('Sonic Annotator', 'vamp-transform-list')
    genre_id = config.getint('Queries', 'genre-id')

    # Build a unique per-run output directory name:
    # <output-dir>_<day>_<month>_<year>_<hour><min>_<sec>_genre_id_<id>
    output_dir = config.get('Sonic Annotator', 'output-dir')
    ltime = time.localtime()
    output_dir += '_%d_%d_%d' % (ltime.tm_mday, ltime.tm_mon, ltime.tm_year)
    output_dir += '_%d%d_%d' % (ltime.tm_hour, ltime.tm_min, ltime.tm_sec)
    output_dir += '_genre_id_' + str(genre_id)
    # Create the output directory if it doesn't already exist.
    if not os.access(output_dir, os.F_OK):
        os.makedirs(output_dir)

    # Keep a copy of the transform list alongside the results for provenance.
    shutil.copy(vamp_transform_list, output_dir)

    # --- logging ----------------------------------------------------------
    logger = logging.getLogger('spark_feat_extract')
    logger.setLevel(logging.DEBUG)

    # Log to a file inside the per-run output directory.
    fh = logging.FileHandler(output_dir + "/ilm.assets.spark.features.log")
    fh.setLevel(logging.DEBUG)
    fh.setFormatter(logging.Formatter(
        '%(asctime)s - %(name)s - %(levelname)s - %(message)s'))
    logger.addHandler(fh)

    logger.info('starting new spark_feat_extract job')
    logger.info("using vamp transform list: " + vamp_transform_list)
    logger.info('audio-file-size-limit: ' + config.get('Audio Files', 'audio-file-size-limit'))
    logger.info("audio-prefix: " + config.get('Audio Files', 'audio-prefix'))
    logger.info('num-cores: ' + config.get('Spark', 'num-cores'))
    logger.info("spark memory: " + config.get('Spark', 'memory'))
    logger.info("genre_id: " + str(genre_id))

    # --- Spark context ----------------------------------------------------
    conf = (SparkConf()
            .setMaster("local[" + config.get('Spark', 'num-cores') + "]")
            .setAppName("spark feature extractor")
            .set("spark.executor.memory", config.get('Spark', 'memory')))
    sc = SparkContext(conf=conf)

    SQL_start = config.getint('Queries', 'sql-start')
    SQL_limit = config.getint('Queries', 'sql-limit')
    local_SQL_start = SQL_start
    logger.info('SQL_start = %i', SQL_start)
    logger.info('SQL_limit = %i', SQL_limit)

    array_step_size = config.getint('Application', 'array-step-size')
    logger.info('array-step-size = %i', array_step_size)
    local_SQL_limit = min(SQL_limit, array_step_size)

    # --- batched extraction loop ------------------------------------------
    while local_SQL_limit <= SQL_limit:

        # Query the DB for assets (song tracks) in this batch window.
        db = assetDB(prefix=config.get('Audio Files', 'audio-prefix'), config=config)
        db.connect()

        data = []
        logger.info('local_start = %i', local_SQL_start)
        logger.info('local_SQL_limit = %i', local_SQL_limit)

        for path, asset in db.get_assets_by_genre(genre_id, local_SQL_start, local_SQL_limit):
            if path is None:
                logger.warning("Asset not found for: %s. (Album ID: %i Track No: %i)",
                               asset.song_title, asset.album_id, asset.track_no)
            else:
                data.append(path)

        # BUG FIX: original said `db.close` (attribute access, no call),
        # leaking the connection on every batch.
        db.close()

        # If the db query returned no results, stop here.
        if len(data) == 0:
            break

        batch_output_dir = output_dir + '/batch' + str(local_SQL_start) + '-' + str(local_SQL_limit)
        os.makedirs(batch_output_dir)
        logger.info('created results directory ' + batch_output_dir)

        logger.info("calling sc.parallelize(data)...")
        start = time.time()

        # Define the distributed dataset of audio file paths.
        distData = sc.parallelize(data)
        logger.info("finished in " + str(time.time() - start))

        logger.info("calling distData.map...")
        start = time.time()

        # Define the map: run the Vamp transforms over each audio file.
        m1 = distData.map(lambda x: transform(audio_file=x,
                                              vamp_transform_list=vamp_transform_list,
                                              output_dir=batch_output_dir))
        logger.info("finished in " + str(time.time() - start))

        logger.info("calling m1.collect()...")
        start = time.time()

        # Trigger execution and collect results on the driver.
        theResult = m1.collect()
        logger.info("finished in " + str(time.time() - start))

        # NOTE(review): the end bound grows by the step each batch, which
        # treats `limit` as an end offset rather than a row count — confirm
        # against assetDB.get_assets_by_genre's semantics.
        local_SQL_start += array_step_size
        local_SQL_limit += min(SQL_limit, array_step_size)

    # BUG FIX: original reported `end - start_complete`, where `end` was the
    # last batch's timestamp — and was an unbound NameError if the very first
    # query returned no rows. Use the current time instead.
    elapsed = time.time() - start_complete
    print("finished all in " + str(elapsed))
    logger.info("finished all in " + str(elapsed))
155 | |
# Script entry point: run the feature-extraction job when executed directly.
if __name__ == "__main__":
    main()
158 |