pyspark/ilm/spark_feat_extract.py @ 0:e34cf1b6fe09 (tip)

author Daniel Wolff
date   Sat, 20 Feb 2016 18:14:24 +0100

#!/usr/local/spark-1.0.0-bin-hadoop2/bin/spark-submit
# -*- coding: utf-8 -*-
#
# Part of DML (Digital Music Laboratory)
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA

__author__ = "hargreavess"

from assetDB import assetDB
from pyspark import SparkConf, SparkContext
import ConfigParser
import logging
from transform import *
import os
import time
import shutil

def main():
    start_complete = time.time()

    # read runtime configuration
    config = ConfigParser.ConfigParser()
    config.read('server.cfg')
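
    # For reference, a sketch of the expected server.cfg layout. The section
    # and option names below are exactly the ones this script reads; the
    # values are illustrative placeholders, not the project's actual settings.
    # (assetDB may read further options, e.g. database credentials, from the
    # same file.)
    #
    #   [Sonic Annotator]
    #   vamp-transform-list = /path/to/transforms.txt
    #   output-dir = /data/features
    #
    #   [Queries]
    #   genre-id = 7
    #   sql-start = 0
    #   sql-limit = 1000
    #
    #   [Audio Files]
    #   audio-file-size-limit = 50000000
    #   audio-prefix = /mnt/audio
    #
    #   [Application]
    #   array-step-size = 100
    #
    #   [Spark]
    #   num-cores = 4
    #   memory = 4g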

    vamp_transform_list = config.get('Sonic Annotator', 'vamp-transform-list')
    genre_id = config.getint('Queries', 'genre-id')

    # build a timestamped, genre-specific output directory name
    output_dir = config.get('Sonic Annotator', 'output-dir')
    ltime = time.localtime()
    output_dir += '_' + str(ltime.tm_mday) + '_' + str(ltime.tm_mon) + '_' + str(ltime.tm_year)
    output_dir += '_' + str(ltime.tm_hour) + str(ltime.tm_min) + '_' + str(ltime.tm_sec)
    output_dir += '_genre_id_' + str(genre_id)
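    # e.g. "<output-dir>_20_2_2016_1814_24_genre_id_7" (values illustrative;
    # note that day/month and hour/minute fields are not zero-padded)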
    # create the output directory if it doesn't exist
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)

    # keep a copy of the transform list alongside the results
    shutil.copy(vamp_transform_list, output_dir)

    # create a logger writing DEBUG-level messages to a file in the output directory
    logger = logging.getLogger('spark_feat_extract')
    logger.setLevel(logging.DEBUG)

    fh = logging.FileHandler(output_dir + "/ilm.assets.spark.features.log")
    fh.setLevel(logging.DEBUG)

    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    fh.setFormatter(formatter)
    logger.addHandler(fh)

    logger.info('starting new spark_feat_extract job')
    logger.info('using vamp transform list: ' + vamp_transform_list)
    logger.info('audio-file-size-limit: ' + config.get('Audio Files', 'audio-file-size-limit'))
    logger.info('audio-prefix: ' + config.get('Audio Files', 'audio-prefix'))
    logger.info('num-cores: ' + config.get('Spark', 'num-cores'))
    logger.info('spark memory: ' + config.get('Spark', 'memory'))
    logger.info('genre_id: ' + str(genre_id))

    # create a local Spark context with the configured core count and executor memory
    conf = (SparkConf()
            .setMaster('local[' + config.get('Spark', 'num-cores') + ']')
            .setAppName('spark feature extractor')
            .set('spark.executor.memory', config.get('Spark', 'memory')))
    sc = SparkContext(conf=conf)
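
    # The shebang at the top of this file points at spark-submit, which is
    # also how a script like this would normally be launched, e.g.
    # (path and any extra options illustrative):
    #
    #   /usr/local/spark-1.0.0-bin-hadoop2/bin/spark-submit spark_feat_extract.py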

    # assets are processed in batches of array-step-size rows, stepping
    # through the range [sql-start, sql-limit]
    SQL_start = config.getint('Queries', 'sql-start')
    SQL_limit = config.getint('Queries', 'sql-limit')
    local_SQL_start = SQL_start
    logger.info('SQL_start = %i', SQL_start)
    logger.info('SQL_limit = %i', SQL_limit)

    array_step_size = config.getint('Application', 'array-step-size')
    logger.info('array-step-size = %i', array_step_size)
    local_SQL_limit = min(SQL_limit, array_step_size)
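
    # Worked example (illustrative numbers): with sql-start = 0,
    # sql-limit = 1000 and array-step-size = 100, the loop below queries
    # rows 0-100, 100-200, ..., 900-1000, and stops once local_SQL_limit
    # exceeds SQL_limit or a query comes back empty.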

    while local_SQL_limit <= SQL_limit:

        # query the db for the next batch of assets (song tracks)
        db = assetDB(prefix=config.get('Audio Files', 'audio-prefix'), config=config)
        db.connect()

        data = []
        logger.info('local_SQL_start = %i', local_SQL_start)
        logger.info('local_SQL_limit = %i', local_SQL_limit)

        for path, asset in db.get_assets_by_genre(genre_id, local_SQL_start, local_SQL_limit):
            if path is None:
                logger.warning('Asset not found for: %s. (Album ID: %i Track No: %i)',
                               asset.song_title, asset.album_id, asset.track_no)
            else:
                data.append(path)

        db.close()

        # if the db query returned no results, stop here
        if len(data) == 0:
            break

        # per-batch results directory, e.g. <output_dir>/batch0-100
        batch_output_dir = output_dir + '/batch' + str(local_SQL_start) + '-' + str(local_SQL_limit)
        os.makedirs(batch_output_dir)
        logger.info('created results directory ' + batch_output_dir)

        logger.info('calling sc.parallelize(data)...')
        start = time.time()

        # distribute the batch of audio file paths across the Spark workers
        distData = sc.parallelize(data)
        end = time.time()
        logger.info('finished in ' + str(end - start))

        logger.info('calling distData.map...')
        start = time.time()

        # define the map: run the Vamp transforms over each audio file
        m1 = distData.map(lambda x: transform(audio_file=x,
                                              vamp_transform_list=vamp_transform_list,
                                              output_dir=batch_output_dir))
        end = time.time()
        logger.info('finished in ' + str(end - start))
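
        # Note: map() is a lazy transformation, so the timing above only
        # measures building the RDD lineage; the actual feature extraction
        # runs when collect() forces evaluation below.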

        logger.info('calling m1.collect()...')
        start = time.time()

        # trigger the computation and gather the results on the driver
        theResult = m1.collect()
        end = time.time()
        logger.info('finished in ' + str(end - start))

        # advance the window to the next batch
        local_SQL_start += array_step_size
        local_SQL_limit += min(SQL_limit, array_step_size)

    # take a fresh timestamp here: 'end' is never set if the first query
    # returned no results and the loop body was skipped
    end_complete = time.time()
    print 'finished all in ' + str(end_complete - start_complete)
    logger.info('finished all in ' + str(end_complete - start_complete))

if __name__ == '__main__':
    main()