view tests/test_AudioDataSource.py @ 240:173ffca58d23

Read data from all available channels in AudioSource
author Amine Sehili <amine.sehili@gmail.com>
date Thu, 25 Jul 2019 20:50:52 +0100
parents 41e2ce69d4f4
children 755bb58f3db0
line wrap: on
line source
"""
@author: Amine Sehili <amine.sehili@gmail.com>
September 2015

"""

import unittest
from functools import partial
import sys
from auditok import (
    dataset,
    ADSFactory,
    AudioDataSource,
    BufferAudioSource,
    WaveAudioSource,
    DuplicateArgument,
)
import wave


try:
    from builtins import range
except ImportError:
    if sys.version_info < (3, 0):
        range = xrange


class TestADSFactoryFileAudioSource(unittest.TestCase):
    def setUp(self):
        self.audio_source = WaveAudioSource(
            filename=dataset.one_to_six_arabic_16000_mono_bc_noise
        )

    def test_ADS_type(self):

        ads = ADSFactory.ads(audio_source=self.audio_source)

        self.assertIsInstance(
            ads,
            AudioDataSource,
            msg="wrong type for ads object, expected: 'AudioDataSource', found: {0}".format(
                type(ads)
            ),
        )

    def test_default_block_size(self):
        ads = ADSFactory.ads(audio_source=self.audio_source)
        size = ads.block_size
        self.assertEqual(
            size,
            160,
            "Wrong default block_size, expected: 160, found: {0}".format(size),
        )

    def test_block_size(self):
        ads = ADSFactory.ads(audio_source=self.audio_source, block_size=512)
        size = ads.block_size
        self.assertEqual(
            size,
            512,
            "Wrong block_size, expected: 512, found: {0}".format(size),
        )

        # with alias keyword
        ads = ADSFactory.ads(audio_source=self.audio_source, bs=160)
        size = ads.block_size
        self.assertEqual(
            size,
            160,
            "Wrong block_size, expected: 160, found: {0}".format(size),
        )

    def test_block_duration(self):

        ads = ADSFactory.ads(
            audio_source=self.audio_source, block_dur=0.01
        )  # 10 ms
        size = ads.block_size
        self.assertEqual(
            size,
            160,
            "Wrong block_size, expected: 160, found: {0}".format(size),
        )

        # with alias keyword
        ads = ADSFactory.ads(audio_source=self.audio_source, bd=0.025)  # 25 ms
        size = ads.block_size
        self.assertEqual(
            size,
            400,
            "Wrong block_size, expected: 400, found: {0}".format(size),
        )

    def test_hop_duration(self):

        ads = ADSFactory.ads(
            audio_source=self.audio_source, block_dur=0.02, hop_dur=0.01
        )  # 10 ms
        size = ads.hop_size
        self.assertEqual(
            size, 160, "Wrong hop_size, expected: 160, found: {0}".format(size)
        )

        # with alias keyword
        ads = ADSFactory.ads(
            audio_source=self.audio_source, bd=0.025, hop_dur=0.015
        )  # 15 ms
        size = ads.hop_size
        self.assertEqual(
            size,
            240,
            "Wrong block_size, expected: 240, found: {0}".format(size),
        )

    def test_sampling_rate(self):
        ads = ADSFactory.ads(audio_source=self.audio_source)

        srate = ads.sampling_rate
        self.assertEqual(
            srate,
            16000,
            "Wrong sampling rate, expected: 16000, found: {0}".format(srate),
        )

    def test_sample_width(self):
        ads = ADSFactory.ads(audio_source=self.audio_source)

        swidth = ads.sample_width
        self.assertEqual(
            swidth,
            2,
            "Wrong sample width, expected: 2, found: {0}".format(swidth),
        )

    def test_channels(self):
        ads = ADSFactory.ads(audio_source=self.audio_source)

        channels = ads.channels
        self.assertEqual(
            channels,
            1,
            "Wrong number of channels, expected: 1, found: {0}".format(
                channels
            ),
        )

    def test_read(self):
        ads = ADSFactory.ads(audio_source=self.audio_source, block_size=256)

        ads.open()
        ads_data = ads.read()
        ads.close()

        audio_source = WaveAudioSource(
            filename=dataset.one_to_six_arabic_16000_mono_bc_noise
        )
        audio_source.open()
        audio_source_data = audio_source.read(256)
        audio_source.close()

        self.assertEqual(
            ads_data, audio_source_data, "Unexpected data read from ads"
        )

    def test_Limiter_Deco_read(self):
        # read a maximum of 0.75 seconds from audio source
        ads = ADSFactory.ads(audio_source=self.audio_source, max_time=0.75)

        ads_data = []
        ads.open()
        while True:
            block = ads.read()
            if block is None:
                break
            ads_data.append(block)
        ads.close()
        ads_data = b"".join(ads_data)

        audio_source = WaveAudioSource(
            filename=dataset.one_to_six_arabic_16000_mono_bc_noise
        )
        audio_source.open()
        audio_source_data = audio_source.read(int(16000 * 0.75))
        audio_source.close()

        self.assertEqual(
            ads_data, audio_source_data, "Unexpected data read from LimiterADS"
        )

    def test_Limiter_Deco_read_limit(self):
        # read a maximum of 1.191 seconds from audio source
        ads = ADSFactory.ads(audio_source=self.audio_source, max_time=1.191)
        total_samples = round(ads.sampling_rate * 1.191)
        nb_full_blocks, last_block_size = divmod(total_samples, ads.block_size)
        total_samples_with_overlap = (
            nb_full_blocks * ads.block_size + last_block_size
        )
        expected_read_bytes = (
            total_samples_with_overlap * ads.sw * ads.channels
        )

        total_read = 0
        ads.open()
        i = 0
        while True:
            block = ads.read()
            if block is None:
                break
            i += 1
            total_read += len(block)

        ads.close()

        self.assertEqual(
            total_read,
            expected_read_bytes,
            "Wrong data length read from LimiterADS, expected: {0}, found: {1}".format(
                expected_read_bytes, total_read
            ),
        )

    def test_Recorder_Deco_read(self):
        ads = ADSFactory.ads(
            audio_source=self.audio_source, record=True, block_size=500
        )

        ads_data = []
        ads.open()
        for i in range(10):
            block = ads.read()
            if block is None:
                break
            ads_data.append(block)
        ads.close()
        ads_data = b"".join(ads_data)

        audio_source = WaveAudioSource(
            filename=dataset.one_to_six_arabic_16000_mono_bc_noise
        )
        audio_source.open()
        audio_source_data = audio_source.read(500 * 10)
        audio_source.close()

        self.assertEqual(
            ads_data,
            audio_source_data,
            "Unexpected data read from RecorderADS",
        )

    def test_Recorder_Deco_is_rewindable(self):
        ads = ADSFactory.ads(audio_source=self.audio_source, record=True)

        self.assertTrue(
            ads.rewindable, "RecorderADS.is_rewindable should return True"
        )

    def test_Recorder_Deco_rewind_and_read(self):
        ads = ADSFactory.ads(
            audio_source=self.audio_source, record=True, block_size=320
        )

        ads.open()
        for i in range(10):
            ads.read()

        ads.rewind()

        # read all available data after rewind
        ads_data = []
        while True:
            block = ads.read()
            if block is None:
                break
            ads_data.append(block)
        ads.close()
        ads_data = b"".join(ads_data)

        audio_source = WaveAudioSource(
            filename=dataset.one_to_six_arabic_16000_mono_bc_noise
        )
        audio_source.open()
        audio_source_data = audio_source.read(320 * 10)
        audio_source.close()

        self.assertEqual(
            ads_data,
            audio_source_data,
            "Unexpected data read from RecorderADS",
        )

    def test_Overlap_Deco_read(self):

        # Use arbitrary valid block_size and hop_size
        block_size = 1714
        hop_size = 313

        ads = ADSFactory.ads(
            audio_source=self.audio_source,
            block_size=block_size,
            hop_size=hop_size,
        )

        # Read all available data overlapping blocks
        ads.open()
        ads_data = []
        while True:
            block = ads.read()
            if block is None:
                break
            ads_data.append(block)
        ads.close()

        # Read all data from file and build a BufferAudioSource
        fp = wave.open(dataset.one_to_six_arabic_16000_mono_bc_noise, "r")
        wave_data = fp.readframes(fp.getnframes())
        fp.close()
        audio_source = BufferAudioSource(
            wave_data, ads.sampling_rate, ads.sample_width, ads.channels
        )
        audio_source.open()

        # Compare all blocks read from OverlapADS to those read
        # from an audio source with a manual position setting
        for i, block in enumerate(ads_data):

            tmp = audio_source.read(block_size)

            self.assertEqual(
                block,
                tmp,
                "Unexpected block (N={0}) read from OverlapADS".format(i),
            )

            audio_source.position = (i + 1) * hop_size

        audio_source.close()

    def test_Limiter_Overlap_Deco_read(self):

        block_size = 256
        hop_size = 200

        ads = ADSFactory.ads(
            audio_source=self.audio_source,
            max_time=0.50,
            block_size=block_size,
            hop_size=hop_size,
        )

        # Read all available data overlapping blocks
        ads.open()
        ads_data = []
        while True:
            block = ads.read()
            if block is None:
                break
            ads_data.append(block)
        ads.close()

        # Read all data from file and build a BufferAudioSource
        fp = wave.open(dataset.one_to_six_arabic_16000_mono_bc_noise, "r")
        wave_data = fp.readframes(fp.getnframes())
        fp.close()
        audio_source = BufferAudioSource(
            wave_data, ads.sampling_rate, ads.sample_width, ads.channels
        )
        audio_source.open()

        # Compare all blocks read from OverlapADS to those read
        # from an audio source with a manual position setting
        for i, block in enumerate(ads_data):
            tmp = audio_source.read(len(block) // (ads.sw * ads.ch))
            self.assertEqual(
                len(block),
                len(tmp),
                "Unexpected block (N={0}) read from OverlapADS".format(i),
            )
            audio_source.position = (i + 1) * hop_size

        audio_source.close()

    def test_Limiter_Overlap_Deco_read_limit(self):

        block_size = 313
        hop_size = 207
        ads = ADSFactory.ads(
            audio_source=self.audio_source,
            max_time=1.932,
            block_size=block_size,
            hop_size=hop_size,
        )

        total_samples = round(ads.sampling_rate * 1.932)
        first_read_size = block_size
        next_read_size = block_size - hop_size
        nb_next_blocks, last_block_size = divmod(
            (total_samples - first_read_size), next_read_size
        )
        total_samples_with_overlap = (
            first_read_size + next_read_size * nb_next_blocks + last_block_size
        )
        expected_read_bytes = (
            total_samples_with_overlap * ads.sw * ads.channels
        )

        cache_size = (block_size - hop_size) * ads.sample_width * ads.channels
        total_read = cache_size

        ads.open()
        i = 0
        while True:
            block = ads.read()
            if block is None:
                break
            i += 1
            total_read += len(block) - cache_size

        ads.close()
        self.assertEqual(
            total_read,
            expected_read_bytes,
            "Wrong data length read from LimiterADS, expected: {0}, found: {1}".format(
                expected_read_bytes, total_read
            ),
        )

    def test_Recorder_Overlap_Deco_is_rewindable(self):
        ads = ADSFactory.ads(
            audio_source=self.audio_source,
            block_size=320,
            hop_size=160,
            record=True,
        )
        self.assertTrue(
            ads.rewindable, "RecorderADS.is_rewindable should return True"
        )

    def test_Recorder_Overlap_Deco_rewind_and_read(self):

        # Use arbitrary valid block_size and hop_size
        block_size = 1600
        hop_size = 400

        ads = ADSFactory.ads(
            audio_source=self.audio_source,
            block_size=block_size,
            hop_size=hop_size,
            record=True,
        )

        # Read all available data overlapping blocks
        ads.open()
        i = 0
        while True:
            block = ads.read()
            if block is None:
                break
            i += 1

        ads.rewind()

        # Read all data from file and build a BufferAudioSource
        fp = wave.open(dataset.one_to_six_arabic_16000_mono_bc_noise, "r")
        wave_data = fp.readframes(fp.getnframes())
        fp.close()
        audio_source = BufferAudioSource(
            wave_data, ads.sampling_rate, ads.sample_width, ads.channels
        )
        audio_source.open()

        # Compare all blocks read from OverlapADS to those read
        # from an audio source with a manual position setting
        for j in range(i):

            tmp = audio_source.read(block_size)

            self.assertEqual(
                ads.read(),
                tmp,
                "Unexpected block (N={0}) read from OverlapADS".format(i),
            )
            audio_source.position = (j + 1) * hop_size

        ads.close()
        audio_source.close()

    def test_Limiter_Recorder_Overlap_Deco_rewind_and_read(self):

        # Use arbitrary valid block_size and hop_size
        block_size = 1600
        hop_size = 400

        ads = ADSFactory.ads(
            audio_source=self.audio_source,
            max_time=1.50,
            block_size=block_size,
            hop_size=hop_size,
            record=True,
        )

        # Read all available data overlapping blocks
        ads.open()
        i = 0
        while True:
            block = ads.read()
            if block is None:
                break
            i += 1

        ads.rewind()

        # Read all data from file and build a BufferAudioSource
        fp = wave.open(dataset.one_to_six_arabic_16000_mono_bc_noise, "r")
        wave_data = fp.readframes(fp.getnframes())
        fp.close()
        audio_source = BufferAudioSource(
            wave_data, ads.sampling_rate, ads.sample_width, ads.channels
        )
        audio_source.open()

        # Compare all blocks read from OverlapADS to those read
        # from an audio source with a manual position setting
        for j in range(i):

            tmp = audio_source.read(block_size)

            self.assertEqual(
                ads.read(),
                tmp,
                "Unexpected block (N={0}) read from OverlapADS".format(i),
            )
            audio_source.position = (j + 1) * hop_size

        ads.close()
        audio_source.close()

    def test_Limiter_Recorder_Overlap_Deco_rewind_and_read_limit(self):

        # Use arbitrary valid block_size and hop_size
        block_size = 1000
        hop_size = 200

        ads = ADSFactory.ads(
            audio_source=self.audio_source,
            max_time=1.317,
            block_size=block_size,
            hop_size=hop_size,
            record=True,
        )
        total_samples = round(ads.sampling_rate * 1.317)
        first_read_size = block_size
        next_read_size = block_size - hop_size
        nb_next_blocks, last_block_size = divmod(
            (total_samples - first_read_size), next_read_size
        )
        total_samples_with_overlap = (
            first_read_size + next_read_size * nb_next_blocks + last_block_size
        )
        expected_read_bytes = (
            total_samples_with_overlap * ads.sw * ads.channels
        )

        cache_size = (block_size - hop_size) * ads.sample_width * ads.channels
        total_read = cache_size

        ads.open()
        i = 0
        while True:
            block = ads.read()
            if block is None:
                break
            i += 1
            total_read += len(block) - cache_size

        ads.close()
        self.assertEqual(
            total_read,
            expected_read_bytes,
            "Wrong data length read from LimiterADS, expected: {0}, found: {1}".format(
                expected_read_bytes, total_read
            ),
        )


class TestADSFactoryBufferAudioSource(unittest.TestCase):
    def setUp(self):
        self.signal = b"ABCDEFGHIJKLMNOPQRSTUVWXYZ012345"
        self.ads = ADSFactory.ads(
            data_buffer=self.signal,
            sampling_rate=16,
            sample_width=2,
            channels=1,
            block_size=4,
        )

    def test_ADS_BAS_sampling_rate(self):
        srate = self.ads.sampling_rate
        self.assertEqual(
            srate,
            16,
            "Wrong sampling rate, expected: 16000, found: {0}".format(srate),
        )

    def test_ADS_BAS_get_sample_width(self):
        swidth = self.ads.sample_width
        self.assertEqual(
            swidth,
            2,
            "Wrong sample width, expected: 2, found: {0}".format(swidth),
        )

    def test_ADS_BAS_get_channels(self):
        channels = self.ads.channels
        self.assertEqual(
            channels,
            1,
            "Wrong number of channels, expected: 1, found: {0}".format(
                channels
            ),
        )

    def test_Limiter_Recorder_Overlap_Deco_rewind_and_read(self):

        # Use arbitrary valid block_size and hop_size
        block_size = 5
        hop_size = 4

        ads = ADSFactory.ads(
            data_buffer=self.signal,
            sampling_rate=16,
            sample_width=2,
            channels=1,
            max_time=0.80,
            block_size=block_size,
            hop_size=hop_size,
            record=True,
        )

        # Read all available data overlapping blocks
        ads.open()
        i = 0
        while True:
            block = ads.read()
            if block is None:
                break
            i += 1

        ads.rewind()

        # Build a BufferAudioSource
        audio_source = BufferAudioSource(
            self.signal, ads.sampling_rate, ads.sample_width, ads.channels
        )
        audio_source.open()

        # Compare all blocks read from OverlapADS to those read
        # from an audio source with a manual position setting
        for j in range(i):

            tmp = audio_source.read(block_size)

            block = ads.read()

            self.assertEqual(
                block,
                tmp,
                "Unexpected block '{}' (N={}) read from OverlapADS".format(
                    block, i
                ),
            )
            audio_source.position = (j + 1) * hop_size

        ads.close()
        audio_source.close()


class TestADSFactoryAlias(unittest.TestCase):
    def setUp(self):
        self.signal = "ABCDEFGHIJKLMNOPQRSTUVWXYZ012345"

    def test_sampling_rate_alias(self):
        ads = ADSFactory.ads(
            data_buffer=self.signal,
            sr=16,
            sample_width=2,
            channels=1,
            block_dur=0.5,
        )
        srate = ads.sampling_rate
        self.assertEqual(
            srate,
            16,
            "Wrong sampling rate, expected: 16000, found: {0}".format(srate),
        )

    def test_sampling_rate_duplicate(self):
        func = partial(
            ADSFactory.ads,
            data_buffer=self.signal,
            sr=16,
            sampling_rate=16,
            sample_width=2,
            channels=1,
        )
        self.assertRaises(DuplicateArgument, func)

    def test_sample_width_alias(self):
        ads = ADSFactory.ads(
            data_buffer=self.signal,
            sampling_rate=16,
            sw=2,
            channels=1,
            block_dur=0.5,
        )
        swidth = ads.sample_width
        self.assertEqual(
            swidth,
            2,
            "Wrong sample width, expected: 2, found: {0}".format(swidth),
        )

    def test_sample_width_duplicate(self):
        func = partial(
            ADSFactory.ads,
            data_buffer=self.signal,
            sampling_rate=16,
            sw=2,
            sample_width=2,
            channels=1,
        )
        self.assertRaises(DuplicateArgument, func)

    def test_channels_alias(self):
        ads = ADSFactory.ads(
            data_buffer=self.signal,
            sampling_rate=16,
            sample_width=2,
            ch=1,
            block_dur=4,
        )
        channels = ads.channels
        self.assertEqual(
            channels,
            1,
            "Wrong number of channels, expected: 1, found: {0}".format(
                channels
            ),
        )

    def test_channels_duplicate(self):
        func = partial(
            ADSFactory.ads,
            data_buffer=self.signal,
            sampling_rate=16,
            sample_width=2,
            ch=1,
            channels=1,
        )
        self.assertRaises(DuplicateArgument, func)

    def test_block_size_alias(self):
        ads = ADSFactory.ads(
            data_buffer=self.signal,
            sampling_rate=16,
            sample_width=2,
            channels=1,
            bs=8,
        )
        size = ads.block_size
        self.assertEqual(
            size,
            8,
            "Wrong block_size using bs alias, expected: 8, found: {0}".format(
                size
            ),
        )

    def test_block_size_duplicate(self):
        func = partial(
            ADSFactory.ads,
            data_buffer=self.signal,
            sampling_rate=16,
            sample_width=2,
            channels=1,
            bs=4,
            block_size=4,
        )
        self.assertRaises(DuplicateArgument, func)

    def test_block_duration_alias(self):
        ads = ADSFactory.ads(
            data_buffer=self.signal,
            sampling_rate=16,
            sample_width=2,
            channels=1,
            bd=0.75,
        )
        # 0.75 ms = 0.75 * 16 = 12
        size = ads.block_size
        self.assertEqual(
            size,
            12,
            "Wrong block_size set with a block_dur alias 'bd', expected: 8, found: {0}".format(
                size
            ),
        )

    def test_block_duration_duplicate(self):
        func = partial(
            ADSFactory.ads,
            data_buffer=self.signal,
            sampling_rate=16,
            sample_width=2,
            channels=1,
            bd=4,
            block_dur=4,
        )
        self.assertRaises(DuplicateArgument, func)

    def test_block_size_duration_duplicate(self):
        func = partial(
            ADSFactory.ads,
            data_buffer=self.signal,
            sampling_rate=16,
            sample_width=2,
            channels=1,
            bd=4,
            bs=12,
        )
        self.assertRaises(DuplicateArgument, func)

    def test_hop_duration_alias(self):

        ads = ADSFactory.ads(
            data_buffer=self.signal,
            sampling_rate=16,
            sample_width=2,
            channels=1,
            bd=0.75,
            hd=0.5,
        )
        size = ads.hop_size
        self.assertEqual(
            size,
            8,
            "Wrong block_size using bs alias, expected: 8, found: {0}".format(
                size
            ),
        )

    def test_hop_duration_duplicate(self):

        func = partial(
            ADSFactory.ads,
            data_buffer=self.signal,
            sampling_rate=16,
            sample_width=2,
            channels=1,
            bd=0.75,
            hd=0.5,
            hop_dur=0.5,
        )
        self.assertRaises(DuplicateArgument, func)

    def test_hop_size_duration_duplicate(self):
        func = partial(
            ADSFactory.ads,
            data_buffer=self.signal,
            sampling_rate=16,
            sample_width=2,
            channels=1,
            bs=8,
            hs=4,
            hd=1,
        )
        self.assertRaises(DuplicateArgument, func)

    def test_hop_size_greater_than_block_size(self):
        func = partial(
            ADSFactory.ads,
            data_buffer=self.signal,
            sampling_rate=16,
            sample_width=2,
            channels=1,
            bs=4,
            hs=8,
        )
        self.assertRaises(ValueError, func)

    def test_filename_alias(self):
        ads = ADSFactory.ads(fn=dataset.one_to_six_arabic_16000_mono_bc_noise)

    def test_filename_duplicate(self):

        func = partial(
            ADSFactory.ads,
            fn=dataset.one_to_six_arabic_16000_mono_bc_noise,
            filename=dataset.one_to_six_arabic_16000_mono_bc_noise,
        )
        self.assertRaises(DuplicateArgument, func)

    def test_data_buffer_duplicate(self):
        func = partial(
            ADSFactory.ads,
            data_buffer=self.signal,
            db=self.signal,
            sampling_rate=16,
            sample_width=2,
            channels=1,
        )
        self.assertRaises(DuplicateArgument, func)

    def test_max_time_alias(self):
        ads = ADSFactory.ads(
            data_buffer=self.signal,
            sampling_rate=16,
            sample_width=2,
            channels=1,
            mt=10,
            block_dur=0.5,
        )
        self.assertEqual(
            ads.max_read,
            10,
            "Wrong AudioDataSource.max_read, expected: 10, found: {}".format(
                ads.max_read
            ),
        )

    def test_max_time_duplicate(self):
        func = partial(
            ADSFactory.ads,
            data_buffer=self.signal,
            sampling_rate=16,
            sample_width=2,
            channels=1,
            mt=True,
            max_time=True,
        )

        self.assertRaises(DuplicateArgument, func)

    def test_record_alias(self):
        ads = ADSFactory.ads(
            data_buffer=self.signal,
            sampling_rate=16,
            sample_width=2,
            channels=1,
            rec=True,
            block_dur=0.5,
        )
        self.assertTrue(
            ads.rewindable, "AudioDataSource.rewindable expected to be True"
        )

    def test_record_duplicate(self):
        func = partial(
            ADSFactory.ads,
            data_buffer=self.signal,
            sampling_rate=16,
            sample_width=2,
            channels=1,
            rec=True,
            record=True,
        )
        self.assertRaises(DuplicateArgument, func)

    def test_Limiter_Recorder_Overlap_Deco_rewind_and_read_alias(self):

        # Use arbitrary valid block_size and hop_size
        block_size = 5
        hop_size = 4

        ads = ADSFactory.ads(
            db=self.signal,
            sr=16,
            sw=2,
            ch=1,
            mt=0.80,
            bs=block_size,
            hs=hop_size,
            rec=True,
        )

        # Read all available data overlapping blocks
        ads.open()
        i = 0
        while True:
            block = ads.read()
            if block is None:
                break
            i += 1

        ads.rewind()

        # Build a BufferAudioSource
        audio_source = BufferAudioSource(
            self.signal, ads.sampling_rate, ads.sample_width, ads.channels
        )
        audio_source.open()

        # Compare all blocks read from AudioDataSource to those read
        # from an audio source with manual position definition
        for j in range(i):
            tmp = audio_source.read(block_size)
            block = ads.read()
            self.assertEqual(
                block,
                tmp,
                "Unexpected block (N={0}) read from OverlapADS".format(i),
            )
            audio_source.position = (j + 1) * hop_size
        ads.close()
        audio_source.close()


if __name__ == "__main__":
    unittest.main()