view auditok/util.py @ 235:bb9252b56d7c

Add arguments to TooSamllBlockDuration exception
author Amine Sehili <amine.sehili@gmail.com>
date Fri, 19 Jul 2019 23:16:00 +0100
parents 8b3c11cad8d8
children 79b668c48fce
line wrap: on
line source
"""
Class summary
=============

.. autosummary::

        DataSource
        StringDataSource
        ADSFactory
        ADSFactory.AudioDataSource
        ADSFactory.ADSDecorator
        ADSFactory.OverlapADS
        ADSFactory.LimiterADS
        ADSFactory.RecorderADS
        DataValidator
        AudioEnergyValidator

"""
from __future__ import division
from abc import ABCMeta, abstractmethod
import math
from array import array
from .io import (
    AudioIOError,
    AudioSource,
    from_file,
    BufferAudioSource,
    PyAudioSource,
    get_audio_source,
)
from .exceptions import DuplicateArgument, TooSamllBlockDuration
import sys

try:
    import numpy

    _WITH_NUMPY = True
except ImportError as e:
    _WITH_NUMPY = False

try:
    from builtins import str

    basestring = str
except ImportError as e:
    if sys.version_info >= (3, 0):
        basestring = str

__all__ = [
    "DataSource",
    "DataValidator",
    "StringDataSource",
    "ADSFactory",
    "AudioDataSource",
    "AudioEnergyValidator",
]


class DataSource:
    """
    Base class for objects passed to :func:`auditok.core.StreamTokenizer.tokenize`.
    Subclasses should implement a :func:`DataSource.read` method.
    """

    __metaclass__ = ABCMeta

    @abstractmethod
    def read(self):
        """
        Read a piece of data read from this source.
        If no more data is available, return None.
        """


class DataValidator:
    """
    Base class for a validator object used by :class:`.core.StreamTokenizer` to check
    if read data is valid.
    Subclasses should implement :func:`is_valid` method.
    """

    __metaclass__ = ABCMeta

    @abstractmethod
    def is_valid(self, data):
        """
        Check whether `data` is valid
        """


class StringDataSource(DataSource):
    """
    A class that represent a :class:`DataSource` as a string buffer.
    Each call to :func:`DataSource.read` returns on character and moves one step forward.
    If the end of the buffer is reached, :func:`read` returns None.

    :Parameters:

        `data` :
            a basestring object.

    """

    def __init__(self, data):

        self._data = None
        self._current = 0
        self.set_data(data)

    def read(self):
        """
        Read one character from buffer.

        :Returns:

            Current character or None if end of buffer is reached
        """

        if self._current >= len(self._data):
            return None
        self._current += 1
        return self._data[self._current - 1]

    def set_data(self, data):
        """
        Set a new data buffer.

        :Parameters:

            `data` : a basestring object
                New data buffer.
        """

        if not isinstance(data, basestring):
            raise ValueError("data must an instance of basestring")
        self._data = data
        self._current = 0


class ADSFactory:
    """
    Factory class that makes it easy to create an :class:`ADSFactory.AudioDataSource` object that implements
    :class:`DataSource` and can therefore be passed to :func:`auditok.core.StreamTokenizer.tokenize`.

    Whether you read audio data from a file, the microphone or a memory buffer, this factory
    instantiates and returns the right :class:`ADSFactory.AudioDataSource` object.

    There are many other features you want your :class:`ADSFactory.AudioDataSource` object to have, such as:
    memorize all read audio data so that you can rewind and reuse it (especially useful when
    reading data from the microphone), read a fixed amount of data (also useful when reading
    from the microphone), read overlapping audio frames (often needed when dosing a spectral
    analysis of data).

    :func:`ADSFactory.ads` automatically creates and return object with the desired behavior according
    to the supplied keyword arguments.
    """

    @staticmethod
    def _check_normalize_args(kwargs):

        for k in kwargs:
            if not k in [
                "block_dur",
                "hop_dur",
                "block_size",
                "hop_size",
                "max_time",
                "record",
                "audio_source",
                "filename",
                "data_buffer",
                "frames_per_buffer",
                "sampling_rate",
                "sample_width",
                "channels",
                "sr",
                "sw",
                "ch",
                "asrc",
                "fn",
                "fpb",
                "db",
                "mt",
                "rec",
                "bd",
                "hd",
                "bs",
                "hs",
            ]:
                raise ValueError("Invalid argument: {0}".format(k))

        if "block_dur" in kwargs and "bd" in kwargs:
            raise DuplicateArgument(
                "Either 'block_dur' or 'bd' must be specified, not both"
            )

        if "hop_dur" in kwargs and "hd" in kwargs:
            raise DuplicateArgument(
                "Either 'hop_dur' or 'hd' must be specified, not both"
            )

        if "block_size" in kwargs and "bs" in kwargs:
            raise DuplicateArgument(
                "Either 'block_size' or 'bs' must be specified, not both"
            )

        if "hop_size" in kwargs and "hs" in kwargs:
            raise DuplicateArgument(
                "Either 'hop_size' or 'hs' must be specified, not both"
            )

        if "max_time" in kwargs and "mt" in kwargs:
            raise DuplicateArgument(
                "Either 'max_time' or 'mt' must be specified, not both"
            )

        if "audio_source" in kwargs and "asrc" in kwargs:
            raise DuplicateArgument(
                "Either 'audio_source' or 'asrc' must be specified, not both"
            )

        if "filename" in kwargs and "fn" in kwargs:
            raise DuplicateArgument(
                "Either 'filename' or 'fn' must be specified, not both"
            )

        if "data_buffer" in kwargs and "db" in kwargs:
            raise DuplicateArgument(
                "Either 'filename' or 'db' must be specified, not both"
            )

        if "frames_per_buffer" in kwargs and "fbb" in kwargs:
            raise DuplicateArgument(
                "Either 'frames_per_buffer' or 'fpb' must be specified, not both"
            )

        if "sampling_rate" in kwargs and "sr" in kwargs:
            raise DuplicateArgument(
                "Either 'sampling_rate' or 'sr' must be specified, not both"
            )

        if "sample_width" in kwargs and "sw" in kwargs:
            raise DuplicateArgument(
                "Either 'sample_width' or 'sw' must be specified, not both"
            )

        if "channels" in kwargs and "ch" in kwargs:
            raise DuplicateArgument(
                "Either 'channels' or 'ch' must be specified, not both"
            )

        if "record" in kwargs and "rec" in kwargs:
            raise DuplicateArgument(
                "Either 'record' or 'rec' must be specified, not both"
            )

        kwargs["bd"] = kwargs.pop("block_dur", None) or kwargs.pop("bd", None)
        kwargs["hd"] = kwargs.pop("hop_dur", None) or kwargs.pop("hd", None)
        kwargs["bs"] = kwargs.pop("block_size", None) or kwargs.pop("bs", None)
        kwargs["hs"] = kwargs.pop("hop_size", None) or kwargs.pop("hs", None)
        kwargs["mt"] = kwargs.pop("max_time", None) or kwargs.pop("mt", None)
        kwargs["asrc"] = kwargs.pop("audio_source", None) or kwargs.pop(
            "asrc", None
        )
        kwargs["fn"] = kwargs.pop("filename", None) or kwargs.pop("fn", None)
        kwargs["db"] = kwargs.pop("data_buffer", None) or kwargs.pop(
            "db", None
        )

        record = kwargs.pop("record", False)
        if not record:
            record = kwargs.pop("rec", False)
            if not isinstance(record, bool):
                raise TypeError("'record' must be a boolean")

        kwargs["rec"] = record

        # keep long names for arguments meant for BufferAudioSource and PyAudioSource
        if "frames_per_buffer" in kwargs or "fpb" in kwargs:
            kwargs["frames_per_buffer"] = kwargs.pop(
                "frames_per_buffer", None
            ) or kwargs.pop("fpb", None)

        if "sampling_rate" in kwargs or "sr" in kwargs:
            kwargs["sampling_rate"] = kwargs.pop(
                "sampling_rate", None
            ) or kwargs.pop("sr", None)

        if "sample_width" in kwargs or "sw" in kwargs:
            kwargs["sample_width"] = kwargs.pop(
                "sample_width", None
            ) or kwargs.pop("sw", None)

        if "channels" in kwargs or "ch" in kwargs:
            kwargs["channels"] = kwargs.pop("channels", None) or kwargs.pop(
                "ch", None
            )

    @staticmethod
    def ads(**kwargs):
        """
        Create an return an :class:`ADSFactory.AudioDataSource`. The type and behavior of the object is the result
        of the supplied parameters.

        :Parameters:

        *No parameters* :
           read audio data from the available built-in microphone with the default parameters.
           The returned :class:`ADSFactory.AudioDataSource` encapsulate an :class:`io.PyAudioSource` object and hence
           it accepts the next four parameters are passed to use instead of their default values.

        `sampling_rate`, `sr` : *(int)*
            number of samples per second. Default = 16000.

        `sample_width`, `sw` : *(int)*
            number of bytes per sample (must be in (1, 2, 4)). Default = 2

        `channels`, `ch` : *(int)*
            number of audio channels. Default = 1 (only this value is currently accepted)

        `frames_per_buffer`, `fpb` : *(int)*
            number of samples of PyAudio buffer. Default = 1024.

        `audio_source`, `asrc` : an `AudioSource` object
            read data from this audio source

        `filename`, `fn` : *(string)*
            build an `io.AudioSource` object using this file (currently only wave format is supported)

        `data_buffer`, `db` : *(string)*
            build an `io.BufferAudioSource` using data in `data_buffer`. If this keyword is used,
            `sampling_rate`, `sample_width` and `channels` are passed to `io.BufferAudioSource`
            constructor and used instead of default values.

        `max_time`, `mt` : *(float)*
            maximum time (in seconds) to read. Default behavior: read until there is no more data
            available.

        `record`, `rec` : *(bool)*
            save all read data in cache. Provide a navigable object which boasts a `rewind` method.
            Default = False.

        `block_dur`, `bd` : *(float)*
            processing block duration in seconds. This represents the quantity of audio data to return
            each time the :func:`read` method is invoked. If `block_dur` is 0.025 (i.e. 25 ms) and the sampling
            rate is 8000 and the sample width is 2 bytes, :func:`read` returns a buffer of 0.025 * 8000 * 2 = 400
            bytes at most. This parameter will be looked for (and used if available) before `block_size`.
            If neither parameter is given, `block_dur` will be set to 0.01 second (i.e. 10 ms)

        `hop_dur`, `hd` : *(float)*
            quantity of data to skip from current processing window. if `hop_dur` is supplied then there
            will be an overlap of `block_dur` - `hop_dur` between two adjacent blocks. This
            parameter will be looked for (and used if available) before `hop_size`. If neither parameter
            is given, `hop_dur` will be set to `block_dur` which means that there will be no overlap
            between two consecutively read blocks.

        `block_size`, `bs` : *(int)*
            number of samples to read each time the `read` method is called. Default: a block size
            that represents a window of 10ms, so for a sampling rate of 16000, the default `block_size`
            is 160 samples, for a rate of 44100, `block_size` = 441 samples, etc.

        `hop_size`, `hs` : *(int)*
            determines the number of overlapping samples between two adjacent read windows. For a
            `hop_size` of value *N*, the overlap is `block_size` - *N*. Default : `hop_size` = `block_size`,
            means that there is no overlap.

        :Returns:

        An AudioDataSource object that has the desired features.

        :Exampels:

        1. **Create an AudioDataSource that reads data from the microphone (requires Pyaudio) with default audio parameters:**

        .. code:: python

            from auditok import ADSFactory
            ads = ADSFactory.ads()
            ads.get_sampling_rate()
            16000
            ads.get_sample_width()
            2
            ads.get_channels()
            1

        2. **Create an AudioDataSource that reads data from the microphone with a sampling rate of 48KHz:**

        .. code:: python

            from auditok import ADSFactory
            ads = ADSFactory.ads(sr=48000)
            ads.get_sampling_rate()
            48000

        3. **Create an AudioDataSource that reads data from a wave file:**

        .. code:: python

            import auditok
            from auditok import ADSFactory
            ads = ADSFactory.ads(fn=auditok.dataset.was_der_mensch_saet_mono_44100_lead_trail_silence)
            ads.get_sampling_rate()
            44100
            ads.get_sample_width()
            2
            ads.get_channels()
            1

        4. **Define size of read blocks as 20 ms**

        .. code:: python

            import auditok
            from auditok import ADSFactory
            '''
            we know samling rate for previous file is 44100 samples/second
            so 10 ms are equivalent to 441 samples and 20 ms to 882
            '''
            block_size = 882
            ads = ADSFactory.ads(bs = 882, fn=auditok.dataset.was_der_mensch_saet_mono_44100_lead_trail_silence)
            ads.open()
            # read one block
            data = ads.read()
            ads.close()
            len(data)
            1764
            assert len(data) ==  ads.get_sample_width() * block_size

        5. **Define block size as a duration (use block_dur or bd):**

        .. code:: python

            import auditok
            from auditok import ADSFactory
            dur = 0.25 # second
            ads = ADSFactory.ads(bd = dur, fn=auditok.dataset.was_der_mensch_saet_mono_44100_lead_trail_silence)
            '''
            we know samling rate for previous file is 44100 samples/second
            for a block duration of 250 ms, block size should be 0.25 * 44100 = 11025
            '''
            ads.get_block_size()
            11025
            assert ads.get_block_size() ==  int(0.25 * 44100)
            ads.open()
            # read one block
            data = ads.read()
            ads.close()
            len(data)
            22050
            assert len(data) ==  ads.get_sample_width() * ads.get_block_size()

        6. **Read overlapping blocks (one of hope_size, hs, hop_dur or hd > 0):**

        For better readability we'd better use :class:`auditok.io.BufferAudioSource` with a string buffer:

        .. code:: python

            import auditok
            from auditok import ADSFactory
            '''
            we supply a data beffer instead of a file (keyword 'bata_buffer' or 'db')
            sr : sampling rate = 16 samples/sec
            sw : sample width = 1 byte
            ch : channels = 1
            '''
            buffer = "abcdefghijklmnop" # 16 bytes = 1 second of data
            bd = 0.250 # block duration = 250 ms = 4 bytes
            hd = 0.125 # hop duration = 125 ms = 2 bytes
            ads = ADSFactory.ads(db = "abcdefghijklmnop", bd = bd, hd = hd, sr = 16, sw = 1, ch = 1)
            ads.open()
            ads.read()
            'abcd'
            ads.read()
            'cdef'
            ads.read()
            'efgh'
            ads.read()
            'ghij'
            data = ads.read()
            assert data == 'ijkl'

        7. **Limit amount of read data (use max_time or mt):**

        .. code:: python

            '''
            We know audio file is larger than 2.25 seconds
            We want to read up to 2.25 seconds of audio data
            '''
            ads = ADSFactory.ads(mt = 2.25, fn=auditok.dataset.was_der_mensch_saet_mono_44100_lead_trail_silence)
            ads.open()
            data = []
            while True:
                d = ads.read()
                if d is None:
                    break
                data.append(d)

            ads.close()
            data = b''.join(data)
            assert len(data) == int(ads.get_sampling_rate() * 2.25 * ads.get_sample_width() * ads.get_channels())
        """

        # copy user's dicionary (shallow copy)
        kwargs = kwargs.copy()

        # check and normalize keyword arguments
        ADSFactory._check_normalize_args(kwargs)

        block_dur = kwargs.pop("bd")
        hop_dur = kwargs.pop("hd")
        block_size = kwargs.pop("bs")
        hop_size = kwargs.pop("hs")
        max_time = kwargs.pop("mt")
        audio_source = kwargs.pop("asrc")
        filename = kwargs.pop("fn")
        data_buffer = kwargs.pop("db")
        record = kwargs.pop("rec")

        # Case 1: an audio source is supplied
        if audio_source is not None:
            if (filename, data_buffer) != (None, None):
                raise Warning(
                    "You should provide one of 'audio_source', 'filename' or 'data_buffer'\
                 keyword parameters. 'audio_source' will be used"
                )

        # Case 2: a file name is supplied
        elif filename is not None:
            if data_buffer is not None:
                raise Warning(
                    "You should provide one of 'filename' or 'data_buffer'\
                 keyword parameters. 'filename' will be used"
                )
            audio_source = from_file(filename)

        # Case 3: a data_buffer is supplied
        elif data_buffer is not None:
            audio_source = BufferAudioSource(data_buffer=data_buffer, **kwargs)

        # Case 4: try to access native audio input
        else:
            audio_source = PyAudioSource(**kwargs)

        if block_dur is not None:
            if block_size is not None:
                raise DuplicateArgument(
                    "Either 'block_dur' or 'block_size' can be specified, not both"
                )
        elif block_size is not None:
            block_dur = block_size / audio_source.sr
        else:
            block_dur = 0.01  # 10 ms

        # Read overlapping blocks of data
        if hop_dur is not None:
            if hop_size is not None:
                raise DuplicateArgument(
                    "Either 'hop_dur' or 'hop_size' can be specified, not both"
                )
        elif hop_size is not None:
            hop_dur = hop_size / audio_source.sr

        ads = AudioDataSource(
            audio_source,
            block_dur=block_dur,
            hop_dur=hop_dur,
            record=record,
            max_read=max_time,
        )
        return ads


class _AudioSourceProxy:
    def __init__(self, audio_source):

        self._audio_source = audio_source

    def rewind(self):
        if self._audio_source.rewindable:
            self._audio_source.rewind()
        else:
            raise AudioIOError("Audio stream is not rewindable")

    def is_open(self):
        return self._audio_source.is_open()

    def open(self):
        self._audio_source.open()

    def close(self):
        self._audio_source.close()

    def read(self, size):
        return self._audio_source.read(size)

    @property
    def data(self):
        raise AttributeError(
            "AudioDataSource is not a recorder, no recorded data can be accessed"
        )

    def __getattr__(self, name):
        return getattr(self._audio_source, name)


class _Recorder(_AudioSourceProxy):
    """
    A class for AudioSource objects that can record all audio data they read,
    with a rewind facility.
    """

    def __init__(self, audio_source):
        super(_Recorder, self).__init__(audio_source)
        self._cache = []
        self._read_block = self._read_and_cache
        self._data = None

    def read(self, size):
        return self._read_block(size)

    @property
    def data(self):
        if self._data is None:
            raise RuntimeError(
                "Unrewinded recorder. Call rewind before accessing recorded data"
            )
        return self._data

    def rewind(self):
        if self._cache:
            self._data = self._concatenate(self._cache)
            self._cache = None
            self._audio_source = BufferAudioSource(
                self._data, self.sr, self.sw, self.ch
            )
            self._read_block = self._audio_source.read
            self.open()

    def _read_and_cache(self, size):
        # Read and save read data
        block = self._audio_source.read(size)
        if block is not None:
            self._cache.append(block)
        return block

    def _concatenate(self, data):
        try:
            # should always work for python 2
            # work for python 3 ONLY if data is a list (or an iterator)
            # whose each element is a 'bytes' objects
            data = b"".join(data)
            return data
        except TypeError:
            # work for 'str' in python 2 and python 3
            return "".join(data)


class _Limiter(_AudioSourceProxy):
    """
    A class for AudioDataSource objects that can read a fixed amount of data.
    This can be useful when reading data from the microphone or from large audio files.
    """

    def __init__(self, audio_source, max_read):
        super(_Limiter, self).__init__(audio_source)
        self._max_read = max_read
        self._max_samples = round(max_read * self.sr)
        self._read_samples = 0

    @property
    def data(self):
        data = self._audio_source.data
        max_read_bytes = self._max_samples * self.sw * self.ch
        return data[:max_read_bytes]

    @property
    def max_read(self):
        return self._max_read

    def read(self, size):
        size = min(self._max_samples - self._read_samples, size)
        if size <= 0:
            return None

        block = self._audio_source.read(size)
        if block is None:
            return None

        self._read_samples += len(block) // self._audio_source.sw
        return block

    def rewind(self):
        super(_Limiter, self).rewind()
        self._read_samples = 0


class _FixedSizeAudioReader(_AudioSourceProxy):
    def __init__(self, audio_source, block_dur):
        super(_FixedSizeAudioReader, self).__init__(audio_source)

        if block_dur <= 0:
            raise ValueError(
                "block_dur must be > 0, given: {}".format(block_dur)
            )

        self._block_size = int(block_dur * self.sr)
        if self._block_size == 0:
            err_msg = "Too small block_dur ({0:f}) for sampling rate ({1}). "
            err_msg += "block_dur should cover at least one sample "
            err_msg += "(i.e. 1/{1})"
            raise TooSamllBlockDuration(
                err_msg.format(block_dur, self.sr), block_dur, self.sr
            )

    def read(self):
        return self._audio_source.read(self._block_size)

    @property
    def block_size(self):
        return self._block_size

    @property
    def block_dur(self):
        return self._block_size / self.sr

    def __getattr__(self, name):
        return getattr(self._audio_source, name)


class _OverlapAudioReader(_FixedSizeAudioReader):
    """
    A class for AudioDataSource objects that can read and return overlapping
    audio frames
    """

    def __init__(self, audio_source, block_dur, hop_dur):

        if hop_dur >= block_dur:
            raise ValueError('"hop_dur" should be < "block_dur"')

        super(_OverlapAudioReader, self).__init__(audio_source, block_dur)

        self._hop_size = int(hop_dur * self.sr)
        self._blocks = self._iter_blocks_with_overlap()

    def _iter_blocks_with_overlap(self):
        while not self.is_open():
            yield AudioIOError
        block = self._audio_source.read(self._block_size)
        if block is None:
            yield None

        _hop_size_bytes = (
            self._hop_size * self._audio_source.sw * self._audio_source.ch
        )
        cache = block[_hop_size_bytes:]
        yield block

        while True:
            block = self._audio_source.read(self._hop_size)
            if block:
                block = cache + block
                cache = block[_hop_size_bytes:]
                yield block
                continue
            yield None

    def read(self):
        try:
            block = next(self._blocks)
            if block == AudioIOError:
                raise AudioIOError("Audio Stream is not open.")
            return block
        except StopIteration:
            return None

    def rewind(self):
        super(_OverlapAudioReader, self).rewind()
        self._blocks = self._iter_blocks_with_overlap()

    @property
    def hop_size(self):
        return self._hop_size

    @property
    def hop_dur(self):
        return self._hop_size / self.sr

    def __getattr__(self, name):
        return getattr(self._audio_source, name)


class AudioDataSource(DataSource):
    """
    Base class for AudioDataSource objects.
    It inherits from DataSource and encapsulates an AudioSource object.
    """

    def __init__(
        self,
        source,
        block_dur=0.01,
        hop_dur=None,
        record=False,
        max_read=None,
        **kwargs
    ):

        if not isinstance(source, AudioSource):
            source = get_audio_source(source, **kwargs)
        self._record = record
        if record:
            source = _Recorder(source)
        if max_read is not None:
            source = _Limiter(source, max_read)
            self._max_read = max_read
        if hop_dur is not None:
            source = _OverlapAudioReader(source, block_dur, hop_dur)
        else:
            source = _FixedSizeAudioReader(source, block_dur)
        self._audio_source = source

    def __repr__(self):
        block_dur, hop_dur, max_read = None, None, None
        if self.block_dur is not None:
            block_dur = "{:.3f}".format(self.block_dur)
        if self.hop_dur is not None:
            hop_dur = "{:.3f}".format(self.hop_dur)
        if self.max_read is not None:
            max_read = "{:.3f}".format(self.max_read)
        return (
            "AudioDataSource(source, block_dur={block_dur}, "
            "hop_dur={hop_dur}, record={rewindable}, "
            "max_read={max_read})"
        ).format(
            block_dur=block_dur,
            hop_dur=hop_dur,
            rewindable=self._record,
            max_read=max_read,
        )

    @property
    def rewindable(self):
        return self._record

    @property
    def block_dur(self):
        return self._audio_source.block_size / self._audio_source.sr

    @property
    def hop_dur(self):
        if hasattr(self._audio_source, "hop_dur"):
            return self._audio_source.hop_size / self._audio_source.sr
        return self.block_dur

    @property
    def hop_size(self):
        if hasattr(self._audio_source, "hop_size"):
            return self._audio_source.hop_size
        return self.block_size

    @property
    def max_read(self):
        try:
            return self._audio_source.max_read
        except AttributeError:
            return None

    def read(self):
        return self._audio_source.read()

    def __getattr__(self, name):
        if name in ("data", "rewind") and not self.rewindable:
            raise AttributeError(
                "'AudioDataSource' has no attribute '{}'".format(name)
            )
        try:
            return getattr(self._audio_source, name)
        except AttributeError:
            raise AttributeError(
                "'AudioDataSource' has no attribute '{}'".format(name)
            )


class AudioEnergyValidator(DataValidator):
    """
    The most basic auditok audio frame validator.
    This validator computes the log energy of an input audio frame
    and return True if the result is >= a given threshold, False
    otherwise.

    :Parameters:

    `sample_width` : *(int)*
        Number of bytes of one audio sample. This is used to convert data from `basestring` or `Bytes` to
        an array of floats.

    `energy_threshold` : *(float)*
        A threshold used to check whether an input data buffer is valid.
    """

    if _WITH_NUMPY:
        _formats = {1: numpy.int8, 2: numpy.int16, 4: numpy.int32}

        @staticmethod
        def _convert(signal, sample_width):
            return numpy.array(
                numpy.frombuffer(
                    signal, dtype=AudioEnergyValidator._formats[sample_width]
                ),
                dtype=numpy.float64,
            )

        @staticmethod
        def _signal_energy(signal):
            return float(numpy.dot(signal, signal)) / len(signal)

        @staticmethod
        def _signal_log_energy(signal):
            energy = AudioEnergyValidator._signal_energy(signal)
            if energy <= 0:
                return -200
            return 10.0 * numpy.log10(energy)

    else:
        _formats = {1: "b", 2: "h", 4: "i"}

        @staticmethod
        def _convert(signal, sample_width):
            return array(
                "d", array(AudioEnergyValidator._formats[sample_width], signal)
            )

        @staticmethod
        def _signal_energy(signal):
            energy = 0.0
            for a in signal:
                energy += a * a
            return energy / len(signal)

        @staticmethod
        def _signal_log_energy(signal):
            energy = AudioEnergyValidator._signal_energy(signal)
            if energy <= 0:
                return -200
            return 10.0 * math.log10(energy)

    def __init__(self, sample_width, energy_threshold=45):
        self.sample_width = sample_width
        self._energy_threshold = energy_threshold

    def is_valid(self, data):
        """
        Check if data is valid. Audio data will be converted into an array (of
        signed values) of which the log energy is computed. Log energy is computed
        as follows:

        .. code:: python

            arr = AudioEnergyValidator._convert(signal, sample_width)
            energy = float(numpy.dot(arr, arr)) / len(arr)
            log_energy = 10. * numpy.log10(energy)


        :Parameters:

        `data` : either a *string* or a *Bytes* buffer
            `data` is converted into a numerical array using the `sample_width`
            given in the constructor.

        :Returns:

        True if `log_energy` >= `energy_threshold`, False otherwise.
        """

        signal = AudioEnergyValidator._convert(data, self.sample_width)
        return (
            AudioEnergyValidator._signal_log_energy(signal)
            >= self._energy_threshold
        )

    def get_energy_threshold(self):
        return self._energy_threshold

    def set_energy_threshold(self, threshold):
        self._energy_threshold = threshold