view auditok/io.py @ 5:252d698ae642

Version 1.3, bug and typos fixes
author Amine Sehili <amine.sehili@gmail.com>
date Wed, 23 Sep 2015 11:26:58 +0200
parents edee860b9f61
children 9be2d0ca4c00
line wrap: on
line source
"""
Module for low-level audio input-output operations. 

September 2015
@author: Amine SEHILI <amine.sehili@gmail.com>
"""

from abc import ABCMeta, abstractmethod
import wave

__all__ = ["AudioSource", "Rewindable", "BufferAudioSource", "WaveAudioSource",
           "PyAudioSource", "PyAudioPlayer", "from_file", "player_for"]

DEFAULT_SAMPLE_RATE = 16000
DEFAULT_SAMPLE_WIDTH = 2
DEFAULT_NB_CHANNELS = 1


class AudioSource():
    __metaclass__ = ABCMeta
    
    """ 
    Base class for audio source.
        
    Subclasses should implement methods to open/close and audio stream 
    and read the desired amount of audio samples.
         
    """

    def __init__(self, sampling_rate = DEFAULT_SAMPLE_RATE,
                 sample_width = DEFAULT_SAMPLE_WIDTH,
                 channels = DEFAULT_NB_CHANNELS):
        
        """
        
        **Parameters:**
        
        
        `sampling_rate` *(int)* :
            Number of samples per second of audio stream. Default = 16000.
        
        `sample_width` *(int)* :
            Size in bytes of one audio sample. Possible values : 1, 2, 4.
            Default = 2.
            
        `channels` *(int)* :
            Number of channels of audio stream. The current version supports
            only mono audio streams (i.e. one channel).
        
        """
        
        if not sample_width in (1, 2, 4):
            raise ValueError("Sample width must be one of: 1, 2 or 4 (bytes)")
        
        if channels != 1:
            raise ValueError("Only mono audio is currently handled")
            
        self.sampling_rate = sampling_rate
        self.sample_width = sample_width
        self.channels = channels
      
    @abstractmethod
    def is_open(self):
        """ Return True if audio source is open, False otherwise """
    
    @abstractmethod
    def open(self):
        """ Open audio source """
    
    @abstractmethod
    def close(self):
        """ Close audio source """
    
    @abstractmethod
    def read(self, size):
        """
        Read and return `size` audio samples at most.
        
        **Parameters:**
        
        `size` : *(int)* :
            the number of samples to read.
        
        **Returns:**
        
        Audio data as a string of length 'N' * 'smaple_width' * 'channels', where 'N' is:
        
        `size` if `size` < 'left_samples'
        
        'left_samples' if `size` > 'left_samples' 
        
        """ 
    
    def get_sampling_rate(self):
        """ Return the number of samples per second of audio stream """
        return self.sampling_rate
    
    def get_sample_width(self):
        """ Return the number of bytes used to represent one audio sample """
        return self.sample_width
    
    def get_channels(self):
        """ Return the number of channels of this audio source """
        return self.channels
    


class Rewindable():
    __metaclass__ = ABCMeta
    
    """
    Base class for rewindable audio streams.
    Subclasses should implement methods to return to the beginning of an
    audio stream as well as method to move to an absolute audio position
    expressed in time or in number of samples. 
    
    """
    
    @abstractmethod
    def rewind(self):
        """ Go back to the beginning of audio stream """
        pass
    
    @abstractmethod
    def get_position(self):
        """ Return the total number of already read samples """
    
    @abstractmethod
    def get_time_position(self):
        """ Return the total duration in seconds of already read data """
    
    @abstractmethod
    def set_position(self, position):
        """ Move to an absolute position 
        
        **Parameters:**
        
        `position` : *(int)*
            number of samples to skip from the start of the stream
        """
    
    @abstractmethod
    def set_time_position(self, time_position):
        """ Move to an absolute position expressed in seconds
        
        **Parameters:**
        
        `time_position` : *(float)*
            seconds to skip from the start of the stream
        """
        pass
    
    

class BufferAudioSource(AudioSource, Rewindable):
    
    """
    A class that represent audio data as a memory buffer. It implements
    methods from `io.Rewindable` and is therefore a navigable `io.AudioSource`.
    """
    
    def __init__(self, data_buffer,
                 sampling_rate = DEFAULT_SAMPLE_RATE,
                 sample_width = DEFAULT_SAMPLE_WIDTH,
                 channels = DEFAULT_NB_CHANNELS):
        
        if len(data_buffer) % (sample_width * channels) !=0:
            raise ValueError("length of data_buffer must be a multiple of (sample_width * channels)")
        
        AudioSource.__init__(self, sampling_rate, sample_width, channels)
        self._buffer = data_buffer
        self._index = 0
        self._left = 0 if self._buffer is None else len(self._buffer)
        self._is_open = False
    
    def is_open(self):
        return self._is_open
        
    def open(self):
        self._is_open = True
    
    def close(self):
        self._is_open = False
        self.rewind()
    
    def read(self, size=None):
        
        if not self._is_open:
            raise IOError("Stream is not open")
        
        if self._left > 0:
            
            to_read = size * self.sample_width * self.channels       
            if to_read > self._left:
                to_read = self._left 
                            
            data = self._buffer[self._index: self._index + to_read]
            self._index += to_read
            self._left -= to_read
            
            return data
        
        return None
    
    def get_data_buffer(self):
        """ Return all audio data as one string buffer. """
        return self._buffer
    
    def set_data(self, data_buffer):
        """ Set new data for this audio stream. 
        
        **Parameters:**
        
        `data_buffer` :
           a string buffer with a length multiple of (sample_width * channels)
        """
        if len(data_buffer) % (self.sample_width * self.channels) !=0:
            raise ValueError("length of data_buffer must be a multiple of (sample_width * channels)")
        self._buffer = data_buffer
        self._index = 0
        self._left = 0 if self._buffer is None else len(self._buffer)
    
    def append_data(self, data_buffer):
        """ Append data to this audio stream
        
        **Parameters:**
        
        `data_buffer` :
           a string buffer with a length multiple of (sample_width * channels)
        
        """
        
        if len(data_buffer) % (self.sample_width * self.channels) !=0:
            raise ValueError("length of data_buffer must be a multiple of (sample_width * channels)")
        
        self._buffer += data_buffer
        self._left += len(data_buffer)

    
    def rewind(self):
        self.set_position(0)
    
    def get_position(self):
        return self._index / self.sample_width
    
    def get_time_position(self):
        return float(self._index) / (self.sample_width * self.sampling_rate) 
    
    def set_position(self, position):
        if position < 0:
            raise ValueError("position must be >= 0")
        
        if self._buffer is None:
            self._index = 0
            self._left = 0
            return
         
        position *= self.sample_width 
        self._index = position if position < len(self._buffer) else len(self._buffer)
        self._left = len(self._buffer) - self._index


    def set_time_position(self, time_position): # time in seconds
        
        position = int(self.sampling_rate * time_position)
        self.set_position(position)
        

        

class WaveAudioSource(AudioSource):
    
    """ A class for an `AudioSource` that reads data from a wave file. """
    
    def __init__(self, filename):
        
        """
        **Parameters:**
        
        `filename` :
            path to a valid wave file
        
        """
                
        self._filename = filename
        self._audio_stream = None
        
        stream = wave.open(self._filename)
        AudioSource.__init__(self, stream.getframerate(),
                                   stream.getsampwidth(),
                                   stream.getnchannels())
        stream.close()
    
    
    def is_open(self):
        return self._audio_stream is not None
 
    def open(self):
        if(self._audio_stream is None):
            self._audio_stream = wave.open(self._filename)
      
        
    def close(self):
        if self._audio_stream is not None:
            self._audio_stream.close()
            self._audio_stream = None
        
    
    def read(self, size):
        
        if self._audio_stream is None:
            raise IOError("Stream is not open")
        else:
            data = self._audio_stream.readframes(size)
            if data is None or len(data) < 1:
                return None
            return data


class PyAudioSource(AudioSource):
    
    """ A class for an `AudioSource` that reads data the built-in microphone. """
    
    def __init__(self, sampling_rate = DEFAULT_SAMPLE_RATE,
                 sample_width = DEFAULT_SAMPLE_WIDTH,
                 channels = DEFAULT_NB_CHANNELS,
                 frames_per_buffer = 1024):
        
        
        AudioSource.__init__(self, sampling_rate, sample_width, channels)
        self._chunk_size = frames_per_buffer
        
        import pyaudio
        self._pyaudio_object = pyaudio.PyAudio()
        self._pyaudio_format = self._pyaudio_object.get_format_from_width(self.sample_width) 
        self._audio_stream = None

    
    def is_open(self):
        return self._audio_stream is not None
    
    def open(self):
        self._audio_stream = self._pyaudio_object.open(format = self._pyaudio_format,
                                                   channels = self.channels,
                                                   rate = self.sampling_rate,
                                                   input = True,
                                                   output = False,
                                                   frames_per_buffer = self._chunk_size)
        
        
    def close(self):
        if self._audio_stream is not None:
            self._audio_stream.stop_stream()
            self._audio_stream.close()
            self._audio_stream = None
            
    
    def read(self, size):
        
        if self._audio_stream is None:
            raise IOError("Stream is not open")
        
        if self._audio_stream.is_active():
            data = self._audio_stream.read(size)
            if data is None or len(data) < 1:
                return None
            return data
        
        return None
    


class PyAudioPlayer():
    """ A class for audio playback """
    
    def __init__(self, sampling_rate = DEFAULT_SAMPLE_RATE,
                 sample_width = DEFAULT_SAMPLE_WIDTH,
                 channels = DEFAULT_NB_CHANNELS):
        
    
        if not sample_width in (1, 2, 4):
            raise ValueError("Sample width must be one of: 1, 2 or 4 (bytes)")
        
        self.sampling_rate = sampling_rate
        self.sample_width = sample_width
        self.channels = channels
        
        import pyaudio
        self._p = pyaudio.PyAudio()
        self.stream = self._p.open(format = self._p.get_format_from_width(self.sample_width),
         channels = self.channels, rate = self.sampling_rate,
         input = False, output = True)
        
    def play(self, data):
        if self.stream.is_stopped():
            self.stream.start_stream()
        self.stream.write(data)
        self.stream.stop_stream()
    
        
    def  stop(self):
        if not self.stream.is_stopped():
            self.stream.stop_stream()
        self.stream.close()
        self._p.terminate()
        
    
        

def from_file(filename):
    
    """
    Create an `AudioSource` object using the audio file specified by `filename`.
    The appropriate `AudioSource` class is guessed from file's extension.
    
    **Parameters:**
    
    `filename` :
        path to an audio file
        
    **Returns:**
    
    an `AudioSource` object that reads data from the given file.
    
    """
    
    if filename.lower().endswith(".wav"):
        return WaveAudioSource(filename)
    
    raise Exception("Can not create an AudioSource object from '%s'" %(filename))


def player_for(audio_source):
    """
    Return a `PyAudioPlayer` that can play data from `audio_source`.
    
    **Parameters:**
    
    `audio_source` : 
        an `AudioSource` object.
    
    **Returns:**
    
    `PyAudioPlayer` that has the same sampling rate, sample width and number of channels
    as `audio_source`.
    """
    
    return PyAudioPlayer(audio_source.get_sampling_rate(),
            audio_source.get_sample_width(),
            audio_source.get_channels())