changeset 240:173ffca58d23

Read data from all available channels in AudioSource
author Amine Sehili <amine.sehili@gmail.com>
date Thu, 25 Jul 2019 20:50:52 +0100
parents 6c3b56eb8052
children 79b668c48fce
files auditok/io.py tests/test_AudioSource.py tests/test_io.py
diffstat 3 files changed, 173 insertions(+), 540 deletions(-) [+]
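A minimal usage sketch of the change (illustrative, not part of the changeset): audio sources no longer take a use_channel argument; read() now returns interleaved frames from all channels, and callers de-interleave them if they need a single channel. The file path and parameters below come from the repository's test data and are only an example.

from array import array
from auditok.io import RawAudioSource

source = RawAudioSource(
    "tests/data/test_16KHZ_3channel_400-800-1600Hz.raw",  # example test file
    sampling_rate=16000,
    sample_width=2,
    channels=3,
)
source.open()
data = source.read(None)  # None or a negative size reads all remaining data
source.close()

# Samples are interleaved: c0, c1, c2, c0, c1, c2, ...
samples = array("h", data)  # "h" matches sample_width == 2 (see DATA_FORMAT)
first_channel = samples[0::3]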
--- a/auditok/io.py	Sun Jul 21 17:13:53 2019 +0100
+++ b/auditok/io.py	Thu Jul 25 20:50:52 2019 +0100
@@ -71,7 +71,6 @@
 DEFAULT_SAMPLING_RATE = 16000
 DEFAULT_SAMPLE_WIDTH = 2
 DEFAULT_NB_CHANNELS = 1
-DEFAULT_USE_CHANNEL = 0
 DATA_FORMAT = {1: "b", 2: "h", 4: "i"}
 
 
@@ -130,32 +129,17 @@
     Gets audio parameters from a dictionary of parameters.
     A parameter can have a long name or a short name. If the long name is
     present, the short name is ignored. If neither is present then
-    `AudioParameterError` is raised  except for the `use_channel` (or `uc`)
-    parameter for which a defalut value of 0 is returned.
-
-    Also raises `AudioParameterError` if sampling rate, sample width or
-    channels is not an integer.
+    `AudioParameterError` is raised.
 
     Expected parameters are:
 
         `sampling_rate`, `sr`: int, sampling rate.
         `sample_width`, `sw`: int, sample size in bytes.
         `channels`, `ch`: int, number of channels.
-        `use_channel`, `us`: int or str, which channel to use from data.
-            Default value is 0 (first channel). The following special str
-            values are also accepted:
-                `left`: alias for 0
-                `right`: alias for 1
-                `mix`: indicates that all channels should be mixed up into one
-                    single channel
 
     :Returns
-
-        param_dict: tuple
-            audio parameters as a tuple (sampling_rate,
-                                         sample_width,
-                                         channels,
-                                         use_channel)
+        audio_parameters: tuple
+            audio parameters: (sampling_rate, sample_width, channels)
     """
     err_message = (
         "'{ln}' (or '{sn}') must be a positive integer, found: '{val}'"
@@ -173,8 +157,7 @@
             )
         parameters.append(param)
     sampling_rate, sample_width, channels = parameters
-    use_channel = param_dict.get("use_channel", param_dict.get("uc"))
-    return sampling_rate, sample_width, channels, use_channel
+    return sampling_rate, sample_width, channels
 
 
 def _array_to_bytes(a):
@@ -530,28 +513,14 @@
 
 
 class _FileAudioSource(AudioSource):
-    def __init__(self, sampling_rate, sample_width, channels, use_channel):
+    def __init__(self, sampling_rate, sample_width, channels):
         AudioSource.__init__(self, sampling_rate, sample_width, channels)
         self._audio_stream = None
-        self._use_channel = _normalize_use_channel(use_channel)
-        if channels > 1:
-            self._extract_selected_channel = partial(
-                _extract_selected_channel,
-                channels=channels,
-                sample_width=sample_width,
-                use_channel=self._use_channel,
-            )
-        else:
-            self._extract_selected_channel = lambda x: x
 
     def __del__(self):
         if self.is_open():
             self.close()
 
-    @property
-    def use_channel(self):
-        return self._use_channel
-
     def is_open(self):
         return self._audio_stream is not None
 
@@ -567,18 +536,14 @@
         if not self.is_open():
             raise AudioIOError("Audio stream is not open")
         data = self._read_from_stream(size)
-        if data:
-            return self._extract_selected_channel(data)
-        return None
+        if not data:
+            return None
+        return data
 
 
 class RawAudioSource(_FileAudioSource, Rewindable):
-    def __init__(
-        self, file, sampling_rate, sample_width, channels, use_channel=None
-    ):
-        _FileAudioSource.__init__(
-            self, sampling_rate, sample_width, channels, use_channel
-        )
+    def __init__(self, file, sampling_rate, sample_width, channels):
+        _FileAudioSource.__init__(self, sampling_rate, sample_width, channels)
         self._file = file
         self._audio_stream = None
         self._sample_size = sample_width * channels
@@ -608,7 +573,7 @@
             path to a valid wave file.
     """
 
-    def __init__(self, filename, use_channel=None):
+    def __init__(self, filename):
         self._filename = filename
         self._audio_stream = None
         stream = wave.open(self._filename, "rb")
@@ -617,7 +582,6 @@
             stream.getframerate(),
             stream.getsampwidth(),
             stream.getnchannels(),
-            use_channel,
         )
         stream.close()
 
@@ -700,12 +664,9 @@
         sampling_rate=DEFAULT_SAMPLING_RATE,
         sample_width=DEFAULT_SAMPLE_WIDTH,
         channels=DEFAULT_NB_CHANNELS,
-        use_channel=0,
     ):
 
-        _FileAudioSource.__init__(
-            self, sampling_rate, sample_width, channels, use_channel
-        )
+        _FileAudioSource.__init__(self, sampling_rate, sample_width, channels)
         self._is_open = False
         self._sample_size = sample_width * channels
         if PYTHON_3:
@@ -847,20 +808,12 @@
         "-", raw data will be read from stdin. If None, read audio data from
         microphone using PyAudio.
     """
-    sampling_rate, sample_width, channels, use_channel = _get_audio_parameters(
-        kwargs
-    )
+    sampling_rate, sample_width, channels = _get_audio_parameters(kwargs)
     if input == "-":
-        return StdinAudioSource(
-            sampling_rate, sample_width, channels, use_channel
-        )
+        return StdinAudioSource(sampling_rate, sample_width, channels)
 
     if isinstance(input, bytes):
-        use_channel = _normalize_use_channel(use_channel)
-        data = _extract_selected_channel(
-            input, channels, sample_width, use_channel
-        )
-        return BufferAudioSource(data, sampling_rate, sample_width, 1)
+        return BufferAudioSource(input, sampling_rate, sample_width, channels)
 
     # read data from a file
     if input is not None:
@@ -879,14 +832,7 @@
         )
 
 
-def _load_raw(
-    file,
-    sampling_rate,
-    sample_width,
-    channels,
-    use_channel=1,
-    large_file=False,
-):
+def _load_raw(file, sampling_rate, sample_width, channels, large_file=False):
     """
     Load a raw audio file with standard Python.
     If `large_file` is True, audio data will be lazily
@@ -903,15 +849,13 @@
             sample width of audio data
         `channels`: int
             number of channels of audio data
-        `use_channel`: int
-            audio channel to read if file is not mono audio. This must be an integer
-            0 >= and < channels, or one of 'left' (treated as 0 or first channel), or
-            right (treated as 1 or second channels). 
+        `large_file`: bool
+            If True, return a `RawAudioSource` object that reads data lazily
+            from disk, otherwise load all data and return a `BufferAudioSource`
 
     :Returns:
 
-        `PyAudioPlayer` that has the same sampling rate, sample width and number of channels
-        as `audio_source`.
+        `RawAudioSource` if `large_file` is True, `BufferAudioSource` otherwise
     """
     if None in (sampling_rate, sample_width, channels):
         raise AudioParameterError(
@@ -924,52 +868,38 @@
             sampling_rate=sampling_rate,
             sample_width=sample_width,
             channels=channels,
-            use_channel=use_channel,
-        )
-    else:
-        with open(file, "rb") as fp:
-            data = fp.read()
-        if channels != 1:
-            use_channel = _normalize_use_channel(
-                use_channel
-            )  # TODO: should happen in BufferAudioSource
-            data = _extract_selected_channel(
-                data, channels, sample_width, use_channel
-            )
-        return BufferAudioSource(
-            data,
-            sampling_rate=sampling_rate,
-            sample_width=sample_width,
-            channels=1,
         )
 
+    with open(file, "rb") as fp:
+        data = fp.read()
+    return BufferAudioSource(
+        data,
+        sampling_rate=sampling_rate,
+        sample_width=sample_width,
+        channels=channels,
+    )
 
-def _load_wave(filename, large_file=False, use_channel=1):
+
+def _load_wave(filename, large_file=False):
     """
     Load a wave audio file with standard Python.
     If `large_file` is True, audio data will be lazily
     loaded to memory.
 
-    See also :func:`to_file`.
     """
     if large_file:
-        return WaveAudioSource(filename, use_channel)
+        return WaveAudioSource(filename)
     with wave.open(filename) as fp:
         channels = fp.getnchannels()
         srate = fp.getframerate()
         swidth = fp.getsampwidth()
         data = fp.readframes(-1)
-    if channels > 1:
-        use_channel = _normalize_use_channel(
-            use_channel
-        )  # TODO: should happen in BufferAudioSource
-        data = _extract_selected_channel(data, channels, swidth, use_channel)
     return BufferAudioSource(
-        data, sampling_rate=srate, sample_width=swidth, channels=1
+        data, sampling_rate=srate, sample_width=swidth, channels=channels
     )
 
 
-def _load_with_pydub(filename, audio_format, use_channel=1):
+def _load_with_pydub(filename, audio_format):
     """Open compressed audio file using pydub. If a video file
     is passed, its audio track(s) are extracted and loaded.
     This function should not be called directly, use :func:`from_file`
@@ -989,19 +919,11 @@
     }
     open_function = func_dict.get(audio_format, AudioSegment.from_file)
     segment = open_function(filename)
-    data = segment._data
-    if segment.channels > 1:
-        use_channel = _normalize_use_channel(
-            use_channel
-        )  # TODO: should happen in BufferAudioSource
-        data = _extract_selected_channel(
-            data, segment.channels, segment.sample_width, use_channel
-        )
     return BufferAudioSource(
-        data_buffer=data,
+        data_buffer=segment._data,
         sampling_rate=segment.frame_rate,
         sample_width=segment.sample_width,
-        channels=1,
+        channels=segment.channels,
     )
 
 
@@ -1032,7 +954,7 @@
         audio format of the data to load (e.g. raw, webm, wav, ogg)
     `large_file`: bool
         If True, audio won't fully be loaded to memory but only when a window
-        is read disk.
+        is read from disk.
 
     :kwargs:
 
@@ -1045,10 +967,6 @@
         sample width (i.e. number of bytes used to represent one audio sample)
     `channels`: int
         number of channels of audio data
-    `use_channel`: int, str
-        audio channel to extract from input file if file is not mono audio.
-        This must be an integer >= 0 and < channels, or one of the special
-        values `left` and `right` (treated as 0 and 1 respectively).
 
     :Returns:
 
@@ -1062,20 +980,16 @@
     audio_format = _guess_audio_format(audio_format, filename)
 
     if audio_format == "raw":
-        srate, swidth, channels, use_channel = _get_audio_parameters(kwargs)
-        return _load_raw(
-            filename, srate, swidth, channels, use_channel, large_file
-        )
+        srate, swidth, channels = _get_audio_parameters(kwargs)
+        return _load_raw(filename, srate, swidth, channels, large_file)
 
-    use_channel = kwargs.get("use_channel", kwargs.get("uc"))
     if audio_format in ["wav", "wave"]:
-        return _load_wave(filename, large_file, use_channel)
+        return _load_wave(filename, large_file)
     if large_file:
-        raise AudioIOError("Large file format should be raw or wav")
+        err_msg = "if 'large_file' is True, file format should be raw or wav"
+        raise AudioIOError(err_msg)
     if _WITH_PYDUB:
-        return _load_with_pydub(
-            filename, audio_format=audio_format, use_channel=use_channel
-        )
+        return _load_with_pydub(filename, audio_format=audio_format)
     else:
         raise AudioIOError(
             "pydub is required for audio formats other than raw or wav"
@@ -1160,8 +1074,7 @@
         _save_raw(data, file)
         return
     try:
-        params = _get_audio_parameters(kwargs)
-        sampling_rate, sample_width, channels, _ = params
+        sampling_rate, sample_width, channels = _get_audio_parameters(kwargs)
     except AudioParameterError as exc:
         err_message = "All audio parameters are required to save formats "
         "other than raw. Error detail: {}".format(exc)
--- a/tests/test_AudioSource.py	Sun Jul 21 17:13:53 2019 +0100
+++ b/tests/test_AudioSource.py	Thu Jul 25 20:50:52 2019 +0100
@@ -12,7 +12,7 @@
     RawAudioSource,
     WaveAudioSource,
 )
-from test_util import PURE_TONE_DICT
+from test_util import PURE_TONE_DICT, _sample_generator
 
 
 def audio_source_read_all_gen(audio_source, size=None):
@@ -31,143 +31,88 @@
     # TODO when use_channel is None, return samples from all channels
 
     @genty_dataset(
-        mono_default=("mono_400Hz", 1, None, 400),
-        mono_mix=("mono_400Hz", 1, "mix", 400),
-        mono_channel_selection=("mono_400Hz", 1, 2, 400),
-        multichannel_default=("3channel_400-800-1600Hz", 3, None, 400),
-        multichannel_channel_select_1st=("3channel_400-800-1600Hz", 3, 1, 400),
-        multichannel_channel_select_2nd=("3channel_400-800-1600Hz", 3, 2, 800),
-        multichannel_channel_select_3rd=(
-            "3channel_400-800-1600Hz",
-            3,
-            3,
-            1600,
-        ),
+        mono=("mono_400Hz", (400,)),
+        multichannel=("3channel_400-800-1600Hz", (400, 800, 1600)),
     )
-    def test_BufferAudioSource_read_all(
-        self, file_suffix, channels, use_channel, frequency
-    ):
+    def test_BufferAudioSource_read_all(self, file_suffix, frequencies):
         file = "tests/data/test_16KHZ_{}.raw".format(file_suffix)
         with open(file, "rb") as fp:
             expected = fp.read()
-            audio_source = BufferAudioSource(expected, 16000, 2, channels)
-            audio_source.open()
-            data = audio_source.read(None)
-            self.assertEqual(data, expected)
-            audio_source.rewind()
-            data = audio_source.read(-10)
-            self.assertEqual(data, expected)
-            audio_source.close()
-
+        channels = len(frequencies)
+        audio_source = BufferAudioSource(expected, 16000, 2, channels)
+        audio_source.open()
+        data = audio_source.read(None)
+        self.assertEqual(data, expected)
+        audio_source.rewind()
+        data = audio_source.read(-10)
+        self.assertEqual(data, expected)
+        audio_source.close()
 
     @genty_dataset(
-        mono_default=("mono_400Hz", 1, None, 400),
-        mono_mix=("mono_400Hz", 1, "mix", 400),
-        mono_channel_selection=("mono_400Hz", 1, 2, 400),
-        multichannel_default=("3channel_400-800-1600Hz", 3, None, 400),
-        multichannel_channel_select_1st=("3channel_400-800-1600Hz", 3, 1, 400),
-        multichannel_channel_select_2nd=("3channel_400-800-1600Hz", 3, 2, 800),
-        multichannel_channel_select_3rd=(
-            "3channel_400-800-1600Hz",
-            3,
-            3,
-            1600,
-        ),
+        mono=("mono_400Hz", (400,)),
+        multichannel=("3channel_400-800-1600Hz", (400, 800, 1600)),
     )
-    def test_RawAudioSource(
-        self, file_suffix, channels, use_channel, frequency
-    ):
+    def test_RawAudioSource(self, file_suffix, frequencies):
         file = "tests/data/test_16KHZ_{}.raw".format(file_suffix)
-        audio_source = RawAudioSource(file, 16000, 2, channels, use_channel)
+        channels = len(frequencies)
+        audio_source = RawAudioSource(file, 16000, 2, channels)
         audio_source.open()
-        data = b"".join(audio_source_read_all_gen(audio_source))
+        data_read_all = b"".join(audio_source_read_all_gen(audio_source))
         audio_source.close()
-        expected = _array_to_bytes(PURE_TONE_DICT[frequency])
-        self.assertEqual(data, expected)
+        mono_channels = [PURE_TONE_DICT[freq] for freq in frequencies]
+        fmt = DATA_FORMAT[audio_source.sample_width]
+        expected = _array_to_bytes(
+            array(fmt, _sample_generator(*mono_channels))
+        )
+
+        self.assertEqual(data_read_all, expected)
 
         # assert read all data with None
-        audio_source = RawAudioSource(file, 16000, 2, channels, use_channel)
+        audio_source = RawAudioSource(file, 16000, 2, channels)
         audio_source.open()
         data_read_all = audio_source.read(None)
         audio_source.close()
         self.assertEqual(data_read_all, expected)
 
         # assert read all data with a negative size
-        audio_source = RawAudioSource(file, 16000, 2, channels, use_channel)
+        audio_source = RawAudioSource(file, 16000, 2, channels)
         audio_source.open()
         data_read_all = audio_source.read(-10)
         audio_source.close()
         self.assertEqual(data_read_all, expected)
 
-
-    def test_RawAudioSource_mix(self):
-        file = "tests/data/test_16KHZ_3channel_400-800-1600Hz.raw"
-        audio_source = RawAudioSource(file, 16000, 2, 3, use_channel="mix")
+    @genty_dataset(
+        mono=("mono_400Hz", (400,)),
+        multichannel=("3channel_400-800-1600Hz", (400, 800, 1600)),
+    )
+    def test_WaveAudioSource(self, file_suffix, frequencies):
+        file = "tests/data/test_16KHZ_{}.wav".format(file_suffix)
+        audio_source = WaveAudioSource(file)
         audio_source.open()
         data = b"".join(audio_source_read_all_gen(audio_source))
         audio_source.close()
+        mono_channels = [PURE_TONE_DICT[freq] for freq in frequencies]
+        fmt = DATA_FORMAT[audio_source.sample_width]
+        expected = _array_to_bytes(
+            array(fmt, _sample_generator(*mono_channels))
+        )
 
-        mono_channels = [PURE_TONE_DICT[freq] for freq in [400, 800, 1600]]
-        fmt = DATA_FORMAT[2]
-        expected = _array_to_bytes(
-            array(fmt, (sum(samples) // 3 for samples in zip(*mono_channels)))
-        )
-        expected = expected
-        self.assertEqual(data, expected)
-
-    @genty_dataset(
-        mono_default=("mono_400Hz", 1, None, 400),
-        mono_mix=("mono_400Hz", 1, "mix", 400),
-        mono_channel_selection=("mono_400Hz", 1, 2, 400),
-        multichannel_default=("3channel_400-800-1600Hz", 3, None, 400),
-        multichannel_channel_select_1st=("3channel_400-800-1600Hz", 3, 1, 400),
-        multichannel_channel_select_2nd=("3channel_400-800-1600Hz", 3, 2, 800),
-        multichannel_channel_select_3rd=(
-            "3channel_400-800-1600Hz",
-            3,
-            3,
-            1600,
-        ),
-    )
-    def test_WaveAudioSource(
-        self, file_suffix, channels, use_channel, frequency
-    ):
-        file = "tests/data/test_16KHZ_{}.wav".format(file_suffix)
-        audio_source = WaveAudioSource(file, use_channel)
-        audio_source.open()
-        data = b"".join(audio_source_read_all_gen(audio_source))
-        audio_source.close()
-        expected = _array_to_bytes(PURE_TONE_DICT[frequency])
         self.assertEqual(data, expected)
 
         # assert read all data with None
-        audio_source = WaveAudioSource(file, use_channel)
+        audio_source = WaveAudioSource(file)
         audio_source.open()
         data_read_all = audio_source.read(None)
         audio_source.close()
         self.assertEqual(data_read_all, expected)
 
         # assert read all data with a negative size
-        audio_source = WaveAudioSource(file, use_channel)
+        audio_source = WaveAudioSource(file)
         audio_source.open()
         data_read_all = audio_source.read(-10)
         audio_source.close()
         self.assertEqual(data_read_all, expected)
 
-    def test_WaveAudioSource_mix(self):
-        file = "tests/data/test_16KHZ_3channel_400-800-1600Hz.wav"
-        audio_source = WaveAudioSource(file, use_channel="mix")
-        audio_source.open()
-        data = b"".join(audio_source_read_all_gen(audio_source))
-        audio_source.close()
-
-        mono_channels = [PURE_TONE_DICT[freq] for freq in [400, 800, 1600]]
-        fmt = DATA_FORMAT[2]
-        expected = _array_to_bytes(
-            array(fmt, (sum(samples) // 3 for samples in zip(*mono_channels)))
-        )
-        self.assertEqual(data, expected)
-
 
 @genty
 class TestBufferAudioSource_SR10_SW1_CH1(unittest.TestCase):
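The rewritten tests build the expected byte strings by interleaving the per-channel pure tones with _sample_generator from tests/test_util.py. That helper's implementation is not shown in this diff; a hypothetical equivalent of the interleaving it is assumed to perform:

from array import array
from itertools import chain

def interleave(*channels):
    # Yield samples round-robin across channels: c0[0], c1[0], ..., c0[1], c1[1], ...
    return chain.from_iterable(zip(*channels))

left = array("h", [1, 2, 3])
right = array("h", [10, 20, 30])
interleaved = array("h", interleave(left, right))
# -> array('h', [1, 10, 2, 20, 3, 30])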
--- a/tests/test_io.py	Sun Jul 21 17:13:53 2019 +0100
+++ b/tests/test_io.py	Thu Jul 25 20:50:52 2019 +0100
@@ -83,68 +83,44 @@
         result = _normalize_use_channel(use_channel)
         self.assertEqual(result, expected)
 
-    @genty_dataset(
-        int_1=((8000, 2, 1, 1), (8000, 2, 1, 1)),
-        int_2=((8000, 2, 1, 2), (8000, 2, 1, 2)),
-        use_channel_left=((8000, 2, 1, "left"), (8000, 2, 1, "left")),
-        use_channel_right=((8000, 2, 1, "right"), (8000, 2, 1, "right")),
-        use_channel_mix=((8000, 2, 1, "mix"), (8000, 2, 1, "mix")),
-        use_channel_None=((8000, 2, 2, None), (8000, 2, 2, None)),
-        no_use_channel=((8000, 2, 2), (8000, 2, 2, None)),
-    )
-    def test_get_audio_parameters_short_params(self, values, expected):
-        params = dict(zip(("sr", "sw", "ch", "uc"), values))
+    def test_get_audio_parameters_short_params(self):
+        expected = (8000, 2, 1)
+        params = dict(zip(("sr", "sw", "ch"), expected))
         result = _get_audio_parameters(params)
         self.assertEqual(result, expected)
 
-    @genty_dataset(
-        int_1=((8000, 2, 1, 1), (8000, 2, 1, 1)),
-        int_2=((8000, 2, 1, 2), (8000, 2, 1, 2)),
-        use_channel_left=((8000, 2, 1, "left"), (8000, 2, 1, "left")),
-        use_channel_right=((8000, 2, 1, "right"), (8000, 2, 1, "right")),
-        use_channel_mix=((8000, 2, 1, "mix"), (8000, 2, 1, "mix")),
-        use_channel_None=((8000, 2, 2, None), (8000, 2, 2, None)),
-        no_use_channel=((8000, 2, 2), (8000, 2, 2, None)),
-    )
-    def test_get_audio_parameters_long_params(self, values, expected):
+    def test_get_audio_parameters_long_params(self):
+        expected = (8000, 2, 1)
         params = dict(
             zip(
                 ("sampling_rate", "sample_width", "channels", "use_channel"),
-                values,
+                expected,
             )
         )
         result = _get_audio_parameters(params)
         self.assertEqual(result, expected)
 
-    @genty_dataset(simple=((8000, 2, 1, 1), (8000, 2, 1, 1)))
-    def test_get_audio_parameters_long_params_shadow_short_ones(
-        self, values, expected
-    ):
+    def test_get_audio_parameters_long_params_shadow_short_ones(self):
+        expected = (8000, 2, 1)
         params = dict(
-            zip(
-                ("sampling_rate", "sample_width", "channels", "use_channel"),
-                values,
-            )
+            zip(("sampling_rate", "sample_width", "channels"), expected)
         )
-        params.update(dict(zip(("sr", "sw", "ch", "uc"), "xxxx")))
+        params.update(dict(zip(("sr", "sw", "ch"), "xxx")))
         result = _get_audio_parameters(params)
         self.assertEqual(result, expected)
 
     @genty_dataset(
-        str_sampling_rate=(("x", 2, 1, 0),),
-        negative_sampling_rate=((-8000, 2, 1, 0),),
-        str_sample_width=((8000, "x", 1, 0),),
-        negative_sample_width=((8000, -2, 1, 0),),
-        str_channels=((8000, 2, "x", 0),),
-        negative_channels=((8000, 2, -1, 0),),
+        str_sampling_rate=(("x", 2, 1),),
+        negative_sampling_rate=((-8000, 2, 1),),
+        str_sample_width=((8000, "x", 1),),
+        negative_sample_width=((8000, -2, 1),),
+        str_channels=((8000, 2, "x"),),
+        negative_channels=((8000, 2, -1),),
     )
     def test_get_audio_parameters_invalid(self, values):
-        # TODO 0 or negative use_channel must raise AudioParameterError
-        # change implementation, don't accept negative uc
-        # hifglight everywhere in doc that uc must be positive
         params = dict(
             zip(
-                ("sampling_rate", "sample_width", "channels", "use_channel"),
+                ("sampling_rate", "sample_width", "channels"),
                 values,
             )
         )
@@ -310,133 +286,22 @@
             with self.assertRaises(AudioIOError):
                 from_file("audio", "mp3")
 
-    @genty_dataset(
-        raw_first_channel=("raw", 1, 400),
-        raw_second_channel=("raw", 2, 800),
-        raw_third_channel=("raw", 3, 1600),
-        raw_left_channel=("raw", "left", 400),
-        raw_right_channel=("raw", "right", 800),
-        wav_first_channel=("wav", 1, 400),
-        wav_second_channel=("wav", 2, 800),
-        wav_third_channel=("wav", 3, 1600),
-        wav_left_channel=("wav", "left", 400),
-        wav_right_channel=("wav", "right", 800),
-    )
-    def test_from_file_multichannel_audio(
-        self, audio_format, use_channel, frequency
-    ):
-        expected = PURE_TONE_DICT[frequency]
-        filename = "tests/data/test_16KHZ_3channel_400-800-1600Hz.{}".format(
-            audio_format
-        )
-        sample_width = 2
-        audio_source = from_file(
-            filename,
-            sampling_rate=16000,
-            sample_width=sample_width,
-            channels=3,
-            use_channel=use_channel,
-        )
-        fmt = DATA_FORMAT[sample_width]
-        data = array(fmt, audio_source._buffer)
-        self.assertEqual(data, expected)
-
-    @genty_dataset(
-        raw_mono=("raw", "mono_400Hz", (400,)),
-        raw_3channel=("raw", "3channel_400-800-1600Hz", (400, 800, 1600)),
-        wav_mono=("wav", "mono_400Hz", (400,)),
-        wav_3channel=("wav", "3channel_400-800-1600Hz", (400, 800, 1600)),
-    )
-    def test_from_file_multichannel_audio_mix(
-        self, audio_format, filename_suffix, frequencies
-    ):
-        sampling_rate = 16000
-        sample_width = 2
-        channels = len(frequencies)
-        mono_channels = [PURE_TONE_DICT[freq] for freq in frequencies]
-        channels = len(frequencies)
-        fmt = DATA_FORMAT[sample_width]
-        expected = _array_to_bytes(
-            array(
-                fmt,
-                (sum(samples) // channels for samples in zip(*mono_channels)),
-            )
-        )
-        filename = "tests/data/test_16KHZ_{}.{}".format(
-            filename_suffix, audio_format
-        )
-        audio_source = from_file(
-            filename,
-            use_channel="mix",
-            sampling_rate=sampling_rate,
-            sample_width=2,
-            channels=channels,
-        )
-        mixed = audio_source._buffer
-        self.assertEqual((mixed), expected)
 
     @patch("auditok.io._WITH_PYDUB", True)
     @patch("auditok.io.BufferAudioSource")
     @genty_dataset(
-        ogg_first_channel=("ogg", 1, "from_ogg"),
-        ogg_second_channel=("ogg", 2, "from_ogg"),
-        ogg_mix=("ogg", "mix", "from_ogg"),
-        ogg_default=("ogg", None, "from_ogg"),
-        mp3_left_channel=("mp3", "left", "from_mp3"),
-        mp3_right_channel=("mp3", "right", "from_mp3"),
-        flac_first_channel=("flac", 1, "from_file"),
-        flac_second_channel=("flac", 1, "from_file"),
-        flv_left_channel=("flv", "left", "from_flv"),
-        webm_right_channel=("webm", "right", "from_file"),
+        ogg_first_channel=("ogg", "from_ogg"),
+        ogg_second_channel=("ogg", "from_ogg"),
+        ogg_mix=("ogg", "from_ogg"),
+        ogg_default=("ogg", "from_ogg"),
+        mp3_left_channel=("mp3", "from_mp3"),
+        mp3_right_channel=("mp3", "from_mp3"),
+        flac_first_channel=("flac", "from_file"),
+        flac_second_channel=("flac", "from_file"),
+        flv_left_channel=("flv", "from_flv"),
+        webm_right_channel=("webm", "from_file"),
     )
     def test_from_file_multichannel_audio_compressed(
-        self, audio_format, use_channel, function, *mocks
-    ):
-        filename = "audio.{}".format(audio_format)
-        segment_mock = Mock()
-        segment_mock.sample_width = 2
-        segment_mock.channels = 2
-        segment_mock._data = b"abcd"
-        with patch("auditok.io._extract_selected_channel") as ext_mock:
-            with patch(
-                "auditok.io.AudioSegment.{}".format(function)
-            ) as open_func:
-                open_func.return_value = segment_mock
-                from_file(filename, use_channel=use_channel)
-                self.assertTrue(open_func.called)
-                self.assertTrue(ext_mock.called)
-
-                use_channel = {"left": 1, "right": 2, None: 1}.get(
-                    use_channel, use_channel
-                )
-                if isinstance(use_channel, int):
-                    # _extract_selected_channel will be called with a channel starting from 0
-                    use_channel -= 1
-                ext_mock.assert_called_with(
-                    segment_mock._data,
-                    segment_mock.channels,
-                    segment_mock.sample_width,
-                    use_channel,
-                )
-
-        with patch("auditok.io._extract_selected_channel") as ext_mock:
-            with patch(
-                "auditok.io.AudioSegment.{}".format(function)
-            ) as open_func:
-                segment_mock.channels = 1
-                open_func.return_value = segment_mock
-                from_file(filename, use_channel=use_channel)
-                self.assertTrue(open_func.called)
-                self.assertFalse(ext_mock.called)
-
-    @patch("auditok.io._WITH_PYDUB", True)
-    @patch("auditok.io.BufferAudioSource")
-    @genty_dataset(
-        ogg=("ogg", "from_ogg"),
-        mp3=("mp3", "from_mp3"),
-        flac=("flac", "from_file"),
-    )
-    def test_from_file_multichannel_audio_mix_compressed(
         self, audio_format, function, *mocks
     ):
         filename = "audio.{}".format(audio_format)
@@ -444,86 +309,38 @@
         segment_mock.sample_width = 2
         segment_mock.channels = 2
         segment_mock._data = b"abcd"
-        with patch("auditok.io._mix_audio_channels") as mix_mock:
-            with patch(
-                "auditok.io.AudioSegment.{}".format(function)
-            ) as open_func:
-                open_func.return_value = segment_mock
-                from_file(filename, use_channel="mix")
-                self.assertTrue(open_func.called)
-                mix_mock.assert_called_with(
-                    segment_mock._data,
-                    segment_mock.channels,
-                    segment_mock.sample_width,
-                )
+        with patch(
+            "auditok.io.AudioSegment.{}".format(function)
+        ) as open_func:
+            open_func.return_value = segment_mock
+            from_file(filename)
+            self.assertTrue(open_func.called)
+
 
     @genty_dataset(
-        dafault_first_channel=(None, 400),
-        first_channel=(1, 400),
-        second_channel=(2, 800),
-        third_channel=(3, 1600),
-        negative_first_channel=(-3, 400),
-        negative_second_channel=(-2, 800),
-        negative_third_channel=(-1, 1600),
+        mono=("mono_400", (400,)),
+        three_channel=("3channel_400-800-1600", (400, 800, 1600)),
+
+        mono_large_file=("mono_400", (400,), True),
+        three_channel_large_file=("3channel_400-800-1600", (400, 800, 1600), True),
     )
-    def test_load_raw(self, use_channel, frequency):
-        filename = "tests/data/test_16KHZ_3channel_400-800-1600Hz.raw"
-        if use_channel is not None:
-            audio_source = _load_raw(
-                filename,
-                sampling_rate=16000,
-                sample_width=2,
-                channels=3,
-                use_channel=use_channel,
-            )
-        else:
-            audio_source = _load_raw(
-                filename, sampling_rate=16000, sample_width=2, channels=3
-            )
-        self.assertIsInstance(audio_source, BufferAudioSource)
+    def test_load_raw(self, file_id, frequencies, large_file=False):
+        filename = "tests/data/test_16KHZ_{}Hz.raw".format(file_id)
+        audio_source = _load_raw(filename, 16000, 2, len(frequencies), large_file=large_file)
+        audio_source.open()
+        data = audio_source.read(-1)
+        audio_source.close()
+        expected_class = RawAudioSource if large_file else BufferAudioSource
+        self.assertIsInstance(audio_source, expected_class)
         self.assertEqual(audio_source.sampling_rate, 16000)
         self.assertEqual(audio_source.sample_width, 2)
-        self.assertEqual(audio_source.channels, 1)
-        # generate a pure sine wave tone of the given frequency
-        expected = PURE_TONE_DICT[frequency]
-        # compre with data read from file
-        fmt = DATA_FORMAT[2]
-        data = array(fmt, audio_source._buffer)
+        self.assertEqual(audio_source.channels, len(frequencies))
+        mono_channels = [PURE_TONE_DICT[freq] for freq in frequencies]
+        fmt = DATA_FORMAT[audio_source.sample_width]
+        expected = _array_to_bytes(array(fmt, _sample_generator(*mono_channels)))
         self.assertEqual(data, expected)
 
     @genty_dataset(
-        mono=("mono_400Hz", (400,)),
-        three_channel=("3channel_400-800-1600Hz", (400, 800, 1600)),
-    )
-    def test_load_raw_mix(self, filename_suffix, frequencies):
-        sampling_rate = 16000
-        sample_width = 2
-        channels = len(frequencies)
-        mono_channels = [PURE_TONE_DICT[freq] for freq in frequencies]
-
-        fmt = DATA_FORMAT[sample_width]
-        expected = _array_to_bytes(
-            array(
-                fmt,
-                (sum(samples) // channels for samples in zip(*mono_channels)),
-            )
-        )
-        filename = "tests/data/test_16KHZ_{}.raw".format(filename_suffix)
-        audio_source = _load_raw(
-            filename,
-            use_channel="mix",
-            sampling_rate=sampling_rate,
-            sample_width=2,
-            channels=channels,
-        )
-        mixed = audio_source._buffer
-        self.assertEqual(mixed, expected)
-        self.assertIsInstance(audio_source, BufferAudioSource)
-        self.assertEqual(audio_source.sampling_rate, sampling_rate)
-        self.assertEqual(audio_source.sample_width, sample_width)
-        self.assertEqual(audio_source.channels, 1)
-
-    @genty_dataset(
         missing_sampling_rate=("sr",),
         missing_sample_width=("sw",),
         missing_channels=("ch",),
@@ -536,102 +353,60 @@
             _load_raw("audio", srate, swidth, channels)
 
     @genty_dataset(
-        dafault_first_channel=(None, 400),
-        first_channel=(1, 400),
-        second_channel=(2, 800),
-        third_channel=(3, 1600),
-        negative_first_channel=(-3, 400),
-        negative_second_channel=(-2, 800),
-        negative_third_channel=(-1, 1600),
+        mono=("mono_400", (400,)),
+        three_channel=("3channel_400-800-1600", (400, 800, 1600)),
+
+        mono_large_file=("mono_400", (400,), True),
+        three_channel_large_file=("3channel_400-800-1600", (400, 800, 1600), True),
     )
-    def test_load_wave(self, use_channel, frequency):
-        filename = "tests/data/test_16KHZ_3channel_400-800-1600Hz.wav"
-        if use_channel is not None:
-            audio_source = _load_wave(filename, use_channel=use_channel)
-        else:
-            audio_source = _load_wave(filename)
-        self.assertIsInstance(audio_source, BufferAudioSource)
+    def test_load_wave(self, file_id, frequencies, large_file=False):
+        filename = "tests/data/test_16KHZ_{}Hz.wav".format(file_id)
+        audio_source = _load_wave(filename, large_file=large_file)
+        audio_source.open()
+        data = audio_source.read(-1)
+        audio_source.close()
+        expected_class = WaveAudioSource if large_file else BufferAudioSource
+        self.assertIsInstance(audio_source, expected_class)
         self.assertEqual(audio_source.sampling_rate, 16000)
         self.assertEqual(audio_source.sample_width, 2)
-        self.assertEqual(audio_source.channels, 1)
-        # generate a pure sine wave tone of the given frequency
-        expected = PURE_TONE_DICT[frequency]
-        # compre with data read from file
-        fmt = DATA_FORMAT[2]
-        data = array(fmt, audio_source._buffer)
+        self.assertEqual(audio_source.channels, len(frequencies))
+        mono_channels = [PURE_TONE_DICT[freq] for freq in frequencies]
+        fmt = DATA_FORMAT[audio_source.sample_width]
+        expected = _array_to_bytes(array(fmt, _sample_generator(*mono_channels)))
         self.assertEqual(data, expected)
 
-    @genty_dataset(
-        mono=("mono_400Hz", (400,)),
-        three_channel=("3channel_400-800-1600Hz", (400, 800, 1600)),
-    )
-    def test_load_wave_mix(self, filename_suffix, frequencies):
-        sampling_rate = 16000
-        sample_width = 2
-        channels = len(frequencies)
-        mono_channels = [PURE_TONE_DICT[freq] for freq in frequencies]
-        fmt = DATA_FORMAT[sample_width]
-        expected = _array_to_bytes(
-            array(
-                fmt,
-                (sum(samples) // channels for samples in zip(*mono_channels)),
-            )
-        )
-        filename = "tests/data/test_16KHZ_{}.wav".format(filename_suffix)
-        audio_source = _load_wave(filename, use_channel="mix")
-        mixed = audio_source._buffer
-        self.assertEqual(mixed, expected)
-        self.assertIsInstance(audio_source, BufferAudioSource)
-        self.assertEqual(audio_source.sampling_rate, sampling_rate)
-        self.assertEqual(audio_source.sample_width, sample_width)
-        self.assertEqual(audio_source.channels, 1)
 
     @patch("auditok.io._WITH_PYDUB", True)
     @patch("auditok.io.BufferAudioSource")
     @genty_dataset(
-        ogg_default_first_channel=("ogg", 2, None, "from_ogg"),
-        ogg_first_channel=("ogg", 1, 0, "from_ogg"),
-        ogg_second_channel=("ogg", 2, 1, "from_ogg"),
-        ogg_mix_channels=("ogg", 3, "mix", "from_ogg"),
-        mp3_left_channel=("mp3", 1, "left", "from_mp3"),
-        mp3_right_channel=("mp3", 2, "right", "from_mp3"),
-        mp3_mix_channels=("mp3", 3, "mix", "from_mp3"),
-        flac_first_channel=("flac", 2, 1, "from_file"),
-        flac_second_channel=("flac", 2, 1, "from_file"),
-        flv_left_channel=("flv", 1, "left", "from_flv"),
-        webm_right_channel=("webm", 2, "right", "from_file"),
-        webm_mix_channels=("webm", 4, "mix", "from_file"),
+        ogg_default_first_channel=("ogg", 2, "from_ogg"),
+        ogg_first_channel=("ogg", 1, "from_ogg"),
+        ogg_second_channel=("ogg", 2, "from_ogg"),
+        ogg_mix_channels=("ogg", 3, "from_ogg"),
+        mp3_left_channel=("mp3", 1, "from_mp3"),
+        mp3_right_channel=("mp3", 2, "from_mp3"),
+        mp3_mix_channels=("mp3", 3, "from_mp3"),
+        flac_first_channel=("flac", 2, "from_file"),
+        flac_second_channel=("flac", 2, "from_file"),
+        flv_left_channel=("flv", 1, "from_flv"),
+        webm_right_channel=("webm", 2, "from_file"),
+        webm_mix_channels=("webm", 4, "from_file"),
     )
     def test_load_with_pydub(
-        self, audio_format, channels, use_channel, function, *mocks
+        self, audio_format, channels, function, *mocks
     ):
         filename = "audio.{}".format(audio_format)
         segment_mock = Mock()
         segment_mock.sample_width = 2
         segment_mock.channels = channels
         segment_mock._data = b"abcdefgh"
-        with patch("auditok.io._extract_selected_channel") as ext_mock:
-            with patch(
-                "auditok.io.AudioSegment.{}".format(function)
-            ) as open_func:
-                open_func.return_value = segment_mock
-                normalized_use_channel = {"left": 1, "right": 2, None: 0}.get(
-                    use_channel, use_channel
-                )
-                if isinstance(normalized_use_channel, int) and normalized_use_channel > 0:
-                     normalized_use_channel -= 1
-                _load_with_pydub(filename, audio_format, use_channel)
-                self.assertTrue(open_func.called)
-                if channels > 1:
-                    self.assertTrue(ext_mock.called)
-                    ext_mock.assert_called_with(
-                        segment_mock._data,
-                        segment_mock.channels,
-                        segment_mock.sample_width,
-                        normalized_use_channel,
-                    )
-                else:
-                    self.assertFalse(ext_mock.called)
+        with patch(
+            "auditok.io.AudioSegment.{}".format(function)
+        ) as open_func:
+            open_func.return_value = segment_mock
+            _load_with_pydub(filename, audio_format)
+            self.assertTrue(open_func.called)
+
 
     @genty_dataset(
         mono=("mono_400Hz.raw", (400,)),