#!/usr/local/spark-1.0.0-bin-hadoop2/bin/spark-submit
# -*- coding: utf-8 -*-
# Part of DML (Digital Music Laboratory)
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA

__author__ = "hargreavess"

from assetDB import assetDB
from pyspark import SparkConf, SparkContext
import ConfigParser
import logging
from transform import *
import os
import time
import shutil


def _build_output_dir(config, genre_id):
    """Return the timestamped per-run output directory, creating it if absent.

    Naming scheme (kept identical to the original for downstream tooling):
    <base>_<day>_<month>_<year>_<hour><min>_<sec>_genre_id_<genre_id>
    NOTE(review): hour and minute are concatenated without a separator and
    minutes/seconds are not zero-padded — preserved as-is for compatibility.
    """
    output_dir = config.get('Sonic Annotator', 'output-dir')
    ltime = time.localtime()
    output_dir += '_' + str(ltime.tm_mday) + '_' + str(ltime.tm_mon) + '_' + str(ltime.tm_year)
    output_dir += '_' + str(ltime.tm_hour) + str(ltime.tm_min) + '_' + str(ltime.tm_sec)
    output_dir += '_genre_id_' + str(genre_id)
    # Create the output directory if it doesn't exist yet.
    if not os.access(output_dir, os.F_OK):
        os.makedirs(output_dir)
    return output_dir


def _create_logger(output_dir):
    """Create the 'spark_feat_extract' DEBUG logger writing into output_dir."""
    logger = logging.getLogger('spark_feat_extract')
    logger.setLevel(logging.DEBUG)
    fh = logging.FileHandler(output_dir + "/ilm.assets.spark.features.log")
    fh.setLevel(logging.DEBUG)
    fh.setFormatter(logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s'))
    logger.addHandler(fh)
    return logger


def main():
    """Run batched VAMP feature extraction over one genre's assets via Spark.

    Reads 'server.cfg', creates a timestamped output directory, then loops
    over the asset DB in windows of 'array-step-size' rows, parallelizing
    each batch with Spark and applying transform() to every audio file.
    Stops early when a window returns no assets.
    """
    start_complete = time.time()

    # Load run configuration.
    config = ConfigParser.ConfigParser()
    config.read('server.cfg')

    vamp_transform_list = config.get('Sonic Annotator', 'vamp-transform-list')
    genre_id = config.getint('Queries', 'genre-id')

    output_dir = _build_output_dir(config, genre_id)

    # Keep a copy of the transform list alongside the results for provenance.
    shutil.copy(vamp_transform_list, output_dir)

    logger = _create_logger(output_dir)

    logger.info('starting new spark_feat_extract job')
    logger.info("using vamp transform list: " + vamp_transform_list)
    logger.info('audio-file-size-limit: ' + config.get('Audio Files', 'audio-file-size-limit'))
    logger.info("audio-prefix: " + config.get('Audio Files', 'audio-prefix'))
    logger.info('num-cores: ' + config.get('Spark', 'num-cores'))
    logger.info("spark memory: " + config.get('Spark', 'memory'))
    logger.info("genre_id: " + str(genre_id))

    # Create a local-mode Spark context sized from the config.
    conf = (SparkConf()
            .setMaster("local[" + config.get('Spark', 'num-cores') + "]")
            .setAppName("spark feature extractor")
            .set("spark.executor.memory", "" + config.get('Spark', 'memory') + ""))
    sc = SparkContext(conf=conf)

    SQL_start = config.getint('Queries', 'sql-start')
    SQL_limit = config.getint('Queries', 'sql-limit')
    local_SQL_start = SQL_start
    logger.info('SQL_start = %i', SQL_start)
    logger.info('SQL_limit = %i', SQL_limit)

    array_step_size = config.getint('Application', 'array-step-size')
    logger.info('array-step-size = %i', array_step_size)
    local_SQL_limit = min(SQL_limit, array_step_size)

    # BUG FIX: 'end' was previously only assigned inside the loop body, so an
    # empty first batch (break before any timing) raised NameError at the
    # summary line below. Initialize it so the summary is always printable.
    end = start_complete

    while local_SQL_limit <= SQL_limit:

        # Query the DB for the next window of assets (song tracks).
        db = assetDB(prefix=config.get('Audio Files', 'audio-prefix'), config=config)
        db.connect()

        data = []
        logger.info('local_start = %i', local_SQL_start)
        logger.info('local_SQL_limit = %i', local_SQL_limit)

        for path, asset in db.get_assets_by_genre(genre_id, local_SQL_start, local_SQL_limit):
            if path is None:
                logger.warning("Asset not found for: %s. (Album ID: %i Track No: %i)",
                               asset.song_title, asset.album_id, asset.track_no)
            else:
                data.append(path)

        # BUG FIX: was 'db.close' (attribute access, never called), which
        # leaked one DB connection per loop iteration.
        db.close()

        # If the db query returned no results, stop here.
        if not data:
            break

        batch_output_dir = output_dir + '/batch' + str(local_SQL_start) + '-' + str(local_SQL_limit)
        os.makedirs(batch_output_dir)
        logger.info('created results directory ' + batch_output_dir)

        logger.info("calling sc.parallelize(data)...")
        start = time.time()

        # Define the distributed dataset for this batch.
        distData = sc.parallelize(data)
        end = time.time()
        logger.info("finished in " + str(end - start))

        logger.info("calling distData.map...")
        start = time.time()

        # Define the (lazy) map: one transform() call per audio file.
        m1 = distData.map(lambda x: transform(audio_file=x,
                                              vamp_transform_list=vamp_transform_list,
                                              output_dir=batch_output_dir))
        end = time.time()
        logger.info("finished in " + str(end - start))

        logger.info("calling m1.collect()...")
        start = time.time()

        # collect() triggers the actual computation on the cluster.
        theResult = m1.collect()

        end = time.time()
        logger.info("finished in " + str(end - start))

        # Advance the window. min(SQL_limit, array_step_size) mirrors the
        # initial window size so a run smaller than one step terminates.
        local_SQL_start += array_step_size
        local_SQL_limit += min(SQL_limit, array_step_size)

    print("finished all in " + str(end - start_complete))
    logger.info("finished all in " + str(end - start_complete))


if __name__ == "__main__":
    main()