annotate tests/test_AudioReader.py @ 403:996948ada980

Update tests
author Amine Sehili <amine.sehili@gmail.com>
date Sun, 26 May 2024 22:43:08 +0200
parents 323d59b404a2
children 9c9112e23c1c
rev   line source
amine@330 1 import sys
amine@330 2 import wave
amine@403 3 from functools import partial
amine@403 4
amine@403 5 import pytest
amine@403 6
amine@330 7 from auditok import (
amine@403 8 AudioReader,
amine@403 9 BufferAudioSource,
amine@403 10 Recorder,
amine@403 11 WaveAudioSource,
amine@330 12 dataset,
amine@330 13 )
amine@403 14 from auditok.util import _Limiter, _OverlapAudioReader
amine@330 15
amine@330 16
amine@403 17 def _read_all_data(reader):
amine@403 18 blocks = []
amine@403 19 while True:
amine@403 20 data = reader.read()
amine@403 21 if data is None:
amine@403 22 break
amine@403 23 blocks.append(data)
amine@403 24 return b"".join(blocks)
amine@403 25
amine@403 26
amine@403 27 class TestAudioReaderWithFileAudioSource:
amine@403 28 @pytest.fixture(autouse=True)
amine@403 29 def setup_and_teardown(self):
amine@330 30 self.audio_source = WaveAudioSource(
amine@330 31 filename=dataset.one_to_six_arabic_16000_mono_bc_noise
amine@330 32 )
amine@403 33 self.audio_source.open()
amine@403 34 yield
amine@403 35 self.audio_source.close()
amine@330 36
amine@403 37 def test_AudioReader_type(self):
amine@403 38 reader = AudioReader(input=self.audio_source)
amine@400 39 err_msg = (
amine@403 40 "wrong object type, expected: 'AudioReader', found: {0}"
amine@330 41 )
amine@403 42 assert isinstance(reader, AudioReader), err_msg.format(type(reader))
amine@330 43
amine@403 44 def _test_default_block_size(self):
amine@403 45 reader = AudioReader(input=self.audio_source)
amine@403 46 data = reader.read()
amine@403 47 size = len(data)
amine@400 48 assert (
amine@400 49 size == 160
amine@400 50 ), "Wrong default block_size, expected: 160, found: {0}".format(size)
amine@330 51
amine@403 52 @pytest.mark.parametrize(
amine@403 53 "block_dur, expected_nb_samples",
amine@403 54 [
amine@403 55 (None, 160), # default: 10 ms
amine@403 56 (0.025, 400), # 25 ms
amine@403 57 ],
amine@403 58 ids=["default", "_25ms"],
amine@403 59 )
amine@403 60 def test_block_duration(self, block_dur, expected_nb_samples):
amine@403 61 """Test the number of samples read for a given block duration."""
amine@403 62 if block_dur is not None:
amine@403 63 reader = AudioReader(input=self.audio_source, block_dur=block_dur)
amine@403 64 else:
amine@403 65 reader = AudioReader(input=self.audio_source)
amine@403 66 data = reader.read()
amine@403 67 nb_samples = len(data) // reader.sample_width
amine@400 68 assert (
amine@403 69 nb_samples == expected_nb_samples
amine@403 70 ), f"Wrong block_size, expected: {expected_nb_samples}, found: {nb_samples}"
amine@330 71
amine@403 72 @pytest.mark.parametrize(
amine@403 73 "block_dur, hop_dur, expected_nb_blocks, expected_last_block_nb_samples",
amine@403 74 [
amine@403 75 (None, None, 1879, 126), # default: 10 ms
amine@403 76 (0.01, None, 1879, 126), # block_dur_10ms_hop_dur_None
amine@403 77 (0.01, 0.01, 1879, 126), # block_dur_10ms_hop_dur_10ms
amine@403 78 (0.02, None, 940, 126), # block_dur_20ms_hop_dur_None
amine@403 79 (0.025, None, 752, 206), # block_dur_25ms_hop_dur_None
amine@403 80 (0.02, 0.01, 1878, 286), # block_dur_20ms_hop_dur_10ms
amine@403 81 (0.025, 0.005, 3754, 366), # block_dur_25ms_hop_dur_None
amine@403 82 ],
amine@403 83 ids=[
amine@403 84 "default",
amine@403 85 "block_dur_10ms_hop_dur_None",
amine@403 86 "block_dur_10ms_hop_dur_100ms",
amine@403 87 "block_dur_20ms_hop_dur_None",
amine@403 88 "block_dur_25ms_hop_dur_None",
amine@403 89 "block_dur_20ms_hop_dur_10ms",
amine@403 90 "block_dur_25ms_hop_dur_None",
amine@403 91 ],
amine@403 92 )
amine@403 93 def test_hop_duration(
amine@403 94 self,
amine@403 95 block_dur,
amine@403 96 hop_dur,
amine@403 97 expected_nb_blocks,
amine@403 98 expected_last_block_nb_samples,
amine@403 99 ):
amine@403 100 """Test the number of read blocks and the duration of last block for
amine@403 101 different 'block_dur' and 'hop_dur' values.
amine@330 102
amine@403 103 Args:
amine@403 104 block_dur (float or None): block duration in seconds.
amine@403 105 hop_dur (float or None): hop duration in seconds.
amine@403 106 expected_nb_blocks (int): expected number of read block.
amine@403 107 expected_last_block_nb_samples (int): expected number of sample
amine@403 108 in the last block.
amine@403 109 """
amine@403 110 if block_dur is not None:
amine@403 111 reader = AudioReader(
amine@403 112 input=self.audio_source, block_dur=block_dur, hop_dur=hop_dur
amine@403 113 )
amine@403 114 else:
amine@403 115 reader = AudioReader(input=self.audio_source, hop_dur=hop_dur)
amine@330 116
amine@403 117 nb_blocks = 0
amine@403 118 last_block_nb_samples = None
amine@403 119 while True:
amine@403 120 data = reader.read()
amine@403 121 if data is not None:
amine@403 122 nb_blocks += 1
amine@403 123 last_block_nb_samples = len(data) // reader.sample_width
amine@403 124 else:
amine@403 125 break
amine@403 126 err_msg = "Wrong number of blocks read from source, expected: "
amine@403 127 err_msg += f"{expected_nb_blocks}, found: {nb_blocks}"
amine@403 128 assert nb_blocks == expected_nb_blocks, err_msg
amine@330 129
amine@403 130 err_msg = (
amine@403 131 "Wrong number of samples in last block read from source, expected: "
amine@403 132 )
amine@403 133 err_msg += (
amine@403 134 f"{expected_last_block_nb_samples}, found: {last_block_nb_samples}"
amine@330 135 )
amine@330 136
amine@403 137 assert last_block_nb_samples == expected_last_block_nb_samples, err_msg
amine@403 138
amine@403 139 def test_hop_duration_exception(self):
amine@403 140 """Test passing hop_dur > block_dur raises ValueError"""
amine@403 141 with pytest.raises(ValueError):
amine@403 142 AudioReader(self.audio_source, block_dur=0.01, hop_dur=0.015)
amine@403 143
amine@403 144 @pytest.mark.parametrize(
amine@403 145 "block_dur, hop_dur",
amine@403 146 [
amine@403 147 (None, None), # default
amine@403 148 (0.01, None), # block_dur_10ms_hop_dur_None
amine@403 149 (None, 0.01), # block_dur_None__hop_dur_10ms
amine@403 150 (0.05, 0.05), # block_dur_50ms_hop_dur_50ms
amine@403 151 ],
amine@403 152 ids=[
amine@403 153 "default",
amine@403 154 "block_dur_10ms_hop_dur_None",
amine@403 155 "block_dur_None__hop_dur_10ms",
amine@403 156 "block_dur_50ms_hop_dur_50ms",
amine@403 157 ],
amine@403 158 )
amine@403 159 def test_reader_class_block_dur_equals_hop_dur(self, block_dur, hop_dur):
amine@403 160 """Test passing hop_dur == block_dur does not create an instance of
amine@403 161 '_OverlapAudioReader'.
amine@403 162 """
amine@403 163 if block_dur is not None:
amine@403 164 reader = AudioReader(
amine@403 165 input=self.audio_source, block_dur=block_dur, hop_dur=hop_dur
amine@403 166 )
amine@403 167 else:
amine@403 168 reader = AudioReader(input=self.audio_source, hop_dur=hop_dur)
amine@403 169 assert not isinstance(reader, _OverlapAudioReader)
amine@330 170
amine@330 171 def test_sampling_rate(self):
amine@403 172 reader = AudioReader(input=self.audio_source)
amine@403 173 sampling_rate = reader.sampling_rate
amine@400 174 assert (
amine@403 175 sampling_rate == 16000
amine@403 176 ), f"Wrong sampling rate, expected: 16000, found: {sampling_rate}"
amine@330 177
amine@330 178 def test_sample_width(self):
amine@403 179 reader = AudioReader(input=self.audio_source)
amine@403 180 sample_width = reader.sample_width
amine@400 181 assert (
amine@403 182 sample_width == 2
amine@403 183 ), f"Wrong sample width, expected: 2, found: {sample_width}"
amine@330 184
amine@330 185 def test_channels(self):
amine@403 186 reader = AudioReader(input=self.audio_source)
amine@403 187 channels = reader.channels
amine@400 188 assert (
amine@400 189 channels == 1
amine@403 190 ), f"Wrong number of channels, expected: 1, found: {channels}"
amine@330 191
amine@330 192 def test_read(self):
amine@403 193 reader = AudioReader(input=self.audio_source, block_dur=0.02)
amine@403 194 reader_data = reader.read()
amine@330 195 audio_source = WaveAudioSource(
amine@330 196 filename=dataset.one_to_six_arabic_16000_mono_bc_noise
amine@330 197 )
amine@330 198 audio_source.open()
amine@403 199 audio_source_data = audio_source.read(320)
amine@330 200 audio_source.close()
amine@403 201 assert (
amine@403 202 reader_data == audio_source_data
amine@403 203 ), "Unexpected data read from AudioReader"
amine@330 204
amine@403 205 def test_read_with_overlap(self):
amine@403 206 reader = AudioReader(
amine@403 207 input=self.audio_source, block_dur=0.02, hop_dur=0.01
amine@403 208 )
amine@403 209 _ = reader.read() # first block
amine@403 210 reader_data = reader.read() # second block with 0.01 S overlap
amine@403 211 audio_source = WaveAudioSource(
amine@403 212 filename=dataset.one_to_six_arabic_16000_mono_bc_noise
amine@403 213 )
amine@403 214 audio_source.open()
amine@403 215 _ = audio_source.read(160)
amine@403 216 audio_source_data = audio_source.read(320)
amine@403 217 audio_source.close()
amine@403 218 assert (
amine@403 219 reader_data == audio_source_data
amine@403 220 ), "Unexpected data read from AudioReader"
amine@330 221
amine@403 222 def test_read_from_AudioReader_with_max_read(self):
amine@330 223 # read a maximum of 0.75 seconds from audio source
amine@403 224 reader = AudioReader(input=self.audio_source, max_read=0.75)
amine@403 225 assert isinstance(reader._audio_source._audio_source, _Limiter)
amine@403 226 reader_data = _read_all_data(reader)
amine@330 227
amine@330 228 audio_source = WaveAudioSource(
amine@330 229 filename=dataset.one_to_six_arabic_16000_mono_bc_noise
amine@330 230 )
amine@330 231 audio_source.open()
amine@330 232 audio_source_data = audio_source.read(int(16000 * 0.75))
amine@330 233 audio_source.close()
amine@330 234
amine@400 235 assert (
amine@403 236 reader_data == audio_source_data
amine@403 237 ), f"Unexpected data read from AudioReader with 'max_read = {0.75}'"
amine@330 238
amine@403 239 def test_read_data_size_from_AudioReader_with_max_read(self):
amine@330 240 # read a maximum of 1.191 seconds from audio source
amine@403 241 reader = AudioReader(input=self.audio_source, max_read=1.191)
amine@403 242 assert isinstance(reader._audio_source._audio_source, _Limiter)
amine@403 243 total_samples = round(reader.sampling_rate * 1.191)
amine@403 244 block_size = int(reader.block_dur * reader.sampling_rate)
amine@403 245 nb_full_blocks, last_block_size = divmod(total_samples, block_size)
amine@330 246 total_samples_with_overlap = (
amine@403 247 nb_full_blocks * block_size + last_block_size
amine@330 248 )
amine@403 249 expected_read_bytes = (
amine@403 250 total_samples_with_overlap * reader.sample_width * reader.channels
amine@403 251 )
amine@330 252
amine@403 253 reader_data = _read_all_data(reader)
amine@403 254 total_read = len(reader_data)
amine@403 255 err_msg = f"Wrong data length read from LimiterADS, expected: {expected_read_bytes}, found: {total_read}"
amine@403 256 assert total_read == expected_read_bytes, err_msg
amine@403 257
amine@403 258 def test_read_from_Recorder(self):
amine@403 259 reader = Recorder(input=self.audio_source, block_dur=0.025)
amine@403 260 reader_data = []
amine@403 261 for _ in range(10):
amine@403 262 block = reader.read()
amine@330 263 if block is None:
amine@330 264 break
amine@403 265 reader_data.append(block)
amine@403 266 reader_data = b"".join(reader_data)
amine@330 267
amine@330 268 audio_source = WaveAudioSource(
amine@330 269 filename=dataset.one_to_six_arabic_16000_mono_bc_noise
amine@330 270 )
amine@330 271 audio_source.open()
amine@403 272 audio_source_data = audio_source.read(400 * 10)
amine@330 273 audio_source.close()
amine@330 274
amine@400 275 assert (
amine@403 276 reader_data == audio_source_data
amine@403 277 ), "Unexpected data read from Recorder"
amine@330 278
amine@403 279 def test_AudioReader_rewindable(self):
amine@403 280 reader = AudioReader(input=self.audio_source, record=True)
amine@403 281 assert (
amine@403 282 reader.rewindable
amine@403 283 ), "AudioReader with record=True should be rewindable"
amine@330 284
amine@403 285 def test_AudioReader_record_and_rewind(self):
amine@403 286 reader = AudioReader(
amine@403 287 input=self.audio_source, record=True, block_dur=0.02
amine@330 288 )
amine@403 289 # read 0.02 * 10 = 0.2 sec. of data
amine@330 290 for i in range(10):
amine@403 291 reader.read()
amine@403 292 reader.rewind()
amine@330 293
amine@330 294 # read all available data after rewind
amine@403 295 reader_data = _read_all_data(reader)
amine@330 296
amine@330 297 audio_source = WaveAudioSource(
amine@330 298 filename=dataset.one_to_six_arabic_16000_mono_bc_noise
amine@330 299 )
amine@330 300 audio_source.open()
amine@403 301 audio_source_data = audio_source.read(320 * 10) # read 0.2 sec. of data
amine@330 302 audio_source.close()
amine@330 303
amine@400 304 assert (
amine@403 305 reader_data == audio_source_data
amine@403 306 ), "Unexpected data read from AudioReader with record = True"
amine@330 307
amine@403 308 def test_Recorder_record_and_rewind(self):
amine@403 309 recorder = Recorder(input=self.audio_source, block_dur=0.02)
amine@403 310 # read 0.02 * 10 = 0.2 sec. of data
amine@403 311 for i in range(10):
amine@403 312 recorder.read()
amine@403 313
amine@403 314 recorder.rewind()
amine@403 315
amine@403 316 # read all available data after rewind
amine@403 317 recorder_data = []
amine@403 318 recorder_data = _read_all_data(recorder)
amine@403 319
amine@403 320 audio_source = WaveAudioSource(
amine@403 321 filename=dataset.one_to_six_arabic_16000_mono_bc_noise
amine@403 322 )
amine@403 323 audio_source.open()
amine@403 324 audio_source_data = audio_source.read(320 * 10) # read 0.2 sec. of data
amine@403 325 audio_source.close()
amine@403 326
amine@403 327 assert (
amine@403 328 recorder_data == audio_source_data
amine@403 329 ), "Unexpected data read from Recorder"
amine@403 330
amine@403 331 def test_read_overlapping_blocks(self):
amine@330 332 # Use arbitrary valid block_size and hop_size
amine@330 333 block_size = 1714
amine@330 334 hop_size = 313
amine@403 335 block_dur = block_size / self.audio_source.sampling_rate
amine@403 336 hop_dur = hop_size / self.audio_source.sampling_rate
amine@330 337
amine@403 338 reader = AudioReader(
amine@403 339 input=self.audio_source,
amine@403 340 block_dur=block_dur,
amine@403 341 hop_dur=hop_dur,
amine@330 342 )
amine@330 343
amine@403 344 # Read all available overlapping blocks of data
amine@403 345 reader_data = []
amine@330 346 while True:
amine@403 347 block = reader.read()
amine@330 348 if block is None:
amine@330 349 break
amine@403 350 reader_data.append(block)
amine@330 351
amine@330 352 # Read all data from file and build a BufferAudioSource
amine@330 353 fp = wave.open(dataset.one_to_six_arabic_16000_mono_bc_noise, "r")
amine@330 354 wave_data = fp.readframes(fp.getnframes())
amine@330 355 fp.close()
amine@330 356 audio_source = BufferAudioSource(
amine@403 357 wave_data,
amine@403 358 reader.sampling_rate,
amine@403 359 reader.sample_width,
amine@403 360 reader.channels,
amine@330 361 )
amine@330 362 audio_source.open()
amine@330 363
amine@403 364 # Compare all blocks read from OverlapADS to those read from an
amine@403 365 # audio source with a manual position setting
amine@403 366 for i, block in enumerate(reader_data):
amine@330 367 tmp = audio_source.read(block_size)
amine@400 368 assert (
amine@400 369 block == tmp
amine@403 370 ), f"Unexpected data (block {i}) from reader with overlapping blocks"
amine@330 371 audio_source.position = (i + 1) * hop_size
amine@330 372
amine@330 373 audio_source.close()
amine@330 374
amine@403 375 def test_read_overlapping_blocks_with_max_read(self):
amine@330 376 block_size = 256
amine@330 377 hop_size = 200
amine@403 378 block_dur = block_size / self.audio_source.sampling_rate
amine@403 379 hop_dur = hop_size / self.audio_source.sampling_rate
amine@330 380
amine@403 381 reader = AudioReader(
amine@403 382 input=self.audio_source,
amine@403 383 block_dur=block_dur,
amine@403 384 hop_dur=hop_dur,
amine@403 385 max_read=0.5,
amine@330 386 )
amine@330 387
amine@403 388 # Read all available overlapping blocks of data
amine@403 389 reader_data = []
amine@330 390 while True:
amine@403 391 block = reader.read()
amine@330 392 if block is None:
amine@330 393 break
amine@403 394 reader_data.append(block)
amine@330 395
amine@330 396 # Read all data from file and build a BufferAudioSource
amine@330 397 fp = wave.open(dataset.one_to_six_arabic_16000_mono_bc_noise, "r")
amine@330 398 wave_data = fp.readframes(fp.getnframes())
amine@330 399 fp.close()
amine@330 400 audio_source = BufferAudioSource(
amine@403 401 wave_data,
amine@403 402 reader.sampling_rate,
amine@403 403 reader.sample_width,
amine@403 404 reader.channels,
amine@330 405 )
amine@330 406 audio_source.open()
amine@330 407
amine@403 408 # Compare all blocks read from OverlapADS to those read from an
amine@403 409 # audio source with a manual position setting
amine@403 410 for i, block in enumerate(reader_data):
amine@403 411 tmp = audio_source.read(len(block) // (reader.sw * reader.ch))
amine@403 412 assert (
amine@403 413 block == tmp
amine@403 414 ), f"Unexpected data (block {i}) from reader with overlapping blocks and max_read"
amine@330 415 audio_source.position = (i + 1) * hop_size
amine@330 416
amine@330 417 audio_source.close()
amine@330 418
amine@403 419 def test_length_read_overlapping_blocks_with_max_read(self):
amine@330 420 block_size = 313
amine@330 421 hop_size = 207
amine@403 422 block_dur = block_size / self.audio_source.sampling_rate
amine@403 423 hop_dur = hop_size / self.audio_source.sampling_rate
amine@403 424
amine@403 425 reader = AudioReader(
amine@403 426 input=self.audio_source,
amine@403 427 max_read=1.932,
amine@403 428 block_dur=block_dur,
amine@403 429 hop_dur=hop_dur,
amine@330 430 )
amine@330 431
amine@403 432 total_samples = round(reader.sampling_rate * 1.932)
amine@330 433 first_read_size = block_size
amine@330 434 next_read_size = block_size - hop_size
amine@330 435 nb_next_blocks, last_block_size = divmod(
amine@330 436 (total_samples - first_read_size), next_read_size
amine@330 437 )
amine@330 438 total_samples_with_overlap = (
amine@330 439 first_read_size + next_read_size * nb_next_blocks + last_block_size
amine@330 440 )
amine@403 441 expected_read_bytes = (
amine@403 442 total_samples_with_overlap * reader.sw * reader.channels
amine@403 443 )
amine@330 444
amine@403 445 cache_size = (
amine@403 446 (block_size - hop_size) * reader.sample_width * reader.channels
amine@403 447 )
amine@330 448 total_read = cache_size
amine@330 449
amine@330 450 i = 0
amine@330 451 while True:
amine@403 452 block = reader.read()
amine@330 453 if block is None:
amine@330 454 break
amine@330 455 i += 1
amine@330 456 total_read += len(block) - cache_size
amine@330 457
amine@400 458 err_msg = (
amine@400 459 "Wrong data length read from LimiterADS, expected: {0}, found: {1}"
amine@400 460 )
amine@400 461 assert total_read == expected_read_bytes, err_msg.format(
amine@400 462 expected_read_bytes, total_read
amine@330 463 )
amine@330 464
amine@403 465 def test_reader_with_overlapping_blocks__rewindable(self):
amine@403 466 reader = AudioReader(
amine@403 467 input=self.audio_source,
amine@403 468 block_dur=320,
amine@403 469 hop_dur=160,
amine@330 470 record=True,
amine@330 471 )
amine@403 472 assert (
amine@403 473 reader.rewindable
amine@403 474 ), "AudioReader with record=True should be rewindable"
amine@330 475
amine@403 476 def test_overlapping_blocks_with_max_read_rewind_and_read(self):
amine@330 477 # Use arbitrary valid block_size and hop_size
amine@330 478 block_size = 1600
amine@330 479 hop_size = 400
amine@403 480 block_dur = block_size / self.audio_source.sampling_rate
amine@403 481 hop_dur = hop_size / self.audio_source.sampling_rate
amine@330 482
amine@403 483 reader = AudioReader(
amine@403 484 input=self.audio_source,
amine@403 485 block_dur=block_dur,
amine@403 486 hop_dur=hop_dur,
amine@330 487 record=True,
amine@330 488 )
amine@330 489
amine@330 490 # Read all available data overlapping blocks
amine@330 491 i = 0
amine@330 492 while True:
amine@403 493 block = reader.read()
amine@330 494 if block is None:
amine@330 495 break
amine@330 496 i += 1
amine@330 497
amine@403 498 reader.rewind()
amine@330 499
amine@330 500 # Read all data from file and build a BufferAudioSource
amine@330 501 fp = wave.open(dataset.one_to_six_arabic_16000_mono_bc_noise, "r")
amine@330 502 wave_data = fp.readframes(fp.getnframes())
amine@330 503 fp.close()
amine@330 504 audio_source = BufferAudioSource(
amine@403 505 wave_data,
amine@403 506 reader.sampling_rate,
amine@403 507 reader.sample_width,
amine@403 508 reader.channels,
amine@330 509 )
amine@330 510 audio_source.open()
amine@330 511
amine@403 512 # Compare blocks read from AudioReader to those read from an BufferAudioSource with manual position setting
amine@330 513 for j in range(i):
amine@330 514 tmp = audio_source.read(block_size)
amine@400 515 assert (
amine@403 516 reader.read() == tmp
amine@403 517 ), f"Unexpected data (block {i}) from reader with overlapping blocks and record = True"
amine@330 518 audio_source.position = (j + 1) * hop_size
amine@330 519
amine@330 520 audio_source.close()
amine@330 521
amine@403 522 def test_overlapping_blocks_with_record_and_max_read_rewind_and_read(self):
amine@330 523 # Use arbitrary valid block_size and hop_size
amine@330 524 block_size = 1600
amine@330 525 hop_size = 400
amine@403 526 block_dur = block_size / self.audio_source.sampling_rate
amine@403 527 hop_dur = hop_size / self.audio_source.sampling_rate
amine@330 528
amine@403 529 reader = AudioReader(
amine@403 530 input=self.audio_source,
amine@330 531 max_time=1.50,
amine@403 532 block_dur=block_dur,
amine@403 533 hop_dur=hop_dur,
amine@330 534 record=True,
amine@330 535 )
amine@330 536
amine@330 537 # Read all available data overlapping blocks
amine@330 538 i = 0
amine@330 539 while True:
amine@403 540 block = reader.read()
amine@330 541 if block is None:
amine@330 542 break
amine@330 543 i += 1
amine@330 544
amine@403 545 reader.rewind()
amine@330 546
amine@330 547 # Read all data from file and build a BufferAudioSource
amine@330 548 fp = wave.open(dataset.one_to_six_arabic_16000_mono_bc_noise, "r")
amine@330 549 wave_data = fp.readframes(fp.getnframes())
amine@330 550 fp.close()
amine@330 551 audio_source = BufferAudioSource(
amine@403 552 wave_data,
amine@403 553 reader.sampling_rate,
amine@403 554 reader.sample_width,
amine@403 555 reader.channels,
amine@330 556 )
amine@330 557 audio_source.open()
amine@330 558
amine@403 559 # Compare all blocks read from AudioReader to those read from BufferAudioSource with a manual position setting
amine@330 560 for j in range(i):
amine@330 561 tmp = audio_source.read(block_size)
amine@400 562 assert (
amine@403 563 reader.read() == tmp
amine@400 564 ), "Unexpected block (N={0}) read from OverlapADS".format(i)
amine@330 565 audio_source.position = (j + 1) * hop_size
amine@330 566
amine@330 567 audio_source.close()
amine@330 568
amine@403 569 def test_length_read_overlapping_blocks_with_record_and_max_read(self):
amine@330 570 # Use arbitrary valid block_size and hop_size
amine@330 571 block_size = 1000
amine@330 572 hop_size = 200
amine@403 573 block_dur = block_size / self.audio_source.sampling_rate
amine@403 574 hop_dur = hop_size / self.audio_source.sampling_rate
amine@330 575
amine@403 576 reader = AudioReader(
amine@403 577 input=self.audio_source,
amine@403 578 block_dur=block_dur,
amine@403 579 hop_dur=hop_dur,
amine@330 580 record=True,
amine@403 581 max_read=1.317,
amine@330 582 )
amine@403 583 total_samples = round(reader.sampling_rate * 1.317)
amine@330 584 first_read_size = block_size
amine@330 585 next_read_size = block_size - hop_size
amine@330 586 nb_next_blocks, last_block_size = divmod(
amine@330 587 (total_samples - first_read_size), next_read_size
amine@330 588 )
amine@330 589 total_samples_with_overlap = (
amine@330 590 first_read_size + next_read_size * nb_next_blocks + last_block_size
amine@330 591 )
amine@403 592 expected_read_bytes = (
amine@403 593 total_samples_with_overlap * reader.sample_width * reader.channels
amine@403 594 )
amine@330 595
amine@403 596 cache_size = (
amine@403 597 (block_size - hop_size) * reader.sample_width * reader.channels
amine@403 598 )
amine@330 599 total_read = cache_size
amine@330 600
amine@330 601 i = 0
amine@330 602 while True:
amine@403 603 block = reader.read()
amine@330 604 if block is None:
amine@330 605 break
amine@330 606 i += 1
amine@330 607 total_read += len(block) - cache_size
amine@330 608
amine@403 609 err_msg = f"Wrong data length read from AudioReader, expected: {expected_read_bytes}, found: {total_read}"
amine@403 610 assert total_read == expected_read_bytes, err_msg
amine@330 611
amine@330 612
amine@403 613 def test_AudioReader_raw_data():
amine@330 614
amine@403 615 data = b"ABCDEFGHIJKLMNOPQRSTUVWXYZ012345"
amine@403 616 block_size = 5
amine@403 617 hop_size = 4
amine@403 618 reader = AudioReader(
amine@403 619 input=data,
amine@403 620 sampling_rate=16,
amine@403 621 sample_width=2,
amine@403 622 channels=1,
amine@403 623 block_dur=block_size / 16,
amine@403 624 hop_dur=hop_size / 16,
amine@403 625 max_read=0.80,
amine@403 626 record=True,
amine@403 627 )
amine@403 628 reader.open()
amine@403 629
amine@403 630 assert (
amine@403 631 reader.sampling_rate == 16
amine@403 632 ), f"Wrong sampling rate, expected: 16, found: {reader.sampling_rate }"
amine@403 633
amine@403 634 assert (
amine@403 635 reader.sample_width == 2
amine@403 636 ), f"Wrong sample width, expected: 2, found: {reader.sample_width}"
amine@403 637
amine@403 638 # Read all available data overlapping blocks
amine@403 639 i = 0
amine@403 640 while True:
amine@403 641 block = reader.read()
amine@403 642 if block is None:
amine@403 643 break
amine@403 644 i += 1
amine@403 645
amine@403 646 reader.rewind()
amine@403 647
amine@403 648 # Build a BufferAudioSource
amine@403 649 audio_source = BufferAudioSource(
amine@403 650 data, reader.sampling_rate, reader.sample_width, reader.channels
amine@403 651 )
amine@403 652 audio_source.open()
amine@403 653
amine@403 654 # Compare all blocks read from AudioReader to those read from an audio
amine@403 655 # source with a manual position setting
amine@403 656 for j in range(i):
amine@403 657 tmp = audio_source.read(block_size)
amine@403 658 block = reader.read()
amine@400 659 assert (
amine@403 660 block == tmp
amine@403 661 ), f"Unexpected block '{block}' (N={i}) read from OverlapADS"
amine@403 662 audio_source.position = (j + 1) * hop_size
amine@403 663 audio_source.close()
amine@403 664 reader.close()
amine@330 665
amine@330 666
amine@403 667 def test_AudioReader_alias_params():
amine@403 668 reader = AudioReader(
amine@403 669 input=b"0" * 1600,
amine@403 670 sr=16000,
amine@403 671 sw=2,
amine@403 672 channels=1,
amine@403 673 )
amine@403 674 assert reader.sampling_rate == 16000, (
amine@403 675 "Unexpected sampling rate: reader.sampling_rate = "
amine@403 676 + f"{reader.sampling_rate} instead of 16000"
amine@403 677 )
amine@403 678 assert reader.sr == 16000, (
amine@403 679 "Unexpected sampling rate: reader.sr = "
amine@403 680 + f"{reader.sr} instead of 16000"
amine@403 681 )
amine@403 682 assert reader.sample_width == 2, (
amine@403 683 "Unexpected sample width: reader.sample_width = "
amine@403 684 + f"{reader.sample_width} instead of 2"
amine@403 685 )
amine@403 686 assert reader.sw == 2, (
amine@403 687 "Unexpected sample width: reader.sw = " + f"{reader.sw} instead of 2"
amine@403 688 )
amine@403 689 assert reader.channels == 1, (
amine@403 690 "Unexpected number of channels: reader.channels = "
amine@403 691 + f"{reader.channels} instead of 1"
amine@403 692 )
amine@403 693 assert reader.ch == 1, (
amine@403 694 "Unexpected number of channels: reader.ch = "
amine@403 695 + f"{reader.ch} instead of 1"
amine@403 696 )
amine@330 697
amine@330 698
amine@400 699 @pytest.mark.parametrize(
amine@400 700 "file_id, max_read, size",
amine@400 701 [
amine@400 702 ("mono_400", 0.5, 16000), # mono
amine@400 703 ("3channel_400-800-1600", 0.5, 16000 * 3), # multichannel
amine@400 704 ],
amine@400 705 ids=["mono", "multichannel"],
amine@400 706 )
amine@400 707 def test_Limiter(file_id, max_read, size):
amine@400 708 input_wav = "tests/data/test_16KHZ_{}Hz.wav".format(file_id)
amine@400 709 input_raw = "tests/data/test_16KHZ_{}Hz.raw".format(file_id)
amine@400 710 with open(input_raw, "rb") as fp:
amine@400 711 expected = fp.read(size)
amine@330 712
amine@400 713 reader = AudioReader(input_wav, block_dur=0.1, max_read=max_read)
amine@400 714 reader.open()
amine@400 715 data = _read_all_data(reader)
amine@400 716 reader.close()
amine@400 717 assert data == expected
amine@330 718
amine@330 719
amine@400 720 @pytest.mark.parametrize(
amine@400 721 "file_id",
amine@400 722 [
amine@400 723 "mono_400", # mono
amine@400 724 "3channel_400-800-1600", # multichannel
amine@400 725 ],
amine@400 726 ids=["mono", "multichannel"],
amine@400 727 )
amine@400 728 def test_Recorder(file_id):
amine@400 729 input_wav = "tests/data/test_16KHZ_{}Hz.wav".format(file_id)
amine@400 730 input_raw = "tests/data/test_16KHZ_{}Hz.raw".format(file_id)
amine@400 731 with open(input_raw, "rb") as fp:
amine@400 732 expected = fp.read()
amine@400 733
amine@400 734 reader = AudioReader(input_wav, block_dur=0.1, record=True)
amine@400 735 reader.open()
amine@400 736 data = _read_all_data(reader)
amine@400 737 assert data == expected
amine@400 738
amine@400 739 # rewind many times
amine@400 740 for _ in range(3):
amine@400 741 reader.rewind()
amine@330 742 data = _read_all_data(reader)
amine@400 743 assert data == expected
amine@400 744 assert data == reader.data
amine@400 745 reader.close()
amine@330 746
amine@330 747
amine@400 748 @pytest.mark.parametrize(
amine@400 749 "file_id",
amine@400 750 [
amine@400 751 "mono_400", # mono
amine@400 752 "3channel_400-800-1600", # multichannel
amine@400 753 ],
amine@400 754 ids=["mono", "multichannel"],
amine@400 755 )
amine@400 756 def test_Recorder_alias(file_id):
amine@400 757 input_wav = "tests/data/test_16KHZ_{}Hz.wav".format(file_id)
amine@400 758 input_raw = "tests/data/test_16KHZ_{}Hz.raw".format(file_id)
amine@400 759 with open(input_raw, "rb") as fp:
amine@400 760 expected = fp.read()
amine@400 761
amine@400 762 reader = Recorder(input_wav, block_dur=0.1)
amine@400 763 reader.open()
amine@400 764 data = _read_all_data(reader)
amine@400 765 assert data == expected
amine@400 766
amine@400 767 # rewind many times
amine@400 768 for _ in range(3):
amine@400 769 reader.rewind()
amine@330 770 data = _read_all_data(reader)
amine@400 771 assert data == expected
amine@400 772 assert data == reader.data
amine@400 773 reader.close()