# HG changeset patch # User Amine Sehili # Date 1564084252 -3600 # Node ID 173ffca58d23700a677010961a141823f673d3ca # Parent 6c3b56eb8052cf1f483d671e8d1ab8ff63df1fd2 Read data from all available channels in AudioSource diff -r 6c3b56eb8052 -r 173ffca58d23 auditok/io.py --- a/auditok/io.py Sun Jul 21 17:13:53 2019 +0100 +++ b/auditok/io.py Thu Jul 25 20:50:52 2019 +0100 @@ -71,7 +71,6 @@ DEFAULT_SAMPLING_RATE = 16000 DEFAULT_SAMPLE_WIDTH = 2 DEFAULT_NB_CHANNELS = 1 -DEFAULT_USE_CHANNEL = 0 DATA_FORMAT = {1: "b", 2: "h", 4: "i"} @@ -130,32 +129,17 @@ Gets audio parameters from a dictionary of parameters. A parameter can have a long name or a short name. If the long name is present, the short name is ignored. In neither is present then - `AudioParameterError` is raised except for the `use_channel` (or `uc`) - parameter for which a defalut value of 0 is returned. - - Also raises `AudioParameterError` if sampling rate, sample width or - channels is not an integer. + `AudioParameterError` is raised. Expected parameters are: `sampling_rate`, `sr`: int, sampling rate. `sample_width`, `sw`: int, sample size in bytes. `channels`, `ch`: int, number of channels. - `use_channel`, `us`: int or str, which channel to use from data. - Default value is 0 (first channel). 
The following special str - values are also accepted: - `left`: alias for 0 - `right`: alias for 1 - `mix`: indicates that all channels should be mixed up into one - single channel :Returns - - param_dict: tuple - audio parameters as a tuple (sampling_rate, - sample_width, - channels, - use_channel) + audio_parameters: tuple + audio parameters: (sampling_rate, sample_width, channels) """ err_message = ( "'{ln}' (or '{sn}') must be a positive integer, found: '{val}'" @@ -173,8 +157,7 @@ ) parameters.append(param) sampling_rate, sample_width, channels = parameters - use_channel = param_dict.get("use_channel", param_dict.get("uc")) - return sampling_rate, sample_width, channels, use_channel + return sampling_rate, sample_width, channels def _array_to_bytes(a): @@ -530,28 +513,14 @@ class _FileAudioSource(AudioSource): - def __init__(self, sampling_rate, sample_width, channels, use_channel): + def __init__(self, sampling_rate, sample_width, channels): AudioSource.__init__(self, sampling_rate, sample_width, channels) self._audio_stream = None - self._use_channel = _normalize_use_channel(use_channel) - if channels > 1: - self._extract_selected_channel = partial( - _extract_selected_channel, - channels=channels, - sample_width=sample_width, - use_channel=self._use_channel, - ) - else: - self._extract_selected_channel = lambda x: x def __del__(self): if self.is_open(): self.close() - @property - def use_channel(self): - return self._use_channel - def is_open(self): return self._audio_stream is not None @@ -567,18 +536,14 @@ if not self.is_open(): raise AudioIOError("Audio stream is not open") data = self._read_from_stream(size) - if data: - return self._extract_selected_channel(data) - return None + if not data: + return None + return data class RawAudioSource(_FileAudioSource, Rewindable): - def __init__( - self, file, sampling_rate, sample_width, channels, use_channel=None - ): - _FileAudioSource.__init__( - self, sampling_rate, sample_width, channels, use_channel - ) + 
def __init__(self, file, sampling_rate, sample_width, channels): + _FileAudioSource.__init__(self, sampling_rate, sample_width, channels) self._file = file self._audio_stream = None self._sample_size = sample_width * channels @@ -608,7 +573,7 @@ path to a valid wave file. """ - def __init__(self, filename, use_channel=None): + def __init__(self, filename): self._filename = filename self._audio_stream = None stream = wave.open(self._filename, "rb") @@ -617,7 +582,6 @@ stream.getframerate(), stream.getsampwidth(), stream.getnchannels(), - use_channel, ) stream.close() @@ -700,12 +664,9 @@ sampling_rate=DEFAULT_SAMPLING_RATE, sample_width=DEFAULT_SAMPLE_WIDTH, channels=DEFAULT_NB_CHANNELS, - use_channel=0, ): - _FileAudioSource.__init__( - self, sampling_rate, sample_width, channels, use_channel - ) + _FileAudioSource.__init__(self, sampling_rate, sample_width, channels) self._is_open = False self._sample_size = sample_width * channels if PYTHON_3: @@ -847,20 +808,12 @@ "-", raw data will be read from stdin. If None, read audio data from microphone using PyAudio. 
""" - sampling_rate, sample_width, channels, use_channel = _get_audio_parameters( - kwargs - ) + sampling_rate, sample_width, channels = _get_audio_parameters(kwargs) if input == "-": - return StdinAudioSource( - sampling_rate, sample_width, channels, use_channel - ) + return StdinAudioSource(sampling_rate, sample_width, channels) if isinstance(input, bytes): - use_channel = _normalize_use_channel(use_channel) - data = _extract_selected_channel( - input, channels, sample_width, use_channel - ) - return BufferAudioSource(data, sampling_rate, sample_width, 1) + return BufferAudioSource(input, sampling_rate, sample_width, channels) # read data from a file if input is not None: @@ -879,14 +832,7 @@ ) -def _load_raw( - file, - sampling_rate, - sample_width, - channels, - use_channel=1, - large_file=False, -): +def _load_raw(file, sampling_rate, sample_width, channels, large_file=False): """ Load a raw audio file with standard Python. If `large_file` is True, audio data will be lazily @@ -903,15 +849,13 @@ sample width of audio data `channels`: int number of channels of audio data - `use_channel`: int - audio channel to read if file is not mono audio. This must be an integer - 0 >= and < channels, or one of 'left' (treated as 0 or first channel), or - right (treated as 1 or second channels). + `large_file`: bool + If True, return a `RawAudioSource` object that reads data lazily + from disk, otherwise load all data and return a `BufferAudioSource` :Returns: - `PyAudioPlayer` that has the same sampling rate, sample width and number of channels - as `audio_source`. 
+ `RawAudioSource` if `large_file` is True, `BufferAudioSource` otherwise """ if None in (sampling_rate, sample_width, channels): raise AudioParameterError( @@ -924,52 +868,38 @@ sampling_rate=sampling_rate, sample_width=sample_width, channels=channels, - use_channel=use_channel, - ) - else: - with open(file, "rb") as fp: - data = fp.read() - if channels != 1: - use_channel = _normalize_use_channel( - use_channel - ) # TODO: should happen in BufferAudioSource - data = _extract_selected_channel( - data, channels, sample_width, use_channel - ) - return BufferAudioSource( - data, - sampling_rate=sampling_rate, - sample_width=sample_width, - channels=1, ) + with open(file, "rb") as fp: + data = fp.read() + return BufferAudioSource( + data, + sampling_rate=sampling_rate, + sample_width=sample_width, + channels=channels, + ) -def _load_wave(filename, large_file=False, use_channel=1): + +def _load_wave(filename, large_file=False): """ Load a wave audio file with standard Python. If `large_file` is True, audio data will be lazily loaded to memory. - See also :func:`to_file`. """ if large_file: - return WaveAudioSource(filename, use_channel) + return WaveAudioSource(filename) with wave.open(filename) as fp: channels = fp.getnchannels() srate = fp.getframerate() swidth = fp.getsampwidth() data = fp.readframes(-1) - if channels > 1: - use_channel = _normalize_use_channel( - use_channel - ) # TODO: should happen in BufferAudioSource - data = _extract_selected_channel(data, channels, swidth, use_channel) return BufferAudioSource( - data, sampling_rate=srate, sample_width=swidth, channels=1 + data, sampling_rate=srate, sample_width=swidth, channels=channels ) -def _load_with_pydub(filename, audio_format, use_channel=1): +def _load_with_pydub(filename, audio_format): """Open compressed audio file using pydub. If a video file is passed, its audio track(s) are extracted and loaded. 
This function should not be called directely, use :func:`from_file` @@ -989,19 +919,11 @@ } open_function = func_dict.get(audio_format, AudioSegment.from_file) segment = open_function(filename) - data = segment._data - if segment.channels > 1: - use_channel = _normalize_use_channel( - use_channel - ) # TODO: should happen in BufferAudioSource - data = _extract_selected_channel( - data, segment.channels, segment.sample_width, use_channel - ) return BufferAudioSource( - data_buffer=data, + data_buffer=segment._data, sampling_rate=segment.frame_rate, sample_width=segment.sample_width, - channels=1, + channels=segment.channels, ) @@ -1032,7 +954,7 @@ audio format used to save data (e.g. raw, webm, wav, ogg) `large_file`: bool If True, audio won't fully be loaded to memory but only when a window - is read disk. + is read from disk. :kwargs: @@ -1045,10 +967,6 @@ sample width (i.e. number of bytes used to represent one audio sample) `channels`: int number of channels of audio data - `use_channel`: int, str - audio channel to extract from input file if file is not mono audio. - This must be an integer >= 0 and < channels, or one of the special - values `left` and `right` (treated as 0 and 1 respectively). 
 :Returns: @@ -1062,20 +980,16 @@ audio_format = _guess_audio_format(audio_format, filename) if audio_format == "raw": - srate, swidth, channels, use_channel = _get_audio_parameters(kwargs) - return _load_raw( - filename, srate, swidth, channels, use_channel, large_file - ) + srate, swidth, channels = _get_audio_parameters(kwargs) + return _load_raw(filename, srate, swidth, channels, large_file) - use_channel = kwargs.get("use_channel", kwargs.get("uc")) if audio_format in ["wav", "wave"]: - return _load_wave(filename, large_file, use_channel) + return _load_wave(filename, large_file) if large_file: - raise AudioIOError("Large file format should be raw or wav") + err_msg = "if 'large_file' is True, file format should be raw or wav" + raise AudioIOError(err_msg) if _WITH_PYDUB: - return _load_with_pydub( - filename, audio_format=audio_format, use_channel=use_channel - ) + return _load_with_pydub(filename, audio_format=audio_format) else: raise AudioIOError( "pydub is required for audio formats other than raw or wav" ) @@ -1160,8 +1074,7 @@ _save_raw(data, file) return try: - params = _get_audio_parameters(kwargs) - sampling_rate, sample_width, channels = _get_audio_parameters(kwargs) except AudioParameterError as exc: err_message = "All audio parameters are required to save formats " "other than raw. 
Error detail: {}".format(exc) diff -r 6c3b56eb8052 -r 173ffca58d23 tests/test_AudioSource.py --- a/tests/test_AudioSource.py Sun Jul 21 17:13:53 2019 +0100 +++ b/tests/test_AudioSource.py Thu Jul 25 20:50:52 2019 +0100 @@ -12,7 +12,7 @@ RawAudioSource, WaveAudioSource, ) -from test_util import PURE_TONE_DICT +from test_util import PURE_TONE_DICT, _sample_generator def audio_source_read_all_gen(audio_source, size=None): @@ -31,143 +31,88 @@ # TODO when use_channel is None, return samples from all channels @genty_dataset( - mono_default=("mono_400Hz", 1, None, 400), - mono_mix=("mono_400Hz", 1, "mix", 400), - mono_channel_selection=("mono_400Hz", 1, 2, 400), - multichannel_default=("3channel_400-800-1600Hz", 3, None, 400), - multichannel_channel_select_1st=("3channel_400-800-1600Hz", 3, 1, 400), - multichannel_channel_select_2nd=("3channel_400-800-1600Hz", 3, 2, 800), - multichannel_channel_select_3rd=( - "3channel_400-800-1600Hz", - 3, - 3, - 1600, - ), + mono=("mono_400Hz", (400,)), + multichannel=("3channel_400-800-1600Hz", (400, 800, 1600)), ) - def test_BufferAudioSource_read_all( - self, file_suffix, channels, use_channel, frequency - ): + def test_BufferAudioSource_read_all(self, file_suffix, frequencies): file = "tests/data/test_16KHZ_{}.raw".format(file_suffix) with open(file, "rb") as fp: expected = fp.read() - audio_source = BufferAudioSource(expected, 16000, 2, channels) - audio_source.open() - data = audio_source.read(None) - self.assertEqual(data, expected) - audio_source.rewind() - data = audio_source.read(-10) - self.assertEqual(data, expected) - audio_source.close() - + channels = len(frequencies) + audio_source = BufferAudioSource(expected, 16000, 2, channels) + audio_source.open() + data = audio_source.read(None) + self.assertEqual(data, expected) + audio_source.rewind() + data = audio_source.read(-10) + self.assertEqual(data, expected) + audio_source.close() @genty_dataset( - mono_default=("mono_400Hz", 1, None, 400), - mono_mix=("mono_400Hz", 1, 
"mix", 400), - mono_channel_selection=("mono_400Hz", 1, 2, 400), - multichannel_default=("3channel_400-800-1600Hz", 3, None, 400), - multichannel_channel_select_1st=("3channel_400-800-1600Hz", 3, 1, 400), - multichannel_channel_select_2nd=("3channel_400-800-1600Hz", 3, 2, 800), - multichannel_channel_select_3rd=( - "3channel_400-800-1600Hz", - 3, - 3, - 1600, - ), + mono=("mono_400Hz", (400,)), + multichannel=("3channel_400-800-1600Hz", (400, 800, 1600)), ) - def test_RawAudioSource( - self, file_suffix, channels, use_channel, frequency - ): + def test_RawAudioSource(self, file_suffix, frequencies): file = "tests/data/test_16KHZ_{}.raw".format(file_suffix) - audio_source = RawAudioSource(file, 16000, 2, channels, use_channel) + channels = len(frequencies) + audio_source = RawAudioSource(file, 16000, 2, channels) audio_source.open() - data = b"".join(audio_source_read_all_gen(audio_source)) + data_read_all = b"".join(audio_source_read_all_gen(audio_source)) audio_source.close() - expected = _array_to_bytes(PURE_TONE_DICT[frequency]) - self.assertEqual(data, expected) + mono_channels = [PURE_TONE_DICT[freq] for freq in frequencies] + fmt = DATA_FORMAT[audio_source.sample_width] + expected = _array_to_bytes( + array(fmt, _sample_generator(*mono_channels)) + ) + + self.assertEqual(data_read_all, expected) # assert read all data with None - audio_source = RawAudioSource(file, 16000, 2, channels, use_channel) + audio_source = RawAudioSource(file, 16000, 2, channels) audio_source.open() data_read_all = audio_source.read(None) audio_source.close() self.assertEqual(data_read_all, expected) # assert read all data with a negative size - audio_source = RawAudioSource(file, 16000, 2, channels, use_channel) + audio_source = RawAudioSource(file, 16000, 2, channels) audio_source.open() data_read_all = audio_source.read(-10) audio_source.close() self.assertEqual(data_read_all, expected) - - def test_RawAudioSource_mix(self): - file = 
"tests/data/test_16KHZ_3channel_400-800-1600Hz.raw" - audio_source = RawAudioSource(file, 16000, 2, 3, use_channel="mix") + @genty_dataset( + mono=("mono_400Hz", (400,)), + multichannel=("3channel_400-800-1600Hz", (400, 800, 1600)), + ) + def test_WaveAudioSource(self, file_suffix, frequencies): + file = "tests/data/test_16KHZ_{}.wav".format(file_suffix) + audio_source = WaveAudioSource(file) audio_source.open() data = b"".join(audio_source_read_all_gen(audio_source)) audio_source.close() + mono_channels = [PURE_TONE_DICT[freq] for freq in frequencies] + fmt = DATA_FORMAT[audio_source.sample_width] + expected = _array_to_bytes( + array(fmt, _sample_generator(*mono_channels)) + ) - mono_channels = [PURE_TONE_DICT[freq] for freq in [400, 800, 1600]] - fmt = DATA_FORMAT[2] - expected = _array_to_bytes( - array(fmt, (sum(samples) // 3 for samples in zip(*mono_channels))) - ) - expected = expected - self.assertEqual(data, expected) - - @genty_dataset( - mono_default=("mono_400Hz", 1, None, 400), - mono_mix=("mono_400Hz", 1, "mix", 400), - mono_channel_selection=("mono_400Hz", 1, 2, 400), - multichannel_default=("3channel_400-800-1600Hz", 3, None, 400), - multichannel_channel_select_1st=("3channel_400-800-1600Hz", 3, 1, 400), - multichannel_channel_select_2nd=("3channel_400-800-1600Hz", 3, 2, 800), - multichannel_channel_select_3rd=( - "3channel_400-800-1600Hz", - 3, - 3, - 1600, - ), - ) - def test_WaveAudioSource( - self, file_suffix, channels, use_channel, frequency - ): - file = "tests/data/test_16KHZ_{}.wav".format(file_suffix) - audio_source = WaveAudioSource(file, use_channel) - audio_source.open() - data = b"".join(audio_source_read_all_gen(audio_source)) - audio_source.close() - expected = _array_to_bytes(PURE_TONE_DICT[frequency]) self.assertEqual(data, expected) # assert read all data with None - audio_source = WaveAudioSource(file, use_channel) + audio_source = WaveAudioSource(file) audio_source.open() data_read_all = audio_source.read(None) 
audio_source.close() self.assertEqual(data_read_all, expected) # assert read all data with a negative size - audio_source = WaveAudioSource(file, use_channel) + audio_source = WaveAudioSource(file) audio_source.open() data_read_all = audio_source.read(-10) audio_source.close() self.assertEqual(data_read_all, expected) - def test_WaveAudioSource_mix(self): - file = "tests/data/test_16KHZ_3channel_400-800-1600Hz.wav" - audio_source = WaveAudioSource(file, use_channel="mix") - audio_source.open() - data = b"".join(audio_source_read_all_gen(audio_source)) - audio_source.close() - - mono_channels = [PURE_TONE_DICT[freq] for freq in [400, 800, 1600]] - fmt = DATA_FORMAT[2] - expected = _array_to_bytes( - array(fmt, (sum(samples) // 3 for samples in zip(*mono_channels))) - ) - self.assertEqual(data, expected) - @genty class TestBufferAudioSource_SR10_SW1_CH1(unittest.TestCase): diff -r 6c3b56eb8052 -r 173ffca58d23 tests/test_io.py --- a/tests/test_io.py Sun Jul 21 17:13:53 2019 +0100 +++ b/tests/test_io.py Thu Jul 25 20:50:52 2019 +0100 @@ -83,68 +83,44 @@ result = _normalize_use_channel(use_channel) self.assertEqual(result, expected) - @genty_dataset( - int_1=((8000, 2, 1, 1), (8000, 2, 1, 1)), - int_2=((8000, 2, 1, 2), (8000, 2, 1, 2)), - use_channel_left=((8000, 2, 1, "left"), (8000, 2, 1, "left")), - use_channel_right=((8000, 2, 1, "right"), (8000, 2, 1, "right")), - use_channel_mix=((8000, 2, 1, "mix"), (8000, 2, 1, "mix")), - use_channel_None=((8000, 2, 2, None), (8000, 2, 2, None)), - no_use_channel=((8000, 2, 2), (8000, 2, 2, None)), - ) - def test_get_audio_parameters_short_params(self, values, expected): - params = dict(zip(("sr", "sw", "ch", "uc"), values)) + def test_get_audio_parameters_short_params(self): + expected = (8000, 2, 1) + params = dict(zip(("sr", "sw", "ch"), expected)) result = _get_audio_parameters(params) self.assertEqual(result, expected) - @genty_dataset( - int_1=((8000, 2, 1, 1), (8000, 2, 1, 1)), - int_2=((8000, 2, 1, 2), (8000, 2, 1, 2)), 
- use_channel_left=((8000, 2, 1, "left"), (8000, 2, 1, "left")), - use_channel_right=((8000, 2, 1, "right"), (8000, 2, 1, "right")), - use_channel_mix=((8000, 2, 1, "mix"), (8000, 2, 1, "mix")), - use_channel_None=((8000, 2, 2, None), (8000, 2, 2, None)), - no_use_channel=((8000, 2, 2), (8000, 2, 2, None)), - ) - def test_get_audio_parameters_long_params(self, values, expected): + def test_get_audio_parameters_long_params(self): + expected = (8000, 2, 1) params = dict( zip( ("sampling_rate", "sample_width", "channels", "use_channel"), - values, + expected, ) ) result = _get_audio_parameters(params) self.assertEqual(result, expected) - @genty_dataset(simple=((8000, 2, 1, 1), (8000, 2, 1, 1))) - def test_get_audio_parameters_long_params_shadow_short_ones( - self, values, expected - ): + def test_get_audio_parameters_long_params_shadow_short_ones(self): + expected = (8000, 2, 1) params = dict( - zip( - ("sampling_rate", "sample_width", "channels", "use_channel"), - values, - ) + zip(("sampling_rate", "sample_width", "channels"), expected) ) - params.update(dict(zip(("sr", "sw", "ch", "uc"), "xxxx"))) + params.update(dict(zip(("sr", "sw", "ch"), "xxx"))) result = _get_audio_parameters(params) self.assertEqual(result, expected) @genty_dataset( - str_sampling_rate=(("x", 2, 1, 0),), - negative_sampling_rate=((-8000, 2, 1, 0),), - str_sample_width=((8000, "x", 1, 0),), - negative_sample_width=((8000, -2, 1, 0),), - str_channels=((8000, 2, "x", 0),), - negative_channels=((8000, 2, -1, 0),), + str_sampling_rate=(("x", 2, 1),), + negative_sampling_rate=((-8000, 2, 1),), + str_sample_width=((8000, "x", 1),), + negative_sample_width=((8000, -2, 1),), + str_channels=((8000, 2, "x"),), + negative_channels=((8000, 2, -1),), ) def test_get_audio_parameters_invalid(self, values): - # TODO 0 or negative use_channel must raise AudioParameterError - # change implementation, don't accept negative uc - # hifglight everywhere in doc that uc must be positive params = dict( zip( - 
("sampling_rate", "sample_width", "channels", "use_channel"), + ("sampling_rate", "sample_width", "channels"), values, ) ) @@ -310,133 +286,22 @@ with self.assertRaises(AudioIOError): from_file("audio", "mp3") - @genty_dataset( - raw_first_channel=("raw", 1, 400), - raw_second_channel=("raw", 2, 800), - raw_third_channel=("raw", 3, 1600), - raw_left_channel=("raw", "left", 400), - raw_right_channel=("raw", "right", 800), - wav_first_channel=("wav", 1, 400), - wav_second_channel=("wav", 2, 800), - wav_third_channel=("wav", 3, 1600), - wav_left_channel=("wav", "left", 400), - wav_right_channel=("wav", "right", 800), - ) - def test_from_file_multichannel_audio( - self, audio_format, use_channel, frequency - ): - expected = PURE_TONE_DICT[frequency] - filename = "tests/data/test_16KHZ_3channel_400-800-1600Hz.{}".format( - audio_format - ) - sample_width = 2 - audio_source = from_file( - filename, - sampling_rate=16000, - sample_width=sample_width, - channels=3, - use_channel=use_channel, - ) - fmt = DATA_FORMAT[sample_width] - data = array(fmt, audio_source._buffer) - self.assertEqual(data, expected) - - @genty_dataset( - raw_mono=("raw", "mono_400Hz", (400,)), - raw_3channel=("raw", "3channel_400-800-1600Hz", (400, 800, 1600)), - wav_mono=("wav", "mono_400Hz", (400,)), - wav_3channel=("wav", "3channel_400-800-1600Hz", (400, 800, 1600)), - ) - def test_from_file_multichannel_audio_mix( - self, audio_format, filename_suffix, frequencies - ): - sampling_rate = 16000 - sample_width = 2 - channels = len(frequencies) - mono_channels = [PURE_TONE_DICT[freq] for freq in frequencies] - channels = len(frequencies) - fmt = DATA_FORMAT[sample_width] - expected = _array_to_bytes( - array( - fmt, - (sum(samples) // channels for samples in zip(*mono_channels)), - ) - ) - filename = "tests/data/test_16KHZ_{}.{}".format( - filename_suffix, audio_format - ) - audio_source = from_file( - filename, - use_channel="mix", - sampling_rate=sampling_rate, - sample_width=2, - channels=channels, 
- ) - mixed = audio_source._buffer - self.assertEqual((mixed), expected) @patch("auditok.io._WITH_PYDUB", True) @patch("auditok.io.BufferAudioSource") @genty_dataset( - ogg_first_channel=("ogg", 1, "from_ogg"), - ogg_second_channel=("ogg", 2, "from_ogg"), - ogg_mix=("ogg", "mix", "from_ogg"), - ogg_default=("ogg", None, "from_ogg"), - mp3_left_channel=("mp3", "left", "from_mp3"), - mp3_right_channel=("mp3", "right", "from_mp3"), - flac_first_channel=("flac", 1, "from_file"), - flac_second_channel=("flac", 1, "from_file"), - flv_left_channel=("flv", "left", "from_flv"), - webm_right_channel=("webm", "right", "from_file"), + ogg_first_channel=("ogg", "from_ogg"), + ogg_second_channel=("ogg", "from_ogg"), + ogg_mix=("ogg", "from_ogg"), + ogg_default=("ogg", "from_ogg"), + mp3_left_channel=("mp3", "from_mp3"), + mp3_right_channel=("mp3", "from_mp3"), + flac_first_channel=("flac", "from_file"), + flac_second_channel=("flac", "from_file"), + flv_left_channel=("flv", "from_flv"), + webm_right_channel=("webm", "from_file"), ) def test_from_file_multichannel_audio_compressed( - self, audio_format, use_channel, function, *mocks - ): - filename = "audio.{}".format(audio_format) - segment_mock = Mock() - segment_mock.sample_width = 2 - segment_mock.channels = 2 - segment_mock._data = b"abcd" - with patch("auditok.io._extract_selected_channel") as ext_mock: - with patch( - "auditok.io.AudioSegment.{}".format(function) - ) as open_func: - open_func.return_value = segment_mock - from_file(filename, use_channel=use_channel) - self.assertTrue(open_func.called) - self.assertTrue(ext_mock.called) - - use_channel = {"left": 1, "right": 2, None: 1}.get( - use_channel, use_channel - ) - if isinstance(use_channel, int): - # _extract_selected_channel will be called with a channel starting from 0 - use_channel -= 1 - ext_mock.assert_called_with( - segment_mock._data, - segment_mock.channels, - segment_mock.sample_width, - use_channel, - ) - - with 
patch("auditok.io._extract_selected_channel") as ext_mock: - with patch( - "auditok.io.AudioSegment.{}".format(function) - ) as open_func: - segment_mock.channels = 1 - open_func.return_value = segment_mock - from_file(filename, use_channel=use_channel) - self.assertTrue(open_func.called) - self.assertFalse(ext_mock.called) - - @patch("auditok.io._WITH_PYDUB", True) - @patch("auditok.io.BufferAudioSource") - @genty_dataset( - ogg=("ogg", "from_ogg"), - mp3=("mp3", "from_mp3"), - flac=("flac", "from_file"), - ) - def test_from_file_multichannel_audio_mix_compressed( self, audio_format, function, *mocks ): filename = "audio.{}".format(audio_format) @@ -444,86 +309,38 @@ segment_mock.sample_width = 2 segment_mock.channels = 2 segment_mock._data = b"abcd" - with patch("auditok.io._mix_audio_channels") as mix_mock: - with patch( - "auditok.io.AudioSegment.{}".format(function) - ) as open_func: - open_func.return_value = segment_mock - from_file(filename, use_channel="mix") - self.assertTrue(open_func.called) - mix_mock.assert_called_with( - segment_mock._data, - segment_mock.channels, - segment_mock.sample_width, - ) + with patch( + "auditok.io.AudioSegment.{}".format(function) + ) as open_func: + open_func.return_value = segment_mock + from_file(filename) + self.assertTrue(open_func.called) + @genty_dataset( - dafault_first_channel=(None, 400), - first_channel=(1, 400), - second_channel=(2, 800), - third_channel=(3, 1600), - negative_first_channel=(-3, 400), - negative_second_channel=(-2, 800), - negative_third_channel=(-1, 1600), + mono=("mono_400", (400,)), + three_channel=("3channel_400-800-1600", (400, 800, 1600)), + + mono_large_file=("mono_400", (400,), True), + three_channel_large_file=("3channel_400-800-1600", (400, 800, 1600), True), ) - def test_load_raw(self, use_channel, frequency): - filename = "tests/data/test_16KHZ_3channel_400-800-1600Hz.raw" - if use_channel is not None: - audio_source = _load_raw( - filename, - sampling_rate=16000, - sample_width=2, - 
channels=3, - use_channel=use_channel, - ) - else: - audio_source = _load_raw( - filename, sampling_rate=16000, sample_width=2, channels=3 - ) - self.assertIsInstance(audio_source, BufferAudioSource) + def test_load_raw(self, file_id, frequencies, large_file=False): + filename = "tests/data/test_16KHZ_{}Hz.raw".format(file_id) + audio_source = _load_raw(filename, 16000, 2, len(frequencies), large_file=large_file) + audio_source.open() + data = audio_source.read(-1) + audio_source.close() + expected_class = RawAudioSource if large_file else BufferAudioSource + self.assertIsInstance(audio_source, expected_class) self.assertEqual(audio_source.sampling_rate, 16000) self.assertEqual(audio_source.sample_width, 2) - self.assertEqual(audio_source.channels, 1) - # generate a pure sine wave tone of the given frequency - expected = PURE_TONE_DICT[frequency] - # compre with data read from file - fmt = DATA_FORMAT[2] - data = array(fmt, audio_source._buffer) + self.assertEqual(audio_source.channels, len(frequencies)) + mono_channels = [PURE_TONE_DICT[freq] for freq in frequencies] + fmt = DATA_FORMAT[audio_source.sample_width] + expected =_array_to_bytes(array(fmt, _sample_generator(*mono_channels))) self.assertEqual(data, expected) @genty_dataset( - mono=("mono_400Hz", (400,)), - three_channel=("3channel_400-800-1600Hz", (400, 800, 1600)), - ) - def test_load_raw_mix(self, filename_suffix, frequencies): - sampling_rate = 16000 - sample_width = 2 - channels = len(frequencies) - mono_channels = [PURE_TONE_DICT[freq] for freq in frequencies] - - fmt = DATA_FORMAT[sample_width] - expected = _array_to_bytes( - array( - fmt, - (sum(samples) // channels for samples in zip(*mono_channels)), - ) - ) - filename = "tests/data/test_16KHZ_{}.raw".format(filename_suffix) - audio_source = _load_raw( - filename, - use_channel="mix", - sampling_rate=sampling_rate, - sample_width=2, - channels=channels, - ) - mixed = audio_source._buffer - self.assertEqual(mixed, expected) - 
self.assertIsInstance(audio_source, BufferAudioSource) - self.assertEqual(audio_source.sampling_rate, sampling_rate) - self.assertEqual(audio_source.sample_width, sample_width) - self.assertEqual(audio_source.channels, 1) - - @genty_dataset( missing_sampling_rate=("sr",), missing_sample_width=("sw",), missing_channels=("ch",), @@ -536,102 +353,60 @@ _load_raw("audio", srate, swidth, channels) @genty_dataset( - dafault_first_channel=(None, 400), - first_channel=(1, 400), - second_channel=(2, 800), - third_channel=(3, 1600), - negative_first_channel=(-3, 400), - negative_second_channel=(-2, 800), - negative_third_channel=(-1, 1600), + mono=("mono_400", (400,)), + three_channel=("3channel_400-800-1600", (400, 800, 1600)), + + mono_large_file=("mono_400", (400,), True), + three_channel_large_file=("3channel_400-800-1600", (400, 800, 1600), True), ) - def test_load_wave(self, use_channel, frequency): - filename = "tests/data/test_16KHZ_3channel_400-800-1600Hz.wav" - if use_channel is not None: - audio_source = _load_wave(filename, use_channel=use_channel) - else: - audio_source = _load_wave(filename) - self.assertIsInstance(audio_source, BufferAudioSource) + def test_load_wave(self, file_id, frequencies, large_file=False): + filename = "tests/data/test_16KHZ_{}Hz.wav".format(file_id) + audio_source = _load_wave(filename, large_file=large_file) + audio_source.open() + data = audio_source.read(-1) + audio_source.close() + expected_class = WaveAudioSource if large_file else BufferAudioSource + self.assertIsInstance(audio_source, expected_class) self.assertEqual(audio_source.sampling_rate, 16000) self.assertEqual(audio_source.sample_width, 2) - self.assertEqual(audio_source.channels, 1) - # generate a pure sine wave tone of the given frequency - expected = PURE_TONE_DICT[frequency] - # compre with data read from file - fmt = DATA_FORMAT[2] - data = array(fmt, audio_source._buffer) + self.assertEqual(audio_source.channels, len(frequencies)) + mono_channels = 
[PURE_TONE_DICT[freq] for freq in frequencies] + fmt = DATA_FORMAT[audio_source.sample_width] + expected =_array_to_bytes(array(fmt, _sample_generator(*mono_channels))) self.assertEqual(data, expected) - @genty_dataset( - mono=("mono_400Hz", (400,)), - three_channel=("3channel_400-800-1600Hz", (400, 800, 1600)), - ) - def test_load_wave_mix(self, filename_suffix, frequencies): - sampling_rate = 16000 - sample_width = 2 - channels = len(frequencies) - mono_channels = [PURE_TONE_DICT[freq] for freq in frequencies] - fmt = DATA_FORMAT[sample_width] - expected = _array_to_bytes( - array( - fmt, - (sum(samples) // channels for samples in zip(*mono_channels)), - ) - ) - filename = "tests/data/test_16KHZ_{}.wav".format(filename_suffix) - audio_source = _load_wave(filename, use_channel="mix") - mixed = audio_source._buffer - self.assertEqual(mixed, expected) - self.assertIsInstance(audio_source, BufferAudioSource) - self.assertEqual(audio_source.sampling_rate, sampling_rate) - self.assertEqual(audio_source.sample_width, sample_width) - self.assertEqual(audio_source.channels, 1) @patch("auditok.io._WITH_PYDUB", True) @patch("auditok.io.BufferAudioSource") @genty_dataset( - ogg_default_first_channel=("ogg", 2, None, "from_ogg"), - ogg_first_channel=("ogg", 1, 0, "from_ogg"), - ogg_second_channel=("ogg", 2, 1, "from_ogg"), - ogg_mix_channels=("ogg", 3, "mix", "from_ogg"), - mp3_left_channel=("mp3", 1, "left", "from_mp3"), - mp3_right_channel=("mp3", 2, "right", "from_mp3"), - mp3_mix_channels=("mp3", 3, "mix", "from_mp3"), - flac_first_channel=("flac", 2, 1, "from_file"), - flac_second_channel=("flac", 2, 1, "from_file"), - flv_left_channel=("flv", 1, "left", "from_flv"), - webm_right_channel=("webm", 2, "right", "from_file"), - webm_mix_channels=("webm", 4, "mix", "from_file"), + ogg_default_first_channel=("ogg", 2, "from_ogg"), + ogg_first_channel=("ogg", 1, "from_ogg"), + ogg_second_channel=("ogg", 2, "from_ogg"), + ogg_mix_channels=("ogg", 3, "from_ogg"), + 
mp3_left_channel=("mp3", 1, "from_mp3"), + mp3_right_channel=("mp3", 2, "from_mp3"), + mp3_mix_channels=("mp3", 3, "from_mp3"), + flac_first_channel=("flac", 2, "from_file"), + flac_second_channel=("flac", 2, "from_file"), + flv_left_channel=("flv", 1, "from_flv"), + webm_right_channel=("webm", 2, "from_file"), + webm_mix_channels=("webm", 4, "from_file"), ) def test_load_with_pydub( - self, audio_format, channels, use_channel, function, *mocks + self, audio_format, channels, function, *mocks ): filename = "audio.{}".format(audio_format) segment_mock = Mock() segment_mock.sample_width = 2 segment_mock.channels = channels segment_mock._data = b"abcdefgh" - with patch("auditok.io._extract_selected_channel") as ext_mock: - with patch( - "auditok.io.AudioSegment.{}".format(function) - ) as open_func: - open_func.return_value = segment_mock - normalized_use_channel = {"left": 1, "right": 2, None: 0}.get( - use_channel, use_channel - ) - if isinstance(normalized_use_channel, int) and normalized_use_channel > 0: - normalized_use_channel -= 1 - _load_with_pydub(filename, audio_format, use_channel) - self.assertTrue(open_func.called) - if channels > 1: - self.assertTrue(ext_mock.called) - ext_mock.assert_called_with( - segment_mock._data, - segment_mock.channels, - segment_mock.sample_width, - normalized_use_channel, - ) - else: - self.assertFalse(ext_mock.called) + with patch( + "auditok.io.AudioSegment.{}".format(function) + ) as open_func: + open_func.return_value = segment_mock + _load_with_pydub(filename, audio_format) + self.assertTrue(open_func.called) + @genty_dataset( mono=("mono_400Hz.raw", (400,)),