#!/usr/local/spark-1.0.0-bin-hadoop2/bin/spark-submit
# -*- coding: utf-8 -*-
#
# Part of DML (Digital Music Laboratory)
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
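
"""Spark job for distributed audio feature extraction (DML): queries the
asset database for tracks in a given genre and runs a configured list of
Vamp transforms over each audio file in parallel, writing results and a
log into a timestamped output directory."""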

__author__ = "hargreavess"

from assetDB import assetDB
from pyspark import SparkConf, SparkContext
import ConfigParser
import logging
from transform import *
import os
import time
import shutil

def main():
    start_complete = time.time()

    # read the job configuration
    config = ConfigParser.ConfigParser()
    config.read('server.cfg')

    #vamp_transform = [config.get('Sonic Annotator', 'vamp-transform')]
    vamp_transform_list = config.get('Sonic Annotator', 'vamp-transform-list')
    genre_id = config.getint('Queries', 'genre-id')
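
    # the code in this script expects server.cfg to provide these sections
    # and keys (names taken from the config.get calls; values are up to the
    # deployment):
    #   [Sonic Annotator]  vamp-transform-list, output-dir
    #   [Queries]          genre-id, sql-start, sql-limit
    #   [Audio Files]      audio-file-size-limit, audio-prefix
    #   [Spark]            num-cores, memory
    #   [Application]      array-step-size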

    # timestamped output directory name, suffixed with the genre id
    output_dir = config.get('Sonic Annotator', 'output-dir')
    ltime = time.localtime()
    output_dir += '_%i_%i_%i' % (ltime.tm_mday, ltime.tm_mon, ltime.tm_year)
    output_dir += '_%i%i_%i' % (ltime.tm_hour, ltime.tm_min, ltime.tm_sec)
    output_dir += '_genre_id_' + str(genre_id)

    # create the output directory, if it doesn't exist
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)

    # copy the vamp transform list file to the output directory
    shutil.copy(vamp_transform_list, output_dir)

    # create a logger writing to a file in the output directory
    logger = logging.getLogger('spark_feat_extract')
    logger.setLevel(logging.DEBUG)

    # file handler at debug level, with a timestamped message format
    fh = logging.FileHandler(output_dir + "/ilm.assets.spark.features.log")
    fh.setLevel(logging.DEBUG)
    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    fh.setFormatter(formatter)
    logger.addHandler(fh)

    logger.info('starting new spark_feat_extract job')
    logger.info('using vamp transform list: ' + vamp_transform_list)
    logger.info('audio-file-size-limit: ' + config.get('Audio Files', 'audio-file-size-limit'))
    logger.info('audio-prefix: ' + config.get('Audio Files', 'audio-prefix'))
    logger.info('num-cores: ' + config.get('Spark', 'num-cores'))
    logger.info('spark memory: ' + config.get('Spark', 'memory'))
    logger.info('genre_id: ' + str(genre_id))

    # create a local Spark context sized from the config
    conf = (SparkConf()
            .setMaster('local[' + config.get('Spark', 'num-cores') + ']')
            .setAppName('spark feature extractor')
            .set('spark.executor.memory', config.get('Spark', 'memory')))
    sc = SparkContext(conf=conf)
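
    # note (observation, not from the original source): with a local[...]
    # master all tasks run in the driver JVM, so spark.executor.memory may
    # have little effect here; driver memory is what matters in local mode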

    # batch window for the asset query; advanced by array_step_size each
    # pass of the loop below, bounded by SQL_limit
    SQL_start = config.getint('Queries', 'sql-start')
    SQL_limit = config.getint('Queries', 'sql-limit')
    local_SQL_start = SQL_start
    logger.info('SQL_start = %i', SQL_start)
    logger.info('SQL_limit = %i', SQL_limit)

    array_step_size = config.getint('Application', 'array-step-size')
    logger.info('array-step-size = %i', array_step_size)
    local_SQL_limit = min(SQL_limit, array_step_size)
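
    # for illustration (assumed config values): with sql-start=0,
    # sql-limit=1000 and array-step-size=250, the loop below runs with
    # (local_SQL_start, local_SQL_limit) = (0, 250), (250, 500), (500, 750),
    # (750, 1000), then exits once local_SQL_limit exceeds SQL_limit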

    while local_SQL_limit <= SQL_limit:

        # query the db for assets (song tracks) in the current window
        db = assetDB(prefix=config.get('Audio Files', 'audio-prefix'), config=config)
        db.connect()

        data = []
        logger.info('local_SQL_start = %i', local_SQL_start)
        logger.info('local_SQL_limit = %i', local_SQL_limit)

        for path, asset in db.get_assets_by_genre(genre_id, local_SQL_start, local_SQL_limit):
            if path is None:
                logger.warning('Asset not found for: %s. (Album ID: %i Track No: %i)',
                               asset.song_title, asset.album_id, asset.track_no)
            else:
                data.append(path)

        db.close()

        # if the db query returned no results, stop here
        if len(data) == 0:
            break

        batch_output_dir = output_dir + '/batch' + str(local_SQL_start) + '-' + str(local_SQL_limit)
        os.makedirs(batch_output_dir)
        logger.info('created results directory ' + batch_output_dir)

        logger.info('calling sc.parallelize(data)...')
        start = time.time()

        # define the distributed dataset (one element per audio file path)
        distData = sc.parallelize(data)
        end = time.time()
        logger.info('finished in ' + str(end - start))

        logger.info('calling distData.map...')
        start = time.time()

        # define the map: one Vamp transform run per audio file
        m1 = distData.map(lambda x: transform(audio_file=x,
                                              vamp_transform_list=vamp_transform_list,
                                              output_dir=batch_output_dir))
        end = time.time()
        logger.info('finished in ' + str(end - start))

        logger.info('calling m1.collect()...')
        start = time.time()

        # collect the results
        theResult = m1.collect()
        end = time.time()
        logger.info('finished in ' + str(end - start))
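
        # note: RDD operations like map() are lazy, so the map timing above
        # mostly measures building the job; the transforms actually execute
        # when collect() forces evaluation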

        # advance the paging window to the next batch
        local_SQL_start += array_step_size
        local_SQL_limit += min(SQL_limit, array_step_size)

    # take a fresh timestamp here: the per-batch 'end' may be unset if the
    # first query returned no rows
    end_complete = time.time()
    print "finished all in " + str(end_complete - start_complete)
    logger.info("finished all in " + str(end_complete - start_complete))

if __name__ == "__main__":
    main()
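
# usage sketch (the .py filename is assumed, not given in the source):
#   /usr/local/spark-1.0.0-bin-hadoop2/bin/spark-submit spark_feat_extract.py
# with server.cfg present in the working directory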