import os
import sys
import math
from array import array
from tempfile import NamedTemporaryFile, TemporaryDirectory
import filecmp
from unittest import TestCase
from genty import genty, genty_dataset
from test_util import _sample_generator, _generate_pure_tone, PURE_TONE_DICT
from auditok.io import (
    DATA_FORMAT,
    AudioIOError,
    AudioParameterError,
    BufferAudioSource,
    RawAudioSource,
    WaveAudioSource,
    StdinAudioSource,
    check_audio_data,
    _guess_audio_format,
    _normalize_use_channel,
    _get_audio_parameters,
    _array_to_bytes,
    _mix_audio_channels,
    _extract_selected_channel,
    _load_raw,
    _load_wave,
    _load_with_pydub,
    get_audio_source,
    from_file,
    _save_raw,
    _save_wave,
    _save_with_pydub,
    to_file,
)


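# On Python 3, mock is part of the standard library (unittest.mock); on
# Python 2 it comes from the external mock package.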
if sys.version_info >= (3, 0):
    PYTHON_3 = True
    from unittest.mock import patch, Mock
else:
    PYTHON_3 = False
    from mock import patch, Mock

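# Audio parameters given with the short keyword names (sr, sw, ch) accepted
# by auditok.io.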
AUDIO_PARAMS_SHORT = {"sr": 16000, "sw": 2, "ch": 1}


@genty
class TestIO(TestCase):
    @genty_dataset(
        valid_mono=(b"\0" * 113, 1, 1),
        valid_stereo=(b"\0" * 160, 1, 2),
        invalid_mono_sw_2=(b"\0" * 113, 2, 1, False),
        invalid_stereo_sw_1=(b"\0" * 113, 1, 2, False),
        invalid_stereo_sw_2=(b"\0" * 158, 2, 2, False),
    )
    def test_check_audio_data(self, data, sample_width, channels, valid=True):

        if not valid:
            with self.assertRaises(AudioParameterError):
                check_audio_data(data, sample_width, channels)
        else:
            self.assertIsNone(check_audio_data(data, sample_width, channels))

    @genty_dataset(
        extension_and_format_same=("wav", "filename.wav", "wav"),
        extension_and_format_different=("wav", "filename.mp3", "wav"),
        extension_no_format=(None, "filename.wav", "wav"),
        format_no_extension=("wav", "filename", "wav"),
        no_format_no_extension=(None, "filename", None),
    )
    def test_guess_audio_format(self, fmt, filename, expected):
        result = _guess_audio_format(fmt, filename)
        self.assertEqual(result, expected)

    @genty_dataset(
        none=(None, 0),
        positive_int=(1, 0),
        left=("left", 0),
        right=("right", 1),
        mix=("mix", "mix"),
    )
    def test_normalize_use_channel(self, use_channel, expected):
        result = _normalize_use_channel(use_channel)
        self.assertEqual(result, expected)

    @genty_dataset(
        int_1=((8000, 2, 1, 1), (8000, 2, 1, 0)),
        int_2=((8000, 2, 1, 2), (8000, 2, 1, 1)),
        use_channel_left=((8000, 2, 1, "left"), (8000, 2, 1, 0)),
        use_channel_right=((8000, 2, 1, "right"), (8000, 2, 1, 1)),
        use_channel_mix=((8000, 2, 1, "mix"), (8000, 2, 1, "mix")),
        use_channel_None=((8000, 2, 2, None), (8000, 2, 2, 0)),
        no_use_channel=((8000, 2, 2), (8000, 2, 2, 0)),
    )
    def test_get_audio_parameters_short_params(self, values, expected):
        params = dict(zip(("sr", "sw", "ch", "uc"), values))
        result = _get_audio_parameters(params)
        self.assertEqual(result, expected)

    @genty_dataset(
        int_1=((8000, 2, 1, 1), (8000, 2, 1, 0)),
        int_2=((8000, 2, 1, 2), (8000, 2, 1, 1)),
        use_channel_left=((8000, 2, 1, "left"), (8000, 2, 1, 0)),
        use_channel_right=((8000, 2, 1, "right"), (8000, 2, 1, 1)),
        use_channel_mix=((8000, 2, 1, "mix"), (8000, 2, 1, "mix")),
        use_channel_None=((8000, 2, 2, None), (8000, 2, 2, 0)),
        no_use_channel=((8000, 2, 2), (8000, 2, 2, 0)),
    )
    def test_get_audio_parameters_long_params(self, values, expected):
        params = dict(
            zip(
                ("sampling_rate", "sample_width", "channels", "use_channel"),
                values,
            )
        )
        result = _get_audio_parameters(params)
        self.assertEqual(result, expected)

    @genty_dataset(simple=((8000, 2, 1, 1), (8000, 2, 1, 0)))
    def test_get_audio_parameters_long_params_shadow_short_ones(
        self, values, expected
    ):
        params = dict(
            zip(
                ("sampling_rate", "sample_width", "channels", "use_channel"),
                values,
            )
        )
        params.update(dict(zip(("sr", "sw", "ch", "uc"), "xxxx")))
        result = _get_audio_parameters(params)
        self.assertEqual(result, expected)

    @genty_dataset(
        str_sampling_rate=(("x", 2, 1, 0),),
        negative_sampling_rate=((-8000, 2, 1, 0),),
        str_sample_width=((8000, "x", 1, 0),),
        negative_sample_width=((8000, -2, 1, 0),),
        str_channels=((8000, 2, "x", 0),),
        negative_channels=((8000, 2, -1, 0),),
    )
    def test_get_audio_parameters_invalid(self, values):
        # TODO 0 or negative use_channel must raise AudioParameterError
        # change implementation, don't accept negative uc
        # highlight everywhere in doc that uc must be positive
        params = dict(
            zip(
                ("sampling_rate", "sample_width", "channels", "use_channel"),
                values,
            )
        )
        with self.assertRaises(AudioParameterError):
            _get_audio_parameters(params)

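    # Mixing should produce, for each frame, the floor of the mean of the
    # samples across all channels.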
    @genty_dataset(
        mono_1byte=([400], 1),
        stereo_1byte=([400, 600], 1),
        three_channel_1byte=([400, 600, 2400], 1),
        mono_2byte=([400], 2),
        stereo_2byte=([400, 600], 2),
        three_channel_2byte=([400, 600, 1150], 2),
        mono_4byte=([400], 4),
        stereo_4byte=([400, 600], 4),
        four_channel_4byte=([400, 600, 1150, 7220], 4),
    )
    def test_mix_audio_channels(self, frequencies, sample_width):
        sampling_rate = 16000
        channels = len(frequencies)
        mono_channels = [
            _generate_pure_tone(
                freq,
                duration_sec=0.1,
                sampling_rate=sampling_rate,
                sample_width=sample_width,
            )
            for freq in frequencies
        ]
        fmt = DATA_FORMAT[sample_width]
        expected = _array_to_bytes(
            array(
                fmt,
                (sum(samples) // channels for samples in zip(*mono_channels)),
            )
        )
        data = _array_to_bytes(array(fmt, _sample_generator(*mono_channels)))
        mixed = _mix_audio_channels(data, channels, sample_width)
        self.assertEqual(mixed, expected)

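    # Extracting a channel should return exactly the samples of that channel;
    # a negative index selects channels from the end.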
    @genty_dataset(
        mono_1byte=([400], 1, 0),
        stereo_1byte_2nd_channel=([400, 600], 1, 1),
        mono_2byte=([400], 2, 0),
        stereo_2byte_1st_channel=([400, 600], 2, 0),
        stereo_2byte_2nd_channel=([400, 600], 2, 1),
        three_channel_2byte_last_negative_idx=([400, 600, 1150], 2, -1),
        three_channel_2byte_2nd_negative_idx=([400, 600, 1150], 2, -2),
        three_channel_2byte_1st_negative_idx=([400, 600, 1150], 2, -3),
        three_channel_4byte_1st=([400, 600, 1150], 4, 0),
        three_channel_4byte_last_negative_idx=([400, 600, 1150], 4, -1),
    )
    def test_extract_selected_channel(
        self, frequencies, sample_width, use_channel
    ):

        mono_channels = [
            _generate_pure_tone(
                freq,
                duration_sec=0.1,
                sampling_rate=16000,
                sample_width=sample_width,
            )
            for freq in frequencies
        ]
        channels = len(frequencies)
        fmt = DATA_FORMAT[sample_width]
        expected = _array_to_bytes(mono_channels[use_channel])
        data = _array_to_bytes(array(fmt, _sample_generator(*mono_channels)))
        selected_channel = _extract_selected_channel(
            data, channels, sample_width, use_channel
        )
        self.assertEqual(selected_channel, expected)

    @genty_dataset(mono=([400],), three_channel=([600, 1150, 2400],))
    def test_extract_selected_channel_mix(self, frequencies):

        mono_channels = [PURE_TONE_DICT[freq] for freq in frequencies]
        channels = len(frequencies)
        fmt = DATA_FORMAT[2]
        expected = _array_to_bytes(
            array(
                fmt,
                (sum(samples) // channels for samples in zip(*mono_channels)),
            )
        )
        data = _array_to_bytes(array(fmt, _sample_generator(*mono_channels)))
        selected_channel = _extract_selected_channel(data, channels, 2, "mix")
        self.assertEqual(selected_channel, expected)

    @genty_dataset(positive=(2,), negative=(-3,))
    def test_extract_selected_channel_invalid_use_channel(self, use_channel):
        with self.assertRaises(AudioParameterError):
            _extract_selected_channel(b"\0\0", 2, 2, use_channel)

    @genty_dataset(
        raw_with_audio_format=(
            "audio",
            "raw",
            "_load_raw",
            AUDIO_PARAMS_SHORT,
        ),
        raw_with_extension=(
            "audio.raw",
            None,
            "_load_raw",
            AUDIO_PARAMS_SHORT,
        ),
        wave_with_audio_format=("audio", "wave", "_load_wave"),
        wav_with_audio_format=("audio", "wav", "_load_wave"),
        wav_with_extension=("audio.wav", None, "_load_wave"),
        format_and_extension_both_given=("audio.dat", "wav", "_load_wave"),
        format_and_extension_both_given_b=("audio.raw", "wave", "_load_wave"),
        no_format_nor_extension=("audio", None, "_load_with_pydub"),
        other_formats_ogg=("audio.ogg", None, "_load_with_pydub"),
        other_formats_webm=("audio", "webm", "_load_with_pydub"),
    )
    def test_from_file(
        self, filename, audio_format, function_name, kwargs=None
    ):
        function_name = "auditok.io." + function_name
        if kwargs is None:
            kwargs = {}
        with patch(function_name) as patch_function:
            from_file(filename, audio_format, **kwargs)
            self.assertTrue(patch_function.called)

    def test_from_file_large_file_raw(self):
        filename = "tests/data/test_16KHZ_mono_400Hz.raw"
        audio_source = from_file(
            filename,
            large_file=True,
            sampling_rate=16000,
            sample_width=2,
            channels=1,
        )
        self.assertIsInstance(audio_source, RawAudioSource)

    def test_from_file_large_file_wave(self):
        filename = "tests/data/test_16KHZ_mono_400Hz.wav"
        audio_source = from_file(filename, large_file=True)
        self.assertIsInstance(audio_source, WaveAudioSource)

    def test_from_file_large_file_compressed(self):
        filename = "tests/data/test_16KHZ_mono_400Hz.ogg"
        with self.assertRaises(AudioIOError):
            from_file(filename, large_file=True)

    @genty_dataset(
        missing_sampling_rate=("sr",),
        missing_sample_width=("sw",),
        missing_channels=("ch",),
    )
    def test_from_file_missing_audio_param(self, missing_param):
        with self.assertRaises(AudioParameterError):
            params = AUDIO_PARAMS_SHORT.copy()
            del params[missing_param]
            from_file("audio", audio_format="raw", **params)

    def test_from_file_no_pydub(self):
        with patch("auditok.io._WITH_PYDUB", False):
            with self.assertRaises(AudioIOError):
                from_file("audio", "mp3")

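    # The 3-channel fixture files hold 400 Hz, 800 Hz and 1600 Hz pure tones,
    # one tone per channel.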
    @genty_dataset(
        raw_first_channel=("raw", 1, 400),
        raw_second_channel=("raw", 2, 800),
        raw_third_channel=("raw", 3, 1600),
        raw_left_channel=("raw", "left", 400),
        raw_right_channel=("raw", "right", 800),
        wav_first_channel=("wav", 1, 400),
        wav_second_channel=("wav", 2, 800),
        wav_third_channel=("wav", 3, 1600),
        wav_left_channel=("wav", "left", 400),
        wav_right_channel=("wav", "right", 800),
    )
    def test_from_file_multichannel_audio(
        self, audio_format, use_channel, frequency
    ):
        expected = PURE_TONE_DICT[frequency]
        filename = "tests/data/test_16KHZ_3channel_400-800-1600Hz.{}".format(
            audio_format
        )
        sample_width = 2
        audio_source = from_file(
            filename,
            sampling_rate=16000,
            sample_width=sample_width,
            channels=3,
            use_channel=use_channel,
        )
        fmt = DATA_FORMAT[sample_width]
        data = array(fmt, audio_source._buffer)
        self.assertEqual(data, expected)

    @genty_dataset(
        raw_mono=("raw", "mono_400Hz", (400,)),
        raw_3channel=("raw", "3channel_400-800-1600Hz", (400, 800, 1600)),
        wav_mono=("wav", "mono_400Hz", (400,)),
        wav_3channel=("wav", "3channel_400-800-1600Hz", (400, 800, 1600)),
    )
    def test_from_file_multichannel_audio_mix(
        self, audio_format, filename_suffix, frequencies
    ):
        sampling_rate = 16000
        sample_width = 2
        channels = len(frequencies)
        mono_channels = [PURE_TONE_DICT[freq] for freq in frequencies]
        fmt = DATA_FORMAT[sample_width]
        expected = _array_to_bytes(
            array(
                fmt,
                (sum(samples) // channels for samples in zip(*mono_channels)),
            )
        )
        filename = "tests/data/test_16KHZ_{}.{}".format(
            filename_suffix, audio_format
        )
        audio_source = from_file(
            filename,
            use_channel="mix",
            sampling_rate=sampling_rate,
            sample_width=2,
            channels=channels,
        )
        mixed = audio_source._buffer
        self.assertEqual(mixed, expected)

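    # pydub decoding is mocked out below: AudioSegment.from_* returns a fake
    # segment, so no real compressed files are needed.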
    @patch("auditok.io._WITH_PYDUB", True)
    @patch("auditok.io.BufferAudioSource")
    @genty_dataset(
        ogg_first_channel=("ogg", 1, "from_ogg"),
        ogg_second_channel=("ogg", 2, "from_ogg"),
        ogg_mix=("ogg", "mix", "from_ogg"),
        ogg_default=("ogg", None, "from_ogg"),
        mp3_left_channel=("mp3", "left", "from_mp3"),
        mp3_right_channel=("mp3", "right", "from_mp3"),
        flac_first_channel=("flac", 1, "from_file"),
        flac_second_channel=("flac", 2, "from_file"),
        flv_left_channel=("flv", "left", "from_flv"),
        webm_right_channel=("webm", "right", "from_file"),
    )
    def test_from_file_multichannel_audio_compressed(
        self, audio_format, use_channel, function, *mocks
    ):
        filename = "audio.{}".format(audio_format)
        segment_mock = Mock()
        segment_mock.sample_width = 2
        segment_mock.channels = 2
        segment_mock._data = b"abcd"
        with patch("auditok.io._extract_selected_channel") as ext_mock:
            with patch(
                "auditok.io.AudioSegment.{}".format(function)
            ) as open_func:
                open_func.return_value = segment_mock
                from_file(filename, use_channel=use_channel)
                self.assertTrue(open_func.called)
                self.assertTrue(ext_mock.called)

                use_channel = {"left": 1, "right": 2, None: 1}.get(
                    use_channel, use_channel
                )
                if isinstance(use_channel, int):
                    # _extract_selected_channel will be called with a
                    # channel starting from 0
                    use_channel -= 1
                ext_mock.assert_called_with(
                    segment_mock._data,
                    segment_mock.channels,
                    segment_mock.sample_width,
                    use_channel,
                )

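        # With a mono segment, no channel extraction should take place.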
        with patch("auditok.io._extract_selected_channel") as ext_mock:
            with patch(
                "auditok.io.AudioSegment.{}".format(function)
            ) as open_func:
                segment_mock.channels = 1
                open_func.return_value = segment_mock
                from_file(filename, use_channel=use_channel)
                self.assertTrue(open_func.called)
                self.assertFalse(ext_mock.called)

    @patch("auditok.io._WITH_PYDUB", True)
    @patch("auditok.io.BufferAudioSource")
    @genty_dataset(
        ogg=("ogg", "from_ogg"),
        mp3=("mp3", "from_mp3"),
        flac=("flac", "from_file"),
    )
    def test_from_file_multichannel_audio_mix_compressed(
        self, audio_format, function, *mocks
    ):
        filename = "audio.{}".format(audio_format)
        segment_mock = Mock()
        segment_mock.sample_width = 2
        segment_mock.channels = 2
        segment_mock._data = b"abcd"
        with patch("auditok.io._mix_audio_channels") as mix_mock:
            with patch(
                "auditok.io.AudioSegment.{}".format(function)
            ) as open_func:
                open_func.return_value = segment_mock
                from_file(filename, use_channel="mix")
                self.assertTrue(open_func.called)
                mix_mock.assert_called_with(
                    segment_mock._data,
                    segment_mock.channels,
                    segment_mock.sample_width,
                )

    @genty_dataset(
        default_first_channel=(None, 400),
        first_channel=(0, 400),
        second_channel=(1, 800),
        third_channel=(2, 1600),
        negative_first_channel=(-3, 400),
        negative_second_channel=(-2, 800),
        negative_third_channel=(-1, 1600),
    )
    def test_load_raw(self, use_channel, frequency):
        filename = "tests/data/test_16KHZ_3channel_400-800-1600Hz.raw"
        if use_channel is not None:
            audio_source = _load_raw(
                filename,
                sampling_rate=16000,
                sample_width=2,
                channels=3,
                use_channel=use_channel,
            )
        else:
            audio_source = _load_raw(
                filename, sampling_rate=16000, sample_width=2, channels=3
            )
        self.assertIsInstance(audio_source, BufferAudioSource)
        self.assertEqual(audio_source.sampling_rate, 16000)
        self.assertEqual(audio_source.sample_width, 2)
        self.assertEqual(audio_source.channels, 1)
        # generate a pure sine wave tone of the given frequency
        expected = PURE_TONE_DICT[frequency]
        # compare with data read from file
        fmt = DATA_FORMAT[2]
        data = array(fmt, audio_source._buffer)
        self.assertEqual(data, expected)

    @genty_dataset(
        mono=("mono_400Hz", (400,)),
        three_channel=("3channel_400-800-1600Hz", (400, 800, 1600)),
    )
    def test_load_raw_mix(self, filename_suffix, frequencies):
        sampling_rate = 16000
        sample_width = 2
        channels = len(frequencies)
        mono_channels = [PURE_TONE_DICT[freq] for freq in frequencies]

        fmt = DATA_FORMAT[sample_width]
        expected = _array_to_bytes(
            array(
                fmt,
                (sum(samples) // channels for samples in zip(*mono_channels)),
            )
        )
        filename = "tests/data/test_16KHZ_{}.raw".format(filename_suffix)
        audio_source = _load_raw(
            filename,
            use_channel="mix",
            sampling_rate=sampling_rate,
            sample_width=2,
            channels=channels,
        )
        mixed = audio_source._buffer
        self.assertEqual(mixed, expected)
        self.assertIsInstance(audio_source, BufferAudioSource)
        self.assertEqual(audio_source.sampling_rate, sampling_rate)
        self.assertEqual(audio_source.sample_width, sample_width)
        self.assertEqual(audio_source.channels, 1)

    @genty_dataset(
        missing_sampling_rate=("sr",),
        missing_sample_width=("sw",),
        missing_channels=("ch",),
    )
    def test_load_raw_missing_audio_param(self, missing_param):
        with self.assertRaises(AudioParameterError):
            params = AUDIO_PARAMS_SHORT.copy()
            del params[missing_param]
            srate, swidth, channels, _ = _get_audio_parameters(params)
            _load_raw("audio", srate, swidth, channels)

    @genty_dataset(
        default_first_channel=(None, 400),
        first_channel=(0, 400),
        second_channel=(1, 800),
        third_channel=(2, 1600),
        negative_first_channel=(-3, 400),
        negative_second_channel=(-2, 800),
        negative_third_channel=(-1, 1600),
    )
    def test_load_wave(self, use_channel, frequency):
        filename = "tests/data/test_16KHZ_3channel_400-800-1600Hz.wav"
        if use_channel is not None:
            audio_source = _load_wave(filename, use_channel=use_channel)
        else:
            audio_source = _load_wave(filename)
        self.assertIsInstance(audio_source, BufferAudioSource)
        self.assertEqual(audio_source.sampling_rate, 16000)
        self.assertEqual(audio_source.sample_width, 2)
        self.assertEqual(audio_source.channels, 1)
        # generate a pure sine wave tone of the given frequency
        expected = PURE_TONE_DICT[frequency]
        # compare with data read from file
        fmt = DATA_FORMAT[2]
        data = array(fmt, audio_source._buffer)
        self.assertEqual(data, expected)

    @genty_dataset(
        mono=("mono_400Hz", (400,)),
        three_channel=("3channel_400-800-1600Hz", (400, 800, 1600)),
    )
    def test_load_wave_mix(self, filename_suffix, frequencies):
        sampling_rate = 16000
        sample_width = 2
        channels = len(frequencies)
        mono_channels = [PURE_TONE_DICT[freq] for freq in frequencies]
        fmt = DATA_FORMAT[sample_width]
        expected = _array_to_bytes(
            array(
                fmt,
                (sum(samples) // channels for samples in zip(*mono_channels)),
            )
        )
        filename = "tests/data/test_16KHZ_{}.wav".format(filename_suffix)
        audio_source = _load_wave(filename, use_channel="mix")
        mixed = audio_source._buffer
        self.assertEqual(mixed, expected)
        self.assertIsInstance(audio_source, BufferAudioSource)
        self.assertEqual(audio_source.sampling_rate, sampling_rate)
        self.assertEqual(audio_source.sample_width, sample_width)
        self.assertEqual(audio_source.channels, 1)

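    # _load_with_pydub should only extract a channel when the decoded segment
    # has more than one channel.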
    @patch("auditok.io._WITH_PYDUB", True)
    @patch("auditok.io.BufferAudioSource")
    @genty_dataset(
        ogg_default_first_channel=("ogg", 2, None, "from_ogg"),
        ogg_first_channel=("ogg", 1, 0, "from_ogg"),
        ogg_second_channel=("ogg", 2, 1, "from_ogg"),
        ogg_mix_channels=("ogg", 3, "mix", "from_ogg"),
        mp3_left_channel=("mp3", 1, "left", "from_mp3"),
        mp3_right_channel=("mp3", 2, "right", "from_mp3"),
        mp3_mix_channels=("mp3", 3, "mix", "from_mp3"),
        flac_first_channel=("flac", 2, 0, "from_file"),
        flac_second_channel=("flac", 2, 1, "from_file"),
        flv_left_channel=("flv", 1, "left", "from_flv"),
        webm_right_channel=("webm", 2, "right", "from_file"),
        webm_mix_channels=("webm", 4, "mix", "from_file"),
    )
    def test_load_with_pydub(
        self, audio_format, channels, use_channel, function, *mocks
    ):
        filename = "audio.{}".format(audio_format)
        segment_mock = Mock()
        segment_mock.sample_width = 2
        segment_mock.channels = channels
        segment_mock._data = b"abcdefgh"
        with patch("auditok.io._extract_selected_channel") as ext_mock:
            with patch(
                "auditok.io.AudioSegment.{}".format(function)
            ) as open_func:
                open_func.return_value = segment_mock
                use_channel = {"left": 0, "right": 1, None: 0}.get(
                    use_channel, use_channel
                )
                _load_with_pydub(filename, audio_format, use_channel)
                self.assertTrue(open_func.called)
                if channels > 1:
                    self.assertTrue(ext_mock.called)
                    ext_mock.assert_called_with(
                        segment_mock._data,
                        segment_mock.channels,
                        segment_mock.sample_width,
                        use_channel,
                    )
                else:
                    self.assertFalse(ext_mock.called)

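    # Saved files are compared byte for byte with the reference fixtures.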
    @genty_dataset(
        mono=("mono_400Hz.raw", (400,)),
        three_channel=("3channel_400-800-1600Hz.raw", (400, 800, 1600)),
    )
    def test_save_raw(self, filename, frequencies):
        filename = "tests/data/test_16KHZ_{}".format(filename)
        sample_width = 2
        fmt = DATA_FORMAT[sample_width]
        mono_channels = [PURE_TONE_DICT[freq] for freq in frequencies]
        data = _array_to_bytes(array(fmt, _sample_generator(*mono_channels)))
        tmpfile = NamedTemporaryFile()
        _save_raw(data, tmpfile.name)
        self.assertTrue(filecmp.cmp(tmpfile.name, filename, shallow=False))

    @genty_dataset(
        mono=("mono_400Hz.wav", (400,)),
        three_channel=("3channel_400-800-1600Hz.wav", (400, 800, 1600)),
    )
    def test_save_wave(self, filename, frequencies):
        filename = "tests/data/test_16KHZ_{}".format(filename)
        sampling_rate = 16000
        sample_width = 2
        channels = len(frequencies)
        fmt = DATA_FORMAT[sample_width]
        mono_channels = [PURE_TONE_DICT[freq] for freq in frequencies]
        data = _array_to_bytes(array(fmt, _sample_generator(*mono_channels)))
        tmpfile = NamedTemporaryFile()
        _save_wave(data, tmpfile.name, sampling_rate, sample_width, channels)
        self.assertTrue(filecmp.cmp(tmpfile.name, filename, shallow=False))

    @genty_dataset(
        missing_sampling_rate=("sr",),
        missing_sample_width=("sw",),
        missing_channels=("ch",),
    )
    def test_save_wave_missing_audio_param(self, missing_param):
        with self.assertRaises(AudioParameterError):
            params = AUDIO_PARAMS_SHORT.copy()
            del params[missing_param]
            srate, swidth, channels, _ = _get_audio_parameters(params)
            _save_wave(b"\0\0", "audio", srate, swidth, channels)

    def test_save_with_pydub(self):
        with patch("auditok.io.AudioSegment.export") as export:
            tmpdir = TemporaryDirectory()
            filename = os.path.join(tmpdir.name, "audio.ogg")
            _save_with_pydub(b"\0\0", filename, "ogg", 16000, 2, 1)
            self.assertTrue(export.called)
            tmpdir.cleanup()

    @genty_dataset(
        raw_with_audio_format=("audio", "raw"),
        raw_with_extension=("audio.raw", None),
        raw_with_audio_format_and_extension=("audio.mp3", "raw"),
        raw_no_audio_format_nor_extension=("audio", None),
    )
    def test_to_file_raw(self, filename, audio_format):
        exp_filename = "tests/data/test_16KHZ_mono_400Hz.raw"
        tmpdir = TemporaryDirectory()
        filename = os.path.join(tmpdir.name, filename)
        data = _array_to_bytes(PURE_TONE_DICT[400])
        to_file(data, filename, audio_format=audio_format)
        self.assertTrue(filecmp.cmp(filename, exp_filename, shallow=False))
        tmpdir.cleanup()

    @genty_dataset(
        wav_with_audio_format=("audio", "wav"),
        wav_with_extension=("audio.wav", None),
        wav_with_audio_format_and_extension=("audio.mp3", "wav"),
        wave_with_audio_format=("audio", "wave"),
        wave_with_extension=("audio.wave", None),
        wave_with_audio_format_and_extension=("audio.mp3", "wave"),
    )
    def test_to_file_wave(self, filename, audio_format):
        exp_filename = "tests/data/test_16KHZ_mono_400Hz.wav"
        tmpdir = TemporaryDirectory()
        filename = os.path.join(tmpdir.name, filename)
        data = _array_to_bytes(PURE_TONE_DICT[400])
        to_file(
            data,
            filename,
            audio_format=audio_format,
            sampling_rate=16000,
            sample_width=2,
            channels=1,
        )
        self.assertTrue(filecmp.cmp(filename, exp_filename, shallow=False))
        tmpdir.cleanup()

    @genty_dataset(
        missing_sampling_rate=("sr",),
        missing_sample_width=("sw",),
        missing_channels=("ch",),
    )
    def test_to_file_missing_audio_param(self, missing_param):
        params = AUDIO_PARAMS_SHORT.copy()
        del params[missing_param]
        with self.assertRaises(AudioParameterError):
            to_file(b"\0\0", "audio", audio_format="wav", **params)
        with self.assertRaises(AudioParameterError):
            to_file(b"\0\0", "audio", audio_format="mp3", **params)

    def test_to_file_no_pydub(self):
        with patch("auditok.io._WITH_PYDUB", False):
            with self.assertRaises(AudioIOError):
                to_file(b"", "audio", "mp3")

    @patch("auditok.io._WITH_PYDUB", True)
    @genty_dataset(
        ogg_with_extension=("audio.ogg", None),
        ogg_with_audio_format=("audio", "ogg"),
        ogg_format_with_wrong_extension=("audio.wav", "ogg"),
    )
    def test_to_file_compressed(self, filename, audio_format, *mocks):
        with patch("auditok.io.AudioSegment.export") as export:
            tmpdir = TemporaryDirectory()
            filename = os.path.join(tmpdir.name, filename)
            to_file(b"\0\0", filename, audio_format, **AUDIO_PARAMS_SHORT)
            self.assertTrue(export.called)
            tmpdir.cleanup()

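    # get_audio_source: plain filenames and bytes are loaded into memory,
    # large_file=True keeps a file-backed source, and "-" reads from stdin.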
    @genty_dataset(
        string_wave=(
            "tests/data/test_16KHZ_mono_400Hz.wav",
            BufferAudioSource,
        ),
        string_wave_large_file=(
            "tests/data/test_16KHZ_mono_400Hz.wav",
            WaveAudioSource,
            {"large_file": True},
        ),
        stdin=("-", StdinAudioSource),
        string_raw=("tests/data/test_16KHZ_mono_400Hz.raw", BufferAudioSource),
        string_raw_large_file=(
            "tests/data/test_16KHZ_mono_400Hz.raw",
            RawAudioSource,
            {"large_file": True},
        ),
        bytes_=(b"0" * 8000, BufferAudioSource),
    )
    def test_get_audio_source(self, input, expected_type, extra_args=None):
        kwargs = {"sampling_rate": 16000, "sample_width": 2, "channels": 1}
        if extra_args is not None:
            kwargs.update(extra_args)
        audio_source = get_audio_source(input, **kwargs)
        self.assertIsInstance(audio_source, expected_type)