Mercurial > hg > dml-open-backendtools
view pyspark/test_timeside_vamp_spark_charm.py @ 0:e34cf1b6fe09 tip
commit
author | Daniel Wolff |
---|---|
date | Sat, 20 Feb 2016 18:14:24 +0100 |
parents | |
children |
line wrap: on
line source
# Part of DML (Digital Music Laboratory) # # This program is free software; you can redistribute it and/or # modify it under the terms of the GNU General Public License # as published by the Free Software Foundation; either version 2 # of the License, or (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public # License along with this library; if not, write to the Free Software # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA #!/usr/local/spark-1.0.0-bin-hadoop2/bin/spark-submit # -*- coding: utf-8 -*- __author__="wolffd" __date__ ="$11-Jul-2014 15:31:01$" # How to run this? # to start hdfs: /usr/local/hadoop/sbin/start-dfs.sh # Running python applications through ./bin/pyspark is deprecated as of Spark 1.0. # Use ./bin/spark-submit # spark-submit test_timeside_vamp_spark_charm.py --py-files vamp_plugin_dml.py,timeside_vamp.py,decode_to_wav.py #import pydoop.hdfs as hdfs from pyspark import SparkConf, SparkContext # @todo: timeside has to be packed for multi-pc usage import os.path import os import sys from os import walk # NOTE: this is only for debugging purposes, we can # now use a regular timeside installation, e.g. installed by sys.path.append(os.getcwd() + '/../TimeSide/') # mappers from timeside_vamp import * from decode_to_wav import * def main(): print "PySpark Telemeta and Vamp Test on CHARM" # configure the Spark Setup conf = (SparkConf() .setMaster("spark://0.0.0.0:7077") #.setMaster("local") .setAppName("CharmVamp") .set("spark.executor.memory", "1g")) sc = SparkContext(conf = conf) # SMB Share # mount.cifs //10.2.165.194/mirg /home/wolffd/wansteadshare -o username=dml,password=xxx,domain=ENTERPRISE") # uses local paths # get list of obkects to process mypath = '/samples/' data = [] for (dirpath, dirnames, filenames) in walk(mypath): for file in filenames: if file.endswith(".wav") or file.endswith(".flac"): data.append(os.path.join(dirpath, file)) data = data[0:2] # HDFS # note: for HDFS we need wrappers for VAMP and gstreamer :/ # copy to hdfs (put in different file before) #hdfs.mkdir("test") #hdfs.chmod("test","o+rw") ##this copies the test wavs to hdfs #hdfs.put("samples/","test/") # get hdfs paths # data = [] # filenames = hdfs.ls("hdfs://0.0.0.0:9000/user/hduser/test/samples") # print filenames # for file in filenames: # if file[-4:]== ".wav" or file[-4:]==".flac": # data.append(file) # # define distributed dataset # todo: can we do this with the wav data itself? distData = sc.parallelize(data) # define map that decodes to wav m0 = distData.map(lambda x: decode_to_wav(source=x)) # define map that applies the vamp plugin m1 = m0.map(lambda x: transform(wav_file=x)).collect() print m1 return m1 #process 2 #m1.take(2) if __name__ == "__main__": main()