annotate tests/test_io.py @ 190:4d60f490bb5d

Add tests for get_audio_source
author Amine Sehili <amine.sehili@gmail.com>
date Mon, 15 Apr 2019 22:32:47 +0100
parents 3ddaa5eda8d4
children 29472a5a798a
rev   line source
amine@106 1 import os
amine@106 2 import sys
amine@106 3 import math
amine@107 4 from array import array
amine@133 5 from tempfile import NamedTemporaryFile, TemporaryDirectory
amine@110 6 import filecmp
amine@108 7 from unittest import TestCase
amine@108 8 from genty import genty, genty_dataset
amine@157 9 from test_util import _sample_generator, _generate_pure_tone, PURE_TONE_DICT
amine@110 10 from auditok.io import (
amine@126 11 DATA_FORMAT,
amine@121 12 AudioIOError,
amine@110 13 AudioParameterError,
amine@126 14 BufferAudioSource,
amine@162 15 RawAudioSource,
amine@162 16 WaveAudioSource,
amine@190 17 StdinAudioSource,
amine@110 18 check_audio_data,
amine@143 19 _guess_audio_format,
amine@144 20 _normalize_use_channel,
amine@128 21 _get_audio_parameters,
amine@116 22 _array_to_bytes,
amine@118 23 _mix_audio_channels,
amine@119 24 _extract_selected_channel,
amine@126 25 _load_raw,
amine@129 26 _load_wave,
amine@131 27 _load_with_pydub,
amine@190 28 get_audio_source,
amine@120 29 from_file,
amine@111 30 _save_raw,
amine@110 31 _save_wave,
amine@141 32 _save_with_pydub,
amine@135 33 to_file,
amine@110 34 )
amine@106 35
amine@106 36
amine@106 37 if sys.version_info >= (3, 0):
amine@106 38 PYTHON_3 = True
amine@124 39 from unittest.mock import patch, Mock
amine@106 40 else:
amine@106 41 PYTHON_3 = False
amine@124 42 from mock import patch, Mock
amine@120 43
amine@120 44 AUDIO_PARAMS_SHORT = {"sr": 16000, "sw": 2, "ch": 1}
amine@106 45
amine@106 46
amine@108 47 @genty
amine@108 48 class TestIO(TestCase):
amine@108 49 @genty_dataset(
amine@108 50 valid_mono=(b"\0" * 113, 1, 1),
amine@108 51 valid_stereo=(b"\0" * 160, 1, 2),
amine@108 52 invalid_mono_sw_2=(b"\0" * 113, 2, 1, False),
amine@108 53 invalid_stereo_sw_1=(b"\0" * 113, 1, 2, False),
amine@108 54 invalid_stereo_sw_2=(b"\0" * 158, 2, 2, False),
amine@108 55 )
amine@108 56 def test_check_audio_data(self, data, sample_width, channels, valid=True):
amine@108 57
amine@108 58 if not valid:
amine@108 59 with self.assertRaises(AudioParameterError):
amine@108 60 check_audio_data(data, sample_width, channels)
amine@108 61 else:
amine@108 62 self.assertIsNone(check_audio_data(data, sample_width, channels))
amine@110 63
amine@110 64 @genty_dataset(
amine@143 65 extention_and_format_same=("wav", "filename.wav", "wav"),
amine@143 66 extention_and_format_different=("wav", "filename.mp3", "wav"),
amine@143 67 extention_no_format=(None, "filename.wav", "wav"),
amine@143 68 format_no_extension=("wav", "filename", "wav"),
amine@143 69 no_format_no_extension=(None, "filename", None),
amine@143 70 )
amine@143 71 def test_guess_audio_format(self, fmt, filename, expected):
amine@143 72 result = _guess_audio_format(fmt, filename)
amine@143 73 self.assertEqual(result, expected)
amine@143 74
amine@143 75 @genty_dataset(
amine@144 76 none=(None, 0),
amine@144 77 positive_int=(1, 1),
amine@144 78 negative_int=(-1, -1),
amine@144 79 left=("left", 0),
amine@144 80 right=("right", 1),
amine@144 81 mix=("mix", "mix"),
amine@144 82 )
amine@144 83 def test_normalize_use_channel(self, use_channel, expected):
amine@144 84 result = _normalize_use_channel(use_channel)
amine@144 85 self.assertEqual(result, expected)
amine@144 86
amine@144 87 @genty_dataset(
amine@145 88 simple=((8000, 2, 1, 0), (8000, 2, 1, 0)),
amine@145 89 use_channel_left=((8000, 2, 1, "left"), (8000, 2, 1, 0)),
amine@145 90 use_channel_right=((8000, 2, 1, "right"), (8000, 2, 1, 1)),
amine@145 91 use_channel_mix=((8000, 2, 1, "mix"), (8000, 2, 1, "mix")),
amine@145 92 use_channel_None=((8000, 2, 2, None), (8000, 2, 2, 0)),
amine@145 93 no_use_channel=((8000, 2, 2), (8000, 2, 2, 0)),
amine@145 94 )
amine@145 95 def test_get_audio_parameters_short_params(self, values, expected):
amine@145 96 params = {k: v for k, v in zip(("sr", "sw", "ch", "uc"), values)}
amine@145 97 result = _get_audio_parameters(params)
amine@145 98 self.assertEqual(result, expected)
amine@145 99
amine@145 100 @genty_dataset(
amine@145 101 simple=((8000, 2, 1, 0), (8000, 2, 1, 0)),
amine@145 102 use_channel_left=((8000, 2, 1, "left"), (8000, 2, 1, 0)),
amine@145 103 use_channel_right=((8000, 2, 1, "right"), (8000, 2, 1, 1)),
amine@145 104 use_channel_mix=((8000, 2, 1, "mix"), (8000, 2, 1, "mix")),
amine@145 105 use_channel_None=((8000, 2, 2, None), (8000, 2, 2, 0)),
amine@145 106 no_use_channel=((8000, 2, 2), (8000, 2, 2, 0)),
amine@145 107 )
amine@145 108 def test_get_audio_parameters_long_params(self, values, expected):
amine@145 109 params = {
amine@145 110 k: v
amine@145 111 for k, v in zip(
amine@145 112 ("sampling_rate", "sample_width", "channels", "use_channel"),
amine@145 113 values,
amine@145 114 )
amine@145 115 }
amine@145 116 result = _get_audio_parameters(params)
amine@145 117 self.assertEqual(result, expected)
amine@145 118
amine@145 119 @genty_dataset(simple=((8000, 2, 1, 0), (8000, 2, 1, 0)))
amine@145 120 def test_get_audio_parameters_short_and_long_params(
amine@145 121 self, values, expected
amine@145 122 ):
amine@145 123 params = {
amine@145 124 k: v
amine@145 125 for k, v in zip(
amine@145 126 ("sampling_rate", "sample_width", "channels", "use_channel"),
amine@145 127 values,
amine@145 128 )
amine@145 129 }
amine@145 130
amine@145 131 params.update({k: v for k, v in zip(("sr", "sw", "ch", "uc"), "xxxx")})
amine@145 132 result = _get_audio_parameters(params)
amine@145 133 self.assertEqual(result, expected)
amine@145 134
amine@145 135 @genty_dataset(
amine@146 136 str_sampling_rate=(("x", 2, 1, 0),),
amine@146 137 negative_sampling_rate=((-8000, 2, 1, 0),),
amine@146 138 str_sample_width=((8000, "x", 1, 0),),
amine@146 139 negative_sample_width=((8000, -2, 1, 0),),
amine@146 140 str_channels=((8000, 2, "x", 0),),
amine@146 141 negative_channels=((8000, 2, -1, 0),),
amine@146 142 )
amine@146 143 def test_get_audio_parameters_invalid(self, values):
amine@146 144 params = {
amine@146 145 k: v
amine@146 146 for k, v in zip(
amine@146 147 ("sampling_rate", "sample_width", "channels", "use_channel"),
amine@146 148 values,
amine@146 149 )
amine@146 150 }
amine@146 151 with self.assertRaises(AudioParameterError):
amine@146 152 _get_audio_parameters(params)
amine@146 153
amine@146 154 @genty_dataset(
amine@118 155 mono_1byte=([400], 1),
amine@118 156 stereo_1byte=([400, 600], 1),
amine@118 157 three_channel_1byte=([400, 600, 2400], 1),
amine@118 158 mono_2byte=([400], 2),
amine@118 159 stereo_2byte=([400, 600], 2),
amine@118 160 three_channel_2byte=([400, 600, 1150], 2),
amine@118 161 mono_4byte=([400], 4),
amine@118 162 stereo_4byte=([400, 600], 4),
amine@118 163 four_channel_2byte=([400, 600, 1150, 7220], 4),
amine@118 164 )
amine@118 165 def test_mix_audio_channels(self, frequencies, sample_width):
amine@118 166 sampling_rate = 16000
amine@118 167 sample_width = 2
amine@118 168 channels = len(frequencies)
amine@118 169 mono_channels = [
amine@118 170 _generate_pure_tone(
amine@118 171 freq,
amine@118 172 duration_sec=0.1,
amine@118 173 sampling_rate=sampling_rate,
amine@118 174 sample_width=sample_width,
amine@118 175 )
amine@118 176 for freq in frequencies
amine@118 177 ]
amine@118 178 fmt = DATA_FORMAT[sample_width]
amine@118 179 expected = _array_to_bytes(
amine@118 180 array(
amine@118 181 fmt,
amine@118 182 (sum(samples) // channels for samples in zip(*mono_channels)),
amine@118 183 )
amine@118 184 )
amine@118 185 data = _array_to_bytes(array(fmt, _sample_generator(*mono_channels)))
amine@118 186 mixed = _mix_audio_channels(data, channels, sample_width)
amine@118 187 self.assertEqual(mixed, expected)
amine@118 188
amine@118 189 @genty_dataset(
amine@119 190 mono_1byte=([400], 1, 0),
amine@119 191 stereo_1byte_2st_channel=([400, 600], 1, 1),
amine@119 192 mono_2byte=([400], 2, 0),
amine@119 193 stereo_2byte_1st_channel=([400, 600], 2, 0),
amine@119 194 stereo_2byte_2nd_channel=([400, 600], 2, 1),
amine@119 195 three_channel_2byte_last_negative_idx=([400, 600, 1150], 2, -1),
amine@119 196 three_channel_2byte_2nd_negative_idx=([400, 600, 1150], 2, -2),
amine@119 197 three_channel_2byte_1st_negative_idx=([400, 600, 1150], 2, -3),
amine@119 198 three_channel_4byte_1st=([400, 600, 1150], 4, 0),
amine@119 199 three_channel_4byte_last_negative_idx=([400, 600, 1150], 4, -1),
amine@119 200 )
amine@119 201 def test_extract_selected_channel(
amine@119 202 self, frequencies, sample_width, use_channel
amine@119 203 ):
amine@119 204
amine@119 205 mono_channels = [
amine@119 206 _generate_pure_tone(
amine@119 207 freq,
amine@119 208 duration_sec=0.1,
amine@119 209 sampling_rate=16000,
amine@119 210 sample_width=sample_width,
amine@119 211 )
amine@119 212 for freq in frequencies
amine@119 213 ]
amine@119 214 channels = len(frequencies)
amine@119 215 fmt = DATA_FORMAT[sample_width]
amine@119 216 expected = _array_to_bytes(mono_channels[use_channel])
amine@119 217 data = _array_to_bytes(array(fmt, _sample_generator(*mono_channels)))
amine@119 218 selected_channel = _extract_selected_channel(
amine@119 219 data, channels, sample_width, use_channel
amine@119 220 )
amine@119 221 self.assertEqual(selected_channel, expected)
amine@119 222
amine@148 223 @genty_dataset(mono=([400],), three_channel=([600, 1150, 2400],))
amine@148 224 def test_extract_selected_channel_mix(self, frequencies):
amine@148 225
amine@148 226 mono_channels = [PURE_TONE_DICT[freq] for freq in frequencies]
amine@148 227 channels = len(frequencies)
amine@148 228 fmt = DATA_FORMAT[2]
amine@148 229 expected = _array_to_bytes(
amine@148 230 array(
amine@148 231 fmt,
amine@148 232 (sum(samples) // channels for samples in zip(*mono_channels)),
amine@148 233 )
amine@148 234 )
amine@148 235 data = _array_to_bytes(array(fmt, _sample_generator(*mono_channels)))
amine@148 236 selected_channel = _extract_selected_channel(data, channels, 2, "mix")
amine@148 237 self.assertEqual(selected_channel, expected)
amine@148 238
amine@149 239 @genty_dataset(positive=(2,), negative=(-3,))
amine@149 240 def test_extract_selected_channel_invalid_use_channel(self, use_channel):
amine@149 241 with self.assertRaises(AudioParameterError):
amine@149 242 _extract_selected_channel(b"\0\0", 2, 2, use_channel)
amine@149 243
amine@119 244 @genty_dataset(
amine@120 245 raw_with_audio_format=(
amine@120 246 "audio",
amine@120 247 "raw",
amine@120 248 "_load_raw",
amine@120 249 AUDIO_PARAMS_SHORT,
amine@120 250 ),
amine@120 251 raw_with_extension=(
amine@120 252 "audio.raw",
amine@120 253 None,
amine@120 254 "_load_raw",
amine@120 255 AUDIO_PARAMS_SHORT,
amine@120 256 ),
amine@120 257 wave_with_audio_format=("audio", "wave", "_load_wave"),
amine@120 258 wav_with_audio_format=("audio", "wave", "_load_wave"),
amine@120 259 wav_with_extension=("audio.wav", None, "_load_wave"),
amine@120 260 format_and_extension_both_given=("audio.dat", "wav", "_load_wave"),
amine@120 261 format_and_extension_both_given_b=("audio.raw", "wave", "_load_wave"),
amine@120 262 no_format_nor_extension=("audio", None, "_load_with_pydub"),
amine@120 263 other_formats_ogg=("audio.ogg", None, "_load_with_pydub"),
amine@120 264 other_formats_webm=("audio", "webm", "_load_with_pydub"),
amine@120 265 )
amine@120 266 def test_from_file(
amine@120 267 self, filename, audio_format, funtion_name, kwargs=None
amine@120 268 ):
amine@120 269 funtion_name = "auditok.io." + funtion_name
amine@120 270 if kwargs is None:
amine@120 271 kwargs = {}
amine@120 272 with patch(funtion_name) as patch_function:
amine@120 273 from_file(filename, audio_format, **kwargs)
amine@120 274 self.assertTrue(patch_function.called)
amine@120 275
amine@190 276 def test_from_file_large_file_raw(self,):
amine@162 277 filename = "tests/data/test_16KHZ_mono_400Hz.raw"
amine@190 278 audio_source = from_file(
amine@190 279 filename,
amine@190 280 large_file=True,
amine@190 281 sampling_rate=16000,
amine@190 282 sample_width=2,
amine@190 283 channels=1,
amine@190 284 )
amine@162 285 self.assertIsInstance(audio_source, RawAudioSource)
amine@162 286
amine@190 287 def test_from_file_large_file_wave(self,):
amine@162 288 filename = "tests/data/test_16KHZ_mono_400Hz.wav"
amine@162 289 audio_source = from_file(filename, large_file=True)
amine@162 290 self.assertIsInstance(audio_source, WaveAudioSource)
amine@163 291
amine@190 292 def test_from_file_large_file_compressed(self,):
amine@163 293 filename = "tests/data/test_16KHZ_mono_400Hz.ogg"
amine@163 294 with self.assertRaises(AudioIOError):
amine@163 295 from_file(filename, large_file=True)
amine@162 296
amine@137 297 @genty_dataset(
amine@137 298 missing_sampling_rate=("sr",),
amine@137 299 missing_sample_width=("sw",),
amine@137 300 missing_channels=("ch",),
amine@137 301 )
amine@137 302 def test_from_file_missing_audio_param(self, missing_param):
amine@137 303 with self.assertRaises(AudioParameterError):
amine@137 304 params = AUDIO_PARAMS_SHORT.copy()
amine@137 305 del params[missing_param]
amine@137 306 from_file("audio", audio_format="raw", **params)
amine@137 307
amine@121 308 def test_from_file_no_pydub(self):
amine@121 309 with patch("auditok.io._WITH_PYDUB", False):
amine@121 310 with self.assertRaises(AudioIOError):
amine@121 311 from_file("audio", "mp3")
amine@121 312
amine@111 313 @genty_dataset(
amine@122 314 raw_first_channel=("raw", 0, 400),
amine@122 315 raw_second_channel=("raw", 1, 800),
amine@122 316 raw_third_channel=("raw", 2, 1600),
amine@122 317 raw_left_channel=("raw", "left", 400),
amine@122 318 raw_right_channel=("raw", "right", 800),
amine@122 319 wav_first_channel=("wav", 0, 400),
amine@122 320 wav_second_channel=("wav", 1, 800),
amine@122 321 wav_third_channel=("wav", 2, 1600),
amine@122 322 wav_left_channel=("wav", "left", 400),
amine@122 323 wav_right_channel=("wav", "right", 800),
amine@122 324 )
amine@122 325 def test_from_file_multichannel_audio(
amine@122 326 self, audio_format, use_channel, frequency
amine@122 327 ):
amine@122 328 expected = PURE_TONE_DICT[frequency]
amine@122 329 filename = "tests/data/test_16KHZ_3channel_400-800-1600Hz.{}".format(
amine@122 330 audio_format
amine@122 331 )
amine@122 332 sample_width = 2
amine@122 333 audio_source = from_file(
amine@122 334 filename,
amine@122 335 sampling_rate=16000,
amine@122 336 sample_width=sample_width,
amine@122 337 channels=3,
amine@122 338 use_channel=use_channel,
amine@122 339 )
amine@122 340 fmt = DATA_FORMAT[sample_width]
amine@122 341 data = array(fmt, audio_source._buffer)
amine@122 342 self.assertEqual(data, expected)
amine@122 343
amine@122 344 @genty_dataset(
amine@123 345 raw_mono=("raw", "mono_400Hz", (400,)),
amine@123 346 raw_3channel=("raw", "3channel_400-800-1600Hz", (400, 800, 1600)),
amine@123 347 wav_mono=("wav", "mono_400Hz", (400,)),
amine@123 348 wav_3channel=("wav", "3channel_400-800-1600Hz", (400, 800, 1600)),
amine@123 349 )
amine@123 350 def test_from_file_multichannel_audio_mix(
amine@123 351 self, audio_format, filename_suffix, frequencies
amine@123 352 ):
amine@123 353 sampling_rate = 16000
amine@123 354 sample_width = 2
amine@123 355 channels = len(frequencies)
amine@123 356 mono_channels = [PURE_TONE_DICT[freq] for freq in frequencies]
amine@123 357 channels = len(frequencies)
amine@123 358 fmt = DATA_FORMAT[sample_width]
amine@123 359 expected = _array_to_bytes(
amine@123 360 array(
amine@123 361 fmt,
amine@123 362 (sum(samples) // channels for samples in zip(*mono_channels)),
amine@123 363 )
amine@123 364 )
amine@123 365 filename = "tests/data/test_16KHZ_{}.{}".format(
amine@123 366 filename_suffix, audio_format
amine@123 367 )
amine@123 368 audio_source = from_file(
amine@123 369 filename,
amine@123 370 use_channel="mix",
amine@123 371 sampling_rate=sampling_rate,
amine@123 372 sample_width=2,
amine@123 373 channels=channels,
amine@123 374 )
amine@123 375 mixed = audio_source._buffer
amine@123 376 self.assertEqual((mixed), expected)
amine@123 377
amine@124 378 @patch("auditok.io._WITH_PYDUB", True)
amine@124 379 @patch("auditok.io.BufferAudioSource")
amine@124 380 @genty_dataset(
amine@124 381 ogg_first_channel=("ogg", 0, "from_ogg"),
amine@124 382 ogg_second_channel=("ogg", 1, "from_ogg"),
amine@124 383 ogg_mix=("ogg", "mix", "from_ogg"),
amine@124 384 ogg_default=("ogg", None, "from_ogg"),
amine@124 385 mp3_left_channel=("mp3", "left", "from_mp3"),
amine@124 386 mp3_right_channel=("mp3", "right", "from_mp3"),
amine@124 387 flac_first_channel=("flac", 0, "from_file"),
amine@124 388 flac_second_channel=("flac", 1, "from_file"),
amine@124 389 flv_left_channel=("flv", "left", "from_flv"),
amine@124 390 webm_right_channel=("webm", "right", "from_file"),
amine@124 391 )
amine@124 392 def test_from_file_multichannel_audio_compressed(
amine@124 393 self, audio_format, use_channel, function, *mocks
amine@124 394 ):
amine@124 395 filename = "audio.{}".format(audio_format)
amine@124 396 segment_mock = Mock()
amine@124 397 segment_mock.sample_width = 2
amine@124 398 segment_mock.channels = 2
amine@124 399 segment_mock._data = b"abcd"
amine@124 400 with patch("auditok.io._extract_selected_channel") as ext_mock:
amine@124 401 with patch(
amine@124 402 "auditok.io.AudioSegment.{}".format(function)
amine@124 403 ) as open_func:
amine@124 404 open_func.return_value = segment_mock
amine@124 405 from_file(filename, use_channel=use_channel)
amine@124 406 self.assertTrue(open_func.called)
amine@124 407 self.assertTrue(ext_mock.called)
amine@124 408
amine@124 409 use_channel = {"left": 0, "right": 1, None: 0}.get(
amine@124 410 use_channel, use_channel
amine@124 411 )
amine@124 412 ext_mock.assert_called_with(
amine@124 413 segment_mock._data,
amine@124 414 segment_mock.channels,
amine@124 415 segment_mock.sample_width,
amine@124 416 use_channel,
amine@124 417 )
amine@124 418
amine@124 419 with patch("auditok.io._extract_selected_channel") as ext_mock:
amine@124 420 with patch(
amine@124 421 "auditok.io.AudioSegment.{}".format(function)
amine@124 422 ) as open_func:
amine@124 423 segment_mock.channels = 1
amine@124 424 open_func.return_value = segment_mock
amine@124 425 from_file(filename, use_channel=use_channel)
amine@124 426 self.assertTrue(open_func.called)
amine@124 427 self.assertFalse(ext_mock.called)
amine@124 428
amine@125 429 @patch("auditok.io._WITH_PYDUB", True)
amine@125 430 @patch("auditok.io.BufferAudioSource")
amine@125 431 @genty_dataset(
amine@125 432 ogg=("ogg", "from_ogg"),
amine@125 433 mp3=("mp3", "from_mp3"),
amine@125 434 flac=("flac", "from_file"),
amine@125 435 )
amine@125 436 def test_from_file_multichannel_audio_mix_compressed(
amine@125 437 self, audio_format, function, *mocks
amine@125 438 ):
amine@125 439 filename = "audio.{}".format(audio_format)
amine@125 440 segment_mock = Mock()
amine@125 441 segment_mock.sample_width = 2
amine@125 442 segment_mock.channels = 2
amine@125 443 segment_mock._data = b"abcd"
amine@125 444 with patch("auditok.io._mix_audio_channels") as mix_mock:
amine@125 445 with patch(
amine@125 446 "auditok.io.AudioSegment.{}".format(function)
amine@125 447 ) as open_func:
amine@125 448 open_func.return_value = segment_mock
amine@125 449 from_file(filename, use_channel="mix")
amine@125 450 self.assertTrue(open_func.called)
amine@125 451 mix_mock.assert_called_with(
amine@125 452 segment_mock._data,
amine@125 453 segment_mock.channels,
amine@125 454 segment_mock.sample_width,
amine@125 455 )
amine@125 456
amine@123 457 @genty_dataset(
amine@126 458 dafault_first_channel=(None, 400),
amine@126 459 first_channel=(0, 400),
amine@126 460 second_channel=(1, 800),
amine@126 461 third_channel=(2, 1600),
amine@126 462 negative_first_channel=(-3, 400),
amine@126 463 negative_second_channel=(-2, 800),
amine@126 464 negative_third_channel=(-1, 1600),
amine@126 465 )
amine@126 466 def test_load_raw(self, use_channel, frequency):
amine@126 467 filename = "tests/data/test_16KHZ_3channel_400-800-1600Hz.raw"
amine@126 468 if use_channel is not None:
amine@126 469 audio_source = _load_raw(
amine@126 470 filename,
amine@126 471 sampling_rate=16000,
amine@126 472 sample_width=2,
amine@126 473 channels=3,
amine@126 474 use_channel=use_channel,
amine@126 475 )
amine@126 476 else:
amine@126 477 audio_source = _load_raw(
amine@126 478 filename, sampling_rate=16000, sample_width=2, channels=3
amine@126 479 )
amine@126 480 self.assertIsInstance(audio_source, BufferAudioSource)
amine@126 481 self.assertEqual(audio_source.sampling_rate, 16000)
amine@126 482 self.assertEqual(audio_source.sample_width, 2)
amine@126 483 self.assertEqual(audio_source.channels, 1)
amine@126 484 # generate a pure sine wave tone of the given frequency
amine@126 485 expected = PURE_TONE_DICT[frequency]
amine@126 486 # compre with data read from file
amine@126 487 fmt = DATA_FORMAT[2]
amine@126 488 data = array(fmt, audio_source._buffer)
amine@126 489 self.assertEqual(data, expected)
amine@126 490
amine@126 491 @genty_dataset(
amine@127 492 mono=("mono_400Hz", (400,)),
amine@127 493 three_channel=("3channel_400-800-1600Hz", (400, 800, 1600)),
amine@127 494 )
amine@127 495 def test_load_raw_mix(self, filename_suffix, frequencies):
amine@127 496 sampling_rate = 16000
amine@127 497 sample_width = 2
amine@127 498 channels = len(frequencies)
amine@127 499 mono_channels = [PURE_TONE_DICT[freq] for freq in frequencies]
amine@127 500
amine@127 501 fmt = DATA_FORMAT[sample_width]
amine@127 502 expected = _array_to_bytes(
amine@127 503 array(
amine@127 504 fmt,
amine@127 505 (sum(samples) // channels for samples in zip(*mono_channels)),
amine@127 506 )
amine@127 507 )
amine@127 508 filename = "tests/data/test_16KHZ_{}.raw".format(filename_suffix)
amine@127 509 audio_source = _load_raw(
amine@127 510 filename,
amine@127 511 use_channel="mix",
amine@127 512 sampling_rate=sampling_rate,
amine@127 513 sample_width=2,
amine@127 514 channels=channels,
amine@127 515 )
amine@127 516 mixed = audio_source._buffer
amine@127 517 self.assertEqual(mixed, expected)
amine@127 518 self.assertIsInstance(audio_source, BufferAudioSource)
amine@127 519 self.assertEqual(audio_source.sampling_rate, sampling_rate)
amine@127 520 self.assertEqual(audio_source.sample_width, sample_width)
amine@127 521 self.assertEqual(audio_source.channels, 1)
amine@127 522
amine@127 523 @genty_dataset(
amine@128 524 missing_sampling_rate=("sr",),
amine@128 525 missing_sample_width=("sw",),
amine@128 526 missing_channels=("ch",),
amine@128 527 )
amine@128 528 def test_load_raw_missing_audio_param(self, missing_param):
amine@128 529 with self.assertRaises(AudioParameterError):
amine@128 530 params = AUDIO_PARAMS_SHORT.copy()
amine@128 531 del params[missing_param]
amine@128 532 srate, swidth, channels, _ = _get_audio_parameters(params)
amine@128 533 _load_raw("audio", srate, swidth, channels)
amine@128 534
amine@128 535 @genty_dataset(
amine@129 536 dafault_first_channel=(None, 400),
amine@129 537 first_channel=(0, 400),
amine@129 538 second_channel=(1, 800),
amine@129 539 third_channel=(2, 1600),
amine@129 540 negative_first_channel=(-3, 400),
amine@129 541 negative_second_channel=(-2, 800),
amine@129 542 negative_third_channel=(-1, 1600),
amine@129 543 )
amine@129 544 def test_load_wave(self, use_channel, frequency):
amine@129 545 filename = "tests/data/test_16KHZ_3channel_400-800-1600Hz.wav"
amine@129 546 if use_channel is not None:
amine@129 547 audio_source = _load_wave(filename, use_channel=use_channel)
amine@129 548 else:
amine@129 549 audio_source = _load_wave(filename)
amine@129 550 self.assertIsInstance(audio_source, BufferAudioSource)
amine@129 551 self.assertEqual(audio_source.sampling_rate, 16000)
amine@129 552 self.assertEqual(audio_source.sample_width, 2)
amine@129 553 self.assertEqual(audio_source.channels, 1)
amine@129 554 # generate a pure sine wave tone of the given frequency
amine@129 555 expected = PURE_TONE_DICT[frequency]
amine@129 556 # compre with data read from file
amine@129 557 fmt = DATA_FORMAT[2]
amine@129 558 data = array(fmt, audio_source._buffer)
amine@129 559 self.assertEqual(data, expected)
amine@129 560
amine@129 561 @genty_dataset(
amine@130 562 mono=("mono_400Hz", (400,)),
amine@130 563 three_channel=("3channel_400-800-1600Hz", (400, 800, 1600)),
amine@130 564 )
amine@130 565 def test_load_wave_mix(self, filename_suffix, frequencies):
amine@130 566 sampling_rate = 16000
amine@130 567 sample_width = 2
amine@130 568 channels = len(frequencies)
amine@130 569 mono_channels = [PURE_TONE_DICT[freq] for freq in frequencies]
amine@130 570 fmt = DATA_FORMAT[sample_width]
amine@130 571 expected = _array_to_bytes(
amine@130 572 array(
amine@130 573 fmt,
amine@130 574 (sum(samples) // channels for samples in zip(*mono_channels)),
amine@130 575 )
amine@130 576 )
amine@130 577 filename = "tests/data/test_16KHZ_{}.wav".format(filename_suffix)
amine@130 578 audio_source = _load_wave(filename, use_channel="mix")
amine@130 579 mixed = audio_source._buffer
amine@130 580 self.assertEqual(mixed, expected)
amine@130 581 self.assertIsInstance(audio_source, BufferAudioSource)
amine@130 582 self.assertEqual(audio_source.sampling_rate, sampling_rate)
amine@130 583 self.assertEqual(audio_source.sample_width, sample_width)
amine@130 584 self.assertEqual(audio_source.channels, 1)
amine@130 585
amine@131 586 @patch("auditok.io._WITH_PYDUB", True)
amine@131 587 @patch("auditok.io.BufferAudioSource")
amine@131 588 @genty_dataset(
amine@131 589 ogg_default_first_channel=("ogg", 2, None, "from_ogg"),
amine@131 590 ogg_first_channel=("ogg", 1, 0, "from_ogg"),
amine@131 591 ogg_second_channel=("ogg", 2, 1, "from_ogg"),
amine@131 592 ogg_mix_channels=("ogg", 3, "mix", "from_ogg"),
amine@131 593 mp3_left_channel=("mp3", 1, "left", "from_mp3"),
amine@131 594 mp3_right_channel=("mp3", 2, "right", "from_mp3"),
amine@131 595 mp3_mix_channels=("mp3", 3, "mix", "from_mp3"),
amine@131 596 flac_first_channel=("flac", 2, 0, "from_file"),
amine@131 597 flac_second_channel=("flac", 2, 1, "from_file"),
amine@131 598 flv_left_channel=("flv", 1, "left", "from_flv"),
amine@131 599 webm_right_channel=("webm", 2, "right", "from_file"),
amine@131 600 webm_mix_channels=("webm", 4, "mix", "from_file"),
amine@131 601 )
amine@131 602 def test_load_with_pydub(
amine@131 603 self, audio_format, channels, use_channel, function, *mocks
amine@131 604 ):
amine@131 605 filename = "audio.{}".format(audio_format)
amine@131 606 segment_mock = Mock()
amine@131 607 segment_mock.sample_width = 2
amine@131 608 segment_mock.channels = channels
amine@131 609 segment_mock._data = b"abcdefgh"
amine@131 610 with patch("auditok.io._extract_selected_channel") as ext_mock:
amine@131 611 with patch(
amine@131 612 "auditok.io.AudioSegment.{}".format(function)
amine@131 613 ) as open_func:
amine@131 614 open_func.return_value = segment_mock
amine@131 615 use_channel = {"left": 0, "right": 1, None: 0}.get(
amine@131 616 use_channel, use_channel
amine@131 617 )
amine@131 618 _load_with_pydub(filename, audio_format, use_channel)
amine@131 619 self.assertTrue(open_func.called)
amine@131 620 if channels > 1:
amine@131 621 self.assertTrue(ext_mock.called)
amine@131 622 ext_mock.assert_called_with(
amine@131 623 segment_mock._data,
amine@131 624 segment_mock.channels,
amine@131 625 segment_mock.sample_width,
amine@131 626 use_channel,
amine@131 627 )
amine@131 628 else:
amine@131 629 self.assertFalse(ext_mock.called)
amine@131 630
amine@130 631 @genty_dataset(
amine@132 632 mono=("mono_400Hz.raw", (400,)),
amine@132 633 three_channel=("3channel_400-800-1600Hz.raw", (400, 800, 1600)),
amine@132 634 )
amine@132 635 def test_save_raw(self, filename, frequencies):
amine@132 636 filename = "tests/data/test_16KHZ_{}".format(filename)
amine@132 637 sample_width = 2
amine@132 638 fmt = DATA_FORMAT[sample_width]
amine@132 639 mono_channels = [PURE_TONE_DICT[freq] for freq in frequencies]
amine@132 640 data = _array_to_bytes(array(fmt, _sample_generator(*mono_channels)))
amine@132 641 tmpfile = NamedTemporaryFile()
amine@136 642 _save_raw(data, tmpfile.name)
amine@132 643 self.assertTrue(filecmp.cmp(tmpfile.name, filename, shallow=False))
amine@132 644
amine@132 645 @genty_dataset(
amine@110 646 mono=("mono_400Hz.wav", (400,)),
amine@110 647 three_channel=("3channel_400-800-1600Hz.wav", (400, 800, 1600)),
amine@110 648 )
amine@110 649 def test_save_wave(self, filename, frequencies):
amine@110 650 filename = "tests/data/test_16KHZ_{}".format(filename)
amine@110 651 sampling_rate = 16000
amine@110 652 sample_width = 2
amine@110 653 channels = len(frequencies)
amine@110 654 fmt = DATA_FORMAT[sample_width]
amine@110 655 mono_channels = [PURE_TONE_DICT[freq] for freq in frequencies]
amine@110 656 data = _array_to_bytes(array(fmt, _sample_generator(*mono_channels)))
amine@110 657 tmpfile = NamedTemporaryFile()
amine@136 658 _save_wave(data, tmpfile.name, sampling_rate, sample_width, channels)
amine@110 659 self.assertTrue(filecmp.cmp(tmpfile.name, filename, shallow=False))
amine@132 660
amine@132 661 @genty_dataset(
amine@132 662 missing_sampling_rate=("sr",),
amine@132 663 missing_sample_width=("sw",),
amine@132 664 missing_channels=("ch",),
amine@132 665 )
amine@132 666 def test_save_wave_missing_audio_param(self, missing_param):
amine@132 667 with self.assertRaises(AudioParameterError):
amine@132 668 params = AUDIO_PARAMS_SHORT.copy()
amine@132 669 del params[missing_param]
amine@132 670 srate, swidth, channels, _ = _get_audio_parameters(params)
amine@136 671 _save_wave(b"\0\0", "audio", srate, swidth, channels)
amine@133 672
amine@141 673 def test_save_with_pydub(self):
amine@141 674 with patch("auditok.io.AudioSegment.export") as export:
amine@142 675 tmpdir = TemporaryDirectory()
amine@142 676 filename = os.path.join(tmpdir.name, "audio.ogg")
amine@142 677 _save_with_pydub(b"\0\0", filename, "ogg", 16000, 2, 1)
amine@141 678 self.assertTrue(export.called)
amine@142 679 tmpdir.cleanup()
amine@141 680
amine@133 681 @genty_dataset(
amine@133 682 raw_with_audio_format=("audio", "raw"),
amine@133 683 raw_with_extension=("audio.raw", None),
amine@133 684 raw_with_audio_format_and_extension=("audio.mp3", "raw"),
amine@133 685 raw_no_audio_format_nor_extension=("audio", None),
amine@133 686 )
amine@133 687 def test_to_file_raw(self, filename, audio_format):
amine@133 688 exp_filename = "tests/data/test_16KHZ_mono_400Hz.raw"
amine@133 689 tmpdir = TemporaryDirectory()
amine@133 690 filename = os.path.join(tmpdir.name, filename)
amine@133 691 data = _array_to_bytes(PURE_TONE_DICT[400])
amine@135 692 to_file(data, filename, audio_format=audio_format)
amine@133 693 self.assertTrue(filecmp.cmp(filename, exp_filename, shallow=False))
amine@133 694 tmpdir.cleanup()
amine@134 695
amine@134 696 @genty_dataset(
amine@134 697 wav_with_audio_format=("audio", "wav"),
amine@134 698 wav_with_extension=("audio.wav", None),
amine@134 699 wav_with_audio_format_and_extension=("audio.mp3", "wav"),
amine@134 700 wave_with_audio_format=("audio", "wave"),
amine@134 701 wave_with_extension=("audio.wave", None),
amine@134 702 wave_with_audio_format_and_extension=("audio.mp3", "wave"),
amine@134 703 )
amine@135 704 def test_to_file_wave(self, filename, audio_format):
amine@134 705 exp_filename = "tests/data/test_16KHZ_mono_400Hz.wav"
amine@134 706 tmpdir = TemporaryDirectory()
amine@134 707 filename = os.path.join(tmpdir.name, filename)
amine@134 708 data = _array_to_bytes(PURE_TONE_DICT[400])
amine@135 709 to_file(
amine@135 710 data,
amine@135 711 filename,
amine@135 712 audio_format=audio_format,
amine@135 713 sampling_rate=16000,
amine@135 714 sample_width=2,
amine@135 715 channels=1,
amine@134 716 )
amine@134 717 self.assertTrue(filecmp.cmp(filename, exp_filename, shallow=False))
amine@134 718 tmpdir.cleanup()
amine@138 719
amine@138 720 @genty_dataset(
amine@138 721 missing_sampling_rate=("sr",),
amine@138 722 missing_sample_width=("sw",),
amine@138 723 missing_channels=("ch",),
amine@138 724 )
amine@138 725 def test_to_file_missing_audio_param(self, missing_param):
amine@138 726 params = AUDIO_PARAMS_SHORT.copy()
amine@138 727 del params[missing_param]
amine@138 728 with self.assertRaises(AudioParameterError):
amine@138 729 to_file(b"\0\0", "audio", audio_format="wav", **params)
amine@138 730 with self.assertRaises(AudioParameterError):
amine@138 731 to_file(b"\0\0", "audio", audio_format="mp3", **params)
amine@139 732
amine@139 733 def test_to_file_no_pydub(self):
amine@139 734 with patch("auditok.io._WITH_PYDUB", False):
amine@139 735 with self.assertRaises(AudioIOError):
amine@139 736 to_file("audio", b"", "mp3")
amine@140 737
amine@140 738 @patch("auditok.io._WITH_PYDUB", True)
amine@140 739 @genty_dataset(
amine@140 740 ogg_with_extension=("audio.ogg", None),
amine@140 741 ogg_with_audio_format=("audio", "ogg"),
amine@140 742 ogg_format_with_wrong_extension=("audio.wav", "ogg"),
amine@140 743 )
amine@140 744 def test_to_file_compressed(self, filename, audio_format, *mocks):
amine@140 745 with patch("auditok.io.AudioSegment.export") as export:
amine@142 746 tmpdir = TemporaryDirectory()
amine@142 747 filename = os.path.join(tmpdir.name, filename)
amine@140 748 to_file(b"\0\0", filename, audio_format, **AUDIO_PARAMS_SHORT)
amine@140 749 self.assertTrue(export.called)
amine@142 750 tmpdir.cleanup()
amine@190 751
amine@190 752 @genty_dataset(
amine@190 753 string_wave=(
amine@190 754 "tests/data/test_16KHZ_mono_400Hz.wav",
amine@190 755 BufferAudioSource,
amine@190 756 ),
amine@190 757 string_wave_large_file=(
amine@190 758 "tests/data/test_16KHZ_mono_400Hz.wav",
amine@190 759 WaveAudioSource,
amine@190 760 {"large_file": True},
amine@190 761 ),
amine@190 762 stdin=("-", StdinAudioSource),
amine@190 763 string_raw=("tests/data/test_16KHZ_mono_400Hz.raw", BufferAudioSource),
amine@190 764 string_raw_large_file=(
amine@190 765 "tests/data/test_16KHZ_mono_400Hz.raw",
amine@190 766 RawAudioSource,
amine@190 767 {"large_file": True},
amine@190 768 ),
amine@190 769 bytes_=(b"0" * 8000, BufferAudioSource),
amine@190 770 )
amine@190 771 def test_get_audio_source(self, input, expected_type, extra_args=None):
amine@190 772 kwargs = {"sampling_rate": 16000, "sample_width": 2, "channels": 1}
amine@190 773 if extra_args is not None:
amine@190 774 kwargs.update(extra_args)
amine@190 775 audio_source = get_audio_source(input, **kwargs)
amine@190 776 self.assertIsInstance(audio_source, expected_type)