Mercurial > hg > auditok
comparison tests/test_AudioDataSource.py @ 172:e526cfd6a056
rename test_AudioDataSourceFactory.py as test_AudioDataSource.py
author | Amine Sehili <amine.sehili@gmail.com> |
---|---|
date | Sat, 09 Mar 2019 18:26:24 +0100 |
parents | |
children | f35c0a18f4cb |
comparison
equal
deleted
inserted
replaced
171:a486c424fdff | 172:e526cfd6a056 |
---|---|
1 ''' | |
2 @author: Amine Sehili <amine.sehili@gmail.com> | |
3 September 2015 | |
4 | |
5 ''' | |
6 | |
7 import unittest | |
8 from functools import partial | |
9 import sys | |
10 from auditok import dataset, ADSFactory, BufferAudioSource, WaveAudioSource, DuplicateArgument | |
11 import wave | |
12 | |
13 | |
14 try: | |
15 from builtins import range | |
16 except ImportError: | |
17 if sys.version_info < (3, 0): | |
18 range = xrange | |
19 | |
20 class TestADSFactoryFileAudioSource(unittest.TestCase): | |
21 | |
22 def setUp(self): | |
23 self.audio_source = WaveAudioSource(filename=dataset.one_to_six_arabic_16000_mono_bc_noise) | |
24 | |
25 | |
26 def test_ADS_type(self): | |
27 | |
28 ads = ADSFactory.ads(audio_source=self.audio_source) | |
29 | |
30 self.assertIsInstance(ads, ADSFactory.AudioDataSource, | |
31 msg="wrong type for ads object, expected: 'ADSFactory.AudioDataSource', found: {0}".format(type(ads))) | |
32 | |
33 | |
34 def test_default_block_size(self): | |
35 ads = ADSFactory.ads(audio_source=self.audio_source) | |
36 size = ads.get_block_size() | |
37 self.assertEqual(size, 160, "Wrong default block_size, expected: 160, found: {0}".format(size)) | |
38 | |
39 | |
40 def test_block_size(self): | |
41 ads = ADSFactory.ads(audio_source=self.audio_source, block_size=512) | |
42 size = ads.get_block_size() | |
43 self.assertEqual(size, 512, "Wrong block_size, expected: 512, found: {0}".format(size)) | |
44 | |
45 # with alias keyword | |
46 ads = ADSFactory.ads(audio_source=self.audio_source, bs=160) | |
47 size = ads.get_block_size() | |
48 self.assertEqual(size, 160, "Wrong block_size, expected: 160, found: {0}".format(size)) | |
49 | |
50 def test_block_duration(self): | |
51 | |
52 ads = ADSFactory.ads(audio_source=self.audio_source, block_dur=0.01) # 10 ms | |
53 size = ads.get_block_size() | |
54 self.assertEqual(size, 160, "Wrong block_size, expected: 160, found: {0}".format(size)) | |
55 | |
56 # with alias keyword | |
57 ads = ADSFactory.ads(audio_source=self.audio_source, bd=0.025) # 25 ms | |
58 size = ads.get_block_size() | |
59 self.assertEqual(size, 400, "Wrong block_size, expected: 400, found: {0}".format(size)) | |
60 | |
61 def test_hop_duration(self): | |
62 | |
63 ads = ADSFactory.ads(audio_source=self.audio_source, block_dur=0.02, hop_dur=0.01) # 10 ms | |
64 size = ads.hop_size | |
65 self.assertEqual(size, 160, "Wrong hop_size, expected: 160, found: {0}".format(size)) | |
66 | |
67 # with alias keyword | |
68 ads = ADSFactory.ads(audio_source=self.audio_source, bd=0.025, hop_dur=0.015) # 15 ms | |
69 size = ads.hop_size | |
70 self.assertEqual(size, 240, "Wrong block_size, expected: 240, found: {0}".format(size)) | |
71 | |
72 | |
73 def test_sampling_rate(self): | |
74 ads = ADSFactory.ads(audio_source=self.audio_source) | |
75 | |
76 srate = ads.get_sampling_rate() | |
77 self.assertEqual(srate, 16000, "Wrong sampling rate, expected: 16000, found: {0}".format(srate)) | |
78 | |
79 def test_sample_width(self): | |
80 ads = ADSFactory.ads(audio_source=self.audio_source) | |
81 | |
82 swidth = ads.get_sample_width() | |
83 self.assertEqual(swidth, 2, "Wrong sample width, expected: 2, found: {0}".format(swidth)) | |
84 | |
85 def test_channels(self): | |
86 ads = ADSFactory.ads(audio_source=self.audio_source) | |
87 | |
88 channels = ads.get_channels() | |
89 self.assertEqual(channels, 1, "Wrong number of channels, expected: 1, found: {0}".format(channels)) | |
90 | |
91 def test_read(self): | |
92 ads = ADSFactory.ads(audio_source=self.audio_source, block_size = 256) | |
93 | |
94 ads.open() | |
95 ads_data = ads.read() | |
96 ads.close() | |
97 | |
98 audio_source = WaveAudioSource(filename=dataset.one_to_six_arabic_16000_mono_bc_noise) | |
99 audio_source.open() | |
100 audio_source_data = audio_source.read(256) | |
101 audio_source.close() | |
102 | |
103 self.assertEqual(ads_data, audio_source_data, "Unexpected data read from ads") | |
104 | |
105 def test_Limiter_Deco_type(self): | |
106 ads = ADSFactory.ads(audio_source=self.audio_source, max_time=1) | |
107 | |
108 self.assertIsInstance(ads, ADSFactory.LimiterADS, | |
109 msg="wrong type for ads object, expected: 'ADSFactory.LimiterADS', found: {0}".format(type(ads))) | |
110 | |
111 | |
112 def test_Limiter_Deco_read(self): | |
113 # read a maximum of 0.75 seconds from audio source | |
114 ads = ADSFactory.ads(audio_source=self.audio_source, max_time=0.75) | |
115 | |
116 ads_data = [] | |
117 ads.open() | |
118 while True: | |
119 block = ads.read() | |
120 if block is None: | |
121 break | |
122 ads_data.append(block) | |
123 ads.close() | |
124 ads_data = b''.join(ads_data) | |
125 | |
126 audio_source = WaveAudioSource(filename=dataset.one_to_six_arabic_16000_mono_bc_noise) | |
127 audio_source.open() | |
128 audio_source_data = audio_source.read(int(16000 * 0.75)) | |
129 audio_source.close() | |
130 | |
131 self.assertEqual(ads_data, audio_source_data, "Unexpected data read from LimiterADS") | |
132 | |
133 | |
134 def test_Limiter_Deco_read_limit(self): | |
135 # read a maximum of 1.25 seconds from audio source | |
136 ads = ADSFactory.ads(audio_source=self.audio_source, max_time=1.191) | |
137 | |
138 # desired duration into bytes is obtained by: | |
139 # max_time * sampling_rate * sample_width * nb_channels | |
140 # Limiter deco tries to a total quantity of data as | |
141 # possible to the desired duration in bytes. | |
142 # It reads N block of size block_size where: | |
143 # (N - 1) * block_size < desired duration, AND | |
144 # N * block_size >= desired duration | |
145 | |
146 # theoretical size to reach | |
147 expected_size = int(ads.get_sampling_rate() * 1.191) * \ | |
148 ads.get_sample_width() * ads.get_channels() | |
149 | |
150 | |
151 # how much data are required to get N blocks of size block_size | |
152 block_size_bytes = ads.get_block_size() * ads.get_sample_width() * ads.get_channels() | |
153 r = expected_size % block_size_bytes | |
154 if r > 0: | |
155 expected_size += block_size_bytes - r | |
156 | |
157 total_read = 0 | |
158 ads.open() | |
159 i = 0 | |
160 while True: | |
161 block = ads.read() | |
162 if block is None: | |
163 break | |
164 i += 1 | |
165 total_read += len(block) | |
166 | |
167 ads.close() | |
168 | |
169 self.assertEqual(total_read, expected_size, "Wrong data length read from LimiterADS, expected: {0}, found: {1}".format(expected_size, total_read)) | |
170 | |
171 | |
172 | |
173 def test_Recorder_Deco_type(self): | |
174 ads = ADSFactory.ads(audio_source=self.audio_source, record=True) | |
175 | |
176 self.assertIsInstance(ads, ADSFactory.RecorderADS, | |
177 msg="wrong type for ads object, expected: 'ADSFactory.RecorderADS', found: {0}".format(type(ads))) | |
178 | |
179 | |
180 def test_Recorder_Deco_read(self): | |
181 ads = ADSFactory.ads(audio_source=self.audio_source, record=True, block_size=500) | |
182 | |
183 ads_data = [] | |
184 ads.open() | |
185 for i in range(10): | |
186 block = ads.read() | |
187 if block is None: | |
188 break | |
189 ads_data.append(block) | |
190 ads.close() | |
191 ads_data = b''.join(ads_data) | |
192 | |
193 audio_source = WaveAudioSource(filename=dataset.one_to_six_arabic_16000_mono_bc_noise) | |
194 audio_source.open() | |
195 audio_source_data = audio_source.read(500 * 10) | |
196 audio_source.close() | |
197 | |
198 self.assertEqual(ads_data, audio_source_data, "Unexpected data read from RecorderADS") | |
199 | |
200 def test_Recorder_Deco_is_rewindable(self): | |
201 ads = ADSFactory.ads(audio_source=self.audio_source, record=True) | |
202 | |
203 self.assertTrue(ads.is_rewindable(), "RecorderADS.is_rewindable should return True") | |
204 | |
205 | |
206 def test_Recorder_Deco_rewind(self): | |
207 ads = ADSFactory.ads(audio_source=self.audio_source, record=True, block_size = 320) | |
208 | |
209 ads.open() | |
210 ads.read() | |
211 ads.rewind() | |
212 | |
213 | |
214 self.assertIsInstance(ads.get_audio_source(), | |
215 BufferAudioSource, "After rewind RecorderADS.get_audio_source should \ | |
216 be an instance of BufferAudioSource") | |
217 ads.close() | |
218 | |
219 | |
220 def test_Recorder_Deco_rewind_and_read(self): | |
221 ads = ADSFactory.ads(audio_source=self.audio_source, record=True, block_size = 320) | |
222 | |
223 ads.open() | |
224 for i in range(10): | |
225 ads.read() | |
226 | |
227 ads.rewind() | |
228 | |
229 # read all available data after rewind | |
230 ads_data = [] | |
231 while True: | |
232 block = ads.read() | |
233 if block is None: | |
234 break | |
235 ads_data.append(block) | |
236 ads.close() | |
237 ads_data = b''.join(ads_data) | |
238 | |
239 audio_source = WaveAudioSource(filename=dataset.one_to_six_arabic_16000_mono_bc_noise) | |
240 audio_source.open() | |
241 audio_source_data = audio_source.read(320 * 10) | |
242 audio_source.close() | |
243 | |
244 self.assertEqual(ads_data, audio_source_data, "Unexpected data read from RecorderADS") | |
245 | |
246 def test_Overlap_Deco_type(self): | |
247 # an OverlapADS is obtained if a valid hop_size is given | |
248 ads = ADSFactory.ads(audio_source=self.audio_source, block_size = 256, hop_size = 128) | |
249 | |
250 self.assertIsInstance(ads, ADSFactory.OverlapADS, | |
251 msg="wrong type for ads object, expected: 'ADSFactory.OverlapADS', found: {0}".format(type(ads))) | |
252 | |
253 | |
254 | |
255 | |
256 def test_Overlap_Deco_read(self): | |
257 | |
258 # Use arbitrary valid block_size and hop_size | |
259 block_size = 1714 | |
260 hop_size = 313 | |
261 | |
262 ads = ADSFactory.ads(audio_source=self.audio_source, block_size=block_size, hop_size=hop_size) | |
263 | |
264 # Read all available data overlapping blocks | |
265 ads.open() | |
266 ads_data = [] | |
267 while True: | |
268 block = ads.read() | |
269 if block is None: | |
270 break | |
271 ads_data.append(block) | |
272 ads.close() | |
273 | |
274 # Read all data from file and build a BufferAudioSource | |
275 fp = wave.open(dataset.one_to_six_arabic_16000_mono_bc_noise, "r") | |
276 wave_data = fp.readframes(fp.getnframes()) | |
277 fp.close() | |
278 audio_source = BufferAudioSource(wave_data, ads.get_sampling_rate(), | |
279 ads.get_sample_width(), ads.get_channels()) | |
280 audio_source.open() | |
281 | |
282 # Compare all blocks read from OverlapADS to those read | |
283 # from an audio source with a manual set_position | |
284 for i,block in enumerate(ads_data): | |
285 | |
286 tmp = audio_source.read(block_size) | |
287 | |
288 self.assertEqual(block, tmp, "Unexpected block (N={0}) read from OverlapADS".format(i)) | |
289 | |
290 audio_source.set_position((i+1) * hop_size) | |
291 | |
292 audio_source.close() | |
293 | |
294 | |
295 | |
296 | |
297 def test_Limiter_Overlap_Deco_type(self): | |
298 ads = ADSFactory.ads(audio_source=self.audio_source, max_time=1, block_size = 256, hop_size = 128) | |
299 | |
300 self.assertIsInstance(ads, ADSFactory.OverlapADS, | |
301 msg="wrong type for ads object, expected: 'ADSFactory.OverlapADS', found: {0}".format(type(ads))) | |
302 | |
303 | |
304 self.assertIsInstance(ads.ads, ADSFactory.LimiterADS, | |
305 msg="wrong type for ads object, expected: 'ADSFactory.LimiterADS', found: {0}".format(type(ads))) | |
306 | |
307 | |
308 | |
309 def test_Limiter_Overlap_Deco_read(self): | |
310 | |
311 block_size = 256 | |
312 hop_size = 200 | |
313 | |
314 ads = ADSFactory.ads(audio_source=self.audio_source, max_time=0.50, block_size=block_size, hop_size=hop_size) | |
315 | |
316 # Read all available data overlapping blocks | |
317 ads.open() | |
318 ads_data = [] | |
319 while True: | |
320 block = ads.read() | |
321 if block is None: | |
322 break | |
323 ads_data.append(block) | |
324 ads.close() | |
325 | |
326 # Read all data from file and build a BufferAudioSource | |
327 fp = wave.open(dataset.one_to_six_arabic_16000_mono_bc_noise, "r") | |
328 wave_data = fp.readframes(fp.getnframes()) | |
329 fp.close() | |
330 audio_source = BufferAudioSource(wave_data, ads.get_sampling_rate(), | |
331 ads.get_sample_width(), ads.get_channels()) | |
332 audio_source.open() | |
333 | |
334 # Compare all blocks read from OverlapADS to those read | |
335 # from an audio source with a manual set_position | |
336 for i,block in enumerate(ads_data): | |
337 tmp = audio_source.read(block_size) | |
338 | |
339 self.assertEqual(block, tmp, "Unexpected block (N={0}) read from OverlapADS".format(i)) | |
340 | |
341 audio_source.set_position((i+1) * hop_size) | |
342 | |
343 audio_source.close() | |
344 | |
345 | |
346 | |
347 def test_Limiter_Overlap_Deco_read_limit(self): | |
348 | |
349 block_size = 313 | |
350 hop_size = 207 | |
351 ads = ADSFactory.ads(audio_source=self.audio_source, | |
352 max_time=1.932, block_size=block_size, | |
353 hop_size=hop_size) | |
354 | |
355 # Limiter + Overlap decos => read N block of actual data | |
356 # one block of size block_size | |
357 # N - 1 blocks of size hop_size | |
358 # the total size of read data might be a slightly greater | |
359 # than the required size calculated from max_time | |
360 | |
361 # theoretical size to reach | |
362 expected_size = int(ads.get_sampling_rate() * 1.932) * \ | |
363 ads.get_sample_width() * ads.get_channels() | |
364 | |
365 # minus block_size | |
366 expected_size -= (block_size * ads.get_sample_width() * ads.get_channels()) | |
367 | |
368 # how much data are required to get N - 1 blocks of size hop_size | |
369 hop_size_bytes = hop_size * ads.get_sample_width() * ads.get_channels() | |
370 r = expected_size % hop_size_bytes | |
371 if r > 0: | |
372 expected_size += hop_size_bytes - r | |
373 | |
374 expected_size += block_size * ads.get_sample_width() * ads.get_channels() | |
375 | |
376 cache_size = (block_size - hop_size) * ads.get_sample_width() * ads.get_channels() | |
377 total_read = cache_size | |
378 | |
379 ads.open() | |
380 i = 0 | |
381 while True: | |
382 block = ads.read() | |
383 if block is None: | |
384 break | |
385 i += 1 | |
386 total_read += len(block) - cache_size | |
387 | |
388 ads.close() | |
389 self.assertEqual(total_read, expected_size, "Wrong data length read from LimiterADS, expected: {0}, found: {1}".format(expected_size, total_read)) | |
390 | |
391 | |
392 | |
393 def test_Recorder_Overlap_Deco_type(self): | |
394 ads = ADSFactory.ads(audio_source=self.audio_source, block_size=256, hop_size=128, record=True) | |
395 | |
396 self.assertIsInstance(ads, ADSFactory.OverlapADS, | |
397 msg="wrong type for ads object, expected: 'ADSFactory.OverlapADS', found: {0}".format(type(ads))) | |
398 | |
399 | |
400 self.assertIsInstance(ads.ads, ADSFactory.RecorderADS, | |
401 msg="wrong type for ads object, expected: 'ADSFactory.RecorderADS', found: {0}".format(type(ads))) | |
402 | |
403 | |
404 | |
405 def test_Recorder_Overlap_Deco_is_rewindable(self): | |
406 ads = ADSFactory.ads(audio_source=self.audio_source, block_size=320, hop_size=160, record=True) | |
407 self.assertTrue(ads.is_rewindable(), "RecorderADS.is_rewindable should return True") | |
408 | |
409 | |
410 def test_Recorder_Overlap_Deco_rewind_and_read(self): | |
411 | |
412 # Use arbitrary valid block_size and hop_size | |
413 block_size = 1600 | |
414 hop_size = 400 | |
415 | |
416 ads = ADSFactory.ads(audio_source=self.audio_source, block_size=block_size, hop_size=hop_size, record=True) | |
417 | |
418 # Read all available data overlapping blocks | |
419 ads.open() | |
420 i = 0 | |
421 while True: | |
422 block = ads.read() | |
423 if block is None: | |
424 break | |
425 i += 1 | |
426 | |
427 ads.rewind() | |
428 | |
429 # Read all data from file and build a BufferAudioSource | |
430 fp = wave.open(dataset.one_to_six_arabic_16000_mono_bc_noise, "r") | |
431 wave_data = fp.readframes(fp.getnframes()) | |
432 fp.close() | |
433 audio_source = BufferAudioSource(wave_data, ads.get_sampling_rate(), | |
434 ads.get_sample_width(), ads.get_channels()) | |
435 audio_source.open() | |
436 | |
437 # Compare all blocks read from OverlapADS to those read | |
438 # from an audio source with a manual set_position | |
439 for j in range(i): | |
440 | |
441 tmp = audio_source.read(block_size) | |
442 | |
443 self.assertEqual(ads.read(), tmp, "Unexpected block (N={0}) read from OverlapADS".format(i)) | |
444 audio_source.set_position((j+1) * hop_size) | |
445 | |
446 ads.close() | |
447 audio_source.close() | |
448 | |
449 | |
450 def test_Limiter_Recorder_Overlap_Deco_rewind_and_read(self): | |
451 | |
452 # Use arbitrary valid block_size and hop_size | |
453 block_size = 1600 | |
454 hop_size = 400 | |
455 | |
456 ads = ADSFactory.ads(audio_source=self.audio_source, max_time = 1.50, block_size=block_size, hop_size=hop_size, record=True) | |
457 | |
458 # Read all available data overlapping blocks | |
459 ads.open() | |
460 i = 0 | |
461 while True: | |
462 block = ads.read() | |
463 if block is None: | |
464 break | |
465 i += 1 | |
466 | |
467 ads.rewind() | |
468 | |
469 # Read all data from file and build a BufferAudioSource | |
470 fp = wave.open(dataset.one_to_six_arabic_16000_mono_bc_noise, "r") | |
471 wave_data = fp.readframes(fp.getnframes()) | |
472 fp.close() | |
473 audio_source = BufferAudioSource(wave_data, ads.get_sampling_rate(), | |
474 ads.get_sample_width(), ads.get_channels()) | |
475 audio_source.open() | |
476 | |
477 # Compare all blocks read from OverlapADS to those read | |
478 # from an audio source with a manual set_position | |
479 for j in range(i): | |
480 | |
481 tmp = audio_source.read(block_size) | |
482 | |
483 self.assertEqual(ads.read(), tmp, "Unexpected block (N={0}) read from OverlapADS".format(i)) | |
484 audio_source.set_position((j+1) * hop_size) | |
485 | |
486 ads.close() | |
487 audio_source.close() | |
488 | |
489 | |
490 def test_Limiter_Recorder_Overlap_Deco_rewind_and_read_limit(self): | |
491 | |
492 # Use arbitrary valid block_size and hop_size | |
493 block_size = 1000 | |
494 hop_size = 200 | |
495 | |
496 ads = ADSFactory.ads(audio_source=self.audio_source, max_time = 1.317, block_size=block_size, hop_size=hop_size, record=True) | |
497 | |
498 # Limiter + Overlap decos => read N block of actual data | |
499 # one block of size block_size | |
500 # N - 1 blocks of size hop_size | |
501 # the total size of read data might be a slightly greater | |
502 # than the required size calculated from max_time | |
503 | |
504 # theoretical size to reach | |
505 expected_size = int(ads.get_sampling_rate() * 1.317) * \ | |
506 ads.get_sample_width() * ads.get_channels() | |
507 | |
508 # minus block_size | |
509 expected_size -= (block_size * ads.get_sample_width() * ads.get_channels()) | |
510 | |
511 # how much data are required to get N - 1 blocks of size hop_size | |
512 hop_size_bytes = hop_size * ads.get_sample_width() * ads.get_channels() | |
513 r = expected_size % hop_size_bytes | |
514 if r > 0: | |
515 expected_size += hop_size_bytes - r | |
516 | |
517 expected_size += block_size * ads.get_sample_width() * ads.get_channels() | |
518 | |
519 cache_size = (block_size - hop_size) * ads.get_sample_width() * ads.get_channels() | |
520 total_read = cache_size | |
521 | |
522 ads.open() | |
523 i = 0 | |
524 while True: | |
525 block = ads.read() | |
526 if block is None: | |
527 break | |
528 i += 1 | |
529 total_read += len(block) - cache_size | |
530 | |
531 ads.close() | |
532 self.assertEqual(total_read, expected_size, "Wrong data length read from LimiterADS, expected: {0}, found: {1}".format(expected_size, total_read)) | |
533 | |
534 class TestADSFactoryBufferAudioSource(unittest.TestCase): | |
535 | |
536 def setUp(self): | |
537 self.signal = "ABCDEFGHIJKLMNOPQRSTUVWXYZ012345" | |
538 self.ads = ADSFactory.ads(data_buffer=self.signal, sampling_rate=16, | |
539 sample_width=2, channels=1) | |
540 | |
541 def test_ADS_BAS_type(self): | |
542 self.assertIsInstance(self.ads.get_audio_source(), | |
543 BufferAudioSource, "ads should \ | |
544 be an instance of BufferAudioSource") | |
545 | |
546 def test_ADS_BAS_sampling_rate(self): | |
547 srate = self.ads.get_sampling_rate() | |
548 self.assertEqual(srate, 16, "Wrong sampling rate, expected: 16000, found: {0}".format(srate)) | |
549 | |
550 | |
551 def test_ADS_BAS_get_sample_width(self): | |
552 swidth = self.ads.get_sample_width() | |
553 self.assertEqual(swidth, 2, "Wrong sample width, expected: 2, found: {0}".format(swidth)) | |
554 | |
555 def test_ADS_BAS_get_channels(self): | |
556 channels = self.ads.get_channels() | |
557 self.assertEqual(channels, 1, "Wrong number of channels, expected: 1, found: {0}".format(channels)) | |
558 | |
559 def test_Limiter_Recorder_Overlap_Deco_rewind_and_read(self): | |
560 | |
561 # Use arbitrary valid block_size and hop_size | |
562 block_size = 5 | |
563 hop_size = 4 | |
564 | |
565 ads = ADSFactory.ads(data_buffer=self.signal, sampling_rate=16, | |
566 sample_width=2, channels=1, max_time = 0.80, | |
567 block_size=block_size, hop_size=hop_size, | |
568 record=True) | |
569 | |
570 # Read all available data overlapping blocks | |
571 ads.open() | |
572 i = 0 | |
573 while True: | |
574 block = ads.read() | |
575 if block is None: | |
576 break | |
577 i += 1 | |
578 | |
579 ads.rewind() | |
580 | |
581 # Build a BufferAudioSource | |
582 audio_source = BufferAudioSource(self.signal, ads.get_sampling_rate(), | |
583 ads.get_sample_width(), ads.get_channels()) | |
584 audio_source.open() | |
585 | |
586 # Compare all blocks read from OverlapADS to those read | |
587 # from an audio source with a manual set_position | |
588 for j in range(i): | |
589 | |
590 tmp = audio_source.read(block_size) | |
591 | |
592 block = ads.read() | |
593 | |
594 self.assertEqual(block, tmp, "Unexpected block (N={0}) read from OverlapADS".format(i)) | |
595 audio_source.set_position((j+1) * hop_size) | |
596 | |
597 ads.close() | |
598 audio_source.close() | |
599 | |
600 | |
601 class TestADSFactoryAlias(unittest.TestCase): | |
602 | |
603 def setUp(self): | |
604 self.signal = "ABCDEFGHIJKLMNOPQRSTUVWXYZ012345" | |
605 | |
606 def test_sampling_rate_alias(self): | |
607 ads = ADSFactory.ads(data_buffer=self.signal, sr=16, | |
608 sample_width=2, channels=1) | |
609 srate = ads.get_sampling_rate() | |
610 self.assertEqual(srate, 16, "Wrong sampling rate, expected: 16000, found: {0}".format(srate)) | |
611 | |
612 def test_sampling_rate_duplicate(self): | |
613 func = partial(ADSFactory.ads, data_buffer=self.signal, sr=16, sampling_rate=16, | |
614 sample_width=2, channels=1) | |
615 self.assertRaises(DuplicateArgument, func) | |
616 | |
617 def test_sample_width_alias(self): | |
618 ads = ADSFactory.ads(data_buffer=self.signal, sampling_rate=16, | |
619 sw=2, channels=1) | |
620 swidth = ads.get_sample_width() | |
621 self.assertEqual(swidth, 2, "Wrong sample width, expected: 2, found: {0}".format(swidth)) | |
622 | |
623 def test_sample_width_duplicate(self): | |
624 func = partial(ADSFactory.ads, data_buffer=self.signal,sampling_rate=16, | |
625 sw=2, sample_width=2, channels=1) | |
626 self.assertRaises(DuplicateArgument, func) | |
627 | |
628 def test_channels_alias(self): | |
629 ads = ADSFactory.ads(data_buffer=self.signal, sampling_rate=16, | |
630 sample_width=2, ch=1) | |
631 channels = ads.get_channels() | |
632 self.assertEqual(channels, 1, "Wrong number of channels, expected: 1, found: {0}".format(channels)) | |
633 | |
634 def test_channels_duplicate(self): | |
635 func = partial(ADSFactory.ads, data_buffer=self.signal,sampling_rate=16, | |
636 sample_width=2, ch=1, channels=1) | |
637 self.assertRaises(DuplicateArgument, func) | |
638 | |
639 | |
640 def test_block_size_alias(self): | |
641 ads = ADSFactory.ads(data_buffer=self.signal, sampling_rate=16, | |
642 sample_width=2, channels=1, bs=8) | |
643 size = ads.get_block_size() | |
644 self.assertEqual(size, 8, "Wrong block_size using bs alias, expected: 8, found: {0}".format(size)) | |
645 | |
646 def test_block_size_duplicate(self): | |
647 func = partial(ADSFactory.ads, data_buffer=self.signal,sampling_rate=16, | |
648 sample_width=2, channels=1, bs=4, block_size=4) | |
649 self.assertRaises(DuplicateArgument, func) | |
650 | |
651 def test_block_duration_alias(self): | |
652 ads = ADSFactory.ads(data_buffer=self.signal, sampling_rate=16, | |
653 sample_width=2, channels=1, bd=0.75) | |
654 # 0.75 ms = 0.75 * 16 = 12 | |
655 size = ads.get_block_size() | |
656 self.assertEqual(size, 12, "Wrong block_size set with a block_dur alias 'bd', expected: 8, found: {0}".format(size)) | |
657 | |
658 def test_block_duration_duplicate(self): | |
659 func = partial(ADSFactory.ads, data_buffer=self.signal,sampling_rate=16, | |
660 sample_width=2, channels=1, bd=4, block_dur=4) | |
661 self.assertRaises(DuplicateArgument, func) | |
662 | |
663 def test_block_size_duration_duplicate(self): | |
664 func = partial(ADSFactory.ads, data_buffer=self.signal,sampling_rate=16, | |
665 sample_width=2, channels=1, bd=4, bs=12) | |
666 self.assertRaises(DuplicateArgument, func) | |
667 | |
668 def test_hop_duration_alias(self): | |
669 | |
670 ads = ADSFactory.ads(data_buffer=self.signal, sampling_rate=16, | |
671 sample_width=2, channels=1, bd=0.75, hd=0.5 ) | |
672 size = ads.hop_size | |
673 self.assertEqual(size, 8, "Wrong block_size using bs alias, expected: 8, found: {0}".format(size)) | |
674 self.assertIsInstance(ads, ADSFactory.OverlapADS, "ads expected to an ADSFactory.OverlapADS object") | |
675 | |
676 | |
677 def test_hop_duration_duplicate(self): | |
678 | |
679 func = partial(ADSFactory.ads, data_buffer=self.signal, sampling_rate=16, | |
680 sample_width=2, channels=1, bd=0.75, hd=0.5, hop_dur=0.5) | |
681 self.assertRaises(DuplicateArgument, func) | |
682 | |
683 | |
684 def test_hop_size_duration_duplicate(self): | |
685 func = partial(ADSFactory.ads, data_buffer=self.signal,sampling_rate=16, | |
686 sample_width=2, channels=1, bs=8, hs=4, hd=1) | |
687 self.assertRaises(DuplicateArgument, func) | |
688 | |
689 | |
690 def test_hop_size_greater_than_block_size(self): | |
691 func = partial(ADSFactory.ads, data_buffer=self.signal, sampling_rate=16, | |
692 sample_width=2, channels=1, bs=4, hs=8) | |
693 self.assertRaises(ValueError, func) | |
694 | |
695 | |
696 def test_filename_alias(self): | |
697 ads = ADSFactory.ads(fn=dataset.one_to_six_arabic_16000_mono_bc_noise) | |
698 | |
699 | |
700 def test_filename_duplicate(self): | |
701 | |
702 func = partial(ADSFactory.ads, fn=dataset.one_to_six_arabic_16000_mono_bc_noise, filename=dataset.one_to_six_arabic_16000_mono_bc_noise) | |
703 self.assertRaises(DuplicateArgument, func) | |
704 | |
705 | |
706 def test_data_buffer_alias(self): | |
707 ads = ADSFactory.ads(db=self.signal, sampling_rate=16, | |
708 sample_width=2, channels=1) | |
709 self.assertEqual(ads.get_audio_source().get_data_buffer(), self.signal, "Wrong value for data buffer") | |
710 | |
711 | |
712 def test_data_buffer_duplicate(self): | |
713 func = partial(ADSFactory.ads, data_buffer=self.signal, db=self.signal, sampling_rate=16, | |
714 sample_width=2, channels=1) | |
715 self.assertRaises(DuplicateArgument, func) | |
716 | |
717 | |
718 def test_max_time_alias(self): | |
719 ads = ADSFactory.ads(data_buffer=self.signal, sampling_rate=16, | |
720 sample_width=2, channels=1, mt=10) | |
721 self.assertIsInstance(ads, ADSFactory.LimiterADS, "ads expected to an ADSFactory.LimiterADS object") | |
722 | |
723 | |
724 def test_max_time_duplicate(self): | |
725 func = partial(ADSFactory.ads, data_buffer=self.signal, sampling_rate=16, | |
726 sample_width=2, channels=1, mt=True, max_time=True) | |
727 | |
728 self.assertRaises(DuplicateArgument, func) | |
729 | |
730 def test_record_alias(self): | |
731 ads = ADSFactory.ads(data_buffer=self.signal, sampling_rate=16, | |
732 sample_width=2, channels=1, rec=True) | |
733 self.assertIsInstance(ads, ADSFactory.RecorderADS, "ads expected to an ADSFactory.RecorderADS object") | |
734 | |
735 | |
736 def test_record_duplicate(self): | |
737 func = partial(ADSFactory.ads, data_buffer=self.signal, sampling_rate=16, | |
738 sample_width=2, channels=1, rec=True, record=True) | |
739 self.assertRaises(DuplicateArgument, func) | |
740 | |
741 | |
742 def test_Limiter_Recorder_Overlap_Deco_rewind_and_read_alias(self): | |
743 | |
744 # Use arbitrary valid block_size and hop_size | |
745 block_size = 5 | |
746 hop_size = 4 | |
747 | |
748 ads = ADSFactory.ads(db=self.signal, sr=16, | |
749 sw=2, ch=1, mt = 0.80, | |
750 bs=block_size, hs=hop_size, | |
751 rec=True) | |
752 | |
753 # Read all available data overlapping blocks | |
754 ads.open() | |
755 i = 0 | |
756 while True: | |
757 block = ads.read() | |
758 if block is None: | |
759 break | |
760 i += 1 | |
761 | |
762 ads.rewind() | |
763 | |
764 # Build a BufferAudioSource | |
765 audio_source = BufferAudioSource(self.signal, ads.get_sampling_rate(), | |
766 ads.get_sample_width(), ads.get_channels()) | |
767 audio_source.open() | |
768 | |
769 # Compare all blocks read from OverlapADS to those read | |
770 # from an audio source with a manual set_position | |
771 for j in range(i): | |
772 | |
773 tmp = audio_source.read(block_size) | |
774 | |
775 block = ads.read() | |
776 | |
777 self.assertEqual(block, tmp, "Unexpected block (N={0}) read from OverlapADS".format(i)) | |
778 audio_source.set_position((j+1) * hop_size) | |
779 | |
780 ads.close() | |
781 audio_source.close() | |
782 | |
783 | |
784 if __name__ == "__main__": | |
785 #import sys;sys.argv = ['', 'Test.testName'] | |
786 unittest.main() |