Mercurial > hg > auditok
comparison tests/test_AudioReader.py @ 403:996948ada980
Update tests
author | Amine Sehili <amine.sehili@gmail.com> |
---|---|
date | Sun, 26 May 2024 22:43:08 +0200 |
parents | 323d59b404a2 |
children | 9c9112e23c1c |
comparison
equal
deleted
inserted
replaced
402:954c1e279068 | 403:996948ada980 |
---|---|
1 import pytest | |
2 from functools import partial | |
3 import sys | 1 import sys |
4 import wave | 2 import wave |
3 from functools import partial | |
4 | |
5 import pytest | |
6 | |
5 from auditok import ( | 7 from auditok import ( |
8 AudioReader, | |
9 BufferAudioSource, | |
10 Recorder, | |
11 WaveAudioSource, | |
6 dataset, | 12 dataset, |
7 ADSFactory, | |
8 AudioDataSource, | |
9 AudioReader, | |
10 Recorder, | |
11 BufferAudioSource, | |
12 WaveAudioSource, | |
13 DuplicateArgument, | |
14 ) | 13 ) |
15 | 14 from auditok.util import _Limiter, _OverlapAudioReader |
16 | |
17 class TestADSFactoryFileAudioSource: | |
18 def setup_method(self): | |
19 self.audio_source = WaveAudioSource( | |
20 filename=dataset.one_to_six_arabic_16000_mono_bc_noise | |
21 ) | |
22 | |
23 def test_ADS_type(self): | |
24 ads = ADSFactory.ads(audio_source=self.audio_source) | |
25 err_msg = ( | |
26 "wrong type for ads object, expected: 'AudioDataSource', found: {0}" | |
27 ) | |
28 assert isinstance(ads, AudioDataSource), err_msg.format(type(ads)) | |
29 | |
30 def test_default_block_size(self): | |
31 ads = ADSFactory.ads(audio_source=self.audio_source) | |
32 size = ads.block_size | |
33 assert ( | |
34 size == 160 | |
35 ), "Wrong default block_size, expected: 160, found: {0}".format(size) | |
36 | |
37 def test_block_size(self): | |
38 ads = ADSFactory.ads(audio_source=self.audio_source, block_size=512) | |
39 size = ads.block_size | |
40 assert ( | |
41 size == 512 | |
42 ), "Wrong block_size, expected: 512, found: {0}".format(size) | |
43 | |
44 # with alias keyword | |
45 ads = ADSFactory.ads(audio_source=self.audio_source, bs=160) | |
46 size = ads.block_size | |
47 assert ( | |
48 size == 160 | |
49 ), "Wrong block_size, expected: 160, found: {0}".format(size) | |
50 | |
51 def test_block_duration(self): | |
52 ads = ADSFactory.ads( | |
53 audio_source=self.audio_source, block_dur=0.01 | |
54 ) # 10 ms | |
55 size = ads.block_size | |
56 assert ( | |
57 size == 160 | |
58 ), "Wrong block_size, expected: 160, found: {0}".format(size) | |
59 | |
60 # with alias keyword | |
61 ads = ADSFactory.ads(audio_source=self.audio_source, bd=0.025) # 25 ms | |
62 size = ads.block_size | |
63 assert ( | |
64 size == 400 | |
65 ), "Wrong block_size, expected: 400, found: {0}".format(size) | |
66 | |
67 def test_hop_duration(self): | |
68 ads = ADSFactory.ads( | |
69 audio_source=self.audio_source, block_dur=0.02, hop_dur=0.01 | |
70 ) # 10 ms | |
71 size = ads.hop_size | |
72 assert size == 160, "Wrong hop_size, expected: 160, found: {0}".format( | |
73 size | |
74 ) | |
75 | |
76 # with alias keyword | |
77 ads = ADSFactory.ads( | |
78 audio_source=self.audio_source, bd=0.025, hop_dur=0.015 | |
79 ) # 15 ms | |
80 size = ads.hop_size | |
81 assert ( | |
82 size == 240 | |
83 ), "Wrong block_size, expected: 240, found: {0}".format(size) | |
84 | |
85 def test_sampling_rate(self): | |
86 ads = ADSFactory.ads(audio_source=self.audio_source) | |
87 srate = ads.sampling_rate | |
88 assert ( | |
89 srate == 16000 | |
90 ), "Wrong sampling rate, expected: 16000, found: {0}".format(srate) | |
91 | |
92 def test_sample_width(self): | |
93 ads = ADSFactory.ads(audio_source=self.audio_source) | |
94 swidth = ads.sample_width | |
95 assert ( | |
96 swidth == 2 | |
97 ), "Wrong sample width, expected: 2, found: {0}".format(swidth) | |
98 | |
99 def test_channels(self): | |
100 ads = ADSFactory.ads(audio_source=self.audio_source) | |
101 channels = ads.channels | |
102 assert ( | |
103 channels == 1 | |
104 ), "Wrong number of channels, expected: 1, found: {0}".format(channels) | |
105 | |
106 def test_read(self): | |
107 ads = ADSFactory.ads(audio_source=self.audio_source, block_size=256) | |
108 ads.open() | |
109 ads_data = ads.read() | |
110 ads.close() | |
111 | |
112 audio_source = WaveAudioSource( | |
113 filename=dataset.one_to_six_arabic_16000_mono_bc_noise | |
114 ) | |
115 audio_source.open() | |
116 audio_source_data = audio_source.read(256) | |
117 audio_source.close() | |
118 | |
119 assert ads_data == audio_source_data, "Unexpected data read from ads" | |
120 | |
121 def test_Limiter_Deco_read(self): | |
122 # read a maximum of 0.75 seconds from audio source | |
123 ads = ADSFactory.ads(audio_source=self.audio_source, max_time=0.75) | |
124 ads_data = [] | |
125 ads.open() | |
126 while True: | |
127 block = ads.read() | |
128 if block is None: | |
129 break | |
130 ads_data.append(block) | |
131 ads.close() | |
132 ads_data = b"".join(ads_data) | |
133 | |
134 audio_source = WaveAudioSource( | |
135 filename=dataset.one_to_six_arabic_16000_mono_bc_noise | |
136 ) | |
137 audio_source.open() | |
138 audio_source_data = audio_source.read(int(16000 * 0.75)) | |
139 audio_source.close() | |
140 | |
141 assert ( | |
142 ads_data == audio_source_data | |
143 ), "Unexpected data read from LimiterADS" | |
144 | |
145 def test_Limiter_Deco_read_limit(self): | |
146 # read a maximum of 1.191 seconds from audio source | |
147 ads = ADSFactory.ads(audio_source=self.audio_source, max_time=1.191) | |
148 total_samples = round(ads.sampling_rate * 1.191) | |
149 nb_full_blocks, last_block_size = divmod(total_samples, ads.block_size) | |
150 total_samples_with_overlap = ( | |
151 nb_full_blocks * ads.block_size + last_block_size | |
152 ) | |
153 expected_read_bytes = total_samples_with_overlap * ads.sw * ads.channels | |
154 | |
155 total_read = 0 | |
156 ads.open() | |
157 i = 0 | |
158 while True: | |
159 block = ads.read() | |
160 if block is None: | |
161 break | |
162 i += 1 | |
163 total_read += len(block) | |
164 | |
165 ads.close() | |
166 err_msg = ( | |
167 "Wrong data length read from LimiterADS, expected: {0}, found: {1}" | |
168 ) | |
169 assert total_read == expected_read_bytes, err_msg.format( | |
170 expected_read_bytes, total_read | |
171 ) | |
172 | |
173 def test_Recorder_Deco_read(self): | |
174 ads = ADSFactory.ads( | |
175 audio_source=self.audio_source, record=True, block_size=500 | |
176 ) | |
177 ads_data = [] | |
178 ads.open() | |
179 for i in range(10): | |
180 block = ads.read() | |
181 if block is None: | |
182 break | |
183 ads_data.append(block) | |
184 ads.close() | |
185 ads_data = b"".join(ads_data) | |
186 | |
187 audio_source = WaveAudioSource( | |
188 filename=dataset.one_to_six_arabic_16000_mono_bc_noise | |
189 ) | |
190 audio_source.open() | |
191 audio_source_data = audio_source.read(500 * 10) | |
192 audio_source.close() | |
193 | |
194 assert ( | |
195 ads_data == audio_source_data | |
196 ), "Unexpected data read from RecorderADS" | |
197 | |
198 def test_Recorder_Deco_is_rewindable(self): | |
199 ads = ADSFactory.ads(audio_source=self.audio_source, record=True) | |
200 assert ads.rewindable, "RecorderADS.is_rewindable should return True" | |
201 | |
202 def test_Recorder_Deco_rewind_and_read(self): | |
203 ads = ADSFactory.ads( | |
204 audio_source=self.audio_source, record=True, block_size=320 | |
205 ) | |
206 ads.open() | |
207 for i in range(10): | |
208 ads.read() | |
209 | |
210 ads.rewind() | |
211 | |
212 # read all available data after rewind | |
213 ads_data = [] | |
214 while True: | |
215 block = ads.read() | |
216 if block is None: | |
217 break | |
218 ads_data.append(block) | |
219 ads.close() | |
220 ads_data = b"".join(ads_data) | |
221 | |
222 audio_source = WaveAudioSource( | |
223 filename=dataset.one_to_six_arabic_16000_mono_bc_noise | |
224 ) | |
225 audio_source.open() | |
226 audio_source_data = audio_source.read(320 * 10) | |
227 audio_source.close() | |
228 | |
229 assert ( | |
230 ads_data == audio_source_data | |
231 ), "Unexpected data read from RecorderADS" | |
232 | |
233 def test_Overlap_Deco_read(self): | |
234 # Use arbitrary valid block_size and hop_size | |
235 block_size = 1714 | |
236 hop_size = 313 | |
237 | |
238 ads = ADSFactory.ads( | |
239 audio_source=self.audio_source, | |
240 block_size=block_size, | |
241 hop_size=hop_size, | |
242 ) | |
243 | |
244 # Read all available data overlapping blocks | |
245 ads.open() | |
246 ads_data = [] | |
247 while True: | |
248 block = ads.read() | |
249 if block is None: | |
250 break | |
251 ads_data.append(block) | |
252 ads.close() | |
253 | |
254 # Read all data from file and build a BufferAudioSource | |
255 fp = wave.open(dataset.one_to_six_arabic_16000_mono_bc_noise, "r") | |
256 wave_data = fp.readframes(fp.getnframes()) | |
257 fp.close() | |
258 audio_source = BufferAudioSource( | |
259 wave_data, ads.sampling_rate, ads.sample_width, ads.channels | |
260 ) | |
261 audio_source.open() | |
262 | |
263 # Compare all blocks read from OverlapADS to those read from an audio source with a manual position setting | |
264 for i, block in enumerate(ads_data): | |
265 tmp = audio_source.read(block_size) | |
266 assert ( | |
267 block == tmp | |
268 ), "Unexpected block (N={0}) read from OverlapADS".format(i) | |
269 audio_source.position = (i + 1) * hop_size | |
270 | |
271 audio_source.close() | |
272 | |
273 def test_Limiter_Overlap_Deco_read(self): | |
274 block_size = 256 | |
275 hop_size = 200 | |
276 | |
277 ads = ADSFactory.ads( | |
278 audio_source=self.audio_source, | |
279 max_time=0.50, | |
280 block_size=block_size, | |
281 hop_size=hop_size, | |
282 ) | |
283 | |
284 # Read all available data overlapping blocks | |
285 ads.open() | |
286 ads_data = [] | |
287 while True: | |
288 block = ads.read() | |
289 if block is None: | |
290 break | |
291 ads_data.append(block) | |
292 ads.close() | |
293 | |
294 # Read all data from file and build a BufferAudioSource | |
295 fp = wave.open(dataset.one_to_six_arabic_16000_mono_bc_noise, "r") | |
296 wave_data = fp.readframes(fp.getnframes()) | |
297 fp.close() | |
298 audio_source = BufferAudioSource( | |
299 wave_data, ads.sampling_rate, ads.sample_width, ads.channels | |
300 ) | |
301 audio_source.open() | |
302 | |
303 # Compare all blocks read from OverlapADS to those read from an audio source with a manual position setting | |
304 for i, block in enumerate(ads_data): | |
305 tmp = audio_source.read(len(block) // (ads.sw * ads.ch)) | |
306 assert len(block) == len( | |
307 tmp | |
308 ), "Unexpected block (N={0}) read from OverlapADS".format(i) | |
309 audio_source.position = (i + 1) * hop_size | |
310 | |
311 audio_source.close() | |
312 | |
313 def test_Limiter_Overlap_Deco_read_limit(self): | |
314 block_size = 313 | |
315 hop_size = 207 | |
316 ads = ADSFactory.ads( | |
317 audio_source=self.audio_source, | |
318 max_time=1.932, | |
319 block_size=block_size, | |
320 hop_size=hop_size, | |
321 ) | |
322 | |
323 total_samples = round(ads.sampling_rate * 1.932) | |
324 first_read_size = block_size | |
325 next_read_size = block_size - hop_size | |
326 nb_next_blocks, last_block_size = divmod( | |
327 (total_samples - first_read_size), next_read_size | |
328 ) | |
329 total_samples_with_overlap = ( | |
330 first_read_size + next_read_size * nb_next_blocks + last_block_size | |
331 ) | |
332 expected_read_bytes = total_samples_with_overlap * ads.sw * ads.channels | |
333 | |
334 cache_size = (block_size - hop_size) * ads.sample_width * ads.channels | |
335 total_read = cache_size | |
336 | |
337 ads.open() | |
338 i = 0 | |
339 while True: | |
340 block = ads.read() | |
341 if block is None: | |
342 break | |
343 i += 1 | |
344 total_read += len(block) - cache_size | |
345 | |
346 ads.close() | |
347 err_msg = ( | |
348 "Wrong data length read from LimiterADS, expected: {0}, found: {1}" | |
349 ) | |
350 assert total_read == expected_read_bytes, err_msg.format( | |
351 expected_read_bytes, total_read | |
352 ) | |
353 | |
354 def test_Recorder_Overlap_Deco_is_rewindable(self): | |
355 ads = ADSFactory.ads( | |
356 audio_source=self.audio_source, | |
357 block_size=320, | |
358 hop_size=160, | |
359 record=True, | |
360 ) | |
361 assert ads.rewindable, "RecorderADS.is_rewindable should return True" | |
362 | |
363 def test_Recorder_Overlap_Deco_rewind_and_read(self): | |
364 # Use arbitrary valid block_size and hop_size | |
365 block_size = 1600 | |
366 hop_size = 400 | |
367 | |
368 ads = ADSFactory.ads( | |
369 audio_source=self.audio_source, | |
370 block_size=block_size, | |
371 hop_size=hop_size, | |
372 record=True, | |
373 ) | |
374 | |
375 # Read all available data overlapping blocks | |
376 ads.open() | |
377 i = 0 | |
378 while True: | |
379 block = ads.read() | |
380 if block is None: | |
381 break | |
382 i += 1 | |
383 | |
384 ads.rewind() | |
385 | |
386 # Read all data from file and build a BufferAudioSource | |
387 fp = wave.open(dataset.one_to_six_arabic_16000_mono_bc_noise, "r") | |
388 wave_data = fp.readframes(fp.getnframes()) | |
389 fp.close() | |
390 audio_source = BufferAudioSource( | |
391 wave_data, ads.sampling_rate, ads.sample_width, ads.channels | |
392 ) | |
393 audio_source.open() | |
394 | |
395 # Compare all blocks read from OverlapADS to those read from an audio source with a manual position setting | |
396 for j in range(i): | |
397 tmp = audio_source.read(block_size) | |
398 assert ( | |
399 ads.read() == tmp | |
400 ), "Unexpected block (N={0}) read from OverlapADS".format(i) | |
401 audio_source.position = (j + 1) * hop_size | |
402 | |
403 ads.close() | |
404 audio_source.close() | |
405 | |
406 def test_Limiter_Recorder_Overlap_Deco_rewind_and_read(self): | |
407 # Use arbitrary valid block_size and hop_size | |
408 block_size = 1600 | |
409 hop_size = 400 | |
410 | |
411 ads = ADSFactory.ads( | |
412 audio_source=self.audio_source, | |
413 max_time=1.50, | |
414 block_size=block_size, | |
415 hop_size=hop_size, | |
416 record=True, | |
417 ) | |
418 | |
419 # Read all available data overlapping blocks | |
420 ads.open() | |
421 i = 0 | |
422 while True: | |
423 block = ads.read() | |
424 if block is None: | |
425 break | |
426 i += 1 | |
427 | |
428 ads.rewind() | |
429 | |
430 # Read all data from file and build a BufferAudioSource | |
431 fp = wave.open(dataset.one_to_six_arabic_16000_mono_bc_noise, "r") | |
432 wave_data = fp.readframes(fp.getnframes()) | |
433 fp.close() | |
434 audio_source = BufferAudioSource( | |
435 wave_data, ads.sampling_rate, ads.sample_width, ads.channels | |
436 ) | |
437 audio_source.open() | |
438 | |
439 # Compare all blocks read from OverlapADS to those read from an audio source with a manual position setting | |
440 for j in range(i): | |
441 tmp = audio_source.read(block_size) | |
442 assert ( | |
443 ads.read() == tmp | |
444 ), "Unexpected block (N={0}) read from OverlapADS".format(i) | |
445 audio_source.position = (j + 1) * hop_size | |
446 | |
447 ads.close() | |
448 audio_source.close() | |
449 | |
450 def test_Limiter_Recorder_Overlap_Deco_rewind_and_read_limit(self): | |
451 # Use arbitrary valid block_size and hop_size | |
452 block_size = 1000 | |
453 hop_size = 200 | |
454 | |
455 ads = ADSFactory.ads( | |
456 audio_source=self.audio_source, | |
457 max_time=1.317, | |
458 block_size=block_size, | |
459 hop_size=hop_size, | |
460 record=True, | |
461 ) | |
462 total_samples = round(ads.sampling_rate * 1.317) | |
463 first_read_size = block_size | |
464 next_read_size = block_size - hop_size | |
465 nb_next_blocks, last_block_size = divmod( | |
466 (total_samples - first_read_size), next_read_size | |
467 ) | |
468 total_samples_with_overlap = ( | |
469 first_read_size + next_read_size * nb_next_blocks + last_block_size | |
470 ) | |
471 expected_read_bytes = total_samples_with_overlap * ads.sw * ads.channels | |
472 | |
473 cache_size = (block_size - hop_size) * ads.sample_width * ads.channels | |
474 total_read = cache_size | |
475 | |
476 ads.open() | |
477 i = 0 | |
478 while True: | |
479 block = ads.read() | |
480 if block is None: | |
481 break | |
482 i += 1 | |
483 total_read += len(block) - cache_size | |
484 | |
485 ads.close() | |
486 err_msg = ( | |
487 "Wrong data length read from LimiterADS, expected: {0}, found: {1}" | |
488 ) | |
489 assert total_read == expected_read_bytes, err_msg.format( | |
490 expected_read_bytes, total_read | |
491 ) | |
492 | |
493 | |
494 class TestADSFactoryBufferAudioSource: | |
495 def setup_method(self): | |
496 self.signal = b"ABCDEFGHIJKLMNOPQRSTUVWXYZ012345" | |
497 self.ads = ADSFactory.ads( | |
498 data_buffer=self.signal, | |
499 sampling_rate=16, | |
500 sample_width=2, | |
501 channels=1, | |
502 block_size=4, | |
503 ) | |
504 | |
505 def test_ADS_BAS_sampling_rate(self): | |
506 srate = self.ads.sampling_rate | |
507 assert ( | |
508 srate == 16 | |
509 ), "Wrong sampling rate, expected: 16000, found: {0}".format(srate) | |
510 | |
511 def test_ADS_BAS_sample_width(self): | |
512 swidth = self.ads.sample_width | |
513 assert ( | |
514 swidth == 2 | |
515 ), "Wrong sample width, expected: 2, found: {0}".format(swidth) | |
516 | |
517 def test_ADS_BAS_channels(self): | |
518 channels = self.ads.channels | |
519 assert ( | |
520 channels == 1 | |
521 ), "Wrong number of channels, expected: 1, found: {0}".format(channels) | |
522 | |
523 def test_Limiter_Recorder_Overlap_Deco_rewind_and_read(self): | |
524 # Use arbitrary valid block_size and hop_size | |
525 block_size = 5 | |
526 hop_size = 4 | |
527 | |
528 ads = ADSFactory.ads( | |
529 data_buffer=self.signal, | |
530 sampling_rate=16, | |
531 sample_width=2, | |
532 channels=1, | |
533 max_time=0.80, | |
534 block_size=block_size, | |
535 hop_size=hop_size, | |
536 record=True, | |
537 ) | |
538 | |
539 # Read all available data overlapping blocks | |
540 ads.open() | |
541 i = 0 | |
542 while True: | |
543 block = ads.read() | |
544 if block is None: | |
545 break | |
546 i += 1 | |
547 | |
548 ads.rewind() | |
549 | |
550 # Build a BufferAudioSource | |
551 audio_source = BufferAudioSource( | |
552 self.signal, ads.sampling_rate, ads.sample_width, ads.channels | |
553 ) | |
554 audio_source.open() | |
555 | |
556 # Compare all blocks read from OverlapADS to those read from an audio source with a manual position setting | |
557 for j in range(i): | |
558 tmp = audio_source.read(block_size) | |
559 block = ads.read() | |
560 assert ( | |
561 block == tmp | |
562 ), "Unexpected block '{}' (N={}) read from OverlapADS".format( | |
563 block, i | |
564 ) | |
565 audio_source.position = (j + 1) * hop_size | |
566 | |
567 ads.close() | |
568 audio_source.close() | |
569 | |
570 | |
571 class TestADSFactoryAlias: | |
572 def setup_method(self): | |
573 self.signal = b"ABCDEFGHIJKLMNOPQRSTUVWXYZ012345" | |
574 | |
575 def test_sampling_rate_alias(self): | |
576 ads = ADSFactory.ads( | |
577 data_buffer=self.signal, | |
578 sr=16, | |
579 sample_width=2, | |
580 channels=1, | |
581 block_dur=0.5, | |
582 ) | |
583 srate = ads.sampling_rate | |
584 assert ( | |
585 srate == 16 | |
586 ), "Wrong sampling rate, expected: 16000, found: {0}".format(srate) | |
587 | |
588 def test_sampling_rate_duplicate(self): | |
589 func = partial( | |
590 ADSFactory.ads, | |
591 data_buffer=self.signal, | |
592 sr=16, | |
593 sampling_rate=16, | |
594 sample_width=2, | |
595 channels=1, | |
596 ) | |
597 with pytest.raises(DuplicateArgument): | |
598 func() | |
599 | |
600 def test_sample_width_alias(self): | |
601 ads = ADSFactory.ads( | |
602 data_buffer=self.signal, | |
603 sampling_rate=16, | |
604 sw=2, | |
605 channels=1, | |
606 block_dur=0.5, | |
607 ) | |
608 swidth = ads.sample_width | |
609 assert ( | |
610 swidth == 2 | |
611 ), "Wrong sample width, expected: 2, found: {0}".format(swidth) | |
612 | |
613 def test_sample_width_duplicate(self): | |
614 func = partial( | |
615 ADSFactory.ads, | |
616 data_buffer=self.signal, | |
617 sampling_rate=16, | |
618 sw=2, | |
619 sample_width=2, | |
620 channels=1, | |
621 ) | |
622 with pytest.raises(DuplicateArgument): | |
623 func() | |
624 | |
625 def test_channels_alias(self): | |
626 ads = ADSFactory.ads( | |
627 data_buffer=self.signal, | |
628 sampling_rate=16, | |
629 sample_width=2, | |
630 ch=1, | |
631 block_dur=4, | |
632 ) | |
633 channels = ads.channels | |
634 assert ( | |
635 channels == 1 | |
636 ), "Wrong number of channels, expected: 1, found: {0}".format(channels) | |
637 | |
638 def test_channels_duplicate(self): | |
639 func = partial( | |
640 ADSFactory.ads, | |
641 data_buffer=self.signal, | |
642 sampling_rate=16, | |
643 sample_width=2, | |
644 ch=1, | |
645 channels=1, | |
646 ) | |
647 with pytest.raises(DuplicateArgument): | |
648 func() | |
649 | |
650 def test_block_size_alias(self): | |
651 ads = ADSFactory.ads( | |
652 data_buffer=self.signal, | |
653 sampling_rate=16, | |
654 sample_width=2, | |
655 channels=1, | |
656 bs=8, | |
657 ) | |
658 size = ads.block_size | |
659 assert ( | |
660 size == 8 | |
661 ), "Wrong block_size using bs alias, expected: 8, found: {0}".format( | |
662 size | |
663 ) | |
664 | |
665 def test_block_size_duplicate(self): | |
666 func = partial( | |
667 ADSFactory.ads, | |
668 data_buffer=self.signal, | |
669 sampling_rate=16, | |
670 sample_width=2, | |
671 channels=1, | |
672 bs=4, | |
673 block_size=4, | |
674 ) | |
675 with pytest.raises(DuplicateArgument): | |
676 func() | |
677 | |
678 def test_block_duration_alias(self): | |
679 ads = ADSFactory.ads( | |
680 data_buffer=self.signal, | |
681 sampling_rate=16, | |
682 sample_width=2, | |
683 channels=1, | |
684 bd=0.75, | |
685 ) | |
686 size = ads.block_size | |
687 err_msg = "Wrong block_size set with a block_dur alias 'bd', expected: 8, found: {0}" | |
688 assert size == 12, err_msg.format(size) | |
689 | |
690 def test_block_duration_duplicate(self): | |
691 func = partial( | |
692 ADSFactory.ads, | |
693 data_buffer=self.signal, | |
694 sampling_rate=16, | |
695 sample_width=2, | |
696 channels=1, | |
697 bd=4, | |
698 block_dur=4, | |
699 ) | |
700 with pytest.raises(DuplicateArgument): | |
701 func() | |
702 | |
703 def test_block_size_duration_duplicate(self): | |
704 func = partial( | |
705 ADSFactory.ads, | |
706 data_buffer=self.signal, | |
707 sampling_rate=16, | |
708 sample_width=2, | |
709 channels=1, | |
710 bd=4, | |
711 bs=12, | |
712 ) | |
713 with pytest.raises(DuplicateArgument): | |
714 func() | |
715 | |
716 def test_hop_duration_alias(self): | |
717 ads = ADSFactory.ads( | |
718 data_buffer=self.signal, | |
719 sampling_rate=16, | |
720 sample_width=2, | |
721 channels=1, | |
722 bd=0.75, | |
723 hd=0.5, | |
724 ) | |
725 size = ads.hop_size | |
726 assert ( | |
727 size == 8 | |
728 ), "Wrong block_size using bs alias, expected: 8, found: {0}".format( | |
729 size | |
730 ) | |
731 | |
732 def test_hop_duration_duplicate(self): | |
733 func = partial( | |
734 ADSFactory.ads, | |
735 data_buffer=self.signal, | |
736 sampling_rate=16, | |
737 sample_width=2, | |
738 channels=1, | |
739 bd=0.75, | |
740 hd=0.5, | |
741 hop_dur=0.5, | |
742 ) | |
743 with pytest.raises(DuplicateArgument): | |
744 func() | |
745 | |
746 def test_hop_size_duration_duplicate(self): | |
747 func = partial( | |
748 ADSFactory.ads, | |
749 data_buffer=self.signal, | |
750 sampling_rate=16, | |
751 sample_width=2, | |
752 channels=1, | |
753 bs=8, | |
754 hs=4, | |
755 hd=1, | |
756 ) | |
757 with pytest.raises(DuplicateArgument): | |
758 func() | |
759 | |
760 def test_hop_size_greater_than_block_size(self): | |
761 func = partial( | |
762 ADSFactory.ads, | |
763 data_buffer=self.signal, | |
764 sampling_rate=16, | |
765 sample_width=2, | |
766 channels=1, | |
767 bs=4, | |
768 hs=8, | |
769 ) | |
770 with pytest.raises(ValueError): | |
771 func() | |
772 | |
773 def test_filename_duplicate(self): | |
774 func = partial( | |
775 ADSFactory.ads, | |
776 fn=dataset.one_to_six_arabic_16000_mono_bc_noise, | |
777 filename=dataset.one_to_six_arabic_16000_mono_bc_noise, | |
778 ) | |
779 with pytest.raises(DuplicateArgument): | |
780 func() | |
781 | |
782 def test_data_buffer_duplicate(self): | |
783 func = partial( | |
784 ADSFactory.ads, | |
785 data_buffer=self.signal, | |
786 db=self.signal, | |
787 sampling_rate=16, | |
788 sample_width=2, | |
789 channels=1, | |
790 ) | |
791 with pytest.raises(DuplicateArgument): | |
792 func() | |
793 | |
794 def test_max_time_alias(self): | |
795 ads = ADSFactory.ads( | |
796 data_buffer=self.signal, | |
797 sampling_rate=16, | |
798 sample_width=2, | |
799 channels=1, | |
800 mt=10, | |
801 block_dur=0.5, | |
802 ) | |
803 assert ( | |
804 ads.max_read == 10 | |
805 ), "Wrong AudioDataSource.max_read, expected: 10, found: {}".format( | |
806 ads.max_read | |
807 ) | |
808 | |
809 def test_max_time_duplicate(self): | |
810 func = partial( | |
811 ADSFactory.ads, | |
812 data_buffer=self.signal, | |
813 sampling_rate=16, | |
814 sample_width=2, | |
815 channels=1, | |
816 mt=True, | |
817 max_time=True, | |
818 ) | |
819 with pytest.raises(DuplicateArgument): | |
820 func() | |
821 | |
822 def test_record_alias(self): | |
823 ads = ADSFactory.ads( | |
824 data_buffer=self.signal, | |
825 sampling_rate=16, | |
826 sample_width=2, | |
827 channels=1, | |
828 rec=True, | |
829 block_dur=0.5, | |
830 ) | |
831 assert ads.rewindable, "AudioDataSource.rewindable expected to be True" | |
832 | |
833 def test_record_duplicate(self): | |
834 func = partial( | |
835 ADSFactory.ads, | |
836 data_buffer=self.signal, | |
837 sampling_rate=16, | |
838 sample_width=2, | |
839 channels=1, | |
840 rec=True, | |
841 record=True, | |
842 ) | |
843 with pytest.raises(DuplicateArgument): | |
844 func() | |
845 | |
846 def test_Limiter_Recorder_Overlap_Deco_rewind_and_read_alias(self): | |
847 # Use arbitrary valid block_size and hop_size | |
848 block_size = 5 | |
849 hop_size = 4 | |
850 | |
851 ads = ADSFactory.ads( | |
852 db=self.signal, | |
853 sr=16, | |
854 sw=2, | |
855 ch=1, | |
856 mt=0.80, | |
857 bs=block_size, | |
858 hs=hop_size, | |
859 rec=True, | |
860 ) | |
861 | |
862 # Read all available data overlapping blocks | |
863 ads.open() | |
864 i = 0 | |
865 while True: | |
866 block = ads.read() | |
867 if block is None: | |
868 break | |
869 i += 1 | |
870 | |
871 ads.rewind() | |
872 | |
873 # Build a BufferAudioSource | |
874 audio_source = BufferAudioSource( | |
875 self.signal, ads.sampling_rate, ads.sample_width, ads.channels | |
876 ) | |
877 audio_source.open() | |
878 | |
879 # Compare all blocks read from AudioDataSource to those read from an audio source with manual position definition | |
880 for j in range(i): | |
881 tmp = audio_source.read(block_size) | |
882 block = ads.read() | |
883 assert ( | |
884 block == tmp | |
885 ), "Unexpected block (N={0}) read from OverlapADS".format(i) | |
886 audio_source.position = (j + 1) * hop_size | |
887 ads.close() | |
888 audio_source.close() | |
889 | 15 |
890 | 16 |
891 def _read_all_data(reader): | 17 def _read_all_data(reader): |
892 blocks = [] | 18 blocks = [] |
893 while True: | 19 while True: |
894 data = reader.read() | 20 data = reader.read() |
895 if data is None: | 21 if data is None: |
896 break | 22 break |
897 blocks.append(data) | 23 blocks.append(data) |
898 return b"".join(blocks) | 24 return b"".join(blocks) |
25 | |
26 | |
27 class TestAudioReaderWithFileAudioSource: | |
28 @pytest.fixture(autouse=True) | |
29 def setup_and_teardown(self): | |
30 self.audio_source = WaveAudioSource( | |
31 filename=dataset.one_to_six_arabic_16000_mono_bc_noise | |
32 ) | |
33 self.audio_source.open() | |
34 yield | |
35 self.audio_source.close() | |
36 | |
37 def test_AudioReader_type(self): | |
38 reader = AudioReader(input=self.audio_source) | |
39 err_msg = ( | |
40 "wrong object type, expected: 'AudioReader', found: {0}" | |
41 ) | |
42 assert isinstance(reader, AudioReader), err_msg.format(type(reader)) | |
43 | |
44 def _test_default_block_size(self): | |
45 reader = AudioReader(input=self.audio_source) | |
46 data = reader.read() | |
47 size = len(data) | |
48 assert ( | |
49 size == 160 | |
50 ), "Wrong default block_size, expected: 160, found: {0}".format(size) | |
51 | |
52 @pytest.mark.parametrize( | |
53 "block_dur, expected_nb_samples", | |
54 [ | |
55 (None, 160), # default: 10 ms | |
56 (0.025, 400), # 25 ms | |
57 ], | |
58 ids=["default", "_25ms"], | |
59 ) | |
60 def test_block_duration(self, block_dur, expected_nb_samples): | |
61 """Test the number of samples read for a given block duration.""" | |
62 if block_dur is not None: | |
63 reader = AudioReader(input=self.audio_source, block_dur=block_dur) | |
64 else: | |
65 reader = AudioReader(input=self.audio_source) | |
66 data = reader.read() | |
67 nb_samples = len(data) // reader.sample_width | |
68 assert ( | |
69 nb_samples == expected_nb_samples | |
70 ), f"Wrong block_size, expected: {expected_nb_samples}, found: {nb_samples}" | |
71 | |
72 @pytest.mark.parametrize( | |
73 "block_dur, hop_dur, expected_nb_blocks, expected_last_block_nb_samples", | |
74 [ | |
75 (None, None, 1879, 126), # default: 10 ms | |
76 (0.01, None, 1879, 126), # block_dur_10ms_hop_dur_None | |
77 (0.01, 0.01, 1879, 126), # block_dur_10ms_hop_dur_10ms | |
78 (0.02, None, 940, 126), # block_dur_20ms_hop_dur_None | |
79 (0.025, None, 752, 206), # block_dur_25ms_hop_dur_None | |
80 (0.02, 0.01, 1878, 286), # block_dur_20ms_hop_dur_10ms | |
81 (0.025, 0.005, 3754, 366), # block_dur_25ms_hop_dur_5ms | |
82 ], | |
83 ids=[ | |
84 "default", | |
85 "block_dur_10ms_hop_dur_None", | |
86 "block_dur_10ms_hop_dur_100ms", | |
87 "block_dur_20ms_hop_dur_None", | |
88 "block_dur_25ms_hop_dur_None", | |
89 "block_dur_20ms_hop_dur_10ms", | |
90 "block_dur_25ms_hop_dur_None", | |
91 ], | |
92 ) | |
93 def test_hop_duration( | |
94 self, | |
95 block_dur, | |
96 hop_dur, | |
97 expected_nb_blocks, | |
98 expected_last_block_nb_samples, | |
99 ): | |
100 """Test the number of read blocks and the size of the last block for | |
101 different 'block_dur' and 'hop_dur' values. | |
102 | |
103 Args: | |
104 block_dur (float or None): block duration in seconds. | |
105 hop_dur (float or None): hop duration in seconds. | |
106 expected_nb_blocks (int): expected number of read blocks. | |
107 expected_last_block_nb_samples (int): expected number of samples | |
108 in the last block. | |
109 """ | |
110 if block_dur is not None: | |
111 reader = AudioReader( | |
112 input=self.audio_source, block_dur=block_dur, hop_dur=hop_dur | |
113 ) | |
114 else: | |
115 reader = AudioReader(input=self.audio_source, hop_dur=hop_dur) | |
116 | |
117 nb_blocks = 0 | |
118 last_block_nb_samples = None | |
119 while True: | |
120 data = reader.read() | |
121 if data is not None: | |
122 nb_blocks += 1 | |
123 last_block_nb_samples = len(data) // reader.sample_width | |
124 else: | |
125 break | |
126 err_msg = "Wrong number of blocks read from source, expected: " | |
127 err_msg += f"{expected_nb_blocks}, found: {nb_blocks}" | |
128 assert nb_blocks == expected_nb_blocks, err_msg | |
129 | |
130 err_msg = ( | |
131 "Wrong number of samples in last block read from source, expected: " | |
132 ) | |
133 err_msg += ( | |
134 f"{expected_last_block_nb_samples}, found: {last_block_nb_samples}" | |
135 ) | |
136 | |
137 assert last_block_nb_samples == expected_last_block_nb_samples, err_msg | |
138 | |
139 def test_hop_duration_exception(self): | |
140 """Test passing hop_dur > block_dur raises ValueError""" | |
141 with pytest.raises(ValueError): | |
142 AudioReader(self.audio_source, block_dur=0.01, hop_dur=0.015) | |
143 | |
@pytest.mark.parametrize(
    "block_dur, hop_dur",
    [
        pytest.param(None, None, id="default"),
        pytest.param(0.01, None, id="block_dur_10ms_hop_dur_None"),
        pytest.param(None, 0.01, id="block_dur_None__hop_dur_10ms"),
        pytest.param(0.05, 0.05, id="block_dur_50ms_hop_dur_50ms"),
    ],
)
def test_reader_class_block_dur_equals_hop_dur(self, block_dur, hop_dur):
    """Check that no '_OverlapAudioReader' is created without real overlap.

    Covers hop_dur left unset and hop_dur equal to block_dur. When
    block_dur is None it is not forwarded at all, so AudioReader falls
    back to its own default (presumably 0.01 s, matching the third case --
    confirm against AudioReader's signature).
    """
    reader_kwargs = {"hop_dur": hop_dur}
    if block_dur is not None:
        reader_kwargs["block_dur"] = block_dur
    reader = AudioReader(input=self.audio_source, **reader_kwargs)
    assert not isinstance(reader, _OverlapAudioReader)
170 | |
def test_sampling_rate(self):
    """The reader exposes the 16000 Hz sampling rate of its audio source."""
    found = AudioReader(input=self.audio_source).sampling_rate
    failure = f"Wrong sampling rate, expected: 16000, found: {found}"
    assert found == 16000, failure
177 | |
def test_sample_width(self):
    """The reader exposes the 2-byte sample width of its audio source."""
    found = AudioReader(input=self.audio_source).sample_width
    failure = f"Wrong sample width, expected: 2, found: {found}"
    assert found == 2, failure
184 | |
def test_channels(self):
    """The reader exposes the single channel of its (mono) audio source."""
    found = AudioReader(input=self.audio_source).channels
    failure = f"Wrong number of channels, expected: 1, found: {found}"
    assert found == 1, failure
191 | |
def test_read(self):
    """The first block from AudioReader equals the first 320 samples of the file."""
    first_block = AudioReader(input=self.audio_source, block_dur=0.02).read()

    # Independent handle on the same file; 320 samples = 0.02 s at 16000 Hz.
    reference = WaveAudioSource(
        filename=dataset.one_to_six_arabic_16000_mono_bc_noise
    )
    reference.open()
    expected = reference.read(320)
    reference.close()

    assert first_block == expected, "Unexpected data read from AudioReader"
204 | |
def test_read_with_overlap(self):
    """The second overlapping block starts one hop (160 samples) into the file."""
    reader = AudioReader(input=self.audio_source, block_dur=0.02, hop_dur=0.01)
    reader.read()  # discard the first block
    second_block = reader.read()  # overlaps the first block by 0.01 s

    reference = WaveAudioSource(
        filename=dataset.one_to_six_arabic_16000_mono_bc_noise
    )
    reference.open()
    reference.read(160)  # skip one hop
    expected = reference.read(320)
    reference.close()

    assert second_block == expected, "Unexpected data read from AudioReader"
221 | |
def test_read_from_AudioReader_with_max_read(self):
    """With max_read=0.75, the reader yields exactly the first 0.75 s of the file."""
    max_read = 0.75  # seconds
    reader = AudioReader(input=self.audio_source, max_read=max_read)
    # max_read is expected to wrap the underlying source in a _Limiter
    assert isinstance(reader._audio_source._audio_source, _Limiter)
    limited_data = _read_all_data(reader)

    reference = WaveAudioSource(
        filename=dataset.one_to_six_arabic_16000_mono_bc_noise
    )
    reference.open()
    expected = reference.read(int(16000 * max_read))
    reference.close()

    failure = f"Unexpected data read from AudioReader with 'max_read = {max_read}'"
    assert limited_data == expected, failure
238 | |
def test_read_data_size_from_AudioReader_with_max_read(self):
    """Total bytes read with max_read=1.191 match the limit exactly.

    The expected size is round(sampling_rate * max_read) samples converted
    to bytes. Blocks do not overlap here, so no per-block bookkeeping is
    needed: splitting the total into full blocks plus a remainder and
    adding them back together yields the same number.
    """
    max_read = 1.191  # seconds
    reader = AudioReader(input=self.audio_source, max_read=max_read)
    # max_read is expected to wrap the underlying source in a _Limiter
    assert isinstance(reader._audio_source._audio_source, _Limiter)

    total_samples = round(reader.sampling_rate * max_read)
    expected_read_bytes = total_samples * reader.sample_width * reader.channels

    total_read = len(_read_all_data(reader))
    err_msg = (
        "Wrong data length read from AudioReader, "
        f"expected: {expected_read_bytes}, found: {total_read}"
    )
    assert total_read == expected_read_bytes, err_msg
257 | |
def test_read_from_Recorder(self):
    """Ten 0.025 s blocks read from a Recorder equal the first 4000 samples."""
    recorder = Recorder(input=self.audio_source, block_dur=0.025)
    blocks = []
    # Read at most ten blocks, stopping early if the source runs dry
    while len(blocks) < 10:
        block = recorder.read()
        if block is None:
            break
        blocks.append(block)
    recorded = b"".join(blocks)

    reference = WaveAudioSource(
        filename=dataset.one_to_six_arabic_16000_mono_bc_noise
    )
    reference.open()
    expected = reference.read(400 * 10)  # 400 samples per 0.025 s block
    reference.close()

    assert recorded == expected, "Unexpected data read from Recorder"
278 | |
def test_AudioReader_rewindable(self):
    """An AudioReader created with record=True reports itself as rewindable."""
    recording_reader = AudioReader(input=self.audio_source, record=True)
    failure = "AudioReader with record=True should be rewindable"
    assert recording_reader.rewindable, failure
284 | |
def test_AudioReader_record_and_rewind(self):
    """After rewind, a recording AudioReader replays exactly what was read."""
    reader = AudioReader(input=self.audio_source, record=True, block_dur=0.02)

    # Consume 10 blocks of 0.02 s (0.2 s in total), then go back to the start
    nb_blocks = 10
    for _ in range(nb_blocks):
        reader.read()
    reader.rewind()

    # Everything available after rewind should be the recorded 0.2 s
    replayed = _read_all_data(reader)

    reference = WaveAudioSource(
        filename=dataset.one_to_six_arabic_16000_mono_bc_noise
    )
    reference.open()
    expected = reference.read(320 * 10)  # 0.2 s of data
    reference.close()

    failure = "Unexpected data read from AudioReader with record = True"
    assert replayed == expected, failure
307 | |
def test_Recorder_record_and_rewind(self):
    """After rewind, a Recorder replays exactly the data read before it.

    Ten 0.02 s blocks are read, the recorder is rewound, and everything it
    then returns is compared with the first 10 * 320 samples of the file.
    """
    recorder = Recorder(input=self.audio_source, block_dur=0.02)
    # read 0.02 * 10 = 0.2 sec. of data
    for _ in range(10):
        recorder.read()

    recorder.rewind()

    # read all available data after rewind
    recorder_data = _read_all_data(recorder)

    audio_source = WaveAudioSource(
        filename=dataset.one_to_six_arabic_16000_mono_bc_noise
    )
    audio_source.open()
    audio_source_data = audio_source.read(320 * 10)  # read 0.2 sec. of data
    audio_source.close()

    assert (
        recorder_data == audio_source_data
    ), "Unexpected data read from Recorder"
330 | |
def test_read_overlapping_blocks(self):
    """Every overlapping block matches the same span read straight from the file."""
    # Arbitrary valid sizes (in samples), converted to durations for the reader
    block_size, hop_size = 1714, 313
    sampling_rate = self.audio_source.sampling_rate
    reader = AudioReader(
        input=self.audio_source,
        block_dur=block_size / sampling_rate,
        hop_dur=hop_size / sampling_rate,
    )

    # Collect all overlapping blocks until the reader is exhausted
    blocks = []
    block = reader.read()
    while block is not None:
        blocks.append(block)
        block = reader.read()

    # Load the whole file into a BufferAudioSource used as reference
    fp = wave.open(dataset.one_to_six_arabic_16000_mono_bc_noise, "r")
    frames = fp.readframes(fp.getnframes())
    fp.close()
    reference = BufferAudioSource(
        frames, reader.sampling_rate, reader.sample_width, reader.channels
    )
    reference.open()

    # Block `index` must equal block_size samples read from the reference,
    # whose position is then moved manually to the start of the next block
    for index, block in enumerate(blocks):
        expected = reference.read(block_size)
        failure = f"Unexpected data (block {index}) from reader with overlapping blocks"
        assert block == expected, failure
        reference.position = (index + 1) * hop_size

    reference.close()
374 | |
def test_read_overlapping_blocks_with_max_read(self):
    """Overlapping blocks read under max_read=0.5 match the file contents."""
    block_size, hop_size = 256, 200
    sampling_rate = self.audio_source.sampling_rate
    reader = AudioReader(
        input=self.audio_source,
        block_dur=block_size / sampling_rate,
        hop_dur=hop_size / sampling_rate,
        max_read=0.5,
    )

    # Collect all overlapping blocks until the reader is exhausted
    blocks = []
    block = reader.read()
    while block is not None:
        blocks.append(block)
        block = reader.read()

    # Load the whole file into a BufferAudioSource used as reference
    fp = wave.open(dataset.one_to_six_arabic_16000_mono_bc_noise, "r")
    frames = fp.readframes(fp.getnframes())
    fp.close()
    reference = BufferAudioSource(
        frames, reader.sampling_rate, reader.sample_width, reader.channels
    )
    reference.open()

    # Read from the reference as many samples as each block holds (derived
    # from the block's byte length), then move its position manually to the
    # start of the next block
    for index, block in enumerate(blocks):
        nb_samples = len(block) // (reader.sw * reader.ch)
        expected = reference.read(nb_samples)
        failure = f"Unexpected data (block {index}) from reader with overlapping blocks and max_read"
        assert block == expected, failure
        reference.position = (index + 1) * hop_size

    reference.close()
418 | |
def test_length_read_overlapping_blocks_with_max_read(self):
    """Count of distinct bytes read with overlap and max_read=1.932 matches the limit."""
    block_size, hop_size = 313, 207
    sampling_rate = self.audio_source.sampling_rate
    reader = AudioReader(
        input=self.audio_source,
        max_read=1.932,
        block_dur=block_size / sampling_rate,
        hop_dur=hop_size / sampling_rate,
    )

    # Expected size: first block, then fixed-size follow-up reads, then a
    # remainder, all converted to bytes
    total_samples = round(reader.sampling_rate * 1.932)
    overlap_size = block_size - hop_size
    nb_next_blocks, last_block_size = divmod(
        total_samples - block_size, overlap_size
    )
    expected_samples = block_size + overlap_size * nb_next_blocks + last_block_size
    expected_read_bytes = expected_samples * reader.sw * reader.channels

    # Each block repeats `cache_size` bytes of the previous one; count those
    # bytes once up front and subtract them from every block's length
    cache_size = overlap_size * reader.sample_width * reader.channels
    total_read = cache_size
    block = reader.read()
    while block is not None:
        total_read += len(block) - cache_size
        block = reader.read()

    failure = f"Wrong data length read from LimiterADS, expected: {expected_read_bytes}, found: {total_read}"
    assert total_read == expected_read_bytes, failure
464 | |
def test_reader_with_overlapping_blocks__rewindable(self):
    """An overlapping-block AudioReader created with record=True is rewindable."""
    # block_dur and hop_dur are durations in seconds. The previous values
    # (320 and 160) look like sample counts, so use the equivalent
    # 0.02 s / 0.01 s, the same overlapping setup as test_read_with_overlap.
    reader = AudioReader(
        input=self.audio_source,
        block_dur=0.02,
        hop_dur=0.01,
        record=True,
    )
    assert (
        reader.rewindable
    ), "AudioReader with record=True should be rewindable"
475 | |
def test_overlapping_blocks_with_max_read_rewind_and_read(self):
    """Overlapping blocks replayed after rewind match the file contents.

    All overlapping blocks are read once from a recording reader, the
    reader is rewound, and each replayed block is compared with the same
    span read from a BufferAudioSource that holds the whole file.
    """
    # NOTE(review): despite the test name, no max_read is passed to the
    # reader here -- confirm whether a limit was intended.

    # Use arbitrary valid block_size and hop_size
    block_size = 1600
    hop_size = 400
    block_dur = block_size / self.audio_source.sampling_rate
    hop_dur = hop_size / self.audio_source.sampling_rate

    reader = AudioReader(
        input=self.audio_source,
        block_dur=block_dur,
        hop_dur=hop_dur,
        record=True,
    )

    # Count all available overlapping blocks
    nb_blocks = 0
    while reader.read() is not None:
        nb_blocks += 1

    reader.rewind()

    # Read all data from file and build a BufferAudioSource
    with wave.open(dataset.one_to_six_arabic_16000_mono_bc_noise, "r") as fp:
        wave_data = fp.readframes(fp.getnframes())
    audio_source = BufferAudioSource(
        wave_data,
        reader.sampling_rate,
        reader.sample_width,
        reader.channels,
    )
    audio_source.open()

    # Compare blocks replayed by AudioReader to those read from a
    # BufferAudioSource with manual position setting
    for j in range(nb_blocks):
        tmp = audio_source.read(block_size)
        # Report the index of the failing block, not the total block count
        assert (
            reader.read() == tmp
        ), f"Unexpected data (block {j}) from reader with overlapping blocks and record = True"
        audio_source.position = (j + 1) * hop_size

    audio_source.close()
521 | |
def test_overlapping_blocks_with_record_and_max_read_rewind_and_read(self):
    """Overlapping blocks replayed after rewind match the file, under max_read.

    A recording reader limited to 1.5 s is read until exhausted, rewound,
    and each replayed block is compared with the same span read from a
    BufferAudioSource that holds the whole file.
    """
    # Use arbitrary valid block_size and hop_size
    block_size = 1600
    hop_size = 400
    block_dur = block_size / self.audio_source.sampling_rate
    hop_dur = hop_size / self.audio_source.sampling_rate

    # The limit is passed as `max_read`, the keyword used by the other
    # max_read tests in this file (this test previously passed `max_time`).
    reader = AudioReader(
        input=self.audio_source,
        max_read=1.50,
        block_dur=block_dur,
        hop_dur=hop_dur,
        record=True,
    )

    # Count all available overlapping blocks
    nb_blocks = 0
    while reader.read() is not None:
        nb_blocks += 1

    reader.rewind()

    # Read all data from file and build a BufferAudioSource
    with wave.open(dataset.one_to_six_arabic_16000_mono_bc_noise, "r") as fp:
        wave_data = fp.readframes(fp.getnframes())
    audio_source = BufferAudioSource(
        wave_data,
        reader.sampling_rate,
        reader.sample_width,
        reader.channels,
    )
    audio_source.open()

    bytes_per_sample = reader.sample_width * reader.channels

    # Compare all blocks replayed by AudioReader to those read from
    # BufferAudioSource with a manual position setting
    for j in range(nb_blocks):
        block = reader.read()
        # The reference holds the whole file while the reader is limited, so
        # read only as many samples as the reader's block actually contains
        tmp = audio_source.read(len(block) // bytes_per_sample)
        # Report the index of the failing block, not the total block count
        assert (
            block == tmp
        ), "Unexpected block (N={0}) read from AudioReader".format(j)
        audio_source.position = (j + 1) * hop_size

    audio_source.close()
568 | |
def test_length_read_overlapping_blocks_with_record_and_max_read(self):
    """Count of distinct bytes read with overlap, record and max_read=1.317 matches the limit."""
    # Arbitrary valid sizes (in samples), converted to durations for the reader
    block_size, hop_size = 1000, 200
    sampling_rate = self.audio_source.sampling_rate
    reader = AudioReader(
        input=self.audio_source,
        block_dur=block_size / sampling_rate,
        hop_dur=hop_size / sampling_rate,
        record=True,
        max_read=1.317,
    )

    # Expected size: first block, then fixed-size follow-up reads, then a
    # remainder, all converted to bytes
    total_samples = round(reader.sampling_rate * 1.317)
    overlap_size = block_size - hop_size
    nb_next_blocks, last_block_size = divmod(
        total_samples - block_size, overlap_size
    )
    expected_samples = block_size + overlap_size * nb_next_blocks + last_block_size
    expected_read_bytes = expected_samples * reader.sample_width * reader.channels

    # Each block repeats `cache_size` bytes of the previous one; count those
    # bytes once up front and subtract them from every block's length
    cache_size = overlap_size * reader.sample_width * reader.channels
    total_read = cache_size
    block = reader.read()
    while block is not None:
        total_read += len(block) - cache_size
        block = reader.read()

    err_msg = f"Wrong data length read from AudioReader, expected: {expected_read_bytes}, found: {total_read}"
    assert total_read == expected_read_bytes, err_msg
611 | |
612 | |
def test_AudioReader_raw_data():
    """AudioReader built from raw bytes: parameters, overlap, limit and rewind.

    The reader is created from a 32-byte buffer with overlapping blocks,
    max_read=0.80 and record=True. After reading everything and rewinding,
    each replayed block is compared with the same span read from a
    BufferAudioSource built on the same bytes.
    """
    data = b"ABCDEFGHIJKLMNOPQRSTUVWXYZ012345"
    block_size = 5
    hop_size = 4
    reader = AudioReader(
        input=data,
        sampling_rate=16,
        sample_width=2,
        channels=1,
        block_dur=block_size / 16,
        hop_dur=hop_size / 16,
        max_read=0.80,
        record=True,
    )
    reader.open()

    assert (
        reader.sampling_rate == 16
    ), f"Wrong sampling rate, expected: 16, found: {reader.sampling_rate }"

    assert (
        reader.sample_width == 2
    ), f"Wrong sample width, expected: 2, found: {reader.sample_width}"

    # Count all available overlapping blocks
    nb_blocks = 0
    while reader.read() is not None:
        nb_blocks += 1

    reader.rewind()

    # Build a BufferAudioSource
    audio_source = BufferAudioSource(
        data, reader.sampling_rate, reader.sample_width, reader.channels
    )
    audio_source.open()

    # Compare all blocks replayed by AudioReader to those read from an audio
    # source with a manual position setting
    for j in range(nb_blocks):
        tmp = audio_source.read(block_size)
        block = reader.read()
        # Report the index of the failing block, not the total block count
        assert (
            block == tmp
        ), f"Unexpected block '{block}' (N={j}) read from AudioReader"
        audio_source.position = (j + 1) * hop_size
    audio_source.close()
    reader.close()
665 | |
666 | |
def test_AudioReader_alias_params():
    """Short aliases (sr, sw, ch) and long attribute names report the same values."""
    reader = AudioReader(input=b"0" * 1600, sr=16000, sw=2, channels=1)

    # (attribute name, expected value, label used in the failure message)
    expectations = [
        ("sampling_rate", 16000, "sampling rate"),
        ("sr", 16000, "sampling rate"),
        ("sample_width", 2, "sample width"),
        ("sw", 2, "sample width"),
        ("channels", 1, "number of channels"),
        ("ch", 1, "number of channels"),
    ]
    for attr, expected, label in expectations:
        found = getattr(reader, attr)
        failure = f"Unexpected {label}: reader.{attr} = {found} instead of {expected}"
        assert found == expected, failure
899 | 697 |
900 | 698 |
901 @pytest.mark.parametrize( | 699 @pytest.mark.parametrize( |
902 "file_id, max_read, size", | 700 "file_id, max_read, size", |
903 [ | 701 [ |