comparison tests/test_AudioReader.py @ 403:996948ada980

Update tests
author Amine Sehili <amine.sehili@gmail.com>
date Sun, 26 May 2024 22:43:08 +0200
parents 323d59b404a2
children 9c9112e23c1c
comparison
equal deleted inserted replaced
402:954c1e279068 403:996948ada980
1 import pytest
2 from functools import partial
3 import sys 1 import sys
4 import wave 2 import wave
3 from functools import partial
4
5 import pytest
6
5 from auditok import ( 7 from auditok import (
8 AudioReader,
9 BufferAudioSource,
10 Recorder,
11 WaveAudioSource,
6 dataset, 12 dataset,
7 ADSFactory,
8 AudioDataSource,
9 AudioReader,
10 Recorder,
11 BufferAudioSource,
12 WaveAudioSource,
13 DuplicateArgument,
14 ) 13 )
15 14 from auditok.util import _Limiter, _OverlapAudioReader
16
class TestADSFactoryFileAudioSource:
    """Tests for objects built by `ADSFactory.ads` from a `WaveAudioSource`.

    Methods whose name starts with an underscore are private helpers that
    factor out the read loops shared by several tests.
    """

    def setup_method(self):
        """Create a fresh wave audio source before each test."""
        self.audio_source = WaveAudioSource(
            filename=dataset.one_to_six_arabic_16000_mono_bc_noise
        )

    @staticmethod
    def _drain(ads, max_blocks=None):
        """Read blocks from an already opened `ads` until `read()` returns
        None (or until `max_blocks` blocks were read) and return them as a
        list.
        """
        blocks = []
        while max_blocks is None or len(blocks) < max_blocks:
            block = ads.read()
            if block is None:
                break
            blocks.append(block)
        return blocks

    @classmethod
    def _read_blocks(cls, ads, max_blocks=None):
        """Open `ads`, drain it (see `_drain`) and close it, even on error."""
        ads.open()
        try:
            return cls._drain(ads, max_blocks)
        finally:
            ads.close()

    @staticmethod
    def _read_from_file(nb_samples):
        """Return the result of `read(nb_samples)` on a freshly opened
        `WaveAudioSource` for the test file.
        """
        source = WaveAudioSource(
            filename=dataset.one_to_six_arabic_16000_mono_bc_noise
        )
        source.open()
        try:
            return source.read(nb_samples)
        finally:
            source.close()

    @staticmethod
    def _make_buffer_source(ads):
        """Load every frame of the test file and wrap it in a (not yet
        opened) `BufferAudioSource` using the audio parameters of `ads`.
        """
        with wave.open(
            dataset.one_to_six_arabic_16000_mono_bc_noise, "r"
        ) as fp:
            wave_data = fp.readframes(fp.getnframes())
        return BufferAudioSource(
            wave_data, ads.sampling_rate, ads.sample_width, ads.channels
        )

    @staticmethod
    def _assert_overlapping_blocks(blocks, reference, block_size, hop_size):
        """Compare each block of `blocks` with `block_size` samples read from
        `reference`, moving `reference.position` forward by `hop_size` after
        each block. `blocks` may be a lazy iterable.
        """
        reference.open()
        try:
            for n, block in enumerate(blocks):
                expected = reference.read(block_size)
                assert (
                    block == expected
                ), "Unexpected block (N={0}) read from OverlapADS".format(n)
                reference.position = (n + 1) * hop_size
        finally:
            reference.close()

    def _assert_rewind_and_read(self, ads, block_size, hop_size):
        """Read `ads` to the end, rewind it, then check that the same number
        of blocks can be read again and match the reference data.
        """
        ads.open()
        try:
            nb_blocks = len(self._drain(ads))
            ads.rewind()
            # Lazy generator: each replayed block is read right before it is
            # compared to the reference block.
            replayed = (ads.read() for _ in range(nb_blocks))
            self._assert_overlapping_blocks(
                replayed, self._make_buffer_source(ads), block_size, hop_size
            )
        finally:
            ads.close()

    def _assert_overlap_read_limit(self, ads, max_time, block_size, hop_size):
        """Check the amount of new data delivered by an overlapping `ads`
        limited to `max_time` seconds.

        Each block is counted without its first `block_size - hop_size`
        samples (named `cache_size` below, in bytes); that amount is added
        once for the first block.
        """
        bytes_per_sample = ads.sample_width * ads.channels
        total_samples = round(ads.sampling_rate * max_time)
        expected_read_bytes = total_samples * bytes_per_sample
        cache_size = (block_size - hop_size) * bytes_per_sample

        total_read = cache_size + sum(
            len(block) - cache_size for block in self._read_blocks(ads)
        )
        err_msg = (
            "Wrong data length read from LimiterADS, expected: {0}, found: {1}"
        )
        assert total_read == expected_read_bytes, err_msg.format(
            expected_read_bytes, total_read
        )

    def test_ADS_type(self):
        ads = ADSFactory.ads(audio_source=self.audio_source)
        err_msg = (
            "wrong type for ads object, expected: 'AudioDataSource', found: {0}"
        )
        assert isinstance(ads, AudioDataSource), err_msg.format(type(ads))

    def test_default_block_size(self):
        ads = ADSFactory.ads(audio_source=self.audio_source)
        size = ads.block_size
        assert (
            size == 160
        ), "Wrong default block_size, expected: 160, found: {0}".format(size)

    def test_block_size(self):
        ads = ADSFactory.ads(audio_source=self.audio_source, block_size=512)
        size = ads.block_size
        assert (
            size == 512
        ), "Wrong block_size, expected: 512, found: {0}".format(size)

        # with alias keyword
        ads = ADSFactory.ads(audio_source=self.audio_source, bs=160)
        size = ads.block_size
        assert (
            size == 160
        ), "Wrong block_size, expected: 160, found: {0}".format(size)

    def test_block_duration(self):
        ads = ADSFactory.ads(
            audio_source=self.audio_source, block_dur=0.01
        )  # 10 ms
        size = ads.block_size
        assert (
            size == 160
        ), "Wrong block_size, expected: 160, found: {0}".format(size)

        # with alias keyword
        ads = ADSFactory.ads(audio_source=self.audio_source, bd=0.025)  # 25 ms
        size = ads.block_size
        assert (
            size == 400
        ), "Wrong block_size, expected: 400, found: {0}".format(size)

    def test_hop_duration(self):
        ads = ADSFactory.ads(
            audio_source=self.audio_source, block_dur=0.02, hop_dur=0.01
        )  # 10 ms
        size = ads.hop_size
        assert size == 160, "Wrong hop_size, expected: 160, found: {0}".format(
            size
        )

        # with alias keyword for the block duration
        ads = ADSFactory.ads(
            audio_source=self.audio_source, bd=0.025, hop_dur=0.015
        )  # 15 ms
        size = ads.hop_size
        assert size == 240, "Wrong hop_size, expected: 240, found: {0}".format(
            size
        )

    def test_sampling_rate(self):
        ads = ADSFactory.ads(audio_source=self.audio_source)
        srate = ads.sampling_rate
        assert (
            srate == 16000
        ), "Wrong sampling rate, expected: 16000, found: {0}".format(srate)

    def test_sample_width(self):
        ads = ADSFactory.ads(audio_source=self.audio_source)
        swidth = ads.sample_width
        assert (
            swidth == 2
        ), "Wrong sample width, expected: 2, found: {0}".format(swidth)

    def test_channels(self):
        ads = ADSFactory.ads(audio_source=self.audio_source)
        channels = ads.channels
        assert (
            channels == 1
        ), "Wrong number of channels, expected: 1, found: {0}".format(channels)

    def test_read(self):
        ads = ADSFactory.ads(audio_source=self.audio_source, block_size=256)
        ads.open()
        try:
            ads_data = ads.read()
        finally:
            ads.close()

        audio_source_data = self._read_from_file(256)
        assert ads_data == audio_source_data, "Unexpected data read from ads"

    def test_Limiter_Deco_read(self):
        # read a maximum of 0.75 seconds from audio source
        ads = ADSFactory.ads(audio_source=self.audio_source, max_time=0.75)
        ads_data = b"".join(self._read_blocks(ads))

        audio_source_data = self._read_from_file(int(16000 * 0.75))
        assert (
            ads_data == audio_source_data
        ), "Unexpected data read from LimiterADS"

    def test_Limiter_Deco_read_limit(self):
        # read a maximum of 1.191 seconds from audio source
        ads = ADSFactory.ads(audio_source=self.audio_source, max_time=1.191)
        total_samples = round(ads.sampling_rate * 1.191)
        expected_read_bytes = total_samples * ads.sample_width * ads.channels

        total_read = sum(len(block) for block in self._read_blocks(ads))
        err_msg = (
            "Wrong data length read from LimiterADS, expected: {0}, found: {1}"
        )
        assert total_read == expected_read_bytes, err_msg.format(
            expected_read_bytes, total_read
        )

    def test_Recorder_Deco_read(self):
        ads = ADSFactory.ads(
            audio_source=self.audio_source, record=True, block_size=500
        )
        ads_data = b"".join(self._read_blocks(ads, max_blocks=10))

        audio_source_data = self._read_from_file(500 * 10)
        assert (
            ads_data == audio_source_data
        ), "Unexpected data read from RecorderADS"

    def test_Recorder_Deco_is_rewindable(self):
        ads = ADSFactory.ads(audio_source=self.audio_source, record=True)
        assert ads.rewindable, "RecorderADS.is_rewindable should return True"

    def test_Recorder_Deco_rewind_and_read(self):
        ads = ADSFactory.ads(
            audio_source=self.audio_source, record=True, block_size=320
        )
        ads.open()
        try:
            for _ in range(10):
                ads.read()
            ads.rewind()
            # read all available data after rewind
            ads_data = b"".join(self._drain(ads))
        finally:
            ads.close()

        audio_source_data = self._read_from_file(320 * 10)
        assert (
            ads_data == audio_source_data
        ), "Unexpected data read from RecorderADS"

    def test_Overlap_Deco_read(self):
        # Use arbitrary valid block_size and hop_size
        block_size = 1714
        hop_size = 313

        ads = ADSFactory.ads(
            audio_source=self.audio_source,
            block_size=block_size,
            hop_size=hop_size,
        )

        # Read all available overlapping blocks, then compare them to blocks
        # read from a buffer source whose position is set manually
        ads_data = self._read_blocks(ads)
        self._assert_overlapping_blocks(
            ads_data, self._make_buffer_source(ads), block_size, hop_size
        )

    def test_Limiter_Overlap_Deco_read(self):
        block_size = 256
        hop_size = 200

        ads = ADSFactory.ads(
            audio_source=self.audio_source,
            max_time=0.50,
            block_size=block_size,
            hop_size=hop_size,
        )

        # Read all available overlapping blocks
        ads_data = self._read_blocks(ads)

        audio_source = self._make_buffer_source(ads)
        audio_source.open()
        try:
            # NOTE(review): only block lengths are compared here, not their
            # content -- confirm whether that is intentional.
            for i, block in enumerate(ads_data):
                tmp = audio_source.read(len(block) // (ads.sw * ads.ch))
                assert len(block) == len(
                    tmp
                ), "Unexpected block (N={0}) read from OverlapADS".format(i)
                audio_source.position = (i + 1) * hop_size
        finally:
            audio_source.close()

    def test_Limiter_Overlap_Deco_read_limit(self):
        block_size = 313
        hop_size = 207
        ads = ADSFactory.ads(
            audio_source=self.audio_source,
            max_time=1.932,
            block_size=block_size,
            hop_size=hop_size,
        )
        self._assert_overlap_read_limit(ads, 1.932, block_size, hop_size)

    def test_Recorder_Overlap_Deco_is_rewindable(self):
        ads = ADSFactory.ads(
            audio_source=self.audio_source,
            block_size=320,
            hop_size=160,
            record=True,
        )
        assert ads.rewindable, "RecorderADS.is_rewindable should return True"

    def test_Recorder_Overlap_Deco_rewind_and_read(self):
        # Use arbitrary valid block_size and hop_size
        block_size = 1600
        hop_size = 400

        ads = ADSFactory.ads(
            audio_source=self.audio_source,
            block_size=block_size,
            hop_size=hop_size,
            record=True,
        )
        self._assert_rewind_and_read(ads, block_size, hop_size)

    def test_Limiter_Recorder_Overlap_Deco_rewind_and_read(self):
        # Use arbitrary valid block_size and hop_size
        block_size = 1600
        hop_size = 400

        ads = ADSFactory.ads(
            audio_source=self.audio_source,
            max_time=1.50,
            block_size=block_size,
            hop_size=hop_size,
            record=True,
        )
        self._assert_rewind_and_read(ads, block_size, hop_size)

    def test_Limiter_Recorder_Overlap_Deco_rewind_and_read_limit(self):
        # Use arbitrary valid block_size and hop_size
        block_size = 1000
        hop_size = 200

        ads = ADSFactory.ads(
            audio_source=self.audio_source,
            max_time=1.317,
            block_size=block_size,
            hop_size=hop_size,
            record=True,
        )
        self._assert_overlap_read_limit(ads, 1.317, block_size, hop_size)
492
493
class TestADSFactoryBufferAudioSource:
    """Tests for objects built by `ADSFactory.ads` from an in-memory buffer."""

    def setup_method(self):
        """Build an ads over a 32-byte buffer (16 Hz, 2-byte samples, mono)."""
        self.signal = b"ABCDEFGHIJKLMNOPQRSTUVWXYZ012345"
        self.ads = ADSFactory.ads(
            data_buffer=self.signal,
            sampling_rate=16,
            sample_width=2,
            channels=1,
            block_size=4,
        )

    def test_ADS_BAS_sampling_rate(self):
        srate = self.ads.sampling_rate
        assert (
            srate == 16
        ), "Wrong sampling rate, expected: 16, found: {0}".format(srate)

    def test_ADS_BAS_sample_width(self):
        swidth = self.ads.sample_width
        assert (
            swidth == 2
        ), "Wrong sample width, expected: 2, found: {0}".format(swidth)

    def test_ADS_BAS_channels(self):
        channels = self.ads.channels
        assert (
            channels == 1
        ), "Wrong number of channels, expected: 1, found: {0}".format(channels)

    def test_Limiter_Recorder_Overlap_Deco_rewind_and_read(self):
        # Use arbitrary valid block_size and hop_size
        block_size = 5
        hop_size = 4

        ads = ADSFactory.ads(
            data_buffer=self.signal,
            sampling_rate=16,
            sample_width=2,
            channels=1,
            max_time=0.80,
            block_size=block_size,
            hop_size=hop_size,
            record=True,
        )

        ads.open()
        try:
            # Count all available overlapping blocks
            nb_blocks = 0
            while ads.read() is not None:
                nb_blocks += 1

            ads.rewind()

            # Build a BufferAudioSource used as the reference
            audio_source = BufferAudioSource(
                self.signal, ads.sampling_rate, ads.sample_width, ads.channels
            )
            audio_source.open()
            try:
                # Compare all blocks read after rewind to those read from an
                # audio source with a manual position setting
                for j in range(nb_blocks):
                    tmp = audio_source.read(block_size)
                    block = ads.read()
                    assert (
                        block == tmp
                    ), "Unexpected block '{}' (N={}) read from OverlapADS".format(
                        block, j
                    )
                    audio_source.position = (j + 1) * hop_size
            finally:
                audio_source.close()
        finally:
            ads.close()
569
570
class TestADSFactoryAlias:
    """Tests for the short keyword aliases accepted by `ADSFactory.ads`
    (`sr`, `sw`, `ch`, `bs`, `bd`, `hs`, `hd`, `fn`, `db`, `mt`, `rec`) and
    for the errors expected when conflicting keywords are passed.
    """

    # Audio parameters shared by most tests, spelled with the long keywords.
    _AUDIO_PARAMS = {"sampling_rate": 16, "sample_width": 2, "channels": 1}

    def setup_method(self):
        """Create the in-memory signal used as `data_buffer`."""
        self.signal = b"ABCDEFGHIJKLMNOPQRSTUVWXYZ012345"

    @staticmethod
    def _assert_ads_raises(expected_exception, **kwargs):
        """Check that `ADSFactory.ads(**kwargs)` raises `expected_exception`."""
        with pytest.raises(expected_exception):
            ADSFactory.ads(**kwargs)

    def test_sampling_rate_alias(self):
        ads = ADSFactory.ads(
            data_buffer=self.signal,
            sr=16,
            sample_width=2,
            channels=1,
            block_dur=0.5,
        )
        srate = ads.sampling_rate
        assert (
            srate == 16
        ), "Wrong sampling rate, expected: 16, found: {0}".format(srate)

    def test_sampling_rate_duplicate(self):
        self._assert_ads_raises(
            DuplicateArgument,
            data_buffer=self.signal,
            sr=16,
            sampling_rate=16,
            sample_width=2,
            channels=1,
        )

    def test_sample_width_alias(self):
        ads = ADSFactory.ads(
            data_buffer=self.signal,
            sampling_rate=16,
            sw=2,
            channels=1,
            block_dur=0.5,
        )
        swidth = ads.sample_width
        assert (
            swidth == 2
        ), "Wrong sample width, expected: 2, found: {0}".format(swidth)

    def test_sample_width_duplicate(self):
        self._assert_ads_raises(
            DuplicateArgument,
            data_buffer=self.signal,
            sampling_rate=16,
            sw=2,
            sample_width=2,
            channels=1,
        )

    def test_channels_alias(self):
        ads = ADSFactory.ads(
            data_buffer=self.signal,
            sampling_rate=16,
            sample_width=2,
            ch=1,
            block_dur=4,
        )
        channels = ads.channels
        assert (
            channels == 1
        ), "Wrong number of channels, expected: 1, found: {0}".format(channels)

    def test_channels_duplicate(self):
        self._assert_ads_raises(
            DuplicateArgument,
            data_buffer=self.signal,
            sampling_rate=16,
            sample_width=2,
            ch=1,
            channels=1,
        )

    def test_block_size_alias(self):
        ads = ADSFactory.ads(
            data_buffer=self.signal, bs=8, **self._AUDIO_PARAMS
        )
        size = ads.block_size
        assert (
            size == 8
        ), "Wrong block_size using bs alias, expected: 8, found: {0}".format(
            size
        )

    def test_block_size_duplicate(self):
        self._assert_ads_raises(
            DuplicateArgument,
            data_buffer=self.signal,
            bs=4,
            block_size=4,
            **self._AUDIO_PARAMS
        )

    def test_block_duration_alias(self):
        ads = ADSFactory.ads(
            data_buffer=self.signal, bd=0.75, **self._AUDIO_PARAMS
        )
        size = ads.block_size
        err_msg = "Wrong block_size set with a block_dur alias 'bd', expected: 12, found: {0}"
        assert size == 12, err_msg.format(size)

    def test_block_duration_duplicate(self):
        self._assert_ads_raises(
            DuplicateArgument,
            data_buffer=self.signal,
            bd=4,
            block_dur=4,
            **self._AUDIO_PARAMS
        )

    def test_block_size_duration_duplicate(self):
        self._assert_ads_raises(
            DuplicateArgument,
            data_buffer=self.signal,
            bd=4,
            bs=12,
            **self._AUDIO_PARAMS
        )

    def test_hop_duration_alias(self):
        ads = ADSFactory.ads(
            data_buffer=self.signal, bd=0.75, hd=0.5, **self._AUDIO_PARAMS
        )
        size = ads.hop_size
        assert (
            size == 8
        ), "Wrong hop_size using hd alias, expected: 8, found: {0}".format(
            size
        )

    def test_hop_duration_duplicate(self):
        self._assert_ads_raises(
            DuplicateArgument,
            data_buffer=self.signal,
            bd=0.75,
            hd=0.5,
            hop_dur=0.5,
            **self._AUDIO_PARAMS
        )

    def test_hop_size_duration_duplicate(self):
        self._assert_ads_raises(
            DuplicateArgument,
            data_buffer=self.signal,
            bs=8,
            hs=4,
            hd=1,
            **self._AUDIO_PARAMS
        )

    def test_hop_size_greater_than_block_size(self):
        self._assert_ads_raises(
            ValueError,
            data_buffer=self.signal,
            bs=4,
            hs=8,
            **self._AUDIO_PARAMS
        )

    def test_filename_duplicate(self):
        self._assert_ads_raises(
            DuplicateArgument,
            fn=dataset.one_to_six_arabic_16000_mono_bc_noise,
            filename=dataset.one_to_six_arabic_16000_mono_bc_noise,
        )

    def test_data_buffer_duplicate(self):
        self._assert_ads_raises(
            DuplicateArgument,
            data_buffer=self.signal,
            db=self.signal,
            **self._AUDIO_PARAMS
        )

    def test_max_time_alias(self):
        ads = ADSFactory.ads(
            data_buffer=self.signal,
            mt=10,
            block_dur=0.5,
            **self._AUDIO_PARAMS
        )
        assert (
            ads.max_read == 10
        ), "Wrong AudioDataSource.max_read, expected: 10, found: {}".format(
            ads.max_read
        )

    def test_max_time_duplicate(self):
        self._assert_ads_raises(
            DuplicateArgument,
            data_buffer=self.signal,
            mt=True,
            max_time=True,
            **self._AUDIO_PARAMS
        )

    def test_record_alias(self):
        ads = ADSFactory.ads(
            data_buffer=self.signal,
            rec=True,
            block_dur=0.5,
            **self._AUDIO_PARAMS
        )
        assert ads.rewindable, "AudioDataSource.rewindable expected to be True"

    def test_record_duplicate(self):
        self._assert_ads_raises(
            DuplicateArgument,
            data_buffer=self.signal,
            rec=True,
            record=True,
            **self._AUDIO_PARAMS
        )

    def test_Limiter_Recorder_Overlap_Deco_rewind_and_read_alias(self):
        # Use arbitrary valid block_size and hop_size
        block_size = 5
        hop_size = 4

        ads = ADSFactory.ads(
            db=self.signal,
            sr=16,
            sw=2,
            ch=1,
            mt=0.80,
            bs=block_size,
            hs=hop_size,
            rec=True,
        )

        ads.open()
        try:
            # Count all available overlapping blocks
            nb_blocks = 0
            while ads.read() is not None:
                nb_blocks += 1

            ads.rewind()

            # Build a BufferAudioSource used as the reference
            audio_source = BufferAudioSource(
                self.signal, ads.sampling_rate, ads.sample_width, ads.channels
            )
            audio_source.open()
            try:
                # Compare all blocks read after rewind to those read from an
                # audio source with manual position definition
                for j in range(nb_blocks):
                    tmp = audio_source.read(block_size)
                    block = ads.read()
                    assert (
                        block == tmp
                    ), "Unexpected block (N={0}) read from OverlapADS".format(j)
                    audio_source.position = (j + 1) * hop_size
            finally:
                audio_source.close()
        finally:
            ads.close()
889 15
890 16
def _read_all_data(reader):
    """Call `reader.read()` until it returns None and return the
    concatenation of everything that was read, as bytes.
    """
    chunks = []
    chunk = reader.read()
    while chunk is not None:
        chunks.append(chunk)
        chunk = reader.read()
    return b"".join(chunks)
25
26
27 class TestAudioReaderWithFileAudioSource:
@pytest.fixture(autouse=True)
def setup_and_teardown(self):
    """Provide `self.audio_source`, an opened wave source, to each test and
    close it once the test is over.
    """
    source = WaveAudioSource(
        filename=dataset.one_to_six_arabic_16000_mono_bc_noise
    )
    self.audio_source = source
    source.open()
    yield
    self.audio_source.close()
36
def test_AudioReader_type(self):
    """An object built with `AudioReader(...)` is an `AudioReader`."""
    reader = AudioReader(input=self.audio_source)
    has_expected_type = isinstance(reader, AudioReader)
    assert (
        has_expected_type
    ), "wrong object type, expected: 'AudioReader', found: {0}".format(
        type(reader)
    )
43
def _test_default_block_size(self):
    """Check that a block read with the default settings holds 160 samples.

    The length of the data returned by `read()` is converted to a number of
    samples before the comparison, as `test_block_duration` does, instead
    of comparing a byte count with a sample count.

    NOTE: because of its leading underscore this method is not collected
    under pytest's default naming rules; the default case is also covered
    by `test_block_duration`.
    """
    reader = AudioReader(input=self.audio_source)
    data = reader.read()
    size = len(data) // (reader.sample_width * reader.channels)
    assert (
        size == 160
    ), "Wrong default block_size, expected: 160, found: {0}".format(size)
51
@pytest.mark.parametrize(
    "block_dur, expected_nb_samples",
    [
        (None, 160),  # default: 10 ms
        (0.025, 400),  # 25 ms
    ],
    ids=["default", "_25ms"],
)
def test_block_duration(self, block_dur, expected_nb_samples):
    """Check how many samples the first block holds for a block duration.

    When `block_dur` is None the argument is not passed at all, so the
    reader's own default is exercised.
    """
    options = {} if block_dur is None else {"block_dur": block_dur}
    reader = AudioReader(input=self.audio_source, **options)
    first_block = reader.read()
    nb_samples = len(first_block) // reader.sample_width
    message = f"Wrong block_size, expected: {expected_nb_samples}, found: {nb_samples}"
    assert nb_samples == expected_nb_samples, message
71
@pytest.mark.parametrize(
    "block_dur, hop_dur, expected_nb_blocks, expected_last_block_nb_samples",
    [
        (None, None, 1879, 126),  # default: 10 ms
        (0.01, None, 1879, 126),  # block_dur_10ms_hop_dur_None
        (0.01, 0.01, 1879, 126),  # block_dur_10ms_hop_dur_10ms
        (0.02, None, 940, 126),  # block_dur_20ms_hop_dur_None
        (0.025, None, 752, 206),  # block_dur_25ms_hop_dur_None
        (0.02, 0.01, 1878, 286),  # block_dur_20ms_hop_dur_10ms
        (0.025, 0.005, 3754, 366),  # block_dur_25ms_hop_dur_5ms
    ],
    ids=[
        "default",
        "block_dur_10ms_hop_dur_None",
        "block_dur_10ms_hop_dur_10ms",
        "block_dur_20ms_hop_dur_None",
        "block_dur_25ms_hop_dur_None",
        "block_dur_20ms_hop_dur_10ms",
        "block_dur_25ms_hop_dur_5ms",
    ],
)
def test_hop_duration(
    self,
    block_dur,
    hop_dur,
    expected_nb_blocks,
    expected_last_block_nb_samples,
):
    """Test the number of read blocks and the size of the last block for
    different 'block_dur' and 'hop_dur' values.

    Args:
        block_dur (float or None): block duration in seconds; when None
            the argument is not passed to `AudioReader`.
        hop_dur (float or None): hop duration in seconds.
        expected_nb_blocks (int): expected number of read blocks.
        expected_last_block_nb_samples (int): expected number of samples
            in the last block.
    """
    options = {"hop_dur": hop_dur}
    if block_dur is not None:
        options["block_dur"] = block_dur
    reader = AudioReader(input=self.audio_source, **options)

    # Read until exhaustion, keeping track of the size of the latest block
    nb_blocks = 0
    last_block_nb_samples = None
    while True:
        data = reader.read()
        if data is None:
            break
        nb_blocks += 1
        last_block_nb_samples = len(data) // reader.sample_width

    err_msg = "Wrong number of blocks read from source, expected: "
    err_msg += f"{expected_nb_blocks}, found: {nb_blocks}"
    assert nb_blocks == expected_nb_blocks, err_msg

    err_msg = (
        "Wrong number of samples in last block read from source, expected: "
    )
    err_msg += (
        f"{expected_last_block_nb_samples}, found: {last_block_nb_samples}"
    )
    assert last_block_nb_samples == expected_last_block_nb_samples, err_msg
138
def test_hop_duration_exception(self):
    """A `hop_dur` greater than `block_dur` must raise ValueError."""
    invalid_durations = {"block_dur": 0.01, "hop_dur": 0.015}
    with pytest.raises(ValueError):
        AudioReader(self.audio_source, **invalid_durations)
143
@pytest.mark.parametrize(
    "block_dur, hop_dur",
    [
        (None, None),  # default
        (0.01, None),  # block_dur_10ms_hop_dur_None
        (None, 0.01),  # block_dur_None__hop_dur_10ms
        (0.05, 0.05),  # block_dur_50ms_hop_dur_50ms
    ],
    ids=[
        "default",
        "block_dur_10ms_hop_dur_None",
        "block_dur_None__hop_dur_10ms",
        "block_dur_50ms_hop_dur_50ms",
    ],
)
def test_reader_class_block_dur_equals_hop_dur(self, block_dur, hop_dur):
    """Check that no '_OverlapAudioReader' instance is created when the hop
    duration is unset or equal to the block duration.

    When `block_dur` is None the argument is not passed to `AudioReader`.
    """
    options = {"hop_dur": hop_dur}
    if block_dur is not None:
        options["block_dur"] = block_dur
    reader = AudioReader(input=self.audio_source, **options)
    # NOTE(review): other tests reach wrapped objects through
    # `reader._audio_source`; confirm that checking `reader` itself is what
    # is intended here.
    is_overlap_reader = isinstance(reader, _OverlapAudioReader)
    assert not is_overlap_reader
170
def test_sampling_rate(self):
    """The reader reports a sampling rate of 16000 for the test file."""
    found = AudioReader(input=self.audio_source).sampling_rate
    message = f"Wrong sampling rate, expected: 16000, found: {found}"
    assert found == 16000, message
177
def test_sample_width(self):
    """The reader reports a sample width of 2 for the test file."""
    found = AudioReader(input=self.audio_source).sample_width
    message = f"Wrong sample width, expected: 2, found: {found}"
    assert found == 2, message
184
def test_channels(self):
    """The reader reports a single channel for the test file."""
    found = AudioReader(input=self.audio_source).channels
    message = f"Wrong number of channels, expected: 1, found: {found}"
    assert found == 1, message
191
def test_read(self):
    """The first block read with `block_dur=0.02` equals the first 320
    samples read directly from a second `WaveAudioSource` on the same file.
    """
    first_block = AudioReader(input=self.audio_source, block_dur=0.02).read()

    reference = WaveAudioSource(
        filename=dataset.one_to_six_arabic_16000_mono_bc_noise
    )
    reference.open()
    expected = reference.read(320)
    reference.close()

    assert first_block == expected, "Unexpected data read from AudioReader"
204
def test_read_with_overlap(self):
    """With `block_dur=0.02` and `hop_dur=0.01`, the second block equals the
    320 samples that follow the first 160 samples of the file.
    """
    reader = AudioReader(
        input=self.audio_source, block_dur=0.02, hop_dur=0.01
    )
    reader.read()  # discard the first block
    second_block = reader.read()  # overlaps the first block by 0.01 s

    reference = WaveAudioSource(
        filename=dataset.one_to_six_arabic_16000_mono_bc_noise
    )
    reference.open()
    reference.read(160)  # skip one hop
    expected = reference.read(320)
    reference.close()

    assert second_block == expected, "Unexpected data read from AudioReader"
221
def test_read_from_AudioReader_with_max_read(self):
    """With `max_read=0.75`, everything read from the reader equals the
    first `int(16000 * 0.75)` samples read directly from the file.
    """
    # read a maximum of 0.75 seconds from audio source
    reader = AudioReader(input=self.audio_source, max_read=0.75)
    wrapped_source = reader._audio_source._audio_source
    assert isinstance(wrapped_source, _Limiter)
    data_from_reader = _read_all_data(reader)

    reference = WaveAudioSource(
        filename=dataset.one_to_six_arabic_16000_mono_bc_noise
    )
    reference.open()
    expected = reference.read(int(16000 * 0.75))
    reference.close()

    message = f"Unexpected data read from AudioReader with 'max_read = {0.75}'"
    assert data_from_reader == expected, message
238
def test_read_data_size_from_AudioReader_with_max_read(self):
    """Check the total number of bytes delivered by a reader limited with
    `max_read`.

    The expected size is the rounded number of samples in `max_read`
    seconds times the size in bytes of one sample over all channels.
    """
    # read a maximum of 1.191 seconds from audio source
    max_read = 1.191
    reader = AudioReader(input=self.audio_source, max_read=max_read)
    assert isinstance(reader._audio_source._audio_source, _Limiter)

    total_samples = round(reader.sampling_rate * max_read)
    expected_read_bytes = (
        total_samples * reader.sample_width * reader.channels
    )

    total_read = len(_read_all_data(reader))
    err_msg = (
        f"Wrong data length read from AudioReader with 'max_read = "
        f"{max_read}', expected: {expected_read_bytes}, found: {total_read}"
    )
    assert total_read == expected_read_bytes, err_msg
257
def test_read_from_Recorder(self):
    """At most ten blocks read from a `Recorder` with `block_dur=0.025`
    equal the first `400 * 10` samples read directly from the file.
    """
    recorder = Recorder(input=self.audio_source, block_dur=0.025)
    blocks = []
    while len(blocks) < 10:
        block = recorder.read()
        if block is None:
            break
        blocks.append(block)
    data_from_recorder = b"".join(blocks)

    reference = WaveAudioSource(
        filename=dataset.one_to_six_arabic_16000_mono_bc_noise
    )
    reference.open()
    expected = reference.read(400 * 10)
    reference.close()

    assert data_from_recorder == expected, "Unexpected data read from Recorder"
278
def test_AudioReader_rewindable(self):
    """An `AudioReader` created with `record=True` reports `rewindable`."""
    recording_reader = AudioReader(input=self.audio_source, record=True)
    is_rewindable = recording_reader.rewindable
    assert is_rewindable, "AudioReader with record=True should be rewindable"
284
def test_AudioReader_record_and_rewind(self):
    """Read ten blocks, rewind, and check the data available after rewind.

    After ``rewind()`` the data returned by ``_read_all_data`` must equal
    the first ``320 * 10`` samples read directly from the wave file.
    """
    reader = AudioReader(
        input=self.audio_source, record=True, block_dur=0.02
    )
    # 10 blocks of 0.02 sec. each, i.e. 0.2 sec. of data
    for _ in range(10):
        reader.read()
    reader.rewind()

    # Everything that can be read once the reader has been rewound
    rewound_data = _read_all_data(reader)

    reference = WaveAudioSource(
        filename=dataset.one_to_six_arabic_16000_mono_bc_noise
    )
    reference.open()
    expected = reference.read(320 * 10)  # 0.2 sec. of data
    reference.close()

    failure_msg = "Unexpected data read from AudioReader with record = True"
    assert rewound_data == expected, failure_msg
307
def test_Recorder_record_and_rewind(self):
    """Read ten blocks from a Recorder, rewind, and check the replayed data.

    After ``rewind()`` the data returned by ``_read_all_data`` must equal
    the first ``320 * 10`` samples read directly from the wave file.
    """
    recorder = Recorder(input=self.audio_source, block_dur=0.02)
    # read 0.02 * 10 = 0.2 sec. of data
    for _ in range(10):
        recorder.read()

    recorder.rewind()

    # read all available data after rewind
    recorder_data = _read_all_data(recorder)

    audio_source = WaveAudioSource(
        filename=dataset.one_to_six_arabic_16000_mono_bc_noise
    )
    audio_source.open()
    audio_source_data = audio_source.read(320 * 10)  # read 0.2 sec. of data
    audio_source.close()

    assert (
        recorder_data == audio_source_data
    ), "Unexpected data read from Recorder"
330
def test_read_overlapping_blocks(self):
    """Compare overlapping blocks from an AudioReader with manual reads.

    Each block returned by the reader must equal ``block_size`` samples read
    from a BufferAudioSource whose position is moved forward by ``hop_size``
    after every comparison.
    """
    # Arbitrary valid sizes, expressed in samples
    block_size = 1714
    hop_size = 313

    reader = AudioReader(
        input=self.audio_source,
        block_dur=block_size / self.audio_source.sampling_rate,
        hop_dur=hop_size / self.audio_source.sampling_rate,
    )

    # Drain the reader, keeping every overlapping block
    blocks = []
    block = reader.read()
    while block is not None:
        blocks.append(block)
        block = reader.read()

    # Load the whole file and wrap it in a BufferAudioSource
    fp = wave.open(dataset.one_to_six_arabic_16000_mono_bc_noise, "r")
    wave_data = fp.readframes(fp.getnframes())
    fp.close()
    reference = BufferAudioSource(
        wave_data,
        reader.sampling_rate,
        reader.sample_width,
        reader.channels,
    )
    reference.open()

    # Check each block against a read from the manually positioned source
    for index, block in enumerate(blocks):
        expected = reference.read(block_size)
        assert block == expected, (
            f"Unexpected data (block {index}) from reader "
            "with overlapping blocks"
        )
        reference.position = (index + 1) * hop_size

    reference.close()
374
def test_read_overlapping_blocks_with_max_read(self):
    """Compare overlapping blocks read under ``max_read=0.5`` with manual reads.

    Each block is compared with the same number of samples read from a
    BufferAudioSource, so a block shorter than ``block_size`` is compared
    against an equally short reference read.
    """
    block_size = 256
    hop_size = 200

    reader = AudioReader(
        input=self.audio_source,
        block_dur=block_size / self.audio_source.sampling_rate,
        hop_dur=hop_size / self.audio_source.sampling_rate,
        max_read=0.5,
    )

    # Drain the reader, keeping every overlapping block
    blocks = []
    block = reader.read()
    while block is not None:
        blocks.append(block)
        block = reader.read()

    # Load the whole file and wrap it in a BufferAudioSource
    fp = wave.open(dataset.one_to_six_arabic_16000_mono_bc_noise, "r")
    wave_data = fp.readframes(fp.getnframes())
    fp.close()
    reference = BufferAudioSource(
        wave_data,
        reader.sampling_rate,
        reader.sample_width,
        reader.channels,
    )
    reference.open()

    # Check each block against a read from the manually positioned source
    for index, block in enumerate(blocks):
        # Convert the block length from bytes to samples
        nb_samples = len(block) // (reader.sw * reader.ch)
        expected = reference.read(nb_samples)
        assert block == expected, (
            f"Unexpected data (block {index}) from reader "
            "with overlapping blocks and max_read"
        )
        reference.position = (index + 1) * hop_size

    reference.close()
418
def test_length_read_overlapping_blocks_with_max_read(self):
    """Check the amount of data read with overlapping blocks and ``max_read``.

    The reader is limited to 1.932 seconds. Each block contributes its
    length minus the ``block_size - hop_size`` samples carried over from the
    previous block, and the resulting total must match the expected number
    of bytes for that duration.
    """
    block_size = 313
    hop_size = 207
    max_read = 1.932  # seconds
    block_dur = block_size / self.audio_source.sampling_rate
    hop_dur = hop_size / self.audio_source.sampling_rate

    reader = AudioReader(
        input=self.audio_source,
        max_read=max_read,
        block_dur=block_dur,
        hop_dur=hop_dur,
    )

    total_samples = round(reader.sampling_rate * max_read)
    first_read_size = block_size
    next_read_size = block_size - hop_size
    nb_next_blocks, last_block_size = divmod(
        (total_samples - first_read_size), next_read_size
    )
    total_samples_with_overlap = (
        first_read_size + next_read_size * nb_next_blocks + last_block_size
    )
    bytes_per_sample = reader.sample_width * reader.channels
    expected_read_bytes = total_samples_with_overlap * bytes_per_sample

    # Bytes shared between two consecutive blocks
    cache_size = (block_size - hop_size) * bytes_per_sample
    # Start from cache_size so the first block is counted in full.
    total_read = cache_size

    block = reader.read()
    while block is not None:
        total_read += len(block) - cache_size
        block = reader.read()

    err_msg = (
        "Wrong data length read from AudioReader, "
        f"expected: {expected_read_bytes}, found: {total_read}"
    )
    assert total_read == expected_read_bytes, err_msg
464
def test_reader_with_overlapping_blocks__rewindable(self):
    """A recording AudioReader with overlapping blocks must be rewindable."""
    # NOTE(review): 320 and 160 are passed as durations here, while the other
    # tests derive block_dur/hop_dur from sample counts — confirm the intent.
    reader_kwargs = dict(block_dur=320, hop_dur=160, record=True)
    reader = AudioReader(input=self.audio_source, **reader_kwargs)
    failure_msg = "AudioReader with record=True should be rewindable"
    assert reader.rewindable, failure_msg
475
def test_overlapping_blocks_with_max_read_rewind_and_read(self):
    """Replay overlapping blocks after a rewind and compare with manual reads.

    The reader is drained once to count its blocks, rewound, and then each
    replayed block is compared with ``block_size`` samples read from a
    BufferAudioSource positioned at ``j * hop_size``.

    NOTE(review): despite the test name, no ``max_read`` is passed to the
    reader here — confirm whether that is intended.
    """
    # Use arbitrary valid block_size and hop_size
    block_size = 1600
    hop_size = 400
    block_dur = block_size / self.audio_source.sampling_rate
    hop_dur = hop_size / self.audio_source.sampling_rate

    reader = AudioReader(
        input=self.audio_source,
        block_dur=block_dur,
        hop_dur=hop_dur,
        record=True,
    )

    # Drain the reader, counting the overlapping blocks it yields
    nb_blocks = 0
    while reader.read() is not None:
        nb_blocks += 1

    reader.rewind()

    # Read all data from file and build a BufferAudioSource
    with wave.open(dataset.one_to_six_arabic_16000_mono_bc_noise, "r") as fp:
        wave_data = fp.readframes(fp.getnframes())
    audio_source = BufferAudioSource(
        wave_data,
        reader.sampling_rate,
        reader.sample_width,
        reader.channels,
    )
    audio_source.open()

    # Compare blocks read from AudioReader to those read from a
    # BufferAudioSource with manual position setting
    for j in range(nb_blocks):
        tmp = audio_source.read(block_size)
        # Report the index of the failing block, not the total block count.
        assert reader.read() == tmp, (
            f"Unexpected data (block {j}) from reader "
            "with overlapping blocks and record = True"
        )
        audio_source.position = (j + 1) * hop_size

    audio_source.close()
521
def test_overlapping_blocks_with_record_and_max_read_rewind_and_read(self):
    """Replay overlapping blocks after a rewind and compare with manual reads.

    The reader is drained once to count its blocks, rewound, and then each
    replayed block is compared with ``block_size`` samples read from a
    BufferAudioSource positioned at ``j * hop_size``.
    """
    # Use arbitrary valid block_size and hop_size
    block_size = 1600
    hop_size = 400
    block_dur = block_size / self.audio_source.sampling_rate
    hop_dur = hop_size / self.audio_source.sampling_rate

    # NOTE(review): the other tests here limit reading with `max_read`;
    # `max_time` is presumably meant to be `max_read` — confirm that this
    # keyword is honoured by AudioReader before changing it.
    reader = AudioReader(
        input=self.audio_source,
        max_time=1.50,
        block_dur=block_dur,
        hop_dur=hop_dur,
        record=True,
    )

    # Drain the reader, counting the overlapping blocks it yields
    nb_blocks = 0
    while reader.read() is not None:
        nb_blocks += 1

    reader.rewind()

    # Read all data from file and build a BufferAudioSource
    with wave.open(dataset.one_to_six_arabic_16000_mono_bc_noise, "r") as fp:
        wave_data = fp.readframes(fp.getnframes())
    audio_source = BufferAudioSource(
        wave_data,
        reader.sampling_rate,
        reader.sample_width,
        reader.channels,
    )
    audio_source.open()

    # Compare all blocks read from AudioReader to those read from
    # BufferAudioSource with a manual position setting
    for j in range(nb_blocks):
        tmp = audio_source.read(block_size)
        # Report the index of the failing block, not the total block count.
        assert (
            reader.read() == tmp
        ), f"Unexpected block (N={j}) read from AudioReader"
        audio_source.position = (j + 1) * hop_size

    audio_source.close()
568
def test_length_read_overlapping_blocks_with_record_and_max_read(self):
    """Check the amount of data read with overlap, ``record`` and ``max_read``.

    The reader is limited to 1.317 seconds. Each block contributes its
    length minus the ``block_size - hop_size`` samples carried over from the
    previous block, and the resulting total must match the expected number
    of bytes for that duration.
    """
    # Use arbitrary valid block_size and hop_size
    block_size = 1000
    hop_size = 200
    max_read = 1.317  # seconds
    block_dur = block_size / self.audio_source.sampling_rate
    hop_dur = hop_size / self.audio_source.sampling_rate

    reader = AudioReader(
        input=self.audio_source,
        block_dur=block_dur,
        hop_dur=hop_dur,
        record=True,
        max_read=max_read,
    )
    total_samples = round(reader.sampling_rate * max_read)
    first_read_size = block_size
    next_read_size = block_size - hop_size
    nb_next_blocks, last_block_size = divmod(
        (total_samples - first_read_size), next_read_size
    )
    total_samples_with_overlap = (
        first_read_size + next_read_size * nb_next_blocks + last_block_size
    )
    bytes_per_sample = reader.sample_width * reader.channels
    expected_read_bytes = total_samples_with_overlap * bytes_per_sample

    # Bytes shared between two consecutive blocks
    cache_size = (block_size - hop_size) * bytes_per_sample
    # Start from cache_size so the first block is counted in full.
    total_read = cache_size

    block = reader.read()
    while block is not None:
        total_read += len(block) - cache_size
        block = reader.read()

    err_msg = (
        "Wrong data length read from AudioReader, "
        f"expected: {expected_read_bytes}, found: {total_read}"
    )
    assert total_read == expected_read_bytes, err_msg
611
612
def test_AudioReader_raw_data():
    """Build an AudioReader from raw bytes and check rewound overlapping reads.

    The reader is created with explicit audio parameters, drained once to
    count its blocks, rewound, and then each replayed block is compared with
    ``block_size`` samples read from a BufferAudioSource positioned at
    ``j * hop_size``.
    """
    data = b"ABCDEFGHIJKLMNOPQRSTUVWXYZ012345"
    block_size = 5
    hop_size = 4
    reader = AudioReader(
        input=data,
        sampling_rate=16,
        sample_width=2,
        channels=1,
        block_dur=block_size / 16,
        hop_dur=hop_size / 16,
        max_read=0.80,
        record=True,
    )
    reader.open()

    assert (
        reader.sampling_rate == 16
    ), f"Wrong sampling rate, expected: 16, found: {reader.sampling_rate }"

    assert (
        reader.sample_width == 2
    ), f"Wrong sample width, expected: 2, found: {reader.sample_width}"

    # Drain the reader, counting the overlapping blocks it yields
    nb_blocks = 0
    while reader.read() is not None:
        nb_blocks += 1

    reader.rewind()

    # Build a BufferAudioSource
    audio_source = BufferAudioSource(
        data, reader.sampling_rate, reader.sample_width, reader.channels
    )
    audio_source.open()

    # Compare all blocks read from AudioReader to those read from an audio
    # source with a manual position setting
    for j in range(nb_blocks):
        tmp = audio_source.read(block_size)
        block = reader.read()
        # Report the index of the failing block, not the total block count.
        assert (
            block == tmp
        ), f"Unexpected block '{block}' (N={j}) read from AudioReader"
        audio_source.position = (j + 1) * hop_size
    audio_source.close()
    reader.close()
665
666
def test_AudioReader_alias_params():
    """Check that long and short attribute names expose the same values.

    The reader is built with the ``sr`` and ``sw`` keyword aliases; both the
    long attributes and their short aliases must return the given values.
    """
    reader = AudioReader(input=b"0" * 1600, sr=16000, sw=2, channels=1)

    # (description, attribute name, expected value), checked in this order
    checks = (
        ("sampling rate", "sampling_rate", 16000),
        ("sampling rate", "sr", 16000),
        ("sample width", "sample_width", 2),
        ("sample width", "sw", 2),
        ("number of channels", "channels", 1),
        ("number of channels", "ch", 1),
    )
    for description, attribute, expected in checks:
        found = getattr(reader, attribute)
        assert found == expected, (
            f"Unexpected {description}: reader.{attribute} = "
            f"{found} instead of {expected}"
        )
899 697
900 698
901 @pytest.mark.parametrize( 699 @pytest.mark.parametrize(
902 "file_id, max_read, size", 700 "file_id, max_read, size",
903 [ 701 [