annotate tests/test_io.py @ 127:7db4af2fefe4

Add tests for _load_raw with channel mixing
author Amine Sehili <amine.sehili@gmail.com>
date Wed, 06 Feb 2019 20:17:42 +0100
parents 52e428b76a3d
children 3594fdb0703d
rev   line source
amine@106 1 import os
amine@106 2 import sys
amine@106 3 import math
amine@107 4 from array import array
amine@110 5 from tempfile import NamedTemporaryFile
amine@110 6 import filecmp
amine@108 7 from unittest import TestCase
amine@108 8 from genty import genty, genty_dataset
amine@110 9 from auditok.io import (
amine@126 10 DATA_FORMAT,
amine@121 11 AudioIOError,
amine@110 12 AudioParameterError,
amine@126 13 BufferAudioSource,
amine@110 14 check_audio_data,
amine@116 15 _array_to_bytes,
amine@118 16 _mix_audio_channels,
amine@119 17 _extract_selected_channel,
amine@126 18 _load_raw,
amine@120 19 from_file,
amine@111 20 _save_raw,
amine@110 21 _save_wave,
amine@110 22 )
amine@106 23
amine@106 24
amine@106 25 if sys.version_info >= (3, 0):
amine@106 26 PYTHON_3 = True
amine@124 27 from unittest.mock import patch, Mock
amine@106 28 else:
amine@106 29 PYTHON_3 = False
amine@124 30 from mock import patch, Mock
amine@120 31
amine@120 32 AUDIO_PARAMS_SHORT = {"sr": 16000, "sw": 2, "ch": 1}
amine@106 33
amine@106 34
amine@106 35 def _sample_generator(*data_buffers):
amine@106 36 """
amine@106 37 Takes a list of many mono audio data buffers and makes a sample generator
amine@106 38 of interleaved audio samples, one sample from each channel. The resulting
amine@106 39 generator can be used to build a multichannel audio buffer.
amine@106 40 >>> gen = _sample_generator("abcd", "ABCD")
amine@106 41 >>> list(gen)
amine@106 42 ["a", "A", "b", "B", "c", "C", "d", "D"]
amine@106 43 """
amine@106 44 frame_gen = zip(*data_buffers)
amine@106 45 return (sample for frame in frame_gen for sample in frame)
amine@106 46
amine@106 47
amine@107 48 def _generate_pure_tone(
amine@107 49 frequency, duration_sec=1, sampling_rate=16000, sample_width=2, volume=1e4
amine@107 50 ):
amine@107 51 """
amine@107 52 Generates a pure tone with the given frequency.
amine@107 53 """
amine@107 54 assert frequency <= sampling_rate / 2
amine@107 55 max_value = (2 ** (sample_width * 8) // 2) - 1
amine@107 56 if volume > max_value:
amine@107 57 volume = max_value
amine@107 58 fmt = DATA_FORMAT[sample_width]
amine@107 59 total_samples = int(sampling_rate * duration_sec)
amine@107 60 step = frequency / sampling_rate
amine@107 61 two_pi_step = 2 * math.pi * step
amine@107 62 data = array(
amine@107 63 fmt,
amine@107 64 (
amine@107 65 int(math.sin(two_pi_step * i) * volume)
amine@107 66 for i in range(total_samples)
amine@107 67 ),
amine@107 68 )
amine@107 69 return data
amine@107 70
amine@107 71
amine@107 72 PURE_TONE_DICT = {
amine@107 73 freq: _generate_pure_tone(freq, 1, 16000, 2) for freq in (400, 800, 1600)
amine@107 74 }
amine@107 75 PURE_TONE_DICT.update(
amine@107 76 {
amine@107 77 freq: _generate_pure_tone(freq, 0.1, 16000, 2)
amine@107 78 for freq in (600, 1150, 2400, 7220)
amine@107 79 }
amine@107 80 )
amine@108 81
amine@108 82
amine@108 83 @genty
amine@108 84 class TestIO(TestCase):
amine@108 85 @genty_dataset(
amine@108 86 valid_mono=(b"\0" * 113, 1, 1),
amine@108 87 valid_stereo=(b"\0" * 160, 1, 2),
amine@108 88 invalid_mono_sw_2=(b"\0" * 113, 2, 1, False),
amine@108 89 invalid_stereo_sw_1=(b"\0" * 113, 1, 2, False),
amine@108 90 invalid_stereo_sw_2=(b"\0" * 158, 2, 2, False),
amine@108 91 )
amine@108 92 def test_check_audio_data(self, data, sample_width, channels, valid=True):
amine@108 93
amine@108 94 if not valid:
amine@108 95 with self.assertRaises(AudioParameterError):
amine@108 96 check_audio_data(data, sample_width, channels)
amine@108 97 else:
amine@108 98 self.assertIsNone(check_audio_data(data, sample_width, channels))
amine@110 99
amine@110 100 @genty_dataset(
amine@118 101 mono_1byte=([400], 1),
amine@118 102 stereo_1byte=([400, 600], 1),
amine@118 103 three_channel_1byte=([400, 600, 2400], 1),
amine@118 104 mono_2byte=([400], 2),
amine@118 105 stereo_2byte=([400, 600], 2),
amine@118 106 three_channel_2byte=([400, 600, 1150], 2),
amine@118 107 mono_4byte=([400], 4),
amine@118 108 stereo_4byte=([400, 600], 4),
amine@118 109 four_channel_2byte=([400, 600, 1150, 7220], 4),
amine@118 110 )
amine@118 111 def test_mix_audio_channels(self, frequencies, sample_width):
amine@118 112 sampling_rate = 16000
amine@118 113 sample_width = 2
amine@118 114 channels = len(frequencies)
amine@118 115 mono_channels = [
amine@118 116 _generate_pure_tone(
amine@118 117 freq,
amine@118 118 duration_sec=0.1,
amine@118 119 sampling_rate=sampling_rate,
amine@118 120 sample_width=sample_width,
amine@118 121 )
amine@118 122 for freq in frequencies
amine@118 123 ]
amine@118 124 fmt = DATA_FORMAT[sample_width]
amine@118 125 expected = _array_to_bytes(
amine@118 126 array(
amine@118 127 fmt,
amine@118 128 (sum(samples) // channels for samples in zip(*mono_channels)),
amine@118 129 )
amine@118 130 )
amine@118 131 data = _array_to_bytes(array(fmt, _sample_generator(*mono_channels)))
amine@118 132 mixed = _mix_audio_channels(data, channels, sample_width)
amine@118 133 self.assertEqual(mixed, expected)
amine@118 134
amine@118 135 @genty_dataset(
amine@119 136 mono_1byte=([400], 1, 0),
amine@119 137 stereo_1byte_2st_channel=([400, 600], 1, 1),
amine@119 138 mono_2byte=([400], 2, 0),
amine@119 139 stereo_2byte_1st_channel=([400, 600], 2, 0),
amine@119 140 stereo_2byte_2nd_channel=([400, 600], 2, 1),
amine@119 141 three_channel_2byte_last_negative_idx=([400, 600, 1150], 2, -1),
amine@119 142 three_channel_2byte_2nd_negative_idx=([400, 600, 1150], 2, -2),
amine@119 143 three_channel_2byte_1st_negative_idx=([400, 600, 1150], 2, -3),
amine@119 144 three_channel_4byte_1st=([400, 600, 1150], 4, 0),
amine@119 145 three_channel_4byte_last_negative_idx=([400, 600, 1150], 4, -1),
amine@119 146 )
amine@119 147 def test_extract_selected_channel(
amine@119 148 self, frequencies, sample_width, use_channel
amine@119 149 ):
amine@119 150
amine@119 151 mono_channels = [
amine@119 152 _generate_pure_tone(
amine@119 153 freq,
amine@119 154 duration_sec=0.1,
amine@119 155 sampling_rate=16000,
amine@119 156 sample_width=sample_width,
amine@119 157 )
amine@119 158 for freq in frequencies
amine@119 159 ]
amine@119 160 channels = len(frequencies)
amine@119 161 fmt = DATA_FORMAT[sample_width]
amine@119 162 expected = _array_to_bytes(mono_channels[use_channel])
amine@119 163 data = _array_to_bytes(array(fmt, _sample_generator(*mono_channels)))
amine@119 164 selected_channel = _extract_selected_channel(
amine@119 165 data, channels, sample_width, use_channel
amine@119 166 )
amine@119 167 self.assertEqual(selected_channel, expected)
amine@119 168
amine@119 169 @genty_dataset(
amine@120 170 raw_with_audio_format=(
amine@120 171 "audio",
amine@120 172 "raw",
amine@120 173 "_load_raw",
amine@120 174 AUDIO_PARAMS_SHORT,
amine@120 175 ),
amine@120 176 raw_with_extension=(
amine@120 177 "audio.raw",
amine@120 178 None,
amine@120 179 "_load_raw",
amine@120 180 AUDIO_PARAMS_SHORT,
amine@120 181 ),
amine@120 182 wave_with_audio_format=("audio", "wave", "_load_wave"),
amine@120 183 wav_with_audio_format=("audio", "wave", "_load_wave"),
amine@120 184 wav_with_extension=("audio.wav", None, "_load_wave"),
amine@120 185 format_and_extension_both_given=("audio.dat", "wav", "_load_wave"),
amine@120 186 format_and_extension_both_given_b=("audio.raw", "wave", "_load_wave"),
amine@120 187 no_format_nor_extension=("audio", None, "_load_with_pydub"),
amine@120 188 other_formats_ogg=("audio.ogg", None, "_load_with_pydub"),
amine@120 189 other_formats_webm=("audio", "webm", "_load_with_pydub"),
amine@120 190 )
amine@120 191 def test_from_file(
amine@120 192 self, filename, audio_format, funtion_name, kwargs=None
amine@120 193 ):
amine@120 194 funtion_name = "auditok.io." + funtion_name
amine@120 195 if kwargs is None:
amine@120 196 kwargs = {}
amine@120 197 with patch(funtion_name) as patch_function:
amine@120 198 from_file(filename, audio_format, **kwargs)
amine@120 199 self.assertTrue(patch_function.called)
amine@120 200
amine@120 201 @genty_dataset(
amine@111 202 mono=("mono_400Hz.raw", (400,)),
amine@111 203 three_channel=("3channel_400-800-1600Hz.raw", (400, 800, 1600)),
amine@111 204 )
amine@111 205 def test_save_raw(self, filename, frequencies):
amine@111 206 filename = "tests/data/test_16KHZ_{}".format(filename)
amine@111 207 sample_width = 2
amine@111 208 fmt = DATA_FORMAT[sample_width]
amine@111 209 mono_channels = [PURE_TONE_DICT[freq] for freq in frequencies]
amine@111 210 data = _array_to_bytes(array(fmt, _sample_generator(*mono_channels)))
amine@111 211 tmpfile = NamedTemporaryFile()
amine@111 212 _save_raw(tmpfile.name, data)
amine@111 213 self.assertTrue(filecmp.cmp(tmpfile.name, filename, shallow=False))
amine@111 214
amine@121 215 def test_from_file_no_pydub(self):
amine@121 216 with patch("auditok.io._WITH_PYDUB", False):
amine@121 217 with self.assertRaises(AudioIOError):
amine@121 218 from_file("audio", "mp3")
amine@121 219
amine@111 220 @genty_dataset(
amine@122 221 raw_first_channel=("raw", 0, 400),
amine@122 222 raw_second_channel=("raw", 1, 800),
amine@122 223 raw_third_channel=("raw", 2, 1600),
amine@122 224 raw_left_channel=("raw", "left", 400),
amine@122 225 raw_right_channel=("raw", "right", 800),
amine@122 226 wav_first_channel=("wav", 0, 400),
amine@122 227 wav_second_channel=("wav", 1, 800),
amine@122 228 wav_third_channel=("wav", 2, 1600),
amine@122 229 wav_left_channel=("wav", "left", 400),
amine@122 230 wav_right_channel=("wav", "right", 800),
amine@122 231 )
amine@122 232 def test_from_file_multichannel_audio(
amine@122 233 self, audio_format, use_channel, frequency
amine@122 234 ):
amine@122 235 expected = PURE_TONE_DICT[frequency]
amine@122 236 filename = "tests/data/test_16KHZ_3channel_400-800-1600Hz.{}".format(
amine@122 237 audio_format
amine@122 238 )
amine@122 239 sample_width = 2
amine@122 240 audio_source = from_file(
amine@122 241 filename,
amine@122 242 sampling_rate=16000,
amine@122 243 sample_width=sample_width,
amine@122 244 channels=3,
amine@122 245 use_channel=use_channel,
amine@122 246 )
amine@122 247 fmt = DATA_FORMAT[sample_width]
amine@122 248 data = array(fmt, audio_source._buffer)
amine@122 249 self.assertEqual(data, expected)
amine@122 250
amine@122 251 @genty_dataset(
amine@123 252 raw_mono=("raw", "mono_400Hz", (400,)),
amine@123 253 raw_3channel=("raw", "3channel_400-800-1600Hz", (400, 800, 1600)),
amine@123 254 wav_mono=("wav", "mono_400Hz", (400,)),
amine@123 255 wav_3channel=("wav", "3channel_400-800-1600Hz", (400, 800, 1600)),
amine@123 256 )
amine@123 257 def test_from_file_multichannel_audio_mix(
amine@123 258 self, audio_format, filename_suffix, frequencies
amine@123 259 ):
amine@123 260 sampling_rate = 16000
amine@123 261 sample_width = 2
amine@123 262 channels = len(frequencies)
amine@123 263 mono_channels = [PURE_TONE_DICT[freq] for freq in frequencies]
amine@123 264 channels = len(frequencies)
amine@123 265 fmt = DATA_FORMAT[sample_width]
amine@123 266 expected = _array_to_bytes(
amine@123 267 array(
amine@123 268 fmt,
amine@123 269 (sum(samples) // channels for samples in zip(*mono_channels)),
amine@123 270 )
amine@123 271 )
amine@123 272 filename = "tests/data/test_16KHZ_{}.{}".format(
amine@123 273 filename_suffix, audio_format
amine@123 274 )
amine@123 275 audio_source = from_file(
amine@123 276 filename,
amine@123 277 use_channel="mix",
amine@123 278 sampling_rate=sampling_rate,
amine@123 279 sample_width=2,
amine@123 280 channels=channels,
amine@123 281 )
amine@123 282 mixed = audio_source._buffer
amine@123 283 self.assertEqual((mixed), expected)
amine@123 284
amine@124 285 @patch("auditok.io._WITH_PYDUB", True)
amine@124 286 @patch("auditok.io.BufferAudioSource")
amine@124 287 @genty_dataset(
amine@124 288 ogg_first_channel=("ogg", 0, "from_ogg"),
amine@124 289 ogg_second_channel=("ogg", 1, "from_ogg"),
amine@124 290 ogg_mix=("ogg", "mix", "from_ogg"),
amine@124 291 ogg_default=("ogg", None, "from_ogg"),
amine@124 292 mp3_left_channel=("mp3", "left", "from_mp3"),
amine@124 293 mp3_right_channel=("mp3", "right", "from_mp3"),
amine@124 294 flac_first_channel=("flac", 0, "from_file"),
amine@124 295 flac_second_channel=("flac", 1, "from_file"),
amine@124 296 flv_left_channel=("flv", "left", "from_flv"),
amine@124 297 webm_right_channel=("webm", "right", "from_file"),
amine@124 298 )
amine@124 299 def test_from_file_multichannel_audio_compressed(
amine@124 300 self, audio_format, use_channel, function, *mocks
amine@124 301 ):
amine@124 302 filename = "audio.{}".format(audio_format)
amine@124 303 segment_mock = Mock()
amine@124 304 segment_mock.sample_width = 2
amine@124 305 segment_mock.channels = 2
amine@124 306 segment_mock._data = b"abcd"
amine@124 307 with patch("auditok.io._extract_selected_channel") as ext_mock:
amine@124 308 with patch(
amine@124 309 "auditok.io.AudioSegment.{}".format(function)
amine@124 310 ) as open_func:
amine@124 311 open_func.return_value = segment_mock
amine@124 312 from_file(filename, use_channel=use_channel)
amine@124 313 self.assertTrue(open_func.called)
amine@124 314 self.assertTrue(ext_mock.called)
amine@124 315
amine@124 316 use_channel = {"left": 0, "right": 1, None: 0}.get(
amine@124 317 use_channel, use_channel
amine@124 318 )
amine@124 319 ext_mock.assert_called_with(
amine@124 320 segment_mock._data,
amine@124 321 segment_mock.channels,
amine@124 322 segment_mock.sample_width,
amine@124 323 use_channel,
amine@124 324 )
amine@124 325
amine@124 326 with patch("auditok.io._extract_selected_channel") as ext_mock:
amine@124 327 with patch(
amine@124 328 "auditok.io.AudioSegment.{}".format(function)
amine@124 329 ) as open_func:
amine@124 330 segment_mock.channels = 1
amine@124 331 open_func.return_value = segment_mock
amine@124 332 from_file(filename, use_channel=use_channel)
amine@124 333 self.assertTrue(open_func.called)
amine@124 334 self.assertFalse(ext_mock.called)
amine@124 335
amine@125 336 @patch("auditok.io._WITH_PYDUB", True)
amine@125 337 @patch("auditok.io.BufferAudioSource")
amine@125 338 @genty_dataset(
amine@125 339 ogg=("ogg", "from_ogg"),
amine@125 340 mp3=("mp3", "from_mp3"),
amine@125 341 flac=("flac", "from_file"),
amine@125 342 )
amine@125 343 def test_from_file_multichannel_audio_mix_compressed(
amine@125 344 self, audio_format, function, *mocks
amine@125 345 ):
amine@125 346 filename = "audio.{}".format(audio_format)
amine@125 347 segment_mock = Mock()
amine@125 348 segment_mock.sample_width = 2
amine@125 349 segment_mock.channels = 2
amine@125 350 segment_mock._data = b"abcd"
amine@125 351 with patch("auditok.io._mix_audio_channels") as mix_mock:
amine@125 352 with patch(
amine@125 353 "auditok.io.AudioSegment.{}".format(function)
amine@125 354 ) as open_func:
amine@125 355 open_func.return_value = segment_mock
amine@125 356 from_file(filename, use_channel="mix")
amine@125 357 self.assertTrue(open_func.called)
amine@125 358 mix_mock.assert_called_with(
amine@125 359 segment_mock._data,
amine@125 360 segment_mock.channels,
amine@125 361 segment_mock.sample_width,
amine@125 362 )
amine@125 363
amine@123 364 @genty_dataset(
amine@126 365 dafault_first_channel=(None, 400),
amine@126 366 first_channel=(0, 400),
amine@126 367 second_channel=(1, 800),
amine@126 368 third_channel=(2, 1600),
amine@126 369 negative_first_channel=(-3, 400),
amine@126 370 negative_second_channel=(-2, 800),
amine@126 371 negative_third_channel=(-1, 1600),
amine@126 372 )
amine@126 373 def test_load_raw(self, use_channel, frequency):
amine@126 374 filename = "tests/data/test_16KHZ_3channel_400-800-1600Hz.raw"
amine@126 375 if use_channel is not None:
amine@126 376 audio_source = _load_raw(
amine@126 377 filename,
amine@126 378 sampling_rate=16000,
amine@126 379 sample_width=2,
amine@126 380 channels=3,
amine@126 381 use_channel=use_channel,
amine@126 382 )
amine@126 383 else:
amine@126 384 audio_source = _load_raw(
amine@126 385 filename, sampling_rate=16000, sample_width=2, channels=3
amine@126 386 )
amine@126 387 self.assertIsInstance(audio_source, BufferAudioSource)
amine@126 388 self.assertEqual(audio_source.sampling_rate, 16000)
amine@126 389 self.assertEqual(audio_source.sample_width, 2)
amine@126 390 self.assertEqual(audio_source.channels, 1)
amine@126 391 # generate a pure sine wave tone of the given frequency
amine@126 392 expected = PURE_TONE_DICT[frequency]
amine@126 393 # compre with data read from file
amine@126 394 fmt = DATA_FORMAT[2]
amine@126 395 data = array(fmt, audio_source._buffer)
amine@126 396 self.assertEqual(data, expected)
amine@126 397
amine@126 398 @genty_dataset(
amine@127 399 mono=("mono_400Hz", (400,)),
amine@127 400 three_channel=("3channel_400-800-1600Hz", (400, 800, 1600)),
amine@127 401 )
amine@127 402 def test_load_raw_mix(self, filename_suffix, frequencies):
amine@127 403 sampling_rate = 16000
amine@127 404 sample_width = 2
amine@127 405 channels = len(frequencies)
amine@127 406 mono_channels = [PURE_TONE_DICT[freq] for freq in frequencies]
amine@127 407
amine@127 408 fmt = DATA_FORMAT[sample_width]
amine@127 409 expected = _array_to_bytes(
amine@127 410 array(
amine@127 411 fmt,
amine@127 412 (sum(samples) // channels for samples in zip(*mono_channels)),
amine@127 413 )
amine@127 414 )
amine@127 415 filename = "tests/data/test_16KHZ_{}.raw".format(filename_suffix)
amine@127 416 audio_source = _load_raw(
amine@127 417 filename,
amine@127 418 use_channel="mix",
amine@127 419 sampling_rate=sampling_rate,
amine@127 420 sample_width=2,
amine@127 421 channels=channels,
amine@127 422 )
amine@127 423 mixed = audio_source._buffer
amine@127 424 self.assertEqual(mixed, expected)
amine@127 425 self.assertIsInstance(audio_source, BufferAudioSource)
amine@127 426 self.assertEqual(audio_source.sampling_rate, sampling_rate)
amine@127 427 self.assertEqual(audio_source.sample_width, sample_width)
amine@127 428 self.assertEqual(audio_source.channels, 1)
amine@127 429
amine@127 430 @genty_dataset(
amine@110 431 mono=("mono_400Hz.wav", (400,)),
amine@110 432 three_channel=("3channel_400-800-1600Hz.wav", (400, 800, 1600)),
amine@110 433 )
amine@110 434 def test_save_wave(self, filename, frequencies):
amine@110 435 filename = "tests/data/test_16KHZ_{}".format(filename)
amine@110 436 sampling_rate = 16000
amine@110 437 sample_width = 2
amine@110 438 channels = len(frequencies)
amine@110 439 fmt = DATA_FORMAT[sample_width]
amine@110 440 mono_channels = [PURE_TONE_DICT[freq] for freq in frequencies]
amine@110 441 data = _array_to_bytes(array(fmt, _sample_generator(*mono_channels)))
amine@110 442 tmpfile = NamedTemporaryFile()
amine@110 443 _save_wave(tmpfile.name, data, sampling_rate, sample_width, channels)
amine@110 444 self.assertTrue(filecmp.cmp(tmpfile.name, filename, shallow=False))