amine@330
|
1 import sys
|
amine@330
|
2 import wave
|
amine@403
|
3 from functools import partial
|
amine@403
|
4
|
amine@403
|
5 import pytest
|
amine@403
|
6
|
amine@330
|
7 from auditok import (
|
amine@403
|
8 AudioReader,
|
amine@403
|
9 BufferAudioSource,
|
amine@403
|
10 Recorder,
|
amine@403
|
11 WaveAudioSource,
|
amine@330
|
12 dataset,
|
amine@330
|
13 )
|
amine@403
|
14 from auditok.util import _Limiter, _OverlapAudioReader
|
amine@330
|
15
|
amine@330
|
16
|
amine@403
|
def _read_all_data(reader):
    """Drain `reader` and return everything it produced as one bytes object.

    `reader.read()` is called repeatedly until it returns None; the blocks
    returned before that are concatenated in the order they were read.
    """
    chunks = []
    chunk = reader.read()
    while chunk is not None:
        chunks.append(chunk)
        chunk = reader.read()
    return b"".join(chunks)
|
amine@403
|
25
|
amine@403
|
26
|
amine@403
|
class TestAudioReaderWithFileAudioSource:
    """Tests for `AudioReader` and `Recorder` fed by a wave file source.

    Each test receives a freshly opened `WaveAudioSource` on
    `dataset.one_to_six_arabic_16000_mono_bc_noise` as `self.audio_source`.
    Names starting with an underscore are helpers (or disabled tests) and are
    not collected by pytest.
    """

    @pytest.fixture(autouse=True)
    def setup_and_teardown(self):
        """Open the shared audio source before a test, close it afterwards."""
        self.audio_source = WaveAudioSource(
            filename=dataset.one_to_six_arabic_16000_mono_bc_noise
        )
        self.audio_source.open()
        yield
        self.audio_source.close()

    @staticmethod
    def _read_reference(nb_samples, skip=0):
        """Read reference data directly from the wave file.

        A new `WaveAudioSource` is opened, `skip` samples are discarded (when
        non-zero), then `nb_samples` are read and returned. The source is
        closed even if reading raises.
        """
        source = WaveAudioSource(
            filename=dataset.one_to_six_arabic_16000_mono_bc_noise
        )
        source.open()
        try:
            if skip:
                source.read(skip)
            return source.read(nb_samples)
        finally:
            source.close()

    @staticmethod
    def _make_buffer_source(reader):
        """Return an unopened `BufferAudioSource` holding the whole wave file.

        The audio parameters (sampling rate, sample width, channels) are taken
        from `reader`.
        """
        with wave.open(
            dataset.one_to_six_arabic_16000_mono_bc_noise, "r"
        ) as fp:
            wave_data = fp.readframes(fp.getnframes())
        return BufferAudioSource(
            wave_data,
            reader.sampling_rate,
            reader.sample_width,
            reader.channels,
        )

    @staticmethod
    def _read_blocks(reader):
        """Return the list of blocks read from `reader` until it yields None."""
        blocks = []
        while True:
            block = reader.read()
            if block is None:
                return blocks
            blocks.append(block)

    def test_AudioReader_type(self):
        reader = AudioReader(input=self.audio_source)
        err_msg = "wrong object type, expected: 'AudioReader', found: {0}"
        assert isinstance(reader, AudioReader), err_msg.format(type(reader))

    def _test_default_block_size(self):
        # Disabled: the leading underscore keeps pytest from collecting it.
        # NOTE(review): this compares the *byte* length of a block to 160,
        # whereas test_block_duration expects 160 *samples* for the default
        # duration; it presumably fails with 2-byte samples if re-enabled.
        reader = AudioReader(input=self.audio_source)
        data = reader.read()
        size = len(data)
        assert (
            size == 160
        ), "Wrong default block_size, expected: 160, found: {0}".format(size)

    @pytest.mark.parametrize(
        "block_dur, expected_nb_samples",
        [
            (None, 160),  # default: 10 ms
            (0.025, 400),  # 25 ms
        ],
        ids=["default", "_25ms"],
    )
    def test_block_duration(self, block_dur, expected_nb_samples):
        """Test the number of samples read for a given block duration."""
        if block_dur is not None:
            reader = AudioReader(input=self.audio_source, block_dur=block_dur)
        else:
            reader = AudioReader(input=self.audio_source)
        data = reader.read()
        nb_samples = len(data) // reader.sample_width
        assert (
            nb_samples == expected_nb_samples
        ), f"Wrong block_size, expected: {expected_nb_samples}, found: {nb_samples}"

    @pytest.mark.parametrize(
        "block_dur, hop_dur, expected_nb_blocks, expected_last_block_nb_samples",
        [
            (None, None, 1879, 126),  # default: 10 ms
            (0.01, None, 1879, 126),  # block_dur_10ms_hop_dur_None
            (0.01, 0.01, 1879, 126),  # block_dur_10ms_hop_dur_10ms
            (0.02, None, 940, 126),  # block_dur_20ms_hop_dur_None
            (0.025, None, 752, 206),  # block_dur_25ms_hop_dur_None
            (0.02, 0.01, 1878, 286),  # block_dur_20ms_hop_dur_10ms
            (0.025, 0.005, 3754, 366),  # block_dur_25ms_hop_dur_5ms
        ],
        # Each id mirrors the parameters of the case at the same position.
        ids=[
            "default",
            "block_dur_10ms_hop_dur_None",
            "block_dur_10ms_hop_dur_10ms",
            "block_dur_20ms_hop_dur_None",
            "block_dur_25ms_hop_dur_None",
            "block_dur_20ms_hop_dur_10ms",
            "block_dur_25ms_hop_dur_5ms",
        ],
    )
    def test_hop_duration(
        self,
        block_dur,
        hop_dur,
        expected_nb_blocks,
        expected_last_block_nb_samples,
    ):
        """Test the number of read blocks and the duration of last block for
        different 'block_dur' and 'hop_dur' values.

        Args:
            block_dur (float or None): block duration in seconds.
            hop_dur (float or None): hop duration in seconds.
            expected_nb_blocks (int): expected number of read block.
            expected_last_block_nb_samples (int): expected number of sample
                in the last block.
        """
        if block_dur is not None:
            reader = AudioReader(
                input=self.audio_source, block_dur=block_dur, hop_dur=hop_dur
            )
        else:
            reader = AudioReader(input=self.audio_source, hop_dur=hop_dur)

        blocks = self._read_blocks(reader)
        nb_blocks = len(blocks)
        last_block_nb_samples = (
            len(blocks[-1]) // reader.sample_width if blocks else None
        )

        err_msg = "Wrong number of blocks read from source, expected: "
        err_msg += f"{expected_nb_blocks}, found: {nb_blocks}"
        assert nb_blocks == expected_nb_blocks, err_msg

        err_msg = (
            "Wrong number of samples in last block read from source, expected: "
        )
        err_msg += (
            f"{expected_last_block_nb_samples}, found: {last_block_nb_samples}"
        )
        assert last_block_nb_samples == expected_last_block_nb_samples, err_msg

    def test_hop_duration_exception(self):
        """Test passing hop_dur > block_dur raises ValueError"""
        with pytest.raises(ValueError):
            AudioReader(self.audio_source, block_dur=0.01, hop_dur=0.015)

    @pytest.mark.parametrize(
        "block_dur, hop_dur",
        [
            (None, None),  # default
            (0.01, None),  # block_dur_10ms_hop_dur_None
            (None, 0.01),  # block_dur_None__hop_dur_10ms
            (0.05, 0.05),  # block_dur_50ms_hop_dur_50ms
        ],
        ids=[
            "default",
            "block_dur_10ms_hop_dur_None",
            "block_dur_None__hop_dur_10ms",
            "block_dur_50ms_hop_dur_50ms",
        ],
    )
    def test_reader_class_block_dur_equals_hop_dur(self, block_dur, hop_dur):
        """Test passing hop_dur == block_dur does not create an instance of
        '_OverlapAudioReader'.
        """
        if block_dur is not None:
            reader = AudioReader(
                input=self.audio_source, block_dur=block_dur, hop_dur=hop_dur
            )
        else:
            reader = AudioReader(input=self.audio_source, hop_dur=hop_dur)
        assert not isinstance(reader, _OverlapAudioReader)

    def test_sampling_rate(self):
        reader = AudioReader(input=self.audio_source)
        sampling_rate = reader.sampling_rate
        assert (
            sampling_rate == 16000
        ), f"Wrong sampling rate, expected: 16000, found: {sampling_rate}"

    def test_sample_width(self):
        reader = AudioReader(input=self.audio_source)
        sample_width = reader.sample_width
        assert (
            sample_width == 2
        ), f"Wrong sample width, expected: 2, found: {sample_width}"

    def test_channels(self):
        reader = AudioReader(input=self.audio_source)
        channels = reader.channels
        assert (
            channels == 1
        ), f"Wrong number of channels, expected: 1, found: {channels}"

    def test_read(self):
        """The first 20 ms block equals the first 320 samples of the file."""
        reader = AudioReader(input=self.audio_source, block_dur=0.02)
        reader_data = reader.read()
        audio_source_data = self._read_reference(320)
        assert (
            reader_data == audio_source_data
        ), "Unexpected data read from AudioReader"

    def test_read_with_overlap(self):
        """The second 20 ms block with a 10 ms hop starts 160 samples in."""
        reader = AudioReader(
            input=self.audio_source, block_dur=0.02, hop_dur=0.01
        )
        _ = reader.read()  # first block
        reader_data = reader.read()  # second block with 0.01 S overlap
        audio_source_data = self._read_reference(320, skip=160)
        assert (
            reader_data == audio_source_data
        ), "Unexpected data read from AudioReader"

    def test_read_from_AudioReader_with_max_read(self):
        # read a maximum of 0.75 seconds from audio source
        reader = AudioReader(input=self.audio_source, max_read=0.75)
        assert isinstance(reader._audio_source._audio_source, _Limiter)
        reader_data = _read_all_data(reader)

        audio_source_data = self._read_reference(int(16000 * 0.75))

        assert (
            reader_data == audio_source_data
        ), f"Unexpected data read from AudioReader with 'max_read = {0.75}'"

    def test_read_data_size_from_AudioReader_with_max_read(self):
        # read a maximum of 1.191 seconds from audio source
        reader = AudioReader(input=self.audio_source, max_read=1.191)
        assert isinstance(reader._audio_source._audio_source, _Limiter)
        # Without overlap the blocks simply add up to the limited number of
        # samples, so no per-block bookkeeping is needed for the expectation.
        total_samples = round(reader.sampling_rate * 1.191)
        expected_read_bytes = (
            total_samples * reader.sample_width * reader.channels
        )

        total_read = len(_read_all_data(reader))
        err_msg = f"Wrong data length read from LimiterADS, expected: {expected_read_bytes}, found: {total_read}"
        assert total_read == expected_read_bytes, err_msg

    def test_read_from_Recorder(self):
        """Ten 25 ms blocks from a Recorder equal the first 4000 samples."""
        reader = Recorder(input=self.audio_source, block_dur=0.025)
        blocks = []
        for _ in range(10):
            block = reader.read()
            if block is None:
                break
            blocks.append(block)
        reader_data = b"".join(blocks)

        audio_source_data = self._read_reference(400 * 10)

        assert (
            reader_data == audio_source_data
        ), "Unexpected data read from Recorder"

    def test_AudioReader_rewindable(self):
        reader = AudioReader(input=self.audio_source, record=True)
        assert (
            reader.rewindable
        ), "AudioReader with record=True should be rewindable"

    def test_AudioReader_record_and_rewind(self):
        reader = AudioReader(
            input=self.audio_source, record=True, block_dur=0.02
        )
        # read 0.02 * 10 = 0.2 sec. of data
        for _ in range(10):
            reader.read()
        reader.rewind()

        # read all available data after rewind
        reader_data = _read_all_data(reader)

        # read 0.2 sec. of data
        audio_source_data = self._read_reference(320 * 10)

        assert (
            reader_data == audio_source_data
        ), "Unexpected data read from AudioReader with record = True"

    def test_Recorder_record_and_rewind(self):
        recorder = Recorder(input=self.audio_source, block_dur=0.02)
        # read 0.02 * 10 = 0.2 sec. of data
        for _ in range(10):
            recorder.read()

        recorder.rewind()

        # read all available data after rewind
        recorder_data = _read_all_data(recorder)

        # read 0.2 sec. of data
        audio_source_data = self._read_reference(320 * 10)

        assert (
            recorder_data == audio_source_data
        ), "Unexpected data read from Recorder"

    def test_read_overlapping_blocks(self):
        # Use arbitrary valid block_size and hop_size
        block_size = 1714
        hop_size = 313
        block_dur = block_size / self.audio_source.sampling_rate
        hop_dur = hop_size / self.audio_source.sampling_rate

        reader = AudioReader(
            input=self.audio_source,
            block_dur=block_dur,
            hop_dur=hop_dur,
        )

        # Read all available overlapping blocks of data
        reader_data = self._read_blocks(reader)

        # Read all data from file and build a BufferAudioSource
        audio_source = self._make_buffer_source(reader)
        audio_source.open()
        try:
            # Compare all blocks read from OverlapADS to those read from an
            # audio source with a manual position setting
            for i, block in enumerate(reader_data):
                tmp = audio_source.read(block_size)
                assert (
                    block == tmp
                ), f"Unexpected data (block {i}) from reader with overlapping blocks"
                audio_source.position = (i + 1) * hop_size
        finally:
            audio_source.close()

    def test_read_overlapping_blocks_with_max_read(self):
        block_size = 256
        hop_size = 200
        block_dur = block_size / self.audio_source.sampling_rate
        hop_dur = hop_size / self.audio_source.sampling_rate

        reader = AudioReader(
            input=self.audio_source,
            block_dur=block_dur,
            hop_dur=hop_dur,
            max_read=0.5,
        )

        # Read all available overlapping blocks of data
        reader_data = self._read_blocks(reader)

        # Read all data from file and build a BufferAudioSource
        audio_source = self._make_buffer_source(reader)
        audio_source.open()
        try:
            # Compare all blocks read from OverlapADS to those read from an
            # audio source with a manual position setting. The reference read
            # is sized after each block because the limit may shorten the
            # last one.
            for i, block in enumerate(reader_data):
                tmp = audio_source.read(len(block) // (reader.sw * reader.ch))
                assert (
                    block == tmp
                ), f"Unexpected data (block {i}) from reader with overlapping blocks and max_read"
                audio_source.position = (i + 1) * hop_size
        finally:
            audio_source.close()

    def test_length_read_overlapping_blocks_with_max_read(self):
        block_size = 313
        hop_size = 207
        block_dur = block_size / self.audio_source.sampling_rate
        hop_dur = hop_size / self.audio_source.sampling_rate

        reader = AudioReader(
            input=self.audio_source,
            max_read=1.932,
            block_dur=block_dur,
            hop_dur=hop_dur,
        )

        # Once overlap is discounted, the distinct data delivered must be
        # exactly the limited number of samples.
        total_samples = round(reader.sampling_rate * 1.932)
        expected_read_bytes = (
            total_samples * reader.sample_width * reader.channels
        )

        # Bytes shared by two consecutive blocks. Starting the total at
        # cache_size compensates for the first block, which repeats nothing.
        cache_size = (
            (block_size - hop_size) * reader.sample_width * reader.channels
        )
        total_read = cache_size
        for block in self._read_blocks(reader):
            total_read += len(block) - cache_size

        err_msg = (
            "Wrong data length read from LimiterADS, expected: {0}, found: {1}"
        )
        assert total_read == expected_read_bytes, err_msg.format(
            expected_read_bytes, total_read
        )

    def test_reader_with_overlapping_blocks__rewindable(self):
        # Durations are given in seconds: 0.02 s / 0.01 s correspond to
        # 320 / 160 samples at 16 kHz.
        reader = AudioReader(
            input=self.audio_source,
            block_dur=0.02,
            hop_dur=0.01,
            record=True,
        )
        assert (
            reader.rewindable
        ), "AudioReader with record=True should be rewindable"

    def test_overlapping_blocks_with_max_read_rewind_and_read(self):
        # NOTE(review): despite its name this test passes no `max_read`; it
        # reads the whole source, rewinds and checks the replayed blocks.
        # Use arbitrary valid block_size and hop_size
        block_size = 1600
        hop_size = 400
        block_dur = block_size / self.audio_source.sampling_rate
        hop_dur = hop_size / self.audio_source.sampling_rate

        reader = AudioReader(
            input=self.audio_source,
            block_dur=block_dur,
            hop_dur=hop_dur,
            record=True,
        )

        # Read all available data overlapping blocks
        nb_blocks = len(self._read_blocks(reader))

        reader.rewind()

        # Read all data from file and build a BufferAudioSource
        audio_source = self._make_buffer_source(reader)
        audio_source.open()
        try:
            # Compare blocks read from AudioReader to those read from an
            # BufferAudioSource with manual position setting
            for j in range(nb_blocks):
                tmp = audio_source.read(block_size)
                assert (
                    reader.read() == tmp
                ), f"Unexpected data (block {j}) from reader with overlapping blocks and record = True"
                audio_source.position = (j + 1) * hop_size
        finally:
            audio_source.close()

    def test_overlapping_blocks_with_record_and_max_read_rewind_and_read(self):
        # Use arbitrary valid block_size and hop_size
        block_size = 1600
        hop_size = 400
        block_dur = block_size / self.audio_source.sampling_rate
        hop_dur = hop_size / self.audio_source.sampling_rate

        # `max_read` is the limiting keyword used by every other test in this
        # class; the previous `max_time` keyword did not match it.
        # 1.5 s at 16 kHz is 24000 samples = 1600 + 56 * 400, so the limit
        # falls on a block boundary and every block is full-sized.
        reader = AudioReader(
            input=self.audio_source,
            max_read=1.50,
            block_dur=block_dur,
            hop_dur=hop_dur,
            record=True,
        )

        # Read all available data overlapping blocks
        nb_blocks = len(self._read_blocks(reader))

        reader.rewind()

        # Read all data from file and build a BufferAudioSource
        audio_source = self._make_buffer_source(reader)
        audio_source.open()
        try:
            # Compare all blocks read from AudioReader to those read from
            # BufferAudioSource with a manual position setting
            for j in range(nb_blocks):
                tmp = audio_source.read(block_size)
                assert (
                    reader.read() == tmp
                ), "Unexpected block (N={0}) read from OverlapADS".format(j)
                audio_source.position = (j + 1) * hop_size
        finally:
            audio_source.close()

    def test_length_read_overlapping_blocks_with_record_and_max_read(self):
        # Use arbitrary valid block_size and hop_size
        block_size = 1000
        hop_size = 200
        block_dur = block_size / self.audio_source.sampling_rate
        hop_dur = hop_size / self.audio_source.sampling_rate

        reader = AudioReader(
            input=self.audio_source,
            block_dur=block_dur,
            hop_dur=hop_dur,
            record=True,
            max_read=1.317,
        )

        # Once overlap is discounted, the distinct data delivered must be
        # exactly the limited number of samples.
        total_samples = round(reader.sampling_rate * 1.317)
        expected_read_bytes = (
            total_samples * reader.sample_width * reader.channels
        )

        # Bytes shared by two consecutive blocks. Starting the total at
        # cache_size compensates for the first block, which repeats nothing.
        cache_size = (
            (block_size - hop_size) * reader.sample_width * reader.channels
        )
        total_read = cache_size
        for block in self._read_blocks(reader):
            total_read += len(block) - cache_size

        err_msg = f"Wrong data length read from AudioReader, expected: {expected_read_bytes}, found: {total_read}"
        assert total_read == expected_read_bytes, err_msg
|
amine@330
|
611
|
amine@330
|
612
|
amine@403
|
def test_AudioReader_raw_data():
    """Check an `AudioReader` built from raw bytes with overlap, a read limit
    and recording enabled.

    All blocks are read once to count them, the reader is rewound, and each
    replayed block is compared with `block_size` samples read from a
    `BufferAudioSource` over the same bytes, repositioned at multiples of
    `hop_size`. Both sources are closed even when an assertion fails.
    """
    data = b"ABCDEFGHIJKLMNOPQRSTUVWXYZ012345"
    block_size = 5
    hop_size = 4
    reader = AudioReader(
        input=data,
        sampling_rate=16,
        sample_width=2,
        channels=1,
        block_dur=block_size / 16,
        hop_dur=hop_size / 16,
        max_read=0.80,
        record=True,
    )
    reader.open()
    try:
        assert (
            reader.sampling_rate == 16
        ), f"Wrong sampling rate, expected: 16, found: {reader.sampling_rate}"

        assert (
            reader.sample_width == 2
        ), f"Wrong sample width, expected: 2, found: {reader.sample_width}"

        # Read all available data overlapping blocks
        nb_blocks = 0
        while reader.read() is not None:
            nb_blocks += 1

        reader.rewind()

        # Build a BufferAudioSource
        audio_source = BufferAudioSource(
            data, reader.sampling_rate, reader.sample_width, reader.channels
        )
        audio_source.open()
        try:
            # Compare all blocks read from AudioReader to those read from an
            # audio source with a manual position setting
            for j in range(nb_blocks):
                tmp = audio_source.read(block_size)
                block = reader.read()
                # Report the index of the failing block, not the block count.
                assert (
                    block == tmp
                ), f"Unexpected block '{block}' (N={j}) read from OverlapADS"
                audio_source.position = (j + 1) * hop_size
        finally:
            audio_source.close()
    finally:
        reader.close()
|
amine@330
|
665
|
amine@330
|
666
|
amine@403
|
def test_AudioReader_alias_params():
    """Check that the short keywords `sr` and `sw` are accepted and that each
    audio parameter can be read through both its long and its short name.
    """
    reader = AudioReader(input=b"0" * 1600, sr=16000, sw=2, channels=1)

    # (description used in the failure message, attribute name, expected value)
    expectations = (
        ("sampling rate", "sampling_rate", 16000),
        ("sampling rate", "sr", 16000),
        ("sample width", "sample_width", 2),
        ("sample width", "sw", 2),
        ("number of channels", "channels", 1),
        ("number of channels", "ch", 1),
    )
    for description, attribute, wanted in expectations:
        found = getattr(reader, attribute)
        assert found == wanted, (
            f"Unexpected {description}: reader.{attribute} = "
            + f"{found} instead of {wanted}"
        )
|
amine@330
|
697
|
amine@330
|
698
|
amine@400
|
@pytest.mark.parametrize(
    "file_id, max_read, size",
    [
        ("mono_400", 0.5, 16000),  # mono
        ("3channel_400-800-1600", 0.5, 16000 * 3),  # multichannel
    ],
    ids=["mono", "multichannel"],
)
def test_Limiter(file_id, max_read, size):
    """Check that `max_read` truncates what an `AudioReader` delivers.

    Args:
        file_id (str): variable part of the test file names.
        max_read (float): value passed as the reader's `max_read`.
        size (int): number of leading bytes of the `.raw` file that the
            limited reader is expected to return.
    """
    wav_path = "tests/data/test_16KHZ_{}Hz.wav".format(file_id)
    raw_path = "tests/data/test_16KHZ_{}Hz.raw".format(file_id)
    with open(raw_path, "rb") as raw_file:
        reference = raw_file.read(size)

    limited = AudioReader(wav_path, block_dur=0.1, max_read=max_read)
    limited.open()
    delivered = _read_all_data(limited)
    limited.close()
    assert delivered == reference
|
amine@330
|
718
|
amine@330
|
719
|
amine@400
|
@pytest.mark.parametrize(
    "file_id",
    [
        "mono_400",  # mono
        "3channel_400-800-1600",  # multichannel
    ],
    ids=["mono", "multichannel"],
)
def test_Recorder(file_id):
    """Check that an `AudioReader` created with `record=True` can be replayed.

    The data read from the `.wav` file must equal the content of the matching
    `.raw` file, on the first pass and after each of several rewinds, and must
    also equal the reader's `data` attribute. The reader is closed even when
    an assertion fails.
    """
    input_wav = "tests/data/test_16KHZ_{}Hz.wav".format(file_id)
    input_raw = "tests/data/test_16KHZ_{}Hz.raw".format(file_id)
    with open(input_raw, "rb") as fp:
        expected = fp.read()

    reader = AudioReader(input_wav, block_dur=0.1, record=True)
    reader.open()
    try:
        data = _read_all_data(reader)
        assert data == expected

        # rewind many times
        for _ in range(3):
            reader.rewind()
            data = _read_all_data(reader)
            assert data == expected
            assert data == reader.data
    finally:
        reader.close()
|
amine@330
|
746
|
amine@330
|
747
|
amine@400
|
@pytest.mark.parametrize(
    "file_id",
    [
        "mono_400",  # mono
        "3channel_400-800-1600",  # multichannel
    ],
    ids=["mono", "multichannel"],
)
def test_Recorder_alias(file_id):
    """Check that a `Recorder` can be read, rewound and replayed.

    The data read from the `.wav` file must equal the content of the matching
    `.raw` file, on the first pass and after each of several rewinds, and must
    also equal the recorder's `data` attribute. The recorder is closed even
    when an assertion fails.
    """
    input_wav = "tests/data/test_16KHZ_{}Hz.wav".format(file_id)
    input_raw = "tests/data/test_16KHZ_{}Hz.raw".format(file_id)
    with open(input_raw, "rb") as fp:
        expected = fp.read()

    reader = Recorder(input_wav, block_dur=0.1)
    reader.open()
    try:
        data = _read_all_data(reader)
        assert data == expected

        # rewind many times
        for _ in range(3):
            reader.rewind()
            data = _read_all_data(reader)
            assert data == expected
            assert data == reader.data
    finally:
        reader.close()
|