comparison tests/test_core.py @ 400:323d59b404a2

Use pytest instead of genty
author Amine Sehili <amine.sehili@gmail.com>
date Sat, 25 May 2024 21:54:13 +0200
parents 8d3e2b492c6f
children 996948ada980
comparison
equal deleted inserted replaced
399:08f893725d23 400:323d59b404a2
1 import os 1 import os
2 import math 2 import math
3 from random import random 3 from random import random
4 from tempfile import TemporaryDirectory 4 from tempfile import TemporaryDirectory
5 from array import array as array_ 5 from array import array as array_
6 import unittest 6 import pytest
7 from unittest import TestCase, mock 7 from unittest.mock import patch, Mock
8 from unittest.mock import patch
9 from genty import genty, genty_dataset
10 from auditok import load, split, AudioRegion, AudioParameterError 8 from auditok import load, split, AudioRegion, AudioParameterError
11 from auditok.core import ( 9 from auditok.core import (
12 _duration_to_nb_windows, 10 _duration_to_nb_windows,
13 _make_audio_region, 11 _make_audio_region,
14 _read_chunks_online, 12 _read_chunks_online,
15 _read_offline, 13 _read_offline,
16 ) 14 )
17 from auditok.util import AudioDataSource 15 from auditok.util import AudioDataSource
18 from auditok.io import get_audio_source 16 from auditok.io import get_audio_source
19
20 mock._magics.add("__round__")
21 17
22 18
23 def _make_random_length_regions( 19 def _make_random_length_regions(
24 byte_seq, sampling_rate, sample_width, channels 20 byte_seq, sampling_rate, sample_width, channels
25 ): 21 ):
30 region = AudioRegion(data, sampling_rate, sample_width, channels) 26 region = AudioRegion(data, sampling_rate, sample_width, channels)
31 regions.append(region) 27 regions.append(region)
32 return regions 28 return regions
33 29
34 30
35 @genty 31 @pytest.mark.parametrize(
36 class TestFunctions(TestCase): 32 "skip, max_read, channels",
37 @genty_dataset( 33 [
38 no_skip_read_all=(0, -1), 34 (0, -1, 1),
39 no_skip_read_all_stereo=(0, -1, 2), 35 (0, -1, 2),
40 skip_2_read_all=(2, -1), 36 (2, -1, 1),
41 skip_2_read_all_None=(2, None), 37 (2, None, 1),
42 skip_2_read_3=(2, 3), 38 (2, 3, 1),
43 skip_2_read_3_5_stereo=(2, 3.5, 2), 39 (2, 3.5, 2),
44 skip_2_4_read_3_5_stereo=(2.4, 3.5, 2), 40 (2.4, 3.5, 2),
45 ) 41 ],
46 def test_load(self, skip, max_read, channels=1): 42 ids=[
47 sampling_rate = 10 43 "no_skip_read_all",
48 sample_width = 2 44 "no_skip_read_all_stereo",
49 filename = "tests/data/test_split_10HZ_{}.raw" 45 "skip_2_read_all",
50 filename = filename.format("mono" if channels == 1 else "stereo") 46 "skip_2_read_all_None",
51 region = load( 47 "skip_2_read_3",
52 filename, 48 "skip_2_read_3_5_stereo",
53 skip=skip, 49 "skip_2_4_read_3_5_stereo",
54 max_read=max_read, 50 ],
55 sr=sampling_rate, 51 )
56 sw=sample_width, 52 def test_load(skip, max_read, channels):
57 ch=channels, 53 sampling_rate = 10
54 sample_width = 2
55 filename = "tests/data/test_split_10HZ_{}.raw"
56 filename = filename.format("mono" if channels == 1 else "stereo")
57 region = load(
58 filename,
59 skip=skip,
60 max_read=max_read,
61 sr=sampling_rate,
62 sw=sample_width,
63 ch=channels,
64 )
65 with open(filename, "rb") as fp:
66 fp.read(round(skip * sampling_rate * sample_width * channels))
67 if max_read is None or max_read < 0:
68 to_read = -1
69 else:
70 to_read = round(max_read * sampling_rate * sample_width * channels)
71 expected = fp.read(to_read)
72 assert bytes(region) == expected
73
74
75 @pytest.mark.parametrize(
76 "duration, analysis_window, round_fn, expected, kwargs",
77 [
78 (0, 1, None, 0, None),
79 (0.3, 0.1, round, 3, None),
80 (0.35, 0.1, math.ceil, 4, None),
81 (0.35, 0.1, math.floor, 3, None),
82 (0.05, 0.1, round, 0, None),
83 (0.05, 0.1, math.ceil, 1, None),
84 (0.3, 0.1, math.floor, 3, {"epsilon": 1e-6}),
85 (-0.5, 0.1, math.ceil, ValueError, None),
86 (0.5, -0.1, math.ceil, ValueError, None),
87 ],
88 ids=[
89 "zero_duration",
90 "multiple",
91 "not_multiple_ceil",
92 "not_multiple_floor",
93 "small_duration",
94 "small_duration_ceil",
95 "with_round_error",
96 "negative_duration",
97 "negative_analysis_window",
98 ],
99 )
100 def test_duration_to_nb_windows(
101 duration, analysis_window, round_fn, expected, kwargs
102 ):
103 if expected == ValueError:
104 with pytest.raises(ValueError):
105 _duration_to_nb_windows(duration, analysis_window, round_fn)
106 else:
107 if kwargs is None:
108 kwargs = {}
109 result = _duration_to_nb_windows(
110 duration, analysis_window, round_fn, **kwargs
58 ) 111 )
59 with open(filename, "rb") as fp: 112 assert result == expected
60 fp.read(round(skip * sampling_rate * sample_width * channels)) 113
61 if max_read is None or max_read < 0: 114
62 to_read = -1 115 @pytest.mark.parametrize(
63 else: 116 "channels, skip, max_read",
64 to_read = round( 117 [
65 max_read * sampling_rate * sample_width * channels 118 (1, 0, None),
66 ) 119 (1, 3, None),
67 expected = fp.read(to_read) 120 (1, 2, -1),
68 self.assertEqual(bytes(region), expected) 121 (1, 2, 3),
69 122 (2, 0, None),
70 @genty_dataset( 123 (2, 3, None),
71 zero_duration=(0, 1, None, 0), 124 (2, 2, -1),
72 multiple=(0.3, 0.1, round, 3), 125 (2, 2, 3),
73 not_multiple_ceil=(0.35, 0.1, math.ceil, 4), 126 ],
74 not_multiple_floor=(0.35, 0.1, math.floor, 3), 127 ids=[
75 small_duration=(0.05, 0.1, round, 0), 128 "mono_skip_0_max_read_None",
76 small_duration_ceil=(0.05, 0.1, math.ceil, 1), 129 "mono_skip_3_max_read_None",
77 with_round_error=(0.3, 0.1, math.floor, 3, {"epsilon": 1e-6}), 130 "mono_skip_2_max_read_negative",
78 negative_duration=(-0.5, 0.1, math.ceil, ValueError), 131 "mono_skip_2_max_read_3",
79 negative_analysis_window=(0.5, -0.1, math.ceil, ValueError), 132 "stereo_skip_0_max_read_None",
80 ) 133 "stereo_skip_3_max_read_None",
81 def test_duration_to_nb_windows( 134 "stereo_skip_2_max_read_negative",
82 self, duration, analysis_window, round_fn, expected, kwargs=None 135 "stereo_skip_2_max_read_3",
83 ): 136 ],
84 if expected == ValueError: 137 )
85 with self.assertRaises(expected): 138 def test_read_offline(channels, skip, max_read):
86 _duration_to_nb_windows(duration, analysis_window, round_fn) 139 sampling_rate = 10
87 else: 140 sample_width = 2
88 if kwargs is None: 141 mono_or_stereo = "mono" if channels == 1 else "stereo"
89 kwargs = {} 142 filename = "tests/data/test_split_10HZ_{}.raw".format(mono_or_stereo)
90 result = _duration_to_nb_windows( 143 with open(filename, "rb") as fp:
91 duration, analysis_window, round_fn, **kwargs 144 data = fp.read()
92 ) 145 onset = round(skip * sampling_rate * sample_width * channels)
93 self.assertEqual(result, expected) 146 if max_read in (-1, None):
94 147 offset = len(data) + 1
95 @genty_dataset( 148 else:
96 mono_skip_0_max_read_None=(1, 0, None), 149 offset = onset + round(
97 mono_skip_3_max_read_None=(1, 3, None), 150 max_read * sampling_rate * sample_width * channels
98 mono_skip_2_max_read_negative=(1, 2, -1),
99 mono_skip_2_max_read_3=(1, 2, 3),
100 stereo_skip_0_max_read_None=(2, 0, None),
101 stereo_skip_3_max_read_None=(2, 3, None),
102 stereo_skip_2_max_read_negative=(2, 2, -1),
103 stereo_skip_2_max_read_3=(2, 2, 3),
104 )
105 def test_read_offline(self, channels, skip, max_read=None):
106 sampling_rate = 10
107 sample_width = 2
108 mono_or_stereo = "mono" if channels == 1 else "stereo"
109 filename = "tests/data/test_split_10HZ_{}.raw".format(mono_or_stereo)
110 with open(filename, "rb") as fp:
111 data = fp.read()
112 onset = round(skip * sampling_rate * sample_width * channels)
113 if max_read in (-1, None):
114 offset = len(data) + 1
115 else:
116 offset = onset + round(
117 max_read * sampling_rate * sample_width * channels
118 )
119 expected_data = data[onset:offset]
120 read_data, *audio_params = _read_offline(
121 filename,
122 skip=skip,
123 max_read=max_read,
124 sr=sampling_rate,
125 sw=sample_width,
126 ch=channels,
127 ) 151 )
128 self.assertEqual(read_data, expected_data) 152 expected_data = data[onset:offset]
129 self.assertEqual( 153 read_data, *audio_params = _read_offline(
130 tuple(audio_params), (sampling_rate, sample_width, channels) 154 filename,
131 ) 155 skip=skip,
132 156 max_read=max_read,
133 157 sr=sampling_rate,
134 @genty 158 sw=sample_width,
135 class TestSplit(TestCase): 159 ch=channels,
136 @genty_dataset( 160 )
137 simple=( 161 assert read_data == expected_data
138 0.2, 162 assert tuple(audio_params) == (sampling_rate, sample_width, channels)
139 5, 163
140 0.2, 164
141 False, 165 @pytest.mark.parametrize(
142 False, 166 "min_dur, max_dur, max_silence, drop_trailing_silence, strict_min_dur, kwargs, expected",
143 {"eth": 50}, 167 [
144 [(2, 16), (17, 31), (34, 76)], 168 (0.2, 5, 0.2, False, False, {"eth": 50}, [(2, 16), (17, 31), (34, 76)]),
145 ), 169 (
146 short_max_dur=(
147 0.3, 170 0.3,
148 2, 171 2,
149 0.2, 172 0.2,
150 False, 173 False,
151 False, 174 False,
152 {"eth": 50}, 175 {"eth": 50},
153 [(2, 16), (17, 31), (34, 54), (54, 74), (74, 76)], 176 [(2, 16), (17, 31), (34, 54), (54, 74), (74, 76)],
154 ), 177 ),
155 long_min_dur=(3, 5, 0.2, False, False, {"eth": 50}, [(34, 76)]), 178 (3, 5, 0.2, False, False, {"eth": 50}, [(34, 76)]),
156 long_max_silence=(0.2, 80, 10, False, False, {"eth": 50}, [(2, 76)]), 179 (0.2, 80, 10, False, False, {"eth": 50}, [(2, 76)]),
157 zero_max_silence=( 180 (
158 0.2, 181 0.2,
159 5, 182 5,
160 0.0, 183 0.0,
161 False, 184 False,
162 False, 185 False,
163 {"eth": 50}, 186 {"eth": 50},
164 [(2, 14), (17, 24), (26, 29), (34, 76)], 187 [(2, 14), (17, 24), (26, 29), (34, 76)],
165 ), 188 ),
166 low_energy_threshold=( 189 (
167 0.2, 190 0.2,
168 5, 191 5,
169 0.2, 192 0.2,
170 False, 193 False,
171 False, 194 False,
172 {"energy_threshold": 40}, 195 {"energy_threshold": 40},
173 [(0, 50), (50, 76)], 196 [(0, 50), (50, 76)],
174 ), 197 ),
175 high_energy_threshold=( 198 (0.2, 5, 0.2, False, False, {"energy_threshold": 60}, []),
176 0.2, 199 (0.2, 10, 0.5, True, False, {"eth": 50}, [(2, 76)]),
177 5, 200 (0.2, 5, 0.2, True, False, {"eth": 50}, [(2, 14), (17, 29), (34, 76)]),
178 0.2, 201 (1.5, 5, 0.2, True, False, {"eth": 50}, [(34, 76)]),
179 False, 202 (
180 False,
181 {"energy_threshold": 60},
182 [],
183 ),
184 trim_leading_and_trailing_silence=(
185 0.2,
186 10, # use long max_dur
187 0.5, # and a max_silence longer than any inter-region silence
188 True,
189 False,
190 {"eth": 50},
191 [(2, 76)],
192 ),
193 drop_trailing_silence=(
194 0.2,
195 5,
196 0.2,
197 True,
198 False,
199 {"eth": 50},
200 [(2, 14), (17, 29), (34, 76)],
201 ),
202 drop_trailing_silence_2=(
203 1.5,
204 5,
205 0.2,
206 True,
207 False,
208 {"eth": 50},
209 [(34, 76)],
210 ),
211 strict_min_dur=(
212 0.3, 203 0.3,
213 2, 204 2,
214 0.2, 205 0.2,
215 False, 206 False,
216 True, 207 True,
217 {"eth": 50}, 208 {"eth": 50},
218 [(2, 16), (17, 31), (34, 54), (54, 74)], 209 [(2, 16), (17, 31), (34, 54), (54, 74)],
219 ), 210 ),
220 ) 211 ],
221 def test_split_params( 212 ids=[
222 self, 213 "simple",
214 "short_max_dur",
215 "long_min_dur",
216 "long_max_silence",
217 "zero_max_silence",
218 "low_energy_threshold",
219 "high_energy_threshold",
220 "trim_leading_and_trailing_silence",
221 "drop_trailing_silence",
222 "drop_trailing_silence_2",
223 "strict_min_dur",
224 ],
225 )
226 def test_split_params(
227 min_dur,
228 max_dur,
229 max_silence,
230 drop_trailing_silence,
231 strict_min_dur,
232 kwargs,
233 expected,
234 ):
235 with open("tests/data/test_split_10HZ_mono.raw", "rb") as fp:
236 data = fp.read()
237
238 regions = split(
239 data,
223 min_dur, 240 min_dur,
224 max_dur, 241 max_dur,
225 max_silence, 242 max_silence,
226 drop_trailing_silence, 243 drop_trailing_silence,
227 strict_min_dur, 244 strict_min_dur,
228 kwargs, 245 analysis_window=0.1,
229 expected, 246 sr=10,
230 ): 247 sw=2,
231 with open("tests/data/test_split_10HZ_mono.raw", "rb") as fp: 248 ch=1,
232 data = fp.read() 249 **kwargs
233 250 )
234 regions = split( 251
235 data, 252 region = AudioRegion(data, 10, 2, 1)
236 min_dur, 253 regions_ar = region.split(
237 max_dur, 254 min_dur,
238 max_silence, 255 max_dur,
239 drop_trailing_silence, 256 max_silence,
240 strict_min_dur, 257 drop_trailing_silence,
241 analysis_window=0.1, 258 strict_min_dur,
242 sr=10, 259 analysis_window=0.1,
243 sw=2, 260 **kwargs
261 )
262
263 regions = list(regions)
264 regions_ar = list(regions_ar)
265 err_msg = "Wrong number of regions after split, expected: "
266 err_msg += "{}, found: {}".format(len(expected), len(regions))
267 assert len(regions) == len(expected), err_msg
268 err_msg = "Wrong number of regions after AudioRegion.split, expected: "
269 err_msg += "{}, found: {}".format(len(expected), len(regions_ar))
270 assert len(regions_ar) == len(expected), err_msg
271
272 sample_width = 2
273 for reg, reg_ar, exp in zip(regions, regions_ar, expected):
274 onset, offset = exp
275 exp_data = data[onset * sample_width : offset * sample_width]
276 assert bytes(reg) == exp_data
277 assert reg == reg_ar
278
279
280 @pytest.mark.parametrize(
281 "channels, kwargs, expected",
282 [
283 (2, {}, [(2, 32), (34, 76)]),
284 (1, {"max_read": 5}, [(2, 16), (17, 31), (34, 50)]),
285 (1, {"mr": 5}, [(2, 16), (17, 31), (34, 50)]),
286 (1, {"eth": 50, "use_channel": 0}, [(2, 16), (17, 31), (34, 76)]),
287 (1, {"eth": 50, "uc": 1}, [(2, 16), (17, 31), (34, 76)]),
288 (1, {"eth": 50, "use_channel": None}, [(2, 16), (17, 31), (34, 76)]),
289 (2, {"eth": 50, "use_channel": 0}, [(2, 16), (17, 31), (34, 76)]),
290 (2, {"eth": 50}, [(2, 32), (34, 76)]),
291 (2, {"eth": 50, "use_channel": -2}, [(2, 16), (17, 31), (34, 76)]),
292 (2, {"eth": 50, "uc": 1}, [(10, 32), (36, 76)]),
293 (2, {"eth": 50, "uc": -1}, [(10, 32), (36, 76)]),
294 (1, {"eth": 50, "uc": "mix"}, [(2, 16), (17, 31), (34, 76)]),
295 (2, {"energy_threshold": 53.5, "use_channel": "mix"}, [(54, 76)]),
296 (2, {"eth": 52, "uc": "mix"}, [(17, 26), (54, 76)]),
297 (2, {"uc": "mix"}, [(10, 16), (17, 31), (36, 76)]),
298 ],
299 ids=[
300 "stereo_all_default",
301 "mono_max_read",
302 "mono_max_read_short_name",
303 "mono_use_channel_1",
304 "mono_uc_1",
305 "mono_use_channel_None",
306 "stereo_use_channel_1",
307 "stereo_use_channel_no_use_channel_given",
308 "stereo_use_channel_minus_2",
309 "stereo_uc_2",
310 "stereo_uc_minus_1",
311 "mono_uc_mix",
312 "stereo_use_channel_mix",
313 "stereo_uc_mix",
314 "stereo_uc_mix_default_eth",
315 ],
316 )
317 def test_split_kwargs(channels, kwargs, expected):
318
319 mono_or_stereo = "mono" if channels == 1 else "stereo"
320 filename = "tests/data/test_split_10HZ_{}.raw".format(mono_or_stereo)
321 with open(filename, "rb") as fp:
322 data = fp.read()
323
324 regions = split(
325 data,
326 min_dur=0.2,
327 max_dur=5,
328 max_silence=0.2,
329 drop_trailing_silence=False,
330 strict_min_dur=False,
331 analysis_window=0.1,
332 sr=10,
333 sw=2,
334 ch=channels,
335 **kwargs
336 )
337
338 region = AudioRegion(data, 10, 2, channels)
339 max_read = kwargs.get("max_read", kwargs.get("mr"))
340 if max_read is not None:
341 region = region.sec[:max_read]
342 kwargs.pop("max_read", None)
343 kwargs.pop("mr", None)
344
345 regions_ar = region.split(
346 min_dur=0.2,
347 max_dur=5,
348 max_silence=0.2,
349 drop_trailing_silence=False,
350 strict_min_dur=False,
351 analysis_window=0.1,
352 **kwargs
353 )
354
355 regions = list(regions)
356 regions_ar = list(regions_ar)
357 err_msg = "Wrong number of regions after split, expected: "
358 err_msg += "{}, found: {}".format(len(expected), len(regions))
359 assert len(regions) == len(expected), err_msg
360 err_msg = "Wrong number of regions after AudioRegion.split, expected: "
361 err_msg += "{}, found: {}".format(len(expected), len(regions_ar))
362 assert len(regions_ar) == len(expected), err_msg
363
364 sample_width = 2
365 sample_size_bytes = sample_width * channels
366 for reg, reg_ar, exp in zip(regions, regions_ar, expected):
367 onset, offset = exp
368 exp_data = data[onset * sample_size_bytes : offset * sample_size_bytes]
369 assert len(bytes(reg)) == len(exp_data)
370 assert reg == reg_ar
371
372
373 @pytest.mark.parametrize(
374 "min_dur, max_dur, max_silence, channels, kwargs, expected",
375 [
376 (0.2, 5, 0.2, 1, {"aw": 0.2}, [(2, 30), (34, 76)]),
377 (0.2, 5, 0.3, 1, {"aw": 0.2}, [(2, 30), (34, 76)]),
378 (0.2, 5, 0.4, 1, {"aw": 0.2}, [(2, 32), (34, 76)]),
379 (0.2, 5, 0, 1, {"aw": 0.2}, [(2, 14), (16, 24), (26, 28), (34, 76)]),
380 (0.2, 5, 0.2, 1, {"aw": 0.2}, [(2, 30), (34, 76)]),
381 (0.3, 5, 0, 1, {"aw": 0.3}, [(3, 12), (15, 24), (36, 76)]),
382 (0.3, 5, 0.3, 1, {"aw": 0.3}, [(3, 27), (36, 76)]),
383 (0.3, 5, 0.5, 1, {"aw": 0.3}, [(3, 27), (36, 76)]),
384 (0.3, 5, 0.6, 1, {"aw": 0.3}, [(3, 30), (36, 76)]),
385 (0.2, 5, 0, 1, {"aw": 0.4}, [(4, 12), (16, 24), (36, 76)]),
386 (0.2, 5, 0.3, 1, {"aw": 0.4}, [(4, 12), (16, 24), (36, 76)]),
387 (0.2, 5, 0.4, 1, {"aw": 0.4}, [(4, 28), (36, 76)]),
388 (0.2, 5, 0.2, 2, {"analysis_window": 0.2}, [(2, 32), (34, 76)]),
389 (
390 0.2,
391 5,
392 0.2,
393 2,
394 {"uc": None, "analysis_window": 0.2},
395 [(2, 32), (34, 76)],
396 ),
397 (
398 0.2,
399 5,
400 0.2,
401 2,
402 {"use_channel": None, "analysis_window": 0.3},
403 [(3, 30), (36, 76)],
404 ),
405 (
406 0.2,
407 5,
408 0.3,
409 2,
410 {"use_channel": "any", "analysis_window": 0.3},
411 [(3, 33), (36, 76)],
412 ),
413 (
414 0.2,
415 5,
416 0.2,
417 2,
418 {"use_channel": None, "analysis_window": 0.4},
419 [(4, 28), (36, 76)],
420 ),
421 (
422 0.2,
423 5,
424 0.4,
425 2,
426 {"use_channel": "any", "analysis_window": 0.4},
427 [(4, 32), (36, 76)],
428 ),
429 (
430 0.2,
431 5,
432 0.2,
433 2,
434 {"uc": 0, "analysis_window": 0.2},
435 [(2, 30), (34, 76)],
436 ),
437 (
438 0.2,
439 5,
440 0.2,
441 2,
442 {"uc": 1, "analysis_window": 0.2},
443 [(10, 32), (36, 76)],
444 ),
445 (
446 0.2,
447 5,
448 0,
449 2,
450 {"uc": "mix", "analysis_window": 0.1},
451 [(10, 14), (17, 24), (26, 29), (36, 76)],
452 ),
453 (
454 0.2,
455 5,
456 0.1,
457 2,
458 {"uc": "mix", "analysis_window": 0.1},
459 [(10, 15), (17, 25), (26, 30), (36, 76)],
460 ),
461 (
462 0.2,
463 5,
464 0.2,
465 2,
466 {"uc": "mix", "analysis_window": 0.1},
467 [(10, 16), (17, 31), (36, 76)],
468 ),
469 (
470 0.2,
471 5,
472 0.3,
473 2,
474 {"uc": "mix", "analysis_window": 0.1},
475 [(10, 32), (36, 76)],
476 ),
477 (
478 0.3,
479 5,
480 0,
481 2,
482 {"uc": "avg", "analysis_window": 0.2},
483 [(10, 14), (16, 24), (36, 76)],
484 ),
485 (
486 0.41,
487 5,
488 0,
489 2,
490 {"uc": "average", "analysis_window": 0.2},
491 [(16, 24), (36, 76)],
492 ),
493 (
494 0.2,
495 5,
496 0.1,
497 2,
498 {"uc": "mix", "analysis_window": 0.2},
499 [(10, 14), (16, 24), (26, 28), (36, 76)],
500 ),
501 (
502 0.2,
503 5,
504 0.2,
505 2,
506 {"uc": "mix", "analysis_window": 0.2},
507 [(10, 30), (36, 76)],
508 ),
509 (
510 0.2,
511 5,
512 0.4,
513 2,
514 {"uc": "mix", "analysis_window": 0.2},
515 [(10, 32), (36, 76)],
516 ),
517 (
518 0.2,
519 5,
520 0.5,
521 2,
522 {"uc": "mix", "analysis_window": 0.2},
523 [(10, 32), (36, 76)],
524 ),
525 (
526 0.2,
527 5,
528 0.6,
529 2,
530 {"uc": "mix", "analysis_window": 0.2},
531 [(10, 34), (36, 76)],
532 ),
533 (
534 0.2,
535 5,
536 0,
537 2,
538 {"uc": "mix", "analysis_window": 0.3},
539 [(9, 24), (27, 30), (36, 76)],
540 ),
541 (
542 0.4,
543 5,
544 0,
545 2,
546 {"uc": "mix", "analysis_window": 0.3},
547 [(9, 24), (36, 76)],
548 ),
549 (
550 0.2,
551 5,
552 0.6,
553 2,
554 {"uc": "mix", "analysis_window": 0.3},
555 [(9, 57), (57, 76)],
556 ),
557 (
558 0.2,
559 5.1,
560 0.6,
561 2,
562 {"uc": "mix", "analysis_window": 0.3},
563 [(9, 60), (60, 76)],
564 ),
565 (
566 0.2,
567 5.2,
568 0.6,
569 2,
570 {"uc": "mix", "analysis_window": 0.3},
571 [(9, 60), (60, 76)],
572 ),
573 (
574 0.2,
575 5.3,
576 0.6,
577 2,
578 {"uc": "mix", "analysis_window": 0.3},
579 [(9, 60), (60, 76)],
580 ),
581 (
582 0.2,
583 5.4,
584 0.6,
585 2,
586 {"uc": "mix", "analysis_window": 0.3},
587 [(9, 63), (63, 76)],
588 ),
589 (
590 0.2,
591 5,
592 0,
593 2,
594 {"uc": "mix", "analysis_window": 0.4},
595 [(16, 24), (36, 76)],
596 ),
597 (
598 0.2,
599 5,
600 0.3,
601 2,
602 {"uc": "mix", "analysis_window": 0.4},
603 [(16, 24), (36, 76)],
604 ),
605 (
606 0.2,
607 5,
608 0.4,
609 2,
610 {"uc": "mix", "analysis_window": 0.4},
611 [(16, 28), (36, 76)],
612 ),
613 ],
614 ids=[
615 "mono_aw_0_2_max_silence_0_2",
616 "mono_aw_0_2_max_silence_0_3",
617 "mono_aw_0_2_max_silence_0_4",
618 "mono_aw_0_2_max_silence_0",
619 "mono_aw_0_2",
620 "mono_aw_0_3_max_silence_0",
621 "mono_aw_0_3_max_silence_0_3",
622 "mono_aw_0_3_max_silence_0_5",
623 "mono_aw_0_3_max_silence_0_6",
624 "mono_aw_0_4_max_silence_0",
625 "mono_aw_0_4_max_silence_0_3",
626 "mono_aw_0_4_max_silence_0_4",
627 "stereo_uc_None_analysis_window_0_2",
628 "stereo_uc_any_analysis_window_0_2",
629 "stereo_use_channel_None_aw_0_3_max_silence_0_2",
630 "stereo_use_channel_any_aw_0_3_max_silence_0_3",
631 "stereo_use_channel_None_aw_0_4_max_silence_0_2",
632 "stereo_use_channel_any_aw_0_3_max_silence_0_4",
633 "stereo_uc_0_analysis_window_0_2",
634 "stereo_uc_1_analysis_window_0_2",
635 "stereo_uc_mix_aw_0_1_max_silence_0",
636 "stereo_uc_mix_aw_0_1_max_silence_0_1",
637 "stereo_uc_mix_aw_0_1_max_silence_0_2",
638 "stereo_uc_mix_aw_0_1_max_silence_0_3",
639 "stereo_uc_avg_aw_0_2_max_silence_0_min_dur_0_3",
640 "stereo_uc_average_aw_0_2_max_silence_0_min_dur_0_41",
641 "stereo_uc_mix_aw_0_2_max_silence_0_1",
642 "stereo_uc_mix_aw_0_2_max_silence_0_2",
643 "stereo_uc_mix_aw_0_2_max_silence_0_4",
644 "stereo_uc_mix_aw_0_2_max_silence_0_5",
645 "stereo_uc_mix_aw_0_2_max_silence_0_6",
646 "stereo_uc_mix_aw_0_3_max_silence_0",
647 "stereo_uc_mix_aw_0_3_max_silence_0_min_dur_0_3",
648 "stereo_uc_mix_aw_0_3_max_silence_0_6",
649 "stereo_uc_mix_aw_0_3_max_silence_0_6_max_dur_5_1",
650 "stereo_uc_mix_aw_0_3_max_silence_0_6_max_dur_5_2",
651 "stereo_uc_mix_aw_0_3_max_silence_0_6_max_dur_5_3",
652 "stereo_uc_mix_aw_0_3_max_silence_0_6_max_dur_5_4",
653 "stereo_uc_mix_aw_0_4_max_silence_0",
654 "stereo_uc_mix_aw_0_4_max_silence_0_3",
655 "stereo_uc_mix_aw_0_4_max_silence_0_4",
656 ],
657 )
658 def test_split_analysis_window(
659 min_dur, max_dur, max_silence, channels, kwargs, expected
660 ):
661
662 mono_or_stereo = "mono" if channels == 1 else "stereo"
663 filename = "tests/data/test_split_10HZ_{}.raw".format(mono_or_stereo)
664 with open(filename, "rb") as fp:
665 data = fp.read()
666
667 regions = split(
668 data,
669 min_dur=min_dur,
670 max_dur=max_dur,
671 max_silence=max_silence,
672 drop_trailing_silence=False,
673 strict_min_dur=False,
674 sr=10,
675 sw=2,
676 ch=channels,
677 eth=49.99,
678 **kwargs
679 )
680
681 region = AudioRegion(data, 10, 2, channels)
682 regions_ar = region.split(
683 min_dur=min_dur,
684 max_dur=max_dur,
685 max_silence=max_silence,
686 drop_trailing_silence=False,
687 strict_min_dur=False,
688 eth=49.99,
689 **kwargs
690 )
691
692 regions = list(regions)
693 regions_ar = list(regions_ar)
694 err_msg = "Wrong number of regions after split, expected: "
695 err_msg += "{}, found: {}".format(len(expected), len(regions))
696 assert len(regions) == len(expected), err_msg
697 err_msg = "Wrong number of regions after AudioRegion.split, expected: "
698 err_msg += "{}, found: {}".format(len(expected), len(regions_ar))
699 assert len(regions_ar) == len(expected), err_msg
700
701 sample_width = 2
702 sample_size_bytes = sample_width * channels
703 for reg, reg_ar, exp in zip(regions, regions_ar, expected):
704 onset, offset = exp
705 exp_data = data[onset * sample_size_bytes : offset * sample_size_bytes]
706 assert bytes(reg) == exp_data
707 assert reg == reg_ar
708
709
710 def test_split_custom_validator():
711 filename = "tests/data/test_split_10HZ_mono.raw"
712 with open(filename, "rb") as fp:
713 data = fp.read()
714
715 regions = split(
716 data,
717 min_dur=0.2,
718 max_dur=5,
719 max_silence=0.2,
720 drop_trailing_silence=False,
721 strict_min_dur=False,
722 sr=10,
723 sw=2,
724 ch=1,
725 analysis_window=0.1,
726 validator=lambda x: array_("h", x)[0] >= 320,
727 )
728
729 region = AudioRegion(data, 10, 2, 1)
730 regions_ar = region.split(
731 min_dur=0.2,
732 max_dur=5,
733 max_silence=0.2,
734 drop_trailing_silence=False,
735 strict_min_dur=False,
736 analysis_window=0.1,
737 validator=lambda x: array_("h", x)[0] >= 320,
738 )
739
740 expected = [(2, 16), (17, 31), (34, 76)]
741 regions = list(regions)
742 regions_ar = list(regions_ar)
743 err_msg = "Wrong number of regions after split, expected: "
744 err_msg += "{}, found: {}".format(len(expected), len(regions))
745 assert len(regions) == len(expected), err_msg
746 err_msg = "Wrong number of regions after AudioRegion.split, expected: "
747 err_msg += "{}, found: {}".format(len(expected), len(regions_ar))
748 assert len(regions_ar) == len(expected), err_msg
749
750 sample_size_bytes = 2
751 for reg, reg_ar, exp in zip(regions, regions_ar, expected):
752 onset, offset = exp
753 exp_data = data[onset * sample_size_bytes : offset * sample_size_bytes]
754 assert bytes(reg) == exp_data
755 assert reg == reg_ar
756
757
758 @pytest.mark.parametrize(
759 "input, kwargs",
760 [
761 (
762 "tests/data/test_split_10HZ_stereo.raw",
763 {"audio_format": "raw", "sr": 10, "sw": 2, "ch": 2},
764 ),
765 (
766 "tests/data/test_split_10HZ_stereo.raw",
767 {"fmt": "raw", "sr": 10, "sw": 2, "ch": 2},
768 ),
769 ("tests/data/test_split_10HZ_stereo.raw", {"sr": 10, "sw": 2, "ch": 2}),
770 (
771 "tests/data/test_split_10HZ_stereo.raw",
772 {"sampling_rate": 10, "sample_width": 2, "channels": 2},
773 ),
774 (
775 open("tests/data/test_split_10HZ_stereo.raw", "rb").read(),
776 {"sr": 10, "sw": 2, "ch": 2},
777 ),
778 (
779 AudioDataSource(
780 "tests/data/test_split_10HZ_stereo.raw",
781 sr=10,
782 sw=2,
783 ch=2,
784 block_dur=0.1,
785 ),
786 {},
787 ),
788 (
789 AudioRegion(
790 open("tests/data/test_split_10HZ_stereo.raw", "rb").read(),
791 10,
792 2,
793 2,
794 ),
795 {},
796 ),
797 (
798 get_audio_source(
799 "tests/data/test_split_10HZ_stereo.raw", sr=10, sw=2, ch=2
800 ),
801 {},
802 ),
803 ],
804 ids=[
805 "filename_audio_format",
806 "filename_audio_format_short_name",
807 "filename_no_audio_format",
808 "filename_no_long_audio_params",
809 "bytes_",
810 "audio_reader",
811 "audio_region",
812 "audio_source",
813 ],
814 )
815 def test_split_input_type(input, kwargs):
816
817 with open("tests/data/test_split_10HZ_stereo.raw", "rb") as fp:
818 data = fp.read()
819
820 regions = split(
821 input,
822 min_dur=0.2,
823 max_dur=5,
824 max_silence=0.2,
825 drop_trailing_silence=False,
826 strict_min_dur=False,
827 analysis_window=0.1,
828 **kwargs
829 )
830 regions = list(regions)
831 expected = [(2, 32), (34, 76)]
832 sample_width = 2
833 err_msg = "Wrong number of regions after split, expected: "
834 err_msg += "{}, found: {}".format(expected, regions)
835 assert len(regions) == len(expected), err_msg
836 for reg, exp in zip(regions, expected):
837 onset, offset = exp
838 exp_data = data[onset * sample_width * 2 : offset * sample_width * 2]
839 assert bytes(reg) == exp_data
840
841
842 @pytest.mark.parametrize(
843 "min_dur, max_dur, analysis_window",
844 [
845 (0.5, 0.4, 0.1),
846 (0.44, 0.49, 0.1),
847 ],
848 ids=[
849 "min_dur_greater_than_max_dur",
850 "durations_OK_but_wrong_number_of_analysis_windows",
851 ],
852 )
853 def test_split_wrong_min_max_dur(min_dur, max_dur, analysis_window):
854
855 with pytest.raises(ValueError) as val_err:
856 split(
857 b"0" * 16,
858 min_dur=min_dur,
859 max_dur=max_dur,
860 max_silence=0.2,
861 sr=16000,
862 sw=1,
244 ch=1, 863 ch=1,
245 **kwargs 864 analysis_window=analysis_window,
246 ) 865 )
247 866
248 region = AudioRegion(data, 10, 2, 1) 867 err_msg = "'min_dur' ({0} sec.) results in {1} analysis "
249 regions_ar = region.split( 868 err_msg += "window(s) ({1} == ceil({0} / {2})) which is "
250 min_dur, 869 err_msg += "higher than the number of analysis window(s) for "
251 max_dur, 870 err_msg += "'max_dur' ({3} == floor({4} / {2}))"
252 max_silence, 871
253 drop_trailing_silence, 872 err_msg = err_msg.format(
254 strict_min_dur, 873 min_dur,
255 analysis_window=0.1, 874 math.ceil(min_dur / analysis_window),
256 **kwargs 875 analysis_window,
876 math.floor(max_dur / analysis_window),
877 max_dur,
878 )
879 assert err_msg == str(val_err.value)
880
881
882 @pytest.mark.parametrize(
883 "max_silence, max_dur, analysis_window",
884 [
885 (0.5, 0.5, 0.1),
886 (0.5, 0.4, 0.1),
887 (0.44, 0.49, 0.1),
888 ],
889 ids=[
890 "max_silence_equals_max_dur",
891 "max_silence_greater_than_max_dur",
892 "durations_OK_but_wrong_number_of_analysis_windows",
893 ],
894 )
895 def test_split_wrong_max_silence_max_dur(max_silence, max_dur, analysis_window):
896
897 with pytest.raises(ValueError) as val_err:
898 split(
899 b"0" * 16,
900 min_dur=0.2,
901 max_dur=max_dur,
902 max_silence=max_silence,
903 sr=16000,
904 sw=1,
905 ch=1,
906 analysis_window=analysis_window,
257 ) 907 )
258 908
259 regions = list(regions) 909 err_msg = "'max_silence' ({0} sec.) results in {1} analysis "
260 regions_ar = list(regions_ar) 910 err_msg += "window(s) ({1} == floor({0} / {2})) which is "
261 err_msg = "Wrong number of regions after split, expected: " 911 err_msg += "higher or equal to the number of analysis window(s) for "
262 err_msg += "{}, found: {}".format(len(expected), len(regions)) 912 err_msg += "'max_dur' ({3} == floor({4} / {2}))"
263 self.assertEqual(len(regions), len(expected), err_msg) 913
264 err_msg = "Wrong number of regions after AudioRegion.split, expected: " 914 err_msg = err_msg.format(
265 err_msg += "{}, found: {}".format(len(expected), len(regions_ar)) 915 max_silence,
266 self.assertEqual(len(regions_ar), len(expected), err_msg) 916 math.floor(max_silence / analysis_window),
267 917 analysis_window,
268 sample_width = 2 918 math.floor(max_dur / analysis_window),
269 for reg, reg_ar, exp in zip(regions, regions_ar, expected): 919 max_dur,
270 onset, offset = exp 920 )
271 exp_data = data[onset * sample_width : offset * sample_width] 921 assert err_msg == str(val_err.value)
272 self.assertEqual(bytes(reg), exp_data) 922
273 self.assertEqual(reg, reg_ar) 923
274 924 @pytest.mark.parametrize(
275 @genty_dataset( 925 "wrong_param",
276 stereo_all_default=(2, {}, [(2, 32), (34, 76)]), 926 [
277 mono_max_read=(1, {"max_read": 5}, [(2, 16), (17, 31), (34, 50)]), 927 {"min_dur": -1},
278 mono_max_read_short_name=(1, {"mr": 5}, [(2, 16), (17, 31), (34, 50)]), 928 {"min_dur": 0},
279 mono_use_channel_1=( 929 {"max_dur": -1},
280 1, 930 {"max_dur": 0},
281 {"eth": 50, "use_channel": 0}, 931 {"max_silence": -1},
282 [(2, 16), (17, 31), (34, 76)], 932 {"analysis_window": 0},
283 ), 933 {"analysis_window": -1},
284 mono_uc_1=(1, {"eth": 50, "uc": 1}, [(2, 16), (17, 31), (34, 76)]), 934 ],
285 mono_use_channel_None=( 935 ids=[
286 1, 936 "negative_min_dur",
287 {"eth": 50, "use_channel": None}, 937 "zero_min_dur",
288 [(2, 16), (17, 31), (34, 76)], 938 "negative_max_dur",
289 ), 939 "zero_max_dur",
290 stereo_use_channel_1=( 940 "negative_max_silence",
291 2, 941 "zero_analysis_window",
292 {"eth": 50, "use_channel": 0}, 942 "negative_analysis_window",
293 [(2, 16), (17, 31), (34, 76)], 943 ],
294 ), 944 )
295 stereo_use_channel_no_use_channel_given=( 945 def test_split_negative_temporal_params(wrong_param):
296 2, 946
297 {"eth": 50}, 947 params = {
298 [(2, 32), (34, 76)], 948 "min_dur": 0.2,
299 ), 949 "max_dur": 0.5,
300 stereo_use_channel_minus_2=( 950 "max_silence": 0.1,
301 2, 951 "analysis_window": 0.1,
302 {"eth": 50, "use_channel": -2}, 952 }
303 [(2, 16), (17, 31), (34, 76)], 953 params.update(wrong_param)
304 ), 954 with pytest.raises(ValueError) as val_err:
305 stereo_uc_2=(2, {"eth": 50, "uc": 1}, [(10, 32), (36, 76)]), 955 split(None, **params)
306 stereo_uc_minus_1=(2, {"eth": 50, "uc": -1}, [(10, 32), (36, 76)]), 956
307 mono_uc_mix=( 957 name = set(wrong_param).pop()
308 1, 958 value = wrong_param[name]
309 {"eth": 50, "uc": "mix"}, 959 err_msg = "'{}' ({}) must be >{} 0".format(
310 [(2, 16), (17, 31), (34, 76)], 960 name, value, "=" if name == "max_silence" else ""
311 ), 961 )
312 stereo_use_channel_mix=( 962 assert err_msg == str(val_err.value)
313 2, 963
314 {"energy_threshold": 53.5, "use_channel": "mix"}, 964
315 [(54, 76)], 965 def test_split_too_small_analysis_window():
316 ), 966 with pytest.raises(ValueError) as val_err:
317 stereo_uc_mix=(2, {"eth": 52, "uc": "mix"}, [(17, 26), (54, 76)]), 967 split(b"", sr=10, sw=1, ch=1, analysis_window=0.09)
318 stereo_uc_mix_default_eth=( 968 err_msg = "Too small 'analysis_windows' (0.09) for sampling rate (10)."
319 2, 969 err_msg += " Analysis windows should at least be 1/10 to cover one "
320 {"uc": "mix"}, 970 err_msg += "single data sample"
321 [(10, 16), (17, 31), (36, 76)], 971 assert err_msg == str(val_err.value)
322 ), 972
323 ) 973
324 def test_split_kwargs(self, channels, kwargs, expected): 974 def test_split_and_plot():
325 975
326 mono_or_stereo = "mono" if channels == 1 else "stereo" 976 with open("tests/data/test_split_10HZ_mono.raw", "rb") as fp:
327 filename = "tests/data/test_split_10HZ_{}.raw".format(mono_or_stereo) 977 data = fp.read()
328 with open(filename, "rb") as fp: 978
329 data = fp.read() 979 region = AudioRegion(data, 10, 2, 1)
330 980 with patch("auditok.plotting.plot") as patch_fn:
331 regions = split( 981 regions = region.split_and_plot(
332 data,
333 min_dur=0.2, 982 min_dur=0.2,
334 max_dur=5, 983 max_dur=5,
335 max_silence=0.2, 984 max_silence=0.2,
336 drop_trailing_silence=False, 985 drop_trailing_silence=False,
337 strict_min_dur=False, 986 strict_min_dur=False,
338 analysis_window=0.1, 987 analysis_window=0.1,
339 sr=10, 988 sr=10,
340 sw=2, 989 sw=2,
341 ch=channels, 990 ch=1,
342 **kwargs 991 eth=50,
343 ) 992 )
344 993 assert patch_fn.called
345 region = AudioRegion(data, 10, 2, channels) 994 expected = [(2, 16), (17, 31), (34, 76)]
346 max_read = kwargs.get("max_read", kwargs.get("mr")) 995 sample_width = 2
347 if max_read is not None: 996 expected_regions = []
348 region = region.sec[:max_read] 997 for onset, offset in expected:
349 kwargs.pop("max_read", None) 998 onset *= sample_width
350 kwargs.pop("mr", None) 999 offset *= sample_width
351 1000 expected_regions.append(AudioRegion(data[onset:offset], 10, 2, 1))
352 regions_ar = region.split( 1001 assert regions == expected_regions
353 min_dur=0.2, 1002
354 max_dur=5, 1003
355 max_silence=0.2, 1004 def test_split_exception():
356 drop_trailing_silence=False, 1005 with open("tests/data/test_split_10HZ_mono.raw", "rb") as fp:
357 strict_min_dur=False, 1006 data = fp.read()
358 analysis_window=0.1,
359 **kwargs
360 )
361
362 regions = list(regions)
363 regions_ar = list(regions_ar)
364 err_msg = "Wrong number of regions after split, expected: "
365 err_msg += "{}, found: {}".format(len(expected), len(regions))
366 self.assertEqual(len(regions), len(expected), err_msg)
367 err_msg = "Wrong number of regions after AudioRegion.split, expected: "
368 err_msg += "{}, found: {}".format(len(expected), len(regions_ar))
369 self.assertEqual(len(regions_ar), len(expected), err_msg)
370
371 sample_width = 2
372 sample_size_bytes = sample_width * channels
373 for reg, reg_ar, exp in zip(regions, regions_ar, expected):
374 onset, offset = exp
375 exp_data = data[
376 onset * sample_size_bytes : offset * sample_size_bytes
377 ]
378 self.assertEqual(len(bytes(reg)), len(exp_data))
379 self.assertEqual(reg, reg_ar)
380
381 @genty_dataset(
382 mono_aw_0_2_max_silence_0_2=(
383 0.2,
384 5,
385 0.2,
386 1,
387 {"aw": 0.2},
388 [(2, 30), (34, 76)],
389 ),
390 mono_aw_0_2_max_silence_0_3=(
391 0.2,
392 5,
393 0.3,
394 1,
395 {"aw": 0.2},
396 [(2, 30), (34, 76)],
397 ),
398 mono_aw_0_2_max_silence_0_4=(
399 0.2,
400 5,
401 0.4,
402 1,
403 {"aw": 0.2},
404 [(2, 32), (34, 76)],
405 ),
406 mono_aw_0_2_max_silence_0=(
407 0.2,
408 5,
409 0,
410 1,
411 {"aw": 0.2},
412 [(2, 14), (16, 24), (26, 28), (34, 76)],
413 ),
414 mono_aw_0_2=(0.2, 5, 0.2, 1, {"aw": 0.2}, [(2, 30), (34, 76)]),
415 mono_aw_0_3_max_silence_0=(
416 0.3,
417 5,
418 0,
419 1,
420 {"aw": 0.3},
421 [(3, 12), (15, 24), (36, 76)],
422 ),
423 mono_aw_0_3_max_silence_0_3=(
424 0.3,
425 5,
426 0.3,
427 1,
428 {"aw": 0.3},
429 [(3, 27), (36, 76)],
430 ),
431 mono_aw_0_3_max_silence_0_5=(
432 0.3,
433 5,
434 0.5,
435 1,
436 {"aw": 0.3},
437 [(3, 27), (36, 76)],
438 ),
439 mono_aw_0_3_max_silence_0_6=(
440 0.3,
441 5,
442 0.6,
443 1,
444 {"aw": 0.3},
445 [(3, 30), (36, 76)],
446 ),
447 mono_aw_0_4_max_silence_0=(
448 0.2,
449 5,
450 0,
451 1,
452 {"aw": 0.4},
453 [(4, 12), (16, 24), (36, 76)],
454 ),
455 mono_aw_0_4_max_silence_0_3=(
456 0.2,
457 5,
458 0.3,
459 1,
460 {"aw": 0.4},
461 [(4, 12), (16, 24), (36, 76)],
462 ),
463 mono_aw_0_4_max_silence_0_4=(
464 0.2,
465 5,
466 0.4,
467 1,
468 {"aw": 0.4},
469 [(4, 28), (36, 76)],
470 ),
471 stereo_uc_None_analysis_window_0_2=(
472 0.2,
473 5,
474 0.2,
475 2,
476 {"analysis_window": 0.2},
477 [(2, 32), (34, 76)],
478 ),
479 stereo_uc_any_analysis_window_0_2=(
480 0.2,
481 5,
482 0.2,
483 2,
484 {"uc": None, "analysis_window": 0.2},
485 [(2, 32), (34, 76)],
486 ),
487 stereo_use_channel_None_aw_0_3_max_silence_0_2=(
488 0.2,
489 5,
490 0.2,
491 2,
492 {"use_channel": None, "analysis_window": 0.3},
493 [(3, 30), (36, 76)],
494 ),
495 stereo_use_channel_any_aw_0_3_max_silence_0_3=(
496 0.2,
497 5,
498 0.3,
499 2,
500 {"use_channel": "any", "analysis_window": 0.3},
501 [(3, 33), (36, 76)],
502 ),
503 stereo_use_channel_None_aw_0_4_max_silence_0_2=(
504 0.2,
505 5,
506 0.2,
507 2,
508 {"use_channel": None, "analysis_window": 0.4},
509 [(4, 28), (36, 76)],
510 ),
511 stereo_use_channel_any_aw_0_3_max_silence_0_4=(
512 0.2,
513 5,
514 0.4,
515 2,
516 {"use_channel": "any", "analysis_window": 0.4},
517 [(4, 32), (36, 76)],
518 ),
519 stereo_uc_0_analysis_window_0_2=(
520 0.2,
521 5,
522 0.2,
523 2,
524 {"uc": 0, "analysis_window": 0.2},
525 [(2, 30), (34, 76)],
526 ),
527 stereo_uc_1_analysis_window_0_2=(
528 0.2,
529 5,
530 0.2,
531 2,
532 {"uc": 1, "analysis_window": 0.2},
533 [(10, 32), (36, 76)],
534 ),
535 stereo_uc_mix_aw_0_1_max_silence_0=(
536 0.2,
537 5,
538 0,
539 2,
540 {"uc": "mix", "analysis_window": 0.1},
541 [(10, 14), (17, 24), (26, 29), (36, 76)],
542 ),
543 stereo_uc_mix_aw_0_1_max_silence_0_1=(
544 0.2,
545 5,
546 0.1,
547 2,
548 {"uc": "mix", "analysis_window": 0.1},
549 [(10, 15), (17, 25), (26, 30), (36, 76)],
550 ),
551 stereo_uc_mix_aw_0_1_max_silence_0_2=(
552 0.2,
553 5,
554 0.2,
555 2,
556 {"uc": "mix", "analysis_window": 0.1},
557 [(10, 16), (17, 31), (36, 76)],
558 ),
559 stereo_uc_mix_aw_0_1_max_silence_0_3=(
560 0.2,
561 5,
562 0.3,
563 2,
564 {"uc": "mix", "analysis_window": 0.1},
565 [(10, 32), (36, 76)],
566 ),
567 stereo_uc_avg_aw_0_2_max_silence_0_min_dur_0_3=(
568 0.3,
569 5,
570 0,
571 2,
572 {"uc": "avg", "analysis_window": 0.2},
573 [(10, 14), (16, 24), (36, 76)],
574 ),
575 stereo_uc_average_aw_0_2_max_silence_0_min_dur_0_41=(
576 0.41,
577 5,
578 0,
579 2,
580 {"uc": "average", "analysis_window": 0.2},
581 [(16, 24), (36, 76)],
582 ),
583 stereo_uc_mix_aw_0_2_max_silence_0_1=(
584 0.2,
585 5,
586 0.1,
587 2,
588 {"uc": "mix", "analysis_window": 0.2},
589 [(10, 14), (16, 24), (26, 28), (36, 76)],
590 ),
591 stereo_uc_mix_aw_0_2_max_silence_0_2=(
592 0.2,
593 5,
594 0.2,
595 2,
596 {"uc": "mix", "analysis_window": 0.2},
597 [(10, 30), (36, 76)],
598 ),
599 stereo_uc_mix_aw_0_2_max_silence_0_4=(
600 0.2,
601 5,
602 0.4,
603 2,
604 {"uc": "mix", "analysis_window": 0.2},
605 [(10, 32), (36, 76)],
606 ),
607 stereo_uc_mix_aw_0_2_max_silence_0_5=(
608 0.2,
609 5,
610 0.5,
611 2,
612 {"uc": "mix", "analysis_window": 0.2},
613 [(10, 32), (36, 76)],
614 ),
615 stereo_uc_mix_aw_0_2_max_silence_0_6=(
616 0.2,
617 5,
618 0.6,
619 2,
620 {"uc": "mix", "analysis_window": 0.2},
621 [(10, 34), (36, 76)],
622 ),
623 stereo_uc_mix_aw_0_3_max_silence_0=(
624 0.2,
625 5,
626 0,
627 2,
628 {"uc": "mix", "analysis_window": 0.3},
629 [(9, 24), (27, 30), (36, 76)],
630 ),
631 stereo_uc_mix_aw_0_3_max_silence_0_min_dur_0_3=(
632 0.4,
633 5,
634 0,
635 2,
636 {"uc": "mix", "analysis_window": 0.3},
637 [(9, 24), (36, 76)],
638 ),
639 stereo_uc_mix_aw_0_3_max_silence_0_6=(
640 0.2,
641 5,
642 0.6,
643 2,
644 {"uc": "mix", "analysis_window": 0.3},
645 [(9, 57), (57, 76)],
646 ),
647 stereo_uc_mix_aw_0_3_max_silence_0_6_max_dur_5_1=(
648 0.2,
649 5.1,
650 0.6,
651 2,
652 {"uc": "mix", "analysis_window": 0.3},
653 [(9, 60), (60, 76)],
654 ),
655 stereo_uc_mix_aw_0_3_max_silence_0_6_max_dur_5_2=(
656 0.2,
657 5.2,
658 0.6,
659 2,
660 {"uc": "mix", "analysis_window": 0.3},
661 [(9, 60), (60, 76)],
662 ),
663 stereo_uc_mix_aw_0_3_max_silence_0_6_max_dur_5_3=(
664 0.2,
665 5.3,
666 0.6,
667 2,
668 {"uc": "mix", "analysis_window": 0.3},
669 [(9, 60), (60, 76)],
670 ),
671 stereo_uc_mix_aw_0_3_max_silence_0_6_max_dur_5_4=(
672 0.2,
673 5.4,
674 0.6,
675 2,
676 {"uc": "mix", "analysis_window": 0.3},
677 [(9, 63), (63, 76)],
678 ),
679 stereo_uc_mix_aw_0_4_max_silence_0=(
680 0.2,
681 5,
682 0,
683 2,
684 {"uc": "mix", "analysis_window": 0.4},
685 [(16, 24), (36, 76)],
686 ),
687 stereo_uc_mix_aw_0_4_max_silence_0_3=(
688 0.2,
689 5,
690 0.3,
691 2,
692 {"uc": "mix", "analysis_window": 0.4},
693 [(16, 24), (36, 76)],
694 ),
695 stereo_uc_mix_aw_0_4_max_silence_0_4=(
696 0.2,
697 5,
698 0.4,
699 2,
700 {"uc": "mix", "analysis_window": 0.4},
701 [(16, 28), (36, 76)],
702 ),
703 )
704 def test_split_analysis_window(
705 self, min_dur, max_dur, max_silence, channels, kwargs, expected
706 ):
707
708 mono_or_stereo = "mono" if channels == 1 else "stereo"
709 filename = "tests/data/test_split_10HZ_{}.raw".format(mono_or_stereo)
710 with open(filename, "rb") as fp:
711 data = fp.read()
712
713 regions = split(
714 data,
715 min_dur=min_dur,
716 max_dur=max_dur,
717 max_silence=max_silence,
718 drop_trailing_silence=False,
719 strict_min_dur=False,
720 sr=10,
721 sw=2,
722 ch=channels,
723 eth=49.99,
724 **kwargs
725 )
726
727 region = AudioRegion(data, 10, 2, channels)
728 regions_ar = region.split(
729 min_dur=min_dur,
730 max_dur=max_dur,
731 max_silence=max_silence,
732 drop_trailing_silence=False,
733 strict_min_dur=False,
734 eth=49.99,
735 **kwargs
736 )
737
738 regions = list(regions)
739 regions_ar = list(regions_ar)
740 err_msg = "Wrong number of regions after split, expected: "
741 err_msg += "{}, found: {}".format(len(expected), len(regions))
742 self.assertEqual(len(regions), len(expected), err_msg)
743 err_msg = "Wrong number of regions after AudioRegion.split, expected: "
744 err_msg += "{}, found: {}".format(len(expected), len(regions_ar))
745 self.assertEqual(len(regions_ar), len(expected), err_msg)
746
747 sample_width = 2
748 sample_size_bytes = sample_width * channels
749 for reg, reg_ar, exp in zip(regions, regions_ar, expected):
750 onset, offset = exp
751 exp_data = data[
752 onset * sample_size_bytes : offset * sample_size_bytes
753 ]
754 self.assertEqual(bytes(reg), exp_data)
755 self.assertEqual(reg, reg_ar)
756
757 def test_split_custom_validator(self):
758 filename = "tests/data/test_split_10HZ_mono.raw"
759 with open(filename, "rb") as fp:
760 data = fp.read()
761
762 regions = split(
763 data,
764 min_dur=0.2,
765 max_dur=5,
766 max_silence=0.2,
767 drop_trailing_silence=False,
768 strict_min_dur=False,
769 sr=10,
770 sw=2,
771 ch=1,
772 analysis_window=0.1,
773 validator=lambda x: array_("h", x)[0] >= 320,
774 )
775
776 region = AudioRegion(data, 10, 2, 1) 1007 region = AudioRegion(data, 10, 2, 1)
777 regions_ar = region.split( 1008
778 min_dur=0.2, 1009 with pytest.raises(RuntimeWarning):
779 max_dur=5, 1010 # max_read is not accepted when calling AudioRegion.split
780 max_silence=0.2, 1011 region.split(max_read=2)
781 drop_trailing_silence=False, 1012
782 strict_min_dur=False, 1013
783 analysis_window=0.1, 1014 @pytest.mark.parametrize(
784 validator=lambda x: array_("h", x)[0] >= 320, 1015 "data, start, sampling_rate, sample_width, channels, expected_end, expected_duration_s, expected_duration_ms",
785 ) 1016 [
786 1017 (b"\0" * 8000, 0, 8000, 1, 1, 1, 1, 1000),
787 expected = [(2, 16), (17, 31), (34, 76)] 1018 (b"\0" * 7992, 0, 8000, 1, 1, 0.999, 0.999, 999),
788 regions = list(regions) 1019 (b"\0" * 7994, 0, 8000, 1, 1, 0.99925, 0.99925, 999),
789 regions_ar = list(regions_ar) 1020 (b"\0" * 7996, 0, 8000, 1, 1, 0.9995, 0.9995, 1000),
790 err_msg = "Wrong number of regions after split, expected: " 1021 (b"\0" * 7998, 0, 8000, 1, 1, 0.99975, 0.99975, 1000),
791 err_msg += "{}, found: {}".format(len(expected), len(regions)) 1022 (b"\0" * 8000 * 2, 0, 8000, 2, 1, 1, 1, 1000),
792 self.assertEqual(len(regions), len(expected), err_msg) 1023 (b"\0" * 8000 * 2, 0, 8000, 1, 2, 1, 1, 1000),
793 err_msg = "Wrong number of regions after AudioRegion.split, expected: " 1024 (b"\0" * 8000 * 5, 0, 8000, 1, 5, 1, 1, 1000),
794 err_msg += "{}, found: {}".format(len(expected), len(regions_ar)) 1025 (b"\0" * 8000 * 2 * 5, 0, 8000, 2, 5, 1, 1, 1000),
795 self.assertEqual(len(regions_ar), len(expected), err_msg) 1026 (b"\0" * 7992 * 2 * 5, 0, 8000, 2, 5, 0.999, 0.999, 999),
796 1027 (b"\0" * 7994 * 2 * 5, 0, 8000, 2, 5, 0.99925, 0.99925, 999),
797 sample_size_bytes = 2 1028 (b"\0" * 7996 * 2 * 5, 0, 8000, 2, 5, 0.9995, 0.9995, 1000),
798 for reg, reg_ar, exp in zip(regions, regions_ar, expected): 1029 (b"\0" * 7998 * 2 * 5, 0, 8000, 2, 5, 0.99975, 0.99975, 1000),
799 onset, offset = exp 1030 (b"\0" * int(8000 * 1.33), 2.7, 8000, 1, 1, 4.03, 1.33, 1330),
800 exp_data = data[ 1031 (b"\0" * int(8000 * 0.476), 11.568, 8000, 1, 1, 12.044, 0.476, 476),
801 onset * sample_size_bytes : offset * sample_size_bytes 1032 (
802 ]
803 self.assertEqual(bytes(reg), exp_data)
804 self.assertEqual(reg, reg_ar)
805
806 @genty_dataset(
807 filename_audio_format=(
808 "tests/data/test_split_10HZ_stereo.raw",
809 {"audio_format": "raw", "sr": 10, "sw": 2, "ch": 2},
810 ),
811 filename_audio_format_short_name=(
812 "tests/data/test_split_10HZ_stereo.raw",
813 {"fmt": "raw", "sr": 10, "sw": 2, "ch": 2},
814 ),
815 filename_no_audio_format=(
816 "tests/data/test_split_10HZ_stereo.raw",
817 {"sr": 10, "sw": 2, "ch": 2},
818 ),
819 filename_no_long_audio_params=(
820 "tests/data/test_split_10HZ_stereo.raw",
821 {"sampling_rate": 10, "sample_width": 2, "channels": 2},
822 ),
823 bytes_=(
824 open("tests/data/test_split_10HZ_stereo.raw", "rb").read(),
825 {"sr": 10, "sw": 2, "ch": 2},
826 ),
827 audio_reader=(
828 AudioDataSource(
829 "tests/data/test_split_10HZ_stereo.raw",
830 sr=10,
831 sw=2,
832 ch=2,
833 block_dur=0.1,
834 ),
835 {},
836 ),
837 audio_region=(
838 AudioRegion(
839 open("tests/data/test_split_10HZ_stereo.raw", "rb").read(),
840 10,
841 2,
842 2,
843 ),
844 {},
845 ),
846 audio_source=(
847 get_audio_source(
848 "tests/data/test_split_10HZ_stereo.raw", sr=10, sw=2, ch=2
849 ),
850 {},
851 ),
852 )
853 def test_split_input_type(self, input, kwargs):
854
855 with open("tests/data/test_split_10HZ_stereo.raw", "rb") as fp:
856 data = fp.read()
857
858 regions = split(
859 input,
860 min_dur=0.2,
861 max_dur=5,
862 max_silence=0.2,
863 drop_trailing_silence=False,
864 strict_min_dur=False,
865 analysis_window=0.1,
866 **kwargs
867 )
868 regions = list(regions)
869 expected = [(2, 32), (34, 76)]
870 sample_width = 2
871 err_msg = "Wrong number of regions after split, expected: "
872 err_msg += "{}, found: {}".format(expected, regions)
873 self.assertEqual(len(regions), len(expected), err_msg)
874 for reg, exp in zip(regions, expected):
875 onset, offset = exp
876 exp_data = data[
877 onset * sample_width * 2 : offset * sample_width * 2
878 ]
879 self.assertEqual(bytes(reg), exp_data)
880
881 @genty_dataset(
882 min_dur_greater_than_max_dur=(0.5, 0.4, 0.1),
883 durations_OK_but_wrong_number_of_analysis_windows=(0.44, 0.49, 0.1),
884 )
885 def test_split_wrong_min_max_dur(self, min_dur, max_dur, analysis_window):
886
887 with self.assertRaises(ValueError) as val_err:
888 split(
889 b"0" * 16,
890 min_dur=min_dur,
891 max_dur=max_dur,
892 max_silence=0.2,
893 sr=16000,
894 sw=1,
895 ch=1,
896 analysis_window=analysis_window,
897 )
898
899 err_msg = "'min_dur' ({0} sec.) results in {1} analysis "
900 err_msg += "window(s) ({1} == ceil({0} / {2})) which is "
901 err_msg += "higher than the number of analysis window(s) for "
902 err_msg += "'max_dur' ({3} == floor({4} / {2}))"
903
904 err_msg = err_msg.format(
905 min_dur,
906 math.ceil(min_dur / analysis_window),
907 analysis_window,
908 math.floor(max_dur / analysis_window),
909 max_dur,
910 )
911 self.assertEqual(err_msg, str(val_err.exception))
912
913 @genty_dataset(
914 max_silence_equals_max_dur=(0.5, 0.5, 0.1),
915 max_silence_greater_than_max_dur=(0.5, 0.4, 0.1),
916 durations_OK_but_wrong_number_of_analysis_windows=(0.44, 0.49, 0.1),
917 )
918 def test_split_wrong_max_silence_max_dur(
919 self, max_silence, max_dur, analysis_window
920 ):
921
922 with self.assertRaises(ValueError) as val_err:
923 split(
924 b"0" * 16,
925 min_dur=0.2,
926 max_dur=max_dur,
927 max_silence=max_silence,
928 sr=16000,
929 sw=1,
930 ch=1,
931 analysis_window=analysis_window,
932 )
933
934 err_msg = "'max_silence' ({0} sec.) results in {1} analysis "
935 err_msg += "window(s) ({1} == floor({0} / {2})) which is "
936 err_msg += "higher or equal to the number of analysis window(s) for "
937 err_msg += "'max_dur' ({3} == floor({4} / {2}))"
938
939 err_msg = err_msg.format(
940 max_silence,
941 math.floor(max_silence / analysis_window),
942 analysis_window,
943 math.floor(max_dur / analysis_window),
944 max_dur,
945 )
946 self.assertEqual(err_msg, str(val_err.exception))
947
948 @genty_dataset(
949 negative_min_dur=({"min_dur": -1},),
950 zero_min_dur=({"min_dur": 0},),
951 negative_max_dur=({"max_dur": -1},),
952 zero_max_dur=({"max_dur": 0},),
953 negative_max_silence=({"max_silence": -1},),
954 zero_analysis_window=({"analysis_window": 0},),
955 negative_analysis_window=({"analysis_window": -1},),
956 )
957 def test_split_negative_temporal_params(self, wrong_param):
958
959 params = {
960 "min_dur": 0.2,
961 "max_dur": 0.5,
962 "max_silence": 0.1,
963 "analysis_window": 0.1,
964 }
965 params.update(wrong_param)
966 with self.assertRaises(ValueError) as val_err:
967 split(None, **params)
968
969 name = set(wrong_param).pop()
970 value = wrong_param[name]
971 err_msg = "'{}' ({}) must be >{} 0".format(
972 name, value, "=" if name == "max_silence" else ""
973 )
974 self.assertEqual(err_msg, str(val_err.exception))
975
976 def test_split_too_small_analysis_window(self):
977 with self.assertRaises(ValueError) as val_err:
978 split(b"", sr=10, sw=1, ch=1, analysis_window=0.09)
979 err_msg = "Too small 'analysis_windows' (0.09) for sampling rate (10)."
980 err_msg += " Analysis windows should at least be 1/10 to cover one "
981 err_msg += "single data sample"
982 self.assertEqual(err_msg, str(val_err.exception))
983
984 def test_split_and_plot(self):
985
986 with open("tests/data/test_split_10HZ_mono.raw", "rb") as fp:
987 data = fp.read()
988
989 region = AudioRegion(data, 10, 2, 1)
990 with patch("auditok.plotting.plot") as patch_fn:
991 regions = region.split_and_plot(
992 min_dur=0.2,
993 max_dur=5,
994 max_silence=0.2,
995 drop_trailing_silence=False,
996 strict_min_dur=False,
997 analysis_window=0.1,
998 sr=10,
999 sw=2,
1000 ch=1,
1001 eth=50,
1002 )
1003 self.assertTrue(patch_fn.called)
1004 expected = [(2, 16), (17, 31), (34, 76)]
1005 sample_width = 2
1006 expected_regions = []
1007 for (onset, offset) in expected:
1008 onset *= sample_width
1009 offset *= sample_width
1010 expected_regions.append(AudioRegion(data[onset:offset], 10, 2, 1))
1011 self.assertEqual(regions, expected_regions)
1012
1013 def test_split_exception(self):
1014 with open("tests/data/test_split_10HZ_mono.raw", "rb") as fp:
1015 data = fp.read()
1016 region = AudioRegion(data, 10, 2, 1)
1017
1018 with self.assertRaises(RuntimeWarning):
1019 # max_read is not accepted when calling AudioRegion.split
1020 region.split(max_read=2)
1021
1022
1023 @genty
1024 class TestAudioRegion(TestCase):
1025 @genty_dataset(
1026 simple=(b"\0" * 8000, 0, 8000, 1, 1, 1, 1, 1000),
1027 one_ms_less_than_1_sec=(
1028 b"\0" * 7992,
1029 0,
1030 8000,
1031 1,
1032 1,
1033 0.999,
1034 0.999,
1035 999,
1036 ),
1037 tree_quarter_ms_less_than_1_sec=(
1038 b"\0" * 7994,
1039 0,
1040 8000,
1041 1,
1042 1,
1043 0.99925,
1044 0.99925,
1045 999,
1046 ),
1047 half_ms_less_than_1_sec=(
1048 b"\0" * 7996,
1049 0,
1050 8000,
1051 1,
1052 1,
1053 0.9995,
1054 0.9995,
1055 1000,
1056 ),
1057 quarter_ms_less_than_1_sec=(
1058 b"\0" * 7998,
1059 0,
1060 8000,
1061 1,
1062 1,
1063 0.99975,
1064 0.99975,
1065 1000,
1066 ),
1067 simple_sample_width_2=(b"\0" * 8000 * 2, 0, 8000, 2, 1, 1, 1, 1000),
1068 simple_stereo=(b"\0" * 8000 * 2, 0, 8000, 1, 2, 1, 1, 1000),
1069 simple_multichannel=(b"\0" * 8000 * 5, 0, 8000, 1, 5, 1, 1, 1000),
1070 simple_sample_width_2_multichannel=(
1071 b"\0" * 8000 * 2 * 5,
1072 0,
1073 8000,
1074 2,
1075 5,
1076 1,
1077 1,
1078 1000,
1079 ),
1080 one_ms_less_than_1s_sw_2_multichannel=(
1081 b"\0" * 7992 * 2 * 5,
1082 0,
1083 8000,
1084 2,
1085 5,
1086 0.999,
1087 0.999,
1088 999,
1089 ),
1090 tree_qrt_ms_lt_1_s_sw_2_multichannel=(
1091 b"\0" * 7994 * 2 * 5,
1092 0,
1093 8000,
1094 2,
1095 5,
1096 0.99925,
1097 0.99925,
1098 999,
1099 ),
1100 half_ms_lt_1s_sw_2_multichannel=(
1101 b"\0" * 7996 * 2 * 5,
1102 0,
1103 8000,
1104 2,
1105 5,
1106 0.9995,
1107 0.9995,
1108 1000,
1109 ),
1110 quarter_ms_lt_1s_sw_2_multichannel=(
1111 b"\0" * 7998 * 2 * 5,
1112 0,
1113 8000,
1114 2,
1115 5,
1116 0.99975,
1117 0.99975,
1118 1000,
1119 ),
1120 arbitrary_length_1=(
1121 b"\0" * int(8000 * 1.33),
1122 2.7,
1123 8000,
1124 1,
1125 1,
1126 4.03,
1127 1.33,
1128 1330,
1129 ),
1130 arbitrary_length_2=(
1131 b"\0" * int(8000 * 0.476),
1132 11.568,
1133 8000,
1134 1,
1135 1,
1136 12.044,
1137 0.476,
1138 476,
1139 ),
1140 arbitrary_length_sw_2_multichannel=(
1141 b"\0" * int(8000 * 1.711) * 2 * 3, 1033 b"\0" * int(8000 * 1.711) * 2 * 3,
1142 9.415, 1034 9.415,
1143 8000, 1035 8000,
1144 2, 1036 2,
1145 3, 1037 3,
1146 11.126, 1038 11.126,
1147 1.711, 1039 1.711,
1148 1711, 1040 1711,
1149 ), 1041 ),
1150 arbitrary_samplig_rate=( 1042 (
1151 b"\0" * int(3172 * 1.318), 1043 b"\0" * int(3172 * 1.318),
1152 17.236, 1044 17.236,
1153 3172, 1045 3172,
1154 1, 1046 1,
1155 1, 1047 1,
1156 17.236 + int(3172 * 1.318) / 3172, 1048 17.236 + int(3172 * 1.318) / 3172,
1157 int(3172 * 1.318) / 3172, 1049 int(3172 * 1.318) / 3172,
1158 1318, 1050 1318,
1159 ), 1051 ),
1160 arbitrary_sr_sw_2_multichannel=( 1052 (
1161 b"\0" * int(11317 * 0.716) * 2 * 3, 1053 b"\0" * int(11317 * 0.716) * 2 * 3,
1162 18.811, 1054 18.811,
1163 11317, 1055 11317,
1164 2, 1056 2,
1165 3, 1057 3,
1166 18.811 + int(11317 * 0.716) / 11317, 1058 18.811 + int(11317 * 0.716) / 11317,
1167 int(11317 * 0.716) / 11317, 1059 int(11317 * 0.716) / 11317,
1168 716, 1060 716,
1169 ), 1061 ),
1170 ) 1062 ],
1171 def test_creation( 1063 ids=[
1172 self, 1064 "simple",
1173 data, 1065 "one_ms_less_than_1_sec",
1174 start, 1066 "tree_quarter_ms_less_than_1_sec",
1175 sampling_rate, 1067 "half_ms_less_than_1_sec",
1176 sample_width, 1068 "quarter_ms_less_than_1_sec",
1177 channels, 1069 "simple_sample_width_2",
1178 expected_end, 1070 "simple_stereo",
1179 expected_duration_s, 1071 "simple_multichannel",
1180 expected_duration_ms, 1072 "simple_sample_width_2_multichannel",
1181 ): 1073 "one_ms_less_than_1s_sw_2_multichannel",
1182 meta = {"start": start, "end": expected_end} 1074 "tree_qrt_ms_lt_1_s_sw_2_multichannel",
1183 region = AudioRegion(data, sampling_rate, sample_width, channels, meta) 1075 "half_ms_lt_1s_sw_2_multichannel",
1184 self.assertEqual(region.sampling_rate, sampling_rate) 1076 "quarter_ms_lt_1s_sw_2_multichannel",
1185 self.assertEqual(region.sr, sampling_rate) 1077 "arbitrary_length_1",
1186 self.assertEqual(region.sample_width, sample_width) 1078 "arbitrary_length_2",
1187 self.assertEqual(region.sw, sample_width) 1079 "arbitrary_length_sw_2_multichannel",
1188 self.assertEqual(region.channels, channels) 1080 "arbitrary_samplig_rate",
1189 self.assertEqual(region.ch, channels) 1081 "arbitrary_sr_sw_2_multichannel",
1190 self.assertEqual(region.meta.start, start) 1082 ],
1191 self.assertEqual(region.meta.end, expected_end) 1083 )
1192 self.assertEqual(region.duration, expected_duration_s) 1084 def test_creation(
1193 self.assertEqual(len(region.ms), expected_duration_ms) 1085 data,
1194 self.assertEqual(bytes(region), data) 1086 start,
1195 1087 sampling_rate,
1196 def test_creation_invalid_data_exception(self): 1088 sample_width,
1197 with self.assertRaises(AudioParameterError) as audio_param_err: 1089 channels,
1198 _ = AudioRegion( 1090 expected_end,
1199 data=b"ABCDEFGHI", sampling_rate=8, sample_width=2, channels=1 1091 expected_duration_s,
1200 ) 1092 expected_duration_ms,
1201 self.assertEqual( 1093 ):
1202 "The length of audio data must be an integer " 1094 meta = {"start": start, "end": expected_end}
1203 "multiple of `sample_width * channels`", 1095 region = AudioRegion(data, sampling_rate, sample_width, channels, meta)
1204 str(audio_param_err.exception), 1096 assert region.sampling_rate == sampling_rate
1097 assert region.sr == sampling_rate
1098 assert region.sample_width == sample_width
1099 assert region.sw == sample_width
1100 assert region.channels == channels
1101 assert region.ch == channels
1102 assert region.meta.start == start
1103 assert region.meta.end == expected_end
1104 assert region.duration == expected_duration_s
1105 assert len(region.ms) == expected_duration_ms
1106 assert bytes(region) == data
1107
1108
1109 def test_creation_invalid_data_exception():
1110 with pytest.raises(AudioParameterError) as audio_param_err:
1111 _ = AudioRegion(
1112 data=b"ABCDEFGHI", sampling_rate=8, sample_width=2, channels=1
1205 ) 1113 )
1206 1114 assert str(audio_param_err.value) == (
1207 @genty_dataset( 1115 "The length of audio data must be an integer "
1208 no_skip_read_all=(0, -1), 1116 "multiple of `sample_width * channels`"
1209 no_skip_read_all_stereo=(0, -1, 2), 1117 )
1210 skip_2_read_all=(2, -1), 1118
1211 skip_2_read_all_None=(2, None), 1119
1212 skip_2_read_3=(2, 3), 1120 @pytest.mark.parametrize(
1213 skip_2_read_3_5_stereo=(2, 3.5, 2), 1121 "skip, max_read, channels",
1214 skip_2_4_read_3_5_stereo=(2.4, 3.5, 2), 1122 [
1215 ) 1123 (0, -1, 1),
1216 def test_load(self, skip, max_read, channels=1): 1124 (0, -1, 2),
1217 sampling_rate = 10 1125 (2, -1, 1),
1218 sample_width = 2 1126 (2, None, 1),
1219 filename = "tests/data/test_split_10HZ_{}.raw" 1127 (2, 3, 1),
1220 filename = filename.format("mono" if channels == 1 else "stereo") 1128 (2, 3.5, 2),
1221 region = AudioRegion.load( 1129 (2.4, 3.5, 2),
1222 filename, 1130 ],
1223 skip=skip, 1131 ids=[
1224 max_read=max_read, 1132 "no_skip_read_all",
1225 sr=sampling_rate, 1133 "no_skip_read_all_stereo",
1226 sw=sample_width, 1134 "skip_2_read_all",
1227 ch=channels, 1135 "skip_2_read_all_None",
1228 ) 1136 "skip_2_read_3",
1229 with open(filename, "rb") as fp: 1137 "skip_2_read_3_5_stereo",
1230 fp.read(round(skip * sampling_rate * sample_width * channels)) 1138 "skip_2_4_read_3_5_stereo",
1231 if max_read is None or max_read < 0: 1139 ],
1232 to_read = -1 1140 )
1233 else: 1141 def test_load_AudioRegion(skip, max_read, channels):
1234 to_read = round( 1142 sampling_rate = 10
1235 max_read * sampling_rate * sample_width * channels 1143 sample_width = 2
1236 ) 1144 filename = "tests/data/test_split_10HZ_{}.raw"
1237 expected = fp.read(to_read) 1145 filename = filename.format("mono" if channels == 1 else "stereo")
1238 self.assertEqual(bytes(region), expected) 1146 region = AudioRegion.load(
1239 1147 filename,
1240 def test_load_from_microphone(self): 1148 skip=skip,
1241 with patch("auditok.io.PyAudioSource") as patch_pyaudio_source: 1149 max_read=max_read,
1242 with patch("auditok.core.AudioReader.read") as patch_reader: 1150 sr=sampling_rate,
1243 patch_reader.return_value = None 1151 sw=sample_width,
1244 with patch( 1152 ch=channels,
1245 "auditok.core.AudioRegion.__init__" 1153 )
1246 ) as patch_AudioRegion: 1154 with open(filename, "rb") as fp:
1247 patch_AudioRegion.return_value = None 1155 fp.read(round(skip * sampling_rate * sample_width * channels))
1248 AudioRegion.load( 1156 if max_read is None or max_read < 0:
1249 None, skip=0, max_read=5, sr=16000, sw=2, ch=1 1157 to_read = -1
1250 ) 1158 else:
1251 self.assertTrue(patch_pyaudio_source.called) 1159 to_read = round(max_read * sampling_rate * sample_width * channels)
1252 self.assertTrue(patch_reader.called) 1160 expected = fp.read(to_read)
1253 self.assertTrue(patch_AudioRegion.called) 1161 assert bytes(region) == expected
1254 1162
1255 @genty_dataset(none=(None,), negative=(-1,)) 1163
1256 def test_load_from_microphone_without_max_read_exception(self, max_read): 1164 def test_load_from_microphone():
1257 with self.assertRaises(ValueError) as val_err: 1165 with patch("auditok.io.PyAudioSource") as patch_pyaudio_source:
1258 AudioRegion.load(None, max_read=max_read, sr=16000, sw=2, ch=1) 1166 with patch("auditok.core.AudioReader.read") as patch_reader:
1259 self.assertEqual( 1167 patch_reader.return_value = None
1260 "'max_read' should not be None when reading from microphone", 1168 with patch(
1261 str(val_err.exception), 1169 "auditok.core.AudioRegion.__init__"
1262 ) 1170 ) as patch_AudioRegion:
1263 1171 patch_AudioRegion.return_value = None
1264 def test_load_from_microphone_with_nonzero_skip_exception(self): 1172 AudioRegion.load(None, skip=0, max_read=5, sr=16000, sw=2, ch=1)
1265 with self.assertRaises(ValueError) as val_err: 1173 assert patch_pyaudio_source.called
1266 AudioRegion.load(None, skip=1, max_read=5, sr=16000, sw=2, ch=1) 1174 assert patch_reader.called
1267 self.assertEqual( 1175 assert patch_AudioRegion.called
1268 "'skip' should be 0 when reading from microphone", 1176
1269 str(val_err.exception), 1177
1270 ) 1178 @pytest.mark.parametrize(
1271 1179 "max_read",
1272 @genty_dataset( 1180 [
1273 simple=("output.wav", 1.230, "output.wav"), 1181 None,
1274 start=("output_{meta.start:g}.wav", 1.230, "output_1.23.wav"), 1182 -1,
1275 start_2=("output_{meta.start}.wav", 1.233712, "output_1.233712.wav"), 1183 ],
1276 start_3=("output_{meta.start:.2f}.wav", 1.2300001, "output_1.23.wav"), 1184 ids=[
1277 start_4=("output_{meta.start:.3f}.wav", 1.233712, "output_1.234.wav"), 1185 "none",
1278 start_5=( 1186 "negative",
1279 "output_{meta.start:.8f}.wav", 1187 ],
1280 1.233712, 1188 )
1281 "output_1.23371200.wav", 1189 def test_load_from_microphone_without_max_read_exception(max_read):
1282 ), 1190 with pytest.raises(ValueError) as val_err:
1283 start_end_duration=( 1191 AudioRegion.load(None, max_read=max_read, sr=16000, sw=2, ch=1)
1192 assert str(val_err.value) == (
1193 "'max_read' should not be None when reading from microphone"
1194 )
1195
1196
1197 def test_load_from_microphone_with_nonzero_skip_exception():
1198 with pytest.raises(ValueError) as val_err:
1199 AudioRegion.load(None, skip=1, max_read=5, sr=16000, sw=2, ch=1)
1200 assert str(val_err.value) == (
1201 "'skip' should be 0 when reading from microphone"
1202 )
1203
1204
1205 @pytest.mark.parametrize(
1206 "format, start, expected",
1207 [
1208 ("output.wav", 1.230, "output.wav"),
1209 ("output_{meta.start:g}.wav", 1.230, "output_1.23.wav"),
1210 ("output_{meta.start}.wav", 1.233712, "output_1.233712.wav"),
1211 ("output_{meta.start:.2f}.wav", 1.2300001, "output_1.23.wav"),
1212 ("output_{meta.start:.3f}.wav", 1.233712, "output_1.234.wav"),
1213 ("output_{meta.start:.8f}.wav", 1.233712, "output_1.23371200.wav"),
1214 (
1284 "output_{meta.start}_{meta.end}_{duration}.wav", 1215 "output_{meta.start}_{meta.end}_{duration}.wav",
1285 1.455, 1216 1.455,
1286 "output_1.455_2.455_1.0.wav", 1217 "output_1.455_2.455_1.0.wav",
1287 ), 1218 ),
1288 start_end_duration_2=( 1219 (
1289 "output_{meta.start}_{meta.end}_{duration}.wav", 1220 "output_{meta.start}_{meta.end}_{duration}.wav",
1290 1.455321, 1221 1.455321,
1291 "output_1.455321_2.455321_1.0.wav", 1222 "output_1.455321_2.455321_1.0.wav",
1292 ), 1223 ),
1293 ) 1224 ],
1294 def test_save(self, format, start, expected): 1225 ids=[
1295 with TemporaryDirectory() as tmpdir: 1226 "simple",
1296 region = AudioRegion(b"0" * 160, 160, 1, 1) 1227 "start",
1297 meta = {"start": start, "end": start + region.duration} 1228 "start_2",
1298 region.meta = meta 1229 "start_3",
1299 format = os.path.join(tmpdir, format) 1230 "start_4",
1300 filename = region.save(format)[len(tmpdir) + 1 :] 1231 "start_5",
1301 self.assertEqual(filename, expected) 1232 "start_end_duration",
1302 1233 "start_end_duration_2",
1303 def test_save_file_exists_exception(self): 1234 ],
1304 with TemporaryDirectory() as tmpdir: 1235 )
1305 filename = os.path.join(tmpdir, "output.wav") 1236 def test_save(format, start, expected):
1306 open(filename, "w").close() 1237 with TemporaryDirectory() as tmpdir:
1307 region = AudioRegion(b"0" * 160, 160, 1, 1) 1238 region = AudioRegion(b"0" * 160, 160, 1, 1)
1308 with self.assertRaises(FileExistsError): 1239 meta = {"start": start, "end": start + region.duration}
1309 region.save(filename, exists_ok=False) 1240 region.meta = meta
1310 1241 format = os.path.join(tmpdir, format)
1311 @genty_dataset( 1242 filename = region.save(format)[len(tmpdir) + 1 :]
1312 first_half=( 1243 assert filename == expected
1244
1245
1246 def test_save_file_exists_exception():
1247 with TemporaryDirectory() as tmpdir:
1248 filename = os.path.join(tmpdir, "output.wav")
1249 open(filename, "w").close()
1250 region = AudioRegion(b"0" * 160, 160, 1, 1)
1251 with pytest.raises(FileExistsError):
1252 region.save(filename, exists_ok=False)
1253
1254
1255 @pytest.mark.parametrize(
1256 "region, slice_, expected_data",
1257 [
1258 (
1313 AudioRegion(b"a" * 80 + b"b" * 80, 160, 1, 1), 1259 AudioRegion(b"a" * 80 + b"b" * 80, 160, 1, 1),
1314 slice(0, 500), 1260 slice(0, 500),
1315 b"a" * 80, 1261 b"a" * 80,
1316 ), 1262 ),
1317 second_half=( 1263 (
1318 AudioRegion(b"a" * 80 + b"b" * 80, 160, 1, 1), 1264 AudioRegion(b"a" * 80 + b"b" * 80, 160, 1, 1),
1319 slice(500, None), 1265 slice(500, None),
1320 b"b" * 80, 1266 b"b" * 80,
1321 ), 1267 ),
1322 second_half_negative=( 1268 (
1323 AudioRegion(b"a" * 80 + b"b" * 80, 160, 1, 1), 1269 AudioRegion(b"a" * 80 + b"b" * 80, 160, 1, 1),
1324 slice(-500, None), 1270 slice(-500, None),
1325 b"b" * 80, 1271 b"b" * 80,
1326 ), 1272 ),
1327 middle=( 1273 (
1328 AudioRegion(b"a" * 80 + b"b" * 80, 160, 1, 1), 1274 AudioRegion(b"a" * 80 + b"b" * 80, 160, 1, 1),
1329 slice(200, 750), 1275 slice(200, 750),
1330 b"a" * 48 + b"b" * 40, 1276 b"a" * 48 + b"b" * 40,
1331 ), 1277 ),
1332 middle_negative=( 1278 (
1333 AudioRegion(b"a" * 80 + b"b" * 80, 160, 1, 1), 1279 AudioRegion(b"a" * 80 + b"b" * 80, 160, 1, 1),
1334 slice(-800, -250), 1280 slice(-800, -250),
1335 b"a" * 48 + b"b" * 40, 1281 b"a" * 48 + b"b" * 40,
1336 ), 1282 ),
1337 middle_sw2=( 1283 (
1338 AudioRegion(b"a" * 160 + b"b" * 160, 160, 2, 1), 1284 AudioRegion(b"a" * 160 + b"b" * 160, 160, 2, 1),
1339 slice(200, 750), 1285 slice(200, 750),
1340 b"a" * 96 + b"b" * 80, 1286 b"a" * 96 + b"b" * 80,
1341 ), 1287 ),
1342 middle_ch2=( 1288 (
1343 AudioRegion(b"a" * 160 + b"b" * 160, 160, 1, 2), 1289 AudioRegion(b"a" * 160 + b"b" * 160, 160, 1, 2),
1344 slice(200, 750), 1290 slice(200, 750),
1345 b"a" * 96 + b"b" * 80, 1291 b"a" * 96 + b"b" * 80,
1346 ), 1292 ),
1347 middle_sw2_ch2=( 1293 (
1348 AudioRegion(b"a" * 320 + b"b" * 320, 160, 2, 2), 1294 AudioRegion(b"a" * 320 + b"b" * 320, 160, 2, 2),
1349 slice(200, 750), 1295 slice(200, 750),
1350 b"a" * 192 + b"b" * 160, 1296 b"a" * 192 + b"b" * 160,
1351 ), 1297 ),
1352 but_first_sample=( 1298 (
1353 AudioRegion(b"a" * 4000 + b"b" * 4000, 8000, 1, 1), 1299 AudioRegion(b"a" * 4000 + b"b" * 4000, 8000, 1, 1),
1354 slice(1, None), 1300 slice(1, None),
1355 b"a" * (4000 - 8) + b"b" * 4000, 1301 b"a" * (4000 - 8) + b"b" * 4000,
1356 ), 1302 ),
1357 but_first_sample_negative=( 1303 (
1358 AudioRegion(b"a" * 4000 + b"b" * 4000, 8000, 1, 1), 1304 AudioRegion(b"a" * 4000 + b"b" * 4000, 8000, 1, 1),
1359 slice(-999, None), 1305 slice(-999, None),
1360 b"a" * (4000 - 8) + b"b" * 4000, 1306 b"a" * (4000 - 8) + b"b" * 4000,
1361 ), 1307 ),
1362 but_last_sample=( 1308 (
1363 AudioRegion(b"a" * 4000 + b"b" * 4000, 8000, 1, 1), 1309 AudioRegion(b"a" * 4000 + b"b" * 4000, 8000, 1, 1),
1364 slice(0, 999), 1310 slice(0, 999),
1365 b"a" * 4000 + b"b" * (4000 - 8), 1311 b"a" * 4000 + b"b" * (4000 - 8),
1366 ), 1312 ),
1367 but_last_sample_negative=( 1313 (
1368 AudioRegion(b"a" * 4000 + b"b" * 4000, 8000, 1, 1), 1314 AudioRegion(b"a" * 4000 + b"b" * 4000, 8000, 1, 1),
1369 slice(0, -1), 1315 slice(0, -1),
1370 b"a" * 4000 + b"b" * (4000 - 8), 1316 b"a" * 4000 + b"b" * (4000 - 8),
1371 ), 1317 ),
1372 big_negative_start=( 1318 (AudioRegion(b"a" * 160, 160, 1, 1), slice(-5000, None), b"a" * 160),
1373 AudioRegion(b"a" * 160, 160, 1, 1), 1319 (AudioRegion(b"a" * 160, 160, 1, 1), slice(None, -1500), b""),
1374 slice(-5000, None), 1320 (AudioRegion(b"a" * 80 + b"b" * 80, 160, 1, 1), slice(0, 0), b""),
1375 b"a" * 160, 1321 (AudioRegion(b"a" * 80 + b"b" * 80, 160, 1, 1), slice(200, 100), b""),
1376 ), 1322 (AudioRegion(b"a" * 80 + b"b" * 80, 160, 1, 1), slice(2000, 3000), b""),
1377 big_negative_stop=( 1323 (AudioRegion(b"a" * 80 + b"b" * 80, 160, 1, 1), slice(-100, -200), b""),
1378 AudioRegion(b"a" * 160, 160, 1, 1), 1324 (AudioRegion(b"a" * 80 + b"b" * 80, 160, 1, 1), slice(0, -2000), b""),
1379 slice(None, -1500), 1325 (
1380 b"",
1381 ),
1382 empty=(
1383 AudioRegion(b"a" * 80 + b"b" * 80, 160, 1, 1),
1384 slice(0, 0),
1385 b"",
1386 ),
1387 empty_start_stop_reversed=(
1388 AudioRegion(b"a" * 80 + b"b" * 80, 160, 1, 1),
1389 slice(200, 100),
1390 b"",
1391 ),
1392 empty_big_positive_start=(
1393 AudioRegion(b"a" * 80 + b"b" * 80, 160, 1, 1),
1394 slice(2000, 3000),
1395 b"",
1396 ),
1397 empty_negative_reversed=(
1398 AudioRegion(b"a" * 80 + b"b" * 80, 160, 1, 1),
1399 slice(-100, -200),
1400 b"",
1401 ),
1402 empty_big_negative_stop=(
1403 AudioRegion(b"a" * 80 + b"b" * 80, 160, 1, 1),
1404 slice(0, -2000),
1405 b"",
1406 ),
1407 arbitrary_sampling_rate=(
1408 AudioRegion(b"a" * 124 + b"b" * 376, 1234, 1, 1), 1326 AudioRegion(b"a" * 124 + b"b" * 376, 1234, 1, 1),
1409 slice(100, 200), 1327 slice(100, 200),
1410 b"a" + b"b" * 123, 1328 b"a" + b"b" * 123,
1411 ), 1329 ),
1412 ) 1330 ],
1413 def test_region_temporal_slicing(self, region, slice_, expected_data): 1331 ids=[
1414 sub_region = region.millis[slice_] 1332 "first_half",
1415 self.assertEqual(bytes(sub_region), expected_data) 1333 "second_half",
1416 start_sec = slice_.start / 1000 if slice_.start is not None else None 1334 "second_half_negative",
1417 stop_sec = slice_.stop / 1000 if slice_.stop is not None else None 1335 "middle",
1418 sub_region = region.sec[start_sec:stop_sec] 1336 "middle_negative",
1419 self.assertEqual(bytes(sub_region), expected_data) 1337 "middle_sw2",
1420 1338 "middle_ch2",
1421 @genty_dataset( 1339 "middle_sw2_ch2",
1422 first_half=( 1340 "but_first_sample",
1341 "but_first_sample_negative",
1342 "but_last_sample",
1343 "but_last_sample_negative",
1344 "big_negative_start",
1345 "big_negative_stop",
1346 "empty",
1347 "empty_start_stop_reversed",
1348 "empty_big_positive_start",
1349 "empty_negative_reversed",
1350 "empty_big_negative_stop",
1351 "arbitrary_sampling_rate",
1352 ],
1353 )
1354 def test_region_temporal_slicing(region, slice_, expected_data):
1355 sub_region = region.millis[slice_]
1356 assert bytes(sub_region) == expected_data
1357 start_sec = slice_.start / 1000 if slice_.start is not None else None
1358 stop_sec = slice_.stop / 1000 if slice_.stop is not None else None
1359 sub_region = region.sec[start_sec:stop_sec]
1360 assert bytes(sub_region) == expected_data
1361
1362
1363 @pytest.mark.parametrize(
1364 "region, slice_, time_shift, expected_data",
1365 [
1366 (
1423 AudioRegion(b"a" * 80 + b"b" * 80, 160, 1, 1), 1367 AudioRegion(b"a" * 80 + b"b" * 80, 160, 1, 1),
1424 slice(0, 80), 1368 slice(0, 80),
1425 0, 1369 0,
1426 b"a" * 80, 1370 b"a" * 80,
1427 ), 1371 ),
1428 second_half=( 1372 (
1429 AudioRegion(b"a" * 80 + b"b" * 80, 160, 1, 1), 1373 AudioRegion(b"a" * 80 + b"b" * 80, 160, 1, 1),
1430 slice(80, None), 1374 slice(80, None),
1431 0.5, 1375 0.5,
1432 b"b" * 80, 1376 b"b" * 80,
1433 ), 1377 ),
1434 second_half_negative=( 1378 (
1435 AudioRegion(b"a" * 80 + b"b" * 80, 160, 1, 1), 1379 AudioRegion(b"a" * 80 + b"b" * 80, 160, 1, 1),
1436 slice(-80, None), 1380 slice(-80, None),
1437 0.5, 1381 0.5,
1438 b"b" * 80, 1382 b"b" * 80,
1439 ), 1383 ),
1440 middle=( 1384 (
1441 AudioRegion(b"a" * 80 + b"b" * 80, 160, 1, 1), 1385 AudioRegion(b"a" * 80 + b"b" * 80, 160, 1, 1),
1442 slice(160 // 5, 160 // 4 * 3), 1386 slice(160 // 5, 160 // 4 * 3),
1443 0.2, 1387 0.2,
1444 b"a" * 48 + b"b" * 40, 1388 b"a" * 48 + b"b" * 40,
1445 ), 1389 ),
1446 middle_negative=( 1390 (
1447 AudioRegion(b"a" * 80 + b"b" * 80, 160, 1, 1), 1391 AudioRegion(b"a" * 80 + b"b" * 80, 160, 1, 1),
1448 slice(-160 // 5 * 4, -160 // 4), 1392 slice(-160 // 5 * 4, -160 // 4),
1449 0.2, 1393 0.2,
1450 b"a" * 48 + b"b" * 40, 1394 b"a" * 48 + b"b" * 40,
1451 ), 1395 ),
1452 middle_sw2=( 1396 (
1453 AudioRegion(b"a" * 160 + b"b" * 160, 160, 2, 1), 1397 AudioRegion(b"a" * 160 + b"b" * 160, 160, 2, 1),
1454 slice(160 // 5, 160 // 4 * 3), 1398 slice(160 // 5, 160 // 4 * 3),
1455 0.2, 1399 0.2,
1456 b"a" * 96 + b"b" * 80, 1400 b"a" * 96 + b"b" * 80,
1457 ), 1401 ),
1458 middle_ch2=( 1402 (
1459 AudioRegion(b"a" * 160 + b"b" * 160, 160, 1, 2), 1403 AudioRegion(b"a" * 160 + b"b" * 160, 160, 1, 2),
1460 slice(160 // 5, 160 // 4 * 3), 1404 slice(160 // 5, 160 // 4 * 3),
1461 0.2, 1405 0.2,
1462 b"a" * 96 + b"b" * 80, 1406 b"a" * 96 + b"b" * 80,
1463 ), 1407 ),
1464 middle_sw2_ch2=( 1408 (
1465 AudioRegion(b"a" * 320 + b"b" * 320, 160, 2, 2), 1409 AudioRegion(b"a" * 320 + b"b" * 320, 160, 2, 2),
1466 slice(160 // 5, 160 // 4 * 3), 1410 slice(160 // 5, 160 // 4 * 3),
1467 0.2, 1411 0.2,
1468 b"a" * 192 + b"b" * 160, 1412 b"a" * 192 + b"b" * 160,
1469 ), 1413 ),
1470 but_first_sample=( 1414 (
1471 AudioRegion(b"a" * 4000 + b"b" * 4000, 8000, 1, 1), 1415 AudioRegion(b"a" * 4000 + b"b" * 4000, 8000, 1, 1),
1472 slice(1, None), 1416 slice(1, None),
1473 1 / 8000, 1417 1 / 8000,
1474 b"a" * (4000 - 1) + b"b" * 4000, 1418 b"a" * (4000 - 1) + b"b" * 4000,
1475 ), 1419 ),
1476 but_first_sample_negative=( 1420 (
1477 AudioRegion(b"a" * 4000 + b"b" * 4000, 8000, 1, 1), 1421 AudioRegion(b"a" * 4000 + b"b" * 4000, 8000, 1, 1),
1478 slice(-7999, None), 1422 slice(-7999, None),
1479 1 / 8000, 1423 1 / 8000,
1480 b"a" * (4000 - 1) + b"b" * 4000, 1424 b"a" * (4000 - 1) + b"b" * 4000,
1481 ), 1425 ),
1482 but_last_sample=( 1426 (
1483 AudioRegion(b"a" * 4000 + b"b" * 4000, 8000, 1, 1), 1427 AudioRegion(b"a" * 4000 + b"b" * 4000, 8000, 1, 1),
1484 slice(0, 7999), 1428 slice(0, 7999),
1485 0, 1429 0,
1486 b"a" * 4000 + b"b" * (4000 - 1), 1430 b"a" * 4000 + b"b" * (4000 - 1),
1487 ), 1431 ),
1488 but_last_sample_negative=( 1432 (
1489 AudioRegion(b"a" * 4000 + b"b" * 4000, 8000, 1, 1), 1433 AudioRegion(b"a" * 4000 + b"b" * 4000, 8000, 1, 1),
1490 slice(0, -1), 1434 slice(0, -1),
1491 0, 1435 0,
1492 b"a" * 4000 + b"b" * (4000 - 1), 1436 b"a" * 4000 + b"b" * (4000 - 1),
1493 ), 1437 ),
1494 big_negative_start=( 1438 (AudioRegion(b"a" * 160, 160, 1, 1), slice(-1600, None), 0, b"a" * 160),
1495 AudioRegion(b"a" * 160, 160, 1, 1), 1439 (AudioRegion(b"a" * 160, 160, 1, 1), slice(None, -1600), 0, b""),
1496 slice(-1600, None), 1440 (AudioRegion(b"a" * 80 + b"b" * 80, 160, 1, 1), slice(0, 0), 0, b""),
1497 0, 1441 (
1498 b"a" * 160,
1499 ),
1500 big_negative_stop=(
1501 AudioRegion(b"a" * 160, 160, 1, 1),
1502 slice(None, -1600),
1503 0,
1504 b"",
1505 ),
1506 empty=(
1507 AudioRegion(b"a" * 80 + b"b" * 80, 160, 1, 1),
1508 slice(0, 0),
1509 0,
1510 b"",
1511 ),
1512 empty_start_stop_reversed=(
1513 AudioRegion(b"a" * 80 + b"b" * 80, 160, 1, 1), 1442 AudioRegion(b"a" * 80 + b"b" * 80, 160, 1, 1),
1514 slice(80, 40), 1443 slice(80, 40),
1515 0.5, 1444 0.5,
1516 b"", 1445 b"",
1517 ), 1446 ),
1518 empty_big_positive_start=( 1447 (
1519 AudioRegion(b"a" * 80 + b"b" * 80, 160, 1, 1), 1448 AudioRegion(b"a" * 80 + b"b" * 80, 160, 1, 1),
1520 slice(1600, 3000), 1449 slice(1600, 3000),
1521 10, 1450 10,
1522 b"", 1451 b"",
1523 ), 1452 ),
1524 empty_negative_reversed=( 1453 (
1525 AudioRegion(b"a" * 80 + b"b" * 80, 160, 1, 1), 1454 AudioRegion(b"a" * 80 + b"b" * 80, 160, 1, 1),
1526 slice(-16, -32), 1455 slice(-16, -32),
1527 0.9, 1456 0.9,
1528 b"", 1457 b"",
1529 ), 1458 ),
1530 empty_big_negative_stop=( 1459 (
1531 AudioRegion(b"a" * 80 + b"b" * 80, 160, 1, 1), 1460 AudioRegion(b"a" * 80 + b"b" * 80, 160, 1, 1),
1532 slice(0, -2000), 1461 slice(0, -2000),
1533 0, 1462 0,
1534 b"", 1463 b"",
1535 ), 1464 ),
1536 arbitrary_sampling_rate=( 1465 (
1537 AudioRegion(b"a" * 124 + b"b" * 376, 1235, 1, 1), 1466 AudioRegion(b"a" * 124 + b"b" * 376, 1235, 1, 1),
1538 slice(100, 200), 1467 slice(100, 200),
1539 100 / 1235, 1468 100 / 1235,
1540 b"a" * 24 + b"b" * 76, 1469 b"a" * 24 + b"b" * 76,
1541 ), 1470 ),
1542 arbitrary_sampling_rate_middle_sw2_ch2=( 1471 (
1543 AudioRegion(b"a" * 124 + b"b" * 376, 1235, 2, 2), 1472 AudioRegion(b"a" * 124 + b"b" * 376, 1235, 2, 2),
1544 slice(25, 50), 1473 slice(25, 50),
1545 25 / 1235, 1474 25 / 1235,
1546 b"a" * 24 + b"b" * 76, 1475 b"a" * 24 + b"b" * 76,
1547 ), 1476 ),
1548 ) 1477 ],
1549 def test_region_sample_slicing( 1478 ids=[
1550 self, region, slice_, time_shift, expected_data 1479 "first_half",
1551 ): 1480 "second_half",
1552 sub_region = region[slice_] 1481 "second_half_negative",
1553 self.assertEqual(bytes(sub_region), expected_data) 1482 "middle",
1554 1483 "middle_negative",
1555 @genty_dataset( 1484 "middle_sw2",
1556 simple=(8000, 1, 1), 1485 "middle_ch2",
1557 stereo_sw_2=(8000, 2, 2), 1486 "middle_sw2_ch2",
1558 arbitrary_sr_multichannel=(5413, 2, 3), 1487 "but_first_sample",
1559 ) 1488 "but_first_sample_negative",
1560 def test_concatenation(self, sampling_rate, sample_width, channels): 1489 "but_last_sample",
1561 1490 "but_last_sample_negative",
1562 region_1, region_2 = _make_random_length_regions( 1491 "big_negative_start",
1563 [b"a", b"b"], sampling_rate, sample_width, channels 1492 "big_negative_stop",
1564 ) 1493 "empty",
1565 expected_duration = region_1.duration + region_2.duration 1494 "empty_start_stop_reversed",
1566 expected_data = bytes(region_1) + bytes(region_2) 1495 "empty_big_positive_start",
1567 concat_region = region_1 + region_2 1496 "empty_negative_reversed",
1568 self.assertAlmostEqual( 1497 "empty_big_negative_stop",
1569 concat_region.duration, expected_duration, places=6 1498 "arbitrary_sampling_rate",
1570 ) 1499 "arbitrary_sampling_rate_middle_sw2_ch2",
1571 self.assertEqual(bytes(concat_region), expected_data) 1500 ],
1572 1501 )
1573 @genty_dataset( 1502 def test_region_sample_slicing(region, slice_, time_shift, expected_data):
1574 simple=(8000, 1, 1), 1503 sub_region = region[slice_]
1575 stereo_sw_2=(8000, 2, 2), 1504 assert bytes(sub_region) == expected_data
1576 arbitrary_sr_multichannel=(5413, 2, 3), 1505
1577 ) 1506
1578 def test_concatenation_many(self, sampling_rate, sample_width, channels): 1507 @pytest.mark.parametrize(
1579 1508 "sampling_rate, sample_width, channels",
1580 regions = _make_random_length_regions( 1509 [
1581 [b"a", b"b", b"c"], sampling_rate, sample_width, channels 1510 (8000, 1, 1),
1582 ) 1511 (8000, 2, 2),
1583 expected_duration = sum(r.duration for r in regions) 1512 (5413, 2, 3),
1584 expected_data = b"".join(bytes(r) for r in regions) 1513 ],
1585 concat_region = sum(regions) 1514 ids=[
1586 1515 "simple",
1587 self.assertAlmostEqual( 1516 "stereo_sw_2",
1588 concat_region.duration, expected_duration, places=6 1517 "arbitrary_sr_multichannel",
1589 ) 1518 ],
1590 self.assertEqual(bytes(concat_region), expected_data) 1519 )
1591 1520 def test_concatenation(sampling_rate, sample_width, channels):
1592 def test_concatenation_different_sampling_rate_error(self): 1521
1593 1522 region_1, region_2 = _make_random_length_regions(
1594 region_1 = AudioRegion(b"a" * 100, 8000, 1, 1) 1523 [b"a", b"b"], sampling_rate, sample_width, channels
1595 region_2 = AudioRegion(b"b" * 100, 3000, 1, 1) 1524 )
1596 1525 expected_duration = region_1.duration + region_2.duration
1597 with self.assertRaises(ValueError) as val_err: 1526 expected_data = bytes(region_1) + bytes(region_2)
1598 region_1 + region_2 1527 concat_region = region_1 + region_2
1599 self.assertEqual( 1528 assert concat_region.duration == pytest.approx(expected_duration, abs=1e-6)
1600 "Can only concatenate AudioRegions of the same " 1529 assert bytes(concat_region) == expected_data
1601 "sampling rate (8000 != 3000)", 1530
1602 str(val_err.exception), 1531
1603 ) 1532 @pytest.mark.parametrize(
1604 1533 "sampling_rate, sample_width, channels",
1605 def test_concatenation_different_sample_width_error(self): 1534 [
1606 1535 (8000, 1, 1),
1607 region_1 = AudioRegion(b"a" * 100, 8000, 2, 1) 1536 (8000, 2, 2),
1608 region_2 = AudioRegion(b"b" * 100, 8000, 4, 1) 1537 (5413, 2, 3),
1609 1538 ],
1610 with self.assertRaises(ValueError) as val_err: 1539 ids=[
1611 region_1 + region_2 1540 "simple",
1612 self.assertEqual( 1541 "stereo_sw_2",
1613 "Can only concatenate AudioRegions of the same " 1542 "arbitrary_sr_multichannel",
1614 "sample width (2 != 4)", 1543 ],
1615 str(val_err.exception), 1544 )
1616 ) 1545 def test_concatenation_many(sampling_rate, sample_width, channels):
1617 1546
1618 def test_concatenation_different_number_of_channels_error(self): 1547 regions = _make_random_length_regions(
1619 1548 [b"a", b"b", b"c"], sampling_rate, sample_width, channels
1620 region_1 = AudioRegion(b"a" * 100, 8000, 1, 1) 1549 )
1621 region_2 = AudioRegion(b"b" * 100, 8000, 1, 2) 1550 expected_duration = sum(r.duration for r in regions)
1622 1551 expected_data = b"".join(bytes(r) for r in regions)
1623 with self.assertRaises(ValueError) as val_err: 1552 concat_region = sum(regions)
1624 region_1 + region_2 1553
1625 self.assertEqual( 1554 assert concat_region.duration == pytest.approx(expected_duration, abs=1e-6)
1626 "Can only concatenate AudioRegions of the same " 1555 assert bytes(concat_region) == expected_data
1627 "number of channels (1 != 2)", 1556
1628 str(val_err.exception), 1557
1629 ) 1558 def test_concatenation_different_sampling_rate_error():
1630 1559
1631 @genty_dataset( 1560 region_1 = AudioRegion(b"a" * 100, 8000, 1, 1)
1632 simple=(0.01, 0.03, 240, 30), 1561 region_2 = AudioRegion(b"b" * 100, 3000, 1, 1)
1633 rounded_len_floor=(0.00575, 0.01725, 138, 17), 1562
1634 rounded_len_ceil=(0.00625, 0.01875, 150, 19), 1563 with pytest.raises(ValueError) as val_err:
1635 ) 1564 region_1 + region_2
1636 def test_multiplication( 1565 assert str(val_err.value) == (
1637 self, duration, expected_duration, expected_len, expected_len_ms 1566 "Can only concatenate AudioRegions of the same "
1638 ): 1567 "sampling rate (8000 != 3000)"
1639 sw = 2 1568 )
1640 data = b"0" * int(duration * 8000 * sw) 1569
1641 region = AudioRegion(data, 8000, sw, 1) 1570
1642 m_region = 1 * region * 3 1571 def test_concatenation_different_sample_width_error():
1643 self.assertEqual(bytes(m_region), data * 3) 1572
1644 self.assertEqual(m_region.sr, 8000) 1573 region_1 = AudioRegion(b"a" * 100, 8000, 2, 1)
1645 self.assertEqual(m_region.sw, 2) 1574 region_2 = AudioRegion(b"b" * 100, 8000, 4, 1)
1646 self.assertEqual(m_region.ch, 1) 1575
1647 self.assertEqual(m_region.duration, expected_duration) 1576 with pytest.raises(ValueError) as val_err:
1648 self.assertEqual(len(m_region), expected_len) 1577 region_1 + region_2
1649 self.assertEqual(m_region.len, expected_len) 1578 assert str(val_err.value) == (
1650 self.assertEqual(m_region.s.len, expected_duration) 1579 "Can only concatenate AudioRegions of the same " "sample width (2 != 4)"
1651 self.assertEqual(len(m_region.ms), expected_len_ms) 1580 )
1652 self.assertEqual(m_region.ms.len, expected_len_ms) 1581
1653 1582
1654 @genty_dataset(_str=("x", "str"), _float=(1.4, "float")) 1583 def test_concatenation_different_number_of_channels_error():
1655 def test_multiplication_non_int(self, factor, _type): 1584
1656 with self.assertRaises(TypeError) as type_err: 1585 region_1 = AudioRegion(b"a" * 100, 8000, 1, 1)
1657 AudioRegion(b"0" * 80, 8000, 1, 1) * factor 1586 region_2 = AudioRegion(b"b" * 100, 8000, 1, 2)
1658 err_msg = "Can't multiply AudioRegion by a non-int of type '{}'" 1587
1659 self.assertEqual(err_msg.format(_type), str(type_err.exception)) 1588 with pytest.raises(ValueError) as val_err:
1660 1589 region_1 + region_2
1661 @genty_dataset( 1590 assert str(val_err.value) == (
1662 simple=([b"a" * 80, b"b" * 80],), 1591 "Can only concatenate AudioRegions of the same "
1663 extra_samples_1=([b"a" * 31, b"b" * 31, b"c" * 30],), 1592 "number of channels (1 != 2)"
1664 extra_samples_2=([b"a" * 31, b"b" * 30, b"c" * 30],), 1593 )
1665 extra_samples_3=([b"a" * 11, b"b" * 11, b"c" * 10, b"c" * 10],), 1594
1666 ) 1595
1667 def test_truediv(self, data): 1596 @pytest.mark.parametrize(
1668 1597 "duration, expected_duration, expected_len, expected_len_ms",
1669 region = AudioRegion(b"".join(data), 80, 1, 1) 1598 [
1670 1599 (0.01, 0.03, 240, 30),
1671 sub_regions = region / len(data) 1600 (0.00575, 0.01725, 138, 17),
1672 for data_i, region in zip(data, sub_regions): 1601 (0.00625, 0.01875, 150, 19),
1673 self.assertEqual(len(data_i), len(bytes(region))) 1602 ],
1674 1603 ids=[
1675 @genty_dataset( 1604 "simple",
1676 mono_sw_1=(b"a" * 10, 1, 1, "b", [97] * 10), 1605 "rounded_len_floor",
1677 mono_sw_2=(b"a" * 10, 2, 1, "h", [24929] * 5), 1606 "rounded_len_ceil",
1678 mono_sw_4=(b"a" * 8, 4, 1, "i", [1633771873] * 2), 1607 ],
1679 stereo_sw_1=(b"ab" * 5, 1, 2, "b", [[97] * 5, [98] * 5]), 1608 )
1680 ) 1609 def test_multiplication(
1681 def test_samples(self, data, sample_width, channels, fmt, expected): 1610 duration, expected_duration, expected_len, expected_len_ms
1682 1611 ):
1683 region = AudioRegion(data, 10, sample_width, channels) 1612 sw = 2
1684 if isinstance(expected[0], list): 1613 data = b"0" * int(duration * 8000 * sw)
1685 expected = [array_(fmt, exp) for exp in expected] 1614 region = AudioRegion(data, 8000, sw, 1)
1686 else: 1615 m_region = 1 * region * 3
1687 expected = array_(fmt, expected) 1616 assert bytes(m_region) == data * 3
1688 samples = region.samples 1617 assert m_region.sr == 8000
1689 equal = samples == expected 1618 assert m_region.sw == 2
1690 try: 1619 assert m_region.ch == 1
1691 # for numpy 1620 assert m_region.duration == expected_duration
1692 equal = equal.all() 1621 assert len(m_region) == expected_len
1693 except AttributeError: 1622 assert m_region.len == expected_len
1694 pass 1623 assert m_region.s.len == expected_duration
1695 self.assertTrue(equal) 1624 assert len(m_region.ms) == expected_len_ms
1696 1625 assert m_region.ms.len == expected_len_ms
1697 1626
1698 if __name__ == "__main__": 1627
1699 unittest.main() 1628 @pytest.mark.parametrize(
1629 "factor, _type",
1630 [
1631 ("x", "str"),
1632 (1.4, "float"),
1633 ],
1634 ids=[
1635 "_str",
1636 "_float",
1637 ],
1638 )
1639 def test_multiplication_non_int(factor, _type):
1640 with pytest.raises(TypeError) as type_err:
1641 AudioRegion(b"0" * 80, 8000, 1, 1) * factor
1642 err_msg = "Can't multiply AudioRegion by a non-int of type '{}'"
1643 assert err_msg.format(_type) == str(type_err.value)
1644
1645
1646 @pytest.mark.parametrize(
1647 "data",
1648 [
1649 [b"a" * 80, b"b" * 80],
1650 [b"a" * 31, b"b" * 31, b"c" * 30],
1651 [b"a" * 31, b"b" * 30, b"c" * 30],
1652 [b"a" * 11, b"b" * 11, b"c" * 10, b"c" * 10],
1653 ],
1654 ids=[
1655 "simple",
1656 "extra_samples_1",
1657 "extra_samples_2",
1658 "extra_samples_3",
1659 ],
1660 )
1661 def test_truediv(data):
1662
1663 region = AudioRegion(b"".join(data), 80, 1, 1)
1664
1665 sub_regions = region / len(data)
1666 for data_i, region in zip(data, sub_regions):
1667 assert len(data_i) == len(bytes(region))
1668
1669
1670 @pytest.mark.parametrize(
1671 "data, sample_width, channels, fmt, expected",
1672 [
1673 (b"a" * 10, 1, 1, "b", [97] * 10),
1674 (b"a" * 10, 2, 1, "h", [24929] * 5),
1675 (b"a" * 8, 4, 1, "i", [1633771873] * 2),
1676 (b"ab" * 5, 1, 2, "b", [[97] * 5, [98] * 5]),
1677 ],
1678 ids=[
1679 "mono_sw_1",
1680 "mono_sw_2",
1681 "mono_sw_4",
1682 "stereo_sw_1",
1683 ],
1684 )
1685 def test_samples(data, sample_width, channels, fmt, expected):
1686
1687 region = AudioRegion(data, 10, sample_width, channels)
1688 if isinstance(expected[0], list):
1689 expected = [array_(fmt, exp) for exp in expected]
1690 else:
1691 expected = array_(fmt, expected)
1692 samples = region.samples
1693 equal = samples == expected
1694 try:
1695 # for numpy
1696 equal = equal.all()
1697 except AttributeError:
1698 pass
1699 assert equal