annotate pyspark/ilm/spark_feat_extract.py @ 0:e34cf1b6fe09 tip

commit
author Daniel Wolff
date Sat, 20 Feb 2016 18:14:24 +0100
parents
children
rev   line source
Daniel@0 1 # Part of DML (Digital Music Laboratory)
Daniel@0 2 #
Daniel@0 3 # This program is free software; you can redistribute it and/or
Daniel@0 4 # modify it under the terms of the GNU General Public License
Daniel@0 5 # as published by the Free Software Foundation; either version 2
Daniel@0 6 # of the License, or (at your option) any later version.
Daniel@0 7 #
Daniel@0 8 # This program is distributed in the hope that it will be useful,
Daniel@0 9 # but WITHOUT ANY WARRANTY; without even the implied warranty of
Daniel@0 10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Daniel@0 11 # GNU General Public License for more details.
Daniel@0 12 #
Daniel@0 13 # You should have received a copy of the GNU General Public
Daniel@0 14 # License along with this library; if not, write to the Free Software
Daniel@0 15 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
Daniel@0 16
Daniel@0 17 #!/usr/local/spark-1.0.0-bin-hadoop2/bin/spark-submit
Daniel@0 18 # -*- coding: utf-8 -*-
Daniel@0 19 __author__="hargreavess"
Daniel@0 20
Daniel@0 21 from assetDB import assetDB
Daniel@0 22 from pyspark import SparkConf, SparkContext
Daniel@0 23 import ConfigParser
Daniel@0 24 import logging
Daniel@0 25 from transform import *
Daniel@0 26 import os
Daniel@0 27 import time
Daniel@0 28 import shutil
Daniel@0 29
def _timestamped_output_dir(base_dir, genre_id):
    """Return base_dir suffixed with the local date, time and genre id.

    The date/time fields are deliberately left un-padded so that directory
    names keep the same scheme earlier runs of this script produced.
    """
    now = time.localtime()
    return (base_dir
            + '_' + str(now.tm_mday) + '_' + str(now.tm_mon) + '_' + str(now.tm_year)
            + '_' + str(now.tm_hour) + str(now.tm_min) + '_' + str(now.tm_sec)
            + '_genre_id_' + str(genre_id))


def _create_file_logger(output_dir):
    """Return the 'spark_feat_extract' logger, writing DEBUG output to a log file in output_dir."""
    logger = logging.getLogger('spark_feat_extract')
    logger.setLevel(logging.DEBUG)

    handler = logging.FileHandler(output_dir + "/ilm.assets.spark.features.log")
    handler.setLevel(logging.DEBUG)
    handler.setFormatter(
        logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s'))

    logger.addHandler(handler)
    return logger


def _fetch_asset_paths(config, genre_id, sql_start, sql_limit, logger):
    """Query the asset database and return the paths of all assets that were found.

    Assets for which the database yields no path are logged as warnings and
    skipped. The connection is always closed, even if the query raises.
    """
    db = assetDB(prefix=config.get('Audio Files', 'audio-prefix'), config=config)
    db.connect()

    paths = []
    try:
        for path, asset in db.get_assets_by_genre(genre_id, sql_start, sql_limit):
            if path is None:
                logger.warning("Asset not found for: %s. (Album ID: %i Track No: %i)",
                               asset.song_title, asset.album_id, asset.track_no)
            else:
                paths.append(path)
    finally:
        # The original code referenced `db.close` without calling it, so the
        # connection was never released.
        db.close()
    return paths


def main():
    """Run the feature-extraction job configured in 'server.cfg'.

    Reads the configuration, creates a timestamped output directory (with a
    copy of the vamp transform list and a log file inside it), then repeatedly
    queries the asset database for audio paths of the configured genre and
    maps `transform` over each batch of paths with a local Spark context.
    Each batch gets its own 'batch<start>-<limit>' sub-directory. Processing
    stops when a query yields no paths or the configured SQL limit is passed.
    """
    start_complete = time.time()

    # get config
    config = ConfigParser.ConfigParser()
    config.read('server.cfg')

    vamp_transform_list = config.get('Sonic Annotator', 'vamp-transform-list')
    genre_id = config.getint('Queries', 'genre-id')

    # create output directory, if it doesn't exist
    output_dir = _timestamped_output_dir(
        config.get('Sonic Annotator', 'output-dir'), genre_id)
    if not os.access(output_dir, os.F_OK):
        os.makedirs(output_dir)

    # keep a copy of the transform list next to the results it produced
    shutil.copy(vamp_transform_list, output_dir)

    logger = _create_file_logger(output_dir)

    logger.info('starting new spark_feat_extract job')
    logger.info("using vamp transform list: " + vamp_transform_list)
    logger.info('audio-file-size-limit: ' + config.get('Audio Files', 'audio-file-size-limit'))
    logger.info("audio-prefix: " + config.get('Audio Files', 'audio-prefix'))
    logger.info('num-cores: ' + config.get('Spark', 'num-cores'))
    logger.info("spark memory: " + config.get('Spark', 'memory'))
    logger.info("genre_id: " + str(genre_id))

    # create a spark context
    conf = (SparkConf()
            .setMaster("local[" + config.get('Spark', 'num-cores') + "]")
            .setAppName("spark feature extractor")
            .set("spark.executor.memory", config.get('Spark', 'memory')))
    sc = SparkContext(conf=conf)

    try:
        SQL_start = config.getint('Queries', 'sql-start')
        SQL_limit = config.getint('Queries', 'sql-limit')
        logger.info('SQL_start = %i', SQL_start)
        logger.info('SQL_limit = %i', SQL_limit)

        array_step_size = config.getint('Application', 'array-step-size')
        logger.info('array-step-size = %i', array_step_size)

        # The limit grows by this amount per batch; it never exceeds SQL_limit,
        # so the loop below always runs at least once.
        limit_step = min(SQL_limit, array_step_size)
        local_SQL_start = SQL_start
        local_SQL_limit = limit_step

        while local_SQL_limit <= SQL_limit:
            logger.info('local_start = %i', local_SQL_start)
            logger.info('local_SQL_limit = %i', local_SQL_limit)

            # query db for assets (song tracks)
            data = _fetch_asset_paths(config, genre_id,
                                      local_SQL_start, local_SQL_limit, logger)

            # If the db query returned no results, stop here
            if not data:
                break

            batch_output_dir = (output_dir + '/batch' + str(local_SQL_start)
                                + '-' + str(local_SQL_limit))
            os.makedirs(batch_output_dir)
            logger.info('created results directory ' + batch_output_dir)

            # define distributed dataset
            logger.info("calling sc.parallelize(data)...")
            start = time.time()
            distData = sc.parallelize(data)
            logger.info("finished in " + str(time.time() - start))

            # define map; the lambda is evaluated by collect() below, within
            # this same iteration, so capturing batch_output_dir is safe
            logger.info("calling distData.map...")
            start = time.time()
            m1 = distData.map(lambda x: transform(audio_file=x,
                                                  vamp_transform_list=vamp_transform_list,
                                                  output_dir=batch_output_dir))
            logger.info("finished in " + str(time.time() - start))

            # collect() triggers the actual computation; the returned values
            # are not used here
            logger.info("calling m1.collect()...")
            start = time.time()
            m1.collect()
            logger.info("finished in " + str(time.time() - start))

            local_SQL_start += array_step_size
            local_SQL_limit += limit_step
    finally:
        # release the Spark context even when a batch fails
        sc.stop()

    # Measure the total from a fresh timestamp: the original reused the `end`
    # of the last batch, which is unbound when the very first query is empty.
    elapsed = time.time() - start_complete
    # single-argument parenthesised form prints identically on Python 2
    print("finished all in " + str(elapsed))
    logger.info("finished all in " + str(elapsed))
Daniel@0 155
# Script entry point: start the extraction job only when this file is run
# directly, not when it is imported as a module.
if __name__ == "__main__":
    main()
Daniel@0 158