e@0: import difflib
e@0: from sklearn.externals import joblib
e@0: from collections import defaultdict
e@0: import nltk
e@0: import numpy as np
e@0: import re
e@0: import librosa
e@0: import glob
e@0: import pandas as pd
e@0: from nltk.stem import porter
e@0: import sox
e@0: from scipy.io.wavfile import read as wavread
e@0: from scipy.io.wavfile import write as wavwrite
e@0: from numpy.core._internal import _gcd as gcd
e@0: from rtsfx import *
e@0:
e@0: import subprocess
e@0: import os
e@0: import pypeg2 as pg
e@0: import random
e@0:
e@0: VOICE_PROPERTIES = ['slow', 'deep', 'fast', 'stuttering']
e@0: PAUSE_PROPERTIES = ['short', 'long']
e@0: SEPARATORS = [ "[", "]", "(", ")", ":", "-"]
e@0: SFX_MOD_PROPERTIES = ['quiet', 'loud', 'silent']
e@0:
e@0:
e@0: FIXED_VOICES = False
e@0:
e@0: FMV = 0
e@0: FFV = 0
e@0:
e@0:
e@0: male_voices = r"""
e@0: cmu_us_ahw_cg
e@0: cmu_us_awb_cg
e@0: cmu_us_bdl_cg
e@0: cmu_us_fem_cg
e@0: cmu_us_jmk_cg
e@0: cmu_us_ksp_cg
e@0: cmu_us_rms_cg
e@0: """.split()
e@0:
e@0:
e@0: female_voices = r"""
e@0: cmu_us_aup_cg
e@0: cmu_us_axb_cg
e@0: cmu_us_clb_cg
e@0: cmu_us_gka_cg
e@0: cmu_us_rxr_cg
e@0: cmu_us_slt_cg
e@0: """.split()
e@0:
e@0:
e@0:
e@0: # male_voices = r"""
e@0: # cmu_us_ahw_cg
e@0: # cmu_us_fem_cg
e@0: # cmu_us_rms_cg
e@0: # """.split()
e@0: #
e@0: #
e@0: # female_voices = r"""
e@0: # cmu_us_aup_cg
e@0: # cmu_us_axb_cg
e@0: # cmu_us_rxr_cg
e@0: # cmu_us_slt_cg
e@0: # """.split()
e@0:
e@0: fixed_male_voice = male_voices
e@0: fixed_female_voice = female_voices
e@0:
e@0:
e@0: # male_voices = r"""
e@0: # cmu_us_ahw_cg
e@0: # cmu_us_awb_cg
e@0: # cmu_us_bdl_cg
e@0: # cmu_us_fem_cg
e@0: # cmu_us_jmk_cg
e@0: # cmu_us_ksp_cg
e@0: # cmu_us_rms_cg
e@0: # """.split()
e@0:
e@0:
e@0: # male_voices = r"""
e@0: # cmu_us_ahw_cg
e@0: # """.split()
e@0: #
e@0: #
e@0: # female_voices = r"""
e@0: # cmu_us_ahw_cg
e@0: # """.split()
e@0:
e@0: import matplotlib.pyplot as plt
e@0:
e@0: def generate_speech_with_festival(voice,
e@0: panning,
e@0: line,
e@0: sr=None
e@0: ):
e@0: """
e@0: Used for speech generation
e@0: Constructs a festival .sable file
e@0: and runs it through festival.
e@0:
e@0: """
e@0: header = r"""
e@0:
e@0:
e@0:
e@0:
e@0: """.format(voice)
e@0:
e@0: footer = r"""
e@0:
e@0:
e@0: """
e@0:
e@0: # 0. Construct sable file
e@0: sable = header + line + footer
e@0:
e@0: # 1. Save sable file to a temporary .sable file in tmp
e@0:
e@0: with open('/tmp/character_line.sable', 'w') as f:
e@0: f.write(sable)
e@0:
e@0: # 2. Call process to festival
e@0: cmd = 'text2wave /tmp/character_line.sable -o /tmp/character_line.wav'
e@0:
e@0: print("Generating speech for line: '{}' with voice '{}' and panning '{}' ".format(line, voice, panning))
e@0: value = subprocess.call(cmd, shell=True)
e@0:
e@0: if value != 0:
e@0: raise RuntimeError("Festival failed to execute.")
e@0:
e@0: # 3. Load back wave file
e@0: if sr is None:
e@0: wav, sr = librosa.load('/tmp/character_line.wav', mono=True)
e@0: else:
e@0: wav, sr = librosa.load('/tmp/character_line.wav', sr=sr, mono=True)
e@0:
e@0: audio = np.vstack([panning*wav,(1.-panning)*wav])
e@0: #
e@0: # plt.figure()
e@0: # plt.plot(audio[0,:])
e@0: # plt.figure()
e@0: # plt.plot(audio[1,:])
e@0: # plt.show()
e@0: return audio, sr
e@0:
e@0:
e@0: def substr_features(sent,
e@0: lower=True, substr=[]):
e@0: if lower:
e@0: sent = sent.lower()
e@0: freqs = defaultdict(int)
e@0: for ss in substr:
e@0: if ss in sent:
e@0: freqs[ss] = 1
e@0: return dict(freqs)
e@0:
e@0:
e@0: def features_dict_to_matrix(features, feature_labels):
e@0: N = len(features)
e@0: M = len(feature_labels)
e@0: arr = np.zeros((N, M))
e@0:
e@0: idx_to_feat = list(feature_labels)
e@0: feat_to_idx = dict((idx_to_feat[k], k) for k in range(len(idx_to_feat)))
e@0:
e@0: for n in range(arr.shape[0]):
e@0: for m in range(arr.shape[1]):
e@0: if idx_to_feat[m] in features[n]:
e@0: arr[n, m] = features[n][idx_to_feat[m]]
e@0:
e@0: return arr, list(feat_to_idx.keys())
e@0:
e@0:
e@0: def similar(text1, text2,
e@0: threshold=0.7 # threshold for similarity
e@0: ):
e@0: """ Tests whether two strings are similar """
e@0:
e@0: ratio = difflib.SequenceMatcher(None, text1.lower(), text2.lower()).ratio()
e@0: return ratio >= threshold
e@0:
e@0:
e@0: class Master():
e@0: def __init__(self, downmix):
e@0: self.downmix = downmix
e@0:
e@0:
e@0: def get_mastered(self):
e@0: # Creating transformer
e@0: tfm = sox.Transformer()
e@0:
e@0: # Removing everything below 80hz
e@0: tfm.highpass(80)
e@0:
e@0: # Adding a notch filter at 200hz to improve clarity
e@0: tfm.bandreject(200)
e@0:
e@0: # Loudness control for under -9dB
e@0: tfm.loudness(gain_db=-9)
e@0:
e@0: # Store downmix temporarily
e@0: librosa.output.write_wav('/tmp/downmix_unnormalized.wav', self.downmix, sr=44100, norm=False)
e@0: tfm.build('/tmp/downmix_unnormalized.wav', '/tmp/downmix_normalized.wav')
e@0:
e@0: # Load downmix
e@0: mastered = librosa.core.load('/tmp/downmix_normalized.wav', sr=44100,mono=False )[0]
e@0: return mastered
e@0:
e@0:
e@0: class Mixer():
e@0: def __init__(self, multitrack):
e@0: self.multitrack = multitrack
e@0:
e@0: def get_downmix(self):
e@0:
e@0: # Just a trick to get the length of the first track
e@0: if 'background' in self.multitrack:
e@0: D = self.multitrack['background'].shape[1]
e@0: else:
e@0: for track in self.multitrack:
e@0: D = self.multitrack[track].shape[1]
e@0: break
e@0:
e@0: downmix = np.zeros((2, D))
e@0: for ttrack in self.multitrack:
e@0:
e@0: #1. Normalize
e@0:
e@0: track = self.multitrack[ttrack]
e@0:
e@0: max_val = np.max(np.abs(track))
e@0:
e@0: if max_val > 0:
e@0: track /= max_val
e@0:
e@0: if ttrack == 'background':
e@0: track *= 0.05
e@0:
e@0:
e@0: downmix += track
e@0:
e@0: return downmix
e@0:
e@0:
e@0: def zafar(lx, rx, d1, g1, m, fc, G, da=0.007, fs=44100.):
e@0: """ Rafii & Pardo Reverberator (2009) controlled by High Level parameters
e@0: Inputs:
e@0: lx : left channel input
e@0: rx : right channel input
e@0: d1 : delay of first comb filter in seconds
e@0: g1 : gain of first comb filters
e@0: da : delay of allpass filter in seconds
e@0: G : dry/wet mix gain
e@0: fc : lowpass filter cuttoff Hz
e@0: m : difference between left and right channel phases
e@0: fs : sampling rate
e@0:
e@0: Outputs:
e@0: ly: left channel output
e@0: ry: right channel output
e@0: """
e@0:
e@0: d1 = int(d1 * fs)
e@0: m = int(m * fs)
e@0: da = int(da * fs)
e@0:
e@0: def calculate_parameters(d1, g1):
e@0:
e@0: d2 = int(round((1.5) ** (-1) * d1))
e@0:
e@0: while gcd(d2, d1) != 1:
e@0: d2 += 1
e@0:
e@0: d3 = int(round((1.5) ** (-2) * d1))
e@0:
e@0: while gcd(d3, d2) != 1 or gcd(d3, d1) != 1:
e@0: d3 += 1
e@0:
e@0: d4 = int(round((1.5) ** (-3) * d1))
e@0:
e@0: while gcd(d4, d3) != 1 or gcd(d4, d2) != 1 or gcd(d4, d1) != 1:
e@0: d4 += 1
e@0:
e@0: d5 = int(round((1.5) ** (-4) * d1))
e@0:
e@0: while gcd(d5, d4) != 1 or gcd(d5, d3) != 1 or gcd(d5, d2) != 1 or gcd(d5, d1) != 1:
e@0: d5 += 1
e@0:
e@0: d6 = int(round((1.5) ** (-5) * d1))
e@0: while gcd(d6, d5) != 1 or gcd(d6, d4) != 1 or gcd(d6, d3) != 1 or gcd(d6, d2) != 1 or gcd(d6, d1) != 1:
e@0: d6 += 1
e@0: g2 = g1 ** (1.5) ** (-1) * g1
e@0: g3 = g1 ** (1.5) ** (-2) * g1
e@0: g4 = g1 ** (1.5) ** (-3) * g1
e@0: g5 = g1 ** (1.5) ** (-4) * g1
e@0: g6 = g1 ** (1.5) ** (-5) * g1
e@0:
e@0: return (d1, d2, d3, d4, d5, d6, g1, g2, g3, g4, g5, g6)
e@0:
e@0: def comb_array(x, g1, d1):
e@0:
e@0: (d1, d2, d3, d4, d5, d6, g1, g2, g3, g4, g5, g6) = calculate_parameters(d1, g1)
e@0:
e@0: c1out = comb(x, g1, d1)
e@0: c2out = comb(x, g2, d2)
e@0: c3out = comb(x, g3, d3)
e@0: c4out = comb(x, g4, d4)
e@0: c5out = comb(x, g5, d5)
e@0: c6out = comb(x, g6, d6)
e@0:
e@0: Lc1 = len(c1out)
e@0: Lc2 = len(c2out)
e@0: Lc3 = len(c3out)
e@0: Lc4 = len(c4out)
e@0: Lc5 = len(c5out)
e@0: Lc6 = len(c6out)
e@0:
e@0: Lc = max(Lc1, Lc2, Lc3, Lc4, Lc5, Lc6)
e@0:
e@0: y = np.zeros((Lc,))
e@0:
e@0: y[0:Lc1] = c1out
e@0: y[0:Lc2] += c2out
e@0: y[0:Lc3] += c3out
e@0: y[0:Lc4] += c4out
e@0: y[0:Lc5] += c5out
e@0: y[0:Lc6] += c6out
e@0:
e@0: return y
e@0:
e@0: def comb(x, g, d):
e@0: LEN = len(x) + d
e@0: # print d
e@0: y = np.zeros((LEN,))
e@0: for n in range(0, LEN):
e@0: if n - d < 0:
e@0: y[n] = 0
e@0: else:
e@0: y[n] = x[n - d] + g * y[n - d]
e@0:
e@0: return y
e@0:
e@0: def allpass(x, g, d):
e@0: LENx = len(x)
e@0: LENy = LENx + d
e@0: y = np.zeros((LENy,))
e@0: for n in range(0, LENy):
e@0: if n - d < 0:
e@0: y[n] = -g * x[n]
e@0: elif n >= LENx:
e@0: y[n] = x[n - d] + g * y[n - d]
e@0: else:
e@0: y[n] = x[n - d] - g * x[n] + g * y[n - d]
e@0:
e@0: return y
e@0:
e@0: def lowpass(x, g):
e@0: LEN = len(x)
e@0: y = np.zeros((LEN,))
e@0:
e@0: for n in range(0, LEN):
e@0: if n - 1 < 0:
e@0: y[n] = (1 - g) * x[n]
e@0: else:
e@0: y[n] = (1 - g) * x[n] + g * y[n - 1]
e@0:
e@0: return y
e@0:
e@0: ga = 1. / np.sqrt(2.)
e@0:
e@0: cin = 0.5 * lx + 0.5 * rx
e@0: cout = comb_array(cin, g1, d1)
e@0:
e@0: ra = allpass(cout, ga, da + m // 2)
e@0: la = allpass(cout, ga, da - m // 2)
e@0:
e@0: gc = 2 - np.cos(2 * np.pi * fc / fs) - np.sqrt((np.cos(2 * np.pi * fc / fs) - 2) ** 2 - 1)
e@0:
e@0: ral = lowpass(ra, gc)
e@0: lal = lowpass(la, gc)
e@0:
e@0: ralg = G * ral
e@0: lalg = G * lal
e@0:
e@0: ry = ralg[0:len(rx)] + (1 - G) * rx
e@0: ly = lalg[0:len(lx)] + (1 - G) * lx
e@0:
e@0: return np.vstack([ry, ly])
e@0:
e@0: def get_reverb_from_tags(xl, xr, tags, fs=44100):
e@0: reverb_csv = 'contributions.csv'
e@0: df = pd.read_csv(reverb_csv)
e@0: df = df.fillna("")
e@0: params = []
e@0: for n in range(len(df)):
e@0: if all([t in df['agreed'].iloc[n].split(',') for t in tags]):
e@0: params.append(df['param'].iloc[n])
e@0: d1, g1, m, fc, G = [float(f) for f in params[0].split(',')]
e@0: y = zafar(xl, xr, d1, g1, m, fc, G, fs=fs)
e@0: return y
e@0:
e@0:
e@0: def fade(x, fade_in, fade_out, sr=44100):
e@0: """
e@0: Creates a fade-in-fade-out envelope
e@0: for audio array x.
e@0: """
e@0:
e@0: if len(x) == 0:
e@0: return x
e@0:
e@0: fade_in_samples = int(fade_in * sr)
e@0: fade_out_samples = int(fade_out * sr)
e@0:
e@0: outp = np.ones_like(x)
e@0: for n in range(fade_in_samples):
e@0: outp[n] = n * 1. / fade_in_samples
e@0:
e@0: for n in range(fade_out_samples):
e@0: outp[len(outp) - fade_out_samples + n] = 1 - 1. / fade_out_samples * n
e@0: return outp * x
e@0:
e@0:
e@0: def slope(x, slope_in, slope_out, delay=1.0, v=0.1, sr=44100):
e@0: """
e@0: Creates a slope in slope out envelope
e@0: """
e@0:
e@0: if len(x) == 0:
e@0: return x
e@0:
e@0: delay_samples = int(delay * sr)
e@0: slope_in_samples = int(slope_in * sr)
e@0: slope_out_samples = int(slope_out * sr)
e@0:
e@0: outp = np.zeros_like(x)
e@0:
e@0: for n in range(len(outp)):
e@0: if n >= 0 and n < delay_samples:
e@0: outp[n] = 1.0 - v
e@0: elif n >= delay_samples and n < delay_samples + slope_in_samples:
e@0: outp[n] = (1. - v) - (1. - v) / slope_in_samples * (n - delay_samples)
e@0: elif n >= delay_samples + slope_in_samples and n < len(outp) - delay_samples - slope_out_samples:
e@0: outp[n] = 0
e@0: elif n >= len(outp) - delay_samples - slope_out_samples and n < len(outp) - delay_samples:
e@0: outp[n] = (1. - v) / slope_out_samples * (n - len(outp) + delay_samples + slope_out_samples)
e@0: if outp[n] < 0:
e@0: print(n)
e@0: break
e@0: elif n >= len(outp) - delay_samples:
e@0: outp[n] = 1.0 - v
e@0:
e@0: outp += v
e@0:
e@0: return outp * x
e@0:
e@0:
e@0: def get_background(
e@0: fname,
e@0: duration,
e@0: ft=0.5,
e@0: ):
e@0: print(fname)
e@0: bg, sr = librosa.load(fname)
e@0: f_s = int(ft * sr)
e@0: y = bg
e@0: z = np.zeros((duration,))
e@0: if len(y) < len(z):
e@0: y = fade(y, ft, ft, sr)
e@0: for n in range(0, len(z) - len(y), len(y) - f_s):
e@0: z[n:n + len(y)] += y
e@0: n += len(y) - f_s
e@0: if len(y) > len(z[n:]):
e@0: z[n:] += y[:len(z[n:])]
e@0: else:
e@0: z[n:n + len(y)] += y
e@0:
e@0: z = fade(z, ft, ft, sr=sr)
e@0:
e@0: elif len(y) > len(z):
e@0: z += fade(y[0:len(z)], ft, ft, sr=sr)
e@0: return z
e@0:
e@0:
e@0: def compose_bg_scene(bgs, background_changes, D, delay=3*44100):
e@0: z = np.zeros((2,D))
e@0: for n in range(len(background_changes)):
e@0: bg_choice = background_changes[n][1]
e@0: start = background_changes[n][0]
e@0: fname = bgs[bg_choice]
e@0: if n < len(background_changes) - 1:
e@0: duration = background_changes[n + 1][0] - background_changes[n][0]
e@0: else:
e@0: duration = D - background_changes[n][0]
e@0:
e@0: y = get_background(fname, duration)
e@0: z[0,start:start + len(y)] = y
e@0: z[1, start:start + len(y)] = y
e@0: #z = fade(z, 1., 1.)
e@0: return z
e@0:
e@0:
e@0: class Director():
e@0: def __init__(self, script, sound_dir, speech_dir):
e@0: """
e@0: Gets a list of script
e@0:
e@0: :param sound_dir: directory of sound files
e@0: :param speech_dir: directory of speech files
e@0: :param script: the script
e@0: """
e@0:
e@0: # Gets character definitions
e@0:
e@0: ## TODO: Change this to also have accents
e@0:
e@0: self.voice_params = {}
e@0: self.scene_params = {}
e@0: self.bg_params = {}
e@0:
e@0: # This holds the fxive sound engine if available
e@0: self.fxive = None
e@0:
e@0: global FFV, FMV
e@0: for d in script['definitions']:
e@0: if d['type'] == 'scene_definition':
e@0: number = int(d['number'])
e@0: tags = d['tags']
e@0: filename = d['filename']
e@0:
e@0: # If it starts with fxive: then get the preset from fxive
e@0: if 'fxive:' == filename[:6]:
e@0: print("Fetching sample from fxive...")
e@0: if self.fxive is not None:
e@0: self.bg_params[number] = self.fxive.get_sfx(filename[6:])
e@0: else:
e@0: self.fxive = FXive(sfx_path=os.path.join(sound_dir, 'sfx.xls'))
e@0: self.bg_params[number] = self.fxive.get_sfx(filename[6:])
e@0: else:
e@0: self.bg_params[number] = filename
e@0:
e@0: if 'none' in tags:
e@0: self.scene_params[number] = []
e@0: else:
e@0: self.scene_params[number] = tags
e@0:
e@0: if d['type'] == 'cast_definition':
e@0: # print("-----------------")
e@0: name = d['name']
e@0: gender = random.sample(d['gender'], 1)[0]
e@0: panning = random.sample(d['panning'], 1)[0]
e@0:
e@0: if panning == 'left':
e@0: panning = 0.01
e@0: elif panning == 'right':
e@0: panning = 0.99
e@0: elif panning in ['center', 'centre']:
e@0: panning = 0.5
e@0: #print(gender, panning)
e@0: if gender == 'female':
e@0: # Choose a random female voice
e@0: voice = random.sample(female_voices, 1)[0]
e@0:
e@0: if FIXED_VOICES:
e@0: voice = fixed_female_voice[FFV]
e@0: FFV += 1
e@0: else:
e@0: # Choose a random male voice
e@0: voice = random.sample(male_voices, 1)[0]
e@0:
e@0: if FIXED_VOICES:
e@0: voice = fixed_male_voice[FMV]
e@0: FMV += 1
e@0:
e@0: self.voice_params[name] = (voice, panning)
e@0:
e@0: # if character_panning == 0.5:
e@0: # character_panning = 0.1
e@0: # elif character_panning == 0.1:
e@0: # character_panning = 0.9
e@0: # elif character_panning == 0.9:
e@0: # character_panning = 0.1
e@0:
e@0:
e@0: if self.fxive is not None:
e@0: self.fxive.close()
e@0:
e@0: self.script = script
e@0: self.sound_dir = sound_dir
e@0: self.speech_dir = speech_dir
e@0:
e@0: self.musicmanager = MusicManager(sound_dir)
e@0: self.pausemanager = PauseManager()
e@0: self.speechmanager = SpeechManager(speech_dir, self.voice_params)
e@0: self.sfxmanager = SoundManager(sound_dir)
e@0:
e@0: def get_voice_params(self, name):
e@0: return self.voice_params[name]
e@0:
e@0: def generate_multitrack(self):
e@0: # Shift by 4 seconds
e@0: D = 0
e@0: P = []
e@0: track_names = []
e@0:
e@0: # print(self.script['script'])
e@0:
e@0: current_scene = 1
e@0: current_reverb_tags = ""
e@0:
e@0: scene_changes = []
e@0:
e@0: # Create a program of scripts
e@0: for s in self.script['script']:
e@0: if s['type'] == 'music':
e@0: name = 'music'
e@0: audio = self.musicmanager.retrieve_music(s)
e@0: elif s['type'] == 'sfx':
e@0: name = s['name'].lower()
e@0: audio = self.sfxmanager.retrieve_sfx(s)
e@0: elif s['type'] == 'scene_change':
e@0: current_scene = int(s['number'])
e@0: #print(current_scene)
e@0: #print(self.scene_params)
e@0: current_reverb_tags = self.scene_params[current_scene]
e@0:
e@0: print("Changed to scene {} with reverb tags: {}".format(current_scene, current_reverb_tags))
e@0: scene_changes.append((D, current_scene))
e@0: continue
e@0: elif s['type'] == 'pause':
e@0: name = 'pause'
e@0: audio = self.pausemanager.retrieve_pause(s)
e@0: elif s['type'] == 'cast_line':
e@0: print(s)
e@0: name = s['name'].lower()
e@0: audio = self.speechmanager.retrieve_speech(s)
e@0: if len(current_reverb_tags) > 0:
e@0: print("Applying reverberation with tags: {}".format(current_reverb_tags))
e@0: print(audio.shape)
e@0: if s['name'] != 'Narrator':
e@0: audio = get_reverb_from_tags(audio[0,:], audio[1,:], current_reverb_tags)
e@0:
e@0: if name not in track_names:
e@0: track_names.append(name)
e@0: D += audio.shape[1]
e@0: P.append((name,audio))
e@0:
e@0: multitrack = {t: np.zeros((2, D)) for t in track_names}
e@0:
e@0: print("Composing bg scene")
e@0: multitrack['background'] = compose_bg_scene(self.bg_params, scene_changes, D)
e@0:
e@0: idx = 0
e@0: for p in P:
e@0: multitrack[p[0]][:, idx:idx+p[1].shape[1]] = p[1]
e@0: idx += p[1].shape[1]
e@0:
e@0: return multitrack
e@0:
e@0:
e@0:
e@0: class Generator():
e@0: def __init__(self):
e@0: pass
e@0:
e@0: def generate(self):
e@0: with open('../data/scripts/The Mystery Of Spooky Hill.txt') as f:
e@0: return f.read()
e@0:
e@0:
e@0: class PauseManager():
e@0: def __init__(self):
e@0: """
e@0: Manages pauses
e@0: """
e@0:
e@0: def retrieve_pause(self, input_):
e@0: duration_str = input_['duration']
e@0: if duration_str == 'long':
e@0: duration = 3.0
e@0: elif duration_str == 'short':
e@0: duration = 1.0
e@0:
e@0: audio = np.zeros((2, int(duration*44100)))
e@0: return audio
e@0:
e@0: class SpeechManager():
e@0: def __init__(self, speech_folder, voice_params):
e@0: """
e@0:
e@0: :param speech_folder: the folder the speech .mp3s are
e@0: """
e@0:
e@0: self.voice_params = voice_params
e@0: self.speech_folder = speech_folder
e@0: try:
e@0: self.transcriptions = pd.read_excel(os.path.join(speech_folder ,'transcript.xls'))
e@0: except:
e@0: # If the file does not exist
e@0: self.transcriptions = None
e@0:
e@0: print('Transcription file:' + str(os.path.join(speech_folder ,'transcript.xls')))
e@0: print('Transcriptions:' + str(self.transcriptions))
e@0:
e@0: def retrieve_speech(self, input_):
e@0: # print(input_)
e@0: cast_member = input_['name']
e@0: # print(self.voice_params)
e@0: cast_voice = self.voice_params[cast_member][0] # 0th element is voice
e@0: cast_panning = self.voice_params[cast_member][1] #1th element is panning
e@0:
e@0: cast_line = input_['line']
e@0:
e@0: can_find_entry = False
e@0:
e@0:
e@0: # If the file does not exist
e@0: cast_lines_df = self.transcriptions[self.transcriptions['cast'].map(lambda x: x.lower()) == cast_member.lower()]
e@0: similarities = {}
e@0: for n in cast_lines_df.index:
e@0: similarities[n] = difflib.SequenceMatcher(None, cast_line, cast_lines_df['line'].loc[n]).ratio()
e@0:
e@0: # Most similar entry location
e@0: chosen_entry = max(similarities, key=lambda x: similarities[x])
e@0: chosen_file = cast_lines_df['filename'].loc[chosen_entry]
e@0: chosen_line = cast_lines_df['line'].loc[chosen_entry]
e@0:
e@0: if similar(cast_line, chosen_line):
e@0: can_find_entry = True
e@0:
e@0: chosen_file_path = os.path.join(self.speech_folder, chosen_file)
e@0: print("Retrieving: " + chosen_file_path)
e@0:
e@0: if os.path.exists(chosen_file_path):
e@0: audio, sr = librosa.core.load(chosen_file_path, sr=44100, mono=False)
e@0: #print("panning: {}".format(cast_panning))
e@0: audio[0,:] *= cast_panning
e@0: audio[1,:] *= (1-cast_panning)
e@0: else:
e@0: can_find_entry = False
e@0:
e@0: if not can_find_entry:
e@0: # 1. Generate line
e@0: audio, sr = generate_speech_with_festival(cast_voice, cast_panning, cast_line, sr=44100)
e@0: # print("panning: {}".format(cast_panning))
e@0: # audio[0,:] *= cast_panning
e@0: # audio[1,:] *= (1-cast_panning)
e@0:
e@0:
e@0:
e@0: # If the line is too disimilar, synthesize it, else use the chosen line
e@0: return audio
e@0:
e@0:
e@0: class SoundManager():
e@0: def __init__(self, sound_folder):
e@0: """
e@0:
e@0: :param sound_folder: the folder the music .mp3s are
e@0: """
e@0:
e@0:
e@0: self.sound_folder = sound_folder
e@0: self.sound_file_names = [f.split('/')[-1] for f in glob.glob(sound_folder + '*.mp3')]
e@0:
e@0: # If the directory is empty, return.
e@0: if len(self.sound_file_names) == 0:
e@0: return
e@0: # Lookup strings
e@0: strings = []
e@0: for f in self.sound_file_names:
e@0: strings.append(" ".join(re.findall('[A-Za-z]+', f)).lower())
e@0:
e@0: # Sanitize strings, remove the most common substring
e@0:
e@0: # Find most common substring
e@0: string1 = strings[0]
e@0: for n in range(1, len(strings)):
e@0: string2 = strings[n]
e@0: match = difflib.SequenceMatcher(None, string1, string2).find_longest_match(0, len(string1), 0, len(string2))
e@0: string1 = string2[match.b:match.b + match.size]
e@0:
e@0: # Remove most common substring
e@0: ## TODO: Check here please, should we remove it?
e@0:
e@0: # strings = [s.replace(string1, '') for s in strings]
e@0: self.lookup = strings
e@0:
e@0: def retrieve_sfx(self, input_):
e@0: """
e@0:
e@0: :param query: dictionary object from parser
e@0: :return: audio matrix containing audio file
e@0: """
e@0:
e@0: query = input_['name'].lower()
e@0: # Lematize words before checking for similarity
e@0: stemmer = porter.PorterStemmer()
e@0:
e@0: qwords = [stemmer.stem(q).lower() for q in query.split()]
e@0: similarities = []
e@0:
e@0: # If the words in the query are available in the words in the filename, then increase by 1. Finally,
e@0: # divide by the total number of words (Jaccard similarity?)
e@0:
e@0: for s in self.lookup:
e@0:
e@0: words = [stemmer.stem(w).lower() for w in s.split()]
e@0: similarities.append(0.)
e@0:
e@0: for qw in qwords:
e@0: for w in words:
e@0: similarities[-1] += difflib.SequenceMatcher(None, qw, w).ratio()
e@0:
e@0: similarities[-1]/=float(len(words))
e@0:
e@0: # This is argmax
e@0: chosen = [n for n in range(len(similarities)) if similarities[n] == max(similarities)][0]
e@0: chosen_fname = self.sound_folder + self.sound_file_names[chosen]
e@0: audio = librosa.core.load(chosen_fname, sr=44100, mono=False)
e@0: return audio[0]
e@0:
e@0:
e@0: class MusicManager():
e@0: def __init__(self, sound_folder):
e@0: """
e@0:
e@0: :param sound_folder: the folder the music .mp3s are
e@0: """
e@0:
e@0: self.sound_folder = sound_folder
e@0: self.sound_file_names = [f.split('/')[-1] for f in glob.glob(sound_folder + '/*.mp3')]
e@0:
e@0: # If the directory is empty, return.
e@0: if len(self.sound_file_names) == 0:
e@0: return
e@0:
e@0: # Lookup strings
e@0: strings = []
e@0: for f in self.sound_file_names:
e@0: strings.append(" ".join(re.findall('[A-Za-z]+', f)).lower())
e@0:
e@0: # Sanitize strings, remove the most common substring
e@0:
e@0: # Find most common substring
e@0: string1 = strings[0]
e@0: for n in range(1, len(strings)):
e@0: string2 = strings[n]
e@0: match = difflib.SequenceMatcher(None, string1, string2).find_longest_match(0, len(string1), 0, len(string2))
e@0: string1 = string2[match.b:match.b + match.size]
e@0:
e@0: # Remove most common substring
e@0: strings = [s.replace(string1, '') for s in strings]
e@0: self.lookup = strings
e@0:
e@0: def retrieve_music(self, input_):
e@0: """
e@0:
e@0: :param query: dictionary object from parser
e@0: :return: audio matrix containing audio file
e@0: """
e@0:
e@0: query = input_['name'].lower() + ' music'
e@0:
e@0: similarities = []
e@0:
e@0: # If the words in the query are available in the words in the filename, then increase by 1. Finally,
e@0: # divide by the total number of words (Jaccard similarity?)
e@0:
e@0: for s in self.lookup:
e@0: qwords = query.split()
e@0: words = s.split()
e@0: similarities.append(0.)
e@0:
e@0: for qw in qwords:
e@0: if qw in words:
e@0: similarities[-1] += 1.
e@0:
e@0: similarities[-1]/=float(len(words))
e@0:
e@0: # This is argmax
e@0: chosen = [n for n in range(len(similarities)) if similarities[n] == max(similarities)][0]
e@0: chosen_fname = self.sound_folder + self.sound_file_names[chosen]
e@0: audio = librosa.core.load(chosen_fname, sr=44100, mono=False)
e@0: return audio[0]
e@0:
e@0:
e@0: # Classes for aiding parsing
e@0: class Environment:
e@0: def __init__(self, varname, name):
e@0: self.name = name
e@0: self.varname = varname
e@0:
e@0: def to_json(self):
e@0: return {"type": "environment_definition", "name": self.name}
e@0:
e@0:
e@0: class Sound_Effect:
e@0: def __init__(self, varname, name, pos):
e@0: self.name = name
e@0: self.varname = varname
e@0: self.keywords = [kw for kw in name.split()]
e@0:
e@0: # Set the end to pos-1 so the first
e@0: # character of the next line won't be ommited
e@0:
e@0: self.pos = (pos, pos - 1)
e@0:
e@0: def to_json(self):
e@0: return {
e@0: 'definition': {
e@0: 'type': 'sfx_definition',
e@0: 'name': ' '.join(self.keywords),
e@0: 'optional': False
e@0: },
e@0: 'playback': {
e@0: 'type': 'sfx',
e@0: 'name': ' '.join(self.keywords)
e@0: }
e@0: }
e@0:
e@0: def add_keywords(self, keywords):
e@0: for kw in keywords:
e@0: self.keywords.insert(0, kw)
e@0:
e@0: def __str__(self):
e@0: return "({} FX)".format(' '.join(self.keywords))
e@0:
e@0: def definition(self):
e@0: return ' '.join(self.keywords)
e@0:
e@0: def script(self):
e@0: return str(self)
e@0:
e@0:
e@0: class Character_Line:
e@0: def __init__(self, varname, txt, pos_start, pos_end):
e@0: self.varname = varname
e@0: self.txt = '. '.join([sent.capitalize() for sent in txt.split('\n')])
e@0: if self.txt[-1] != '.':
e@0: self.txt += '.'
e@0:
e@0: self.character = None
e@0: self.pos = (pos_start, pos_end)
e@0:
e@0: def set_character(self, character):
e@0: self.character = character
e@0:
e@0: def __str__(self):
e@0: return "{}: {}".format(self.character.name, self.txt)
e@0:
e@0: def script(self):
e@0: return "[{}] {}".format(self.character.name, self.txt)
e@0:
e@0: def set_pos(self, start, end):
e@0: self.pos = (start, end)
e@0:
e@0: def to_json(self):
e@0: return {'playback': {"type": "cast_line", "name": self.character.name, "line": self.txt}}
e@0:
e@0:
e@0: class Character:
e@0: def __init__(self, varname, name):
e@0: self.name = ' '.join([n.capitalize() for n in name.split()])
e@0: self.varname = varname
e@0: self.gender = ''
e@0: self.age = ''
e@0:
e@0: def set_gender(self, gender):
e@0: self.gender = gender
e@0:
e@0: def set_age(self, age):
e@0: self.age = age
e@0:
e@0: def definition(self):
e@0: str_ = self.name + ' - '
e@0: if self.gender == '':
e@0: str_ += 'male or female'
e@0: else:
e@0: str_ += self.gender
e@0:
e@0: return str_
e@0:
e@0: def __str__(self):
e@0: return __repr__(self)
e@0:
e@0: def __repr__(self):
e@0: return "[{}:{}/{}/{}]".format(self.varname, self.name, self.gender, self.age)
e@0:
e@0: def to_json(self):
e@0: json_dict = {"type": "cast_definition", "name": self.name}
e@0: if self.gender != '':
e@0: json_dict['gender'] = self.gender
e@0: if self.age != '':
e@0: json_dict['age'] = self.age
e@0:
e@0: return json_dict
e@0:
e@0:
e@0: class KDuration(pg.Keyword):
e@0: grammar = pg.Enum(pg.K('long'), pg.K('short'))
e@0:
e@0:
e@0: class Pause(pg.Plain):
e@0: grammar = '(', pg.optional(pg.attr('duration', KDuration)), 'pause', ')'
e@0:
e@0:
e@0: class CastHeader(pg.Plain):
e@0: grammar = 'Cast', pg.optional('List'), ':', pg.endl
e@0:
e@0:
e@0: class KGender(pg.Keyword):
e@0: grammar = pg.Enum(pg.K('male'), pg.K('female'))
e@0:
e@0:
e@0: class EGender(pg.List):
e@0: grammar = KGender, pg.optional('or', KGender)
e@0:
e@0:
e@0: class KPan(pg.Keyword):
e@0: grammar = pg.Enum(pg.K('left'), pg.K('right'), pg.K('center'), pg.K('centre'))
e@0:
e@0:
e@0: class EPan(pg.List):
e@0: grammar = KPan, pg.optional('or', KPan)
e@0:
e@0:
e@0: class CastDefinition(pg.Plain):
e@0: grammar = pg.attr('cast_name', re.compile('[A-Za-z0-9 ]+')), \
e@0: re.compile('\-+'), \
e@0: pg.attr('gender', EGender), \
e@0: re.compile('\-+'), \
e@0: 'panned', \
e@0: pg.attr('panning', EPan), pg.endl
e@0:
e@0:
e@0: class Tag(pg.Plain):
e@0: grammar = pg.attr('tag', re.compile(r'[A-Za-z0-9_\-]+'))
e@0:
e@0:
e@0: class LTag(pg.List):
e@0: grammar = pg.csl(Tag)
e@0:
e@0:
e@0: class ScenesHeader(pg.Plain):
e@0: grammar = re.compile('Scenes?'), pg.optional('List'), ':', pg.endl
e@0:
e@0:
e@0: class ScenesDefinition(pg.Plain):
e@0: grammar = pg.attr('number', re.compile('[A-Za-z0-9]+')), \
e@0: re.compile('\-+'), pg.attr('name', re.compile('[A-Za-z0-9]+')), \
e@0: re.compile('\-+'), pg.attr('filename', re.compile('[A-Za-z0-9_\:]+(\.(mp3|wav))?')), \
e@0: re.compile('\-+'), pg.attr('tags', LTag), pg.endl
e@0:
e@0:
e@0: class ScriptHeader(pg.Plain):
e@0: grammar = 'Script', ':', pg.endl
e@0:
e@0:
e@0: class SceneCommence(pg.Plain):
e@0: grammar = re.compile('\-+'), 'Scene', pg.attr('scene', re.compile('[A-Za-z0-9]+')), pg.optional(
e@0: re.compile('\-+')), pg.endl;
e@0:
e@0:
e@0: class CastLine(pg.Plain):
e@0: grammar = '[', pg.attr('cast_name', re.compile('[A-Za-z0-9 ]+')), ']', pg.attr('line',
e@0: re.compile(r'[A-Za-z0-9\-_.\ \" \'\,\?\:\!]+')),
e@0:
e@0:
e@0: class Headers(pg.Plain):
e@0: grammar = CastHeader, \
e@0: pg.attr('cast_list', pg.maybe_some(CastDefinition)), \
e@0: pg.optional(ScenesHeader, pg.attr('scene_list', pg.maybe_some(ScenesDefinition))), pg.optional(
e@0: ScriptHeader)
e@0:
e@0:
e@0: class Script(pg.List):
e@0: grammar = pg.some([Pause, SceneCommence, CastLine])
e@0:
e@0:
e@0: class ScriptDocument(pg.Plain):
e@0: grammar = pg.attr('headers', Headers), pg.attr('script', Script)
e@0:
e@0:
e@0: class Parser:
e@0: def __init__(self):
e@0: pass
e@0:
e@0: def parse_str(self, str_):
e@0:
e@0: obj = pg.parse(str_, ScriptDocument)
e@0: definitions = []
e@0: script = []
e@0:
e@0: for cast_def in obj.headers.cast_list:
e@0: name = cast_def.cast_name.strip()
e@0: gender = [str(t) for t in cast_def.gender]
e@0:
e@0: panning = [str(t) for t in cast_def.panning]
e@0:
e@0: cast_dict = {'type': 'cast_definition', 'name': name, 'gender': gender, 'voice': [], 'panning': panning}
e@0: definitions.append(cast_dict)
e@0:
e@0: for scene_def in obj.headers.scene_list:
e@0: name = scene_def.name.strip()
e@0: number = scene_def.number
e@0: filename = scene_def.filename
e@0: tags = [str(t.tag) for t in scene_def.tags]
e@0: scene_dict = {'type': 'scene_definition',
e@0: 'scene': name,
e@0: 'number': number,
e@0: 'filename': scene_def.filename,
e@0: 'tags': tags}
e@0: definitions.append(scene_dict)
e@0:
e@0: for action in obj.script:
e@0: if isinstance(action, Pause):
e@0: duration = str(action.duration)
e@0: pause_dict = {'type': 'pause', 'duration': duration}
e@0: script.append(pause_dict)
e@0: if isinstance(action, SceneCommence):
e@0: number = str(action.scene).strip()
e@0: scene_dict = {'type': 'scene_change', 'number': number}
e@0: script.append(scene_dict)
e@0: if isinstance(action, CastLine):
e@0: name = str(action.cast_name).strip()
e@0: line = str(action.line)
e@0: cast_line = {'type': 'cast_line', 'name': name, 'line': line}
e@0: script.append(cast_line)
e@0:
e@0: return {'definitions': definitions, 'script': script}
e@0:
e@0:
e@0: class Preprocessor():
e@0: def __init__(self):
e@0: self.parsed_dict = None
e@0: self.regexp_entity = re.compile('(?P[A-Z][0-9]+)\s+(?P[A-Z][A-Za-z_]*)\s+([0-9]+)\s+([0-9]+)(?P(\s+[A-Za-z\'\"]+)+$)')
e@0: self.regexp_attribute = re.compile('(?P[A-Z][0-9]+)\s+(?P[A-Z][A-Za-z_]*)\s+(?P[A-Z][0-9]+)\s+(?P[A-Za-z]+)')
e@0: self.regexp_relation = re.compile('(?P[A-Z][0-9]+)\s+(([A-Za-z0-9_]+:[A-Z0-9]+\s*)+)')
e@0: self.regexp_transitive = re.compile('(?P\*)\s+(?P[A-Z][A-Za-z_]*)\s+(?P[A-Z][0-9]+)\s+(?P[A-Z][0-9]+)\s*')
e@0:
e@0: def parse_str(self, text, annot):
e@0: """ takes as input a text and an annotation string """
e@0:
e@0: tups = [tup for tup in annot.split('\n') if tup.strip() != '']
e@0:
e@0: # Add the narrator
e@0: narrator = Character('_', 'Narrator')
e@0: characters = [narrator]
e@0:
e@0: objects = []
e@0: character_lines = []
e@0: sound_effects = []
e@0:
e@0: def find_character(var):
e@0: for c in characters:
e@0: if c.varname == var:
e@0: return c
e@0:
e@0: def find_character_or_object(var):
e@0: c = find_character(var)
e@0: if c is not None:
e@0: return c
e@0:
e@0: def find_character_lines(var):
e@0: for c in character_lines:
e@0: if c.varname == var:
e@0: return c
e@0:
e@0: def find_sound_effect(var):
e@0: for c in sound_effects:
e@0: if c.varname == var:
e@0: return c
e@0:
e@0: def character_speaks(c):
e@0: for cl in character_lines:
e@0: if isinstance(cl, Character_Line) and cl.character == c:
e@0: return True
e@0: return False
e@0:
e@0: for tup in tups:
e@0: # print(tup)
e@0:
e@0: groups = self.regexp_entity.findall(tup)
e@0: if len(groups) > 0:
e@0: if groups[0][1] == 'Character':
e@0: # If the entity is a character
e@0: characters.append(
e@0: Character(groups[0][0].strip(), groups[0][4].strip())
e@0: )
e@0: elif groups[0][1] == 'Character_Line':
e@0: character_lines.append(
e@0: Character_Line(groups[0][0].strip(), groups[0][4].strip(), int(groups[0][2]), int(groups[0][3]))
e@0: )
e@0: elif groups[0][1] == 'Sound_Effect':
e@0: sound_effects.append(
e@0: Sound_Effect(groups[0][0].strip(), groups[0][4].strip(), int(groups[0][2]))
e@0: )
e@0: continue
e@0:
e@0: for tup in tups:
e@0: # Attributes and relations
e@0: groups = self.regexp_attribute.findall(tup)
e@0: if len(groups) > 0:
e@0: if groups[0][1] == 'Gender':
e@0: # if
e@0: c = find_character(groups[0][2].strip())
e@0: c.set_gender(groups[0][3].strip().lower())
e@0: elif groups[0][1] == 'Age':
e@0: c = find_character(groups[0][2].strip())
e@0: c.set_age(groups[0][3].strip().lower())
e@0:
e@0: for tup in tups:
e@0: # Attributes and relations
e@0: groups = self.regexp_relation.findall(tup)
e@0: if len(groups) > 0 and groups[0][1][:4] == 'Says':
e@0: # print(groups)
e@0: refs = groups[0][1].split()[1:]
e@0:
e@0: # Store who and whats
e@0: whats = []
e@0: who = None
e@0:
e@0: for ref in refs:
e@0: type_, var = ref.split(':')
e@0: if type_[:4] == 'WHAT':
e@0: whats.append(var)
e@0: elif type_[:3] == 'WHO':
e@0: who = find_character(var)
e@0:
e@0: # find character lines:
e@0: clines = [find_character_lines(w) for w in whats]
e@0:
e@0: # Assign characters
e@0: for cl in clines:
e@0: cl.set_character(who)
e@0: elif len(groups) > 0 and groups[0][1][:12] == 'Sound_Effect':
e@0: sfx = find_sound_effect(groups[0][1][13:].split()[0])
e@0: #print(groups)
e@0:
e@0: # Store extra keywords
e@0: keywords = []
e@0:
e@0: refs = groups[0][1].split()[1:]
e@0: for ref in refs:
e@0: #print(ref)
e@0: type_, var = ref.split(':')
e@0: if type_[:8] == 'CausedBy':
e@0: cause = find_character_or_object(var)
e@0: if cause != None:
e@0: keywords.append(cause.name)
e@0:
e@0: sfx.add_keywords(keywords)
e@0:
e@0: # %% Calculate line segments for character lines and narration
e@0:
e@0: # Add sound effects to the timeline
e@0: clpos = [cl.pos for cl in character_lines]
e@0: clpos += [sfx.pos for sfx in sound_effects]
e@0: clpos = sorted(clpos, key=lambda x: x[0])
e@0:
e@0: # Add first narrator line
e@0: cl = Character_Line('_', text[0:clpos[0][0]], 0, clpos[0][0] - 1)
e@0: cl.set_character(narrator)
e@0: character_lines.append(cl)
e@0:
e@0: for n in range(len(clpos) - 1):
e@0: if clpos[n][1] != clpos[n + 1][0] - 1:
e@0: cl = Character_Line('_', text[clpos[n][1] + 1:clpos[n + 1][0]].rstrip(), clpos[n][1] + 1,
e@0: clpos[n + 1][0] - 1)
e@0: cl.set_character(narrator)
e@0: character_lines.append(cl)
e@0:
e@0: character_lines += sound_effects
e@0: character_lines = sorted(character_lines, key=lambda x: x.pos[1])
e@0:
e@0: # parsed_dict = {'definitions': [c.to_json() for c in characters],
e@0: # 'script': [cl.to_json() for cl in character_lines]}
e@0:
e@0: # parsed_dict = {'definitions': [], 'script':[]}
e@0: #
e@0: # print("Cast List:")
e@0: # for c in characters:
e@0: # if character_speaks(c):
e@0: # print(c.definition())
e@0: #
e@0: # print("")
e@0: # print("Sound Effects:")
e@0: #
e@0: # for c in sound_effects:
e@0: # print(c.definition())
e@0: #
e@0: # for cl in character_lines:
e@0: # print(cl.script())
e@0:
e@0: # Add definitions for characters
e@0: # for c in characters:
e@0: # if character_speaks(c):
e@0: # parsed_dict['definitions'].append(c.to_json())
e@0: #
e@0: # # Add definitions for sound effects
e@0: # for c in sound_effects:
e@0: # parsed_dict['definitions'].append(c.to_json()['definition'])
e@0: #
e@0: #
e@0: # # Add timeline information
e@0: # for cl in character_lines:
e@0: # parsed_dict['script'].append(cl.to_json()['playback'])
e@0:
e@0: str_ = "Cast List:\n"
e@0: for c in characters:
e@0: if character_speaks(c):
e@0: str_ += "{}\n".format(c.definition())
e@0:
e@0: str_ += "Sound Effects:\n"
e@0: for c in sound_effects:
e@0: str_ += "{}\n".format(c.definition())
e@0:
e@0:
e@0: for cl in character_lines:
e@0: str_ += "{}\n".format(cl.script())
e@0:
e@0:
e@0: return str_