annotate tests/test_io.py @ 163:3ddaa5eda8d4

Add tests for from_file with large_file=True for compressed audio
author Amine Sehili <amine.sehili@gmail.com>
date Sat, 02 Mar 2019 15:04:36 +0100
parents ee883f8c372b
children 4d60f490bb5d
rev   line source
amine@106 1 import os
amine@106 2 import sys
amine@106 3 import math
amine@107 4 from array import array
amine@133 5 from tempfile import NamedTemporaryFile, TemporaryDirectory
amine@110 6 import filecmp
amine@108 7 from unittest import TestCase
amine@108 8 from genty import genty, genty_dataset
amine@157 9 from test_util import _sample_generator, _generate_pure_tone, PURE_TONE_DICT
amine@110 10 from auditok.io import (
amine@126 11 DATA_FORMAT,
amine@121 12 AudioIOError,
amine@110 13 AudioParameterError,
amine@126 14 BufferAudioSource,
amine@162 15 RawAudioSource,
amine@162 16 WaveAudioSource,
amine@110 17 check_audio_data,
amine@143 18 _guess_audio_format,
amine@144 19 _normalize_use_channel,
amine@128 20 _get_audio_parameters,
amine@116 21 _array_to_bytes,
amine@118 22 _mix_audio_channels,
amine@119 23 _extract_selected_channel,
amine@126 24 _load_raw,
amine@129 25 _load_wave,
amine@131 26 _load_with_pydub,
amine@120 27 from_file,
amine@111 28 _save_raw,
amine@110 29 _save_wave,
amine@141 30 _save_with_pydub,
amine@135 31 to_file,
amine@110 32 )
amine@106 33
amine@106 34
# Pick the mock implementation matching the running interpreter: the
# standard-library one on Python 3, the ``mock`` backport otherwise.
PYTHON_3 = sys.version_info >= (3, 0)
if PYTHON_3:
    from unittest.mock import patch, Mock
else:
    from mock import patch, Mock

# Audio parameters in their short-key form, shared by several tests below.
AUDIO_PARAMS_SHORT = {"sr": 16000, "sw": 2, "ch": 1}
amine@106 43
amine@106 44
amine@108 45 @genty
amine@108 46 class TestIO(TestCase):
@genty_dataset(
    valid_mono=(b"\0" * 113, 1, 1),
    valid_stereo=(b"\0" * 160, 1, 2),
    invalid_mono_sw_2=(b"\0" * 113, 2, 1, False),
    invalid_stereo_sw_1=(b"\0" * 113, 1, 2, False),
    invalid_stereo_sw_2=(b"\0" * 158, 2, 2, False),
)
def test_check_audio_data(self, data, sample_width, channels, valid=True):
    # Valid buffers must be accepted silently (None is returned). Every
    # "invalid" dataset has a length that is not a multiple of
    # sample_width * channels and must raise AudioParameterError.
    if valid:
        outcome = check_audio_data(data, sample_width, channels)
        self.assertIsNone(outcome)
        return
    with self.assertRaises(AudioParameterError):
        check_audio_data(data, sample_width, channels)
amine@110 61
amine@110 62 @genty_dataset(
amine@143 63 extention_and_format_same=("wav", "filename.wav", "wav"),
amine@143 64 extention_and_format_different=("wav", "filename.mp3", "wav"),
amine@143 65 extention_no_format=(None, "filename.wav", "wav"),
amine@143 66 format_no_extension=("wav", "filename", "wav"),
amine@143 67 no_format_no_extension=(None, "filename", None),
amine@143 68 )
amine@143 69 def test_guess_audio_format(self, fmt, filename, expected):
amine@143 70 result = _guess_audio_format(fmt, filename)
amine@143 71 self.assertEqual(result, expected)
amine@143 72
amine@143 73 @genty_dataset(
amine@144 74 none=(None, 0),
amine@144 75 positive_int=(1, 1),
amine@144 76 negative_int=(-1, -1),
amine@144 77 left=("left", 0),
amine@144 78 right=("right", 1),
amine@144 79 mix=("mix", "mix"),
amine@144 80 )
amine@144 81 def test_normalize_use_channel(self, use_channel, expected):
amine@144 82 result = _normalize_use_channel(use_channel)
amine@144 83 self.assertEqual(result, expected)
amine@144 84
amine@144 85 @genty_dataset(
amine@145 86 simple=((8000, 2, 1, 0), (8000, 2, 1, 0)),
amine@145 87 use_channel_left=((8000, 2, 1, "left"), (8000, 2, 1, 0)),
amine@145 88 use_channel_right=((8000, 2, 1, "right"), (8000, 2, 1, 1)),
amine@145 89 use_channel_mix=((8000, 2, 1, "mix"), (8000, 2, 1, "mix")),
amine@145 90 use_channel_None=((8000, 2, 2, None), (8000, 2, 2, 0)),
amine@145 91 no_use_channel=((8000, 2, 2), (8000, 2, 2, 0)),
amine@145 92 )
amine@145 93 def test_get_audio_parameters_short_params(self, values, expected):
amine@145 94 params = {k: v for k, v in zip(("sr", "sw", "ch", "uc"), values)}
amine@145 95 result = _get_audio_parameters(params)
amine@145 96 self.assertEqual(result, expected)
amine@145 97
@genty_dataset(
    simple=((8000, 2, 1, 0), (8000, 2, 1, 0)),
    use_channel_left=((8000, 2, 1, "left"), (8000, 2, 1, 0)),
    use_channel_right=((8000, 2, 1, "right"), (8000, 2, 1, 1)),
    use_channel_mix=((8000, 2, 1, "mix"), (8000, 2, 1, "mix")),
    use_channel_None=((8000, 2, 2, None), (8000, 2, 2, 0)),
    no_use_channel=((8000, 2, 2), (8000, 2, 2, 0)),
)
def test_get_audio_parameters_long_params(self, values, expected):
    # Same scenarios as the short-key test, using the long parameter names.
    # zip() drops "use_channel" when the dataset only carries three values.
    long_keys = ("sampling_rate", "sample_width", "channels", "use_channel")
    params = dict(zip(long_keys, values))
    self.assertEqual(_get_audio_parameters(params), expected)
amine@145 116
@genty_dataset(simple=((8000, 2, 1, 0), (8000, 2, 1, 0)))
def test_get_audio_parameters_short_and_long_params(self, values, expected):
    long_keys = ("sampling_rate", "sample_width", "channels", "use_channel")
    params = dict(zip(long_keys, values))
    # Add a bogus "x" under every short key: the expected tuple only matches
    # if the long-form values are the ones that get picked up.
    params.update(dict.fromkeys(("sr", "sw", "ch", "uc"), "x"))
    self.assertEqual(_get_audio_parameters(params), expected)
amine@145 132
amine@145 133 @genty_dataset(
amine@146 134 str_sampling_rate=(("x", 2, 1, 0),),
amine@146 135 negative_sampling_rate=((-8000, 2, 1, 0),),
amine@146 136 str_sample_width=((8000, "x", 1, 0),),
amine@146 137 negative_sample_width=((8000, -2, 1, 0),),
amine@146 138 str_channels=((8000, 2, "x", 0),),
amine@146 139 negative_channels=((8000, 2, -1, 0),),
amine@146 140 )
amine@146 141 def test_get_audio_parameters_invalid(self, values):
amine@146 142 params = {
amine@146 143 k: v
amine@146 144 for k, v in zip(
amine@146 145 ("sampling_rate", "sample_width", "channels", "use_channel"),
amine@146 146 values,
amine@146 147 )
amine@146 148 }
amine@146 149 with self.assertRaises(AudioParameterError):
amine@146 150 _get_audio_parameters(params)
amine@146 151
@genty_dataset(
    mono_1byte=([400], 1),
    stereo_1byte=([400, 600], 1),
    three_channel_1byte=([400, 600, 2400], 1),
    mono_2byte=([400], 2),
    stereo_2byte=([400, 600], 2),
    three_channel_2byte=([400, 600, 1150], 2),
    mono_4byte=([400], 4),
    stereo_4byte=([400, 600], 4),
    # Renamed from "four_channel_2byte": this dataset uses 4-byte samples.
    four_channel_4byte=([400, 600, 1150, 7220], 4),
)
def test_mix_audio_channels(self, frequencies, sample_width):
    # Mixing N channels must yield the per-sample floor average of the
    # channels.
    #
    # The sample width now really comes from the dataset. The previous
    # version overwrote it with 2 right after entry, so the 1-byte and
    # 4-byte datasets silently re-ran the 2-byte scenario.
    sampling_rate = 16000
    channels = len(frequencies)
    mono_channels = [
        _generate_pure_tone(
            freq,
            duration_sec=0.1,
            sampling_rate=sampling_rate,
            sample_width=sample_width,
        )
        for freq in frequencies
    ]
    fmt = DATA_FORMAT[sample_width]
    expected = _array_to_bytes(
        array(
            fmt,
            (sum(samples) // channels for samples in zip(*mono_channels)),
        )
    )
    # _sample_generator is a test_util helper; presumably it interleaves the
    # channels sample by sample to build the multichannel input.
    data = _array_to_bytes(array(fmt, _sample_generator(*mono_channels)))
    mixed = _mix_audio_channels(data, channels, sample_width)
    self.assertEqual(mixed, expected)
amine@118 186
@genty_dataset(
    mono_1byte=([400], 1, 0),
    stereo_1byte_2st_channel=([400, 600], 1, 1),
    mono_2byte=([400], 2, 0),
    stereo_2byte_1st_channel=([400, 600], 2, 0),
    stereo_2byte_2nd_channel=([400, 600], 2, 1),
    three_channel_2byte_last_negative_idx=([400, 600, 1150], 2, -1),
    three_channel_2byte_2nd_negative_idx=([400, 600, 1150], 2, -2),
    three_channel_2byte_1st_negative_idx=([400, 600, 1150], 2, -3),
    three_channel_4byte_1st=([400, 600, 1150], 4, 0),
    three_channel_4byte_last_negative_idx=([400, 600, 1150], 4, -1),
)
def test_extract_selected_channel(
    self, frequencies, sample_width, use_channel
):
    # One pure tone per channel. Python list indexing with use_channel gives
    # the reference channel, including for negative indices.
    tones = [
        _generate_pure_tone(
            frequency,
            duration_sec=0.1,
            sampling_rate=16000,
            sample_width=sample_width,
        )
        for frequency in frequencies
    ]
    expected = _array_to_bytes(tones[use_channel])
    # _sample_generator comes from test_util; presumably it interleaves the
    # per-channel samples into one multichannel stream.
    multichannel = array(DATA_FORMAT[sample_width], _sample_generator(*tones))
    selected_channel = _extract_selected_channel(
        _array_to_bytes(multichannel),
        len(frequencies),
        sample_width,
        use_channel,
    )
    self.assertEqual(selected_channel, expected)
amine@119 220
@genty_dataset(mono=([400],), three_channel=([600, 1150, 2400],))
def test_extract_selected_channel_mix(self, frequencies):
    # With use_channel="mix" the result must equal the per-sample floor
    # average of all channels (2-byte samples).
    sample_width = 2
    fmt = DATA_FORMAT[sample_width]
    channels = len(frequencies)
    tones = [PURE_TONE_DICT[frequency] for frequency in frequencies]
    averaged = array(fmt, (sum(frame) // channels for frame in zip(*tones)))
    expected = _array_to_bytes(averaged)
    data = _array_to_bytes(array(fmt, _sample_generator(*tones)))
    selected_channel = _extract_selected_channel(
        data, channels, sample_width, "mix"
    )
    self.assertEqual(selected_channel, expected)
amine@148 236
@genty_dataset(positive=(2,), negative=(-3,))
def test_extract_selected_channel_invalid_use_channel(self, use_channel):
    # Both indices fall outside the valid range of a 2-channel stream.
    two_channel_args = (b"\0\0", 2, 2)
    with self.assertRaises(AudioParameterError):
        _extract_selected_channel(*two_channel_args, use_channel=use_channel)
amine@149 241
@genty_dataset(
    raw_with_audio_format=(
        "audio",
        "raw",
        "_load_raw",
        AUDIO_PARAMS_SHORT,
    ),
    raw_with_extension=(
        "audio.raw",
        None,
        "_load_raw",
        AUDIO_PARAMS_SHORT,
    ),
    wave_with_audio_format=("audio", "wave", "_load_wave"),
    # Was an exact duplicate of the "wave" dataset above; it now exercises
    # the "wav" format name as its label says.
    wav_with_audio_format=("audio", "wav", "_load_wave"),
    wav_with_extension=("audio.wav", None, "_load_wave"),
    format_and_extension_both_given=("audio.dat", "wav", "_load_wave"),
    format_and_extension_both_given_b=("audio.raw", "wave", "_load_wave"),
    no_format_nor_extension=("audio", None, "_load_with_pydub"),
    other_formats_ogg=("audio.ogg", None, "_load_with_pydub"),
    other_formats_webm=("audio", "webm", "_load_with_pydub"),
)
def test_from_file(
    self, filename, audio_format, funtion_name, kwargs=None
):
    # Check that from_file dispatches to the expected loader in auditok.io.
    #
    # The loader is replaced by a mock, so no file is actually read; only
    # the fact that the loader was called is verified.
    funtion_name = "auditok.io." + funtion_name
    if kwargs is None:
        kwargs = {}
    with patch(funtion_name) as patch_function:
        from_file(filename, audio_format, **kwargs)
    self.assertTrue(patch_function.called)
amine@120 273
def test_from_file_large_file_raw(self):
    # With large_file=True a raw file must come back as a RawAudioSource.
    audio_params = {"sampling_rate": 16000, "sample_width": 2, "channels": 1}
    audio_source = from_file(
        "tests/data/test_16KHZ_mono_400Hz.raw",
        large_file=True,
        **audio_params
    )
    self.assertIsInstance(audio_source, RawAudioSource)
amine@162 281
def test_from_file_large_file_wave(self):
    # With large_file=True a wave file must come back as a WaveAudioSource.
    audio_source = from_file(
        "tests/data/test_16KHZ_mono_400Hz.wav", large_file=True
    )
    self.assertIsInstance(audio_source, WaveAudioSource)
amine@163 286
amine@163 287
def test_from_file_large_file_compressed(self):
    # large_file=True must be rejected for a compressed (ogg) input.
    compressed_file = "tests/data/test_16KHZ_mono_400Hz.ogg"
    with self.assertRaises(AudioIOError):
        from_file(compressed_file, large_file=True)
amine@162 292
@genty_dataset(
    missing_sampling_rate=("sr",),
    missing_sample_width=("sw",),
    missing_channels=("ch",),
)
def test_from_file_missing_audio_param(self, missing_param):
    # Reading raw audio with one parameter missing must be refused.
    params = AUDIO_PARAMS_SHORT.copy()
    params.pop(missing_param)
    with self.assertRaises(AudioParameterError):
        from_file("audio", audio_format="raw", **params)
amine@137 303
def test_from_file_no_pydub(self):
    # With the pydub flag forced off, loading an mp3 must raise AudioIOError.
    pydub_disabled = patch("auditok.io._WITH_PYDUB", False)
    with pydub_disabled, self.assertRaises(AudioIOError):
        from_file("audio", "mp3")
amine@121 308
@genty_dataset(
    raw_first_channel=("raw", 0, 400),
    raw_second_channel=("raw", 1, 800),
    raw_third_channel=("raw", 2, 1600),
    raw_left_channel=("raw", "left", 400),
    raw_right_channel=("raw", "right", 800),
    wav_first_channel=("wav", 0, 400),
    wav_second_channel=("wav", 1, 800),
    wav_third_channel=("wav", 2, 1600),
    wav_left_channel=("wav", "left", 400),
    wav_right_channel=("wav", "right", 800),
)
def test_from_file_multichannel_audio(
    self, audio_format, use_channel, frequency
):
    # The selected channel of the 3-channel fixture must match the reference
    # pure tone of the corresponding frequency.
    sample_width = 2
    filename = "tests/data/test_16KHZ_3channel_400-800-1600Hz.{}".format(
        audio_format
    )
    audio_source = from_file(
        filename,
        sampling_rate=16000,
        sample_width=sample_width,
        channels=3,
        use_channel=use_channel,
    )
    loaded = array(DATA_FORMAT[sample_width], audio_source._buffer)
    self.assertEqual(loaded, PURE_TONE_DICT[frequency])
amine@122 339
@genty_dataset(
    raw_mono=("raw", "mono_400Hz", (400,)),
    raw_3channel=("raw", "3channel_400-800-1600Hz", (400, 800, 1600)),
    wav_mono=("wav", "mono_400Hz", (400,)),
    wav_3channel=("wav", "3channel_400-800-1600Hz", (400, 800, 1600)),
)
def test_from_file_multichannel_audio_mix(
    self, audio_format, filename_suffix, frequencies
):
    # With use_channel="mix" the loaded buffer must equal the per-sample
    # floor average of the reference tones.
    sampling_rate = 16000
    sample_width = 2
    channels = len(frequencies)
    tones = [PURE_TONE_DICT[frequency] for frequency in frequencies]
    averaged = array(
        DATA_FORMAT[sample_width],
        (sum(frame) // channels for frame in zip(*tones)),
    )
    expected = _array_to_bytes(averaged)
    filename = "tests/data/test_16KHZ_{}.{}".format(
        filename_suffix, audio_format
    )
    audio_source = from_file(
        filename,
        use_channel="mix",
        sampling_rate=sampling_rate,
        sample_width=sample_width,
        channels=channels,
    )
    self.assertEqual(audio_source._buffer, expected)
amine@123 373
@patch("auditok.io._WITH_PYDUB", True)
@patch("auditok.io.BufferAudioSource")
@genty_dataset(
    ogg_first_channel=("ogg", 0, "from_ogg"),
    ogg_second_channel=("ogg", 1, "from_ogg"),
    ogg_mix=("ogg", "mix", "from_ogg"),
    ogg_default=("ogg", None, "from_ogg"),
    mp3_left_channel=("mp3", "left", "from_mp3"),
    mp3_right_channel=("mp3", "right", "from_mp3"),
    flac_first_channel=("flac", 0, "from_file"),
    flac_second_channel=("flac", 1, "from_file"),
    flv_left_channel=("flv", "left", "from_flv"),
    webm_right_channel=("webm", "right", "from_file"),
)
def test_from_file_multichannel_audio_compressed(
    self, audio_format, use_channel, function, *mocks
):
    # Everything pydub-related is mocked: the AudioSegment opener returns a
    # fake segment, so no real file is decoded.
    filename = "audio.{}".format(audio_format)
    extract_target = "auditok.io._extract_selected_channel"
    opener_target = "auditok.io.AudioSegment.{}".format(function)

    segment_mock = Mock()
    segment_mock.sample_width = 2
    segment_mock.channels = 2
    segment_mock._data = b"abcd"

    # Stereo segment: the channel-extraction helper must be called with the
    # segment's data and the normalized channel selector.
    with patch(extract_target) as ext_mock, patch(opener_target) as open_func:
        open_func.return_value = segment_mock
        from_file(filename, use_channel=use_channel)
        self.assertTrue(open_func.called)
        self.assertTrue(ext_mock.called)

        channel_aliases = {"left": 0, "right": 1, None: 0}
        use_channel = channel_aliases.get(use_channel, use_channel)
        ext_mock.assert_called_with(
            segment_mock._data,
            segment_mock.channels,
            segment_mock.sample_width,
            use_channel,
        )

    # Mono segment: there is nothing to select, so the helper must not be
    # called at all.
    with patch(extract_target) as ext_mock, patch(opener_target) as open_func:
        segment_mock.channels = 1
        open_func.return_value = segment_mock
        from_file(filename, use_channel=use_channel)
        self.assertTrue(open_func.called)
        self.assertFalse(ext_mock.called)
amine@124 424
@patch("auditok.io._WITH_PYDUB", True)
@patch("auditok.io.BufferAudioSource")
@genty_dataset(
    ogg=("ogg", "from_ogg"),
    mp3=("mp3", "from_mp3"),
    flac=("flac", "from_file"),
)
def test_from_file_multichannel_audio_mix_compressed(
    self, audio_format, function, *mocks
):
    # With use_channel="mix" the mixing helper must receive the decoded
    # segment's data, channel count and sample width.
    filename = "audio.{}".format(audio_format)
    opener_target = "auditok.io.AudioSegment.{}".format(function)

    segment_mock = Mock()
    segment_mock.sample_width = 2
    segment_mock.channels = 2
    segment_mock._data = b"abcd"

    mix_patch = patch("auditok.io._mix_audio_channels")
    with mix_patch as mix_mock, patch(opener_target) as open_func:
        open_func.return_value = segment_mock
        from_file(filename, use_channel="mix")
        self.assertTrue(open_func.called)
        mix_mock.assert_called_with(
            segment_mock._data,
            segment_mock.channels,
            segment_mock.sample_width,
        )
amine@125 452
@genty_dataset(
    dafault_first_channel=(None, 400),
    first_channel=(0, 400),
    second_channel=(1, 800),
    third_channel=(2, 1600),
    negative_first_channel=(-3, 400),
    negative_second_channel=(-2, 800),
    negative_third_channel=(-1, 1600),
)
def test_load_raw(self, use_channel, frequency):
    filename = "tests/data/test_16KHZ_3channel_400-800-1600Hz.raw"
    load_kwargs = {"sampling_rate": 16000, "sample_width": 2, "channels": 3}
    # When use_channel is None the argument is omitted entirely, so the
    # loader's own default is what gets exercised.
    if use_channel is not None:
        load_kwargs["use_channel"] = use_channel
    audio_source = _load_raw(filename, **load_kwargs)

    self.assertIsInstance(audio_source, BufferAudioSource)
    self.assertEqual(audio_source.sampling_rate, 16000)
    self.assertEqual(audio_source.sample_width, 2)
    self.assertEqual(audio_source.channels, 1)
    # Compare the data read from file with the reference pure tone of the
    # given frequency.
    loaded = array(DATA_FORMAT[2], audio_source._buffer)
    self.assertEqual(loaded, PURE_TONE_DICT[frequency])
amine@126 486
@genty_dataset(
    mono=("mono_400Hz", (400,)),
    three_channel=("3channel_400-800-1600Hz", (400, 800, 1600)),
)
def test_load_raw_mix(self, filename_suffix, frequencies):
    # Loading with use_channel="mix" must give a mono source whose buffer is
    # the per-sample floor average of the reference tones.
    sampling_rate = 16000
    sample_width = 2
    channels = len(frequencies)
    tones = [PURE_TONE_DICT[frequency] for frequency in frequencies]
    averaged = array(
        DATA_FORMAT[sample_width],
        (sum(frame) // channels for frame in zip(*tones)),
    )
    expected = _array_to_bytes(averaged)

    audio_source = _load_raw(
        "tests/data/test_16KHZ_{}.raw".format(filename_suffix),
        use_channel="mix",
        sampling_rate=sampling_rate,
        sample_width=sample_width,
        channels=channels,
    )
    self.assertEqual(audio_source._buffer, expected)
    self.assertIsInstance(audio_source, BufferAudioSource)
    self.assertEqual(audio_source.sampling_rate, sampling_rate)
    self.assertEqual(audio_source.sample_width, sample_width)
    self.assertEqual(audio_source.channels, 1)
amine@127 518
@genty_dataset(
    missing_sampling_rate=("sr",),
    missing_sample_width=("sw",),
    missing_channels=("ch",),
)
def test_load_raw_missing_audio_param(self, missing_param):
    params = AUDIO_PARAMS_SHORT.copy()
    params.pop(missing_param)
    # NOTE(review): the error may already be raised by _get_audio_parameters,
    # in which case _load_raw is never reached -- confirm the intent.
    with self.assertRaises(AudioParameterError):
        srate, swidth, channels, _ = _get_audio_parameters(params)
        _load_raw("audio", srate, swidth, channels)
amine@128 530
@genty_dataset(
    dafault_first_channel=(None, 400),
    first_channel=(0, 400),
    second_channel=(1, 800),
    third_channel=(2, 1600),
    negative_first_channel=(-3, 400),
    negative_second_channel=(-2, 800),
    negative_third_channel=(-1, 1600),
)
def test_load_wave(self, use_channel, frequency):
    filename = "tests/data/test_16KHZ_3channel_400-800-1600Hz.wav"
    # When use_channel is None the argument is omitted entirely, so the
    # loader's own default is what gets exercised.
    load_kwargs = {}
    if use_channel is not None:
        load_kwargs["use_channel"] = use_channel
    audio_source = _load_wave(filename, **load_kwargs)

    self.assertIsInstance(audio_source, BufferAudioSource)
    self.assertEqual(audio_source.sampling_rate, 16000)
    self.assertEqual(audio_source.sample_width, 2)
    self.assertEqual(audio_source.channels, 1)
    # Compare the data read from file with the reference pure tone of the
    # given frequency.
    loaded = array(DATA_FORMAT[2], audio_source._buffer)
    self.assertEqual(loaded, PURE_TONE_DICT[frequency])
amine@129 556
@genty_dataset(
    mono=("mono_400Hz", (400,)),
    three_channel=("3channel_400-800-1600Hz", (400, 800, 1600)),
)
def test_load_wave_mix(self, filename_suffix, frequencies):
    # Loading with use_channel="mix" must give a mono source whose buffer is
    # the per-sample floor average of the reference tones.
    sampling_rate = 16000
    sample_width = 2
    channels = len(frequencies)
    tones = [PURE_TONE_DICT[frequency] for frequency in frequencies]
    averaged = array(
        DATA_FORMAT[sample_width],
        (sum(frame) // channels for frame in zip(*tones)),
    )
    expected = _array_to_bytes(averaged)

    audio_source = _load_wave(
        "tests/data/test_16KHZ_{}.wav".format(filename_suffix),
        use_channel="mix",
    )
    self.assertEqual(audio_source._buffer, expected)
    self.assertIsInstance(audio_source, BufferAudioSource)
    self.assertEqual(audio_source.sampling_rate, sampling_rate)
    self.assertEqual(audio_source.sample_width, sample_width)
    self.assertEqual(audio_source.channels, 1)
amine@130 581
@patch("auditok.io._WITH_PYDUB", True)
@patch("auditok.io.BufferAudioSource")
@genty_dataset(
    ogg_default_first_channel=("ogg", 2, None, "from_ogg"),
    ogg_first_channel=("ogg", 1, 0, "from_ogg"),
    ogg_second_channel=("ogg", 2, 1, "from_ogg"),
    ogg_mix_channels=("ogg", 3, "mix", "from_ogg"),
    mp3_left_channel=("mp3", 1, "left", "from_mp3"),
    mp3_right_channel=("mp3", 2, "right", "from_mp3"),
    mp3_mix_channels=("mp3", 3, "mix", "from_mp3"),
    flac_first_channel=("flac", 2, 0, "from_file"),
    flac_second_channel=("flac", 2, 1, "from_file"),
    flv_left_channel=("flv", 1, "left", "from_flv"),
    webm_right_channel=("webm", 2, "right", "from_file"),
    webm_mix_channels=("webm", 4, "mix", "from_file"),
)
def test_load_with_pydub(
    self, audio_format, channels, use_channel, function, *mocks
):
    # The AudioSegment opener is mocked to return a fake segment. The
    # channel-extraction helper must be used only for multichannel segments.
    filename = "audio.{}".format(audio_format)
    opener_target = "auditok.io.AudioSegment.{}".format(function)

    segment_mock = Mock()
    segment_mock.sample_width = 2
    segment_mock.channels = channels
    segment_mock._data = b"abcdefgh"

    extract_patch = patch("auditok.io._extract_selected_channel")
    with extract_patch as ext_mock, patch(opener_target) as open_func:
        open_func.return_value = segment_mock
        # _load_with_pydub is given the already-normalized channel selector.
        channel_aliases = {"left": 0, "right": 1, None: 0}
        use_channel = channel_aliases.get(use_channel, use_channel)
        _load_with_pydub(filename, audio_format, use_channel)
        self.assertTrue(open_func.called)
        if channels <= 1:
            self.assertFalse(ext_mock.called)
        else:
            self.assertTrue(ext_mock.called)
            ext_mock.assert_called_with(
                segment_mock._data,
                segment_mock.channels,
                segment_mock.sample_width,
                use_channel,
            )
amine@131 626
@genty_dataset(
    mono=("mono_400Hz.raw", (400,)),
    three_channel=("3channel_400-800-1600Hz.raw", (400, 800, 1600)),
)
def test_save_raw(self, filename, frequencies):
    # Data written by _save_raw must be byte-identical to the fixture file.
    filename = "tests/data/test_16KHZ_{}".format(filename)
    sample_width = 2
    fmt = DATA_FORMAT[sample_width]
    mono_channels = [PURE_TONE_DICT[freq] for freq in frequencies]
    data = _array_to_bytes(array(fmt, _sample_generator(*mono_channels)))
    # The context manager closes (and thereby deletes) the temporary file
    # even when the assertion fails; it used to be left open.
    with NamedTemporaryFile() as tmpfile:
        _save_raw(data, tmpfile.name)
        self.assertTrue(filecmp.cmp(tmpfile.name, filename, shallow=False))
amine@132 640
@genty_dataset(
    mono=("mono_400Hz.wav", (400,)),
    three_channel=("3channel_400-800-1600Hz.wav", (400, 800, 1600)),
)
def test_save_wave(self, filename, frequencies):
    # The file written by _save_wave must be byte-identical to the fixture.
    filename = "tests/data/test_16KHZ_{}".format(filename)
    sampling_rate = 16000
    sample_width = 2
    channels = len(frequencies)
    fmt = DATA_FORMAT[sample_width]
    mono_channels = [PURE_TONE_DICT[freq] for freq in frequencies]
    data = _array_to_bytes(array(fmt, _sample_generator(*mono_channels)))
    # The context manager closes (and thereby deletes) the temporary file
    # even when the assertion fails; it used to be left open.
    with NamedTemporaryFile() as tmpfile:
        _save_wave(data, tmpfile.name, sampling_rate, sample_width, channels)
        self.assertTrue(filecmp.cmp(tmpfile.name, filename, shallow=False))
amine@132 656
@genty_dataset(
    missing_sampling_rate=("sr",),
    missing_sample_width=("sw",),
    missing_channels=("ch",),
)
def test_save_wave_missing_audio_param(self, missing_param):
    params = AUDIO_PARAMS_SHORT.copy()
    params.pop(missing_param)
    # NOTE(review): the error may already be raised by _get_audio_parameters,
    # in which case _save_wave is never reached -- confirm the intent.
    with self.assertRaises(AudioParameterError):
        srate, swidth, channels, _ = _get_audio_parameters(params)
        _save_wave(b"\0\0", "audio", srate, swidth, channels)
amine@133 668
def test_save_with_pydub(self):
    # _save_with_pydub must delegate to AudioSegment.export (mocked here, so
    # nothing is actually encoded).
    with patch("auditok.io.AudioSegment.export") as export:
        # Context manager: the directory is removed even if the call raises
        # (previously cleanup() was skipped on failure).
        with TemporaryDirectory() as tmpdir:
            filename = os.path.join(tmpdir, "audio.ogg")
            _save_with_pydub(b"\0\0", filename, "ogg", 16000, 2, 1)
        self.assertTrue(export.called)
amine@141 676
@genty_dataset(
    raw_with_audio_format=("audio", "raw"),
    raw_with_extension=("audio.raw", None),
    raw_with_audio_format_and_extension=("audio.mp3", "raw"),
    raw_no_audio_format_nor_extension=("audio", None),
)
def test_to_file_raw(self, filename, audio_format):
    # Whatever the format/extension combination in the datasets, the written
    # file must be byte-identical to the raw fixture.
    exp_filename = "tests/data/test_16KHZ_mono_400Hz.raw"
    data = _array_to_bytes(PURE_TONE_DICT[400])
    # Context manager: the directory is removed even when the assertion
    # fails (previously cleanup() was skipped on failure).
    with TemporaryDirectory() as tmpdir:
        filename = os.path.join(tmpdir, filename)
        to_file(data, filename, audio_format=audio_format)
        self.assertTrue(filecmp.cmp(filename, exp_filename, shallow=False))
amine@134 691
@genty_dataset(
    wav_with_audio_format=("audio", "wav"),
    wav_with_extension=("audio.wav", None),
    wav_with_audio_format_and_extension=("audio.mp3", "wav"),
    wave_with_audio_format=("audio", "wave"),
    wave_with_extension=("audio.wave", None),
    wave_with_audio_format_and_extension=("audio.mp3", "wave"),
)
def test_to_file_wave(self, filename, audio_format):
    # Whatever the format/extension combination in the datasets, the written
    # file must be byte-identical to the wave fixture.
    exp_filename = "tests/data/test_16KHZ_mono_400Hz.wav"
    data = _array_to_bytes(PURE_TONE_DICT[400])
    # Context manager: the directory is removed even when the assertion
    # fails (previously cleanup() was skipped on failure).
    with TemporaryDirectory() as tmpdir:
        filename = os.path.join(tmpdir, filename)
        to_file(
            data,
            filename,
            audio_format=audio_format,
            sampling_rate=16000,
            sample_width=2,
            channels=1,
        )
        self.assertTrue(filecmp.cmp(filename, exp_filename, shallow=False))
amine@138 715
@genty_dataset(
    missing_sampling_rate=("sr",),
    missing_sample_width=("sw",),
    missing_channels=("ch",),
)
def test_to_file_missing_audio_param(self, missing_param):
    # Both the wave and the compressed code paths must refuse to write when
    # an audio parameter is missing.
    params = AUDIO_PARAMS_SHORT.copy()
    params.pop(missing_param)
    for audio_format in ("wav", "mp3"):
        with self.assertRaises(AudioParameterError):
            to_file(b"\0\0", "audio", audio_format=audio_format, **params)
amine@139 728
def test_to_file_no_pydub(self):
    # With the pydub flag forced off, writing an mp3 must raise AudioIOError.
    #
    # Arguments are now passed as (data, filename, audio_format), matching
    # every other to_file call in this module; data and filename used to be
    # swapped. Complete audio parameters are supplied so that a failure can
    # only come from pydub being unavailable, not from parameter validation
    # (NOTE(review): confirm against to_file's implementation).
    with patch("auditok.io._WITH_PYDUB", False):
        with self.assertRaises(AudioIOError):
            to_file(b"", "audio", "mp3", **AUDIO_PARAMS_SHORT)
amine@140 733
@patch("auditok.io._WITH_PYDUB", True)
@genty_dataset(
    ogg_with_extension=("audio.ogg", None),
    ogg_with_audio_format=("audio", "ogg"),
    ogg_format_with_wrong_extension=("audio.wav", "ogg"),
)
def test_to_file_compressed(self, filename, audio_format, *mocks):
    # Writing a compressed format must go through AudioSegment.export
    # (mocked here, so nothing is actually encoded).
    with patch("auditok.io.AudioSegment.export") as export:
        # Context manager: the directory is removed even if to_file raises
        # (previously cleanup() was skipped on failure).
        with TemporaryDirectory() as tmpdir:
            filename = os.path.join(tmpdir, filename)
            to_file(b"\0\0", filename, audio_format, **AUDIO_PARAMS_SHORT)
        self.assertTrue(export.called)
amine@140 739 )
amine@140 740 def test_to_file_compressed(self, filename, audio_format, *mocks):
amine@140 741 with patch("auditok.io.AudioSegment.export") as export:
amine@142 742 tmpdir = TemporaryDirectory()
amine@142 743 filename = os.path.join(tmpdir.name, filename)
amine@140 744 to_file(b"\0\0", filename, audio_format, **AUDIO_PARAMS_SHORT)
amine@140 745 self.assertTrue(export.called)
amine@142 746 tmpdir.cleanup()