annotate tests/test_AudioReader.py @ 455:7dae98b84cdd tip master

Merge branch 'master' of https://github.com/amsehili/auditok
author www-data <www-data@c4dm-xenserv-virt2.eecs.qmul.ac.uk>
date Tue, 03 Dec 2024 09:18:01 +0000
parents 9c9112e23c1c
children
rev   line source
amine@330 1 import sys
amine@330 2 import wave
amine@403 3 from functools import partial
amine@403 4
amine@403 5 import pytest
amine@403 6
amine@330 7 from auditok import (
amine@403 8 AudioReader,
amine@403 9 BufferAudioSource,
amine@403 10 Recorder,
amine@403 11 WaveAudioSource,
amine@330 12 dataset,
amine@330 13 )
amine@403 14 from auditok.util import _Limiter, _OverlapAudioReader
amine@330 15
amine@330 16
amine@403 17 def _read_all_data(reader):
amine@403 18 blocks = []
amine@403 19 while True:
amine@403 20 data = reader.read()
amine@403 21 if data is None:
amine@403 22 break
amine@403 23 blocks.append(data)
amine@403 24 return b"".join(blocks)
amine@403 25
amine@403 26
amine@403 27 class TestAudioReaderWithFileAudioSource:
amine@403 28 @pytest.fixture(autouse=True)
amine@403 29 def setup_and_teardown(self):
amine@330 30 self.audio_source = WaveAudioSource(
amine@330 31 filename=dataset.one_to_six_arabic_16000_mono_bc_noise
amine@330 32 )
amine@403 33 self.audio_source.open()
amine@403 34 yield
amine@403 35 self.audio_source.close()
amine@330 36
amine@403 37 def test_AudioReader_type(self):
amine@403 38 reader = AudioReader(input=self.audio_source)
amine@410 39 err_msg = "wrong object type, expected: 'AudioReader', found: {0}"
amine@403 40 assert isinstance(reader, AudioReader), err_msg.format(type(reader))
amine@330 41
amine@403 42 def _test_default_block_size(self):
amine@403 43 reader = AudioReader(input=self.audio_source)
amine@403 44 data = reader.read()
amine@403 45 size = len(data)
amine@400 46 assert (
amine@400 47 size == 160
amine@400 48 ), "Wrong default block_size, expected: 160, found: {0}".format(size)
amine@330 49
amine@403 50 @pytest.mark.parametrize(
amine@403 51 "block_dur, expected_nb_samples",
amine@403 52 [
amine@403 53 (None, 160), # default: 10 ms
amine@403 54 (0.025, 400), # 25 ms
amine@403 55 ],
amine@403 56 ids=["default", "_25ms"],
amine@403 57 )
amine@403 58 def test_block_duration(self, block_dur, expected_nb_samples):
amine@403 59 """Test the number of samples read for a given block duration."""
amine@403 60 if block_dur is not None:
amine@403 61 reader = AudioReader(input=self.audio_source, block_dur=block_dur)
amine@403 62 else:
amine@403 63 reader = AudioReader(input=self.audio_source)
amine@403 64 data = reader.read()
amine@403 65 nb_samples = len(data) // reader.sample_width
amine@400 66 assert (
amine@403 67 nb_samples == expected_nb_samples
amine@403 68 ), f"Wrong block_size, expected: {expected_nb_samples}, found: {nb_samples}"
amine@330 69
amine@403 70 @pytest.mark.parametrize(
amine@403 71 "block_dur, hop_dur, expected_nb_blocks, expected_last_block_nb_samples",
amine@403 72 [
amine@403 73 (None, None, 1879, 126), # default: 10 ms
amine@403 74 (0.01, None, 1879, 126), # block_dur_10ms_hop_dur_None
amine@403 75 (0.01, 0.01, 1879, 126), # block_dur_10ms_hop_dur_10ms
amine@403 76 (0.02, None, 940, 126), # block_dur_20ms_hop_dur_None
amine@403 77 (0.025, None, 752, 206), # block_dur_25ms_hop_dur_None
amine@403 78 (0.02, 0.01, 1878, 286), # block_dur_20ms_hop_dur_10ms
amine@410 79 (0.025, 0.005, 3754, 366), # block_dur_25ms_hop_dur_5ms
amine@403 80 ],
amine@403 81 ids=[
amine@403 82 "default",
amine@403 83 "block_dur_10ms_hop_dur_None",
amine@410 84 "block_dur_10ms_hop_dur_10ms",
amine@403 85 "block_dur_20ms_hop_dur_None",
amine@403 86 "block_dur_25ms_hop_dur_None",
amine@403 87 "block_dur_20ms_hop_dur_10ms",
amine@410 88 "block_dur_25ms_hop_dur_5ms",
amine@403 89 ],
amine@403 90 )
amine@403 91 def test_hop_duration(
amine@403 92 self,
amine@403 93 block_dur,
amine@403 94 hop_dur,
amine@403 95 expected_nb_blocks,
amine@403 96 expected_last_block_nb_samples,
amine@403 97 ):
amine@403 98 """Test the number of read blocks and the duration of last block for
amine@403 99 different 'block_dur' and 'hop_dur' values.
amine@330 100
amine@403 101 Args:
amine@403 102 block_dur (float or None): block duration in seconds.
amine@403 103 hop_dur (float or None): hop duration in seconds.
amine@403 104 expected_nb_blocks (int): expected number of read block.
amine@403 105 expected_last_block_nb_samples (int): expected number of sample
amine@403 106 in the last block.
amine@403 107 """
amine@403 108 if block_dur is not None:
amine@403 109 reader = AudioReader(
amine@403 110 input=self.audio_source, block_dur=block_dur, hop_dur=hop_dur
amine@403 111 )
amine@403 112 else:
amine@403 113 reader = AudioReader(input=self.audio_source, hop_dur=hop_dur)
amine@330 114
amine@403 115 nb_blocks = 0
amine@403 116 last_block_nb_samples = None
amine@403 117 while True:
amine@403 118 data = reader.read()
amine@403 119 if data is not None:
amine@403 120 nb_blocks += 1
amine@403 121 last_block_nb_samples = len(data) // reader.sample_width
amine@403 122 else:
amine@403 123 break
amine@403 124 err_msg = "Wrong number of blocks read from source, expected: "
amine@403 125 err_msg += f"{expected_nb_blocks}, found: {nb_blocks}"
amine@403 126 assert nb_blocks == expected_nb_blocks, err_msg
amine@330 127
amine@403 128 err_msg = (
amine@403 129 "Wrong number of samples in last block read from source, expected: "
amine@403 130 )
amine@403 131 err_msg += (
amine@403 132 f"{expected_last_block_nb_samples}, found: {last_block_nb_samples}"
amine@330 133 )
amine@330 134
amine@403 135 assert last_block_nb_samples == expected_last_block_nb_samples, err_msg
amine@403 136
amine@403 137 def test_hop_duration_exception(self):
amine@403 138 """Test passing hop_dur > block_dur raises ValueError"""
amine@403 139 with pytest.raises(ValueError):
amine@403 140 AudioReader(self.audio_source, block_dur=0.01, hop_dur=0.015)
amine@403 141
amine@403 142 @pytest.mark.parametrize(
amine@403 143 "block_dur, hop_dur",
amine@403 144 [
amine@403 145 (None, None), # default
amine@403 146 (0.01, None), # block_dur_10ms_hop_dur_None
amine@403 147 (None, 0.01), # block_dur_None__hop_dur_10ms
amine@403 148 (0.05, 0.05), # block_dur_50ms_hop_dur_50ms
amine@403 149 ],
amine@403 150 ids=[
amine@403 151 "default",
amine@403 152 "block_dur_10ms_hop_dur_None",
amine@403 153 "block_dur_None__hop_dur_10ms",
amine@403 154 "block_dur_50ms_hop_dur_50ms",
amine@403 155 ],
amine@403 156 )
amine@403 157 def test_reader_class_block_dur_equals_hop_dur(self, block_dur, hop_dur):
amine@403 158 """Test passing hop_dur == block_dur does not create an instance of
amine@403 159 '_OverlapAudioReader'.
amine@403 160 """
amine@403 161 if block_dur is not None:
amine@403 162 reader = AudioReader(
amine@403 163 input=self.audio_source, block_dur=block_dur, hop_dur=hop_dur
amine@403 164 )
amine@403 165 else:
amine@403 166 reader = AudioReader(input=self.audio_source, hop_dur=hop_dur)
amine@403 167 assert not isinstance(reader, _OverlapAudioReader)
amine@330 168
amine@330 169 def test_sampling_rate(self):
amine@403 170 reader = AudioReader(input=self.audio_source)
amine@403 171 sampling_rate = reader.sampling_rate
amine@400 172 assert (
amine@403 173 sampling_rate == 16000
amine@403 174 ), f"Wrong sampling rate, expected: 16000, found: {sampling_rate}"
amine@330 175
amine@330 176 def test_sample_width(self):
amine@403 177 reader = AudioReader(input=self.audio_source)
amine@403 178 sample_width = reader.sample_width
amine@400 179 assert (
amine@403 180 sample_width == 2
amine@403 181 ), f"Wrong sample width, expected: 2, found: {sample_width}"
amine@330 182
amine@330 183 def test_channels(self):
amine@403 184 reader = AudioReader(input=self.audio_source)
amine@403 185 channels = reader.channels
amine@400 186 assert (
amine@400 187 channels == 1
amine@403 188 ), f"Wrong number of channels, expected: 1, found: {channels}"
amine@330 189
amine@330 190 def test_read(self):
amine@403 191 reader = AudioReader(input=self.audio_source, block_dur=0.02)
amine@403 192 reader_data = reader.read()
amine@330 193 audio_source = WaveAudioSource(
amine@330 194 filename=dataset.one_to_six_arabic_16000_mono_bc_noise
amine@330 195 )
amine@330 196 audio_source.open()
amine@403 197 audio_source_data = audio_source.read(320)
amine@330 198 audio_source.close()
amine@403 199 assert (
amine@403 200 reader_data == audio_source_data
amine@403 201 ), "Unexpected data read from AudioReader"
amine@330 202
amine@403 203 def test_read_with_overlap(self):
amine@403 204 reader = AudioReader(
amine@403 205 input=self.audio_source, block_dur=0.02, hop_dur=0.01
amine@403 206 )
amine@403 207 _ = reader.read() # first block
amine@403 208 reader_data = reader.read() # second block with 0.01 S overlap
amine@403 209 audio_source = WaveAudioSource(
amine@403 210 filename=dataset.one_to_six_arabic_16000_mono_bc_noise
amine@403 211 )
amine@403 212 audio_source.open()
amine@403 213 _ = audio_source.read(160)
amine@403 214 audio_source_data = audio_source.read(320)
amine@403 215 audio_source.close()
amine@403 216 assert (
amine@403 217 reader_data == audio_source_data
amine@403 218 ), "Unexpected data read from AudioReader"
amine@330 219
amine@403 220 def test_read_from_AudioReader_with_max_read(self):
amine@330 221 # read a maximum of 0.75 seconds from audio source
amine@403 222 reader = AudioReader(input=self.audio_source, max_read=0.75)
amine@403 223 assert isinstance(reader._audio_source._audio_source, _Limiter)
amine@403 224 reader_data = _read_all_data(reader)
amine@330 225
amine@330 226 audio_source = WaveAudioSource(
amine@330 227 filename=dataset.one_to_six_arabic_16000_mono_bc_noise
amine@330 228 )
amine@330 229 audio_source.open()
amine@330 230 audio_source_data = audio_source.read(int(16000 * 0.75))
amine@330 231 audio_source.close()
amine@330 232
amine@400 233 assert (
amine@403 234 reader_data == audio_source_data
amine@403 235 ), f"Unexpected data read from AudioReader with 'max_read = {0.75}'"
amine@330 236
amine@403 237 def test_read_data_size_from_AudioReader_with_max_read(self):
amine@330 238 # read a maximum of 1.191 seconds from audio source
amine@403 239 reader = AudioReader(input=self.audio_source, max_read=1.191)
amine@403 240 assert isinstance(reader._audio_source._audio_source, _Limiter)
amine@403 241 total_samples = round(reader.sampling_rate * 1.191)
amine@403 242 block_size = int(reader.block_dur * reader.sampling_rate)
amine@403 243 nb_full_blocks, last_block_size = divmod(total_samples, block_size)
amine@330 244 total_samples_with_overlap = (
amine@403 245 nb_full_blocks * block_size + last_block_size
amine@330 246 )
amine@403 247 expected_read_bytes = (
amine@403 248 total_samples_with_overlap * reader.sample_width * reader.channels
amine@403 249 )
amine@330 250
amine@403 251 reader_data = _read_all_data(reader)
amine@403 252 total_read = len(reader_data)
amine@403 253 err_msg = f"Wrong data length read from LimiterADS, expected: {expected_read_bytes}, found: {total_read}"
amine@403 254 assert total_read == expected_read_bytes, err_msg
amine@403 255
amine@403 256 def test_read_from_Recorder(self):
amine@403 257 reader = Recorder(input=self.audio_source, block_dur=0.025)
amine@403 258 reader_data = []
amine@403 259 for _ in range(10):
amine@403 260 block = reader.read()
amine@330 261 if block is None:
amine@330 262 break
amine@403 263 reader_data.append(block)
amine@403 264 reader_data = b"".join(reader_data)
amine@330 265
amine@330 266 audio_source = WaveAudioSource(
amine@330 267 filename=dataset.one_to_six_arabic_16000_mono_bc_noise
amine@330 268 )
amine@330 269 audio_source.open()
amine@403 270 audio_source_data = audio_source.read(400 * 10)
amine@330 271 audio_source.close()
amine@330 272
amine@400 273 assert (
amine@403 274 reader_data == audio_source_data
amine@403 275 ), "Unexpected data read from Recorder"
amine@330 276
amine@403 277 def test_AudioReader_rewindable(self):
amine@403 278 reader = AudioReader(input=self.audio_source, record=True)
amine@403 279 assert (
amine@403 280 reader.rewindable
amine@403 281 ), "AudioReader with record=True should be rewindable"
amine@330 282
amine@403 283 def test_AudioReader_record_and_rewind(self):
amine@403 284 reader = AudioReader(
amine@403 285 input=self.audio_source, record=True, block_dur=0.02
amine@330 286 )
amine@403 287 # read 0.02 * 10 = 0.2 sec. of data
amine@330 288 for i in range(10):
amine@403 289 reader.read()
amine@403 290 reader.rewind()
amine@330 291
amine@330 292 # read all available data after rewind
amine@403 293 reader_data = _read_all_data(reader)
amine@330 294
amine@330 295 audio_source = WaveAudioSource(
amine@330 296 filename=dataset.one_to_six_arabic_16000_mono_bc_noise
amine@330 297 )
amine@330 298 audio_source.open()
amine@403 299 audio_source_data = audio_source.read(320 * 10) # read 0.2 sec. of data
amine@330 300 audio_source.close()
amine@330 301
amine@400 302 assert (
amine@403 303 reader_data == audio_source_data
amine@403 304 ), "Unexpected data read from AudioReader with record = True"
amine@330 305
amine@403 306 def test_Recorder_record_and_rewind(self):
amine@403 307 recorder = Recorder(input=self.audio_source, block_dur=0.02)
amine@403 308 # read 0.02 * 10 = 0.2 sec. of data
amine@403 309 for i in range(10):
amine@403 310 recorder.read()
amine@403 311
amine@403 312 recorder.rewind()
amine@403 313
amine@403 314 # read all available data after rewind
amine@403 315 recorder_data = []
amine@403 316 recorder_data = _read_all_data(recorder)
amine@403 317
amine@403 318 audio_source = WaveAudioSource(
amine@403 319 filename=dataset.one_to_six_arabic_16000_mono_bc_noise
amine@403 320 )
amine@403 321 audio_source.open()
amine@403 322 audio_source_data = audio_source.read(320 * 10) # read 0.2 sec. of data
amine@403 323 audio_source.close()
amine@403 324
amine@403 325 assert (
amine@403 326 recorder_data == audio_source_data
amine@403 327 ), "Unexpected data read from Recorder"
amine@403 328
amine@403 329 def test_read_overlapping_blocks(self):
amine@330 330 # Use arbitrary valid block_size and hop_size
amine@330 331 block_size = 1714
amine@330 332 hop_size = 313
amine@403 333 block_dur = block_size / self.audio_source.sampling_rate
amine@403 334 hop_dur = hop_size / self.audio_source.sampling_rate
amine@330 335
amine@403 336 reader = AudioReader(
amine@403 337 input=self.audio_source,
amine@403 338 block_dur=block_dur,
amine@403 339 hop_dur=hop_dur,
amine@330 340 )
amine@330 341
amine@403 342 # Read all available overlapping blocks of data
amine@403 343 reader_data = []
amine@330 344 while True:
amine@403 345 block = reader.read()
amine@330 346 if block is None:
amine@330 347 break
amine@403 348 reader_data.append(block)
amine@330 349
amine@330 350 # Read all data from file and build a BufferAudioSource
amine@330 351 fp = wave.open(dataset.one_to_six_arabic_16000_mono_bc_noise, "r")
amine@330 352 wave_data = fp.readframes(fp.getnframes())
amine@330 353 fp.close()
amine@330 354 audio_source = BufferAudioSource(
amine@403 355 wave_data,
amine@403 356 reader.sampling_rate,
amine@403 357 reader.sample_width,
amine@403 358 reader.channels,
amine@330 359 )
amine@330 360 audio_source.open()
amine@330 361
amine@403 362 # Compare all blocks read from OverlapADS to those read from an
amine@403 363 # audio source with a manual position setting
amine@403 364 for i, block in enumerate(reader_data):
amine@330 365 tmp = audio_source.read(block_size)
amine@400 366 assert (
amine@400 367 block == tmp
amine@403 368 ), f"Unexpected data (block {i}) from reader with overlapping blocks"
amine@330 369 audio_source.position = (i + 1) * hop_size
amine@330 370
amine@330 371 audio_source.close()
amine@330 372
amine@403 373 def test_read_overlapping_blocks_with_max_read(self):
amine@330 374 block_size = 256
amine@330 375 hop_size = 200
amine@403 376 block_dur = block_size / self.audio_source.sampling_rate
amine@403 377 hop_dur = hop_size / self.audio_source.sampling_rate
amine@330 378
amine@403 379 reader = AudioReader(
amine@403 380 input=self.audio_source,
amine@403 381 block_dur=block_dur,
amine@403 382 hop_dur=hop_dur,
amine@403 383 max_read=0.5,
amine@330 384 )
amine@330 385
amine@403 386 # Read all available overlapping blocks of data
amine@403 387 reader_data = []
amine@330 388 while True:
amine@403 389 block = reader.read()
amine@330 390 if block is None:
amine@330 391 break
amine@403 392 reader_data.append(block)
amine@330 393
amine@330 394 # Read all data from file and build a BufferAudioSource
amine@330 395 fp = wave.open(dataset.one_to_six_arabic_16000_mono_bc_noise, "r")
amine@330 396 wave_data = fp.readframes(fp.getnframes())
amine@330 397 fp.close()
amine@330 398 audio_source = BufferAudioSource(
amine@403 399 wave_data,
amine@403 400 reader.sampling_rate,
amine@403 401 reader.sample_width,
amine@403 402 reader.channels,
amine@330 403 )
amine@330 404 audio_source.open()
amine@330 405
amine@403 406 # Compare all blocks read from OverlapADS to those read from an
amine@403 407 # audio source with a manual position setting
amine@403 408 for i, block in enumerate(reader_data):
amine@403 409 tmp = audio_source.read(len(block) // (reader.sw * reader.ch))
amine@403 410 assert (
amine@403 411 block == tmp
amine@403 412 ), f"Unexpected data (block {i}) from reader with overlapping blocks and max_read"
amine@330 413 audio_source.position = (i + 1) * hop_size
amine@330 414
amine@330 415 audio_source.close()
amine@330 416
amine@403 417 def test_length_read_overlapping_blocks_with_max_read(self):
amine@330 418 block_size = 313
amine@330 419 hop_size = 207
amine@403 420 block_dur = block_size / self.audio_source.sampling_rate
amine@403 421 hop_dur = hop_size / self.audio_source.sampling_rate
amine@403 422
amine@403 423 reader = AudioReader(
amine@403 424 input=self.audio_source,
amine@403 425 max_read=1.932,
amine@403 426 block_dur=block_dur,
amine@403 427 hop_dur=hop_dur,
amine@330 428 )
amine@330 429
amine@403 430 total_samples = round(reader.sampling_rate * 1.932)
amine@330 431 first_read_size = block_size
amine@330 432 next_read_size = block_size - hop_size
amine@330 433 nb_next_blocks, last_block_size = divmod(
amine@330 434 (total_samples - first_read_size), next_read_size
amine@330 435 )
amine@330 436 total_samples_with_overlap = (
amine@330 437 first_read_size + next_read_size * nb_next_blocks + last_block_size
amine@330 438 )
amine@403 439 expected_read_bytes = (
amine@403 440 total_samples_with_overlap * reader.sw * reader.channels
amine@403 441 )
amine@330 442
amine@403 443 cache_size = (
amine@403 444 (block_size - hop_size) * reader.sample_width * reader.channels
amine@403 445 )
amine@330 446 total_read = cache_size
amine@330 447
amine@330 448 i = 0
amine@330 449 while True:
amine@403 450 block = reader.read()
amine@330 451 if block is None:
amine@330 452 break
amine@330 453 i += 1
amine@330 454 total_read += len(block) - cache_size
amine@330 455
amine@400 456 err_msg = (
amine@400 457 "Wrong data length read from LimiterADS, expected: {0}, found: {1}"
amine@400 458 )
amine@400 459 assert total_read == expected_read_bytes, err_msg.format(
amine@400 460 expected_read_bytes, total_read
amine@330 461 )
amine@330 462
amine@403 463 def test_reader_with_overlapping_blocks__rewindable(self):
amine@403 464 reader = AudioReader(
amine@403 465 input=self.audio_source,
amine@403 466 block_dur=320,
amine@403 467 hop_dur=160,
amine@330 468 record=True,
amine@330 469 )
amine@403 470 assert (
amine@403 471 reader.rewindable
amine@403 472 ), "AudioReader with record=True should be rewindable"
amine@330 473
amine@403 474 def test_overlapping_blocks_with_max_read_rewind_and_read(self):
amine@330 475 # Use arbitrary valid block_size and hop_size
amine@330 476 block_size = 1600
amine@330 477 hop_size = 400
amine@403 478 block_dur = block_size / self.audio_source.sampling_rate
amine@403 479 hop_dur = hop_size / self.audio_source.sampling_rate
amine@330 480
amine@403 481 reader = AudioReader(
amine@403 482 input=self.audio_source,
amine@403 483 block_dur=block_dur,
amine@403 484 hop_dur=hop_dur,
amine@330 485 record=True,
amine@330 486 )
amine@330 487
amine@330 488 # Read all available data overlapping blocks
amine@330 489 i = 0
amine@330 490 while True:
amine@403 491 block = reader.read()
amine@330 492 if block is None:
amine@330 493 break
amine@330 494 i += 1
amine@330 495
amine@403 496 reader.rewind()
amine@330 497
amine@330 498 # Read all data from file and build a BufferAudioSource
amine@330 499 fp = wave.open(dataset.one_to_six_arabic_16000_mono_bc_noise, "r")
amine@330 500 wave_data = fp.readframes(fp.getnframes())
amine@330 501 fp.close()
amine@330 502 audio_source = BufferAudioSource(
amine@403 503 wave_data,
amine@403 504 reader.sampling_rate,
amine@403 505 reader.sample_width,
amine@403 506 reader.channels,
amine@330 507 )
amine@330 508 audio_source.open()
amine@330 509
amine@403 510 # Compare blocks read from AudioReader to those read from an BufferAudioSource with manual position setting
amine@330 511 for j in range(i):
amine@330 512 tmp = audio_source.read(block_size)
amine@400 513 assert (
amine@403 514 reader.read() == tmp
amine@403 515 ), f"Unexpected data (block {i}) from reader with overlapping blocks and record = True"
amine@330 516 audio_source.position = (j + 1) * hop_size
amine@330 517
amine@330 518 audio_source.close()
amine@330 519
amine@403 520 def test_overlapping_blocks_with_record_and_max_read_rewind_and_read(self):
amine@330 521 # Use arbitrary valid block_size and hop_size
amine@330 522 block_size = 1600
amine@330 523 hop_size = 400
amine@403 524 block_dur = block_size / self.audio_source.sampling_rate
amine@403 525 hop_dur = hop_size / self.audio_source.sampling_rate
amine@330 526
amine@403 527 reader = AudioReader(
amine@403 528 input=self.audio_source,
amine@330 529 max_time=1.50,
amine@403 530 block_dur=block_dur,
amine@403 531 hop_dur=hop_dur,
amine@330 532 record=True,
amine@330 533 )
amine@330 534
amine@330 535 # Read all available data overlapping blocks
amine@330 536 i = 0
amine@330 537 while True:
amine@403 538 block = reader.read()
amine@330 539 if block is None:
amine@330 540 break
amine@330 541 i += 1
amine@330 542
amine@403 543 reader.rewind()
amine@330 544
amine@330 545 # Read all data from file and build a BufferAudioSource
amine@330 546 fp = wave.open(dataset.one_to_six_arabic_16000_mono_bc_noise, "r")
amine@330 547 wave_data = fp.readframes(fp.getnframes())
amine@330 548 fp.close()
amine@330 549 audio_source = BufferAudioSource(
amine@403 550 wave_data,
amine@403 551 reader.sampling_rate,
amine@403 552 reader.sample_width,
amine@403 553 reader.channels,
amine@330 554 )
amine@330 555 audio_source.open()
amine@330 556
amine@403 557 # Compare all blocks read from AudioReader to those read from BufferAudioSource with a manual position setting
amine@330 558 for j in range(i):
amine@330 559 tmp = audio_source.read(block_size)
amine@400 560 assert (
amine@403 561 reader.read() == tmp
amine@400 562 ), "Unexpected block (N={0}) read from OverlapADS".format(i)
amine@330 563 audio_source.position = (j + 1) * hop_size
amine@330 564
amine@330 565 audio_source.close()
amine@330 566
amine@403 567 def test_length_read_overlapping_blocks_with_record_and_max_read(self):
amine@330 568 # Use arbitrary valid block_size and hop_size
amine@330 569 block_size = 1000
amine@330 570 hop_size = 200
amine@403 571 block_dur = block_size / self.audio_source.sampling_rate
amine@403 572 hop_dur = hop_size / self.audio_source.sampling_rate
amine@330 573
amine@403 574 reader = AudioReader(
amine@403 575 input=self.audio_source,
amine@403 576 block_dur=block_dur,
amine@403 577 hop_dur=hop_dur,
amine@330 578 record=True,
amine@403 579 max_read=1.317,
amine@330 580 )
amine@403 581 total_samples = round(reader.sampling_rate * 1.317)
amine@330 582 first_read_size = block_size
amine@330 583 next_read_size = block_size - hop_size
amine@330 584 nb_next_blocks, last_block_size = divmod(
amine@330 585 (total_samples - first_read_size), next_read_size
amine@330 586 )
amine@330 587 total_samples_with_overlap = (
amine@330 588 first_read_size + next_read_size * nb_next_blocks + last_block_size
amine@330 589 )
amine@403 590 expected_read_bytes = (
amine@403 591 total_samples_with_overlap * reader.sample_width * reader.channels
amine@403 592 )
amine@330 593
amine@403 594 cache_size = (
amine@403 595 (block_size - hop_size) * reader.sample_width * reader.channels
amine@403 596 )
amine@330 597 total_read = cache_size
amine@330 598
amine@330 599 i = 0
amine@330 600 while True:
amine@403 601 block = reader.read()
amine@330 602 if block is None:
amine@330 603 break
amine@330 604 i += 1
amine@330 605 total_read += len(block) - cache_size
amine@330 606
amine@403 607 err_msg = f"Wrong data length read from AudioReader, expected: {expected_read_bytes}, found: {total_read}"
amine@403 608 assert total_read == expected_read_bytes, err_msg
amine@330 609
amine@330 610
amine@403 611 def test_AudioReader_raw_data():
amine@330 612
amine@403 613 data = b"ABCDEFGHIJKLMNOPQRSTUVWXYZ012345"
amine@403 614 block_size = 5
amine@403 615 hop_size = 4
amine@403 616 reader = AudioReader(
amine@403 617 input=data,
amine@403 618 sampling_rate=16,
amine@403 619 sample_width=2,
amine@403 620 channels=1,
amine@403 621 block_dur=block_size / 16,
amine@403 622 hop_dur=hop_size / 16,
amine@403 623 max_read=0.80,
amine@403 624 record=True,
amine@403 625 )
amine@403 626 reader.open()
amine@403 627
amine@403 628 assert (
amine@403 629 reader.sampling_rate == 16
amine@403 630 ), f"Wrong sampling rate, expected: 16, found: {reader.sampling_rate }"
amine@403 631
amine@403 632 assert (
amine@403 633 reader.sample_width == 2
amine@403 634 ), f"Wrong sample width, expected: 2, found: {reader.sample_width}"
amine@403 635
amine@403 636 # Read all available data overlapping blocks
amine@403 637 i = 0
amine@403 638 while True:
amine@403 639 block = reader.read()
amine@403 640 if block is None:
amine@403 641 break
amine@403 642 i += 1
amine@403 643
amine@403 644 reader.rewind()
amine@403 645
amine@403 646 # Build a BufferAudioSource
amine@403 647 audio_source = BufferAudioSource(
amine@403 648 data, reader.sampling_rate, reader.sample_width, reader.channels
amine@403 649 )
amine@403 650 audio_source.open()
amine@403 651
amine@403 652 # Compare all blocks read from AudioReader to those read from an audio
amine@403 653 # source with a manual position setting
amine@403 654 for j in range(i):
amine@403 655 tmp = audio_source.read(block_size)
amine@403 656 block = reader.read()
amine@400 657 assert (
amine@403 658 block == tmp
amine@403 659 ), f"Unexpected block '{block}' (N={i}) read from OverlapADS"
amine@403 660 audio_source.position = (j + 1) * hop_size
amine@403 661 audio_source.close()
amine@403 662 reader.close()
amine@330 663
amine@330 664
amine@403 665 def test_AudioReader_alias_params():
amine@403 666 reader = AudioReader(
amine@403 667 input=b"0" * 1600,
amine@403 668 sr=16000,
amine@403 669 sw=2,
amine@403 670 channels=1,
amine@403 671 )
amine@403 672 assert reader.sampling_rate == 16000, (
amine@403 673 "Unexpected sampling rate: reader.sampling_rate = "
amine@403 674 + f"{reader.sampling_rate} instead of 16000"
amine@403 675 )
amine@403 676 assert reader.sr == 16000, (
amine@403 677 "Unexpected sampling rate: reader.sr = "
amine@403 678 + f"{reader.sr} instead of 16000"
amine@403 679 )
amine@403 680 assert reader.sample_width == 2, (
amine@403 681 "Unexpected sample width: reader.sample_width = "
amine@403 682 + f"{reader.sample_width} instead of 2"
amine@403 683 )
amine@403 684 assert reader.sw == 2, (
amine@403 685 "Unexpected sample width: reader.sw = " + f"{reader.sw} instead of 2"
amine@403 686 )
amine@403 687 assert reader.channels == 1, (
amine@403 688 "Unexpected number of channels: reader.channels = "
amine@403 689 + f"{reader.channels} instead of 1"
amine@403 690 )
amine@403 691 assert reader.ch == 1, (
amine@403 692 "Unexpected number of channels: reader.ch = "
amine@403 693 + f"{reader.ch} instead of 1"
amine@403 694 )
amine@330 695
amine@330 696
amine@400 697 @pytest.mark.parametrize(
amine@400 698 "file_id, max_read, size",
amine@400 699 [
amine@400 700 ("mono_400", 0.5, 16000), # mono
amine@400 701 ("3channel_400-800-1600", 0.5, 16000 * 3), # multichannel
amine@400 702 ],
amine@400 703 ids=["mono", "multichannel"],
amine@400 704 )
amine@400 705 def test_Limiter(file_id, max_read, size):
amine@400 706 input_wav = "tests/data/test_16KHZ_{}Hz.wav".format(file_id)
amine@400 707 input_raw = "tests/data/test_16KHZ_{}Hz.raw".format(file_id)
amine@400 708 with open(input_raw, "rb") as fp:
amine@400 709 expected = fp.read(size)
amine@330 710
amine@400 711 reader = AudioReader(input_wav, block_dur=0.1, max_read=max_read)
amine@400 712 reader.open()
amine@400 713 data = _read_all_data(reader)
amine@400 714 reader.close()
amine@400 715 assert data == expected
amine@330 716
amine@330 717
amine@400 718 @pytest.mark.parametrize(
amine@400 719 "file_id",
amine@400 720 [
amine@400 721 "mono_400", # mono
amine@400 722 "3channel_400-800-1600", # multichannel
amine@400 723 ],
amine@400 724 ids=["mono", "multichannel"],
amine@400 725 )
amine@400 726 def test_Recorder(file_id):
amine@400 727 input_wav = "tests/data/test_16KHZ_{}Hz.wav".format(file_id)
amine@400 728 input_raw = "tests/data/test_16KHZ_{}Hz.raw".format(file_id)
amine@400 729 with open(input_raw, "rb") as fp:
amine@400 730 expected = fp.read()
amine@400 731
amine@400 732 reader = AudioReader(input_wav, block_dur=0.1, record=True)
amine@400 733 reader.open()
amine@400 734 data = _read_all_data(reader)
amine@400 735 assert data == expected
amine@400 736
amine@400 737 # rewind many times
amine@400 738 for _ in range(3):
amine@400 739 reader.rewind()
amine@330 740 data = _read_all_data(reader)
amine@400 741 assert data == expected
amine@400 742 assert data == reader.data
amine@400 743 reader.close()
amine@330 744
amine@330 745
amine@400 746 @pytest.mark.parametrize(
amine@400 747 "file_id",
amine@400 748 [
amine@400 749 "mono_400", # mono
amine@400 750 "3channel_400-800-1600", # multichannel
amine@400 751 ],
amine@400 752 ids=["mono", "multichannel"],
amine@400 753 )
amine@400 754 def test_Recorder_alias(file_id):
amine@400 755 input_wav = "tests/data/test_16KHZ_{}Hz.wav".format(file_id)
amine@400 756 input_raw = "tests/data/test_16KHZ_{}Hz.raw".format(file_id)
amine@400 757 with open(input_raw, "rb") as fp:
amine@400 758 expected = fp.read()
amine@400 759
amine@400 760 reader = Recorder(input_wav, block_dur=0.1)
amine@400 761 reader.open()
amine@400 762 data = _read_all_data(reader)
amine@400 763 assert data == expected
amine@400 764
amine@400 765 # rewind many times
amine@400 766 for _ in range(3):
amine@400 767 reader.rewind()
amine@330 768 data = _read_all_data(reader)
amine@400 769 assert data == expected
amine@400 770 assert data == reader.data
amine@400 771 reader.close()