Mercurial > hg > auditok
changeset 405:f56b4d8adfb8
Use numpy instead of audioop everywhere
author | Amine Sehili <amine.sehili@gmail.com> |
---|---|
date | Mon, 17 Jun 2024 19:45:51 +0200 |
parents | 08a7af37f2e9 |
children | 79bd3de43a5b |
files | auditok/signal.py auditok/signal_numpy.py auditok/util.py tests/test_AudioSource.py tests/test_core.py tests/test_io.py tests/test_signal.py tests/test_util.py |
diffstat | 8 files changed, 1052 insertions(+), 1027 deletions(-) [+] |
line wrap: on
line diff
--- a/auditok/signal.py Sun May 26 23:29:33 2024 +0200 +++ b/auditok/signal.py Mon Jun 17 19:45:51 2024 +0200 @@ -1,140 +1,48 @@ -""" -Module for basic audio signal processing and array operations. +import numpy as np -.. autosummary:: - :toctree: generated/ +SAMPLE_WIDTH_TO_DTYPE = {1: np.int8, 2: np.int16, 4: np.int32} +EPSILON = 1e-10 - to_array - extract_single_channel - compute_average_channel - compute_average_channel_stereo - separate_channels - calculate_energy_single_channel - calculate_energy_multichannel -""" -import audioop -import math -from array import array as array_ - -FORMAT = {1: "b", 2: "h", 4: "i"} -_EPSILON = 1e-10 +def _get_numpy_dtype(sample_width): + """Helper function to convert sample with to the corresponding numpy type.""" + dtype = SAMPLE_WIDTH_TO_DTYPE.get(sample_width) + if dtype is None: + err_msg = "'sample_width' must be 1, 2 or 4, given: {}" + raise ValueError(err_msg.format(sample_width)) + return dtype def to_array(data, sample_width, channels): - """Extract individual channels of audio data and return a list of arrays of - numeric samples. This will always return a list of `array.array` objects - (one per channel) even if audio data is mono. + """ + Convert raw audio data into a NumPy array. + + The returned array will have a data type of `numpy.float64` regardless of + the sample width. Parameters ---------- data : bytes - raw audio data. + The raw audio data. sample_width : int - size in bytes of one audio sample (one channel considered). + The sample width (in bytes) of each audio sample. + channels : int + The number of audio channels. Returns ------- - samples_arrays : list - list of arrays of audio samples. + numpy.ndarray + A 2-D NumPy array representing the audio data. The array will have a + shape of (number of channels, number of samples) and will be of data + type `numpy.float64`. """ - fmt = FORMAT[sample_width] - if channels == 1: - return [array_(fmt, data)] - return separate_channels(data, fmt, channels) + dtype = _get_numpy_dtype(sample_width) + array = np.frombuffer(data, dtype=dtype).astype(np.float64) + return array.reshape(channels, -1, order="F") -def extract_single_channel(data, fmt, channels, selected): - samples = array_(fmt, data) - return samples[selected::channels] - - -def compute_average_channel(data, fmt, channels): - """ - Compute and return average channel of multi-channel audio data. If the - number of channels is 2, use :func:`compute_average_channel_stereo` (much - faster). This function uses satandard `array` module to convert `bytes` data - into an array of numeric values. - - Parameters - ---------- - data : bytes - multi-channel audio data to mix down. - fmt : str - format (single character) to pass to `array.array` to convert `data` - into an array of samples. This should be "b" if audio data's sample width - is 1, "h" if it's 2 and "i" if it's 4. - channels : int - number of channels of audio data. - - Returns - ------- - mono_audio : bytes - mixed down audio data. - """ - all_channels = array_(fmt, data) - mono_channels = [ - array_(fmt, all_channels[ch::channels]) for ch in range(channels) - ] - avg_arr = array_( - fmt, - (round(sum(samples) / channels) for samples in zip(*mono_channels)), - ) - return avg_arr - - -def compute_average_channel_stereo(data, sample_width): - """Compute and return average channel of stereo audio data. This function - should be used when the number of channels is exactly 2 because in that - case we can use standard `audioop` module which *much* faster then calling - :func:`compute_average_channel`. - - Parameters - ---------- - data : bytes - 2-channel audio data to mix down. - sample_width : int - size in bytes of one audio sample (one channel considered). - - Returns - ------- - mono_audio : bytes - mixed down audio data. - """ - fmt = FORMAT[sample_width] - arr = array_(fmt, audioop.tomono(data, sample_width, 0.5, 0.5)) - return arr - - -def separate_channels(data, fmt, channels): - """Create a list of arrays of audio samples (`array.array` objects), one for - each channel. - - Parameters - ---------- - data : bytes - multi-channel audio data to mix down. - fmt : str - format (single character) to pass to `array.array` to convert `data` - into an array of samples. This should be "b" if audio data's sample width - is 1, "h" if it's 2 and "i" if it's 4. - channels : int - number of channels of audio data. - - Returns - ------- - channels_arr : list - list of audio channels, each as a standard `array.array`. - """ - all_channels = array_(fmt, data) - mono_channels = [ - array_(fmt, all_channels[ch::channels]) for ch in range(channels) - ] - return mono_channels - - -def calculate_energy_single_channel(data, sample_width): - """Calculate the energy of mono audio data. Energy is computed as: +def calculate_energy(x, agg_fn=None): + """Calculate the energy of audio data. The energy is calculated as: .. math:: energy = 20 \log(\sqrt({1}/{N}\sum_{i}^{N}{a_i}^2)) % # noqa: W605 @@ -143,38 +51,23 @@ Parameters ---------- - data : bytes - single-channel audio data. - sample_width : int - size in bytes of one audio sample. + x : array + array of audio data. + agg_fn : callable + aggregation function to use for multi-channel data. If None, the energy + will be computed and returned for each channel separately. + Returns ------- - energy : float - energy of audio signal. + energy : float, numpy.ndarray + energy of audio signal. If x is multichannel and agg_fn is None, this + an array of energies, one per channel. """ - energy_sqrt = max(audioop.rms(data, sample_width), _EPSILON) - return 20 * math.log10(energy_sqrt) - - -def calculate_energy_multichannel(x, sample_width, aggregation_fn=max): - """Calculate the energy of multi-channel audio data. Energy is calculated - channel-wise. An aggregation function is applied to the resulting energies - (default: `max`). Also see :func:`calculate_energy_single_channel`. - - Parameters - ---------- - data : bytes - single-channel audio data. - sample_width : int - size in bytes of one audio sample (one channel considered). - aggregation_fn : callable, default: max - aggregation function to apply to the resulting per-channel energies. - - Returns - ------- - energy : float - aggregated energy of multi-channel audio signal. - """ - energies = (calculate_energy_single_channel(xi, sample_width) for xi in x) - return aggregation_fn(energies) + x = np.array(x).astype(np.float64) + energy_sqrt = np.sqrt(np.mean(x**2, axis=-1)) + energy_sqrt = np.clip(energy_sqrt, a_min=EPSILON, a_max=None) + energy = 20 * np.log10(energy_sqrt) + if agg_fn is not None: + energy = agg_fn(energy) + return energy
--- a/auditok/signal_numpy.py Sun May 26 23:29:33 2024 +0200 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,31 +0,0 @@ -import numpy as np - -from .signal import ( - calculate_energy_multichannel, - calculate_energy_single_channel, - compute_average_channel_stereo, -) - -FORMAT = {1: np.int8, 2: np.int16, 4: np.int32} - - -def to_array(data, sample_width, channels): - fmt = FORMAT[sample_width] - if channels == 1: - return np.frombuffer(data, dtype=fmt).astype(np.float64) - return separate_channels(data, fmt, channels).astype(np.float64) - - -def extract_single_channel(data, fmt, channels, selected): - samples = np.frombuffer(data, dtype=fmt) - return np.asanyarray(samples[selected::channels], order="C") - - -def compute_average_channel(data, fmt, channels): - array = np.frombuffer(data, dtype=fmt).astype(np.float64) - return array.reshape(-1, channels).mean(axis=1).round().astype(fmt) - - -def separate_channels(data, fmt, channels): - array = np.frombuffer(data, dtype=fmt) - return np.asanyarray(array.reshape(-1, channels).T, order="C")
--- a/auditok/util.py Sun May 26 23:29:33 2024 +0200 +++ b/auditok/util.py Mon Jun 17 19:45:51 2024 +0200 @@ -13,6 +13,9 @@ from abc import ABC, abstractmethod from functools import partial +import numpy as np + +from . import signal from .exceptions import TimeFormatError, TooSmallBlockDuration from .io import ( AudioIOError, @@ -23,12 +26,6 @@ get_audio_source, ) -try: - from . import signal_numpy as signal -except ImportError: - from . import signal - - __all__ = [ "make_duration_formatter", "make_channel_selector", @@ -142,13 +139,13 @@ Importantly, if `selected` is None or equals "any", `selector(audio_data)` will separate and return a list of available channels: - `[data_channe_1, data_channe_2, ...].` + `[data_channel_1, data_channel_2, ...].` Note also that returned `selector` expects `bytes` format for input data but - does notnecessarily return a `bytes` object. In fact, in order to extract + does not necessarily return a `bytes` object. In fact, in order to extract the desired channel (or compute the average channel if `selected` = "avg"), it first converts input data into a `array.array` (or `numpy.ndarray`) - object. After channel of interst is selected/computed, it is returned as + object. After the channel of interest is selected/computed, it is returned as such, without any reconversion to `bytes`. This behavior is wanted for efficiency purposes because returned objects can be directly used as buffers of bytes. In any case, returned objects can be converted back to `bytes` @@ -176,7 +173,7 @@ ------- selector : callable a callable that can be used as `selector(audio_data)` and returns data - that contains channel of interst. + that contains channel of interest. Raises ------ @@ -184,12 +181,11 @@ if `sample_width` is not one of 1, 2 or 4, or if `selected` has an unexpected value. """ - fmt = signal.FORMAT.get(sample_width) - if fmt is None: - err_msg = "'sample_width' must be 1, 2 or 4, given: {}" - raise ValueError(err_msg.format(sample_width)) - if channels == 1: - return lambda x: x + to_array_ = partial( + signal.to_array, sample_width=sample_width, channels=channels + ) + if channels == 1 or selected in (None, "any"): + return to_array_ if isinstance(selected, int): if selected < 0: @@ -198,27 +194,10 @@ err_msg = "Selected channel must be >= -channels and < channels" err_msg += ", given: {}" raise ValueError(err_msg.format(selected)) - return partial( - signal.extract_single_channel, - fmt=fmt, - channels=channels, - selected=selected, - ) + return lambda x: to_array_(x)[selected] if selected in ("mix", "avg", "average"): - if channels == 2: - # when data is stereo, using audioop when possible is much faster - return partial( - signal.compute_average_channel_stereo, - sample_width=sample_width, - ) - - return partial( - signal.compute_average_channel, fmt=fmt, channels=channels - ) - - if selected in (None, "any"): - return partial(signal.separate_channels, fmt=fmt, channels=channels) + return lambda x: to_array_(x).mean(axis=0) raise ValueError( "Selected channel must be an integer, None (alias 'any') or 'average' " @@ -292,15 +271,12 @@ def __init__( self, energy_threshold, sample_width, channels, use_channel=None ): + self._energy_threshold = energy_threshold self._sample_width = sample_width self._selector = make_channel_selector( sample_width, channels, use_channel ) - if channels == 1 or use_channel not in (None, "any"): - self._energy_fn = signal.calculate_energy_single_channel - else: - self._energy_fn = signal.calculate_energy_multichannel - self._energy_threshold = energy_threshold + self._energy_agg_fn = np.max if use_channel in (None, "any") else None def is_valid(self, data): """ @@ -315,7 +291,9 @@ bool True if the energy of audio data is >= threshold, False otherwise. """ - log_energy = self._energy_fn(self._selector(data), self._sample_width) + log_energy = signal.calculate_energy( + self._selector(data), self._energy_agg_fn + ) return log_energy >= self._energy_threshold @@ -697,7 +675,6 @@ if max_read is not None: input = _Limiter(input, max_read) self._max_read = max_read - # TODO: warning if block_dur and hop_dur yield the same size in terms of nb samples if hop_dur is None or hop_dur == block_dur: input = _FixedSizeAudioReader(input, block_dur) else:
--- a/tests/test_AudioSource.py Sun May 26 23:29:33 2024 +0200 +++ b/tests/test_AudioSource.py Mon Jun 17 19:45:51 2024 +0200 @@ -4,16 +4,77 @@ from array import array +import numpy as np import pytest -from test_util import PURE_TONE_DICT, _sample_generator from auditok.io import ( + AudioIOError, AudioParameterError, BufferAudioSource, RawAudioSource, WaveAudioSource, ) -from auditok.signal import FORMAT +from auditok.signal import SAMPLE_WIDTH_TO_DTYPE + + +def _sample_generator(*data_buffers): + """ + Takes a list of many mono audio data buffers and makes a sample generator + of interleaved audio samples, one sample from each channel. The resulting + generator can be used to build a multichannel audio buffer. + >>> gen = _sample_generator("abcd", "ABCD") + >>> list(gen) + ["a", "A", "b", "B", "c", "C", "d", "D"] + """ + frame_gen = zip(*data_buffers, strict=True) + return (sample for frame in frame_gen for sample in frame) + + +def _generate_pure_tone( + frequency, duration_sec=1, sampling_rate=16000, sample_width=2, volume=1e4 +): + """ + Generates a pure tone with the given frequency. + """ + assert frequency <= sampling_rate / 2 + max_value = (2 ** (sample_width * 8) // 2) - 1 + if volume > max_value: + volume = max_value + dtype = SAMPLE_WIDTH_TO_DTYPE[sample_width] + total_samples = int(sampling_rate * duration_sec) + step = frequency / sampling_rate + two_pi_step = 2 * np.pi * step + data = np.array( + [int(np.sin(two_pi_step * i) * volume) for i in range(total_samples)] + ).astype(dtype) + return data + + +@pytest.fixture +def pure_tone_data(freq): + + PURE_TONE_DICT = { + freq: _generate_pure_tone(freq, 1, 16000, 2) + for freq in (400, 800, 1600) + } + PURE_TONE_DICT.update( + { + freq: _generate_pure_tone(freq, 0.1, 16000, 2) + for freq in (600, 1150, 2400, 7220) + } + ) + return PURE_TONE_DICT[freq] + + +PURE_TONE_DICT = { + freq: _generate_pure_tone(freq, 1, 16000, 2) for freq in (400, 800, 1600) +} +PURE_TONE_DICT.update( + { + freq: _generate_pure_tone(freq, 0.1, 16000, 2) + for freq in (600, 1150, 2400, 7220) + } +) def audio_source_read_all_gen(audio_source, size=None): @@ -65,8 +126,8 @@ data_read_all = b"".join(audio_source_read_all_gen(audio_source)) audio_source.close() mono_channels = [PURE_TONE_DICT[freq] for freq in frequencies] - fmt = FORMAT[audio_source.sample_width] - expected = array(fmt, _sample_generator(*mono_channels)).tobytes() + dtype = SAMPLE_WIDTH_TO_DTYPE[audio_source.sample_width] + expected = np.fromiter(_sample_generator(*mono_channels), dtype).tobytes() assert data_read_all == expected @@ -100,8 +161,8 @@ data = b"".join(audio_source_read_all_gen(audio_source)) audio_source.close() mono_channels = [PURE_TONE_DICT[freq] for freq in frequencies] - fmt = FORMAT[audio_source.sample_width] - expected = array(fmt, _sample_generator(*mono_channels)).tobytes() + dtype = SAMPLE_WIDTH_TO_DTYPE[audio_source.sample_width] + expected = np.fromiter(_sample_generator(*mono_channels), dtype).tobytes() assert data == expected @@ -348,7 +409,7 @@ def test_sr10_sw1_ch1_read_closed(self): self.audio_source.close() - with pytest.raises(Exception): + with pytest.raises(AudioIOError): self.audio_source.read(1) @@ -658,10 +719,9 @@ _ = BufferAudioSource( data=b"ABCDEFGHI", sampling_rate=8, sample_width=2, channels=1 ) - assert ( - str(audio_param_err.value) - == "The length of audio data must be an integer multiple of `sample_width * channels`" - ) + msg = "The length of audio data must be an integer multiple of " + msg += "`sample_width * channels`" + assert str(audio_param_err.value) == msg class TestAudioSourceProperties: @@ -689,7 +749,11 @@ with pytest.raises(AttributeError): a_source.sampling_rate = 16000 + + with pytest.raises(AttributeError): a_source.sample_width = 1 + + with pytest.raises(AttributeError): a_source.channels = 2 @@ -718,5 +782,9 @@ with pytest.raises(AttributeError): a_source.sr = 16000 + + with pytest.raises(AttributeError): a_source.sw = 1 + + with pytest.raises(AttributeError): a_source.ch = 2
--- a/tests/test_core.py Sun May 26 23:29:33 2024 +0200 +++ b/tests/test_core.py Mon Jun 17 19:45:51 2024 +0200 @@ -1,10 +1,10 @@ import math import os -from array import array as array_ from random import random from tempfile import TemporaryDirectory from unittest.mock import Mock, patch +import numpy as np import pytest from auditok import AudioParameterError, AudioRegion, load, split @@ -15,6 +15,7 @@ _read_offline, ) from auditok.io import get_audio_source +from auditok.signal import to_array from auditok.util import AudioReader @@ -33,13 +34,13 @@ @pytest.mark.parametrize( "skip, max_read, channels", [ - (0, -1, 1), - (0, -1, 2), - (2, -1, 1), - (2, None, 1), - (2, 3, 1), - (2, 3.5, 2), - (2.4, 3.5, 2), + (0, -1, 1), # no_skip_read_all + (0, -1, 2), # no_skip_read_all_stereo + (2, -1, 1), # skip_2_read_all + (2, None, 1), # skip_2_read_all_None + (2, 3, 1), # skip_2_read_3 + (2, 3.5, 2), # skip_2_read_3_5_stereo + (2.4, 3.5, 2), # skip_2_4_read_3_5_stereo ], ids=[ "no_skip_read_all", @@ -77,15 +78,15 @@ @pytest.mark.parametrize( "duration, analysis_window, round_fn, expected, kwargs", [ - (0, 1, None, 0, None), - (0.3, 0.1, round, 3, None), - (0.35, 0.1, math.ceil, 4, None), - (0.35, 0.1, math.floor, 3, None), - (0.05, 0.1, round, 0, None), - (0.05, 0.1, math.ceil, 1, None), - (0.3, 0.1, math.floor, 3, {"epsilon": 1e-6}), - (-0.5, 0.1, math.ceil, ValueError, None), - (0.5, -0.1, math.ceil, ValueError, None), + (0, 1, None, 0, None), # zero_duration + (0.3, 0.1, round, 3, None), # multiple + (0.35, 0.1, math.ceil, 4, None), # not_multiple_ceil + (0.35, 0.1, math.floor, 3, None), # not_multiple_floor + (0.05, 0.1, round, 0, None), # small_duration + (0.05, 0.1, math.ceil, 1, None), # small_duration_ceil + (0.3, 0.1, math.floor, 3, {"epsilon": 1e-6}), # with_round_error + (-0.5, 0.1, math.ceil, ValueError, None), # negative_duration + (0.5, -0.1, math.ceil, ValueError, None), # negative_analysis_window ], ids=[ "zero_duration", @@ -117,14 +118,14 @@ @pytest.mark.parametrize( "channels, skip, max_read", [ - (1, 0, None), - (1, 3, None), - (1, 2, -1), - (1, 2, 3), - (2, 0, None), - (2, 3, None), - (2, 2, -1), - (2, 2, 3), + (1, 0, None), # mono_skip_0_max_read_None + (1, 3, None), # mono_skip_3_max_read_None + (1, 2, -1), # mono_skip_2_max_read_negative + (1, 2, 3), # mono_skip_2_max_read_3 + (2, 0, None), # stereo_skip_0_max_read_None + (2, 3, None), # stereo_skip_3_max_read_None + (2, 2, -1), # stereo_skip_2_max_read_negative + (2, 2, 3), # stereo_skip_2_max_read_3 ], ids=[ "mono_skip_0_max_read_None", @@ -165,9 +166,20 @@ @pytest.mark.parametrize( - "min_dur, max_dur, max_silence, drop_trailing_silence, strict_min_dur, kwargs, expected", + ( + "min_dur, max_dur, max_silence, drop_trailing_silence, " + + "strict_min_dur, kwargs, expected" + ), [ - (0.2, 5, 0.2, False, False, {"eth": 50}, [(2, 16), (17, 31), (34, 76)]), + ( + 0.2, + 5, + 0.2, + False, + False, + {"eth": 50}, + [(2, 16), (17, 31), (34, 76)], + ), # simple ( 0.3, 2, @@ -176,9 +188,9 @@ False, {"eth": 50}, [(2, 16), (17, 31), (34, 54), (54, 74), (74, 76)], - ), - (3, 5, 0.2, False, False, {"eth": 50}, [(34, 76)]), - (0.2, 80, 10, False, False, {"eth": 50}, [(2, 76)]), + ), # short_max_dur + (3, 5, 0.2, False, False, {"eth": 50}, [(34, 76)]), # long_min_dur + (0.2, 80, 10, False, False, {"eth": 50}, [(2, 76)]), # long_max_silence ( 0.2, 5, @@ -187,7 +199,7 @@ False, {"eth": 50}, [(2, 14), (17, 24), (26, 29), (34, 76)], - ), + ), # zero_max_silence ( 0.2, 5, @@ -196,11 +208,43 @@ False, {"energy_threshold": 40}, [(0, 50), (50, 76)], - ), - (0.2, 5, 0.2, False, False, {"energy_threshold": 60}, []), - (0.2, 10, 0.5, True, False, {"eth": 50}, [(2, 76)]), - (0.2, 5, 0.2, True, False, {"eth": 50}, [(2, 14), (17, 29), (34, 76)]), - (1.5, 5, 0.2, True, False, {"eth": 50}, [(34, 76)]), + ), # low_energy_threshold + ( + 0.2, + 5, + 0.2, + False, + False, + {"energy_threshold": 60}, + [], + ), # high_energy_threshold + ( + 0.2, + 10, + 0.5, + True, + False, + {"eth": 50}, + [(2, 76)], + ), # trim_leading_and_trailing_silence + ( + 0.2, + 5, + 0.2, + True, + False, + {"eth": 50}, + [(2, 14), (17, 29), (34, 76)], + ), # drop_trailing_silence + ( + 1.5, + 5, + 0.2, + True, + False, + {"eth": 50}, + [(34, 76)], + ), # drop_trailing_silence_2 ( 0.3, 2, @@ -209,7 +253,7 @@ True, {"eth": 50}, [(2, 16), (17, 31), (34, 54), (54, 74)], - ), + ), # strict_min_dur ], ids=[ "simple", @@ -272,7 +316,7 @@ assert len(regions_ar) == len(expected), err_msg sample_width = 2 - for reg, reg_ar, exp in zip(regions, regions_ar, expected): + for reg, reg_ar, exp in zip(regions, regions_ar, expected, strict=True): onset, offset = exp exp_data = data[onset * sample_width : offset * sample_width] assert bytes(reg) == exp_data @@ -282,21 +326,57 @@ @pytest.mark.parametrize( "channels, kwargs, expected", [ - (2, {}, [(2, 32), (34, 76)]), - (1, {"max_read": 5}, [(2, 16), (17, 31), (34, 50)]), - (1, {"mr": 5}, [(2, 16), (17, 31), (34, 50)]), - (1, {"eth": 50, "use_channel": 0}, [(2, 16), (17, 31), (34, 76)]), - (1, {"eth": 50, "uc": 1}, [(2, 16), (17, 31), (34, 76)]), - (1, {"eth": 50, "use_channel": None}, [(2, 16), (17, 31), (34, 76)]), - (2, {"eth": 50, "use_channel": 0}, [(2, 16), (17, 31), (34, 76)]), - (2, {"eth": 50}, [(2, 32), (34, 76)]), - (2, {"eth": 50, "use_channel": -2}, [(2, 16), (17, 31), (34, 76)]), - (2, {"eth": 50, "uc": 1}, [(10, 32), (36, 76)]), - (2, {"eth": 50, "uc": -1}, [(10, 32), (36, 76)]), - (1, {"eth": 50, "uc": "mix"}, [(2, 16), (17, 31), (34, 76)]), - (2, {"energy_threshold": 53.5, "use_channel": "mix"}, [(54, 76)]), - (2, {"eth": 52, "uc": "mix"}, [(17, 26), (54, 76)]), - (2, {"uc": "mix"}, [(10, 16), (17, 31), (36, 76)]), + (2, {}, [(2, 32), (34, 76)]), # stereo_all_default + (1, {"max_read": 5}, [(2, 16), (17, 31), (34, 50)]), # mono_max_read + ( + 1, + {"mr": 5}, + [(2, 16), (17, 31), (34, 50)], + ), # mono_max_read_short_name + ( + 1, + {"eth": 50, "use_channel": 0}, + [(2, 16), (17, 31), (34, 76)], + ), # mono_use_channel_1 + (1, {"eth": 50, "uc": 1}, [(2, 16), (17, 31), (34, 76)]), # mono_uc_1 + ( + 1, + {"eth": 50, "use_channel": None}, + [(2, 16), (17, 31), (34, 76)], + ), # mono_use_channel_None + ( + 2, + {"eth": 50, "use_channel": 0}, + [(2, 16), (17, 31), (34, 76)], + ), # stereo_use_channel_1 + ( + 2, + {"eth": 50}, + [(2, 32), (34, 76)], + ), # stereo_use_channel_no_use_channel_given + ( + 2, + {"eth": 50, "use_channel": -2}, + [(2, 16), (17, 31), (34, 76)], + ), # stereo_use_channel_minus_2 + (2, {"eth": 50, "uc": 1}, [(10, 32), (36, 76)]), # stereo_uc_2 + (2, {"eth": 50, "uc": -1}, [(10, 32), (36, 76)]), # stereo_uc_minus_1 + ( + 1, + {"eth": 50, "uc": "mix"}, + [(2, 16), (17, 31), (34, 76)], + ), # mono_uc_mix + ( + 2, + {"energy_threshold": 53.5, "use_channel": "mix"}, + [(54, 76)], + ), # stereo_use_channel_mix + (2, {"eth": 52, "uc": "mix"}, [(17, 26), (54, 76)]), # stereo_uc_mix + ( + 2, + {"uc": "mix"}, + [(10, 16), (17, 31), (36, 76)], + ), # stereo_uc_mix_default_eth ], ids=[ "stereo_all_default", @@ -365,7 +445,7 @@ sample_width = 2 sample_size_bytes = sample_width * channels - for reg, reg_ar, exp in zip(regions, regions_ar, expected): + for reg, reg_ar, exp in zip(regions, regions_ar, expected, strict=True): onset, offset = exp exp_data = data[onset * sample_size_bytes : offset * sample_size_bytes] assert len(bytes(reg)) == len(exp_data) @@ -375,19 +455,103 @@ @pytest.mark.parametrize( "min_dur, max_dur, max_silence, channels, kwargs, expected", [ - (0.2, 5, 0.2, 1, {"aw": 0.2}, [(2, 30), (34, 76)]), - (0.2, 5, 0.3, 1, {"aw": 0.2}, [(2, 30), (34, 76)]), - (0.2, 5, 0.4, 1, {"aw": 0.2}, [(2, 32), (34, 76)]), - (0.2, 5, 0, 1, {"aw": 0.2}, [(2, 14), (16, 24), (26, 28), (34, 76)]), - (0.2, 5, 0.2, 1, {"aw": 0.2}, [(2, 30), (34, 76)]), - (0.3, 5, 0, 1, {"aw": 0.3}, [(3, 12), (15, 24), (36, 76)]), - (0.3, 5, 0.3, 1, {"aw": 0.3}, [(3, 27), (36, 76)]), - (0.3, 5, 0.5, 1, {"aw": 0.3}, [(3, 27), (36, 76)]), - (0.3, 5, 0.6, 1, {"aw": 0.3}, [(3, 30), (36, 76)]), - (0.2, 5, 0, 1, {"aw": 0.4}, [(4, 12), (16, 24), (36, 76)]), - (0.2, 5, 0.3, 1, {"aw": 0.4}, [(4, 12), (16, 24), (36, 76)]), - (0.2, 5, 0.4, 1, {"aw": 0.4}, [(4, 28), (36, 76)]), - (0.2, 5, 0.2, 2, {"analysis_window": 0.2}, [(2, 32), (34, 76)]), + ( + 0.2, + 5, + 0.2, + 1, + {"aw": 0.2}, + [(2, 30), (34, 76)], + ), # mono_aw_0_2_max_silence_0_2 + ( + 0.2, + 5, + 0.3, + 1, + {"aw": 0.2}, + [(2, 30), (34, 76)], + ), # mono_aw_0_2_max_silence_0_3 + ( + 0.2, + 5, + 0.4, + 1, + {"aw": 0.2}, + [(2, 32), (34, 76)], + ), # mono_aw_0_2_max_silence_0_4 + ( + 0.2, + 5, + 0, + 1, + {"aw": 0.2}, + [(2, 14), (16, 24), (26, 28), (34, 76)], + ), # mono_aw_0_2_max_silence_0 + (0.2, 5, 0.2, 1, {"aw": 0.2}, [(2, 30), (34, 76)]), # mono_aw_0_2 + ( + 0.3, + 5, + 0, + 1, + {"aw": 0.3}, + [(3, 12), (15, 24), (36, 76)], + ), # mono_aw_0_3_max_silence_0 + ( + 0.3, + 5, + 0.3, + 1, + {"aw": 0.3}, + [(3, 27), (36, 76)], + ), # mono_aw_0_3_max_silence_0_3 + ( + 0.3, + 5, + 0.5, + 1, + {"aw": 0.3}, + [(3, 27), (36, 76)], + ), # mono_aw_0_3_max_silence_0_5 + ( + 0.3, + 5, + 0.6, + 1, + {"aw": 0.3}, + [(3, 30), (36, 76)], + ), # mono_aw_0_3_max_silence_0_6 + ( + 0.2, + 5, + 0, + 1, + {"aw": 0.4}, + [(4, 12), (16, 24), (36, 76)], + ), # mono_aw_0_4_max_silence_0 + ( + 0.2, + 5, + 0.3, + 1, + {"aw": 0.4}, + [(4, 12), (16, 24), (36, 76)], + ), # mono_aw_0_4_max_silence_0_3 + ( + 0.2, + 5, + 0.4, + 1, + {"aw": 0.4}, + [(4, 28), (36, 76)], + ), # mono_aw_0_4_max_silence_0_4 + ( + 0.2, + 5, + 0.2, + 2, + {"analysis_window": 0.2}, + [(2, 32), (34, 76)], + ), # stereo_uc_None_analysis_window_0_2 ( 0.2, 5, @@ -395,7 +559,7 @@ 2, {"uc": None, "analysis_window": 0.2}, [(2, 32), (34, 76)], - ), + ), # stereo_uc_any_analysis_window_0_2 ( 0.2, 5, @@ -403,7 +567,7 @@ 2, {"use_channel": None, "analysis_window": 0.3}, [(3, 30), (36, 76)], - ), + ), # stereo_use_channel_None_aw_0_3_max_silence_0_2 ( 0.2, 5, @@ -411,7 +575,7 @@ 2, {"use_channel": "any", "analysis_window": 0.3}, [(3, 33), (36, 76)], - ), + ), # stereo_use_channel_any_aw_0_3_max_silence_0_3 ( 0.2, 5, @@ -419,7 +583,7 @@ 2, {"use_channel": None, "analysis_window": 0.4}, [(4, 28), (36, 76)], - ), + ), # stereo_use_channel_None_aw_0_4_max_silence_0_2 ( 0.2, 5, @@ -427,7 +591,7 @@ 2, {"use_channel": "any", "analysis_window": 0.4}, [(4, 32), (36, 76)], - ), + ), # stereo_use_channel_any_aw_0_3_max_silence_0_4 ( 0.2, 5, @@ -435,7 +599,7 @@ 2, {"uc": 0, "analysis_window": 0.2}, [(2, 30), (34, 76)], - ), + ), # stereo_uc_0_analysis_window_0_2 ( 0.2, 5, @@ -443,7 +607,7 @@ 2, {"uc": 1, "analysis_window": 0.2}, [(10, 32), (36, 76)], - ), + ), # stereo_uc_1_analysis_window_0_2 ( 0.2, 5, @@ -451,7 +615,7 @@ 2, {"uc": "mix", "analysis_window": 0.1}, [(10, 14), (17, 24), (26, 29), (36, 76)], - ), + ), # stereo_uc_mix_aw_0_1_max_silence_0 ( 0.2, 5, @@ -459,7 +623,7 @@ 2, {"uc": "mix", "analysis_window": 0.1}, [(10, 15), (17, 25), (26, 30), (36, 76)], - ), + ), # stereo_uc_mix_aw_0_1_max_silence_0_1 ( 0.2, 5, @@ -467,7 +631,7 @@ 2, {"uc": "mix", "analysis_window": 0.1}, [(10, 16), (17, 31), (36, 76)], - ), + ), # stereo_uc_mix_aw_0_1_max_silence_0_2 ( 0.2, 5, @@ -475,7 +639,7 @@ 2, {"uc": "mix", "analysis_window": 0.1}, [(10, 32), (36, 76)], - ), + ), # stereo_uc_mix_aw_0_1_max_silence_0_3 ( 0.3, 5, @@ -483,7 +647,7 @@ 2, {"uc": "avg", "analysis_window": 0.2}, [(10, 14), (16, 24), (36, 76)], - ), + ), # stereo_uc_avg_aw_0_2_max_silence_0_min_dur_0_3 ( 0.41, 5, @@ -491,7 +655,7 @@ 2, {"uc": "average", "analysis_window": 0.2}, [(16, 24), (36, 76)], - ), + ), # stereo_uc_average_aw_0_2_max_silence_0_min_dur_0_41 ( 0.2, 5, @@ -499,7 +663,7 @@ 2, {"uc": "mix", "analysis_window": 0.2}, [(10, 14), (16, 24), (26, 28), (36, 76)], - ), + ), # stereo_uc_mix_aw_0_2_max_silence_0_1 ( 0.2, 5, @@ -507,7 +671,7 @@ 2, {"uc": "mix", "analysis_window": 0.2}, [(10, 30), (36, 76)], - ), + ), # stereo_uc_mix_aw_0_2_max_silence_0_2 ( 0.2, 5, @@ -515,7 +679,7 @@ 2, {"uc": "mix", "analysis_window": 0.2}, [(10, 32), (36, 76)], - ), + ), # stereo_uc_mix_aw_0_2_max_silence_0_4 ( 0.2, 5, @@ -523,7 +687,7 @@ 2, {"uc": "mix", "analysis_window": 0.2}, [(10, 32), (36, 76)], - ), + ), # stereo_uc_mix_aw_0_2_max_silence_0_5 ( 0.2, 5, @@ -531,7 +695,7 @@ 2, {"uc": "mix", "analysis_window": 0.2}, [(10, 34), (36, 76)], - ), + ), # stereo_uc_mix_aw_0_2_max_silence_0_6 ( 0.2, 5, @@ -539,7 +703,7 @@ 2, {"uc": "mix", "analysis_window": 0.3}, [(9, 24), (27, 30), (36, 76)], - ), + ), # stereo_uc_mix_aw_0_3_max_silence_0 ( 0.4, 5, @@ -547,7 +711,7 @@ 2, {"uc": "mix", "analysis_window": 0.3}, [(9, 24), (36, 76)], - ), + ), # stereo_uc_mix_aw_0_3_max_silence_0_min_dur_0_3 ( 0.2, 5, @@ -555,7 +719,7 @@ 2, {"uc": "mix", "analysis_window": 0.3}, [(9, 57), (57, 76)], - ), + ), # stereo_uc_mix_aw_0_3_max_silence_0_6 ( 0.2, 5.1, @@ -563,7 +727,7 @@ 2, {"uc": "mix", "analysis_window": 0.3}, [(9, 60), (60, 76)], - ), + ), # stereo_uc_mix_aw_0_3_max_silence_0_6_max_dur_5_1 ( 0.2, 5.2, @@ -571,7 +735,7 @@ 2, {"uc": "mix", "analysis_window": 0.3}, [(9, 60), (60, 76)], - ), + ), # stereo_uc_mix_aw_0_3_max_silence_0_6_max_dur_5_2 ( 0.2, 5.3, @@ -579,7 +743,7 @@ 2, {"uc": "mix", "analysis_window": 0.3}, [(9, 60), (60, 76)], - ), + ), # stereo_uc_mix_aw_0_3_max_silence_0_6_max_dur_5_3 ( 0.2, 5.4, @@ -587,7 +751,7 @@ 2, {"uc": "mix", "analysis_window": 0.3}, [(9, 63), (63, 76)], - ), + ), # stereo_uc_mix_aw_0_3_max_silence_0_6_max_dur_5_4 ( 0.2, 5, @@ -595,7 +759,7 @@ 2, {"uc": "mix", "analysis_window": 0.4}, [(16, 24), (36, 76)], - ), + ), # stereo_uc_mix_aw_0_4_max_silence_0 ( 0.2, 5, @@ -603,7 +767,7 @@ 2, {"uc": "mix", "analysis_window": 0.4}, [(16, 24), (36, 76)], - ), + ), # stereo_uc_mix_aw_0_4_max_silence_0_3 ( 0.2, 5, @@ -611,7 +775,7 @@ 2, {"uc": "mix", "analysis_window": 0.4}, [(16, 28), (36, 76)], - ), + ), # stereo_uc_mix_aw_0_4_max_silence_0_4 ], ids=[ "mono_aw_0_2_max_silence_0_2", @@ -702,7 +866,7 @@ sample_width = 2 sample_size_bytes = sample_width * channels - for reg, reg_ar, exp in zip(regions, regions_ar, expected): + for reg, reg_ar, exp in zip(regions, regions_ar, expected, strict=True): onset, offset = exp exp_data = data[onset * sample_size_bytes : offset * sample_size_bytes] assert bytes(reg) == exp_data @@ -725,7 +889,7 @@ sw=2, ch=1, analysis_window=0.1, - validator=lambda x: array_("h", x)[0] >= 320, + validator=lambda x: to_array(x, sample_width=2, channels=1)[0] >= 320, ) region = AudioRegion(data, 10, 2, 1) @@ -736,7 +900,7 @@ drop_trailing_silence=False, strict_min_dur=False, analysis_window=0.1, - validator=lambda x: array_("h", x)[0] >= 320, + validator=lambda x: to_array(x, sample_width=2, channels=1)[0] >= 320, ) expected = [(2, 16), (17, 31), (34, 76)] @@ -750,7 +914,7 @@ assert len(regions_ar) == len(expected), err_msg sample_size_bytes = 2 - for reg, reg_ar, exp in zip(regions, regions_ar, expected): + for reg, reg_ar, exp in zip(regions, regions_ar, expected, strict=True): onset, offset = exp exp_data = data[onset * sample_size_bytes : offset * sample_size_bytes] assert bytes(reg) == exp_data @@ -763,20 +927,23 @@ ( "tests/data/test_split_10HZ_stereo.raw", {"audio_format": "raw", "sr": 10, "sw": 2, "ch": 2}, - ), + ), # filename_audio_format ( "tests/data/test_split_10HZ_stereo.raw", {"fmt": "raw", "sr": 10, "sw": 2, "ch": 2}, - ), - ("tests/data/test_split_10HZ_stereo.raw", {"sr": 10, "sw": 2, "ch": 2}), + ), # filename_audio_format_short_name + ( + "tests/data/test_split_10HZ_stereo.raw", + {"sr": 10, "sw": 2, "ch": 2}, + ), # filename_no_audio_format ( "tests/data/test_split_10HZ_stereo.raw", {"sampling_rate": 10, "sample_width": 2, "channels": 2}, - ), + ), # filename_no_long_audio_params ( open("tests/data/test_split_10HZ_stereo.raw", "rb").read(), {"sr": 10, "sw": 2, "ch": 2}, - ), + ), # bytes_ ( AudioReader( "tests/data/test_split_10HZ_stereo.raw", @@ -786,7 +953,7 @@ block_dur=0.1, ), {}, - ), + ), # audio_reader ( AudioRegion( open("tests/data/test_split_10HZ_stereo.raw", "rb").read(), @@ -795,13 +962,13 @@ 2, ), {}, - ), + ), # audio_region ( get_audio_source( "tests/data/test_split_10HZ_stereo.raw", sr=10, sw=2, ch=2 ), {}, - ), + ), # audio_source ], ids=[ "filename_audio_format", @@ -835,7 +1002,7 @@ err_msg = "Wrong number of regions after split, expected: " err_msg += "{}, found: {}".format(expected, regions) assert len(regions) == len(expected), err_msg - for reg, exp in zip(regions, expected): + for reg, exp in zip(regions, expected, strict=True): onset, offset = exp exp_data = data[onset * sample_width * 2 : offset * sample_width * 2] assert bytes(reg) == exp_data @@ -884,9 +1051,9 @@ @pytest.mark.parametrize( "max_silence, max_dur, analysis_window", [ - (0.5, 0.5, 0.1), - (0.5, 0.4, 0.1), - (0.44, 0.49, 0.1), + (0.5, 0.5, 0.1), # max_silence_equals_max_dur + (0.5, 0.4, 0.1), # max_silence_greater_than_max_dur + (0.44, 0.49, 0.1), # durations_OK_but_wrong_number_of_analysis_windows ], ids=[ "max_silence_equals_max_dur", @@ -926,13 +1093,13 @@ @pytest.mark.parametrize( "wrong_param", [ - {"min_dur": -1}, - {"min_dur": 0}, - {"max_dur": -1}, - {"max_dur": 0}, - {"max_silence": -1}, - {"analysis_window": 0}, - {"analysis_window": -1}, + {"min_dur": -1}, # negative_min_dur + {"min_dur": 0}, # zero_min_dur + {"max_dur": -1}, # negative_max_dur + {"max_dur": 0}, # zero_max_dur + {"max_silence": -1}, # negative_max_silence + {"analysis_window": 0}, # zero_analysis_window + {"analysis_window": -1}, # negative_analysis_window ], ids=[ "negative_min_dur", @@ -979,7 +1146,7 @@ data = fp.read() region = AudioRegion(data, 10, 2, 1) - with patch("auditok.plotting.plot") as patch_fn: + with patch("auditok.core.plot") as patch_fn: regions = region.split_and_plot( min_dur=0.2, max_dur=5, @@ -1014,23 +1181,125 @@ @pytest.mark.parametrize( - "data, start, sampling_rate, sample_width, channels, expected_end, expected_duration_s, expected_duration_ms", + ( + "data, start, sampling_rate, sample_width, channels, expected_end, " + + "expected_duration_s, expected_duration_ms" + ), [ - (b"\0" * 8000, 0, 8000, 1, 1, 1, 1, 1000), - (b"\0" * 7992, 0, 8000, 1, 1, 0.999, 0.999, 999), - (b"\0" * 7994, 0, 8000, 1, 1, 0.99925, 0.99925, 999), - (b"\0" * 7996, 0, 8000, 1, 1, 0.9995, 0.9995, 1000), - (b"\0" * 7998, 0, 8000, 1, 1, 0.99975, 0.99975, 1000), - (b"\0" * 8000 * 2, 0, 8000, 2, 1, 1, 1, 1000), - (b"\0" * 8000 * 2, 0, 8000, 1, 2, 1, 1, 1000), - (b"\0" * 8000 * 5, 0, 8000, 1, 5, 1, 1, 1000), - (b"\0" * 8000 * 2 * 5, 0, 8000, 2, 5, 1, 1, 1000), - (b"\0" * 7992 * 2 * 5, 0, 8000, 2, 5, 0.999, 0.999, 999), - (b"\0" * 7994 * 2 * 5, 0, 8000, 2, 5, 0.99925, 0.99925, 999), - (b"\0" * 7996 * 2 * 5, 0, 8000, 2, 5, 0.9995, 0.9995, 1000), - (b"\0" * 7998 * 2 * 5, 0, 8000, 2, 5, 0.99975, 0.99975, 1000), - (b"\0" * int(8000 * 1.33), 2.7, 8000, 1, 1, 4.03, 1.33, 1330), - (b"\0" * int(8000 * 0.476), 11.568, 8000, 1, 1, 12.044, 0.476, 476), + (b"\0" * 8000, 0, 8000, 1, 1, 1, 1, 1000), # simple + ( + b"\0" * 7992, + 0, + 8000, + 1, + 1, + 0.999, + 0.999, + 999, + ), # one_ms_less_than_1_sec + ( + b"\0" * 7994, + 0, + 8000, + 1, + 1, + 0.99925, + 0.99925, + 999, + ), # tree_quarter_ms_less_than_1_sec + ( + b"\0" * 7996, + 0, + 8000, + 1, + 1, + 0.9995, + 0.9995, + 1000, + ), # half_ms_less_than_1_sec + ( + b"\0" * 7998, + 0, + 8000, + 1, + 1, + 0.99975, + 0.99975, + 1000, + ), # quarter_ms_less_than_1_sec + (b"\0" * 8000 * 2, 0, 8000, 2, 1, 1, 1, 1000), # simple_sample_width_2 + (b"\0" * 8000 * 2, 0, 8000, 1, 2, 1, 1, 1000), # simple_stereo + (b"\0" * 8000 * 5, 0, 8000, 1, 5, 1, 1, 1000), # simple_multichannel + ( + b"\0" * 8000 * 2 * 5, + 0, + 8000, + 2, + 5, + 1, + 1, + 1000, + ), # simple_sample_width_2_multichannel + ( + b"\0" * 7992 * 2 * 5, + 0, + 8000, + 2, + 5, + 0.999, + 0.999, + 999, + ), # one_ms_less_than_1s_sw_2_multichannel + ( + b"\0" * 7994 * 2 * 5, + 0, + 8000, + 2, + 5, + 0.99925, + 0.99925, + 999, + ), # tree_qrt_ms_lt_1_s_sw_2_multichannel + ( + b"\0" * 7996 * 2 * 5, + 0, + 8000, + 2, + 5, + 0.9995, + 0.9995, + 1000, + ), # half_ms_lt_1s_sw_2_multichannel + ( + b"\0" * 7998 * 2 * 5, + 0, + 8000, + 2, + 5, + 0.99975, + 0.99975, + 1000, + ), # quarter_ms_lt_1s_sw_2_multichannel + ( + b"\0" * int(8000 * 1.33), + 2.7, + 8000, + 1, + 1, + 4.03, + 1.33, + 1330, + ), # arbitrary_length_1 + ( + b"\0" * int(8000 * 0.476), + 11.568, + 8000, + 1, + 1, + 12.044, + 0.476, + 476, + ), # arbitrary_length_2 ( b"\0" * int(8000 * 1.711) * 2 * 3, 9.415, @@ -1040,7 +1309,7 @@ 11.126, 1.711, 1711, - ), + ), # arbitrary_length_sw_2_multichannel ( b"\0" * int(3172 * 1.318), 17.236, @@ -1050,7 +1319,7 @@ 17.236 + int(3172 * 1.318) / 3172, int(3172 * 1.318) / 3172, 1318, - ), + ), # arbitrary_sampling_rate ( b"\0" * int(11317 * 0.716) * 2 * 3, 18.811, @@ -1060,7 +1329,7 @@ 18.811 + int(11317 * 0.716) / 11317, int(11317 * 0.716) / 11317, 716, - ), + ), # arbitrary_sr_sw_2_multichannel ], ids=[ "simple", @@ -1079,7 +1348,7 @@ "arbitrary_length_1", "arbitrary_length_2", "arbitrary_length_sw_2_multichannel", - "arbitrary_samplig_rate", + "arbitrary_sampling_rate", "arbitrary_sr_sw_2_multichannel", ], ) @@ -1122,13 +1391,13 @@ @pytest.mark.parametrize( "skip, max_read, channels", [ - (0, -1, 1), - (0, -1, 2), - (2, -1, 1), - (2, None, 1), - (2, 3, 1), - (2, 3.5, 2), - (2.4, 3.5, 2), + (0, -1, 1), # no_skip_read_all + (0, -1, 2), # no_skip_read_all_stereo + (2, -1, 1), # skip_2_read_all + (2, None, 1), # skip_2_read_all_None + (2, 3, 1), # skip_2_read_3 + (2, 3.5, 2), # skip_2_read_3_5_stereo + (2.4, 3.5, 2), # skip_2_4_read_3_5_stereo ], ids=[ "no_skip_read_all", @@ -1180,11 +1449,11 @@ @pytest.mark.parametrize( "max_read", [ - None, - -1, + None, # None + -1, # negative ], ids=[ - "none", + "None", "negative", ], ) @@ -1207,22 +1476,34 @@ @pytest.mark.parametrize( "format, start, expected", [ - ("output.wav", 1.230, "output.wav"), - ("output_{meta.start:g}.wav", 1.230, "output_1.23.wav"), - ("output_{meta.start}.wav", 1.233712, "output_1.233712.wav"), - ("output_{meta.start:.2f}.wav", 1.2300001, "output_1.23.wav"), - ("output_{meta.start:.3f}.wav", 1.233712, "output_1.234.wav"), - ("output_{meta.start:.8f}.wav", 1.233712, "output_1.23371200.wav"), + ("output.wav", 1.230, "output.wav"), # simple + ("output_{meta.start:g}.wav", 1.230, "output_1.23.wav"), # start + ("output_{meta.start}.wav", 1.233712, "output_1.233712.wav"), # start_2 + ( + "output_{meta.start:.2f}.wav", + 1.2300001, + "output_1.23.wav", + ), # start_3 + ( + "output_{meta.start:.3f}.wav", + 1.233712, + "output_1.234.wav", + ), # start_4 + ( + "output_{meta.start:.8f}.wav", + 1.233712, + "output_1.23371200.wav", + ), # start_5 ( "output_{meta.start}_{meta.end}_{duration}.wav", 1.455, "output_1.455_2.455_1.0.wav", - ), + ), # start_end_duration ( "output_{meta.start}_{meta.end}_{duration}.wav", 1.455321, "output_1.455321_2.455321_1.0.wav", - ), + ), # start_end_duration_2 ], ids=[ "simple", @@ -1260,74 +1541,102 @@ ( AudioRegion(b"a" * 80 + b"b" * 80, 160, 1, 1), slice(0, 500), - b"a" * 80, + b"a" * 80, # first_half ), ( AudioRegion(b"a" * 80 + b"b" * 80, 160, 1, 1), slice(500, None), - b"b" * 80, + b"b" * 80, # second_half ), ( AudioRegion(b"a" * 80 + b"b" * 80, 160, 1, 1), slice(-500, None), - b"b" * 80, + b"b" * 80, # second_half_negative ), ( AudioRegion(b"a" * 80 + b"b" * 80, 160, 1, 1), slice(200, 750), - b"a" * 48 + b"b" * 40, + b"a" * 48 + b"b" * 40, # middle ), ( AudioRegion(b"a" * 80 + b"b" * 80, 160, 1, 1), slice(-800, -250), - b"a" * 48 + b"b" * 40, + b"a" * 48 + b"b" * 40, # middle_negative ), ( AudioRegion(b"a" * 160 + b"b" * 160, 160, 2, 1), slice(200, 750), - b"a" * 96 + b"b" * 80, + b"a" * 96 + b"b" * 80, # middle_sw2 ), ( AudioRegion(b"a" * 160 + b"b" * 160, 160, 1, 2), slice(200, 750), - b"a" * 96 + b"b" * 80, + b"a" * 96 + b"b" * 80, # middle_ch2 ), ( AudioRegion(b"a" * 320 + b"b" * 320, 160, 2, 2), slice(200, 750), - b"a" * 192 + b"b" * 160, + b"a" * 192 + b"b" * 160, # middle_sw2_ch2 ), ( AudioRegion(b"a" * 4000 + b"b" * 4000, 8000, 1, 1), slice(1, None), - b"a" * (4000 - 8) + b"b" * 4000, + b"a" * (4000 - 8) + b"b" * 4000, # but_first_sample ), ( AudioRegion(b"a" * 4000 + b"b" * 4000, 8000, 1, 1), slice(-999, None), - b"a" * (4000 - 8) + b"b" * 4000, + b"a" * (4000 - 8) + b"b" * 4000, # but_first_sample_negative ), ( AudioRegion(b"a" * 4000 + b"b" * 4000, 8000, 1, 1), slice(0, 999), - b"a" * 4000 + b"b" * (4000 - 8), + b"a" * 4000 + b"b" * (4000 - 8), # but_last_sample ), ( AudioRegion(b"a" * 4000 + b"b" * 4000, 8000, 1, 1), slice(0, -1), - b"a" * 4000 + b"b" * (4000 - 8), + b"a" * 4000 + b"b" * (4000 - 8), # but_last_sample_negative ), - (AudioRegion(b"a" * 160, 160, 1, 1), slice(-5000, None), b"a" * 160), - (AudioRegion(b"a" * 160, 160, 1, 1), slice(None, -1500), b""), - (AudioRegion(b"a" * 80 + b"b" * 80, 160, 1, 1), slice(0, 0), b""), - (AudioRegion(b"a" * 80 + b"b" * 80, 160, 1, 1), slice(200, 100), b""), - (AudioRegion(b"a" * 80 + b"b" * 80, 160, 1, 1), slice(2000, 3000), b""), - (AudioRegion(b"a" * 80 + b"b" * 80, 160, 1, 1), slice(-100, -200), b""), - (AudioRegion(b"a" * 80 + b"b" * 80, 160, 1, 1), slice(0, -2000), b""), + ( + AudioRegion(b"a" * 160, 160, 1, 1), + slice(-5000, None), + b"a" * 160, # big_negative_start + ), + ( + AudioRegion(b"a" * 160, 160, 1, 1), + slice(None, -1500), + b"", # big_negative_stop + ), + ( + AudioRegion(b"a" * 80 + b"b" * 80, 160, 1, 1), + slice(0, 0), + b"", # empty + ), + ( + AudioRegion(b"a" * 80 + b"b" * 80, 160, 1, 1), + slice(200, 100), + b"", # empty_start_stop_reversed + ), + ( + AudioRegion(b"a" * 80 + b"b" * 80, 160, 1, 1), + slice(2000, 3000), + b"", # empty_big_positive_start + ), + ( + AudioRegion(b"a" * 80 + b"b" * 80, 160, 1, 1), + slice(-100, -200), + b"", # empty_negative_reversed + ), + ( + AudioRegion(b"a" * 80 + b"b" * 80, 160, 1, 1), + slice(0, -2000), + b"", # empty_big_negative_stop + ), ( AudioRegion(b"a" * 124 + b"b" * 376, 1234, 1, 1), slice(100, 200), - b"a" + b"b" * 123, + b"a" + b"b" * 123, # arbitrary_sampling_rate ), ], ids=[ @@ -1369,112 +1678,127 @@ AudioRegion(b"a" * 80 + b"b" * 80, 160, 1, 1), slice(0, 80), 0, - b"a" * 80, + b"a" * 80, # first_half ), ( AudioRegion(b"a" * 80 + b"b" * 80, 160, 1, 1), slice(80, None), 0.5, - b"b" * 80, + b"b" * 80, # second_half ), ( AudioRegion(b"a" * 80 + b"b" * 80, 160, 1, 1), slice(-80, None), 0.5, - b"b" * 80, + b"b" * 80, # second_half_negative ), ( AudioRegion(b"a" * 80 + b"b" * 80, 160, 1, 1), slice(160 // 5, 160 // 4 * 3), 0.2, - b"a" * 48 + b"b" * 40, + b"a" * 48 + b"b" * 40, # middle ), ( AudioRegion(b"a" * 80 + b"b" * 80, 160, 1, 1), slice(-160 // 5 * 4, -160 // 4), 0.2, - b"a" * 48 + b"b" * 40, + b"a" * 48 + b"b" * 40, # middle_negative ), ( AudioRegion(b"a" * 160 + b"b" * 160, 160, 2, 1), slice(160 // 5, 160 // 4 * 3), 0.2, - b"a" * 96 + b"b" * 80, + b"a" * 96 + b"b" * 80, # middle_sw2 ), ( AudioRegion(b"a" * 160 + b"b" * 160, 160, 1, 2), slice(160 // 5, 160 // 4 * 3), 0.2, - b"a" * 96 + b"b" * 80, + b"a" * 96 + b"b" * 80, # middle_ch2 ), ( AudioRegion(b"a" * 320 + b"b" * 320, 160, 2, 2), slice(160 // 5, 160 // 4 * 3), 0.2, - b"a" * 192 + b"b" * 160, + b"a" * 192 + b"b" * 160, # middle_sw2_ch2 ), ( AudioRegion(b"a" * 4000 + b"b" * 4000, 8000, 1, 1), slice(1, None), 1 / 8000, - b"a" * (4000 - 1) + b"b" * 4000, + b"a" * (4000 - 1) + b"b" * 4000, # but_first_sample ), ( AudioRegion(b"a" * 4000 + b"b" * 4000, 8000, 1, 1), slice(-7999, None), 1 / 8000, - b"a" * (4000 - 1) + b"b" * 4000, + b"a" * (4000 - 1) + b"b" * 4000, # but_first_sample_negative ), ( AudioRegion(b"a" * 4000 + b"b" * 4000, 8000, 1, 1), slice(0, 7999), 0, - b"a" * 4000 + b"b" * (4000 - 1), + b"a" * 4000 + b"b" * (4000 - 1), # but_last_sample ), ( AudioRegion(b"a" * 4000 + b"b" * 4000, 8000, 1, 1), slice(0, -1), 0, - b"a" * 4000 + b"b" * (4000 - 1), + b"a" * 4000 + b"b" * (4000 - 1), # but_last_sample_negative ), - (AudioRegion(b"a" * 160, 160, 1, 1), slice(-1600, None), 0, b"a" * 160), - (AudioRegion(b"a" * 160, 160, 1, 1), slice(None, -1600), 0, b""), - (AudioRegion(b"a" * 80 + b"b" * 80, 160, 1, 1), slice(0, 0), 0, b""), + ( + AudioRegion(b"a" * 160, 160, 1, 1), + slice(-1600, None), + 0, + b"a" * 160, # big_negative_start + ), + ( + AudioRegion(b"a" * 160, 160, 1, 1), + slice(None, -1600), + 0, + b"", # big_negative_stop + ), + ( + AudioRegion(b"a" * 80 + b"b" * 80, 160, 1, 1), + slice(0, 0), + 0, + b"", # empty + ), ( AudioRegion(b"a" * 80 + b"b" * 80, 160, 1, 1), slice(80, 40), 0.5, - b"", + b"", # empty_start_stop_reversed ), ( AudioRegion(b"a" * 80 + b"b" * 80, 160, 1, 1), slice(1600, 3000), 10, - b"", + b"", # empty_big_positive_start ), ( AudioRegion(b"a" * 80 + b"b" * 80, 160, 1, 1), slice(-16, -32), 0.9, - b"", + b"", # empty_negative_reversed ), ( AudioRegion(b"a" * 80 + b"b" * 80, 160, 1, 1), slice(0, -2000), 0, - b"", + b"", # empty_big_negative_stop ), ( AudioRegion(b"a" * 124 + b"b" * 376, 1235, 1, 1), slice(100, 200), 100 / 1235, - b"a" * 24 + b"b" * 76, + b"a" * 24 + b"b" * 76, # arbitrary_sampling_rate ), ( AudioRegion(b"a" * 124 + b"b" * 376, 1235, 2, 2), slice(25, 50), 25 / 1235, - b"a" * 24 + b"b" * 76, + b"a" * 24 + b"b" * 76, # arbitrary_sampling_rate_middle_sw2_ch2 ), ], ids=[ @@ -1509,9 +1833,9 @@ @pytest.mark.parametrize( "sampling_rate, sample_width, channels", [ - (8000, 1, 1), - (8000, 2, 2), - (5413, 2, 3), + (8000, 1, 1), # simple + (8000, 2, 2), # stereo_sw_2 + (5413, 2, 3), # arbitrary_sr_multichannel ], ids=[ "simple", @@ -1534,9 +1858,9 @@ @pytest.mark.parametrize( "sampling_rate, sample_width, channels", [ - (8000, 1, 1), - (8000, 2, 2), - (5413, 2, 3), + (8000, 1, 1), # simple + (8000, 2, 2), # stereo_sw_2 + (5413, 2, 3), # arbitrary_sr_multichannel ], ids=[ "simple", @@ -1558,7 +1882,6 @@ def test_concatenation_different_sampling_rate_error(): - region_1 = AudioRegion(b"a" * 100, 8000, 1, 1) region_2 = AudioRegion(b"b" * 100, 3000, 1, 1) @@ -1566,24 +1889,22 @@ region_1 + region_2 assert str(val_err.value) == ( "Can only concatenate AudioRegions of the same " - "sampling rate (8000 != 3000)" + "sampling rate (8000 != 3000)" # different_sampling_rate ) def test_concatenation_different_sample_width_error(): - region_1 = AudioRegion(b"a" * 100, 8000, 2, 1) region_2 = AudioRegion(b"b" * 100, 8000, 4, 1) with pytest.raises(ValueError) as val_err: region_1 + region_2 assert str(val_err.value) == ( - "Can only concatenate AudioRegions of the same " "sample width (2 != 4)" + "Can only concatenate AudioRegions of the same sample width (2 != 4)" ) def test_concatenation_different_number_of_channels_error(): - region_1 = AudioRegion(b"a" * 100, 8000, 1, 1) region_2 = AudioRegion(b"b" * 100, 8000, 1, 2) @@ -1591,16 +1912,16 @@ region_1 + region_2 assert str(val_err.value) == ( "Can only concatenate AudioRegions of the same " - "number of channels (1 != 2)" + "number of channels (1 != 2)" # different_number_of_channels ) @pytest.mark.parametrize( "duration, expected_duration, expected_len, expected_len_ms", [ - (0.01, 0.03, 240, 30), - (0.00575, 0.01725, 138, 17), - (0.00625, 0.01875, 150, 19), + (0.01, 0.03, 240, 30), # simple + (0.00575, 0.01725, 138, 17), # rounded_len_floor + (0.00625, 0.01875, 150, 19), # rounded_len_ceil ], ids=[ "simple", @@ -1630,28 +1951,28 @@ @pytest.mark.parametrize( "factor, _type", [ - ("x", "str"), - (1.4, "float"), + ("x", str), # string + (1.4, float), # float ], ids=[ - "_str", - "_float", + "string", + "float", ], ) def test_multiplication_non_int(factor, _type): with pytest.raises(TypeError) as type_err: AudioRegion(b"0" * 80, 8000, 1, 1) * factor - err_msg = "Can't multiply AudioRegion by a non-int of type '{}'" - assert err_msg.format(_type) == str(type_err.value) + err_msg = "Can't multiply AudioRegion by a non-int of type '{}'" + assert err_msg.format(_type) == str(type_err.value) @pytest.mark.parametrize( "data", [ - [b"a" * 80, b"b" * 80], - [b"a" * 31, b"b" * 31, b"c" * 30], - [b"a" * 31, b"b" * 30, b"c" * 30], - [b"a" * 11, b"b" * 11, b"c" * 10, b"c" * 10], + [b"a" * 80, b"b" * 80], # simple + [b"a" * 31, b"b" * 31, b"c" * 30], # extra_samples_1 + [b"a" * 31, b"b" * 30, b"c" * 30], # extra_samples_2 + [b"a" * 11, b"b" * 11, b"c" * 10, b"c" * 10], # extra_samples_3 ], ids=[ "simple", @@ -1665,17 +1986,17 @@ region = AudioRegion(b"".join(data), 80, 1, 1) sub_regions = region / len(data) - for data_i, region in zip(data, sub_regions): + for data_i, region in zip(data, sub_regions, strict=True): assert len(data_i) == len(bytes(region)) @pytest.mark.parametrize( - "data, sample_width, channels, fmt, expected", + "data, sample_width, channels, expected", [ - (b"a" * 10, 1, 1, "b", [97] * 10), - (b"a" * 10, 2, 1, "h", [24929] * 5), - (b"a" * 8, 4, 1, "i", [1633771873] * 2), - (b"ab" * 5, 1, 2, "b", [[97] * 5, [98] * 5]), + (b"a" * 10, 1, 1, [97] * 10), # mono_sw_1 + (b"a" * 10, 2, 1, [24929] * 5), # mono_sw_2 + (b"a" * 8, 4, 1, [1633771873] * 2), # mono_sw_4 + (b"ab" * 5, 1, 2, [[97] * 5, [98] * 5]), # stereo_sw_1 ], ids=[ "mono_sw_1", @@ -1684,18 +2005,10 @@ "stereo_sw_1", ], ) -def test_samples(data, sample_width, channels, fmt, expected): +def test_samples(data, sample_width, channels, expected): region = AudioRegion(data, 10, sample_width, channels) - if isinstance(expected[0], list): - expected = [array_(fmt, exp) for exp in expected] - else: - expected = array_(fmt, expected) - samples = region.samples - equal = samples == expected - try: - # for numpy - equal = equal.all() - except AttributeError: - pass - assert equal + expected = np.array(expected) + assert (region.samples == expected).all() + assert (region.numpy() == expected).all() + assert (np.array(region) == expected).all()
--- a/tests/test_io.py Sun May 26 23:29:33 2024 +0200 +++ b/tests/test_io.py Mon Jun 17 19:45:51 2024 +0200 @@ -6,8 +6,13 @@ from tempfile import NamedTemporaryFile, TemporaryDirectory from unittest.mock import Mock, patch +import numpy as np import pytest -from test_util import PURE_TONE_DICT, _generate_pure_tone, _sample_generator +from test_AudioSource import ( + PURE_TONE_DICT, + _generate_pure_tone, + _sample_generator, +) from auditok.io import ( AudioIOError, @@ -29,8 +34,9 @@ get_audio_source, to_file, ) -from auditok.signal import FORMAT +from auditok.signal import SAMPLE_WIDTH_TO_DTYPE +AUDIO_PARAMS = {"sampling_rate": 16000, "sample_width": 2, "channels": 1} AUDIO_PARAMS_SHORT = {"sr": 16000, "sw": 2, "ch": 1} @@ -87,7 +93,7 @@ def test_get_audio_parameters_short_params(): expected = (8000, 2, 1) - params = dict(zip(("sr", "sw", "ch"), expected)) + params = dict(zip(("sr", "sw", "ch"), expected, strict=True)) result = _get_audio_parameters(params) assert result == expected @@ -96,8 +102,9 @@ expected = (8000, 2, 1) params = dict( zip( - ("sampling_rate", "sample_width", "channels", "use_channel"), + ("sampling_rate", "sample_width", "channels"), expected, + strict=True, ) ) result = _get_audio_parameters(params) @@ -106,13 +113,51 @@ def test_get_audio_parameters_long_params_shadow_short_ones(): expected = (8000, 2, 1) - params = dict(zip(("sampling_rate", "sample_width", "channels"), expected)) - params.update(dict(zip(("sr", "sw", "ch"), "xxx"))) + params = dict( + zip( + ("sampling_rate", "sample_width", "channels"), expected, strict=True + ) + ) + params.update(dict(zip(("sr", "sw", "ch"), "xxx", strict=True))) result = _get_audio_parameters(params) assert result == expected @pytest.mark.parametrize( + "missing_param", + [ + "sampling_rate", # missing_sampling_rate + "sample_width", # missing_sample_width + "channels", # missing_channels + ], + ids=["missing_sampling_rate", "missing_sample_width", "missing_channels"], +) +def test_get_audio_parameters_missing_parameter(missing_param): + + params = AUDIO_PARAMS.copy() + del params[missing_param] + with pytest.raises(AudioParameterError): + _get_audio_parameters(params) + + +@pytest.mark.parametrize( + "missing_param", + [ + "sr", # missing_sampling_rate + "sw", # missing_sample_width + "ch", # missing_channels + ], + ids=["missing_sampling_rate", "missing_sample_width", "missing_channels"], +) +def test_get_audio_parameters_missing_parameter_short(missing_param): + + params = AUDIO_PARAMS_SHORT.copy() + del params[missing_param] + with pytest.raises(AudioParameterError): + _get_audio_parameters(params) + + +@pytest.mark.parametrize( "values", [ ("x", 2, 1), # str_sampling_rate @@ -132,7 +177,9 @@ ], ) def test_get_audio_parameters_invalid(values): - params = dict(zip(("sampling_rate", "sample_width", "channels"), values)) + params = dict( + zip(("sampling_rate", "sample_width", "channels"), values, strict=True) + ) with pytest.raises(AudioParameterError): _get_audio_parameters(params) @@ -227,9 +274,9 @@ ids=["missing_sampling_rate", "missing_sample_width", "missing_channels"], ) def test_from_file_missing_audio_param(missing_param): + params = AUDIO_PARAMS_SHORT.copy() + del params[missing_param] with pytest.raises(AudioParameterError): - params = AUDIO_PARAMS_SHORT.copy() - del params[missing_param] from_file("audio", audio_format="raw", **params) @@ -315,26 +362,22 @@ assert audio_source.sample_width == 2 assert audio_source.channels == len(frequencies) mono_channels = [PURE_TONE_DICT[freq] for freq in frequencies] - fmt = FORMAT[audio_source.sample_width] - expected = array(fmt, _sample_generator(*mono_channels)).tobytes() + dtype = SAMPLE_WIDTH_TO_DTYPE[audio_source.sample_width] + expected = np.fromiter( + _sample_generator(*mono_channels), dtype=dtype + ).tobytes() assert data == expected -@pytest.mark.parametrize( - "missing_param", - [ - "sr", # missing_sampling_rate - "sw", # missing_sample_width - "ch", # missing_channels - ], - ids=["missing_sampling_rate", "missing_sample_width", "missing_channels"], -) -def test_load_raw_missing_audio_param(missing_param): +def test_load_raw_missing_audio_param(): with pytest.raises(AudioParameterError): - params = AUDIO_PARAMS_SHORT.copy() - del params[missing_param] - srate, swidth, channels, _ = _get_audio_parameters(params) - _load_raw("audio", srate, swidth, channels) + _load_raw("audio", sampling_rate=None, sample_width=1, channels=1) + + with pytest.raises(AudioParameterError): + _load_raw("audio", sampling_rate=16000, sample_width=None, channels=1) + + with pytest.raises(AudioParameterError): + _load_raw("audio", sampling_rate=16000, sample_width=1, channels=None) @pytest.mark.parametrize( @@ -368,8 +411,10 @@ assert audio_source.sample_width == 2 assert audio_source.channels == len(frequencies) mono_channels = [PURE_TONE_DICT[freq] for freq in frequencies] - fmt = FORMAT[audio_source.sample_width] - expected = array(fmt, _sample_generator(*mono_channels)).tobytes() + dtype = SAMPLE_WIDTH_TO_DTYPE[audio_source.sample_width] + expected = np.fromiter( + _sample_generator(*mono_channels), dtype=dtype + ).tobytes() assert data == expected @@ -431,9 +476,9 @@ def test_save_raw(filename, frequencies): filename = "tests/data/test_16KHZ_{}".format(filename) sample_width = 2 - fmt = FORMAT[sample_width] + dtype = SAMPLE_WIDTH_TO_DTYPE[sample_width] mono_channels = [PURE_TONE_DICT[freq] for freq in frequencies] - data = array(fmt, _sample_generator(*mono_channels)).tobytes() + data = np.fromiter(_sample_generator(*mono_channels), dtype=dtype).tobytes() tmpfile = NamedTemporaryFile() _save_raw(data, tmpfile.name) assert filecmp.cmp(tmpfile.name, filename, shallow=False) @@ -452,9 +497,9 @@ sampling_rate = 16000 sample_width = 2 channels = len(frequencies) - fmt = FORMAT[sample_width] mono_channels = [PURE_TONE_DICT[freq] for freq in frequencies] - data = array(fmt, _sample_generator(*mono_channels)).tobytes() + dtype = SAMPLE_WIDTH_TO_DTYPE[sample_width] + data = np.fromiter(_sample_generator(*mono_channels), dtype=dtype).tobytes() tmpfile = NamedTemporaryFile() _save_wave(data, tmpfile.name, sampling_rate, sample_width, channels) assert filecmp.cmp(tmpfile.name, filename, shallow=False) @@ -471,10 +516,19 @@ ) def test_save_wave_missing_audio_param(missing_param): with pytest.raises(AudioParameterError): - params = AUDIO_PARAMS_SHORT.copy() - del params[missing_param] - srate, swidth, channels, _ = _get_audio_parameters(params) - _save_wave(b"\0\0", "audio", srate, swidth, channels) + _save_wave( + b"\0\0", "audio", sampling_rate=None, sample_width=1, channels=1 + ) + + with pytest.raises(AudioParameterError): + _save_wave( + b"\0\0", "audio", sampling_rate=16000, sample_width=None, channels=1 + ) + + with pytest.raises(AudioParameterError): + _save_wave( + b"\0\0", "audio", sampling_rate=16000, sample_width=1, channels=None + ) def test_save_with_pydub():
--- a/tests/test_signal.py Sun May 26 23:29:33 2024 +0200 +++ b/tests/test_signal.py Mon Jun 17 19:45:51 2024 +0200 @@ -3,8 +3,10 @@ import numpy as np import pytest -from auditok import signal as signal_ -from auditok import signal_numpy +from auditok import signal + +# from auditok import signal as signal_ +# from auditok import signal @pytest.fixture @@ -12,251 +14,81 @@ return b"012345679ABC" -@pytest.fixture -def numpy_fmt(): - return {"b": np.int8, "h": np.int16, "i": np.int32} - - @pytest.mark.parametrize( "sample_width, expected", [ - (1, [[48, 49, 50, 51, 52, 53, 54, 55, 57, 65, 66, 67]]), # int8_mono - (2, [[12592, 13106, 13620, 14134, 16697, 17218]]), # int16_mono - (4, [[858927408, 926299444, 1128415545]]), # int32_mono + ( + 1, + [[48, 49, 50, 51, 52, 53, 54, 55, 57, 65, 66, 67]], + ), # int8_1channel ( 1, [[48, 50, 52, 54, 57, 66], [49, 51, 53, 55, 65, 67]], - ), # int8_stereo - (2, [[12592, 13620, 16697], [13106, 14134, 17218]]), # int16_stereo + ), # int8_2channel + ( + 1, + [[48, 52, 57], [49, 53, 65], [50, 54, 66], [51, 55, 67]], + ), # int8_4channel + (2, [[12592, 13106, 13620, 14134, 16697, 17218]]), # int16_1channel + (2, [[12592, 13620, 16697], [13106, 14134, 17218]]), # int16_2channel + (4, [[858927408, 926299444, 1128415545]]), # int32_1channel (4, [[858927408], [926299444], [1128415545]]), # int32_3channel ], ids=[ - "int8_mono", - "int16_mono", - "int32_mono", - "int8_stereo", - "int16_stereo", + "int8_1channel", + "int8_2channel", + "int8_4channel", + "int16_1channel", + "int16_2channel", + "int32_1channel", "int32_3channel", ], ) def test_to_array(setup_data, sample_width, expected): data = setup_data channels = len(expected) - expected = [array_(signal_.FORMAT[sample_width], xi) for xi in expected] - result = signal_.to_array(data, sample_width, channels) - result_numpy = signal_numpy.to_array(data, sample_width, channels) - assert result == expected - assert (result_numpy == np.asarray(expected)).all() - assert result_numpy.dtype == np.float64 + expected = np.array(expected) + result = signal.to_array(data, sample_width, channels) + assert (result == expected).all() + assert result.dtype == np.float64 + assert result.shape == expected.shape @pytest.mark.parametrize( - "fmt, channels, selected, expected", + "x, aggregation_fn, expected", [ + ([300, 320, 400, 600], None, 52.506639194632434), # mono_simple + ([0, 0, 0], None, -200), # mono_zeros ( - "b", - 1, - 0, - [48, 49, 50, 51, 52, 53, 54, 55, 57, 65, 66, 67], - ), # int8_1channel_select_0 - ("b", 2, 0, [48, 50, 52, 54, 57, 66]), # int8_2channel_select_0 - ("b", 3, 0, [48, 51, 54, 65]), # int8_3channel_select_0 - ("b", 3, 1, [49, 52, 55, 66]), # int8_3channel_select_1 - ("b", 3, 2, [50, 53, 57, 67]), # int8_3channel_select_2 - ("b", 4, 0, [48, 52, 57]), # int8_4channel_select_0 + [[300, 320, 400, 600], [150, 160, 200, 300]], + None, + [52.506639194632434, 46.48603928135281], + ), # stereo_no_agg ( - "h", - 1, - 0, - [12592, 13106, 13620, 14134, 16697, 17218], - ), # int16_1channel_select_0 - ("h", 2, 0, [12592, 13620, 16697]), # int16_2channel_select_0 - ("h", 2, 1, [13106, 14134, 17218]), # int16_2channel_select_1 - ("h", 3, 0, [12592, 14134]), # int16_3channel_select_0 - ("h", 3, 1, [13106, 16697]), # int16_3channel_select_1 - ("h", 3, 2, [13620, 17218]), # int16_3channel_select_2 + [[300, 320, 400, 600], [150, 160, 200, 300]], + np.mean, + 49.49633923799262, + ), # stereo_mean_agg ( - "i", - 1, - 0, - [858927408, 926299444, 1128415545], - ), # int32_1channel_select_0 - ("i", 3, 0, [858927408]), # int32_3channel_select_0 - ("i", 3, 1, [926299444]), # int32_3channel_select_1 - ("i", 3, 2, [1128415545]), # int32_3channel_select_2 + [[300, 320, 400, 600], [150, 160, 200, 300]], + min, + 46.48603928135281, + ), # stereo_min_agg + ( + [[300, 320, 400, 600], [150, 160, 200, 300]], + max, + 52.506639194632434, + ), # stereo_max_agg ], ids=[ - "int8_1channel_select_0", - "int8_2channel_select_0", - "int8_3channel_select_0", - "int8_3channel_select_1", - "int8_3channel_select_2", - "int8_4channel_select_0", - "int16_1channel_select_0", - "int16_2channel_select_0", - "int16_2channel_select_1", - "int16_3channel_select_0", - "int16_3channel_select_1", - "int16_3channel_select_2", - "int32_1channel_select_0", - "int32_3channel_select_0", - "int32_3channel_select_1", - "int32_3channel_select_2", + "mono_simple", + "mono_zeros", + "stereo_no_agg", + "mean_agg", + "stereo_min_agg", + "stereo_max_agg", ], ) -def test_extract_single_channel( - setup_data, numpy_fmt, fmt, channels, selected, expected -): - data = setup_data - result = signal_.extract_single_channel(data, fmt, channels, selected) - expected = array_(fmt, expected) - expected_numpy_fmt = numpy_fmt[fmt] - assert result == expected - result_numpy = signal_numpy.extract_single_channel( - data, numpy_fmt[fmt], channels, selected - ) - assert all(result_numpy == expected) - assert result_numpy.dtype == expected_numpy_fmt - - -@pytest.mark.parametrize( - "fmt, channels, expected", - [ - ("b", 2, [48, 50, 52, 54, 61, 66]), # int8_2channel - ("b", 4, [50, 54, 64]), # int8_4channel - ("h", 1, [12592, 13106, 13620, 14134, 16697, 17218]), # int16_1channel - ("h", 2, [12849, 13877, 16958]), # int16_2channel - ("i", 3, [971214132]), # int32_3channel - ], - ids=[ - "int8_2channel", - "int8_4channel", - "int16_1channel", - "int16_2channel", - "int32_3channel", - ], -) -def test_compute_average_channel( - setup_data, numpy_fmt, fmt, channels, expected -): - data = setup_data - result = signal_.compute_average_channel(data, fmt, channels) - expected = array_(fmt, expected) - expected_numpy_fmt = numpy_fmt[fmt] - assert result == expected - result_numpy = signal_numpy.compute_average_channel( - data, numpy_fmt[fmt], channels - ) - assert all(result_numpy == expected) - assert result_numpy.dtype == expected_numpy_fmt - - -@pytest.mark.parametrize( - "sample_width, expected", - [ - (1, [48, 50, 52, 54, 61, 66]), # int8_2channel - (2, [12849, 13877, 16957]), # int16_2channel - ], - ids=["int8_2channel", "int16_2channel"], -) -def test_compute_average_channel_stereo(setup_data, sample_width, expected): - data = setup_data - result = signal_.compute_average_channel_stereo(data, sample_width) - fmt = signal_.FORMAT[sample_width] - expected = array_(fmt, expected) - assert result == expected - - -@pytest.mark.parametrize( - "fmt, channels, expected", - [ - ( - "b", - 1, - [[48, 49, 50, 51, 52, 53, 54, 55, 57, 65, 66, 67]], - ), # int8_1channel - ( - "b", - 2, - [[48, 50, 52, 54, 57, 66], [49, 51, 53, 55, 65, 67]], - ), # int8_2channel - ( - "b", - 4, - [[48, 52, 57], [49, 53, 65], [50, 54, 66], [51, 55, 67]], - ), # int8_4channel - ( - "h", - 2, - [[12592, 13620, 16697], [13106, 14134, 17218]], - ), # int16_2channel - ("i", 3, [[858927408], [926299444], [1128415545]]), # int32_3channel - ], - ids=[ - "int8_1channel", - "int8_2channel", - "int8_4channel", - "int16_2channel", - "int32_3channel", - ], -) -def test_separate_channels(setup_data, numpy_fmt, fmt, channels, expected): - data = setup_data - result = signal_.separate_channels(data, fmt, channels) - expected = [array_(fmt, exp) for exp in expected] - expected_numpy_fmt = numpy_fmt[fmt] - assert result == expected - result_numpy = signal_numpy.separate_channels( - data, numpy_fmt[fmt], channels - ) - assert (result_numpy == expected).all() - assert result_numpy.dtype == expected_numpy_fmt - - -@pytest.mark.parametrize( - "x, sample_width, expected", - [ - ([300, 320, 400, 600], 2, 52.50624901923348), # simple - ([0], 2, -200), # zero - ([0, 0, 0], 2, -200), # zeros - ], - ids=["simple", "zero", "zeros"], -) -def test_calculate_energy_single_channel(x, sample_width, expected): - x = array_(signal_.FORMAT[sample_width], x) - energy = signal_.calculate_energy_single_channel(x, sample_width) - assert energy == expected - energy = signal_numpy.calculate_energy_single_channel(x, sample_width) - assert energy == expected - - -@pytest.mark.parametrize( - "x, sample_width, aggregation_fn, expected", - [ - ( - [[300, 320, 400, 600], [150, 160, 200, 300]], - 2, - min, - 46.485649105953854, - ), # min_ - ( - [[300, 320, 400, 600], [150, 160, 200, 300]], - 2, - max, - 52.50624901923348, - ), # max_ - ], - ids=["min_", "max_"], -) -def test_calculate_energy_multichannel( - x, sample_width, aggregation_fn, expected -): - x = [array_(signal_.FORMAT[sample_width], xi) for xi in x] - energy = signal_.calculate_energy_multichannel( - x, sample_width, aggregation_fn - ) - assert energy == expected - energy = signal_numpy.calculate_energy_multichannel( - x, sample_width, aggregation_fn - ) - assert energy == expected +def test_calculate_energy(x, aggregation_fn, expected): + energy = signal.calculate_energy(x, aggregation_fn) + assert (energy == expected).all()
--- a/tests/test_util.py Sun May 26 23:29:33 2024 +0200 +++ b/tests/test_util.py Mon Jun 17 19:45:51 2024 +0200 @@ -2,10 +2,10 @@ from array import array as array_ from unittest.mock import patch +import numpy as np import pytest -from auditok import signal as signal_ -from auditok import signal_numpy +from auditok import signal from auditok.exceptions import TimeFormatError from auditok.util import ( AudioEnergyValidator, @@ -14,305 +14,224 @@ ) -def _sample_generator(*data_buffers): - """ - Takes a list of many mono audio data buffers and makes a sample generator - of interleaved audio samples, one sample from each channel. The resulting - generator can be used to build a multichannel audio buffer. - >>> gen = _sample_generator("abcd", "ABCD") - >>> list(gen) - ["a", "A", 1, 1, "c", "C", "d", "D"] - """ - frame_gen = zip(*data_buffers) - return (sample for frame in frame_gen for sample in frame) +@pytest.fixture +def setup_data(): + return b"012345679ABC" -def _generate_pure_tone( - frequency, duration_sec=1, sampling_rate=16000, sample_width=2, volume=1e4 +@pytest.mark.parametrize( + "fmt, duration, expected", + [ + ("%S", 5400, "5400.000"), # only_seconds + ("%I", 5400, "5400000"), # only_millis + ("%h:%m:%s.%i", 3725.365, "01:02:05.365"), # full + ("%h:%m:%s.%i", 1925.075, "00:32:05.075"), # full_zero_hours + ("%h:%m:%s.%i", 3659.075, "01:00:59.075"), # full_zero_minutes + ("%h:%m:%s.%i", 3720.075, "01:02:00.075"), # full_zero_seconds + ("%h:%m:%s.%i", 3725, "01:02:05.000"), # full_zero_millis + ( + "%h %h:%m:%s.%i %s", + 3725.365, + "01 01:02:05.365 05", + ), # duplicate_directive + ("%h:%m:%s", 3725, "01:02:05"), # no_millis + ("%h:%m", 3725, "01:02"), # no_seconds + ("%h", 3725, "01"), # no_minutes + ("%m:%s.%i", 3725, "02:05.000"), # no_hours + ], + ids=[ + "only_seconds", + "only_millis", + "full", + "full_zero_hours", + "full_zero_minutes", + "full_zero_seconds", + "full_zero_millis", + "duplicate_directive", + "no_millis", + "no_seconds", + "no_minutes", + "no_hours", + ], +) +def test_make_duration_formatter(fmt, duration, expected): + formatter = make_duration_formatter(fmt) + result = formatter(duration) + assert result == expected + + +@pytest.mark.parametrize( + "fmt", + [ + "%S %S", # duplicate_only_seconds + "%I %I", # duplicate_only_millis + "%x", # unknown_directive + ], + ids=[ + "duplicate_only_seconds", + "duplicate_only_millis", + "unknown_directive", + ], +) +def test_make_duration_formatter_error(fmt): + with pytest.raises(TimeFormatError): + make_duration_formatter(fmt) + + +@pytest.mark.parametrize( + "sample_width, channels, selected, expected", + [ + ( + 1, + 1, + 0, + [48, 49, 50, 51, 52, 53, 54, 55, 57, 65, 66, 67], + ), # int8_1channel_select_0 + (1, 2, 0, [48, 50, 52, 54, 57, 66]), # int8_2channel_select_0 + (1, 3, 0, [48, 51, 54, 65]), # int8_3channel_select_0 + (1, 3, 1, [49, 52, 55, 66]), # int8_3channel_select_1 + (1, 3, 2, [50, 53, 57, 67]), # int8_3channel_select_2 + (1, 4, 0, [48, 52, 57]), # int8_4channel_select_0 + ( + 2, + 1, + 0, + [12592, 13106, 13620, 14134, 16697, 17218], + ), # int16_1channel_select_0 + (2, 2, 0, [12592, 13620, 16697]), # int16_2channel_select_0 + (2, 2, 1, [13106, 14134, 17218]), # int16_2channel_select_1 + (2, 3, 0, [12592, 14134]), # int16_3channel_select_0 + (2, 3, 1, [13106, 16697]), # int16_3channel_select_1 + (2, 3, 2, [13620, 17218]), # int16_3channel_select_2 + ( + 4, + 1, + 0, + [858927408, 926299444, 1128415545], + ), # int32_1channel_select_0 + (4, 3, 0, [858927408]), # int32_3channel_select_0 + (4, 3, 1, [926299444]), # int32_3channel_select_1 + (4, 3, 2, [1128415545]), # int32_3channel_select_2 + ], + ids=[ + "int8_1channel_select_0", + "int8_2channel_select_0", + "int8_3channel_select_0", + "int8_3channel_select_1", + "int8_3channel_select_2", + "int8_4channel_select_0", + "int16_1channel_select_0", + "int16_2channel_select_0", + "int16_2channel_select_1", + "int16_3channel_select_0", + "int16_3channel_select_1", + "int16_3channel_select_2", + "int32_1channel_select_0", + "int32_3channel_select_0", + "int32_3channel_select_1", + "int32_3channel_select_2", + ], +) +def test_make_channel_selector_one_channel( + setup_data, sample_width, channels, selected, expected ): - """ - Generates a pure tone with the given frequency. - """ - assert frequency <= sampling_rate / 2 - max_value = (2 ** (sample_width * 8) // 2) - 1 - if volume > max_value: - volume = max_value - fmt = signal_.FORMAT[sample_width] - total_samples = int(sampling_rate * duration_sec) - step = frequency / sampling_rate - two_pi_step = 2 * math.pi * step - data = array_( - fmt, - (int(math.sin(two_pi_step * i) * volume) for i in range(total_samples)), - ) - return data + selector = make_channel_selector(sample_width, channels, selected) + result = selector(setup_data) -PURE_TONE_DICT = { - freq: _generate_pure_tone(freq, 1, 16000, 2) for freq in (400, 800, 1600) -} -PURE_TONE_DICT.update( - { - freq: _generate_pure_tone(freq, 0.1, 16000, 2) - for freq in (600, 1150, 2400, 7220) - } + dtype = signal.SAMPLE_WIDTH_TO_DTYPE[sample_width] + expected = np.array(expected).astype(dtype) + assert (result == expected).all() + + +@pytest.mark.parametrize( + "sample_width, channels, selected, expected", + [ + ( + 1, + 1, + "avg", + [48, 49, 50, 51, 52, 53, 54, 55, 57, 65, 66, 67], + ), # int8_1channel + (1, 2, "mix", [48.5, 50.5, 52.5, 54.5, 61, 66.5]), # int8_2channel + (1, 4, "average", [49.5, 53.5, 63.75]), # int8_4channel + ( + 2, + 1, + "mix", + [12592, 13106, 13620, 14134, 16697, 17218], + ), # int16_1channel + (2, 2, "avg", [12849, 13877, 16957.5]), # int16_2channel + (4, 3, "average", [971214132.33]), # int32_3channel + ], + ids=[ + "int8_1channel", + "int8_2channel", + "int8_4channel", + "int16_1channel", + "int16_2channel", + "int32_3channel", + ], ) +def test_make_channel_selector_average( + setup_data, sample_width, channels, selected, expected +): + selector = make_channel_selector(sample_width, channels, selected) + result = selector(setup_data).round(2) + assert (result == expected).all() -class TestFunctions: - def setup_method(self): - self.data = b"012345679ABC" - @pytest.mark.parametrize( - "fmt, duration, expected", - [ - ("%S", 5400, "5400.000"), # only_seconds - ("%I", 5400, "5400000"), # only_millis - ("%h:%m:%s.%i", 3725.365, "01:02:05.365"), # full - ("%h:%m:%s.%i", 1925.075, "00:32:05.075"), # full_zero_hours - ("%h:%m:%s.%i", 3659.075, "01:00:59.075"), # full_zero_minutes - ("%h:%m:%s.%i", 3720.075, "01:02:00.075"), # full_zero_seconds - ("%h:%m:%s.%i", 3725, "01:02:05.000"), # full_zero_millis - ( - "%h %h:%m:%s.%i %s", - 3725.365, - "01 01:02:05.365 05", - ), # duplicate_directive - ("%h:%m:%s", 3725, "01:02:05"), # no_millis - ("%h:%m", 3725, "01:02"), # no_seconds - ("%h", 3725, "01"), # no_minutes - ("%m:%s.%i", 3725, "02:05.000"), # no_hours - ], - ids=[ - "only_seconds", - "only_millis", - "full", - "full_zero_hours", - "full_zero_minutes", - "full_zero_seconds", - "full_zero_millis", - "duplicate_directive", - "no_millis", - "no_seconds", - "no_minutes", - "no_hours", - ], - ) - def test_make_duration_formatter(self, fmt, duration, expected): - formatter = make_duration_formatter(fmt) - result = formatter(duration) - assert result == expected +@pytest.mark.parametrize( + "sample_width, channels, selected, expected", + [ + ( + 1, + 1, + "any", + [[48, 49, 50, 51, 52, 53, 54, 55, 57, 65, 66, 67]], + ), # int8_1channel + ( + 1, + 2, + None, + [[48, 50, 52, 54, 57, 66], [49, 51, 53, 55, 65, 67]], + ), # int8_2channel + ( + 1, + 4, + "any", + [[48, 52, 57], [49, 53, 65], [50, 54, 66], [51, 55, 67]], + ), # int8_4channel + ( + 2, + 2, + None, + [[12592, 13620, 16697], [13106, 14134, 17218]], + ), # int16_2channel + ( + 4, + 3, + "any", + [[858927408], [926299444], [1128415545]], + ), # int32_3channel + ], + ids=[ + "int8_1channel", + "int8_2channel", + "int8_4channel", + "int16_2channel", + "int32_3channel", + ], +) +def test_make_channel_selector_any( + setup_data, sample_width, channels, selected, expected +): - @pytest.mark.parametrize( - "fmt", - [ - "%S %S", # duplicate_only_seconds - "%I %I", # duplicate_only_millis - "%x", # unknown_directive - ], - ids=[ - "duplicate_only_seconds", - "duplicate_only_millis", - "unknown_directive", - ], - ) - def test_make_duration_formatter_error(self, fmt): - with pytest.raises(TimeFormatError): - make_duration_formatter(fmt) - - @pytest.mark.parametrize( - "sample_width, channels, selected, expected", - [ - ( - 1, - 1, - 0, - [48, 49, 50, 51, 52, 53, 54, 55, 57, 65, 66, 67], - ), # int8_1channel_select_0 - (1, 2, 0, [48, 50, 52, 54, 57, 66]), # int8_2channel_select_0 - (1, 3, 0, [48, 51, 54, 65]), # int8_3channel_select_0 - (1, 3, 1, [49, 52, 55, 66]), # int8_3channel_select_1 - (1, 3, 2, [50, 53, 57, 67]), # int8_3channel_select_2 - (1, 4, 0, [48, 52, 57]), # int8_4channel_select_0 - ( - 2, - 1, - 0, - [12592, 13106, 13620, 14134, 16697, 17218], - ), # int16_1channel_select_0 - (2, 2, 0, [12592, 13620, 16697]), # int16_2channel_select_0 - (2, 2, 1, [13106, 14134, 17218]), # int16_2channel_select_1 - (2, 3, 0, [12592, 14134]), # int16_3channel_select_0 - (2, 3, 1, [13106, 16697]), # int16_3channel_select_1 - (2, 3, 2, [13620, 17218]), # int16_3channel_select_2 - ( - 4, - 1, - 0, - [858927408, 926299444, 1128415545], - ), # int32_1channel_select_0 - (4, 3, 0, [858927408]), # int32_3channel_select_0 - (4, 3, 1, [926299444]), # int32_3channel_select_1 - (4, 3, 2, [1128415545]), # int32_3channel_select_2 - ], - ids=[ - "int8_1channel_select_0", - "int8_2channel_select_0", - "int8_3channel_select_0", - "int8_3channel_select_1", - "int8_3channel_select_2", - "int8_4channel_select_0", - "int16_1channel_select_0", - "int16_2channel_select_0", - "int16_2channel_select_1", - "int16_3channel_select_0", - "int16_3channel_select_1", - "int16_3channel_select_2", - "int32_1channel_select_0", - "int32_3channel_select_0", - "int32_3channel_select_1", - "int32_3channel_select_2", - ], - ) - def test_make_channel_selector_one_channel( - self, sample_width, channels, selected, expected - ): - - # force using signal functions with standard python implementation - with patch("auditok.util.signal", signal_): - selector = make_channel_selector(sample_width, channels, selected) - result = selector(self.data) - - fmt = signal_.FORMAT[sample_width] - expected = array_(fmt, expected) - if channels == 1: - expected = bytes(expected) - assert result == expected - - # Use signal functions with numpy implementation - with patch("auditok.util.signal", signal_numpy): - selector = make_channel_selector(sample_width, channels, selected) - result_numpy = selector(self.data) - - expected = array_(fmt, expected) - if channels == 1: - expected = bytes(expected) - assert result_numpy == expected - else: - assert all(result_numpy == expected) - - @pytest.mark.parametrize( - "sample_width, channels, selected, expected", - [ - (1, 2, "avg", [48, 50, 52, 54, 61, 66]), # int8_2channel - (1, 4, "average", [50, 54, 64]), # int8_4channel - ( - 2, - 1, - "mix", - [12592, 13106, 13620, 14134, 16697, 17218], - ), # int16_1channel - (2, 2, "avg", [12849, 13877, 16957]), # int16_2channel - (4, 3, "average", [971214132]), # int32_3channel - ], - ids=[ - "int8_2channel", - "int8_4channel", - "int16_1channel", - "int16_2channel", - "int32_3channel", - ], - ) - def test_make_channel_selector_average( - self, sample_width, channels, selected, expected - ): - # force using signal functions with standard python implementation - with patch("auditok.util.signal", signal_): - selector = make_channel_selector(sample_width, channels, selected) - result = selector(self.data) - - fmt = signal_.FORMAT[sample_width] - expected = array_(fmt, expected) - if channels == 1: - expected = bytes(expected) - assert result == expected - - # Use signal functions with numpy implementation - with patch("auditok.util.signal", signal_numpy): - selector = make_channel_selector(sample_width, channels, selected) - result_numpy = selector(self.data) - - if channels in (1, 2): - assert result_numpy == expected - else: - assert all(result_numpy == expected) - - @pytest.mark.parametrize( - "sample_width, channels, selected, expected", - [ - ( - 1, - 1, - "any", - [[48, 49, 50, 51, 52, 53, 54, 55, 57, 65, 66, 67]], - ), # int8_1channel - ( - 1, - 2, - None, - [[48, 50, 52, 54, 57, 66], [49, 51, 53, 55, 65, 67]], - ), # int8_2channel - ( - 1, - 4, - "any", - [[48, 52, 57], [49, 53, 65], [50, 54, 66], [51, 55, 67]], - ), # int8_4channel - ( - 2, - 2, - None, - [[12592, 13620, 16697], [13106, 14134, 17218]], - ), # int16_2channel - ( - 4, - 3, - "any", - [[858927408], [926299444], [1128415545]], - ), # int32_3channel - ], - ids=[ - "int8_1channel", - "int8_2channel", - "int8_4channel", - "int16_2channel", - "int32_3channel", - ], - ) - def test_make_channel_selector_any( - self, sample_width, channels, selected, expected - ): - - # force using signal functions with standard python implementation - with patch("auditok.util.signal", signal_): - selector = make_channel_selector(sample_width, channels, selected) - result = selector(self.data) - - fmt = signal_.FORMAT[sample_width] - expected = [array_(fmt, exp) for exp in expected] - if channels == 1: - expected = bytes(expected[0]) - assert result == expected - - # Use signal functions with numpy implementation - with patch("auditok.util.signal", signal_numpy): - selector = make_channel_selector(sample_width, channels, selected) - result_numpy = selector(self.data) - - if channels == 1: - assert result_numpy == expected - else: - assert (result_numpy == expected).all() + # Use signal functions with numpy implementation + selector = make_channel_selector(sample_width, channels, selected) + result = selector(setup_data) + assert (result == expected).all() class TestAudioEnergyValidator: