Mercurial > hg > auditok
changeset 171:a486c424fdff
Refactor AudioDataSource
author | Amine Sehili <amine.sehili@gmail.com> |
---|---|
date | Sat, 09 Mar 2019 17:25:08 +0100 |
parents | 684392cc5019 |
children | e526cfd6a056 |
files | auditok/util.py |
diffstat | 1 files changed, 415 insertions(+), 284 deletions(-) [+] |
line wrap: on
line diff
--- a/auditok/util.py Thu Mar 07 21:28:23 2019 +0100 +++ b/auditok/util.py Sat Mar 09 17:25:08 2019 +0100 @@ -16,35 +16,52 @@ AudioEnergyValidator """ - +from __future__ import division from abc import ABCMeta, abstractmethod import math from array import array -from .io import Rewindable, from_file, BufferAudioSource, PyAudioSource +from .io import ( + AudioIOError, + AudioSource, + from_file, + BufferAudioSource, + PyAudioSource, + get_audio_source, +) from .exceptions import DuplicateArgument import sys try: import numpy + _WITH_NUMPY = True except ImportError as e: _WITH_NUMPY = False try: from builtins import str + basestring = str except ImportError as e: if sys.version_info >= (3, 0): basestring = str -__all__ = ["DataSource", "DataValidator", "StringDataSource", "ADSFactory", "AudioEnergyValidator"] +__all__ = [ + "DataSource", + "DataValidator", + "StringDataSource", + "ADSFactory", + "AudioDataSource", + "AudioEnergyValidator", +] -class DataSource(): +class DataSource: """ Base class for objects passed to :func:`auditok.core.StreamTokenizer.tokenize`. Subclasses should implement a :func:`DataSource.read` method. """ + __metaclass__ = ABCMeta @abstractmethod @@ -55,12 +72,13 @@ """ -class DataValidator(): +class DataValidator: """ Base class for a validator object used by :class:`.core.StreamTokenizer` to check if read data is valid. Subclasses should implement :func:`is_valid` method. """ + __metaclass__ = ABCMeta @abstractmethod @@ -78,7 +96,7 @@ :Parameters: - `data` : + `data` : a basestring object. """ @@ -109,7 +127,7 @@ :Parameters: - `data` : a basestring object + `data` : a basestring object New data buffer. """ @@ -127,73 +145,127 @@ Whether you read audio data from a file, the microphone or a memory buffer, this factory instantiates and returns the right :class:`ADSFactory.AudioDataSource` object. - There are many other features you want your :class:`ADSFactory.AudioDataSource` object to have, such as: - memorize all read audio data so that you can rewind and reuse it (especially useful when - reading data from the microphone), read a fixed amount of data (also useful when reading + There are many other features you want your :class:`ADSFactory.AudioDataSource` object to have, such as: + memorize all read audio data so that you can rewind and reuse it (especially useful when + reading data from the microphone), read a fixed amount of data (also useful when reading from the microphone), read overlapping audio frames (often needed when dosing a spectral analysis of data). :func:`ADSFactory.ads` automatically creates and return object with the desired behavior according - to the supplied keyword arguments. + to the supplied keyword arguments. """ @staticmethod def _check_normalize_args(kwargs): for k in kwargs: - if not k in ["block_dur", "hop_dur", "block_size", "hop_size", "max_time", "record", - "audio_source", "filename", "data_buffer", "frames_per_buffer", "sampling_rate", - "sample_width", "channels", "sr", "sw", "ch", "asrc", "fn", "fpb", "db", "mt", - "rec", "bd", "hd", "bs", "hs"]: + if not k in [ + "block_dur", + "hop_dur", + "block_size", + "hop_size", + "max_time", + "record", + "audio_source", + "filename", + "data_buffer", + "frames_per_buffer", + "sampling_rate", + "sample_width", + "channels", + "sr", + "sw", + "ch", + "asrc", + "fn", + "fpb", + "db", + "mt", + "rec", + "bd", + "hd", + "bs", + "hs", + ]: raise ValueError("Invalid argument: {0}".format(k)) if "block_dur" in kwargs and "bd" in kwargs: - raise DuplicateArgument("Either 'block_dur' or 'bd' must be specified, not both") + raise DuplicateArgument( + "Either 'block_dur' or 'bd' must be specified, not both" + ) if "hop_dur" in kwargs and "hd" in kwargs: - raise DuplicateArgument("Either 'hop_dur' or 'hd' must be specified, not both") + raise DuplicateArgument( + "Either 'hop_dur' or 'hd' must be specified, not both" + ) if "block_size" in kwargs and "bs" in kwargs: - raise DuplicateArgument("Either 'block_size' or 'bs' must be specified, not both") + raise DuplicateArgument( + "Either 'block_size' or 'bs' must be specified, not both" + ) if "hop_size" in kwargs and "hs" in kwargs: - raise DuplicateArgument("Either 'hop_size' or 'hs' must be specified, not both") + raise DuplicateArgument( + "Either 'hop_size' or 'hs' must be specified, not both" + ) if "max_time" in kwargs and "mt" in kwargs: - raise DuplicateArgument("Either 'max_time' or 'mt' must be specified, not both") + raise DuplicateArgument( + "Either 'max_time' or 'mt' must be specified, not both" + ) if "audio_source" in kwargs and "asrc" in kwargs: - raise DuplicateArgument("Either 'audio_source' or 'asrc' must be specified, not both") + raise DuplicateArgument( + "Either 'audio_source' or 'asrc' must be specified, not both" + ) if "filename" in kwargs and "fn" in kwargs: - raise DuplicateArgument("Either 'filename' or 'fn' must be specified, not both") + raise DuplicateArgument( + "Either 'filename' or 'fn' must be specified, not both" + ) if "data_buffer" in kwargs and "db" in kwargs: - raise DuplicateArgument("Either 'filename' or 'db' must be specified, not both") + raise DuplicateArgument( + "Either 'filename' or 'db' must be specified, not both" + ) if "frames_per_buffer" in kwargs and "fbb" in kwargs: - raise DuplicateArgument("Either 'frames_per_buffer' or 'fpb' must be specified, not both") + raise DuplicateArgument( + "Either 'frames_per_buffer' or 'fpb' must be specified, not both" + ) if "sampling_rate" in kwargs and "sr" in kwargs: - raise DuplicateArgument("Either 'sampling_rate' or 'sr' must be specified, not both") + raise DuplicateArgument( + "Either 'sampling_rate' or 'sr' must be specified, not both" + ) if "sample_width" in kwargs and "sw" in kwargs: - raise DuplicateArgument("Either 'sample_width' or 'sw' must be specified, not both") + raise DuplicateArgument( + "Either 'sample_width' or 'sw' must be specified, not both" + ) if "channels" in kwargs and "ch" in kwargs: - raise DuplicateArgument("Either 'channels' or 'ch' must be specified, not both") + raise DuplicateArgument( + "Either 'channels' or 'ch' must be specified, not both" + ) if "record" in kwargs and "rec" in kwargs: - raise DuplicateArgument("Either 'record' or 'rec' must be specified, not both") + raise DuplicateArgument( + "Either 'record' or 'rec' must be specified, not both" + ) kwargs["bd"] = kwargs.pop("block_dur", None) or kwargs.pop("bd", None) kwargs["hd"] = kwargs.pop("hop_dur", None) or kwargs.pop("hd", None) kwargs["bs"] = kwargs.pop("block_size", None) or kwargs.pop("bs", None) kwargs["hs"] = kwargs.pop("hop_size", None) or kwargs.pop("hs", None) kwargs["mt"] = kwargs.pop("max_time", None) or kwargs.pop("mt", None) - kwargs["asrc"] = kwargs.pop("audio_source", None) or kwargs.pop("asrc", None) + kwargs["asrc"] = kwargs.pop("audio_source", None) or kwargs.pop( + "asrc", None + ) kwargs["fn"] = kwargs.pop("filename", None) or kwargs.pop("fn", None) - kwargs["db"] = kwargs.pop("data_buffer", None) or kwargs.pop("db", None) + kwargs["db"] = kwargs.pop("data_buffer", None) or kwargs.pop( + "db", None + ) record = kwargs.pop("record", False) if not record: @@ -205,16 +277,24 @@ # keep long names for arguments meant for BufferAudioSource and PyAudioSource if "frames_per_buffer" in kwargs or "fpb" in kwargs: - kwargs["frames_per_buffer"] = kwargs.pop("frames_per_buffer", None) or kwargs.pop("fpb", None) + kwargs["frames_per_buffer"] = kwargs.pop( + "frames_per_buffer", None + ) or kwargs.pop("fpb", None) if "sampling_rate" in kwargs or "sr" in kwargs: - kwargs["sampling_rate"] = kwargs.pop("sampling_rate", None) or kwargs.pop("sr", None) + kwargs["sampling_rate"] = kwargs.pop( + "sampling_rate", None + ) or kwargs.pop("sr", None) if "sample_width" in kwargs or "sw" in kwargs: - kwargs["sample_width"] = kwargs.pop("sample_width", None) or kwargs.pop("sw", None) + kwargs["sample_width"] = kwargs.pop( + "sample_width", None + ) or kwargs.pop("sw", None) if "channels" in kwargs or "ch" in kwargs: - kwargs["channels"] = kwargs.pop("channels", None) or kwargs.pop("ch", None) + kwargs["channels"] = kwargs.pop("channels", None) or kwargs.pop( + "ch", None + ) @staticmethod def ads(**kwargs): @@ -224,9 +304,9 @@ :Parameters: - *No parameters* : + *No parameters* : read audio data from the available built-in microphone with the default parameters. - The returned :class:`ADSFactory.AudioDataSource` encapsulate an :class:`io.PyAudioSource` object and hence + The returned :class:`ADSFactory.AudioDataSource` encapsulate an :class:`io.PyAudioSource` object and hence it accepts the next four parameters are passed to use instead of their default values. `sampling_rate`, `sr` : *(int)* @@ -236,7 +316,7 @@ number of bytes per sample (must be in (1, 2, 4)). Default = 2 `channels`, `ch` : *(int)* - number of audio channels. Default = 1 (only this value is currently accepted) + number of audio channels. Default = 1 (only this value is currently accepted) `frames_per_buffer`, `fpb` : *(int)* number of samples of PyAudio buffer. Default = 1024. @@ -254,14 +334,14 @@ `max_time`, `mt` : *(float)* maximum time (in seconds) to read. Default behavior: read until there is no more data - available. + available. `record`, `rec` : *(bool)* save all read data in cache. Provide a navigable object which boasts a `rewind` method. Default = False. `block_dur`, `bd` : *(float)* - processing block duration in seconds. This represents the quantity of audio data to return + processing block duration in seconds. This represents the quantity of audio data to return each time the :func:`read` method is invoked. If `block_dur` is 0.025 (i.e. 25 ms) and the sampling rate is 8000 and the sample width is 2 bytes, :func:`read` returns a buffer of 0.025 * 8000 * 2 = 400 bytes at most. This parameter will be looked for (and used if available) before `block_size`. @@ -385,7 +465,7 @@ ''' buffer = "abcdefghijklmnop" # 16 bytes = 1 second of data bd = 0.250 # block duration = 250 ms = 4 bytes - hd = 0.125 # hop duration = 125 ms = 2 bytes + hd = 0.125 # hop duration = 125 ms = 2 bytes ads = ADSFactory.ads(db = "abcdefghijklmnop", bd = bd, hd = hd, sr = 16, sw = 1, ch = 1) ads.open() ads.read() @@ -440,14 +520,18 @@ # Case 1: an audio source is supplied if audio_source is not None: if (filename, data_buffer) != (None, None): - raise Warning("You should provide one of 'audio_source', 'filename' or 'data_buffer'\ - keyword parameters. 'audio_source' will be used") + raise Warning( + "You should provide one of 'audio_source', 'filename' or 'data_buffer'\ + keyword parameters. 'audio_source' will be used" + ) # Case 2: a file name is supplied elif filename is not None: if data_buffer is not None: - raise Warning("You should provide one of 'filename' or 'data_buffer'\ - keyword parameters. 'filename' will be used") + raise Warning( + "You should provide one of 'filename' or 'data_buffer'\ + keyword parameters. 'filename' will be used" + ) audio_source = from_file(filename) # Case 3: a data_buffer is supplied @@ -460,299 +544,337 @@ if block_dur is not None: if block_size is not None: - raise DuplicateArgument("Either 'block_dur' or 'block_size' can be specified, not both") - else: - block_size = int(audio_source.get_sampling_rate() * block_dur) - elif block_size is None: - # Set default block_size to 10 ms - block_size = int(audio_source.get_sampling_rate() / 100) - - # Instantiate base AudioDataSource - ads = ADSFactory.AudioDataSource(audio_source=audio_source, block_size=block_size) - - # Limit data to be read - if max_time is not None: - ads = ADSFactory.LimiterADS(ads=ads, max_time=max_time) - - # Record, rewind and reuse data - if record: - ads = ADSFactory.RecorderADS(ads=ads) + raise DuplicateArgument( + "Either 'block_dur' or 'block_size' can be specified, not both" + ) + elif block_size is not None: + block_dur = block_size / audio_source.sr + else: + block_dur = 0.01 # 10 ms # Read overlapping blocks of data if hop_dur is not None: if hop_size is not None: - raise DuplicateArgument("Either 'hop_dur' or 'hop_size' can be specified, not both") - else: - hop_size = int(audio_source.get_sampling_rate() * hop_dur) + raise DuplicateArgument( + "Either 'hop_dur' or 'hop_size' can be specified, not both" + ) + elif hop_size is not None: + hop_dur = hop_size / audio_source.sr - if hop_size is not None: - if hop_size <= 0 or hop_size > block_size: - raise ValueError("hop_size must be > 0 and <= block_size") - if hop_size < block_size: - ads = ADSFactory.OverlapADS(ads=ads, hop_size=hop_size) - + ads = AudioDataSource( + audio_source, + block_dur=block_dur, + hop_dur=hop_dur, + record=record, + max_read=max_time, + ) return ads - class AudioDataSource(DataSource): - """ - Base class for AudioDataSource objects. - It inherits from DataSource and encapsulates an AudioSource object. - """ - def __init__(self, audio_source, block_size): +class _AudioSourceProxy: + def __init__(self, audio_source): - self.audio_source = audio_source - self.block_size = block_size + self._audio_source = audio_source - def get_block_size(self): - return self.block_size + def rewind(self): + if self._audio_source.rewindable: + self._audio_source.rewind() + else: + raise AudioIOError("Audio stream is not rewindable") - def set_block_size(self, size): - self.block_size = size + def is_open(self): + return self._audio_source.is_open() - def get_audio_source(self): - return self.audio_source + def open(self): + self._audio_source.open() - def set_audio_source(self, audio_source): - self.audio_source = audio_source + def close(self): + self._audio_source.close() - def open(self): - self.audio_source.open() + def read(self, size): + return self._audio_source.read(size) - def close(self): - self.audio_source.close() + @property + def data(self): + raise AttributeError( + "AudioDataSource is not a recorder, no recorded data can be accessed" + ) - def is_open(self): - return self.audio_source.is_open() + def __getattr__(self, name): + return getattr(self._audio_source, name) - def get_sampling_rate(self): - return self.audio_source.get_sampling_rate() - def get_sample_width(self): - return self.audio_source.get_sample_width() +class _Recorder(_AudioSourceProxy): + """ + A class for AudioSource objects that can record all audio data they read, + with a rewind facility. + """ - def get_channels(self): - return self.audio_source.get_channels() + def __init__(self, audio_source): + super(_Recorder, self).__init__(audio_source) + self._cache = [] + self._read_block = self._read_and_cache + self._data = None - def rewind(self): - if isinstance(self.audio_source, Rewindable): - self.audio_source.rewind() - else: - raise Exception("Audio source is not rewindable") + def read(self, size): + return self._read_block(size) - def is_rewindable(self): - return isinstance(self.audio_source, Rewindable) + @property + def data(self): + if self._data is None: + raise RuntimeError( + "Unrewinded recorder. Call rewind before accessing recorded data" + ) + return self._data - def read(self): - return self.audio_source.read(self.block_size) + def rewind(self): + if self._cache: + self._data = self._concatenate(self._cache) + self._cache = None + self._audio_source = BufferAudioSource( + self._data, self.sr, self.sw, self.ch + ) + self._read_block = self._audio_source.read + self.open() - class ADSDecorator(AudioDataSource): - """ - Base decorator class for AudioDataSource objects. - """ - __metaclass__ = ABCMeta + def _read_and_cache(self, size): + # Read and save read data + block = self._audio_source.read(size) + if block is not None: + self._cache.append(block) + return block - def __init__(self, ads): - self.ads = ads + def _concatenate(self, data): + try: + # should always work for python 2 + # work for python 3 ONLY if data is a list (or an iterator) + # whose each element is a 'bytes' objects + data = b"".join(data) + return data + except TypeError: + # work for 'str' in python 2 and python 3 + return "".join(data) - self.get_block_size = self.ads.get_block_size - self.set_block_size = self.ads.set_block_size - self.get_audio_source = self.ads.get_audio_source - self.open = self.ads.open - self.close = self.ads.close - self.is_open = self.ads.is_open - self.get_sampling_rate = self.ads.get_sampling_rate - self.get_sample_width = self.ads.get_sample_width - self.get_channels = self.ads.get_channels - def is_rewindable(self): - return self.ads.is_rewindable +class _Limiter(_AudioSourceProxy): + """ + A class for AudioDataSource objects that can read a fixed amount of data. + This can be useful when reading data from the microphone or from large audio files. + """ - def rewind(self): - self.ads.rewind() - self._reinit() + def __init__(self, audio_source, max_read): + super(_Limiter, self).__init__(audio_source) + self._max_read = max_read + self._max_samples = round(max_read * self.sr) + self._read_samples = 0 - def set_audio_source(self, audio_source): - self.ads.set_audio_source(audio_source) - self._reinit() + @property + def data(self): + data = self._audio_source.data + max_read_bytes = self._max_samples * self.sw * self.ch + return data[:max_read_bytes] - def open(self): - if not self.ads.is_open(): - self.ads.open() - self._reinit() + @property + def max_read(self): + return self._max_read - @abstractmethod - def _reinit(self): - pass + def read(self, size): + size = min(self._max_samples - self._read_samples, size) + if size <= 0: + return None - class OverlapADS(ADSDecorator): - """ - A class for AudioDataSource objects that can read and return overlapping - audio frames - """ + block = self._audio_source.read(size) + if block is None: + return None - def __init__(self, ads, hop_size): - ADSFactory.ADSDecorator.__init__(self, ads) + self._read_samples += len(block) // self._audio_source.sw + return block - if hop_size <= 0 or hop_size > self.get_block_size(): - raise ValueError("hop_size must be either 'None' or \ - between 1 and block_size (both inclusive)") - self.hop_size = hop_size - self._actual_block_size = self.get_block_size() - self._reinit() + def rewind(self): + super(_Limiter, self).rewind() + self._read_samples = 0 - def _get_block_size(): - return self._actual_block_size - def _read_first_block(self): - # For the first call, we need an entire block of size 'block_size' - block = self.ads.read() - if block is None: - return None +class _FixedSizeAudioReader(_AudioSourceProxy): + def __init__(self, audio_source, block_dur): + super(_FixedSizeAudioReader, self).__init__(audio_source) + self._block_size = int(block_dur * self.sr) - # Keep a slice of data in cache and append it in the next call - if len(block) > self._hop_size_bytes: - self._cache = block[self._hop_size_bytes:] + def read(self): + return self._audio_source.read(self._block_size) - # Up from the next call, we will use '_read_next_blocks' - # and we only read 'hop_size' - self.ads.set_block_size(self.hop_size) - self.read = self._read_next_blocks + @property + def block_size(self): + return self._block_size + @property + def block_dur(self): + return self._block_size / self.sr + + def __getattr__(self, name): + return getattr(self._audio_source, name) + + +class _OverlapAudioReader(_FixedSizeAudioReader): + """ + A class for AudioDataSource objects that can read and return overlapping + audio frames + """ + + def __init__(self, audio_source, block_dur, hop_dur): + + if hop_dur >= block_dur: + raise ValueError('"hop_dur" should be < "block_dur"') + + super(_OverlapAudioReader, self).__init__(audio_source, block_dur) + + self._hop_size = int(hop_dur * self.sr) + self._blocks = self._iter_blocks_with_overlap() + + def _iter_blocks_with_overlap(self): + while not self.is_open(): + yield AudioIOError + block = self._audio_source.read(self._block_size) + if block is None: + yield None + + _hop_size_bytes = ( + self._hop_size * self._audio_source.sw * self._audio_source.ch + ) + cache = block[_hop_size_bytes:] + yield block + + while True: + block = self._audio_source.read(self._hop_size) + if block: + block = cache + block + cache = block[_hop_size_bytes:] + yield block + continue + yield None + + def read(self): + try: + block = next(self._blocks) + if block == AudioIOError: + raise AudioIOError("Audio Stream is not open.") return block + except StopIteration: + return None - def _read_next_blocks(self): - block = self.ads.read() - if block is None: - return None + def rewind(self): + super(_OverlapAudioReader, self).rewind() + self._blocks = self._iter_blocks_with_overlap() - # Append block to cache data to ensure overlap - block = self._cache + block - # Keep a slice of data in cache only if we have a full length block - # if we don't that means that this is the last block - if len(block) == self._block_size_bytes: - self._cache = block[self._hop_size_bytes:] - else: - self._cache = None + @property + def hop_size(self): + return self._hop_size - return block + @property + def hop_dur(self): + return self._hop_size / self.sr - def read(self): - pass + def __getattr__(self, name): + return getattr(self._audio_source, name) - def _reinit(self): - self._cache = None - self.ads.set_block_size(self._actual_block_size) - self._hop_size_bytes = self.hop_size * \ - self.get_sample_width() * \ - self.get_channels() - self._block_size_bytes = self.get_block_size() * \ - self.get_sample_width() * \ - self.get_channels() - self.read = self._read_first_block - class LimiterADS(ADSDecorator): - """ - A class for AudioDataSource objects that can read a fixed amount of data. - This can be useful when reading data from the microphone or from large audio files. - """ +class AudioDataSource(DataSource): + """ + Base class for AudioDataSource objects. + It inherits from DataSource and encapsulates an AudioSource object. + """ - def __init__(self, ads, max_time): - ADSFactory.ADSDecorator.__init__(self, ads) + def __init__( + self, + source, + block_dur=0.01, + hop_dur=None, + record=False, + max_read=None, + **kwargs + ): - self.max_time = max_time - self._reinit() + if not isinstance(source, AudioSource): + source = get_audio_source(source, **kwargs) + self._record = record + if record: + source = _Recorder(source) + if max_read is not None: + source = _Limiter(source, max_read) + self._max_read = max_read + if hop_dur is not None: + source = _OverlapAudioReader(source, block_dur, hop_dur) + else: + source = _FixedSizeAudioReader(source, block_dur) + self._audio_source = source - def read(self): - if self._total_read_bytes >= self._max_read_bytes: - return None - block = self.ads.read() - if block is None: - return None - self._total_read_bytes += len(block) + def __repr__(self): + block_dur, hop_dur, max_read = None, None, None + if self.block_dur is not None: + block_dur = "{:.3f}".format(self.block_dur) + if self.hop_dur is not None: + hop_dur = "{:.3f}".format(self.hop_dur) + if self.max_read is not None: + max_read = "{:.3f}".format(self.max_read) + return ( + "AudioDataSource(source, block_dur={block_dur}, " + "hop_dur={hop_dur}, record={rewindable}, " + "max_read={max_read})" + ).format( + block_dur=block_dur, + hop_dur=hop_dur, + rewindable=self._record, + max_read=max_read, + ) - if self._total_read_bytes >= self._max_read_bytes: - self.close() + @property + def rewindable(self): + return self._record - return block + @property + def block_dur(self): + return self._audio_source.block_size / self._audio_source.sr - def _reinit(self): - self._max_read_bytes = int(self.max_time * self.get_sampling_rate()) * \ - self.get_sample_width() * \ - self.get_channels() - self._total_read_bytes = 0 + @property + def hop_dur(self): + if hasattr(self._audio_source, "hop_dur"): + return self._audio_source.hop_size / self._audio_source.sr + return self.block_dur - class RecorderADS(ADSDecorator): - """ - A class for AudioDataSource objects that can record all audio data they read, - with a rewind facility. - """ + @property + def hop_size(self): + if hasattr(self._audio_source, "hop_size"): + return self._audio_source.hop_size + return self.block_size - def __init__(self, ads): - ADSFactory.ADSDecorator.__init__(self, ads) + @property + def max_read(self): + try: + return self._audio_source.max_read + except AttributeError: + return None - self._reinit() + def read(self): + return self._audio_source.read() - def read(self): - pass - - def _read_and_rec(self): - # Read and save read data - block = self.ads.read() - if block is not None: - self._cache.append(block) - - return block - - def _read_simple(self): - # Read without recording - return self.ads.read() - - def rewind(self): - if self._record: - # If has been recording, create a new BufferAudioSource - # from recorded data - dbuffer = self._concatenate(self._cache) - asource = BufferAudioSource(dbuffer, self.get_sampling_rate(), - self.get_sample_width(), - self.get_channels()) - - self.set_audio_source(asource) - self.open() - self._cache = [] - self._record = False - self.read = self._read_simple - - else: - self.ads.rewind() - if not self.is_open(): - self.open() - - def is_rewindable(self): - return True - - def _reinit(self): - # when audio_source is replaced, start recording again - self._record = True - self._cache = [] - self.read = self._read_and_rec - - def _concatenate(self, data): - try: - # should always work for python 2 - # work for python 3 ONLY if data is a list (or an iterator) - # whose each element is a 'bytes' objects - return b''.join(data) - except TypeError: - # work for 'str' in python 2 and python 3 - return ''.join(data) + def __getattr__(self, name): + if name in ("data", "rewind") and not self.rewindable: + raise AttributeError( + "'AudioDataSource' has no attribute '{}'".format(name) + ) + try: + return getattr(self._audio_source, name) + except AttributeError: + raise AttributeError( + "'AudioDataSource' has no attribute '{}'".format(name) + ) class AudioEnergyValidator(DataValidator): """ The most basic auditok audio frame validator. This validator computes the log energy of an input audio frame - and return True if the result is >= a given threshold, False + and return True if the result is >= a given threshold, False otherwise. :Parameters: @@ -770,8 +892,12 @@ @staticmethod def _convert(signal, sample_width): - return numpy.array(numpy.frombuffer(signal, dtype=AudioEnergyValidator._formats[sample_width]), - dtype=numpy.float64) + return numpy.array( + numpy.frombuffer( + signal, dtype=AudioEnergyValidator._formats[sample_width] + ), + dtype=numpy.float64, + ) @staticmethod def _signal_energy(signal): @@ -782,18 +908,20 @@ energy = AudioEnergyValidator._signal_energy(signal) if energy <= 0: return -200 - return 10. * numpy.log10(energy) + return 10.0 * numpy.log10(energy) else: - _formats = {1: 'b', 2: 'h', 4: 'i'} + _formats = {1: "b", 2: "h", 4: "i"} @staticmethod def _convert(signal, sample_width): - return array("d", array(AudioEnergyValidator._formats[sample_width], signal)) + return array( + "d", array(AudioEnergyValidator._formats[sample_width], signal) + ) @staticmethod def _signal_energy(signal): - energy = 0. + energy = 0.0 for a in signal: energy += a * a return energy / len(signal) @@ -803,7 +931,7 @@ energy = AudioEnergyValidator._signal_energy(signal) if energy <= 0: return -200 - return 10. * math.log10(energy) + return 10.0 * math.log10(energy) def __init__(self, sample_width, energy_threshold=45): self.sample_width = sample_width @@ -834,7 +962,10 @@ """ signal = AudioEnergyValidator._convert(data, self.sample_width) - return AudioEnergyValidator._signal_log_energy(signal) >= self._energy_threshold + return ( + AudioEnergyValidator._signal_log_energy(signal) + >= self._energy_threshold + ) def get_energy_threshold(self): return self._energy_threshold