amine@330: import sys amine@330: import wave amine@403: from functools import partial amine@403: amine@403: import pytest amine@403: amine@330: from auditok import ( amine@403: AudioReader, amine@403: BufferAudioSource, amine@403: Recorder, amine@403: WaveAudioSource, amine@330: dataset, amine@330: ) amine@403: from auditok.util import _Limiter, _OverlapAudioReader amine@330: amine@330: amine@403: def _read_all_data(reader): amine@403: blocks = [] amine@403: while True: amine@403: data = reader.read() amine@403: if data is None: amine@403: break amine@403: blocks.append(data) amine@403: return b"".join(blocks) amine@403: amine@403: amine@403: class TestAudioReaderWithFileAudioSource: amine@403: @pytest.fixture(autouse=True) amine@403: def setup_and_teardown(self): amine@330: self.audio_source = WaveAudioSource( amine@330: filename=dataset.one_to_six_arabic_16000_mono_bc_noise amine@330: ) amine@403: self.audio_source.open() amine@403: yield amine@403: self.audio_source.close() amine@330: amine@403: def test_AudioReader_type(self): amine@403: reader = AudioReader(input=self.audio_source) amine@410: err_msg = "wrong object type, expected: 'AudioReader', found: {0}" amine@403: assert isinstance(reader, AudioReader), err_msg.format(type(reader)) amine@330: amine@403: def _test_default_block_size(self): amine@403: reader = AudioReader(input=self.audio_source) amine@403: data = reader.read() amine@403: size = len(data) amine@400: assert ( amine@400: size == 160 amine@400: ), "Wrong default block_size, expected: 160, found: {0}".format(size) amine@330: amine@403: @pytest.mark.parametrize( amine@403: "block_dur, expected_nb_samples", amine@403: [ amine@403: (None, 160), # default: 10 ms amine@403: (0.025, 400), # 25 ms amine@403: ], amine@403: ids=["default", "_25ms"], amine@403: ) amine@403: def test_block_duration(self, block_dur, expected_nb_samples): amine@403: """Test the number of samples read for a given block duration.""" amine@403: if block_dur is not None: amine@403: reader = AudioReader(input=self.audio_source, block_dur=block_dur) amine@403: else: amine@403: reader = AudioReader(input=self.audio_source) amine@403: data = reader.read() amine@403: nb_samples = len(data) // reader.sample_width amine@400: assert ( amine@403: nb_samples == expected_nb_samples amine@403: ), f"Wrong block_size, expected: {expected_nb_samples}, found: {nb_samples}" amine@330: amine@403: @pytest.mark.parametrize( amine@403: "block_dur, hop_dur, expected_nb_blocks, expected_last_block_nb_samples", amine@403: [ amine@403: (None, None, 1879, 126), # default: 10 ms amine@403: (0.01, None, 1879, 126), # block_dur_10ms_hop_dur_None amine@403: (0.01, 0.01, 1879, 126), # block_dur_10ms_hop_dur_10ms amine@403: (0.02, None, 940, 126), # block_dur_20ms_hop_dur_None amine@403: (0.025, None, 752, 206), # block_dur_25ms_hop_dur_None amine@403: (0.02, 0.01, 1878, 286), # block_dur_20ms_hop_dur_10ms amine@410: (0.025, 0.005, 3754, 366), # block_dur_25ms_hop_dur_5ms amine@403: ], amine@403: ids=[ amine@403: "default", amine@403: "block_dur_10ms_hop_dur_None", amine@410: "block_dur_10ms_hop_dur_10ms", amine@403: "block_dur_20ms_hop_dur_None", amine@403: "block_dur_25ms_hop_dur_None", amine@403: "block_dur_20ms_hop_dur_10ms", amine@410: "block_dur_25ms_hop_dur_5ms", amine@403: ], amine@403: ) amine@403: def test_hop_duration( amine@403: self, amine@403: block_dur, amine@403: hop_dur, amine@403: expected_nb_blocks, amine@403: expected_last_block_nb_samples, amine@403: ): amine@403: """Test the number of read blocks and the duration of last block for amine@403: different 'block_dur' and 'hop_dur' values. amine@330: amine@403: Args: amine@403: block_dur (float or None): block duration in seconds. amine@403: hop_dur (float or None): hop duration in seconds. amine@403: expected_nb_blocks (int): expected number of read block. amine@403: expected_last_block_nb_samples (int): expected number of sample amine@403: in the last block. amine@403: """ amine@403: if block_dur is not None: amine@403: reader = AudioReader( amine@403: input=self.audio_source, block_dur=block_dur, hop_dur=hop_dur amine@403: ) amine@403: else: amine@403: reader = AudioReader(input=self.audio_source, hop_dur=hop_dur) amine@330: amine@403: nb_blocks = 0 amine@403: last_block_nb_samples = None amine@403: while True: amine@403: data = reader.read() amine@403: if data is not None: amine@403: nb_blocks += 1 amine@403: last_block_nb_samples = len(data) // reader.sample_width amine@403: else: amine@403: break amine@403: err_msg = "Wrong number of blocks read from source, expected: " amine@403: err_msg += f"{expected_nb_blocks}, found: {nb_blocks}" amine@403: assert nb_blocks == expected_nb_blocks, err_msg amine@330: amine@403: err_msg = ( amine@403: "Wrong number of samples in last block read from source, expected: " amine@403: ) amine@403: err_msg += ( amine@403: f"{expected_last_block_nb_samples}, found: {last_block_nb_samples}" amine@330: ) amine@330: amine@403: assert last_block_nb_samples == expected_last_block_nb_samples, err_msg amine@403: amine@403: def test_hop_duration_exception(self): amine@403: """Test passing hop_dur > block_dur raises ValueError""" amine@403: with pytest.raises(ValueError): amine@403: AudioReader(self.audio_source, block_dur=0.01, hop_dur=0.015) amine@403: amine@403: @pytest.mark.parametrize( amine@403: "block_dur, hop_dur", amine@403: [ amine@403: (None, None), # default amine@403: (0.01, None), # block_dur_10ms_hop_dur_None amine@403: (None, 0.01), # block_dur_None__hop_dur_10ms amine@403: (0.05, 0.05), # block_dur_50ms_hop_dur_50ms amine@403: ], amine@403: ids=[ amine@403: "default", amine@403: "block_dur_10ms_hop_dur_None", amine@403: "block_dur_None__hop_dur_10ms", amine@403: "block_dur_50ms_hop_dur_50ms", amine@403: ], amine@403: ) amine@403: def test_reader_class_block_dur_equals_hop_dur(self, block_dur, hop_dur): amine@403: """Test passing hop_dur == block_dur does not create an instance of amine@403: '_OverlapAudioReader'. amine@403: """ amine@403: if block_dur is not None: amine@403: reader = AudioReader( amine@403: input=self.audio_source, block_dur=block_dur, hop_dur=hop_dur amine@403: ) amine@403: else: amine@403: reader = AudioReader(input=self.audio_source, hop_dur=hop_dur) amine@403: assert not isinstance(reader, _OverlapAudioReader) amine@330: amine@330: def test_sampling_rate(self): amine@403: reader = AudioReader(input=self.audio_source) amine@403: sampling_rate = reader.sampling_rate amine@400: assert ( amine@403: sampling_rate == 16000 amine@403: ), f"Wrong sampling rate, expected: 16000, found: {sampling_rate}" amine@330: amine@330: def test_sample_width(self): amine@403: reader = AudioReader(input=self.audio_source) amine@403: sample_width = reader.sample_width amine@400: assert ( amine@403: sample_width == 2 amine@403: ), f"Wrong sample width, expected: 2, found: {sample_width}" amine@330: amine@330: def test_channels(self): amine@403: reader = AudioReader(input=self.audio_source) amine@403: channels = reader.channels amine@400: assert ( amine@400: channels == 1 amine@403: ), f"Wrong number of channels, expected: 1, found: {channels}" amine@330: amine@330: def test_read(self): amine@403: reader = AudioReader(input=self.audio_source, block_dur=0.02) amine@403: reader_data = reader.read() amine@330: audio_source = WaveAudioSource( amine@330: filename=dataset.one_to_six_arabic_16000_mono_bc_noise amine@330: ) amine@330: audio_source.open() amine@403: audio_source_data = audio_source.read(320) amine@330: audio_source.close() amine@403: assert ( amine@403: reader_data == audio_source_data amine@403: ), "Unexpected data read from AudioReader" amine@330: amine@403: def test_read_with_overlap(self): amine@403: reader = AudioReader( amine@403: input=self.audio_source, block_dur=0.02, hop_dur=0.01 amine@403: ) amine@403: _ = reader.read() # first block amine@403: reader_data = reader.read() # second block with 0.01 S overlap amine@403: audio_source = WaveAudioSource( amine@403: filename=dataset.one_to_six_arabic_16000_mono_bc_noise amine@403: ) amine@403: audio_source.open() amine@403: _ = audio_source.read(160) amine@403: audio_source_data = audio_source.read(320) amine@403: audio_source.close() amine@403: assert ( amine@403: reader_data == audio_source_data amine@403: ), "Unexpected data read from AudioReader" amine@330: amine@403: def test_read_from_AudioReader_with_max_read(self): amine@330: # read a maximum of 0.75 seconds from audio source amine@403: reader = AudioReader(input=self.audio_source, max_read=0.75) amine@403: assert isinstance(reader._audio_source._audio_source, _Limiter) amine@403: reader_data = _read_all_data(reader) amine@330: amine@330: audio_source = WaveAudioSource( amine@330: filename=dataset.one_to_six_arabic_16000_mono_bc_noise amine@330: ) amine@330: audio_source.open() amine@330: audio_source_data = audio_source.read(int(16000 * 0.75)) amine@330: audio_source.close() amine@330: amine@400: assert ( amine@403: reader_data == audio_source_data amine@403: ), f"Unexpected data read from AudioReader with 'max_read = {0.75}'" amine@330: amine@403: def test_read_data_size_from_AudioReader_with_max_read(self): amine@330: # read a maximum of 1.191 seconds from audio source amine@403: reader = AudioReader(input=self.audio_source, max_read=1.191) amine@403: assert isinstance(reader._audio_source._audio_source, _Limiter) amine@403: total_samples = round(reader.sampling_rate * 1.191) amine@403: block_size = int(reader.block_dur * reader.sampling_rate) amine@403: nb_full_blocks, last_block_size = divmod(total_samples, block_size) amine@330: total_samples_with_overlap = ( amine@403: nb_full_blocks * block_size + last_block_size amine@330: ) amine@403: expected_read_bytes = ( amine@403: total_samples_with_overlap * reader.sample_width * reader.channels amine@403: ) amine@330: amine@403: reader_data = _read_all_data(reader) amine@403: total_read = len(reader_data) amine@403: err_msg = f"Wrong data length read from LimiterADS, expected: {expected_read_bytes}, found: {total_read}" amine@403: assert total_read == expected_read_bytes, err_msg amine@403: amine@403: def test_read_from_Recorder(self): amine@403: reader = Recorder(input=self.audio_source, block_dur=0.025) amine@403: reader_data = [] amine@403: for _ in range(10): amine@403: block = reader.read() amine@330: if block is None: amine@330: break amine@403: reader_data.append(block) amine@403: reader_data = b"".join(reader_data) amine@330: amine@330: audio_source = WaveAudioSource( amine@330: filename=dataset.one_to_six_arabic_16000_mono_bc_noise amine@330: ) amine@330: audio_source.open() amine@403: audio_source_data = audio_source.read(400 * 10) amine@330: audio_source.close() amine@330: amine@400: assert ( amine@403: reader_data == audio_source_data amine@403: ), "Unexpected data read from Recorder" amine@330: amine@403: def test_AudioReader_rewindable(self): amine@403: reader = AudioReader(input=self.audio_source, record=True) amine@403: assert ( amine@403: reader.rewindable amine@403: ), "AudioReader with record=True should be rewindable" amine@330: amine@403: def test_AudioReader_record_and_rewind(self): amine@403: reader = AudioReader( amine@403: input=self.audio_source, record=True, block_dur=0.02 amine@330: ) amine@403: # read 0.02 * 10 = 0.2 sec. of data amine@330: for i in range(10): amine@403: reader.read() amine@403: reader.rewind() amine@330: amine@330: # read all available data after rewind amine@403: reader_data = _read_all_data(reader) amine@330: amine@330: audio_source = WaveAudioSource( amine@330: filename=dataset.one_to_six_arabic_16000_mono_bc_noise amine@330: ) amine@330: audio_source.open() amine@403: audio_source_data = audio_source.read(320 * 10) # read 0.2 sec. of data amine@330: audio_source.close() amine@330: amine@400: assert ( amine@403: reader_data == audio_source_data amine@403: ), "Unexpected data read from AudioReader with record = True" amine@330: amine@403: def test_Recorder_record_and_rewind(self): amine@403: recorder = Recorder(input=self.audio_source, block_dur=0.02) amine@403: # read 0.02 * 10 = 0.2 sec. of data amine@403: for i in range(10): amine@403: recorder.read() amine@403: amine@403: recorder.rewind() amine@403: amine@403: # read all available data after rewind amine@403: recorder_data = [] amine@403: recorder_data = _read_all_data(recorder) amine@403: amine@403: audio_source = WaveAudioSource( amine@403: filename=dataset.one_to_six_arabic_16000_mono_bc_noise amine@403: ) amine@403: audio_source.open() amine@403: audio_source_data = audio_source.read(320 * 10) # read 0.2 sec. of data amine@403: audio_source.close() amine@403: amine@403: assert ( amine@403: recorder_data == audio_source_data amine@403: ), "Unexpected data read from Recorder" amine@403: amine@403: def test_read_overlapping_blocks(self): amine@330: # Use arbitrary valid block_size and hop_size amine@330: block_size = 1714 amine@330: hop_size = 313 amine@403: block_dur = block_size / self.audio_source.sampling_rate amine@403: hop_dur = hop_size / self.audio_source.sampling_rate amine@330: amine@403: reader = AudioReader( amine@403: input=self.audio_source, amine@403: block_dur=block_dur, amine@403: hop_dur=hop_dur, amine@330: ) amine@330: amine@403: # Read all available overlapping blocks of data amine@403: reader_data = [] amine@330: while True: amine@403: block = reader.read() amine@330: if block is None: amine@330: break amine@403: reader_data.append(block) amine@330: amine@330: # Read all data from file and build a BufferAudioSource amine@330: fp = wave.open(dataset.one_to_six_arabic_16000_mono_bc_noise, "r") amine@330: wave_data = fp.readframes(fp.getnframes()) amine@330: fp.close() amine@330: audio_source = BufferAudioSource( amine@403: wave_data, amine@403: reader.sampling_rate, amine@403: reader.sample_width, amine@403: reader.channels, amine@330: ) amine@330: audio_source.open() amine@330: amine@403: # Compare all blocks read from OverlapADS to those read from an amine@403: # audio source with a manual position setting amine@403: for i, block in enumerate(reader_data): amine@330: tmp = audio_source.read(block_size) amine@400: assert ( amine@400: block == tmp amine@403: ), f"Unexpected data (block {i}) from reader with overlapping blocks" amine@330: audio_source.position = (i + 1) * hop_size amine@330: amine@330: audio_source.close() amine@330: amine@403: def test_read_overlapping_blocks_with_max_read(self): amine@330: block_size = 256 amine@330: hop_size = 200 amine@403: block_dur = block_size / self.audio_source.sampling_rate amine@403: hop_dur = hop_size / self.audio_source.sampling_rate amine@330: amine@403: reader = AudioReader( amine@403: input=self.audio_source, amine@403: block_dur=block_dur, amine@403: hop_dur=hop_dur, amine@403: max_read=0.5, amine@330: ) amine@330: amine@403: # Read all available overlapping blocks of data amine@403: reader_data = [] amine@330: while True: amine@403: block = reader.read() amine@330: if block is None: amine@330: break amine@403: reader_data.append(block) amine@330: amine@330: # Read all data from file and build a BufferAudioSource amine@330: fp = wave.open(dataset.one_to_six_arabic_16000_mono_bc_noise, "r") amine@330: wave_data = fp.readframes(fp.getnframes()) amine@330: fp.close() amine@330: audio_source = BufferAudioSource( amine@403: wave_data, amine@403: reader.sampling_rate, amine@403: reader.sample_width, amine@403: reader.channels, amine@330: ) amine@330: audio_source.open() amine@330: amine@403: # Compare all blocks read from OverlapADS to those read from an amine@403: # audio source with a manual position setting amine@403: for i, block in enumerate(reader_data): amine@403: tmp = audio_source.read(len(block) // (reader.sw * reader.ch)) amine@403: assert ( amine@403: block == tmp amine@403: ), f"Unexpected data (block {i}) from reader with overlapping blocks and max_read" amine@330: audio_source.position = (i + 1) * hop_size amine@330: amine@330: audio_source.close() amine@330: amine@403: def test_length_read_overlapping_blocks_with_max_read(self): amine@330: block_size = 313 amine@330: hop_size = 207 amine@403: block_dur = block_size / self.audio_source.sampling_rate amine@403: hop_dur = hop_size / self.audio_source.sampling_rate amine@403: amine@403: reader = AudioReader( amine@403: input=self.audio_source, amine@403: max_read=1.932, amine@403: block_dur=block_dur, amine@403: hop_dur=hop_dur, amine@330: ) amine@330: amine@403: total_samples = round(reader.sampling_rate * 1.932) amine@330: first_read_size = block_size amine@330: next_read_size = block_size - hop_size amine@330: nb_next_blocks, last_block_size = divmod( amine@330: (total_samples - first_read_size), next_read_size amine@330: ) amine@330: total_samples_with_overlap = ( amine@330: first_read_size + next_read_size * nb_next_blocks + last_block_size amine@330: ) amine@403: expected_read_bytes = ( amine@403: total_samples_with_overlap * reader.sw * reader.channels amine@403: ) amine@330: amine@403: cache_size = ( amine@403: (block_size - hop_size) * reader.sample_width * reader.channels amine@403: ) amine@330: total_read = cache_size amine@330: amine@330: i = 0 amine@330: while True: amine@403: block = reader.read() amine@330: if block is None: amine@330: break amine@330: i += 1 amine@330: total_read += len(block) - cache_size amine@330: amine@400: err_msg = ( amine@400: "Wrong data length read from LimiterADS, expected: {0}, found: {1}" amine@400: ) amine@400: assert total_read == expected_read_bytes, err_msg.format( amine@400: expected_read_bytes, total_read amine@330: ) amine@330: amine@403: def test_reader_with_overlapping_blocks__rewindable(self): amine@403: reader = AudioReader( amine@403: input=self.audio_source, amine@403: block_dur=320, amine@403: hop_dur=160, amine@330: record=True, amine@330: ) amine@403: assert ( amine@403: reader.rewindable amine@403: ), "AudioReader with record=True should be rewindable" amine@330: amine@403: def test_overlapping_blocks_with_max_read_rewind_and_read(self): amine@330: # Use arbitrary valid block_size and hop_size amine@330: block_size = 1600 amine@330: hop_size = 400 amine@403: block_dur = block_size / self.audio_source.sampling_rate amine@403: hop_dur = hop_size / self.audio_source.sampling_rate amine@330: amine@403: reader = AudioReader( amine@403: input=self.audio_source, amine@403: block_dur=block_dur, amine@403: hop_dur=hop_dur, amine@330: record=True, amine@330: ) amine@330: amine@330: # Read all available data overlapping blocks amine@330: i = 0 amine@330: while True: amine@403: block = reader.read() amine@330: if block is None: amine@330: break amine@330: i += 1 amine@330: amine@403: reader.rewind() amine@330: amine@330: # Read all data from file and build a BufferAudioSource amine@330: fp = wave.open(dataset.one_to_six_arabic_16000_mono_bc_noise, "r") amine@330: wave_data = fp.readframes(fp.getnframes()) amine@330: fp.close() amine@330: audio_source = BufferAudioSource( amine@403: wave_data, amine@403: reader.sampling_rate, amine@403: reader.sample_width, amine@403: reader.channels, amine@330: ) amine@330: audio_source.open() amine@330: amine@403: # Compare blocks read from AudioReader to those read from an BufferAudioSource with manual position setting amine@330: for j in range(i): amine@330: tmp = audio_source.read(block_size) amine@400: assert ( amine@403: reader.read() == tmp amine@403: ), f"Unexpected data (block {i}) from reader with overlapping blocks and record = True" amine@330: audio_source.position = (j + 1) * hop_size amine@330: amine@330: audio_source.close() amine@330: amine@403: def test_overlapping_blocks_with_record_and_max_read_rewind_and_read(self): amine@330: # Use arbitrary valid block_size and hop_size amine@330: block_size = 1600 amine@330: hop_size = 400 amine@403: block_dur = block_size / self.audio_source.sampling_rate amine@403: hop_dur = hop_size / self.audio_source.sampling_rate amine@330: amine@403: reader = AudioReader( amine@403: input=self.audio_source, amine@330: max_time=1.50, amine@403: block_dur=block_dur, amine@403: hop_dur=hop_dur, amine@330: record=True, amine@330: ) amine@330: amine@330: # Read all available data overlapping blocks amine@330: i = 0 amine@330: while True: amine@403: block = reader.read() amine@330: if block is None: amine@330: break amine@330: i += 1 amine@330: amine@403: reader.rewind() amine@330: amine@330: # Read all data from file and build a BufferAudioSource amine@330: fp = wave.open(dataset.one_to_six_arabic_16000_mono_bc_noise, "r") amine@330: wave_data = fp.readframes(fp.getnframes()) amine@330: fp.close() amine@330: audio_source = BufferAudioSource( amine@403: wave_data, amine@403: reader.sampling_rate, amine@403: reader.sample_width, amine@403: reader.channels, amine@330: ) amine@330: audio_source.open() amine@330: amine@403: # Compare all blocks read from AudioReader to those read from BufferAudioSource with a manual position setting amine@330: for j in range(i): amine@330: tmp = audio_source.read(block_size) amine@400: assert ( amine@403: reader.read() == tmp amine@400: ), "Unexpected block (N={0}) read from OverlapADS".format(i) amine@330: audio_source.position = (j + 1) * hop_size amine@330: amine@330: audio_source.close() amine@330: amine@403: def test_length_read_overlapping_blocks_with_record_and_max_read(self): amine@330: # Use arbitrary valid block_size and hop_size amine@330: block_size = 1000 amine@330: hop_size = 200 amine@403: block_dur = block_size / self.audio_source.sampling_rate amine@403: hop_dur = hop_size / self.audio_source.sampling_rate amine@330: amine@403: reader = AudioReader( amine@403: input=self.audio_source, amine@403: block_dur=block_dur, amine@403: hop_dur=hop_dur, amine@330: record=True, amine@403: max_read=1.317, amine@330: ) amine@403: total_samples = round(reader.sampling_rate * 1.317) amine@330: first_read_size = block_size amine@330: next_read_size = block_size - hop_size amine@330: nb_next_blocks, last_block_size = divmod( amine@330: (total_samples - first_read_size), next_read_size amine@330: ) amine@330: total_samples_with_overlap = ( amine@330: first_read_size + next_read_size * nb_next_blocks + last_block_size amine@330: ) amine@403: expected_read_bytes = ( amine@403: total_samples_with_overlap * reader.sample_width * reader.channels amine@403: ) amine@330: amine@403: cache_size = ( amine@403: (block_size - hop_size) * reader.sample_width * reader.channels amine@403: ) amine@330: total_read = cache_size amine@330: amine@330: i = 0 amine@330: while True: amine@403: block = reader.read() amine@330: if block is None: amine@330: break amine@330: i += 1 amine@330: total_read += len(block) - cache_size amine@330: amine@403: err_msg = f"Wrong data length read from AudioReader, expected: {expected_read_bytes}, found: {total_read}" amine@403: assert total_read == expected_read_bytes, err_msg amine@330: amine@330: amine@403: def test_AudioReader_raw_data(): amine@330: amine@403: data = b"ABCDEFGHIJKLMNOPQRSTUVWXYZ012345" amine@403: block_size = 5 amine@403: hop_size = 4 amine@403: reader = AudioReader( amine@403: input=data, amine@403: sampling_rate=16, amine@403: sample_width=2, amine@403: channels=1, amine@403: block_dur=block_size / 16, amine@403: hop_dur=hop_size / 16, amine@403: max_read=0.80, amine@403: record=True, amine@403: ) amine@403: reader.open() amine@403: amine@403: assert ( amine@403: reader.sampling_rate == 16 amine@403: ), f"Wrong sampling rate, expected: 16, found: {reader.sampling_rate }" amine@403: amine@403: assert ( amine@403: reader.sample_width == 2 amine@403: ), f"Wrong sample width, expected: 2, found: {reader.sample_width}" amine@403: amine@403: # Read all available data overlapping blocks amine@403: i = 0 amine@403: while True: amine@403: block = reader.read() amine@403: if block is None: amine@403: break amine@403: i += 1 amine@403: amine@403: reader.rewind() amine@403: amine@403: # Build a BufferAudioSource amine@403: audio_source = BufferAudioSource( amine@403: data, reader.sampling_rate, reader.sample_width, reader.channels amine@403: ) amine@403: audio_source.open() amine@403: amine@403: # Compare all blocks read from AudioReader to those read from an audio amine@403: # source with a manual position setting amine@403: for j in range(i): amine@403: tmp = audio_source.read(block_size) amine@403: block = reader.read() amine@400: assert ( amine@403: block == tmp amine@403: ), f"Unexpected block '{block}' (N={i}) read from OverlapADS" amine@403: audio_source.position = (j + 1) * hop_size amine@403: audio_source.close() amine@403: reader.close() amine@330: amine@330: amine@403: def test_AudioReader_alias_params(): amine@403: reader = AudioReader( amine@403: input=b"0" * 1600, amine@403: sr=16000, amine@403: sw=2, amine@403: channels=1, amine@403: ) amine@403: assert reader.sampling_rate == 16000, ( amine@403: "Unexpected sampling rate: reader.sampling_rate = " amine@403: + f"{reader.sampling_rate} instead of 16000" amine@403: ) amine@403: assert reader.sr == 16000, ( amine@403: "Unexpected sampling rate: reader.sr = " amine@403: + f"{reader.sr} instead of 16000" amine@403: ) amine@403: assert reader.sample_width == 2, ( amine@403: "Unexpected sample width: reader.sample_width = " amine@403: + f"{reader.sample_width} instead of 2" amine@403: ) amine@403: assert reader.sw == 2, ( amine@403: "Unexpected sample width: reader.sw = " + f"{reader.sw} instead of 2" amine@403: ) amine@403: assert reader.channels == 1, ( amine@403: "Unexpected number of channels: reader.channels = " amine@403: + f"{reader.channels} instead of 1" amine@403: ) amine@403: assert reader.ch == 1, ( amine@403: "Unexpected number of channels: reader.ch = " amine@403: + f"{reader.ch} instead of 1" amine@403: ) amine@330: amine@330: amine@400: @pytest.mark.parametrize( amine@400: "file_id, max_read, size", amine@400: [ amine@400: ("mono_400", 0.5, 16000), # mono amine@400: ("3channel_400-800-1600", 0.5, 16000 * 3), # multichannel amine@400: ], amine@400: ids=["mono", "multichannel"], amine@400: ) amine@400: def test_Limiter(file_id, max_read, size): amine@400: input_wav = "tests/data/test_16KHZ_{}Hz.wav".format(file_id) amine@400: input_raw = "tests/data/test_16KHZ_{}Hz.raw".format(file_id) amine@400: with open(input_raw, "rb") as fp: amine@400: expected = fp.read(size) amine@330: amine@400: reader = AudioReader(input_wav, block_dur=0.1, max_read=max_read) amine@400: reader.open() amine@400: data = _read_all_data(reader) amine@400: reader.close() amine@400: assert data == expected amine@330: amine@330: amine@400: @pytest.mark.parametrize( amine@400: "file_id", amine@400: [ amine@400: "mono_400", # mono amine@400: "3channel_400-800-1600", # multichannel amine@400: ], amine@400: ids=["mono", "multichannel"], amine@400: ) amine@400: def test_Recorder(file_id): amine@400: input_wav = "tests/data/test_16KHZ_{}Hz.wav".format(file_id) amine@400: input_raw = "tests/data/test_16KHZ_{}Hz.raw".format(file_id) amine@400: with open(input_raw, "rb") as fp: amine@400: expected = fp.read() amine@400: amine@400: reader = AudioReader(input_wav, block_dur=0.1, record=True) amine@400: reader.open() amine@400: data = _read_all_data(reader) amine@400: assert data == expected amine@400: amine@400: # rewind many times amine@400: for _ in range(3): amine@400: reader.rewind() amine@330: data = _read_all_data(reader) amine@400: assert data == expected amine@400: assert data == reader.data amine@400: reader.close() amine@330: amine@330: amine@400: @pytest.mark.parametrize( amine@400: "file_id", amine@400: [ amine@400: "mono_400", # mono amine@400: "3channel_400-800-1600", # multichannel amine@400: ], amine@400: ids=["mono", "multichannel"], amine@400: ) amine@400: def test_Recorder_alias(file_id): amine@400: input_wav = "tests/data/test_16KHZ_{}Hz.wav".format(file_id) amine@400: input_raw = "tests/data/test_16KHZ_{}Hz.raw".format(file_id) amine@400: with open(input_raw, "rb") as fp: amine@400: expected = fp.read() amine@400: amine@400: reader = Recorder(input_wav, block_dur=0.1) amine@400: reader.open() amine@400: data = _read_all_data(reader) amine@400: assert data == expected amine@400: amine@400: # rewind many times amine@400: for _ in range(3): amine@400: reader.rewind() amine@330: data = _read_all_data(reader) amine@400: assert data == expected amine@400: assert data == reader.data amine@400: reader.close()