amine@2
|
1 '''
|
amine@2
|
2 @author: Amine Sehili <amine.sehili@gmail.com>
|
amine@2
|
3 September 2015
|
amine@2
|
4
|
amine@2
|
5 '''
|
amine@2
|
6
|
amine@2
|
7 import unittest
|
amine@10
|
8 from functools import partial
|
amine@10
|
9 import sys
|
amine@10
|
10 from auditok import dataset, ADSFactory, BufferAudioSource, WaveAudioSource, DuplicateArgument
|
amine@2
|
11 import wave
|
amine@2
|
12
|
amine@2
|
13
|
amine@10
|
# Python 2/3 compatibility: make ``range`` lazy on every interpreter.
# Python 3 already provides a lazy builtin ``range``; on Python 2 prefer the
# ``builtins`` backport when it is importable and fall back to ``xrange``.
if sys.version_info < (3, 0):
    try:
        from builtins import range
    except ImportError:
        range = xrange  # noqa: F821 -- only defined on Python 2
|
amine@10
|
19
|
amine@2
|
class TestADSFactoryFileAudioSource(unittest.TestCase):
    """Tests for ``ADSFactory.ads`` when the audio comes from a wave file.

    Every test builds its ads on top of a ``WaveAudioSource`` wrapping
    ``dataset.one_to_six_arabic_16000_mono_bc_noise``. The assertions below
    expect that file to be sampled at 16000 Hz with 2-byte samples and a
    single channel.
    """

    def setUp(self):
        """Create a fresh (not yet opened) wave audio source for each test."""
        self.audio_source = WaveAudioSource(
            filename=dataset.one_to_six_arabic_16000_mono_bc_noise)

    # ------------------------------------------------------------------
    # Private helpers (not collected by unittest: no ``test`` prefix)
    # ------------------------------------------------------------------

    @staticmethod
    def _read_all_blocks(ads):
        """Return every block read from the already opened `ads`.

        Reading stops at the first ``None`` returned by ``ads.read()``.
        `ads` is left open.
        """
        return list(iter(ads.read, None))

    @staticmethod
    def _read_from_file(nb_samples):
        """Return what a fresh ``WaveAudioSource`` yields for ``read(nb_samples)``."""
        source = WaveAudioSource(
            filename=dataset.one_to_six_arabic_16000_mono_bc_noise)
        source.open()
        try:
            return source.read(nb_samples)
        finally:
            source.close()

    @staticmethod
    def _open_reference_source(ads):
        """Return an open ``BufferAudioSource`` holding the whole test file.

        The audio parameters are the ones reported by `ads`. The caller is
        responsible for closing the returned source.
        """
        fp = wave.open(dataset.one_to_six_arabic_16000_mono_bc_noise, "r")
        try:
            wave_data = fp.readframes(fp.getnframes())
        finally:
            fp.close()
        reference = BufferAudioSource(wave_data, ads.get_sampling_rate(),
                                      ads.get_sample_width(),
                                      ads.get_channels())
        reference.open()
        return reference

    def _assert_overlapping_blocks(self, blocks, reference, block_size,
                                   hop_size):
        """Compare `blocks` with blocks cut manually out of `reference`.

        Block number ``n`` is expected to equal ``block_size`` samples read
        from `reference` at position ``n * hop_size``. `blocks` may be any
        iterable, including a lazy one.
        """
        for n, block in enumerate(blocks):
            expected = reference.read(block_size)
            self.assertEqual(
                block, expected,
                "Unexpected block (N={0}) read from OverlapADS".format(n))
            reference.set_position((n + 1) * hop_size)

    def _check_overlap_read(self, ads, block_size, hop_size):
        """Read `ads` to exhaustion and check each overlapping block."""
        ads.open()
        try:
            blocks = self._read_all_blocks(ads)
        finally:
            ads.close()

        reference = self._open_reference_source(ads)
        try:
            self._assert_overlapping_blocks(blocks, reference, block_size,
                                            hop_size)
        finally:
            reference.close()

    def _check_rewind_and_read(self, ads, block_size, hop_size):
        """Exhaust `ads`, rewind it, then check the blocks it replays."""
        ads.open()
        try:
            # Only the number of blocks matters for the first pass
            nb_blocks = sum(1 for _ in iter(ads.read, None))
            ads.rewind()

            reference = self._open_reference_source(ads)
            try:
                replayed = (ads.read() for _ in range(nb_blocks))
                self._assert_overlapping_blocks(replayed, reference,
                                                block_size, hop_size)
            finally:
                reference.close()
        finally:
            ads.close()

    def _check_limiter_overlap_read_limit(self, ads, max_time, block_size,
                                          hop_size):
        """Check the amount of fresh data read by a Limiter + Overlap ads.

        Limiter + Overlap decos => read N blocks of actual data:
            one block of size block_size
            N - 1 blocks of size hop_size
        The total size of read data might be slightly greater than the
        required size calculated from `max_time`.
        """
        bytes_per_sample = ads.get_sample_width() * ads.get_channels()
        block_size_bytes = block_size * bytes_per_sample
        hop_size_bytes = hop_size * bytes_per_sample

        # theoretical size to reach
        expected_size = int(ads.get_sampling_rate() * max_time) * bytes_per_sample

        # set the first (full) block aside, round what is left up to a
        # whole number of hops, then put the first block back
        expected_size -= block_size_bytes
        remainder = expected_size % hop_size_bytes
        if remainder > 0:
            expected_size += hop_size_bytes - remainder
        expected_size += block_size_bytes

        # every block carries `cache_size` bytes taken from the previous
        # block; only the rest of the block counts as new data
        cache_size = (block_size - hop_size) * bytes_per_sample
        total_read = cache_size

        ads.open()
        try:
            for block in iter(ads.read, None):
                total_read += len(block) - cache_size
        finally:
            ads.close()

        self.assertEqual(
            total_read, expected_size,
            "Wrong data length read from LimiterADS, expected: {0}, found: {1}".format(expected_size, total_read))

    # ------------------------------------------------------------------
    # Plain AudioDataSource
    # ------------------------------------------------------------------

    def test_ADS_type(self):
        ads = ADSFactory.ads(audio_source=self.audio_source)

        self.assertIsInstance(
            ads, ADSFactory.AudioDataSource,
            msg="wrong type for ads object, expected: 'ADSFactory.AudioDataSource', found: {0}".format(type(ads)))

    def test_default_block_size(self):
        ads = ADSFactory.ads(audio_source=self.audio_source)
        size = ads.get_block_size()
        self.assertEqual(size, 160, "Wrong default block_size, expected: 160, found: {0}".format(size))

    def test_block_size(self):
        ads = ADSFactory.ads(audio_source=self.audio_source, block_size=512)
        size = ads.get_block_size()
        self.assertEqual(size, 512, "Wrong block_size, expected: 512, found: {0}".format(size))

        # with alias keyword
        ads = ADSFactory.ads(audio_source=self.audio_source, bs=160)
        size = ads.get_block_size()
        self.assertEqual(size, 160, "Wrong block_size, expected: 160, found: {0}".format(size))

    def test_block_duration(self):
        ads = ADSFactory.ads(audio_source=self.audio_source, block_dur=0.01)  # 10 ms
        size = ads.get_block_size()
        self.assertEqual(size, 160, "Wrong block_size, expected: 160, found: {0}".format(size))

        # with alias keyword
        ads = ADSFactory.ads(audio_source=self.audio_source, bd=0.025)  # 25 ms
        size = ads.get_block_size()
        self.assertEqual(size, 400, "Wrong block_size, expected: 400, found: {0}".format(size))

    def test_hop_duration(self):
        ads = ADSFactory.ads(audio_source=self.audio_source, block_dur=0.02, hop_dur=0.01)  # 10 ms
        size = ads.hop_size
        self.assertEqual(size, 160, "Wrong hop_size, expected: 160, found: {0}".format(size))

        # with alias keyword for the block duration
        ads = ADSFactory.ads(audio_source=self.audio_source, bd=0.025, hop_dur=0.015)  # 15 ms
        size = ads.hop_size
        self.assertEqual(size, 240, "Wrong hop_size, expected: 240, found: {0}".format(size))

    def test_sampling_rate(self):
        ads = ADSFactory.ads(audio_source=self.audio_source)

        srate = ads.get_sampling_rate()
        self.assertEqual(srate, 16000, "Wrong sampling rate, expected: 16000, found: {0}".format(srate))

    def test_sample_width(self):
        ads = ADSFactory.ads(audio_source=self.audio_source)

        swidth = ads.get_sample_width()
        self.assertEqual(swidth, 2, "Wrong sample width, expected: 2, found: {0}".format(swidth))

    def test_channels(self):
        ads = ADSFactory.ads(audio_source=self.audio_source)

        channels = ads.get_channels()
        self.assertEqual(channels, 1, "Wrong number of channels, expected: 1, found: {0}".format(channels))

    def test_read(self):
        ads = ADSFactory.ads(audio_source=self.audio_source, block_size=256)

        ads.open()
        try:
            ads_data = ads.read()
        finally:
            ads.close()

        audio_source_data = self._read_from_file(256)

        self.assertEqual(ads_data, audio_source_data, "Unexpected data read from ads")

    # ------------------------------------------------------------------
    # Limiter decorator
    # ------------------------------------------------------------------

    def test_Limiter_Deco_type(self):
        ads = ADSFactory.ads(audio_source=self.audio_source, max_time=1)

        self.assertIsInstance(
            ads, ADSFactory.LimiterADS,
            msg="wrong type for ads object, expected: 'ADSFactory.LimiterADS', found: {0}".format(type(ads)))

    def test_Limiter_Deco_read(self):
        # read a maximum of 0.75 seconds from audio source
        ads = ADSFactory.ads(audio_source=self.audio_source, max_time=0.75)

        ads.open()
        try:
            ads_data = b''.join(self._read_all_blocks(ads))
        finally:
            ads.close()

        audio_source_data = self._read_from_file(int(16000 * 0.75))

        self.assertEqual(ads_data, audio_source_data, "Unexpected data read from LimiterADS")

    def test_Limiter_Deco_read_limit(self):
        # read a maximum of 1.191 seconds from audio source
        max_time = 1.191
        ads = ADSFactory.ads(audio_source=self.audio_source, max_time=max_time)

        # desired duration into bytes is obtained by:
        # max_time * sampling_rate * sample_width * nb_channels
        # Limiter deco tries to read a total quantity of data as close as
        # possible to the desired duration in bytes.
        # It reads N blocks of size block_size where:
        # (N - 1) * block_size < desired duration, AND
        # N * block_size >= desired duration
        bytes_per_sample = ads.get_sample_width() * ads.get_channels()

        # theoretical size to reach
        expected_size = int(ads.get_sampling_rate() * max_time) * bytes_per_sample

        # how much data are required to get N blocks of size block_size
        block_size_bytes = ads.get_block_size() * bytes_per_sample
        remainder = expected_size % block_size_bytes
        if remainder > 0:
            expected_size += block_size_bytes - remainder

        ads.open()
        try:
            total_read = sum(len(block) for block in iter(ads.read, None))
        finally:
            ads.close()

        self.assertEqual(
            total_read, expected_size,
            "Wrong data length read from LimiterADS, expected: {0}, found: {1}".format(expected_size, total_read))

    # ------------------------------------------------------------------
    # Recorder decorator
    # ------------------------------------------------------------------

    def test_Recorder_Deco_type(self):
        ads = ADSFactory.ads(audio_source=self.audio_source, record=True)

        self.assertIsInstance(
            ads, ADSFactory.RecorderADS,
            msg="wrong type for ads object, expected: 'ADSFactory.RecorderADS', found: {0}".format(type(ads)))

    def test_Recorder_Deco_read(self):
        ads = ADSFactory.ads(audio_source=self.audio_source, record=True, block_size=500)

        blocks = []
        ads.open()
        try:
            # read at most 10 blocks
            for _ in range(10):
                block = ads.read()
                if block is None:
                    break
                blocks.append(block)
        finally:
            ads.close()
        ads_data = b''.join(blocks)

        audio_source_data = self._read_from_file(500 * 10)

        self.assertEqual(ads_data, audio_source_data, "Unexpected data read from RecorderADS")

    def test_Recorder_Deco_is_rewindable(self):
        ads = ADSFactory.ads(audio_source=self.audio_source, record=True)

        self.assertTrue(ads.is_rewindable(), "RecorderADS.is_rewindable should return True")

    def test_Recorder_Deco_rewind(self):
        ads = ADSFactory.ads(audio_source=self.audio_source, record=True, block_size=320)

        ads.open()
        try:
            ads.read()
            ads.rewind()

            self.assertIsInstance(
                ads.get_audio_source(), BufferAudioSource,
                "After rewind RecorderADS.get_audio_source should "
                "be an instance of BufferAudioSource")
        finally:
            ads.close()

    def test_Recorder_Deco_rewind_and_read(self):
        ads = ADSFactory.ads(audio_source=self.audio_source, record=True, block_size=320)

        ads.open()
        try:
            for _ in range(10):
                ads.read()

            ads.rewind()

            # read all available data after rewind
            ads_data = b''.join(self._read_all_blocks(ads))
        finally:
            ads.close()

        audio_source_data = self._read_from_file(320 * 10)

        self.assertEqual(ads_data, audio_source_data, "Unexpected data read from RecorderADS")

    # ------------------------------------------------------------------
    # Overlap decorator
    # ------------------------------------------------------------------

    def test_Overlap_Deco_type(self):
        # an OverlapADS is obtained if a valid hop_size is given
        ads = ADSFactory.ads(audio_source=self.audio_source, block_size=256, hop_size=128)

        self.assertIsInstance(
            ads, ADSFactory.OverlapADS,
            msg="wrong type for ads object, expected: 'ADSFactory.OverlapADS', found: {0}".format(type(ads)))

    def test_Overlap_Deco_read(self):
        # Use arbitrary valid block_size and hop_size
        block_size = 1714
        hop_size = 313

        ads = ADSFactory.ads(audio_source=self.audio_source, block_size=block_size, hop_size=hop_size)

        # Compare all blocks read from OverlapADS to those read
        # from an audio source with a manual set_position
        self._check_overlap_read(ads, block_size, hop_size)

    # ------------------------------------------------------------------
    # Limiter + Overlap decorators
    # ------------------------------------------------------------------

    def test_Limiter_Overlap_Deco_type(self):
        ads = ADSFactory.ads(audio_source=self.audio_source, max_time=1, block_size=256, hop_size=128)

        self.assertIsInstance(
            ads, ADSFactory.OverlapADS,
            msg="wrong type for ads object, expected: 'ADSFactory.OverlapADS', found: {0}".format(type(ads)))

        # the decorated object is what gets checked, so report its type
        self.assertIsInstance(
            ads.ads, ADSFactory.LimiterADS,
            msg="wrong type for ads object, expected: 'ADSFactory.LimiterADS', found: {0}".format(type(ads.ads)))

    def test_Limiter_Overlap_Deco_read(self):
        block_size = 256
        hop_size = 200

        ads = ADSFactory.ads(audio_source=self.audio_source, max_time=0.50, block_size=block_size, hop_size=hop_size)

        # Compare all blocks read from OverlapADS to those read
        # from an audio source with a manual set_position
        self._check_overlap_read(ads, block_size, hop_size)

    def test_Limiter_Overlap_Deco_read_limit(self):
        block_size = 313
        hop_size = 207
        max_time = 1.932
        ads = ADSFactory.ads(audio_source=self.audio_source,
                             max_time=max_time, block_size=block_size,
                             hop_size=hop_size)

        self._check_limiter_overlap_read_limit(ads, max_time, block_size, hop_size)

    # ------------------------------------------------------------------
    # Recorder + Overlap decorators
    # ------------------------------------------------------------------

    def test_Recorder_Overlap_Deco_type(self):
        ads = ADSFactory.ads(audio_source=self.audio_source, block_size=256, hop_size=128, record=True)

        self.assertIsInstance(
            ads, ADSFactory.OverlapADS,
            msg="wrong type for ads object, expected: 'ADSFactory.OverlapADS', found: {0}".format(type(ads)))

        # the decorated object is what gets checked, so report its type
        self.assertIsInstance(
            ads.ads, ADSFactory.RecorderADS,
            msg="wrong type for ads object, expected: 'ADSFactory.RecorderADS', found: {0}".format(type(ads.ads)))

    def test_Recorder_Overlap_Deco_is_rewindable(self):
        ads = ADSFactory.ads(audio_source=self.audio_source, block_size=320, hop_size=160, record=True)
        self.assertTrue(ads.is_rewindable(), "RecorderADS.is_rewindable should return True")

    def test_Recorder_Overlap_Deco_rewind_and_read(self):
        # Use arbitrary valid block_size and hop_size
        block_size = 1600
        hop_size = 400

        ads = ADSFactory.ads(audio_source=self.audio_source, block_size=block_size, hop_size=hop_size, record=True)

        self._check_rewind_and_read(ads, block_size, hop_size)

    # ------------------------------------------------------------------
    # Limiter + Recorder + Overlap decorators
    # ------------------------------------------------------------------

    def test_Limiter_Recorder_Overlap_Deco_rewind_and_read(self):
        # Use arbitrary valid block_size and hop_size
        block_size = 1600
        hop_size = 400

        ads = ADSFactory.ads(audio_source=self.audio_source, max_time=1.50, block_size=block_size, hop_size=hop_size, record=True)

        self._check_rewind_and_read(ads, block_size, hop_size)

    def test_Limiter_Recorder_Overlap_Deco_rewind_and_read_limit(self):
        # Use arbitrary valid block_size and hop_size
        block_size = 1000
        hop_size = 200
        max_time = 1.317

        ads = ADSFactory.ads(audio_source=self.audio_source, max_time=max_time, block_size=block_size, hop_size=hop_size, record=True)

        self._check_limiter_overlap_read_limit(ads, max_time, block_size, hop_size)
|
amine@2
|
533
|
amine@2
|
class TestADSFactoryBufferAudioSource(unittest.TestCase):
    """Tests for ``ADSFactory.ads`` when the audio comes from a data buffer.

    The buffer is a 32-character string declared with a sampling rate of
    16, a sample width of 2 and a single channel.
    """

    def setUp(self):
        """Create the raw signal and a plain ads built from it."""
        self.signal = "ABCDEFGHIJKLMNOPQRSTUVWXYZ012345"
        self.ads = ADSFactory.ads(data_buffer=self.signal, sampling_rate=16,
                                  sample_width=2, channels=1)

    def test_ADS_BAS_type(self):
        self.assertIsInstance(
            self.ads.get_audio_source(), BufferAudioSource,
            "ads should be an instance of BufferAudioSource")

    def test_ADS_BAS_sampling_rate(self):
        srate = self.ads.get_sampling_rate()
        self.assertEqual(srate, 16, "Wrong sampling rate, expected: 16, found: {0}".format(srate))

    def test_ADS_BAS_get_sample_width(self):
        swidth = self.ads.get_sample_width()
        self.assertEqual(swidth, 2, "Wrong sample width, expected: 2, found: {0}".format(swidth))

    def test_ADS_BAS_get_channels(self):
        channels = self.ads.get_channels()
        self.assertEqual(channels, 1, "Wrong number of channels, expected: 1, found: {0}".format(channels))

    def test_Limiter_Recorder_Overlap_Deco_rewind_and_read(self):
        # Use arbitrary valid block_size and hop_size
        block_size = 5
        hop_size = 4

        ads = ADSFactory.ads(data_buffer=self.signal, sampling_rate=16,
                             sample_width=2, channels=1, max_time=0.80,
                             block_size=block_size, hop_size=hop_size,
                             record=True)

        ads.open()
        try:
            # Read all available overlapping blocks; only their number
            # is needed for the second pass
            nb_blocks = sum(1 for _ in iter(ads.read, None))

            ads.rewind()

            # Build a BufferAudioSource over the same signal
            reference = BufferAudioSource(self.signal, ads.get_sampling_rate(),
                                          ads.get_sample_width(), ads.get_channels())
            reference.open()
            try:
                # Compare all blocks read from OverlapADS to those read
                # from an audio source with a manual set_position
                for j in range(nb_blocks):
                    expected = reference.read(block_size)
                    block = ads.read()

                    self.assertEqual(block, expected, "Unexpected block (N={0}) read from OverlapADS".format(j))
                    reference.set_position((j + 1) * hop_size)
            finally:
                reference.close()
        finally:
            ads.close()
|
amine@10
|
599
|
amine@10
|
600
|
amine@10
|
class TestADSFactoryAlias(unittest.TestCase):
    """Tests for the short keyword aliases accepted by ``ADSFactory.ads``.

    For each alias exercised here (``sr``, ``sw``, ``ch``, ``bs``, ``bd``,
    ``hs``, ``hd``, ``fn``, ``db``, ``mt``, ``rec``) the tests check that
    the alias is honoured and that giving it together with the keyword it
    stands for (or with a conflicting one) raises ``DuplicateArgument``.
    """

    def setUp(self):
        """Create the raw signal shared by all the tests."""
        self.signal = "ABCDEFGHIJKLMNOPQRSTUVWXYZ012345"

    # ------------------------------------------------------------------
    # Audio parameters
    # ------------------------------------------------------------------

    def test_sampling_rate_alias(self):
        ads = ADSFactory.ads(data_buffer=self.signal, sr=16,
                             sample_width=2, channels=1)
        srate = ads.get_sampling_rate()
        self.assertEqual(srate, 16, "Wrong sampling rate, expected: 16, found: {0}".format(srate))

    def test_sampling_rate_duplicate(self):
        with self.assertRaises(DuplicateArgument):
            ADSFactory.ads(data_buffer=self.signal, sr=16, sampling_rate=16,
                           sample_width=2, channels=1)

    def test_sample_width_alias(self):
        ads = ADSFactory.ads(data_buffer=self.signal, sampling_rate=16,
                             sw=2, channels=1)
        swidth = ads.get_sample_width()
        self.assertEqual(swidth, 2, "Wrong sample width, expected: 2, found: {0}".format(swidth))

    def test_sample_width_duplicate(self):
        with self.assertRaises(DuplicateArgument):
            ADSFactory.ads(data_buffer=self.signal, sampling_rate=16,
                           sw=2, sample_width=2, channels=1)

    def test_channels_alias(self):
        ads = ADSFactory.ads(data_buffer=self.signal, sampling_rate=16,
                             sample_width=2, ch=1)
        channels = ads.get_channels()
        self.assertEqual(channels, 1, "Wrong number of channels, expected: 1, found: {0}".format(channels))

    def test_channels_duplicate(self):
        with self.assertRaises(DuplicateArgument):
            ADSFactory.ads(data_buffer=self.signal, sampling_rate=16,
                           sample_width=2, ch=1, channels=1)

    # ------------------------------------------------------------------
    # Block size / duration
    # ------------------------------------------------------------------

    def test_block_size_alias(self):
        ads = ADSFactory.ads(data_buffer=self.signal, sampling_rate=16,
                             sample_width=2, channels=1, bs=8)
        size = ads.get_block_size()
        self.assertEqual(size, 8, "Wrong block_size using bs alias, expected: 8, found: {0}".format(size))

    def test_block_size_duplicate(self):
        with self.assertRaises(DuplicateArgument):
            ADSFactory.ads(data_buffer=self.signal, sampling_rate=16,
                           sample_width=2, channels=1, bs=4, block_size=4)

    def test_block_duration_alias(self):
        ads = ADSFactory.ads(data_buffer=self.signal, sampling_rate=16,
                             sample_width=2, channels=1, bd=0.75)
        # 0.75 s at 16 samples per second = 0.75 * 16 = 12 samples
        size = ads.get_block_size()
        self.assertEqual(size, 12, "Wrong block_size set with a block_dur alias 'bd', expected: 12, found: {0}".format(size))

    def test_block_duration_duplicate(self):
        with self.assertRaises(DuplicateArgument):
            ADSFactory.ads(data_buffer=self.signal, sampling_rate=16,
                           sample_width=2, channels=1, bd=4, block_dur=4)

    def test_block_size_duration_duplicate(self):
        # a block size and a block duration cannot be given together
        with self.assertRaises(DuplicateArgument):
            ADSFactory.ads(data_buffer=self.signal, sampling_rate=16,
                           sample_width=2, channels=1, bd=4, bs=12)

    # ------------------------------------------------------------------
    # Hop size / duration
    # ------------------------------------------------------------------

    def test_hop_duration_alias(self):
        ads = ADSFactory.ads(data_buffer=self.signal, sampling_rate=16,
                             sample_width=2, channels=1, bd=0.75, hd=0.5)
        # 0.5 s at 16 samples per second = 8 samples
        size = ads.hop_size
        self.assertEqual(size, 8, "Wrong hop_size set with a hop_dur alias 'hd', expected: 8, found: {0}".format(size))
        self.assertIsInstance(ads, ADSFactory.OverlapADS, "ads expected to an ADSFactory.OverlapADS object")

    def test_hop_duration_duplicate(self):
        with self.assertRaises(DuplicateArgument):
            ADSFactory.ads(data_buffer=self.signal, sampling_rate=16,
                           sample_width=2, channels=1, bd=0.75, hd=0.5, hop_dur=0.5)

    def test_hop_size_duration_duplicate(self):
        # a hop size and a hop duration cannot be given together
        with self.assertRaises(DuplicateArgument):
            ADSFactory.ads(data_buffer=self.signal, sampling_rate=16,
                           sample_width=2, channels=1, bs=8, hs=4, hd=1)

    def test_hop_size_greater_than_block_size(self):
        with self.assertRaises(ValueError):
            ADSFactory.ads(data_buffer=self.signal, sampling_rate=16,
                           sample_width=2, channels=1, bs=4, hs=8)

    # ------------------------------------------------------------------
    # Data origin
    # ------------------------------------------------------------------

    def test_filename_alias(self):
        ads = ADSFactory.ads(fn=dataset.one_to_six_arabic_16000_mono_bc_noise)
        # Besides not raising, the factory is expected to return an ads
        # object, as it does when given an explicit audio source.
        self.assertIsInstance(ads, ADSFactory.AudioDataSource,
                              "ads expected to an ADSFactory.AudioDataSource object")

    def test_filename_duplicate(self):
        with self.assertRaises(DuplicateArgument):
            ADSFactory.ads(fn=dataset.one_to_six_arabic_16000_mono_bc_noise,
                           filename=dataset.one_to_six_arabic_16000_mono_bc_noise)

    def test_data_buffer_alias(self):
        ads = ADSFactory.ads(db=self.signal, sampling_rate=16,
                             sample_width=2, channels=1)
        self.assertEqual(ads.get_audio_source().get_data_buffer(), self.signal, "Wrong value for data buffer")

    def test_data_buffer_duplicate(self):
        with self.assertRaises(DuplicateArgument):
            ADSFactory.ads(data_buffer=self.signal, db=self.signal, sampling_rate=16,
                           sample_width=2, channels=1)

    # ------------------------------------------------------------------
    # Decorator switches
    # ------------------------------------------------------------------

    def test_max_time_alias(self):
        ads = ADSFactory.ads(data_buffer=self.signal, sampling_rate=16,
                             sample_width=2, channels=1, mt=10)
        self.assertIsInstance(ads, ADSFactory.LimiterADS, "ads expected to an ADSFactory.LimiterADS object")

    def test_max_time_duplicate(self):
        with self.assertRaises(DuplicateArgument):
            ADSFactory.ads(data_buffer=self.signal, sampling_rate=16,
                           sample_width=2, channels=1, mt=True, max_time=True)

    def test_record_alias(self):
        ads = ADSFactory.ads(data_buffer=self.signal, sampling_rate=16,
                             sample_width=2, channels=1, rec=True)
        self.assertIsInstance(ads, ADSFactory.RecorderADS, "ads expected to an ADSFactory.RecorderADS object")

    def test_record_duplicate(self):
        with self.assertRaises(DuplicateArgument):
            ADSFactory.ads(data_buffer=self.signal, sampling_rate=16,
                           sample_width=2, channels=1, rec=True, record=True)

    def test_Limiter_Recorder_Overlap_Deco_rewind_and_read_alias(self):
        # Use arbitrary valid block_size and hop_size
        block_size = 5
        hop_size = 4

        ads = ADSFactory.ads(db=self.signal, sr=16,
                             sw=2, ch=1, mt=0.80,
                             bs=block_size, hs=hop_size,
                             rec=True)

        ads.open()
        try:
            # Read all available overlapping blocks; only their number
            # is needed for the second pass
            nb_blocks = sum(1 for _ in iter(ads.read, None))

            ads.rewind()

            # Build a BufferAudioSource over the same signal
            reference = BufferAudioSource(self.signal, ads.get_sampling_rate(),
                                          ads.get_sample_width(), ads.get_channels())
            reference.open()
            try:
                # Compare all blocks read from OverlapADS to those read
                # from an audio source with a manual set_position
                for j in range(nb_blocks):
                    expected = reference.read(block_size)
                    block = ads.read()

                    self.assertEqual(block, expected, "Unexpected block (N={0}) read from OverlapADS".format(j))
                    reference.set_position((j + 1) * hop_size)
            finally:
                reference.close()
        finally:
            ads.close()
|
amine@2
|
782
|
amine@2
|
783
|
amine@2
|
if __name__ == "__main__":
    # Run every test case of this module when executed as a script.
    unittest.main()
|