changeset 313:10b725735637

Remove unused functions from io.py
author Amine Sehili <amine.sehili@gmail.com>
date Mon, 14 Oct 2019 20:22:50 +0100
parents bf374186f80f
children 12a030453422
files auditok/exceptions.py auditok/io.py tests/test_AudioSource.py tests/test_io.py
diffstat 4 files changed, 41 insertions(+), 237 deletions(-) [+]
line wrap: on
line diff
--- a/auditok/exceptions.py	Sun Oct 13 17:26:26 2019 +0200
+++ b/auditok/exceptions.py	Mon Oct 14 20:22:50 2019 +0100
@@ -20,6 +20,17 @@
     postprocessing code"""
 
 
+class AudioIOError(Exception):
+    """Raised when a compressed audio file cannot be loaded or when trying
+    to read from a not yet open AudioSource"""
+
+
+class AudioParameterError(AudioIOError):
+    """Raised when one audio parameter is missing when loading raw data or
+    saving data to a format other than raw. Also raised when an audio
+    parameter has a wrong value"""
+
+
 class AudioEncodingError(Exception):
     """Raised if audio data can not be encoded in the provided format"""
 
--- a/auditok/io.py	Sun Oct 13 17:26:26 2019 +0200
+++ b/auditok/io.py	Mon Oct 14 20:22:50 2019 +0100
@@ -30,11 +30,7 @@
 import audioop
 from array import array
 from functools import partial
-
-if sys.version_info >= (3, 0):
-    PYTHON_3 = True
-else:
-    PYTHON_3 = False
+from .exceptions import AudioIOError, AudioParameterError
 
 try:
     from pydub import AudioSegment
@@ -74,14 +70,6 @@
 DATA_FORMAT = {1: "b", 2: "h", 4: "i"}
 
 
-class AudioIOError(Exception):
-    pass
-
-
-class AudioParameterError(AudioIOError):
-    pass
-
-
 def check_audio_data(data, sample_width, channels):
     sample_size_bytes = int(sample_width * channels)
     nb_samples = len(data) // sample_size_bytes
@@ -105,31 +93,6 @@
     return fmt
 
 
-def _normalize_use_channel(use_channel):
-    """
-    Returns a value of `use_channel` as expected by audio read/write fuctions.
-    If `use_channel` is `None`, returns 0. If it's an integer, or the special
-    str 'mix' returns it as is. If it's `left` or `right` returns 0 or 1
-    respectively.
-    """
-    err_message = (
-        "'use_channel' parameter must be a non-zero integer or one of "
-    )
-    err_message += "('left', 'right', 'mix'), found: '{}'"
-    if use_channel is None:
-        return 0
-    if use_channel == "mix":
-        return "mix"
-    if isinstance(use_channel, int):
-        if use_channel == 0:
-            raise AudioParameterError(err_message.format(use_channel))
-        return use_channel - 1 if use_channel > 0 else use_channel
-    try:
-        return ["left", "right"].index(use_channel)
-    except ValueError:
-        raise AudioParameterError(err_message.format(use_channel))
-
-
 def _get_audio_parameters(param_dict):
     """
     Gets audio parameters from a dictionary of parameters.
@@ -166,51 +129,6 @@
     return sampling_rate, sample_width, channels
 
 
-def _array_to_bytes(a):
-    """
-    Converts an `array.array` to `bytes`.
-    """
-    if PYTHON_3:
-        return a.tobytes()
-    else:
-        return a.tostring()
-
-
-def _mix_audio_channels(data, channels, sample_width):
-    if channels == 1:
-        return data
-    if channels == 2:
-        return audioop.tomono(data, sample_width, 0.5, 0.5)
-    fmt = DATA_FORMAT[sample_width]
-    buffer = array(fmt, data)
-    mono_channels = [
-        array(fmt, buffer[ch::channels]) for ch in range(channels)
-    ]
-    avg_arr = array(
-        fmt, (sum(samples) // channels for samples in zip(*mono_channels))
-    )
-    return _array_to_bytes(avg_arr)
-
-
-def _extract_selected_channel(data, channels, sample_width, use_channel):
-    if use_channel == "mix":
-        return _mix_audio_channels(data, channels, sample_width)
-
-    if use_channel >= channels or use_channel < -channels:
-        err_message = "use_channel == {} but audio data has only {} channel{}."
-        err_message += " Selected channel must be 'mix' or an integer >= "
-        err_message += "-channels and < channels"
-        err_message = err_message.format(
-            use_channel, channels, "s" if channels > 1 else ""
-        )
-        raise AudioParameterError(err_message)
-    elif use_channel < 0:
-        use_channel += channels
-    fmt = DATA_FORMAT[sample_width]
-    buffer = array(fmt, data)
-    return _array_to_bytes(buffer[use_channel::channels])
-
-
 class AudioSource:
     """ 
     Base class for audio source objects.
@@ -675,10 +593,7 @@
         _FileAudioSource.__init__(self, sampling_rate, sample_width, channels)
         self._is_open = False
         self._sample_size = sample_width * channels
-        if PYTHON_3:
-            self._stream = sys.stdin.buffer
-        else:
-            self._stream = sys.stdin
+        self._stream = sys.stdin.buffer
 
     def is_open(self):
         return self._is_open
@@ -750,7 +665,7 @@
                 chunk_gen,
                 total=nb_chunks,
                 duration=duration,
-                **progress_bar_kwargs
+                **progress_bar_kwargs,
             )
         if self.stream.is_stopped():
             self.stream.start_stream()
--- a/tests/test_AudioSource.py	Sun Oct 13 17:26:26 2019 +0200
+++ b/tests/test_AudioSource.py	Mon Oct 14 20:22:50 2019 +0100
@@ -6,7 +6,6 @@
 from genty import genty, genty_dataset
 from auditok.io import (
     AudioParameterError,
-    _array_to_bytes,
     DATA_FORMAT,
     BufferAudioSource,
     RawAudioSource,
@@ -61,9 +60,7 @@
         audio_source.close()
         mono_channels = [PURE_TONE_DICT[freq] for freq in frequencies]
         fmt = DATA_FORMAT[audio_source.sample_width]
-        expected = _array_to_bytes(
-            array(fmt, _sample_generator(*mono_channels))
-        )
+        expected = array(fmt, _sample_generator(*mono_channels)).tobytes()
 
         self.assertEqual(data_read_all, expected)
 
@@ -93,9 +90,7 @@
         audio_source.close()
         mono_channels = [PURE_TONE_DICT[freq] for freq in frequencies]
         fmt = DATA_FORMAT[audio_source.sample_width]
-        expected = _array_to_bytes(
-            array(fmt, _sample_generator(*mono_channels))
-        )
+        expected = array(fmt, _sample_generator(*mono_channels)).tobytes()
 
         self.assertEqual(data, expected)
 
--- a/tests/test_io.py	Sun Oct 13 17:26:26 2019 +0200
+++ b/tests/test_io.py	Mon Oct 14 20:22:50 2019 +0100
@@ -5,6 +5,7 @@
 from tempfile import NamedTemporaryFile, TemporaryDirectory
 import filecmp
 from unittest import TestCase
+from unittest.mock import patch, Mock
 from genty import genty, genty_dataset
 from test_util import _sample_generator, _generate_pure_tone, PURE_TONE_DICT
 from auditok.io import (
@@ -17,11 +18,7 @@
     StdinAudioSource,
     check_audio_data,
     _guess_audio_format,
-    _normalize_use_channel,
     _get_audio_parameters,
-    _array_to_bytes,
-    _mix_audio_channels,
-    _extract_selected_channel,
     _load_raw,
     _load_wave,
     _load_with_pydub,
@@ -33,14 +30,6 @@
     to_file,
 )
 
-
-if sys.version_info >= (3, 0):
-    PYTHON_3 = True
-    from unittest.mock import patch, Mock
-else:
-    PYTHON_3 = False
-    from mock import patch, Mock
-
 AUDIO_PARAMS_SHORT = {"sr": 16000, "sw": 2, "ch": 1}
 
 
@@ -74,17 +63,6 @@
         result = _guess_audio_format(fmt, filename)
         self.assertEqual(result, expected)
 
-    @genty_dataset(
-        none=(None, 0),
-        positive_int=(1, 0),
-        left=("left", 0),
-        right=("right", 1),
-        mix=("mix", "mix"),
-    )
-    def test_normalize_use_channel(self, use_channel, expected):
-        result = _normalize_use_channel(use_channel)
-        self.assertEqual(result, expected)
-
     def test_get_audio_parameters_short_params(self):
         expected = (8000, 2, 1)
         params = dict(zip(("sr", "sw", "ch"), expected))
@@ -121,105 +99,12 @@
     )
     def test_get_audio_parameters_invalid(self, values):
         params = dict(
-            zip(
-                ("sampling_rate", "sample_width", "channels"),
-                values,
-            )
+            zip(("sampling_rate", "sample_width", "channels"), values)
         )
         with self.assertRaises(AudioParameterError):
             _get_audio_parameters(params)
 
     @genty_dataset(
-        mono_1byte=([400], 1),
-        stereo_1byte=([400, 600], 1),
-        three_channel_1byte=([400, 600, 2400], 1),
-        mono_2byte=([400], 2),
-        stereo_2byte=([400, 600], 2),
-        three_channel_2byte=([400, 600, 1150], 2),
-        mono_4byte=([400], 4),
-        stereo_4byte=([400, 600], 4),
-        four_channel_2byte=([400, 600, 1150, 7220], 4),
-    )
-    def test_mix_audio_channels(self, frequencies, sample_width):
-        sampling_rate = 16000
-        sample_width = 2
-        channels = len(frequencies)
-        mono_channels = [
-            _generate_pure_tone(
-                freq,
-                duration_sec=0.1,
-                sampling_rate=sampling_rate,
-                sample_width=sample_width,
-            )
-            for freq in frequencies
-        ]
-        fmt = DATA_FORMAT[sample_width]
-        expected = _array_to_bytes(
-            array(
-                fmt,
-                (sum(samples) // channels for samples in zip(*mono_channels)),
-            )
-        )
-        data = _array_to_bytes(array(fmt, _sample_generator(*mono_channels)))
-        mixed = _mix_audio_channels(data, channels, sample_width)
-        self.assertEqual(mixed, expected)
-
-    @genty_dataset(
-        mono_1byte=([400], 1, 0),
-        stereo_1byte_2st_channel=([400, 600], 1, 1),
-        mono_2byte=([400], 2, 0),
-        stereo_2byte_1st_channel=([400, 600], 2, 0),
-        stereo_2byte_2nd_channel=([400, 600], 2, 1),
-        three_channel_2byte_last_negative_idx=([400, 600, 1150], 2, -1),
-        three_channel_2byte_2nd_negative_idx=([400, 600, 1150], 2, -2),
-        three_channel_2byte_1st_negative_idx=([400, 600, 1150], 2, -3),
-        three_channel_4byte_1st=([400, 600, 1150], 4, 0),
-        three_channel_4byte_last_negative_idx=([400, 600, 1150], 4, -1),
-    )
-    def test_extract_selected_channel(
-        self, frequencies, sample_width, use_channel
-    ):
-
-        mono_channels = [
-            _generate_pure_tone(
-                freq,
-                duration_sec=0.1,
-                sampling_rate=16000,
-                sample_width=sample_width,
-            )
-            for freq in frequencies
-        ]
-        channels = len(frequencies)
-        fmt = DATA_FORMAT[sample_width]
-        expected = _array_to_bytes(mono_channels[use_channel])
-        data = _array_to_bytes(array(fmt, _sample_generator(*mono_channels)))
-        selected_channel = _extract_selected_channel(
-            data, channels, sample_width, use_channel
-        )
-        self.assertEqual(selected_channel, expected)
-
-    @genty_dataset(mono=([400],), three_channel=([600, 1150, 2400],))
-    def test_extract_selected_channel_mix(self, frequencies):
-
-        mono_channels = [PURE_TONE_DICT[freq] for freq in frequencies]
-        channels = len(frequencies)
-        fmt = DATA_FORMAT[2]
-        expected = _array_to_bytes(
-            array(
-                fmt,
-                (sum(samples) // channels for samples in zip(*mono_channels)),
-            )
-        )
-        data = _array_to_bytes(array(fmt, _sample_generator(*mono_channels)))
-        selected_channel = _extract_selected_channel(data, channels, 2, "mix")
-        self.assertEqual(selected_channel, expected)
-
-    @genty_dataset(positive=(2,), negative=(-3,))
-    def test_extract_selected_channel_invalid_use_channel(self, use_channel):
-        with self.assertRaises(AudioParameterError):
-            _extract_selected_channel(b"\0\0", 2, 2, use_channel)
-
-    @genty_dataset(
         raw_with_audio_format=(
             "audio",
             "raw",
@@ -288,7 +173,6 @@
             with self.assertRaises(AudioIOError):
                 from_file("audio", "mp3")
 
-
     @patch("auditok.io._WITH_PYDUB", True)
     @patch("auditok.io.BufferAudioSource")
     @genty_dataset(
@@ -311,24 +195,26 @@
         segment_mock.sample_width = 2
         segment_mock.channels = 2
         segment_mock._data = b"abcd"
-        with patch(
-            "auditok.io.AudioSegment.{}".format(function)
-        ) as open_func:
+        with patch("auditok.io.AudioSegment.{}".format(function)) as open_func:
             open_func.return_value = segment_mock
             from_file(filename)
             self.assertTrue(open_func.called)
 
-
     @genty_dataset(
         mono=("mono_400", (400,)),
         three_channel=("3channel_400-800-1600", (400, 800, 1600)),
-
         mono_large_file=("mono_400", (400,), True),
-        three_channel_large_file=("3channel_400-800-1600", (400, 800, 1600), True),
+        three_channel_large_file=(
+            "3channel_400-800-1600",
+            (400, 800, 1600),
+            True,
+        ),
     )
     def test_load_raw(self, file_id, frequencies, large_file=False):
         filename = "tests/data/test_16KHZ_{}Hz.raw".format(file_id)
-        audio_source = _load_raw(filename, 16000, 2, len(frequencies), large_file=large_file)
+        audio_source = _load_raw(
+            filename, 16000, 2, len(frequencies), large_file=large_file
+        )
         audio_source.open()
         data = audio_source.read(-1)
         audio_source.close()
@@ -339,7 +225,7 @@
         self.assertEqual(audio_source.channels, len(frequencies))
         mono_channels = [PURE_TONE_DICT[freq] for freq in frequencies]
         fmt = DATA_FORMAT[audio_source.sample_width]
-        expected =_array_to_bytes(array(fmt, _sample_generator(*mono_channels)))
+        expected = array(fmt, _sample_generator(*mono_channels)).tobytes()
         self.assertEqual(data, expected)
 
     @genty_dataset(
@@ -357,9 +243,12 @@
     @genty_dataset(
         mono=("mono_400", (400,)),
         three_channel=("3channel_400-800-1600", (400, 800, 1600)),
-
         mono_large_file=("mono_400", (400,), True),
-        three_channel_large_file=("3channel_400-800-1600", (400, 800, 1600), True),
+        three_channel_large_file=(
+            "3channel_400-800-1600",
+            (400, 800, 1600),
+            True,
+        ),
     )
     def test_load_wave(self, file_id, frequencies, large_file=False):
         filename = "tests/data/test_16KHZ_{}Hz.wav".format(file_id)
@@ -374,10 +263,9 @@
         self.assertEqual(audio_source.channels, len(frequencies))
         mono_channels = [PURE_TONE_DICT[freq] for freq in frequencies]
         fmt = DATA_FORMAT[audio_source.sample_width]
-        expected =_array_to_bytes(array(fmt, _sample_generator(*mono_channels)))
+        expected = array(fmt, _sample_generator(*mono_channels)).tobytes()
         self.assertEqual(data, expected)
 
-
     @patch("auditok.io._WITH_PYDUB", True)
     @patch("auditok.io.BufferAudioSource")
     @genty_dataset(
@@ -394,22 +282,17 @@
         webm_right_channel=("webm", 2, "from_file"),
         webm_mix_channels=("webm", 4, "from_file"),
     )
-    def test_load_with_pydub(
-        self, audio_format, channels, function, *mocks
-    ):
+    def test_load_with_pydub(self, audio_format, channels, function, *mocks):
         filename = "audio.{}".format(audio_format)
         segment_mock = Mock()
         segment_mock.sample_width = 2
         segment_mock.channels = channels
         segment_mock._data = b"abcdefgh"
-        with patch(
-            "auditok.io.AudioSegment.{}".format(function)
-        ) as open_func:
+        with patch("auditok.io.AudioSegment.{}".format(function)) as open_func:
             open_func.return_value = segment_mock
             _load_with_pydub(filename, audio_format)
             self.assertTrue(open_func.called)
 
-
     @genty_dataset(
         mono=("mono_400Hz.raw", (400,)),
         three_channel=("3channel_400-800-1600Hz.raw", (400, 800, 1600)),
@@ -419,7 +302,7 @@
         sample_width = 2
         fmt = DATA_FORMAT[sample_width]
         mono_channels = [PURE_TONE_DICT[freq] for freq in frequencies]
-        data = _array_to_bytes(array(fmt, _sample_generator(*mono_channels)))
+        data = array(fmt, _sample_generator(*mono_channels)).tobytes()
         tmpfile = NamedTemporaryFile()
         _save_raw(data, tmpfile.name)
         self.assertTrue(filecmp.cmp(tmpfile.name, filename, shallow=False))
@@ -435,7 +318,7 @@
         channels = len(frequencies)
         fmt = DATA_FORMAT[sample_width]
         mono_channels = [PURE_TONE_DICT[freq] for freq in frequencies]
-        data = _array_to_bytes(array(fmt, _sample_generator(*mono_channels)))
+        data = array(fmt, _sample_generator(*mono_channels)).tobytes()
         tmpfile = NamedTemporaryFile()
         _save_wave(data, tmpfile.name, sampling_rate, sample_width, channels)
         self.assertTrue(filecmp.cmp(tmpfile.name, filename, shallow=False))
@@ -470,7 +353,7 @@
         exp_filename = "tests/data/test_16KHZ_mono_400Hz.raw"
         tmpdir = TemporaryDirectory()
         filename = os.path.join(tmpdir.name, filename)
-        data = _array_to_bytes(PURE_TONE_DICT[400])
+        data = PURE_TONE_DICT[400].tobytes()
         to_file(data, filename, audio_format=audio_format)
         self.assertTrue(filecmp.cmp(filename, exp_filename, shallow=False))
         tmpdir.cleanup()
@@ -487,7 +370,7 @@
         exp_filename = "tests/data/test_16KHZ_mono_400Hz.wav"
         tmpdir = TemporaryDirectory()
         filename = os.path.join(tmpdir.name, filename)
-        data = _array_to_bytes(PURE_TONE_DICT[400])
+        data = PURE_TONE_DICT[400].tobytes()
         to_file(
             data,
             filename,
@@ -555,4 +438,4 @@
         if extra_args is not None:
             kwargs.update(extra_args)
         audio_source = get_audio_source(input, **kwargs)
-        self.assertIsInstance(audio_source, expected_type)
\ No newline at end of file
+        self.assertIsInstance(audio_source, expected_type)