annotate tests/test_io.py @ 149:81413c14c5bb

Add tests for _extract_selected_channel with wrong use_channel value
author Amine Sehili <amine.sehili@gmail.com>
date Thu, 21 Feb 2019 20:53:17 +0100
parents 2a1054f9d9f3
children 682bf4477fae
rev   line source
amine@106 1 import os
amine@106 2 import sys
amine@106 3 import math
amine@107 4 from array import array
amine@133 5 from tempfile import NamedTemporaryFile, TemporaryDirectory
amine@110 6 import filecmp
amine@108 7 from unittest import TestCase
amine@108 8 from genty import genty, genty_dataset
amine@110 9 from auditok.io import (
amine@126 10 DATA_FORMAT,
amine@121 11 AudioIOError,
amine@110 12 AudioParameterError,
amine@126 13 BufferAudioSource,
amine@110 14 check_audio_data,
amine@143 15 _guess_audio_format,
amine@144 16 _normalize_use_channel,
amine@128 17 _get_audio_parameters,
amine@116 18 _array_to_bytes,
amine@118 19 _mix_audio_channels,
amine@119 20 _extract_selected_channel,
amine@126 21 _load_raw,
amine@129 22 _load_wave,
amine@131 23 _load_with_pydub,
amine@120 24 from_file,
amine@111 25 _save_raw,
amine@110 26 _save_wave,
amine@141 27 _save_with_pydub,
amine@135 28 to_file,
amine@110 29 )
amine@106 30
amine@106 31
# Pick the mock implementation that matches the running interpreter:
# Python 3 ships it as ``unittest.mock``; older interpreters import the
# standalone ``mock`` package instead.
if sys.version_info >= (3, 0):
    PYTHON_3 = True
    from unittest.mock import patch, Mock
else:
    PYTHON_3 = False
    from mock import patch, Mock

# Complete set of audio parameters expressed with the short key names
# ("sr", "sw", "ch"); tests copy it and drop one key to simulate a missing
# parameter.
AUDIO_PARAMS_SHORT = {"sr": 16000, "sw": 2, "ch": 1}
amine@106 40
amine@106 41
amine@106 42 def _sample_generator(*data_buffers):
amine@106 43 """
amine@106 44 Takes a list of many mono audio data buffers and makes a sample generator
amine@106 45 of interleaved audio samples, one sample from each channel. The resulting
amine@106 46 generator can be used to build a multichannel audio buffer.
amine@106 47 >>> gen = _sample_generator("abcd", "ABCD")
amine@106 48 >>> list(gen)
amine@106 49 ["a", "A", "b", "B", "c", "C", "d", "D"]
amine@106 50 """
amine@106 51 frame_gen = zip(*data_buffers)
amine@106 52 return (sample for frame in frame_gen for sample in frame)
amine@106 53
amine@106 54
def _generate_pure_tone(
    frequency, duration_sec=1, sampling_rate=16000, sample_width=2, volume=1e4
):
    """
    Generates a pure (sine) tone with the given frequency.

    :param frequency: tone frequency; must not exceed half of
        `sampling_rate`.
    :param duration_sec: tone duration in seconds; the number of generated
        samples is ``int(sampling_rate * duration_sec)``.
    :param sampling_rate: number of samples per second.
    :param sample_width: size of one sample in bytes; used to look up the
        array type code in ``DATA_FORMAT`` and to bound the amplitude.
    :param volume: peak amplitude; silently lowered to the largest positive
        value a signed sample of `sample_width` bytes can hold.
    :returns: an ``array`` of integer samples.
    :raises AssertionError: if `frequency` is above ``sampling_rate / 2``.
    """
    assert frequency <= sampling_rate / 2
    # Largest positive value of a signed integer of `sample_width` bytes.
    max_value = (2 ** (sample_width * 8) // 2) - 1
    if volume > max_value:
        volume = max_value
    fmt = DATA_FORMAT[sample_width]
    total_samples = int(sampling_rate * duration_sec)
    # Keep this exact order of operations: tests compare the generated
    # samples with reference files, so even a one-unit rounding difference
    # would matter.
    step = frequency / sampling_rate
    two_pi_step = 2 * math.pi * step
    data = array(
        fmt,
        (
            int(math.sin(two_pi_step * i) * volume)
            for i in range(total_samples)
        ),
    )
    return data
amine@107 77
amine@107 78
# Pre-computed 16 kHz, 2-byte tones shared by the tests, keyed by frequency.
# 400, 800 and 1600 last one second; 600, 1150, 2400 and 7220 last 0.1 second.
PURE_TONE_DICT = {
    freq: _generate_pure_tone(freq, 1, 16000, 2) for freq in (400, 800, 1600)
}
PURE_TONE_DICT.update(
    {
        freq: _generate_pure_tone(freq, 0.1, 16000, 2)
        for freq in (600, 1150, 2400, 7220)
    }
)
amine@108 88
amine@108 89
@genty
class TestIO(TestCase):
    """
    Tests for the helpers and the `from_file` / `to_file` entry points
    imported from ``auditok.io``.

    Parametrized cases are declared with ``genty_dataset``; each keyword
    names one case and its tuple supplies the test method's positional
    arguments.
    """

    @genty_dataset(
        valid_mono=(b"\0" * 113, 1, 1),
        valid_stereo=(b"\0" * 160, 1, 2),
        invalid_mono_sw_2=(b"\0" * 113, 2, 1, False),
        invalid_stereo_sw_1=(b"\0" * 113, 1, 2, False),
        invalid_stereo_sw_2=(b"\0" * 158, 2, 2, False),
    )
    def test_check_audio_data(self, data, sample_width, channels, valid=True):
        # Invalid cases must raise AudioParameterError; valid ones return
        # None.
        if not valid:
            with self.assertRaises(AudioParameterError):
                check_audio_data(data, sample_width, channels)
        else:
            self.assertIsNone(check_audio_data(data, sample_width, channels))

    @genty_dataset(
        extention_and_format_same=("wav", "filename.wav", "wav"),
        extention_and_format_different=("wav", "filename.mp3", "wav"),
        extention_no_format=(None, "filename.wav", "wav"),
        format_no_extension=("wav", "filename", "wav"),
        no_format_no_extension=(None, "filename", None),
    )
    def test_guess_audio_format(self, fmt, filename, expected):
        # The cases expect an explicit format to win over the file
        # extension.
        result = _guess_audio_format(fmt, filename)
        self.assertEqual(result, expected)

    @genty_dataset(
        none=(None, 0),
        positive_int=(1, 1),
        negative_int=(-1, -1),
        left=("left", 0),
        right=("right", 1),
        mix=("mix", "mix"),
    )
    def test_normalize_use_channel(self, use_channel, expected):
        result = _normalize_use_channel(use_channel)
        self.assertEqual(result, expected)

    @genty_dataset(
        simple=((8000, 2, 1, 0), (8000, 2, 1, 0)),
        use_channel_left=((8000, 2, 1, "left"), (8000, 2, 1, 0)),
        use_channel_right=((8000, 2, 1, "right"), (8000, 2, 1, 1)),
        use_channel_mix=((8000, 2, 1, "mix"), (8000, 2, 1, "mix")),
        use_channel_None=((8000, 2, 2, None), (8000, 2, 2, 0)),
        no_use_channel=((8000, 2, 2), (8000, 2, 2, 0)),
    )
    def test_get_audio_parameters_short_params(self, values, expected):
        # zip() stops at the shorter input, so a 3-item `values` simply
        # leaves the "uc" key out of the dictionary.
        params = dict(zip(("sr", "sw", "ch", "uc"), values))
        result = _get_audio_parameters(params)
        self.assertEqual(result, expected)

    @genty_dataset(
        simple=((8000, 2, 1, 0), (8000, 2, 1, 0)),
        use_channel_left=((8000, 2, 1, "left"), (8000, 2, 1, 0)),
        use_channel_right=((8000, 2, 1, "right"), (8000, 2, 1, 1)),
        use_channel_mix=((8000, 2, 1, "mix"), (8000, 2, 1, "mix")),
        use_channel_None=((8000, 2, 2, None), (8000, 2, 2, 0)),
        no_use_channel=((8000, 2, 2), (8000, 2, 2, 0)),
    )
    def test_get_audio_parameters_long_params(self, values, expected):
        # Same cases as the short-name test, using the long parameter names.
        params = dict(
            zip(
                ("sampling_rate", "sample_width", "channels", "use_channel"),
                values,
            )
        )
        result = _get_audio_parameters(params)
        self.assertEqual(result, expected)

    @genty_dataset(simple=((8000, 2, 1, 0), (8000, 2, 1, 0)))
    def test_get_audio_parameters_short_and_long_params(
        self, values, expected
    ):
        params = dict(
            zip(
                ("sampling_rate", "sample_width", "channels", "use_channel"),
                values,
            )
        )

        # Add bogus values under the short names: the expected result only
        # matches if the long names are the ones actually used.
        params.update(dict(zip(("sr", "sw", "ch", "uc"), "xxxx")))
        result = _get_audio_parameters(params)
        self.assertEqual(result, expected)

    @genty_dataset(
        str_sampling_rate=(("x", 2, 1, 0),),
        negative_sampling_rate=((-8000, 2, 1, 0),),
        str_sample_width=((8000, "x", 1, 0),),
        negative_sample_width=((8000, -2, 1, 0),),
        str_channels=((8000, 2, "x", 0),),
        negative_channels=((8000, 2, -1, 0),),
    )
    def test_get_audio_parameters_invalid(self, values):
        params = dict(
            zip(
                ("sampling_rate", "sample_width", "channels", "use_channel"),
                values,
            )
        )
        with self.assertRaises(AudioParameterError):
            _get_audio_parameters(params)

    @genty_dataset(
        mono_1byte=([400], 1),
        stereo_1byte=([400, 600], 1),
        three_channel_1byte=([400, 600, 2400], 1),
        mono_2byte=([400], 2),
        stereo_2byte=([400, 600], 2),
        three_channel_2byte=([400, 600, 1150], 2),
        mono_4byte=([400], 4),
        stereo_4byte=([400, 600], 4),
        four_channel_2byte=([400, 600, 1150, 7220], 4),
    )
    def test_mix_audio_channels(self, frequencies, sample_width):
        sampling_rate = 16000
        # NOTE(review): the `sample_width` supplied by the dataset is
        # overwritten here, so every case runs with 2-byte samples and the
        # 1-byte / 4-byte cases are not really exercised. Confirm whether
        # this is intended before removing the override.
        sample_width = 2
        channels = len(frequencies)
        mono_channels = [
            _generate_pure_tone(
                freq,
                duration_sec=0.1,
                sampling_rate=sampling_rate,
                sample_width=sample_width,
            )
            for freq in frequencies
        ]
        fmt = DATA_FORMAT[sample_width]
        # Expected mix: floor of the per-frame average over all channels.
        expected = _array_to_bytes(
            array(
                fmt,
                (sum(samples) // channels for samples in zip(*mono_channels)),
            )
        )
        data = _array_to_bytes(array(fmt, _sample_generator(*mono_channels)))
        mixed = _mix_audio_channels(data, channels, sample_width)
        self.assertEqual(mixed, expected)

    @genty_dataset(
        mono_1byte=([400], 1, 0),
        stereo_1byte_2st_channel=([400, 600], 1, 1),
        mono_2byte=([400], 2, 0),
        stereo_2byte_1st_channel=([400, 600], 2, 0),
        stereo_2byte_2nd_channel=([400, 600], 2, 1),
        three_channel_2byte_last_negative_idx=([400, 600, 1150], 2, -1),
        three_channel_2byte_2nd_negative_idx=([400, 600, 1150], 2, -2),
        three_channel_2byte_1st_negative_idx=([400, 600, 1150], 2, -3),
        three_channel_4byte_1st=([400, 600, 1150], 4, 0),
        three_channel_4byte_last_negative_idx=([400, 600, 1150], 4, -1),
    )
    def test_extract_selected_channel(
        self, frequencies, sample_width, use_channel
    ):
        mono_channels = [
            _generate_pure_tone(
                freq,
                duration_sec=0.1,
                sampling_rate=16000,
                sample_width=sample_width,
            )
            for freq in frequencies
        ]
        channels = len(frequencies)
        fmt = DATA_FORMAT[sample_width]
        # Negative `use_channel` values index the channel list from the end.
        expected = _array_to_bytes(mono_channels[use_channel])
        data = _array_to_bytes(array(fmt, _sample_generator(*mono_channels)))
        selected_channel = _extract_selected_channel(
            data, channels, sample_width, use_channel
        )
        self.assertEqual(selected_channel, expected)

    @genty_dataset(mono=([400],), three_channel=([600, 1150, 2400],))
    def test_extract_selected_channel_mix(self, frequencies):
        mono_channels = [PURE_TONE_DICT[freq] for freq in frequencies]
        channels = len(frequencies)
        fmt = DATA_FORMAT[2]
        expected = _array_to_bytes(
            array(
                fmt,
                (sum(samples) // channels for samples in zip(*mono_channels)),
            )
        )
        data = _array_to_bytes(array(fmt, _sample_generator(*mono_channels)))
        selected_channel = _extract_selected_channel(data, channels, 2, "mix")
        self.assertEqual(selected_channel, expected)

    @genty_dataset(positive=(2,), negative=(-3,))
    def test_extract_selected_channel_invalid_use_channel(self, use_channel):
        # Both indices fall outside the valid range for 2-channel data.
        with self.assertRaises(AudioParameterError):
            _extract_selected_channel(b"\0\0", 2, 2, use_channel)

    @genty_dataset(
        raw_with_audio_format=(
            "audio",
            "raw",
            "_load_raw",
            AUDIO_PARAMS_SHORT,
        ),
        raw_with_extension=(
            "audio.raw",
            None,
            "_load_raw",
            AUDIO_PARAMS_SHORT,
        ),
        wave_with_audio_format=("audio", "wave", "_load_wave"),
        wav_with_audio_format=("audio", "wav", "_load_wave"),
        wav_with_extension=("audio.wav", None, "_load_wave"),
        format_and_extension_both_given=("audio.dat", "wav", "_load_wave"),
        format_and_extension_both_given_b=("audio.raw", "wave", "_load_wave"),
        no_format_nor_extension=("audio", None, "_load_with_pydub"),
        other_formats_ogg=("audio.ogg", None, "_load_with_pydub"),
        other_formats_webm=("audio", "webm", "_load_with_pydub"),
    )
    def test_from_file(
        self, filename, audio_format, funtion_name, kwargs=None
    ):
        # Patch the loader that `from_file` is expected to dispatch to and
        # check that it gets called; no file is actually read.
        funtion_name = "auditok.io." + funtion_name
        if kwargs is None:
            kwargs = {}
        with patch(funtion_name) as patch_function:
            from_file(filename, audio_format, **kwargs)
        self.assertTrue(patch_function.called)

    @genty_dataset(
        missing_sampling_rate=("sr",),
        missing_sample_width=("sw",),
        missing_channels=("ch",),
    )
    def test_from_file_missing_audio_param(self, missing_param):
        params = AUDIO_PARAMS_SHORT.copy()
        del params[missing_param]
        with self.assertRaises(AudioParameterError):
            from_file("audio", audio_format="raw", **params)

    def test_from_file_no_pydub(self):
        with patch("auditok.io._WITH_PYDUB", False):
            with self.assertRaises(AudioIOError):
                from_file("audio", "mp3")

    @genty_dataset(
        raw_first_channel=("raw", 0, 400),
        raw_second_channel=("raw", 1, 800),
        raw_third_channel=("raw", 2, 1600),
        raw_left_channel=("raw", "left", 400),
        raw_right_channel=("raw", "right", 800),
        wav_first_channel=("wav", 0, 400),
        wav_second_channel=("wav", 1, 800),
        wav_third_channel=("wav", 2, 1600),
        wav_left_channel=("wav", "left", 400),
        wav_right_channel=("wav", "right", 800),
    )
    def test_from_file_multichannel_audio(
        self, audio_format, use_channel, frequency
    ):
        expected = PURE_TONE_DICT[frequency]
        filename = "tests/data/test_16KHZ_3channel_400-800-1600Hz.{}".format(
            audio_format
        )
        sample_width = 2
        audio_source = from_file(
            filename,
            sampling_rate=16000,
            sample_width=sample_width,
            channels=3,
            use_channel=use_channel,
        )
        fmt = DATA_FORMAT[sample_width]
        data = array(fmt, audio_source._buffer)
        self.assertEqual(data, expected)

    @genty_dataset(
        raw_mono=("raw", "mono_400Hz", (400,)),
        raw_3channel=("raw", "3channel_400-800-1600Hz", (400, 800, 1600)),
        wav_mono=("wav", "mono_400Hz", (400,)),
        wav_3channel=("wav", "3channel_400-800-1600Hz", (400, 800, 1600)),
    )
    def test_from_file_multichannel_audio_mix(
        self, audio_format, filename_suffix, frequencies
    ):
        sampling_rate = 16000
        sample_width = 2
        channels = len(frequencies)
        mono_channels = [PURE_TONE_DICT[freq] for freq in frequencies]
        fmt = DATA_FORMAT[sample_width]
        expected = _array_to_bytes(
            array(
                fmt,
                (sum(samples) // channels for samples in zip(*mono_channels)),
            )
        )
        filename = "tests/data/test_16KHZ_{}.{}".format(
            filename_suffix, audio_format
        )
        audio_source = from_file(
            filename,
            use_channel="mix",
            sampling_rate=sampling_rate,
            sample_width=sample_width,
            channels=channels,
        )
        mixed = audio_source._buffer
        self.assertEqual(mixed, expected)

    @patch("auditok.io._WITH_PYDUB", True)
    @patch("auditok.io.BufferAudioSource")
    @genty_dataset(
        ogg_first_channel=("ogg", 0, "from_ogg"),
        ogg_second_channel=("ogg", 1, "from_ogg"),
        ogg_mix=("ogg", "mix", "from_ogg"),
        ogg_default=("ogg", None, "from_ogg"),
        mp3_left_channel=("mp3", "left", "from_mp3"),
        mp3_right_channel=("mp3", "right", "from_mp3"),
        flac_first_channel=("flac", 0, "from_file"),
        flac_second_channel=("flac", 1, "from_file"),
        flv_left_channel=("flv", "left", "from_flv"),
        webm_right_channel=("webm", "right", "from_file"),
    )
    def test_from_file_multichannel_audio_compressed(
        self, audio_format, use_channel, function, *mocks
    ):
        # `mocks` receives the mock injected by the BufferAudioSource patch.
        filename = "audio.{}".format(audio_format)
        segment_mock = Mock()
        segment_mock.sample_width = 2
        segment_mock.channels = 2
        segment_mock._data = b"abcd"

        # Two channels: channel extraction must be invoked with the
        # normalized `use_channel` value.
        with patch("auditok.io._extract_selected_channel") as ext_mock:
            with patch(
                "auditok.io.AudioSegment.{}".format(function)
            ) as open_func:
                open_func.return_value = segment_mock
                from_file(filename, use_channel=use_channel)
                self.assertTrue(open_func.called)
                self.assertTrue(ext_mock.called)

                use_channel = {"left": 0, "right": 1, None: 0}.get(
                    use_channel, use_channel
                )
                ext_mock.assert_called_with(
                    segment_mock._data,
                    segment_mock.channels,
                    segment_mock.sample_width,
                    use_channel,
                )

        # One channel: channel extraction must not be invoked.
        with patch("auditok.io._extract_selected_channel") as ext_mock:
            with patch(
                "auditok.io.AudioSegment.{}".format(function)
            ) as open_func:
                segment_mock.channels = 1
                open_func.return_value = segment_mock
                from_file(filename, use_channel=use_channel)
                self.assertTrue(open_func.called)
                self.assertFalse(ext_mock.called)

    @patch("auditok.io._WITH_PYDUB", True)
    @patch("auditok.io.BufferAudioSource")
    @genty_dataset(
        ogg=("ogg", "from_ogg"),
        mp3=("mp3", "from_mp3"),
        flac=("flac", "from_file"),
    )
    def test_from_file_multichannel_audio_mix_compressed(
        self, audio_format, function, *mocks
    ):
        filename = "audio.{}".format(audio_format)
        segment_mock = Mock()
        segment_mock.sample_width = 2
        segment_mock.channels = 2
        segment_mock._data = b"abcd"
        with patch("auditok.io._mix_audio_channels") as mix_mock:
            with patch(
                "auditok.io.AudioSegment.{}".format(function)
            ) as open_func:
                open_func.return_value = segment_mock
                from_file(filename, use_channel="mix")
                self.assertTrue(open_func.called)
                mix_mock.assert_called_with(
                    segment_mock._data,
                    segment_mock.channels,
                    segment_mock.sample_width,
                )

    @genty_dataset(
        dafault_first_channel=(None, 400),
        first_channel=(0, 400),
        second_channel=(1, 800),
        third_channel=(2, 1600),
        negative_first_channel=(-3, 400),
        negative_second_channel=(-2, 800),
        negative_third_channel=(-1, 1600),
    )
    def test_load_raw(self, use_channel, frequency):
        filename = "tests/data/test_16KHZ_3channel_400-800-1600Hz.raw"
        # With `use_channel` None, call without the argument so the
        # function's own default is exercised.
        if use_channel is not None:
            audio_source = _load_raw(
                filename,
                sampling_rate=16000,
                sample_width=2,
                channels=3,
                use_channel=use_channel,
            )
        else:
            audio_source = _load_raw(
                filename, sampling_rate=16000, sample_width=2, channels=3
            )
        self.assertIsInstance(audio_source, BufferAudioSource)
        self.assertEqual(audio_source.sampling_rate, 16000)
        self.assertEqual(audio_source.sample_width, 2)
        self.assertEqual(audio_source.channels, 1)
        # pre-generated pure sine wave tone of the given frequency
        expected = PURE_TONE_DICT[frequency]
        # compare with data read from file
        fmt = DATA_FORMAT[2]
        data = array(fmt, audio_source._buffer)
        self.assertEqual(data, expected)

    @genty_dataset(
        mono=("mono_400Hz", (400,)),
        three_channel=("3channel_400-800-1600Hz", (400, 800, 1600)),
    )
    def test_load_raw_mix(self, filename_suffix, frequencies):
        sampling_rate = 16000
        sample_width = 2
        channels = len(frequencies)
        mono_channels = [PURE_TONE_DICT[freq] for freq in frequencies]

        fmt = DATA_FORMAT[sample_width]
        expected = _array_to_bytes(
            array(
                fmt,
                (sum(samples) // channels for samples in zip(*mono_channels)),
            )
        )
        filename = "tests/data/test_16KHZ_{}.raw".format(filename_suffix)
        audio_source = _load_raw(
            filename,
            use_channel="mix",
            sampling_rate=sampling_rate,
            sample_width=sample_width,
            channels=channels,
        )
        mixed = audio_source._buffer
        self.assertEqual(mixed, expected)
        self.assertIsInstance(audio_source, BufferAudioSource)
        self.assertEqual(audio_source.sampling_rate, sampling_rate)
        self.assertEqual(audio_source.sample_width, sample_width)
        self.assertEqual(audio_source.channels, 1)

    @genty_dataset(
        missing_sampling_rate=("sr",),
        missing_sample_width=("sw",),
        missing_channels=("ch",),
    )
    def test_load_raw_missing_audio_param(self, missing_param):
        params = AUDIO_PARAMS_SHORT.copy()
        del params[missing_param]
        with self.assertRaises(AudioParameterError):
            srate, swidth, channels, _ = _get_audio_parameters(params)
            _load_raw("audio", srate, swidth, channels)

    @genty_dataset(
        dafault_first_channel=(None, 400),
        first_channel=(0, 400),
        second_channel=(1, 800),
        third_channel=(2, 1600),
        negative_first_channel=(-3, 400),
        negative_second_channel=(-2, 800),
        negative_third_channel=(-1, 1600),
    )
    def test_load_wave(self, use_channel, frequency):
        filename = "tests/data/test_16KHZ_3channel_400-800-1600Hz.wav"
        # With `use_channel` None, call without the argument so the
        # function's own default is exercised.
        if use_channel is not None:
            audio_source = _load_wave(filename, use_channel=use_channel)
        else:
            audio_source = _load_wave(filename)
        self.assertIsInstance(audio_source, BufferAudioSource)
        self.assertEqual(audio_source.sampling_rate, 16000)
        self.assertEqual(audio_source.sample_width, 2)
        self.assertEqual(audio_source.channels, 1)
        # pre-generated pure sine wave tone of the given frequency
        expected = PURE_TONE_DICT[frequency]
        # compare with data read from file
        fmt = DATA_FORMAT[2]
        data = array(fmt, audio_source._buffer)
        self.assertEqual(data, expected)

    @genty_dataset(
        mono=("mono_400Hz", (400,)),
        three_channel=("3channel_400-800-1600Hz", (400, 800, 1600)),
    )
    def test_load_wave_mix(self, filename_suffix, frequencies):
        sampling_rate = 16000
        sample_width = 2
        channels = len(frequencies)
        mono_channels = [PURE_TONE_DICT[freq] for freq in frequencies]
        fmt = DATA_FORMAT[sample_width]
        expected = _array_to_bytes(
            array(
                fmt,
                (sum(samples) // channels for samples in zip(*mono_channels)),
            )
        )
        filename = "tests/data/test_16KHZ_{}.wav".format(filename_suffix)
        audio_source = _load_wave(filename, use_channel="mix")
        mixed = audio_source._buffer
        self.assertEqual(mixed, expected)
        self.assertIsInstance(audio_source, BufferAudioSource)
        self.assertEqual(audio_source.sampling_rate, sampling_rate)
        self.assertEqual(audio_source.sample_width, sample_width)
        self.assertEqual(audio_source.channels, 1)

    @patch("auditok.io._WITH_PYDUB", True)
    @patch("auditok.io.BufferAudioSource")
    @genty_dataset(
        ogg_default_first_channel=("ogg", 2, None, "from_ogg"),
        ogg_first_channel=("ogg", 1, 0, "from_ogg"),
        ogg_second_channel=("ogg", 2, 1, "from_ogg"),
        ogg_mix_channels=("ogg", 3, "mix", "from_ogg"),
        mp3_left_channel=("mp3", 1, "left", "from_mp3"),
        mp3_right_channel=("mp3", 2, "right", "from_mp3"),
        mp3_mix_channels=("mp3", 3, "mix", "from_mp3"),
        flac_first_channel=("flac", 2, 0, "from_file"),
        flac_second_channel=("flac", 2, 1, "from_file"),
        flv_left_channel=("flv", 1, "left", "from_flv"),
        webm_right_channel=("webm", 2, "right", "from_file"),
        webm_mix_channels=("webm", 4, "mix", "from_file"),
    )
    def test_load_with_pydub(
        self, audio_format, channels, use_channel, function, *mocks
    ):
        filename = "audio.{}".format(audio_format)
        segment_mock = Mock()
        segment_mock.sample_width = 2
        segment_mock.channels = channels
        segment_mock._data = b"abcdefgh"
        with patch("auditok.io._extract_selected_channel") as ext_mock:
            with patch(
                "auditok.io.AudioSegment.{}".format(function)
            ) as open_func:
                open_func.return_value = segment_mock
                # `_load_with_pydub` is called with an already normalized
                # `use_channel`.
                use_channel = {"left": 0, "right": 1, None: 0}.get(
                    use_channel, use_channel
                )
                _load_with_pydub(filename, audio_format, use_channel)
                self.assertTrue(open_func.called)
                # Extraction is only expected for multichannel data.
                if channels > 1:
                    self.assertTrue(ext_mock.called)
                    ext_mock.assert_called_with(
                        segment_mock._data,
                        segment_mock.channels,
                        segment_mock.sample_width,
                        use_channel,
                    )
                else:
                    self.assertFalse(ext_mock.called)

    @genty_dataset(
        mono=("mono_400Hz.raw", (400,)),
        three_channel=("3channel_400-800-1600Hz.raw", (400, 800, 1600)),
    )
    def test_save_raw(self, filename, frequencies):
        filename = "tests/data/test_16KHZ_{}".format(filename)
        sample_width = 2
        fmt = DATA_FORMAT[sample_width]
        mono_channels = [PURE_TONE_DICT[freq] for freq in frequencies]
        data = _array_to_bytes(array(fmt, _sample_generator(*mono_channels)))
        # The context manager closes (and thereby removes) the temporary
        # file even when the assertion fails.
        with NamedTemporaryFile() as tmpfile:
            _save_raw(data, tmpfile.name)
            self.assertTrue(
                filecmp.cmp(tmpfile.name, filename, shallow=False)
            )

    @genty_dataset(
        mono=("mono_400Hz.wav", (400,)),
        three_channel=("3channel_400-800-1600Hz.wav", (400, 800, 1600)),
    )
    def test_save_wave(self, filename, frequencies):
        filename = "tests/data/test_16KHZ_{}".format(filename)
        sampling_rate = 16000
        sample_width = 2
        channels = len(frequencies)
        fmt = DATA_FORMAT[sample_width]
        mono_channels = [PURE_TONE_DICT[freq] for freq in frequencies]
        data = _array_to_bytes(array(fmt, _sample_generator(*mono_channels)))
        # The context manager closes (and thereby removes) the temporary
        # file even when the assertion fails.
        with NamedTemporaryFile() as tmpfile:
            _save_wave(
                data, tmpfile.name, sampling_rate, sample_width, channels
            )
            self.assertTrue(
                filecmp.cmp(tmpfile.name, filename, shallow=False)
            )

    @genty_dataset(
        missing_sampling_rate=("sr",),
        missing_sample_width=("sw",),
        missing_channels=("ch",),
    )
    def test_save_wave_missing_audio_param(self, missing_param):
        params = AUDIO_PARAMS_SHORT.copy()
        del params[missing_param]
        with self.assertRaises(AudioParameterError):
            srate, swidth, channels, _ = _get_audio_parameters(params)
            _save_wave(b"\0\0", "audio", srate, swidth, channels)

    def test_save_with_pydub(self):
        with patch("auditok.io.AudioSegment.export") as export:
            # The directory is removed on exit, even if the assertion fails.
            with TemporaryDirectory() as tmpdir:
                filename = os.path.join(tmpdir, "audio.ogg")
                _save_with_pydub(b"\0\0", filename, "ogg", 16000, 2, 1)
                self.assertTrue(export.called)

    @genty_dataset(
        raw_with_audio_format=("audio", "raw"),
        raw_with_extension=("audio.raw", None),
        raw_with_audio_format_and_extension=("audio.mp3", "raw"),
        raw_no_audio_format_nor_extension=("audio", None),
    )
    def test_to_file_raw(self, filename, audio_format):
        exp_filename = "tests/data/test_16KHZ_mono_400Hz.raw"
        data = _array_to_bytes(PURE_TONE_DICT[400])
        # The directory is removed on exit, even if the assertion fails.
        with TemporaryDirectory() as tmpdir:
            filename = os.path.join(tmpdir, filename)
            to_file(data, filename, audio_format=audio_format)
            self.assertTrue(filecmp.cmp(filename, exp_filename, shallow=False))

    @genty_dataset(
        wav_with_audio_format=("audio", "wav"),
        wav_with_extension=("audio.wav", None),
        wav_with_audio_format_and_extension=("audio.mp3", "wav"),
        wave_with_audio_format=("audio", "wave"),
        wave_with_extension=("audio.wave", None),
        wave_with_audio_format_and_extension=("audio.mp3", "wave"),
    )
    def test_to_file_wave(self, filename, audio_format):
        exp_filename = "tests/data/test_16KHZ_mono_400Hz.wav"
        data = _array_to_bytes(PURE_TONE_DICT[400])
        # The directory is removed on exit, even if the assertion fails.
        with TemporaryDirectory() as tmpdir:
            filename = os.path.join(tmpdir, filename)
            to_file(
                data,
                filename,
                audio_format=audio_format,
                sampling_rate=16000,
                sample_width=2,
                channels=1,
            )
            self.assertTrue(filecmp.cmp(filename, exp_filename, shallow=False))

    @genty_dataset(
        missing_sampling_rate=("sr",),
        missing_sample_width=("sw",),
        missing_channels=("ch",),
    )
    def test_to_file_missing_audio_param(self, missing_param):
        params = AUDIO_PARAMS_SHORT.copy()
        del params[missing_param]
        with self.assertRaises(AudioParameterError):
            to_file(b"\0\0", "audio", audio_format="wav", **params)
        with self.assertRaises(AudioParameterError):
            to_file(b"\0\0", "audio", audio_format="mp3", **params)

    def test_to_file_no_pydub(self):
        with patch("auditok.io._WITH_PYDUB", False):
            with self.assertRaises(AudioIOError):
                # Argument order is (data, filename, audio_format), as in
                # the other `to_file` calls of this class.
                to_file(b"", "audio", "mp3")

    @patch("auditok.io._WITH_PYDUB", True)
    @genty_dataset(
        ogg_with_extension=("audio.ogg", None),
        ogg_with_audio_format=("audio", "ogg"),
        ogg_format_with_wrong_extension=("audio.wav", "ogg"),
    )
    def test_to_file_compressed(self, filename, audio_format, *mocks):
        with patch("auditok.io.AudioSegment.export") as export:
            # The directory is removed on exit, even if the assertion fails.
            with TemporaryDirectory() as tmpdir:
                filename = os.path.join(tmpdir, filename)
                to_file(b"\0\0", filename, audio_format, **AUDIO_PARAMS_SHORT)
                self.assertTrue(export.called)