amine@106
|
1 import os
|
amine@106
|
2 import sys
|
amine@106
|
3 import math
|
amine@107
|
4 from array import array
|
amine@110
|
5 from tempfile import NamedTemporaryFile
|
amine@110
|
6 import filecmp
|
amine@108
|
7 from unittest import TestCase
|
amine@108
|
8 from genty import genty, genty_dataset
|
amine@110
|
9 from auditok.io import (
|
amine@121
|
10 AudioIOError,
|
amine@110
|
11 DATA_FORMAT,
|
amine@110
|
12 AudioParameterError,
|
amine@110
|
13 check_audio_data,
|
amine@116
|
14 _array_to_bytes,
|
amine@118
|
15 _mix_audio_channels,
|
amine@119
|
16 _extract_selected_channel,
|
amine@120
|
17 from_file,
|
amine@111
|
18 _save_raw,
|
amine@110
|
19 _save_wave,
|
amine@110
|
20 )
|
amine@106
|
21
|
amine@106
|
22
|
amine@106
|
# Flag telling whether the tests run under Python 3 or later; it also decides
# where `patch` is imported from (standard library vs. the `mock` package).
PYTHON_3 = sys.version_info >= (3, 0)
if PYTHON_3:
    from unittest.mock import patch
else:
    from mock import patch

# Keyword arguments handed to `from_file` by the raw-audio datasets below.
# NOTE(review): presumably sr = sampling rate, sw = sample width in bytes,
# ch = number of channels -- confirm against auditok.io.
AUDIO_PARAMS_SHORT = {"sr": 16000, "sw": 2, "ch": 1}
|
amine@106
|
31
|
amine@106
|
32
|
amine@106
|
33 def _sample_generator(*data_buffers):
|
amine@106
|
34 """
|
amine@106
|
35 Takes a list of many mono audio data buffers and makes a sample generator
|
amine@106
|
36 of interleaved audio samples, one sample from each channel. The resulting
|
amine@106
|
37 generator can be used to build a multichannel audio buffer.
|
amine@106
|
38 >>> gen = _sample_generator("abcd", "ABCD")
|
amine@106
|
39 >>> list(gen)
|
amine@106
|
40 ["a", "A", "b", "B", "c", "C", "d", "D"]
|
amine@106
|
41 """
|
amine@106
|
42 frame_gen = zip(*data_buffers)
|
amine@106
|
43 return (sample for frame in frame_gen for sample in frame)
|
amine@106
|
44
|
amine@106
|
45
|
amine@107
|
def _generate_pure_tone(
    frequency, duration_sec=1, sampling_rate=16000, sample_width=2, volume=1e4
):
    """
    Generates a pure tone (a sine wave) with the given frequency.

    :param frequency: frequency of the tone; must not exceed half of
        `sampling_rate`.
    :param duration_sec: duration of the tone in seconds. The number of
        generated samples is `int(sampling_rate * duration_sec)`.
    :param sampling_rate: number of samples per second.
    :param sample_width: size of one sample in bytes; used as a key into
        `DATA_FORMAT` to pick the array type code.
    :param volume: amplitude of the sine wave. Values above the largest
        positive integer that fits in `sample_width` bytes are clipped to
        that maximum.
    :return: an `array` of integer samples.
    """
    # A tone above half the sampling rate cannot be represented.
    assert frequency <= sampling_rate / 2.0
    # Largest positive value of a signed integer of `sample_width` bytes.
    max_value = (2 ** (sample_width * 8) // 2) - 1
    if volume > max_value:
        volume = max_value
    fmt = DATA_FORMAT[sample_width]
    total_samples = int(sampling_rate * duration_sec)
    # Force a floating point division: with integer arguments, Python 2
    # would otherwise truncate the ratio to 0 and produce pure silence.
    step = float(frequency) / sampling_rate
    two_pi_step = 2 * math.pi * step
    data = array(
        fmt,
        (
            int(math.sin(two_pi_step * i) * volume)
            for i in range(total_samples)
        ),
    )
    return data
|
amine@107
|
68
|
amine@107
|
69
|
amine@107
|
# Pre-computed pure tones keyed by frequency, all generated with a sampling
# rate of 16000 and a sample width of 2. The first group of frequencies lasts
# 1 second, the second group lasts 0.1 second.
PURE_TONE_DICT = {
    freq: _generate_pure_tone(freq, duration, 16000, 2)
    for duration, group in (
        (1, (400, 800, 1600)),
        (0.1, (600, 1150, 2400, 7220)),
    )
    for freq in group
}
|
amine@108
|
79
|
amine@108
|
80
|
amine@108
|
@genty
class TestIO(TestCase):
    """Tests for the audio helpers imported from `auditok.io`."""

    @genty_dataset(
        valid_mono=(b"\0" * 113, 1, 1),
        valid_stereo=(b"\0" * 160, 1, 2),
        invalid_mono_sw_2=(b"\0" * 113, 2, 1, False),
        invalid_stereo_sw_1=(b"\0" * 113, 1, 2, False),
        invalid_stereo_sw_2=(b"\0" * 158, 2, 2, False),
    )
    def test_check_audio_data(self, data, sample_width, channels, valid=True):
        """
        `check_audio_data` must return None for datasets flagged as valid
        and raise `AudioParameterError` for the others.
        """
        if not valid:
            with self.assertRaises(AudioParameterError):
                check_audio_data(data, sample_width, channels)
        else:
            self.assertIsNone(check_audio_data(data, sample_width, channels))

    @genty_dataset(
        mono_1byte=([400], 1),
        stereo_1byte=([400, 600], 1),
        three_channel_1byte=([400, 600, 2400], 1),
        mono_2byte=([400], 2),
        stereo_2byte=([400, 600], 2),
        three_channel_2byte=([400, 600, 1150], 2),
        mono_4byte=([400], 4),
        stereo_4byte=([400, 600], 4),
        four_channel_4byte=([400, 600, 1150, 7220], 4),
    )
    def test_mix_audio_channels(self, frequencies, sample_width):
        """
        Interleave one pure tone per frequency into a multichannel buffer
        and check that `_mix_audio_channels` returns, for each frame, the
        floor of the average of its samples.
        """
        # `sample_width` comes from the dataset and must not be overridden
        # here, otherwise the 1 and 4 byte cases silently test 2 byte audio.
        sampling_rate = 16000
        channels = len(frequencies)
        mono_channels = [
            _generate_pure_tone(
                freq,
                duration_sec=0.1,
                sampling_rate=sampling_rate,
                sample_width=sample_width,
            )
            for freq in frequencies
        ]
        fmt = DATA_FORMAT[sample_width]
        expected = _array_to_bytes(
            array(
                fmt,
                (sum(samples) // channels for samples in zip(*mono_channels)),
            )
        )
        data = _array_to_bytes(array(fmt, _sample_generator(*mono_channels)))
        mixed = _mix_audio_channels(data, channels, sample_width)
        self.assertEqual(mixed, expected)

    @genty_dataset(
        mono_1byte=([400], 1, 0),
        stereo_1byte_2nd_channel=([400, 600], 1, 1),
        mono_2byte=([400], 2, 0),
        stereo_2byte_1st_channel=([400, 600], 2, 0),
        stereo_2byte_2nd_channel=([400, 600], 2, 1),
        three_channel_2byte_last_negative_idx=([400, 600, 1150], 2, -1),
        three_channel_2byte_2nd_negative_idx=([400, 600, 1150], 2, -2),
        three_channel_2byte_1st_negative_idx=([400, 600, 1150], 2, -3),
        three_channel_4byte_1st=([400, 600, 1150], 4, 0),
        three_channel_4byte_last_negative_idx=([400, 600, 1150], 4, -1),
    )
    def test_extract_selected_channel(
        self, frequencies, sample_width, use_channel
    ):
        """
        Interleave one pure tone per frequency and check that
        `_extract_selected_channel` returns the bytes of the tone found at
        index `use_channel` (negative indices count from the last channel).
        """
        mono_channels = [
            _generate_pure_tone(
                freq,
                duration_sec=0.1,
                sampling_rate=16000,
                sample_width=sample_width,
            )
            for freq in frequencies
        ]
        channels = len(frequencies)
        fmt = DATA_FORMAT[sample_width]
        expected = _array_to_bytes(mono_channels[use_channel])
        data = _array_to_bytes(array(fmt, _sample_generator(*mono_channels)))
        selected_channel = _extract_selected_channel(
            data, channels, sample_width, use_channel
        )
        self.assertEqual(selected_channel, expected)

    @genty_dataset(
        raw_with_audio_format=(
            "audio",
            "raw",
            "_load_raw",
            AUDIO_PARAMS_SHORT,
        ),
        raw_with_extension=(
            "audio.raw",
            None,
            "_load_raw",
            AUDIO_PARAMS_SHORT,
        ),
        wave_with_audio_format=("audio", "wave", "_load_wave"),
        wav_with_audio_format=("audio", "wav", "_load_wave"),
        wav_with_extension=("audio.wav", None, "_load_wave"),
        format_and_extension_both_given=("audio.dat", "wav", "_load_wave"),
        format_and_extension_both_given_b=("audio.raw", "wave", "_load_wave"),
        no_format_nor_extension=("audio", None, "_load_with_pydub"),
        other_formats_ogg=("audio.ogg", None, "_load_with_pydub"),
        other_formats_webm=("audio", "webm", "_load_with_pydub"),
    )
    def test_from_file(
        self, filename, audio_format, funtion_name, kwargs=None
    ):
        """
        Patch the `auditok.io` loader named by `funtion_name` and check that
        `from_file` calls it for the given file name / audio format pair.
        """
        funtion_name = "auditok.io." + funtion_name
        if kwargs is None:
            kwargs = {}
        with patch(funtion_name) as patch_function:
            from_file(filename, audio_format, **kwargs)
        self.assertTrue(patch_function.called)

    @genty_dataset(
        mono=("mono_400Hz.raw", (400,)),
        three_channel=("3channel_400-800-1600Hz.raw", (400, 800, 1600)),
    )
    def test_save_raw(self, filename, frequencies):
        """
        Save interleaved pure tones with `_save_raw` and compare the result,
        byte for byte, with the reference file under `tests/data`.
        """
        filename = "tests/data/test_16KHZ_{}".format(filename)
        sample_width = 2
        fmt = DATA_FORMAT[sample_width]
        mono_channels = [PURE_TONE_DICT[freq] for freq in frequencies]
        data = _array_to_bytes(array(fmt, _sample_generator(*mono_channels)))
        # The context manager closes (and deletes) the temporary file even
        # when the assertion fails.
        with NamedTemporaryFile() as tmpfile:
            _save_raw(tmpfile.name, data)
            self.assertTrue(
                filecmp.cmp(tmpfile.name, filename, shallow=False)
            )

    def test_from_file_no_pydub(self):
        """
        With `auditok.io._WITH_PYDUB` patched to False, `from_file` must
        raise `AudioIOError` for an mp3 file.
        """
        with patch("auditok.io._WITH_PYDUB", False):
            with self.assertRaises(AudioIOError):
                from_file("audio", "mp3")

    @genty_dataset(
        raw_first_channel=("raw", 0, 400),
        raw_second_channel=("raw", 1, 800),
        raw_third_channel=("raw", 2, 1600),
        raw_left_channel=("raw", "left", 400),
        raw_right_channel=("raw", "right", 800),
        wav_first_channel=("wav", 0, 400),
        wav_second_channel=("wav", 1, 800),
        wav_third_channel=("wav", 2, 1600),
        wav_left_channel=("wav", "left", 400),
        wav_right_channel=("wav", "right", 800),
    )
    def test_from_file_multichannel_audio(
        self, audio_format, use_channel, frequency
    ):
        """
        Load the 3-channel reference file with `from_file`, selecting one
        channel through `use_channel`, and check that the loaded buffer
        equals the pure tone of the expected frequency.
        """
        expected = PURE_TONE_DICT[frequency]
        filename = "tests/data/test_16KHZ_3channel_400-800-1600Hz.{}".format(
            audio_format
        )
        sample_width = 2
        audio_source = from_file(
            filename,
            sampling_rate=16000,
            sample_width=sample_width,
            channels=3,
            use_channel=use_channel,
        )
        fmt = DATA_FORMAT[sample_width]
        data = array(fmt, audio_source._buffer)
        self.assertEqual(data, expected)

    @genty_dataset(
        mono=("mono_400Hz.wav", (400,)),
        three_channel=("3channel_400-800-1600Hz.wav", (400, 800, 1600)),
    )
    def test_save_wave(self, filename, frequencies):
        """
        Save interleaved pure tones with `_save_wave` and compare the result,
        byte for byte, with the reference file under `tests/data`.
        """
        filename = "tests/data/test_16KHZ_{}".format(filename)
        sampling_rate = 16000
        sample_width = 2
        channels = len(frequencies)
        fmt = DATA_FORMAT[sample_width]
        mono_channels = [PURE_TONE_DICT[freq] for freq in frequencies]
        data = _array_to_bytes(array(fmt, _sample_generator(*mono_channels)))
        # The context manager closes (and deletes) the temporary file even
        # when the assertion fails.
        with NamedTemporaryFile() as tmpfile:
            _save_wave(
                tmpfile.name, data, sampling_rate, sample_width, channels
            )
            self.assertTrue(
                filecmp.cmp(tmpfile.name, filename, shallow=False)
            )
|