Mercurial > hg > auditok
changeset 323:7c2fb10b949f
Add tests for _read_offline
author | Amine Sehili <amine.sehili@gmail.com> |
---|---|
date | Sun, 20 Oct 2019 14:33:39 +0100 |
parents | 2cb8e29e1c9c |
children | 9541798ff4d9 |
files | auditok/core.py tests/test_core.py |
diffstat | 2 files changed, 54 insertions(+), 7 deletions(-) [+] |
line wrap: on
line diff
--- a/auditok/core.py Sat Oct 19 23:28:11 2019 +0100 +++ b/auditok/core.py Sun Oct 20 14:33:39 2019 +0100 @@ -187,9 +187,9 @@ token_gen = tokenizer.tokenize(source, generator=True) region_gen = ( _make_audio_region( + token[0], + token[1], source.block_dur, - token[1], - token[0], source.sr, source.sw, source.ch, @@ -239,9 +239,9 @@ def _make_audio_region( + data_frames, + start_frame, frame_duration, - start_frame, - data_frames, sampling_rate, sample_width, channels, @@ -280,6 +280,7 @@ try: while True: frame = reader.read() + print("read:", frame) if frame is None: break data.append(frame) @@ -308,6 +309,7 @@ else: max_read = round(max_read * audio_source.sampling_rate) data = audio_source.read(max_read) + audio_source.close() return ( data, audio_source.sampling_rate, @@ -685,7 +687,9 @@ @property def samples(self): if self._samples is None: - self._samples = signal.to_array(self._data, self.sample_width, self.channels) + self._samples = signal.to_array( + self._data, self.sample_width, self.channels + ) return self._samples def __len__(self):
--- a/tests/test_core.py Sat Oct 19 23:28:11 2019 +0100 +++ b/tests/test_core.py Sun Oct 20 14:33:39 2019 +0100 @@ -4,10 +4,15 @@ from tempfile import TemporaryDirectory from array import array as array_ from unittest import TestCase -from mock import patch +from unittest.mock import patch from genty import genty, genty_dataset from auditok import split, AudioRegion, AudioParameterError -from auditok.core import _duration_to_nb_windows, _read_chunks_online +from auditok.core import ( + _duration_to_nb_windows, + _make_audio_region, + _read_chunks_online, + _read_offline, +) from auditok.util import AudioDataSource from auditok.io import get_audio_source @@ -51,6 +56,44 @@ ) self.assertEqual(result, expected) + @genty_dataset( + mono_skip_0_max_read_None=(1, 0, None), + mono_skip_3_max_read_None=(1, 3, None), + mono_skip_2_max_read_negative=(1, 2, -1), + mono_skip_2_max_read_3=(1, 2, 3), + stereo_skip_0_max_read_None=(2, 0, None), + stereo_skip_3_max_read_None=(2, 3, None), + stereo_skip_2_max_read_negative=(2, 2, -1), + stereo_skip_2_max_read_3=(2, 2, 3), + ) + def test_read_offline(self, channels, skip, max_read=None): + sampling_rate = 10 + sample_width = 2 + mono_or_stereo = "mono" if channels == 1 else "stereo" + filename = "tests/data/test_split_10HZ_{}.raw".format(mono_or_stereo) + with open(filename, "rb") as fp: + data = fp.read() + onset = round(skip * sampling_rate * sample_width * channels) + if max_read in (-1, None): + offset = len(data) + 1 + else: + offset = onset + round( + max_read * sampling_rate * sample_width * channels + ) + expected_data = data[onset:offset] + read_data, *audio_params = _read_offline( + filename, + skip=skip, + max_read=max_read, + sr=sampling_rate, + sw=sample_width, + ch=channels, + ) + self.assertEqual(read_data, expected_data) + self.assertEqual( + tuple(audio_params), (sampling_rate, sample_width, channels) + ) + @genty class TestSplit(TestCase):