amine@106: import os amine@106: import sys amine@106: import math amine@107: from array import array amine@110: from tempfile import NamedTemporaryFile amine@110: import filecmp amine@108: from unittest import TestCase amine@108: from genty import genty, genty_dataset amine@110: from auditok.io import ( amine@126: DATA_FORMAT, amine@121: AudioIOError, amine@110: AudioParameterError, amine@126: BufferAudioSource, amine@110: check_audio_data, amine@128: _get_audio_parameters, amine@116: _array_to_bytes, amine@118: _mix_audio_channels, amine@119: _extract_selected_channel, amine@126: _load_raw, amine@129: _load_wave, amine@131: _load_with_pydub, amine@120: from_file, amine@111: _save_raw, amine@110: _save_wave, amine@110: ) amine@106: amine@106: amine@106: if sys.version_info >= (3, 0): amine@106: PYTHON_3 = True amine@124: from unittest.mock import patch, Mock amine@106: else: amine@106: PYTHON_3 = False amine@124: from mock import patch, Mock amine@120: amine@120: AUDIO_PARAMS_SHORT = {"sr": 16000, "sw": 2, "ch": 1} amine@106: amine@106: amine@106: def _sample_generator(*data_buffers): amine@106: """ amine@106: Takes a list of many mono audio data buffers and makes a sample generator amine@106: of interleaved audio samples, one sample from each channel. The resulting amine@106: generator can be used to build a multichannel audio buffer. amine@106: >>> gen = _sample_generator("abcd", "ABCD") amine@106: >>> list(gen) amine@106: ["a", "A", "b", "B", "c", "C", "d", "D"] amine@106: """ amine@106: frame_gen = zip(*data_buffers) amine@106: return (sample for frame in frame_gen for sample in frame) amine@106: amine@106: amine@107: def _generate_pure_tone( amine@107: frequency, duration_sec=1, sampling_rate=16000, sample_width=2, volume=1e4 amine@107: ): amine@107: """ amine@107: Generates a pure tone with the given frequency. amine@107: """ amine@107: assert frequency <= sampling_rate / 2 amine@107: max_value = (2 ** (sample_width * 8) // 2) - 1 amine@107: if volume > max_value: amine@107: volume = max_value amine@107: fmt = DATA_FORMAT[sample_width] amine@107: total_samples = int(sampling_rate * duration_sec) amine@107: step = frequency / sampling_rate amine@107: two_pi_step = 2 * math.pi * step amine@107: data = array( amine@107: fmt, amine@107: ( amine@107: int(math.sin(two_pi_step * i) * volume) amine@107: for i in range(total_samples) amine@107: ), amine@107: ) amine@107: return data amine@107: amine@107: amine@107: PURE_TONE_DICT = { amine@107: freq: _generate_pure_tone(freq, 1, 16000, 2) for freq in (400, 800, 1600) amine@107: } amine@107: PURE_TONE_DICT.update( amine@107: { amine@107: freq: _generate_pure_tone(freq, 0.1, 16000, 2) amine@107: for freq in (600, 1150, 2400, 7220) amine@107: } amine@107: ) amine@108: amine@108: amine@108: @genty amine@108: class TestIO(TestCase): amine@108: @genty_dataset( amine@108: valid_mono=(b"\0" * 113, 1, 1), amine@108: valid_stereo=(b"\0" * 160, 1, 2), amine@108: invalid_mono_sw_2=(b"\0" * 113, 2, 1, False), amine@108: invalid_stereo_sw_1=(b"\0" * 113, 1, 2, False), amine@108: invalid_stereo_sw_2=(b"\0" * 158, 2, 2, False), amine@108: ) amine@108: def test_check_audio_data(self, data, sample_width, channels, valid=True): amine@108: amine@108: if not valid: amine@108: with self.assertRaises(AudioParameterError): amine@108: check_audio_data(data, sample_width, channels) amine@108: else: amine@108: self.assertIsNone(check_audio_data(data, sample_width, channels)) amine@110: amine@110: @genty_dataset( amine@118: mono_1byte=([400], 1), amine@118: stereo_1byte=([400, 600], 1), amine@118: three_channel_1byte=([400, 600, 2400], 1), amine@118: mono_2byte=([400], 2), amine@118: stereo_2byte=([400, 600], 2), amine@118: three_channel_2byte=([400, 600, 1150], 2), amine@118: mono_4byte=([400], 4), amine@118: stereo_4byte=([400, 600], 4), amine@118: four_channel_2byte=([400, 600, 1150, 7220], 4), amine@118: ) amine@118: def test_mix_audio_channels(self, frequencies, sample_width): amine@118: sampling_rate = 16000 amine@118: sample_width = 2 amine@118: channels = len(frequencies) amine@118: mono_channels = [ amine@118: _generate_pure_tone( amine@118: freq, amine@118: duration_sec=0.1, amine@118: sampling_rate=sampling_rate, amine@118: sample_width=sample_width, amine@118: ) amine@118: for freq in frequencies amine@118: ] amine@118: fmt = DATA_FORMAT[sample_width] amine@118: expected = _array_to_bytes( amine@118: array( amine@118: fmt, amine@118: (sum(samples) // channels for samples in zip(*mono_channels)), amine@118: ) amine@118: ) amine@118: data = _array_to_bytes(array(fmt, _sample_generator(*mono_channels))) amine@118: mixed = _mix_audio_channels(data, channels, sample_width) amine@118: self.assertEqual(mixed, expected) amine@118: amine@118: @genty_dataset( amine@119: mono_1byte=([400], 1, 0), amine@119: stereo_1byte_2st_channel=([400, 600], 1, 1), amine@119: mono_2byte=([400], 2, 0), amine@119: stereo_2byte_1st_channel=([400, 600], 2, 0), amine@119: stereo_2byte_2nd_channel=([400, 600], 2, 1), amine@119: three_channel_2byte_last_negative_idx=([400, 600, 1150], 2, -1), amine@119: three_channel_2byte_2nd_negative_idx=([400, 600, 1150], 2, -2), amine@119: three_channel_2byte_1st_negative_idx=([400, 600, 1150], 2, -3), amine@119: three_channel_4byte_1st=([400, 600, 1150], 4, 0), amine@119: three_channel_4byte_last_negative_idx=([400, 600, 1150], 4, -1), amine@119: ) amine@119: def test_extract_selected_channel( amine@119: self, frequencies, sample_width, use_channel amine@119: ): amine@119: amine@119: mono_channels = [ amine@119: _generate_pure_tone( amine@119: freq, amine@119: duration_sec=0.1, amine@119: sampling_rate=16000, amine@119: sample_width=sample_width, amine@119: ) amine@119: for freq in frequencies amine@119: ] amine@119: channels = len(frequencies) amine@119: fmt = DATA_FORMAT[sample_width] amine@119: expected = _array_to_bytes(mono_channels[use_channel]) amine@119: data = _array_to_bytes(array(fmt, _sample_generator(*mono_channels))) amine@119: selected_channel = _extract_selected_channel( amine@119: data, channels, sample_width, use_channel amine@119: ) amine@119: self.assertEqual(selected_channel, expected) amine@119: amine@119: @genty_dataset( amine@120: raw_with_audio_format=( amine@120: "audio", amine@120: "raw", amine@120: "_load_raw", amine@120: AUDIO_PARAMS_SHORT, amine@120: ), amine@120: raw_with_extension=( amine@120: "audio.raw", amine@120: None, amine@120: "_load_raw", amine@120: AUDIO_PARAMS_SHORT, amine@120: ), amine@120: wave_with_audio_format=("audio", "wave", "_load_wave"), amine@120: wav_with_audio_format=("audio", "wave", "_load_wave"), amine@120: wav_with_extension=("audio.wav", None, "_load_wave"), amine@120: format_and_extension_both_given=("audio.dat", "wav", "_load_wave"), amine@120: format_and_extension_both_given_b=("audio.raw", "wave", "_load_wave"), amine@120: no_format_nor_extension=("audio", None, "_load_with_pydub"), amine@120: other_formats_ogg=("audio.ogg", None, "_load_with_pydub"), amine@120: other_formats_webm=("audio", "webm", "_load_with_pydub"), amine@120: ) amine@120: def test_from_file( amine@120: self, filename, audio_format, funtion_name, kwargs=None amine@120: ): amine@120: funtion_name = "auditok.io." + funtion_name amine@120: if kwargs is None: amine@120: kwargs = {} amine@120: with patch(funtion_name) as patch_function: amine@120: from_file(filename, audio_format, **kwargs) amine@120: self.assertTrue(patch_function.called) amine@120: amine@120: @genty_dataset( amine@111: mono=("mono_400Hz.raw", (400,)), amine@111: three_channel=("3channel_400-800-1600Hz.raw", (400, 800, 1600)), amine@111: ) amine@111: def test_save_raw(self, filename, frequencies): amine@111: filename = "tests/data/test_16KHZ_{}".format(filename) amine@111: sample_width = 2 amine@111: fmt = DATA_FORMAT[sample_width] amine@111: mono_channels = [PURE_TONE_DICT[freq] for freq in frequencies] amine@111: data = _array_to_bytes(array(fmt, _sample_generator(*mono_channels))) amine@111: tmpfile = NamedTemporaryFile() amine@111: _save_raw(tmpfile.name, data) amine@111: self.assertTrue(filecmp.cmp(tmpfile.name, filename, shallow=False)) amine@111: amine@121: def test_from_file_no_pydub(self): amine@121: with patch("auditok.io._WITH_PYDUB", False): amine@121: with self.assertRaises(AudioIOError): amine@121: from_file("audio", "mp3") amine@121: amine@111: @genty_dataset( amine@122: raw_first_channel=("raw", 0, 400), amine@122: raw_second_channel=("raw", 1, 800), amine@122: raw_third_channel=("raw", 2, 1600), amine@122: raw_left_channel=("raw", "left", 400), amine@122: raw_right_channel=("raw", "right", 800), amine@122: wav_first_channel=("wav", 0, 400), amine@122: wav_second_channel=("wav", 1, 800), amine@122: wav_third_channel=("wav", 2, 1600), amine@122: wav_left_channel=("wav", "left", 400), amine@122: wav_right_channel=("wav", "right", 800), amine@122: ) amine@122: def test_from_file_multichannel_audio( amine@122: self, audio_format, use_channel, frequency amine@122: ): amine@122: expected = PURE_TONE_DICT[frequency] amine@122: filename = "tests/data/test_16KHZ_3channel_400-800-1600Hz.{}".format( amine@122: audio_format amine@122: ) amine@122: sample_width = 2 amine@122: audio_source = from_file( amine@122: filename, amine@122: sampling_rate=16000, amine@122: sample_width=sample_width, amine@122: channels=3, amine@122: use_channel=use_channel, amine@122: ) amine@122: fmt = DATA_FORMAT[sample_width] amine@122: data = array(fmt, audio_source._buffer) amine@122: self.assertEqual(data, expected) amine@122: amine@122: @genty_dataset( amine@123: raw_mono=("raw", "mono_400Hz", (400,)), amine@123: raw_3channel=("raw", "3channel_400-800-1600Hz", (400, 800, 1600)), amine@123: wav_mono=("wav", "mono_400Hz", (400,)), amine@123: wav_3channel=("wav", "3channel_400-800-1600Hz", (400, 800, 1600)), amine@123: ) amine@123: def test_from_file_multichannel_audio_mix( amine@123: self, audio_format, filename_suffix, frequencies amine@123: ): amine@123: sampling_rate = 16000 amine@123: sample_width = 2 amine@123: channels = len(frequencies) amine@123: mono_channels = [PURE_TONE_DICT[freq] for freq in frequencies] amine@123: channels = len(frequencies) amine@123: fmt = DATA_FORMAT[sample_width] amine@123: expected = _array_to_bytes( amine@123: array( amine@123: fmt, amine@123: (sum(samples) // channels for samples in zip(*mono_channels)), amine@123: ) amine@123: ) amine@123: filename = "tests/data/test_16KHZ_{}.{}".format( amine@123: filename_suffix, audio_format amine@123: ) amine@123: audio_source = from_file( amine@123: filename, amine@123: use_channel="mix", amine@123: sampling_rate=sampling_rate, amine@123: sample_width=2, amine@123: channels=channels, amine@123: ) amine@123: mixed = audio_source._buffer amine@123: self.assertEqual((mixed), expected) amine@123: amine@124: @patch("auditok.io._WITH_PYDUB", True) amine@124: @patch("auditok.io.BufferAudioSource") amine@124: @genty_dataset( amine@124: ogg_first_channel=("ogg", 0, "from_ogg"), amine@124: ogg_second_channel=("ogg", 1, "from_ogg"), amine@124: ogg_mix=("ogg", "mix", "from_ogg"), amine@124: ogg_default=("ogg", None, "from_ogg"), amine@124: mp3_left_channel=("mp3", "left", "from_mp3"), amine@124: mp3_right_channel=("mp3", "right", "from_mp3"), amine@124: flac_first_channel=("flac", 0, "from_file"), amine@124: flac_second_channel=("flac", 1, "from_file"), amine@124: flv_left_channel=("flv", "left", "from_flv"), amine@124: webm_right_channel=("webm", "right", "from_file"), amine@124: ) amine@124: def test_from_file_multichannel_audio_compressed( amine@124: self, audio_format, use_channel, function, *mocks amine@124: ): amine@124: filename = "audio.{}".format(audio_format) amine@124: segment_mock = Mock() amine@124: segment_mock.sample_width = 2 amine@124: segment_mock.channels = 2 amine@124: segment_mock._data = b"abcd" amine@124: with patch("auditok.io._extract_selected_channel") as ext_mock: amine@124: with patch( amine@124: "auditok.io.AudioSegment.{}".format(function) amine@124: ) as open_func: amine@124: open_func.return_value = segment_mock amine@124: from_file(filename, use_channel=use_channel) amine@124: self.assertTrue(open_func.called) amine@124: self.assertTrue(ext_mock.called) amine@124: amine@124: use_channel = {"left": 0, "right": 1, None: 0}.get( amine@124: use_channel, use_channel amine@124: ) amine@124: ext_mock.assert_called_with( amine@124: segment_mock._data, amine@124: segment_mock.channels, amine@124: segment_mock.sample_width, amine@124: use_channel, amine@124: ) amine@124: amine@124: with patch("auditok.io._extract_selected_channel") as ext_mock: amine@124: with patch( amine@124: "auditok.io.AudioSegment.{}".format(function) amine@124: ) as open_func: amine@124: segment_mock.channels = 1 amine@124: open_func.return_value = segment_mock amine@124: from_file(filename, use_channel=use_channel) amine@124: self.assertTrue(open_func.called) amine@124: self.assertFalse(ext_mock.called) amine@124: amine@125: @patch("auditok.io._WITH_PYDUB", True) amine@125: @patch("auditok.io.BufferAudioSource") amine@125: @genty_dataset( amine@125: ogg=("ogg", "from_ogg"), amine@125: mp3=("mp3", "from_mp3"), amine@125: flac=("flac", "from_file"), amine@125: ) amine@125: def test_from_file_multichannel_audio_mix_compressed( amine@125: self, audio_format, function, *mocks amine@125: ): amine@125: filename = "audio.{}".format(audio_format) amine@125: segment_mock = Mock() amine@125: segment_mock.sample_width = 2 amine@125: segment_mock.channels = 2 amine@125: segment_mock._data = b"abcd" amine@125: with patch("auditok.io._mix_audio_channels") as mix_mock: amine@125: with patch( amine@125: "auditok.io.AudioSegment.{}".format(function) amine@125: ) as open_func: amine@125: open_func.return_value = segment_mock amine@125: from_file(filename, use_channel="mix") amine@125: self.assertTrue(open_func.called) amine@125: mix_mock.assert_called_with( amine@125: segment_mock._data, amine@125: segment_mock.channels, amine@125: segment_mock.sample_width, amine@125: ) amine@125: amine@123: @genty_dataset( amine@126: dafault_first_channel=(None, 400), amine@126: first_channel=(0, 400), amine@126: second_channel=(1, 800), amine@126: third_channel=(2, 1600), amine@126: negative_first_channel=(-3, 400), amine@126: negative_second_channel=(-2, 800), amine@126: negative_third_channel=(-1, 1600), amine@126: ) amine@126: def test_load_raw(self, use_channel, frequency): amine@126: filename = "tests/data/test_16KHZ_3channel_400-800-1600Hz.raw" amine@126: if use_channel is not None: amine@126: audio_source = _load_raw( amine@126: filename, amine@126: sampling_rate=16000, amine@126: sample_width=2, amine@126: channels=3, amine@126: use_channel=use_channel, amine@126: ) amine@126: else: amine@126: audio_source = _load_raw( amine@126: filename, sampling_rate=16000, sample_width=2, channels=3 amine@126: ) amine@126: self.assertIsInstance(audio_source, BufferAudioSource) amine@126: self.assertEqual(audio_source.sampling_rate, 16000) amine@126: self.assertEqual(audio_source.sample_width, 2) amine@126: self.assertEqual(audio_source.channels, 1) amine@126: # generate a pure sine wave tone of the given frequency amine@126: expected = PURE_TONE_DICT[frequency] amine@126: # compre with data read from file amine@126: fmt = DATA_FORMAT[2] amine@126: data = array(fmt, audio_source._buffer) amine@126: self.assertEqual(data, expected) amine@126: amine@126: @genty_dataset( amine@127: mono=("mono_400Hz", (400,)), amine@127: three_channel=("3channel_400-800-1600Hz", (400, 800, 1600)), amine@127: ) amine@127: def test_load_raw_mix(self, filename_suffix, frequencies): amine@127: sampling_rate = 16000 amine@127: sample_width = 2 amine@127: channels = len(frequencies) amine@127: mono_channels = [PURE_TONE_DICT[freq] for freq in frequencies] amine@127: amine@127: fmt = DATA_FORMAT[sample_width] amine@127: expected = _array_to_bytes( amine@127: array( amine@127: fmt, amine@127: (sum(samples) // channels for samples in zip(*mono_channels)), amine@127: ) amine@127: ) amine@127: filename = "tests/data/test_16KHZ_{}.raw".format(filename_suffix) amine@127: audio_source = _load_raw( amine@127: filename, amine@127: use_channel="mix", amine@127: sampling_rate=sampling_rate, amine@127: sample_width=2, amine@127: channels=channels, amine@127: ) amine@127: mixed = audio_source._buffer amine@127: self.assertEqual(mixed, expected) amine@127: self.assertIsInstance(audio_source, BufferAudioSource) amine@127: self.assertEqual(audio_source.sampling_rate, sampling_rate) amine@127: self.assertEqual(audio_source.sample_width, sample_width) amine@127: self.assertEqual(audio_source.channels, 1) amine@127: amine@127: @genty_dataset( amine@128: missing_sampling_rate=("sr",), amine@128: missing_sample_width=("sw",), amine@128: missing_channels=("ch",), amine@128: ) amine@128: def test_load_raw_missing_audio_param(self, missing_param): amine@128: with self.assertRaises(AudioParameterError): amine@128: params = AUDIO_PARAMS_SHORT.copy() amine@128: del params[missing_param] amine@128: srate, swidth, channels, _ = _get_audio_parameters(params) amine@128: _load_raw("audio", srate, swidth, channels) amine@128: amine@128: @genty_dataset( amine@129: dafault_first_channel=(None, 400), amine@129: first_channel=(0, 400), amine@129: second_channel=(1, 800), amine@129: third_channel=(2, 1600), amine@129: negative_first_channel=(-3, 400), amine@129: negative_second_channel=(-2, 800), amine@129: negative_third_channel=(-1, 1600), amine@129: ) amine@129: def test_load_wave(self, use_channel, frequency): amine@129: filename = "tests/data/test_16KHZ_3channel_400-800-1600Hz.wav" amine@129: if use_channel is not None: amine@129: audio_source = _load_wave(filename, use_channel=use_channel) amine@129: else: amine@129: audio_source = _load_wave(filename) amine@129: self.assertIsInstance(audio_source, BufferAudioSource) amine@129: self.assertEqual(audio_source.sampling_rate, 16000) amine@129: self.assertEqual(audio_source.sample_width, 2) amine@129: self.assertEqual(audio_source.channels, 1) amine@129: # generate a pure sine wave tone of the given frequency amine@129: expected = PURE_TONE_DICT[frequency] amine@129: # compre with data read from file amine@129: fmt = DATA_FORMAT[2] amine@129: data = array(fmt, audio_source._buffer) amine@129: self.assertEqual(data, expected) amine@129: amine@129: @genty_dataset( amine@130: mono=("mono_400Hz", (400,)), amine@130: three_channel=("3channel_400-800-1600Hz", (400, 800, 1600)), amine@130: ) amine@130: def test_load_wave_mix(self, filename_suffix, frequencies): amine@130: sampling_rate = 16000 amine@130: sample_width = 2 amine@130: channels = len(frequencies) amine@130: mono_channels = [PURE_TONE_DICT[freq] for freq in frequencies] amine@130: fmt = DATA_FORMAT[sample_width] amine@130: expected = _array_to_bytes( amine@130: array( amine@130: fmt, amine@130: (sum(samples) // channels for samples in zip(*mono_channels)), amine@130: ) amine@130: ) amine@130: filename = "tests/data/test_16KHZ_{}.wav".format(filename_suffix) amine@130: audio_source = _load_wave(filename, use_channel="mix") amine@130: mixed = audio_source._buffer amine@130: self.assertEqual(mixed, expected) amine@130: self.assertIsInstance(audio_source, BufferAudioSource) amine@130: self.assertEqual(audio_source.sampling_rate, sampling_rate) amine@130: self.assertEqual(audio_source.sample_width, sample_width) amine@130: self.assertEqual(audio_source.channels, 1) amine@130: amine@131: @patch("auditok.io._WITH_PYDUB", True) amine@131: @patch("auditok.io.BufferAudioSource") amine@131: @genty_dataset( amine@131: ogg_default_first_channel=("ogg", 2, None, "from_ogg"), amine@131: ogg_first_channel=("ogg", 1, 0, "from_ogg"), amine@131: ogg_second_channel=("ogg", 2, 1, "from_ogg"), amine@131: ogg_mix_channels=("ogg", 3, "mix", "from_ogg"), amine@131: mp3_left_channel=("mp3", 1, "left", "from_mp3"), amine@131: mp3_right_channel=("mp3", 2, "right", "from_mp3"), amine@131: mp3_mix_channels=("mp3", 3, "mix", "from_mp3"), amine@131: flac_first_channel=("flac", 2, 0, "from_file"), amine@131: flac_second_channel=("flac", 2, 1, "from_file"), amine@131: flv_left_channel=("flv", 1, "left", "from_flv"), amine@131: webm_right_channel=("webm", 2, "right", "from_file"), amine@131: webm_mix_channels=("webm", 4, "mix", "from_file"), amine@131: ) amine@131: def test_load_with_pydub( amine@131: self, audio_format, channels, use_channel, function, *mocks amine@131: ): amine@131: filename = "audio.{}".format(audio_format) amine@131: segment_mock = Mock() amine@131: segment_mock.sample_width = 2 amine@131: segment_mock.channels = channels amine@131: segment_mock._data = b"abcdefgh" amine@131: with patch("auditok.io._extract_selected_channel") as ext_mock: amine@131: with patch( amine@131: "auditok.io.AudioSegment.{}".format(function) amine@131: ) as open_func: amine@131: open_func.return_value = segment_mock amine@131: use_channel = {"left": 0, "right": 1, None: 0}.get( amine@131: use_channel, use_channel amine@131: ) amine@131: _load_with_pydub(filename, audio_format, use_channel) amine@131: self.assertTrue(open_func.called) amine@131: if channels > 1: amine@131: self.assertTrue(ext_mock.called) amine@131: ext_mock.assert_called_with( amine@131: segment_mock._data, amine@131: segment_mock.channels, amine@131: segment_mock.sample_width, amine@131: use_channel, amine@131: ) amine@131: else: amine@131: self.assertFalse(ext_mock.called) amine@131: amine@130: @genty_dataset( amine@110: mono=("mono_400Hz.wav", (400,)), amine@110: three_channel=("3channel_400-800-1600Hz.wav", (400, 800, 1600)), amine@110: ) amine@110: def test_save_wave(self, filename, frequencies): amine@110: filename = "tests/data/test_16KHZ_{}".format(filename) amine@110: sampling_rate = 16000 amine@110: sample_width = 2 amine@110: channels = len(frequencies) amine@110: fmt = DATA_FORMAT[sample_width] amine@110: mono_channels = [PURE_TONE_DICT[freq] for freq in frequencies] amine@110: data = _array_to_bytes(array(fmt, _sample_generator(*mono_channels))) amine@110: tmpfile = NamedTemporaryFile() amine@110: _save_wave(tmpfile.name, data, sampling_rate, sample_width, channels) amine@110: self.assertTrue(filecmp.cmp(tmpfile.name, filename, shallow=False))