changeset 173:f35c0a18f4cb

Update AudioDataSource tests
author Amine Sehili <amine.sehili@gmail.com>
date Sun, 10 Mar 2019 17:07:30 +0100
parents e526cfd6a056
children ec7b0b3c9818
files tests/test_AudioDataSource.py
diffstat 1 files changed, 692 insertions(+), 474 deletions(-) [+]
line wrap: on
line diff
--- a/tests/test_AudioDataSource.py	Sat Mar 09 18:26:24 2019 +0100
+++ b/tests/test_AudioDataSource.py	Sun Mar 10 17:07:30 2019 +0100
@@ -1,13 +1,20 @@
-'''
+"""
 @author: Amine Sehili <amine.sehili@gmail.com>
 September 2015
 
-'''
+"""
 
 import unittest
 from functools import partial
 import sys
-from auditok import dataset, ADSFactory, BufferAudioSource, WaveAudioSource, DuplicateArgument
+from auditok import (
+    dataset,
+    ADSFactory,
+    AudioDataSource,
+    BufferAudioSource,
+    WaveAudioSource,
+    DuplicateArgument,
+)
 import wave
 
 
@@ -17,102 +24,148 @@
     if sys.version_info < (3, 0):
         range = xrange
 
+
 class TestADSFactoryFileAudioSource(unittest.TestCase):
-    
     def setUp(self):
-        self.audio_source = WaveAudioSource(filename=dataset.one_to_six_arabic_16000_mono_bc_noise)
-    
-    
+        self.audio_source = WaveAudioSource(
+            filename=dataset.one_to_six_arabic_16000_mono_bc_noise
+        )
+
     def test_ADS_type(self):
-        
+
         ads = ADSFactory.ads(audio_source=self.audio_source)
-        
-        self.assertIsInstance(ads, ADSFactory.AudioDataSource,
-                              msg="wrong type for ads object, expected: 'ADSFactory.AudioDataSource', found: {0}".format(type(ads)))
-        
-        
+
+        self.assertIsInstance(
+            ads,
+            AudioDataSource,
+            msg="wrong type for ads object, expected: 'AudioDataSource', found: {0}".format(
+                type(ads)
+            ),
+        )
+
     def test_default_block_size(self):
         ads = ADSFactory.ads(audio_source=self.audio_source)
-        size = ads.get_block_size()
-        self.assertEqual(size, 160, "Wrong default block_size, expected: 160, found: {0}".format(size))
-        
-        
+        size = ads.block_size
+        self.assertEqual(
+            size,
+            160,
+            "Wrong default block_size, expected: 160, found: {0}".format(size),
+        )
+
     def test_block_size(self):
         ads = ADSFactory.ads(audio_source=self.audio_source, block_size=512)
-        size = ads.get_block_size()
-        self.assertEqual(size, 512, "Wrong block_size, expected: 512, found: {0}".format(size))
-        
+        size = ads.block_size
+        self.assertEqual(
+            size,
+            512,
+            "Wrong block_size, expected: 512, found: {0}".format(size),
+        )
+
         # with alias keyword
         ads = ADSFactory.ads(audio_source=self.audio_source, bs=160)
-        size = ads.get_block_size()
-        self.assertEqual(size, 160, "Wrong block_size, expected: 160, found: {0}".format(size))
-    
+        size = ads.block_size
+        self.assertEqual(
+            size,
+            160,
+            "Wrong block_size, expected: 160, found: {0}".format(size),
+        )
+
     def test_block_duration(self):
-        
-        ads = ADSFactory.ads(audio_source=self.audio_source, block_dur=0.01) # 10 ms
-        size = ads.get_block_size()
-        self.assertEqual(size, 160, "Wrong block_size, expected: 160, found: {0}".format(size))
-        
+
+        ads = ADSFactory.ads(
+            audio_source=self.audio_source, block_dur=0.01
+        )  # 10 ms
+        size = ads.block_size
+        self.assertEqual(
+            size,
+            160,
+            "Wrong block_size, expected: 160, found: {0}".format(size),
+        )
+
         # with alias keyword
-        ads = ADSFactory.ads(audio_source=self.audio_source, bd=0.025) # 25 ms
-        size = ads.get_block_size()
-        self.assertEqual(size, 400, "Wrong block_size, expected: 400, found: {0}".format(size))
-        
+        ads = ADSFactory.ads(audio_source=self.audio_source, bd=0.025)  # 25 ms
+        size = ads.block_size
+        self.assertEqual(
+            size,
+            400,
+            "Wrong block_size, expected: 400, found: {0}".format(size),
+        )
+
     def test_hop_duration(self):
-        
-        ads = ADSFactory.ads(audio_source=self.audio_source, block_dur=0.02, hop_dur=0.01) # 10 ms
+
+        ads = ADSFactory.ads(
+            audio_source=self.audio_source, block_dur=0.02, hop_dur=0.01
+        )  # 10 ms
         size = ads.hop_size
-        self.assertEqual(size, 160, "Wrong hop_size, expected: 160, found: {0}".format(size))
-        
+        self.assertEqual(
+            size, 160, "Wrong hop_size, expected: 160, found: {0}".format(size)
+        )
+
         # with alias keyword
-        ads = ADSFactory.ads(audio_source=self.audio_source, bd=0.025, hop_dur=0.015) # 15 ms
+        ads = ADSFactory.ads(
+            audio_source=self.audio_source, bd=0.025, hop_dur=0.015
+        )  # 15 ms
         size = ads.hop_size
-        self.assertEqual(size, 240, "Wrong block_size, expected: 240, found: {0}".format(size))    
-        
-    
+        self.assertEqual(
+            size,
+            240,
+            "Wrong block_size, expected: 240, found: {0}".format(size),
+        )
+
     def test_sampling_rate(self):
         ads = ADSFactory.ads(audio_source=self.audio_source)
-        
-        srate = ads.get_sampling_rate()
-        self.assertEqual(srate, 16000, "Wrong sampling rate, expected: 16000, found: {0}".format(srate))
-        
+
+        srate = ads.sampling_rate
+        self.assertEqual(
+            srate,
+            16000,
+            "Wrong sampling rate, expected: 16000, found: {0}".format(srate),
+        )
+
     def test_sample_width(self):
         ads = ADSFactory.ads(audio_source=self.audio_source)
-        
-        swidth = ads.get_sample_width()
-        self.assertEqual(swidth, 2, "Wrong sample width, expected: 2, found: {0}".format(swidth))
-    
+
+        swidth = ads.sample_width
+        self.assertEqual(
+            swidth,
+            2,
+            "Wrong sample width, expected: 2, found: {0}".format(swidth),
+        )
+
     def test_channels(self):
         ads = ADSFactory.ads(audio_source=self.audio_source)
-        
-        channels = ads.get_channels()
-        self.assertEqual(channels, 1, "Wrong number of channels, expected: 1, found: {0}".format(channels))
-        
+
+        channels = ads.channels
+        self.assertEqual(
+            channels,
+            1,
+            "Wrong number of channels, expected: 1, found: {0}".format(
+                channels
+            ),
+        )
+
     def test_read(self):
-        ads = ADSFactory.ads(audio_source=self.audio_source, block_size = 256)
-        
+        ads = ADSFactory.ads(audio_source=self.audio_source, block_size=256)
+
         ads.open()
         ads_data = ads.read()
         ads.close()
-        
-        audio_source = WaveAudioSource(filename=dataset.one_to_six_arabic_16000_mono_bc_noise)
+
+        audio_source = WaveAudioSource(
+            filename=dataset.one_to_six_arabic_16000_mono_bc_noise
+        )
         audio_source.open()
         audio_source_data = audio_source.read(256)
         audio_source.close()
-        
-        self.assertEqual(ads_data, audio_source_data, "Unexpected data read from ads")
-    
-    def test_Limiter_Deco_type(self):
-        ads = ADSFactory.ads(audio_source=self.audio_source, max_time=1)
-        
-        self.assertIsInstance(ads, ADSFactory.LimiterADS,
-                              msg="wrong type for ads object, expected: 'ADSFactory.LimiterADS', found: {0}".format(type(ads)))
-         
-    
+
+        self.assertEqual(
+            ads_data, audio_source_data, "Unexpected data read from ads"
+        )
+
     def test_Limiter_Deco_read(self):
         # read a maximum of 0.75 seconds from audio source
         ads = ADSFactory.ads(audio_source=self.audio_source, max_time=0.75)
-        
+
         ads_data = []
         ads.open()
         while True:
@@ -121,39 +174,31 @@
                 break
             ads_data.append(block)
         ads.close()
-        ads_data = b''.join(ads_data)    
-                    
-        audio_source = WaveAudioSource(filename=dataset.one_to_six_arabic_16000_mono_bc_noise)
+        ads_data = b"".join(ads_data)
+
+        audio_source = WaveAudioSource(
+            filename=dataset.one_to_six_arabic_16000_mono_bc_noise
+        )
         audio_source.open()
         audio_source_data = audio_source.read(int(16000 * 0.75))
         audio_source.close()
-        
-        self.assertEqual(ads_data, audio_source_data, "Unexpected data read from LimiterADS")
-        
-        
+
+        self.assertEqual(
+            ads_data, audio_source_data, "Unexpected data read from LimiterADS"
+        )
+
     def test_Limiter_Deco_read_limit(self):
-        # read a maximum of 1.25 seconds from audio source
+        # read a maximum of 1.191 seconds from audio source
         ads = ADSFactory.ads(audio_source=self.audio_source, max_time=1.191)
-        
-        # desired duration into bytes is obtained by:
-        # max_time * sampling_rate * sample_width * nb_channels
-        # Limiter deco tries to a total quantity of data as
-        # possible to the desired duration in bytes.   
-        # It reads N block of size block_size where:
-        # (N - 1) * block_size < desired duration, AND
-        # N * block_size >= desired duration
-        
-        # theoretical size to reach          
-        expected_size = int(ads.get_sampling_rate() * 1.191) * \
-                       ads.get_sample_width() * ads.get_channels()
-        
-        
-        # how much data are required to get N blocks of size block_size
-        block_size_bytes = ads.get_block_size() * ads.get_sample_width() * ads.get_channels()
-        r = expected_size % block_size_bytes
-        if r > 0:
-            expected_size += block_size_bytes - r
-        
+        total_samples = round(ads.sampling_rate * 1.191)
+        nb_full_blocks, last_block_size = divmod(total_samples, ads.block_size)
+        total_samples_with_overlap = (
+            nb_full_blocks * ads.block_size + last_block_size
+        )
+        expected_read_bytes = (
+            total_samples_with_overlap * ads.sw * ads.channels
+        )
+
         total_read = 0
         ads.open()
         i = 0
@@ -163,23 +208,22 @@
                 break
             i += 1
             total_read += len(block)
-        
+
         ads.close()
-            
-        self.assertEqual(total_read, expected_size, "Wrong data length read from LimiterADS, expected: {0}, found: {1}".format(expected_size, total_read))
-        
-        
-        
-    def test_Recorder_Deco_type(self):
-        ads = ADSFactory.ads(audio_source=self.audio_source, record=True)
-        
-        self.assertIsInstance(ads, ADSFactory.RecorderADS,
-                              msg="wrong type for ads object, expected: 'ADSFactory.RecorderADS', found: {0}".format(type(ads)))
-         
-        
+
+        self.assertEqual(
+            total_read,
+            expected_read_bytes,
+            "Wrong data length read from LimiterADS, expected: {0}, found: {1}".format(
+                expected_read_bytes, total_read
+            ),
+        )
+
     def test_Recorder_Deco_read(self):
-        ads = ADSFactory.ads(audio_source=self.audio_source, record=True, block_size=500)
-        
+        ads = ADSFactory.ads(
+            audio_source=self.audio_source, record=True, block_size=500
+        )
+
         ads_data = []
         ads.open()
         for i in range(10):
@@ -188,44 +232,39 @@
                 break
             ads_data.append(block)
         ads.close()
-        ads_data = b''.join(ads_data)    
-                    
-        audio_source = WaveAudioSource(filename=dataset.one_to_six_arabic_16000_mono_bc_noise)
+        ads_data = b"".join(ads_data)
+
+        audio_source = WaveAudioSource(
+            filename=dataset.one_to_six_arabic_16000_mono_bc_noise
+        )
         audio_source.open()
         audio_source_data = audio_source.read(500 * 10)
         audio_source.close()
-        
-        self.assertEqual(ads_data, audio_source_data, "Unexpected data read from RecorderADS")
-        
+
+        self.assertEqual(
+            ads_data,
+            audio_source_data,
+            "Unexpected data read from RecorderADS",
+        )
+
     def test_Recorder_Deco_is_rewindable(self):
         ads = ADSFactory.ads(audio_source=self.audio_source, record=True)
-        
-        self.assertTrue(ads.is_rewindable(), "RecorderADS.is_rewindable should return True")
-        
-    
-    def test_Recorder_Deco_rewind(self):
-        ads = ADSFactory.ads(audio_source=self.audio_source, record=True, block_size = 320)
-        
-        ads.open()
-        ads.read()
-        ads.rewind()
-        
-        
-        self.assertIsInstance(ads.get_audio_source(), 
-                              BufferAudioSource, "After rewind RecorderADS.get_audio_source should \
-                              be an instance of BufferAudioSource")
-        ads.close()
-        
-        
+
+        self.assertTrue(
+            ads.rewindable, "RecorderADS.is_rewindable should return True"
+        )
+
     def test_Recorder_Deco_rewind_and_read(self):
-        ads = ADSFactory.ads(audio_source=self.audio_source, record=True, block_size = 320)
-        
+        ads = ADSFactory.ads(
+            audio_source=self.audio_source, record=True, block_size=320
+        )
+
         ads.open()
         for i in range(10):
             ads.read()
-            
+
         ads.rewind()
-        
+
         # read all available data after rewind
         ads_data = []
         while True:
@@ -234,33 +273,33 @@
                 break
             ads_data.append(block)
         ads.close()
-        ads_data = b''.join(ads_data)    
-                    
-        audio_source = WaveAudioSource(filename=dataset.one_to_six_arabic_16000_mono_bc_noise)
+        ads_data = b"".join(ads_data)
+
+        audio_source = WaveAudioSource(
+            filename=dataset.one_to_six_arabic_16000_mono_bc_noise
+        )
         audio_source.open()
         audio_source_data = audio_source.read(320 * 10)
         audio_source.close()
-        
-        self.assertEqual(ads_data, audio_source_data, "Unexpected data read from RecorderADS")
-    
-    def test_Overlap_Deco_type(self):
-        # an OverlapADS is obtained if a valid hop_size is given
-        ads = ADSFactory.ads(audio_source=self.audio_source, block_size = 256, hop_size = 128)
-        
-        self.assertIsInstance(ads, ADSFactory.OverlapADS,
-                              msg="wrong type for ads object, expected: 'ADSFactory.OverlapADS', found: {0}".format(type(ads)))
-         
-        
-        
-    
+
+        self.assertEqual(
+            ads_data,
+            audio_source_data,
+            "Unexpected data read from RecorderADS",
+        )
+
     def test_Overlap_Deco_read(self):
-        
+
         # Use arbitrary valid block_size and hop_size
         block_size = 1714
         hop_size = 313
-        
-        ads = ADSFactory.ads(audio_source=self.audio_source, block_size=block_size, hop_size=hop_size)
-        
+
+        ads = ADSFactory.ads(
+            audio_source=self.audio_source,
+            block_size=block_size,
+            hop_size=hop_size,
+        )
+
         # Read all available data overlapping blocks
         ads.open()
         ads_data = []
@@ -270,49 +309,44 @@
                 break
             ads_data.append(block)
         ads.close()
-        
+
         # Read all data from file and build a BufferAudioSource
         fp = wave.open(dataset.one_to_six_arabic_16000_mono_bc_noise, "r")
         wave_data = fp.readframes(fp.getnframes())
         fp.close()
-        audio_source = BufferAudioSource(wave_data, ads.get_sampling_rate(),
-                                         ads.get_sample_width(), ads.get_channels())
+        audio_source = BufferAudioSource(
+            wave_data, ads.sampling_rate, ads.sample_width, ads.channels
+        )
         audio_source.open()
-        
+
         # Compare all blocks read from OverlapADS to those read
         # from an audio source with a manual set_position
-        for i,block in enumerate(ads_data):
-            
+        for i, block in enumerate(ads_data):
+
             tmp = audio_source.read(block_size)
-            
-            self.assertEqual(block, tmp, "Unexpected block (N={0}) read from OverlapADS".format(i))
-            
-            audio_source.set_position((i+1) * hop_size)
-        
+
+            self.assertEqual(
+                block,
+                tmp,
+                "Unexpected block (N={0}) read from OverlapADS".format(i),
+            )
+
+            audio_source.set_position((i + 1) * hop_size)
+
         audio_source.close()
-    
-    
-            
-            
-    def test_Limiter_Overlap_Deco_type(self):
-        ads = ADSFactory.ads(audio_source=self.audio_source, max_time=1, block_size = 256, hop_size = 128)
-        
-        self.assertIsInstance(ads, ADSFactory.OverlapADS,
-                            msg="wrong type for ads object, expected: 'ADSFactory.OverlapADS', found: {0}".format(type(ads)))
-         
-        
-        self.assertIsInstance(ads.ads, ADSFactory.LimiterADS,
-                              msg="wrong type for ads object, expected: 'ADSFactory.LimiterADS', found: {0}".format(type(ads)))
-           
-        
-        
-    def test_Limiter_Overlap_Deco_read(self):    
-        
+
+    def test_Limiter_Overlap_Deco_read(self):
+
         block_size = 256
         hop_size = 200
-        
-        ads = ADSFactory.ads(audio_source=self.audio_source, max_time=0.50, block_size=block_size, hop_size=hop_size)
-        
+
+        ads = ADSFactory.ads(
+            audio_source=self.audio_source,
+            max_time=0.50,
+            block_size=block_size,
+            hop_size=hop_size,
+        )
+
         # Read all available data overlapping blocks
         ads.open()
         ads_data = []
@@ -322,60 +356,56 @@
                 break
             ads_data.append(block)
         ads.close()
-        
+
         # Read all data from file and build a BufferAudioSource
         fp = wave.open(dataset.one_to_six_arabic_16000_mono_bc_noise, "r")
         wave_data = fp.readframes(fp.getnframes())
         fp.close()
-        audio_source = BufferAudioSource(wave_data, ads.get_sampling_rate(),
-                                         ads.get_sample_width(), ads.get_channels())
+        audio_source = BufferAudioSource(
+            wave_data, ads.sampling_rate, ads.sample_width, ads.channels
+        )
         audio_source.open()
-        
+
         # Compare all blocks read from OverlapADS to those read
         # from an audio source with a manual set_position
-        for i,block in enumerate(ads_data):            
-            tmp = audio_source.read(block_size)
-            
-            self.assertEqual(block, tmp, "Unexpected block (N={0}) read from OverlapADS".format(i))
-            
-            audio_source.set_position((i+1) * hop_size)
-        
+        for i, block in enumerate(ads_data):
+            tmp = audio_source.read(len(block) // (ads.sw * ads.ch))
+            self.assertEqual(
+                len(block),
+                len(tmp),
+                "Unexpected block (N={0}) read from OverlapADS".format(i),
+            )
+            audio_source.set_position((i + 1) * hop_size)
+
         audio_source.close()
-    
-        
-        
+
     def test_Limiter_Overlap_Deco_read_limit(self):
-        
+
         block_size = 313
         hop_size = 207
-        ads = ADSFactory.ads(audio_source=self.audio_source,
-                             max_time=1.932, block_size=block_size,
-                             hop_size=hop_size)
-        
-        # Limiter + Overlap decos => read N block of actual data
-        # one block of size block_size
-        # N - 1 blocks of size hop_size
-        # the total size of read data might be a slightly greater
-        # than the required size calculated from max_time
-        
-        # theoretical size to reach          
-        expected_size = int(ads.get_sampling_rate() * 1.932) * \
-                       ads.get_sample_width() * ads.get_channels()
-        
-        # minus block_size
-        expected_size -= (block_size * ads.get_sample_width() * ads.get_channels())
-        
-        # how much data are required to get N - 1 blocks of size hop_size
-        hop_size_bytes = hop_size * ads.get_sample_width() * ads.get_channels()
-        r = expected_size % hop_size_bytes
-        if r > 0:
-            expected_size += hop_size_bytes - r
-        
-        expected_size += block_size * ads.get_sample_width() * ads.get_channels()
-        
-        cache_size = (block_size - hop_size) * ads.get_sample_width() * ads.get_channels()
+        ads = ADSFactory.ads(
+            audio_source=self.audio_source,
+            max_time=1.932,
+            block_size=block_size,
+            hop_size=hop_size,
+        )
+
+        total_samples = round(ads.sampling_rate * 1.932)
+        first_read_size = block_size
+        next_read_size = block_size - hop_size
+        nb_next_blocks, last_block_size = divmod(
+            (total_samples - first_read_size), next_read_size
+        )
+        total_samples_with_overlap = (
+            first_read_size + next_read_size * nb_next_blocks + last_block_size
+        )
+        expected_read_bytes = (
+            total_samples_with_overlap * ads.sw * ads.channels
+        )
+
+        cache_size = (block_size - hop_size) * ads.sample_width * ads.channels
         total_read = cache_size
-        
+
         ads.open()
         i = 0
         while True:
@@ -384,37 +414,40 @@
                 break
             i += 1
             total_read += len(block) - cache_size
-        
+
         ads.close()
-        self.assertEqual(total_read, expected_size, "Wrong data length read from LimiterADS, expected: {0}, found: {1}".format(expected_size, total_read))
-        
-        
-        
-    def test_Recorder_Overlap_Deco_type(self):
-        ads = ADSFactory.ads(audio_source=self.audio_source, block_size=256, hop_size=128, record=True)
-        
-        self.assertIsInstance(ads, ADSFactory.OverlapADS,
-                            msg="wrong type for ads object, expected: 'ADSFactory.OverlapADS', found: {0}".format(type(ads)))
-         
-        
-        self.assertIsInstance(ads.ads, ADSFactory.RecorderADS,
-                              msg="wrong type for ads object, expected: 'ADSFactory.RecorderADS', found: {0}".format(type(ads)))
-               
-    
-        
+        self.assertEqual(
+            total_read,
+            expected_read_bytes,
+            "Wrong data length read from LimiterADS, expected: {0}, found: {1}".format(
+                expected_read_bytes, total_read
+            ),
+        )
+
     def test_Recorder_Overlap_Deco_is_rewindable(self):
-        ads = ADSFactory.ads(audio_source=self.audio_source, block_size=320, hop_size=160, record=True)
-        self.assertTrue(ads.is_rewindable(), "RecorderADS.is_rewindable should return True")
-        
+        ads = ADSFactory.ads(
+            audio_source=self.audio_source,
+            block_size=320,
+            hop_size=160,
+            record=True,
+        )
+        self.assertTrue(
+            ads.rewindable, "RecorderADS.is_rewindable should return True"
+        )
 
     def test_Recorder_Overlap_Deco_rewind_and_read(self):
-        
+
         # Use arbitrary valid block_size and hop_size
         block_size = 1600
         hop_size = 400
-        
-        ads = ADSFactory.ads(audio_source=self.audio_source, block_size=block_size, hop_size=hop_size, record=True)
-        
+
+        ads = ADSFactory.ads(
+            audio_source=self.audio_source,
+            block_size=block_size,
+            hop_size=hop_size,
+            record=True,
+        )
+
         # Read all available data overlapping blocks
         ads.open()
         i = 0
@@ -423,38 +456,48 @@
             if block is None:
                 break
             i += 1
-        
+
         ads.rewind()
-        
+
         # Read all data from file and build a BufferAudioSource
         fp = wave.open(dataset.one_to_six_arabic_16000_mono_bc_noise, "r")
         wave_data = fp.readframes(fp.getnframes())
         fp.close()
-        audio_source = BufferAudioSource(wave_data, ads.get_sampling_rate(),
-                                         ads.get_sample_width(), ads.get_channels())
+        audio_source = BufferAudioSource(
+            wave_data, ads.sampling_rate, ads.sample_width, ads.channels
+        )
         audio_source.open()
-        
+
         # Compare all blocks read from OverlapADS to those read
         # from an audio source with a manual set_position
         for j in range(i):
-            
+
             tmp = audio_source.read(block_size)
-            
-            self.assertEqual(ads.read(), tmp, "Unexpected block (N={0}) read from OverlapADS".format(i))
-            audio_source.set_position((j+1) * hop_size)
-        
+
+            self.assertEqual(
+                ads.read(),
+                tmp,
+                "Unexpected block (N={0}) read from OverlapADS".format(i),
+            )
+            audio_source.set_position((j + 1) * hop_size)
+
         ads.close()
         audio_source.close()
-    
-    
+
     def test_Limiter_Recorder_Overlap_Deco_rewind_and_read(self):
-        
+
         # Use arbitrary valid block_size and hop_size
         block_size = 1600
         hop_size = 400
-        
-        ads = ADSFactory.ads(audio_source=self.audio_source, max_time = 1.50, block_size=block_size, hop_size=hop_size, record=True)
-        
+
+        ads = ADSFactory.ads(
+            audio_source=self.audio_source,
+            max_time=1.50,
+            block_size=block_size,
+            hop_size=hop_size,
+            record=True,
+        )
+
         # Read all available data overlapping blocks
         ads.open()
         i = 0
@@ -463,62 +506,63 @@
             if block is None:
                 break
             i += 1
-        
+
         ads.rewind()
-        
+
         # Read all data from file and build a BufferAudioSource
         fp = wave.open(dataset.one_to_six_arabic_16000_mono_bc_noise, "r")
         wave_data = fp.readframes(fp.getnframes())
         fp.close()
-        audio_source = BufferAudioSource(wave_data, ads.get_sampling_rate(),
-                                         ads.get_sample_width(), ads.get_channels())
+        audio_source = BufferAudioSource(
+            wave_data, ads.sampling_rate, ads.sample_width, ads.channels
+        )
         audio_source.open()
-        
+
         # Compare all blocks read from OverlapADS to those read
         # from an audio source with a manual set_position
         for j in range(i):
-            
+
             tmp = audio_source.read(block_size)
-            
-            self.assertEqual(ads.read(), tmp, "Unexpected block (N={0}) read from OverlapADS".format(i))
-            audio_source.set_position((j+1) * hop_size)
-        
+
+            self.assertEqual(
+                ads.read(),
+                tmp,
+                "Unexpected block (N={0}) read from OverlapADS".format(i),
+            )
+            audio_source.set_position((j + 1) * hop_size)
+
         ads.close()
         audio_source.close()
-    
-    
+
     def test_Limiter_Recorder_Overlap_Deco_rewind_and_read_limit(self):
-        
+
         # Use arbitrary valid block_size and hop_size
         block_size = 1000
         hop_size = 200
-        
-        ads = ADSFactory.ads(audio_source=self.audio_source, max_time = 1.317, block_size=block_size, hop_size=hop_size, record=True)
-        
-        # Limiter + Overlap decos => read N block of actual data
-        # one block of size block_size
-        # N - 1 blocks of size hop_size
-        # the total size of read data might be a slightly greater
-        # than the required size calculated from max_time
-        
-        # theoretical size to reach          
-        expected_size = int(ads.get_sampling_rate() * 1.317) * \
-                       ads.get_sample_width() * ads.get_channels()
-        
-        # minus block_size
-        expected_size -= (block_size * ads.get_sample_width() * ads.get_channels())
-        
-        # how much data are required to get N - 1 blocks of size hop_size
-        hop_size_bytes = hop_size * ads.get_sample_width() * ads.get_channels()
-        r = expected_size % hop_size_bytes
-        if r > 0:
-            expected_size += hop_size_bytes - r
-        
-        expected_size += block_size * ads.get_sample_width() * ads.get_channels()
-        
-        cache_size = (block_size - hop_size) * ads.get_sample_width() * ads.get_channels()
+
+        ads = ADSFactory.ads(
+            audio_source=self.audio_source,
+            max_time=1.317,
+            block_size=block_size,
+            hop_size=hop_size,
+            record=True,
+        )
+        total_samples = round(ads.sampling_rate * 1.317)
+        first_read_size = block_size
+        next_read_size = block_size - hop_size
+        nb_next_blocks, last_block_size = divmod(
+            (total_samples - first_read_size), next_read_size
+        )
+        total_samples_with_overlap = (
+            first_read_size + next_read_size * nb_next_blocks + last_block_size
+        )
+        expected_read_bytes = (
+            total_samples_with_overlap * ads.sw * ads.channels
+        )
+
+        cache_size = (block_size - hop_size) * ads.sample_width * ads.channels
         total_read = cache_size
-        
+
         ads.open()
         i = 0
         while True:
@@ -527,46 +571,70 @@
                 break
             i += 1
             total_read += len(block) - cache_size
-        
+
         ads.close()
-        self.assertEqual(total_read, expected_size, "Wrong data length read from LimiterADS, expected: {0}, found: {1}".format(expected_size, total_read))
-        
+        self.assertEqual(
+            total_read,
+            expected_read_bytes,
+            "Wrong data length read from LimiterADS, expected: {0}, found: {1}".format(
+                expected_read_bytes, total_read
+            ),
+        )
+
+
 class TestADSFactoryBufferAudioSource(unittest.TestCase):
-    
     def setUp(self):
-        self.signal = "ABCDEFGHIJKLMNOPQRSTUVWXYZ012345"
-        self.ads = ADSFactory.ads(data_buffer=self.signal, sampling_rate=16,
-                             sample_width=2, channels=1)
-        
-    def test_ADS_BAS_type(self):
-        self.assertIsInstance(self.ads.get_audio_source(), 
-                              BufferAudioSource, "ads should \
-                              be an instance of BufferAudioSource")
-    
+        self.signal = b"ABCDEFGHIJKLMNOPQRSTUVWXYZ012345"
+        self.ads = ADSFactory.ads(
+            data_buffer=self.signal,
+            sampling_rate=16,
+            sample_width=2,
+            channels=1,
+        )
+
     def test_ADS_BAS_sampling_rate(self):
-        srate = self.ads.get_sampling_rate()
-        self.assertEqual(srate, 16, "Wrong sampling rate, expected: 16000, found: {0}".format(srate))
-      
-        
+        srate = self.ads.sampling_rate
+        self.assertEqual(
+            srate,
+            16,
+            "Wrong sampling rate, expected: 16000, found: {0}".format(srate),
+        )
+
     def test_ADS_BAS_get_sample_width(self):
-        swidth = self.ads.get_sample_width()
-        self.assertEqual(swidth, 2, "Wrong sample width, expected: 2, found: {0}".format(swidth))
-    
+        swidth = self.ads.sample_width
+        self.assertEqual(
+            swidth,
+            2,
+            "Wrong sample width, expected: 2, found: {0}".format(swidth),
+        )
+
     def test_ADS_BAS_get_channels(self):
-        channels = self.ads.get_channels()
-        self.assertEqual(channels, 1, "Wrong number of channels, expected: 1, found: {0}".format(channels))
-        
+        channels = self.ads.channels
+        self.assertEqual(
+            channels,
+            1,
+            "Wrong number of channels, expected: 1, found: {0}".format(
+                channels
+            ),
+        )
+
     def test_Limiter_Recorder_Overlap_Deco_rewind_and_read(self):
-        
+
         # Use arbitrary valid block_size and hop_size
         block_size = 5
         hop_size = 4
-        
-        ads = ADSFactory.ads(data_buffer=self.signal, sampling_rate=16,
-                             sample_width=2, channels=1, max_time = 0.80,
-                             block_size=block_size, hop_size=hop_size,
-                             record=True)
-        
+
+        ads = ADSFactory.ads(
+            data_buffer=self.signal,
+            sampling_rate=16,
+            sample_width=2,
+            channels=1,
+            max_time=0.80,
+            block_size=block_size,
+            hop_size=hop_size,
+            record=True,
+        )
+
         # Read all available data overlapping blocks
         ads.open()
         i = 0
@@ -575,181 +643,330 @@
             if block is None:
                 break
             i += 1
-        
+
         ads.rewind()
-        
+
         # Build a BufferAudioSource
-        audio_source = BufferAudioSource(self.signal, ads.get_sampling_rate(),
-                        ads.get_sample_width(), ads.get_channels())
+        audio_source = BufferAudioSource(
+            self.signal, ads.sampling_rate, ads.sample_width, ads.channels
+        )
         audio_source.open()
-        
+
         # Compare all blocks read from OverlapADS to those read
         # from an audio source with a manual set_position
         for j in range(i):
-            
+
             tmp = audio_source.read(block_size)
-            
+
             block = ads.read()
-            
-            self.assertEqual(block, tmp, "Unexpected block (N={0}) read from OverlapADS".format(i))
-            audio_source.set_position((j+1) * hop_size)
-        
+
+            self.assertEqual(
+                block,
+                tmp,
+                "Unexpected block '{}' (N={}) read from OverlapADS".format(
+                    block, i
+                ),
+            )
+            audio_source.set_position((j + 1) * hop_size)
+
         ads.close()
         audio_source.close()
 
 
 class TestADSFactoryAlias(unittest.TestCase):
-    
     def setUp(self):
         self.signal = "ABCDEFGHIJKLMNOPQRSTUVWXYZ012345"
-    
+
     def test_sampling_rate_alias(self):
-        ads = ADSFactory.ads(data_buffer=self.signal, sr=16,
-                             sample_width=2, channels=1)
-        srate = ads.get_sampling_rate()
-        self.assertEqual(srate, 16, "Wrong sampling rate, expected: 16000, found: {0}".format(srate))
-    
+        ads = ADSFactory.ads(
+            data_buffer=self.signal, sr=16, sample_width=2, channels=1
+        )
+        srate = ads.sampling_rate
+        self.assertEqual(
+            srate,
+            16,
+            "Wrong sampling rate, expected: 16000, found: {0}".format(srate),
+        )
+
     def test_sampling_rate_duplicate(self):
-        func = partial(ADSFactory.ads, data_buffer=self.signal, sr=16, sampling_rate=16,
-                             sample_width=2, channels=1)
+        func = partial(
+            ADSFactory.ads,
+            data_buffer=self.signal,
+            sr=16,
+            sampling_rate=16,
+            sample_width=2,
+            channels=1,
+        )
         self.assertRaises(DuplicateArgument, func)
-    
+
     def test_sample_width_alias(self):
-        ads = ADSFactory.ads(data_buffer=self.signal, sampling_rate=16,
-                             sw=2, channels=1)
-        swidth = ads.get_sample_width()
-        self.assertEqual(swidth, 2, "Wrong sample width, expected: 2, found: {0}".format(swidth))
-    
+        ads = ADSFactory.ads(
+            data_buffer=self.signal, sampling_rate=16, sw=2, channels=1
+        )
+        swidth = ads.sample_width
+        self.assertEqual(
+            swidth,
+            2,
+            "Wrong sample width, expected: 2, found: {0}".format(swidth),
+        )
+
     def test_sample_width_duplicate(self):
-        func = partial(ADSFactory.ads, data_buffer=self.signal,sampling_rate=16,
-                             sw=2, sample_width=2, channels=1)
+        func = partial(
+            ADSFactory.ads,
+            data_buffer=self.signal,
+            sampling_rate=16,
+            sw=2,
+            sample_width=2,
+            channels=1,
+        )
         self.assertRaises(DuplicateArgument, func)
-        
+
     def test_channels_alias(self):
-        ads = ADSFactory.ads(data_buffer=self.signal, sampling_rate=16,
-                             sample_width=2, ch=1)
-        channels = ads.get_channels()
-        self.assertEqual(channels, 1, "Wrong number of channels, expected: 1, found: {0}".format(channels))
-    
+        ads = ADSFactory.ads(
+            data_buffer=self.signal, sampling_rate=16, sample_width=2, ch=1
+        )
+        channels = ads.channels
+        self.assertEqual(
+            channels,
+            1,
+            "Wrong number of channels, expected: 1, found: {0}".format(
+                channels
+            ),
+        )
+
     def test_channels_duplicate(self):
-        func = partial(ADSFactory.ads, data_buffer=self.signal,sampling_rate=16,
-                             sample_width=2, ch=1, channels=1)
+        func = partial(
+            ADSFactory.ads,
+            data_buffer=self.signal,
+            sampling_rate=16,
+            sample_width=2,
+            ch=1,
+            channels=1,
+        )
         self.assertRaises(DuplicateArgument, func)
-    
-    
+
     def test_block_size_alias(self):
-        ads = ADSFactory.ads(data_buffer=self.signal, sampling_rate=16,
-                             sample_width=2, channels=1, bs=8)
-        size = ads.get_block_size()
-        self.assertEqual(size, 8, "Wrong block_size using bs alias, expected: 8, found: {0}".format(size))
-        
+        ads = ADSFactory.ads(
+            data_buffer=self.signal,
+            sampling_rate=16,
+            sample_width=2,
+            channels=1,
+            bs=8,
+        )
+        size = ads.block_size
+        self.assertEqual(
+            size,
+            8,
+            "Wrong block_size using bs alias, expected: 8, found: {0}".format(
+                size
+            ),
+        )
+
     def test_block_size_duplicate(self):
-        func = partial(ADSFactory.ads, data_buffer=self.signal,sampling_rate=16,
-                             sample_width=2, channels=1, bs=4, block_size=4)
+        func = partial(
+            ADSFactory.ads,
+            data_buffer=self.signal,
+            sampling_rate=16,
+            sample_width=2,
+            channels=1,
+            bs=4,
+            block_size=4,
+        )
         self.assertRaises(DuplicateArgument, func)
-    
+
     def test_block_duration_alias(self):
-        ads = ADSFactory.ads(data_buffer=self.signal, sampling_rate=16,
-                             sample_width=2, channels=1, bd=0.75)
+        ads = ADSFactory.ads(
+            data_buffer=self.signal,
+            sampling_rate=16,
+            sample_width=2,
+            channels=1,
+            bd=0.75,
+        )
         # 0.75 ms = 0.75 * 16 = 12
-        size = ads.get_block_size()
-        self.assertEqual(size, 12, "Wrong block_size set with a block_dur alias 'bd', expected: 8, found: {0}".format(size))
-        
+        size = ads.block_size
+        self.assertEqual(
+            size,
+            12,
+            "Wrong block_size set with a block_dur alias 'bd', expected: 8, found: {0}".format(
+                size
+            ),
+        )
+
     def test_block_duration_duplicate(self):
-        func = partial(ADSFactory.ads, data_buffer=self.signal,sampling_rate=16,
-                             sample_width=2, channels=1, bd=4, block_dur=4)
+        func = partial(
+            ADSFactory.ads,
+            data_buffer=self.signal,
+            sampling_rate=16,
+            sample_width=2,
+            channels=1,
+            bd=4,
+            block_dur=4,
+        )
         self.assertRaises(DuplicateArgument, func)
-        
+
     def test_block_size_duration_duplicate(self):
-        func = partial(ADSFactory.ads, data_buffer=self.signal,sampling_rate=16,
-                             sample_width=2, channels=1, bd=4, bs=12)
+        func = partial(
+            ADSFactory.ads,
+            data_buffer=self.signal,
+            sampling_rate=16,
+            sample_width=2,
+            channels=1,
+            bd=4,
+            bs=12,
+        )
         self.assertRaises(DuplicateArgument, func)
-        
+
     def test_hop_duration_alias(self):
-        
-        ads = ADSFactory.ads(data_buffer=self.signal, sampling_rate=16,
-                             sample_width=2, channels=1, bd=0.75, hd=0.5 )
+
+        ads = ADSFactory.ads(
+            data_buffer=self.signal,
+            sampling_rate=16,
+            sample_width=2,
+            channels=1,
+            bd=0.75,
+            hd=0.5,
+        )
         size = ads.hop_size
-        self.assertEqual(size, 8, "Wrong block_size using bs alias, expected: 8, found: {0}".format(size))
-        self.assertIsInstance(ads, ADSFactory.OverlapADS, "ads expected to an ADSFactory.OverlapADS object")
-    
-    
+        self.assertEqual(
+            size,
+            8,
+            "Wrong block_size using bs alias, expected: 8, found: {0}".format(
+                size
+            ),
+        )
+
     def test_hop_duration_duplicate(self):
-        
-        func = partial(ADSFactory.ads, data_buffer=self.signal, sampling_rate=16,
-                             sample_width=2, channels=1, bd=0.75, hd=0.5, hop_dur=0.5)
+
+        func = partial(
+            ADSFactory.ads,
+            data_buffer=self.signal,
+            sampling_rate=16,
+            sample_width=2,
+            channels=1,
+            bd=0.75,
+            hd=0.5,
+            hop_dur=0.5,
+        )
         self.assertRaises(DuplicateArgument, func)
-    
-    
+
     def test_hop_size_duration_duplicate(self):
-        func = partial(ADSFactory.ads, data_buffer=self.signal,sampling_rate=16,
-                             sample_width=2, channels=1, bs=8, hs=4, hd=1)
+        func = partial(
+            ADSFactory.ads,
+            data_buffer=self.signal,
+            sampling_rate=16,
+            sample_width=2,
+            channels=1,
+            bs=8,
+            hs=4,
+            hd=1,
+        )
         self.assertRaises(DuplicateArgument, func)
-    
-    
+
     def test_hop_size_greater_than_block_size(self):
-        func = partial(ADSFactory.ads, data_buffer=self.signal, sampling_rate=16,
-                             sample_width=2, channels=1, bs=4, hs=8)
+        func = partial(
+            ADSFactory.ads,
+            data_buffer=self.signal,
+            sampling_rate=16,
+            sample_width=2,
+            channels=1,
+            bs=4,
+            hs=8,
+        )
         self.assertRaises(ValueError, func)
-        
-    
+
     def test_filename_alias(self):
         ads = ADSFactory.ads(fn=dataset.one_to_six_arabic_16000_mono_bc_noise)
-    
-    
+
     def test_filename_duplicate(self):
-        
-        func = partial(ADSFactory.ads, fn=dataset.one_to_six_arabic_16000_mono_bc_noise, filename=dataset.one_to_six_arabic_16000_mono_bc_noise)
+
+        func = partial(
+            ADSFactory.ads,
+            fn=dataset.one_to_six_arabic_16000_mono_bc_noise,
+            filename=dataset.one_to_six_arabic_16000_mono_bc_noise,
+        )
         self.assertRaises(DuplicateArgument, func)
-    
-    
-    def test_data_buffer_alias(self):
-        ads = ADSFactory.ads(db=self.signal, sampling_rate=16,
-                             sample_width=2, channels=1)
-        self.assertEqual(ads.get_audio_source().get_data_buffer(), self.signal, "Wrong value for data buffer")
-    
-    
+
     def test_data_buffer_duplicate(self):
-        func = partial(ADSFactory.ads, data_buffer=self.signal, db=self.signal, sampling_rate=16,
-                             sample_width=2, channels=1)
+        func = partial(
+            ADSFactory.ads,
+            data_buffer=self.signal,
+            db=self.signal,
+            sampling_rate=16,
+            sample_width=2,
+            channels=1,
+        )
         self.assertRaises(DuplicateArgument, func)
-        
-    
+
     def test_max_time_alias(self):
-        ads = ADSFactory.ads(data_buffer=self.signal, sampling_rate=16,
-                             sample_width=2, channels=1, mt=10)
-        self.assertIsInstance(ads, ADSFactory.LimiterADS, "ads expected to an ADSFactory.LimiterADS object")
-    
-    
+        ads = ADSFactory.ads(
+            data_buffer=self.signal,
+            sampling_rate=16,
+            sample_width=2,
+            channels=1,
+            mt=10,
+        )
+        self.assertEqual(
+            ads.max_read,
+            10,
+            "Wrong AudioDataSource.max_read, expected: 10, found: {}".format(
+                ads.max_read
+            ),
+        )
+
     def test_max_time_duplicate(self):
-        func = partial(ADSFactory.ads, data_buffer=self.signal, sampling_rate=16,
-                             sample_width=2, channels=1, mt=True, max_time=True)
-        
+        func = partial(
+            ADSFactory.ads,
+            data_buffer=self.signal,
+            sampling_rate=16,
+            sample_width=2,
+            channels=1,
+            mt=True,
+            max_time=True,
+        )
+
         self.assertRaises(DuplicateArgument, func)
-    
+
     def test_record_alias(self):
-        ads = ADSFactory.ads(data_buffer=self.signal, sampling_rate=16,
-                             sample_width=2, channels=1, rec=True)
-        self.assertIsInstance(ads, ADSFactory.RecorderADS, "ads expected to an ADSFactory.RecorderADS object")
-    
-    
+        ads = ADSFactory.ads(
+            data_buffer=self.signal,
+            sampling_rate=16,
+            sample_width=2,
+            channels=1,
+            rec=True,
+        )
+        self.assertTrue(
+            ads.rewindable, "AudioDataSource.rewindable expected to be True"
+        )
+
     def test_record_duplicate(self):
-        func = partial(ADSFactory.ads, data_buffer=self.signal, sampling_rate=16,
-                             sample_width=2, channels=1, rec=True, record=True)
+        func = partial(
+            ADSFactory.ads,
+            data_buffer=self.signal,
+            sampling_rate=16,
+            sample_width=2,
+            channels=1,
+            rec=True,
+            record=True,
+        )
         self.assertRaises(DuplicateArgument, func)
-      
-      
+
     def test_Limiter_Recorder_Overlap_Deco_rewind_and_read_alias(self):
-        
+
         # Use arbitrary valid block_size and hop_size
         block_size = 5
         hop_size = 4
-        
-        ads = ADSFactory.ads(db=self.signal, sr=16,
-                             sw=2, ch=1, mt = 0.80,
-                             bs=block_size, hs=hop_size,
-                             rec=True)
-        
+
+        ads = ADSFactory.ads(
+            db=self.signal,
+            sr=16,
+            sw=2,
+            ch=1,
+            mt=0.80,
+            bs=block_size,
+            hs=hop_size,
+            rec=True,
+        )
+
         # Read all available data overlapping blocks
         ads.open()
         i = 0
@@ -758,29 +975,30 @@
             if block is None:
                 break
             i += 1
-        
+
         ads.rewind()
-        
+
         # Build a BufferAudioSource
-        audio_source = BufferAudioSource(self.signal, ads.get_sampling_rate(),
-                        ads.get_sample_width(), ads.get_channels())
+        audio_source = BufferAudioSource(
+            self.signal, ads.sampling_rate, ads.sample_width, ads.channels
+        )
         audio_source.open()
-        
-        # Compare all blocks read from OverlapADS to those read
-        # from an audio source with a manual set_position
+
+        # Compare all blocks read from AudioDataSource to those read
+        # from an audio source with manual position definition
         for j in range(i):
-            
             tmp = audio_source.read(block_size)
-            
             block = ads.read()
-            
-            self.assertEqual(block, tmp, "Unexpected block (N={0}) read from OverlapADS".format(i))
-            audio_source.set_position((j+1) * hop_size)
-        
+            self.assertEqual(
+                block,
+                tmp,
+                "Unexpected block (N={0}) read from OverlapADS".format(i),
+            )
+            audio_source.set_position((j + 1) * hop_size)
         ads.close()
         audio_source.close()
-    
+
 
 if __name__ == "__main__":
-    #import sys;sys.argv = ['', 'Test.testName']
+    # import sys;sys.argv = ['', 'Test.testName']
     unittest.main()