# HG changeset patch # User Amine Sehili # Date 1716756188 -7200 # Node ID 996948ada9804a4a00173249ceea4196b0200217 # Parent 954c1e2790681893a27e104f9e554e85b74c3eea Update tests diff -r 954c1e279068 -r 996948ada980 tests/images/plot_mono_region.png Binary file tests/images/plot_mono_region.png has changed diff -r 954c1e279068 -r 996948ada980 tests/images/plot_stereo_region.png Binary file tests/images/plot_stereo_region.png has changed diff -r 954c1e279068 -r 996948ada980 tests/images/split_and_plot_mono_region.png Binary file tests/images/split_and_plot_mono_region.png has changed diff -r 954c1e279068 -r 996948ada980 tests/images/split_and_plot_uc_0_stereo_region.png Binary file tests/images/split_and_plot_uc_0_stereo_region.png has changed diff -r 954c1e279068 -r 996948ada980 tests/images/split_and_plot_uc_1_stereo_region.png Binary file tests/images/split_and_plot_uc_1_stereo_region.png has changed diff -r 954c1e279068 -r 996948ada980 tests/images/split_and_plot_uc_any_stereo_region.png Binary file tests/images/split_and_plot_uc_any_stereo_region.png has changed diff -r 954c1e279068 -r 996948ada980 tests/images/split_and_plot_uc_mix_stereo_region.png Binary file tests/images/split_and_plot_uc_mix_stereo_region.png has changed diff -r 954c1e279068 -r 996948ada980 tests/test_AudioReader.py --- a/tests/test_AudioReader.py Sun May 26 17:19:31 2024 +0200 +++ b/tests/test_AudioReader.py Sun May 26 22:43:08 2024 +0200 @@ -1,135 +1,229 @@ -import pytest -from functools import partial import sys import wave +from functools import partial + +import pytest + from auditok import ( + AudioReader, + BufferAudioSource, + Recorder, + WaveAudioSource, dataset, - ADSFactory, - AudioDataSource, - AudioReader, - Recorder, - BufferAudioSource, - WaveAudioSource, - DuplicateArgument, ) +from auditok.util import _Limiter, _OverlapAudioReader -class TestADSFactoryFileAudioSource: - def setup_method(self): +def _read_all_data(reader): + blocks = [] + while True: + data = reader.read() + if data is None: + break + blocks.append(data) + return b"".join(blocks) + + +class TestAudioReaderWithFileAudioSource: + @pytest.fixture(autouse=True) + def setup_and_teardown(self): self.audio_source = WaveAudioSource( filename=dataset.one_to_six_arabic_16000_mono_bc_noise ) + self.audio_source.open() + yield + self.audio_source.close() - def test_ADS_type(self): - ads = ADSFactory.ads(audio_source=self.audio_source) + def test_AudioReader_type(self): + reader = AudioReader(input=self.audio_source) err_msg = ( - "wrong type for ads object, expected: 'AudioDataSource', found: {0}" + "wrong object type, expected: 'AudioReader', found: {0}" ) - assert isinstance(ads, AudioDataSource), err_msg.format(type(ads)) + assert isinstance(reader, AudioReader), err_msg.format(type(reader)) - def test_default_block_size(self): - ads = ADSFactory.ads(audio_source=self.audio_source) - size = ads.block_size + def _test_default_block_size(self): + reader = AudioReader(input=self.audio_source) + data = reader.read() + size = len(data) assert ( size == 160 ), "Wrong default block_size, expected: 160, found: {0}".format(size) - def test_block_size(self): - ads = ADSFactory.ads(audio_source=self.audio_source, block_size=512) - size = ads.block_size + @pytest.mark.parametrize( + "block_dur, expected_nb_samples", + [ + (None, 160), # default: 10 ms + (0.025, 400), # 25 ms + ], + ids=["default", "_25ms"], + ) + def test_block_duration(self, block_dur, expected_nb_samples): + """Test the number of samples read for a given block duration.""" + if block_dur is not None: + reader = AudioReader(input=self.audio_source, block_dur=block_dur) + else: + reader = AudioReader(input=self.audio_source) + data = reader.read() + nb_samples = len(data) // reader.sample_width assert ( - size == 512 - ), "Wrong block_size, expected: 512, found: {0}".format(size) + nb_samples == expected_nb_samples + ), f"Wrong block_size, expected: {expected_nb_samples}, found: {nb_samples}" - # with alias keyword - ads = ADSFactory.ads(audio_source=self.audio_source, bs=160) - size = ads.block_size - assert ( - size == 160 - ), "Wrong block_size, expected: 160, found: {0}".format(size) + @pytest.mark.parametrize( + "block_dur, hop_dur, expected_nb_blocks, expected_last_block_nb_samples", + [ + (None, None, 1879, 126), # default: 10 ms + (0.01, None, 1879, 126), # block_dur_10ms_hop_dur_None + (0.01, 0.01, 1879, 126), # block_dur_10ms_hop_dur_10ms + (0.02, None, 940, 126), # block_dur_20ms_hop_dur_None + (0.025, None, 752, 206), # block_dur_25ms_hop_dur_None + (0.02, 0.01, 1878, 286), # block_dur_20ms_hop_dur_10ms + (0.025, 0.005, 3754, 366), # block_dur_25ms_hop_dur_None + ], + ids=[ + "default", + "block_dur_10ms_hop_dur_None", + "block_dur_10ms_hop_dur_100ms", + "block_dur_20ms_hop_dur_None", + "block_dur_25ms_hop_dur_None", + "block_dur_20ms_hop_dur_10ms", + "block_dur_25ms_hop_dur_None", + ], + ) + def test_hop_duration( + self, + block_dur, + hop_dur, + expected_nb_blocks, + expected_last_block_nb_samples, + ): + """Test the number of read blocks and the duration of last block for + different 'block_dur' and 'hop_dur' values. - def test_block_duration(self): - ads = ADSFactory.ads( - audio_source=self.audio_source, block_dur=0.01 - ) # 10 ms - size = ads.block_size - assert ( - size == 160 - ), "Wrong block_size, expected: 160, found: {0}".format(size) + Args: + block_dur (float or None): block duration in seconds. + hop_dur (float or None): hop duration in seconds. + expected_nb_blocks (int): expected number of read block. + expected_last_block_nb_samples (int): expected number of sample + in the last block. + """ + if block_dur is not None: + reader = AudioReader( + input=self.audio_source, block_dur=block_dur, hop_dur=hop_dur + ) + else: + reader = AudioReader(input=self.audio_source, hop_dur=hop_dur) - # with alias keyword - ads = ADSFactory.ads(audio_source=self.audio_source, bd=0.025) # 25 ms - size = ads.block_size - assert ( - size == 400 - ), "Wrong block_size, expected: 400, found: {0}".format(size) + nb_blocks = 0 + last_block_nb_samples = None + while True: + data = reader.read() + if data is not None: + nb_blocks += 1 + last_block_nb_samples = len(data) // reader.sample_width + else: + break + err_msg = "Wrong number of blocks read from source, expected: " + err_msg += f"{expected_nb_blocks}, found: {nb_blocks}" + assert nb_blocks == expected_nb_blocks, err_msg - def test_hop_duration(self): - ads = ADSFactory.ads( - audio_source=self.audio_source, block_dur=0.02, hop_dur=0.01 - ) # 10 ms - size = ads.hop_size - assert size == 160, "Wrong hop_size, expected: 160, found: {0}".format( - size + err_msg = ( + "Wrong number of samples in last block read from source, expected: " + ) + err_msg += ( + f"{expected_last_block_nb_samples}, found: {last_block_nb_samples}" ) - # with alias keyword - ads = ADSFactory.ads( - audio_source=self.audio_source, bd=0.025, hop_dur=0.015 - ) # 15 ms - size = ads.hop_size - assert ( - size == 240 - ), "Wrong block_size, expected: 240, found: {0}".format(size) + assert last_block_nb_samples == expected_last_block_nb_samples, err_msg + + def test_hop_duration_exception(self): + """Test passing hop_dur > block_dur raises ValueError""" + with pytest.raises(ValueError): + AudioReader(self.audio_source, block_dur=0.01, hop_dur=0.015) + + @pytest.mark.parametrize( + "block_dur, hop_dur", + [ + (None, None), # default + (0.01, None), # block_dur_10ms_hop_dur_None + (None, 0.01), # block_dur_None__hop_dur_10ms + (0.05, 0.05), # block_dur_50ms_hop_dur_50ms + ], + ids=[ + "default", + "block_dur_10ms_hop_dur_None", + "block_dur_None__hop_dur_10ms", + "block_dur_50ms_hop_dur_50ms", + ], + ) + def test_reader_class_block_dur_equals_hop_dur(self, block_dur, hop_dur): + """Test passing hop_dur == block_dur does not create an instance of + '_OverlapAudioReader'. + """ + if block_dur is not None: + reader = AudioReader( + input=self.audio_source, block_dur=block_dur, hop_dur=hop_dur + ) + else: + reader = AudioReader(input=self.audio_source, hop_dur=hop_dur) + assert not isinstance(reader, _OverlapAudioReader) def test_sampling_rate(self): - ads = ADSFactory.ads(audio_source=self.audio_source) - srate = ads.sampling_rate + reader = AudioReader(input=self.audio_source) + sampling_rate = reader.sampling_rate assert ( - srate == 16000 - ), "Wrong sampling rate, expected: 16000, found: {0}".format(srate) + sampling_rate == 16000 + ), f"Wrong sampling rate, expected: 16000, found: {sampling_rate}" def test_sample_width(self): - ads = ADSFactory.ads(audio_source=self.audio_source) - swidth = ads.sample_width + reader = AudioReader(input=self.audio_source) + sample_width = reader.sample_width assert ( - swidth == 2 - ), "Wrong sample width, expected: 2, found: {0}".format(swidth) + sample_width == 2 + ), f"Wrong sample width, expected: 2, found: {sample_width}" def test_channels(self): - ads = ADSFactory.ads(audio_source=self.audio_source) - channels = ads.channels + reader = AudioReader(input=self.audio_source) + channels = reader.channels assert ( channels == 1 - ), "Wrong number of channels, expected: 1, found: {0}".format(channels) + ), f"Wrong number of channels, expected: 1, found: {channels}" def test_read(self): - ads = ADSFactory.ads(audio_source=self.audio_source, block_size=256) - ads.open() - ads_data = ads.read() - ads.close() - + reader = AudioReader(input=self.audio_source, block_dur=0.02) + reader_data = reader.read() audio_source = WaveAudioSource( filename=dataset.one_to_six_arabic_16000_mono_bc_noise ) audio_source.open() - audio_source_data = audio_source.read(256) + audio_source_data = audio_source.read(320) audio_source.close() + assert ( + reader_data == audio_source_data + ), "Unexpected data read from AudioReader" - assert ads_data == audio_source_data, "Unexpected data read from ads" + def test_read_with_overlap(self): + reader = AudioReader( + input=self.audio_source, block_dur=0.02, hop_dur=0.01 + ) + _ = reader.read() # first block + reader_data = reader.read() # second block with 0.01 S overlap + audio_source = WaveAudioSource( + filename=dataset.one_to_six_arabic_16000_mono_bc_noise + ) + audio_source.open() + _ = audio_source.read(160) + audio_source_data = audio_source.read(320) + audio_source.close() + assert ( + reader_data == audio_source_data + ), "Unexpected data read from AudioReader" - def test_Limiter_Deco_read(self): + def test_read_from_AudioReader_with_max_read(self): # read a maximum of 0.75 seconds from audio source - ads = ADSFactory.ads(audio_source=self.audio_source, max_time=0.75) - ads_data = [] - ads.open() - while True: - block = ads.read() - if block is None: - break - ads_data.append(block) - ads.close() - ads_data = b"".join(ads_data) + reader = AudioReader(input=self.audio_source, max_read=0.75) + assert isinstance(reader._audio_source._audio_source, _Limiter) + reader_data = _read_all_data(reader) audio_source = WaveAudioSource( filename=dataset.one_to_six_arabic_16000_mono_bc_noise @@ -139,188 +233,203 @@ audio_source.close() assert ( - ads_data == audio_source_data - ), "Unexpected data read from LimiterADS" + reader_data == audio_source_data + ), f"Unexpected data read from AudioReader with 'max_read = {0.75}'" - def test_Limiter_Deco_read_limit(self): + def test_read_data_size_from_AudioReader_with_max_read(self): # read a maximum of 1.191 seconds from audio source - ads = ADSFactory.ads(audio_source=self.audio_source, max_time=1.191) - total_samples = round(ads.sampling_rate * 1.191) - nb_full_blocks, last_block_size = divmod(total_samples, ads.block_size) + reader = AudioReader(input=self.audio_source, max_read=1.191) + assert isinstance(reader._audio_source._audio_source, _Limiter) + total_samples = round(reader.sampling_rate * 1.191) + block_size = int(reader.block_dur * reader.sampling_rate) + nb_full_blocks, last_block_size = divmod(total_samples, block_size) total_samples_with_overlap = ( - nb_full_blocks * ads.block_size + last_block_size + nb_full_blocks * block_size + last_block_size ) - expected_read_bytes = total_samples_with_overlap * ads.sw * ads.channels + expected_read_bytes = ( + total_samples_with_overlap * reader.sample_width * reader.channels + ) - total_read = 0 - ads.open() - i = 0 - while True: - block = ads.read() + reader_data = _read_all_data(reader) + total_read = len(reader_data) + err_msg = f"Wrong data length read from LimiterADS, expected: {expected_read_bytes}, found: {total_read}" + assert total_read == expected_read_bytes, err_msg + + def test_read_from_Recorder(self): + reader = Recorder(input=self.audio_source, block_dur=0.025) + reader_data = [] + for _ in range(10): + block = reader.read() if block is None: break - i += 1 - total_read += len(block) - - ads.close() - err_msg = ( - "Wrong data length read from LimiterADS, expected: {0}, found: {1}" - ) - assert total_read == expected_read_bytes, err_msg.format( - expected_read_bytes, total_read - ) - - def test_Recorder_Deco_read(self): - ads = ADSFactory.ads( - audio_source=self.audio_source, record=True, block_size=500 - ) - ads_data = [] - ads.open() - for i in range(10): - block = ads.read() - if block is None: - break - ads_data.append(block) - ads.close() - ads_data = b"".join(ads_data) + reader_data.append(block) + reader_data = b"".join(reader_data) audio_source = WaveAudioSource( filename=dataset.one_to_six_arabic_16000_mono_bc_noise ) audio_source.open() - audio_source_data = audio_source.read(500 * 10) + audio_source_data = audio_source.read(400 * 10) audio_source.close() assert ( - ads_data == audio_source_data - ), "Unexpected data read from RecorderADS" + reader_data == audio_source_data + ), "Unexpected data read from Recorder" - def test_Recorder_Deco_is_rewindable(self): - ads = ADSFactory.ads(audio_source=self.audio_source, record=True) - assert ads.rewindable, "RecorderADS.is_rewindable should return True" + def test_AudioReader_rewindable(self): + reader = AudioReader(input=self.audio_source, record=True) + assert ( + reader.rewindable + ), "AudioReader with record=True should be rewindable" - def test_Recorder_Deco_rewind_and_read(self): - ads = ADSFactory.ads( - audio_source=self.audio_source, record=True, block_size=320 + def test_AudioReader_record_and_rewind(self): + reader = AudioReader( + input=self.audio_source, record=True, block_dur=0.02 ) - ads.open() + # read 0.02 * 10 = 0.2 sec. of data for i in range(10): - ads.read() - - ads.rewind() + reader.read() + reader.rewind() # read all available data after rewind - ads_data = [] - while True: - block = ads.read() - if block is None: - break - ads_data.append(block) - ads.close() - ads_data = b"".join(ads_data) + reader_data = _read_all_data(reader) audio_source = WaveAudioSource( filename=dataset.one_to_six_arabic_16000_mono_bc_noise ) audio_source.open() - audio_source_data = audio_source.read(320 * 10) + audio_source_data = audio_source.read(320 * 10) # read 0.2 sec. of data audio_source.close() assert ( - ads_data == audio_source_data - ), "Unexpected data read from RecorderADS" + reader_data == audio_source_data + ), "Unexpected data read from AudioReader with record = True" - def test_Overlap_Deco_read(self): + def test_Recorder_record_and_rewind(self): + recorder = Recorder(input=self.audio_source, block_dur=0.02) + # read 0.02 * 10 = 0.2 sec. of data + for i in range(10): + recorder.read() + + recorder.rewind() + + # read all available data after rewind + recorder_data = [] + recorder_data = _read_all_data(recorder) + + audio_source = WaveAudioSource( + filename=dataset.one_to_six_arabic_16000_mono_bc_noise + ) + audio_source.open() + audio_source_data = audio_source.read(320 * 10) # read 0.2 sec. of data + audio_source.close() + + assert ( + recorder_data == audio_source_data + ), "Unexpected data read from Recorder" + + def test_read_overlapping_blocks(self): # Use arbitrary valid block_size and hop_size block_size = 1714 hop_size = 313 + block_dur = block_size / self.audio_source.sampling_rate + hop_dur = hop_size / self.audio_source.sampling_rate - ads = ADSFactory.ads( - audio_source=self.audio_source, - block_size=block_size, - hop_size=hop_size, + reader = AudioReader( + input=self.audio_source, + block_dur=block_dur, + hop_dur=hop_dur, ) - # Read all available data overlapping blocks - ads.open() - ads_data = [] + # Read all available overlapping blocks of data + reader_data = [] while True: - block = ads.read() + block = reader.read() if block is None: break - ads_data.append(block) - ads.close() + reader_data.append(block) # Read all data from file and build a BufferAudioSource fp = wave.open(dataset.one_to_six_arabic_16000_mono_bc_noise, "r") wave_data = fp.readframes(fp.getnframes()) fp.close() audio_source = BufferAudioSource( - wave_data, ads.sampling_rate, ads.sample_width, ads.channels + wave_data, + reader.sampling_rate, + reader.sample_width, + reader.channels, ) audio_source.open() - # Compare all blocks read from OverlapADS to those read from an audio source with a manual position setting - for i, block in enumerate(ads_data): + # Compare all blocks read from OverlapADS to those read from an + # audio source with a manual position setting + for i, block in enumerate(reader_data): tmp = audio_source.read(block_size) assert ( block == tmp - ), "Unexpected block (N={0}) read from OverlapADS".format(i) + ), f"Unexpected data (block {i}) from reader with overlapping blocks" audio_source.position = (i + 1) * hop_size audio_source.close() - def test_Limiter_Overlap_Deco_read(self): + def test_read_overlapping_blocks_with_max_read(self): block_size = 256 hop_size = 200 + block_dur = block_size / self.audio_source.sampling_rate + hop_dur = hop_size / self.audio_source.sampling_rate - ads = ADSFactory.ads( - audio_source=self.audio_source, - max_time=0.50, - block_size=block_size, - hop_size=hop_size, + reader = AudioReader( + input=self.audio_source, + block_dur=block_dur, + hop_dur=hop_dur, + max_read=0.5, ) - # Read all available data overlapping blocks - ads.open() - ads_data = [] + # Read all available overlapping blocks of data + reader_data = [] while True: - block = ads.read() + block = reader.read() if block is None: break - ads_data.append(block) - ads.close() + reader_data.append(block) # Read all data from file and build a BufferAudioSource fp = wave.open(dataset.one_to_six_arabic_16000_mono_bc_noise, "r") wave_data = fp.readframes(fp.getnframes()) fp.close() audio_source = BufferAudioSource( - wave_data, ads.sampling_rate, ads.sample_width, ads.channels + wave_data, + reader.sampling_rate, + reader.sample_width, + reader.channels, ) audio_source.open() - # Compare all blocks read from OverlapADS to those read from an audio source with a manual position setting - for i, block in enumerate(ads_data): - tmp = audio_source.read(len(block) // (ads.sw * ads.ch)) - assert len(block) == len( - tmp - ), "Unexpected block (N={0}) read from OverlapADS".format(i) + # Compare all blocks read from OverlapADS to those read from an + # audio source with a manual position setting + for i, block in enumerate(reader_data): + tmp = audio_source.read(len(block) // (reader.sw * reader.ch)) + assert ( + block == tmp + ), f"Unexpected data (block {i}) from reader with overlapping blocks and max_read" audio_source.position = (i + 1) * hop_size audio_source.close() - def test_Limiter_Overlap_Deco_read_limit(self): + def test_length_read_overlapping_blocks_with_max_read(self): block_size = 313 hop_size = 207 - ads = ADSFactory.ads( - audio_source=self.audio_source, - max_time=1.932, - block_size=block_size, - hop_size=hop_size, + block_dur = block_size / self.audio_source.sampling_rate + hop_dur = hop_size / self.audio_source.sampling_rate + + reader = AudioReader( + input=self.audio_source, + max_read=1.932, + block_dur=block_dur, + hop_dur=hop_dur, ) - total_samples = round(ads.sampling_rate * 1.932) + total_samples = round(reader.sampling_rate * 1.932) first_read_size = block_size next_read_size = block_size - hop_size nb_next_blocks, last_block_size = divmod( @@ -329,21 +438,23 @@ total_samples_with_overlap = ( first_read_size + next_read_size * nb_next_blocks + last_block_size ) - expected_read_bytes = total_samples_with_overlap * ads.sw * ads.channels + expected_read_bytes = ( + total_samples_with_overlap * reader.sw * reader.channels + ) - cache_size = (block_size - hop_size) * ads.sample_width * ads.channels + cache_size = ( + (block_size - hop_size) * reader.sample_width * reader.channels + ) total_read = cache_size - ads.open() i = 0 while True: - block = ads.read() + block = reader.read() if block is None: break i += 1 total_read += len(block) - cache_size - ads.close() err_msg = ( "Wrong data length read from LimiterADS, expected: {0}, found: {1}" ) @@ -351,115 +462,125 @@ expected_read_bytes, total_read ) - def test_Recorder_Overlap_Deco_is_rewindable(self): - ads = ADSFactory.ads( - audio_source=self.audio_source, - block_size=320, - hop_size=160, + def test_reader_with_overlapping_blocks__rewindable(self): + reader = AudioReader( + input=self.audio_source, + block_dur=320, + hop_dur=160, record=True, ) - assert ads.rewindable, "RecorderADS.is_rewindable should return True" + assert ( + reader.rewindable + ), "AudioReader with record=True should be rewindable" - def test_Recorder_Overlap_Deco_rewind_and_read(self): + def test_overlapping_blocks_with_max_read_rewind_and_read(self): # Use arbitrary valid block_size and hop_size block_size = 1600 hop_size = 400 + block_dur = block_size / self.audio_source.sampling_rate + hop_dur = hop_size / self.audio_source.sampling_rate - ads = ADSFactory.ads( - audio_source=self.audio_source, - block_size=block_size, - hop_size=hop_size, + reader = AudioReader( + input=self.audio_source, + block_dur=block_dur, + hop_dur=hop_dur, record=True, ) # Read all available data overlapping blocks - ads.open() i = 0 while True: - block = ads.read() + block = reader.read() if block is None: break i += 1 - ads.rewind() + reader.rewind() # Read all data from file and build a BufferAudioSource fp = wave.open(dataset.one_to_six_arabic_16000_mono_bc_noise, "r") wave_data = fp.readframes(fp.getnframes()) fp.close() audio_source = BufferAudioSource( - wave_data, ads.sampling_rate, ads.sample_width, ads.channels + wave_data, + reader.sampling_rate, + reader.sample_width, + reader.channels, ) audio_source.open() - # Compare all blocks read from OverlapADS to those read from an audio source with a manual position setting + # Compare blocks read from AudioReader to those read from an BufferAudioSource with manual position setting for j in range(i): tmp = audio_source.read(block_size) assert ( - ads.read() == tmp - ), "Unexpected block (N={0}) read from OverlapADS".format(i) + reader.read() == tmp + ), f"Unexpected data (block {i}) from reader with overlapping blocks and record = True" audio_source.position = (j + 1) * hop_size - ads.close() audio_source.close() - def test_Limiter_Recorder_Overlap_Deco_rewind_and_read(self): + def test_overlapping_blocks_with_record_and_max_read_rewind_and_read(self): # Use arbitrary valid block_size and hop_size block_size = 1600 hop_size = 400 + block_dur = block_size / self.audio_source.sampling_rate + hop_dur = hop_size / self.audio_source.sampling_rate - ads = ADSFactory.ads( - audio_source=self.audio_source, + reader = AudioReader( + input=self.audio_source, max_time=1.50, - block_size=block_size, - hop_size=hop_size, + block_dur=block_dur, + hop_dur=hop_dur, record=True, ) # Read all available data overlapping blocks - ads.open() i = 0 while True: - block = ads.read() + block = reader.read() if block is None: break i += 1 - ads.rewind() + reader.rewind() # Read all data from file and build a BufferAudioSource fp = wave.open(dataset.one_to_six_arabic_16000_mono_bc_noise, "r") wave_data = fp.readframes(fp.getnframes()) fp.close() audio_source = BufferAudioSource( - wave_data, ads.sampling_rate, ads.sample_width, ads.channels + wave_data, + reader.sampling_rate, + reader.sample_width, + reader.channels, ) audio_source.open() - # Compare all blocks read from OverlapADS to those read from an audio source with a manual position setting + # Compare all blocks read from AudioReader to those read from BufferAudioSource with a manual position setting for j in range(i): tmp = audio_source.read(block_size) assert ( - ads.read() == tmp + reader.read() == tmp ), "Unexpected block (N={0}) read from OverlapADS".format(i) audio_source.position = (j + 1) * hop_size - ads.close() audio_source.close() - def test_Limiter_Recorder_Overlap_Deco_rewind_and_read_limit(self): + def test_length_read_overlapping_blocks_with_record_and_max_read(self): # Use arbitrary valid block_size and hop_size block_size = 1000 hop_size = 200 + block_dur = block_size / self.audio_source.sampling_rate + hop_dur = hop_size / self.audio_source.sampling_rate - ads = ADSFactory.ads( - audio_source=self.audio_source, - max_time=1.317, - block_size=block_size, - hop_size=hop_size, + reader = AudioReader( + input=self.audio_source, + block_dur=block_dur, + hop_dur=hop_dur, record=True, + max_read=1.317, ) - total_samples = round(ads.sampling_rate * 1.317) + total_samples = round(reader.sampling_rate * 1.317) first_read_size = block_size next_read_size = block_size - hop_size nb_next_blocks, last_block_size = divmod( @@ -468,434 +589,111 @@ total_samples_with_overlap = ( first_read_size + next_read_size * nb_next_blocks + last_block_size ) - expected_read_bytes = total_samples_with_overlap * ads.sw * ads.channels + expected_read_bytes = ( + total_samples_with_overlap * reader.sample_width * reader.channels + ) - cache_size = (block_size - hop_size) * ads.sample_width * ads.channels + cache_size = ( + (block_size - hop_size) * reader.sample_width * reader.channels + ) total_read = cache_size - ads.open() i = 0 while True: - block = ads.read() + block = reader.read() if block is None: break i += 1 total_read += len(block) - cache_size - ads.close() - err_msg = ( - "Wrong data length read from LimiterADS, expected: {0}, found: {1}" - ) - assert total_read == expected_read_bytes, err_msg.format( - expected_read_bytes, total_read - ) + err_msg = f"Wrong data length read from AudioReader, expected: {expected_read_bytes}, found: {total_read}" + assert total_read == expected_read_bytes, err_msg -class TestADSFactoryBufferAudioSource: - def setup_method(self): - self.signal = b"ABCDEFGHIJKLMNOPQRSTUVWXYZ012345" - self.ads = ADSFactory.ads( - data_buffer=self.signal, - sampling_rate=16, - sample_width=2, - channels=1, - block_size=4, - ) +def test_AudioReader_raw_data(): - def test_ADS_BAS_sampling_rate(self): - srate = self.ads.sampling_rate + data = b"ABCDEFGHIJKLMNOPQRSTUVWXYZ012345" + block_size = 5 + hop_size = 4 + reader = AudioReader( + input=data, + sampling_rate=16, + sample_width=2, + channels=1, + block_dur=block_size / 16, + hop_dur=hop_size / 16, + max_read=0.80, + record=True, + ) + reader.open() + + assert ( + reader.sampling_rate == 16 + ), f"Wrong sampling rate, expected: 16, found: {reader.sampling_rate }" + + assert ( + reader.sample_width == 2 + ), f"Wrong sample width, expected: 2, found: {reader.sample_width}" + + # Read all available data overlapping blocks + i = 0 + while True: + block = reader.read() + if block is None: + break + i += 1 + + reader.rewind() + + # Build a BufferAudioSource + audio_source = BufferAudioSource( + data, reader.sampling_rate, reader.sample_width, reader.channels + ) + audio_source.open() + + # Compare all blocks read from AudioReader to those read from an audio + # source with a manual position setting + for j in range(i): + tmp = audio_source.read(block_size) + block = reader.read() assert ( - srate == 16 - ), "Wrong sampling rate, expected: 16000, found: {0}".format(srate) + block == tmp + ), f"Unexpected block '{block}' (N={i}) read from OverlapADS" + audio_source.position = (j + 1) * hop_size + audio_source.close() + reader.close() - def test_ADS_BAS_sample_width(self): - swidth = self.ads.sample_width - assert ( - swidth == 2 - ), "Wrong sample width, expected: 2, found: {0}".format(swidth) - def test_ADS_BAS_channels(self): - channels = self.ads.channels - assert ( - channels == 1 - ), "Wrong number of channels, expected: 1, found: {0}".format(channels) - - def test_Limiter_Recorder_Overlap_Deco_rewind_and_read(self): - # Use arbitrary valid block_size and hop_size - block_size = 5 - hop_size = 4 - - ads = ADSFactory.ads( - data_buffer=self.signal, - sampling_rate=16, - sample_width=2, - channels=1, - max_time=0.80, - block_size=block_size, - hop_size=hop_size, - record=True, - ) - - # Read all available data overlapping blocks - ads.open() - i = 0 - while True: - block = ads.read() - if block is None: - break - i += 1 - - ads.rewind() - - # Build a BufferAudioSource - audio_source = BufferAudioSource( - self.signal, ads.sampling_rate, ads.sample_width, ads.channels - ) - audio_source.open() - - # Compare all blocks read from OverlapADS to those read from an audio source with a manual position setting - for j in range(i): - tmp = audio_source.read(block_size) - block = ads.read() - assert ( - block == tmp - ), "Unexpected block '{}' (N={}) read from OverlapADS".format( - block, i - ) - audio_source.position = (j + 1) * hop_size - - ads.close() - audio_source.close() - - -class TestADSFactoryAlias: - def setup_method(self): - self.signal = b"ABCDEFGHIJKLMNOPQRSTUVWXYZ012345" - - def test_sampling_rate_alias(self): - ads = ADSFactory.ads( - data_buffer=self.signal, - sr=16, - sample_width=2, - channels=1, - block_dur=0.5, - ) - srate = ads.sampling_rate - assert ( - srate == 16 - ), "Wrong sampling rate, expected: 16000, found: {0}".format(srate) - - def test_sampling_rate_duplicate(self): - func = partial( - ADSFactory.ads, - data_buffer=self.signal, - sr=16, - sampling_rate=16, - sample_width=2, - channels=1, - ) - with pytest.raises(DuplicateArgument): - func() - - def test_sample_width_alias(self): - ads = ADSFactory.ads( - data_buffer=self.signal, - sampling_rate=16, - sw=2, - channels=1, - block_dur=0.5, - ) - swidth = ads.sample_width - assert ( - swidth == 2 - ), "Wrong sample width, expected: 2, found: {0}".format(swidth) - - def test_sample_width_duplicate(self): - func = partial( - ADSFactory.ads, - data_buffer=self.signal, - sampling_rate=16, - sw=2, - sample_width=2, - channels=1, - ) - with pytest.raises(DuplicateArgument): - func() - - def test_channels_alias(self): - ads = ADSFactory.ads( - data_buffer=self.signal, - sampling_rate=16, - sample_width=2, - ch=1, - block_dur=4, - ) - channels = ads.channels - assert ( - channels == 1 - ), "Wrong number of channels, expected: 1, found: {0}".format(channels) - - def test_channels_duplicate(self): - func = partial( - ADSFactory.ads, - data_buffer=self.signal, - sampling_rate=16, - sample_width=2, - ch=1, - channels=1, - ) - with pytest.raises(DuplicateArgument): - func() - - def test_block_size_alias(self): - ads = ADSFactory.ads( - data_buffer=self.signal, - sampling_rate=16, - sample_width=2, - channels=1, - bs=8, - ) - size = ads.block_size - assert ( - size == 8 - ), "Wrong block_size using bs alias, expected: 8, found: {0}".format( - size - ) - - def test_block_size_duplicate(self): - func = partial( - ADSFactory.ads, - data_buffer=self.signal, - sampling_rate=16, - sample_width=2, - channels=1, - bs=4, - block_size=4, - ) - with pytest.raises(DuplicateArgument): - func() - - def test_block_duration_alias(self): - ads = ADSFactory.ads( - data_buffer=self.signal, - sampling_rate=16, - sample_width=2, - channels=1, - bd=0.75, - ) - size = ads.block_size - err_msg = "Wrong block_size set with a block_dur alias 'bd', expected: 8, found: {0}" - assert size == 12, err_msg.format(size) - - def test_block_duration_duplicate(self): - func = partial( - ADSFactory.ads, - data_buffer=self.signal, - sampling_rate=16, - sample_width=2, - channels=1, - bd=4, - block_dur=4, - ) - with pytest.raises(DuplicateArgument): - func() - - def test_block_size_duration_duplicate(self): - func = partial( - ADSFactory.ads, - data_buffer=self.signal, - sampling_rate=16, - sample_width=2, - channels=1, - bd=4, - bs=12, - ) - with pytest.raises(DuplicateArgument): - func() - - def test_hop_duration_alias(self): - ads = ADSFactory.ads( - data_buffer=self.signal, - sampling_rate=16, - sample_width=2, - channels=1, - bd=0.75, - hd=0.5, - ) - size = ads.hop_size - assert ( - size == 8 - ), "Wrong block_size using bs alias, expected: 8, found: {0}".format( - size - ) - - def test_hop_duration_duplicate(self): - func = partial( - ADSFactory.ads, - data_buffer=self.signal, - sampling_rate=16, - sample_width=2, - channels=1, - bd=0.75, - hd=0.5, - hop_dur=0.5, - ) - with pytest.raises(DuplicateArgument): - func() - - def test_hop_size_duration_duplicate(self): - func = partial( - ADSFactory.ads, - data_buffer=self.signal, - sampling_rate=16, - sample_width=2, - channels=1, - bs=8, - hs=4, - hd=1, - ) - with pytest.raises(DuplicateArgument): - func() - - def test_hop_size_greater_than_block_size(self): - func = partial( - ADSFactory.ads, - data_buffer=self.signal, - sampling_rate=16, - sample_width=2, - channels=1, - bs=4, - hs=8, - ) - with pytest.raises(ValueError): - func() - - def test_filename_duplicate(self): - func = partial( - ADSFactory.ads, - fn=dataset.one_to_six_arabic_16000_mono_bc_noise, - filename=dataset.one_to_six_arabic_16000_mono_bc_noise, - ) - with pytest.raises(DuplicateArgument): - func() - - def test_data_buffer_duplicate(self): - func = partial( - ADSFactory.ads, - data_buffer=self.signal, - db=self.signal, - sampling_rate=16, - sample_width=2, - channels=1, - ) - with pytest.raises(DuplicateArgument): - func() - - def test_max_time_alias(self): - ads = ADSFactory.ads( - data_buffer=self.signal, - sampling_rate=16, - sample_width=2, - channels=1, - mt=10, - block_dur=0.5, - ) - assert ( - ads.max_read == 10 - ), "Wrong AudioDataSource.max_read, expected: 10, found: {}".format( - ads.max_read - ) - - def test_max_time_duplicate(self): - func = partial( - ADSFactory.ads, - data_buffer=self.signal, - sampling_rate=16, - sample_width=2, - channels=1, - mt=True, - max_time=True, - ) - with pytest.raises(DuplicateArgument): - func() - - def test_record_alias(self): - ads = ADSFactory.ads( - data_buffer=self.signal, - sampling_rate=16, - sample_width=2, - channels=1, - rec=True, - block_dur=0.5, - ) - assert ads.rewindable, "AudioDataSource.rewindable expected to be True" - - def test_record_duplicate(self): - func = partial( - ADSFactory.ads, - data_buffer=self.signal, - sampling_rate=16, - sample_width=2, - channels=1, - rec=True, - record=True, - ) - with pytest.raises(DuplicateArgument): - func() - - def test_Limiter_Recorder_Overlap_Deco_rewind_and_read_alias(self): - # Use arbitrary valid block_size and hop_size - block_size = 5 - hop_size = 4 - - ads = ADSFactory.ads( - db=self.signal, - sr=16, - sw=2, - ch=1, - mt=0.80, - bs=block_size, - hs=hop_size, - rec=True, - ) - - # Read all available data overlapping blocks - ads.open() - i = 0 - while True: - block = ads.read() - if block is None: - break - i += 1 - - ads.rewind() - - # Build a BufferAudioSource - audio_source = BufferAudioSource( - self.signal, ads.sampling_rate, ads.sample_width, ads.channels - ) - audio_source.open() - - # Compare all blocks read from AudioDataSource to those read from an audio source with manual position definition - for j in range(i): - tmp = audio_source.read(block_size) - block = ads.read() - assert ( - block == tmp - ), "Unexpected block (N={0}) read from OverlapADS".format(i) - audio_source.position = (j + 1) * hop_size - ads.close() - audio_source.close() - - -def _read_all_data(reader): - blocks = [] - while True: - data = reader.read() - if data is None: - break - blocks.append(data) - return b"".join(blocks) +def test_AudioReader_alias_params(): + reader = AudioReader( + input=b"0" * 1600, + sr=16000, + sw=2, + channels=1, + ) + assert reader.sampling_rate == 16000, ( + "Unexpected sampling rate: reader.sampling_rate = " + + f"{reader.sampling_rate} instead of 16000" + ) + assert reader.sr == 16000, ( + "Unexpected sampling rate: reader.sr = " + + f"{reader.sr} instead of 16000" + ) + assert reader.sample_width == 2, ( + "Unexpected sample width: reader.sample_width = " + + f"{reader.sample_width} instead of 2" + ) + assert reader.sw == 2, ( + "Unexpected sample width: reader.sw = " + f"{reader.sw} instead of 2" + ) + assert reader.channels == 1, ( + "Unexpected number of channels: reader.channels = " + + f"{reader.channels} instead of 1" + ) + assert reader.ch == 1, ( + "Unexpected number of channels: reader.ch = " + + f"{reader.ch} instead of 1" + ) @pytest.mark.parametrize( diff -r 954c1e279068 -r 996948ada980 tests/test_AudioSource.py --- a/tests/test_AudioSource.py Sun May 26 17:19:31 2024 +0200 +++ b/tests/test_AudioSource.py Sun May 26 22:43:08 2024 +0200 @@ -3,7 +3,10 @@ """ from array import array + import pytest +from test_util import PURE_TONE_DICT, _sample_generator + from auditok.io import ( AudioParameterError, BufferAudioSource, @@ -11,7 +14,6 @@ WaveAudioSource, ) from auditok.signal import FORMAT -from test_util import PURE_TONE_DICT, _sample_generator def audio_source_read_all_gen(audio_source, size=None): diff -r 954c1e279068 -r 996948ada980 tests/test_StreamTokenizer.py --- a/tests/test_StreamTokenizer.py Sun May 26 17:19:31 2024 +0200 +++ b/tests/test_StreamTokenizer.py Sun May 26 22:43:08 2024 +0200 @@ -1,5 +1,8 @@ +import os + import pytest -from auditok import StreamTokenizer, StringDataSource, DataValidator + +from auditok import DataValidator, StreamTokenizer, StringDataSource class AValidator(DataValidator): @@ -30,7 +33,7 @@ assert ( len(tokens) == 2 - ), f"wrong number of tokens, expected: 2, found: {len(tokens)}" + ), "wrong number of tokens, expected: 2, found: {}".format(len(tokens)) tok1, tok2 = tokens[0], tokens[1] data = "".join(tok1[0]) @@ -38,22 +41,28 @@ end = tok1[2] assert ( data == "AaaaAaAaaAaAaaaa" - ), f"wrong data for token 1, expected: 'AaaaAaAaaAaAaaaa', found: {data}" + ), "wrong data for token 1, expected: 'AaaaAaAaaAaAaaaa', found: {}".format( + data + ) assert ( start == 1 - ), f"wrong start frame for token 1, expected: 1, found: {start}" - assert end == 16, f"wrong end frame for token 1, expected: 16, found: {end}" + ), "wrong start frame for token 1, expected: 1, found: {}".format(start) + assert ( + end == 16 + ), "wrong end frame for token 1, expected: 16, found: {}".format(end) data = "".join(tok2[0]) start = tok2[1] end = tok2[2] assert ( data == "AAAAAAAA" - ), f"wrong data for token 2, expected: 'AAAAAAAA', found: {data}" + ), "wrong data for token 2, expected: 'AAAAAAAA', found: {}".format(data) assert ( start == 20 - ), f"wrong start frame for token 2, expected: 20, found: {start}" - assert end == 27, f"wrong end frame for token 2, expected: 27, found: {end}" + ), "wrong start frame for token 2, expected: 20, found: {}".format(start) + assert ( + end == 27 + ), "wrong end frame for token 2, expected: 27, found: {}".format(end) def test_init_min_3_init_max_silence_0(validator): @@ -75,7 +84,7 @@ assert ( len(tokens) == 2 - ), f"wrong number of tokens, expected: 2, found: {len(tokens)}" + ), "wrong number of tokens, expected: 2, found: {}".format(len(tokens)) tok1, tok2 = tokens[0], tokens[1] data = "".join(tok1[0]) @@ -83,22 +92,28 @@ end = tok1[2] assert ( data == "AAAAAAAAAaaaa" - ), f"wrong data for token 1, expected: 'AAAAAAAAAaaaa', found: '{data}'" + ), "wrong data for token 1, expected: 'AAAAAAAAAaaaa', found: {}".format( + data + ) assert ( start == 18 - ), f"wrong start frame for token 1, expected: 18, found: {start}" - assert end == 30, f"wrong end frame for token 1, expected: 30, found: {end}" + ), "wrong start frame for token 1, expected: 18, found: {}".format(start) + assert ( + end == 30 + ), "wrong end frame for token 1, expected: 30, found: {}".format(end) data = "".join(tok2[0]) start = tok2[1] end = tok2[2] assert ( data == "AAAAA" - ), f"wrong data for token 2, expected: 'AAAAA', found: '{data}'" + ), "wrong data for token 2, expected: 'AAAAA', found: {}".format(data) assert ( start == 33 - ), f"wrong start frame for token 2, expected: 33, found: {start}" - assert end == 37, f"wrong end frame for token 2, expected: 37, found: {end}" + ), "wrong start frame for token 2, expected: 33, found: {}".format(start) + assert ( + end == 37 + ), "wrong end frame for token 2, expected: 37, found: {}".format(end) def test_init_min_3_init_max_silence_2(validator): @@ -119,7 +134,7 @@ assert ( len(tokens) == 3 - ), f"wrong number of tokens, expected: 3, found: {len(tokens)}" + ), "wrong number of tokens, expected: 3, found: {}".format(len(tokens)) tok1, tok2, tok3 = tokens[0], tokens[1], tokens[2] data = "".join(tok1[0]) @@ -127,33 +142,41 @@ end = tok1[2] assert ( data == "AaAaaAaAaaaa" - ), f"wrong data for token 1, expected: 'AaAaaAaA', found: '{data}'" + ), "wrong data for token 1, expected: 'AaAaaAaA', found: {}".format(data) assert ( start == 5 - ), f"wrong start frame for token 1, expected: 5, found: {start}" - assert end == 16, f"wrong end frame for token 1, expected: 16, found: {end}" + ), "wrong start frame for token 1, expected: 5, found: {}".format(start) + assert ( + end == 16 + ), "wrong end frame for token 1, expected: 16, found: {}".format(end) data = "".join(tok2[0]) start = tok2[1] end = tok2[2] assert ( data == "AAAAAAAAAaaaa" - ), f"wrong data for token 2, expected: 'AAAAAAAAAaaaa', found: '{data}'" + ), "wrong data for token 2, expected: 'AAAAAAAAAaaaa', found: {}".format( + data + ) assert ( start == 19 - ), f"wrong start frame for token 2, expected: 19, found: {start}" - assert end == 31, f"wrong end frame for token 2, expected: 31, found: {end}" + ), "wrong start frame for token 2, expected: 19, found: {}".format(start) + assert ( + end == 31 + ), "wrong end frame for token 2, expected: 31, found: {}".format(end) data = "".join(tok3[0]) start = tok3[1] end = tok3[2] assert ( data == "AAAAA" - ), f"wrong data for token 3, expected: 'AAAAA', found: '{data}'" + ), "wrong data for token 3, expected: 'AAAAA', found: {}".format(data) assert ( start == 35 - ), f"wrong start frame for token 3, expected: 35, found: {start}" - assert end == 39, f"wrong end frame for token 3, expected: 39, found: {end}" + ), "wrong start frame for token 3, expected: 35, found: {}".format(start) + assert ( + end == 39 + ), "wrong end frame for token 3, expected: 39, found: {}".format(end) @pytest.fixture @@ -178,7 +201,7 @@ assert ( len(tokens) == 2 - ), f"wrong number of tokens, expected: 2, found: {len(tokens)}" + ), "wrong number of tokens, expected: 2, found: {}".format(len(tokens)) tok1, tok2 = tokens[0], tokens[1] data = "".join(tok1[0]) @@ -186,22 +209,28 @@ end = tok1[2] assert ( data == "AaaaAaAaaAaAaa" - ), f"wrong data for token 1, expected: 'AaaaAaAaaAaAaa', found: '{data}'" + ), "wrong data for token 1, expected: 'AaaaAaAaaAaAaa', found: {}".format( + data + ) assert ( start == 1 - ), f"wrong start frame for token 1, expected: 1, found: {start}" - assert end == 14, f"wrong end frame for token 1, expected: 14, found: {end}" + ), "wrong start frame for token 1, expected: 1, found: {}".format(start) + assert ( + end == 14 + ), "wrong end frame for token 1, expected: 14, found: {}".format(end) data = "".join(tok2[0]) start = tok2[1] end = tok2[2] assert ( data == "AAAAAAAAAaa" - ), f"wrong data for token 2, expected: 'AAAAAAAAAaa', found: '{data}'" + ), "wrong data for token 2, expected: 'AAAAAAAAAaa', found: {}".format(data) assert ( start == 18 - ), f"wrong start frame for token 2, expected: 18, found: {start}" - assert end == 28, f"wrong end frame for token 2, expected: 28, found: {end}" + ), "wrong start frame for token 2, expected: 18, found: {}".format(start) + assert ( + end == 28 + ), "wrong end frame for token 2, expected: 28, found: {}".format(end) @pytest.fixture @@ -224,7 +253,7 @@ assert ( len(tokens) == 21 - ), f"wrong number of tokens, expected: 21, found: {len(tokens)}" + ), "wrong number of tokens, expected: 21, found: {}".format(len(tokens)) @pytest.fixture @@ -251,7 +280,7 @@ assert ( len(tokens) == 2 - ), f"wrong number of tokens, expected: 2, found: {len(tokens)}" + ), "wrong number of tokens, expected: 2, found: {}".format(len(tokens)) tok1, tok2 = tokens[0], tokens[1] data = "".join(tok1[0]) @@ -259,22 +288,30 @@ end = tok1[2] assert ( data == "AaaaAaAaaAaAaaaa" - ), f"wrong data for token 1, expected: 'AaaaAaAaaAaAaaaa', found: '{data}'" + ), "wrong data for token 1, expected: 'AaaaAaAaaAaAaaaa', found: {}".format( + data + ) assert ( start == 1 - ), f"wrong start frame for token 1, expected: 1, found: {start}" - assert end == 16, f"wrong end frame for token 1, expected: 16, found: {end}" + ), "wrong start frame for token 1, expected: 1, found: {}".format(start) + assert ( + end == 16 + ), "wrong end frame for token 1, expected: 16, found: {}".format(end) data = "".join(tok2[0]) start = tok2[1] end = tok2[2] assert ( data == "AAAAAaaAAaaAAA" - ), f"wrong data for token 2, expected: 'AAAAAaaAAaaAAA', found: '{data}'" + ), "wrong data for token 2, expected: 'AAAAAaaAAaaAAA', found: {}".format( + data + ) assert ( start == 30 - ), f"wrong start frame for token 2, expected: 30, found: {start}" - assert end == 43, f"wrong end frame for token 2, expected: 43, found: {end}" + ), "wrong start frame for token 2, expected: 30, found: {}".format(start) + assert ( + end == 43 + ), "wrong end frame for token 2, expected: 43, found: {}".format(end) @pytest.fixture @@ -301,7 +338,7 @@ assert ( len(tokens) == 4 - ), f"wrong number of tokens, expected: 4, found: {len(tokens)}" + ), "wrong number of tokens, expected: 4, found: {}".format(len(tokens)) tok1, tok2, tok3, tok4 = tokens[0], tokens[1], tokens[2], tokens[3] data = "".join(tok1[0]) @@ -309,44 +346,52 @@ end = tok1[2] assert ( data == "AAAAA" - ), f"wrong data for token 1, expected: 'AAAAA', found: '{data}'" + ), "wrong data for token 1, expected: 'AAAAA', found: {}".format(data) assert ( start == 18 - ), f"wrong start frame for token 1, expected: 18, found: {start}" - assert end == 22, f"wrong end frame for token 1, expected: 22, found: {end}" + ), "wrong start frame for token 1, expected: 18, found: {}".format(start) + assert ( + end == 22 + ), "wrong end frame for token 1, expected: 22, found: {}".format(end) data = "".join(tok2[0]) start = tok2[1] end = tok2[2] assert ( data == "AAAaa" - ), f"wrong data for token 2, expected: 'AAAaa', found: '{data}'" + ), "wrong data for token 2, expected: 'AAAaa', found: {}".format(data) assert ( start == 23 - ), f"wrong start frame for token 2, expected: 23, found: {start}" - assert end == 27, f"wrong end frame for token 2, expected: 27, found: {end}" + ), "wrong start frame for token 2, expected: 23, found: {}".format(start) + assert ( + end == 27 + ), "wrong end frame for token 2, expected: 27, found: {}".format(end) data = "".join(tok3[0]) start = tok3[1] end = tok3[2] assert ( data == "AAAAA" - ), f"wrong data for token 3, expected: 'AAAAA', found: '{data}'" + ), "wrong data for token 3, expected: 'AAAAA', found: {}".format(data) assert ( start == 32 - ), f"wrong start frame for token 3, expected: 32, found: {start}" - assert end == 36, f"wrong end frame for token 3, expected: 36, found: {end}" + ), "wrong start frame for token 3, expected: 32, found: {}".format(start) + assert ( + end == 36 + ), "wrong end frame for token 3, expected: 36, found: {}".format(end) data = "".join(tok4[0]) start = tok4[1] end = tok4[2] assert ( data == "AAaaA" - ), f"wrong data for token 4, expected: 'AAaaA', found: '{data}'" + ), "wrong data for token 4, expected: 'AAaaA', found: {}".format(data) assert ( start == 42 - ), f"wrong start frame for token 4, expected: 42, found: {start}" - assert end == 46, f"wrong end frame for token 4, expected: 46, found: {end}" + ), "wrong start frame for token 4, expected: 42, found: {}".format(start) + assert ( + end == 46 + ), "wrong end frame for token 4, expected: 46, found: {}".format(end) @pytest.fixture @@ -373,7 +418,7 @@ assert ( len(tokens) == 3 - ), f"wrong number of tokens, expected: 3, found: {len(tokens)}" + ), "wrong number of tokens, expected: 3, found: {}".format(len(tokens)) tok1, tok2, tok3 = tokens[0], tokens[1], tokens[2] data = "".join(tok1[0]) @@ -381,33 +426,39 @@ end = tok1[2] assert ( data == "AAAAA" - ), f"wrong data for token 1, expected: 'AAAAA', found: '{data}'" + ), "wrong data for token 1, expected: 'AAAAA', found: {}".format(data) assert ( start == 3 - ), f"wrong start frame for token 1, expected: 3, found: {start}" - assert end == 7, f"wrong end frame for token 1, expected: 7, found: {end}" + ), "wrong start frame for token 1, expected: 3, found: {}".format(start) + assert ( + end == 7 + ), "wrong end frame for token 1, expected: 7, found: {}".format(end) data = "".join(tok2[0]) start = tok2[1] end = tok2[2] assert ( data == "AAAAAA" - ), f"wrong data for token 2, expected: 'AAAAAA', found: '{data}'" + ), "wrong data for token 2, expected: 'AAAAAA', found: {}".format(data) assert ( start == 9 - ), f"wrong start frame for token 2, expected: 9, found: {start}" - assert end == 14, f"wrong end frame for token 2, expected: 14, found: {end}" + ), "wrong start frame for token 2, expected: 9, found: {}".format(start) + assert ( + end == 14 + ), "wrong end frame for token 2, expected: 14, found: {}".format(end) data = "".join(tok3[0]) start = tok3[1] end = tok3[2] assert ( data == "AAAAAAAAA" - ), f"wrong data for token 3, expected: 'AAAAAAAAA', found: '{data}'" + ), "wrong data for token 3, expected: 'AAAAAAAAA', found: {}".format(data) assert ( start == 17 - ), f"wrong start frame for token 3, expected: 17, found: {start}" - assert end == 25, f"wrong end frame for token 3, expected: 25, found: {end}" + ), "wrong start frame for token 3, expected: 17, found: {}".format(start) + assert ( + end == 25 + ), "wrong end frame for token 3, expected: 25, found: {}".format(end) @pytest.fixture @@ -435,7 +486,7 @@ assert ( len(tokens) == 3 - ), f"wrong number of tokens, expected: 3, found: {len(tokens)}" + ), "wrong number of tokens, expected: 3, found: {}".format(len(tokens)) tok1, tok2, tok3 = tokens[0], tokens[1], tokens[2] data = "".join(tok1[0]) @@ -443,33 +494,39 @@ end = tok1[2] assert ( data == "AAAAAaAAAA" - ), f"wrong data for token 1, expected: 'AAAAAaAAAA', found: '{data}'" + ), "wrong data for token 1, expected: 'AAAAAaAAAA', found: {}".format(data) assert ( start == 3 - ), f"wrong start frame for token 1, expected: 3, found: {start}" - assert end == 12, f"wrong end frame for token 1, expected: 12, found: {end}" + ), "wrong start frame for token 1, expected: 3, found: {}".format(start) + assert ( + end == 12 + ), "wrong end frame for token 1, expected: 12, found: {}".format(end) data = "".join(tok2[0]) start = tok2[1] end = tok2[2] assert ( data == "AAa" - ), f"wrong data for token 2, expected: 'AAa', found: '{data}'" + ), "wrong data for token 2, expected: 'AAa', found: {}".format(data) assert ( start == 13 - ), f"wrong start frame for token 2, expected: 13, found: {start}" - assert end == 15, f"wrong end frame for token 2, expected: 15, found: {end}" + ), "wrong start frame for token 2, expected: 13, found: {}".format(start) + assert ( + end == 15 + ), "wrong end frame for token 2, expected: 15, found: {}".format(end) data = "".join(tok3[0]) start = tok3[1] end = tok3[2] assert ( data == "AAAAAAAAAa" - ), f"wrong data for token 3, expected: 'AAAAAAAAAa', found: '{data}'" + ), "wrong data for token 3, expected: 'AAAAAAAAAa', found: {}".format(data) assert ( start == 17 - ), f"wrong start frame for token 3, expected: 17, found: {start}" - assert end == 26, f"wrong end frame for token 3, expected: 26, found: {end}" + ), "wrong start frame for token 3, expected: 17, found: {}".format(start) + assert ( + end == 26 + ), "wrong end frame for token 3, expected: 26, found: {}".format(end) @pytest.fixture @@ -494,7 +551,7 @@ assert ( len(tokens) == 1 - ), f"wrong number of tokens, expected: 1, found: {len(tokens)}" + ), "wrong number of tokens, expected: 1, found: {}".format(len(tokens)) tok1 = tokens[0] data = "".join(tok1[0]) @@ -502,11 +559,13 @@ end = tok1[2] assert ( data == "AAAAAAAA" - ), f"wrong data for token 1, expected: 'AAAAAAAA', found: '{data}'" + ), "wrong data for token 1, expected: 'AAAAAAAA', found: {}".format(data) assert ( start == 2 - ), f"wrong start frame for token 1, expected: 2, found: {start}" - assert end == 9, f"wrong end frame for token 1, expected: 9, found: {end}" + ), "wrong start frame for token 1, expected: 2, found: {}".format(start) + assert ( + end == 9 + ), "wrong end frame for token 1, expected: 9, found: {}".format(end) @pytest.fixture @@ -531,7 +590,7 @@ assert ( len(tokens) == 1 - ), f"wrong number of tokens, expected: 1, found: {len(tokens)}" + ), "wrong number of tokens, expected: 1, found: {}".format(len(tokens)) tok1 = tokens[0] data = "".join(tok1[0]) @@ -539,11 +598,13 @@ end = tok1[2] assert ( data == "AAAAA" - ), f"wrong data for token 1, expected: 'AAAAA', found: '{data}'" + ), "wrong data for token 1, expected: 'AAAAA', found: {}".format(data) assert ( start == 2 - ), f"wrong start frame for token 1, expected: 2, found: {start}" - assert end == 6, f"wrong end frame for token 1, expected: 6, found: {end}" + ), "wrong start frame for token 1, expected: 2, found: {}".format(start) + assert ( + end == 6 + ), "wrong end frame for token 1, expected: 6, found: {}".format(end) @pytest.fixture @@ -573,7 +634,7 @@ assert ( len(tokens) == 1 - ), f"wrong number of tokens, expected: 1, found: {len(tokens)}" + ), "wrong number of tokens, expected: 1, found: {}".format(len(tokens)) tok1 = tokens[0] data = "".join(tok1[0]) @@ -581,11 +642,13 @@ end = tok1[2] assert ( data == "AAAAAAAA" - ), f"wrong data for token 1, expected: 'AAAAAAAA', found: '{data}'" + ), "wrong data for token 1, expected: 'AAAAAAAA', found: {}".format(data) assert ( start == 2 - ), f"wrong start frame for token 1, expected: 2, found: {start}" - assert end == 9, f"wrong end frame for token 1, expected: 9, found: {end}" + ), "wrong start frame for token 1, expected: 2, found: {}".format(start) + assert ( + end == 9 + ), "wrong end frame for token 1, expected: 9, found: {}".format(end) @pytest.fixture @@ -615,4 +678,4 @@ assert ( len(tokens) == 2 - ), f"wrong number of tokens, expected: 2, found: {len(tokens)}" + ), "wrong number of tokens, expected: 2, found: {}".format(len(tokens)) diff -r 954c1e279068 -r 996948ada980 tests/test_cmdline_util.py --- a/tests/test_cmdline_util.py Sun May 26 17:19:31 2024 +0200 +++ b/tests/test_cmdline_util.py Sun May 26 22:43:08 2024 +0200 @@ -1,22 +1,23 @@ import os +from collections import namedtuple +from tempfile import TemporaryDirectory +from unittest.mock import patch + import pytest -from tempfile import TemporaryDirectory -from collections import namedtuple -from unittest.mock import patch from auditok.cmdline_util import ( _AUDITOK_LOGGER, + KeywordArguments, + initialize_workers, make_kwargs, make_logger, - initialize_workers, - KeywordArguments, ) from auditok.workers import ( + CommandLineWorker, + PlayerWorker, + PrintWorker, + RegionSaverWorker, StreamSaverWorker, - RegionSaverWorker, - PlayerWorker, - CommandLineWorker, - PrintWorker, ) _ArgsNamespace = namedtuple( diff -r 954c1e279068 -r 996948ada980 tests/test_core.py --- a/tests/test_core.py Sun May 26 17:19:31 2024 +0200 +++ b/tests/test_core.py Sun May 26 22:43:08 2024 +0200 @@ -1,19 +1,21 @@ +import math import os -import math +from array import array as array_ from random import random from tempfile import TemporaryDirectory -from array import array as array_ +from unittest.mock import Mock, patch + import pytest -from unittest.mock import patch, Mock -from auditok import load, split, AudioRegion, AudioParameterError + +from auditok import AudioParameterError, AudioRegion, load, split from auditok.core import ( _duration_to_nb_windows, _make_audio_region, _read_chunks_online, _read_offline, ) -from auditok.util import AudioDataSource from auditok.io import get_audio_source +from auditok.util import AudioReader def _make_random_length_regions( @@ -776,7 +778,7 @@ {"sr": 10, "sw": 2, "ch": 2}, ), ( - AudioDataSource( + AudioReader( "tests/data/test_split_10HZ_stereo.raw", sr=10, sw=2, @@ -965,9 +967,9 @@ def test_split_too_small_analysis_window(): with pytest.raises(ValueError) as val_err: split(b"", sr=10, sw=1, ch=1, analysis_window=0.09) - err_msg = "Too small 'analysis_windows' (0.09) for sampling rate (10)." - err_msg += " Analysis windows should at least be 1/10 to cover one " - err_msg += "single data sample" + err_msg = "Too small 'analysis_window' (0.09) for sampling rate (10)." + err_msg += " Analysis window should at least be 1/10 to cover one " + err_msg += "data sample" assert err_msg == str(val_err.value) diff -r 954c1e279068 -r 996948ada980 tests/test_io.py --- a/tests/test_io.py Sun May 26 17:19:31 2024 +0200 +++ b/tests/test_io.py Sun May 26 22:43:08 2024 +0200 @@ -1,33 +1,35 @@ +import filecmp +import math import os import sys -import math from array import array from tempfile import NamedTemporaryFile, TemporaryDirectory -import filecmp +from unittest.mock import Mock, patch + import pytest -from unittest.mock import patch, Mock -from test_util import _sample_generator, _generate_pure_tone, PURE_TONE_DICT -from auditok.signal import FORMAT +from test_util import PURE_TONE_DICT, _generate_pure_tone, _sample_generator + from auditok.io import ( AudioIOError, AudioParameterError, BufferAudioSource, RawAudioSource, + StdinAudioSource, WaveAudioSource, - StdinAudioSource, - check_audio_data, + _get_audio_parameters, _guess_audio_format, - _get_audio_parameters, _load_raw, _load_wave, _load_with_pydub, - get_audio_source, - from_file, _save_raw, _save_wave, _save_with_pydub, + check_audio_data, + from_file, + get_audio_source, to_file, ) +from auditok.signal import FORMAT AUDIO_PARAMS_SHORT = {"sr": 16000, "sw": 2, "ch": 1} @@ -633,3 +635,55 @@ kwargs.update(extra_args) audio_source = get_audio_source(input, **kwargs) assert isinstance(audio_source, expected_type) + assert audio_source.sampling_rate == 16000, ( + "Unexpected sampling rate: audio_source.sampling_rate = " + + f"{audio_source.sampling_rate} instead of 16000" + ) + assert audio_source.sr == 16000, ( + "Unexpected sampling rate: audio_source.sr = " + + f"{audio_source.sr} instead of 16000" + ) + assert audio_source.sample_width == 2, ( + "Unexpected sample width: audio_source.sample_width = " + + f"{audio_source.sample_width} instead of 2" + ) + assert audio_source.sw == 2, ( + "Unexpected sample width: audio_source.sw = " + + f"{audio_source.sw} instead of 2" + ) + assert audio_source.channels == 1, ( + "Unexpected number of channels: audio_source.channels = " + + f"{audio_source.channels} instead of 1" + ) + assert audio_source.ch == 1, ( + "Unexpected number of channels: audio_source.ch = " + + f"{audio_source.ch} instead of 1" + ) + + +def test_get_audio_source_alias_prams(): + audio_source = get_audio_source(b"0" * 1600, sr=16000, sw=2, ch=1) + assert audio_source.sampling_rate == 16000, ( + "Unexpected sampling rate: audio_source.sampling_rate = " + + f"{audio_source.sampling_rate} instead of 16000" + ) + assert audio_source.sr == 16000, ( + "Unexpected sampling rate: audio_source.sr = " + + f"{audio_source.sr} instead of 16000" + ) + assert audio_source.sample_width == 2, ( + "Unexpected sample width: audio_source.sample_width = " + + f"{audio_source.sample_width} instead of 2" + ) + assert audio_source.sw == 2, ( + "Unexpected sample width: audio_source.sw = " + + f"{audio_source.sw} instead of 2" + ) + assert audio_source.channels == 1, ( + "Unexpected number of channels: audio_source.channels = " + + f"{audio_source.channels} instead of 1" + ) + assert audio_source.ch == 1, ( + "Unexpected number of channels: audio_source.ch = " + + f"{audio_source.ch} instead of 1" + ) diff -r 954c1e279068 -r 996948ada980 tests/test_plotting.py --- a/tests/test_plotting.py Sun May 26 17:19:31 2024 +0200 +++ b/tests/test_plotting.py Sun May 26 22:43:08 2024 +0200 @@ -1,11 +1,13 @@ import os import sys +from tempfile import TemporaryDirectory + +import matplotlib import pytest -from tempfile import TemporaryDirectory -import matplotlib matplotlib.use("AGG") import matplotlib.pyplot as plt # noqa E402 + from auditok.core import AudioRegion # noqa E402 if sys.version_info.minor <= 5: @@ -23,8 +25,8 @@ @pytest.mark.parametrize("channels", [1, 2], ids=["mono", "stereo"]) def test_region_plot(channels): type_ = "mono" if channels == 1 else "stereo" - audio_filename = f"tests/data/test_split_10HZ_{type_}.raw" - image_filename = f"tests/images/{PREFIX}plot_{type_}_region.png" + audio_filename = "tests/data/test_split_10HZ_{}.raw".format(type_) + image_filename = "tests/images/{}plot_{}_region.png".format(PREFIX, type_) expected_image = plt.imread(image_filename) with TemporaryDirectory() as tmpdir: output_image_filename = os.path.join(tmpdir, "image.png") @@ -50,7 +52,7 @@ ) def test_region_split_and_plot(channels, use_channel): type_ = "mono" if channels == 1 else "stereo" - audio_filename = f"tests/data/test_split_10HZ_{type_}.raw" + audio_filename = "tests/data/test_split_10HZ_{}.raw".format(type_) if type_ == "mono": fmt = "tests/images/{}split_and_plot_mono_region.png" else: @@ -73,7 +75,3 @@ if SAVE_NEW_IMAGES: shutil.copy(output_image_filename, image_filename) assert (output_image == expected_image).all() - - -if __name__ == "__main__": - pytest.main() diff -r 954c1e279068 -r 996948ada980 tests/test_signal.py --- a/tests/test_signal.py Sun May 26 17:19:31 2024 +0200 +++ b/tests/test_signal.py Sun May 26 22:43:08 2024 +0200 @@ -1,6 +1,8 @@ +from array import array as array_ + +import numpy as np import pytest -from array import array as array_ -import numpy as np + from auditok import signal as signal_ from auditok import signal_numpy diff -r 954c1e279068 -r 996948ada980 tests/test_util.py --- a/tests/test_util.py Sun May 26 17:19:31 2024 +0200 +++ b/tests/test_util.py Sun May 26 22:43:08 2024 +0200 @@ -1,15 +1,17 @@ -import pytest -from unittest.mock import patch import math from array import array as array_ -from auditok.util import ( - AudioEnergyValidator, - make_duration_formatter, - make_channel_selector, -) +from unittest.mock import patch + +import pytest + from auditok import signal as signal_ from auditok import signal_numpy from auditok.exceptions import TimeFormatError +from auditok.util import ( + AudioEnergyValidator, + make_channel_selector, + make_duration_formatter, +) def _sample_generator(*data_buffers): @@ -384,7 +386,3 @@ assert validator.is_valid(data) else: assert not validator.is_valid(data) - - -if __name__ == "__main__": - pytest.main() diff -r 954c1e279068 -r 996948ada980 tests/test_workers.py --- a/tests/test_workers.py Sun May 26 17:19:31 2024 +0200 +++ b/tests/test_workers.py Sun May 26 22:43:08 2024 +0200 @@ -1,23 +1,25 @@ import os -from unittest.mock import patch, call, Mock from tempfile import TemporaryDirectory +from unittest.mock import Mock, call, patch + import pytest -from auditok import AudioRegion, AudioDataSource + +from auditok import AudioReader, AudioRegion +from auditok.cmdline_util import make_logger from auditok.exceptions import AudioEncodingWarning -from auditok.cmdline_util import make_logger from auditok.workers import ( + CommandLineWorker, + PlayerWorker, + PrintWorker, + RegionSaverWorker, + StreamSaverWorker, TokenizerWorker, - StreamSaverWorker, - RegionSaverWorker, - PlayerWorker, - CommandLineWorker, - PrintWorker, ) @pytest.fixture def audio_data_source(): - reader = AudioDataSource( + reader = AudioReader( input="tests/data/test_split_10HZ_mono.raw", block_dur=0.1, sr=10,