# view tests/test_AudioSource.py @ 455:7dae98b84cdd tip master
#
# Merge branch 'master' of https://github.com/amsehili/auditok
# author www-data <www-data@c4dm-xenserv-virt2.eecs.qmul.ac.uk>
# date Tue, 03 Dec 2024 09:18:01 +0000
# parents c5b4178aa80f
# children
# line wrap: on
# line source
"""
@author: Amine Sehili <amine.sehili@gmail.com>
"""

from array import array

import numpy as np
import pytest

from auditok.io import (
    AudioIOError,
    AudioParameterError,
    BufferAudioSource,
    RawAudioSource,
    WaveAudioSource,
)
from auditok.signal import SAMPLE_WIDTH_TO_DTYPE


def _sample_generator(*data_buffers):
    """
    Takes a list of many mono audio data buffers and makes a sample generator
    of interleaved audio samples, one sample from each channel. The resulting
    generator can be used to build a multichannel audio buffer.
    >>> gen = _sample_generator("abcd", "ABCD")
    >>> list(gen)
    ["a", "A", "b", "B", "c", "C", "d", "D"]
    """
    frame_gen = zip(*data_buffers)
    return (sample for frame in frame_gen for sample in frame)


def _generate_pure_tone(
    frequency, duration_sec=1, sampling_rate=16000, sample_width=2, volume=1e4
):
    """
    Build a sine wave of `frequency` Hz and return it as a numpy array.

    The number of samples is `int(sampling_rate * duration_sec)`. Each
    sample is truncated to an integer and the array is finally cast to the
    dtype registered for `sample_width` (in bytes) in
    `SAMPLE_WIDTH_TO_DTYPE`. `volume` is the peak amplitude; it is capped at
    the largest positive value of a signed integer of that width.
    """
    # A tone above half the sampling rate cannot be represented.
    assert frequency <= sampling_rate / 2
    peak_limit = (2 ** (sample_width * 8) // 2) - 1
    amplitude = min(volume, peak_limit)
    sample_dtype = SAMPLE_WIDTH_TO_DTYPE[sample_width]
    n_samples = int(sampling_rate * duration_sec)
    # Phase advance, in radians, between two consecutive samples.
    phase_increment = 2 * np.pi * (frequency / sampling_rate)
    samples = []
    for index in range(n_samples):
        samples.append(int(np.sin(phase_increment * index) * amplitude))
    return np.array(samples).astype(sample_dtype)


@pytest.fixture
def pure_tone_data(freq):
    """
    Return a freshly generated pure tone for the requested frequency.

    `freq` is injected by pytest. Supported values are 400, 800 and 1600 Hz
    (1 second long) and 600, 1150, 2400 and 7220 Hz (0.1 second long). All
    tones are generated with a sampling rate of 16000 and 2-byte samples.
    Any other frequency raises `KeyError`.
    """
    # Only the requested tone is generated. Building every supported tone on
    # each fixture call would run the per-sample generation loop for tens of
    # thousands of samples that are thrown away immediately.
    duration_by_freq = {
        400: 1,
        800: 1,
        1600: 1,
        600: 0.1,
        1150: 0.1,
        2400: 0.1,
        7220: 0.1,
    }
    return _generate_pure_tone(freq, duration_by_freq[freq], 16000, 2)


# Reference tones keyed by frequency (Hz), all generated with a sampling
# rate of 16000 and 2-byte samples: 1-second tones first, then 0.1-second
# tones.
PURE_TONE_DICT = {
    freq: _generate_pure_tone(freq, duration, 16000, 2)
    for duration, freqs in (
        (1, (400, 800, 1600)),
        (0.1, (600, 1150, 2400, 7220)),
    )
    for freq in freqs
}


def audio_source_read_all_gen(audio_source, size=None):
    """
    Yield successive blocks read from `audio_source` until it is exhausted.

    `size` is the value passed to every `audio_source.read` call. When it is
    `None`, `int(audio_source.sr * 0.1)` is used, which corresponds to 100 ms
    of audio. The generator stops at the first `read` that returns `None`.
    """
    block_size = int(audio_source.sr * 0.1) if size is None else size
    block = audio_source.read(block_size)
    while block is not None:
        yield block
        block = audio_source.read(block_size)


@pytest.mark.parametrize(
    "file_suffix, frequencies",
    [
        ("mono_400Hz", (400,)),  # one channel
        ("3channel_400-800-1600Hz", (400, 800, 1600)),  # three channels
    ],
    ids=["mono", "multichannel"],
)
def test_BufferAudioSource_read_all(file_suffix, frequencies):
    """
    A `BufferAudioSource` returns its whole buffer when `read` is called
    with `None` or with a negative size.
    """
    raw_path = "tests/data/test_16KHZ_{}.raw".format(file_suffix)
    with open(raw_path, "rb") as raw_file:
        raw_bytes = raw_file.read()
    # One frequency per channel, so the tuple length is the channel count.
    source = BufferAudioSource(raw_bytes, 16000, 2, len(frequencies))
    source.open()
    assert source.read(None) == raw_bytes
    # Go back to the start before reading everything a second time.
    source.rewind()
    assert source.read(-10) == raw_bytes
    source.close()


@pytest.mark.parametrize(
    "file_suffix, frequencies",
    [
        ("mono_400Hz", (400,)),  # one channel
        ("3channel_400-800-1600Hz", (400, 800, 1600)),  # three channels
    ],
    ids=["mono", "multichannel"],
)
def test_RawAudioSource(file_suffix, frequencies):
    """
    Data read from a raw file, block by block or in one call, equals the
    interleaved reference tones.
    """
    raw_path = "tests/data/test_16KHZ_{}.raw".format(file_suffix)
    n_channels = len(frequencies)

    # Read the whole file in blocks of the helper's default size.
    source = RawAudioSource(raw_path, 16000, 2, n_channels)
    source.open()
    chunked_data = b"".join(audio_source_read_all_gen(source))
    source.close()

    tones = [PURE_TONE_DICT[freq] for freq in frequencies]
    sample_dtype = SAMPLE_WIDTH_TO_DTYPE[source.sample_width]
    expected = np.fromiter(_sample_generator(*tones), sample_dtype).tobytes()
    assert chunked_data == expected

    # `None` and negative sizes must both return all the data at once.
    for read_all_size in (None, -10):
        source = RawAudioSource(raw_path, 16000, 2, n_channels)
        source.open()
        whole_data = source.read(read_all_size)
        source.close()
        assert whole_data == expected


@pytest.mark.parametrize(
    "file_suffix, frequencies",
    [
        ("mono_400Hz", (400,)),  # one channel
        ("3channel_400-800-1600Hz", (400, 800, 1600)),  # three channels
    ],
    ids=["mono", "multichannel"],
)
def test_WaveAudioSource(file_suffix, frequencies):
    """
    Data read from a wave file, block by block or in one call, equals the
    interleaved reference tones.
    """
    wave_path = "tests/data/test_16KHZ_{}.wav".format(file_suffix)

    # Read the whole file in blocks of the helper's default size.
    source = WaveAudioSource(wave_path)
    source.open()
    chunked_data = b"".join(audio_source_read_all_gen(source))
    source.close()

    tones = [PURE_TONE_DICT[freq] for freq in frequencies]
    sample_dtype = SAMPLE_WIDTH_TO_DTYPE[source.sample_width]
    expected = np.fromiter(_sample_generator(*tones), sample_dtype).tobytes()
    assert chunked_data == expected

    # `None` and negative sizes must both return all the data at once.
    for read_all_size in (None, -10):
        source = WaveAudioSource(wave_path)
        source.open()
        whole_data = source.read(read_all_size)
        source.close()
        assert whole_data == expected


class TestBufferAudioSource_SR10_SW1_CH1:
    """
    Tests for a `BufferAudioSource` built from 32 bytes of data with a
    sampling rate of 10, 1-byte samples and a single channel, i.e. 32
    samples or 3.2 seconds of audio.
    """

    @pytest.fixture(autouse=True)
    def setup_and_teardown(self):
        """Give every test a freshly opened source; close it afterwards."""
        self.data = b"ABCDEFGHIJKLMNOPQRSTUVWXYZ012345"
        self.audio_source = BufferAudioSource(
            data=self.data, sampling_rate=10, sample_width=1, channels=1
        )
        self.audio_source.open()
        yield
        self.audio_source.close()

    def _check_position(self, expected_sample, expected_second, expected_ms):
        """Compare the position in samples, seconds and milliseconds."""
        source = self.audio_source
        assert source.position == expected_sample
        assert source.position_s == expected_second
        assert source.position_ms == expected_ms

    def test_sr10_sw1_ch1_read_1(self):
        assert self.audio_source.read(1) == b"A"

    def test_sr10_sw1_ch1_read_6(self):
        assert self.audio_source.read(6) == b"ABCDEF"

    def test_sr10_sw1_ch1_read_multiple(self):
        # Each read continues where the previous one stopped; the last one
        # asks for more than what is left and gets the remaining data.
        for size, expected in (
            (1, b"A"),
            (6, b"BCDEFG"),
            (13, b"HIJKLMNOPQRST"),
            (9999, b"UVWXYZ012345"),
        ):
            assert self.audio_source.read(size) == expected

    def test_sr10_sw1_ch1_read_all(self):
        assert self.audio_source.read(9999) == self.data
        # Nothing is left: the source signals it with `None`.
        assert self.audio_source.read(1) is None

    def test_sr10_sw1_ch1_sampling_rate(self):
        assert self.audio_source.sampling_rate == 10

    def test_sr10_sw1_ch1_sample_width(self):
        assert self.audio_source.sample_width == 1

    def test_sr10_sw1_ch1_channels(self):
        assert self.audio_source.channels == 1

    @pytest.mark.parametrize(
        "block_sizes, expected_sample, expected_second, expected_ms",
        [
            ([], 0, 0, 0),  # nothing read
            ([0], 0, 0, 0),  # one read of size zero
            ([5], 5, 0.5, 500),  # one read of five samples
            ([5, 20], 25, 2.5, 2500),  # two consecutive reads
        ],
        ids=["empty", "zero", "five", "multiple"],
    )
    def test_position(
        self, block_sizes, expected_sample, expected_second, expected_ms
    ):
        """The position reflects the total number of samples read."""
        for size in block_sizes:
            self.audio_source.read(size)
        self._check_position(expected_sample, expected_second, expected_ms)

    @pytest.mark.parametrize(
        "position, expected_sample, expected_second, expected_ms",
        [
            (0, 0, 0, 0),  # start of data
            (1, 1, 0.1, 100),  # second sample
            (10, 10, 1, 1000),  # one second in
            (-1, 31, 3.1, 3100),  # last sample
            (-7, 25, 2.5, 2500),  # seven samples before the end
        ],
        ids=["zero", "one", "ten", "negative_1", "negative_2"],
    )
    def test_position_setter(
        self, position, expected_sample, expected_second, expected_ms
    ):
        """Setting `position` (in samples) moves all three positions."""
        self.audio_source.position = position
        self._check_position(expected_sample, expected_second, expected_ms)

    @pytest.mark.parametrize(
        "position_s, expected_sample, expected_second, expected_ms",
        [
            (0, 0, 0, 0),  # start of data
            (0.1, 1, 0.1, 100),  # second sample
            (1, 10, 1, 1000),  # one second in
            (-0.1, 31, 3.1, 3100),  # last sample
            (-0.7, 25, 2.5, 2500),  # 0.7 second before the end
        ],
        ids=["zero", "one", "ten", "negative_1", "negative_2"],
    )
    def test_position_s_setter(
        self, position_s, expected_sample, expected_second, expected_ms
    ):
        """Setting `position_s` (in seconds) moves all three positions."""
        self.audio_source.position_s = position_s
        self._check_position(expected_sample, expected_second, expected_ms)

    @pytest.mark.parametrize(
        "position_ms, expected_sample, expected_second, expected_ms",
        [
            (0, 0, 0, 0),  # start of data
            (100, 1, 0.1, 100),  # second sample
            (1000, 10, 1, 1000),  # one second in
            (-100, 31, 3.1, 3100),  # last sample
            (-700, 25, 2.5, 2500),  # 700 ms before the end
        ],
        ids=["zero", "one", "ten", "negative_1", "negative_2"],
    )
    def test_position_ms_setter(
        self, position_ms, expected_sample, expected_second, expected_ms
    ):
        """Setting `position_ms` moves all three positions."""
        self.audio_source.position_ms = position_ms
        self._check_position(expected_sample, expected_second, expected_ms)

    @pytest.mark.parametrize(
        "position",
        [
            100,  # past the end
            -100,  # before the start
        ],
        ids=["positive", "negative"],
    )
    def test_position_setter_out_of_range(self, position):
        with pytest.raises(IndexError):
            self.audio_source.position = position

    @pytest.mark.parametrize(
        "position_s",
        [
            100,  # past the end
            -100,  # before the start
        ],
        ids=["positive", "negative"],
    )
    def test_position_s_setter_out_of_range(self, position_s):
        with pytest.raises(IndexError):
            self.audio_source.position_s = position_s

    @pytest.mark.parametrize(
        "position_ms",
        [
            10000,  # past the end
            -10000,  # before the start
        ],
        ids=["positive", "negative"],
    )
    def test_position_ms_setter_out_of_range(self, position_ms):
        with pytest.raises(IndexError):
            self.audio_source.position_ms = position_ms

    def test_sr10_sw1_ch1_initial_position_s_0(self):
        assert self.audio_source.position_s == 0.0

    def test_sr10_sw1_ch1_position_s_1_after_read(self):
        # Reading `sampling_rate` samples consumes exactly one second.
        self.audio_source.read(self.audio_source.sampling_rate)
        assert self.audio_source.position_s == 1.0

    def test_sr10_sw1_ch1_position_s_2_5(self):
        # 25 samples at a sampling rate of 10 are 2.5 seconds.
        self.audio_source.read(25)
        assert self.audio_source.position_s == 2.5

    def test_sr10_sw1_ch1_position_s_0(self):
        self.audio_source.read(10)
        self.audio_source.position_s = 0
        assert self.audio_source.position_s == 0.0

    def test_sr10_sw1_ch1_position_s_1(self):
        self.audio_source.position_s = 1
        assert self.audio_source.position_s == 1.0

    def test_sr10_sw1_ch1_rewind(self):
        self.audio_source.read(10)
        self.audio_source.rewind()
        assert self.audio_source.position == 0

    def test_sr10_sw1_ch1_read_closed(self):
        """Reading from a closed source raises `AudioIOError`."""
        self.audio_source.close()
        with pytest.raises(AudioIOError):
            self.audio_source.read(1)


class TestBufferAudioSource_SR16_SW2_CH1:
    """
    Tests for a `BufferAudioSource` built from 32 bytes of data with a
    sampling rate of 16, 2-byte samples and a single channel, i.e. 16
    samples or 1 second of audio.
    """

    @pytest.fixture(autouse=True)
    def setup_and_teardown(self):
        """Give every test a freshly opened source; close it afterwards."""
        self.data = b"ABCDEFGHIJKLMNOPQRSTUVWXYZ012345"
        self.audio_source = BufferAudioSource(
            data=self.data, sampling_rate=16, sample_width=2, channels=1
        )
        self.audio_source.open()
        yield
        self.audio_source.close()

    def _check_position(self, expected_sample, expected_second, expected_ms):
        """Compare the position in samples, seconds and milliseconds."""
        source = self.audio_source
        assert source.position == expected_sample
        assert source.position_s == expected_second
        assert source.position_ms == expected_ms

    def test_sr16_sw2_ch1_read_1(self):
        # One sample is two bytes wide.
        assert self.audio_source.read(1) == b"AB"

    def test_sr16_sw2_ch1_read_6(self):
        assert self.audio_source.read(6) == b"ABCDEFGHIJKL"

    def test_sr16_sw2_ch1_read_multiple(self):
        # Each read continues where the previous one stopped; the last one
        # asks for more than what is left and gets the remaining data.
        for size, expected in (
            (1, b"AB"),
            (6, b"CDEFGHIJKLMN"),
            (5, b"OPQRSTUVWX"),
            (9999, b"YZ012345"),
        ):
            assert self.audio_source.read(size) == expected

    def test_sr16_sw2_ch1_read_all(self):
        assert self.audio_source.read(9999) == self.data
        # Nothing is left: the source signals it with `None`.
        assert self.audio_source.read(1) is None

    def test_sr16_sw2_ch1_sampling_rate(self):
        assert self.audio_source.sampling_rate == 16

    def test_sr16_sw2_ch1_sample_width(self):
        assert self.audio_source.sample_width == 2

    def test_sr16_sw2_ch1_channels(self):
        assert self.audio_source.channels == 1

    @pytest.mark.parametrize(
        "block_sizes, expected_sample, expected_second, expected_ms",
        [
            ([], 0, 0, 0),  # nothing read
            ([0], 0, 0, 0),  # one read of size zero
            ([2], 2, 2 / 16, int(2000 / 16)),  # one read of two samples
            ([11], 11, 11 / 16, int(11 * 1000 / 16)),  # eleven samples
            ([4, 8], 12, 0.75, 750),  # two consecutive reads
        ],
        ids=["empty", "zero", "two", "eleven", "multiple"],
    )
    def test_position(
        self, block_sizes, expected_sample, expected_second, expected_ms
    ):
        """The position reflects the total number of samples read."""
        for size in block_sizes:
            self.audio_source.read(size)
        self._check_position(expected_sample, expected_second, expected_ms)

    def test_sr16_sw2_ch1_read_position_0(self):
        self.audio_source.read(10)
        self.audio_source.position = 0
        assert self.audio_source.position == 0

    @pytest.mark.parametrize(
        "position, expected_sample, expected_second, expected_ms",
        [
            (0, 0, 0, 0),  # start of data
            (1, 1, 1 / 16, int(1000 / 16)),  # second sample
            (10, 10, 10 / 16, int(10000 / 16)),  # eleventh sample
            (-1, 15, 15 / 16, int(15000 / 16)),  # last sample
            (-7, 9, 9 / 16, int(9000 / 16)),  # seven samples before the end
        ],
        ids=["zero", "one", "ten", "negative_1", "negative_2"],
    )
    def test_position_setter(
        self, position, expected_sample, expected_second, expected_ms
    ):
        """Setting `position` (in samples) moves all three positions."""
        self.audio_source.position = position
        self._check_position(expected_sample, expected_second, expected_ms)

    @pytest.mark.parametrize(
        "position_s, expected_sample, expected_second, expected_ms",
        [
            (0, 0, 0, 0),  # start of data
            (0.1, 1, 1 / 16, int(1000 / 16)),  # lands on sample 1
            (1 / 8, 2, 1 / 8, int(1 / 8 * 1000)),  # lands on sample 2
            (0.75, 12, 0.75, 750),  # lands on sample 12
            (-0.1, 15, 15 / 16, int(15000 / 16)),  # 0.1 s before the end
            (-0.7, 5, 5 / 16, int(5000 / 16)),  # 0.7 s before the end
        ],
        ids=["zero", "one", "two", "twelve", "negative_1", "negative_2"],
    )
    def test_position_s_setter(
        self, position_s, expected_sample, expected_second, expected_ms
    ):
        """Setting `position_s` (in seconds) moves all three positions."""
        self.audio_source.position_s = position_s
        self._check_position(expected_sample, expected_second, expected_ms)

    @pytest.mark.parametrize(
        "position_ms, expected_sample, expected_second, expected_ms",
        [
            (0, 0, 0, 0),  # start of data
            (100, 1, 1 / 16, int(1000 / 16)),  # lands on sample 1
            (1000, 16, 1, 1000),  # one second, i.e. sample 16
            (-100, 15, 15 / 16, int(15 * 1000 / 16)),  # 100 ms before end
            (-500, 8, 0.5, 500),  # 500 ms before end
            (-700, 5, 5 / 16, int(5 * 1000 / 16)),  # 700 ms before end
        ],
        ids=["zero", "one", "ten", "negative_1", "negative_2", "negative_3"],
    )
    def test_position_ms_setter(
        self, position_ms, expected_sample, expected_second, expected_ms
    ):
        """Setting `position_ms` moves all three positions."""
        self.audio_source.position_ms = position_ms
        self._check_position(expected_sample, expected_second, expected_ms)

    def test_sr16_sw2_ch1_rewind(self):
        self.audio_source.read(10)
        self.audio_source.rewind()
        assert self.audio_source.position == 0


class TestBufferAudioSource_SR11_SW4_CH1:
    """
    Tests for a `BufferAudioSource` built from 44 bytes of data with a
    sampling rate of 11, 4-byte samples and a single channel, i.e. 11
    samples or 1 second of audio.
    """

    @pytest.fixture(autouse=True)
    def setup_and_teardown(self):
        """Give every test a freshly opened source; close it afterwards."""
        self.data = b"ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789abcdefgh"
        self.audio_source = BufferAudioSource(
            data=self.data, sampling_rate=11, sample_width=4, channels=1
        )
        self.audio_source.open()
        yield
        self.audio_source.close()

    def test_sr11_sw4_ch1_read_1(self):
        # One sample is four bytes wide.
        block = self.audio_source.read(1)
        exp = b"ABCD"
        assert block == exp

    def test_sr11_sw4_ch1_read_6(self):
        block = self.audio_source.read(6)
        exp = b"ABCDEFGHIJKLMNOPQRSTUVWX"
        assert block == exp

    def test_sr11_sw4_ch1_read_multiple(self):
        # Each read continues where the previous one stopped.
        block = self.audio_source.read(1)
        exp = b"ABCD"
        assert block == exp

        block = self.audio_source.read(6)
        exp = b"EFGHIJKLMNOPQRSTUVWXYZ01"
        assert block == exp

        block = self.audio_source.read(3)
        exp = b"23456789abcd"
        assert block == exp

        # Asking for more than what is left returns the remaining data.
        block = self.audio_source.read(9999)
        exp = b"efgh"
        assert block == exp

    def test_sr11_sw4_ch1_read_all(self):
        block = self.audio_source.read(9999)
        assert block == self.data

        # Nothing is left: the source signals it with `None`.
        block = self.audio_source.read(1)
        assert block is None

    def test_sr11_sw4_ch1_sampling_rate(self):
        srate = self.audio_source.sampling_rate
        assert srate == 11

    def test_sr11_sw4_ch1_sample_width(self):
        swidth = self.audio_source.sample_width
        assert swidth == 4

    def test_sr11_sw4_ch1_channels(self):
        channels = self.audio_source.channels
        assert channels == 1

    def test_sr11_sw4_ch1_intial_position_0(self):
        pos = self.audio_source.position
        assert pos == 0

    def test_sr11_sw4_ch1_position_5(self):
        self.audio_source.read(5)
        pos = self.audio_source.position
        assert pos == 5

    def test_sr11_sw4_ch1_position_9(self):
        # The position accumulates over consecutive reads.
        self.audio_source.read(5)
        self.audio_source.read(4)
        pos = self.audio_source.position
        assert pos == 9

    def test_sr11_sw4_ch1_position_0(self):
        self.audio_source.read(10)
        self.audio_source.position = 0
        pos = self.audio_source.position
        assert pos == 0

    def test_sr11_sw4_ch1_position_10(self):
        self.audio_source.position = 10
        pos = self.audio_source.position
        assert pos == 10

    def test_sr11_sw4_ch1_initial_position_s_0(self):
        tp = self.audio_source.position_s
        assert tp == 0.0

    def test_sr11_sw4_ch1_position_s_1_after_read(self):
        srate = self.audio_source.sampling_rate
        # read one second
        self.audio_source.read(srate)
        tp = self.audio_source.position_s
        assert tp == 1.0

    def test_sr11_sw4_ch1_position_s_0_63(self):
        # read 7 samples, i.e. 7 / 11 of a second
        self.audio_source.read(7)
        tp = self.audio_source.position_s
        # `assert tp, approx(...)` would only check that `tp` is truthy and
        # use the approx object as the failure message; compare explicitly.
        assert tp == pytest.approx(0.636363636364)

    def test_sr11_sw4_ch1_position_s_0(self):
        self.audio_source.read(10)
        self.audio_source.position_s = 0
        tp = self.audio_source.position_s
        assert tp == 0.0

    def test_sr11_sw4_ch1_position_s_1(self):
        self.audio_source.position_s = 1
        tp = self.audio_source.position_s
        assert tp == 1.0

    def test_sr11_sw4_ch1_rewind(self):
        self.audio_source.read(10)
        self.audio_source.rewind()
        tp = self.audio_source.position
        assert tp == 0


class TestBufferAudioSourceCreationException:
    """Invalid constructor arguments raise `AudioParameterError`."""

    def test_wrong_sample_width_value(self):
        """A sample width of 3 bytes is rejected with a specific message."""
        expected_msg = "Sample width must be one of: 1, 2 or 4 (bytes)"
        with pytest.raises(AudioParameterError) as exc_info:
            BufferAudioSource(
                data=b"ABCDEFGHI", sampling_rate=9, sample_width=3, channels=1
            )
        assert str(exc_info.value) == expected_msg

    def test_wrong_data_buffer_size(self):
        """Nine bytes cannot be split into whole 2-byte mono samples."""
        expected_msg = (
            "The length of audio data must be an integer multiple of "
            "`sample_width * channels`"
        )
        with pytest.raises(AudioParameterError) as exc_info:
            BufferAudioSource(
                data=b"ABCDEFGHI", sampling_rate=8, sample_width=2, channels=1
            )
        assert str(exc_info.value) == expected_msg


class TestAudioSourceProperties:
    """`sampling_rate`, `sample_width` and `channels` are read-only."""

    def test_read_properties(self):
        """Each property returns the value given to the constructor."""
        sampling_rate, sample_width, channels = 8000, 2, 1
        a_source = BufferAudioSource(
            b"", sampling_rate, sample_width, channels
        )

        assert a_source.sampling_rate == sampling_rate
        assert a_source.sample_width == sample_width
        assert a_source.channels == channels

    def test_set_readonly_properties_exception(self):
        """Assigning to any of the properties raises `AttributeError`."""
        a_source = BufferAudioSource(b"", 8000, 2, 1)

        for name, new_value in (
            ("sampling_rate", 16000),
            ("sample_width", 1),
            ("channels", 2),
        ):
            with pytest.raises(AttributeError):
                setattr(a_source, name, new_value)


class TestAudioSourceShortProperties:
    """The short properties `sr`, `sw` and `ch` are read-only."""

    def test_read_short_properties(self):
        """Each short property returns the constructor value."""
        sampling_rate, sample_width, channels = 8000, 2, 1
        a_source = BufferAudioSource(
            b"", sampling_rate, sample_width, channels
        )

        assert a_source.sr == sampling_rate
        assert a_source.sw == sample_width
        assert a_source.ch == channels

    def test_set_readonly_short_properties_exception(self):
        """Assigning to any short property raises `AttributeError`."""
        a_source = BufferAudioSource(b"", 8000, 2, 1)

        for name, new_value in (("sr", 16000), ("sw", 1), ("ch", 2)):
            with pytest.raises(AttributeError):
                setattr(a_source, name, new_value)