"""Unit tests for helpers in ``auditok.util``.

Covers ``make_duration_formatter``, ``make_channel_selector`` (run against
both the pure-python ``auditok.signal`` and the ``auditok.signal_numpy``
implementations) and ``AudioEnergyValidator``.
"""
import math
import unittest
from array import array as array_
from unittest import TestCase
from unittest.mock import patch

from genty import genty, genty_dataset

from auditok import signal as signal_
from auditok import signal_numpy
from auditok.exceptions import TimeFormatError
from auditok.util import (
    AudioEnergyValidator,
    make_channel_selector,
    make_duration_formatter,
)


def _sample_generator(*data_buffers):
    """
    Takes a list of many mono audio data buffers and makes a sample generator
    of interleaved audio samples, one sample from each channel. The resulting
    generator can be used to build a multichannel audio buffer.

    Interleaving stops as soon as the shortest buffer is exhausted (this is
    the behavior of `zip`).

    >>> gen = _sample_generator("abcd", "ABCD")
    >>> list(gen)
    ['a', 'A', 'b', 'B', 'c', 'C', 'd', 'D']
    """
    frame_gen = zip(*data_buffers)
    return (sample for frame in frame_gen for sample in frame)


def _generate_pure_tone(
    frequency, duration_sec=1, sampling_rate=16000, sample_width=2, volume=1e4
):
    """
    Generates a pure tone with the given frequency.

    :Parameters:

        `frequency`: tone frequency; must not exceed `sampling_rate / 2`.
        `duration_sec`: tone duration in seconds.
        `sampling_rate`: number of samples per second.
        `sample_width`: size of one sample in bytes, used to look up the
            array type code in `signal_.FORMAT`.
        `volume`: peak amplitude; clipped to the largest positive value a
            signed sample of `sample_width` bytes can hold.

    :Returns:

        an `array.array` of `int(sampling_rate * duration_sec)` integer
        samples.
    """
    assert (
        frequency <= sampling_rate / 2
    ), "frequency must not exceed half of the sampling rate"
    # largest positive value of a signed integer of `sample_width` bytes
    max_value = (2 ** (sample_width * 8) // 2) - 1
    if volume > max_value:
        volume = max_value
    fmt = signal_.FORMAT[sample_width]
    total_samples = int(sampling_rate * duration_sec)
    step = frequency / sampling_rate
    two_pi_step = 2 * math.pi * step
    data = array_(
        fmt,
        (
            int(math.sin(two_pi_step * i) * volume)
            for i in range(total_samples)
        ),
    )
    return data


# Pre-computed tones keyed by frequency: 1-second tones for 400, 800 and
# 1600, 0.1-second tones for the other frequencies.
PURE_TONE_DICT = {
    freq: _generate_pure_tone(freq, 1, 16000, 2) for freq in (400, 800, 1600)
}
PURE_TONE_DICT.update(
    {
        freq: _generate_pure_tone(freq, 0.1, 16000, 2)
        for freq in (600, 1150, 2400, 7220)
    }
)


@genty
class TestFunctions(TestCase):
    """Tests for `make_duration_formatter` and `make_channel_selector`."""

    def setUp(self):
        # 12 bytes of raw audio data fed to the channel selectors; the
        # expected values in the datasets below are derived from these bytes.
        self.data = b"012345679ABC"

    @genty_dataset(
        only_seconds=("%S", 5400, "5400.000"),
        only_millis=("%I", 5400, "5400000"),
        full=("%h:%m:%s.%i", 3725.365, "01:02:05.365"),
        full_zero_hours=("%h:%m:%s.%i", 1925.075, "00:32:05.075"),
        full_zero_minutes=("%h:%m:%s.%i", 3659.075, "01:00:59.075"),
        full_zero_seconds=("%h:%m:%s.%i", 3720.075, "01:02:00.075"),
        full_zero_millis=("%h:%m:%s.%i", 3725, "01:02:05.000"),
        duplicate_directive=(
            "%h %h:%m:%s.%i %s",
            3725.365,
            "01 01:02:05.365 05",
        ),
        no_millis=("%h:%m:%s", 3725, "01:02:05"),
        no_seconds=("%h:%m", 3725, "01:02"),
        no_minutes=("%h", 3725, "01"),
        no_hours=("%m:%s.%i", 3725, "02:05.000"),
    )
    def test_make_duration_formatter(self, fmt, duration, expected):
        """The formatter built from `fmt` renders `duration` as `expected`."""
        formatter = make_duration_formatter(fmt)
        result = formatter(duration)
        self.assertEqual(result, expected)

    @genty_dataset(
        duplicate_only_seconds=("%S %S",),
        duplicate_only_millis=("%I %I",),
        unknown_directive=("%x",),
    )
    def test_make_duration_formatter_error(self, fmt):
        """Invalid format strings are rejected with `TimeFormatError`."""
        with self.assertRaises(TimeFormatError):
            make_duration_formatter(fmt)

    @genty_dataset(
        int8_1channel_select_0=(
            1,
            1,
            0,
            [48, 49, 50, 51, 52, 53, 54, 55, 57, 65, 66, 67],
        ),
        int8_2channel_select_0=(1, 2, 0, [48, 50, 52, 54, 57, 66]),
        int8_3channel_select_0=(1, 3, 0, [48, 51, 54, 65]),
        int8_3channel_select_1=(1, 3, 1, [49, 52, 55, 66]),
        int8_3channel_select_2=(1, 3, 2, [50, 53, 57, 67]),
        int8_4channel_select_0=(1, 4, 0, [48, 52, 57]),
        int16_1channel_select_0=(
            2,
            1,
            0,
            [12592, 13106, 13620, 14134, 16697, 17218],
        ),
        int16_2channel_select_0=(2, 2, 0, [12592, 13620, 16697]),
        int16_2channel_select_1=(2, 2, 1, [13106, 14134, 17218]),
        int16_3channel_select_0=(2, 3, 0, [12592, 14134]),
        int16_3channel_select_1=(2, 3, 1, [13106, 16697]),
        int16_3channel_select_2=(2, 3, 2, [13620, 17218]),
        int32_1channel_select_0=(
            4,
            1,
            0,
            [858927408, 926299444, 1128415545],
        ),
        int32_3channel_select_0=(4, 3, 0, [858927408]),
        int32_3channel_select_1=(4, 3, 1, [926299444]),
        int32_3channel_select_2=(4, 3, 2, [1128415545]),
    )
    def test_make_channel_selector_one_channel(
        self, sample_width, channels, selected, expected
    ):
        """Selecting one channel by index yields that channel's samples.

        The selector is exercised twice, with `auditok.util.signal` patched
        first to the standard python implementation and then to the numpy
        one.
        """

        # force using signal functions with standard python implementation
        with patch("auditok.util.signal", signal_):
            selector = make_channel_selector(sample_width, channels, selected)
            result = selector(self.data)

        fmt = signal_.FORMAT[sample_width]
        expected = array_(fmt, expected)
        if channels == 1:
            # for mono data the expected result is compared as raw bytes
            expected = bytes(expected)
        self.assertEqual(result, expected)

        # Use signal functions with numpy implementation
        with patch("auditok.util.signal", signal_numpy):
            selector = make_channel_selector(sample_width, channels, selected)
            result_numpy = selector(self.data)

        # `expected` may be bytes at this point; rebuild the array from it
        expected = array_(fmt, expected)
        if channels == 1:
            expected = bytes(expected)
            self.assertEqual(result_numpy, expected)
        else:
            # element-wise comparison, reduced with `all`
            self.assertTrue(all(result_numpy == expected))

    @genty_dataset(
        int8_2channel=(1, 2, "avg", [48, 50, 52, 54, 61, 66]),
        int8_4channel=(1, 4, "average", [50, 54, 64]),
        int16_1channel=(
            2,
            1,
            "mix",
            [12592, 13106, 13620, 14134, 16697, 17218],
        ),
        int16_2channel=(2, 2, "avg", [12849, 13877, 16957]),
        int32_3channel=(4, 3, "average", [971214132]),
    )
    def test_make_channel_selector_average(
        self, sample_width, channels, selected, expected
    ):
        """Selecting "mix"/"avg"/"average" yields the expected samples.

        The selector is exercised twice, with `auditok.util.signal` patched
        first to the standard python implementation and then to the numpy
        one.
        """
        # force using signal functions with standard python implementation
        with patch("auditok.util.signal", signal_):
            selector = make_channel_selector(sample_width, channels, selected)
            result = selector(self.data)

        fmt = signal_.FORMAT[sample_width]
        expected = array_(fmt, expected)
        if channels == 1:
            # for mono data the expected result is compared as raw bytes
            expected = bytes(expected)
        self.assertEqual(result, expected)

        # Use signal functions with numpy implementation
        with patch("auditok.util.signal", signal_numpy):
            selector = make_channel_selector(sample_width, channels, selected)
            result_numpy = selector(self.data)

        if channels in (1, 2):
            self.assertEqual(result_numpy, expected)
        else:
            # element-wise comparison, reduced with `all`
            self.assertTrue(all(result_numpy == expected))

    # TODO: add test_make_channel_selector_any(
    #     self, sample_width, channels, selected, expected
    # ) once datasets for the "any" selector are defined.


@genty
class TestAudioEnergyValidator(TestCase):
    """Tests for `AudioEnergyValidator.is_valid` with various channel modes."""

    @genty_dataset(
        mono_valid_uc_None=([350, 400], 1, None, True),
        mono_valid_uc_any=([350, 400], 1, "any", True),
        mono_valid_uc_0=([350, 400], 1, 0, True),
        mono_valid_uc_mix=([350, 400], 1, "mix", True),
        # previous cases are all the same since we have mono audio
        mono_invalid_uc_None=([300, 300], 1, None, False),
        stereo_valid_uc_None=([300, 400, 350, 300], 2, None, True),
        stereo_valid_uc_any=([300, 400, 350, 300], 2, "any", True),
        stereo_valid_uc_mix=([300, 400, 350, 300], 2, "mix", True),
        stereo_valid_uc_avg=([300, 400, 350, 300], 2, "avg", True),
        stereo_valid_uc_average=([300, 400, 300, 300], 2, "average", True),
        stereo_valid_uc_mix_with_null_channel=(
            [634, 0, 634, 0],
            2,
            "mix",
            True,
        ),
        stereo_valid_uc_0=([320, 100, 320, 100], 2, 0, True),
        stereo_valid_uc_1=([100, 320, 100, 320], 2, 1, True),
        stereo_invalid_uc_None=([280, 100, 280, 100], 2, None, False),
        stereo_invalid_uc_any=([280, 100, 280, 100], 2, "any", False),
        stereo_invalid_uc_mix=([400, 200, 400, 200], 2, "mix", False),
        stereo_invalid_uc_0=([300, 400, 300, 400], 2, 0, False),
        stereo_invalid_uc_1=([400, 300, 400, 300], 2, 1, False),
        zeros=([0, 0, 0, 0], 2, None, False),
    )
    def test_audio_energy_validator(
        self, data, channels, use_channel, expected
    ):
        """`is_valid` is truthy exactly when `expected` is True.

        `data` is packed as an array of type code "h" and validated with an
        energy threshold of 50 and a sample width of 2.
        """

        data = array_("h", data)
        sample_width = 2
        energy_threshold = 50
        validator = AudioEnergyValidator(
            energy_threshold, sample_width, channels, use_channel
        )

        if expected:
            self.assertTrue(validator.is_valid(data))
        else:
            self.assertFalse(validator.is_valid(data))


if __name__ == "__main__":
    unittest.main()