amine@337
|
1 import unittest
|
amine@317
|
2 from unittest import TestCase
|
amine@357
|
3 from unittest.mock import patch
|
amine@157
|
4 import math
|
amine@357
|
5 from array import array as array_
|
amine@317
|
6 from genty import genty, genty_dataset
|
amine@357
|
7 from auditok.util import (
|
amine@357
|
8 AudioEnergyValidator,
|
amine@357
|
9 make_duration_formatter,
|
amine@357
|
10 make_channel_selector,
|
amine@357
|
11 )
|
amine@357
|
12 from auditok import signal as signal_
|
amine@357
|
13 from auditok import signal_numpy
|
amine@357
|
14
|
amine@345
|
15 from auditok.exceptions import TimeFormatError
|
amine@157
|
16
|
amine@157
|
17
|
amine@157
|
18 def _sample_generator(*data_buffers):
|
amine@157
|
19 """
|
amine@157
|
20 Takes a list of many mono audio data buffers and makes a sample generator
|
amine@157
|
21 of interleaved audio samples, one sample from each channel. The resulting
|
amine@157
|
22 generator can be used to build a multichannel audio buffer.
|
amine@157
|
23 >>> gen = _sample_generator("abcd", "ABCD")
|
amine@157
|
24 >>> list(gen)
|
amine@357
|
25 ["a", "A", 1, 1, "c", "C", "d", "D"]
|
amine@157
|
26 """
|
amine@157
|
27 frame_gen = zip(*data_buffers)
|
amine@157
|
28 return (sample for frame in frame_gen for sample in frame)
|
amine@157
|
29
|
amine@157
|
30
|
amine@157
|
def _generate_pure_tone(
    frequency, duration_sec=1, sampling_rate=16000, sample_width=2, volume=1e4
):
    """
    Build an array of integer samples of a sine wave at `frequency`.

    The amplitude is `volume`, capped at the largest positive value a signed
    integer of `sample_width` bytes can hold. The number of samples is
    `int(sampling_rate * duration_sec)` and the array type code is looked up
    in `signal_.FORMAT` using `sample_width`.
    """
    assert frequency <= sampling_rate / 2

    # Largest positive value of a signed `sample_width`-byte integer.
    peak = (2 ** (sample_width * 8) // 2) - 1
    amplitude = peak if volume > peak else volume

    type_code = signal_.FORMAT[sample_width]
    n_samples = int(sampling_rate * duration_sec)
    # Phase increment (in radians) between two consecutive samples.
    phase_step = 2 * math.pi * (frequency / sampling_rate)

    def _samples():
        for index in range(n_samples):
            yield int(math.sin(phase_step * index) * amplitude)

    return array_(type_code, _samples())
|
amine@157
|
53
|
amine@157
|
54
|
amine@157
|
# Pure tones keyed by frequency, all generated with a sampling rate of 16000
# and a sample width of 2: the first group lasts 1 second, the second group
# 0.1 second.
PURE_TONE_DICT = {
    freq: _generate_pure_tone(freq, duration, 16000, 2)
    for duration, frequencies in (
        (1, (400, 800, 1600)),
        (0.1, (600, 1150, 2400, 7220)),
    )
    for freq in frequencies
}
|
amine@317
|
64
|
amine@317
|
65
|
amine@317
|
@genty
class TestFunctions(TestCase):
    """
    Tests for `make_duration_formatter` and `make_channel_selector`, both
    imported from `auditok.util`.
    """

    def setUp(self):
        # Raw bytes handed to every channel selector under test; the expected
        # values of the datasets below are derived from these bytes.
        self.data = b"012345679ABC"

    def _apply_selector(self, backend, sample_width, channels, selected):
        """
        Return the output of a channel selector applied to `self.data`.

        The selector is created and called while `auditok.util.signal` is
        patched with `backend` (`signal_` or `signal_numpy`), so that the
        signal functions of that module are the ones being used.
        """
        with patch("auditok.util.signal", backend):
            selector = make_channel_selector(sample_width, channels, selected)
            return selector(self.data)

    @genty_dataset(
        only_seconds=("%S", 5400, "5400.000"),
        only_millis=("%I", 5400, "5400000"),
        full=("%h:%m:%s.%i", 3725.365, "01:02:05.365"),
        full_zero_hours=("%h:%m:%s.%i", 1925.075, "00:32:05.075"),
        full_zero_minutes=("%h:%m:%s.%i", 3659.075, "01:00:59.075"),
        full_zero_seconds=("%h:%m:%s.%i", 3720.075, "01:02:00.075"),
        full_zero_millis=("%h:%m:%s.%i", 3725, "01:02:05.000"),
        duplicate_directive=(
            "%h %h:%m:%s.%i %s",
            3725.365,
            "01 01:02:05.365 05",
        ),
        no_millis=("%h:%m:%s", 3725, "01:02:05"),
        no_seconds=("%h:%m", 3725, "01:02"),
        no_minutes=("%h", 3725, "01"),
        no_hours=("%m:%s.%i", 3725, "02:05.000"),
    )
    def test_make_duration_formatter(self, fmt, duration, expected):
        # The formatter built from `fmt` must render `duration` as `expected`.
        formatter = make_duration_formatter(fmt)
        self.assertEqual(formatter(duration), expected)

    @genty_dataset(
        duplicate_only_seconds=("%S %S",),
        duplicate_only_millis=("%I %I",),
        unknown_directive=("%x",),
    )
    def test_make_duration_formatter_error(self, fmt):
        # Each of these format strings must be rejected at construction time.
        with self.assertRaises(TimeFormatError):
            make_duration_formatter(fmt)

    @genty_dataset(
        int8_1channel_select_0=(
            1,
            1,
            0,
            [48, 49, 50, 51, 52, 53, 54, 55, 57, 65, 66, 67],
        ),
        int8_2channel_select_0=(1, 2, 0, [48, 50, 52, 54, 57, 66]),
        int8_3channel_select_0=(1, 3, 0, [48, 51, 54, 65]),
        int8_3channel_select_1=(1, 3, 1, [49, 52, 55, 66]),
        int8_3channel_select_2=(1, 3, 2, [50, 53, 57, 67]),
        int8_4channel_select_0=(1, 4, 0, [48, 52, 57]),
        int16_1channel_select_0=(
            2,
            1,
            0,
            [12592, 13106, 13620, 14134, 16697, 17218],
        ),
        int16_2channel_select_0=(2, 2, 0, [12592, 13620, 16697]),
        int16_2channel_select_1=(2, 2, 1, [13106, 14134, 17218]),
        int16_3channel_select_0=(2, 3, 0, [12592, 14134]),
        int16_3channel_select_1=(2, 3, 1, [13106, 16697]),
        int16_3channel_select_2=(2, 3, 2, [13620, 17218]),
        int32_1channel_select_0=(4, 1, 0, [858927408, 926299444, 1128415545],),
        int32_3channel_select_0=(4, 3, 0, [858927408]),
        int32_3channel_select_1=(4, 3, 1, [926299444]),
        int32_3channel_select_2=(4, 3, 2, [1128415545]),
    )
    def test_make_channel_selector_one_channel(
        self, sample_width, channels, selected, expected
    ):
        # force using signal functions with standard python implementation
        result = self._apply_selector(
            signal_, sample_width, channels, selected
        )

        # With a single channel the result is compared against raw bytes,
        # otherwise against an array of samples.
        fmt = signal_.FORMAT[sample_width]
        expected = array_(fmt, expected)
        if channels == 1:
            expected = bytes(expected)
        self.assertEqual(result, expected)

        # Use signal functions with numpy implementation
        result_numpy = self._apply_selector(
            signal_numpy, sample_width, channels, selected
        )

        # `expected` already has its final form (bytes or array); converting
        # it a second time would only produce an equal copy.
        if channels == 1:
            self.assertEqual(result_numpy, expected)
        else:
            # element-wise comparison, every sample has to match
            self.assertTrue(all(result_numpy == expected))

    @genty_dataset(
        int8_2channel=(1, 2, "avg", [48, 50, 52, 54, 61, 66]),
        int8_4channel=(1, 4, "average", [50, 54, 64]),
        int16_1channel=(
            2,
            1,
            "mix",
            [12592, 13106, 13620, 14134, 16697, 17218],
        ),
        int16_2channel=(2, 2, "avg", [12849, 13877, 16957]),
        int32_3channel=(4, 3, "average", [971214132]),
    )
    def test_make_channel_selector_average(
        self, sample_width, channels, selected, expected
    ):
        # force using signal functions with standard python implementation
        result = self._apply_selector(
            signal_, sample_width, channels, selected
        )

        fmt = signal_.FORMAT[sample_width]
        expected = array_(fmt, expected)
        if channels == 1:
            expected = bytes(expected)
        self.assertEqual(result, expected)

        # Use signal functions with numpy implementation
        result_numpy = self._apply_selector(
            signal_numpy, sample_width, channels, selected
        )

        if channels in (1, 2):
            self.assertEqual(result_numpy, expected)
        else:
            # element-wise comparison, every sample has to match
            self.assertTrue(all(result_numpy == expected))

    @genty_dataset(
        int8_1channel=(
            1,
            1,
            "any",
            [[48, 49, 50, 51, 52, 53, 54, 55, 57, 65, 66, 67]],
        ),
        int8_2channel=(
            1,
            2,
            None,
            [[48, 50, 52, 54, 57, 66], [49, 51, 53, 55, 65, 67]],
        ),
        int8_4channel=(
            1,
            4,
            "any",
            [[48, 52, 57], [49, 53, 65], [50, 54, 66], [51, 55, 67]],
        ),
        int16_2channel=(
            2,
            2,
            None,
            [[12592, 13620, 16697], [13106, 14134, 17218]],
        ),
        int32_3channel=(4, 3, "any", [[858927408], [926299444], [1128415545]]),
    )
    def test_make_channel_selector_any(
        self, sample_width, channels, selected, expected
    ):
        # force using signal functions with standard python implementation
        result = self._apply_selector(
            signal_, sample_width, channels, selected
        )

        # One array of samples per channel; with a single channel the result
        # is compared against the raw bytes of that only channel.
        fmt = signal_.FORMAT[sample_width]
        expected = [array_(fmt, exp) for exp in expected]
        if channels == 1:
            expected = bytes(expected[0])
        self.assertEqual(result, expected)

        # Use signal functions with numpy implementation
        result_numpy = self._apply_selector(
            signal_numpy, sample_width, channels, selected
        )

        if channels == 1:
            self.assertEqual(result_numpy, expected)
        else:
            self.assertTrue((result_numpy == expected).all())
|
amine@357
|
245
|
amine@345
|
246
|
amine@345
|
@genty
class TestAudioEnergyValidator(TestCase):
    """Tests for `AudioEnergyValidator.is_valid` on mono and stereo data."""

    @genty_dataset(
        mono_valid_uc_None=([350, 400], 1, None, True),
        mono_valid_uc_any=([350, 400], 1, "any", True),
        mono_valid_uc_0=([350, 400], 1, 0, True),
        mono_valid_uc_mix=([350, 400], 1, "mix", True),
        # previous cases are all the same since we have mono audio
        mono_invalid_uc_None=([300, 300], 1, None, False),
        stereo_valid_uc_None=([300, 400, 350, 300], 2, None, True),
        stereo_valid_uc_any=([300, 400, 350, 300], 2, "any", True),
        stereo_valid_uc_mix=([300, 400, 350, 300], 2, "mix", True),
        stereo_valid_uc_avg=([300, 400, 350, 300], 2, "avg", True),
        stereo_valid_uc_average=([300, 400, 300, 300], 2, "average", True),
        stereo_valid_uc_mix_with_null_channel=(
            [634, 0, 634, 0],
            2,
            "mix",
            True,
        ),
        stereo_valid_uc_0=([320, 100, 320, 100], 2, 0, True),
        stereo_valid_uc_1=([100, 320, 100, 320], 2, 1, True),
        stereo_invalid_uc_None=([280, 100, 280, 100], 2, None, False),
        stereo_invalid_uc_any=([280, 100, 280, 100], 2, "any", False),
        stereo_invalid_uc_mix=([400, 200, 400, 200], 2, "mix", False),
        stereo_invalid_uc_0=([300, 400, 300, 400], 2, 0, False),
        stereo_invalid_uc_1=([400, 300, 400, 300], 2, 1, False),
        zeros=([0, 0, 0, 0], 2, None, False),
    )
    def test_audio_energy_validator(
        self, data, channels, use_channel, expected
    ):
        # Samples are stored as signed 16-bit integers ("h"), hence the
        # sample width of 2 given to the validator.
        samples = array_("h", data)
        width = 2
        threshold = 50
        validator = AudioEnergyValidator(
            threshold, width, channels, use_channel
        )

        # Pick the assertion matching the expected verdict.
        check = self.assertTrue if expected else self.assertFalse
        check(validator.is_valid(samples))
|
amine@337
|
291
|
amine@337
|
292
|
amine@337
|
# Run the tests of this module when it is executed as a script.
if __name__ == "__main__":
    unittest.main()
|