amine@106
|
1 import os
|
amine@106
|
2 import sys
|
amine@106
|
3 import math
|
amine@107
|
4 from array import array
|
amine@110
|
5 from tempfile import NamedTemporaryFile
|
amine@110
|
6 import filecmp
|
amine@108
|
7 from unittest import TestCase
|
amine@108
|
8 from genty import genty, genty_dataset
|
amine@110
|
9 from auditok.io import (
|
amine@110
|
10 DATA_FORMAT,
|
amine@110
|
11 AudioParameterError,
|
amine@110
|
12 check_audio_data,
|
amine@116
|
13 _array_to_bytes,
|
amine@118
|
14 _mix_audio_channels,
|
amine@111
|
15 _save_raw,
|
amine@110
|
16 _save_wave,
|
amine@110
|
17 )
|
amine@106
|
18
|
amine@106
|
19
|
amine@106
|
# True when the interpreter is Python 3 or newer, False under Python 2.
PYTHON_3 = sys.version_info >= (3, 0)
|
amine@106
|
24
|
amine@106
|
25
|
amine@106
|
26 def _sample_generator(*data_buffers):
|
amine@106
|
27 """
|
amine@106
|
28 Takes a list of many mono audio data buffers and makes a sample generator
|
amine@106
|
29 of interleaved audio samples, one sample from each channel. The resulting
|
amine@106
|
30 generator can be used to build a multichannel audio buffer.
|
amine@106
|
31 >>> gen = _sample_generator("abcd", "ABCD")
|
amine@106
|
32 >>> list(gen)
|
amine@106
|
33 ["a", "A", "b", "B", "c", "C", "d", "D"]
|
amine@106
|
34 """
|
amine@106
|
35 frame_gen = zip(*data_buffers)
|
amine@106
|
36 return (sample for frame in frame_gen for sample in frame)
|
amine@106
|
37
|
amine@106
|
38
|
amine@107
|
def _generate_pure_tone(
    frequency, duration_sec=1, sampling_rate=16000, sample_width=2, volume=1e4
):
    """
    Generate a pure sine tone with the given frequency.

    :param frequency: tone frequency; must not exceed half of
        `sampling_rate`.
    :param duration_sec: duration of the tone in seconds.
    :param sampling_rate: number of samples per second.
    :param sample_width: size of one sample in bytes; used as the key into
        `DATA_FORMAT` to pick the array type code.
    :param volume: peak amplitude; clamped to the largest positive value a
        signed sample of `sample_width` bytes can hold.
    :returns: an `array` of `int(sampling_rate * duration_sec)` integer
        samples.
    """
    assert frequency <= sampling_rate / 2
    # Largest positive value of a signed integer of `sample_width` bytes.
    max_value = (2 ** (sample_width * 8) // 2) - 1
    if volume > max_value:
        volume = max_value
    fmt = DATA_FORMAT[sample_width]
    total_samples = int(sampling_rate * duration_sec)
    # Force true division: under Python 2, dividing two ints floors the ratio
    # to 0, which would turn every tone into silence.
    step = float(frequency) / sampling_rate
    two_pi_step = 2 * math.pi * step
    return array(
        fmt,
        (
            int(math.sin(two_pi_step * i) * volume)
            for i in range(total_samples)
        ),
    )
|
amine@107
|
61
|
amine@107
|
62
|
amine@107
|
# Pre-computed tones (16000 Hz sampling rate, 2-byte samples) keyed by
# frequency. The first group lasts 1 second, the second group 0.1 second.
PURE_TONE_DICT = {
    freq: _generate_pure_tone(freq, duration, 16000, 2)
    for duration, group in (
        (1, (400, 800, 1600)),
        (0.1, (600, 1150, 2400, 7220)),
    )
    for freq in group
}
|
amine@108
|
72
|
amine@108
|
73
|
amine@108
|
@genty
class TestIO(TestCase):
    """Tests for the data-checking, channel-mixing and file-saving helpers
    imported from `auditok.io`."""

    @genty_dataset(
        valid_mono=(b"\0" * 113, 1, 1),
        valid_stereo=(b"\0" * 160, 1, 2),
        invalid_mono_sw_2=(b"\0" * 113, 2, 1, False),
        invalid_stereo_sw_1=(b"\0" * 113, 1, 2, False),
        invalid_stereo_sw_2=(b"\0" * 158, 2, 2, False),
    )
    def test_check_audio_data(self, data, sample_width, channels, valid=True):
        """`check_audio_data` must return None for data flagged as valid and
        raise `AudioParameterError` otherwise."""
        if not valid:
            with self.assertRaises(AudioParameterError):
                check_audio_data(data, sample_width, channels)
        else:
            self.assertIsNone(check_audio_data(data, sample_width, channels))

    @genty_dataset(
        mono_1byte=([400], 1),
        stereo_1byte=([400, 600], 1),
        three_channel_1byte=([400, 600, 2400], 1),
        mono_2byte=([400], 2),
        stereo_2byte=([400, 600], 2),
        three_channel_2byte=([400, 600, 1150], 2),
        mono_4byte=([400], 4),
        stereo_4byte=([400, 600], 4),
        # NOTE: despite its name, this case uses 4-byte samples.
        four_channel_2byte=([400, 600, 1150, 7220], 4),
    )
    def test_mix_audio_channels(self, frequencies, sample_width):
        """Mixing interleaved channels must equal the floored per-frame
        average of the individual mono channels."""
        # `sample_width` comes from the dataset and must not be overridden
        # here, otherwise the 1-byte and 4-byte cases silently test 2 bytes.
        sampling_rate = 16000
        channels = len(frequencies)
        mono_channels = [
            _generate_pure_tone(
                freq,
                duration_sec=0.1,
                sampling_rate=sampling_rate,
                sample_width=sample_width,
            )
            for freq in frequencies
        ]
        fmt = DATA_FORMAT[sample_width]
        expected = _array_to_bytes(
            array(
                fmt,
                (sum(samples) // channels for samples in zip(*mono_channels)),
            )
        )
        data = _array_to_bytes(array(fmt, _sample_generator(*mono_channels)))
        mixed = _mix_audio_channels(data, channels, sample_width)
        self.assertEqual(mixed, expected)

    @genty_dataset(
        mono=("mono_400Hz.raw", (400,)),
        three_channel=("3channel_400-800-1600Hz.raw", (400, 800, 1600)),
    )
    def test_save_raw(self, filename, frequencies):
        """Data written by `_save_raw` must be byte-identical to the
        reference file under tests/data."""
        filename = "tests/data/test_16KHZ_{}".format(filename)
        sample_width = 2
        fmt = DATA_FORMAT[sample_width]
        mono_channels = [PURE_TONE_DICT[freq] for freq in frequencies]
        data = _array_to_bytes(array(fmt, _sample_generator(*mono_channels)))
        # The context manager closes (and thereby removes) the temporary
        # file even when the assertion fails.
        with NamedTemporaryFile() as tmpfile:
            _save_raw(tmpfile.name, data)
            self.assertTrue(
                filecmp.cmp(tmpfile.name, filename, shallow=False)
            )

    @genty_dataset(
        mono=("mono_400Hz.wav", (400,)),
        three_channel=("3channel_400-800-1600Hz.wav", (400, 800, 1600)),
    )
    def test_save_wave(self, filename, frequencies):
        """Data written by `_save_wave` must be byte-identical to the
        reference file under tests/data."""
        filename = "tests/data/test_16KHZ_{}".format(filename)
        sampling_rate = 16000
        sample_width = 2
        channels = len(frequencies)
        fmt = DATA_FORMAT[sample_width]
        mono_channels = [PURE_TONE_DICT[freq] for freq in frequencies]
        data = _array_to_bytes(array(fmt, _sample_generator(*mono_channels)))
        # The context manager closes (and thereby removes) the temporary
        # file even when the assertion fails.
        with NamedTemporaryFile() as tmpfile:
            _save_wave(
                tmpfile.name, data, sampling_rate, sample_width, channels
            )
            self.assertTrue(
                filecmp.cmp(tmpfile.name, filename, shallow=False)
            )
|