view auditok/util.py @ 333:6fc2d27bd2ef

Shorten long lines
author Amine Sehili <amine.sehili@gmail.com>
date Fri, 25 Oct 2019 20:23:51 +0100
parents e0c7ae720cc6
children f424ac9193b7
line wrap: on
line source
"""
Class summary
=============

.. autosummary::

        make_channel_selector
        AudioEnergyValidator
        AudioReader
        Recorder
"""
from __future__ import division
import sys
from abc import ABC, abstractmethod
import warnings
from functools import partial
from audioop import tomono
from .io import (
    AudioIOError,
    AudioSource,
    from_file,
    BufferAudioSource,
    PyAudioSource,
    get_audio_source,
)
from .exceptions import DuplicateArgument, TooSamllBlockDuration

try:
    from . import signal_numpy as signal
except ImportError:
    from . import signal


# Names exported by a star-import of this module.
__all__ = [
    "DataSource",
    "DataValidator",
    "StringDataSource",
    "ADSFactory",
    "AudioDataSource",
    "AudioReader",
    "Recorder",
    "AudioEnergyValidator",
]


def make_channel_selector(sample_width, channels, selected=None):
    """
    Build a callable that selects or combines the channels of a block of
    raw audio data.

    :Parameters:

        `sample_width` : *(int)*
            number of bytes per sample. Must be a key of `signal.FORMAT`
            (1, 2 or 4 according to the error message below).

        `channels` : *(int)*
            number of channels of the data the returned callable will be
            applied to.

        `selected` : *(int, str or None)*
            - an integer: index of the channel to extract, negative values
              count from the last channel;
            - "mix", "avg" or "average": average all channels;
            - None or "any": separate channels and return all of them.

    :Returns:

        A one-argument callable that takes the raw data. When `channels`
        is 1 the identity function is returned and `selected` is ignored.

    :Raises:

        `ValueError` if `sample_width` is not supported, if an integer
        `selected` is out of range or if `selected` has an unsupported value.
    """
    fmt = signal.FORMAT.get(sample_width)
    if fmt is None:
        err_msg = "'sample_width' must be 1, 2 or 4, given: {}"
        raise ValueError(err_msg.format(sample_width))
    if channels == 1:
        return lambda x: x

    if isinstance(selected, int):
        # Negative indices count from the last channel. The normalized index
        # is kept in its own variable so that the error message reports the
        # value the caller actually passed.
        index = selected + channels if selected < 0 else selected
        if index < 0 or index >= channels:
            err_msg = "Selected channel must be >= -channels and < 'channels'"
            err_msg += ", given: {}"
            raise ValueError(err_msg.format(selected))
        return partial(
            signal.extract_single_channel,
            fmt=fmt,
            channels=channels,
            selected=index,
        )

    if selected in ("mix", "avg", "average"):
        if channels == 2:
            # when data is stereo, using audioop when possible is much faster
            return partial(
                signal.average_channels_stereo, sample_width=sample_width
            )

        return partial(signal.average_channels, fmt=fmt, channels=channels)

    if selected in (None, "any"):
        return partial(signal.separate_channels, fmt=fmt, channels=channels)

    raise ValueError(
        "Selected channel must be an integer, None (alias 'any') or 'average' "
        "(alias 'avg' or 'mix')"
    )


class DataSource(ABC):
    """
    Abstract interface for the objects consumed by
    :func:`auditok.core.StreamTokenizer.tokenize`.

    Concrete subclasses must override :func:`DataSource.read`.
    """

    @abstractmethod
    def read(self):
        """
        Return the next piece of data from this source, or None when there
        is no more data available.
        """


class DataValidator(ABC):
    """
    Abstract interface for the validator that
    :class:`.core.StreamTokenizer` uses to decide whether a piece of read
    data is valid.

    Concrete subclasses must override :func:`is_valid`.
    """

    @abstractmethod
    def is_valid(self, data):
        """
        Tell whether `data` is valid.
        """


class AudioEnergyValidator(DataValidator):
    """
    A :class:`DataValidator` that accepts a block of audio data when its
    energy reaches a threshold.

    :Parameters:

        `energy_threshold` :
            minimum energy a block must have to be considered valid.

        `sample_width` : *(int)*
            number of bytes per sample, forwarded to
            :func:`make_channel_selector` and to the energy function.

        `channels` : *(int)*
            number of channels of the data to validate.

        `use_channel` :
            channel selection, forwarded as `selected` to
            :func:`make_channel_selector`.
    """

    def __init__(
        self, energy_threshold, sample_width, channels, use_channel=None
    ):
        self._sample_width = sample_width
        self._selector = make_channel_selector(
            sample_width, channels, use_channel
        )
        # The multichannel energy function is only used when there are
        # several channels and none of them was singled out or averaged.
        keep_all_channels = channels != 1 and use_channel in (None, "any")
        if keep_all_channels:
            self._energy_fn = signal.calculate_energy_multichannel
        else:
            self._energy_fn = signal.calculate_energy_single_channel
        self._energy_threshold = energy_threshold

    def is_valid(self, data):
        """
        Return True if the energy of `data`, computed after channel
        selection, is greater than or equal to the energy threshold.
        """
        selected_data = self._selector(data)
        energy = self._energy_fn(selected_data, self._sample_width)
        return energy >= self._energy_threshold


class StringDataSource(DataSource):
    """
    A :class:`DataSource` backed by a string buffer.
    Each call to :func:`read` returns one character and moves one step
    forward. Once the end of the buffer is reached, :func:`read` returns
    None.

    :Parameters:

        `data` :
            a str object.

    """

    def __init__(self, data):
        self._data = None
        self._current = 0
        self.set_data(data)

    def read(self):
        """
        Read one character from the buffer and advance the read position.

        :Returns:

            The current character, or None if the end of the buffer is
            reached.
        """
        position = self._current
        if position >= len(self._data):
            return None
        self._current = position + 1
        return self._data[position]

    def set_data(self, data):
        """
        Replace the data buffer and restart reading from its beginning.

        :Parameters:

            `data` : a str object
                New data buffer.

        :Raises:

            `ValueError` if `data` is not a str.
        """
        if isinstance(data, str):
            self._data = data
            self._current = 0
            return
        raise ValueError("data must an instance of str")


class ADSFactory:
    """
    Factory class that makes it easy to create an :class:`AudioDataSource`
    object that implements :class:`DataSource` and can therefore be passed to
    :func:`auditok.core.StreamTokenizer.tokenize`.

    This class is deprecated: :func:`ADSFactory.ads` emits a
    `DeprecationWarning` that recommends using :class:`AudioReader` instead.

    Whether you read audio data from a file, the microphone or a memory buffer,
    this factory instantiates and returns the right :class:`AudioDataSource`
    object.

    There are many other features you may want your :class:`AudioDataSource`
    object to have, such as: memorize all read audio data so that you can
    rewind and reuse it (especially useful when reading data from the
    microphone), read a fixed amount of data (also useful when reading from
    the microphone), read overlapping audio frames (often needed when doing a
    spectral analysis of data).

    :func:`ADSFactory.ads` automatically creates and returns an object with
    the desired behavior according to the supplied keyword arguments.
    """

    # (long name, short alias) pairs that are normalized to the short alias.
    _SHORT_FORM_ARGS = (
        ("block_dur", "bd"),
        ("hop_dur", "hd"),
        ("block_size", "bs"),
        ("hop_size", "hs"),
        ("max_time", "mt"),
        ("audio_source", "asrc"),
        ("filename", "fn"),
        ("data_buffer", "db"),
    )

    # (long name, short alias) pairs that are normalized to the long name:
    # these arguments are meant for BufferAudioSource and PyAudioSource.
    _LONG_FORM_ARGS = (
        ("frames_per_buffer", "fpb"),
        ("sampling_rate", "sr"),
        ("sample_width", "sw"),
        ("channels", "ch"),
    )

    # `record` has its own normalization because it defaults to False
    # instead of None.
    _RECORD_ARG = ("record", "rec")

    @staticmethod  # noqa: C901
    def _check_normalize_args(kwargs):
        """
        Validate the keyword arguments of :func:`ADSFactory.ads` and
        normalize them in place.

        After a successful call, `kwargs` always contains the keys "bd",
        "hd", "bs", "hs", "mt", "asrc", "fn", "db" (None when not supplied)
        and "rec". "frames_per_buffer", "sampling_rate", "sample_width" and
        "channels" are present, under their long name, only if they were
        supplied under either name.

        :Raises:

            `ValueError` if an unknown argument is found,
            `DuplicateArgument` if an argument is given under both its long
            and its short name, `TypeError` if "rec" is not a boolean.
        """
        name_pairs = (
            ADSFactory._SHORT_FORM_ARGS
            + ADSFactory._LONG_FORM_ARGS
            + (ADSFactory._RECORD_ARG,)
        )

        valid_names = {alias for pair in name_pairs for alias in pair}
        for name in kwargs:
            if name not in valid_names:
                raise ValueError("Invalid argument: {0}".format(name))

        for long_name, short_name in name_pairs:
            if long_name in kwargs and short_name in kwargs:
                err_msg = "Either '{0}' or '{1}' must be specified, not both"
                raise DuplicateArgument(err_msg.format(long_name, short_name))

        for long_name, short_name in ADSFactory._SHORT_FORM_ARGS:
            value = kwargs.pop(long_name, None) or kwargs.pop(short_name, None)
            kwargs[short_name] = value

        # NOTE(review): the type check only runs when the value of "record"
        # is missing or falsy, so a truthy non-boolean "record" is accepted.
        # Kept as is because callers may rely on it.
        record = kwargs.pop("record", False)
        if not record:
            record = kwargs.pop("rec", False)
            if not isinstance(record, bool):
                raise TypeError("'record' must be a boolean")

        kwargs["rec"] = record

        # keep long names for arguments meant for BufferAudioSource
        # and PyAudioSource
        for long_name, short_name in ADSFactory._LONG_FORM_ARGS:
            if long_name in kwargs or short_name in kwargs:
                value = kwargs.pop(long_name, None) or kwargs.pop(
                    short_name, None
                )
                kwargs[long_name] = value

    @staticmethod
    def ads(**kwargs):
        """
        Create and return an :class:`AudioDataSource`. The type and behavior
        of the object is the result of the supplied parameters.

        :Parameters:

        *No parameters* :
           read audio data from the available built-in microphone with the
           default parameters. The returned :class:`AudioDataSource`
           encapsulates an :class:`io.PyAudioSource` object, so the next four
           parameters can be passed to use instead of their default values.

        `sampling_rate`, `sr` : *(int)*
            number of samples per second. Default = 16000.

        `sample_width`, `sw` : *(int)*
            number of bytes per sample (must be in (1, 2, 4)). Default = 2

        `channels`, `ch` : *(int)*
            number of audio channels. Default = 1 (only this value is currently
            accepted)

        `frames_per_buffer`, `fpb` : *(int)*
            number of samples of PyAudio buffer. Default = 1024.

        `audio_source`, `asrc` : an `AudioSource` object
            read data from this audio source

        `filename`, `fn` : *(string)*
            build an `io.AudioSource` object using this file (currently only
            wave format is supported)

        `data_buffer`, `db` : *(string)*
            build an `io.BufferAudioSource` using data in `data_buffer`.
            If this keyword is used,
            `sampling_rate`, `sample_width` and `channels` are passed to
            `io.BufferAudioSource` constructor and used instead of default
            values.

        `max_time`, `mt` : *(float)*
            maximum time (in seconds) to read. Default behavior: read until
            there is no more data
            available.

        `record`, `rec` : *(bool)*
            save all read data in cache. Provide a navigable object which has a
            `rewind` method.
            Default = False.

        `block_dur`, `bd` : *(float)*
            processing block duration in seconds. This represents the quantity
            of audio data to return each time the :func:`read` method is
            invoked. If `block_dur` is 0.025 (i.e. 25 ms) and the sampling rate
            is 8000 and the sample width is 2 bytes, :func:`read` returns a
            buffer of 0.025 * 8000 * 2 = 400 bytes at most. This parameter will
            be looked for (and used if available) before `block_size`. If
            neither parameter is given, `block_dur` will be set to 0.01 second
            (i.e. 10 ms)

        `hop_dur`, `hd` : *(float)*
            quantity of data to skip from current processing window. If
            `hop_dur` is supplied then there will be an overlap of `block_dur`
            - `hop_dur` between two adjacent blocks. This parameter will be
            looked for (and used if available) before `hop_size`.
            If neither parameter is given, `hop_dur` will be set to `block_dur`
            which means that there will be no overlap between two consecutively
            read blocks.

        `block_size`, `bs` : *(int)*
            number of samples to read each time the `read` method is called.
            Default: a block size that represents a window of 10ms, so for a
            sampling rate of 16000, the default `block_size` is 160 samples,
            for a rate of 44100, `block_size` = 441 samples, etc.

        `hop_size`, `hs` : *(int)*
            determines the number of overlapping samples between two adjacent
            read windows. For a `hop_size` of value *N*, the overlap is
            `block_size` - *N*. Default : `hop_size` = `block_size`, means that
            there is no overlap.

        :Returns:

        An AudioDataSource object that has the desired features.

        :Raises:

        `ValueError` for an unknown keyword, `DuplicateArgument` when an
        argument is given under two names or when both a duration and a size
        are given for the block or the hop, `Warning` when more than one of
        `audio_source`, `filename` and `data_buffer` is supplied.

        :Examples:

        1. **Create an AudioDataSource that reads data from the microphone
        (requires Pyaudio) with default audio parameters:**

        .. code:: python

            from auditok import ADSFactory
            ads = ADSFactory.ads()
            ads.get_sampling_rate()
            16000
            ads.get_sample_width()
            2
            ads.get_channels()
            1

        2. **Create an AudioDataSource that reads data from the microphone with
        a sampling rate of 48KHz:**

        .. code:: python

            from auditok import ADSFactory
            ads = ADSFactory.ads(sr=48000)
            ads.get_sampling_rate()
            48000

        3. **Create an AudioDataSource that reads data from a wave file:**

        .. code:: python

            from auditok import ADSFactory
            from auditok import dataset
            file = dataset.was_der_mensch_saet_mono_44100_lead_trail_silence
            ads = ADSFactory.ads(fn=file)
            ads.get_sampling_rate()
            44100
            ads.get_sample_width()
            2
            ads.get_channels()
            1

        4. **Define size of read blocks as 20 ms**

        .. code:: python

            from auditok import ADSFactory
            from auditok import dataset
            file = dataset.was_der_mensch_saet_mono_44100_lead_trail_silence
            # we know sampling rate for previous file is 44100 samples/second
            # so 10 ms are equivalent to 441 samples and 20 ms to 882
            block_size = 882
            ads = ADSFactory.ads(bs=882, fn=file)
            ads.open()
            # read one block
            data = ads.read()
            ads.close()
            len(data)
            1764
            assert len(data) ==  ads.get_sample_width() * block_size

        5. **Define block size as a duration (use block_dur or bd):**

        .. code:: python

            from auditok import ADSFactory
            from auditok import dataset
            file = dataset.was_der_mensch_saet_mono_44100_lead_trail_silence
            dur = 0.25 # second
            ads = ADSFactory.ads(bd=dur, fn=file)

            # we know sampling rate for previous file is 44100 samples/second
            # for a block duration of 250 ms, block size should be
            # 0.25 * 44100 = 11025
            ads.get_block_size()
            11025
            assert ads.get_block_size() ==  int(0.25 * 44100)
            ads.open()
            # read one block
            data = ads.read()
            ads.close()
            len(data)
            22050
            assert len(data) ==  ads.get_sample_width() * ads.get_block_size()

        6. **Read overlapping blocks (when one of hop_size, hs, hop_dur or hd
            is > 0):**

        For a better readability we'd use :class:`auditok.io.BufferAudioSource`
        with a string buffer:

        .. code:: python

            from auditok import ADSFactory
            '''
            we supply a data buffer instead of a file (keyword 'data_buffer' or
            'db')
            sr : sampling rate = 16 samples/sec
            sw : sample width = 1 byte
            ch : channels = 1
            '''
            buffer = "abcdefghijklmnop" # 16 bytes = 1 second of data
            bd = 0.250 # block duration = 250 ms = 4 bytes
            hd = 0.125 # hop duration = 125 ms = 2 bytes
            ads = ADSFactory.ads(db="abcdefghijklmnop",
                                 bd=bd,
                                 hd=hd,
                                 sr=16,
                                 sw=1,
                                 ch=1)
            ads.open()
            ads.read()
            'abcd'
            ads.read()
            'cdef'
            ads.read()
            'efgh'
            ads.read()
            'ghij'
            data = ads.read()
            assert data == 'ijkl'

        7. **Limit amount of read data (use max_time or mt):**

        .. code:: python

            '''
            We know audio file is larger than 2.25 seconds
            We want to read up to 2.25 seconds of audio data
            '''
            from auditok import dataset
            from auditok import ADSFactory
            file = dataset.was_der_mensch_saet_mono_44100_lead_trail_silence
            ads = ADSFactory.ads(mt=2.25, fn=file)
            ads.open()
            data = []
            while True:
                d = ads.read()
                if d is None:
                    break
                data.append(d)

            ads.close()
            data = b''.join(data)
            assert len(data) == int(ads.get_sampling_rate() *
                                 2.25 * ads.get_sample_width() *
                                 ads.get_channels())
        """
        # stacklevel=2 attributes the warning to the code calling `ads`
        warnings.warn(
            "'ADSFactory' is deprecated and will be removed in a future "
            "release. Please use AudioReader(...) instead.",
            DeprecationWarning,
            stacklevel=2,
        )

        # check and normalize keyword arguments
        ADSFactory._check_normalize_args(kwargs)

        block_dur = kwargs.pop("bd")
        hop_dur = kwargs.pop("hd")
        block_size = kwargs.pop("bs")
        hop_size = kwargs.pop("hs")
        max_time = kwargs.pop("mt")
        audio_source = kwargs.pop("asrc")
        filename = kwargs.pop("fn")
        data_buffer = kwargs.pop("db")
        record = kwargs.pop("rec")

        # NOTE(review): the two `Warning` instances below are raised as
        # exceptions although their text ("... will be used") reads like a
        # warning to be issued. Kept as is because callers may catch them.

        # Case 1: an audio source is supplied
        if audio_source is not None:
            if (filename, data_buffer) != (None, None):
                raise Warning(
                    "You should provide one of 'audio_source', 'filename' or "
                    "'data_buffer' keyword parameters. 'audio_source' will be "
                    "used"
                )

        # Case 2: a file name is supplied
        elif filename is not None:
            if data_buffer is not None:
                raise Warning(
                    "You should provide one of 'filename' or 'data_buffer' "
                    "keyword parameters. 'filename' will be used"
                )
            audio_source = from_file(filename)

        # Case 3: a data_buffer is supplied
        elif data_buffer is not None:
            audio_source = BufferAudioSource(data=data_buffer, **kwargs)

        # Case 4: try to access native audio input
        else:
            audio_source = PyAudioSource(**kwargs)

        if block_dur is not None:
            if block_size is not None:
                raise DuplicateArgument(
                    "Either 'block_dur' or 'block_size' can be specified, not "
                    "both"
                )
        elif block_size is not None:
            block_dur = block_size / audio_source.sr
        else:
            block_dur = 0.01  # 10 ms

        # Read overlapping blocks of data
        if hop_dur is not None:
            if hop_size is not None:
                raise DuplicateArgument(
                    "Either 'hop_dur' or 'hop_size' can be specified, not both"
                )
        elif hop_size is not None:
            hop_dur = hop_size / audio_source.sr

        ads = AudioDataSource(
            audio_source,
            block_dur=block_dur,
            hop_dur=hop_dur,
            record=record,
            max_read=max_time,
        )
        return ads


class _AudioSourceProxy:
    def __init__(self, audio_source):

        self._audio_source = audio_source

    def rewind(self):
        if self.rewindable:
            self._audio_source.rewind()
        else:
            raise AudioIOError("Audio stream is not rewindable")

    def rewindable(self):
        try:
            return self._audio_source.rewindable
        except AttributeError:
            return False

    def is_open(self):
        return self._audio_source.is_open()

    def open(self):
        self._audio_source.open()

    def close(self):
        self._audio_source.close()

    def read(self, size):
        return self._audio_source.read(size)

    @property
    def data(self):
        err_msg = "AudioDataSource is not a recorder, no recorded data can "
        err_msg += "be retrieved"
        raise AttributeError(err_msg)

    def __getattr__(self, name):
        return getattr(self._audio_source, name)


class _Recorder(_AudioSourceProxy):
    """
    Proxy that keeps a copy of every block it reads from the wrapped audio
    source so that the stream can be rewound and read again.
    """

    def __init__(self, audio_source):
        super().__init__(audio_source)
        self._data = None
        self._read_from_cache = False
        self._cache = []
        # Until the first rewind, every read goes through the caching reader.
        self._read_block = self._read_and_cache

    def read(self, size):
        """Read `size` samples using the currently active reader."""
        return self._read_block(size)

    @property
    def data(self):
        """
        All recorded data as one bytes object.

        :Raises:

            `RuntimeError` if :func:`rewind` has not been called yet.
        """
        recorded = self._data
        if recorded is not None:
            return recorded
        err_msg = "Unrewinded recorder. Call rewind before accessing "
        err_msg += "recorded data"
        raise RuntimeError(err_msg)

    def rewindable(self):
        """A recorder can always be rewound."""
        return True

    def rewind(self):
        """
        Go back to the beginning of the stream.

        On the first call, the cached blocks are joined and wrapped into a
        `BufferAudioSource` that replaces the original audio source. Later
        calls simply rewind that buffer.
        """
        if not self._read_from_cache:
            recorded = b"".join(self._cache)
            self._data = recorded
            self._cache = None
            # Audio parameters are read from the original source before it
            # gets replaced by the buffer.
            replay_source = BufferAudioSource(
                recorded, self.sr, self.sw, self.ch
            )
            self._audio_source = replay_source
            self._read_block = replay_source.read
            self.open()
            self._read_from_cache = True
            return
        self._audio_source.rewind()

    def _read_and_cache(self, size):
        # Read a block and keep a copy of it, unless nothing was read
        chunk = self._audio_source.read(size)
        if chunk is None:
            return None
        self._cache.append(chunk)
        return chunk


class _Limiter(_AudioSourceProxy):
    """
    Proxy that stops delivering data once a maximum duration has been read.
    This can be useful when reading data from the microphone or from large
    audio files.
    """

    def __init__(self, audio_source, max_read):
        super().__init__(audio_source)
        self._max_read = max_read
        # `max_read` is a duration in seconds, converted here to a number of
        # samples using the sampling rate of the wrapped source.
        self._max_samples = round(max_read * self.sr)
        self._bytes_per_sample = self.sw * self.ch
        self._read_samples = 0

    @property
    def data(self):
        """Data of the wrapped source, truncated to the maximum size."""
        size_limit = self._max_samples * self._bytes_per_sample
        return self._audio_source.data[:size_limit]

    @property
    def max_read(self):
        """Maximum duration to read, as given to the constructor."""
        return self._max_read

    def read(self, size):
        """
        Read at most `size` samples without exceeding the maximum number of
        samples. Return None when the limit is reached or when the wrapped
        source returns None.
        """
        remaining = self._max_samples - self._read_samples
        allowed = min(remaining, size)
        if allowed <= 0:
            return None
        block = self._audio_source.read(allowed)
        if block is not None:
            self._read_samples += len(block) // self._bytes_per_sample
        return block

    def rewind(self):
        """Rewind the wrapped source and reset the count of read samples."""
        super().rewind()
        self._read_samples = 0


class _FixedSizeAudioReader(_AudioSourceProxy):
    """
    Proxy whose :func:`read` takes no argument and always requests the same
    number of samples, derived from a block duration.
    """

    def __init__(self, audio_source, block_dur):
        super().__init__(audio_source)

        if block_dur <= 0:
            err_msg = "block_dur must be > 0, given: {}"
            raise ValueError(err_msg.format(block_dur))

        # Truncating may give 0 when `block_dur` is shorter than one sample.
        self._block_size = int(block_dur * self.sr)
        if not self._block_size:
            err_msg = (
                "Too small block_dur ({0:f}) for sampling rate ({1}). "
                "block_dur should cover at least one sample "
                "(i.e. 1/{1})"
            )
            raise TooSamllBlockDuration(
                err_msg.format(block_dur, self.sr), block_dur, self.sr
            )

    def read(self):
        """Read one block of `block_size` samples from the wrapped source."""
        return self._audio_source.read(self._block_size)

    @property
    def block_size(self):
        """Number of samples requested by each call to :func:`read`."""
        return self._block_size

    @property
    def block_dur(self):
        """Block duration computed from `block_size` and the sampling rate."""
        return self._block_size / self.sr

    def __getattr__(self, name):
        # Anything not defined here is looked up on the wrapped source.
        return getattr(self._audio_source, name)


class _OverlapAudioReader(_FixedSizeAudioReader):
    """
    A class for AudioDataSource objects that can read and return overlapping
    audio frames.

    The first block is `block_size` samples long. Each following block is
    made of the tail of the previous block followed by `hop_size` newly read
    samples.
    """

    def __init__(self, audio_source, block_dur, hop_dur):

        if hop_dur >= block_dur:
            raise ValueError('"hop_dur" should be < "block_dur"')

        super(_OverlapAudioReader, self).__init__(audio_source, block_dur)

        self._hop_size = int(hop_dur * self.sr)
        self._blocks = self._iter_blocks_with_overlap()

    def _iter_blocks_with_overlap(self):
        """
        Generate overlapping blocks.

        Yields the `AudioIOError` class as a sentinel as long as the stream
        is not open, a block of data when one is available and None when
        nothing could be read.
        """
        while not self.is_open():
            yield AudioIOError

        # Keep yielding None until a first block is actually read. Without
        # this loop, resuming the generator after a None first block would
        # slice None below and raise TypeError.
        block = self._audio_source.read(self._block_size)
        while block is None:
            yield None
            block = self._audio_source.read(self._block_size)

        _hop_size_bytes = (
            self._hop_size * self._audio_source.sw * self._audio_source.ch
        )
        # `cache` is the part of the current block that is repeated at the
        # beginning of the next one.
        cache = block[_hop_size_bytes:]
        yield block

        while True:
            block = self._audio_source.read(self._hop_size)
            if block:
                block = cache + block
                cache = block[_hop_size_bytes:]
                yield block
                continue
            yield None

    def read(self):
        """
        Return the next overlapping block, or None if no data could be read.

        :Raises:

            `AudioIOError` if the audio stream is not open.
        """
        try:
            block = next(self._blocks)
        except StopIteration:
            return None
        # The generator yields the exception class itself as a sentinel, so
        # identity is the right comparison.
        if block is AudioIOError:
            raise AudioIOError("Audio Stream is not open.")
        return block

    def rewind(self):
        """Rewind the wrapped source and restart block generation."""
        super(_OverlapAudioReader, self).rewind()
        self._blocks = self._iter_blocks_with_overlap()

    @property
    def hop_size(self):
        """Number of new samples read for each block after the first one."""
        return self._hop_size

    @property
    def hop_dur(self):
        """Hop duration computed from `hop_size` and the sampling rate."""
        return self._hop_size / self.sr

    def __getattr__(self, name):
        return getattr(self._audio_source, name)


class AudioReader(DataSource):
    """
    A :class:`DataSource` that reads fixed-size blocks of audio data from an
    `AudioSource` object.

    :Parameters:

        `input` :
            an `AudioSource` object, or anything else that is passed, along
            with `kwargs`, to `get_audio_source` to build one.

        `block_dur` : *(float)*
            duration of the blocks returned by :func:`read`. Default = 0.01.

        `hop_dur` : *(float)*
            if not None, blocks overlap and a new block starts every
            `hop_dur`. Default = None.

        `record` : *(bool)*
            if True, read data is recorded so that the reader can be
            rewound. Default = False.

        `max_read` : *(float)*
            if not None, maximum duration of data to read. Default = None.
    """

    def __init__(
        self,
        input,
        block_dur=0.01,
        hop_dur=None,
        record=False,
        max_read=None,
        **kwargs
    ):
        source = input
        if not isinstance(source, AudioSource):
            source = get_audio_source(source, **kwargs)
        self._record = record

        # Wrap the source layer by layer: recording first, then the read
        # limit and finally the block reader.
        if record:
            source = _Recorder(source)
        if max_read is not None:
            source = _Limiter(source, max_read)
            self._max_read = max_read
        if hop_dur is None:
            source = _FixedSizeAudioReader(source, block_dur)
        else:
            source = _OverlapAudioReader(source, block_dur, hop_dur)
        self._audio_source = source

    def __repr__(self):
        def _as_text(value):
            # Durations are shown with 3 decimals, None is left untouched
            if value is None:
                return None
            return "{:.3f}".format(value)

        template = (
            "{cls}(block_dur={block_dur}, "
            "hop_dur={hop_dur}, record={rewindable}, "
            "max_read={max_read})"
        )
        block_dur = _as_text(self.block_dur)
        hop_dur = _as_text(self.hop_dur)
        max_read = _as_text(self.max_read)
        return template.format(
            cls=self.__class__.__name__,
            block_dur=block_dur,
            hop_dur=hop_dur,
            rewindable=self._record,
            max_read=max_read,
        )

    @property
    def rewindable(self):
        """Value of `record` given to the constructor."""
        return self._record

    @property
    def block_dur(self):
        """Block duration computed from the block size and sampling rate."""
        source = self._audio_source
        return source.block_size / source.sr

    @property
    def hop_dur(self):
        """Hop duration, or `block_dur` if blocks do not overlap."""
        source = self._audio_source
        if not hasattr(source, "hop_dur"):
            return self.block_dur
        return source.hop_size / source.sr

    @property
    def hop_size(self):
        """Hop size in samples, or `block_size` if blocks do not overlap."""
        source = self._audio_source
        if not hasattr(source, "hop_size"):
            return self.block_size
        return source.hop_size

    @property
    def max_read(self):
        """`max_read` of the wrapped source, None if it has none."""
        return getattr(self._audio_source, "max_read", None)

    def read(self):
        """Read and return one block from the wrapped source."""
        return self._audio_source.read()

    def __getattr__(self, name):
        # `data` and `rewind` are hidden when the reader does not record
        if name in ("data", "rewind") and not self.rewindable:
            err_msg = "'AudioReader' has no attribute '{}'"
            raise AttributeError(err_msg.format(name))
        # Everything else is looked up on the wrapped source
        try:
            return getattr(self._audio_source, name)
        except AttributeError:
            err_msg = "'AudioReader' has no attribute '{}'"
            raise AttributeError(err_msg.format(name))


# Backward-compatible alias: `AudioDataSource` is the name exported in
# `__all__` and instantiated by `ADSFactory.ads`.
# Remove in a future version when ADSFactory is dropped
AudioDataSource = AudioReader


class Recorder(AudioReader):
    """
    An :class:`AudioReader` that always records the data it reads: `record`
    is forced to True, all other parameters are passed unchanged to
    :class:`AudioReader`.
    """

    def __init__(
        self, input, block_dur=0.01, hop_dur=None, max_read=None, **kwargs
    ):
        super(Recorder, self).__init__(
            input,
            record=True,
            block_dur=block_dur,
            hop_dur=hop_dur,
            max_read=max_read,
            **kwargs
        )