e@0: import difflib
e@0: from sklearn.externals import joblib
e@0: from collections import defaultdict
e@0: import nltk
e@0: import numpy as np
e@0: import re
e@0: import librosa
e@0: import glob
e@0: import pandas as pd
e@0: from nltk.stem import porter
e@0: import sox
e@0: from scipy.io.wavfile import read as wavread
e@0: from scipy.io.wavfile import write as wavwrite
e@0: from numpy.core._internal import _gcd as gcd
e@0: from rtsfx import *
e@0:
e@0: import subprocess
e@0: import os
e@0: import pypeg2 as pg
e@0: import random
e@0:
e@0: VOICE_PROPERTIES = ['slow', 'deep', 'fast', 'stuttering']
e@0: PAUSE_PROPERTIES = ['short', 'long']
e@0: SEPARATORS = [ "[", "]", "(", ")", ":", "-"]
e@0: SFX_MOD_PROPERTIES = ['quiet', 'loud', 'silent']
e@0:
e@0:
e@0: FIXED_VOICES = False
e@0:
e@0: FMV = 0
e@0: FFV = 0
e@0:
e@0:
e@0: male_voices = r"""
e@0: cmu_us_ahw_cg
e@0: cmu_us_awb_cg
e@0: cmu_us_bdl_cg
e@0: cmu_us_fem_cg
e@0: cmu_us_jmk_cg
e@0: cmu_us_ksp_cg
e@0: cmu_us_rms_cg
e@0: """.split()
e@0:
e@0:
e@0: female_voices = r"""
e@0: cmu_us_aup_cg
e@0: cmu_us_axb_cg
e@0: cmu_us_clb_cg
e@0: cmu_us_gka_cg
e@0: cmu_us_rxr_cg
e@0: cmu_us_slt_cg
e@0: """.split()
e@0:
e@0:
e@0:
e@0: # male_voices = r"""
e@0: # cmu_us_ahw_cg
e@0: # cmu_us_fem_cg
e@0: # cmu_us_rms_cg
e@0: # """.split()
e@0: #
e@0: #
e@0: # female_voices = r"""
e@0: # cmu_us_aup_cg
e@0: # cmu_us_axb_cg
e@0: # cmu_us_rxr_cg
e@0: # cmu_us_slt_cg
e@0: # """.split()
e@0:
e@0: fixed_male_voice = male_voices
e@0: fixed_female_voice = female_voices
e@0:
e@0:
e@0: # male_voices = r"""
e@0: # cmu_us_ahw_cg
e@0: # cmu_us_awb_cg
e@0: # cmu_us_bdl_cg
e@0: # cmu_us_fem_cg
e@0: # cmu_us_jmk_cg
e@0: # cmu_us_ksp_cg
e@0: # cmu_us_rms_cg
e@0: # """.split()
e@0:
e@0:
e@0: # male_voices = r"""
e@0: # cmu_us_ahw_cg
e@0: # """.split()
e@0: #
e@0: #
e@0: # female_voices = r"""
e@0: # cmu_us_ahw_cg
e@0: # """.split()
e@0:
e@0: import matplotlib.pyplot as plt
e@0:
e@0: def generate_speech_with_festival(voice,
e@0: panning,
e@0: line,
e@0: sr=None
e@0: ):
e@0: """
e@0: Used for speech generation
e@0: Constructs a festival .sable file
e@0: and runs it through festival.
e@0:
e@0: """
e@0: header = r"""
e@0:
e@0:
e@0:
e@0:
e@0: """.format(voice)
e@0:
e@0: footer = r"""
e@0:
e@0:
e@0: """
e@0:
e@0: # 0. Construct sable file
e@0: sable = header + line + footer
e@0:
e@0: # 1. Save sable file to a temporary .sable file in tmp
e@0:
e@0: with open('/tmp/character_line.sable', 'w') as f:
e@0: f.write(sable)
e@0:
e@0: # 2. Call process to festival
e@0: cmd = 'text2wave /tmp/character_line.sable -o /tmp/character_line.wav'
e@0:
e@0: print("Generating speech for line: '{}' with voice '{}' and panning '{}' ".format(line, voice, panning))
e@0: value = subprocess.call(cmd, shell=True)
e@0:
e@0: if value != 0:
e@0: raise RuntimeError("Festival failed to execute.")
e@0:
e@0: # 3. Load back wave file
e@0: if sr is None:
e@0: wav, sr = librosa.load('/tmp/character_line.wav', mono=True)
e@0: else:
e@0: wav, sr = librosa.load('/tmp/character_line.wav', sr=sr, mono=True)
e@0:
e@0: audio = np.vstack([panning*wav,(1.-panning)*wav])
e@0: #
e@0: # plt.figure()
e@0: # plt.plot(audio[0,:])
e@0: # plt.figure()
e@0: # plt.plot(audio[1,:])
e@0: # plt.show()
e@0: return audio, sr
e@0:
e@0:
e@0: def substr_features(sent,
e@0: lower=True, substr=[]):
e@0: if lower:
e@0: sent = sent.lower()
e@0: freqs = defaultdict(int)
e@0: for ss in substr:
e@0: if ss in sent:
e@0: freqs[ss] = 1
e@0: return dict(freqs)
e@0:
e@0:
e@0: def features_dict_to_matrix(features, feature_labels):
e@0: N = len(features)
e@0: M = len(feature_labels)
e@0: arr = np.zeros((N, M))
e@0:
e@0: idx_to_feat = list(feature_labels)
e@0: feat_to_idx = dict((idx_to_feat[k], k) for k in range(len(idx_to_feat)))
e@0:
e@0: for n in range(arr.shape[0]):
e@0: for m in range(arr.shape[1]):
e@0: if idx_to_feat[m] in features[n]:
e@0: arr[n, m] = features[n][idx_to_feat[m]]
e@0:
e@0: return arr, list(feat_to_idx.keys())
e@0:
e@0:
e@0: def similar(text1, text2,
e@0: threshold=0.7 # threshold for similarity
e@0: ):
e@0: """ Tests whether two strings are similar """
e@0:
e@0: ratio = difflib.SequenceMatcher(None, text1.lower(), text2.lower()).ratio()
e@0: return ratio >= threshold
e@0:
e@0:
e@0: class Master():
e@0: def __init__(self, downmix):
e@0: self.downmix = downmix
e@0:
e@0:
e@0: def get_mastered(self):
e@0: # Creating transformer
e@0: tfm = sox.Transformer()
e@0:
e@0: # Removing everything below 80hz
e@0: tfm.highpass(80)
e@0:
e@0: # Adding a notch filter at 200hz to improve clarity
e@0: tfm.bandreject(200)
e@0:
e@0: # Loudness control for under -9dB
e@0: tfm.loudness(gain_db=-9)
e@0:
e@0: # Store downmix temporarily
e@0: librosa.output.write_wav('/tmp/downmix_unnormalized.wav', self.downmix, sr=44100, norm=False)
e@0: tfm.build('/tmp/downmix_unnormalized.wav', '/tmp/downmix_normalized.wav')
e@0:
e@0: # Load downmix
e@0: mastered = librosa.core.load('/tmp/downmix_normalized.wav', sr=44100,mono=False )[0]
e@0: return mastered
e@0:
e@0:
e@0: class Mixer():
e@0: def __init__(self, multitrack):
e@0: self.multitrack = multitrack
e@0:
e@0: def get_downmix(self):
e@0:
e@0: # Just a trick to get the length of the first track
e@0: if 'background' in self.multitrack:
e@0: D = self.multitrack['background'].shape[1]
e@0: else:
e@0: for track in self.multitrack:
e@0: D = self.multitrack[track].shape[1]
e@0: break
e@0:
e@0: downmix = np.zeros((2, D))
e@0: for ttrack in self.multitrack:
e@0:
e@0: #1. Normalize
e@0:
e@0: track = self.multitrack[ttrack]
e@0:
e@0: max_val = np.max(np.abs(track))
e@0:
e@0: if max_val > 0:
e@0: track /= max_val
e@0:
e@0: if ttrack == 'background':
e@0: track *= 0.05
e@0:
e@0:
e@0: downmix += track
e@0:
e@0: return downmix
e@0:
e@0:
e@0: def zafar(lx, rx, d1, g1, m, fc, G, da=0.007, fs=44100.):
e@0: """ Rafii & Pardo Reverberator (2009) controlled by High Level parameters
e@0: Inputs:
e@0: lx : left channel input
e@0: rx : right channel input
e@0: d1 : delay of first comb filter in seconds
e@0: g1 : gain of first comb filters
e@0: da : delay of allpass filter in seconds
e@0: G : dry/wet mix gain
e@0: fc : lowpass filter cuttoff Hz
e@0: m : difference between left and right channel phases
e@0: fs : sampling rate
e@0:
e@0: Outputs:
e@0: ly: left channel output
e@0: ry: right channel output
e@0: """
e@0:
e@0: d1 = int(d1 * fs)
e@0: m = int(m * fs)
e@0: da = int(da * fs)
e@0:
e@0: def calculate_parameters(d1, g1):
e@0:
e@0: d2 = int(round((1.5) ** (-1) * d1))
e@0:
e@0: while gcd(d2, d1) != 1:
e@0: d2 += 1
e@0:
e@0: d3 = int(round((1.5) ** (-2) * d1))
e@0:
e@0: while gcd(d3, d2) != 1 or gcd(d3, d1) != 1:
e@0: d3 += 1
e@0:
e@0: d4 = int(round((1.5) ** (-3) * d1))
e@0:
e@0: while gcd(d4, d3) != 1 or gcd(d4, d2) != 1 or gcd(d4, d1) != 1:
e@0: d4 += 1
e@0:
e@0: d5 = int(round((1.5) ** (-4) * d1))
e@0:
e@0: while gcd(d5, d4) != 1 or gcd(d5, d3) != 1 or gcd(d5, d2) != 1 or gcd(d5, d1) != 1:
e@0: d5 += 1
e@0:
e@0: d6 = int(round((1.5) ** (-5) * d1))
e@0: while gcd(d6, d5) != 1 or gcd(d6, d4) != 1 or gcd(d6, d3) != 1 or gcd(d6, d2) != 1 or gcd(d6, d1) != 1:
e@0: d6 += 1
e@0: g2 = g1 ** (1.5) ** (-1) * g1
e@0: g3 = g1 ** (1.5) ** (-2) * g1
e@0: g4 = g1 ** (1.5) ** (-3) * g1
e@0: g5 = g1 ** (1.5) ** (-4) * g1
e@0: g6 = g1 ** (1.5) ** (-5) * g1
e@0:
e@0: return (d1, d2, d3, d4, d5, d6, g1, g2, g3, g4, g5, g6)
e@0:
e@0: def comb_array(x, g1, d1):
e@0:
e@0: (d1, d2, d3, d4, d5, d6, g1, g2, g3, g4, g5, g6) = calculate_parameters(d1, g1)
e@0:
e@0: c1out = comb(x, g1, d1)
e@0: c2out = comb(x, g2, d2)
e@0: c3out = comb(x, g3, d3)
e@0: c4out = comb(x, g4, d4)
e@0: c5out = comb(x, g5, d5)
e@0: c6out = comb(x, g6, d6)
e@0:
e@0: Lc1 = len(c1out)
e@0: Lc2 = len(c2out)
e@0: Lc3 = len(c3out)
e@0: Lc4 = len(c4out)
e@0: Lc5 = len(c5out)
e@0: Lc6 = len(c6out)
e@0:
e@0: Lc = max(Lc1, Lc2, Lc3, Lc4, Lc5, Lc6)
e@0:
e@0: y = np.zeros((Lc,))
e@0:
e@0: y[0:Lc1] = c1out
e@0: y[0:Lc2] += c2out
e@0: y[0:Lc3] += c3out
e@0: y[0:Lc4] += c4out
e@0: y[0:Lc5] += c5out
e@0: y[0:Lc6] += c6out
e@0:
e@0: return y
e@0:
e@0: def comb(x, g, d):
e@0: LEN = len(x) + d
e@0: # print d
e@0: y = np.zeros((LEN,))
e@0: for n in range(0, LEN):
e@0: if n - d < 0:
e@0: y[n] = 0
e@0: else:
e@0: y[n] = x[n - d] + g * y[n - d]
e@0:
e@0: return y
e@0:
e@0: def allpass(x, g, d):
e@0: LENx = len(x)
e@0: LENy = LENx + d
e@0: y = np.zeros((LENy,))
e@0: for n in range(0, LENy):
e@0: if n - d < 0:
e@0: y[n] = -g * x[n]
e@0: elif n >= LENx:
e@0: y[n] = x[n - d] + g * y[n - d]
e@0: else:
e@0: y[n] = x[n - d] - g * x[n] + g * y[n - d]
e@0:
e@0: return y
e@0:
e@0: def lowpass(x, g):
e@0: LEN = len(x)
e@0: y = np.zeros((LEN,))
e@0:
e@0: for n in range(0, LEN):
e@0: if n - 1 < 0:
e@0: y[n] = (1 - g) * x[n]
e@0: else:
e@0: y[n] = (1 - g) * x[n] + g * y[n - 1]
e@0:
e@0: return y
e@0:
e@0: ga = 1. / np.sqrt(2.)
e@0:
e@0: cin = 0.5 * lx + 0.5 * rx
e@0: cout = comb_array(cin, g1, d1)
e@0:
e@0: ra = allpass(cout, ga, da + m // 2)
e@0: la = allpass(cout, ga, da - m // 2)
e@0:
e@0: gc = 2 - np.cos(2 * np.pi * fc / fs) - np.sqrt((np.cos(2 * np.pi * fc / fs) - 2) ** 2 - 1)
e@0:
e@0: ral = lowpass(ra, gc)
e@0: lal = lowpass(la, gc)
e@0:
e@0: ralg = G * ral
e@0: lalg = G * lal
e@0:
e@0: ry = ralg[0:len(rx)] + (1 - G) * rx
e@0: ly = lalg[0:len(lx)] + (1 - G) * lx
e@0:
e@0: return np.vstack([ry, ly])
e@0:
e@0: def get_reverb_from_tags(xl, xr, tags, fs=44100):
e@0: reverb_csv = 'contributions.csv'
e@0: df = pd.read_csv(reverb_csv)
e@0: df = df.fillna("")
e@0: params = []
e@0: for n in range(len(df)):
e@0: if all([t in df['agreed'].iloc[n].split(',') for t in tags]):
e@0: params.append(df['param'].iloc[n])
e@0: d1, g1, m, fc, G = [float(f) for f in params[0].split(',')]
e@0: y = zafar(xl, xr, d1, g1, m, fc, G, fs=fs)
e@0: return y
e@0:
e@0:
e@0: def fade(x, fade_in, fade_out, sr=44100):
e@0: """
e@0: Creates a fade-in-fade-out envelope
e@0: for audio array x.
e@0: """
e@0:
e@0: if len(x) == 0:
e@0: return x
e@0:
e@0: fade_in_samples = int(fade_in * sr)
e@0: fade_out_samples = int(fade_out * sr)
e@0:
e@0: outp = np.ones_like(x)
e@0: for n in range(fade_in_samples):
e@0: outp[n] = n * 1. / fade_in_samples
e@0:
e@0: for n in range(fade_out_samples):
e@0: outp[len(outp) - fade_out_samples + n] = 1 - 1. / fade_out_samples * n
e@0: return outp * x
e@0:
e@0:
e@0: def slope(x, slope_in, slope_out, delay=1.0, v=0.1, sr=44100):
e@0: """
e@0: Creates a slope in slope out envelope
e@0: """
e@0:
e@0: if len(x) == 0:
e@0: return x
e@0:
e@0: delay_samples = int(delay * sr)
e@0: slope_in_samples = int(slope_in * sr)
e@0: slope_out_samples = int(slope_out * sr)
e@0:
e@0: outp = np.zeros_like(x)
e@0:
e@0: for n in range(len(outp)):
e@0: if n >= 0 and n < delay_samples:
e@0: outp[n] = 1.0 - v
e@0: elif n >= delay_samples and n < delay_samples + slope_in_samples:
e@0: outp[n] = (1. - v) - (1. - v) / slope_in_samples * (n - delay_samples)
e@0: elif n >= delay_samples + slope_in_samples and n < len(outp) - delay_samples - slope_out_samples:
e@0: outp[n] = 0
e@0: elif n >= len(outp) - delay_samples - slope_out_samples and n < len(outp) - delay_samples:
e@0: outp[n] = (1. - v) / slope_out_samples * (n - len(outp) + delay_samples + slope_out_samples)
e@0: if outp[n] < 0:
e@0: print(n)
e@0: break
e@0: elif n >= len(outp) - delay_samples:
e@0: outp[n] = 1.0 - v
e@0:
e@0: outp += v
e@0:
e@0: return outp * x
e@0:
e@0:
e@0: def get_background(
e@0: fname,
e@0: duration,
e@0: ft=0.5,
e@0: ):
e@0: print(fname)
e@0: bg, sr = librosa.load(fname)
e@0: f_s = int(ft * sr)
e@0: y = bg
e@0: z = np.zeros((duration,))
e@0: if len(y) < len(z):
e@0: y = fade(y, ft, ft, sr)
e@0: for n in range(0, len(z) - len(y), len(y) - f_s):
e@0: z[n:n + len(y)] += y
e@0: n += len(y) - f_s
e@0: if len(y) > len(z[n:]):
e@0: z[n:] += y[:len(z[n:])]
e@0: else:
e@0: z[n:n + len(y)] += y
e@0:
e@0: z = fade(z, ft, ft, sr=sr)
e@0:
e@0: elif len(y) > len(z):
e@0: z += fade(y[0:len(z)], ft, ft, sr=sr)
e@0: return z
e@0:
e@0:
e@0: def compose_bg_scene(bgs, background_changes, D, delay=3*44100):
e@0: z = np.zeros((2,D))
e@0: for n in range(len(background_changes)):
e@0: bg_choice = background_changes[n][1]
e@0: start = background_changes[n][0]
e@0: fname = bgs[bg_choice]
e@0: if n < len(background_changes) - 1:
e@0: duration = background_changes[n + 1][0] - background_changes[n][0]
e@0: else:
e@0: duration = D - background_changes[n][0]
e@0:
e@0: y = get_background(fname, duration)
e@0: z[0,start:start + len(y)] = y
e@0: z[1, start:start + len(y)] = y
e@0: #z = fade(z, 1., 1.)
e@0: return z
e@0:
e@0:
e@0: class Director():
e@0: def __init__(self, script, sound_dir, speech_dir):
e@0: """
e@0: Gets a list of script
e@0:
e@0: :param sound_dir: directory of sound files
e@0: :param speech_dir: directory of speech files
e@0: :param script: the script
e@0: """
e@0:
e@0: # Gets character definitions
e@0:
e@0: ## TODO: Change this to also have accents
e@0:
e@0: self.voice_params = {}
e@0: self.scene_params = {}
e@0: self.bg_params = {}
e@0:
e@0: # This holds the fxive sound engine if available
e@0: self.fxive = None
e@0:
e@0: global FFV, FMV
e@0: for d in script['definitions']:
e@0: if d['type'] == 'scene_definition':
e@0: number = int(d['number'])
e@0: tags = d['tags']
e@0: filename = d['filename']
e@0:
e@0: # If it starts with fxive: then get the preset from fxive
e@0: if 'fxive:' == filename[:6]:
e@0: print("Fetching sample from fxive...")
e@0: if self.fxive is not None:
e@0: self.bg_params[number] = self.fxive.get_sfx(filename[6:])
e@0: else:
e@0: self.fxive = FXive(sfx_path=os.path.join(sound_dir, 'sfx.xls'))
e@0: self.bg_params[number] = self.fxive.get_sfx(filename[6:])
e@0: else:
e@0: self.bg_params[number] = filename
e@0:
e@0: if 'none' in tags:
e@0: self.scene_params[number] = []
e@0: else:
e@0: self.scene_params[number] = tags
e@0:
e@0: if d['type'] == 'cast_definition':
e@0: # print("-----------------")
e@0: name = d['name']
e@0: gender = random.sample(d['gender'], 1)[0]
e@0: panning = random.sample(d['panning'], 1)[0]
e@0:
e@0: if panning == 'left':
e@0: panning = 0.01
e@0: elif panning == 'right':
e@0: panning = 0.99
e@0: elif panning in ['center', 'centre']:
e@0: panning = 0.5
e@0: #print(gender, panning)
e@0: if gender == 'female':
e@0: # Choose a random female voice
e@0: voice = random.sample(female_voices, 1)[0]
e@0:
e@0: if FIXED_VOICES:
e@0: voice = fixed_female_voice[FFV]
e@0: FFV += 1
e@0: else:
e@0: # Choose a random male voice
e@0: voice = random.sample(male_voices, 1)[0]
e@0:
e@0: if FIXED_VOICES:
e@0: voice = fixed_male_voice[FMV]
e@0: FMV += 1
e@0:
e@0: self.voice_params[name] = (voice, panning)
e@0:
e@0: # if character_panning == 0.5:
e@0: # character_panning = 0.1
e@0: # elif character_panning == 0.1:
e@0: # character_panning = 0.9
e@0: # elif character_panning == 0.9:
e@0: # character_panning = 0.1
e@0:
e@0:
e@0: if self.fxive is not None:
e@0: self.fxive.close()
e@0:
e@0: self.script = script
e@0: self.sound_dir = sound_dir
e@0: self.speech_dir = speech_dir
e@0:
e@0: self.musicmanager = MusicManager(sound_dir)
e@0: self.pausemanager = PauseManager()
e@0: self.speechmanager = SpeechManager(speech_dir, self.voice_params)
e@0: self.sfxmanager = SoundManager(sound_dir)
e@0:
e@0: def get_voice_params(self, name):
e@0: return self.voice_params[name]
e@0:
e@0: def generate_multitrack(self):
e@0: # Shift by 4 seconds
e@0: D = 0
e@0: P = []
e@0: track_names = []
e@0:
e@0: # print(self.script['script'])
e@0:
e@0: current_scene = 1
e@0: current_reverb_tags = ""
e@0:
e@0: scene_changes = []
e@0:
e@0: # Create a program of scripts
e@0: for s in self.script['script']:
e@0: if s['type'] == 'music':
e@0: name = 'music'
e@0: audio = self.musicmanager.retrieve_music(s)
e@0: elif s['type'] == 'sfx':
e@0: name = s['name'].lower()
e@0: audio = self.sfxmanager.retrieve_sfx(s)
e@0: elif s['type'] == 'scene_change':
e@0: current_scene = int(s['number'])
e@0: #print(current_scene)
e@0: #print(self.scene_params)
e@0: current_reverb_tags = self.scene_params[current_scene]
e@0:
e@0: print("Changed to scene {} with reverb tags: {}".format(current_scene, current_reverb_tags))
e@0: scene_changes.append((D, current_scene))
e@0: continue
e@0: elif s['type'] == 'pause':
e@0: name = 'pause'
e@0: audio = self.pausemanager.retrieve_pause(s)
e@0: elif s['type'] == 'cast_line':
e@0: print(s)
e@0: name = s['name'].lower()
e@0: audio = self.speechmanager.retrieve_speech(s)
e@0: if len(current_reverb_tags) > 0:
e@0: print("Applying reverberation with tags: {}".format(current_reverb_tags))
e@0: print(audio.shape)
e@0: if s['name'] != 'Narrator':
e@0: audio = get_reverb_from_tags(audio[0,:], audio[1,:], current_reverb_tags)
e@0:
e@0: if name not in track_names:
e@0: track_names.append(name)
e@0: D += audio.shape[1]
e@0: P.append((name,audio))
e@0:
e@0: multitrack = {t: np.zeros((2, D)) for t in track_names}
e@0:
e@0: print("Composing bg scene")
e@0: multitrack['background'] = compose_bg_scene(self.bg_params, scene_changes, D)
e@0:
e@0: idx = 0
e@0: for p in P:
e@0: multitrack[p[0]][:, idx:idx+p[1].shape[1]] = p[1]
e@0: idx += p[1].shape[1]
e@0:
e@0: return multitrack
e@0:
e@0:
e@0:
e@0: class Generator():
e@0: def __init__(self):
e@0: pass
e@0:
e@0: def generate(self):
e@0: with open('../data/scripts/The Mystery Of Spooky Hill.txt') as f:
e@0: return f.read()
e@0:
e@0:
e@0: class PauseManager():
e@0: def __init__(self):
e@0: """
e@0: Manages pauses
e@0: """
e@0:
e@0: def retrieve_pause(self, input_):
e@0: duration_str = input_['duration']
e@0: if duration_str == 'long':
e@0: duration = 3.0
e@0: elif duration_str == 'short':
e@0: duration = 1.0
e@0:
e@0: audio = np.zeros((2, int(duration*44100)))
e@0: return audio
e@0:
e@0: class SpeechManager():
e@0: def __init__(self, speech_folder, voice_params):
e@0: """
e@0:
e@0: :param speech_folder: the folder the speech .mp3s are
e@0: """
e@0:
e@0: self.voice_params = voice_params
e@0: self.speech_folder = speech_folder
e@0: try:
e@0: self.transcriptions = pd.read_excel(os.path.join(speech_folder ,'transcript.xls'))
e@0: except:
e@0: # If the file does not exist
e@0: self.transcriptions = None
e@0:
e@0: print('Transcription file:' + str(os.path.join(speech_folder ,'transcript.xls')))
e@0: print('Transcriptions:' + str(self.transcriptions))
e@0:
e@0: def retrieve_speech(self, input_):
e@0: # print(input_)
e@0: cast_member = input_['name']
e@0: # print(self.voice_params)
e@0: cast_voice = self.voice_params[cast_member][0] # 0th element is voice
e@0: cast_panning = self.voice_params[cast_member][1] #1th element is panning
e@0:
e@0: cast_line = input_['line']
e@0:
e@0: can_find_entry = False
e@0:
e@0:
e@0: # If the file does not exist
e@0: cast_lines_df = self.transcriptions[self.transcriptions['cast'].map(lambda x: x.lower()) == cast_member.lower()]
e@0: similarities = {}
e@0: for n in cast_lines_df.index:
e@0: similarities[n] = difflib.SequenceMatcher(None, cast_line, cast_lines_df['line'].loc[n]).ratio()
e@0:
e@0: # Most similar entry location
e@0: chosen_entry = max(similarities, key=lambda x: similarities[x])
e@0: chosen_file = cast_lines_df['filename'].loc[chosen_entry]
e@0: chosen_line = cast_lines_df['line'].loc[chosen_entry]
e@0:
e@0: if similar(cast_line, chosen_line):
e@0: can_find_entry = True
e@0:
e@0: chosen_file_path = os.path.join(self.speech_folder, chosen_file)
e@0: print("Retrieving: " + chosen_file_path)
e@0:
e@0: if os.path.exists(chosen_file_path):
e@0: audio, sr = librosa.core.load(chosen_file_path, sr=44100, mono=False)
e@0: #print("panning: {}".format(cast_panning))
e@0: audio[0,:] *= cast_panning
e@0: audio[1,:] *= (1-cast_panning)
e@0: else:
e@0: can_find_entry = False
e@0:
e@0: if not can_find_entry:
e@0: # 1. Generate line
e@0: audio, sr = generate_speech_with_festival(cast_voice, cast_panning, cast_line, sr=44100)
e@0: # print("panning: {}".format(cast_panning))
e@0: # audio[0,:] *= cast_panning
e@0: # audio[1,:] *= (1-cast_panning)
e@0:
e@0:
e@0:
e@0: # If the line is too disimilar, synthesize it, else use the chosen line
e@0: return audio
e@0:
e@0:
e@0: class SoundManager():
e@0: def __init__(self, sound_folder):
e@0: """
e@0:
e@0: :param sound_folder: the folder the music .mp3s are
e@0: """
e@0:
e@0:
e@0: self.sound_folder = sound_folder
e@0: self.sound_file_names = [f.split('/')[-1] for f in glob.glob(sound_folder + '*.mp3')]
e@0:
e@0: # If the directory is empty, return.
e@0: if len(self.sound_file_names) == 0:
e@0: return
e@0: # Lookup strings
e@0: strings = []
e@0: for f in self.sound_file_names:
e@0: strings.append(" ".join(re.findall('[A-Za-z]+', f)).lower())
e@0:
e@0: # Sanitize strings, remove the most common substring
e@0:
e@0: # Find most common substring
e@0: string1 = strings[0]
e@0: for n in range(1, len(strings)):
e@0: string2 = strings[n]
e@0: match = difflib.SequenceMatcher(None, string1, string2).find_longest_match(0, len(string1), 0, len(string2))
e@0: string1 = string2[match.b:match.b + match.size]
e@0:
e@0: # Remove most common substring
e@0: ## TODO: Check here please, should we remove it?
e@0:
e@0: # strings = [s.replace(string1, '') for s in strings]
e@0: self.lookup = strings
e@0:
e@0: def retrieve_sfx(self, input_):
e@0: """
e@0:
e@0: :param query: dictionary object from parser
e@0: :return: audio matrix containing audio file
e@0: """
e@0:
e@0: query = input_['name'].lower()
e@0: # Lematize words before checking for similarity
e@0: stemmer = porter.PorterStemmer()
e@0:
e@0: qwords = [stemmer.stem(q).lower() for q in query.split()]
e@0: similarities = []
e@0:
e@0: # If the words in the query are available in the words in the filename, then increase by 1. Finally,
e@0: # divide by the total number of words (Jaccard similarity?)
e@0:
e@0: for s in self.lookup:
e@0:
e@0: words = [stemmer.stem(w).lower() for w in s.split()]
e@0: similarities.append(0.)
e@0:
e@0: for qw in qwords:
e@0: for w in words:
e@0: similarities[-1] += difflib.SequenceMatcher(None, qw, w).ratio()
e@0:
e@0: similarities[-1]/=float(len(words))
e@0:
e@0: # This is argmax
e@0: chosen = [n for n in range(len(similarities)) if similarities[n] == max(similarities)][0]
e@0: chosen_fname = self.sound_folder + self.sound_file_names[chosen]
e@0: audio = librosa.core.load(chosen_fname, sr=44100, mono=False)
e@0: return audio[0]
e@0:
e@0:
e@0: class MusicManager():
e@0: def __init__(self, sound_folder):
e@0: """
e@0:
e@0: :param sound_folder: the folder the music .mp3s are
e@0: """
e@0:
e@0: self.sound_folder = sound_folder
e@0: self.sound_file_names = [f.split('/')[-1] for f in glob.glob(sound_folder + '/*.mp3')]
e@0:
e@0: # If the directory is empty, return.
e@0: if len(self.sound_file_names) == 0:
e@0: return
e@0:
e@0: # Lookup strings
e@0: strings = []
e@0: for f in self.sound_file_names:
e@0: strings.append(" ".join(re.findall('[A-Za-z]+', f)).lower())
e@0:
e@0: # Sanitize strings, remove the most common substring
e@0:
e@0: # Find most common substring
e@0: string1 = strings[0]
e@0: for n in range(1, len(strings)):
e@0: string2 = strings[n]
e@0: match = difflib.SequenceMatcher(None, string1, string2).find_longest_match(0, len(string1), 0, len(string2))
e@0: string1 = string2[match.b:match.b + match.size]
e@0:
e@0: # Remove most common substring
e@0: strings = [s.replace(string1, '') for s in strings]
e@0: self.lookup = strings
e@0:
e@0: def retrieve_music(self, input_):
e@0: """
e@0:
e@0: :param query: dictionary object from parser
e@0: :return: audio matrix containing audio file
e@0: """
e@0:
e@0: query = input_['name'].lower() + ' music'
e@0:
e@0: similarities = []
e@0:
e@0: # If the words in the query are available in the words in the filename, then increase by 1. Finally,
e@0: # divide by the total number of words (Jaccard similarity?)
e@0:
e@0: for s in self.lookup:
e@0: qwords = query.split()
e@0: words = s.split()
e@0: similarities.append(0.)
e@0:
e@0: for qw in qwords:
e@0: if qw in words:
e@0: similarities[-1] += 1.
e@0:
e@0: similarities[-1]/=float(len(words))
e@0:
e@0: # This is argmax
e@0: chosen = [n for n in range(len(similarities)) if similarities[n] == max(similarities)][0]
e@0: chosen_fname = self.sound_folder + self.sound_file_names[chosen]
e@0: audio = librosa.core.load(chosen_fname, sr=44100, mono=False)
e@0: return audio[0]
e@0:
e@0:
e@0: # Classes for aiding parsing
e@0: class Environment:
e@0: def __init__(self, varname, name):
e@0: self.name = name
e@0: self.varname = varname
e@0:
e@0: def to_json(self):
e@0: return {"type": "environment_definition", "name": self.name}
e@0:
e@0:
e@0: class Sound_Effect:
e@0: def __init__(self, varname, name, pos):
e@0: self.name = name
e@0: self.varname = varname
e@0: self.keywords = [kw for kw in name.split()]
e@0:
e@0: # Set the end to pos-1 so the first
e@0: # character of the next line won't be ommited
e@0:
e@0: self.pos = (pos, pos - 1)
e@0:
e@0: def to_json(self):
e@0: return {
e@0: 'definition': {
e@0: 'type': 'sfx_definition',
e@0: 'name': ' '.join(self.keywords),
e@0: 'optional': False
e@0: },
e@0: 'playback': {
e@0: 'type': 'sfx',
e@0: 'name': ' '.join(self.keywords)
e@0: }
e@0: }
e@0:
e@0: def add_keywords(self, keywords):
e@0: for kw in keywords:
e@0: self.keywords.insert(0, kw)
e@0:
e@0: def __str__(self):
e@0: return "({} FX)".format(' '.join(self.keywords))
e@0:
e@0: def definition(self):
e@0: return ' '.join(self.keywords)
e@0:
e@0: def script(self):
e@0: return str(self)
e@0:
e@0:
e@0: class Character_Line:
e@0: def __init__(self, varname, txt, pos_start, pos_end):
e@0: self.varname = varname
e@0: self.txt = '. '.join([sent.capitalize() for sent in txt.split('\n')])
e@0: if self.txt[-1] != '.':
e@0: self.txt += '.'
e@0:
e@0: self.character = None
e@0: self.pos = (pos_start, pos_end)
e@0:
e@0: def set_character(self, character):
e@0: self.character = character
e@0:
e@0: def __str__(self):
e@0: return "{}: {}".format(self.character.name, self.txt)
e@0:
e@0: def script(self):
e@0: return "[{}] {}".format(self.character.name, self.txt)
e@0:
e@0: def set_pos(self, start, end):
e@0: self.pos = (start, end)
e@0:
e@0: def to_json(self):
e@0: return {'playback': {"type": "cast_line", "name": self.character.name, "line": self.txt}}
e@0:
e@0:
e@0: class Character:
e@0: def __init__(self, varname, name):
e@0: self.name = ' '.join([n.capitalize() for n in name.split()])
e@0: self.varname = varname
e@0: self.gender = ''
e@0: self.age = ''
e@0:
e@0: def set_gender(self, gender):
e@0: self.gender = gender
e@0:
e@0: def set_age(self, age):
e@0: self.age = age
e@0:
e@0: def definition(self):
e@0: str_ = self.name + ' - '
e@0: if self.gender == '':
e@0: str_ += 'male or female'
e@0: else:
e@0: str_ += self.gender
e@0:
e@0: return str_
e@0:
e@0: def __str__(self):
e@0: return __repr__(self)
e@0:
e@0: def __repr__(self):
e@0: return "[{}:{}/{}/{}]".format(self.varname, self.name, self.gender, self.age)
e@0:
e@0: def to_json(self):
e@0: json_dict = {"type": "cast_definition", "name": self.name}
e@0: if self.gender != '':
e@0: json_dict['gender'] = self.gender
e@0: if self.age != '':
e@0: json_dict['age'] = self.age
e@0:
e@0: return json_dict
e@0:
e@0:
e@0: class KDuration(pg.Keyword):
e@0: grammar = pg.Enum(pg.K('long'), pg.K('short'))
e@0:
e@0:
e@0: class Pause(pg.Plain):
e@0: grammar = '(', pg.optional(pg.attr('duration', KDuration)), 'pause', ')'
e@0:
e@0:
e@0: class CastHeader(pg.Plain):
e@0: grammar = 'Cast', pg.optional('List'), ':', pg.endl
e@0:
e@0:
e@0: class KGender(pg.Keyword):
e@0: grammar = pg.Enum(pg.K('male'), pg.K('female'))
e@0:
e@0:
e@0: class EGender(pg.List):
e@0: grammar = KGender, pg.optional('or', KGender)
e@0:
e@0:
e@0: class KPan(pg.Keyword):
e@0: grammar = pg.Enum(pg.K('left'), pg.K('right'), pg.K('center'), pg.K('centre'))
e@0:
e@0:
e@0: class EPan(pg.List):
e@0: grammar = KPan, pg.optional('or', KPan)
e@0:
e@0:
e@0: class CastDefinition(pg.Plain):
e@0: grammar = pg.attr('cast_name', re.compile('[A-Za-z0-9 ]+')), \
e@0: re.compile('\-+'), \
e@0: pg.attr('gender', EGender), \
e@0: re.compile('\-+'), \
e@0: 'panned', \
e@0: pg.attr('panning', EPan), pg.endl
e@0:
e@0:
e@0: class Tag(pg.Plain):
e@0: grammar = pg.attr('tag', re.compile(r'[A-Za-z0-9_\-]+'))
e@0:
e@0:
e@0: class LTag(pg.List):
e@0: grammar = pg.csl(Tag)
e@0:
e@0:
e@0: class ScenesHeader(pg.Plain):
e@0: grammar = re.compile('Scenes?'), pg.optional('List'), ':', pg.endl
e@0:
e@0:
e@0: class ScenesDefinition(pg.Plain):
e@0: grammar = pg.attr('number', re.compile('[A-Za-z0-9]+')), \
e@0: re.compile('\-+'), pg.attr('name', re.compile('[A-Za-z0-9]+')), \
e@0: re.compile('\-+'), pg.attr('filename', re.compile('[A-Za-z0-9_\:]+(\.(mp3|wav))?')), \
e@0: re.compile('\-+'), pg.attr('tags', LTag), pg.endl
e@0:
e@0:
e@0: class ScriptHeader(pg.Plain):
e@0: grammar = 'Script', ':', pg.endl
e@0:
e@0:
e@0: class SceneCommence(pg.Plain):
e@0: grammar = re.compile('\-+'), 'Scene', pg.attr('scene', re.compile('[A-Za-z0-9]+')), pg.optional(
e@0: re.compile('\-+')), pg.endl;
e@0:
e@0:
e@0: class CastLine(pg.Plain):
e@0: grammar = '[', pg.attr('cast_name', re.compile('[A-Za-z0-9 ]+')), ']', pg.attr('line',
e@0: re.compile(r'[A-Za-z0-9\-_.\ \" \'\,\?\:\!]+')),
e@0:
e@0:
e@0: class Headers(pg.Plain):
e@0: grammar = CastHeader, \
e@0: pg.attr('cast_list', pg.maybe_some(CastDefinition)), \
e@0: pg.optional(ScenesHeader, pg.attr('scene_list', pg.maybe_some(ScenesDefinition))), pg.optional(
e@0: ScriptHeader)
e@0:
e@0:
e@0: class Script(pg.List):
e@0: grammar = pg.some([Pause, SceneCommence, CastLine])
e@0:
e@0:
e@0: class ScriptDocument(pg.Plain):
e@0: grammar = pg.attr('headers', Headers), pg.attr('script', Script)
e@0:
e@0:
e@0: class Parser:
e@0: def __init__(self):
e@0: pass
e@0:
e@0: def parse_str(self, str_):
e@0:
e@0: obj = pg.parse(str_, ScriptDocument)
e@0: definitions = []
e@0: script = []
e@0:
e@0: for cast_def in obj.headers.cast_list:
e@0: name = cast_def.cast_name.strip()
e@0: gender = [str(t) for t in cast_def.gender]
e@0:
e@0: panning = [str(t) for t in cast_def.panning]
e@0:
e@0: cast_dict = {'type': 'cast_definition', 'name': name, 'gender': gender, 'voice': [], 'panning': panning}
e@0: definitions.append(cast_dict)
e@0:
e@0: for scene_def in obj.headers.scene_list:
e@0: name = scene_def.name.strip()
e@0: number = scene_def.number
e@0: filename = scene_def.filename
e@0: tags = [str(t.tag) for t in scene_def.tags]
e@0: scene_dict = {'type': 'scene_definition',
e@0: 'scene': name,
e@0: 'number': number,
e@0: 'filename': scene_def.filename,
e@0: 'tags': tags}
e@0: definitions.append(scene_dict)
e@0:
e@0: for action in obj.script:
e@0: if isinstance(action, Pause):
e@0: duration = str(action.duration)
e@0: pause_dict = {'type': 'pause', 'duration': duration}
e@0: script.append(pause_dict)
e@0: if isinstance(action, SceneCommence):
e@0: number = str(action.scene).strip()
e@0: scene_dict = {'type': 'scene_change', 'number': number}
e@0: script.append(scene_dict)
e@0: if isinstance(action, CastLine):
e@0: name = str(action.cast_name).strip()
e@0: line = str(action.line)
e@0: cast_line = {'type': 'cast_line', 'name': name, 'line': line}
e@0: script.append(cast_line)
e@0:
e@0: return {'definitions': definitions, 'script': script}
e@0:
e@0:
e@0: class Preprocessor():
e@0: def __init__(self):
e@0: self.parsed_dict = None
e@0: self.regexp_entity = re.compile('(?P[A-Z][0-9]+)\s+(?P[A-Z][A-Za-z_]*)\s+([0-9]+)\s+([0-9]+)(?P(\s+[A-Za-z\'\"]+)+$)')
e@0: self.regexp_attribute = re.compile('(?P[A-Z][0-9]+)\s+(?P[A-Z][A-Za-z_]*)\s+(?P[A-Z][0-9]+)\s+(?P[A-Za-z]+)')
e@0: self.regexp_relation = re.compile('(?P[A-Z][0-9]+)\s+(([A-Za-z0-9_]+:[A-Z0-9]+\s*)+)')
e@0: self.regexp_transitive = re.compile('(?P\*)\s+(?P[A-Z][A-Za-z_]*)\s+(?P[A-Z][0-9]+)\s+(?P[A-Z][0-9]+)\s*')
e@0:
e@0: def parse_str(self, text, annot):
e@0: """ takes as input a text and an annotation string """
e@0:
e@0: tups = [tup for tup in annot.split('\n') if tup.strip() != '']
e@0:
e@0: # Add the narrator
e@0: narrator = Character('_', 'Narrator')
e@0: characters = [narrator]
e@0:
e@0: objects = []
e@0: character_lines = []
e@0: sound_effects = []
e@0:
e@0: def find_character(var):
e@0: for c in characters:
e@0: if c.varname == var:
e@0: return c
e@0:
e@0: def find_character_or_object(var):
e@0: c = find_character(var)
e@0: if c is not None:
e@0: return c
e@0:
e@0: def find_character_lines(var):
e@0: for c in character_lines:
e@0: if c.varname == var:
e@0: return c
e@0:
e@0: def find_sound_effect(var):
e@0: for c in sound_effects:
e@0: if c.varname == var:
e@0: return c
e@0:
e@0: def character_speaks(c):
e@0: for cl in character_lines:
e@0: if isinstance(cl, Character_Line) and cl.character == c:
e@0: return True
e@0: return False
e@0:
e@0: for tup in tups:
e@0: # print(tup)
e@0:
e@0: groups = self.regexp_entity.findall(tup)
e@0: if len(groups) > 0:
e@0: if groups[0][1] == 'Character':
e@0: # If the entity is a character
e@0: characters.append(
e@0: Character(groups[0][0].strip(), groups[0][4].strip())
e@0: )
e@0: elif groups[0][1] == 'Character_Line':
e@0: character_lines.append(
e@0: Character_Line(groups[0][0].strip(), groups[0][4].strip(), int(groups[0][2]), int(groups[0][3]))
e@0: )
e@0: elif groups[0][1] == 'Sound_Effect':
e@0: sound_effects.append(
e@0: Sound_Effect(groups[0][0].strip(), groups[0][4].strip(), int(groups[0][2]))
e@0: )
e@0: continue
e@0:
e@0: for tup in tups:
e@0: # Attributes and relations
e@0: groups = self.regexp_attribute.findall(tup)
e@0: if len(groups) > 0:
e@0: if groups[0][1] == 'Gender':
e@0: # if
e@0: c = find_character(groups[0][2].strip())
e@0: c.set_gender(groups[0][3].strip().lower())
e@0: elif groups[0][1] == 'Age':
e@0: c = find_character(groups[0][2].strip())
e@0: c.set_age(groups[0][3].strip().lower())
e@0:
e@0: for tup in tups:
e@0: # Attributes and relations
e@0: groups = self.regexp_relation.findall(tup)
e@0: if len(groups) > 0 and groups[0][1][:4] == 'Says':
e@0: # print(groups)
e@0: refs = groups[0][1].split()[1:]
e@0:
e@0: # Store who and whats
e@0: whats = []
e@0: who = None
e@0:
e@0: for ref in refs:
e@0: type_, var = ref.split(':')
e@0: if type_[:4] == 'WHAT':
e@0: whats.append(var)
e@0: elif type_[:3] == 'WHO':
e@0: who = find_character(var)
e@0:
e@0: # find character lines:
e@0: clines = [find_character_lines(w) for w in whats]
e@0:
e@0: # Assign characters
e@0: for cl in clines:
e@0: cl.set_character(who)
e@0: elif len(groups) > 0 and groups[0][1][:12] == 'Sound_Effect':
e@0: sfx = find_sound_effect(groups[0][1][13:].split()[0])
e@0: #print(groups)
e@0:
e@0: # Store extra keywords
e@0: keywords = []
e@0:
e@0: refs = groups[0][1].split()[1:]
e@0: for ref in refs:
e@0: #print(ref)
e@0: type_, var = ref.split(':')
e@0: if type_[:8] == 'CausedBy':
e@0: cause = find_character_or_object(var)
e@0: if cause != None:
e@0: keywords.append(cause.name)
e@0:
e@0: sfx.add_keywords(keywords)
e@0:
e@0: # %% Calculate line segments for character lines and narration
e@0:
e@0: # Add sound effects to the timeline
e@0: clpos = [cl.pos for cl in character_lines]
e@0: clpos += [sfx.pos for sfx in sound_effects]
e@0: clpos = sorted(clpos, key=lambda x: x[0])
e@0:
e@0: # Add first narrator line
e@0: cl = Character_Line('_', text[0:clpos[0][0]], 0, clpos[0][0] - 1)
e@0: cl.set_character(narrator)
e@0: character_lines.append(cl)
e@0:
e@0: for n in range(len(clpos) - 1):
e@0: if clpos[n][1] != clpos[n + 1][0] - 1:
e@0: cl = Character_Line('_', text[clpos[n][1] + 1:clpos[n + 1][0]].rstrip(), clpos[n][1] + 1,
e@0: clpos[n + 1][0] - 1)
e@0: cl.set_character(narrator)
e@0: character_lines.append(cl)
e@0:
e@0: character_lines += sound_effects
e@0: character_lines = sorted(character_lines, key=lambda x: x.pos[1])
e@0:
e@0: # parsed_dict = {'definitions': [c.to_json() for c in characters],
e@0: # 'script': [cl.to_json() for cl in character_lines]}
e@0:
e@0: # parsed_dict = {'definitions': [], 'script':[]}
e@0: #
e@0: # print("Cast List:")
e@0: # for c in characters:
e@0: # if character_speaks(c):
e@0: # print(c.definition())
e@0: #
e@0: # print("")
e@0: # print("Sound Effects:")
e@0: #
e@0: # for c in sound_effects:
e@0: # print(c.definition())
e@0: #
e@0: # for cl in character_lines:
e@0: # print(cl.script())
e@0:
e@0: # Add definitions for characters
e@0: # for c in characters:
e@0: # if character_speaks(c):
e@0: # parsed_dict['definitions'].append(c.to_json())
e@0: #
e@0: # # Add definitions for sound effects
e@0: # for c in sound_effects:
e@0: # parsed_dict['definitions'].append(c.to_json()['definition'])
e@0: #
e@0: #
e@0: # # Add timeline information
e@0: # for cl in character_lines:
e@0: # parsed_dict['script'].append(cl.to_json()['playback'])
e@0:
e@0: str_ = "Cast List:\n"
e@0: for c in characters:
e@0: if character_speaks(c):
e@0: str_ += "{}\n".format(c.definition())
e@0:
e@0: str_ += "Sound Effects:\n"
e@0: for c in sound_effects:
e@0: str_ += "{}\n".format(c.definition())
e@0:
e@0:
e@0: for cl in character_lines:
e@0: str_ += "{}\n".format(cl.script())
e@0:
e@0:
e@0: return str_