amine@106
|
1 import os
|
amine@106
|
2 import sys
|
amine@106
|
3 import math
|
amine@107
|
4 from array import array
|
amine@133
|
5 from tempfile import NamedTemporaryFile, TemporaryDirectory
|
amine@110
|
6 import filecmp
|
amine@108
|
7 from unittest import TestCase
|
amine@108
|
8 from genty import genty, genty_dataset
|
amine@157
|
9 from test_util import _sample_generator, _generate_pure_tone, PURE_TONE_DICT
|
amine@110
|
10 from auditok.io import (
|
amine@126
|
11 DATA_FORMAT,
|
amine@121
|
12 AudioIOError,
|
amine@110
|
13 AudioParameterError,
|
amine@126
|
14 BufferAudioSource,
|
amine@162
|
15 RawAudioSource,
|
amine@162
|
16 WaveAudioSource,
|
amine@190
|
17 StdinAudioSource,
|
amine@110
|
18 check_audio_data,
|
amine@143
|
19 _guess_audio_format,
|
amine@144
|
20 _normalize_use_channel,
|
amine@128
|
21 _get_audio_parameters,
|
amine@116
|
22 _array_to_bytes,
|
amine@118
|
23 _mix_audio_channels,
|
amine@119
|
24 _extract_selected_channel,
|
amine@126
|
25 _load_raw,
|
amine@129
|
26 _load_wave,
|
amine@131
|
27 _load_with_pydub,
|
amine@190
|
28 get_audio_source,
|
amine@120
|
29 from_file,
|
amine@111
|
30 _save_raw,
|
amine@110
|
31 _save_wave,
|
amine@141
|
32 _save_with_pydub,
|
amine@135
|
33 to_file,
|
amine@110
|
34 )
|
amine@106
|
35
|
amine@106
|
36
|
amine@106
|
37 if sys.version_info >= (3, 0):
|
amine@106
|
38 PYTHON_3 = True
|
amine@124
|
39 from unittest.mock import patch, Mock
|
amine@106
|
40 else:
|
amine@106
|
41 PYTHON_3 = False
|
amine@124
|
42 from mock import patch, Mock
|
amine@120
|
43
|
amine@120
|
44 AUDIO_PARAMS_SHORT = {"sr": 16000, "sw": 2, "ch": 1}
|
amine@106
|
45
|
amine@106
|
46
|
amine@108
|
47 @genty
|
amine@108
|
48 class TestIO(TestCase):
|
amine@108
|
49 @genty_dataset(
|
amine@108
|
50 valid_mono=(b"\0" * 113, 1, 1),
|
amine@108
|
51 valid_stereo=(b"\0" * 160, 1, 2),
|
amine@108
|
52 invalid_mono_sw_2=(b"\0" * 113, 2, 1, False),
|
amine@108
|
53 invalid_stereo_sw_1=(b"\0" * 113, 1, 2, False),
|
amine@108
|
54 invalid_stereo_sw_2=(b"\0" * 158, 2, 2, False),
|
amine@108
|
55 )
|
amine@108
|
56 def test_check_audio_data(self, data, sample_width, channels, valid=True):
|
amine@108
|
57
|
amine@108
|
58 if not valid:
|
amine@108
|
59 with self.assertRaises(AudioParameterError):
|
amine@108
|
60 check_audio_data(data, sample_width, channels)
|
amine@108
|
61 else:
|
amine@108
|
62 self.assertIsNone(check_audio_data(data, sample_width, channels))
|
amine@110
|
63
|
amine@110
|
64 @genty_dataset(
|
amine@143
|
65 extention_and_format_same=("wav", "filename.wav", "wav"),
|
amine@143
|
66 extention_and_format_different=("wav", "filename.mp3", "wav"),
|
amine@143
|
67 extention_no_format=(None, "filename.wav", "wav"),
|
amine@143
|
68 format_no_extension=("wav", "filename", "wav"),
|
amine@143
|
69 no_format_no_extension=(None, "filename", None),
|
amine@289
|
70 wave_as_wav=("wave", "filename", "wav"),
|
amine@289
|
71 wave_as_wav_extension=(None, "filename.wave", "wav"),
|
amine@143
|
72 )
|
amine@143
|
73 def test_guess_audio_format(self, fmt, filename, expected):
|
amine@143
|
74 result = _guess_audio_format(fmt, filename)
|
amine@143
|
75 self.assertEqual(result, expected)
|
amine@143
|
76
|
amine@143
|
77 @genty_dataset(
|
amine@144
|
78 none=(None, 0),
|
amine@208
|
79 positive_int=(1, 0),
|
amine@144
|
80 left=("left", 0),
|
amine@144
|
81 right=("right", 1),
|
amine@144
|
82 mix=("mix", "mix"),
|
amine@144
|
83 )
|
amine@144
|
84 def test_normalize_use_channel(self, use_channel, expected):
|
amine@144
|
85 result = _normalize_use_channel(use_channel)
|
amine@144
|
86 self.assertEqual(result, expected)
|
amine@144
|
87
|
amine@240
|
88 def test_get_audio_parameters_short_params(self):
|
amine@240
|
89 expected = (8000, 2, 1)
|
amine@240
|
90 params = dict(zip(("sr", "sw", "ch"), expected))
|
amine@145
|
91 result = _get_audio_parameters(params)
|
amine@145
|
92 self.assertEqual(result, expected)
|
amine@145
|
93
|
amine@240
|
94 def test_get_audio_parameters_long_params(self):
|
amine@240
|
95 expected = (8000, 2, 1)
|
amine@209
|
96 params = dict(
|
amine@209
|
97 zip(
|
amine@209
|
98 ("sampling_rate", "sample_width", "channels", "use_channel"),
|
amine@240
|
99 expected,
|
amine@209
|
100 )
|
amine@209
|
101 )
|
amine@145
|
102 result = _get_audio_parameters(params)
|
amine@145
|
103 self.assertEqual(result, expected)
|
amine@145
|
104
|
amine@240
|
105 def test_get_audio_parameters_long_params_shadow_short_ones(self):
|
amine@240
|
106 expected = (8000, 2, 1)
|
amine@209
|
107 params = dict(
|
amine@240
|
108 zip(("sampling_rate", "sample_width", "channels"), expected)
|
amine@209
|
109 )
|
amine@240
|
110 params.update(dict(zip(("sr", "sw", "ch"), "xxx")))
|
amine@145
|
111 result = _get_audio_parameters(params)
|
amine@145
|
112 self.assertEqual(result, expected)
|
amine@145
|
113
|
amine@145
|
114 @genty_dataset(
|
amine@240
|
115 str_sampling_rate=(("x", 2, 1),),
|
amine@240
|
116 negative_sampling_rate=((-8000, 2, 1),),
|
amine@240
|
117 str_sample_width=((8000, "x", 1),),
|
amine@240
|
118 negative_sample_width=((8000, -2, 1),),
|
amine@240
|
119 str_channels=((8000, 2, "x"),),
|
amine@240
|
120 negative_channels=((8000, 2, -1),),
|
amine@146
|
121 )
|
amine@146
|
122 def test_get_audio_parameters_invalid(self, values):
|
amine@209
|
123 params = dict(
|
amine@209
|
124 zip(
|
amine@240
|
125 ("sampling_rate", "sample_width", "channels"),
|
amine@209
|
126 values,
|
amine@209
|
127 )
|
amine@209
|
128 )
|
amine@146
|
129 with self.assertRaises(AudioParameterError):
|
amine@146
|
130 _get_audio_parameters(params)
|
amine@146
|
131
|
amine@146
|
132 @genty_dataset(
|
amine@118
|
133 mono_1byte=([400], 1),
|
amine@118
|
134 stereo_1byte=([400, 600], 1),
|
amine@118
|
135 three_channel_1byte=([400, 600, 2400], 1),
|
amine@118
|
136 mono_2byte=([400], 2),
|
amine@118
|
137 stereo_2byte=([400, 600], 2),
|
amine@118
|
138 three_channel_2byte=([400, 600, 1150], 2),
|
amine@118
|
139 mono_4byte=([400], 4),
|
amine@118
|
140 stereo_4byte=([400, 600], 4),
|
amine@118
|
141 four_channel_2byte=([400, 600, 1150, 7220], 4),
|
amine@118
|
142 )
|
amine@118
|
143 def test_mix_audio_channels(self, frequencies, sample_width):
|
amine@118
|
144 sampling_rate = 16000
|
amine@118
|
145 sample_width = 2
|
amine@118
|
146 channels = len(frequencies)
|
amine@118
|
147 mono_channels = [
|
amine@118
|
148 _generate_pure_tone(
|
amine@118
|
149 freq,
|
amine@118
|
150 duration_sec=0.1,
|
amine@118
|
151 sampling_rate=sampling_rate,
|
amine@118
|
152 sample_width=sample_width,
|
amine@118
|
153 )
|
amine@118
|
154 for freq in frequencies
|
amine@118
|
155 ]
|
amine@118
|
156 fmt = DATA_FORMAT[sample_width]
|
amine@118
|
157 expected = _array_to_bytes(
|
amine@118
|
158 array(
|
amine@118
|
159 fmt,
|
amine@118
|
160 (sum(samples) // channels for samples in zip(*mono_channels)),
|
amine@118
|
161 )
|
amine@118
|
162 )
|
amine@118
|
163 data = _array_to_bytes(array(fmt, _sample_generator(*mono_channels)))
|
amine@118
|
164 mixed = _mix_audio_channels(data, channels, sample_width)
|
amine@118
|
165 self.assertEqual(mixed, expected)
|
amine@118
|
166
|
amine@118
|
167 @genty_dataset(
|
amine@119
|
168 mono_1byte=([400], 1, 0),
|
amine@119
|
169 stereo_1byte_2st_channel=([400, 600], 1, 1),
|
amine@119
|
170 mono_2byte=([400], 2, 0),
|
amine@119
|
171 stereo_2byte_1st_channel=([400, 600], 2, 0),
|
amine@119
|
172 stereo_2byte_2nd_channel=([400, 600], 2, 1),
|
amine@119
|
173 three_channel_2byte_last_negative_idx=([400, 600, 1150], 2, -1),
|
amine@119
|
174 three_channel_2byte_2nd_negative_idx=([400, 600, 1150], 2, -2),
|
amine@119
|
175 three_channel_2byte_1st_negative_idx=([400, 600, 1150], 2, -3),
|
amine@119
|
176 three_channel_4byte_1st=([400, 600, 1150], 4, 0),
|
amine@119
|
177 three_channel_4byte_last_negative_idx=([400, 600, 1150], 4, -1),
|
amine@119
|
178 )
|
amine@119
|
179 def test_extract_selected_channel(
|
amine@119
|
180 self, frequencies, sample_width, use_channel
|
amine@119
|
181 ):
|
amine@119
|
182
|
amine@119
|
183 mono_channels = [
|
amine@119
|
184 _generate_pure_tone(
|
amine@119
|
185 freq,
|
amine@119
|
186 duration_sec=0.1,
|
amine@119
|
187 sampling_rate=16000,
|
amine@119
|
188 sample_width=sample_width,
|
amine@119
|
189 )
|
amine@119
|
190 for freq in frequencies
|
amine@119
|
191 ]
|
amine@119
|
192 channels = len(frequencies)
|
amine@119
|
193 fmt = DATA_FORMAT[sample_width]
|
amine@119
|
194 expected = _array_to_bytes(mono_channels[use_channel])
|
amine@119
|
195 data = _array_to_bytes(array(fmt, _sample_generator(*mono_channels)))
|
amine@119
|
196 selected_channel = _extract_selected_channel(
|
amine@119
|
197 data, channels, sample_width, use_channel
|
amine@119
|
198 )
|
amine@119
|
199 self.assertEqual(selected_channel, expected)
|
amine@119
|
200
|
amine@148
|
201 @genty_dataset(mono=([400],), three_channel=([600, 1150, 2400],))
|
amine@148
|
202 def test_extract_selected_channel_mix(self, frequencies):
|
amine@148
|
203
|
amine@148
|
204 mono_channels = [PURE_TONE_DICT[freq] for freq in frequencies]
|
amine@148
|
205 channels = len(frequencies)
|
amine@148
|
206 fmt = DATA_FORMAT[2]
|
amine@148
|
207 expected = _array_to_bytes(
|
amine@148
|
208 array(
|
amine@148
|
209 fmt,
|
amine@148
|
210 (sum(samples) // channels for samples in zip(*mono_channels)),
|
amine@148
|
211 )
|
amine@148
|
212 )
|
amine@148
|
213 data = _array_to_bytes(array(fmt, _sample_generator(*mono_channels)))
|
amine@148
|
214 selected_channel = _extract_selected_channel(data, channels, 2, "mix")
|
amine@148
|
215 self.assertEqual(selected_channel, expected)
|
amine@148
|
216
|
amine@149
|
217 @genty_dataset(positive=(2,), negative=(-3,))
|
amine@149
|
218 def test_extract_selected_channel_invalid_use_channel(self, use_channel):
|
amine@149
|
219 with self.assertRaises(AudioParameterError):
|
amine@149
|
220 _extract_selected_channel(b"\0\0", 2, 2, use_channel)
|
amine@149
|
221
|
amine@119
|
222 @genty_dataset(
|
amine@120
|
223 raw_with_audio_format=(
|
amine@120
|
224 "audio",
|
amine@120
|
225 "raw",
|
amine@120
|
226 "_load_raw",
|
amine@120
|
227 AUDIO_PARAMS_SHORT,
|
amine@120
|
228 ),
|
amine@120
|
229 raw_with_extension=(
|
amine@120
|
230 "audio.raw",
|
amine@120
|
231 None,
|
amine@120
|
232 "_load_raw",
|
amine@120
|
233 AUDIO_PARAMS_SHORT,
|
amine@120
|
234 ),
|
amine@120
|
235 wave_with_audio_format=("audio", "wave", "_load_wave"),
|
amine@120
|
236 wav_with_audio_format=("audio", "wave", "_load_wave"),
|
amine@120
|
237 wav_with_extension=("audio.wav", None, "_load_wave"),
|
amine@120
|
238 format_and_extension_both_given=("audio.dat", "wav", "_load_wave"),
|
amine@120
|
239 format_and_extension_both_given_b=("audio.raw", "wave", "_load_wave"),
|
amine@120
|
240 no_format_nor_extension=("audio", None, "_load_with_pydub"),
|
amine@120
|
241 other_formats_ogg=("audio.ogg", None, "_load_with_pydub"),
|
amine@120
|
242 other_formats_webm=("audio", "webm", "_load_with_pydub"),
|
amine@120
|
243 )
|
amine@120
|
244 def test_from_file(
|
amine@120
|
245 self, filename, audio_format, funtion_name, kwargs=None
|
amine@120
|
246 ):
|
amine@120
|
247 funtion_name = "auditok.io." + funtion_name
|
amine@120
|
248 if kwargs is None:
|
amine@120
|
249 kwargs = {}
|
amine@120
|
250 with patch(funtion_name) as patch_function:
|
amine@120
|
251 from_file(filename, audio_format, **kwargs)
|
amine@120
|
252 self.assertTrue(patch_function.called)
|
amine@120
|
253
|
amine@190
|
254 def test_from_file_large_file_raw(self,):
|
amine@162
|
255 filename = "tests/data/test_16KHZ_mono_400Hz.raw"
|
amine@190
|
256 audio_source = from_file(
|
amine@190
|
257 filename,
|
amine@190
|
258 large_file=True,
|
amine@190
|
259 sampling_rate=16000,
|
amine@190
|
260 sample_width=2,
|
amine@190
|
261 channels=1,
|
amine@190
|
262 )
|
amine@162
|
263 self.assertIsInstance(audio_source, RawAudioSource)
|
amine@162
|
264
|
amine@190
|
265 def test_from_file_large_file_wave(self,):
|
amine@162
|
266 filename = "tests/data/test_16KHZ_mono_400Hz.wav"
|
amine@162
|
267 audio_source = from_file(filename, large_file=True)
|
amine@162
|
268 self.assertIsInstance(audio_source, WaveAudioSource)
|
amine@163
|
269
|
amine@190
|
270 def test_from_file_large_file_compressed(self,):
|
amine@163
|
271 filename = "tests/data/test_16KHZ_mono_400Hz.ogg"
|
amine@163
|
272 with self.assertRaises(AudioIOError):
|
amine@163
|
273 from_file(filename, large_file=True)
|
amine@162
|
274
|
amine@137
|
275 @genty_dataset(
|
amine@137
|
276 missing_sampling_rate=("sr",),
|
amine@137
|
277 missing_sample_width=("sw",),
|
amine@137
|
278 missing_channels=("ch",),
|
amine@137
|
279 )
|
amine@137
|
280 def test_from_file_missing_audio_param(self, missing_param):
|
amine@137
|
281 with self.assertRaises(AudioParameterError):
|
amine@137
|
282 params = AUDIO_PARAMS_SHORT.copy()
|
amine@137
|
283 del params[missing_param]
|
amine@137
|
284 from_file("audio", audio_format="raw", **params)
|
amine@137
|
285
|
amine@121
|
286 def test_from_file_no_pydub(self):
|
amine@121
|
287 with patch("auditok.io._WITH_PYDUB", False):
|
amine@121
|
288 with self.assertRaises(AudioIOError):
|
amine@121
|
289 from_file("audio", "mp3")
|
amine@121
|
290
|
amine@123
|
291
|
amine@124
|
292 @patch("auditok.io._WITH_PYDUB", True)
|
amine@124
|
293 @patch("auditok.io.BufferAudioSource")
|
amine@124
|
294 @genty_dataset(
|
amine@240
|
295 ogg_first_channel=("ogg", "from_ogg"),
|
amine@240
|
296 ogg_second_channel=("ogg", "from_ogg"),
|
amine@240
|
297 ogg_mix=("ogg", "from_ogg"),
|
amine@240
|
298 ogg_default=("ogg", "from_ogg"),
|
amine@240
|
299 mp3_left_channel=("mp3", "from_mp3"),
|
amine@240
|
300 mp3_right_channel=("mp3", "from_mp3"),
|
amine@240
|
301 flac_first_channel=("flac", "from_file"),
|
amine@240
|
302 flac_second_channel=("flac", "from_file"),
|
amine@240
|
303 flv_left_channel=("flv", "from_flv"),
|
amine@240
|
304 webm_right_channel=("webm", "from_file"),
|
amine@124
|
305 )
|
amine@124
|
306 def test_from_file_multichannel_audio_compressed(
|
amine@125
|
307 self, audio_format, function, *mocks
|
amine@125
|
308 ):
|
amine@125
|
309 filename = "audio.{}".format(audio_format)
|
amine@125
|
310 segment_mock = Mock()
|
amine@125
|
311 segment_mock.sample_width = 2
|
amine@125
|
312 segment_mock.channels = 2
|
amine@125
|
313 segment_mock._data = b"abcd"
|
amine@240
|
314 with patch(
|
amine@240
|
315 "auditok.io.AudioSegment.{}".format(function)
|
amine@240
|
316 ) as open_func:
|
amine@240
|
317 open_func.return_value = segment_mock
|
amine@240
|
318 from_file(filename)
|
amine@240
|
319 self.assertTrue(open_func.called)
|
amine@240
|
320
|
amine@125
|
321
|
amine@123
|
322 @genty_dataset(
|
amine@240
|
323 mono=("mono_400", (400,)),
|
amine@240
|
324 three_channel=("3channel_400-800-1600", (400, 800, 1600)),
|
amine@240
|
325
|
amine@240
|
326 mono_large_file=("mono_400", (400,), True),
|
amine@240
|
327 three_channel_large_file=("3channel_400-800-1600", (400, 800, 1600), True),
|
amine@126
|
328 )
|
amine@240
|
329 def test_load_raw(self, file_id, frequencies, large_file=False):
|
amine@240
|
330 filename = "tests/data/test_16KHZ_{}Hz.raw".format(file_id)
|
amine@240
|
331 audio_source = _load_raw(filename, 16000, 2, len(frequencies), large_file=large_file)
|
amine@240
|
332 audio_source.open()
|
amine@240
|
333 data = audio_source.read(-1)
|
amine@240
|
334 audio_source.close()
|
amine@240
|
335 expected_class = RawAudioSource if large_file else BufferAudioSource
|
amine@240
|
336 self.assertIsInstance(audio_source, expected_class)
|
amine@126
|
337 self.assertEqual(audio_source.sampling_rate, 16000)
|
amine@126
|
338 self.assertEqual(audio_source.sample_width, 2)
|
amine@240
|
339 self.assertEqual(audio_source.channels, len(frequencies))
|
amine@240
|
340 mono_channels = [PURE_TONE_DICT[freq] for freq in frequencies]
|
amine@240
|
341 fmt = DATA_FORMAT[audio_source.sample_width]
|
amine@240
|
342 expected =_array_to_bytes(array(fmt, _sample_generator(*mono_channels)))
|
amine@126
|
343 self.assertEqual(data, expected)
|
amine@126
|
344
|
amine@126
|
345 @genty_dataset(
|
amine@128
|
346 missing_sampling_rate=("sr",),
|
amine@128
|
347 missing_sample_width=("sw",),
|
amine@128
|
348 missing_channels=("ch",),
|
amine@128
|
349 )
|
amine@128
|
350 def test_load_raw_missing_audio_param(self, missing_param):
|
amine@128
|
351 with self.assertRaises(AudioParameterError):
|
amine@128
|
352 params = AUDIO_PARAMS_SHORT.copy()
|
amine@128
|
353 del params[missing_param]
|
amine@128
|
354 srate, swidth, channels, _ = _get_audio_parameters(params)
|
amine@128
|
355 _load_raw("audio", srate, swidth, channels)
|
amine@128
|
356
|
amine@128
|
357 @genty_dataset(
|
amine@240
|
358 mono=("mono_400", (400,)),
|
amine@240
|
359 three_channel=("3channel_400-800-1600", (400, 800, 1600)),
|
amine@240
|
360
|
amine@240
|
361 mono_large_file=("mono_400", (400,), True),
|
amine@240
|
362 three_channel_large_file=("3channel_400-800-1600", (400, 800, 1600), True),
|
amine@129
|
363 )
|
amine@240
|
364 def test_load_wave(self, file_id, frequencies, large_file=False):
|
amine@240
|
365 filename = "tests/data/test_16KHZ_{}Hz.wav".format(file_id)
|
amine@240
|
366 audio_source = _load_wave(filename, large_file=large_file)
|
amine@240
|
367 audio_source.open()
|
amine@240
|
368 data = audio_source.read(-1)
|
amine@240
|
369 audio_source.close()
|
amine@240
|
370 expected_class = WaveAudioSource if large_file else BufferAudioSource
|
amine@240
|
371 self.assertIsInstance(audio_source, expected_class)
|
amine@129
|
372 self.assertEqual(audio_source.sampling_rate, 16000)
|
amine@129
|
373 self.assertEqual(audio_source.sample_width, 2)
|
amine@240
|
374 self.assertEqual(audio_source.channels, len(frequencies))
|
amine@240
|
375 mono_channels = [PURE_TONE_DICT[freq] for freq in frequencies]
|
amine@240
|
376 fmt = DATA_FORMAT[audio_source.sample_width]
|
amine@240
|
377 expected =_array_to_bytes(array(fmt, _sample_generator(*mono_channels)))
|
amine@129
|
378 self.assertEqual(data, expected)
|
amine@129
|
379
|
amine@130
|
380
|
amine@131
|
381 @patch("auditok.io._WITH_PYDUB", True)
|
amine@131
|
382 @patch("auditok.io.BufferAudioSource")
|
amine@131
|
383 @genty_dataset(
|
amine@240
|
384 ogg_default_first_channel=("ogg", 2, "from_ogg"),
|
amine@240
|
385 ogg_first_channel=("ogg", 1, "from_ogg"),
|
amine@240
|
386 ogg_second_channel=("ogg", 2, "from_ogg"),
|
amine@240
|
387 ogg_mix_channels=("ogg", 3, "from_ogg"),
|
amine@240
|
388 mp3_left_channel=("mp3", 1, "from_mp3"),
|
amine@240
|
389 mp3_right_channel=("mp3", 2, "from_mp3"),
|
amine@240
|
390 mp3_mix_channels=("mp3", 3, "from_mp3"),
|
amine@240
|
391 flac_first_channel=("flac", 2, "from_file"),
|
amine@240
|
392 flac_second_channel=("flac", 2, "from_file"),
|
amine@240
|
393 flv_left_channel=("flv", 1, "from_flv"),
|
amine@240
|
394 webm_right_channel=("webm", 2, "from_file"),
|
amine@240
|
395 webm_mix_channels=("webm", 4, "from_file"),
|
amine@131
|
396 )
|
amine@131
|
397 def test_load_with_pydub(
|
amine@240
|
398 self, audio_format, channels, function, *mocks
|
amine@131
|
399 ):
|
amine@131
|
400 filename = "audio.{}".format(audio_format)
|
amine@131
|
401 segment_mock = Mock()
|
amine@131
|
402 segment_mock.sample_width = 2
|
amine@131
|
403 segment_mock.channels = channels
|
amine@131
|
404 segment_mock._data = b"abcdefgh"
|
amine@240
|
405 with patch(
|
amine@240
|
406 "auditok.io.AudioSegment.{}".format(function)
|
amine@240
|
407 ) as open_func:
|
amine@240
|
408 open_func.return_value = segment_mock
|
amine@240
|
409 _load_with_pydub(filename, audio_format)
|
amine@240
|
410 self.assertTrue(open_func.called)
|
amine@240
|
411
|
amine@131
|
412
|
amine@130
|
413 @genty_dataset(
|
amine@132
|
414 mono=("mono_400Hz.raw", (400,)),
|
amine@132
|
415 three_channel=("3channel_400-800-1600Hz.raw", (400, 800, 1600)),
|
amine@132
|
416 )
|
amine@132
|
417 def test_save_raw(self, filename, frequencies):
|
amine@132
|
418 filename = "tests/data/test_16KHZ_{}".format(filename)
|
amine@132
|
419 sample_width = 2
|
amine@132
|
420 fmt = DATA_FORMAT[sample_width]
|
amine@132
|
421 mono_channels = [PURE_TONE_DICT[freq] for freq in frequencies]
|
amine@132
|
422 data = _array_to_bytes(array(fmt, _sample_generator(*mono_channels)))
|
amine@132
|
423 tmpfile = NamedTemporaryFile()
|
amine@136
|
424 _save_raw(data, tmpfile.name)
|
amine@132
|
425 self.assertTrue(filecmp.cmp(tmpfile.name, filename, shallow=False))
|
amine@132
|
426
|
amine@132
|
427 @genty_dataset(
|
amine@110
|
428 mono=("mono_400Hz.wav", (400,)),
|
amine@110
|
429 three_channel=("3channel_400-800-1600Hz.wav", (400, 800, 1600)),
|
amine@110
|
430 )
|
amine@110
|
431 def test_save_wave(self, filename, frequencies):
|
amine@110
|
432 filename = "tests/data/test_16KHZ_{}".format(filename)
|
amine@110
|
433 sampling_rate = 16000
|
amine@110
|
434 sample_width = 2
|
amine@110
|
435 channels = len(frequencies)
|
amine@110
|
436 fmt = DATA_FORMAT[sample_width]
|
amine@110
|
437 mono_channels = [PURE_TONE_DICT[freq] for freq in frequencies]
|
amine@110
|
438 data = _array_to_bytes(array(fmt, _sample_generator(*mono_channels)))
|
amine@110
|
439 tmpfile = NamedTemporaryFile()
|
amine@136
|
440 _save_wave(data, tmpfile.name, sampling_rate, sample_width, channels)
|
amine@110
|
441 self.assertTrue(filecmp.cmp(tmpfile.name, filename, shallow=False))
|
amine@132
|
442
|
amine@132
|
443 @genty_dataset(
|
amine@132
|
444 missing_sampling_rate=("sr",),
|
amine@132
|
445 missing_sample_width=("sw",),
|
amine@132
|
446 missing_channels=("ch",),
|
amine@132
|
447 )
|
amine@132
|
448 def test_save_wave_missing_audio_param(self, missing_param):
|
amine@132
|
449 with self.assertRaises(AudioParameterError):
|
amine@132
|
450 params = AUDIO_PARAMS_SHORT.copy()
|
amine@132
|
451 del params[missing_param]
|
amine@132
|
452 srate, swidth, channels, _ = _get_audio_parameters(params)
|
amine@136
|
453 _save_wave(b"\0\0", "audio", srate, swidth, channels)
|
amine@133
|
454
|
amine@141
|
455 def test_save_with_pydub(self):
|
amine@141
|
456 with patch("auditok.io.AudioSegment.export") as export:
|
amine@142
|
457 tmpdir = TemporaryDirectory()
|
amine@142
|
458 filename = os.path.join(tmpdir.name, "audio.ogg")
|
amine@142
|
459 _save_with_pydub(b"\0\0", filename, "ogg", 16000, 2, 1)
|
amine@141
|
460 self.assertTrue(export.called)
|
amine@142
|
461 tmpdir.cleanup()
|
amine@141
|
462
|
amine@133
|
463 @genty_dataset(
|
amine@133
|
464 raw_with_audio_format=("audio", "raw"),
|
amine@133
|
465 raw_with_extension=("audio.raw", None),
|
amine@133
|
466 raw_with_audio_format_and_extension=("audio.mp3", "raw"),
|
amine@133
|
467 raw_no_audio_format_nor_extension=("audio", None),
|
amine@133
|
468 )
|
amine@133
|
469 def test_to_file_raw(self, filename, audio_format):
|
amine@133
|
470 exp_filename = "tests/data/test_16KHZ_mono_400Hz.raw"
|
amine@133
|
471 tmpdir = TemporaryDirectory()
|
amine@133
|
472 filename = os.path.join(tmpdir.name, filename)
|
amine@133
|
473 data = _array_to_bytes(PURE_TONE_DICT[400])
|
amine@135
|
474 to_file(data, filename, audio_format=audio_format)
|
amine@133
|
475 self.assertTrue(filecmp.cmp(filename, exp_filename, shallow=False))
|
amine@133
|
476 tmpdir.cleanup()
|
amine@134
|
477
|
amine@134
|
478 @genty_dataset(
|
amine@134
|
479 wav_with_audio_format=("audio", "wav"),
|
amine@134
|
480 wav_with_extension=("audio.wav", None),
|
amine@134
|
481 wav_with_audio_format_and_extension=("audio.mp3", "wav"),
|
amine@134
|
482 wave_with_audio_format=("audio", "wave"),
|
amine@134
|
483 wave_with_extension=("audio.wave", None),
|
amine@134
|
484 wave_with_audio_format_and_extension=("audio.mp3", "wave"),
|
amine@134
|
485 )
|
amine@135
|
486 def test_to_file_wave(self, filename, audio_format):
|
amine@134
|
487 exp_filename = "tests/data/test_16KHZ_mono_400Hz.wav"
|
amine@134
|
488 tmpdir = TemporaryDirectory()
|
amine@134
|
489 filename = os.path.join(tmpdir.name, filename)
|
amine@134
|
490 data = _array_to_bytes(PURE_TONE_DICT[400])
|
amine@135
|
491 to_file(
|
amine@135
|
492 data,
|
amine@135
|
493 filename,
|
amine@135
|
494 audio_format=audio_format,
|
amine@135
|
495 sampling_rate=16000,
|
amine@135
|
496 sample_width=2,
|
amine@135
|
497 channels=1,
|
amine@134
|
498 )
|
amine@134
|
499 self.assertTrue(filecmp.cmp(filename, exp_filename, shallow=False))
|
amine@134
|
500 tmpdir.cleanup()
|
amine@138
|
501
|
amine@138
|
502 @genty_dataset(
|
amine@138
|
503 missing_sampling_rate=("sr",),
|
amine@138
|
504 missing_sample_width=("sw",),
|
amine@138
|
505 missing_channels=("ch",),
|
amine@138
|
506 )
|
amine@138
|
507 def test_to_file_missing_audio_param(self, missing_param):
|
amine@138
|
508 params = AUDIO_PARAMS_SHORT.copy()
|
amine@138
|
509 del params[missing_param]
|
amine@138
|
510 with self.assertRaises(AudioParameterError):
|
amine@138
|
511 to_file(b"\0\0", "audio", audio_format="wav", **params)
|
amine@138
|
512 with self.assertRaises(AudioParameterError):
|
amine@138
|
513 to_file(b"\0\0", "audio", audio_format="mp3", **params)
|
amine@139
|
514
|
amine@139
|
515 def test_to_file_no_pydub(self):
|
amine@139
|
516 with patch("auditok.io._WITH_PYDUB", False):
|
amine@139
|
517 with self.assertRaises(AudioIOError):
|
amine@139
|
518 to_file("audio", b"", "mp3")
|
amine@140
|
519
|
amine@140
|
520 @patch("auditok.io._WITH_PYDUB", True)
|
amine@140
|
521 @genty_dataset(
|
amine@140
|
522 ogg_with_extension=("audio.ogg", None),
|
amine@140
|
523 ogg_with_audio_format=("audio", "ogg"),
|
amine@140
|
524 ogg_format_with_wrong_extension=("audio.wav", "ogg"),
|
amine@140
|
525 )
|
amine@140
|
526 def test_to_file_compressed(self, filename, audio_format, *mocks):
|
amine@140
|
527 with patch("auditok.io.AudioSegment.export") as export:
|
amine@142
|
528 tmpdir = TemporaryDirectory()
|
amine@142
|
529 filename = os.path.join(tmpdir.name, filename)
|
amine@140
|
530 to_file(b"\0\0", filename, audio_format, **AUDIO_PARAMS_SHORT)
|
amine@140
|
531 self.assertTrue(export.called)
|
amine@142
|
532 tmpdir.cleanup()
|
amine@190
|
533
|
amine@190
|
534 @genty_dataset(
|
amine@190
|
535 string_wave=(
|
amine@190
|
536 "tests/data/test_16KHZ_mono_400Hz.wav",
|
amine@190
|
537 BufferAudioSource,
|
amine@190
|
538 ),
|
amine@190
|
539 string_wave_large_file=(
|
amine@190
|
540 "tests/data/test_16KHZ_mono_400Hz.wav",
|
amine@190
|
541 WaveAudioSource,
|
amine@190
|
542 {"large_file": True},
|
amine@190
|
543 ),
|
amine@190
|
544 stdin=("-", StdinAudioSource),
|
amine@190
|
545 string_raw=("tests/data/test_16KHZ_mono_400Hz.raw", BufferAudioSource),
|
amine@190
|
546 string_raw_large_file=(
|
amine@190
|
547 "tests/data/test_16KHZ_mono_400Hz.raw",
|
amine@190
|
548 RawAudioSource,
|
amine@190
|
549 {"large_file": True},
|
amine@190
|
550 ),
|
amine@190
|
551 bytes_=(b"0" * 8000, BufferAudioSource),
|
amine@190
|
552 )
|
amine@190
|
553 def test_get_audio_source(self, input, expected_type, extra_args=None):
|
amine@190
|
554 kwargs = {"sampling_rate": 16000, "sample_width": 2, "channels": 1}
|
amine@190
|
555 if extra_args is not None:
|
amine@190
|
556 kwargs.update(extra_args)
|
amine@190
|
557 audio_source = get_audio_source(input, **kwargs)
|
amine@210
|
558 self.assertIsInstance(audio_source, expected_type) |