amine@106
|
1 import os
|
amine@106
|
2 import sys
|
amine@106
|
3 import math
|
amine@107
|
4 from array import array
|
amine@133
|
5 from tempfile import NamedTemporaryFile, TemporaryDirectory
|
amine@110
|
6 import filecmp
|
amine@108
|
7 from unittest import TestCase
|
amine@108
|
8 from genty import genty, genty_dataset
|
amine@110
|
9 from auditok.io import (
|
amine@126
|
10 DATA_FORMAT,
|
amine@121
|
11 AudioIOError,
|
amine@110
|
12 AudioParameterError,
|
amine@126
|
13 BufferAudioSource,
|
amine@110
|
14 check_audio_data,
|
amine@143
|
15 _guess_audio_format,
|
amine@128
|
16 _get_audio_parameters,
|
amine@116
|
17 _array_to_bytes,
|
amine@118
|
18 _mix_audio_channels,
|
amine@119
|
19 _extract_selected_channel,
|
amine@126
|
20 _load_raw,
|
amine@129
|
21 _load_wave,
|
amine@131
|
22 _load_with_pydub,
|
amine@120
|
23 from_file,
|
amine@111
|
24 _save_raw,
|
amine@110
|
25 _save_wave,
|
amine@141
|
26 _save_with_pydub,
|
amine@135
|
27 to_file,
|
amine@110
|
28 )
|
amine@106
|
29
|
amine@106
|
30
|
amine@106
|
31 if sys.version_info >= (3, 0):
|
amine@106
|
32 PYTHON_3 = True
|
amine@124
|
33 from unittest.mock import patch, Mock
|
amine@106
|
34 else:
|
amine@106
|
35 PYTHON_3 = False
|
amine@124
|
36 from mock import patch, Mock
|
amine@120
|
37
|
amine@120
|
38 AUDIO_PARAMS_SHORT = {"sr": 16000, "sw": 2, "ch": 1}
|
amine@106
|
39
|
amine@106
|
40
|
amine@106
|
41 def _sample_generator(*data_buffers):
|
amine@106
|
42 """
|
amine@106
|
43 Takes a list of many mono audio data buffers and makes a sample generator
|
amine@106
|
44 of interleaved audio samples, one sample from each channel. The resulting
|
amine@106
|
45 generator can be used to build a multichannel audio buffer.
|
amine@106
|
46 >>> gen = _sample_generator("abcd", "ABCD")
|
amine@106
|
47 >>> list(gen)
|
amine@106
|
48 ["a", "A", "b", "B", "c", "C", "d", "D"]
|
amine@106
|
49 """
|
amine@106
|
50 frame_gen = zip(*data_buffers)
|
amine@106
|
51 return (sample for frame in frame_gen for sample in frame)
|
amine@106
|
52
|
amine@106
|
53
|
amine@107
|
54 def _generate_pure_tone(
|
amine@107
|
55 frequency, duration_sec=1, sampling_rate=16000, sample_width=2, volume=1e4
|
amine@107
|
56 ):
|
amine@107
|
57 """
|
amine@107
|
58 Generates a pure tone with the given frequency.
|
amine@107
|
59 """
|
amine@107
|
60 assert frequency <= sampling_rate / 2
|
amine@107
|
61 max_value = (2 ** (sample_width * 8) // 2) - 1
|
amine@107
|
62 if volume > max_value:
|
amine@107
|
63 volume = max_value
|
amine@107
|
64 fmt = DATA_FORMAT[sample_width]
|
amine@107
|
65 total_samples = int(sampling_rate * duration_sec)
|
amine@107
|
66 step = frequency / sampling_rate
|
amine@107
|
67 two_pi_step = 2 * math.pi * step
|
amine@107
|
68 data = array(
|
amine@107
|
69 fmt,
|
amine@107
|
70 (
|
amine@107
|
71 int(math.sin(two_pi_step * i) * volume)
|
amine@107
|
72 for i in range(total_samples)
|
amine@107
|
73 ),
|
amine@107
|
74 )
|
amine@107
|
75 return data
|
amine@107
|
76
|
amine@107
|
77
|
amine@107
|
78 PURE_TONE_DICT = {
|
amine@107
|
79 freq: _generate_pure_tone(freq, 1, 16000, 2) for freq in (400, 800, 1600)
|
amine@107
|
80 }
|
amine@107
|
81 PURE_TONE_DICT.update(
|
amine@107
|
82 {
|
amine@107
|
83 freq: _generate_pure_tone(freq, 0.1, 16000, 2)
|
amine@107
|
84 for freq in (600, 1150, 2400, 7220)
|
amine@107
|
85 }
|
amine@107
|
86 )
|
amine@108
|
87
|
amine@108
|
88
|
amine@108
|
89 @genty
|
amine@108
|
90 class TestIO(TestCase):
|
amine@108
|
91 @genty_dataset(
|
amine@108
|
92 valid_mono=(b"\0" * 113, 1, 1),
|
amine@108
|
93 valid_stereo=(b"\0" * 160, 1, 2),
|
amine@108
|
94 invalid_mono_sw_2=(b"\0" * 113, 2, 1, False),
|
amine@108
|
95 invalid_stereo_sw_1=(b"\0" * 113, 1, 2, False),
|
amine@108
|
96 invalid_stereo_sw_2=(b"\0" * 158, 2, 2, False),
|
amine@108
|
97 )
|
amine@108
|
98 def test_check_audio_data(self, data, sample_width, channels, valid=True):
|
amine@108
|
99
|
amine@108
|
100 if not valid:
|
amine@108
|
101 with self.assertRaises(AudioParameterError):
|
amine@108
|
102 check_audio_data(data, sample_width, channels)
|
amine@108
|
103 else:
|
amine@108
|
104 self.assertIsNone(check_audio_data(data, sample_width, channels))
|
amine@110
|
105
|
amine@110
|
106 @genty_dataset(
|
amine@143
|
107 extention_and_format_same=("wav", "filename.wav", "wav"),
|
amine@143
|
108 extention_and_format_different=("wav", "filename.mp3", "wav"),
|
amine@143
|
109 extention_no_format=(None, "filename.wav", "wav"),
|
amine@143
|
110 format_no_extension=("wav", "filename", "wav"),
|
amine@143
|
111 no_format_no_extension=(None, "filename", None),
|
amine@143
|
112 )
|
amine@143
|
113 def test_guess_audio_format(self, fmt, filename, expected):
|
amine@143
|
114 result = _guess_audio_format(fmt, filename)
|
amine@143
|
115 self.assertEqual(result, expected)
|
amine@143
|
116
|
amine@143
|
117 @genty_dataset(
|
amine@118
|
118 mono_1byte=([400], 1),
|
amine@118
|
119 stereo_1byte=([400, 600], 1),
|
amine@118
|
120 three_channel_1byte=([400, 600, 2400], 1),
|
amine@118
|
121 mono_2byte=([400], 2),
|
amine@118
|
122 stereo_2byte=([400, 600], 2),
|
amine@118
|
123 three_channel_2byte=([400, 600, 1150], 2),
|
amine@118
|
124 mono_4byte=([400], 4),
|
amine@118
|
125 stereo_4byte=([400, 600], 4),
|
amine@118
|
126 four_channel_2byte=([400, 600, 1150, 7220], 4),
|
amine@118
|
127 )
|
amine@118
|
128 def test_mix_audio_channels(self, frequencies, sample_width):
|
amine@118
|
129 sampling_rate = 16000
|
amine@118
|
130 sample_width = 2
|
amine@118
|
131 channels = len(frequencies)
|
amine@118
|
132 mono_channels = [
|
amine@118
|
133 _generate_pure_tone(
|
amine@118
|
134 freq,
|
amine@118
|
135 duration_sec=0.1,
|
amine@118
|
136 sampling_rate=sampling_rate,
|
amine@118
|
137 sample_width=sample_width,
|
amine@118
|
138 )
|
amine@118
|
139 for freq in frequencies
|
amine@118
|
140 ]
|
amine@118
|
141 fmt = DATA_FORMAT[sample_width]
|
amine@118
|
142 expected = _array_to_bytes(
|
amine@118
|
143 array(
|
amine@118
|
144 fmt,
|
amine@118
|
145 (sum(samples) // channels for samples in zip(*mono_channels)),
|
amine@118
|
146 )
|
amine@118
|
147 )
|
amine@118
|
148 data = _array_to_bytes(array(fmt, _sample_generator(*mono_channels)))
|
amine@118
|
149 mixed = _mix_audio_channels(data, channels, sample_width)
|
amine@118
|
150 self.assertEqual(mixed, expected)
|
amine@118
|
151
|
amine@118
|
152 @genty_dataset(
|
amine@119
|
153 mono_1byte=([400], 1, 0),
|
amine@119
|
154 stereo_1byte_2st_channel=([400, 600], 1, 1),
|
amine@119
|
155 mono_2byte=([400], 2, 0),
|
amine@119
|
156 stereo_2byte_1st_channel=([400, 600], 2, 0),
|
amine@119
|
157 stereo_2byte_2nd_channel=([400, 600], 2, 1),
|
amine@119
|
158 three_channel_2byte_last_negative_idx=([400, 600, 1150], 2, -1),
|
amine@119
|
159 three_channel_2byte_2nd_negative_idx=([400, 600, 1150], 2, -2),
|
amine@119
|
160 three_channel_2byte_1st_negative_idx=([400, 600, 1150], 2, -3),
|
amine@119
|
161 three_channel_4byte_1st=([400, 600, 1150], 4, 0),
|
amine@119
|
162 three_channel_4byte_last_negative_idx=([400, 600, 1150], 4, -1),
|
amine@119
|
163 )
|
amine@119
|
164 def test_extract_selected_channel(
|
amine@119
|
165 self, frequencies, sample_width, use_channel
|
amine@119
|
166 ):
|
amine@119
|
167
|
amine@119
|
168 mono_channels = [
|
amine@119
|
169 _generate_pure_tone(
|
amine@119
|
170 freq,
|
amine@119
|
171 duration_sec=0.1,
|
amine@119
|
172 sampling_rate=16000,
|
amine@119
|
173 sample_width=sample_width,
|
amine@119
|
174 )
|
amine@119
|
175 for freq in frequencies
|
amine@119
|
176 ]
|
amine@119
|
177 channels = len(frequencies)
|
amine@119
|
178 fmt = DATA_FORMAT[sample_width]
|
amine@119
|
179 expected = _array_to_bytes(mono_channels[use_channel])
|
amine@119
|
180 data = _array_to_bytes(array(fmt, _sample_generator(*mono_channels)))
|
amine@119
|
181 selected_channel = _extract_selected_channel(
|
amine@119
|
182 data, channels, sample_width, use_channel
|
amine@119
|
183 )
|
amine@119
|
184 self.assertEqual(selected_channel, expected)
|
amine@119
|
185
|
amine@119
|
186 @genty_dataset(
|
amine@120
|
187 raw_with_audio_format=(
|
amine@120
|
188 "audio",
|
amine@120
|
189 "raw",
|
amine@120
|
190 "_load_raw",
|
amine@120
|
191 AUDIO_PARAMS_SHORT,
|
amine@120
|
192 ),
|
amine@120
|
193 raw_with_extension=(
|
amine@120
|
194 "audio.raw",
|
amine@120
|
195 None,
|
amine@120
|
196 "_load_raw",
|
amine@120
|
197 AUDIO_PARAMS_SHORT,
|
amine@120
|
198 ),
|
amine@120
|
199 wave_with_audio_format=("audio", "wave", "_load_wave"),
|
amine@120
|
200 wav_with_audio_format=("audio", "wave", "_load_wave"),
|
amine@120
|
201 wav_with_extension=("audio.wav", None, "_load_wave"),
|
amine@120
|
202 format_and_extension_both_given=("audio.dat", "wav", "_load_wave"),
|
amine@120
|
203 format_and_extension_both_given_b=("audio.raw", "wave", "_load_wave"),
|
amine@120
|
204 no_format_nor_extension=("audio", None, "_load_with_pydub"),
|
amine@120
|
205 other_formats_ogg=("audio.ogg", None, "_load_with_pydub"),
|
amine@120
|
206 other_formats_webm=("audio", "webm", "_load_with_pydub"),
|
amine@120
|
207 )
|
amine@120
|
208 def test_from_file(
|
amine@120
|
209 self, filename, audio_format, funtion_name, kwargs=None
|
amine@120
|
210 ):
|
amine@120
|
211 funtion_name = "auditok.io." + funtion_name
|
amine@120
|
212 if kwargs is None:
|
amine@120
|
213 kwargs = {}
|
amine@120
|
214 with patch(funtion_name) as patch_function:
|
amine@120
|
215 from_file(filename, audio_format, **kwargs)
|
amine@120
|
216 self.assertTrue(patch_function.called)
|
amine@120
|
217
|
amine@137
|
218 @genty_dataset(
|
amine@137
|
219 missing_sampling_rate=("sr",),
|
amine@137
|
220 missing_sample_width=("sw",),
|
amine@137
|
221 missing_channels=("ch",),
|
amine@137
|
222 )
|
amine@137
|
223 def test_from_file_missing_audio_param(self, missing_param):
|
amine@137
|
224 with self.assertRaises(AudioParameterError):
|
amine@137
|
225 params = AUDIO_PARAMS_SHORT.copy()
|
amine@137
|
226 del params[missing_param]
|
amine@137
|
227 from_file("audio", audio_format="raw", **params)
|
amine@137
|
228
|
amine@121
|
229 def test_from_file_no_pydub(self):
|
amine@121
|
230 with patch("auditok.io._WITH_PYDUB", False):
|
amine@121
|
231 with self.assertRaises(AudioIOError):
|
amine@121
|
232 from_file("audio", "mp3")
|
amine@121
|
233
|
amine@111
|
234 @genty_dataset(
|
amine@122
|
235 raw_first_channel=("raw", 0, 400),
|
amine@122
|
236 raw_second_channel=("raw", 1, 800),
|
amine@122
|
237 raw_third_channel=("raw", 2, 1600),
|
amine@122
|
238 raw_left_channel=("raw", "left", 400),
|
amine@122
|
239 raw_right_channel=("raw", "right", 800),
|
amine@122
|
240 wav_first_channel=("wav", 0, 400),
|
amine@122
|
241 wav_second_channel=("wav", 1, 800),
|
amine@122
|
242 wav_third_channel=("wav", 2, 1600),
|
amine@122
|
243 wav_left_channel=("wav", "left", 400),
|
amine@122
|
244 wav_right_channel=("wav", "right", 800),
|
amine@122
|
245 )
|
amine@122
|
246 def test_from_file_multichannel_audio(
|
amine@122
|
247 self, audio_format, use_channel, frequency
|
amine@122
|
248 ):
|
amine@122
|
249 expected = PURE_TONE_DICT[frequency]
|
amine@122
|
250 filename = "tests/data/test_16KHZ_3channel_400-800-1600Hz.{}".format(
|
amine@122
|
251 audio_format
|
amine@122
|
252 )
|
amine@122
|
253 sample_width = 2
|
amine@122
|
254 audio_source = from_file(
|
amine@122
|
255 filename,
|
amine@122
|
256 sampling_rate=16000,
|
amine@122
|
257 sample_width=sample_width,
|
amine@122
|
258 channels=3,
|
amine@122
|
259 use_channel=use_channel,
|
amine@122
|
260 )
|
amine@122
|
261 fmt = DATA_FORMAT[sample_width]
|
amine@122
|
262 data = array(fmt, audio_source._buffer)
|
amine@122
|
263 self.assertEqual(data, expected)
|
amine@122
|
264
|
amine@122
|
265 @genty_dataset(
|
amine@123
|
266 raw_mono=("raw", "mono_400Hz", (400,)),
|
amine@123
|
267 raw_3channel=("raw", "3channel_400-800-1600Hz", (400, 800, 1600)),
|
amine@123
|
268 wav_mono=("wav", "mono_400Hz", (400,)),
|
amine@123
|
269 wav_3channel=("wav", "3channel_400-800-1600Hz", (400, 800, 1600)),
|
amine@123
|
270 )
|
amine@123
|
271 def test_from_file_multichannel_audio_mix(
|
amine@123
|
272 self, audio_format, filename_suffix, frequencies
|
amine@123
|
273 ):
|
amine@123
|
274 sampling_rate = 16000
|
amine@123
|
275 sample_width = 2
|
amine@123
|
276 channels = len(frequencies)
|
amine@123
|
277 mono_channels = [PURE_TONE_DICT[freq] for freq in frequencies]
|
amine@123
|
278 channels = len(frequencies)
|
amine@123
|
279 fmt = DATA_FORMAT[sample_width]
|
amine@123
|
280 expected = _array_to_bytes(
|
amine@123
|
281 array(
|
amine@123
|
282 fmt,
|
amine@123
|
283 (sum(samples) // channels for samples in zip(*mono_channels)),
|
amine@123
|
284 )
|
amine@123
|
285 )
|
amine@123
|
286 filename = "tests/data/test_16KHZ_{}.{}".format(
|
amine@123
|
287 filename_suffix, audio_format
|
amine@123
|
288 )
|
amine@123
|
289 audio_source = from_file(
|
amine@123
|
290 filename,
|
amine@123
|
291 use_channel="mix",
|
amine@123
|
292 sampling_rate=sampling_rate,
|
amine@123
|
293 sample_width=2,
|
amine@123
|
294 channels=channels,
|
amine@123
|
295 )
|
amine@123
|
296 mixed = audio_source._buffer
|
amine@123
|
297 self.assertEqual((mixed), expected)
|
amine@123
|
298
|
amine@124
|
299 @patch("auditok.io._WITH_PYDUB", True)
|
amine@124
|
300 @patch("auditok.io.BufferAudioSource")
|
amine@124
|
301 @genty_dataset(
|
amine@124
|
302 ogg_first_channel=("ogg", 0, "from_ogg"),
|
amine@124
|
303 ogg_second_channel=("ogg", 1, "from_ogg"),
|
amine@124
|
304 ogg_mix=("ogg", "mix", "from_ogg"),
|
amine@124
|
305 ogg_default=("ogg", None, "from_ogg"),
|
amine@124
|
306 mp3_left_channel=("mp3", "left", "from_mp3"),
|
amine@124
|
307 mp3_right_channel=("mp3", "right", "from_mp3"),
|
amine@124
|
308 flac_first_channel=("flac", 0, "from_file"),
|
amine@124
|
309 flac_second_channel=("flac", 1, "from_file"),
|
amine@124
|
310 flv_left_channel=("flv", "left", "from_flv"),
|
amine@124
|
311 webm_right_channel=("webm", "right", "from_file"),
|
amine@124
|
312 )
|
amine@124
|
313 def test_from_file_multichannel_audio_compressed(
|
amine@124
|
314 self, audio_format, use_channel, function, *mocks
|
amine@124
|
315 ):
|
amine@124
|
316 filename = "audio.{}".format(audio_format)
|
amine@124
|
317 segment_mock = Mock()
|
amine@124
|
318 segment_mock.sample_width = 2
|
amine@124
|
319 segment_mock.channels = 2
|
amine@124
|
320 segment_mock._data = b"abcd"
|
amine@124
|
321 with patch("auditok.io._extract_selected_channel") as ext_mock:
|
amine@124
|
322 with patch(
|
amine@124
|
323 "auditok.io.AudioSegment.{}".format(function)
|
amine@124
|
324 ) as open_func:
|
amine@124
|
325 open_func.return_value = segment_mock
|
amine@124
|
326 from_file(filename, use_channel=use_channel)
|
amine@124
|
327 self.assertTrue(open_func.called)
|
amine@124
|
328 self.assertTrue(ext_mock.called)
|
amine@124
|
329
|
amine@124
|
330 use_channel = {"left": 0, "right": 1, None: 0}.get(
|
amine@124
|
331 use_channel, use_channel
|
amine@124
|
332 )
|
amine@124
|
333 ext_mock.assert_called_with(
|
amine@124
|
334 segment_mock._data,
|
amine@124
|
335 segment_mock.channels,
|
amine@124
|
336 segment_mock.sample_width,
|
amine@124
|
337 use_channel,
|
amine@124
|
338 )
|
amine@124
|
339
|
amine@124
|
340 with patch("auditok.io._extract_selected_channel") as ext_mock:
|
amine@124
|
341 with patch(
|
amine@124
|
342 "auditok.io.AudioSegment.{}".format(function)
|
amine@124
|
343 ) as open_func:
|
amine@124
|
344 segment_mock.channels = 1
|
amine@124
|
345 open_func.return_value = segment_mock
|
amine@124
|
346 from_file(filename, use_channel=use_channel)
|
amine@124
|
347 self.assertTrue(open_func.called)
|
amine@124
|
348 self.assertFalse(ext_mock.called)
|
amine@124
|
349
|
amine@125
|
350 @patch("auditok.io._WITH_PYDUB", True)
|
amine@125
|
351 @patch("auditok.io.BufferAudioSource")
|
amine@125
|
352 @genty_dataset(
|
amine@125
|
353 ogg=("ogg", "from_ogg"),
|
amine@125
|
354 mp3=("mp3", "from_mp3"),
|
amine@125
|
355 flac=("flac", "from_file"),
|
amine@125
|
356 )
|
amine@125
|
357 def test_from_file_multichannel_audio_mix_compressed(
|
amine@125
|
358 self, audio_format, function, *mocks
|
amine@125
|
359 ):
|
amine@125
|
360 filename = "audio.{}".format(audio_format)
|
amine@125
|
361 segment_mock = Mock()
|
amine@125
|
362 segment_mock.sample_width = 2
|
amine@125
|
363 segment_mock.channels = 2
|
amine@125
|
364 segment_mock._data = b"abcd"
|
amine@125
|
365 with patch("auditok.io._mix_audio_channels") as mix_mock:
|
amine@125
|
366 with patch(
|
amine@125
|
367 "auditok.io.AudioSegment.{}".format(function)
|
amine@125
|
368 ) as open_func:
|
amine@125
|
369 open_func.return_value = segment_mock
|
amine@125
|
370 from_file(filename, use_channel="mix")
|
amine@125
|
371 self.assertTrue(open_func.called)
|
amine@125
|
372 mix_mock.assert_called_with(
|
amine@125
|
373 segment_mock._data,
|
amine@125
|
374 segment_mock.channels,
|
amine@125
|
375 segment_mock.sample_width,
|
amine@125
|
376 )
|
amine@125
|
377
|
amine@123
|
378 @genty_dataset(
|
amine@126
|
379 dafault_first_channel=(None, 400),
|
amine@126
|
380 first_channel=(0, 400),
|
amine@126
|
381 second_channel=(1, 800),
|
amine@126
|
382 third_channel=(2, 1600),
|
amine@126
|
383 negative_first_channel=(-3, 400),
|
amine@126
|
384 negative_second_channel=(-2, 800),
|
amine@126
|
385 negative_third_channel=(-1, 1600),
|
amine@126
|
386 )
|
amine@126
|
387 def test_load_raw(self, use_channel, frequency):
|
amine@126
|
388 filename = "tests/data/test_16KHZ_3channel_400-800-1600Hz.raw"
|
amine@126
|
389 if use_channel is not None:
|
amine@126
|
390 audio_source = _load_raw(
|
amine@126
|
391 filename,
|
amine@126
|
392 sampling_rate=16000,
|
amine@126
|
393 sample_width=2,
|
amine@126
|
394 channels=3,
|
amine@126
|
395 use_channel=use_channel,
|
amine@126
|
396 )
|
amine@126
|
397 else:
|
amine@126
|
398 audio_source = _load_raw(
|
amine@126
|
399 filename, sampling_rate=16000, sample_width=2, channels=3
|
amine@126
|
400 )
|
amine@126
|
401 self.assertIsInstance(audio_source, BufferAudioSource)
|
amine@126
|
402 self.assertEqual(audio_source.sampling_rate, 16000)
|
amine@126
|
403 self.assertEqual(audio_source.sample_width, 2)
|
amine@126
|
404 self.assertEqual(audio_source.channels, 1)
|
amine@126
|
405 # generate a pure sine wave tone of the given frequency
|
amine@126
|
406 expected = PURE_TONE_DICT[frequency]
|
amine@126
|
407 # compre with data read from file
|
amine@126
|
408 fmt = DATA_FORMAT[2]
|
amine@126
|
409 data = array(fmt, audio_source._buffer)
|
amine@126
|
410 self.assertEqual(data, expected)
|
amine@126
|
411
|
amine@126
|
412 @genty_dataset(
|
amine@127
|
413 mono=("mono_400Hz", (400,)),
|
amine@127
|
414 three_channel=("3channel_400-800-1600Hz", (400, 800, 1600)),
|
amine@127
|
415 )
|
amine@127
|
416 def test_load_raw_mix(self, filename_suffix, frequencies):
|
amine@127
|
417 sampling_rate = 16000
|
amine@127
|
418 sample_width = 2
|
amine@127
|
419 channels = len(frequencies)
|
amine@127
|
420 mono_channels = [PURE_TONE_DICT[freq] for freq in frequencies]
|
amine@127
|
421
|
amine@127
|
422 fmt = DATA_FORMAT[sample_width]
|
amine@127
|
423 expected = _array_to_bytes(
|
amine@127
|
424 array(
|
amine@127
|
425 fmt,
|
amine@127
|
426 (sum(samples) // channels for samples in zip(*mono_channels)),
|
amine@127
|
427 )
|
amine@127
|
428 )
|
amine@127
|
429 filename = "tests/data/test_16KHZ_{}.raw".format(filename_suffix)
|
amine@127
|
430 audio_source = _load_raw(
|
amine@127
|
431 filename,
|
amine@127
|
432 use_channel="mix",
|
amine@127
|
433 sampling_rate=sampling_rate,
|
amine@127
|
434 sample_width=2,
|
amine@127
|
435 channels=channels,
|
amine@127
|
436 )
|
amine@127
|
437 mixed = audio_source._buffer
|
amine@127
|
438 self.assertEqual(mixed, expected)
|
amine@127
|
439 self.assertIsInstance(audio_source, BufferAudioSource)
|
amine@127
|
440 self.assertEqual(audio_source.sampling_rate, sampling_rate)
|
amine@127
|
441 self.assertEqual(audio_source.sample_width, sample_width)
|
amine@127
|
442 self.assertEqual(audio_source.channels, 1)
|
amine@127
|
443
|
amine@127
|
444 @genty_dataset(
|
amine@128
|
445 missing_sampling_rate=("sr",),
|
amine@128
|
446 missing_sample_width=("sw",),
|
amine@128
|
447 missing_channels=("ch",),
|
amine@128
|
448 )
|
amine@128
|
449 def test_load_raw_missing_audio_param(self, missing_param):
|
amine@128
|
450 with self.assertRaises(AudioParameterError):
|
amine@128
|
451 params = AUDIO_PARAMS_SHORT.copy()
|
amine@128
|
452 del params[missing_param]
|
amine@128
|
453 srate, swidth, channels, _ = _get_audio_parameters(params)
|
amine@128
|
454 _load_raw("audio", srate, swidth, channels)
|
amine@128
|
455
|
amine@128
|
456 @genty_dataset(
|
amine@129
|
457 dafault_first_channel=(None, 400),
|
amine@129
|
458 first_channel=(0, 400),
|
amine@129
|
459 second_channel=(1, 800),
|
amine@129
|
460 third_channel=(2, 1600),
|
amine@129
|
461 negative_first_channel=(-3, 400),
|
amine@129
|
462 negative_second_channel=(-2, 800),
|
amine@129
|
463 negative_third_channel=(-1, 1600),
|
amine@129
|
464 )
|
amine@129
|
465 def test_load_wave(self, use_channel, frequency):
|
amine@129
|
466 filename = "tests/data/test_16KHZ_3channel_400-800-1600Hz.wav"
|
amine@129
|
467 if use_channel is not None:
|
amine@129
|
468 audio_source = _load_wave(filename, use_channel=use_channel)
|
amine@129
|
469 else:
|
amine@129
|
470 audio_source = _load_wave(filename)
|
amine@129
|
471 self.assertIsInstance(audio_source, BufferAudioSource)
|
amine@129
|
472 self.assertEqual(audio_source.sampling_rate, 16000)
|
amine@129
|
473 self.assertEqual(audio_source.sample_width, 2)
|
amine@129
|
474 self.assertEqual(audio_source.channels, 1)
|
amine@129
|
475 # generate a pure sine wave tone of the given frequency
|
amine@129
|
476 expected = PURE_TONE_DICT[frequency]
|
amine@129
|
477 # compre with data read from file
|
amine@129
|
478 fmt = DATA_FORMAT[2]
|
amine@129
|
479 data = array(fmt, audio_source._buffer)
|
amine@129
|
480 self.assertEqual(data, expected)
|
amine@129
|
481
|
amine@129
|
482 @genty_dataset(
|
amine@130
|
483 mono=("mono_400Hz", (400,)),
|
amine@130
|
484 three_channel=("3channel_400-800-1600Hz", (400, 800, 1600)),
|
amine@130
|
485 )
|
amine@130
|
486 def test_load_wave_mix(self, filename_suffix, frequencies):
|
amine@130
|
487 sampling_rate = 16000
|
amine@130
|
488 sample_width = 2
|
amine@130
|
489 channels = len(frequencies)
|
amine@130
|
490 mono_channels = [PURE_TONE_DICT[freq] for freq in frequencies]
|
amine@130
|
491 fmt = DATA_FORMAT[sample_width]
|
amine@130
|
492 expected = _array_to_bytes(
|
amine@130
|
493 array(
|
amine@130
|
494 fmt,
|
amine@130
|
495 (sum(samples) // channels for samples in zip(*mono_channels)),
|
amine@130
|
496 )
|
amine@130
|
497 )
|
amine@130
|
498 filename = "tests/data/test_16KHZ_{}.wav".format(filename_suffix)
|
amine@130
|
499 audio_source = _load_wave(filename, use_channel="mix")
|
amine@130
|
500 mixed = audio_source._buffer
|
amine@130
|
501 self.assertEqual(mixed, expected)
|
amine@130
|
502 self.assertIsInstance(audio_source, BufferAudioSource)
|
amine@130
|
503 self.assertEqual(audio_source.sampling_rate, sampling_rate)
|
amine@130
|
504 self.assertEqual(audio_source.sample_width, sample_width)
|
amine@130
|
505 self.assertEqual(audio_source.channels, 1)
|
amine@130
|
506
|
amine@131
|
507 @patch("auditok.io._WITH_PYDUB", True)
|
amine@131
|
508 @patch("auditok.io.BufferAudioSource")
|
amine@131
|
509 @genty_dataset(
|
amine@131
|
510 ogg_default_first_channel=("ogg", 2, None, "from_ogg"),
|
amine@131
|
511 ogg_first_channel=("ogg", 1, 0, "from_ogg"),
|
amine@131
|
512 ogg_second_channel=("ogg", 2, 1, "from_ogg"),
|
amine@131
|
513 ogg_mix_channels=("ogg", 3, "mix", "from_ogg"),
|
amine@131
|
514 mp3_left_channel=("mp3", 1, "left", "from_mp3"),
|
amine@131
|
515 mp3_right_channel=("mp3", 2, "right", "from_mp3"),
|
amine@131
|
516 mp3_mix_channels=("mp3", 3, "mix", "from_mp3"),
|
amine@131
|
517 flac_first_channel=("flac", 2, 0, "from_file"),
|
amine@131
|
518 flac_second_channel=("flac", 2, 1, "from_file"),
|
amine@131
|
519 flv_left_channel=("flv", 1, "left", "from_flv"),
|
amine@131
|
520 webm_right_channel=("webm", 2, "right", "from_file"),
|
amine@131
|
521 webm_mix_channels=("webm", 4, "mix", "from_file"),
|
amine@131
|
522 )
|
amine@131
|
523 def test_load_with_pydub(
|
amine@131
|
524 self, audio_format, channels, use_channel, function, *mocks
|
amine@131
|
525 ):
|
amine@131
|
526 filename = "audio.{}".format(audio_format)
|
amine@131
|
527 segment_mock = Mock()
|
amine@131
|
528 segment_mock.sample_width = 2
|
amine@131
|
529 segment_mock.channels = channels
|
amine@131
|
530 segment_mock._data = b"abcdefgh"
|
amine@131
|
531 with patch("auditok.io._extract_selected_channel") as ext_mock:
|
amine@131
|
532 with patch(
|
amine@131
|
533 "auditok.io.AudioSegment.{}".format(function)
|
amine@131
|
534 ) as open_func:
|
amine@131
|
535 open_func.return_value = segment_mock
|
amine@131
|
536 use_channel = {"left": 0, "right": 1, None: 0}.get(
|
amine@131
|
537 use_channel, use_channel
|
amine@131
|
538 )
|
amine@131
|
539 _load_with_pydub(filename, audio_format, use_channel)
|
amine@131
|
540 self.assertTrue(open_func.called)
|
amine@131
|
541 if channels > 1:
|
amine@131
|
542 self.assertTrue(ext_mock.called)
|
amine@131
|
543 ext_mock.assert_called_with(
|
amine@131
|
544 segment_mock._data,
|
amine@131
|
545 segment_mock.channels,
|
amine@131
|
546 segment_mock.sample_width,
|
amine@131
|
547 use_channel,
|
amine@131
|
548 )
|
amine@131
|
549 else:
|
amine@131
|
550 self.assertFalse(ext_mock.called)
|
amine@131
|
551
|
amine@130
|
552 @genty_dataset(
|
amine@132
|
553 mono=("mono_400Hz.raw", (400,)),
|
amine@132
|
554 three_channel=("3channel_400-800-1600Hz.raw", (400, 800, 1600)),
|
amine@132
|
555 )
|
amine@132
|
556 def test_save_raw(self, filename, frequencies):
|
amine@132
|
557 filename = "tests/data/test_16KHZ_{}".format(filename)
|
amine@132
|
558 sample_width = 2
|
amine@132
|
559 fmt = DATA_FORMAT[sample_width]
|
amine@132
|
560 mono_channels = [PURE_TONE_DICT[freq] for freq in frequencies]
|
amine@132
|
561 data = _array_to_bytes(array(fmt, _sample_generator(*mono_channels)))
|
amine@132
|
562 tmpfile = NamedTemporaryFile()
|
amine@136
|
563 _save_raw(data, tmpfile.name)
|
amine@132
|
564 self.assertTrue(filecmp.cmp(tmpfile.name, filename, shallow=False))
|
amine@132
|
565
|
amine@132
|
566 @genty_dataset(
|
amine@110
|
567 mono=("mono_400Hz.wav", (400,)),
|
amine@110
|
568 three_channel=("3channel_400-800-1600Hz.wav", (400, 800, 1600)),
|
amine@110
|
569 )
|
amine@110
|
570 def test_save_wave(self, filename, frequencies):
|
amine@110
|
571 filename = "tests/data/test_16KHZ_{}".format(filename)
|
amine@110
|
572 sampling_rate = 16000
|
amine@110
|
573 sample_width = 2
|
amine@110
|
574 channels = len(frequencies)
|
amine@110
|
575 fmt = DATA_FORMAT[sample_width]
|
amine@110
|
576 mono_channels = [PURE_TONE_DICT[freq] for freq in frequencies]
|
amine@110
|
577 data = _array_to_bytes(array(fmt, _sample_generator(*mono_channels)))
|
amine@110
|
578 tmpfile = NamedTemporaryFile()
|
amine@136
|
579 _save_wave(data, tmpfile.name, sampling_rate, sample_width, channels)
|
amine@110
|
580 self.assertTrue(filecmp.cmp(tmpfile.name, filename, shallow=False))
|
amine@132
|
581
|
amine@132
|
582 @genty_dataset(
|
amine@132
|
583 missing_sampling_rate=("sr",),
|
amine@132
|
584 missing_sample_width=("sw",),
|
amine@132
|
585 missing_channels=("ch",),
|
amine@132
|
586 )
|
amine@132
|
587 def test_save_wave_missing_audio_param(self, missing_param):
|
amine@132
|
588 with self.assertRaises(AudioParameterError):
|
amine@132
|
589 params = AUDIO_PARAMS_SHORT.copy()
|
amine@132
|
590 del params[missing_param]
|
amine@132
|
591 srate, swidth, channels, _ = _get_audio_parameters(params)
|
amine@136
|
592 _save_wave(b"\0\0", "audio", srate, swidth, channels)
|
amine@133
|
593
|
amine@141
|
594 def test_save_with_pydub(self):
|
amine@141
|
595 with patch("auditok.io.AudioSegment.export") as export:
|
amine@142
|
596 tmpdir = TemporaryDirectory()
|
amine@142
|
597 filename = os.path.join(tmpdir.name, "audio.ogg")
|
amine@142
|
598 _save_with_pydub(b"\0\0", filename, "ogg", 16000, 2, 1)
|
amine@141
|
599 self.assertTrue(export.called)
|
amine@142
|
600 tmpdir.cleanup()
|
amine@141
|
601
|
amine@133
|
602 @genty_dataset(
|
amine@133
|
603 raw_with_audio_format=("audio", "raw"),
|
amine@133
|
604 raw_with_extension=("audio.raw", None),
|
amine@133
|
605 raw_with_audio_format_and_extension=("audio.mp3", "raw"),
|
amine@133
|
606 raw_no_audio_format_nor_extension=("audio", None),
|
amine@133
|
607 )
|
amine@133
|
608 def test_to_file_raw(self, filename, audio_format):
|
amine@133
|
609 exp_filename = "tests/data/test_16KHZ_mono_400Hz.raw"
|
amine@133
|
610 tmpdir = TemporaryDirectory()
|
amine@133
|
611 filename = os.path.join(tmpdir.name, filename)
|
amine@133
|
612 data = _array_to_bytes(PURE_TONE_DICT[400])
|
amine@135
|
613 to_file(data, filename, audio_format=audio_format)
|
amine@133
|
614 self.assertTrue(filecmp.cmp(filename, exp_filename, shallow=False))
|
amine@133
|
615 tmpdir.cleanup()
|
amine@134
|
616
|
amine@134
|
617 @genty_dataset(
|
amine@134
|
618 wav_with_audio_format=("audio", "wav"),
|
amine@134
|
619 wav_with_extension=("audio.wav", None),
|
amine@134
|
620 wav_with_audio_format_and_extension=("audio.mp3", "wav"),
|
amine@134
|
621 wave_with_audio_format=("audio", "wave"),
|
amine@134
|
622 wave_with_extension=("audio.wave", None),
|
amine@134
|
623 wave_with_audio_format_and_extension=("audio.mp3", "wave"),
|
amine@134
|
624 )
|
amine@135
|
625 def test_to_file_wave(self, filename, audio_format):
|
amine@134
|
626 exp_filename = "tests/data/test_16KHZ_mono_400Hz.wav"
|
amine@134
|
627 tmpdir = TemporaryDirectory()
|
amine@134
|
628 filename = os.path.join(tmpdir.name, filename)
|
amine@134
|
629 data = _array_to_bytes(PURE_TONE_DICT[400])
|
amine@135
|
630 to_file(
|
amine@135
|
631 data,
|
amine@135
|
632 filename,
|
amine@135
|
633 audio_format=audio_format,
|
amine@135
|
634 sampling_rate=16000,
|
amine@135
|
635 sample_width=2,
|
amine@135
|
636 channels=1,
|
amine@134
|
637 )
|
amine@134
|
638 self.assertTrue(filecmp.cmp(filename, exp_filename, shallow=False))
|
amine@134
|
639 tmpdir.cleanup()
|
amine@138
|
640
|
amine@138
|
641 @genty_dataset(
|
amine@138
|
642 missing_sampling_rate=("sr",),
|
amine@138
|
643 missing_sample_width=("sw",),
|
amine@138
|
644 missing_channels=("ch",),
|
amine@138
|
645 )
|
amine@138
|
646 def test_to_file_missing_audio_param(self, missing_param):
|
amine@138
|
647 params = AUDIO_PARAMS_SHORT.copy()
|
amine@138
|
648 del params[missing_param]
|
amine@138
|
649 with self.assertRaises(AudioParameterError):
|
amine@138
|
650 to_file(b"\0\0", "audio", audio_format="wav", **params)
|
amine@138
|
651 with self.assertRaises(AudioParameterError):
|
amine@138
|
652 to_file(b"\0\0", "audio", audio_format="mp3", **params)
|
amine@139
|
653
|
amine@139
|
654 def test_to_file_no_pydub(self):
|
amine@139
|
655 with patch("auditok.io._WITH_PYDUB", False):
|
amine@139
|
656 with self.assertRaises(AudioIOError):
|
amine@139
|
657 to_file("audio", b"", "mp3")
|
amine@140
|
658
|
amine@140
|
659 @patch("auditok.io._WITH_PYDUB", True)
|
amine@140
|
660 @genty_dataset(
|
amine@140
|
661 ogg_with_extension=("audio.ogg", None),
|
amine@140
|
662 ogg_with_audio_format=("audio", "ogg"),
|
amine@140
|
663 ogg_format_with_wrong_extension=("audio.wav", "ogg"),
|
amine@140
|
664 )
|
amine@140
|
665 def test_to_file_compressed(self, filename, audio_format, *mocks):
|
amine@140
|
666 with patch("auditok.io.AudioSegment.export") as export:
|
amine@142
|
667 tmpdir = TemporaryDirectory()
|
amine@142
|
668 filename = os.path.join(tmpdir.name, filename)
|
amine@140
|
669 to_file(b"\0\0", filename, audio_format, **AUDIO_PARAMS_SHORT)
|
amine@140
|
670 self.assertTrue(export.called)
|
amine@142
|
671 tmpdir.cleanup()
|