"""Tests for ``auditok.split`` and for ``auditok.AudioRegion``.

The ``AudioRegion`` tests cover creation, saving, slicing, concatenation
and multiplication.
"""
import os
from unittest import TestCase
from random import random
from tempfile import TemporaryDirectory

from genty import genty, genty_dataset
from auditok import split, AudioRegion, AudioParameterError
from auditok.io import (
    _normalize_use_channel,
    _extract_selected_channel,
    get_audio_source,
)


def _make_random_length_regions(
    byte_seq, sampling_rate, sample_width, channels
):
    """Build one ``AudioRegion`` of random length per item of `byte_seq`.

    Each item of `byte_seq` is a bytes object that is repeated to fill the
    region. The duration is drawn in [0, 10) seconds and the start time in
    [0, 13) seconds, both with `random()`, so results differ between runs.

    Returns the regions as a list, in the order of `byte_seq`.
    """
    regions = []
    for b in byte_seq:
        # Draw the duration before the start so the order of `random()`
        # calls stays the same for every region.
        duration = round(random() * 10, 6)
        n_samples = int(duration * sampling_rate)
        data = b * n_samples * sample_width * channels
        start = round(random() * 13, 3)
        regions.append(
            AudioRegion(data, start, sampling_rate, sample_width, channels)
        )
    return regions


@genty
class TestSplit(TestCase):
    """Tests of `split` run on a raw audio fixture file."""

    # Dataset columns: min_dur, max_dur, max_silence,
    # drop_trailing_silence, strict_min_dur, extra keyword arguments for
    # `split`, expected (onset, offset) pairs expressed in samples.
    @genty_dataset(
        simple=(
            0.2,
            5,
            0.2,
            False,
            False,
            {"eth": 50},
            [(2, 16), (17, 31), (34, 76)],
        ),
        low_energy_threshold=(
            0.2,
            5,
            0.2,
            False,
            False,
            {"energy_threshold": 40},
            [(0, 50), (50, 76)],
        ),
        high_energy_threshold=(
            0.2,
            5,
            0.2,
            False,
            False,
            {"energy_threshold": 60},
            [],
        ),
        trim_leading_and_trailing_silence=(
            0.2,
            10,  # use long max_dur
            0.5,  # and a max_silence longer than any inter-region silence
            True,
            False,
            {"eth": 50},
            [(2, 76)],
        ),
        drop_trailing_silence=(
            0.2,
            5,
            0.2,
            True,
            False,
            {"eth": 50},
            [(2, 14), (17, 29), (34, 76)],
        ),
        drop_trailing_silence_2=(
            1.5,
            5,
            0.2,
            True,
            False,
            {"eth": 50},
            [(34, 76)],
        ),
        strict_min_dur=(
            0.3,
            2,
            0.2,
            False,
            True,
            {"eth": 50},
            [(2, 16), (17, 31), (34, 54), (54, 74)],
        ),
    )
    def test_split_params(
        self,
        min_dur,
        max_dur,
        max_silence,
        drop_trailing_silence,
        strict_min_dur,
        kwargs,
        expected,
    ):
        """Split the fixture and compare every region with `expected`.

        `expected` holds (onset, offset) pairs in samples; the bytes of
        each returned region must equal the matching slice of the input.
        """
        with open("tests/data/test_split_10HZ_mono.raw", "rb") as fp:
            data = fp.read()

        # One value for both the `sw` argument and the byte offsets used
        # to slice the expected data below.
        sample_width = 2
        regions = list(
            split(
                data,
                min_dur,
                max_dur,
                max_silence,
                drop_trailing_silence,
                strict_min_dur,
                analysis_window=0.1,
                sr=10,
                sw=sample_width,
                ch=1,
                **kwargs
            )
        )
        err_msg = (
            "Wrong number of regions after split, expected: "
            "{}, found: {}".format(len(expected), len(regions))
        )
        self.assertEqual(len(regions), len(expected), err_msg)

        for reg, (onset, offset) in zip(regions, expected):
            exp_data = data[onset * sample_width : offset * sample_width]
            self.assertEqual(bytes(reg), exp_data)


@genty
class TestAudioRegion(TestCase):
    """Tests of `AudioRegion` construction and operators."""

    # Dataset columns: data, start, sampling_rate, sample_width, channels,
    # expected_end, expected_duration_s, expected_duration_ms.
    @genty_dataset(
        simple=(b"\0" * 8000, 0, 8000, 1, 1, 1, 1, 1000),
        one_ms_less_than_1_sec=(
            b"\0" * 7992,
            0,
            8000,
            1,
            1,
            0.999,
            0.999,
            999,
        ),
        tree_quarter_ms_less_than_1_sec=(
            b"\0" * 7994,
            0,
            8000,
            1,
            1,
            0.99925,
            0.99925,
            999,
        ),
        half_ms_less_than_1_sec=(
            b"\0" * 7996,
            0,
            8000,
            1,
            1,
            0.9995,
            0.9995,
            1000,
        ),
        quarter_ms_less_than_1_sec=(
            b"\0" * 7998,
            0,
            8000,
            1,
            1,
            0.99975,
            0.99975,
            1000,
        ),
        simple_sample_width_2=(b"\0" * 8000 * 2, 0, 8000, 2, 1, 1, 1, 1000),
        simple_stereo=(b"\0" * 8000 * 2, 0, 8000, 1, 2, 1, 1, 1000),
        simple_multichannel=(b"\0" * 8000 * 5, 0, 8000, 1, 5, 1, 1, 1000),
        simple_sample_width_2_multichannel=(
            b"\0" * 8000 * 2 * 5,
            0,
            8000,
            2,
            5,
            1,
            1,
            1000,
        ),
        one_ms_less_than_1s_sw_2_multichannel=(
            b"\0" * 7992 * 2 * 5,
            0,
            8000,
            2,
            5,
            0.999,
            0.999,
            999,
        ),
        tree_qrt_ms_lt_1_s_sw_2_multichannel=(
            b"\0" * 7994 * 2 * 5,
            0,
            8000,
            2,
            5,
            0.99925,
            0.99925,
            999,
        ),
        half_ms_lt_1s_sw_2_multichannel=(
            b"\0" * 7996 * 2 * 5,
            0,
            8000,
            2,
            5,
            0.9995,
            0.9995,
            1000,
        ),
        quarter_ms_lt_1s_sw_2_multichannel=(
            b"\0" * 7998 * 2 * 5,
            0,
            8000,
            2,
            5,
            0.99975,
            0.99975,
            1000,
        ),
        arbitrary_length_1=(
            b"\0" * int(8000 * 1.33),
            2.7,
            8000,
            1,
            1,
            4.03,
            1.33,
            1330,
        ),
        arbitrary_length_2=(
            b"\0" * int(8000 * 0.476),
            11.568,
            8000,
            1,
            1,
            12.044,
            0.476,
            476,
        ),
        arbitrary_length_sw_2_multichannel=(
            b"\0" * int(8000 * 1.711) * 2 * 3,
            9.415,
            8000,
            2,
            3,
            11.126,
            1.711,
            1711,
        ),
        arbitrary_samplig_rate=(
            b"\0" * int(3172 * 1.318),
            17.236,
            3172,
            1,
            1,
            17.236 + int(3172 * 1.318) / 3172,
            int(3172 * 1.318) / 3172,
            1318,
        ),
        arbitrary_sr_sw_2_multichannel=(
            b"\0" * int(11317 * 0.716) * 2 * 3,
            18.811,
            11317,
            2,
            3,
            18.811 + int(11317 * 0.716) / 11317,
            int(11317 * 0.716) / 11317,
            716,
        ),
    )
    def test_creation(
        self,
        data,
        start,
        sampling_rate,
        sample_width,
        channels,
        expected_end,
        expected_duration_s,
        expected_duration_ms,
    ):
        """Check the attributes, aliases, length and bytes of a new region."""
        region = AudioRegion(
            data, start, sampling_rate, sample_width, channels
        )
        self.assertEqual(region.sampling_rate, sampling_rate)
        self.assertEqual(region.sr, sampling_rate)
        self.assertEqual(region.sample_width, sample_width)
        self.assertEqual(region.sw, sample_width)
        self.assertEqual(region.channels, channels)
        self.assertEqual(region.ch, channels)
        self.assertEqual(region.start, start)
        self.assertEqual(region.end, expected_end)
        self.assertEqual(region.duration, expected_duration_s)
        self.assertEqual(len(region), expected_duration_ms)
        self.assertEqual(bytes(region), data)

    def test_creation_invalid_data_exception(self):
        """9 bytes of data with a sample width of 2 must be rejected."""
        with self.assertRaises(AudioParameterError) as audio_param_err:
            _ = AudioRegion(
                data=b"ABCDEFGHI",
                start=0,
                sampling_rate=8,
                sample_width=2,
                channels=1,
            )
        # Checked after the `with` block: inside it, the line would never
        # run once the constructor has raised.
        self.assertEqual(
            "The length of audio data must be an integer "
            "multiple of `sample_width * channels`",
            str(audio_param_err.exception),
        )

    # Dataset columns: file name template, region start, expected name.
    @genty_dataset(
        simple=("output.wav", 1.230, "output.wav"),
        start=("output_{start}.wav", 1.230, "output_1.23.wav"),
        start_2=("output_{start}.wav", 1.233712, "output_1.233712.wav"),
        start_3=("output_{start}.wav", 1.2300001, "output_1.23.wav"),
        start_4=("output_{start:.3f}.wav", 1.233712, "output_1.234.wav"),
        start_5=(
            "output_{start:.8f}.wav",
            1.233712345,
            "output_1.23371200.wav",
        ),
        start_end_duration=(
            "output_{start}_{end}_{duration}.wav",
            1.455,
            "output_1.455_2.455_1.0.wav",
        ),
        start_end_duration_2=(
            "output_{start}_{end}_{duration}.wav",
            1.455321,
            "output_1.455321_2.455321_1.0.wav",
        ),
    )
    def test_save(self, format, start, expected):
        """Save a region from a name template and check the file name.

        The value returned by `save` is compared with `expected` after
        the temporary directory prefix has been removed.
        """
        with TemporaryDirectory() as tmpdir:
            region = AudioRegion(b"0" * 160, start, 160, 1, 1)
            path_template = os.path.join(tmpdir, format)
            saved_path = region.save(path_template)
            # Drop "<tmpdir><separator>" to keep only the file name.
            filename = saved_path[len(tmpdir) + 1 :]
            self.assertEqual(filename, expected)

    def test_save_file_exists_exception(self):
        """`save(..., exists_ok=False)` must refuse an existing file."""
        with TemporaryDirectory() as tmpdir:
            filename = os.path.join(tmpdir, "output.wav")
            # Create an empty file so that the target already exists.
            with open(filename, "w"):
                pass
            region = AudioRegion(b"0" * 160, 0, 160, 1, 1)
            with self.assertRaises(FileExistsError):
                region.save(filename, exists_ok=False)

    # Dataset columns: region, slice, expected start of the sub-region,
    # expected bytes of the sub-region.
    @genty_dataset(
        first_half=(
            AudioRegion(b"a" * 80 + b"b" * 80, 0, 160, 1, 1),
            slice(0, 500),
            0,
            b"a" * 80,
        ),
        second_half=(
            AudioRegion(b"a" * 80 + b"b" * 80, 0, 160, 1, 1),
            slice(500, None),
            0.5,
            b"b" * 80,
        ),
        second_half_negative=(
            AudioRegion(b"a" * 80 + b"b" * 80, 0, 160, 1, 1),
            slice(-500, None),
            0.5,
            b"b" * 80,
        ),
        middle=(
            AudioRegion(b"a" * 80 + b"b" * 80, 0, 160, 1, 1),
            slice(200, 750),
            0.2,
            b"a" * 48 + b"b" * 40,
        ),
        middle_negative=(
            AudioRegion(b"a" * 80 + b"b" * 80, 0, 160, 1, 1),
            slice(-800, -250),
            0.2,
            b"a" * 48 + b"b" * 40,
        ),
        middle_sw2=(
            AudioRegion(b"a" * 160 + b"b" * 160, 0, 160, 2, 1),
            slice(200, 750),
            0.2,
            b"a" * 96 + b"b" * 80,
        ),
        middle_ch2=(
            AudioRegion(b"a" * 160 + b"b" * 160, 0, 160, 1, 2),
            slice(200, 750),
            0.2,
            b"a" * 96 + b"b" * 80,
        ),
        middle_sw2_ch2=(
            AudioRegion(b"a" * 320 + b"b" * 320, 0, 160, 2, 2),
            slice(200, 750),
            0.2,
            b"a" * 192 + b"b" * 160,
        ),
        but_first_sample=(
            AudioRegion(b"a" * 4000 + b"b" * 4000, 0, 8000, 1, 1),
            slice(1, None),
            0.001,
            b"a" * (4000 - 8) + b"b" * 4000,
        ),
        but_first_sample_negative=(
            AudioRegion(b"a" * 4000 + b"b" * 4000, 0, 8000, 1, 1),
            slice(-999, None),
            0.001,
            b"a" * (4000 - 8) + b"b" * 4000,
        ),
        but_last_sample=(
            AudioRegion(b"a" * 4000 + b"b" * 4000, 0, 8000, 1, 1),
            slice(0, 999),
            0,
            b"a" * 4000 + b"b" * (4000 - 8),
        ),
        but_last_sample_negative=(
            AudioRegion(b"a" * 4000 + b"b" * 4000, 0, 8000, 1, 1),
            slice(0, -1),
            0,
            b"a" * 4000 + b"b" * (4000 - 8),
        ),
        big_negative_start=(
            AudioRegion(b"a" * 160, 0, 160, 1, 1),
            slice(-5000, None),
            0,
            b"a" * 160,
        ),
        big_negative_stop=(
            AudioRegion(b"a" * 160, 0, 160, 1, 1),
            slice(None, -1500),
            0,
            b"",
        ),
        empty=(
            AudioRegion(b"a" * 80 + b"b" * 80, 0, 160, 1, 1),
            slice(0, 0),
            0,
            b"",
        ),
        empty_start_stop_reversed=(
            AudioRegion(b"a" * 80 + b"b" * 80, 0, 160, 1, 1),
            slice(200, 100),
            0.2,
            b"",
        ),
        empty_big_positive_start=(
            AudioRegion(b"a" * 80 + b"b" * 80, 0, 160, 1, 1),
            slice(2000, 3000),
            2,
            b"",
        ),
        empty_negative_reversed=(
            AudioRegion(b"a" * 80 + b"b" * 80, 0, 160, 1, 1),
            slice(-100, -200),
            0.9,
            b"",
        ),
        empty_big_negative_stop=(
            AudioRegion(b"a" * 80 + b"b" * 80, 0, 160, 1, 1),
            slice(0, -2000),
            0,
            b"",
        ),
    )
    def test_region_slicing(
        self, region, slice_, expected_start, expected_data
    ):
        """Slice a region and check the sub-region's start and bytes."""
        sub_region = region[slice_]
        self.assertEqual(sub_region.start, expected_start)
        self.assertEqual(bytes(sub_region), expected_data)

    @genty_dataset(
        simple=(8000, 1, 1),
        stereo_sw_2=(8000, 2, 2),
        arbitray_sr_multichannel=(5413, 2, 3),
    )
    def test_concatenation(self, sampling_rate, sample_width, channels):
        """`region_1 + region_2` keeps the first start and joins the data."""
        region_1, region_2 = _make_random_length_regions(
            [b"a", b"b"], sampling_rate, sample_width, channels
        )

        expected_start = region_1.start
        expected_duration = region_1.duration + region_2.duration
        expected_end = expected_start + expected_duration
        expected_data = bytes(region_1) + bytes(region_2)
        concat_region = region_1 + region_2

        self.assertEqual(concat_region.start, expected_start)
        self.assertAlmostEqual(concat_region.end, expected_end, places=6)
        self.assertAlmostEqual(
            concat_region.duration, expected_duration, places=6
        )
        self.assertEqual(bytes(concat_region), expected_data)

    @genty_dataset(
        simple=(8000, 1, 1),
        stereo_sw_2=(8000, 2, 2),
        arbitray_sr_multichannel=(5413, 2, 3),
    )
    def test_concatenation_many(self, sampling_rate, sample_width, channels):
        """`sum()` over three regions behaves like chained concatenation."""
        regions = _make_random_length_regions(
            [b"a", b"b", b"c"], sampling_rate, sample_width, channels
        )
        expected_start = regions[0].start
        expected_duration = sum(r.duration for r in regions)
        expected_end = expected_start + expected_duration
        expected_data = b"".join(bytes(r) for r in regions)
        concat_region = sum(regions)

        self.assertEqual(concat_region.start, expected_start)
        self.assertAlmostEqual(concat_region.end, expected_end, places=6)
        self.assertAlmostEqual(
            concat_region.duration, expected_duration, places=6
        )
        self.assertEqual(bytes(concat_region), expected_data)

    def test_concatenation_different_sampling_rate_error(self):
        """Adding regions with different sampling rates raises ValueError."""
        region_1 = AudioRegion(b"a" * 100, 0, 8000, 1, 1)
        region_2 = AudioRegion(b"b" * 100, 0, 3000, 1, 1)

        with self.assertRaises(ValueError) as val_err:
            region_1 + region_2
        self.assertEqual(
            "Can only concatenate AudioRegions of the same "
            "sampling rate (8000 != 3000)",
            str(val_err.exception),
        )

    def test_concatenation_different_sample_width_error(self):
        """Adding regions with different sample widths raises ValueError."""
        region_1 = AudioRegion(b"a" * 100, 0, 8000, 2, 1)
        region_2 = AudioRegion(b"b" * 100, 0, 8000, 4, 1)

        with self.assertRaises(ValueError) as val_err:
            region_1 + region_2
        self.assertEqual(
            "Can only concatenate AudioRegions of the same "
            "sample width (2 != 4)",
            str(val_err.exception),
        )

    def test_concatenation_different_number_of_channels_error(self):
        """Adding regions with different channel counts raises ValueError."""
        region_1 = AudioRegion(b"a" * 100, 0, 8000, 1, 1)
        region_2 = AudioRegion(b"b" * 100, 0, 8000, 1, 2)

        with self.assertRaises(ValueError) as val_err:
            region_1 + region_2
        self.assertEqual(
            "Can only concatenate AudioRegions of the same "
            "number of channels (1 != 2)",
            str(val_err.exception),
        )

    # Dataset columns: duration of the source region in seconds, expected
    # duration of the product, expected `len()` of the product.
    @genty_dataset(
        simple=(0.01, 0.03, 30),
        rounded_len_floor=(0.00575, 0.01725, 17),
        rounded_len_ceil=(0.00625, 0.01875, 19),
    )
    def test_multiplication(
        self, duration, expected_duration, expected_length
    ):
        """`1 * region * 3` repeats the data three times."""
        sw = 2
        data = b"0" * int(duration * 8000 * sw)
        region = AudioRegion(data, 0, 8000, sw, 1)
        # Exercises multiplication with the int on either side.
        m_region = 1 * region * 3
        self.assertEqual(bytes(m_region), data * 3)
        self.assertEqual(m_region.sr, 8000)
        self.assertEqual(m_region.sw, 2)
        self.assertEqual(m_region.ch, 1)
        self.assertEqual(m_region.duration, expected_duration)
        self.assertEqual(len(m_region), expected_length)

    @genty_dataset(_str=("x", "str"), _float=(1.4, "float"))
    def test_multiplication_non_int(self, factor, _type):
        """Multiplying by a non-int raises TypeError naming the type."""
        with self.assertRaises(TypeError) as type_err:
            AudioRegion(b"0" * 80, 0, 8000, 1, 1) * factor
        err_msg = "Can't multiply AudioRegion by a non-int of type '{}'"
        self.assertEqual(err_msg.format(_type), str(type_err.exception))