Mercurial > hg > auditok
changeset 10:9be2d0ca4c00
Python 3.x support
author | Amine Sehili <amine.sehili@gmail.com> |
---|---|
date | Tue, 24 Nov 2015 01:57:53 +0100 |
parents | c2ddae4d2c36 |
children | dfff16b2a4a8 |
files | .travis.yml auditok/__init__.py auditok/core.py auditok/exceptions.py auditok/io.py auditok/util.py demos/audio_tokenize_demo.py demos/audio_trim_demo.py demos/echo.py tests/test_AudioDataSourceFactory.py |
diffstat | 10 files changed, 609 insertions(+), 171 deletions(-) [+] |
line wrap: on
line diff
--- a/.travis.yml Tue Nov 24 01:41:19 2015 +0100 +++ b/.travis.yml Tue Nov 24 01:57:53 2015 +0100 @@ -1,5 +1,9 @@ language: python python: + - "2.6" - "2.7" + - "3.2" + - "3.3" + - "3.4" script: - python -m unittest discover tests
--- a/auditok/__init__.py Tue Nov 24 01:41:19 2015 +0100 +++ b/auditok/__init__.py Tue Nov 24 01:57:53 2015 +0100 @@ -308,7 +308,7 @@ break original_signal.append(w) - original_signal = ''.join(original_signal) + original_signal = b''.join(original_signal) print("Playing the original file...") player.play(original_signal) @@ -316,7 +316,7 @@ print("playing detected regions...") for t in tokens: print("Token starts at {0} and ends at {1}".format(t[1], t[2])) - data = ''.join(t[0]) + data = b''.join(t[0]) player.play(data) assert len(tokens) == 8 @@ -335,7 +335,7 @@ for t in tokens: print("Token starts at {0} and ends at {1}".format(t[1], t[2])) - data = ''.join(t[0]) + data = b''.join(t[0]) player.play(data) assert len(tokens) == 6 @@ -362,7 +362,7 @@ brief noise that occurs within the leading silence. We certainly do want our tokenizer to stop at this point and considers whatever it comes after as a useful signal. To force the tokenizer to ignore that brief event we use two other parameters `init_min` -ans `init_max_silence`. By `init_min`=3 and `init_max_silence`=1 we tell the tokenizer +and `init_max_silence`. By `init_min`=3 and `init_max_silence`=1 we tell the tokenizer that a valid event must start with at least 3 noisy windows, between which there is at most 1 silent window. @@ -392,7 +392,7 @@ break original_signal.append(w) - original_signal = ''.join(original_signal) + original_signal = b''.join(original_signal) # rewind source asource.rewind() @@ -411,7 +411,7 @@ # Make sure we only have one token assert len(tokens) == 1, "Should have detected one single token" - trimmed_signal = ''.join(tokens[0][0]) + trimmed_signal = b''.join(tokens[0][0]) player = player_for(asource) @@ -449,7 +449,7 @@ def echo(data, start, end): print("Acoustic activity at: {0}--{1}".format(start, end)) - player.play(''.join(data)) + player.play(b''.join(data)) asource.open() @@ -484,11 +484,13 @@ This package is published under GNU GPL Version 3. """ -from core import * -from io import * -from util import * -import dataset +from __future__ import absolute_import +from .core import * +from .io import * +from .util import * +from . import dataset +from .exceptions import * -__version__ = "0.1.3" +__version__ = "0.1.4"
--- a/auditok/core.py Tue Nov 24 01:41:19 2015 +0100 +++ b/auditok/core.py Tue Nov 24 01:57:53 2015 +0100 @@ -20,6 +20,8 @@ NOISE = 3 STRICT_MIN_LENGTH = 2 + DROP_TRAILING_SILENCE = 4 + # alias DROP_TAILING_SILENCE = 4 def __init__(self, validator,
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/auditok/exceptions.py Tue Nov 24 01:57:53 2015 +0100 @@ -0,0 +1,9 @@ +""" +November 2015 +@author: Amine SEHILI <amine.sehili@gmail.com> +""" + +class DuplicateArgument(Exception): + pass + +
--- a/auditok/io.py Tue Nov 24 01:41:19 2015 +0100 +++ b/auditok/io.py Tue Nov 24 01:57:53 2015 +0100 @@ -6,10 +6,12 @@ """ from abc import ABCMeta, abstractmethod +from six import with_metaclass import wave +import sys __all__ = ["AudioSource", "Rewindable", "BufferAudioSource", "WaveAudioSource", - "PyAudioSource", "PyAudioPlayer", "from_file", "player_for"] + "PyAudioSource", "StdinAudioSource", "PyAudioPlayer", "from_file", "player_for"] DEFAULT_SAMPLE_RATE = 16000 DEFAULT_SAMPLE_WIDTH = 2 @@ -183,7 +185,7 @@ self._is_open = False self.rewind() - def read(self, size=None): + def read(self, size): if not self._is_open: raise IOError("Stream is not open") @@ -369,7 +371,40 @@ return None - +class StdinAudioSource(AudioSource): + + """ A class for an `AudioSource` that reads data from standard input. """ + + def __init__(self, sampling_rate = DEFAULT_SAMPLE_RATE, + sample_width = DEFAULT_SAMPLE_WIDTH, + channels = DEFAULT_NB_CHANNELS): + + AudioSource.__init__(self, sampling_rate, sample_width, channels) + self._is_open = False + + + def is_open(self): + return self._is_open + + def open(self): + self._is_open = True + + def close(self): + self._is_open = False + + def read(self, size): + if not self._is_open: + raise IOError("Stream is not open") + + to_read = size * self.sample_width * self.channels + data = sys.stdin.read(to_read) + + if data is None or len(data) < 1: + return None + + return data + + class PyAudioPlayer(): """ A class for audio playback """ @@ -394,7 +429,10 @@ def play(self, data): if self.stream.is_stopped(): self.stream.start_stream() - self.stream.write(data) + + for chunk in self._chunk_data(data): + self.stream.write(chunk) + self.stream.stop_stream() @@ -403,8 +441,14 @@ self.stream.stop_stream() self.stream.close() self._p.terminate() - + def _chunk_data(self, data): + # make audio chunks of 100 ms to allow interruption (like ctrl+c) + chunk_size = int((self.sampling_rate * self.sample_width * self.channels) / 10) + start = 0 + while start < len(data): + yield data[start : start + chunk_size] + start += chunk_size def from_file(filename):
--- a/auditok/util.py Tue Nov 24 01:41:19 2015 +0100 +++ b/auditok/util.py Tue Nov 24 01:57:53 2015 +0100 @@ -6,7 +6,9 @@ from abc import ABCMeta, abstractmethod import math from array import array -from io import Rewindable, from_file, BufferAudioSource, PyAudioSource +from .io import Rewindable, from_file, BufferAudioSource, PyAudioSource +from .exceptions import DuplicateArgument +import sys try: @@ -15,6 +17,14 @@ except ImportError as e: _WITH_NUMPY = False +try: + from builtins import str + basestring = str +except ImportError as e: + if sys.version_info >= (3, 0): + basestring = str + + __all__ = ["DataSource", "DataValidator", "StringDataSource", "ADSFactory", "AudioEnergyValidator"] @@ -108,8 +118,93 @@ `ADSFactory.ads` automatically creates and return object with the desired behavior according to the supplied keyword arguments. + """ - """ + @staticmethod + def _check_normalize_args(kwargs): + + for k in kwargs: + if not k in ["block_dur", "hop_dur", "block_size", "hop_size", "max_time", "record", + "audio_source", "filename", "data_buffer", "frames_per_buffer", "sampling_rate", + "sample_width", "channels", "sr", "sw", "ch", "asrc", "fn", "fpb", "db", "mt", + "rec", "bd", "hd", "bs", "hs"]: + raise ValueError("Invalid argument: {0}".format(k)) + + if "block_dur" in kwargs and "bd" in kwargs: + raise DuplicateArgument("Either 'block_dur' or 'bd' must be specified, not both") + + if "hop_dur" in kwargs and "hd" in kwargs: + raise DuplicateArgument("Either 'hop_dur' or 'hd' must be specified, not both") + + if "block_size" in kwargs and "bs" in kwargs: + raise DuplicateArgument("Either 'block_size' or 'bs' must be specified, not both") + + if "hop_size" in kwargs and "hs" in kwargs: + raise DuplicateArgument("Either 'hop_size' or 'hs' must be specified, not both") + + if "max_time" in kwargs and "mt" in kwargs: + raise DuplicateArgument("Either 'max_time' or 'mt' must be specified, not both") + + if "audio_source" in kwargs and "asrc" in kwargs: + raise DuplicateArgument("Either 'audio_source' or 'asrc' must be specified, not both") + + if "filename" in kwargs and "fn" in kwargs: + raise DuplicateArgument("Either 'filename' or 'fn' must be specified, not both") + + if "data_buffer" in kwargs and "db" in kwargs: + raise DuplicateArgument("Either 'filename' or 'db' must be specified, not both") + + if "frames_per_buffer" in kwargs and "fbb" in kwargs: + raise DuplicateArgument("Either 'frames_per_buffer' or 'fpb' must be specified, not both") + + if "sampling_rate" in kwargs and "sr" in kwargs: + raise DuplicateArgument("Either 'sampling_rate' or 'sr' must be specified, not both") + + if "sample_width" in kwargs and "sw" in kwargs: + raise DuplicateArgument("Either 'sample_width' or 'sw' must be specified, not both") + + if "channels" in kwargs and "ch" in kwargs: + raise DuplicateArgument("Either 'channels' or 'ch' must be specified, not both") + + if "record" in kwargs and "rec" in kwargs: + raise DuplicateArgument("Either 'record' or 'rec' must be specified, not both") + + + kwargs["bd"] = kwargs.pop("block_dur", None) or kwargs.pop("bd", None) + kwargs["hd"] = kwargs.pop("hop_dur", None) or kwargs.pop("hd", None) + kwargs["bs"] = kwargs.pop("block_size", None) or kwargs.pop("bs", None) + kwargs["hs"] = kwargs.pop("hop_size", None) or kwargs.pop("hs", None) + kwargs["mt"] = kwargs.pop("max_time", None) or kwargs.pop("mt", None) + kwargs["asrc"] = kwargs.pop("audio_source", None) or kwargs.pop("asrc", None) + kwargs["fn"] = kwargs.pop("filename", None) or kwargs.pop("fn", None) + kwargs["db"] = kwargs.pop("data_buffer", None) or kwargs.pop("db", None) + + record = kwargs.pop("record", False) + if not record: + record = kwargs.pop("rec", False) + if not isinstance(record, bool): + raise TypeError("'record' must be a boolean") + + kwargs["rec"] = record + + # keep long names for arguments meant for BufferAudioSource and PyAudioSource + if "frames_per_buffer" in kwargs or "fpb" in kwargs: + kwargs["frames_per_buffer"] = kwargs.pop("frames_per_buffer", None) or kwargs.pop("fpb", None) + + if "sampling_rate" in kwargs or "sr" in kwargs: + kwargs["sampling_rate"] = kwargs.pop("sampling_rate", None) or kwargs.pop("sr", None) + + if "sample_width" in kwargs or "sw" in kwargs: + kwargs["sample_width"] = kwargs.pop("sample_width", None) or kwargs.pop("sw", None) + + if "channels" in kwargs or "ch" in kwargs: + kwargs["channels"] = kwargs.pop("channels", None) or kwargs.pop("ch", None) + + + + + + @staticmethod def ads(**kwargs): @@ -126,106 +221,115 @@ The returned `AudioDataSource` encapsulate an `io.PyAudioSource` object and hence it accepts the next four parameters are passed to use instead of their default values. - `sampling_rate` : *(int)* + `sampling_rate`, `sr` : *(int)* number of samples per second. Default = 16000. - `sample_width` : *(int)* + `sample_width`, `sw` : *(int)* number of bytes per sample (must be in (1, 2, 4)). Default = 2 - `channels` : *(int)* + `channels`, `ch` : *(int)* number of audio channels. Default = 1 (only this value is currently accepted) - `frames_per_buffer` *(int)*: + `frames_per_buffer`, `fpb` : *(int)* number of samples of PyAudio buffer. Default = 1024. - `audio_source` : an `io.AudioSource` object + `audio_source`, `asrc` : an `AudioSource` object read data from this audio source - `filename` : *(string)* + `filename`, `fn` : *(string)* build an `io.AudioSource` object using this file (currently only wave format is supported) - `data_buffer` : *(string)* + `data_buffer`, `db` : *(string)* build an `io.BufferAudioSource` using data in `data_buffer`. If this keyword is used, `sampling_rate`, `sample_width` and `channels` are passed to `io.BufferAudioSource` constructor and used instead of default values. - `max_time` : *(float)* - maximum time (in seconds) to read. Default behavior: read until there is no more data - available. + `max_time`, `mt` : *(float)* + maximum time (in seconds) to read. Default behavior: read until there is no more data + available. - `record` : *(bool)* + `record`, `rec` : *(bool)* save all read data in cache. Provide a navigable object which boasts a `rewind` method. Default = False. + + + `block_dur`, `bd` : *(float)* + processing block duration in seconds. This represents the quantity of audio data to return + each time the `read` method is invoked. If `block_dur` is 0.025 (i.e. 25 ms) and the sampling + rate is 8000 and the sample width is 2 bytes, `read` returns a buffer of 0.025 * 8000 * 2 = 400 + bytes at most. This parameter will be looked for (and used if available) before `block_size`. + If neither parameter is given, `block_dur` will be set to 0.01 second (i.e. 10 ms) + + `hop_dur`, `hd` : *(float)* + quantity of data to skip from current processing window. if `hop_dur` is supplied then there + will be an overlap of `block_dur` - `hop_dur` between two adjacent processing windows. This + parameter will be looked for (and used if available) before `hop_size`. If neither parameter + is given, `hop_dur` will be set to `block_dur` which means that there will be no overlap + between adjacent windows. + - `block_size` : *(int)* - number of samples to read each time the `read` method is called. Default : a block size - that represent a window of 10ms, so for a sampling rate of 16000, the default `block_size` - is 160, for a rate of 44100, `block_size` = 441, etc. + `block_size`,`bs` : *(int)* + number of samples to read each time the `read` method is called. Default: a block size + that represents a window of 10ms, so for a sampling rate of 16000, the default `block_size` + is 160 samples, for a rate of 44100, `block_size` = 441 samples, etc. + - `hop_size` : *(int)* - determines the number of overlapping samples between two consecutive read windows. For a + `hop_size`, `hs` : *(int)* + determines the number of overlapping samples between two adjacent read windows. For a `hop_size` of value *N*, the overlap is `block_size` - *N*. Default : `hop_size` = `block_size`, means that there is no overlap. """ - for k in kwargs.iterkeys(): - if not k in ["block_size", "hop_size", "max_time", "record", "audio_source", - "filename", "frames_per_buffer", "data_buffer", "filename", "sampling_rate", - "sample_width", "channels"]: - raise ValueError("Invalid argument: {0}".format(k)) + ADSFactory._check_normalize_args(kwargs) - if kwargs.has_key("block_size"): - block_size = kwargs.pop("block_size") - else: - block_size = None + block_dur = kwargs.pop("bd") + hop_dur = kwargs.pop("hd") + block_size = kwargs.pop("bs") + hop_size = kwargs.pop("hs") + max_time = kwargs.pop("mt") + audio_source = kwargs.pop("asrc") + filename = kwargs.pop("fn") + data_buffer = kwargs.pop("db") - if kwargs.has_key("hop_size"): - hop_size = kwargs.pop("hop_size") - else: - hop_size = None + # normalize db sr, sw and ch - if kwargs.has_key("max_time"): - max_time = float(kwargs.pop("max_time")) - else: - max_time = None - - if kwargs.has_key("record"): - record = kwargs.pop("record") - else: - record = False + record = kwargs.pop("rec") # Case 1: an audio source is supplied - if kwargs.has_key("audio_source"): - if kwargs.has_key("filename") or kwargs.has_key("data_buffer"): + if audio_source is not None: + if (filename, data_buffer) != (None, None): raise Warning("You should provide one of 'audio_source', 'filename' or 'data_buffer'\ keyword parameters. 'audio_source' will be used") - audio_source = kwargs.pop("audio_source") - # Case 2: a file name is supplied - elif kwargs.has_key("filename"): - if kwargs.has_key("data_buffer"): + elif filename is not None: + if data_buffer is not None: raise Warning("You should provide one of 'filename' or 'data_buffer'\ keyword parameters. 'filename' will be used") - audio_source = from_file(kwargs.pop("filename")) - + audio_source = from_file(filename) # Case 3: a data_buffer is supplied - elif kwargs.has_key("data_buffer"): - audio_source = BufferAudioSource(**kwargs) + elif data_buffer is not None: + audio_source = BufferAudioSource(data_buffer = data_buffer, **kwargs) # Case 4: try to access native audio input else: audio_source = PyAudioSource(**kwargs) - # Set default block_size to 10 ms - if block_size is None: - block_size = audio_source.get_sampling_rate() / 100 - + + if block_dur is not None: + if block_size is not None: + raise DuplicateArgument("Either 'block_dur' or 'block_size' can be specified, not both") + else: + block_size = int(audio_source.get_sampling_rate() * block_dur) + elif block_size is None: + # Set default block_size to 10 ms + block_size = int(audio_source.get_sampling_rate() / 100) + # Instantiate base AudioDataSource ads = ADSFactory.AudioDataSource(audio_source=audio_source, block_size=block_size) @@ -238,6 +342,12 @@ ads = ADSFactory.RecorderADS(ads=ads) # Read overlapping blocks of data + if hop_dur is not None: + if hop_size is not None: + raise DuplicateArgument("Either 'hop_dur' or 'hop_size' can be specified, not both") + else: + hop_size = int(audio_source.get_sampling_rate() * hop_dur) + if hop_size is not None: if hop_size <= 0 or hop_size > block_size: raise ValueError("hop_size must be > 0 and <= block_size") @@ -365,7 +475,6 @@ #self.get_block_size = _get_block_size - def _read_first_block(self): # For the first call, we need an entire block of size 'block_size' block = self.ads.read() @@ -477,7 +586,7 @@ if self._record: # If has been recording, create a new BufferAudioSource # from recorded data - dbuffer = ''.join(self._cache) + dbuffer = self._concatenate(self._cache) asource = BufferAudioSource(dbuffer, self.get_sampling_rate(), self.get_sample_width(), self.get_channels()) @@ -503,6 +612,18 @@ self._record = True self._cache = [] self.read = self._read_and_rec + + def _concatenate(self, data): + try: + # should always work for python 2 + # work for python 3 ONLY if data is a list (or an iterator) + # whose each element is a 'bytes' objects + return b''.join(data) + except TypeError: + # work for 'str' in python 2 and python 3 + return ''.join(data) + +
--- a/demos/audio_tokenize_demo.py Tue Nov 24 01:41:19 2015 +0100 +++ b/demos/audio_tokenize_demo.py Tue Nov 24 01:57:53 2015 +0100 @@ -4,47 +4,62 @@ """ from auditok import ADSFactory, AudioEnergyValidator, StreamTokenizer, player_for, dataset +import sys -# We set the `record` argument to True so that we can rewind the source -asource = ADSFactory.ads(filename=dataset.one_to_six_arabic_16000_mono_bc_noise, record=True) +try: -validator = AudioEnergyValidator(sample_width=asource.get_sample_width(), energy_threshold=65) + # We set the `record` argument to True so that we can rewind the source + asource = ADSFactory.ads(filename=dataset.one_to_six_arabic_16000_mono_bc_noise, record=True) -# Defalut analysis window is 10 ms (float(asource.get_block_size()) / asource.get_sampling_rate()) -# min_length=20 : minimum length of a valid audio activity is 20 * 10 == 200 ms -# max_length=400 : maximum length of a valid audio activity is 400 * 10 == 4000 ms == 4 seconds -# max_continuous_silence=30 : maximum length of a tolerated silence within a valid audio activity is 30 * 30 == 300 ms -tokenizer = StreamTokenizer(validator=validator, min_length=20, max_length=400, max_continuous_silence=30) + validator = AudioEnergyValidator(sample_width=asource.get_sample_width(), energy_threshold=65) -asource.open() -tokens = tokenizer.tokenize(asource) + # Defalut analysis window is 10 ms (float(asource.get_block_size()) / asource.get_sampling_rate()) + # min_length=20 : minimum length of a valid audio activity is 20 * 10 == 200 ms + # max_length=400 : maximum length of a valid audio activity is 400 * 10 == 4000 ms == 4 seconds + # max_continuous_silence=30 : maximum length of a tolerated silence within a valid audio activity is 30 * 30 == 300 ms + tokenizer = StreamTokenizer(validator=validator, min_length=20, max_length=400, max_continuous_silence=30) -# Play detected regions back + asource.open() + tokens = tokenizer.tokenize(asource) -player = player_for(asource) + # Play detected regions back -# Rewind and read the whole signal -asource.rewind() -original_signal = [] + player = player_for(asource) -while True: - w = asource.read() - if w is None: - break - original_signal.append(w) + # Rewind and read the whole signal + asource.rewind() + original_signal = [] -original_signal = ''.join(original_signal) + while True: + w = asource.read() + if w is None: + break + original_signal.append(w) + -print("\n ** Playing original file...") -player.play(original_signal) + original_signal = b''.join(original_signal) + player.play(original_signal) -print("\n ** playing detected regions...\n") -for i,t in enumerate(tokens): - print("Token [{0}] starts at {1} and ends at {2}".format(i+1, t[1], t[2])) - data = ''.join(t[0]) - player.play(data) + print("\n ** playing detected regions...\n") + for i,t in enumerate(tokens): + print("Token [{0}] starts at {1} and ends at {2}".format(i+1, t[1], t[2])) + data = b''.join(t[0]) + player.play(data) -assert len(tokens) == 8 + assert len(tokens) == 8 -asource.close() -player.stop() + asource.close() + player.stop() + +except KeyboardInterrupt: + + player.stop() + asource.close() + sys.exit(0) + +""" +except Exception as e: + + sys.stderr.write(str(e) + "\n") + sys.exit(1) +"""
--- a/demos/audio_trim_demo.py Tue Nov 24 01:41:19 2015 +0100 +++ b/demos/audio_trim_demo.py Tue Nov 24 01:57:53 2015 +0100 @@ -3,16 +3,17 @@ September, 2015 """ -# Trim leading and tailing silence from a record +# Trim leading and trailing silence from a record from auditok import ADSFactory, AudioEnergyValidator, StreamTokenizer, player_for, dataset import pyaudio +import sys """ The tokenizer in the following example is set up to remove the silence that precedes the first acoustic activity or follows the last activity in a record. It preserves whatever it founds between the two activities. -In other words, it removes the leading and tailing silence. +In other words, it removes the leading and trailing silence. Sampling rate is 44100 sample per second, we'll use an analysis window of 100 ms (i.e. bloc_ksize == 4410) @@ -21,7 +22,7 @@ The tokenizer will start accumulating windows up from the moment it encounters the first analysis window of an energy >= 50. ALL the following windows will be -kept regardless of their energy. At the end of the analysis, it will drop tailing +kept regardless of their energy. At the end of the analysis, it will drop trailing windows with an energy below 50. This is an interesting example because the audio file we're analyzing contains a very @@ -43,48 +44,59 @@ """ +try: + # record = True so that we'll be able to rewind the source. + asource = ADSFactory.ads(filename=dataset.was_der_mensch_saet_mono_44100_lead_tail_silence, + record=True, block_size=4410) + asource.open() -# record = True so that we'll be able to rewind the source. -asource = ADSFactory.ads(filename=dataset.was_der_mensch_saet_mono_44100_lead_tail_silence, - record=True, block_size=4410) -asource.open() + original_signal = [] + # Read the whole signal + while True: + w = asource.read() + if w is None: + break + original_signal.append(w) -original_signal = [] -# Read the whole signal -while True: - w = asource.read() - if w is None: - break - original_signal.append(w) + original_signal = b''.join(original_signal) -original_signal = ''.join(original_signal) + # rewind source + asource.rewind() -# rewind source -asource.rewind() + # Create a validator with an energy threshold of 50 + validator = AudioEnergyValidator(sample_width=asource.get_sample_width(), energy_threshold=50) -# Create a validator with an energy threshold of 50 -validator = AudioEnergyValidator(sample_width=asource.get_sample_width(), energy_threshold=50) + # Create a tokenizer with an unlimited token length and continuous silence within a token + # Note the DROP_TRAILING_SILENCE mode that will ensure removing trailing silence + trimmer = StreamTokenizer(validator, min_length = 20, max_length=99999999, + max_continuous_silence=9999999, mode=StreamTokenizer.DROP_TRAILING_SILENCE, init_min=3, init_max_silence=1) -# Create a tokenizer with an unlimited token length and continuous silence within a token -# Note the DROP_TAILING_SILENCE mode that will ensure removing tailing silence -trimmer = StreamTokenizer(validator, min_length = 20, max_length=99999999, - max_continuous_silence=9999999, mode=StreamTokenizer.DROP_TAILING_SILENCE, init_min=3, init_max_silence=1) + tokens = trimmer.tokenize(asource) -tokens = trimmer.tokenize(asource) + # Make sure we only have one token + assert len(tokens) == 1, "Should have detected one single token" -# Make sure we only have one token -assert len(tokens) == 1, "Should have detected one single token" + trimmed_signal = b''.join(tokens[0][0]) -trimmed_signal = ''.join(tokens[0][0]) + player = player_for(asource) -player = player_for(asource) + print("\n ** Playing original signal (with leading and trailing silence)...") + player.play(original_signal) + print("\n ** Playing trimmed signal...") + player.play(trimmed_signal) -print("\n ** Playing original signal (with leading and tailing silence)...") -player.play(original_signal) -print("\n ** Playing trimmed signal...") -player.play(trimmed_signal) + player.stop() + asource.close() -player.stop() -asource.close() +except KeyboardInterrupt: + + player.stop() + asource.close() + sys.exit(0) + +except Exception as e: + + sys.stderr.write(str(e) + "\n") + sys.exit(1)
--- a/demos/echo.py Tue Nov 24 01:41:19 2015 +0100 +++ b/demos/echo.py Tue Nov 24 01:57:53 2015 +0100 @@ -3,34 +3,47 @@ import pyaudio import sys -energy_threshold = 45 -duration = 10 # seconds +try: + energy_threshold = 45 + duration = 10 # seconds -if len(sys.argv) > 1: - energy_threshold = float(sys.argv[1]) -if len(sys.argv) > 2: - duration = float(sys.argv[2]) + if len(sys.argv) > 1: + energy_threshold = float(sys.argv[1]) -# record = True so that we'll be able to rewind the source. -# max_time = 10: read 10 seconds from the microphone -asource = ADSFactory.ads(record=True, max_time = duration) + if len(sys.argv) > 2: + duration = float(sys.argv[2]) -validator = AudioEnergyValidator(sample_width=asource.get_sample_width(), energy_threshold = energy_threshold) -tokenizer = StreamTokenizer(validator=validator, min_length=20, max_length=250, max_continuous_silence=30) + # record = True so that we'll be able to rewind the source. + # max_time = 10: read 10 seconds from the microphone + asource = ADSFactory.ads(record=True, max_time = duration) -player = player_for(asource) + validator = AudioEnergyValidator(sample_width=asource.get_sample_width(), energy_threshold = energy_threshold) + tokenizer = StreamTokenizer(validator=validator, min_length=20, max_length=250, max_continuous_silence=30) -def echo(data, start, end): - print("Acoustic activity at: {0}--{1}".format(start, end)) - player.play(''.join(data)) + player = player_for(asource) -asource.open() + def echo(data, start, end): + print("Acoustic activity at: {0}--{1}".format(start, end)) + player.play(b''.join(data)) -print("\n ** Make some noise (dur:{}, energy:{})...".format(duration, energy_threshold)) + asource.open() -tokenizer.tokenize(asource, callback=echo) + print("\n ** Make some noise (dur:{}, energy:{})...".format(duration, energy_threshold)) -asource.close() -player.stop() + tokenizer.tokenize(asource, callback=echo) + + asource.close() + player.stop() + +except KeyboardInterrupt: + + player.stop() + asource.close() + sys.exit(0) + +except Exception as e: + + sys.stderr.write(str(e) + "\n") + sys.exit(1)
--- a/tests/test_AudioDataSourceFactory.py Tue Nov 24 01:41:19 2015 +0100 +++ b/tests/test_AudioDataSourceFactory.py Tue Nov 24 01:57:53 2015 +0100 @@ -5,10 +5,18 @@ ''' import unittest -from auditok import dataset, ADSFactory, BufferAudioSource, WaveAudioSource +from functools import partial +import sys +from auditok import dataset, ADSFactory, BufferAudioSource, WaveAudioSource, DuplicateArgument import wave +try: + from builtins import range +except ImportError: + if sys.version_info < (3, 0): + range = xrange + class TestADSFactoryFileAudioSource(unittest.TestCase): def setUp(self): @@ -25,16 +33,42 @@ def test_default_block_size(self): ads = ADSFactory.ads(audio_source=self.audio_source) - size = ads.get_block_size() self.assertEqual(size, 160, "Wrong default block_size, expected: 160, found: {0}".format(size)) def test_block_size(self): ads = ADSFactory.ads(audio_source=self.audio_source, block_size=512) - size = ads.get_block_size() self.assertEqual(size, 512, "Wrong block_size, expected: 512, found: {0}".format(size)) + + # with alias keyword + ads = ADSFactory.ads(audio_source=self.audio_source, bs=160) + size = ads.get_block_size() + self.assertEqual(size, 160, "Wrong block_size, expected: 160, found: {0}".format(size)) + + def test_block_duration(self): + + ads = ADSFactory.ads(audio_source=self.audio_source, block_dur=0.01) # 10 ms + size = ads.get_block_size() + self.assertEqual(size, 160, "Wrong block_size, expected: 160, found: {0}".format(size)) + + # with alias keyword + ads = ADSFactory.ads(audio_source=self.audio_source, bd=0.025) # 25 ms + size = ads.get_block_size() + self.assertEqual(size, 400, "Wrong block_size, expected: 400, found: {0}".format(size)) + + def test_hop_duration(self): + + ads = ADSFactory.ads(audio_source=self.audio_source, block_dur=0.02, hop_dur=0.01) # 10 ms + size = ads.hop_size + self.assertEqual(size, 160, "Wrong hop_size, expected: 160, found: {0}".format(size)) + + # with alias keyword + ads = ADSFactory.ads(audio_source=self.audio_source, bd=0.025, hop_dur=0.015) # 15 ms + size = ads.hop_size + self.assertEqual(size, 240, "Wrong block_size, expected: 240, found: {0}".format(size)) + def test_sampling_rate(self): ads = ADSFactory.ads(audio_source=self.audio_source) @@ -87,7 +121,7 @@ break ads_data.append(block) ads.close() - ads_data = ''.join(ads_data) + ads_data = b''.join(ads_data) audio_source = WaveAudioSource(filename=dataset.one_to_six_arabic_16000_mono_bc_noise) audio_source.open() @@ -148,13 +182,13 @@ ads_data = [] ads.open() - for i in xrange(10): + for i in range(10): block = ads.read() if block is None: break ads_data.append(block) ads.close() - ads_data = ''.join(ads_data) + ads_data = b''.join(ads_data) audio_source = WaveAudioSource(filename=dataset.one_to_six_arabic_16000_mono_bc_noise) audio_source.open() @@ -187,7 +221,7 @@ ads = ADSFactory.ads(audio_source=self.audio_source, record=True, block_size = 320) ads.open() - for i in xrange(10): + for i in range(10): ads.read() ads.rewind() @@ -200,7 +234,7 @@ break ads_data.append(block) ads.close() - ads_data = ''.join(ads_data) + ads_data = b''.join(ads_data) audio_source = WaveAudioSource(filename=dataset.one_to_six_arabic_16000_mono_bc_noise) audio_source.open() @@ -402,7 +436,7 @@ # Compare all blocks read from OverlapADS to those read # from an audio source with a manual set_position - for j in xrange(i): + for j in range(i): tmp = audio_source.read(block_size) @@ -442,7 +476,7 @@ # Compare all blocks read from OverlapADS to those read # from an audio source with a manual set_position - for j in xrange(i): + for j in range(i): tmp = audio_source.read(block_size) @@ -522,7 +556,6 @@ channels = self.ads.get_channels() self.assertEqual(channels, 1, "Wrong number of channels, expected: 1, found: {0}".format(channels)) - def test_Limiter_Recorder_Overlap_Deco_rewind_and_read(self): # Use arbitrary valid block_size and hop_size @@ -552,7 +585,190 @@ # Compare all blocks read from OverlapADS to those read # from an audio source with a manual set_position - for j in xrange(i): + for j in range(i): + + tmp = audio_source.read(block_size) + + block = ads.read() + + self.assertEqual(block, tmp, "Unexpected block (N={0}) read from OverlapADS".format(i)) + audio_source.set_position((j+1) * hop_size) + + ads.close() + audio_source.close() + + +class TestADSFactoryAlias(unittest.TestCase): + + def setUp(self): + self.signal = "ABCDEFGHIJKLMNOPQRSTUVWXYZ012345" + + def test_sampling_rate_alias(self): + ads = ADSFactory.ads(data_buffer=self.signal, sr=16, + sample_width=2, channels=1) + srate = ads.get_sampling_rate() + self.assertEqual(srate, 16, "Wrong sampling rate, expected: 16000, found: {0}".format(srate)) + + def test_sampling_rate_duplicate(self): + func = partial(ADSFactory.ads, data_buffer=self.signal, sr=16, sampling_rate=16, + sample_width=2, channels=1) + self.assertRaises(DuplicateArgument, func) + + def test_sample_width_alias(self): + ads = ADSFactory.ads(data_buffer=self.signal, sampling_rate=16, + sw=2, channels=1) + swidth = ads.get_sample_width() + self.assertEqual(swidth, 2, "Wrong sample width, expected: 2, found: {0}".format(swidth)) + + def test_sample_width_duplicate(self): + func = partial(ADSFactory.ads, data_buffer=self.signal,sampling_rate=16, + sw=2, sample_width=2, channels=1) + self.assertRaises(DuplicateArgument, func) + + def test_channels_alias(self): + ads = ADSFactory.ads(data_buffer=self.signal, sampling_rate=16, + sample_width=2, ch=1) + channels = ads.get_channels() + self.assertEqual(channels, 1, "Wrong number of channels, expected: 1, found: {0}".format(channels)) + + def test_channels_duplicate(self): + func = partial(ADSFactory.ads, data_buffer=self.signal,sampling_rate=16, + sample_width=2, ch=1, channels=1) + self.assertRaises(DuplicateArgument, func) + + + def test_block_size_alias(self): + ads = ADSFactory.ads(data_buffer=self.signal, sampling_rate=16, + sample_width=2, channels=1, bs=8) + size = ads.get_block_size() + self.assertEqual(size, 8, "Wrong block_size using bs alias, expected: 8, found: {0}".format(size)) + + def test_block_size_duplicate(self): + func = partial(ADSFactory.ads, data_buffer=self.signal,sampling_rate=16, + sample_width=2, channels=1, bs=4, block_size=4) + self.assertRaises(DuplicateArgument, func) + + def test_block_duration_alias(self): + ads = ADSFactory.ads(data_buffer=self.signal, sampling_rate=16, + sample_width=2, channels=1, bd=0.75) + # 0.75 ms = 0.75 * 16 = 12 + size = ads.get_block_size() + self.assertEqual(size, 12, "Wrong block_size set with a block_dur alias 'bd', expected: 8, found: {0}".format(size)) + + def test_block_duration_duplicate(self): + func = partial(ADSFactory.ads, data_buffer=self.signal,sampling_rate=16, + sample_width=2, channels=1, bd=4, block_dur=4) + self.assertRaises(DuplicateArgument, func) + + def test_block_size_duration_duplicate(self): + func = partial(ADSFactory.ads, data_buffer=self.signal,sampling_rate=16, + sample_width=2, channels=1, bd=4, bs=12) + self.assertRaises(DuplicateArgument, func) + + def test_hop_duration_alias(self): + + ads = ADSFactory.ads(data_buffer=self.signal, sampling_rate=16, + sample_width=2, channels=1, bd=0.75, hd=0.5 ) + size = ads.hop_size + self.assertEqual(size, 8, "Wrong block_size using bs alias, expected: 8, found: {0}".format(size)) + self.assertIsInstance(ads, ADSFactory.OverlapADS, "ads expected to an ADSFactory.OverlapADS object") + + + def test_hop_duration_duplicate(self): + + func = partial(ADSFactory.ads, data_buffer=self.signal, sampling_rate=16, + sample_width=2, channels=1, bd=0.75, hd=0.5, hop_dur=0.5) + self.assertRaises(DuplicateArgument, func) + + + def test_hop_size_duration_duplicate(self): + func = partial(ADSFactory.ads, data_buffer=self.signal,sampling_rate=16, + sample_width=2, channels=1, bs=8, hs=4, hd=1) + self.assertRaises(DuplicateArgument, func) + + + def test_hop_size_greater_than_block_size(self): + func = partial(ADSFactory.ads, data_buffer=self.signal, sampling_rate=16, + sample_width=2, channels=1, bs=4, hs=8) + self.assertRaises(ValueError, func) + + + def test_filename_alias(self): + ads = ADSFactory.ads(fn=dataset.one_to_six_arabic_16000_mono_bc_noise) + + + def test_filename_duplicate(self): + + func = partial(ADSFactory.ads, fn=dataset.one_to_six_arabic_16000_mono_bc_noise, filename=dataset.one_to_six_arabic_16000_mono_bc_noise) + self.assertRaises(DuplicateArgument, func) + + + def test_data_buffer_alias(self): + ads = ADSFactory.ads(db=self.signal, sampling_rate=16, + sample_width=2, channels=1) + self.assertEqual(ads.get_audio_source().get_data_buffer(), self.signal, "Wrong value for data buffer") + + + def test_data_buffer_duplicate(self): + func = partial(ADSFactory.ads, data_buffer=self.signal, db=self.signal, sampling_rate=16, + sample_width=2, channels=1) + self.assertRaises(DuplicateArgument, func) + + + def test_max_time_alias(self): + ads = ADSFactory.ads(data_buffer=self.signal, sampling_rate=16, + sample_width=2, channels=1, mt=10) + self.assertIsInstance(ads, ADSFactory.LimiterADS, "ads expected to an ADSFactory.LimiterADS object") + + + def test_max_time_duplicate(self): + func = partial(ADSFactory.ads, data_buffer=self.signal, sampling_rate=16, + sample_width=2, channels=1, mt=True, max_time=True) + + self.assertRaises(DuplicateArgument, func) + + def test_record_alias(self): + ads = ADSFactory.ads(data_buffer=self.signal, sampling_rate=16, + sample_width=2, channels=1, rec=True) + self.assertIsInstance(ads, ADSFactory.RecorderADS, "ads expected to an ADSFactory.RecorderADS object") + + + def test_record_duplicate(self): + func = partial(ADSFactory.ads, data_buffer=self.signal, sampling_rate=16, + sample_width=2, channels=1, rec=True, record=True) + self.assertRaises(DuplicateArgument, func) + + + def test_Limiter_Recorder_Overlap_Deco_rewind_and_read_alias(self): + + # Use arbitrary valid block_size and hop_size + block_size = 5 + hop_size = 4 + + ads = ADSFactory.ads(db=self.signal, sr=16, + sw=2, ch=1, mt = 0.80, + bs=block_size, hs=hop_size, + rec=True) + + # Read all available data overlapping blocks + ads.open() + i = 0 + while True: + block = ads.read() + if block is None: + break + i += 1 + + ads.rewind() + + # Build a BufferAudioSource + audio_source = BufferAudioSource(self.signal, ads.get_sampling_rate(), + ads.get_sample_width(), ads.get_channels()) + audio_source.open() + + # Compare all blocks read from OverlapADS to those read + # from an audio source with a manual set_position + for j in range(i): tmp = audio_source.read(block_size)