Mercurial > hg > auditok
view tests/test_core.py @ 403:996948ada980
Update tests
author | Amine Sehili <amine.sehili@gmail.com> |
---|---|
date | Sun, 26 May 2024 22:43:08 +0200 |
parents | 323d59b404a2 |
children | f56b4d8adfb8 |
line wrap: on
line source
import math
import os
from array import array as array_
from random import random
from tempfile import TemporaryDirectory
from unittest.mock import Mock, patch

import pytest

from auditok import AudioParameterError, AudioRegion, load, split
from auditok.core import (
    _duration_to_nb_windows,
    _make_audio_region,
    _read_chunks_online,
    _read_offline,
)
from auditok.io import get_audio_source
from auditok.util import AudioReader


def _make_random_length_regions(
    byte_seq, sampling_rate, sample_width, channels
):
    """Return one `AudioRegion` of random length per item of `byte_seq`.

    For each item `b`, a duration is drawn as `random() * 10` (rounded to 6
    decimals) and `b` is repeated to fill
    `int(duration * sampling_rate) * sample_width * channels` bytes.
    """
    regions = []
    for b in byte_seq:
        duration = round(random() * 10, 6)
        data = b * int(duration * sampling_rate) * sample_width * channels
        region = AudioRegion(data, sampling_rate, sample_width, channels)
        regions.append(region)
    return regions


@pytest.mark.parametrize(
    "skip, max_read, channels",
    [
        (0, -1, 1),
        (0, -1, 2),
        (2, -1, 1),
        (2, None, 1),
        (2, 3, 1),
        (2, 3.5, 2),
        (2.4, 3.5, 2),
    ],
    ids=[
        "no_skip_read_all",
        "no_skip_read_all_stereo",
        "skip_2_read_all",
        "skip_2_read_all_None",
        "skip_2_read_3",
        "skip_2_read_3_5_stereo",
        "skip_2_4_read_3_5_stereo",
    ],
)
def test_load(skip, max_read, channels):
    """Check that `load` returns the same bytes as a manual file read."""
    sampling_rate = 10
    sample_width = 2
    filename = "tests/data/test_split_10HZ_{}.raw"
    filename = filename.format("mono" if channels == 1 else "stereo")
    region = load(
        filename,
        skip=skip,
        max_read=max_read,
        sr=sampling_rate,
        sw=sample_width,
        ch=channels,
    )
    bytes_per_second = sampling_rate * sample_width * channels
    with open(filename, "rb") as fp:
        # Discard the skipped part so the next read starts where `load`
        # is expected to start.
        fp.read(round(skip * bytes_per_second))
        if max_read is None or max_read < 0:
            # A size of -1 makes `fp.read` consume the rest of the file.
            to_read = -1
        else:
            to_read = round(max_read * bytes_per_second)
        expected = fp.read(to_read)
    assert bytes(region) == expected


@pytest.mark.parametrize(
    "duration, analysis_window, round_fn, expected, kwargs",
    [
        (0, 1, None, 0, None),
        (0.3, 0.1, round, 3, None),
        (0.35, 0.1, math.ceil, 4, None),
        (0.35, 0.1, math.floor, 3, None),
        (0.05, 0.1, round, 0, None),
        (0.05, 0.1, math.ceil, 1, None),
        (0.3, 0.1, math.floor, 3, {"epsilon": 1e-6}),
        (-0.5, 0.1, math.ceil, ValueError, None),
        (0.5, -0.1, math.ceil, ValueError, None),
    ],
    ids=[
        "zero_duration",
        "multiple",
        "not_multiple_ceil",
        "not_multiple_floor",
        "small_duration",
        "small_duration_ceil",
        "with_round_error",
        "negative_duration",
        "negative_analysis_window",
    ],
)
def test_duration_to_nb_windows(
    duration, analysis_window, round_fn, expected, kwargs
):
    """Check `_duration_to_nb_windows` results and its `ValueError` cases.

    `expected` is either the expected number of windows or the `ValueError`
    class itself, in which case the call must raise.
    """
    # `expected` is a sentinel class here, so compare by identity rather
    # than by equality with an arbitrary value.
    if expected is ValueError:
        with pytest.raises(ValueError):
            _duration_to_nb_windows(duration, analysis_window, round_fn)
        return

    result = _duration_to_nb_windows(
        duration, analysis_window, round_fn, **(kwargs or {})
    )
    assert result == expected


@pytest.mark.parametrize(
    "channels, skip, max_read",
    [
        (1, 0, None),
        (1, 3, None),
        (1, 2, -1),
        (1, 2, 3),
        (2, 0, None),
        (2, 3, None),
        (2, 2, -1),
        (2, 2, 3),
    ],
    ids=[
        "mono_skip_0_max_read_None",
        "mono_skip_3_max_read_None",
        "mono_skip_2_max_read_negative",
        "mono_skip_2_max_read_3",
        "stereo_skip_0_max_read_None",
        "stereo_skip_3_max_read_None",
        "stereo_skip_2_max_read_negative",
        "stereo_skip_2_max_read_3",
    ],
)
def test_read_offline(channels, skip, max_read):
    """Check the data and audio parameters returned by `_read_offline`."""
    sampling_rate = 10
    sample_width = 2
    mono_or_stereo = "mono" if channels == 1 else "stereo"
    filename = "tests/data/test_split_10HZ_{}.raw".format(mono_or_stereo)
    with open(filename, "rb") as fp:
        data = fp.read()

    bytes_per_second = sampling_rate * sample_width * channels
    onset = round(skip * bytes_per_second)
    if max_read in (-1, None):
        # Any stop index past the end selects everything after `onset`.
        offset = len(data) + 1
    else:
        offset = onset + round(max_read * bytes_per_second)
    expected_data = data[onset:offset]

    read_data, *audio_params = _read_offline(
        filename,
        skip=skip,
        max_read=max_read,
        sr=sampling_rate,
        sw=sample_width,
        ch=channels,
    )
    assert read_data == expected_data
    assert tuple(audio_params) == (sampling_rate, sample_width, channels)


@pytest.mark.parametrize(
    "min_dur, max_dur, max_silence, drop_trailing_silence, strict_min_dur, kwargs, expected",
    [
        (0.2, 5, 0.2, False, False, {"eth": 50}, [(2, 16), (17, 31), (34, 76)]),
        (
            0.3,
            2,
            0.2,
            False,
            False,
            {"eth": 50},
            [(2, 16), (17, 31), (34, 54), (54, 74), (74, 76)],
        ),
        (3, 5, 0.2, False, False, {"eth": 50}, [(34, 76)]),
        (0.2, 80, 10, False, False, {"eth": 50}, [(2, 76)]),
        (
            0.2,
            5,
            0.0,
            False,
            False,
            {"eth": 50},
            [(2, 14), (17, 24), (26, 29), (34, 76)],
        ),
        (
            0.2,
            5,
            0.2,
            False,
            False,
            {"energy_threshold": 40},
            [(0, 50), (50, 76)],
        ),
        (0.2, 5, 0.2, False, False, {"energy_threshold": 60}, []),
        (0.2, 10, 0.5, True, False, {"eth": 50}, [(2, 76)]),
        (0.2, 5, 0.2, True, False, {"eth": 50}, [(2, 14), (17, 29), (34, 76)]),
        (1.5, 5, 0.2, True, False, {"eth": 50}, [(34, 76)]),
        (
            0.3,
            2,
            0.2,
            False,
            True,
            {"eth": 50},
            [(2, 16), (17, 31), (34, 54), (54, 74)],
        ),
    ],
    ids=[
        "simple",
        "short_max_dur",
        "long_min_dur",
        "long_max_silence",
        "zero_max_silence",
        "low_energy_threshold",
        "high_energy_threshold",
        "trim_leading_and_trailing_silence",
        "drop_trailing_silence",
        "drop_trailing_silence_2",
        "strict_min_dur",
    ],
)
def test_split_params(
    min_dur,
    max_dur,
    max_silence,
    drop_trailing_silence,
    strict_min_dur,
    kwargs,
    expected,
):
    """Check `split` and `AudioRegion.split` against expected regions.

    `expected` is a list of `(onset, offset)` pairs expressed in samples of
    the mono test file; both entry points must yield the same regions.
    """
    with open("tests/data/test_split_10HZ_mono.raw", "rb") as fp:
        data = fp.read()

    regions = split(
        data,
        min_dur,
        max_dur,
        max_silence,
        drop_trailing_silence,
        strict_min_dur,
        analysis_window=0.1,
        sr=10,
        sw=2,
        ch=1,
        **kwargs
    )

    region = AudioRegion(data, 10, 2, 1)
    regions_ar = region.split(
        min_dur,
        max_dur,
        max_silence,
        drop_trailing_silence,
        strict_min_dur,
        analysis_window=0.1,
        **kwargs
    )

    regions = list(regions)
    regions_ar = list(regions_ar)

    err_msg = "Wrong number of regions after split, expected: "
    err_msg += "{}, found: {}".format(len(expected), len(regions))
    assert len(regions) == len(expected), err_msg

    err_msg = "Wrong number of regions after AudioRegion.split, expected: "
    err_msg += "{}, found: {}".format(len(expected), len(regions_ar))
    assert len(regions_ar) == len(expected), err_msg

    sample_width = 2
    for reg, reg_ar, exp in zip(regions, regions_ar, expected):
        onset, offset = exp
        # Expected boundaries are in samples; scale to byte offsets.
        exp_data = data[onset * sample_width : offset * sample_width]
        assert bytes(reg) == exp_data
        assert reg == reg_ar
@pytest.mark.parametrize( "channels, kwargs, expected", [ (2, {}, [(2, 32), (34, 76)]), (1, {"max_read": 5}, [(2, 16), (17, 31), (34, 50)]), (1, {"mr": 5}, [(2, 16), (17, 31), (34, 50)]), (1, {"eth": 50, "use_channel": 0}, [(2, 16), (17, 31), (34, 76)]), (1, {"eth": 50, "uc": 1}, [(2, 16), (17, 31), (34, 76)]), (1, {"eth": 50, "use_channel": None}, [(2, 16), (17, 31), (34, 76)]), (2, {"eth": 50, "use_channel": 0}, [(2, 16), (17, 31), (34, 76)]), (2, {"eth": 50}, [(2, 32), (34, 76)]), (2, {"eth": 50, "use_channel": -2}, [(2, 16), (17, 31), (34, 76)]), (2, {"eth": 50, "uc": 1}, [(10, 32), (36, 76)]), (2, {"eth": 50, "uc": -1}, [(10, 32), (36, 76)]), (1, {"eth": 50, "uc": "mix"}, [(2, 16), (17, 31), (34, 76)]), (2, {"energy_threshold": 53.5, "use_channel": "mix"}, [(54, 76)]), (2, {"eth": 52, "uc": "mix"}, [(17, 26), (54, 76)]), (2, {"uc": "mix"}, [(10, 16), (17, 31), (36, 76)]), ], ids=[ "stereo_all_default", "mono_max_read", "mono_max_read_short_name", "mono_use_channel_1", "mono_uc_1", "mono_use_channel_None", "stereo_use_channel_1", "stereo_use_channel_no_use_channel_given", "stereo_use_channel_minus_2", "stereo_uc_2", "stereo_uc_minus_1", "mono_uc_mix", "stereo_use_channel_mix", "stereo_uc_mix", "stereo_uc_mix_default_eth", ], ) def test_split_kwargs(channels, kwargs, expected): mono_or_stereo = "mono" if channels == 1 else "stereo" filename = "tests/data/test_split_10HZ_{}.raw".format(mono_or_stereo) with open(filename, "rb") as fp: data = fp.read() regions = split( data, min_dur=0.2, max_dur=5, max_silence=0.2, drop_trailing_silence=False, strict_min_dur=False, analysis_window=0.1, sr=10, sw=2, ch=channels, **kwargs ) region = AudioRegion(data, 10, 2, channels) max_read = kwargs.get("max_read", kwargs.get("mr")) if max_read is not None: region = region.sec[:max_read] kwargs.pop("max_read", None) kwargs.pop("mr", None) regions_ar = region.split( min_dur=0.2, max_dur=5, max_silence=0.2, drop_trailing_silence=False, strict_min_dur=False, analysis_window=0.1, 
**kwargs ) regions = list(regions) regions_ar = list(regions_ar) err_msg = "Wrong number of regions after split, expected: " err_msg += "{}, found: {}".format(len(expected), len(regions)) assert len(regions) == len(expected), err_msg err_msg = "Wrong number of regions after AudioRegion.split, expected: " err_msg += "{}, found: {}".format(len(expected), len(regions_ar)) assert len(regions_ar) == len(expected), err_msg sample_width = 2 sample_size_bytes = sample_width * channels for reg, reg_ar, exp in zip(regions, regions_ar, expected): onset, offset = exp exp_data = data[onset * sample_size_bytes : offset * sample_size_bytes] assert len(bytes(reg)) == len(exp_data) assert reg == reg_ar @pytest.mark.parametrize( "min_dur, max_dur, max_silence, channels, kwargs, expected", [ (0.2, 5, 0.2, 1, {"aw": 0.2}, [(2, 30), (34, 76)]), (0.2, 5, 0.3, 1, {"aw": 0.2}, [(2, 30), (34, 76)]), (0.2, 5, 0.4, 1, {"aw": 0.2}, [(2, 32), (34, 76)]), (0.2, 5, 0, 1, {"aw": 0.2}, [(2, 14), (16, 24), (26, 28), (34, 76)]), (0.2, 5, 0.2, 1, {"aw": 0.2}, [(2, 30), (34, 76)]), (0.3, 5, 0, 1, {"aw": 0.3}, [(3, 12), (15, 24), (36, 76)]), (0.3, 5, 0.3, 1, {"aw": 0.3}, [(3, 27), (36, 76)]), (0.3, 5, 0.5, 1, {"aw": 0.3}, [(3, 27), (36, 76)]), (0.3, 5, 0.6, 1, {"aw": 0.3}, [(3, 30), (36, 76)]), (0.2, 5, 0, 1, {"aw": 0.4}, [(4, 12), (16, 24), (36, 76)]), (0.2, 5, 0.3, 1, {"aw": 0.4}, [(4, 12), (16, 24), (36, 76)]), (0.2, 5, 0.4, 1, {"aw": 0.4}, [(4, 28), (36, 76)]), (0.2, 5, 0.2, 2, {"analysis_window": 0.2}, [(2, 32), (34, 76)]), ( 0.2, 5, 0.2, 2, {"uc": None, "analysis_window": 0.2}, [(2, 32), (34, 76)], ), ( 0.2, 5, 0.2, 2, {"use_channel": None, "analysis_window": 0.3}, [(3, 30), (36, 76)], ), ( 0.2, 5, 0.3, 2, {"use_channel": "any", "analysis_window": 0.3}, [(3, 33), (36, 76)], ), ( 0.2, 5, 0.2, 2, {"use_channel": None, "analysis_window": 0.4}, [(4, 28), (36, 76)], ), ( 0.2, 5, 0.4, 2, {"use_channel": "any", "analysis_window": 0.4}, [(4, 32), (36, 76)], ), ( 0.2, 5, 0.2, 2, {"uc": 0, 
"analysis_window": 0.2}, [(2, 30), (34, 76)], ), ( 0.2, 5, 0.2, 2, {"uc": 1, "analysis_window": 0.2}, [(10, 32), (36, 76)], ), ( 0.2, 5, 0, 2, {"uc": "mix", "analysis_window": 0.1}, [(10, 14), (17, 24), (26, 29), (36, 76)], ), ( 0.2, 5, 0.1, 2, {"uc": "mix", "analysis_window": 0.1}, [(10, 15), (17, 25), (26, 30), (36, 76)], ), ( 0.2, 5, 0.2, 2, {"uc": "mix", "analysis_window": 0.1}, [(10, 16), (17, 31), (36, 76)], ), ( 0.2, 5, 0.3, 2, {"uc": "mix", "analysis_window": 0.1}, [(10, 32), (36, 76)], ), ( 0.3, 5, 0, 2, {"uc": "avg", "analysis_window": 0.2}, [(10, 14), (16, 24), (36, 76)], ), ( 0.41, 5, 0, 2, {"uc": "average", "analysis_window": 0.2}, [(16, 24), (36, 76)], ), ( 0.2, 5, 0.1, 2, {"uc": "mix", "analysis_window": 0.2}, [(10, 14), (16, 24), (26, 28), (36, 76)], ), ( 0.2, 5, 0.2, 2, {"uc": "mix", "analysis_window": 0.2}, [(10, 30), (36, 76)], ), ( 0.2, 5, 0.4, 2, {"uc": "mix", "analysis_window": 0.2}, [(10, 32), (36, 76)], ), ( 0.2, 5, 0.5, 2, {"uc": "mix", "analysis_window": 0.2}, [(10, 32), (36, 76)], ), ( 0.2, 5, 0.6, 2, {"uc": "mix", "analysis_window": 0.2}, [(10, 34), (36, 76)], ), ( 0.2, 5, 0, 2, {"uc": "mix", "analysis_window": 0.3}, [(9, 24), (27, 30), (36, 76)], ), ( 0.4, 5, 0, 2, {"uc": "mix", "analysis_window": 0.3}, [(9, 24), (36, 76)], ), ( 0.2, 5, 0.6, 2, {"uc": "mix", "analysis_window": 0.3}, [(9, 57), (57, 76)], ), ( 0.2, 5.1, 0.6, 2, {"uc": "mix", "analysis_window": 0.3}, [(9, 60), (60, 76)], ), ( 0.2, 5.2, 0.6, 2, {"uc": "mix", "analysis_window": 0.3}, [(9, 60), (60, 76)], ), ( 0.2, 5.3, 0.6, 2, {"uc": "mix", "analysis_window": 0.3}, [(9, 60), (60, 76)], ), ( 0.2, 5.4, 0.6, 2, {"uc": "mix", "analysis_window": 0.3}, [(9, 63), (63, 76)], ), ( 0.2, 5, 0, 2, {"uc": "mix", "analysis_window": 0.4}, [(16, 24), (36, 76)], ), ( 0.2, 5, 0.3, 2, {"uc": "mix", "analysis_window": 0.4}, [(16, 24), (36, 76)], ), ( 0.2, 5, 0.4, 2, {"uc": "mix", "analysis_window": 0.4}, [(16, 28), (36, 76)], ), ], ids=[ "mono_aw_0_2_max_silence_0_2", 
"mono_aw_0_2_max_silence_0_3", "mono_aw_0_2_max_silence_0_4", "mono_aw_0_2_max_silence_0", "mono_aw_0_2", "mono_aw_0_3_max_silence_0", "mono_aw_0_3_max_silence_0_3", "mono_aw_0_3_max_silence_0_5", "mono_aw_0_3_max_silence_0_6", "mono_aw_0_4_max_silence_0", "mono_aw_0_4_max_silence_0_3", "mono_aw_0_4_max_silence_0_4", "stereo_uc_None_analysis_window_0_2", "stereo_uc_any_analysis_window_0_2", "stereo_use_channel_None_aw_0_3_max_silence_0_2", "stereo_use_channel_any_aw_0_3_max_silence_0_3", "stereo_use_channel_None_aw_0_4_max_silence_0_2", "stereo_use_channel_any_aw_0_3_max_silence_0_4", "stereo_uc_0_analysis_window_0_2", "stereo_uc_1_analysis_window_0_2", "stereo_uc_mix_aw_0_1_max_silence_0", "stereo_uc_mix_aw_0_1_max_silence_0_1", "stereo_uc_mix_aw_0_1_max_silence_0_2", "stereo_uc_mix_aw_0_1_max_silence_0_3", "stereo_uc_avg_aw_0_2_max_silence_0_min_dur_0_3", "stereo_uc_average_aw_0_2_max_silence_0_min_dur_0_41", "stereo_uc_mix_aw_0_2_max_silence_0_1", "stereo_uc_mix_aw_0_2_max_silence_0_2", "stereo_uc_mix_aw_0_2_max_silence_0_4", "stereo_uc_mix_aw_0_2_max_silence_0_5", "stereo_uc_mix_aw_0_2_max_silence_0_6", "stereo_uc_mix_aw_0_3_max_silence_0", "stereo_uc_mix_aw_0_3_max_silence_0_min_dur_0_3", "stereo_uc_mix_aw_0_3_max_silence_0_6", "stereo_uc_mix_aw_0_3_max_silence_0_6_max_dur_5_1", "stereo_uc_mix_aw_0_3_max_silence_0_6_max_dur_5_2", "stereo_uc_mix_aw_0_3_max_silence_0_6_max_dur_5_3", "stereo_uc_mix_aw_0_3_max_silence_0_6_max_dur_5_4", "stereo_uc_mix_aw_0_4_max_silence_0", "stereo_uc_mix_aw_0_4_max_silence_0_3", "stereo_uc_mix_aw_0_4_max_silence_0_4", ], ) def test_split_analysis_window( min_dur, max_dur, max_silence, channels, kwargs, expected ): mono_or_stereo = "mono" if channels == 1 else "stereo" filename = "tests/data/test_split_10HZ_{}.raw".format(mono_or_stereo) with open(filename, "rb") as fp: data = fp.read() regions = split( data, min_dur=min_dur, max_dur=max_dur, max_silence=max_silence, drop_trailing_silence=False, strict_min_dur=False, sr=10, sw=2, 
ch=channels, eth=49.99, **kwargs ) region = AudioRegion(data, 10, 2, channels) regions_ar = region.split( min_dur=min_dur, max_dur=max_dur, max_silence=max_silence, drop_trailing_silence=False, strict_min_dur=False, eth=49.99, **kwargs ) regions = list(regions) regions_ar = list(regions_ar) err_msg = "Wrong number of regions after split, expected: " err_msg += "{}, found: {}".format(len(expected), len(regions)) assert len(regions) == len(expected), err_msg err_msg = "Wrong number of regions after AudioRegion.split, expected: " err_msg += "{}, found: {}".format(len(expected), len(regions_ar)) assert len(regions_ar) == len(expected), err_msg sample_width = 2 sample_size_bytes = sample_width * channels for reg, reg_ar, exp in zip(regions, regions_ar, expected): onset, offset = exp exp_data = data[onset * sample_size_bytes : offset * sample_size_bytes] assert bytes(reg) == exp_data assert reg == reg_ar def test_split_custom_validator(): filename = "tests/data/test_split_10HZ_mono.raw" with open(filename, "rb") as fp: data = fp.read() regions = split( data, min_dur=0.2, max_dur=5, max_silence=0.2, drop_trailing_silence=False, strict_min_dur=False, sr=10, sw=2, ch=1, analysis_window=0.1, validator=lambda x: array_("h", x)[0] >= 320, ) region = AudioRegion(data, 10, 2, 1) regions_ar = region.split( min_dur=0.2, max_dur=5, max_silence=0.2, drop_trailing_silence=False, strict_min_dur=False, analysis_window=0.1, validator=lambda x: array_("h", x)[0] >= 320, ) expected = [(2, 16), (17, 31), (34, 76)] regions = list(regions) regions_ar = list(regions_ar) err_msg = "Wrong number of regions after split, expected: " err_msg += "{}, found: {}".format(len(expected), len(regions)) assert len(regions) == len(expected), err_msg err_msg = "Wrong number of regions after AudioRegion.split, expected: " err_msg += "{}, found: {}".format(len(expected), len(regions_ar)) assert len(regions_ar) == len(expected), err_msg sample_size_bytes = 2 for reg, reg_ar, exp in zip(regions, regions_ar, 
expected): onset, offset = exp exp_data = data[onset * sample_size_bytes : offset * sample_size_bytes] assert bytes(reg) == exp_data assert reg == reg_ar @pytest.mark.parametrize( "input, kwargs", [ ( "tests/data/test_split_10HZ_stereo.raw", {"audio_format": "raw", "sr": 10, "sw": 2, "ch": 2}, ), ( "tests/data/test_split_10HZ_stereo.raw", {"fmt": "raw", "sr": 10, "sw": 2, "ch": 2}, ), ("tests/data/test_split_10HZ_stereo.raw", {"sr": 10, "sw": 2, "ch": 2}), ( "tests/data/test_split_10HZ_stereo.raw", {"sampling_rate": 10, "sample_width": 2, "channels": 2}, ), ( open("tests/data/test_split_10HZ_stereo.raw", "rb").read(), {"sr": 10, "sw": 2, "ch": 2}, ), ( AudioReader( "tests/data/test_split_10HZ_stereo.raw", sr=10, sw=2, ch=2, block_dur=0.1, ), {}, ), ( AudioRegion( open("tests/data/test_split_10HZ_stereo.raw", "rb").read(), 10, 2, 2, ), {}, ), ( get_audio_source( "tests/data/test_split_10HZ_stereo.raw", sr=10, sw=2, ch=2 ), {}, ), ], ids=[ "filename_audio_format", "filename_audio_format_short_name", "filename_no_audio_format", "filename_no_long_audio_params", "bytes_", "audio_reader", "audio_region", "audio_source", ], ) def test_split_input_type(input, kwargs): with open("tests/data/test_split_10HZ_stereo.raw", "rb") as fp: data = fp.read() regions = split( input, min_dur=0.2, max_dur=5, max_silence=0.2, drop_trailing_silence=False, strict_min_dur=False, analysis_window=0.1, **kwargs ) regions = list(regions) expected = [(2, 32), (34, 76)] sample_width = 2 err_msg = "Wrong number of regions after split, expected: " err_msg += "{}, found: {}".format(expected, regions) assert len(regions) == len(expected), err_msg for reg, exp in zip(regions, expected): onset, offset = exp exp_data = data[onset * sample_width * 2 : offset * sample_width * 2] assert bytes(reg) == exp_data @pytest.mark.parametrize( "min_dur, max_dur, analysis_window", [ (0.5, 0.4, 0.1), (0.44, 0.49, 0.1), ], ids=[ "min_dur_greater_than_max_dur", "durations_OK_but_wrong_number_of_analysis_windows", ], ) 
def test_split_wrong_min_max_dur(min_dur, max_dur, analysis_window): with pytest.raises(ValueError) as val_err: split( b"0" * 16, min_dur=min_dur, max_dur=max_dur, max_silence=0.2, sr=16000, sw=1, ch=1, analysis_window=analysis_window, ) err_msg = "'min_dur' ({0} sec.) results in {1} analysis " err_msg += "window(s) ({1} == ceil({0} / {2})) which is " err_msg += "higher than the number of analysis window(s) for " err_msg += "'max_dur' ({3} == floor({4} / {2}))" err_msg = err_msg.format( min_dur, math.ceil(min_dur / analysis_window), analysis_window, math.floor(max_dur / analysis_window), max_dur, ) assert err_msg == str(val_err.value) @pytest.mark.parametrize( "max_silence, max_dur, analysis_window", [ (0.5, 0.5, 0.1), (0.5, 0.4, 0.1), (0.44, 0.49, 0.1), ], ids=[ "max_silence_equals_max_dur", "max_silence_greater_than_max_dur", "durations_OK_but_wrong_number_of_analysis_windows", ], ) def test_split_wrong_max_silence_max_dur(max_silence, max_dur, analysis_window): with pytest.raises(ValueError) as val_err: split( b"0" * 16, min_dur=0.2, max_dur=max_dur, max_silence=max_silence, sr=16000, sw=1, ch=1, analysis_window=analysis_window, ) err_msg = "'max_silence' ({0} sec.) 
results in {1} analysis " err_msg += "window(s) ({1} == floor({0} / {2})) which is " err_msg += "higher or equal to the number of analysis window(s) for " err_msg += "'max_dur' ({3} == floor({4} / {2}))" err_msg = err_msg.format( max_silence, math.floor(max_silence / analysis_window), analysis_window, math.floor(max_dur / analysis_window), max_dur, ) assert err_msg == str(val_err.value) @pytest.mark.parametrize( "wrong_param", [ {"min_dur": -1}, {"min_dur": 0}, {"max_dur": -1}, {"max_dur": 0}, {"max_silence": -1}, {"analysis_window": 0}, {"analysis_window": -1}, ], ids=[ "negative_min_dur", "zero_min_dur", "negative_max_dur", "zero_max_dur", "negative_max_silence", "zero_analysis_window", "negative_analysis_window", ], ) def test_split_negative_temporal_params(wrong_param): params = { "min_dur": 0.2, "max_dur": 0.5, "max_silence": 0.1, "analysis_window": 0.1, } params.update(wrong_param) with pytest.raises(ValueError) as val_err: split(None, **params) name = set(wrong_param).pop() value = wrong_param[name] err_msg = "'{}' ({}) must be >{} 0".format( name, value, "=" if name == "max_silence" else "" ) assert err_msg == str(val_err.value) def test_split_too_small_analysis_window(): with pytest.raises(ValueError) as val_err: split(b"", sr=10, sw=1, ch=1, analysis_window=0.09) err_msg = "Too small 'analysis_window' (0.09) for sampling rate (10)." 
err_msg += " Analysis window should at least be 1/10 to cover one " err_msg += "data sample" assert err_msg == str(val_err.value) def test_split_and_plot(): with open("tests/data/test_split_10HZ_mono.raw", "rb") as fp: data = fp.read() region = AudioRegion(data, 10, 2, 1) with patch("auditok.plotting.plot") as patch_fn: regions = region.split_and_plot( min_dur=0.2, max_dur=5, max_silence=0.2, drop_trailing_silence=False, strict_min_dur=False, analysis_window=0.1, sr=10, sw=2, ch=1, eth=50, ) assert patch_fn.called expected = [(2, 16), (17, 31), (34, 76)] sample_width = 2 expected_regions = [] for onset, offset in expected: onset *= sample_width offset *= sample_width expected_regions.append(AudioRegion(data[onset:offset], 10, 2, 1)) assert regions == expected_regions def test_split_exception(): with open("tests/data/test_split_10HZ_mono.raw", "rb") as fp: data = fp.read() region = AudioRegion(data, 10, 2, 1) with pytest.raises(RuntimeWarning): # max_read is not accepted when calling AudioRegion.split region.split(max_read=2) @pytest.mark.parametrize( "data, start, sampling_rate, sample_width, channels, expected_end, expected_duration_s, expected_duration_ms", [ (b"\0" * 8000, 0, 8000, 1, 1, 1, 1, 1000), (b"\0" * 7992, 0, 8000, 1, 1, 0.999, 0.999, 999), (b"\0" * 7994, 0, 8000, 1, 1, 0.99925, 0.99925, 999), (b"\0" * 7996, 0, 8000, 1, 1, 0.9995, 0.9995, 1000), (b"\0" * 7998, 0, 8000, 1, 1, 0.99975, 0.99975, 1000), (b"\0" * 8000 * 2, 0, 8000, 2, 1, 1, 1, 1000), (b"\0" * 8000 * 2, 0, 8000, 1, 2, 1, 1, 1000), (b"\0" * 8000 * 5, 0, 8000, 1, 5, 1, 1, 1000), (b"\0" * 8000 * 2 * 5, 0, 8000, 2, 5, 1, 1, 1000), (b"\0" * 7992 * 2 * 5, 0, 8000, 2, 5, 0.999, 0.999, 999), (b"\0" * 7994 * 2 * 5, 0, 8000, 2, 5, 0.99925, 0.99925, 999), (b"\0" * 7996 * 2 * 5, 0, 8000, 2, 5, 0.9995, 0.9995, 1000), (b"\0" * 7998 * 2 * 5, 0, 8000, 2, 5, 0.99975, 0.99975, 1000), (b"\0" * int(8000 * 1.33), 2.7, 8000, 1, 1, 4.03, 1.33, 1330), (b"\0" * int(8000 * 0.476), 11.568, 8000, 1, 1, 12.044, 0.476, 
476), ( b"\0" * int(8000 * 1.711) * 2 * 3, 9.415, 8000, 2, 3, 11.126, 1.711, 1711, ), ( b"\0" * int(3172 * 1.318), 17.236, 3172, 1, 1, 17.236 + int(3172 * 1.318) / 3172, int(3172 * 1.318) / 3172, 1318, ), ( b"\0" * int(11317 * 0.716) * 2 * 3, 18.811, 11317, 2, 3, 18.811 + int(11317 * 0.716) / 11317, int(11317 * 0.716) / 11317, 716, ), ], ids=[ "simple", "one_ms_less_than_1_sec", "tree_quarter_ms_less_than_1_sec", "half_ms_less_than_1_sec", "quarter_ms_less_than_1_sec", "simple_sample_width_2", "simple_stereo", "simple_multichannel", "simple_sample_width_2_multichannel", "one_ms_less_than_1s_sw_2_multichannel", "tree_qrt_ms_lt_1_s_sw_2_multichannel", "half_ms_lt_1s_sw_2_multichannel", "quarter_ms_lt_1s_sw_2_multichannel", "arbitrary_length_1", "arbitrary_length_2", "arbitrary_length_sw_2_multichannel", "arbitrary_samplig_rate", "arbitrary_sr_sw_2_multichannel", ], ) def test_creation( data, start, sampling_rate, sample_width, channels, expected_end, expected_duration_s, expected_duration_ms, ): meta = {"start": start, "end": expected_end} region = AudioRegion(data, sampling_rate, sample_width, channels, meta) assert region.sampling_rate == sampling_rate assert region.sr == sampling_rate assert region.sample_width == sample_width assert region.sw == sample_width assert region.channels == channels assert region.ch == channels assert region.meta.start == start assert region.meta.end == expected_end assert region.duration == expected_duration_s assert len(region.ms) == expected_duration_ms assert bytes(region) == data def test_creation_invalid_data_exception(): with pytest.raises(AudioParameterError) as audio_param_err: _ = AudioRegion( data=b"ABCDEFGHI", sampling_rate=8, sample_width=2, channels=1 ) assert str(audio_param_err.value) == ( "The length of audio data must be an integer " "multiple of `sample_width * channels`" ) @pytest.mark.parametrize( "skip, max_read, channels", [ (0, -1, 1), (0, -1, 2), (2, -1, 1), (2, None, 1), (2, 3, 1), (2, 3.5, 2), (2.4, 3.5, 2), 
], ids=[ "no_skip_read_all", "no_skip_read_all_stereo", "skip_2_read_all", "skip_2_read_all_None", "skip_2_read_3", "skip_2_read_3_5_stereo", "skip_2_4_read_3_5_stereo", ], ) def test_load_AudioRegion(skip, max_read, channels): sampling_rate = 10 sample_width = 2 filename = "tests/data/test_split_10HZ_{}.raw" filename = filename.format("mono" if channels == 1 else "stereo") region = AudioRegion.load( filename, skip=skip, max_read=max_read, sr=sampling_rate, sw=sample_width, ch=channels, ) with open(filename, "rb") as fp: fp.read(round(skip * sampling_rate * sample_width * channels)) if max_read is None or max_read < 0: to_read = -1 else: to_read = round(max_read * sampling_rate * sample_width * channels) expected = fp.read(to_read) assert bytes(region) == expected def test_load_from_microphone(): with patch("auditok.io.PyAudioSource") as patch_pyaudio_source: with patch("auditok.core.AudioReader.read") as patch_reader: patch_reader.return_value = None with patch( "auditok.core.AudioRegion.__init__" ) as patch_AudioRegion: patch_AudioRegion.return_value = None AudioRegion.load(None, skip=0, max_read=5, sr=16000, sw=2, ch=1) assert patch_pyaudio_source.called assert patch_reader.called assert patch_AudioRegion.called @pytest.mark.parametrize( "max_read", [ None, -1, ], ids=[ "none", "negative", ], ) def test_load_from_microphone_without_max_read_exception(max_read): with pytest.raises(ValueError) as val_err: AudioRegion.load(None, max_read=max_read, sr=16000, sw=2, ch=1) assert str(val_err.value) == ( "'max_read' should not be None when reading from microphone" ) def test_load_from_microphone_with_nonzero_skip_exception(): with pytest.raises(ValueError) as val_err: AudioRegion.load(None, skip=1, max_read=5, sr=16000, sw=2, ch=1) assert str(val_err.value) == ( "'skip' should be 0 when reading from microphone" ) @pytest.mark.parametrize( "format, start, expected", [ ("output.wav", 1.230, "output.wav"), ("output_{meta.start:g}.wav", 1.230, "output_1.23.wav"), 
("output_{meta.start}.wav", 1.233712, "output_1.233712.wav"), ("output_{meta.start:.2f}.wav", 1.2300001, "output_1.23.wav"), ("output_{meta.start:.3f}.wav", 1.233712, "output_1.234.wav"), ("output_{meta.start:.8f}.wav", 1.233712, "output_1.23371200.wav"), ( "output_{meta.start}_{meta.end}_{duration}.wav", 1.455, "output_1.455_2.455_1.0.wav", ), ( "output_{meta.start}_{meta.end}_{duration}.wav", 1.455321, "output_1.455321_2.455321_1.0.wav", ), ], ids=[ "simple", "start", "start_2", "start_3", "start_4", "start_5", "start_end_duration", "start_end_duration_2", ], ) def test_save(format, start, expected): with TemporaryDirectory() as tmpdir: region = AudioRegion(b"0" * 160, 160, 1, 1) meta = {"start": start, "end": start + region.duration} region.meta = meta format = os.path.join(tmpdir, format) filename = region.save(format)[len(tmpdir) + 1 :] assert filename == expected def test_save_file_exists_exception(): with TemporaryDirectory() as tmpdir: filename = os.path.join(tmpdir, "output.wav") open(filename, "w").close() region = AudioRegion(b"0" * 160, 160, 1, 1) with pytest.raises(FileExistsError): region.save(filename, exists_ok=False) @pytest.mark.parametrize( "region, slice_, expected_data", [ ( AudioRegion(b"a" * 80 + b"b" * 80, 160, 1, 1), slice(0, 500), b"a" * 80, ), ( AudioRegion(b"a" * 80 + b"b" * 80, 160, 1, 1), slice(500, None), b"b" * 80, ), ( AudioRegion(b"a" * 80 + b"b" * 80, 160, 1, 1), slice(-500, None), b"b" * 80, ), ( AudioRegion(b"a" * 80 + b"b" * 80, 160, 1, 1), slice(200, 750), b"a" * 48 + b"b" * 40, ), ( AudioRegion(b"a" * 80 + b"b" * 80, 160, 1, 1), slice(-800, -250), b"a" * 48 + b"b" * 40, ), ( AudioRegion(b"a" * 160 + b"b" * 160, 160, 2, 1), slice(200, 750), b"a" * 96 + b"b" * 80, ), ( AudioRegion(b"a" * 160 + b"b" * 160, 160, 1, 2), slice(200, 750), b"a" * 96 + b"b" * 80, ), ( AudioRegion(b"a" * 320 + b"b" * 320, 160, 2, 2), slice(200, 750), b"a" * 192 + b"b" * 160, ), ( AudioRegion(b"a" * 4000 + b"b" * 4000, 8000, 1, 1), slice(1, None), b"a" 
* (4000 - 8) + b"b" * 4000, ), ( AudioRegion(b"a" * 4000 + b"b" * 4000, 8000, 1, 1), slice(-999, None), b"a" * (4000 - 8) + b"b" * 4000, ), ( AudioRegion(b"a" * 4000 + b"b" * 4000, 8000, 1, 1), slice(0, 999), b"a" * 4000 + b"b" * (4000 - 8), ), ( AudioRegion(b"a" * 4000 + b"b" * 4000, 8000, 1, 1), slice(0, -1), b"a" * 4000 + b"b" * (4000 - 8), ), (AudioRegion(b"a" * 160, 160, 1, 1), slice(-5000, None), b"a" * 160), (AudioRegion(b"a" * 160, 160, 1, 1), slice(None, -1500), b""), (AudioRegion(b"a" * 80 + b"b" * 80, 160, 1, 1), slice(0, 0), b""), (AudioRegion(b"a" * 80 + b"b" * 80, 160, 1, 1), slice(200, 100), b""), (AudioRegion(b"a" * 80 + b"b" * 80, 160, 1, 1), slice(2000, 3000), b""), (AudioRegion(b"a" * 80 + b"b" * 80, 160, 1, 1), slice(-100, -200), b""), (AudioRegion(b"a" * 80 + b"b" * 80, 160, 1, 1), slice(0, -2000), b""), ( AudioRegion(b"a" * 124 + b"b" * 376, 1234, 1, 1), slice(100, 200), b"a" + b"b" * 123, ), ], ids=[ "first_half", "second_half", "second_half_negative", "middle", "middle_negative", "middle_sw2", "middle_ch2", "middle_sw2_ch2", "but_first_sample", "but_first_sample_negative", "but_last_sample", "but_last_sample_negative", "big_negative_start", "big_negative_stop", "empty", "empty_start_stop_reversed", "empty_big_positive_start", "empty_negative_reversed", "empty_big_negative_stop", "arbitrary_sampling_rate", ], ) def test_region_temporal_slicing(region, slice_, expected_data): sub_region = region.millis[slice_] assert bytes(sub_region) == expected_data start_sec = slice_.start / 1000 if slice_.start is not None else None stop_sec = slice_.stop / 1000 if slice_.stop is not None else None sub_region = region.sec[start_sec:stop_sec] assert bytes(sub_region) == expected_data @pytest.mark.parametrize( "region, slice_, time_shift, expected_data", [ ( AudioRegion(b"a" * 80 + b"b" * 80, 160, 1, 1), slice(0, 80), 0, b"a" * 80, ), ( AudioRegion(b"a" * 80 + b"b" * 80, 160, 1, 1), slice(80, None), 0.5, b"b" * 80, ), ( AudioRegion(b"a" * 80 + b"b" * 80, 
160, 1, 1), slice(-80, None), 0.5, b"b" * 80, ), ( AudioRegion(b"a" * 80 + b"b" * 80, 160, 1, 1), slice(160 // 5, 160 // 4 * 3), 0.2, b"a" * 48 + b"b" * 40, ), ( AudioRegion(b"a" * 80 + b"b" * 80, 160, 1, 1), slice(-160 // 5 * 4, -160 // 4), 0.2, b"a" * 48 + b"b" * 40, ), ( AudioRegion(b"a" * 160 + b"b" * 160, 160, 2, 1), slice(160 // 5, 160 // 4 * 3), 0.2, b"a" * 96 + b"b" * 80, ), ( AudioRegion(b"a" * 160 + b"b" * 160, 160, 1, 2), slice(160 // 5, 160 // 4 * 3), 0.2, b"a" * 96 + b"b" * 80, ), ( AudioRegion(b"a" * 320 + b"b" * 320, 160, 2, 2), slice(160 // 5, 160 // 4 * 3), 0.2, b"a" * 192 + b"b" * 160, ), ( AudioRegion(b"a" * 4000 + b"b" * 4000, 8000, 1, 1), slice(1, None), 1 / 8000, b"a" * (4000 - 1) + b"b" * 4000, ), ( AudioRegion(b"a" * 4000 + b"b" * 4000, 8000, 1, 1), slice(-7999, None), 1 / 8000, b"a" * (4000 - 1) + b"b" * 4000, ), ( AudioRegion(b"a" * 4000 + b"b" * 4000, 8000, 1, 1), slice(0, 7999), 0, b"a" * 4000 + b"b" * (4000 - 1), ), ( AudioRegion(b"a" * 4000 + b"b" * 4000, 8000, 1, 1), slice(0, -1), 0, b"a" * 4000 + b"b" * (4000 - 1), ), (AudioRegion(b"a" * 160, 160, 1, 1), slice(-1600, None), 0, b"a" * 160), (AudioRegion(b"a" * 160, 160, 1, 1), slice(None, -1600), 0, b""), (AudioRegion(b"a" * 80 + b"b" * 80, 160, 1, 1), slice(0, 0), 0, b""), ( AudioRegion(b"a" * 80 + b"b" * 80, 160, 1, 1), slice(80, 40), 0.5, b"", ), ( AudioRegion(b"a" * 80 + b"b" * 80, 160, 1, 1), slice(1600, 3000), 10, b"", ), ( AudioRegion(b"a" * 80 + b"b" * 80, 160, 1, 1), slice(-16, -32), 0.9, b"", ), ( AudioRegion(b"a" * 80 + b"b" * 80, 160, 1, 1), slice(0, -2000), 0, b"", ), ( AudioRegion(b"a" * 124 + b"b" * 376, 1235, 1, 1), slice(100, 200), 100 / 1235, b"a" * 24 + b"b" * 76, ), ( AudioRegion(b"a" * 124 + b"b" * 376, 1235, 2, 2), slice(25, 50), 25 / 1235, b"a" * 24 + b"b" * 76, ), ], ids=[ "first_half", "second_half", "second_half_negative", "middle", "middle_negative", "middle_sw2", "middle_ch2", "middle_sw2_ch2", "but_first_sample", "but_first_sample_negative", 
"but_last_sample", "but_last_sample_negative", "big_negative_start", "big_negative_stop", "empty", "empty_start_stop_reversed", "empty_big_positive_start", "empty_negative_reversed", "empty_big_negative_stop", "arbitrary_sampling_rate", "arbitrary_sampling_rate_middle_sw2_ch2", ], ) def test_region_sample_slicing(region, slice_, time_shift, expected_data): sub_region = region[slice_] assert bytes(sub_region) == expected_data @pytest.mark.parametrize( "sampling_rate, sample_width, channels", [ (8000, 1, 1), (8000, 2, 2), (5413, 2, 3), ], ids=[ "simple", "stereo_sw_2", "arbitrary_sr_multichannel", ], ) def test_concatenation(sampling_rate, sample_width, channels): region_1, region_2 = _make_random_length_regions( [b"a", b"b"], sampling_rate, sample_width, channels ) expected_duration = region_1.duration + region_2.duration expected_data = bytes(region_1) + bytes(region_2) concat_region = region_1 + region_2 assert concat_region.duration == pytest.approx(expected_duration, abs=1e-6) assert bytes(concat_region) == expected_data @pytest.mark.parametrize( "sampling_rate, sample_width, channels", [ (8000, 1, 1), (8000, 2, 2), (5413, 2, 3), ], ids=[ "simple", "stereo_sw_2", "arbitrary_sr_multichannel", ], ) def test_concatenation_many(sampling_rate, sample_width, channels): regions = _make_random_length_regions( [b"a", b"b", b"c"], sampling_rate, sample_width, channels ) expected_duration = sum(r.duration for r in regions) expected_data = b"".join(bytes(r) for r in regions) concat_region = sum(regions) assert concat_region.duration == pytest.approx(expected_duration, abs=1e-6) assert bytes(concat_region) == expected_data def test_concatenation_different_sampling_rate_error(): region_1 = AudioRegion(b"a" * 100, 8000, 1, 1) region_2 = AudioRegion(b"b" * 100, 3000, 1, 1) with pytest.raises(ValueError) as val_err: region_1 + region_2 assert str(val_err.value) == ( "Can only concatenate AudioRegions of the same " "sampling rate (8000 != 3000)" ) def 
test_concatenation_different_sample_width_error(): region_1 = AudioRegion(b"a" * 100, 8000, 2, 1) region_2 = AudioRegion(b"b" * 100, 8000, 4, 1) with pytest.raises(ValueError) as val_err: region_1 + region_2 assert str(val_err.value) == ( "Can only concatenate AudioRegions of the same " "sample width (2 != 4)" ) def test_concatenation_different_number_of_channels_error(): region_1 = AudioRegion(b"a" * 100, 8000, 1, 1) region_2 = AudioRegion(b"b" * 100, 8000, 1, 2) with pytest.raises(ValueError) as val_err: region_1 + region_2 assert str(val_err.value) == ( "Can only concatenate AudioRegions of the same " "number of channels (1 != 2)" ) @pytest.mark.parametrize( "duration, expected_duration, expected_len, expected_len_ms", [ (0.01, 0.03, 240, 30), (0.00575, 0.01725, 138, 17), (0.00625, 0.01875, 150, 19), ], ids=[ "simple", "rounded_len_floor", "rounded_len_ceil", ], ) def test_multiplication( duration, expected_duration, expected_len, expected_len_ms ): sw = 2 data = b"0" * int(duration * 8000 * sw) region = AudioRegion(data, 8000, sw, 1) m_region = 1 * region * 3 assert bytes(m_region) == data * 3 assert m_region.sr == 8000 assert m_region.sw == 2 assert m_region.ch == 1 assert m_region.duration == expected_duration assert len(m_region) == expected_len assert m_region.len == expected_len assert m_region.s.len == expected_duration assert len(m_region.ms) == expected_len_ms assert m_region.ms.len == expected_len_ms @pytest.mark.parametrize( "factor, _type", [ ("x", "str"), (1.4, "float"), ], ids=[ "_str", "_float", ], ) def test_multiplication_non_int(factor, _type): with pytest.raises(TypeError) as type_err: AudioRegion(b"0" * 80, 8000, 1, 1) * factor err_msg = "Can't multiply AudioRegion by a non-int of type '{}'" assert err_msg.format(_type) == str(type_err.value) @pytest.mark.parametrize( "data", [ [b"a" * 80, b"b" * 80], [b"a" * 31, b"b" * 31, b"c" * 30], [b"a" * 31, b"b" * 30, b"c" * 30], [b"a" * 11, b"b" * 11, b"c" * 10, b"c" * 10], ], ids=[ "simple", 
"extra_samples_1", "extra_samples_2", "extra_samples_3", ], ) def test_truediv(data): region = AudioRegion(b"".join(data), 80, 1, 1) sub_regions = region / len(data) for data_i, region in zip(data, sub_regions): assert len(data_i) == len(bytes(region)) @pytest.mark.parametrize( "data, sample_width, channels, fmt, expected", [ (b"a" * 10, 1, 1, "b", [97] * 10), (b"a" * 10, 2, 1, "h", [24929] * 5), (b"a" * 8, 4, 1, "i", [1633771873] * 2), (b"ab" * 5, 1, 2, "b", [[97] * 5, [98] * 5]), ], ids=[ "mono_sw_1", "mono_sw_2", "mono_sw_4", "stereo_sw_1", ], ) def test_samples(data, sample_width, channels, fmt, expected): region = AudioRegion(data, 10, sample_width, channels) if isinstance(expected[0], list): expected = [array_(fmt, exp) for exp in expected] else: expected = array_(fmt, expected) samples = region.samples equal = samples == expected try: # for numpy equal = equal.all() except AttributeError: pass assert equal