amine@2
|
1 '''
|
amine@2
|
2 @author: Amine Sehili <amine.sehili@gmail.com>
|
amine@2
|
3 September 2015
|
amine@2
|
4
|
amine@2
|
5 '''
|
amine@2
|
6
|
amine@2
|
7 import unittest
|
amine@2
|
8 from auditok import dataset, ADSFactory, BufferAudioSource, WaveAudioSource
|
amine@2
|
9 import wave
|
amine@2
|
10 from Crypto.Cipher.AES import block_size
|
amine@2
|
11
|
amine@2
|
12
|
amine@2
|
class TestADSFactoryFileAudioSource(unittest.TestCase):
    """Exercise ``ADSFactory.ads`` on top of a ``WaveAudioSource``.

    Covers the plain data source as well as the Limiter (``max_time``),
    Recorder (``record=True``) and Overlap (``hop_size``) variants, alone
    and combined. All tests read the wave file referenced by
    ``dataset.one_to_six_arabic_16000_mono_bc_noise``.
    """

    def setUp(self):
        """Create the (not yet opened) wave audio source used by each test."""
        self.audio_source = WaveAudioSource(filename=dataset.one_to_six_arabic_16000_mono_bc_noise)

    # ------------------------------------------------------------------
    # Private helpers (names do not start with "test", so they are never
    # collected as test cases).
    # ------------------------------------------------------------------

    @staticmethod
    def _drain(ads):
        """Return the list of blocks read from *ads* until ``read()`` gives ``None``.

        The caller is responsible for opening and closing *ads*.
        """
        blocks = []
        while True:
            block = ads.read()
            if block is None:
                return blocks
            blocks.append(block)

    @classmethod
    def _read_all(cls, ads):
        """Open *ads*, read every available block and always close it again."""
        ads.open()
        try:
            return cls._drain(ads)
        finally:
            ads.close()

    @staticmethod
    def _read_from_file(size):
        """Return ``read(size)`` from a fresh ``WaveAudioSource`` on the test file."""
        source = WaveAudioSource(filename=dataset.one_to_six_arabic_16000_mono_bc_noise)
        source.open()
        try:
            return source.read(size)
        finally:
            source.close()

    @staticmethod
    def _round_up(size, multiple):
        """Return *size* rounded up to the next multiple of *multiple*."""
        remainder = size % multiple
        if remainder > 0:
            size += multiple - remainder
        return size

    def _assert_overlapping_blocks(self, blocks, ads, block_size, hop_size):
        """Check *blocks* against the file content read with a manual hop.

        The whole wave file is loaded into a ``BufferAudioSource`` that uses
        the same sampling rate, sample width and channel count as *ads*.
        Block ``n`` must equal ``block_size`` units read from position
        ``n * hop_size`` of that reference source.
        """
        fp = wave.open(dataset.one_to_six_arabic_16000_mono_bc_noise, "r")
        try:
            wave_data = fp.readframes(fp.getnframes())
        finally:
            fp.close()

        reference = BufferAudioSource(wave_data, ads.get_sampling_rate(),
                                      ads.get_sample_width(), ads.get_channels())
        reference.open()
        try:
            for n, block in enumerate(blocks):
                expected = reference.read(block_size)
                self.assertEqual(block, expected,
                                 "Unexpected block (N={0}) read from OverlapADS".format(n))
                # Move the reference to the start of the next overlapping block.
                reference.set_position((n + 1) * hop_size)
        finally:
            reference.close()

    def _assert_replay_after_rewind(self, ads, block_size, hop_size):
        """Read *ads* to exhaustion, rewind it, and verify the replayed blocks."""
        ads.open()
        try:
            count = len(self._drain(ads))
            ads.rewind()
            replayed = [ads.read() for _ in range(count)]
            self._assert_overlapping_blocks(replayed, ads, block_size, hop_size)
        finally:
            ads.close()

    def _assert_overlap_read_limit(self, ads, max_time, block_size, hop_size):
        """Check the amount of actual data a Limiter + Overlap *ads* delivers.

        Limiter + Overlap read N blocks of actual data: one block of
        ``block_size`` followed by N - 1 blocks of ``hop_size``, so the total
        may be slightly greater than the size derived from *max_time*.
        """
        frame_bytes = ads.get_sample_width() * ads.get_channels()
        block_bytes = block_size * frame_bytes
        hop_bytes = hop_size * frame_bytes

        # Theoretical size, minus the first full block, rounded up to a whole
        # number of hops, plus the first full block again.
        expected_size = int(ads.get_sampling_rate() * max_time) * frame_bytes
        expected_size = self._round_up(expected_size - block_bytes, hop_bytes) + block_bytes

        # Every block repeats `cache_bytes` of the previous one; count that
        # overlapping part only once.
        cache_bytes = (block_size - hop_size) * frame_bytes
        total_read = cache_bytes + sum(len(block) - cache_bytes
                                       for block in self._read_all(ads))

        self.assertEqual(total_read, expected_size,
                         "Wrong data length read from LimiterADS, expected: {0}, found: {1}".format(expected_size, total_read))

    # ------------------------------------------------------------------
    # Plain AudioDataSource
    # ------------------------------------------------------------------

    def test_ADS_type(self):
        ads = ADSFactory.ads(audio_source=self.audio_source)

        self.assertIsInstance(ads, ADSFactory.AudioDataSource,
                              msg="wrong type for ads object, expected: 'ADSFactory.AudioDataSource', found: {0}".format(type(ads)))

    def test_default_block_size(self):
        ads = ADSFactory.ads(audio_source=self.audio_source)

        size = ads.get_block_size()
        self.assertEqual(size, 160, "Wrong default block_size, expected: 160, found: {0}".format(size))

    def test_block_size(self):
        ads = ADSFactory.ads(audio_source=self.audio_source, block_size=512)

        size = ads.get_block_size()
        self.assertEqual(size, 512, "Wrong block_size, expected: 512, found: {0}".format(size))

    def test_sampling_rate(self):
        ads = ADSFactory.ads(audio_source=self.audio_source)

        srate = ads.get_sampling_rate()
        self.assertEqual(srate, 16000, "Wrong sampling rate, expected: 16000, found: {0}".format(srate))

    def test_sample_width(self):
        ads = ADSFactory.ads(audio_source=self.audio_source)

        swidth = ads.get_sample_width()
        self.assertEqual(swidth, 2, "Wrong sample width, expected: 2, found: {0}".format(swidth))

    def test_channels(self):
        ads = ADSFactory.ads(audio_source=self.audio_source)

        channels = ads.get_channels()
        self.assertEqual(channels, 1, "Wrong number of channels, expected: 1, found: {0}".format(channels))

    def test_read(self):
        ads = ADSFactory.ads(audio_source=self.audio_source, block_size=256)

        ads.open()
        try:
            ads_data = ads.read()
        finally:
            ads.close()

        self.assertEqual(ads_data, self._read_from_file(256), "Unexpected data read from ads")

    # ------------------------------------------------------------------
    # Limiter
    # ------------------------------------------------------------------

    def test_Limiter_Deco_type(self):
        ads = ADSFactory.ads(audio_source=self.audio_source, max_time=1)

        self.assertIsInstance(ads, ADSFactory.LimiterADS,
                              msg="wrong type for ads object, expected: 'ADSFactory.LimiterADS', found: {0}".format(type(ads)))

    def test_Limiter_Deco_read(self):
        # read a maximum of 0.75 seconds from audio source
        ads = ADSFactory.ads(audio_source=self.audio_source, max_time=0.75)

        # Audio data are byte strings: b"" joins them on Python 2 and 3 alike.
        ads_data = b"".join(self._read_all(ads))

        self.assertEqual(ads_data, self._read_from_file(int(16000 * 0.75)),
                         "Unexpected data read from LimiterADS")

    def test_Limiter_Deco_read_limit(self):
        # read a maximum of 1.191 seconds from audio source
        max_time = 1.191
        ads = ADSFactory.ads(audio_source=self.audio_source, max_time=max_time)

        # The desired duration in bytes is obtained by:
        #   max_time * sampling_rate * sample_width * nb_channels
        # The Limiter reads N blocks of size block_size where:
        #   (N - 1) * block_size < desired duration, AND
        #   N * block_size >= desired duration
        frame_bytes = ads.get_sample_width() * ads.get_channels()
        expected_size = self._round_up(int(ads.get_sampling_rate() * max_time) * frame_bytes,
                                       ads.get_block_size() * frame_bytes)

        total_read = sum(len(block) for block in self._read_all(ads))

        self.assertEqual(total_read, expected_size,
                         "Wrong data length read from LimiterADS, expected: {0}, found: {1}".format(expected_size, total_read))

    # ------------------------------------------------------------------
    # Recorder
    # ------------------------------------------------------------------

    def test_Recorder_Deco_type(self):
        ads = ADSFactory.ads(audio_source=self.audio_source, record=True)

        self.assertIsInstance(ads, ADSFactory.RecorderADS,
                              msg="wrong type for ads object, expected: 'ADSFactory.RecorderADS', found: {0}".format(type(ads)))

    def test_Recorder_Deco_read(self):
        ads = ADSFactory.ads(audio_source=self.audio_source, record=True, block_size=500)

        # Read at most 10 blocks.
        blocks = []
        ads.open()
        try:
            for _ in range(10):
                block = ads.read()
                if block is None:
                    break
                blocks.append(block)
        finally:
            ads.close()

        self.assertEqual(b"".join(blocks), self._read_from_file(500 * 10),
                         "Unexpected data read from RecorderADS")

    def test_Recorder_Deco_is_rewindable(self):
        ads = ADSFactory.ads(audio_source=self.audio_source, record=True)

        self.assertTrue(ads.is_rewindable(), "RecorderADS.is_rewindable should return True")

    def test_Recorder_Deco_rewind(self):
        ads = ADSFactory.ads(audio_source=self.audio_source, record=True, block_size=320)

        ads.open()
        try:
            ads.read()
            ads.rewind()

            self.assertIsInstance(ads.get_audio_source(), BufferAudioSource,
                                  "After rewind RecorderADS.get_audio_source should "
                                  "be an instance of BufferAudioSource")
        finally:
            ads.close()

    def test_Recorder_Deco_rewind_and_read(self):
        ads = ADSFactory.ads(audio_source=self.audio_source, record=True, block_size=320)

        ads.open()
        try:
            for _ in range(10):
                ads.read()

            ads.rewind()

            # read all available data after rewind
            ads_data = b"".join(self._drain(ads))
        finally:
            ads.close()

        self.assertEqual(ads_data, self._read_from_file(320 * 10),
                         "Unexpected data read from RecorderADS")

    # ------------------------------------------------------------------
    # Overlap
    # ------------------------------------------------------------------

    def test_Overlap_Deco_type(self):
        # an OverlapADS is obtained if a valid hop_size is given
        ads = ADSFactory.ads(audio_source=self.audio_source, block_size=256, hop_size=128)

        self.assertIsInstance(ads, ADSFactory.OverlapADS,
                              msg="wrong type for ads object, expected: 'ADSFactory.OverlapADS', found: {0}".format(type(ads)))

    def test_Overlap_Deco_read(self):
        # Use arbitrary valid block_size and hop_size
        block_size = 1714
        hop_size = 313

        ads = ADSFactory.ads(audio_source=self.audio_source, block_size=block_size, hop_size=hop_size)

        # Read all available overlapping blocks, then compare them to the
        # file content read with a manual set_position.
        blocks = self._read_all(ads)
        self._assert_overlapping_blocks(blocks, ads, block_size, hop_size)

    # ------------------------------------------------------------------
    # Limiter + Overlap
    # ------------------------------------------------------------------

    def test_Limiter_Overlap_Deco_type(self):
        ads = ADSFactory.ads(audio_source=self.audio_source, max_time=1, block_size=256, hop_size=128)

        self.assertIsInstance(ads, ADSFactory.OverlapADS,
                              msg="wrong type for ads object, expected: 'ADSFactory.OverlapADS', found: {0}".format(type(ads)))

        # The failure message reports the type of the wrapped object, which
        # is the one actually being checked here.
        self.assertIsInstance(ads.ads, ADSFactory.LimiterADS,
                              msg="wrong type for ads object, expected: 'ADSFactory.LimiterADS', found: {0}".format(type(ads.ads)))

    def test_Limiter_Overlap_Deco_read(self):
        block_size = 256
        hop_size = 200

        ads = ADSFactory.ads(audio_source=self.audio_source, max_time=0.50,
                             block_size=block_size, hop_size=hop_size)

        blocks = self._read_all(ads)
        self._assert_overlapping_blocks(blocks, ads, block_size, hop_size)

    def test_Limiter_Overlap_Deco_read_limit(self):
        block_size = 313
        hop_size = 207
        max_time = 1.932

        ads = ADSFactory.ads(audio_source=self.audio_source, max_time=max_time,
                             block_size=block_size, hop_size=hop_size)

        self._assert_overlap_read_limit(ads, max_time, block_size, hop_size)

    # ------------------------------------------------------------------
    # Recorder + Overlap
    # ------------------------------------------------------------------

    def test_Recorder_Overlap_Deco_type(self):
        ads = ADSFactory.ads(audio_source=self.audio_source, block_size=256, hop_size=128, record=True)

        self.assertIsInstance(ads, ADSFactory.OverlapADS,
                              msg="wrong type for ads object, expected: 'ADSFactory.OverlapADS', found: {0}".format(type(ads)))

        self.assertIsInstance(ads.ads, ADSFactory.RecorderADS,
                              msg="wrong type for ads object, expected: 'ADSFactory.RecorderADS', found: {0}".format(type(ads.ads)))

    def test_Recorder_Overlap_Deco_is_rewindable(self):
        ads = ADSFactory.ads(audio_source=self.audio_source, block_size=320, hop_size=160, record=True)
        self.assertTrue(ads.is_rewindable(), "RecorderADS.is_rewindable should return True")

    def test_Recorder_Overlap_Deco_rewind_and_read(self):
        # Use arbitrary valid block_size and hop_size
        block_size = 1600
        hop_size = 400

        ads = ADSFactory.ads(audio_source=self.audio_source, block_size=block_size,
                             hop_size=hop_size, record=True)

        self._assert_replay_after_rewind(ads, block_size, hop_size)

    # ------------------------------------------------------------------
    # Limiter + Recorder + Overlap
    # ------------------------------------------------------------------

    def test_Limiter_Recorder_Overlap_Deco_rewind_and_read(self):
        # Use arbitrary valid block_size and hop_size
        block_size = 1600
        hop_size = 400

        ads = ADSFactory.ads(audio_source=self.audio_source, max_time=1.50,
                             block_size=block_size, hop_size=hop_size, record=True)

        self._assert_replay_after_rewind(ads, block_size, hop_size)

    def test_Limiter_Recorder_Overlap_Deco_rewind_and_read_limit(self):
        # Use arbitrary valid block_size and hop_size
        block_size = 1000
        hop_size = 200
        max_time = 1.317

        ads = ADSFactory.ads(audio_source=self.audio_source, max_time=max_time,
                             block_size=block_size, hop_size=hop_size, record=True)

        # NOTE: despite its name this test only checks the amount of data
        # read; it never calls rewind().
        self._assert_overlap_read_limit(ads, max_time, block_size, hop_size)
|
amine@2
|
500
|
amine@2
|
class TestADSFactoryBufferAudioSource(unittest.TestCase):
    """Exercise ``ADSFactory.ads`` when it is built from an in-memory data buffer."""

    def setUp(self):
        """Build a data source over a 32-byte buffer (rate 16, width 2, mono)."""
        # A bytes literal keeps the buffer a byte string on Python 3 while
        # being identical to the plain literal on Python 2.
        self.signal = b"ABCDEFGHIJKLMNOPQRSTUVWXYZ012345"
        self.ads = ADSFactory.ads(data_buffer=self.signal, sampling_rate=16,
                                  sample_width=2, channels=1)

    def test_ADS_BAS_type(self):
        self.assertIsInstance(self.ads.get_audio_source(), BufferAudioSource,
                              "ads should be an instance of BufferAudioSource")

    def test_ADS_BAS_sampling_rate(self):
        srate = self.ads.get_sampling_rate()
        self.assertEqual(srate, 16, "Wrong sampling rate, expected: 16, found: {0}".format(srate))

    def test_ADS_BAS_get_sample_width(self):
        swidth = self.ads.get_sample_width()
        self.assertEqual(swidth, 2, "Wrong sample width, expected: 2, found: {0}".format(swidth))

    def test_ADS_BAS_get_channels(self):
        channels = self.ads.get_channels()
        self.assertEqual(channels, 1, "Wrong number of channels, expected: 1, found: {0}".format(channels))

    def test_Limiter_Recorder_Overlap_Deco_rewind_and_read(self):
        # Use arbitrary valid block_size and hop_size
        block_size = 5
        hop_size = 4

        ads = ADSFactory.ads(data_buffer=self.signal, sampling_rate=16,
                             sample_width=2, channels=1, max_time=0.80,
                             block_size=block_size, hop_size=hop_size,
                             record=True)

        ads.open()
        try:
            # Count all available overlapping blocks, then rewind.
            nb_blocks = 0
            while ads.read() is not None:
                nb_blocks += 1

            ads.rewind()

            # Reference source over the same buffer, positioned manually.
            audio_source = BufferAudioSource(self.signal, ads.get_sampling_rate(),
                                             ads.get_sample_width(), ads.get_channels())
            audio_source.open()
            try:
                # Compare every block replayed by the data source to the one
                # read from the reference after a manual set_position.
                for j in range(nb_blocks):
                    expected = audio_source.read(block_size)
                    block = ads.read()

                    self.assertEqual(block, expected,
                                     "Unexpected block (N={0}) read from OverlapADS".format(j))
                    audio_source.set_position((j + 1) * hop_size)
            finally:
                audio_source.close()
        finally:
            ads.close()
|
amine@2
|
567
|
amine@2
|
568
|
amine@2
|
if __name__ == "__main__":
    # Run every test case defined in this module when executed as a script.
    unittest.main()
|