#!/usr/local/spark-1.0.0-bin-hadoop2/bin/spark-submit
# -*- coding: utf-8 -*-
#
# Part of DML (Digital Music Laboratory)
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA

__author__ = "wolffd"
__date__ = "$11-Jul-2014 15:31:01$"

# How to run this:
#
# Start HDFS first: /usr/local/hadoop/sbin/start-dfs.sh
#
# Running Python applications through ./bin/pyspark is deprecated as of
# Spark 1.0; use ./bin/spark-submit instead:
#
#   spark-submit test_timeside_vamp_spark_charm.py \
#       --py-files vamp_plugin_dml.py,timeside_vamp.py,decode_to_wav.py

#import pydoop.hdfs as hdfs
from pyspark import SparkConf, SparkContext
# @todo: TimeSide has to be packaged for multi-machine usage
import os
import os.path
import sys
from os import walk

# NOTE: this path hack is only for debugging purposes; a regular
# TimeSide installation could be used instead.
sys.path.append(os.getcwd() + '/../TimeSide/')

# mapper functions (shipped to the executors via --py-files)
from timeside_vamp import *
from decode_to_wav import *
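# The two map functions used below, decode_to_wav() and transform(), come
# from decode_to_wav.py and timeside_vamp.py. A minimal sketch of the
# interfaces this script assumes (the signatures follow the keyword
# arguments used below; the docstrings are assumptions, as the real modules
# wrap gstreamer decoding and a VAMP plugin host):
#
#   def decode_to_wav(source):
#       """Decode `source` (a .wav or .flac path) to wav; return the wav path."""
#
#   def transform(wav_file):
#       """Run the VAMP plugin over `wav_file`; return the extracted features."""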
def main():
    print "PySpark Telemeta and Vamp Test on CHARM"

    # configure the Spark setup
    conf = (SparkConf()
            .setMaster("spark://0.0.0.0:7077")
            #.setMaster("local")
            .setAppName("CharmVamp")
            .set("spark.executor.memory", "1g"))
    sc = SparkContext(conf=conf)

    # SMB share holding the audio, mounted e.g. via:
    # mount.cifs //10.2.165.194/mirg /home/wolffd/wansteadshare -o username=dml,password=xxx,domain=ENTERPRISE

    # uses local paths:
    # get the list of objects to process
    mypath = '/samples/'
    data = []
    for (dirpath, dirnames, filenames) in walk(mypath):
        for filename in filenames:
            if filename.endswith(".wav") or filename.endswith(".flac"):
                data.append(os.path.join(dirpath, filename))

    # restrict to the first two files for testing
    data = data[0:2]

    # HDFS variant
    # note: for HDFS we need wrappers for VAMP and gstreamer :/
    # copy to HDFS (should be moved to a separate file):
    #hdfs.mkdir("test")
    #hdfs.chmod("test", "o+rw")
    ## this copies the test wavs to HDFS
    #hdfs.put("samples/", "test/")
    # get HDFS paths:
    #data = []
    #filenames = hdfs.ls("hdfs://0.0.0.0:9000/user/hduser/test/samples")
    #print filenames
    #for filename in filenames:
    #    if filename.endswith(".wav") or filename.endswith(".flac"):
    #        data.append(filename)

    # define the distributed dataset
    # todo: can we do this with the wav data itself?
    distData = sc.parallelize(data)

    # define a map that decodes to wav
    m0 = distData.map(lambda x: decode_to_wav(source=x))

    # define a map that applies the VAMP plugin, then collect the results
    m1 = m0.map(lambda x: transform(wav_file=x)).collect()
    print m1
    return m1
    # alternative: process only 2 items instead of collecting everything
    #m1.take(2)

if __name__ == "__main__":
    main()
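# A possible extension (a sketch, not part of the original pipeline): rather
# than collecting all features back to the driver, the second map stage could
# write its output directly from the executors with the standard
# RDD.saveAsTextFile, e.g.
#
#   m0.map(lambda x: transform(wav_file=x)) \
#     .saveAsTextFile("hdfs://0.0.0.0:9000/user/hduser/test/features")
#
# (the output path here is hypothetical)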