amine@330
|
1 import sys
|
amine@330
|
2 import wave
|
amine@403
|
3 from functools import partial
|
amine@403
|
4
|
amine@403
|
5 import pytest
|
amine@403
|
6
|
amine@330
|
7 from auditok import (
|
amine@403
|
8 AudioReader,
|
amine@403
|
9 BufferAudioSource,
|
amine@403
|
10 Recorder,
|
amine@403
|
11 WaveAudioSource,
|
amine@330
|
12 dataset,
|
amine@330
|
13 )
|
amine@403
|
14 from auditok.util import _Limiter, _OverlapAudioReader
|
amine@330
|
15
|
amine@330
|
16
|
amine@403
|
17 def _read_all_data(reader):
|
amine@403
|
18 blocks = []
|
amine@403
|
19 while True:
|
amine@403
|
20 data = reader.read()
|
amine@403
|
21 if data is None:
|
amine@403
|
22 break
|
amine@403
|
23 blocks.append(data)
|
amine@403
|
24 return b"".join(blocks)
|
amine@403
|
25
|
amine@403
|
26
|
amine@403
|
27 class TestAudioReaderWithFileAudioSource:
|
amine@403
|
28 @pytest.fixture(autouse=True)
|
amine@403
|
29 def setup_and_teardown(self):
|
amine@330
|
30 self.audio_source = WaveAudioSource(
|
amine@330
|
31 filename=dataset.one_to_six_arabic_16000_mono_bc_noise
|
amine@330
|
32 )
|
amine@403
|
33 self.audio_source.open()
|
amine@403
|
34 yield
|
amine@403
|
35 self.audio_source.close()
|
amine@330
|
36
|
amine@403
|
37 def test_AudioReader_type(self):
|
amine@403
|
38 reader = AudioReader(input=self.audio_source)
|
amine@410
|
39 err_msg = "wrong object type, expected: 'AudioReader', found: {0}"
|
amine@403
|
40 assert isinstance(reader, AudioReader), err_msg.format(type(reader))
|
amine@330
|
41
|
amine@403
|
42 def _test_default_block_size(self):
|
amine@403
|
43 reader = AudioReader(input=self.audio_source)
|
amine@403
|
44 data = reader.read()
|
amine@403
|
45 size = len(data)
|
amine@400
|
46 assert (
|
amine@400
|
47 size == 160
|
amine@400
|
48 ), "Wrong default block_size, expected: 160, found: {0}".format(size)
|
amine@330
|
49
|
amine@403
|
50 @pytest.mark.parametrize(
|
amine@403
|
51 "block_dur, expected_nb_samples",
|
amine@403
|
52 [
|
amine@403
|
53 (None, 160), # default: 10 ms
|
amine@403
|
54 (0.025, 400), # 25 ms
|
amine@403
|
55 ],
|
amine@403
|
56 ids=["default", "_25ms"],
|
amine@403
|
57 )
|
amine@403
|
58 def test_block_duration(self, block_dur, expected_nb_samples):
|
amine@403
|
59 """Test the number of samples read for a given block duration."""
|
amine@403
|
60 if block_dur is not None:
|
amine@403
|
61 reader = AudioReader(input=self.audio_source, block_dur=block_dur)
|
amine@403
|
62 else:
|
amine@403
|
63 reader = AudioReader(input=self.audio_source)
|
amine@403
|
64 data = reader.read()
|
amine@403
|
65 nb_samples = len(data) // reader.sample_width
|
amine@400
|
66 assert (
|
amine@403
|
67 nb_samples == expected_nb_samples
|
amine@403
|
68 ), f"Wrong block_size, expected: {expected_nb_samples}, found: {nb_samples}"
|
amine@330
|
69
|
amine@403
|
70 @pytest.mark.parametrize(
|
amine@403
|
71 "block_dur, hop_dur, expected_nb_blocks, expected_last_block_nb_samples",
|
amine@403
|
72 [
|
amine@403
|
73 (None, None, 1879, 126), # default: 10 ms
|
amine@403
|
74 (0.01, None, 1879, 126), # block_dur_10ms_hop_dur_None
|
amine@403
|
75 (0.01, 0.01, 1879, 126), # block_dur_10ms_hop_dur_10ms
|
amine@403
|
76 (0.02, None, 940, 126), # block_dur_20ms_hop_dur_None
|
amine@403
|
77 (0.025, None, 752, 206), # block_dur_25ms_hop_dur_None
|
amine@403
|
78 (0.02, 0.01, 1878, 286), # block_dur_20ms_hop_dur_10ms
|
amine@410
|
79 (0.025, 0.005, 3754, 366), # block_dur_25ms_hop_dur_5ms
|
amine@403
|
80 ],
|
amine@403
|
81 ids=[
|
amine@403
|
82 "default",
|
amine@403
|
83 "block_dur_10ms_hop_dur_None",
|
amine@410
|
84 "block_dur_10ms_hop_dur_10ms",
|
amine@403
|
85 "block_dur_20ms_hop_dur_None",
|
amine@403
|
86 "block_dur_25ms_hop_dur_None",
|
amine@403
|
87 "block_dur_20ms_hop_dur_10ms",
|
amine@410
|
88 "block_dur_25ms_hop_dur_5ms",
|
amine@403
|
89 ],
|
amine@403
|
90 )
|
amine@403
|
91 def test_hop_duration(
|
amine@403
|
92 self,
|
amine@403
|
93 block_dur,
|
amine@403
|
94 hop_dur,
|
amine@403
|
95 expected_nb_blocks,
|
amine@403
|
96 expected_last_block_nb_samples,
|
amine@403
|
97 ):
|
amine@403
|
98 """Test the number of read blocks and the duration of last block for
|
amine@403
|
99 different 'block_dur' and 'hop_dur' values.
|
amine@330
|
100
|
amine@403
|
101 Args:
|
amine@403
|
102 block_dur (float or None): block duration in seconds.
|
amine@403
|
103 hop_dur (float or None): hop duration in seconds.
|
amine@403
|
104 expected_nb_blocks (int): expected number of read block.
|
amine@403
|
105 expected_last_block_nb_samples (int): expected number of sample
|
amine@403
|
106 in the last block.
|
amine@403
|
107 """
|
amine@403
|
108 if block_dur is not None:
|
amine@403
|
109 reader = AudioReader(
|
amine@403
|
110 input=self.audio_source, block_dur=block_dur, hop_dur=hop_dur
|
amine@403
|
111 )
|
amine@403
|
112 else:
|
amine@403
|
113 reader = AudioReader(input=self.audio_source, hop_dur=hop_dur)
|
amine@330
|
114
|
amine@403
|
115 nb_blocks = 0
|
amine@403
|
116 last_block_nb_samples = None
|
amine@403
|
117 while True:
|
amine@403
|
118 data = reader.read()
|
amine@403
|
119 if data is not None:
|
amine@403
|
120 nb_blocks += 1
|
amine@403
|
121 last_block_nb_samples = len(data) // reader.sample_width
|
amine@403
|
122 else:
|
amine@403
|
123 break
|
amine@403
|
124 err_msg = "Wrong number of blocks read from source, expected: "
|
amine@403
|
125 err_msg += f"{expected_nb_blocks}, found: {nb_blocks}"
|
amine@403
|
126 assert nb_blocks == expected_nb_blocks, err_msg
|
amine@330
|
127
|
amine@403
|
128 err_msg = (
|
amine@403
|
129 "Wrong number of samples in last block read from source, expected: "
|
amine@403
|
130 )
|
amine@403
|
131 err_msg += (
|
amine@403
|
132 f"{expected_last_block_nb_samples}, found: {last_block_nb_samples}"
|
amine@330
|
133 )
|
amine@330
|
134
|
amine@403
|
135 assert last_block_nb_samples == expected_last_block_nb_samples, err_msg
|
amine@403
|
136
|
amine@403
|
137 def test_hop_duration_exception(self):
|
amine@403
|
138 """Test passing hop_dur > block_dur raises ValueError"""
|
amine@403
|
139 with pytest.raises(ValueError):
|
amine@403
|
140 AudioReader(self.audio_source, block_dur=0.01, hop_dur=0.015)
|
amine@403
|
141
|
amine@403
|
142 @pytest.mark.parametrize(
|
amine@403
|
143 "block_dur, hop_dur",
|
amine@403
|
144 [
|
amine@403
|
145 (None, None), # default
|
amine@403
|
146 (0.01, None), # block_dur_10ms_hop_dur_None
|
amine@403
|
147 (None, 0.01), # block_dur_None__hop_dur_10ms
|
amine@403
|
148 (0.05, 0.05), # block_dur_50ms_hop_dur_50ms
|
amine@403
|
149 ],
|
amine@403
|
150 ids=[
|
amine@403
|
151 "default",
|
amine@403
|
152 "block_dur_10ms_hop_dur_None",
|
amine@403
|
153 "block_dur_None__hop_dur_10ms",
|
amine@403
|
154 "block_dur_50ms_hop_dur_50ms",
|
amine@403
|
155 ],
|
amine@403
|
156 )
|
amine@403
|
157 def test_reader_class_block_dur_equals_hop_dur(self, block_dur, hop_dur):
|
amine@403
|
158 """Test passing hop_dur == block_dur does not create an instance of
|
amine@403
|
159 '_OverlapAudioReader'.
|
amine@403
|
160 """
|
amine@403
|
161 if block_dur is not None:
|
amine@403
|
162 reader = AudioReader(
|
amine@403
|
163 input=self.audio_source, block_dur=block_dur, hop_dur=hop_dur
|
amine@403
|
164 )
|
amine@403
|
165 else:
|
amine@403
|
166 reader = AudioReader(input=self.audio_source, hop_dur=hop_dur)
|
amine@403
|
167 assert not isinstance(reader, _OverlapAudioReader)
|
amine@330
|
168
|
amine@330
|
169 def test_sampling_rate(self):
|
amine@403
|
170 reader = AudioReader(input=self.audio_source)
|
amine@403
|
171 sampling_rate = reader.sampling_rate
|
amine@400
|
172 assert (
|
amine@403
|
173 sampling_rate == 16000
|
amine@403
|
174 ), f"Wrong sampling rate, expected: 16000, found: {sampling_rate}"
|
amine@330
|
175
|
amine@330
|
176 def test_sample_width(self):
|
amine@403
|
177 reader = AudioReader(input=self.audio_source)
|
amine@403
|
178 sample_width = reader.sample_width
|
amine@400
|
179 assert (
|
amine@403
|
180 sample_width == 2
|
amine@403
|
181 ), f"Wrong sample width, expected: 2, found: {sample_width}"
|
amine@330
|
182
|
amine@330
|
183 def test_channels(self):
|
amine@403
|
184 reader = AudioReader(input=self.audio_source)
|
amine@403
|
185 channels = reader.channels
|
amine@400
|
186 assert (
|
amine@400
|
187 channels == 1
|
amine@403
|
188 ), f"Wrong number of channels, expected: 1, found: {channels}"
|
amine@330
|
189
|
amine@330
|
190 def test_read(self):
|
amine@403
|
191 reader = AudioReader(input=self.audio_source, block_dur=0.02)
|
amine@403
|
192 reader_data = reader.read()
|
amine@330
|
193 audio_source = WaveAudioSource(
|
amine@330
|
194 filename=dataset.one_to_six_arabic_16000_mono_bc_noise
|
amine@330
|
195 )
|
amine@330
|
196 audio_source.open()
|
amine@403
|
197 audio_source_data = audio_source.read(320)
|
amine@330
|
198 audio_source.close()
|
amine@403
|
199 assert (
|
amine@403
|
200 reader_data == audio_source_data
|
amine@403
|
201 ), "Unexpected data read from AudioReader"
|
amine@330
|
202
|
amine@403
|
203 def test_read_with_overlap(self):
|
amine@403
|
204 reader = AudioReader(
|
amine@403
|
205 input=self.audio_source, block_dur=0.02, hop_dur=0.01
|
amine@403
|
206 )
|
amine@403
|
207 _ = reader.read() # first block
|
amine@403
|
208 reader_data = reader.read() # second block with 0.01 S overlap
|
amine@403
|
209 audio_source = WaveAudioSource(
|
amine@403
|
210 filename=dataset.one_to_six_arabic_16000_mono_bc_noise
|
amine@403
|
211 )
|
amine@403
|
212 audio_source.open()
|
amine@403
|
213 _ = audio_source.read(160)
|
amine@403
|
214 audio_source_data = audio_source.read(320)
|
amine@403
|
215 audio_source.close()
|
amine@403
|
216 assert (
|
amine@403
|
217 reader_data == audio_source_data
|
amine@403
|
218 ), "Unexpected data read from AudioReader"
|
amine@330
|
219
|
amine@403
|
220 def test_read_from_AudioReader_with_max_read(self):
|
amine@330
|
221 # read a maximum of 0.75 seconds from audio source
|
amine@403
|
222 reader = AudioReader(input=self.audio_source, max_read=0.75)
|
amine@403
|
223 assert isinstance(reader._audio_source._audio_source, _Limiter)
|
amine@403
|
224 reader_data = _read_all_data(reader)
|
amine@330
|
225
|
amine@330
|
226 audio_source = WaveAudioSource(
|
amine@330
|
227 filename=dataset.one_to_six_arabic_16000_mono_bc_noise
|
amine@330
|
228 )
|
amine@330
|
229 audio_source.open()
|
amine@330
|
230 audio_source_data = audio_source.read(int(16000 * 0.75))
|
amine@330
|
231 audio_source.close()
|
amine@330
|
232
|
amine@400
|
233 assert (
|
amine@403
|
234 reader_data == audio_source_data
|
amine@403
|
235 ), f"Unexpected data read from AudioReader with 'max_read = {0.75}'"
|
amine@330
|
236
|
amine@403
|
237 def test_read_data_size_from_AudioReader_with_max_read(self):
|
amine@330
|
238 # read a maximum of 1.191 seconds from audio source
|
amine@403
|
239 reader = AudioReader(input=self.audio_source, max_read=1.191)
|
amine@403
|
240 assert isinstance(reader._audio_source._audio_source, _Limiter)
|
amine@403
|
241 total_samples = round(reader.sampling_rate * 1.191)
|
amine@403
|
242 block_size = int(reader.block_dur * reader.sampling_rate)
|
amine@403
|
243 nb_full_blocks, last_block_size = divmod(total_samples, block_size)
|
amine@330
|
244 total_samples_with_overlap = (
|
amine@403
|
245 nb_full_blocks * block_size + last_block_size
|
amine@330
|
246 )
|
amine@403
|
247 expected_read_bytes = (
|
amine@403
|
248 total_samples_with_overlap * reader.sample_width * reader.channels
|
amine@403
|
249 )
|
amine@330
|
250
|
amine@403
|
251 reader_data = _read_all_data(reader)
|
amine@403
|
252 total_read = len(reader_data)
|
amine@403
|
253 err_msg = f"Wrong data length read from LimiterADS, expected: {expected_read_bytes}, found: {total_read}"
|
amine@403
|
254 assert total_read == expected_read_bytes, err_msg
|
amine@403
|
255
|
amine@403
|
256 def test_read_from_Recorder(self):
|
amine@403
|
257 reader = Recorder(input=self.audio_source, block_dur=0.025)
|
amine@403
|
258 reader_data = []
|
amine@403
|
259 for _ in range(10):
|
amine@403
|
260 block = reader.read()
|
amine@330
|
261 if block is None:
|
amine@330
|
262 break
|
amine@403
|
263 reader_data.append(block)
|
amine@403
|
264 reader_data = b"".join(reader_data)
|
amine@330
|
265
|
amine@330
|
266 audio_source = WaveAudioSource(
|
amine@330
|
267 filename=dataset.one_to_six_arabic_16000_mono_bc_noise
|
amine@330
|
268 )
|
amine@330
|
269 audio_source.open()
|
amine@403
|
270 audio_source_data = audio_source.read(400 * 10)
|
amine@330
|
271 audio_source.close()
|
amine@330
|
272
|
amine@400
|
273 assert (
|
amine@403
|
274 reader_data == audio_source_data
|
amine@403
|
275 ), "Unexpected data read from Recorder"
|
amine@330
|
276
|
amine@403
|
277 def test_AudioReader_rewindable(self):
|
amine@403
|
278 reader = AudioReader(input=self.audio_source, record=True)
|
amine@403
|
279 assert (
|
amine@403
|
280 reader.rewindable
|
amine@403
|
281 ), "AudioReader with record=True should be rewindable"
|
amine@330
|
282
|
amine@403
|
283 def test_AudioReader_record_and_rewind(self):
|
amine@403
|
284 reader = AudioReader(
|
amine@403
|
285 input=self.audio_source, record=True, block_dur=0.02
|
amine@330
|
286 )
|
amine@403
|
287 # read 0.02 * 10 = 0.2 sec. of data
|
amine@330
|
288 for i in range(10):
|
amine@403
|
289 reader.read()
|
amine@403
|
290 reader.rewind()
|
amine@330
|
291
|
amine@330
|
292 # read all available data after rewind
|
amine@403
|
293 reader_data = _read_all_data(reader)
|
amine@330
|
294
|
amine@330
|
295 audio_source = WaveAudioSource(
|
amine@330
|
296 filename=dataset.one_to_six_arabic_16000_mono_bc_noise
|
amine@330
|
297 )
|
amine@330
|
298 audio_source.open()
|
amine@403
|
299 audio_source_data = audio_source.read(320 * 10) # read 0.2 sec. of data
|
amine@330
|
300 audio_source.close()
|
amine@330
|
301
|
amine@400
|
302 assert (
|
amine@403
|
303 reader_data == audio_source_data
|
amine@403
|
304 ), "Unexpected data read from AudioReader with record = True"
|
amine@330
|
305
|
amine@403
|
306 def test_Recorder_record_and_rewind(self):
|
amine@403
|
307 recorder = Recorder(input=self.audio_source, block_dur=0.02)
|
amine@403
|
308 # read 0.02 * 10 = 0.2 sec. of data
|
amine@403
|
309 for i in range(10):
|
amine@403
|
310 recorder.read()
|
amine@403
|
311
|
amine@403
|
312 recorder.rewind()
|
amine@403
|
313
|
amine@403
|
314 # read all available data after rewind
|
amine@403
|
315 recorder_data = []
|
amine@403
|
316 recorder_data = _read_all_data(recorder)
|
amine@403
|
317
|
amine@403
|
318 audio_source = WaveAudioSource(
|
amine@403
|
319 filename=dataset.one_to_six_arabic_16000_mono_bc_noise
|
amine@403
|
320 )
|
amine@403
|
321 audio_source.open()
|
amine@403
|
322 audio_source_data = audio_source.read(320 * 10) # read 0.2 sec. of data
|
amine@403
|
323 audio_source.close()
|
amine@403
|
324
|
amine@403
|
325 assert (
|
amine@403
|
326 recorder_data == audio_source_data
|
amine@403
|
327 ), "Unexpected data read from Recorder"
|
amine@403
|
328
|
amine@403
|
329 def test_read_overlapping_blocks(self):
|
amine@330
|
330 # Use arbitrary valid block_size and hop_size
|
amine@330
|
331 block_size = 1714
|
amine@330
|
332 hop_size = 313
|
amine@403
|
333 block_dur = block_size / self.audio_source.sampling_rate
|
amine@403
|
334 hop_dur = hop_size / self.audio_source.sampling_rate
|
amine@330
|
335
|
amine@403
|
336 reader = AudioReader(
|
amine@403
|
337 input=self.audio_source,
|
amine@403
|
338 block_dur=block_dur,
|
amine@403
|
339 hop_dur=hop_dur,
|
amine@330
|
340 )
|
amine@330
|
341
|
amine@403
|
342 # Read all available overlapping blocks of data
|
amine@403
|
343 reader_data = []
|
amine@330
|
344 while True:
|
amine@403
|
345 block = reader.read()
|
amine@330
|
346 if block is None:
|
amine@330
|
347 break
|
amine@403
|
348 reader_data.append(block)
|
amine@330
|
349
|
amine@330
|
350 # Read all data from file and build a BufferAudioSource
|
amine@330
|
351 fp = wave.open(dataset.one_to_six_arabic_16000_mono_bc_noise, "r")
|
amine@330
|
352 wave_data = fp.readframes(fp.getnframes())
|
amine@330
|
353 fp.close()
|
amine@330
|
354 audio_source = BufferAudioSource(
|
amine@403
|
355 wave_data,
|
amine@403
|
356 reader.sampling_rate,
|
amine@403
|
357 reader.sample_width,
|
amine@403
|
358 reader.channels,
|
amine@330
|
359 )
|
amine@330
|
360 audio_source.open()
|
amine@330
|
361
|
amine@403
|
362 # Compare all blocks read from OverlapADS to those read from an
|
amine@403
|
363 # audio source with a manual position setting
|
amine@403
|
364 for i, block in enumerate(reader_data):
|
amine@330
|
365 tmp = audio_source.read(block_size)
|
amine@400
|
366 assert (
|
amine@400
|
367 block == tmp
|
amine@403
|
368 ), f"Unexpected data (block {i}) from reader with overlapping blocks"
|
amine@330
|
369 audio_source.position = (i + 1) * hop_size
|
amine@330
|
370
|
amine@330
|
371 audio_source.close()
|
amine@330
|
372
|
amine@403
|
373 def test_read_overlapping_blocks_with_max_read(self):
|
amine@330
|
374 block_size = 256
|
amine@330
|
375 hop_size = 200
|
amine@403
|
376 block_dur = block_size / self.audio_source.sampling_rate
|
amine@403
|
377 hop_dur = hop_size / self.audio_source.sampling_rate
|
amine@330
|
378
|
amine@403
|
379 reader = AudioReader(
|
amine@403
|
380 input=self.audio_source,
|
amine@403
|
381 block_dur=block_dur,
|
amine@403
|
382 hop_dur=hop_dur,
|
amine@403
|
383 max_read=0.5,
|
amine@330
|
384 )
|
amine@330
|
385
|
amine@403
|
386 # Read all available overlapping blocks of data
|
amine@403
|
387 reader_data = []
|
amine@330
|
388 while True:
|
amine@403
|
389 block = reader.read()
|
amine@330
|
390 if block is None:
|
amine@330
|
391 break
|
amine@403
|
392 reader_data.append(block)
|
amine@330
|
393
|
amine@330
|
394 # Read all data from file and build a BufferAudioSource
|
amine@330
|
395 fp = wave.open(dataset.one_to_six_arabic_16000_mono_bc_noise, "r")
|
amine@330
|
396 wave_data = fp.readframes(fp.getnframes())
|
amine@330
|
397 fp.close()
|
amine@330
|
398 audio_source = BufferAudioSource(
|
amine@403
|
399 wave_data,
|
amine@403
|
400 reader.sampling_rate,
|
amine@403
|
401 reader.sample_width,
|
amine@403
|
402 reader.channels,
|
amine@330
|
403 )
|
amine@330
|
404 audio_source.open()
|
amine@330
|
405
|
amine@403
|
406 # Compare all blocks read from OverlapADS to those read from an
|
amine@403
|
407 # audio source with a manual position setting
|
amine@403
|
408 for i, block in enumerate(reader_data):
|
amine@403
|
409 tmp = audio_source.read(len(block) // (reader.sw * reader.ch))
|
amine@403
|
410 assert (
|
amine@403
|
411 block == tmp
|
amine@403
|
412 ), f"Unexpected data (block {i}) from reader with overlapping blocks and max_read"
|
amine@330
|
413 audio_source.position = (i + 1) * hop_size
|
amine@330
|
414
|
amine@330
|
415 audio_source.close()
|
amine@330
|
416
|
amine@403
|
417 def test_length_read_overlapping_blocks_with_max_read(self):
|
amine@330
|
418 block_size = 313
|
amine@330
|
419 hop_size = 207
|
amine@403
|
420 block_dur = block_size / self.audio_source.sampling_rate
|
amine@403
|
421 hop_dur = hop_size / self.audio_source.sampling_rate
|
amine@403
|
422
|
amine@403
|
423 reader = AudioReader(
|
amine@403
|
424 input=self.audio_source,
|
amine@403
|
425 max_read=1.932,
|
amine@403
|
426 block_dur=block_dur,
|
amine@403
|
427 hop_dur=hop_dur,
|
amine@330
|
428 )
|
amine@330
|
429
|
amine@403
|
430 total_samples = round(reader.sampling_rate * 1.932)
|
amine@330
|
431 first_read_size = block_size
|
amine@330
|
432 next_read_size = block_size - hop_size
|
amine@330
|
433 nb_next_blocks, last_block_size = divmod(
|
amine@330
|
434 (total_samples - first_read_size), next_read_size
|
amine@330
|
435 )
|
amine@330
|
436 total_samples_with_overlap = (
|
amine@330
|
437 first_read_size + next_read_size * nb_next_blocks + last_block_size
|
amine@330
|
438 )
|
amine@403
|
439 expected_read_bytes = (
|
amine@403
|
440 total_samples_with_overlap * reader.sw * reader.channels
|
amine@403
|
441 )
|
amine@330
|
442
|
amine@403
|
443 cache_size = (
|
amine@403
|
444 (block_size - hop_size) * reader.sample_width * reader.channels
|
amine@403
|
445 )
|
amine@330
|
446 total_read = cache_size
|
amine@330
|
447
|
amine@330
|
448 i = 0
|
amine@330
|
449 while True:
|
amine@403
|
450 block = reader.read()
|
amine@330
|
451 if block is None:
|
amine@330
|
452 break
|
amine@330
|
453 i += 1
|
amine@330
|
454 total_read += len(block) - cache_size
|
amine@330
|
455
|
amine@400
|
456 err_msg = (
|
amine@400
|
457 "Wrong data length read from LimiterADS, expected: {0}, found: {1}"
|
amine@400
|
458 )
|
amine@400
|
459 assert total_read == expected_read_bytes, err_msg.format(
|
amine@400
|
460 expected_read_bytes, total_read
|
amine@330
|
461 )
|
amine@330
|
462
|
amine@403
|
463 def test_reader_with_overlapping_blocks__rewindable(self):
|
amine@403
|
464 reader = AudioReader(
|
amine@403
|
465 input=self.audio_source,
|
amine@403
|
466 block_dur=320,
|
amine@403
|
467 hop_dur=160,
|
amine@330
|
468 record=True,
|
amine@330
|
469 )
|
amine@403
|
470 assert (
|
amine@403
|
471 reader.rewindable
|
amine@403
|
472 ), "AudioReader with record=True should be rewindable"
|
amine@330
|
473
|
amine@403
|
474 def test_overlapping_blocks_with_max_read_rewind_and_read(self):
|
amine@330
|
475 # Use arbitrary valid block_size and hop_size
|
amine@330
|
476 block_size = 1600
|
amine@330
|
477 hop_size = 400
|
amine@403
|
478 block_dur = block_size / self.audio_source.sampling_rate
|
amine@403
|
479 hop_dur = hop_size / self.audio_source.sampling_rate
|
amine@330
|
480
|
amine@403
|
481 reader = AudioReader(
|
amine@403
|
482 input=self.audio_source,
|
amine@403
|
483 block_dur=block_dur,
|
amine@403
|
484 hop_dur=hop_dur,
|
amine@330
|
485 record=True,
|
amine@330
|
486 )
|
amine@330
|
487
|
amine@330
|
488 # Read all available data overlapping blocks
|
amine@330
|
489 i = 0
|
amine@330
|
490 while True:
|
amine@403
|
491 block = reader.read()
|
amine@330
|
492 if block is None:
|
amine@330
|
493 break
|
amine@330
|
494 i += 1
|
amine@330
|
495
|
amine@403
|
496 reader.rewind()
|
amine@330
|
497
|
amine@330
|
498 # Read all data from file and build a BufferAudioSource
|
amine@330
|
499 fp = wave.open(dataset.one_to_six_arabic_16000_mono_bc_noise, "r")
|
amine@330
|
500 wave_data = fp.readframes(fp.getnframes())
|
amine@330
|
501 fp.close()
|
amine@330
|
502 audio_source = BufferAudioSource(
|
amine@403
|
503 wave_data,
|
amine@403
|
504 reader.sampling_rate,
|
amine@403
|
505 reader.sample_width,
|
amine@403
|
506 reader.channels,
|
amine@330
|
507 )
|
amine@330
|
508 audio_source.open()
|
amine@330
|
509
|
amine@403
|
510 # Compare blocks read from AudioReader to those read from an BufferAudioSource with manual position setting
|
amine@330
|
511 for j in range(i):
|
amine@330
|
512 tmp = audio_source.read(block_size)
|
amine@400
|
513 assert (
|
amine@403
|
514 reader.read() == tmp
|
amine@403
|
515 ), f"Unexpected data (block {i}) from reader with overlapping blocks and record = True"
|
amine@330
|
516 audio_source.position = (j + 1) * hop_size
|
amine@330
|
517
|
amine@330
|
518 audio_source.close()
|
amine@330
|
519
|
amine@403
|
520 def test_overlapping_blocks_with_record_and_max_read_rewind_and_read(self):
|
amine@330
|
521 # Use arbitrary valid block_size and hop_size
|
amine@330
|
522 block_size = 1600
|
amine@330
|
523 hop_size = 400
|
amine@403
|
524 block_dur = block_size / self.audio_source.sampling_rate
|
amine@403
|
525 hop_dur = hop_size / self.audio_source.sampling_rate
|
amine@330
|
526
|
amine@403
|
527 reader = AudioReader(
|
amine@403
|
528 input=self.audio_source,
|
amine@330
|
529 max_time=1.50,
|
amine@403
|
530 block_dur=block_dur,
|
amine@403
|
531 hop_dur=hop_dur,
|
amine@330
|
532 record=True,
|
amine@330
|
533 )
|
amine@330
|
534
|
amine@330
|
535 # Read all available data overlapping blocks
|
amine@330
|
536 i = 0
|
amine@330
|
537 while True:
|
amine@403
|
538 block = reader.read()
|
amine@330
|
539 if block is None:
|
amine@330
|
540 break
|
amine@330
|
541 i += 1
|
amine@330
|
542
|
amine@403
|
543 reader.rewind()
|
amine@330
|
544
|
amine@330
|
545 # Read all data from file and build a BufferAudioSource
|
amine@330
|
546 fp = wave.open(dataset.one_to_six_arabic_16000_mono_bc_noise, "r")
|
amine@330
|
547 wave_data = fp.readframes(fp.getnframes())
|
amine@330
|
548 fp.close()
|
amine@330
|
549 audio_source = BufferAudioSource(
|
amine@403
|
550 wave_data,
|
amine@403
|
551 reader.sampling_rate,
|
amine@403
|
552 reader.sample_width,
|
amine@403
|
553 reader.channels,
|
amine@330
|
554 )
|
amine@330
|
555 audio_source.open()
|
amine@330
|
556
|
amine@403
|
557 # Compare all blocks read from AudioReader to those read from BufferAudioSource with a manual position setting
|
amine@330
|
558 for j in range(i):
|
amine@330
|
559 tmp = audio_source.read(block_size)
|
amine@400
|
560 assert (
|
amine@403
|
561 reader.read() == tmp
|
amine@400
|
562 ), "Unexpected block (N={0}) read from OverlapADS".format(i)
|
amine@330
|
563 audio_source.position = (j + 1) * hop_size
|
amine@330
|
564
|
amine@330
|
565 audio_source.close()
|
amine@330
|
566
|
amine@403
|
567 def test_length_read_overlapping_blocks_with_record_and_max_read(self):
|
amine@330
|
568 # Use arbitrary valid block_size and hop_size
|
amine@330
|
569 block_size = 1000
|
amine@330
|
570 hop_size = 200
|
amine@403
|
571 block_dur = block_size / self.audio_source.sampling_rate
|
amine@403
|
572 hop_dur = hop_size / self.audio_source.sampling_rate
|
amine@330
|
573
|
amine@403
|
574 reader = AudioReader(
|
amine@403
|
575 input=self.audio_source,
|
amine@403
|
576 block_dur=block_dur,
|
amine@403
|
577 hop_dur=hop_dur,
|
amine@330
|
578 record=True,
|
amine@403
|
579 max_read=1.317,
|
amine@330
|
580 )
|
amine@403
|
581 total_samples = round(reader.sampling_rate * 1.317)
|
amine@330
|
582 first_read_size = block_size
|
amine@330
|
583 next_read_size = block_size - hop_size
|
amine@330
|
584 nb_next_blocks, last_block_size = divmod(
|
amine@330
|
585 (total_samples - first_read_size), next_read_size
|
amine@330
|
586 )
|
amine@330
|
587 total_samples_with_overlap = (
|
amine@330
|
588 first_read_size + next_read_size * nb_next_blocks + last_block_size
|
amine@330
|
589 )
|
amine@403
|
590 expected_read_bytes = (
|
amine@403
|
591 total_samples_with_overlap * reader.sample_width * reader.channels
|
amine@403
|
592 )
|
amine@330
|
593
|
amine@403
|
594 cache_size = (
|
amine@403
|
595 (block_size - hop_size) * reader.sample_width * reader.channels
|
amine@403
|
596 )
|
amine@330
|
597 total_read = cache_size
|
amine@330
|
598
|
amine@330
|
599 i = 0
|
amine@330
|
600 while True:
|
amine@403
|
601 block = reader.read()
|
amine@330
|
602 if block is None:
|
amine@330
|
603 break
|
amine@330
|
604 i += 1
|
amine@330
|
605 total_read += len(block) - cache_size
|
amine@330
|
606
|
amine@403
|
607 err_msg = f"Wrong data length read from AudioReader, expected: {expected_read_bytes}, found: {total_read}"
|
amine@403
|
608 assert total_read == expected_read_bytes, err_msg
|
amine@330
|
609
|
amine@330
|
610
|
amine@403
|
611 def test_AudioReader_raw_data():
|
amine@330
|
612
|
amine@403
|
613 data = b"ABCDEFGHIJKLMNOPQRSTUVWXYZ012345"
|
amine@403
|
614 block_size = 5
|
amine@403
|
615 hop_size = 4
|
amine@403
|
616 reader = AudioReader(
|
amine@403
|
617 input=data,
|
amine@403
|
618 sampling_rate=16,
|
amine@403
|
619 sample_width=2,
|
amine@403
|
620 channels=1,
|
amine@403
|
621 block_dur=block_size / 16,
|
amine@403
|
622 hop_dur=hop_size / 16,
|
amine@403
|
623 max_read=0.80,
|
amine@403
|
624 record=True,
|
amine@403
|
625 )
|
amine@403
|
626 reader.open()
|
amine@403
|
627
|
amine@403
|
628 assert (
|
amine@403
|
629 reader.sampling_rate == 16
|
amine@403
|
630 ), f"Wrong sampling rate, expected: 16, found: {reader.sampling_rate }"
|
amine@403
|
631
|
amine@403
|
632 assert (
|
amine@403
|
633 reader.sample_width == 2
|
amine@403
|
634 ), f"Wrong sample width, expected: 2, found: {reader.sample_width}"
|
amine@403
|
635
|
amine@403
|
636 # Read all available data overlapping blocks
|
amine@403
|
637 i = 0
|
amine@403
|
638 while True:
|
amine@403
|
639 block = reader.read()
|
amine@403
|
640 if block is None:
|
amine@403
|
641 break
|
amine@403
|
642 i += 1
|
amine@403
|
643
|
amine@403
|
644 reader.rewind()
|
amine@403
|
645
|
amine@403
|
646 # Build a BufferAudioSource
|
amine@403
|
647 audio_source = BufferAudioSource(
|
amine@403
|
648 data, reader.sampling_rate, reader.sample_width, reader.channels
|
amine@403
|
649 )
|
amine@403
|
650 audio_source.open()
|
amine@403
|
651
|
amine@403
|
652 # Compare all blocks read from AudioReader to those read from an audio
|
amine@403
|
653 # source with a manual position setting
|
amine@403
|
654 for j in range(i):
|
amine@403
|
655 tmp = audio_source.read(block_size)
|
amine@403
|
656 block = reader.read()
|
amine@400
|
657 assert (
|
amine@403
|
658 block == tmp
|
amine@403
|
659 ), f"Unexpected block '{block}' (N={i}) read from OverlapADS"
|
amine@403
|
660 audio_source.position = (j + 1) * hop_size
|
amine@403
|
661 audio_source.close()
|
amine@403
|
662 reader.close()
|
amine@330
|
663
|
amine@330
|
664
|
amine@403
|
665 def test_AudioReader_alias_params():
|
amine@403
|
666 reader = AudioReader(
|
amine@403
|
667 input=b"0" * 1600,
|
amine@403
|
668 sr=16000,
|
amine@403
|
669 sw=2,
|
amine@403
|
670 channels=1,
|
amine@403
|
671 )
|
amine@403
|
672 assert reader.sampling_rate == 16000, (
|
amine@403
|
673 "Unexpected sampling rate: reader.sampling_rate = "
|
amine@403
|
674 + f"{reader.sampling_rate} instead of 16000"
|
amine@403
|
675 )
|
amine@403
|
676 assert reader.sr == 16000, (
|
amine@403
|
677 "Unexpected sampling rate: reader.sr = "
|
amine@403
|
678 + f"{reader.sr} instead of 16000"
|
amine@403
|
679 )
|
amine@403
|
680 assert reader.sample_width == 2, (
|
amine@403
|
681 "Unexpected sample width: reader.sample_width = "
|
amine@403
|
682 + f"{reader.sample_width} instead of 2"
|
amine@403
|
683 )
|
amine@403
|
684 assert reader.sw == 2, (
|
amine@403
|
685 "Unexpected sample width: reader.sw = " + f"{reader.sw} instead of 2"
|
amine@403
|
686 )
|
amine@403
|
687 assert reader.channels == 1, (
|
amine@403
|
688 "Unexpected number of channels: reader.channels = "
|
amine@403
|
689 + f"{reader.channels} instead of 1"
|
amine@403
|
690 )
|
amine@403
|
691 assert reader.ch == 1, (
|
amine@403
|
692 "Unexpected number of channels: reader.ch = "
|
amine@403
|
693 + f"{reader.ch} instead of 1"
|
amine@403
|
694 )
|
amine@330
|
695
|
amine@330
|
696
|
amine@400
|
697 @pytest.mark.parametrize(
|
amine@400
|
698 "file_id, max_read, size",
|
amine@400
|
699 [
|
amine@400
|
700 ("mono_400", 0.5, 16000), # mono
|
amine@400
|
701 ("3channel_400-800-1600", 0.5, 16000 * 3), # multichannel
|
amine@400
|
702 ],
|
amine@400
|
703 ids=["mono", "multichannel"],
|
amine@400
|
704 )
|
amine@400
|
705 def test_Limiter(file_id, max_read, size):
|
amine@400
|
706 input_wav = "tests/data/test_16KHZ_{}Hz.wav".format(file_id)
|
amine@400
|
707 input_raw = "tests/data/test_16KHZ_{}Hz.raw".format(file_id)
|
amine@400
|
708 with open(input_raw, "rb") as fp:
|
amine@400
|
709 expected = fp.read(size)
|
amine@330
|
710
|
amine@400
|
711 reader = AudioReader(input_wav, block_dur=0.1, max_read=max_read)
|
amine@400
|
712 reader.open()
|
amine@400
|
713 data = _read_all_data(reader)
|
amine@400
|
714 reader.close()
|
amine@400
|
715 assert data == expected
|
amine@330
|
716
|
amine@330
|
717
|
amine@400
|
718 @pytest.mark.parametrize(
|
amine@400
|
719 "file_id",
|
amine@400
|
720 [
|
amine@400
|
721 "mono_400", # mono
|
amine@400
|
722 "3channel_400-800-1600", # multichannel
|
amine@400
|
723 ],
|
amine@400
|
724 ids=["mono", "multichannel"],
|
amine@400
|
725 )
|
amine@400
|
726 def test_Recorder(file_id):
|
amine@400
|
727 input_wav = "tests/data/test_16KHZ_{}Hz.wav".format(file_id)
|
amine@400
|
728 input_raw = "tests/data/test_16KHZ_{}Hz.raw".format(file_id)
|
amine@400
|
729 with open(input_raw, "rb") as fp:
|
amine@400
|
730 expected = fp.read()
|
amine@400
|
731
|
amine@400
|
732 reader = AudioReader(input_wav, block_dur=0.1, record=True)
|
amine@400
|
733 reader.open()
|
amine@400
|
734 data = _read_all_data(reader)
|
amine@400
|
735 assert data == expected
|
amine@400
|
736
|
amine@400
|
737 # rewind many times
|
amine@400
|
738 for _ in range(3):
|
amine@400
|
739 reader.rewind()
|
amine@330
|
740 data = _read_all_data(reader)
|
amine@400
|
741 assert data == expected
|
amine@400
|
742 assert data == reader.data
|
amine@400
|
743 reader.close()
|
amine@330
|
744
|
amine@330
|
745
|
amine@400
|
746 @pytest.mark.parametrize(
|
amine@400
|
747 "file_id",
|
amine@400
|
748 [
|
amine@400
|
749 "mono_400", # mono
|
amine@400
|
750 "3channel_400-800-1600", # multichannel
|
amine@400
|
751 ],
|
amine@400
|
752 ids=["mono", "multichannel"],
|
amine@400
|
753 )
|
amine@400
|
754 def test_Recorder_alias(file_id):
|
amine@400
|
755 input_wav = "tests/data/test_16KHZ_{}Hz.wav".format(file_id)
|
amine@400
|
756 input_raw = "tests/data/test_16KHZ_{}Hz.raw".format(file_id)
|
amine@400
|
757 with open(input_raw, "rb") as fp:
|
amine@400
|
758 expected = fp.read()
|
amine@400
|
759
|
amine@400
|
760 reader = Recorder(input_wav, block_dur=0.1)
|
amine@400
|
761 reader.open()
|
amine@400
|
762 data = _read_all_data(reader)
|
amine@400
|
763 assert data == expected
|
amine@400
|
764
|
amine@400
|
765 # rewind many times
|
amine@400
|
766 for _ in range(3):
|
amine@400
|
767 reader.rewind()
|
amine@330
|
768 data = _read_all_data(reader)
|
amine@400
|
769 assert data == expected
|
amine@400
|
770 assert data == reader.data
|
amine@400
|
771 reader.close()
|