changeset 315:5f1859160fd7

Refactor signal processing code - Use audioop for channel averaging and energy computation
author Amine Sehili <amine.sehili@gmail.com>
date Wed, 16 Oct 2019 21:58:54 +0100
parents 12a030453422
children b6c5125be036
files auditok/core.py auditok/signal.py auditok/signal_numpy.py auditok/util.py tests/test_core.py tests/test_signal.py
diffstat 6 files changed, 88 insertions(+), 72 deletions(-) [+]
line wrap: on
line diff
--- a/auditok/core.py	Mon Oct 14 20:25:12 2019 +0100
+++ b/auditok/core.py	Wed Oct 16 21:58:54 2019 +0100
@@ -685,13 +685,7 @@
     @property
     def samples(self):
         if self._samples is None:
-            fmt = signal.FORMAT[self.sample_width]
-            if self.channels == 1:
-                self._samples = signal.to_array(self._data, fmt)
-            else:
-                self._samples = signal.separate_channels(
-                    self._data, fmt, self.channels
-                )
+            self._samples = signal.to_array(self._data, self.sample_width, self.channels)  
         return self._samples
 
     def __len__(self):
--- a/auditok/signal.py	Mon Oct 14 20:25:12 2019 +0100
+++ b/auditok/signal.py	Wed Oct 16 21:58:54 2019 +0100
@@ -1,19 +1,22 @@
 from array import array
+import audioop
 import math
 
 FORMAT = {1: "b", 2: "h", 4: "i"}
-_EPSILON = 1e-20
+_EPSILON = 1e-10
 
 
-def to_array(data, fmt):
-    return array(fmt, data)
+def to_array(data, sample_width, channels):
+    fmt = FORMAT[sample_width]
+    if channels == 1:
+        return array(fmt, data)
+    return separate_channels(data, fmt, channels)
 
 
 def extract_single_channel(data, fmt, channels, selected):
     samples = array(fmt, data)
     return samples[selected::channels]
 
-
 def average_channels(data, fmt, channels):
     all_channels = array(fmt, data)
     mono_channels = [
@@ -25,6 +28,11 @@
     )
     return avg_arr
 
+def average_channels_stereo(data, sample_width):
+    fmt = FORMAT[sample_width]
+    arr = array(fmt, audioop.tomono(data, sample_width, 0.5, 0.5))
+    return arr
+
 
 def separate_channels(data, fmt, channels):
     all_channels = array(fmt, data)
@@ -34,11 +42,11 @@
     return mono_channels
 
 
-def calculate_energy_single_channel(x):
-    energy = max(sum(i ** 2 for i in x) / len(x), _EPSILON)
-    return 10 * math.log10(energy)
+def calculate_energy_single_channel(x, sample_width):
+    energy_sqrt = max(audioop.rms(x, sample_width), _EPSILON)
+    return 20 * math.log10(energy_sqrt)
 
 
-def calculate_energy_multichannel(x, aggregation_fn=max):
-    energies = (calculate_energy_single_channel(xi) for xi in x)
+def calculate_energy_multichannel(x, sample_width, aggregation_fn=max):
+    energies = (calculate_energy_single_channel(xi, sample_width) for xi in x)
     return aggregation_fn(energies)
--- a/auditok/signal_numpy.py	Mon Oct 14 20:25:12 2019 +0100
+++ b/auditok/signal_numpy.py	Wed Oct 16 21:58:54 2019 +0100
@@ -1,34 +1,26 @@
 import numpy as np
+from .signal import average_channels_stereo, calculate_energy_single_channel, calculate_energy_multichannel
 
 FORMAT = {1: np.int8, 2: np.int16, 4: np.int32}
-_EPSILON = 1e-20
 
-
-def to_array(data, fmt):
-    return np.frombuffer(data, dtype=fmt).astype(np.float64)
+def to_array(data, sample_width, channels):
+    fmt = FORMAT[sample_width]
+    if channels == 1:
+        return np.frombuffer(data, dtype=fmt).astype(np.float64)
+    return separate_channels(data, fmt, channels).astype(np.float64)
 
 
 def extract_single_channel(data, fmt, channels, selected):
     samples = np.frombuffer(data, dtype=fmt)
-    return samples[selected::channels].astype(np.float64)
+    return np.asanyarray(samples[selected::channels], order="C")
 
 
 def average_channels(data, fmt, channels):
     array = np.frombuffer(data, dtype=fmt).astype(np.float64)
-    return array.reshape(-1, channels).mean(axis=1).round()
+    return array.reshape(-1, channels).mean(axis=1).round().astype(fmt)
 
 
 def separate_channels(data, fmt, channels):
-    array = np.frombuffer(data, dtype=fmt).astype(np.float64)
+    array = np.frombuffer(data, dtype=fmt)
     return array.reshape(-1, channels).T
 
-
-def calculate_energy_single_channel(x):
-    x = np.asarray(x)
-    return 10 * np.log10((np.dot(x, x) / x.size).clip(min=_EPSILON))
-
-
-def calculate_energy_multichannel(x, aggregation_fn=np.max):
-    x = np.asarray(x)
-    energy = 10 * np.log10((x * x).mean(axis=1).clip(min=_EPSILON))
-    return aggregation_fn(energy)
--- a/auditok/util.py	Mon Oct 14 20:25:12 2019 +0100
+++ b/auditok/util.py	Wed Oct 16 21:58:54 2019 +0100
@@ -4,16 +4,10 @@
 
 .. autosummary::
 
-        DataSource
-        StringDataSource
-        ADSFactory
-        ADSFactory.AudioDataSource
-        ADSFactory.ADSDecorator
-        ADSFactory.OverlapADS
-        ADSFactory.LimiterADS
-        ADSFactory.RecorderADS
-        DataValidator
+        make_channel_selector
         AudioEnergyValidator
+        AudioReader
+        Recorder
 """
 from __future__ import division
 import sys
@@ -21,6 +15,7 @@
 import math
 from array import array
 from functools import partial
+from audioop import tomono
 from .io import (
     AudioIOError,
     AudioSource,
@@ -32,7 +27,7 @@
 from .exceptions import DuplicateArgument, TooSamllBlockDuration
 
 try:
-    import signal_numpy as signal
+    from . import signal_numpy as signal
 except ImportError as e:
     from . import signal
 
@@ -63,7 +58,7 @@
         raise ValueError(err_msg.format(sample_width))
 
     if channels == 1:
-        return partial(signal.to_array, fmt=fmt)
+        return lambda x : x
 
     if isinstance(selected, int):
         if selected < 0:
@@ -77,10 +72,16 @@
         )
 
     if selected in ("mix", "avg", "average"):
+        if channels == 2:
+            # For stereo data, audioop.tomono is much faster than the generic averaging path
+            return partial(signal.average_channels_stereo, sample_width=sample_width)
+        
         return partial(signal.average_channels, fmt=fmt, channels=channels)
 
     if selected in (None, "any"):
         return partial(signal.separate_channels, fmt=fmt, channels=channels)
+    
+    raise ValueError("Selected channel must be an integer, None (alias 'any') or 'average' (alias 'avg' or 'mix')")
 
 
 class DataSource:
@@ -114,6 +115,7 @@
 
 class AudioEnergyValidator(DataValidator):
     def __init__(self, energy_threshold, sample_width, channels, use_channel=None):
+        self._sample_width = sample_width
         self._selector = make_channel_selector(sample_width, channels, use_channel)
         if channels == 1 or use_channel is not None:
             self._energy_fn = signal.calculate_energy_single_channel
@@ -122,7 +124,8 @@
         self._energy_threshold = energy_threshold
 
     def is_valid(self, data):
-        return self._energy_fn(self._selector(data)) >= self._energy_threshold
+        log_energy = self._energy_fn(self._selector(data), self._sample_width)
+        return log_energy >= self._energy_threshold
 
 
 class StringDataSource(DataSource):
--- a/tests/test_core.py	Mon Oct 14 20:25:12 2019 +0100
+++ b/tests/test_core.py	Wed Oct 16 21:58:54 2019 +0100
@@ -9,11 +9,7 @@
 from auditok import split, AudioRegion, AudioParameterError
 from auditok.core import _duration_to_nb_windows, _read_chunks_online
 from auditok.util import AudioDataSource
-from auditok.io import (
-    _normalize_use_channel,
-    _extract_selected_channel,
-    get_audio_source,
-)
+from auditok.io import get_audio_source
 
 
 def _make_random_length_regions(
@@ -597,6 +593,7 @@
             sr=10,
             sw=2,
             ch=channels,
+            eth= 49.99,
             **kwargs
         )
 
@@ -607,6 +604,7 @@
             max_silence=max_silence,
             drop_trailing_silence=False,
             strict_min_dur=False,
+            eth= 49.99,
             **kwargs
         )
 
--- a/tests/test_signal.py	Mon Oct 14 20:25:12 2019 +0100
+++ b/tests/test_signal.py	Wed Oct 16 21:58:54 2019 +0100
@@ -1,3 +1,4 @@
+import unittest
 from unittest import TestCase
 from array import array as array_
 from genty import genty, genty_dataset
@@ -12,17 +13,26 @@
         self.data = b"012345679ABC"
         self.numpy_fmt = {"b": np.int8, "h": np.int16, "i": np.int32}
 
+
     @genty_dataset(
-        int8=("b", [48, 49, 50, 51, 52, 53, 54, 55, 57, 65, 66, 67]),
-        int16=("h", [12592, 13106, 13620, 14134, 16697, 17218]),
-        int32=("i", [858927408, 926299444, 1128415545]),
+        int8_mono=(1, [48, 49, 50, 51, 52, 53, 54, 55, 57, 65, 66, 67]),
+        int16_mono=(2, [12592, 13106, 13620, 14134, 16697, 17218]),
+        int32_mono=(4, [858927408, 926299444, 1128415545]),
+        int8_stereo=(1,  [[48, 50, 52, 54, 57, 66], [49, 51, 53, 55, 65, 67]]),
+        int16_stereo=(2, [[12592, 13620, 16697], [13106, 14134, 17218]]),
+        int32_3channel=(4, [[858927408], [926299444], [1128415545]]),
     )
-    def test_to_array(self, fmt, expected):
-        resutl = signal_.to_array(self.data, fmt)
-        expected = array_(fmt, expected)
+    def test_to_array(self, sample_width, expected):
+        if isinstance(expected[0], list):
+            channels = len(expected)
+            expected = [array_(signal_.FORMAT[sample_width], xi) for xi in expected]
+        else:
+            channels = 1
+            expected = array_(signal_.FORMAT[sample_width], expected)
+        resutl = signal_.to_array(self.data, sample_width, channels)
+        resutl_numpy = signal_numpy.to_array(self.data, sample_width, channels)
         self.assertEqual(resutl, expected)
-        resutl_numpy = signal_numpy.to_array(self.data, self.numpy_fmt[fmt])
-        self.assertTrue(all(resutl_numpy == expected))
+        self.assertTrue((resutl_numpy == np.asarray(expected)).all())
         self.assertEqual(resutl_numpy.dtype, np.float64)
 
     @genty_dataset(
@@ -63,12 +73,13 @@
             self.data, fmt, channels, selected
         )
         expected = array_(fmt, expected)
+        expected_numpy_fmt = self.numpy_fmt[fmt]
         self.assertEqual(resutl, expected)
         resutl_numpy = signal_numpy.extract_single_channel(
             self.data, self.numpy_fmt[fmt], channels, selected
         )
         self.assertTrue(all(resutl_numpy == expected))
-        self.assertEqual(resutl_numpy.dtype, np.float64)
+        self.assertEqual(resutl_numpy.dtype, expected_numpy_fmt)
 
     @genty_dataset(
         int8_2channel=("b", 2, [48, 50, 52, 54, 61, 66]),
@@ -80,12 +91,13 @@
     def test_average_channels(self, fmt, channels, expected):
         resutl = signal_.average_channels(self.data, fmt, channels)
         expected = array_(fmt, expected)
+        expected_numpy_fmt = self.numpy_fmt[fmt]
         self.assertEqual(resutl, expected)
         resutl_numpy = signal_numpy.average_channels(
             self.data, self.numpy_fmt[fmt], channels
         )
         self.assertTrue(all(resutl_numpy == expected))
-        self.assertEqual(resutl_numpy.dtype, np.float64)
+        self.assertEqual(resutl_numpy.dtype, expected_numpy_fmt)
 
     @genty_dataset(
         int8_1channel=(
@@ -113,40 +125,49 @@
     def test_separate_channels(self, fmt, channels, expected):
         resutl = signal_.separate_channels(self.data, fmt, channels)
         expected = [array_(fmt, exp) for exp in expected]
+        expected_numpy_fmt = self.numpy_fmt[fmt]
         self.assertEqual(resutl, expected)
 
         resutl_numpy = signal_numpy.separate_channels(
             self.data, self.numpy_fmt[fmt], channels
         )
         self.assertTrue((resutl_numpy == expected).all())
-        self.assertEqual(resutl_numpy.dtype, np.float64)
+        self.assertEqual(resutl_numpy.dtype, expected_numpy_fmt)
 
     @genty_dataset(
-        simple=([300, 320, 400, 600], 52.506639194632434),
-        zero=([0], -200),
-        zeros=([0, 0, 0], -200),
+        simple=([300, 320, 400, 600], 2, 52.50624901923348),
+        zero=([0], 2, -200),
+        zeros=([0, 0, 0], 2, -200),
     )
-    def test_calculate_energy_single_channel(self, x, expected):
-        energy = signal_.calculate_energy_single_channel(x)
+    def test_calculate_energy_single_channel(self, x, sample_width, expected):
+        x = array_(signal_.FORMAT[sample_width], x)
+        energy = signal_.calculate_energy_single_channel(x, sample_width)
         self.assertEqual(energy, expected)
-        energy = signal_numpy.calculate_energy_single_channel(x)
+        energy = signal_numpy.calculate_energy_single_channel(x, sample_width)
         self.assertEqual(energy, expected)
 
+
     @genty_dataset(
         min_=(
             [[300, 320, 400, 600], [150, 160, 200, 300]],
+            2,
             min,
-            46.48603928135281,
+            46.485649105953854,
         ),
         max_=(
             [[300, 320, 400, 600], [150, 160, 200, 300]],
+            2,
             max,
-            52.506639194632434,
+            52.50624901923348,
         ),
     )
-    def test_calculate_energy_multichannel(self, x, aggregation_fn, expected):
-        energy = signal_.calculate_energy_multichannel(x, aggregation_fn)
+    def test_calculate_energy_multichannel(self, x, sample_width, aggregation_fn, expected):
+        x = [array_(signal_.FORMAT[sample_width], xi) for xi in x]
+        energy = signal_.calculate_energy_multichannel(x, sample_width, aggregation_fn)
         self.assertEqual(energy, expected)
 
-        energy = signal_numpy.calculate_energy_multichannel(x, aggregation_fn)
+        energy = signal_numpy.calculate_energy_multichannel(x, sample_width, aggregation_fn)
         self.assertEqual(energy, expected)
+
+if __name__ == "__main__":
+    unittest.main()
\ No newline at end of file