import os
import sys
import math
from array import array
from tempfile import NamedTemporaryFile, TemporaryDirectory
import filecmp
from unittest import TestCase
from genty import genty, genty_dataset
from auditok.io import (
    DATA_FORMAT,
    AudioIOError,
    AudioParameterError,
    BufferAudioSource,
    check_audio_data,
    _guess_audio_format,
    _normalize_use_channel,
    _get_audio_parameters,
    _array_to_bytes,
    _mix_audio_channels,
    _extract_selected_channel,
    _load_raw,
    _load_wave,
    _load_with_pydub,
    from_file,
    _save_raw,
    _save_wave,
    _save_with_pydub,
    to_file,
)


if sys.version_info >= (3, 0):
    PYTHON_3 = True
    from unittest.mock import patch, Mock
else:
    PYTHON_3 = False
    from mock import patch, Mock

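# Audio parameters used across tests, in auditok.io's short-name form:
# sr = sampling_rate, sw = sample_width, ch = channels.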
AUDIO_PARAMS_SHORT = {"sr": 16000, "sw": 2, "ch": 1}


def _sample_generator(*data_buffers):
    """
    Takes multiple mono audio data buffers and makes a sample generator
    of interleaved audio samples, one sample from each channel. The resulting
    generator can be used to build a multichannel audio buffer.
    >>> gen = _sample_generator("abcd", "ABCD")
    >>> list(gen)
    ['a', 'A', 'b', 'B', 'c', 'C', 'd', 'D']
    """
    frame_gen = zip(*data_buffers)
    return (sample for frame in frame_gen for sample in frame)


def _generate_pure_tone(
    frequency, duration_sec=1, sampling_rate=16000, sample_width=2, volume=1e4
):
    """
    Generates a pure tone with the given frequency.
    """
    assert frequency <= sampling_rate / 2
    max_value = (2 ** (sample_width * 8) // 2) - 1
    if volume > max_value:
        volume = max_value
    fmt = DATA_FORMAT[sample_width]
    total_samples = int(sampling_rate * duration_sec)
    step = frequency / sampling_rate
    two_pi_step = 2 * math.pi * step
    data = array(
        fmt,
        (
            int(math.sin(two_pi_step * i) * volume)
            for i in range(total_samples)
        ),
    )
    return data


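# Reference pure tones used as fixtures: 1-second tones at 400, 800 and
# 1600 Hz, plus 0.1-second tones at 600, 1150, 2400 and 7220 Hz, all
# sampled at 16 kHz with 2-byte samples.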
PURE_TONE_DICT = {
    freq: _generate_pure_tone(freq, 1, 16000, 2) for freq in (400, 800, 1600)
}
PURE_TONE_DICT.update(
    {
        freq: _generate_pure_tone(freq, 0.1, 16000, 2)
        for freq in (600, 1150, 2400, 7220)
    }
)


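# genty expands each named @genty_dataset entry below into its own test case.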
@genty
class TestIO(TestCase):
    @genty_dataset(
        valid_mono=(b"\0" * 113, 1, 1),
        valid_stereo=(b"\0" * 160, 1, 2),
        invalid_mono_sw_2=(b"\0" * 113, 2, 1, False),
        invalid_stereo_sw_1=(b"\0" * 113, 1, 2, False),
        invalid_stereo_sw_2=(b"\0" * 158, 2, 2, False),
    )
    def test_check_audio_data(self, data, sample_width, channels, valid=True):

        if not valid:
            with self.assertRaises(AudioParameterError):
                check_audio_data(data, sample_width, channels)
        else:
            self.assertIsNone(check_audio_data(data, sample_width, channels))

    @genty_dataset(
        extension_and_format_same=("wav", "filename.wav", "wav"),
        extension_and_format_different=("wav", "filename.mp3", "wav"),
        extension_no_format=(None, "filename.wav", "wav"),
        format_no_extension=("wav", "filename", "wav"),
        no_format_no_extension=(None, "filename", None),
    )
    def test_guess_audio_format(self, fmt, filename, expected):
        result = _guess_audio_format(fmt, filename)
        self.assertEqual(result, expected)

    @genty_dataset(
        none=(None, 0),
        positive_int=(1, 1),
        negative_int=(-1, -1),
        left=("left", 0),
        right=("right", 1),
        mix=("mix", "mix"),
    )
    def test_normalize_use_channel(self, use_channel, expected):
        result = _normalize_use_channel(use_channel)
        self.assertEqual(result, expected)

    @genty_dataset(
        simple=((8000, 2, 1, 0), (8000, 2, 1, 0)),
        use_channel_left=((8000, 2, 1, "left"), (8000, 2, 1, 0)),
        use_channel_right=((8000, 2, 1, "right"), (8000, 2, 1, 1)),
        use_channel_mix=((8000, 2, 1, "mix"), (8000, 2, 1, "mix")),
        use_channel_None=((8000, 2, 2, None), (8000, 2, 2, 0)),
        no_use_channel=((8000, 2, 2), (8000, 2, 2, 0)),
    )
    def test_get_audio_parameters_short_params(self, values, expected):
        params = {k: v for k, v in zip(("sr", "sw", "ch", "uc"), values)}
        result = _get_audio_parameters(params)
        self.assertEqual(result, expected)

    @genty_dataset(
        simple=((8000, 2, 1, 0), (8000, 2, 1, 0)),
        use_channel_left=((8000, 2, 1, "left"), (8000, 2, 1, 0)),
        use_channel_right=((8000, 2, 1, "right"), (8000, 2, 1, 1)),
        use_channel_mix=((8000, 2, 1, "mix"), (8000, 2, 1, "mix")),
        use_channel_None=((8000, 2, 2, None), (8000, 2, 2, 0)),
        no_use_channel=((8000, 2, 2), (8000, 2, 2, 0)),
    )
    def test_get_audio_parameters_long_params(self, values, expected):
        params = {
            k: v
            for k, v in zip(
                ("sampling_rate", "sample_width", "channels", "use_channel"),
                values,
            )
        }
        result = _get_audio_parameters(params)
        self.assertEqual(result, expected)

    @genty_dataset(simple=((8000, 2, 1, 0), (8000, 2, 1, 0)))
    def test_get_audio_parameters_short_and_long_params(
        self, values, expected
    ):
        params = {
            k: v
            for k, v in zip(
                ("sampling_rate", "sample_width", "channels", "use_channel"),
                values,
            )
        }

        params.update({k: v for k, v in zip(("sr", "sw", "ch", "uc"), "xxxx")})
        result = _get_audio_parameters(params)
        self.assertEqual(result, expected)

    @genty_dataset(
        str_sampling_rate=(("x", 2, 1, 0),),
        negative_sampling_rate=((-8000, 2, 1, 0),),
        str_sample_width=((8000, "x", 1, 0),),
        negative_sample_width=((8000, -2, 1, 0),),
        str_channels=((8000, 2, "x", 0),),
        negative_channels=((8000, 2, -1, 0),),
    )
    def test_get_audio_parameters_invalid(self, values):
        params = {
            k: v
            for k, v in zip(
                ("sampling_rate", "sample_width", "channels", "use_channel"),
                values,
            )
        }
        with self.assertRaises(AudioParameterError):
            _get_audio_parameters(params)

    @genty_dataset(
        mono_1byte=([400], 1),
        stereo_1byte=([400, 600], 1),
        three_channel_1byte=([400, 600, 2400], 1),
        mono_2byte=([400], 2),
        stereo_2byte=([400, 600], 2),
        three_channel_2byte=([400, 600, 1150], 2),
        mono_4byte=([400], 4),
        stereo_4byte=([400, 600], 4),
        four_channel_4byte=([400, 600, 1150, 7220], 4),
    )
    def test_mix_audio_channels(self, frequencies, sample_width):
        sampling_rate = 16000
        channels = len(frequencies)
        mono_channels = [
            _generate_pure_tone(
                freq,
                duration_sec=0.1,
                sampling_rate=sampling_rate,
                sample_width=sample_width,
            )
            for freq in frequencies
        ]
        fmt = DATA_FORMAT[sample_width]
        expected = _array_to_bytes(
            array(
                fmt,
                (sum(samples) // channels for samples in zip(*mono_channels)),
            )
        )
        data = _array_to_bytes(array(fmt, _sample_generator(*mono_channels)))
        mixed = _mix_audio_channels(data, channels, sample_width)
        self.assertEqual(mixed, expected)

    @genty_dataset(
        mono_1byte=([400], 1, 0),
        stereo_1byte_2nd_channel=([400, 600], 1, 1),
        mono_2byte=([400], 2, 0),
        stereo_2byte_1st_channel=([400, 600], 2, 0),
        stereo_2byte_2nd_channel=([400, 600], 2, 1),
        three_channel_2byte_last_negative_idx=([400, 600, 1150], 2, -1),
        three_channel_2byte_2nd_negative_idx=([400, 600, 1150], 2, -2),
        three_channel_2byte_1st_negative_idx=([400, 600, 1150], 2, -3),
        three_channel_4byte_1st=([400, 600, 1150], 4, 0),
        three_channel_4byte_last_negative_idx=([400, 600, 1150], 4, -1),
    )
    def test_extract_selected_channel(
        self, frequencies, sample_width, use_channel
    ):

        mono_channels = [
            _generate_pure_tone(
                freq,
                duration_sec=0.1,
                sampling_rate=16000,
                sample_width=sample_width,
            )
            for freq in frequencies
        ]
        channels = len(frequencies)
        fmt = DATA_FORMAT[sample_width]
        expected = _array_to_bytes(mono_channels[use_channel])
        data = _array_to_bytes(array(fmt, _sample_generator(*mono_channels)))
        selected_channel = _extract_selected_channel(
            data, channels, sample_width, use_channel
        )
        self.assertEqual(selected_channel, expected)

    @genty_dataset(mono=([400],), three_channel=([600, 1150, 2400],))
    def test_extract_selected_channel_mix(self, frequencies):

        mono_channels = [PURE_TONE_DICT[freq] for freq in frequencies]
        channels = len(frequencies)
        fmt = DATA_FORMAT[2]
        expected = _array_to_bytes(
            array(
                fmt,
                (sum(samples) // channels for samples in zip(*mono_channels)),
            )
        )
        data = _array_to_bytes(array(fmt, _sample_generator(*mono_channels)))
        selected_channel = _extract_selected_channel(data, channels, 2, "mix")
        self.assertEqual(selected_channel, expected)

    @genty_dataset(positive=(2,), negative=(-3,))
    def test_extract_selected_channel_invalid_use_channel(self, use_channel):
        with self.assertRaises(AudioParameterError):
            _extract_selected_channel(b"\0\0", 2, 2, use_channel)

    @genty_dataset(
        raw_with_audio_format=(
            "audio",
            "raw",
            "_load_raw",
            AUDIO_PARAMS_SHORT,
        ),
        raw_with_extension=(
            "audio.raw",
            None,
            "_load_raw",
            AUDIO_PARAMS_SHORT,
        ),
        wave_with_audio_format=("audio", "wave", "_load_wave"),
        wav_with_audio_format=("audio", "wav", "_load_wave"),
        wav_with_extension=("audio.wav", None, "_load_wave"),
        format_and_extension_both_given=("audio.dat", "wav", "_load_wave"),
        format_and_extension_both_given_b=("audio.raw", "wave", "_load_wave"),
        no_format_nor_extension=("audio", None, "_load_with_pydub"),
        other_formats_ogg=("audio.ogg", None, "_load_with_pydub"),
        other_formats_webm=("audio", "webm", "_load_with_pydub"),
    )
    def test_from_file(
        self, filename, audio_format, function_name, kwargs=None
    ):
        function_name = "auditok.io." + function_name
        if kwargs is None:
            kwargs = {}
        with patch(function_name) as patch_function:
            from_file(filename, audio_format, **kwargs)
            self.assertTrue(patch_function.called)

    @genty_dataset(
        missing_sampling_rate=("sr",),
        missing_sample_width=("sw",),
        missing_channels=("ch",),
    )
    def test_from_file_missing_audio_param(self, missing_param):
        with self.assertRaises(AudioParameterError):
            params = AUDIO_PARAMS_SHORT.copy()
            del params[missing_param]
            from_file("audio", audio_format="raw", **params)

    def test_from_file_no_pydub(self):
        with patch("auditok.io._WITH_PYDUB", False):
            with self.assertRaises(AudioIOError):
                from_file("audio", "mp3")

    @genty_dataset(
        raw_first_channel=("raw", 0, 400),
        raw_second_channel=("raw", 1, 800),
        raw_third_channel=("raw", 2, 1600),
        raw_left_channel=("raw", "left", 400),
        raw_right_channel=("raw", "right", 800),
        wav_first_channel=("wav", 0, 400),
        wav_second_channel=("wav", 1, 800),
        wav_third_channel=("wav", 2, 1600),
        wav_left_channel=("wav", "left", 400),
        wav_right_channel=("wav", "right", 800),
    )
    def test_from_file_multichannel_audio(
        self, audio_format, use_channel, frequency
    ):
        expected = PURE_TONE_DICT[frequency]
        filename = "tests/data/test_16KHZ_3channel_400-800-1600Hz.{}".format(
            audio_format
        )
        sample_width = 2
        audio_source = from_file(
            filename,
            sampling_rate=16000,
            sample_width=sample_width,
            channels=3,
            use_channel=use_channel,
        )
        fmt = DATA_FORMAT[sample_width]
        data = array(fmt, audio_source._buffer)
        self.assertEqual(data, expected)

    @genty_dataset(
        raw_mono=("raw", "mono_400Hz", (400,)),
        raw_3channel=("raw", "3channel_400-800-1600Hz", (400, 800, 1600)),
        wav_mono=("wav", "mono_400Hz", (400,)),
        wav_3channel=("wav", "3channel_400-800-1600Hz", (400, 800, 1600)),
    )
    def test_from_file_multichannel_audio_mix(
        self, audio_format, filename_suffix, frequencies
    ):
        sampling_rate = 16000
        sample_width = 2
        channels = len(frequencies)
        mono_channels = [PURE_TONE_DICT[freq] for freq in frequencies]
        fmt = DATA_FORMAT[sample_width]
        expected = _array_to_bytes(
            array(
                fmt,
                (sum(samples) // channels for samples in zip(*mono_channels)),
            )
        )
        filename = "tests/data/test_16KHZ_{}.{}".format(
            filename_suffix, audio_format
        )
        audio_source = from_file(
            filename,
            use_channel="mix",
            sampling_rate=sampling_rate,
            sample_width=2,
            channels=channels,
        )
        mixed = audio_source._buffer
        self.assertEqual(mixed, expected)

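    # pydub's AudioSegment loaders are mocked in the tests below, so they
    # exercise auditok.io's dispatching logic without needing ffmpeg or real
    # compressed audio files.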
    @patch("auditok.io._WITH_PYDUB", True)
    @patch("auditok.io.BufferAudioSource")
    @genty_dataset(
        ogg_first_channel=("ogg", 0, "from_ogg"),
        ogg_second_channel=("ogg", 1, "from_ogg"),
        ogg_mix=("ogg", "mix", "from_ogg"),
        ogg_default=("ogg", None, "from_ogg"),
        mp3_left_channel=("mp3", "left", "from_mp3"),
        mp3_right_channel=("mp3", "right", "from_mp3"),
        flac_first_channel=("flac", 0, "from_file"),
        flac_second_channel=("flac", 1, "from_file"),
        flv_left_channel=("flv", "left", "from_flv"),
        webm_right_channel=("webm", "right", "from_file"),
    )
    def test_from_file_multichannel_audio_compressed(
        self, audio_format, use_channel, function, *mocks
    ):
        filename = "audio.{}".format(audio_format)
        segment_mock = Mock()
        segment_mock.sample_width = 2
        segment_mock.channels = 2
        segment_mock._data = b"abcd"
        with patch("auditok.io._extract_selected_channel") as ext_mock:
            with patch(
                "auditok.io.AudioSegment.{}".format(function)
            ) as open_func:
                open_func.return_value = segment_mock
                from_file(filename, use_channel=use_channel)
                self.assertTrue(open_func.called)
                self.assertTrue(ext_mock.called)

                use_channel = {"left": 0, "right": 1, None: 0}.get(
                    use_channel, use_channel
                )
                ext_mock.assert_called_with(
                    segment_mock._data,
                    segment_mock.channels,
                    segment_mock.sample_width,
                    use_channel,
                )

        with patch("auditok.io._extract_selected_channel") as ext_mock:
            with patch(
                "auditok.io.AudioSegment.{}".format(function)
            ) as open_func:
                segment_mock.channels = 1
                open_func.return_value = segment_mock
                from_file(filename, use_channel=use_channel)
                self.assertTrue(open_func.called)
                self.assertFalse(ext_mock.called)

    @patch("auditok.io._WITH_PYDUB", True)
    @patch("auditok.io.BufferAudioSource")
    @genty_dataset(
        ogg=("ogg", "from_ogg"),
        mp3=("mp3", "from_mp3"),
        flac=("flac", "from_file"),
    )
    def test_from_file_multichannel_audio_mix_compressed(
        self, audio_format, function, *mocks
    ):
        filename = "audio.{}".format(audio_format)
        segment_mock = Mock()
        segment_mock.sample_width = 2
        segment_mock.channels = 2
        segment_mock._data = b"abcd"
        with patch("auditok.io._mix_audio_channels") as mix_mock:
            with patch(
                "auditok.io.AudioSegment.{}".format(function)
            ) as open_func:
                open_func.return_value = segment_mock
                from_file(filename, use_channel="mix")
                self.assertTrue(open_func.called)
                mix_mock.assert_called_with(
                    segment_mock._data,
                    segment_mock.channels,
                    segment_mock.sample_width,
                )

    @genty_dataset(
        default_first_channel=(None, 400),
        first_channel=(0, 400),
        second_channel=(1, 800),
        third_channel=(2, 1600),
        negative_first_channel=(-3, 400),
        negative_second_channel=(-2, 800),
        negative_third_channel=(-1, 1600),
    )
    def test_load_raw(self, use_channel, frequency):
        filename = "tests/data/test_16KHZ_3channel_400-800-1600Hz.raw"
        if use_channel is not None:
            audio_source = _load_raw(
                filename,
                sampling_rate=16000,
                sample_width=2,
                channels=3,
                use_channel=use_channel,
            )
        else:
            audio_source = _load_raw(
                filename, sampling_rate=16000, sample_width=2, channels=3
            )
        self.assertIsInstance(audio_source, BufferAudioSource)
        self.assertEqual(audio_source.sampling_rate, 16000)
        self.assertEqual(audio_source.sample_width, 2)
        self.assertEqual(audio_source.channels, 1)
        # generate a pure sine wave tone of the given frequency
        expected = PURE_TONE_DICT[frequency]
        # compare with data read from file
        fmt = DATA_FORMAT[2]
        data = array(fmt, audio_source._buffer)
        self.assertEqual(data, expected)

    @genty_dataset(
        mono=("mono_400Hz", (400,)),
        three_channel=("3channel_400-800-1600Hz", (400, 800, 1600)),
    )
    def test_load_raw_mix(self, filename_suffix, frequencies):
        sampling_rate = 16000
        sample_width = 2
        channels = len(frequencies)
        mono_channels = [PURE_TONE_DICT[freq] for freq in frequencies]

        fmt = DATA_FORMAT[sample_width]
        expected = _array_to_bytes(
            array(
                fmt,
                (sum(samples) // channels for samples in zip(*mono_channels)),
            )
        )
        filename = "tests/data/test_16KHZ_{}.raw".format(filename_suffix)
        audio_source = _load_raw(
            filename,
            use_channel="mix",
            sampling_rate=sampling_rate,
            sample_width=2,
            channels=channels,
        )
        mixed = audio_source._buffer
        self.assertEqual(mixed, expected)
        self.assertIsInstance(audio_source, BufferAudioSource)
        self.assertEqual(audio_source.sampling_rate, sampling_rate)
        self.assertEqual(audio_source.sample_width, sample_width)
        self.assertEqual(audio_source.channels, 1)

    @genty_dataset(
        missing_sampling_rate=("sr",),
        missing_sample_width=("sw",),
        missing_channels=("ch",),
    )
    def test_load_raw_missing_audio_param(self, missing_param):
        with self.assertRaises(AudioParameterError):
            params = AUDIO_PARAMS_SHORT.copy()
            del params[missing_param]
            srate, swidth, channels, _ = _get_audio_parameters(params)
            _load_raw("audio", srate, swidth, channels)

    @genty_dataset(
        default_first_channel=(None, 400),
        first_channel=(0, 400),
        second_channel=(1, 800),
        third_channel=(2, 1600),
        negative_first_channel=(-3, 400),
        negative_second_channel=(-2, 800),
        negative_third_channel=(-1, 1600),
    )
    def test_load_wave(self, use_channel, frequency):
        filename = "tests/data/test_16KHZ_3channel_400-800-1600Hz.wav"
        if use_channel is not None:
            audio_source = _load_wave(filename, use_channel=use_channel)
        else:
            audio_source = _load_wave(filename)
        self.assertIsInstance(audio_source, BufferAudioSource)
        self.assertEqual(audio_source.sampling_rate, 16000)
        self.assertEqual(audio_source.sample_width, 2)
        self.assertEqual(audio_source.channels, 1)
        # generate a pure sine wave tone of the given frequency
        expected = PURE_TONE_DICT[frequency]
        # compare with data read from file
        fmt = DATA_FORMAT[2]
        data = array(fmt, audio_source._buffer)
        self.assertEqual(data, expected)

    @genty_dataset(
        mono=("mono_400Hz", (400,)),
        three_channel=("3channel_400-800-1600Hz", (400, 800, 1600)),
    )
    def test_load_wave_mix(self, filename_suffix, frequencies):
        sampling_rate = 16000
        sample_width = 2
        channels = len(frequencies)
        mono_channels = [PURE_TONE_DICT[freq] for freq in frequencies]
        fmt = DATA_FORMAT[sample_width]
        expected = _array_to_bytes(
            array(
                fmt,
                (sum(samples) // channels for samples in zip(*mono_channels)),
            )
        )
        filename = "tests/data/test_16KHZ_{}.wav".format(filename_suffix)
        audio_source = _load_wave(filename, use_channel="mix")
        mixed = audio_source._buffer
        self.assertEqual(mixed, expected)
        self.assertIsInstance(audio_source, BufferAudioSource)
        self.assertEqual(audio_source.sampling_rate, sampling_rate)
        self.assertEqual(audio_source.sample_width, sample_width)
        self.assertEqual(audio_source.channels, 1)

    @patch("auditok.io._WITH_PYDUB", True)
    @patch("auditok.io.BufferAudioSource")
    @genty_dataset(
        ogg_default_first_channel=("ogg", 2, None, "from_ogg"),
        ogg_first_channel=("ogg", 1, 0, "from_ogg"),
        ogg_second_channel=("ogg", 2, 1, "from_ogg"),
        ogg_mix_channels=("ogg", 3, "mix", "from_ogg"),
        mp3_left_channel=("mp3", 1, "left", "from_mp3"),
        mp3_right_channel=("mp3", 2, "right", "from_mp3"),
        mp3_mix_channels=("mp3", 3, "mix", "from_mp3"),
        flac_first_channel=("flac", 2, 0, "from_file"),
        flac_second_channel=("flac", 2, 1, "from_file"),
        flv_left_channel=("flv", 1, "left", "from_flv"),
        webm_right_channel=("webm", 2, "right", "from_file"),
        webm_mix_channels=("webm", 4, "mix", "from_file"),
    )
    def test_load_with_pydub(
        self, audio_format, channels, use_channel, function, *mocks
    ):
        filename = "audio.{}".format(audio_format)
        segment_mock = Mock()
        segment_mock.sample_width = 2
        segment_mock.channels = channels
        segment_mock._data = b"abcdefgh"
        with patch("auditok.io._extract_selected_channel") as ext_mock:
            with patch(
                "auditok.io.AudioSegment.{}".format(function)
            ) as open_func:
                open_func.return_value = segment_mock
                use_channel = {"left": 0, "right": 1, None: 0}.get(
                    use_channel, use_channel
                )
                _load_with_pydub(filename, audio_format, use_channel)
                self.assertTrue(open_func.called)
                if channels > 1:
                    self.assertTrue(ext_mock.called)
                    ext_mock.assert_called_with(
                        segment_mock._data,
                        segment_mock.channels,
                        segment_mock.sample_width,
                        use_channel,
                    )
                else:
                    self.assertFalse(ext_mock.called)

    @genty_dataset(
        mono=("mono_400Hz.raw", (400,)),
        three_channel=("3channel_400-800-1600Hz.raw", (400, 800, 1600)),
    )
    def test_save_raw(self, filename, frequencies):
        filename = "tests/data/test_16KHZ_{}".format(filename)
        sample_width = 2
        fmt = DATA_FORMAT[sample_width]
        mono_channels = [PURE_TONE_DICT[freq] for freq in frequencies]
        data = _array_to_bytes(array(fmt, _sample_generator(*mono_channels)))
        tmpfile = NamedTemporaryFile()
        _save_raw(data, tmpfile.name)
        self.assertTrue(filecmp.cmp(tmpfile.name, filename, shallow=False))

    @genty_dataset(
        mono=("mono_400Hz.wav", (400,)),
        three_channel=("3channel_400-800-1600Hz.wav", (400, 800, 1600)),
    )
    def test_save_wave(self, filename, frequencies):
        filename = "tests/data/test_16KHZ_{}".format(filename)
        sampling_rate = 16000
        sample_width = 2
        channels = len(frequencies)
        fmt = DATA_FORMAT[sample_width]
        mono_channels = [PURE_TONE_DICT[freq] for freq in frequencies]
        data = _array_to_bytes(array(fmt, _sample_generator(*mono_channels)))
        tmpfile = NamedTemporaryFile()
        _save_wave(data, tmpfile.name, sampling_rate, sample_width, channels)
        self.assertTrue(filecmp.cmp(tmpfile.name, filename, shallow=False))

    @genty_dataset(
        missing_sampling_rate=("sr",),
        missing_sample_width=("sw",),
        missing_channels=("ch",),
    )
    def test_save_wave_missing_audio_param(self, missing_param):
        with self.assertRaises(AudioParameterError):
            params = AUDIO_PARAMS_SHORT.copy()
            del params[missing_param]
            srate, swidth, channels, _ = _get_audio_parameters(params)
            _save_wave(b"\0\0", "audio", srate, swidth, channels)

    def test_save_with_pydub(self):
        with patch("auditok.io.AudioSegment.export") as export:
            tmpdir = TemporaryDirectory()
            filename = os.path.join(tmpdir.name, "audio.ogg")
            _save_with_pydub(b"\0\0", filename, "ogg", 16000, 2, 1)
            self.assertTrue(export.called)
            tmpdir.cleanup()

    @genty_dataset(
        raw_with_audio_format=("audio", "raw"),
        raw_with_extension=("audio.raw", None),
        raw_with_audio_format_and_extension=("audio.mp3", "raw"),
        raw_no_audio_format_nor_extension=("audio", None),
    )
    def test_to_file_raw(self, filename, audio_format):
        exp_filename = "tests/data/test_16KHZ_mono_400Hz.raw"
        tmpdir = TemporaryDirectory()
        filename = os.path.join(tmpdir.name, filename)
        data = _array_to_bytes(PURE_TONE_DICT[400])
        to_file(data, filename, audio_format=audio_format)
        self.assertTrue(filecmp.cmp(filename, exp_filename, shallow=False))
        tmpdir.cleanup()

    @genty_dataset(
        wav_with_audio_format=("audio", "wav"),
        wav_with_extension=("audio.wav", None),
        wav_with_audio_format_and_extension=("audio.mp3", "wav"),
        wave_with_audio_format=("audio", "wave"),
        wave_with_extension=("audio.wave", None),
        wave_with_audio_format_and_extension=("audio.mp3", "wave"),
    )
    def test_to_file_wave(self, filename, audio_format):
        exp_filename = "tests/data/test_16KHZ_mono_400Hz.wav"
        tmpdir = TemporaryDirectory()
        filename = os.path.join(tmpdir.name, filename)
        data = _array_to_bytes(PURE_TONE_DICT[400])
        to_file(
            data,
            filename,
            audio_format=audio_format,
            sampling_rate=16000,
            sample_width=2,
            channels=1,
        )
        self.assertTrue(filecmp.cmp(filename, exp_filename, shallow=False))
        tmpdir.cleanup()

    @genty_dataset(
        missing_sampling_rate=("sr",),
        missing_sample_width=("sw",),
        missing_channels=("ch",),
    )
    def test_to_file_missing_audio_param(self, missing_param):
        params = AUDIO_PARAMS_SHORT.copy()
        del params[missing_param]
        with self.assertRaises(AudioParameterError):
            to_file(b"\0\0", "audio", audio_format="wav", **params)
        with self.assertRaises(AudioParameterError):
            to_file(b"\0\0", "audio", audio_format="mp3", **params)

    def test_to_file_no_pydub(self):
        with patch("auditok.io._WITH_PYDUB", False):
            with self.assertRaises(AudioIOError):
                to_file(b"\0\0", "audio", "mp3")

    @patch("auditok.io._WITH_PYDUB", True)
    @genty_dataset(
        ogg_with_extension=("audio.ogg", None),
        ogg_with_audio_format=("audio", "ogg"),
        ogg_format_with_wrong_extension=("audio.wav", "ogg"),
    )
    def test_to_file_compressed(self, filename, audio_format, *mocks):
        with patch("auditok.io.AudioSegment.export") as export:
            tmpdir = TemporaryDirectory()
            filename = os.path.join(tmpdir.name, filename)
            to_file(b"\0\0", filename, audio_format, **AUDIO_PARAMS_SHORT)
            self.assertTrue(export.called)
            tmpdir.cleanup()
|