#!/usr/local/spark-1.0.0-bin-hadoop2/bin/spark-submit
# -*- coding: utf-8 -*-
#
# Part of DML (Digital Music Laboratory)
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA

__author__ = "wolffd"
__date__ = "$11-Jul-2014 15:31:01$"

from pyspark import SparkConf, SparkContext
import sys
import os
from sonic_annotator_vamp import *

# This is the main routine to be submitted as a Spark job.
#
# Running Python applications through ./bin/pyspark is deprecated as of
# Spark 1.0. Use ./bin/spark-submit --py-files sonic_annotator_vamp.py
# instead; you can also provide a zip of all necessary Python files.
#
# @param string audiopath      root of the folder structure to be traversed
# @param string transform_file path to the .n3 Turtle file describing the transform
#
# Local test defaults (kept for reference):
#   def main(audiopath = '/home/wolffd/Documents/python/dml/TimeSide/tests/samples/',
#            transform_file = '/home/wolffd/Documents/python/dml/pyspark/sonic-annotator-notimeside/silvet_settings.n3',
#            masterip = '10.2.165.101'):
def main(audiopath='/CHARM-Collection',
         transform_file='bbc_speechmusic.n3',
         masterip='0.0.0.0'):
    print "PySpark Telemeta and Vamp Test"

    # Configure Spark. Caution: the commented-out "local" profile uses just one core.
    conf = (SparkConf()
            #.setMaster("local")
            .setMaster("spark://" + masterip + ":7077")
            .setAppName("Sonic Annotating")
            .set("spark.executor.memory", "40g")
            .set("spark.cores.max", "35"))
    sc = SparkContext(conf=conf)

    # Traverse the folder structure and collect all audio file paths.
    data = []
    for (dirpath, dirnames, filenames) in os.walk(audiopath):
        for filename in filenames:
            if filename.endswith((".wav", ".mp3", ".flac")):
                data.append(os.path.join(dirpath, filename))
    njobs = len(data)
    # Accumulator intended for progress tracking; it is not incremented yet
    # (see the sketch at the end of this file).
    donejobs = sc.accumulator(0)
    print "Total: " + str(njobs) + " files"

    # Define the distributed dataset.
    distData = sc.parallelize(data)

    # Define the map: run the Vamp transform on each audio file.
    m1 = distData.map(lambda x: transform(wav_file=x, transform_file=transform_file))

    # No reduce step is needed; collect() just forces the maps to run.
    result = m1.collect()

if __name__ == "__main__":
    if len(sys.argv) >= 3:
        main(sys.argv[1], sys.argv[2])
    else:
        main()
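
# ---------------------------------------------------------------------------
# Usage sketch (illustrative only): the notes above recommend submitting via
# spark-submit with --py-files. Assuming this script is saved as spark_job.py
# (the real filename is not recorded here), a submission could look like:
#
#   ./bin/spark-submit --py-files sonic_annotator_vamp.py \
#       spark_job.py /CHARM-Collection bbc_speechmusic.n3
#
# No --master flag is needed, since the master URL is hard-coded into the
# SparkConf above.
# ---------------------------------------------------------------------------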
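# ---------------------------------------------------------------------------
# Progress-tracking sketch (hypothetical helper, not part of the original
# job): the 'donejobs' accumulator created in main() is never incremented.
# One minimal way to use it would be to replace the lambda passed to map():
#
#   def run_transform(wav_file):
#       out = transform(wav_file=wav_file, transform_file=transform_file)
#       donejobs.add(1)  # executors may only add; the driver reads .value
#       return out
#
#   m1 = distData.map(run_transform)
#   result = m1.collect()
#   print "Done: " + str(donejobs.value) + "/" + str(njobs) + " files"
# ---------------------------------------------------------------------------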