Mercurial > hg > dml-open-backendtools
diff pyspark/test_timeside_vamp_spark_charm.py @ 0:e34cf1b6fe09 tip
commit
author | Daniel Wolff |
---|---|
date | Sat, 20 Feb 2016 18:14:24 +0100 |
parents | |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/pyspark/test_timeside_vamp_spark_charm.py Sat Feb 20 18:14:24 2016 +0100 @@ -0,0 +1,101 @@ +# Part of DML (Digital Music Laboratory) +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License +# as published by the Free Software Foundation; either version 2 +# of the License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public +# License along with this library; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + +#!/usr/local/spark-1.0.0-bin-hadoop2/bin/spark-submit +# -*- coding: utf-8 -*- +__author__="wolffd" +__date__ ="$11-Jul-2014 15:31:01$" + +# How to run this? + +# to start hdfs: /usr/local/hadoop/sbin/start-dfs.sh + +# Running python applications through ./bin/pyspark is deprecated as of Spark 1.0. +# Use ./bin/spark-submit +# spark-submit test_timeside_vamp_spark_charm.py --py-files vamp_plugin_dml.py,timeside_vamp.py,decode_to_wav.py + +#import pydoop.hdfs as hdfs +from pyspark import SparkConf, SparkContext +# @todo: timeside has to be packed for multi-pc usage +import os.path +import os +import sys +from os import walk +# NOTE: this is only for debugging purposes, we can +# now use a regular timeside installation, e.g. installed by +sys.path.append(os.getcwd() + '/../TimeSide/') + +# mappers +from timeside_vamp import * +from decode_to_wav import * + +def main(): + print "PySpark Telemeta and Vamp Test on CHARM" + + # configure the Spark Setup + conf = (SparkConf() + .setMaster("spark://0.0.0.0:7077") + #.setMaster("local") + .setAppName("CharmVamp") + .set("spark.executor.memory", "1g")) + sc = SparkContext(conf = conf) + + # SMB Share + # mount.cifs //10.2.165.194/mirg /home/wolffd/wansteadshare -o username=dml,password=xxx,domain=ENTERPRISE") + + + # uses local paths + # get list of obkects to process + mypath = '/samples/' + data = [] + for (dirpath, dirnames, filenames) in walk(mypath): + for file in filenames: + if file.endswith(".wav") or file.endswith(".flac"): + data.append(os.path.join(dirpath, file)) + + data = data[0:2] + # HDFS + # note: for HDFS we need wrappers for VAMP and gstreamer :/ + # copy to hdfs (put in different file before) + #hdfs.mkdir("test") + #hdfs.chmod("test","o+rw") + ##this copies the test wavs to hdfs + #hdfs.put("samples/","test/") + # get hdfs paths +# data = [] +# filenames = hdfs.ls("hdfs://0.0.0.0:9000/user/hduser/test/samples") +# print filenames +# for file in filenames: +# if file[-4:]== ".wav" or file[-4:]==".flac": +# data.append(file) +# + # define distributed dataset + # todo: can we do this with the wav data itself? + distData = sc.parallelize(data) + + # define map that decodes to wav + m0 = distData.map(lambda x: decode_to_wav(source=x)) + + # define map that applies the vamp plugin + m1 = m0.map(lambda x: transform(wav_file=x)).collect() + print m1 + return m1 + #process 2 + #m1.take(2) + +if __name__ == "__main__": + main() +