annotate tests/test_io.py @ 208:29472a5a798a

Harmonize calls to _extract_selected_channel
author Amine Sehili <amine.sehili@gmail.com>
date Fri, 14 Jun 2019 22:40:04 +0100
parents 4d60f490bb5d
children 9047740c5092
rev   line source
amine@106 1 import os
amine@106 2 import sys
amine@106 3 import math
amine@107 4 from array import array
amine@133 5 from tempfile import NamedTemporaryFile, TemporaryDirectory
amine@110 6 import filecmp
amine@108 7 from unittest import TestCase
amine@108 8 from genty import genty, genty_dataset
amine@157 9 from test_util import _sample_generator, _generate_pure_tone, PURE_TONE_DICT
amine@110 10 from auditok.io import (
amine@126 11 DATA_FORMAT,
amine@121 12 AudioIOError,
amine@110 13 AudioParameterError,
amine@126 14 BufferAudioSource,
amine@162 15 RawAudioSource,
amine@162 16 WaveAudioSource,
amine@190 17 StdinAudioSource,
amine@110 18 check_audio_data,
amine@143 19 _guess_audio_format,
amine@144 20 _normalize_use_channel,
amine@128 21 _get_audio_parameters,
amine@116 22 _array_to_bytes,
amine@118 23 _mix_audio_channels,
amine@119 24 _extract_selected_channel,
amine@126 25 _load_raw,
amine@129 26 _load_wave,
amine@131 27 _load_with_pydub,
amine@190 28 get_audio_source,
amine@120 29 from_file,
amine@111 30 _save_raw,
amine@110 31 _save_wave,
amine@141 32 _save_with_pydub,
amine@135 33 to_file,
amine@110 34 )
amine@106 35
amine@106 36
amine@106 37 if sys.version_info >= (3, 0):
amine@106 38 PYTHON_3 = True
amine@124 39 from unittest.mock import patch, Mock
amine@106 40 else:
amine@106 41 PYTHON_3 = False
amine@124 42 from mock import patch, Mock
amine@120 43
amine@120 44 AUDIO_PARAMS_SHORT = {"sr": 16000, "sw": 2, "ch": 1}
amine@106 45
amine@106 46
amine@108 47 @genty
amine@108 48 class TestIO(TestCase):
amine@108 49 @genty_dataset(
amine@108 50 valid_mono=(b"\0" * 113, 1, 1),
amine@108 51 valid_stereo=(b"\0" * 160, 1, 2),
amine@108 52 invalid_mono_sw_2=(b"\0" * 113, 2, 1, False),
amine@108 53 invalid_stereo_sw_1=(b"\0" * 113, 1, 2, False),
amine@108 54 invalid_stereo_sw_2=(b"\0" * 158, 2, 2, False),
amine@108 55 )
amine@108 56 def test_check_audio_data(self, data, sample_width, channels, valid=True):
amine@108 57
amine@108 58 if not valid:
amine@108 59 with self.assertRaises(AudioParameterError):
amine@108 60 check_audio_data(data, sample_width, channels)
amine@108 61 else:
amine@108 62 self.assertIsNone(check_audio_data(data, sample_width, channels))
amine@110 63
amine@110 64 @genty_dataset(
amine@143 65 extention_and_format_same=("wav", "filename.wav", "wav"),
amine@143 66 extention_and_format_different=("wav", "filename.mp3", "wav"),
amine@143 67 extention_no_format=(None, "filename.wav", "wav"),
amine@143 68 format_no_extension=("wav", "filename", "wav"),
amine@143 69 no_format_no_extension=(None, "filename", None),
amine@143 70 )
amine@143 71 def test_guess_audio_format(self, fmt, filename, expected):
amine@143 72 result = _guess_audio_format(fmt, filename)
amine@143 73 self.assertEqual(result, expected)
amine@143 74
amine@143 75 @genty_dataset(
amine@144 76 none=(None, 0),
amine@208 77 positive_int=(1, 0),
amine@144 78 left=("left", 0),
amine@144 79 right=("right", 1),
amine@144 80 mix=("mix", "mix"),
amine@144 81 )
amine@144 82 def test_normalize_use_channel(self, use_channel, expected):
amine@144 83 result = _normalize_use_channel(use_channel)
amine@144 84 self.assertEqual(result, expected)
amine@144 85
amine@144 86 @genty_dataset(
amine@208 87 int_1=((8000, 2, 1, 1), (8000, 2, 1, 0)),
amine@208 88 int_2=((8000, 2, 1, 2), (8000, 2, 1, 1)),
amine@145 89 use_channel_left=((8000, 2, 1, "left"), (8000, 2, 1, 0)),
amine@145 90 use_channel_right=((8000, 2, 1, "right"), (8000, 2, 1, 1)),
amine@145 91 use_channel_mix=((8000, 2, 1, "mix"), (8000, 2, 1, "mix")),
amine@145 92 use_channel_None=((8000, 2, 2, None), (8000, 2, 2, 0)),
amine@145 93 no_use_channel=((8000, 2, 2), (8000, 2, 2, 0)),
amine@145 94 )
amine@145 95 def test_get_audio_parameters_short_params(self, values, expected):
amine@208 96 params = dict(zip(("sr", "sw", "ch", "uc"), values))
amine@145 97 result = _get_audio_parameters(params)
amine@145 98 self.assertEqual(result, expected)
amine@145 99
amine@145 100 @genty_dataset(
amine@208 101 int_1=((8000, 2, 1, 1), (8000, 2, 1, 0)),
amine@208 102 int_2=((8000, 2, 1, 2), (8000, 2, 1, 1)),
amine@145 103 use_channel_left=((8000, 2, 1, "left"), (8000, 2, 1, 0)),
amine@145 104 use_channel_right=((8000, 2, 1, "right"), (8000, 2, 1, 1)),
amine@145 105 use_channel_mix=((8000, 2, 1, "mix"), (8000, 2, 1, "mix")),
amine@145 106 use_channel_None=((8000, 2, 2, None), (8000, 2, 2, 0)),
amine@145 107 no_use_channel=((8000, 2, 2), (8000, 2, 2, 0)),
amine@145 108 )
amine@145 109 def test_get_audio_parameters_long_params(self, values, expected):
amine@208 110 params = dict(zip(("sampling_rate", "sample_width", "channels", "use_channel"), values))
amine@145 111 result = _get_audio_parameters(params)
amine@145 112 self.assertEqual(result, expected)
amine@145 113
amine@208 114 @genty_dataset(simple=((8000, 2, 1, 1), (8000, 2, 1, 0)))
amine@208 115 def test_get_audio_parameters_long_params_shadow_short_ones(
amine@145 116 self, values, expected
amine@145 117 ):
amine@208 118 params = dict(zip(("sampling_rate", "sample_width", "channels", "use_channel"), values))
amine@208 119 params.update(dict(zip(("sr", "sw", "ch", "uc"), "xxxx")))
amine@145 120 result = _get_audio_parameters(params)
amine@145 121 self.assertEqual(result, expected)
amine@145 122
amine@145 123 @genty_dataset(
amine@146 124 str_sampling_rate=(("x", 2, 1, 0),),
amine@146 125 negative_sampling_rate=((-8000, 2, 1, 0),),
amine@146 126 str_sample_width=((8000, "x", 1, 0),),
amine@146 127 negative_sample_width=((8000, -2, 1, 0),),
amine@146 128 str_channels=((8000, 2, "x", 0),),
amine@146 129 negative_channels=((8000, 2, -1, 0),),
amine@146 130 )
amine@146 131 def test_get_audio_parameters_invalid(self, values):
amine@208 132 # TODO 0 or negative use_channel must raise AudioParameterError
amine@208 133 # change implementation, don't accept negative uc
amine@208 134 # hifglight everywhere in doc that uc must be positive
amine@208 135 params = dict(zip(("sampling_rate", "sample_width", "channels", "use_channel"), values))
amine@146 136 with self.assertRaises(AudioParameterError):
amine@146 137 _get_audio_parameters(params)
amine@146 138
amine@146 139 @genty_dataset(
amine@118 140 mono_1byte=([400], 1),
amine@118 141 stereo_1byte=([400, 600], 1),
amine@118 142 three_channel_1byte=([400, 600, 2400], 1),
amine@118 143 mono_2byte=([400], 2),
amine@118 144 stereo_2byte=([400, 600], 2),
amine@118 145 three_channel_2byte=([400, 600, 1150], 2),
amine@118 146 mono_4byte=([400], 4),
amine@118 147 stereo_4byte=([400, 600], 4),
amine@118 148 four_channel_2byte=([400, 600, 1150, 7220], 4),
amine@118 149 )
amine@118 150 def test_mix_audio_channels(self, frequencies, sample_width):
amine@118 151 sampling_rate = 16000
amine@118 152 sample_width = 2
amine@118 153 channels = len(frequencies)
amine@118 154 mono_channels = [
amine@118 155 _generate_pure_tone(
amine@118 156 freq,
amine@118 157 duration_sec=0.1,
amine@118 158 sampling_rate=sampling_rate,
amine@118 159 sample_width=sample_width,
amine@118 160 )
amine@118 161 for freq in frequencies
amine@118 162 ]
amine@118 163 fmt = DATA_FORMAT[sample_width]
amine@118 164 expected = _array_to_bytes(
amine@118 165 array(
amine@118 166 fmt,
amine@118 167 (sum(samples) // channels for samples in zip(*mono_channels)),
amine@118 168 )
amine@118 169 )
amine@118 170 data = _array_to_bytes(array(fmt, _sample_generator(*mono_channels)))
amine@118 171 mixed = _mix_audio_channels(data, channels, sample_width)
amine@118 172 self.assertEqual(mixed, expected)
amine@118 173
amine@118 174 @genty_dataset(
amine@119 175 mono_1byte=([400], 1, 0),
amine@119 176 stereo_1byte_2st_channel=([400, 600], 1, 1),
amine@119 177 mono_2byte=([400], 2, 0),
amine@119 178 stereo_2byte_1st_channel=([400, 600], 2, 0),
amine@119 179 stereo_2byte_2nd_channel=([400, 600], 2, 1),
amine@119 180 three_channel_2byte_last_negative_idx=([400, 600, 1150], 2, -1),
amine@119 181 three_channel_2byte_2nd_negative_idx=([400, 600, 1150], 2, -2),
amine@119 182 three_channel_2byte_1st_negative_idx=([400, 600, 1150], 2, -3),
amine@119 183 three_channel_4byte_1st=([400, 600, 1150], 4, 0),
amine@119 184 three_channel_4byte_last_negative_idx=([400, 600, 1150], 4, -1),
amine@119 185 )
amine@119 186 def test_extract_selected_channel(
amine@119 187 self, frequencies, sample_width, use_channel
amine@119 188 ):
amine@119 189
amine@119 190 mono_channels = [
amine@119 191 _generate_pure_tone(
amine@119 192 freq,
amine@119 193 duration_sec=0.1,
amine@119 194 sampling_rate=16000,
amine@119 195 sample_width=sample_width,
amine@119 196 )
amine@119 197 for freq in frequencies
amine@119 198 ]
amine@119 199 channels = len(frequencies)
amine@119 200 fmt = DATA_FORMAT[sample_width]
amine@119 201 expected = _array_to_bytes(mono_channels[use_channel])
amine@119 202 data = _array_to_bytes(array(fmt, _sample_generator(*mono_channels)))
amine@119 203 selected_channel = _extract_selected_channel(
amine@119 204 data, channels, sample_width, use_channel
amine@119 205 )
amine@119 206 self.assertEqual(selected_channel, expected)
amine@119 207
amine@148 208 @genty_dataset(mono=([400],), three_channel=([600, 1150, 2400],))
amine@148 209 def test_extract_selected_channel_mix(self, frequencies):
amine@148 210
amine@148 211 mono_channels = [PURE_TONE_DICT[freq] for freq in frequencies]
amine@148 212 channels = len(frequencies)
amine@148 213 fmt = DATA_FORMAT[2]
amine@148 214 expected = _array_to_bytes(
amine@148 215 array(
amine@148 216 fmt,
amine@148 217 (sum(samples) // channels for samples in zip(*mono_channels)),
amine@148 218 )
amine@148 219 )
amine@148 220 data = _array_to_bytes(array(fmt, _sample_generator(*mono_channels)))
amine@148 221 selected_channel = _extract_selected_channel(data, channels, 2, "mix")
amine@148 222 self.assertEqual(selected_channel, expected)
amine@148 223
amine@149 224 @genty_dataset(positive=(2,), negative=(-3,))
amine@149 225 def test_extract_selected_channel_invalid_use_channel(self, use_channel):
amine@149 226 with self.assertRaises(AudioParameterError):
amine@149 227 _extract_selected_channel(b"\0\0", 2, 2, use_channel)
amine@149 228
amine@119 229 @genty_dataset(
amine@120 230 raw_with_audio_format=(
amine@120 231 "audio",
amine@120 232 "raw",
amine@120 233 "_load_raw",
amine@120 234 AUDIO_PARAMS_SHORT,
amine@120 235 ),
amine@120 236 raw_with_extension=(
amine@120 237 "audio.raw",
amine@120 238 None,
amine@120 239 "_load_raw",
amine@120 240 AUDIO_PARAMS_SHORT,
amine@120 241 ),
amine@120 242 wave_with_audio_format=("audio", "wave", "_load_wave"),
amine@120 243 wav_with_audio_format=("audio", "wave", "_load_wave"),
amine@120 244 wav_with_extension=("audio.wav", None, "_load_wave"),
amine@120 245 format_and_extension_both_given=("audio.dat", "wav", "_load_wave"),
amine@120 246 format_and_extension_both_given_b=("audio.raw", "wave", "_load_wave"),
amine@120 247 no_format_nor_extension=("audio", None, "_load_with_pydub"),
amine@120 248 other_formats_ogg=("audio.ogg", None, "_load_with_pydub"),
amine@120 249 other_formats_webm=("audio", "webm", "_load_with_pydub"),
amine@120 250 )
amine@120 251 def test_from_file(
amine@120 252 self, filename, audio_format, funtion_name, kwargs=None
amine@120 253 ):
amine@120 254 funtion_name = "auditok.io." + funtion_name
amine@120 255 if kwargs is None:
amine@120 256 kwargs = {}
amine@120 257 with patch(funtion_name) as patch_function:
amine@120 258 from_file(filename, audio_format, **kwargs)
amine@120 259 self.assertTrue(patch_function.called)
amine@120 260
amine@190 261 def test_from_file_large_file_raw(self,):
amine@162 262 filename = "tests/data/test_16KHZ_mono_400Hz.raw"
amine@190 263 audio_source = from_file(
amine@190 264 filename,
amine@190 265 large_file=True,
amine@190 266 sampling_rate=16000,
amine@190 267 sample_width=2,
amine@190 268 channels=1,
amine@190 269 )
amine@162 270 self.assertIsInstance(audio_source, RawAudioSource)
amine@162 271
amine@190 272 def test_from_file_large_file_wave(self,):
amine@162 273 filename = "tests/data/test_16KHZ_mono_400Hz.wav"
amine@162 274 audio_source = from_file(filename, large_file=True)
amine@162 275 self.assertIsInstance(audio_source, WaveAudioSource)
amine@163 276
amine@190 277 def test_from_file_large_file_compressed(self,):
amine@163 278 filename = "tests/data/test_16KHZ_mono_400Hz.ogg"
amine@163 279 with self.assertRaises(AudioIOError):
amine@163 280 from_file(filename, large_file=True)
amine@162 281
amine@137 282 @genty_dataset(
amine@137 283 missing_sampling_rate=("sr",),
amine@137 284 missing_sample_width=("sw",),
amine@137 285 missing_channels=("ch",),
amine@137 286 )
amine@137 287 def test_from_file_missing_audio_param(self, missing_param):
amine@137 288 with self.assertRaises(AudioParameterError):
amine@137 289 params = AUDIO_PARAMS_SHORT.copy()
amine@137 290 del params[missing_param]
amine@137 291 from_file("audio", audio_format="raw", **params)
amine@137 292
amine@121 293 def test_from_file_no_pydub(self):
amine@121 294 with patch("auditok.io._WITH_PYDUB", False):
amine@121 295 with self.assertRaises(AudioIOError):
amine@121 296 from_file("audio", "mp3")
amine@121 297
amine@111 298 @genty_dataset(
amine@208 299 raw_first_channel=("raw", 1, 400),
amine@208 300 raw_second_channel=("raw", 2, 800),
amine@208 301 raw_third_channel=("raw", 3, 1600),
amine@122 302 raw_left_channel=("raw", "left", 400),
amine@122 303 raw_right_channel=("raw", "right", 800),
amine@208 304 wav_first_channel=("wav", 1, 400),
amine@208 305 wav_second_channel=("wav", 2, 800),
amine@208 306 wav_third_channel=("wav", 3, 1600),
amine@122 307 wav_left_channel=("wav", "left", 400),
amine@122 308 wav_right_channel=("wav", "right", 800),
amine@122 309 )
amine@122 310 def test_from_file_multichannel_audio(
amine@122 311 self, audio_format, use_channel, frequency
amine@122 312 ):
amine@122 313 expected = PURE_TONE_DICT[frequency]
amine@122 314 filename = "tests/data/test_16KHZ_3channel_400-800-1600Hz.{}".format(
amine@122 315 audio_format
amine@122 316 )
amine@122 317 sample_width = 2
amine@122 318 audio_source = from_file(
amine@122 319 filename,
amine@122 320 sampling_rate=16000,
amine@122 321 sample_width=sample_width,
amine@122 322 channels=3,
amine@122 323 use_channel=use_channel,
amine@122 324 )
amine@122 325 fmt = DATA_FORMAT[sample_width]
amine@122 326 data = array(fmt, audio_source._buffer)
amine@122 327 self.assertEqual(data, expected)
amine@122 328
amine@122 329 @genty_dataset(
amine@123 330 raw_mono=("raw", "mono_400Hz", (400,)),
amine@123 331 raw_3channel=("raw", "3channel_400-800-1600Hz", (400, 800, 1600)),
amine@123 332 wav_mono=("wav", "mono_400Hz", (400,)),
amine@123 333 wav_3channel=("wav", "3channel_400-800-1600Hz", (400, 800, 1600)),
amine@123 334 )
amine@123 335 def test_from_file_multichannel_audio_mix(
amine@123 336 self, audio_format, filename_suffix, frequencies
amine@123 337 ):
amine@123 338 sampling_rate = 16000
amine@123 339 sample_width = 2
amine@123 340 channels = len(frequencies)
amine@123 341 mono_channels = [PURE_TONE_DICT[freq] for freq in frequencies]
amine@123 342 channels = len(frequencies)
amine@123 343 fmt = DATA_FORMAT[sample_width]
amine@123 344 expected = _array_to_bytes(
amine@123 345 array(
amine@123 346 fmt,
amine@123 347 (sum(samples) // channels for samples in zip(*mono_channels)),
amine@123 348 )
amine@123 349 )
amine@123 350 filename = "tests/data/test_16KHZ_{}.{}".format(
amine@123 351 filename_suffix, audio_format
amine@123 352 )
amine@123 353 audio_source = from_file(
amine@123 354 filename,
amine@123 355 use_channel="mix",
amine@123 356 sampling_rate=sampling_rate,
amine@123 357 sample_width=2,
amine@123 358 channels=channels,
amine@123 359 )
amine@123 360 mixed = audio_source._buffer
amine@123 361 self.assertEqual((mixed), expected)
amine@123 362
amine@124 363 @patch("auditok.io._WITH_PYDUB", True)
amine@124 364 @patch("auditok.io.BufferAudioSource")
amine@124 365 @genty_dataset(
amine@208 366 ogg_first_channel=("ogg", 1, "from_ogg"),
amine@208 367 ogg_second_channel=("ogg", 2, "from_ogg"),
amine@124 368 ogg_mix=("ogg", "mix", "from_ogg"),
amine@124 369 ogg_default=("ogg", None, "from_ogg"),
amine@124 370 mp3_left_channel=("mp3", "left", "from_mp3"),
amine@124 371 mp3_right_channel=("mp3", "right", "from_mp3"),
amine@208 372 flac_first_channel=("flac", 1, "from_file"),
amine@124 373 flac_second_channel=("flac", 1, "from_file"),
amine@124 374 flv_left_channel=("flv", "left", "from_flv"),
amine@124 375 webm_right_channel=("webm", "right", "from_file"),
amine@124 376 )
amine@124 377 def test_from_file_multichannel_audio_compressed(
amine@124 378 self, audio_format, use_channel, function, *mocks
amine@124 379 ):
amine@124 380 filename = "audio.{}".format(audio_format)
amine@124 381 segment_mock = Mock()
amine@124 382 segment_mock.sample_width = 2
amine@124 383 segment_mock.channels = 2
amine@124 384 segment_mock._data = b"abcd"
amine@124 385 with patch("auditok.io._extract_selected_channel") as ext_mock:
amine@124 386 with patch(
amine@124 387 "auditok.io.AudioSegment.{}".format(function)
amine@124 388 ) as open_func:
amine@124 389 open_func.return_value = segment_mock
amine@124 390 from_file(filename, use_channel=use_channel)
amine@124 391 self.assertTrue(open_func.called)
amine@124 392 self.assertTrue(ext_mock.called)
amine@124 393
amine@208 394 use_channel = {"left": 1, "right": 2, None: 1}.get(
amine@124 395 use_channel, use_channel
amine@124 396 )
amine@208 397 if isinstance(use_channel, int):
amine@208 398 # _extract_selected_channel will be called with a channel starting from 0
amine@208 399 use_channel -= 1
amine@124 400 ext_mock.assert_called_with(
amine@124 401 segment_mock._data,
amine@124 402 segment_mock.channels,
amine@124 403 segment_mock.sample_width,
amine@124 404 use_channel,
amine@124 405 )
amine@124 406
amine@124 407 with patch("auditok.io._extract_selected_channel") as ext_mock:
amine@124 408 with patch(
amine@124 409 "auditok.io.AudioSegment.{}".format(function)
amine@124 410 ) as open_func:
amine@124 411 segment_mock.channels = 1
amine@124 412 open_func.return_value = segment_mock
amine@124 413 from_file(filename, use_channel=use_channel)
amine@124 414 self.assertTrue(open_func.called)
amine@124 415 self.assertFalse(ext_mock.called)
amine@124 416
amine@125 417 @patch("auditok.io._WITH_PYDUB", True)
amine@125 418 @patch("auditok.io.BufferAudioSource")
amine@125 419 @genty_dataset(
amine@125 420 ogg=("ogg", "from_ogg"),
amine@125 421 mp3=("mp3", "from_mp3"),
amine@125 422 flac=("flac", "from_file"),
amine@125 423 )
amine@125 424 def test_from_file_multichannel_audio_mix_compressed(
amine@125 425 self, audio_format, function, *mocks
amine@125 426 ):
amine@125 427 filename = "audio.{}".format(audio_format)
amine@125 428 segment_mock = Mock()
amine@125 429 segment_mock.sample_width = 2
amine@125 430 segment_mock.channels = 2
amine@125 431 segment_mock._data = b"abcd"
amine@125 432 with patch("auditok.io._mix_audio_channels") as mix_mock:
amine@125 433 with patch(
amine@125 434 "auditok.io.AudioSegment.{}".format(function)
amine@125 435 ) as open_func:
amine@125 436 open_func.return_value = segment_mock
amine@125 437 from_file(filename, use_channel="mix")
amine@125 438 self.assertTrue(open_func.called)
amine@125 439 mix_mock.assert_called_with(
amine@125 440 segment_mock._data,
amine@125 441 segment_mock.channels,
amine@125 442 segment_mock.sample_width,
amine@125 443 )
amine@125 444
amine@123 445 @genty_dataset(
amine@126 446 dafault_first_channel=(None, 400),
amine@126 447 first_channel=(0, 400),
amine@126 448 second_channel=(1, 800),
amine@126 449 third_channel=(2, 1600),
amine@126 450 negative_first_channel=(-3, 400),
amine@126 451 negative_second_channel=(-2, 800),
amine@126 452 negative_third_channel=(-1, 1600),
amine@126 453 )
amine@126 454 def test_load_raw(self, use_channel, frequency):
amine@126 455 filename = "tests/data/test_16KHZ_3channel_400-800-1600Hz.raw"
amine@126 456 if use_channel is not None:
amine@126 457 audio_source = _load_raw(
amine@126 458 filename,
amine@126 459 sampling_rate=16000,
amine@126 460 sample_width=2,
amine@126 461 channels=3,
amine@126 462 use_channel=use_channel,
amine@126 463 )
amine@126 464 else:
amine@126 465 audio_source = _load_raw(
amine@126 466 filename, sampling_rate=16000, sample_width=2, channels=3
amine@126 467 )
amine@126 468 self.assertIsInstance(audio_source, BufferAudioSource)
amine@126 469 self.assertEqual(audio_source.sampling_rate, 16000)
amine@126 470 self.assertEqual(audio_source.sample_width, 2)
amine@126 471 self.assertEqual(audio_source.channels, 1)
amine@126 472 # generate a pure sine wave tone of the given frequency
amine@126 473 expected = PURE_TONE_DICT[frequency]
amine@126 474 # compre with data read from file
amine@126 475 fmt = DATA_FORMAT[2]
amine@126 476 data = array(fmt, audio_source._buffer)
amine@126 477 self.assertEqual(data, expected)
amine@126 478
amine@126 479 @genty_dataset(
amine@127 480 mono=("mono_400Hz", (400,)),
amine@127 481 three_channel=("3channel_400-800-1600Hz", (400, 800, 1600)),
amine@127 482 )
amine@127 483 def test_load_raw_mix(self, filename_suffix, frequencies):
amine@127 484 sampling_rate = 16000
amine@127 485 sample_width = 2
amine@127 486 channels = len(frequencies)
amine@127 487 mono_channels = [PURE_TONE_DICT[freq] for freq in frequencies]
amine@127 488
amine@127 489 fmt = DATA_FORMAT[sample_width]
amine@127 490 expected = _array_to_bytes(
amine@127 491 array(
amine@127 492 fmt,
amine@127 493 (sum(samples) // channels for samples in zip(*mono_channels)),
amine@127 494 )
amine@127 495 )
amine@127 496 filename = "tests/data/test_16KHZ_{}.raw".format(filename_suffix)
amine@127 497 audio_source = _load_raw(
amine@127 498 filename,
amine@127 499 use_channel="mix",
amine@127 500 sampling_rate=sampling_rate,
amine@127 501 sample_width=2,
amine@127 502 channels=channels,
amine@127 503 )
amine@127 504 mixed = audio_source._buffer
amine@127 505 self.assertEqual(mixed, expected)
amine@127 506 self.assertIsInstance(audio_source, BufferAudioSource)
amine@127 507 self.assertEqual(audio_source.sampling_rate, sampling_rate)
amine@127 508 self.assertEqual(audio_source.sample_width, sample_width)
amine@127 509 self.assertEqual(audio_source.channels, 1)
amine@127 510
amine@127 511 @genty_dataset(
amine@128 512 missing_sampling_rate=("sr",),
amine@128 513 missing_sample_width=("sw",),
amine@128 514 missing_channels=("ch",),
amine@128 515 )
amine@128 516 def test_load_raw_missing_audio_param(self, missing_param):
amine@128 517 with self.assertRaises(AudioParameterError):
amine@128 518 params = AUDIO_PARAMS_SHORT.copy()
amine@128 519 del params[missing_param]
amine@128 520 srate, swidth, channels, _ = _get_audio_parameters(params)
amine@128 521 _load_raw("audio", srate, swidth, channels)
amine@128 522
amine@128 523 @genty_dataset(
amine@129 524 dafault_first_channel=(None, 400),
amine@129 525 first_channel=(0, 400),
amine@129 526 second_channel=(1, 800),
amine@129 527 third_channel=(2, 1600),
amine@129 528 negative_first_channel=(-3, 400),
amine@129 529 negative_second_channel=(-2, 800),
amine@129 530 negative_third_channel=(-1, 1600),
amine@129 531 )
amine@129 532 def test_load_wave(self, use_channel, frequency):
amine@129 533 filename = "tests/data/test_16KHZ_3channel_400-800-1600Hz.wav"
amine@129 534 if use_channel is not None:
amine@129 535 audio_source = _load_wave(filename, use_channel=use_channel)
amine@129 536 else:
amine@129 537 audio_source = _load_wave(filename)
amine@129 538 self.assertIsInstance(audio_source, BufferAudioSource)
amine@129 539 self.assertEqual(audio_source.sampling_rate, 16000)
amine@129 540 self.assertEqual(audio_source.sample_width, 2)
amine@129 541 self.assertEqual(audio_source.channels, 1)
amine@129 542 # generate a pure sine wave tone of the given frequency
amine@129 543 expected = PURE_TONE_DICT[frequency]
amine@129 544 # compre with data read from file
amine@129 545 fmt = DATA_FORMAT[2]
amine@129 546 data = array(fmt, audio_source._buffer)
amine@129 547 self.assertEqual(data, expected)
amine@129 548
amine@129 549 @genty_dataset(
amine@130 550 mono=("mono_400Hz", (400,)),
amine@130 551 three_channel=("3channel_400-800-1600Hz", (400, 800, 1600)),
amine@130 552 )
amine@130 553 def test_load_wave_mix(self, filename_suffix, frequencies):
amine@130 554 sampling_rate = 16000
amine@130 555 sample_width = 2
amine@130 556 channels = len(frequencies)
amine@130 557 mono_channels = [PURE_TONE_DICT[freq] for freq in frequencies]
amine@130 558 fmt = DATA_FORMAT[sample_width]
amine@130 559 expected = _array_to_bytes(
amine@130 560 array(
amine@130 561 fmt,
amine@130 562 (sum(samples) // channels for samples in zip(*mono_channels)),
amine@130 563 )
amine@130 564 )
amine@130 565 filename = "tests/data/test_16KHZ_{}.wav".format(filename_suffix)
amine@130 566 audio_source = _load_wave(filename, use_channel="mix")
amine@130 567 mixed = audio_source._buffer
amine@130 568 self.assertEqual(mixed, expected)
amine@130 569 self.assertIsInstance(audio_source, BufferAudioSource)
amine@130 570 self.assertEqual(audio_source.sampling_rate, sampling_rate)
amine@130 571 self.assertEqual(audio_source.sample_width, sample_width)
amine@130 572 self.assertEqual(audio_source.channels, 1)
amine@130 573
amine@131 574 @patch("auditok.io._WITH_PYDUB", True)
amine@131 575 @patch("auditok.io.BufferAudioSource")
amine@131 576 @genty_dataset(
amine@131 577 ogg_default_first_channel=("ogg", 2, None, "from_ogg"),
amine@131 578 ogg_first_channel=("ogg", 1, 0, "from_ogg"),
amine@131 579 ogg_second_channel=("ogg", 2, 1, "from_ogg"),
amine@131 580 ogg_mix_channels=("ogg", 3, "mix", "from_ogg"),
amine@131 581 mp3_left_channel=("mp3", 1, "left", "from_mp3"),
amine@131 582 mp3_right_channel=("mp3", 2, "right", "from_mp3"),
amine@131 583 mp3_mix_channels=("mp3", 3, "mix", "from_mp3"),
amine@131 584 flac_first_channel=("flac", 2, 0, "from_file"),
amine@131 585 flac_second_channel=("flac", 2, 1, "from_file"),
amine@131 586 flv_left_channel=("flv", 1, "left", "from_flv"),
amine@131 587 webm_right_channel=("webm", 2, "right", "from_file"),
amine@131 588 webm_mix_channels=("webm", 4, "mix", "from_file"),
amine@131 589 )
amine@131 590 def test_load_with_pydub(
amine@131 591 self, audio_format, channels, use_channel, function, *mocks
amine@131 592 ):
amine@131 593 filename = "audio.{}".format(audio_format)
amine@131 594 segment_mock = Mock()
amine@131 595 segment_mock.sample_width = 2
amine@131 596 segment_mock.channels = channels
amine@131 597 segment_mock._data = b"abcdefgh"
amine@131 598 with patch("auditok.io._extract_selected_channel") as ext_mock:
amine@131 599 with patch(
amine@131 600 "auditok.io.AudioSegment.{}".format(function)
amine@131 601 ) as open_func:
amine@131 602 open_func.return_value = segment_mock
amine@131 603 use_channel = {"left": 0, "right": 1, None: 0}.get(
amine@131 604 use_channel, use_channel
amine@131 605 )
amine@131 606 _load_with_pydub(filename, audio_format, use_channel)
amine@131 607 self.assertTrue(open_func.called)
amine@131 608 if channels > 1:
amine@131 609 self.assertTrue(ext_mock.called)
amine@131 610 ext_mock.assert_called_with(
amine@131 611 segment_mock._data,
amine@131 612 segment_mock.channels,
amine@131 613 segment_mock.sample_width,
amine@131 614 use_channel,
amine@131 615 )
amine@131 616 else:
amine@131 617 self.assertFalse(ext_mock.called)
amine@131 618
amine@130 619 @genty_dataset(
amine@132 620 mono=("mono_400Hz.raw", (400,)),
amine@132 621 three_channel=("3channel_400-800-1600Hz.raw", (400, 800, 1600)),
amine@132 622 )
amine@132 623 def test_save_raw(self, filename, frequencies):
amine@132 624 filename = "tests/data/test_16KHZ_{}".format(filename)
amine@132 625 sample_width = 2
amine@132 626 fmt = DATA_FORMAT[sample_width]
amine@132 627 mono_channels = [PURE_TONE_DICT[freq] for freq in frequencies]
amine@132 628 data = _array_to_bytes(array(fmt, _sample_generator(*mono_channels)))
amine@132 629 tmpfile = NamedTemporaryFile()
amine@136 630 _save_raw(data, tmpfile.name)
amine@132 631 self.assertTrue(filecmp.cmp(tmpfile.name, filename, shallow=False))
amine@132 632
amine@132 633 @genty_dataset(
amine@110 634 mono=("mono_400Hz.wav", (400,)),
amine@110 635 three_channel=("3channel_400-800-1600Hz.wav", (400, 800, 1600)),
amine@110 636 )
amine@110 637 def test_save_wave(self, filename, frequencies):
amine@110 638 filename = "tests/data/test_16KHZ_{}".format(filename)
amine@110 639 sampling_rate = 16000
amine@110 640 sample_width = 2
amine@110 641 channels = len(frequencies)
amine@110 642 fmt = DATA_FORMAT[sample_width]
amine@110 643 mono_channels = [PURE_TONE_DICT[freq] for freq in frequencies]
amine@110 644 data = _array_to_bytes(array(fmt, _sample_generator(*mono_channels)))
amine@110 645 tmpfile = NamedTemporaryFile()
amine@136 646 _save_wave(data, tmpfile.name, sampling_rate, sample_width, channels)
amine@110 647 self.assertTrue(filecmp.cmp(tmpfile.name, filename, shallow=False))
amine@132 648
amine@132 649 @genty_dataset(
amine@132 650 missing_sampling_rate=("sr",),
amine@132 651 missing_sample_width=("sw",),
amine@132 652 missing_channels=("ch",),
amine@132 653 )
amine@132 654 def test_save_wave_missing_audio_param(self, missing_param):
amine@132 655 with self.assertRaises(AudioParameterError):
amine@132 656 params = AUDIO_PARAMS_SHORT.copy()
amine@132 657 del params[missing_param]
amine@132 658 srate, swidth, channels, _ = _get_audio_parameters(params)
amine@136 659 _save_wave(b"\0\0", "audio", srate, swidth, channels)
amine@133 660
amine@141 661 def test_save_with_pydub(self):
amine@141 662 with patch("auditok.io.AudioSegment.export") as export:
amine@142 663 tmpdir = TemporaryDirectory()
amine@142 664 filename = os.path.join(tmpdir.name, "audio.ogg")
amine@142 665 _save_with_pydub(b"\0\0", filename, "ogg", 16000, 2, 1)
amine@141 666 self.assertTrue(export.called)
amine@142 667 tmpdir.cleanup()
amine@141 668
amine@133 669 @genty_dataset(
amine@133 670 raw_with_audio_format=("audio", "raw"),
amine@133 671 raw_with_extension=("audio.raw", None),
amine@133 672 raw_with_audio_format_and_extension=("audio.mp3", "raw"),
amine@133 673 raw_no_audio_format_nor_extension=("audio", None),
amine@133 674 )
amine@133 675 def test_to_file_raw(self, filename, audio_format):
amine@133 676 exp_filename = "tests/data/test_16KHZ_mono_400Hz.raw"
amine@133 677 tmpdir = TemporaryDirectory()
amine@133 678 filename = os.path.join(tmpdir.name, filename)
amine@133 679 data = _array_to_bytes(PURE_TONE_DICT[400])
amine@135 680 to_file(data, filename, audio_format=audio_format)
amine@133 681 self.assertTrue(filecmp.cmp(filename, exp_filename, shallow=False))
amine@133 682 tmpdir.cleanup()
amine@134 683
amine@134 684 @genty_dataset(
amine@134 685 wav_with_audio_format=("audio", "wav"),
amine@134 686 wav_with_extension=("audio.wav", None),
amine@134 687 wav_with_audio_format_and_extension=("audio.mp3", "wav"),
amine@134 688 wave_with_audio_format=("audio", "wave"),
amine@134 689 wave_with_extension=("audio.wave", None),
amine@134 690 wave_with_audio_format_and_extension=("audio.mp3", "wave"),
amine@134 691 )
amine@135 692 def test_to_file_wave(self, filename, audio_format):
amine@134 693 exp_filename = "tests/data/test_16KHZ_mono_400Hz.wav"
amine@134 694 tmpdir = TemporaryDirectory()
amine@134 695 filename = os.path.join(tmpdir.name, filename)
amine@134 696 data = _array_to_bytes(PURE_TONE_DICT[400])
amine@135 697 to_file(
amine@135 698 data,
amine@135 699 filename,
amine@135 700 audio_format=audio_format,
amine@135 701 sampling_rate=16000,
amine@135 702 sample_width=2,
amine@135 703 channels=1,
amine@134 704 )
amine@134 705 self.assertTrue(filecmp.cmp(filename, exp_filename, shallow=False))
amine@134 706 tmpdir.cleanup()
amine@138 707
amine@138 708 @genty_dataset(
amine@138 709 missing_sampling_rate=("sr",),
amine@138 710 missing_sample_width=("sw",),
amine@138 711 missing_channels=("ch",),
amine@138 712 )
amine@138 713 def test_to_file_missing_audio_param(self, missing_param):
amine@138 714 params = AUDIO_PARAMS_SHORT.copy()
amine@138 715 del params[missing_param]
amine@138 716 with self.assertRaises(AudioParameterError):
amine@138 717 to_file(b"\0\0", "audio", audio_format="wav", **params)
amine@138 718 with self.assertRaises(AudioParameterError):
amine@138 719 to_file(b"\0\0", "audio", audio_format="mp3", **params)
amine@139 720
amine@139 721 def test_to_file_no_pydub(self):
amine@139 722 with patch("auditok.io._WITH_PYDUB", False):
amine@139 723 with self.assertRaises(AudioIOError):
amine@139 724 to_file("audio", b"", "mp3")
amine@140 725
amine@140 726 @patch("auditok.io._WITH_PYDUB", True)
amine@140 727 @genty_dataset(
amine@140 728 ogg_with_extension=("audio.ogg", None),
amine@140 729 ogg_with_audio_format=("audio", "ogg"),
amine@140 730 ogg_format_with_wrong_extension=("audio.wav", "ogg"),
amine@140 731 )
amine@140 732 def test_to_file_compressed(self, filename, audio_format, *mocks):
amine@140 733 with patch("auditok.io.AudioSegment.export") as export:
amine@142 734 tmpdir = TemporaryDirectory()
amine@142 735 filename = os.path.join(tmpdir.name, filename)
amine@140 736 to_file(b"\0\0", filename, audio_format, **AUDIO_PARAMS_SHORT)
amine@140 737 self.assertTrue(export.called)
amine@142 738 tmpdir.cleanup()
amine@190 739
amine@190 740 @genty_dataset(
amine@190 741 string_wave=(
amine@190 742 "tests/data/test_16KHZ_mono_400Hz.wav",
amine@190 743 BufferAudioSource,
amine@190 744 ),
amine@190 745 string_wave_large_file=(
amine@190 746 "tests/data/test_16KHZ_mono_400Hz.wav",
amine@190 747 WaveAudioSource,
amine@190 748 {"large_file": True},
amine@190 749 ),
amine@190 750 stdin=("-", StdinAudioSource),
amine@190 751 string_raw=("tests/data/test_16KHZ_mono_400Hz.raw", BufferAudioSource),
amine@190 752 string_raw_large_file=(
amine@190 753 "tests/data/test_16KHZ_mono_400Hz.raw",
amine@190 754 RawAudioSource,
amine@190 755 {"large_file": True},
amine@190 756 ),
amine@190 757 bytes_=(b"0" * 8000, BufferAudioSource),
amine@190 758 )
amine@190 759 def test_get_audio_source(self, input, expected_type, extra_args=None):
amine@190 760 kwargs = {"sampling_rate": 16000, "sample_width": 2, "channels": 1}
amine@190 761 if extra_args is not None:
amine@190 762 kwargs.update(extra_args)
amine@190 763 audio_source = get_audio_source(input, **kwargs)
amine@208 764 self.assertIsInstance(audio_source, expected_type)