Mercurial > hg > auditok
changeset 313:10b725735637
Remove unused functions from io.py
author | Amine Sehili <amine.sehili@gmail.com> |
---|---|
date | Mon, 14 Oct 2019 20:22:50 +0100 |
parents | bf374186f80f |
children | 12a030453422 |
files | auditok/exceptions.py auditok/io.py tests/test_AudioSource.py tests/test_io.py |
diffstat | 4 files changed, 41 insertions(+), 237 deletions(-) [+] |
line wrap: on
line diff
--- a/auditok/exceptions.py Sun Oct 13 17:26:26 2019 +0200
+++ b/auditok/exceptions.py Mon Oct 14 20:22:50 2019 +0100
@@ -20,6 +20,17 @@
     postprocessing code"""
 
 
+class AudioIOError(Exception):
+    """Raised when a compressed audio file cannot be loaded or when trying
+    to read from a not yet open AudioSource"""
+
+
+class AudioParameterError(AudioIOError):
+    """Raised when one audio parameter is missing when loading raw data or
+    saving data to a format other than raw. Also raised when an audio
+    parameter has a wrong value"""
+
+
 class AudioEncodingError(Exception):
     """Raised if audio data can not be encoded in the provided format"""
 
--- a/auditok/io.py Sun Oct 13 17:26:26 2019 +0200 +++ b/auditok/io.py Mon Oct 14 20:22:50 2019 +0100 @@ -30,11 +30,7 @@ import audioop from array import array from functools import partial - -if sys.version_info >= (3, 0): - PYTHON_3 = True -else: - PYTHON_3 = False +from .exceptions import AudioIOError, AudioParameterError try: from pydub import AudioSegment @@ -74,14 +70,6 @@ DATA_FORMAT = {1: "b", 2: "h", 4: "i"} -class AudioIOError(Exception): - pass - - -class AudioParameterError(AudioIOError): - pass - - def check_audio_data(data, sample_width, channels): sample_size_bytes = int(sample_width * channels) nb_samples = len(data) // sample_size_bytes @@ -105,31 +93,6 @@ return fmt -def _normalize_use_channel(use_channel): - """ - Returns a value of `use_channel` as expected by audio read/write fuctions. - If `use_channel` is `None`, returns 0. If it's an integer, or the special - str 'mix' returns it as is. If it's `left` or `right` returns 0 or 1 - respectively. - """ - err_message = ( - "'use_channel' parameter must be a non-zero integer or one of " - ) - err_message += "('left', 'right', 'mix'), found: '{}'" - if use_channel is None: - return 0 - if use_channel == "mix": - return "mix" - if isinstance(use_channel, int): - if use_channel == 0: - raise AudioParameterError(err_message.format(use_channel)) - return use_channel - 1 if use_channel > 0 else use_channel - try: - return ["left", "right"].index(use_channel) - except ValueError: - raise AudioParameterError(err_message.format(use_channel)) - - def _get_audio_parameters(param_dict): """ Gets audio parameters from a dictionary of parameters. @@ -166,51 +129,6 @@ return sampling_rate, sample_width, channels -def _array_to_bytes(a): - """ - Converts an `array.array` to `bytes`. 
- """ - if PYTHON_3: - return a.tobytes() - else: - return a.tostring() - - -def _mix_audio_channels(data, channels, sample_width): - if channels == 1: - return data - if channels == 2: - return audioop.tomono(data, sample_width, 0.5, 0.5) - fmt = DATA_FORMAT[sample_width] - buffer = array(fmt, data) - mono_channels = [ - array(fmt, buffer[ch::channels]) for ch in range(channels) - ] - avg_arr = array( - fmt, (sum(samples) // channels for samples in zip(*mono_channels)) - ) - return _array_to_bytes(avg_arr) - - -def _extract_selected_channel(data, channels, sample_width, use_channel): - if use_channel == "mix": - return _mix_audio_channels(data, channels, sample_width) - - if use_channel >= channels or use_channel < -channels: - err_message = "use_channel == {} but audio data has only {} channel{}." - err_message += " Selected channel must be 'mix' or an integer >= " - err_message += "-channels and < channels" - err_message = err_message.format( - use_channel, channels, "s" if channels > 1 else "" - ) - raise AudioParameterError(err_message) - elif use_channel < 0: - use_channel += channels - fmt = DATA_FORMAT[sample_width] - buffer = array(fmt, data) - return _array_to_bytes(buffer[use_channel::channels]) - - class AudioSource: """ Base class for audio source objects. @@ -675,10 +593,7 @@ _FileAudioSource.__init__(self, sampling_rate, sample_width, channels) self._is_open = False self._sample_size = sample_width * channels - if PYTHON_3: - self._stream = sys.stdin.buffer - else: - self._stream = sys.stdin + self._stream = sys.stdin.buffer def is_open(self): return self._is_open @@ -750,7 +665,7 @@ chunk_gen, total=nb_chunks, duration=duration, - **progress_bar_kwargs + **progress_bar_kwargs, ) if self.stream.is_stopped(): self.stream.start_stream()
--- a/tests/test_AudioSource.py Sun Oct 13 17:26:26 2019 +0200
+++ b/tests/test_AudioSource.py Mon Oct 14 20:22:50 2019 +0100
@@ -6,7 +6,6 @@
 from genty import genty, genty_dataset
 from auditok.io import (
     AudioParameterError,
-    _array_to_bytes,
     DATA_FORMAT,
     BufferAudioSource,
     RawAudioSource,
@@ -61,9 +60,7 @@
         audio_source.close()
         mono_channels = [PURE_TONE_DICT[freq] for freq in frequencies]
         fmt = DATA_FORMAT[audio_source.sample_width]
-        expected = _array_to_bytes(
-            array(fmt, _sample_generator(*mono_channels))
-        )
+        expected = array(fmt, _sample_generator(*mono_channels)).tobytes()
         self.assertEqual(data_read_all, expected)
 
 
@@ -93,9 +90,7 @@
         audio_source.close()
         mono_channels = [PURE_TONE_DICT[freq] for freq in frequencies]
         fmt = DATA_FORMAT[audio_source.sample_width]
-        expected = _array_to_bytes(
-            array(fmt, _sample_generator(*mono_channels))
-        )
+        expected = array(fmt, _sample_generator(*mono_channels)).tobytes()
         self.assertEqual(data, expected)
 
 
--- a/tests/test_io.py Sun Oct 13 17:26:26 2019 +0200 +++ b/tests/test_io.py Mon Oct 14 20:22:50 2019 +0100 @@ -5,6 +5,7 @@ from tempfile import NamedTemporaryFile, TemporaryDirectory import filecmp from unittest import TestCase +from unittest.mock import patch, Mock from genty import genty, genty_dataset from test_util import _sample_generator, _generate_pure_tone, PURE_TONE_DICT from auditok.io import ( @@ -17,11 +18,7 @@ StdinAudioSource, check_audio_data, _guess_audio_format, - _normalize_use_channel, _get_audio_parameters, - _array_to_bytes, - _mix_audio_channels, - _extract_selected_channel, _load_raw, _load_wave, _load_with_pydub, @@ -33,14 +30,6 @@ to_file, ) - -if sys.version_info >= (3, 0): - PYTHON_3 = True - from unittest.mock import patch, Mock -else: - PYTHON_3 = False - from mock import patch, Mock - AUDIO_PARAMS_SHORT = {"sr": 16000, "sw": 2, "ch": 1} @@ -74,17 +63,6 @@ result = _guess_audio_format(fmt, filename) self.assertEqual(result, expected) - @genty_dataset( - none=(None, 0), - positive_int=(1, 0), - left=("left", 0), - right=("right", 1), - mix=("mix", "mix"), - ) - def test_normalize_use_channel(self, use_channel, expected): - result = _normalize_use_channel(use_channel) - self.assertEqual(result, expected) - def test_get_audio_parameters_short_params(self): expected = (8000, 2, 1) params = dict(zip(("sr", "sw", "ch"), expected)) @@ -121,105 +99,12 @@ ) def test_get_audio_parameters_invalid(self, values): params = dict( - zip( - ("sampling_rate", "sample_width", "channels"), - values, - ) + zip(("sampling_rate", "sample_width", "channels"), values) ) with self.assertRaises(AudioParameterError): _get_audio_parameters(params) @genty_dataset( - mono_1byte=([400], 1), - stereo_1byte=([400, 600], 1), - three_channel_1byte=([400, 600, 2400], 1), - mono_2byte=([400], 2), - stereo_2byte=([400, 600], 2), - three_channel_2byte=([400, 600, 1150], 2), - mono_4byte=([400], 4), - stereo_4byte=([400, 600], 4), - four_channel_2byte=([400, 600, 1150, 7220], 
4), - ) - def test_mix_audio_channels(self, frequencies, sample_width): - sampling_rate = 16000 - sample_width = 2 - channels = len(frequencies) - mono_channels = [ - _generate_pure_tone( - freq, - duration_sec=0.1, - sampling_rate=sampling_rate, - sample_width=sample_width, - ) - for freq in frequencies - ] - fmt = DATA_FORMAT[sample_width] - expected = _array_to_bytes( - array( - fmt, - (sum(samples) // channels for samples in zip(*mono_channels)), - ) - ) - data = _array_to_bytes(array(fmt, _sample_generator(*mono_channels))) - mixed = _mix_audio_channels(data, channels, sample_width) - self.assertEqual(mixed, expected) - - @genty_dataset( - mono_1byte=([400], 1, 0), - stereo_1byte_2st_channel=([400, 600], 1, 1), - mono_2byte=([400], 2, 0), - stereo_2byte_1st_channel=([400, 600], 2, 0), - stereo_2byte_2nd_channel=([400, 600], 2, 1), - three_channel_2byte_last_negative_idx=([400, 600, 1150], 2, -1), - three_channel_2byte_2nd_negative_idx=([400, 600, 1150], 2, -2), - three_channel_2byte_1st_negative_idx=([400, 600, 1150], 2, -3), - three_channel_4byte_1st=([400, 600, 1150], 4, 0), - three_channel_4byte_last_negative_idx=([400, 600, 1150], 4, -1), - ) - def test_extract_selected_channel( - self, frequencies, sample_width, use_channel - ): - - mono_channels = [ - _generate_pure_tone( - freq, - duration_sec=0.1, - sampling_rate=16000, - sample_width=sample_width, - ) - for freq in frequencies - ] - channels = len(frequencies) - fmt = DATA_FORMAT[sample_width] - expected = _array_to_bytes(mono_channels[use_channel]) - data = _array_to_bytes(array(fmt, _sample_generator(*mono_channels))) - selected_channel = _extract_selected_channel( - data, channels, sample_width, use_channel - ) - self.assertEqual(selected_channel, expected) - - @genty_dataset(mono=([400],), three_channel=([600, 1150, 2400],)) - def test_extract_selected_channel_mix(self, frequencies): - - mono_channels = [PURE_TONE_DICT[freq] for freq in frequencies] - channels = len(frequencies) - fmt = 
DATA_FORMAT[2] - expected = _array_to_bytes( - array( - fmt, - (sum(samples) // channels for samples in zip(*mono_channels)), - ) - ) - data = _array_to_bytes(array(fmt, _sample_generator(*mono_channels))) - selected_channel = _extract_selected_channel(data, channels, 2, "mix") - self.assertEqual(selected_channel, expected) - - @genty_dataset(positive=(2,), negative=(-3,)) - def test_extract_selected_channel_invalid_use_channel(self, use_channel): - with self.assertRaises(AudioParameterError): - _extract_selected_channel(b"\0\0", 2, 2, use_channel) - - @genty_dataset( raw_with_audio_format=( "audio", "raw", @@ -288,7 +173,6 @@ with self.assertRaises(AudioIOError): from_file("audio", "mp3") - @patch("auditok.io._WITH_PYDUB", True) @patch("auditok.io.BufferAudioSource") @genty_dataset( @@ -311,24 +195,26 @@ segment_mock.sample_width = 2 segment_mock.channels = 2 segment_mock._data = b"abcd" - with patch( - "auditok.io.AudioSegment.{}".format(function) - ) as open_func: + with patch("auditok.io.AudioSegment.{}".format(function)) as open_func: open_func.return_value = segment_mock from_file(filename) self.assertTrue(open_func.called) - @genty_dataset( mono=("mono_400", (400,)), three_channel=("3channel_400-800-1600", (400, 800, 1600)), - mono_large_file=("mono_400", (400,), True), - three_channel_large_file=("3channel_400-800-1600", (400, 800, 1600), True), + three_channel_large_file=( + "3channel_400-800-1600", + (400, 800, 1600), + True, + ), ) def test_load_raw(self, file_id, frequencies, large_file=False): filename = "tests/data/test_16KHZ_{}Hz.raw".format(file_id) - audio_source = _load_raw(filename, 16000, 2, len(frequencies), large_file=large_file) + audio_source = _load_raw( + filename, 16000, 2, len(frequencies), large_file=large_file + ) audio_source.open() data = audio_source.read(-1) audio_source.close() @@ -339,7 +225,7 @@ self.assertEqual(audio_source.channels, len(frequencies)) mono_channels = [PURE_TONE_DICT[freq] for freq in frequencies] fmt = 
DATA_FORMAT[audio_source.sample_width] - expected =_array_to_bytes(array(fmt, _sample_generator(*mono_channels))) + expected = array(fmt, _sample_generator(*mono_channels)).tobytes() self.assertEqual(data, expected) @genty_dataset( @@ -357,9 +243,12 @@ @genty_dataset( mono=("mono_400", (400,)), three_channel=("3channel_400-800-1600", (400, 800, 1600)), - mono_large_file=("mono_400", (400,), True), - three_channel_large_file=("3channel_400-800-1600", (400, 800, 1600), True), + three_channel_large_file=( + "3channel_400-800-1600", + (400, 800, 1600), + True, + ), ) def test_load_wave(self, file_id, frequencies, large_file=False): filename = "tests/data/test_16KHZ_{}Hz.wav".format(file_id) @@ -374,10 +263,9 @@ self.assertEqual(audio_source.channels, len(frequencies)) mono_channels = [PURE_TONE_DICT[freq] for freq in frequencies] fmt = DATA_FORMAT[audio_source.sample_width] - expected =_array_to_bytes(array(fmt, _sample_generator(*mono_channels))) + expected = array(fmt, _sample_generator(*mono_channels)).tobytes() self.assertEqual(data, expected) - @patch("auditok.io._WITH_PYDUB", True) @patch("auditok.io.BufferAudioSource") @genty_dataset( @@ -394,22 +282,17 @@ webm_right_channel=("webm", 2, "from_file"), webm_mix_channels=("webm", 4, "from_file"), ) - def test_load_with_pydub( - self, audio_format, channels, function, *mocks - ): + def test_load_with_pydub(self, audio_format, channels, function, *mocks): filename = "audio.{}".format(audio_format) segment_mock = Mock() segment_mock.sample_width = 2 segment_mock.channels = channels segment_mock._data = b"abcdefgh" - with patch( - "auditok.io.AudioSegment.{}".format(function) - ) as open_func: + with patch("auditok.io.AudioSegment.{}".format(function)) as open_func: open_func.return_value = segment_mock _load_with_pydub(filename, audio_format) self.assertTrue(open_func.called) - @genty_dataset( mono=("mono_400Hz.raw", (400,)), three_channel=("3channel_400-800-1600Hz.raw", (400, 800, 1600)), @@ -419,7 +302,7 @@ 
sample_width = 2 fmt = DATA_FORMAT[sample_width] mono_channels = [PURE_TONE_DICT[freq] for freq in frequencies] - data = _array_to_bytes(array(fmt, _sample_generator(*mono_channels))) + data = array(fmt, _sample_generator(*mono_channels)).tobytes() tmpfile = NamedTemporaryFile() _save_raw(data, tmpfile.name) self.assertTrue(filecmp.cmp(tmpfile.name, filename, shallow=False)) @@ -435,7 +318,7 @@ channels = len(frequencies) fmt = DATA_FORMAT[sample_width] mono_channels = [PURE_TONE_DICT[freq] for freq in frequencies] - data = _array_to_bytes(array(fmt, _sample_generator(*mono_channels))) + data = array(fmt, _sample_generator(*mono_channels)).tobytes() tmpfile = NamedTemporaryFile() _save_wave(data, tmpfile.name, sampling_rate, sample_width, channels) self.assertTrue(filecmp.cmp(tmpfile.name, filename, shallow=False)) @@ -470,7 +353,7 @@ exp_filename = "tests/data/test_16KHZ_mono_400Hz.raw" tmpdir = TemporaryDirectory() filename = os.path.join(tmpdir.name, filename) - data = _array_to_bytes(PURE_TONE_DICT[400]) + data = PURE_TONE_DICT[400].tobytes() to_file(data, filename, audio_format=audio_format) self.assertTrue(filecmp.cmp(filename, exp_filename, shallow=False)) tmpdir.cleanup() @@ -487,7 +370,7 @@ exp_filename = "tests/data/test_16KHZ_mono_400Hz.wav" tmpdir = TemporaryDirectory() filename = os.path.join(tmpdir.name, filename) - data = _array_to_bytes(PURE_TONE_DICT[400]) + data = PURE_TONE_DICT[400].tobytes() to_file( data, filename, @@ -555,4 +438,4 @@ if extra_args is not None: kwargs.update(extra_args) audio_source = get_audio_source(input, **kwargs) - self.assertIsInstance(audio_source, expected_type) \ No newline at end of file + self.assertIsInstance(audio_source, expected_type)