amine@106: import os amine@106: import sys amine@106: import math amine@107: from array import array amine@133: from tempfile import NamedTemporaryFile, TemporaryDirectory amine@110: import filecmp amine@108: from unittest import TestCase amine@108: from genty import genty, genty_dataset amine@157: from test_util import _sample_generator, _generate_pure_tone, PURE_TONE_DICT amine@110: from auditok.io import ( amine@126: DATA_FORMAT, amine@121: AudioIOError, amine@110: AudioParameterError, amine@126: BufferAudioSource, amine@162: RawAudioSource, amine@162: WaveAudioSource, amine@190: StdinAudioSource, amine@110: check_audio_data, amine@143: _guess_audio_format, amine@144: _normalize_use_channel, amine@128: _get_audio_parameters, amine@116: _array_to_bytes, amine@118: _mix_audio_channels, amine@119: _extract_selected_channel, amine@126: _load_raw, amine@129: _load_wave, amine@131: _load_with_pydub, amine@190: get_audio_source, amine@120: from_file, amine@111: _save_raw, amine@110: _save_wave, amine@141: _save_with_pydub, amine@135: to_file, amine@110: ) amine@106: amine@106: amine@106: if sys.version_info >= (3, 0): amine@106: PYTHON_3 = True amine@124: from unittest.mock import patch, Mock amine@106: else: amine@106: PYTHON_3 = False amine@124: from mock import patch, Mock amine@120: amine@120: AUDIO_PARAMS_SHORT = {"sr": 16000, "sw": 2, "ch": 1} amine@106: amine@106: amine@108: @genty amine@108: class TestIO(TestCase): amine@108: @genty_dataset( amine@108: valid_mono=(b"\0" * 113, 1, 1), amine@108: valid_stereo=(b"\0" * 160, 1, 2), amine@108: invalid_mono_sw_2=(b"\0" * 113, 2, 1, False), amine@108: invalid_stereo_sw_1=(b"\0" * 113, 1, 2, False), amine@108: invalid_stereo_sw_2=(b"\0" * 158, 2, 2, False), amine@108: ) amine@108: def test_check_audio_data(self, data, sample_width, channels, valid=True): amine@108: amine@108: if not valid: amine@108: with self.assertRaises(AudioParameterError): amine@108: check_audio_data(data, sample_width, channels) amine@108: else: amine@108: self.assertIsNone(check_audio_data(data, sample_width, channels)) amine@110: amine@110: @genty_dataset( amine@143: extention_and_format_same=("wav", "filename.wav", "wav"), amine@143: extention_and_format_different=("wav", "filename.mp3", "wav"), amine@143: extention_no_format=(None, "filename.wav", "wav"), amine@143: format_no_extension=("wav", "filename", "wav"), amine@143: no_format_no_extension=(None, "filename", None), amine@143: ) amine@143: def test_guess_audio_format(self, fmt, filename, expected): amine@143: result = _guess_audio_format(fmt, filename) amine@143: self.assertEqual(result, expected) amine@143: amine@143: @genty_dataset( amine@144: none=(None, 0), amine@208: positive_int=(1, 0), amine@144: left=("left", 0), amine@144: right=("right", 1), amine@144: mix=("mix", "mix"), amine@144: ) amine@144: def test_normalize_use_channel(self, use_channel, expected): amine@144: result = _normalize_use_channel(use_channel) amine@144: self.assertEqual(result, expected) amine@144: amine@144: @genty_dataset( amine@210: int_1=((8000, 2, 1, 1), (8000, 2, 1, 1)), amine@210: int_2=((8000, 2, 1, 2), (8000, 2, 1, 2)), amine@210: use_channel_left=((8000, 2, 1, "left"), (8000, 2, 1, "left")), amine@210: use_channel_right=((8000, 2, 1, "right"), (8000, 2, 1, "right")), amine@145: use_channel_mix=((8000, 2, 1, "mix"), (8000, 2, 1, "mix")), amine@210: use_channel_None=((8000, 2, 2, None), (8000, 2, 2, None)), amine@210: no_use_channel=((8000, 2, 2), (8000, 2, 2, None)), amine@145: ) amine@145: def test_get_audio_parameters_short_params(self, values, expected): amine@208: params = dict(zip(("sr", "sw", "ch", "uc"), values)) amine@145: result = _get_audio_parameters(params) amine@145: self.assertEqual(result, expected) amine@145: amine@145: @genty_dataset( amine@210: int_1=((8000, 2, 1, 1), (8000, 2, 1, 1)), amine@210: int_2=((8000, 2, 1, 2), (8000, 2, 1, 2)), amine@210: use_channel_left=((8000, 2, 1, "left"), (8000, 2, 1, "left")), amine@210: use_channel_right=((8000, 2, 1, "right"), (8000, 2, 1, "right")), amine@145: use_channel_mix=((8000, 2, 1, "mix"), (8000, 2, 1, "mix")), amine@210: use_channel_None=((8000, 2, 2, None), (8000, 2, 2, None)), amine@210: no_use_channel=((8000, 2, 2), (8000, 2, 2, None)), amine@145: ) amine@145: def test_get_audio_parameters_long_params(self, values, expected): amine@209: params = dict( amine@209: zip( amine@209: ("sampling_rate", "sample_width", "channels", "use_channel"), amine@209: values, amine@209: ) amine@209: ) amine@145: result = _get_audio_parameters(params) amine@145: self.assertEqual(result, expected) amine@145: amine@210: @genty_dataset(simple=((8000, 2, 1, 1), (8000, 2, 1, 1))) amine@208: def test_get_audio_parameters_long_params_shadow_short_ones( amine@145: self, values, expected amine@145: ): amine@209: params = dict( amine@209: zip( amine@209: ("sampling_rate", "sample_width", "channels", "use_channel"), amine@209: values, amine@209: ) amine@209: ) amine@208: params.update(dict(zip(("sr", "sw", "ch", "uc"), "xxxx"))) amine@145: result = _get_audio_parameters(params) amine@145: self.assertEqual(result, expected) amine@145: amine@145: @genty_dataset( amine@146: str_sampling_rate=(("x", 2, 1, 0),), amine@146: negative_sampling_rate=((-8000, 2, 1, 0),), amine@146: str_sample_width=((8000, "x", 1, 0),), amine@146: negative_sample_width=((8000, -2, 1, 0),), amine@146: str_channels=((8000, 2, "x", 0),), amine@146: negative_channels=((8000, 2, -1, 0),), amine@146: ) amine@146: def test_get_audio_parameters_invalid(self, values): amine@208: # TODO 0 or negative use_channel must raise AudioParameterError amine@208: # change implementation, don't accept negative uc amine@208: # hifglight everywhere in doc that uc must be positive amine@209: params = dict( amine@209: zip( amine@209: ("sampling_rate", "sample_width", "channels", "use_channel"), amine@209: values, amine@209: ) amine@209: ) amine@146: with self.assertRaises(AudioParameterError): amine@146: _get_audio_parameters(params) amine@146: amine@146: @genty_dataset( amine@118: mono_1byte=([400], 1), amine@118: stereo_1byte=([400, 600], 1), amine@118: three_channel_1byte=([400, 600, 2400], 1), amine@118: mono_2byte=([400], 2), amine@118: stereo_2byte=([400, 600], 2), amine@118: three_channel_2byte=([400, 600, 1150], 2), amine@118: mono_4byte=([400], 4), amine@118: stereo_4byte=([400, 600], 4), amine@118: four_channel_2byte=([400, 600, 1150, 7220], 4), amine@118: ) amine@118: def test_mix_audio_channels(self, frequencies, sample_width): amine@118: sampling_rate = 16000 amine@118: sample_width = 2 amine@118: channels = len(frequencies) amine@118: mono_channels = [ amine@118: _generate_pure_tone( amine@118: freq, amine@118: duration_sec=0.1, amine@118: sampling_rate=sampling_rate, amine@118: sample_width=sample_width, amine@118: ) amine@118: for freq in frequencies amine@118: ] amine@118: fmt = DATA_FORMAT[sample_width] amine@118: expected = _array_to_bytes( amine@118: array( amine@118: fmt, amine@118: (sum(samples) // channels for samples in zip(*mono_channels)), amine@118: ) amine@118: ) amine@118: data = _array_to_bytes(array(fmt, _sample_generator(*mono_channels))) amine@118: mixed = _mix_audio_channels(data, channels, sample_width) amine@118: self.assertEqual(mixed, expected) amine@118: amine@118: @genty_dataset( amine@119: mono_1byte=([400], 1, 0), amine@119: stereo_1byte_2st_channel=([400, 600], 1, 1), amine@119: mono_2byte=([400], 2, 0), amine@119: stereo_2byte_1st_channel=([400, 600], 2, 0), amine@119: stereo_2byte_2nd_channel=([400, 600], 2, 1), amine@119: three_channel_2byte_last_negative_idx=([400, 600, 1150], 2, -1), amine@119: three_channel_2byte_2nd_negative_idx=([400, 600, 1150], 2, -2), amine@119: three_channel_2byte_1st_negative_idx=([400, 600, 1150], 2, -3), amine@119: three_channel_4byte_1st=([400, 600, 1150], 4, 0), amine@119: three_channel_4byte_last_negative_idx=([400, 600, 1150], 4, -1), amine@119: ) amine@119: def test_extract_selected_channel( amine@119: self, frequencies, sample_width, use_channel amine@119: ): amine@119: amine@119: mono_channels = [ amine@119: _generate_pure_tone( amine@119: freq, amine@119: duration_sec=0.1, amine@119: sampling_rate=16000, amine@119: sample_width=sample_width, amine@119: ) amine@119: for freq in frequencies amine@119: ] amine@119: channels = len(frequencies) amine@119: fmt = DATA_FORMAT[sample_width] amine@119: expected = _array_to_bytes(mono_channels[use_channel]) amine@119: data = _array_to_bytes(array(fmt, _sample_generator(*mono_channels))) amine@119: selected_channel = _extract_selected_channel( amine@119: data, channels, sample_width, use_channel amine@119: ) amine@119: self.assertEqual(selected_channel, expected) amine@119: amine@148: @genty_dataset(mono=([400],), three_channel=([600, 1150, 2400],)) amine@148: def test_extract_selected_channel_mix(self, frequencies): amine@148: amine@148: mono_channels = [PURE_TONE_DICT[freq] for freq in frequencies] amine@148: channels = len(frequencies) amine@148: fmt = DATA_FORMAT[2] amine@148: expected = _array_to_bytes( amine@148: array( amine@148: fmt, amine@148: (sum(samples) // channels for samples in zip(*mono_channels)), amine@148: ) amine@148: ) amine@148: data = _array_to_bytes(array(fmt, _sample_generator(*mono_channels))) amine@148: selected_channel = _extract_selected_channel(data, channels, 2, "mix") amine@148: self.assertEqual(selected_channel, expected) amine@148: amine@149: @genty_dataset(positive=(2,), negative=(-3,)) amine@149: def test_extract_selected_channel_invalid_use_channel(self, use_channel): amine@149: with self.assertRaises(AudioParameterError): amine@149: _extract_selected_channel(b"\0\0", 2, 2, use_channel) amine@149: amine@119: @genty_dataset( amine@120: raw_with_audio_format=( amine@120: "audio", amine@120: "raw", amine@120: "_load_raw", amine@120: AUDIO_PARAMS_SHORT, amine@120: ), amine@120: raw_with_extension=( amine@120: "audio.raw", amine@120: None, amine@120: "_load_raw", amine@120: AUDIO_PARAMS_SHORT, amine@120: ), amine@120: wave_with_audio_format=("audio", "wave", "_load_wave"), amine@120: wav_with_audio_format=("audio", "wave", "_load_wave"), amine@120: wav_with_extension=("audio.wav", None, "_load_wave"), amine@120: format_and_extension_both_given=("audio.dat", "wav", "_load_wave"), amine@120: format_and_extension_both_given_b=("audio.raw", "wave", "_load_wave"), amine@120: no_format_nor_extension=("audio", None, "_load_with_pydub"), amine@120: other_formats_ogg=("audio.ogg", None, "_load_with_pydub"), amine@120: other_formats_webm=("audio", "webm", "_load_with_pydub"), amine@120: ) amine@120: def test_from_file( amine@120: self, filename, audio_format, funtion_name, kwargs=None amine@120: ): amine@120: funtion_name = "auditok.io." + funtion_name amine@120: if kwargs is None: amine@120: kwargs = {} amine@120: with patch(funtion_name) as patch_function: amine@120: from_file(filename, audio_format, **kwargs) amine@120: self.assertTrue(patch_function.called) amine@120: amine@190: def test_from_file_large_file_raw(self,): amine@162: filename = "tests/data/test_16KHZ_mono_400Hz.raw" amine@190: audio_source = from_file( amine@190: filename, amine@190: large_file=True, amine@190: sampling_rate=16000, amine@190: sample_width=2, amine@190: channels=1, amine@190: ) amine@162: self.assertIsInstance(audio_source, RawAudioSource) amine@162: amine@190: def test_from_file_large_file_wave(self,): amine@162: filename = "tests/data/test_16KHZ_mono_400Hz.wav" amine@162: audio_source = from_file(filename, large_file=True) amine@162: self.assertIsInstance(audio_source, WaveAudioSource) amine@163: amine@190: def test_from_file_large_file_compressed(self,): amine@163: filename = "tests/data/test_16KHZ_mono_400Hz.ogg" amine@163: with self.assertRaises(AudioIOError): amine@163: from_file(filename, large_file=True) amine@162: amine@137: @genty_dataset( amine@137: missing_sampling_rate=("sr",), amine@137: missing_sample_width=("sw",), amine@137: missing_channels=("ch",), amine@137: ) amine@137: def test_from_file_missing_audio_param(self, missing_param): amine@137: with self.assertRaises(AudioParameterError): amine@137: params = AUDIO_PARAMS_SHORT.copy() amine@137: del params[missing_param] amine@137: from_file("audio", audio_format="raw", **params) amine@137: amine@121: def test_from_file_no_pydub(self): amine@121: with patch("auditok.io._WITH_PYDUB", False): amine@121: with self.assertRaises(AudioIOError): amine@121: from_file("audio", "mp3") amine@121: amine@111: @genty_dataset( amine@208: raw_first_channel=("raw", 1, 400), amine@208: raw_second_channel=("raw", 2, 800), amine@208: raw_third_channel=("raw", 3, 1600), amine@122: raw_left_channel=("raw", "left", 400), amine@122: raw_right_channel=("raw", "right", 800), amine@208: wav_first_channel=("wav", 1, 400), amine@208: wav_second_channel=("wav", 2, 800), amine@208: wav_third_channel=("wav", 3, 1600), amine@122: wav_left_channel=("wav", "left", 400), amine@122: wav_right_channel=("wav", "right", 800), amine@122: ) amine@122: def test_from_file_multichannel_audio( amine@122: self, audio_format, use_channel, frequency amine@122: ): amine@122: expected = PURE_TONE_DICT[frequency] amine@122: filename = "tests/data/test_16KHZ_3channel_400-800-1600Hz.{}".format( amine@122: audio_format amine@122: ) amine@122: sample_width = 2 amine@122: audio_source = from_file( amine@122: filename, amine@122: sampling_rate=16000, amine@122: sample_width=sample_width, amine@122: channels=3, amine@122: use_channel=use_channel, amine@122: ) amine@122: fmt = DATA_FORMAT[sample_width] amine@122: data = array(fmt, audio_source._buffer) amine@122: self.assertEqual(data, expected) amine@122: amine@122: @genty_dataset( amine@123: raw_mono=("raw", "mono_400Hz", (400,)), amine@123: raw_3channel=("raw", "3channel_400-800-1600Hz", (400, 800, 1600)), amine@123: wav_mono=("wav", "mono_400Hz", (400,)), amine@123: wav_3channel=("wav", "3channel_400-800-1600Hz", (400, 800, 1600)), amine@123: ) amine@123: def test_from_file_multichannel_audio_mix( amine@123: self, audio_format, filename_suffix, frequencies amine@123: ): amine@123: sampling_rate = 16000 amine@123: sample_width = 2 amine@123: channels = len(frequencies) amine@123: mono_channels = [PURE_TONE_DICT[freq] for freq in frequencies] amine@123: channels = len(frequencies) amine@123: fmt = DATA_FORMAT[sample_width] amine@123: expected = _array_to_bytes( amine@123: array( amine@123: fmt, amine@123: (sum(samples) // channels for samples in zip(*mono_channels)), amine@123: ) amine@123: ) amine@123: filename = "tests/data/test_16KHZ_{}.{}".format( amine@123: filename_suffix, audio_format amine@123: ) amine@123: audio_source = from_file( amine@123: filename, amine@123: use_channel="mix", amine@123: sampling_rate=sampling_rate, amine@123: sample_width=2, amine@123: channels=channels, amine@123: ) amine@123: mixed = audio_source._buffer amine@123: self.assertEqual((mixed), expected) amine@123: amine@124: @patch("auditok.io._WITH_PYDUB", True) amine@124: @patch("auditok.io.BufferAudioSource") amine@124: @genty_dataset( amine@208: ogg_first_channel=("ogg", 1, "from_ogg"), amine@208: ogg_second_channel=("ogg", 2, "from_ogg"), amine@124: ogg_mix=("ogg", "mix", "from_ogg"), amine@124: ogg_default=("ogg", None, "from_ogg"), amine@124: mp3_left_channel=("mp3", "left", "from_mp3"), amine@124: mp3_right_channel=("mp3", "right", "from_mp3"), amine@208: flac_first_channel=("flac", 1, "from_file"), amine@124: flac_second_channel=("flac", 1, "from_file"), amine@124: flv_left_channel=("flv", "left", "from_flv"), amine@124: webm_right_channel=("webm", "right", "from_file"), amine@124: ) amine@124: def test_from_file_multichannel_audio_compressed( amine@124: self, audio_format, use_channel, function, *mocks amine@124: ): amine@124: filename = "audio.{}".format(audio_format) amine@124: segment_mock = Mock() amine@124: segment_mock.sample_width = 2 amine@124: segment_mock.channels = 2 amine@124: segment_mock._data = b"abcd" amine@124: with patch("auditok.io._extract_selected_channel") as ext_mock: amine@124: with patch( amine@124: "auditok.io.AudioSegment.{}".format(function) amine@124: ) as open_func: amine@124: open_func.return_value = segment_mock amine@124: from_file(filename, use_channel=use_channel) amine@124: self.assertTrue(open_func.called) amine@124: self.assertTrue(ext_mock.called) amine@124: amine@208: use_channel = {"left": 1, "right": 2, None: 1}.get( amine@124: use_channel, use_channel amine@124: ) amine@208: if isinstance(use_channel, int): amine@208: # _extract_selected_channel will be called with a channel starting from 0 amine@208: use_channel -= 1 amine@124: ext_mock.assert_called_with( amine@124: segment_mock._data, amine@124: segment_mock.channels, amine@124: segment_mock.sample_width, amine@124: use_channel, amine@124: ) amine@124: amine@124: with patch("auditok.io._extract_selected_channel") as ext_mock: amine@124: with patch( amine@124: "auditok.io.AudioSegment.{}".format(function) amine@124: ) as open_func: amine@124: segment_mock.channels = 1 amine@124: open_func.return_value = segment_mock amine@124: from_file(filename, use_channel=use_channel) amine@124: self.assertTrue(open_func.called) amine@124: self.assertFalse(ext_mock.called) amine@124: amine@125: @patch("auditok.io._WITH_PYDUB", True) amine@125: @patch("auditok.io.BufferAudioSource") amine@125: @genty_dataset( amine@125: ogg=("ogg", "from_ogg"), amine@125: mp3=("mp3", "from_mp3"), amine@125: flac=("flac", "from_file"), amine@125: ) amine@125: def test_from_file_multichannel_audio_mix_compressed( amine@125: self, audio_format, function, *mocks amine@125: ): amine@125: filename = "audio.{}".format(audio_format) amine@125: segment_mock = Mock() amine@125: segment_mock.sample_width = 2 amine@125: segment_mock.channels = 2 amine@125: segment_mock._data = b"abcd" amine@125: with patch("auditok.io._mix_audio_channels") as mix_mock: amine@125: with patch( amine@125: "auditok.io.AudioSegment.{}".format(function) amine@125: ) as open_func: amine@125: open_func.return_value = segment_mock amine@125: from_file(filename, use_channel="mix") amine@125: self.assertTrue(open_func.called) amine@125: mix_mock.assert_called_with( amine@125: segment_mock._data, amine@125: segment_mock.channels, amine@125: segment_mock.sample_width, amine@125: ) amine@125: amine@123: @genty_dataset( amine@126: dafault_first_channel=(None, 400), amine@210: first_channel=(1, 400), amine@210: second_channel=(2, 800), amine@210: third_channel=(3, 1600), amine@126: negative_first_channel=(-3, 400), amine@126: negative_second_channel=(-2, 800), amine@126: negative_third_channel=(-1, 1600), amine@126: ) amine@126: def test_load_raw(self, use_channel, frequency): amine@126: filename = "tests/data/test_16KHZ_3channel_400-800-1600Hz.raw" amine@126: if use_channel is not None: amine@126: audio_source = _load_raw( amine@126: filename, amine@126: sampling_rate=16000, amine@126: sample_width=2, amine@126: channels=3, amine@126: use_channel=use_channel, amine@126: ) amine@126: else: amine@126: audio_source = _load_raw( amine@126: filename, sampling_rate=16000, sample_width=2, channels=3 amine@126: ) amine@126: self.assertIsInstance(audio_source, BufferAudioSource) amine@126: self.assertEqual(audio_source.sampling_rate, 16000) amine@126: self.assertEqual(audio_source.sample_width, 2) amine@126: self.assertEqual(audio_source.channels, 1) amine@126: # generate a pure sine wave tone of the given frequency amine@126: expected = PURE_TONE_DICT[frequency] amine@126: # compre with data read from file amine@126: fmt = DATA_FORMAT[2] amine@126: data = array(fmt, audio_source._buffer) amine@126: self.assertEqual(data, expected) amine@126: amine@126: @genty_dataset( amine@127: mono=("mono_400Hz", (400,)), amine@127: three_channel=("3channel_400-800-1600Hz", (400, 800, 1600)), amine@127: ) amine@127: def test_load_raw_mix(self, filename_suffix, frequencies): amine@127: sampling_rate = 16000 amine@127: sample_width = 2 amine@127: channels = len(frequencies) amine@127: mono_channels = [PURE_TONE_DICT[freq] for freq in frequencies] amine@127: amine@127: fmt = DATA_FORMAT[sample_width] amine@127: expected = _array_to_bytes( amine@127: array( amine@127: fmt, amine@127: (sum(samples) // channels for samples in zip(*mono_channels)), amine@127: ) amine@127: ) amine@127: filename = "tests/data/test_16KHZ_{}.raw".format(filename_suffix) amine@127: audio_source = _load_raw( amine@127: filename, amine@127: use_channel="mix", amine@127: sampling_rate=sampling_rate, amine@127: sample_width=2, amine@127: channels=channels, amine@127: ) amine@127: mixed = audio_source._buffer amine@127: self.assertEqual(mixed, expected) amine@127: self.assertIsInstance(audio_source, BufferAudioSource) amine@127: self.assertEqual(audio_source.sampling_rate, sampling_rate) amine@127: self.assertEqual(audio_source.sample_width, sample_width) amine@127: self.assertEqual(audio_source.channels, 1) amine@127: amine@127: @genty_dataset( amine@128: missing_sampling_rate=("sr",), amine@128: missing_sample_width=("sw",), amine@128: missing_channels=("ch",), amine@128: ) amine@128: def test_load_raw_missing_audio_param(self, missing_param): amine@128: with self.assertRaises(AudioParameterError): amine@128: params = AUDIO_PARAMS_SHORT.copy() amine@128: del params[missing_param] amine@128: srate, swidth, channels, _ = _get_audio_parameters(params) amine@128: _load_raw("audio", srate, swidth, channels) amine@128: amine@128: @genty_dataset( amine@129: dafault_first_channel=(None, 400), amine@210: first_channel=(1, 400), amine@210: second_channel=(2, 800), amine@210: third_channel=(3, 1600), amine@129: negative_first_channel=(-3, 400), amine@129: negative_second_channel=(-2, 800), amine@129: negative_third_channel=(-1, 1600), amine@129: ) amine@129: def test_load_wave(self, use_channel, frequency): amine@129: filename = "tests/data/test_16KHZ_3channel_400-800-1600Hz.wav" amine@129: if use_channel is not None: amine@129: audio_source = _load_wave(filename, use_channel=use_channel) amine@129: else: amine@129: audio_source = _load_wave(filename) amine@129: self.assertIsInstance(audio_source, BufferAudioSource) amine@129: self.assertEqual(audio_source.sampling_rate, 16000) amine@129: self.assertEqual(audio_source.sample_width, 2) amine@129: self.assertEqual(audio_source.channels, 1) amine@129: # generate a pure sine wave tone of the given frequency amine@129: expected = PURE_TONE_DICT[frequency] amine@129: # compre with data read from file amine@129: fmt = DATA_FORMAT[2] amine@129: data = array(fmt, audio_source._buffer) amine@129: self.assertEqual(data, expected) amine@129: amine@129: @genty_dataset( amine@130: mono=("mono_400Hz", (400,)), amine@130: three_channel=("3channel_400-800-1600Hz", (400, 800, 1600)), amine@130: ) amine@130: def test_load_wave_mix(self, filename_suffix, frequencies): amine@130: sampling_rate = 16000 amine@130: sample_width = 2 amine@130: channels = len(frequencies) amine@130: mono_channels = [PURE_TONE_DICT[freq] for freq in frequencies] amine@130: fmt = DATA_FORMAT[sample_width] amine@130: expected = _array_to_bytes( amine@130: array( amine@130: fmt, amine@130: (sum(samples) // channels for samples in zip(*mono_channels)), amine@130: ) amine@130: ) amine@130: filename = "tests/data/test_16KHZ_{}.wav".format(filename_suffix) amine@130: audio_source = _load_wave(filename, use_channel="mix") amine@130: mixed = audio_source._buffer amine@130: self.assertEqual(mixed, expected) amine@130: self.assertIsInstance(audio_source, BufferAudioSource) amine@130: self.assertEqual(audio_source.sampling_rate, sampling_rate) amine@130: self.assertEqual(audio_source.sample_width, sample_width) amine@130: self.assertEqual(audio_source.channels, 1) amine@130: amine@131: @patch("auditok.io._WITH_PYDUB", True) amine@131: @patch("auditok.io.BufferAudioSource") amine@131: @genty_dataset( amine@131: ogg_default_first_channel=("ogg", 2, None, "from_ogg"), amine@131: ogg_first_channel=("ogg", 1, 0, "from_ogg"), amine@131: ogg_second_channel=("ogg", 2, 1, "from_ogg"), amine@131: ogg_mix_channels=("ogg", 3, "mix", "from_ogg"), amine@131: mp3_left_channel=("mp3", 1, "left", "from_mp3"), amine@131: mp3_right_channel=("mp3", 2, "right", "from_mp3"), amine@131: mp3_mix_channels=("mp3", 3, "mix", "from_mp3"), amine@210: flac_first_channel=("flac", 2, 1, "from_file"), amine@131: flac_second_channel=("flac", 2, 1, "from_file"), amine@131: flv_left_channel=("flv", 1, "left", "from_flv"), amine@131: webm_right_channel=("webm", 2, "right", "from_file"), amine@131: webm_mix_channels=("webm", 4, "mix", "from_file"), amine@131: ) amine@131: def test_load_with_pydub( amine@131: self, audio_format, channels, use_channel, function, *mocks amine@131: ): amine@131: filename = "audio.{}".format(audio_format) amine@131: segment_mock = Mock() amine@131: segment_mock.sample_width = 2 amine@131: segment_mock.channels = channels amine@131: segment_mock._data = b"abcdefgh" amine@131: with patch("auditok.io._extract_selected_channel") as ext_mock: amine@131: with patch( amine@131: "auditok.io.AudioSegment.{}".format(function) amine@131: ) as open_func: amine@131: open_func.return_value = segment_mock amine@210: normalized_use_channel = {"left": 1, "right": 2, None: 0}.get( amine@131: use_channel, use_channel amine@131: ) amine@210: if isinstance(normalized_use_channel, int) and normalized_use_channel > 0: amine@210: normalized_use_channel -= 1 amine@131: _load_with_pydub(filename, audio_format, use_channel) amine@131: self.assertTrue(open_func.called) amine@131: if channels > 1: amine@131: self.assertTrue(ext_mock.called) amine@131: ext_mock.assert_called_with( amine@131: segment_mock._data, amine@131: segment_mock.channels, amine@131: segment_mock.sample_width, amine@210: normalized_use_channel, amine@131: ) amine@131: else: amine@131: self.assertFalse(ext_mock.called) amine@131: amine@130: @genty_dataset( amine@132: mono=("mono_400Hz.raw", (400,)), amine@132: three_channel=("3channel_400-800-1600Hz.raw", (400, 800, 1600)), amine@132: ) amine@132: def test_save_raw(self, filename, frequencies): amine@132: filename = "tests/data/test_16KHZ_{}".format(filename) amine@132: sample_width = 2 amine@132: fmt = DATA_FORMAT[sample_width] amine@132: mono_channels = [PURE_TONE_DICT[freq] for freq in frequencies] amine@132: data = _array_to_bytes(array(fmt, _sample_generator(*mono_channels))) amine@132: tmpfile = NamedTemporaryFile() amine@136: _save_raw(data, tmpfile.name) amine@132: self.assertTrue(filecmp.cmp(tmpfile.name, filename, shallow=False)) amine@132: amine@132: @genty_dataset( amine@110: mono=("mono_400Hz.wav", (400,)), amine@110: three_channel=("3channel_400-800-1600Hz.wav", (400, 800, 1600)), amine@110: ) amine@110: def test_save_wave(self, filename, frequencies): amine@110: filename = "tests/data/test_16KHZ_{}".format(filename) amine@110: sampling_rate = 16000 amine@110: sample_width = 2 amine@110: channels = len(frequencies) amine@110: fmt = DATA_FORMAT[sample_width] amine@110: mono_channels = [PURE_TONE_DICT[freq] for freq in frequencies] amine@110: data = _array_to_bytes(array(fmt, _sample_generator(*mono_channels))) amine@110: tmpfile = NamedTemporaryFile() amine@136: _save_wave(data, tmpfile.name, sampling_rate, sample_width, channels) amine@110: self.assertTrue(filecmp.cmp(tmpfile.name, filename, shallow=False)) amine@132: amine@132: @genty_dataset( amine@132: missing_sampling_rate=("sr",), amine@132: missing_sample_width=("sw",), amine@132: missing_channels=("ch",), amine@132: ) amine@132: def test_save_wave_missing_audio_param(self, missing_param): amine@132: with self.assertRaises(AudioParameterError): amine@132: params = AUDIO_PARAMS_SHORT.copy() amine@132: del params[missing_param] amine@132: srate, swidth, channels, _ = _get_audio_parameters(params) amine@136: _save_wave(b"\0\0", "audio", srate, swidth, channels) amine@133: amine@141: def test_save_with_pydub(self): amine@141: with patch("auditok.io.AudioSegment.export") as export: amine@142: tmpdir = TemporaryDirectory() amine@142: filename = os.path.join(tmpdir.name, "audio.ogg") amine@142: _save_with_pydub(b"\0\0", filename, "ogg", 16000, 2, 1) amine@141: self.assertTrue(export.called) amine@142: tmpdir.cleanup() amine@141: amine@133: @genty_dataset( amine@133: raw_with_audio_format=("audio", "raw"), amine@133: raw_with_extension=("audio.raw", None), amine@133: raw_with_audio_format_and_extension=("audio.mp3", "raw"), amine@133: raw_no_audio_format_nor_extension=("audio", None), amine@133: ) amine@133: def test_to_file_raw(self, filename, audio_format): amine@133: exp_filename = "tests/data/test_16KHZ_mono_400Hz.raw" amine@133: tmpdir = TemporaryDirectory() amine@133: filename = os.path.join(tmpdir.name, filename) amine@133: data = _array_to_bytes(PURE_TONE_DICT[400]) amine@135: to_file(data, filename, audio_format=audio_format) amine@133: self.assertTrue(filecmp.cmp(filename, exp_filename, shallow=False)) amine@133: tmpdir.cleanup() amine@134: amine@134: @genty_dataset( amine@134: wav_with_audio_format=("audio", "wav"), amine@134: wav_with_extension=("audio.wav", None), amine@134: wav_with_audio_format_and_extension=("audio.mp3", "wav"), amine@134: wave_with_audio_format=("audio", "wave"), amine@134: wave_with_extension=("audio.wave", None), amine@134: wave_with_audio_format_and_extension=("audio.mp3", "wave"), amine@134: ) amine@135: def test_to_file_wave(self, filename, audio_format): amine@134: exp_filename = "tests/data/test_16KHZ_mono_400Hz.wav" amine@134: tmpdir = TemporaryDirectory() amine@134: filename = os.path.join(tmpdir.name, filename) amine@134: data = _array_to_bytes(PURE_TONE_DICT[400]) amine@135: to_file( amine@135: data, amine@135: filename, amine@135: audio_format=audio_format, amine@135: sampling_rate=16000, amine@135: sample_width=2, amine@135: channels=1, amine@134: ) amine@134: self.assertTrue(filecmp.cmp(filename, exp_filename, shallow=False)) amine@134: tmpdir.cleanup() amine@138: amine@138: @genty_dataset( amine@138: missing_sampling_rate=("sr",), amine@138: missing_sample_width=("sw",), amine@138: missing_channels=("ch",), amine@138: ) amine@138: def test_to_file_missing_audio_param(self, missing_param): amine@138: params = AUDIO_PARAMS_SHORT.copy() amine@138: del params[missing_param] amine@138: with self.assertRaises(AudioParameterError): amine@138: to_file(b"\0\0", "audio", audio_format="wav", **params) amine@138: with self.assertRaises(AudioParameterError): amine@138: to_file(b"\0\0", "audio", audio_format="mp3", **params) amine@139: amine@139: def test_to_file_no_pydub(self): amine@139: with patch("auditok.io._WITH_PYDUB", False): amine@139: with self.assertRaises(AudioIOError): amine@139: to_file("audio", b"", "mp3") amine@140: amine@140: @patch("auditok.io._WITH_PYDUB", True) amine@140: @genty_dataset( amine@140: ogg_with_extension=("audio.ogg", None), amine@140: ogg_with_audio_format=("audio", "ogg"), amine@140: ogg_format_with_wrong_extension=("audio.wav", "ogg"), amine@140: ) amine@140: def test_to_file_compressed(self, filename, audio_format, *mocks): amine@140: with patch("auditok.io.AudioSegment.export") as export: amine@142: tmpdir = TemporaryDirectory() amine@142: filename = os.path.join(tmpdir.name, filename) amine@140: to_file(b"\0\0", filename, audio_format, **AUDIO_PARAMS_SHORT) amine@140: self.assertTrue(export.called) amine@142: tmpdir.cleanup() amine@190: amine@190: @genty_dataset( amine@190: string_wave=( amine@190: "tests/data/test_16KHZ_mono_400Hz.wav", amine@190: BufferAudioSource, amine@190: ), amine@190: string_wave_large_file=( amine@190: "tests/data/test_16KHZ_mono_400Hz.wav", amine@190: WaveAudioSource, amine@190: {"large_file": True}, amine@190: ), amine@190: stdin=("-", StdinAudioSource), amine@190: string_raw=("tests/data/test_16KHZ_mono_400Hz.raw", BufferAudioSource), amine@190: string_raw_large_file=( amine@190: "tests/data/test_16KHZ_mono_400Hz.raw", amine@190: RawAudioSource, amine@190: {"large_file": True}, amine@190: ), amine@190: bytes_=(b"0" * 8000, BufferAudioSource), amine@190: ) amine@190: def test_get_audio_source(self, input, expected_type, extra_args=None): amine@190: kwargs = {"sampling_rate": 16000, "sample_width": 2, "channels": 1} amine@190: if extra_args is not None: amine@190: kwargs.update(extra_args) amine@190: audio_source = get_audio_source(input, **kwargs) amine@210: self.assertIsInstance(audio_source, expected_type)