diff pyspark/vamp_plugin_dml.py @ 0:e34cf1b6fe09 tip

commit
author Daniel Wolff
date Sat, 20 Feb 2016 18:14:24 +0100
parents
children
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/pyspark/vamp_plugin_dml.py	Sat Feb 20 18:14:24 2016 +0100
@@ -0,0 +1,189 @@
+# -*- coding: utf-8 -*-
+#
+# Copyright (c) 2013 Paul Brossier <piem@piem.org>
+
+# This file is part of TimeSide.
+
+# TimeSide is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 2 of the License, or
+# (at your option) any later version.
+
+# TimeSide is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+
+# You should have received a copy of the GNU General Public License
+# along with TimeSide.  If not, see <http://www.gnu.org/licenses/>.
+
+# Author: Paul Brossier <piem@piem.org>
+
+from timeside.core import implements, interfacedoc
+from timeside.analyzer.core import Analyzer
+from timeside.api import IAnalyzer
+import re
+import subprocess
+import numpy as np
+
+
+def simple_host_process(argslist):
+    """Call vamp-simple-host"""
+
+    vamp_host = 'vamp-simple-host'
+    command = [vamp_host]
+    command.extend(argslist)
+    # try ?
+    stdout = subprocess.check_output(
+        command, stderr=subprocess.STDOUT).splitlines()
+
+    return stdout
+
+
+# Raise an exception if Vamp Host is missing
+from timeside.exceptions import VampImportError
+try:
+    simple_host_process(['-v'])
+    WITH_VAMP = True
+except OSError:
+    WITH_VAMP = False
+    raise VampImportError
+
+
+class VampSimpleHostDML(Analyzer):
+
+    """Vamp plugins library interface analyzer"""
+
+    implements(IAnalyzer)
+
+    def __init__(self, plugin_list=None):
+        super(VampSimpleHostDML, self).__init__()
+        if plugin_list is None:
+            plugin_list = self.get_plugins_list()
+            #plugin_list = [['vamp-example-plugins', 'percussiononsets', 'detectionfunction']]
+
+        self.plugin_list = plugin_list
+
+    @interfacedoc
+    def setup(self, channels=None, samplerate=None,
+              blocksize=None, totalframes=None):
+        super(VampSimpleHostDML, self).setup(
+            channels, samplerate, blocksize, totalframes)
+
+    @staticmethod
+    @interfacedoc
+    def id():
+        return "vamp_simple_host_dml"
+
+    @staticmethod
+    @interfacedoc
+    def name():
+        return "Vamp Plugins Host for DML"
+
+    @staticmethod
+    @interfacedoc
+    def unit():
+        return ""
+
+    def process(self, frames, eod=False):
+        pass
+        return frames, eod
+
+    def post_process(self):
+        #plugin = 'vamp-example-plugins:amplitudefollower:amplitude'
+
+        wavfile = self.mediainfo()['uri'].split('file://')[-1]
+
+        for plugin_line in self.plugin_list:
+
+            plugin = ':'.join(plugin_line)
+            (time, duration, value) = self.vamp_plugin(plugin, wavfile)
+            if value is None:
+                return
+
+            if duration is not None:
+                plugin_res = self.new_result(
+                    data_mode='value', time_mode='segment')
+                plugin_res.data_object.duration = duration
+            else:
+                plugin_res = self.new_result(
+                    data_mode='value', time_mode='event')
+
+            plugin_res.data_object.time = time
+            plugin_res.data_object.value = value
+
+# Fix strat, duration issues if audio is a segment
+#            if self.mediainfo()['is_segment']:
+#                start_index = np.floor(self.mediainfo()['start'] *
+#                                       self.result_samplerate /
+#                                       self.result_stepsize)
+#
+#                stop_index = np.ceil((self.mediainfo()['start'] +
+#                                      self.mediainfo()['duration']) *
+#                                     self.result_samplerate /
+#                                     self.result_stepsize)
+#
+#                fixed_start = (start_index * self.result_stepsize /
+#                               self.result_samplerate)
+#                fixed_duration = ((stop_index - start_index) * self.result_stepsize /
+#                                  self.result_samplerate)
+#
+#                plugin_res.audio_metadata.start = fixed_start
+#                plugin_res.audio_metadata.duration = fixed_duration
+#
+#                value = value[start_index:stop_index + 1]
+            plugin_res.id_metadata.id += '.' + '.'.join(plugin_line[1:])
+            plugin_res.id_metadata.name += ' ' + \
+                ' '.join(plugin_line[1:])
+
+            self.process_pipe.results.add(plugin_res)
+
+    @staticmethod
+    def vamp_plugin(plugin, wavfile):
+
+        args = [plugin, wavfile]
+
+        stdout = simple_host_process(args)  # run vamp-simple-host
+
+        stderr = stdout[0:8]  # stderr containing file and process information
+        res = stdout[8:]  # stdout containg the feature data
+
+        if len(res) == 0:
+            return ([], [], [])
+
+        # Parse stderr to get blocksize and stepsize
+        blocksize_info = stderr[4]
+
+        import re
+        # Match agianst pattern 'Using block size = %d, step size = %d'
+        m = re.match(
+            'Using block size = (\d+), step size = (\d+)', blocksize_info)
+
+        blocksize = int(m.groups()[0])
+        stepsize = int(m.groups()[1])
+        # Get the results
+
+        # how are types defined in this? what types can timeside use?
+        # value = np.asfarray([line.split(': ')[1] for line in res if (len(line.split(': ')) > 1)])
+        value = [line.split(': ')[1] for line in res if (len(line.split(': ')) > 1)]
+        value = [re.sub("[^0-9\.\s]","",x) for x in value]
+        value = np.asfarray([[x.split(' ')[0] for x in value], [x.split(' ')[1] for x in value]]) # only get the first thing in the string
+        time = np.asfarray([r.split(':')[0].split(',')[0] for r in res])
+
+        time_len = len(res[0].split(':')[0].split(','))
+        if time_len == 1:
+            # event
+            duration = None
+        elif time_len == 2:
+            # segment
+            duration = np.asfarray(
+                [r.split(':')[0].split(',')[1] for r in res])
+
+        return (time, duration, value)
+
+    @staticmethod
+    def get_plugins_list():
+        arg = ['--list-outputs']
+        stdout = simple_host_process(arg)
+
+        return [line.split(':')[1:] for line in stdout]