amine@2
|
1 '''
|
amine@2
|
2 @author: Amine Sehili <amine.sehili@gmail.com>
|
amine@2
|
3 September 2015
|
amine@2
|
4
|
amine@2
|
5 '''
|
amine@2
|
6
|
amine@2
|
7 import unittest
|
amine@2
|
8 from auditok import dataset, ADSFactory, BufferAudioSource, WaveAudioSource
|
amine@2
|
9 import wave
|
amine@2
|
10
|
amine@2
|
11
|
amine@2
|
class TestADSFactoryFileAudioSource(unittest.TestCase):
    """Tests for ADSFactory.ads() built on top of a WaveAudioSource.

    Every test creates its ads object from the wave file referenced by
    dataset.one_to_six_arabic_16000_mono_bc_noise and compares what the
    ads object returns with data read directly from that same file.
    """

    def setUp(self):
        """Create a fresh, not yet opened, WaveAudioSource for each test."""
        self.audio_source = WaveAudioSource(
            filename=dataset.one_to_six_arabic_16000_mono_bc_noise)

    # ------------------------------------------------------------------
    # Helpers
    # ------------------------------------------------------------------

    def _iter_blocks(self, ads, max_blocks=None):
        """Yield blocks from an already opened `ads`.

        Stops when ads.read() returns None or, if `max_blocks` is given,
        after `max_blocks` blocks have been yielded. Does not open or
        close `ads`.
        """
        count = 0
        while max_blocks is None or count < max_blocks:
            block = ads.read()
            if block is None:
                return
            yield block
            count += 1

    def _read_all(self, ads, max_blocks=None):
        """Open `ads`, return the list of blocks it yields, then close it.

        `ads` is closed even if reading raises.
        """
        ads.open()
        try:
            return list(self._iter_blocks(ads, max_blocks))
        finally:
            ads.close()

    def _read_reference(self, size):
        """Return audio_source.read(size) from a fresh WaveAudioSource.

        The source is opened on the same wave file as self.audio_source
        and is always closed before returning.
        """
        source = WaveAudioSource(
            filename=dataset.one_to_six_arabic_16000_mono_bc_noise)
        source.open()
        try:
            return source.read(size)
        finally:
            source.close()

    def _buffer_reference(self, ads):
        """Return a (not opened) BufferAudioSource holding the whole file.

        The audio parameters are taken from `ads` so that both objects
        describe the data the same way.
        """
        fp = wave.open(dataset.one_to_six_arabic_16000_mono_bc_noise, "r")
        try:
            wave_data = fp.readframes(fp.getnframes())
        finally:
            fp.close()
        return BufferAudioSource(wave_data, ads.get_sampling_rate(),
                                 ads.get_sample_width(), ads.get_channels())

    def _assert_overlapping_blocks(self, blocks, reference, block_size,
                                   hop_size):
        """Compare `blocks` with blocks read manually from `reference`.

        Block number k is expected to equal reference.read(block_size)
        performed after positioning `reference` at k * hop_size.
        `blocks` may be any iterable (including a lazy one); `reference`
        is opened here and always closed.
        """
        reference.open()
        try:
            for index, block in enumerate(blocks):
                expected = reference.read(block_size)
                self.assertEqual(
                    block, expected,
                    "Unexpected block (N={0}) read from OverlapADS".format(index))
                reference.set_position((index + 1) * hop_size)
        finally:
            reference.close()

    @staticmethod
    def _frame_width(ads):
        """Return sample_width * channels as reported by `ads`."""
        return ads.get_sample_width() * ads.get_channels()

    @staticmethod
    def _round_up(value, multiple):
        """Return `value` rounded up to the next multiple of `multiple`."""
        remainder = value % multiple
        if remainder > 0:
            value += multiple - remainder
        return value

    def _assert_overlap_limit(self, ads, max_time, block_size, hop_size):
        """Check the amount of new data delivered by a Limiter + Overlap ads.

        The test expects one block of block_size followed by N - 1 hops
        of hop_size, so the total may be slightly greater than the size
        computed from `max_time`.
        """
        frame_width = self._frame_width(ads)
        block_bytes = block_size * frame_width

        # theoretical size to reach
        expected_size = int(ads.get_sampling_rate() * max_time) * frame_width
        # remove the first full block, round what is left up to a whole
        # number of hops, then add the first block back
        expected_size -= block_bytes
        expected_size = self._round_up(expected_size, hop_size * frame_width)
        expected_size += block_bytes

        # Each block repeats (block_size - hop_size) samples of the
        # previous one; count that overlap only once.
        cache_size = (block_size - hop_size) * frame_width
        total_read = cache_size
        for block in self._read_all(ads):
            total_read += len(block) - cache_size

        self.assertEqual(
            total_read, expected_size,
            "Wrong data length read from LimiterADS, expected: {0}, found: {1}".format(expected_size, total_read))

    # ------------------------------------------------------------------
    # Plain AudioDataSource
    # ------------------------------------------------------------------

    def test_ADS_type(self):
        ads = ADSFactory.ads(audio_source=self.audio_source)

        self.assertIsInstance(
            ads, ADSFactory.AudioDataSource,
            msg="wrong type for ads object, expected: 'ADSFactory.AudioDataSource', found: {0}".format(type(ads)))

    def test_default_block_size(self):
        ads = ADSFactory.ads(audio_source=self.audio_source)

        size = ads.get_block_size()
        self.assertEqual(size, 160, "Wrong default block_size, expected: 160, found: {0}".format(size))

    def test_block_size(self):
        ads = ADSFactory.ads(audio_source=self.audio_source, block_size=512)

        size = ads.get_block_size()
        self.assertEqual(size, 512, "Wrong block_size, expected: 512, found: {0}".format(size))

    def test_sampling_rate(self):
        ads = ADSFactory.ads(audio_source=self.audio_source)

        srate = ads.get_sampling_rate()
        self.assertEqual(srate, 16000, "Wrong sampling rate, expected: 16000, found: {0}".format(srate))

    def test_sample_width(self):
        ads = ADSFactory.ads(audio_source=self.audio_source)

        swidth = ads.get_sample_width()
        self.assertEqual(swidth, 2, "Wrong sample width, expected: 2, found: {0}".format(swidth))

    def test_channels(self):
        ads = ADSFactory.ads(audio_source=self.audio_source)

        channels = ads.get_channels()
        self.assertEqual(channels, 1, "Wrong number of channels, expected: 1, found: {0}".format(channels))

    def test_read(self):
        ads = ADSFactory.ads(audio_source=self.audio_source, block_size=256)

        ads.open()
        try:
            ads_data = ads.read()
        finally:
            ads.close()

        self.assertEqual(ads_data, self._read_reference(256),
                         "Unexpected data read from ads")

    # ------------------------------------------------------------------
    # Limiter decorator
    # ------------------------------------------------------------------

    def test_Limiter_Deco_type(self):
        ads = ADSFactory.ads(audio_source=self.audio_source, max_time=1)

        self.assertIsInstance(
            ads, ADSFactory.LimiterADS,
            msg="wrong type for ads object, expected: 'ADSFactory.LimiterADS', found: {0}".format(type(ads)))

    def test_Limiter_Deco_read(self):
        # read a maximum of 0.75 seconds from audio source
        ads = ADSFactory.ads(audio_source=self.audio_source, max_time=0.75)

        # b'' is a plain str on Python 2 and bytes on Python 3
        ads_data = b''.join(self._read_all(ads))
        audio_source_data = self._read_reference(int(16000 * 0.75))

        self.assertEqual(ads_data, audio_source_data,
                         "Unexpected data read from LimiterADS")

    def test_Limiter_Deco_read_limit(self):
        # read a maximum of 1.191 seconds from audio source
        ads = ADSFactory.ads(audio_source=self.audio_source, max_time=1.191)

        # The desired duration in bytes is obtained by:
        #   max_time * sampling_rate * sample_width * nb_channels
        # The test expects the Limiter decorator to deliver N blocks of
        # size block_size where:
        #   (N - 1) * block_size < desired duration, AND
        #   N * block_size >= desired duration
        frame_width = self._frame_width(ads)

        # theoretical size to reach
        expected_size = int(ads.get_sampling_rate() * 1.191) * frame_width
        # how much data is required to get N blocks of size block_size
        expected_size = self._round_up(expected_size,
                                       ads.get_block_size() * frame_width)

        total_read = sum(len(block) for block in self._read_all(ads))

        self.assertEqual(
            total_read, expected_size,
            "Wrong data length read from LimiterADS, expected: {0}, found: {1}".format(expected_size, total_read))

    # ------------------------------------------------------------------
    # Recorder decorator
    # ------------------------------------------------------------------

    def test_Recorder_Deco_type(self):
        ads = ADSFactory.ads(audio_source=self.audio_source, record=True)

        self.assertIsInstance(
            ads, ADSFactory.RecorderADS,
            msg="wrong type for ads object, expected: 'ADSFactory.RecorderADS', found: {0}".format(type(ads)))

    def test_Recorder_Deco_read(self):
        ads = ADSFactory.ads(audio_source=self.audio_source, record=True, block_size=500)

        # read at most 10 blocks
        ads_data = b''.join(self._read_all(ads, max_blocks=10))
        audio_source_data = self._read_reference(500 * 10)

        self.assertEqual(ads_data, audio_source_data,
                         "Unexpected data read from RecorderADS")

    def test_Recorder_Deco_is_rewindable(self):
        ads = ADSFactory.ads(audio_source=self.audio_source, record=True)

        self.assertTrue(ads.is_rewindable(), "RecorderADS.is_rewindable should return True")

    def test_Recorder_Deco_rewind(self):
        ads = ADSFactory.ads(audio_source=self.audio_source, record=True, block_size=320)

        ads.open()
        try:
            ads.read()
            ads.rewind()

            self.assertIsInstance(
                ads.get_audio_source(), BufferAudioSource,
                "After rewind RecorderADS.get_audio_source should "
                "be an instance of BufferAudioSource")
        finally:
            ads.close()

    def test_Recorder_Deco_rewind_and_read(self):
        ads = ADSFactory.ads(audio_source=self.audio_source, record=True, block_size=320)

        ads.open()
        try:
            for _ in range(10):
                ads.read()

            ads.rewind()

            # read all available data after rewind
            ads_data = b''.join(self._iter_blocks(ads))
        finally:
            ads.close()

        audio_source_data = self._read_reference(320 * 10)

        self.assertEqual(ads_data, audio_source_data,
                         "Unexpected data read from RecorderADS")

    # ------------------------------------------------------------------
    # Overlap decorator
    # ------------------------------------------------------------------

    def test_Overlap_Deco_type(self):
        # an OverlapADS is obtained if a valid hop_size is given
        ads = ADSFactory.ads(audio_source=self.audio_source, block_size=256, hop_size=128)

        self.assertIsInstance(
            ads, ADSFactory.OverlapADS,
            msg="wrong type for ads object, expected: 'ADSFactory.OverlapADS', found: {0}".format(type(ads)))

    def test_Overlap_Deco_read(self):
        # Use arbitrary valid block_size and hop_size
        block_size = 1714
        hop_size = 313

        ads = ADSFactory.ads(audio_source=self.audio_source, block_size=block_size, hop_size=hop_size)

        # Read all available overlapping blocks, then compare them to
        # those read from the whole file with a manual set_position
        ads_data = self._read_all(ads)
        self._assert_overlapping_blocks(ads_data, self._buffer_reference(ads),
                                        block_size, hop_size)

    # ------------------------------------------------------------------
    # Limiter + Overlap decorators
    # ------------------------------------------------------------------

    def test_Limiter_Overlap_Deco_type(self):
        ads = ADSFactory.ads(audio_source=self.audio_source, max_time=1, block_size=256, hop_size=128)

        self.assertIsInstance(
            ads, ADSFactory.OverlapADS,
            msg="wrong type for ads object, expected: 'ADSFactory.OverlapADS', found: {0}".format(type(ads)))

        # the decorated object is the one checked here, so report its type
        self.assertIsInstance(
            ads.ads, ADSFactory.LimiterADS,
            msg="wrong type for ads object, expected: 'ADSFactory.LimiterADS', found: {0}".format(type(ads.ads)))

    def test_Limiter_Overlap_Deco_read(self):
        block_size = 256
        hop_size = 200

        ads = ADSFactory.ads(audio_source=self.audio_source, max_time=0.50, block_size=block_size, hop_size=hop_size)

        # Read all available overlapping blocks, then compare them to
        # those read from the whole file with a manual set_position
        ads_data = self._read_all(ads)
        self._assert_overlapping_blocks(ads_data, self._buffer_reference(ads),
                                        block_size, hop_size)

    def test_Limiter_Overlap_Deco_read_limit(self):
        block_size = 313
        hop_size = 207
        ads = ADSFactory.ads(audio_source=self.audio_source,
                             max_time=1.932, block_size=block_size,
                             hop_size=hop_size)

        self._assert_overlap_limit(ads, 1.932, block_size, hop_size)

    # ------------------------------------------------------------------
    # Recorder + Overlap decorators
    # ------------------------------------------------------------------

    def test_Recorder_Overlap_Deco_type(self):
        ads = ADSFactory.ads(audio_source=self.audio_source, block_size=256, hop_size=128, record=True)

        self.assertIsInstance(
            ads, ADSFactory.OverlapADS,
            msg="wrong type for ads object, expected: 'ADSFactory.OverlapADS', found: {0}".format(type(ads)))

        # the decorated object is the one checked here, so report its type
        self.assertIsInstance(
            ads.ads, ADSFactory.RecorderADS,
            msg="wrong type for ads object, expected: 'ADSFactory.RecorderADS', found: {0}".format(type(ads.ads)))

    def test_Recorder_Overlap_Deco_is_rewindable(self):
        ads = ADSFactory.ads(audio_source=self.audio_source, block_size=320, hop_size=160, record=True)
        self.assertTrue(ads.is_rewindable(), "RecorderADS.is_rewindable should return True")

    def test_Recorder_Overlap_Deco_rewind_and_read(self):
        # Use arbitrary valid block_size and hop_size
        block_size = 1600
        hop_size = 400

        ads = ADSFactory.ads(audio_source=self.audio_source, block_size=block_size, hop_size=hop_size, record=True)

        ads.open()
        try:
            # Consume all available overlapping blocks, remembering only
            # how many there were
            nb_blocks = sum(1 for _ in self._iter_blocks(ads))

            ads.rewind()

            # After rewind, the same number of blocks must match those
            # read from the whole file with a manual set_position
            self._assert_overlapping_blocks(
                (ads.read() for _ in range(nb_blocks)),
                self._buffer_reference(ads), block_size, hop_size)
        finally:
            ads.close()

    # ------------------------------------------------------------------
    # Limiter + Recorder + Overlap decorators
    # ------------------------------------------------------------------

    def test_Limiter_Recorder_Overlap_Deco_rewind_and_read(self):
        # Use arbitrary valid block_size and hop_size
        block_size = 1600
        hop_size = 400

        ads = ADSFactory.ads(audio_source=self.audio_source, max_time=1.50, block_size=block_size, hop_size=hop_size, record=True)

        ads.open()
        try:
            # Consume all available overlapping blocks, remembering only
            # how many there were
            nb_blocks = sum(1 for _ in self._iter_blocks(ads))

            ads.rewind()

            # After rewind, the same number of blocks must match those
            # read from the whole file with a manual set_position
            self._assert_overlapping_blocks(
                (ads.read() for _ in range(nb_blocks)),
                self._buffer_reference(ads), block_size, hop_size)
        finally:
            ads.close()

    def test_Limiter_Recorder_Overlap_Deco_rewind_and_read_limit(self):
        # Use arbitrary valid block_size and hop_size
        block_size = 1000
        hop_size = 200

        ads = ADSFactory.ads(audio_source=self.audio_source, max_time=1.317, block_size=block_size, hop_size=hop_size, record=True)

        self._assert_overlap_limit(ads, 1.317, block_size, hop_size)
|
amine@2
|
499
|
amine@2
|
class TestADSFactoryBufferAudioSource(unittest.TestCase):
    """Tests for ADSFactory.ads() built from an in-memory data buffer."""

    def setUp(self):
        """Build an ads object from a 32-byte buffer for each test."""
        # Audio data is binary: a bytes literal is a plain str on
        # Python 2 and bytes on Python 3.
        self.signal = b"ABCDEFGHIJKLMNOPQRSTUVWXYZ012345"
        self.ads = ADSFactory.ads(data_buffer=self.signal, sampling_rate=16,
                                  sample_width=2, channels=1)

    def test_ADS_BAS_type(self):
        self.assertIsInstance(
            self.ads.get_audio_source(), BufferAudioSource,
            "ads should "
            "be an instance of BufferAudioSource")

    def test_ADS_BAS_sampling_rate(self):
        srate = self.ads.get_sampling_rate()
        self.assertEqual(srate, 16, "Wrong sampling rate, expected: 16, found: {0}".format(srate))

    def test_ADS_BAS_get_sample_width(self):
        swidth = self.ads.get_sample_width()
        self.assertEqual(swidth, 2, "Wrong sample width, expected: 2, found: {0}".format(swidth))

    def test_ADS_BAS_get_channels(self):
        channels = self.ads.get_channels()
        self.assertEqual(channels, 1, "Wrong number of channels, expected: 1, found: {0}".format(channels))

    def test_Limiter_Recorder_Overlap_Deco_rewind_and_read(self):
        # Use arbitrary valid block_size and hop_size
        block_size = 5
        hop_size = 4

        ads = ADSFactory.ads(data_buffer=self.signal, sampling_rate=16,
                             sample_width=2, channels=1, max_time=0.80,
                             block_size=block_size, hop_size=hop_size,
                             record=True)

        ads.open()
        try:
            # Consume all available overlapping blocks, remembering only
            # how many there were
            nb_blocks = 0
            while ads.read() is not None:
                nb_blocks += 1

            ads.rewind()

            # Build a BufferAudioSource over the same signal
            audio_source = BufferAudioSource(self.signal, ads.get_sampling_rate(),
                                             ads.get_sample_width(), ads.get_channels())
            audio_source.open()
            try:
                # Compare all blocks read from OverlapADS to those read
                # from an audio source with a manual set_position
                for index in range(nb_blocks):
                    expected = audio_source.read(block_size)
                    block = ads.read()

                    self.assertEqual(
                        block, expected,
                        "Unexpected block (N={0}) read from OverlapADS".format(index))
                    audio_source.set_position((index + 1) * hop_size)
            finally:
                audio_source.close()
        finally:
            ads.close()
|
amine@2
|
566
|
amine@2
|
567
|
amine@2
|
if __name__ == "__main__":
    # Run every test case of this module when the file is executed as a
    # script; unittest.main() also accepts test names on the command line.
    unittest.main()
|