diff demo/workspace/approach1.py @ 0:4dad87badb0c
initial commit
author    Emmanouil Theofanis Chourdakis <e.t.chourdakis@qmul.ac.uk>
date      Wed, 16 May 2018 17:56:10 +0100
parents
children
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/demo/workspace/approach1.py	Wed May 16 17:56:10 2018 +0100
@@ -0,0 +1,1331 @@
+import difflib
+from sklearn.externals import joblib
+from collections import defaultdict
+import nltk
+import numpy as np
+import re
+import librosa
+import glob
+import pandas as pd
+from nltk.stem import porter
+import sox
+from scipy.io.wavfile import read as wavread
+from scipy.io.wavfile import write as wavwrite
+from numpy.core._internal import _gcd as gcd
+from rtsfx import *
+
+import subprocess
+import os
+import pypeg2 as pg
+import random
+
+VOICE_PROPERTIES = ['slow', 'deep', 'fast', 'stuttering']
+PAUSE_PROPERTIES = ['short', 'long']
+SEPARATORS = ["[", "]", "(", ")", ":", "-"]
+SFX_MOD_PROPERTIES = ['quiet', 'loud', 'silent']
+
+
+FIXED_VOICES = False
+
+FMV = 0
+FFV = 0
+
+
+male_voices = r"""
+cmu_us_ahw_cg
+cmu_us_awb_cg
+cmu_us_bdl_cg
+cmu_us_fem_cg
+cmu_us_jmk_cg
+cmu_us_ksp_cg
+cmu_us_rms_cg
+""".split()
+
+
+female_voices = r"""
+cmu_us_aup_cg
+cmu_us_axb_cg
+cmu_us_clb_cg
+cmu_us_gka_cg
+cmu_us_rxr_cg
+cmu_us_slt_cg
+""".split()
+
+
+# male_voices = r"""
+# cmu_us_ahw_cg
+# cmu_us_fem_cg
+# cmu_us_rms_cg
+# """.split()
+#
+#
+# female_voices = r"""
+# cmu_us_aup_cg
+# cmu_us_axb_cg
+# cmu_us_rxr_cg
+# cmu_us_slt_cg
+# """.split()
+
+fixed_male_voice = male_voices
+fixed_female_voice = female_voices
+
+
+# male_voices = r"""
+# cmu_us_ahw_cg
+# cmu_us_awb_cg
+# cmu_us_bdl_cg
+# cmu_us_fem_cg
+# cmu_us_jmk_cg
+# cmu_us_ksp_cg
+# cmu_us_rms_cg
+# """.split()
+
+
+# male_voices = r"""
+# cmu_us_ahw_cg
+# """.split()
+#
+#
+# female_voices = r"""
+# cmu_us_ahw_cg
+# """.split()
+
+import matplotlib.pyplot as plt
+
+def generate_speech_with_festival(voice,
+                                  panning,
+                                  line,
+                                  sr=None
+                                  ):
+    """
+    Used for speech generation.
+    Constructs a festival .sable file
+    and runs it through festival.
+    """
+    header = r"""
+    <?xml version="1.0"?>
+    <!DOCTYPE SABLE PUBLIC "-//SABLE//DTD SABLE speech mark up//EN"
+    "Sable.v0_2.dtd"
+    []>
+    <SABLE>
+    <SPEAKER NAME="{}">
+    """.format(voice)
+
+    footer = r"""
+    </SPEAKER>
+    </SABLE>
+    """
+
+    # 0. Construct sable file
+    sable = header + line + footer
+
+    # 1. Save sable file to a temporary .sable file in tmp
+    with open('/tmp/character_line.sable', 'w') as f:
+        f.write(sable)
+
+    # 2. Call process to festival
+    cmd = 'text2wave /tmp/character_line.sable -o /tmp/character_line.wav'
+
+    print("Generating speech for line: '{}' with voice '{}' and panning '{}'".format(line, voice, panning))
+    value = subprocess.call(cmd, shell=True)
+
+    if value != 0:
+        raise RuntimeError("Festival failed to execute.")
+
+    # 3. Load back wave file
+    if sr is None:
+        wav, sr = librosa.load('/tmp/character_line.wav', mono=True)
+    else:
+        wav, sr = librosa.load('/tmp/character_line.wav', sr=sr, mono=True)
+
+    audio = np.vstack([panning * wav, (1. - panning) * wav])
+    #
+    # plt.figure()
+    # plt.plot(audio[0,:])
+    # plt.figure()
+    # plt.plot(audio[1,:])
+    # plt.show()
+    return audio, sr
+
+
+def substr_features(sent,
+                    lower=True, substr=[]):
+    if lower:
+        sent = sent.lower()
+    freqs = defaultdict(int)
+    for ss in substr:
+        if ss in sent:
+            freqs[ss] = 1
+    return dict(freqs)
+
+
+def features_dict_to_matrix(features, feature_labels):
+    N = len(features)
+    M = len(feature_labels)
+    arr = np.zeros((N, M))
+
+    idx_to_feat = list(feature_labels)
+    feat_to_idx = dict((idx_to_feat[k], k) for k in range(len(idx_to_feat)))
+
+    for n in range(arr.shape[0]):
+        for m in range(arr.shape[1]):
+            if idx_to_feat[m] in features[n]:
+                arr[n, m] = features[n][idx_to_feat[m]]
+
+    return arr, list(feat_to_idx.keys())
+
+
+def similar(text1, text2,
+            threshold=0.7  # threshold for similarity
+            ):
+    """ Tests whether two strings are similar """
+
+    ratio = difflib.SequenceMatcher(None, text1.lower(), text2.lower()).ratio()
+    return ratio >= threshold
+
+
+class Master():
+    def __init__(self, downmix):
+        self.downmix = downmix
+
+    def get_mastered(self):
+        # Create transformer
+        tfm = sox.Transformer()
+
+        # Remove everything below 80 Hz
+        tfm.highpass(80)
+
+        # Add a notch filter at 200 Hz to improve clarity
+        tfm.bandreject(200)
+
+        # Loudness control for under -9 dB
+        tfm.loudness(gain_db=-9)
+
+        # Store downmix temporarily
+        librosa.output.write_wav('/tmp/downmix_unnormalized.wav', self.downmix, sr=44100, norm=False)
+        tfm.build('/tmp/downmix_unnormalized.wav', '/tmp/downmix_normalized.wav')
+
+        # Load downmix
+        mastered = librosa.core.load('/tmp/downmix_normalized.wav', sr=44100, mono=False)[0]
+        return mastered
+
+
+class Mixer():
+    def __init__(self, multitrack):
+        self.multitrack = multitrack
+
+    def get_downmix(self):
+
+        # Just a trick to get the length of the first track
+        if 'background' in self.multitrack:
+            D = self.multitrack['background'].shape[1]
+        else:
+            for track in self.multitrack:
+                D = self.multitrack[track].shape[1]
+                break
+
+        downmix = np.zeros((2, D))
+        for ttrack in self.multitrack:
+
+            # 1. Normalize
+            track = self.multitrack[ttrack]
+
+            max_val = np.max(np.abs(track))
+
+            if max_val > 0:
+                track /= max_val
+
+            if ttrack == 'background':
+                track *= 0.05
+
+            downmix += track
+
+        return downmix
+
+
+def zafar(lx, rx, d1, g1, m, fc, G, da=0.007, fs=44100.):
+    """ Rafii & Pardo reverberator (2009) controlled by high-level parameters
+    Inputs:
+        lx : left channel input
+        rx : right channel input
+        d1 : delay of first comb filter in seconds
+        g1 : gain of first comb filter
+        da : delay of allpass filter in seconds
+        G  : dry/wet mix gain
+        fc : lowpass filter cutoff in Hz
+        m  : difference between left and right channel phases
+        fs : sampling rate
+
+    Outputs:
+        ly : left channel output
+        ry : right channel output
+    """
+
+    d1 = int(d1 * fs)
+    m = int(m * fs)
+    da = int(da * fs)
+
+    def calculate_parameters(d1, g1):
+
+        d2 = int(round((1.5) ** (-1) * d1))
+
+        while gcd(d2, d1) != 1:
+            d2 += 1
+
+        d3 = int(round((1.5) ** (-2) * d1))
+
+        while gcd(d3, d2) != 1 or gcd(d3, d1) != 1:
+            d3 += 1
+
+        d4 = int(round((1.5) ** (-3) * d1))
+
+        while gcd(d4, d3) != 1 or gcd(d4, d2) != 1 or gcd(d4, d1) != 1:
+            d4 += 1
+
+        d5 = int(round((1.5) ** (-4) * d1))
+
+        while gcd(d5, d4) != 1 or gcd(d5, d3) != 1 or gcd(d5, d2) != 1 or gcd(d5, d1) != 1:
+            d5 += 1
+
+        d6 = int(round((1.5) ** (-5) * d1))
+        while gcd(d6, d5) != 1 or gcd(d6, d4) != 1 or gcd(d6, d3) != 1 or gcd(d6, d2) != 1 or gcd(d6, d1) != 1:
+            d6 += 1
+        g2 = g1 ** (1.5) ** (-1) * g1
+        g3 = g1 ** (1.5) ** (-2) * g1
+        g4 = g1 ** (1.5) ** (-3) * g1
+        g5 = g1 ** (1.5) ** (-4) * g1
+        g6 = g1 ** (1.5) ** (-5) * g1
+
+        return (d1, d2, d3, d4, d5, d6, g1, g2, g3, g4, g5, g6)
+
+    def comb_array(x, g1, d1):
+
+        (d1, d2, d3, d4, d5, d6, g1, g2, g3, g4, g5, g6) = calculate_parameters(d1, g1)
+
+        c1out = comb(x, g1, d1)
+        c2out = comb(x, g2, d2)
+        c3out = comb(x, g3, d3)
+        c4out = comb(x, g4, d4)
+        c5out = comb(x, g5, d5)
+        c6out = comb(x, g6, d6)
+
+        Lc1 = len(c1out)
+        Lc2 = len(c2out)
+        Lc3 = len(c3out)
+        Lc4 = len(c4out)
+        Lc5 = len(c5out)
+        Lc6 = len(c6out)
+
+        Lc = max(Lc1, Lc2, Lc3, Lc4, Lc5, Lc6)
+
+        y = np.zeros((Lc,))
+
+        y[0:Lc1] = c1out
+        y[0:Lc2] += c2out
+        y[0:Lc3] += c3out
+        y[0:Lc4] += c4out
+        y[0:Lc5] += c5out
+        y[0:Lc6] += c6out
+
+        return y
+
+    def comb(x, g, d):
+        LEN = len(x) + d
+        # print d
+        y = np.zeros((LEN,))
+        for n in range(0, LEN):
+            if n - d < 0:
+                y[n] = 0
+            else:
+                y[n] = x[n - d] + g * y[n - d]
+
+        return y
+
+    def allpass(x, g, d):
+        LENx = len(x)
+        LENy = LENx + d
+        y = np.zeros((LENy,))
+        for n in range(0, LENy):
+            if n - d < 0:
+                y[n] = -g * x[n]
+            elif n >= LENx:
+                y[n] = x[n - d] + g * y[n - d]
+            else:
+                y[n] = x[n - d] - g * x[n] + g * y[n - d]
+
+        return y
+
+    def lowpass(x, g):
+        LEN = len(x)
+        y = np.zeros((LEN,))
+
+        for n in range(0, LEN):
+            if n - 1 < 0:
+                y[n] = (1 - g) * x[n]
+            else:
+                y[n] = (1 - g) * x[n] + g * y[n - 1]
+
+        return y
+
+    ga = 1. / np.sqrt(2.)
+
+    cin = 0.5 * lx + 0.5 * rx
+    cout = comb_array(cin, g1, d1)
+
+    ra = allpass(cout, ga, da + m // 2)
+    la = allpass(cout, ga, da - m // 2)
+
+    gc = 2 - np.cos(2 * np.pi * fc / fs) - np.sqrt((np.cos(2 * np.pi * fc / fs) - 2) ** 2 - 1)
+
+    ral = lowpass(ra, gc)
+    lal = lowpass(la, gc)
+
+    ralg = G * ral
+    lalg = G * lal
+
+    ry = ralg[0:len(rx)] + (1 - G) * rx
+    ly = lalg[0:len(lx)] + (1 - G) * lx
+
+    return np.vstack([ry, ly])
+
+
+def get_reverb_from_tags(xl, xr, tags, fs=44100):
+    reverb_csv = 'contributions.csv'
+    df = pd.read_csv(reverb_csv)
+    df = df.fillna("")
+    params = []
+    for n in range(len(df)):
+        if all([t in df['agreed'].iloc[n].split(',') for t in tags]):
+            params.append(df['param'].iloc[n])
+    d1, g1, m, fc, G = [float(f) for f in params[0].split(',')]
+    y = zafar(xl, xr, d1, g1, m, fc, G, fs=fs)
+    return y
+
+
+def fade(x, fade_in, fade_out, sr=44100):
+    """
+    Creates a fade-in/fade-out envelope
+    for audio array x.
+    """
+
+    if len(x) == 0:
+        return x
+
+    fade_in_samples = int(fade_in * sr)
+    fade_out_samples = int(fade_out * sr)
+
+    outp = np.ones_like(x)
+    for n in range(fade_in_samples):
+        outp[n] = n * 1. / fade_in_samples
+
+    for n in range(fade_out_samples):
+        outp[len(outp) - fade_out_samples + n] = 1 - 1. / fade_out_samples * n
+    return outp * x
+
+
+def slope(x, slope_in, slope_out, delay=1.0, v=0.1, sr=44100):
+    """
+    Creates a slope-in/slope-out envelope.
+    """
+
+    if len(x) == 0:
+        return x
+
+    delay_samples = int(delay * sr)
+    slope_in_samples = int(slope_in * sr)
+    slope_out_samples = int(slope_out * sr)
+
+    outp = np.zeros_like(x)
+
+    for n in range(len(outp)):
+        if n >= 0 and n < delay_samples:
+            outp[n] = 1.0 - v
+        elif n >= delay_samples and n < delay_samples + slope_in_samples:
+            outp[n] = (1. - v) - (1. - v) / slope_in_samples * (n - delay_samples)
+        elif n >= delay_samples + slope_in_samples and n < len(outp) - delay_samples - slope_out_samples:
+            outp[n] = 0
+        elif n >= len(outp) - delay_samples - slope_out_samples and n < len(outp) - delay_samples:
+            outp[n] = (1. - v) / slope_out_samples * (n - len(outp) + delay_samples + slope_out_samples)
+            if outp[n] < 0:
+                print(n)
+                break
+        elif n >= len(outp) - delay_samples:
+            outp[n] = 1.0 - v
+
+    outp += v
+
+    return outp * x
+
+
+def get_background(
+        fname,
+        duration,
+        ft=0.5,
+):
+    print(fname)
+    bg, sr = librosa.load(fname)
+    f_s = int(ft * sr)
+    y = bg
+    z = np.zeros((duration,))
+    if len(y) < len(z):
+        y = fade(y, ft, ft, sr)
+        for n in range(0, len(z) - len(y), len(y) - f_s):
+            z[n:n + len(y)] += y
+        n += len(y) - f_s
+        if len(y) > len(z[n:]):
+            z[n:] += y[:len(z[n:])]
+        else:
+            z[n:n + len(y)] += y
+
+        z = fade(z, ft, ft, sr=sr)
+
+    elif len(y) > len(z):
+        z += fade(y[0:len(z)], ft, ft, sr=sr)
+    return z
+
+
+def compose_bg_scene(bgs, background_changes, D, delay=3*44100):
+    z = np.zeros((2, D))
+    for n in range(len(background_changes)):
+        bg_choice = background_changes[n][1]
+        start = background_changes[n][0]
+        fname = bgs[bg_choice]
+        if n < len(background_changes) - 1:
+            duration = background_changes[n + 1][0] - background_changes[n][0]
+        else:
+            duration = D - background_changes[n][0]
+
+        y = get_background(fname, duration)
+        z[0, start:start + len(y)] = y
+        z[1, start:start + len(y)] = y
+        # z = fade(z, 1., 1.)
+    return z
+
+
+class Director():
+    def __init__(self, script, sound_dir, speech_dir):
+        """
+        Takes a parsed script and renders it to a multitrack.
+
+        :param sound_dir: directory of sound files
+        :param speech_dir: directory of speech files
+        :param script: the parsed script
+        """
+
+        # Get character definitions
+
+        ## TODO: Change this to also have accents
+
+        self.voice_params = {}
+        self.scene_params = {}
+        self.bg_params = {}
+
+        # This holds the fxive sound engine if available
+        self.fxive = None
+
+        global FFV, FMV
+        for d in script['definitions']:
+            if d['type'] == 'scene_definition':
+                number = int(d['number'])
+                tags = d['tags']
+                filename = d['filename']
+
+                # If it starts with fxive: then get the preset from fxive
+                if 'fxive:' == filename[:6]:
+                    print("Fetching sample from fxive...")
+                    if self.fxive is not None:
+                        self.bg_params[number] = self.fxive.get_sfx(filename[6:])
+                    else:
+                        self.fxive = FXive(sfx_path=os.path.join(sound_dir, 'sfx.xls'))
+                        self.bg_params[number] = self.fxive.get_sfx(filename[6:])
+                else:
+                    self.bg_params[number] = filename
+
+                if 'none' in tags:
+                    self.scene_params[number] = []
+                else:
+                    self.scene_params[number] = tags
+
+            if d['type'] == 'cast_definition':
+                # print("-----------------")
+                name = d['name']
+                gender = random.sample(d['gender'], 1)[0]
+                panning = random.sample(d['panning'], 1)[0]
+
+                if panning == 'left':
+                    panning = 0.01
+                elif panning == 'right':
+                    panning = 0.99
+                elif panning in ['center', 'centre']:
+                    panning = 0.5
+                # print(gender, panning)
+                if gender == 'female':
+                    # Choose a random female voice
+                    voice = random.sample(female_voices, 1)[0]
+
+                    if FIXED_VOICES:
+                        voice = fixed_female_voice[FFV]
+                        FFV += 1
+                else:
+                    # Choose a random male voice
+                    voice = random.sample(male_voices, 1)[0]
+
+                    if FIXED_VOICES:
+                        voice = fixed_male_voice[FMV]
+                        FMV += 1
+
+                self.voice_params[name] = (voice, panning)
+
+                # if character_panning == 0.5:
+                #     character_panning = 0.1
+                # elif character_panning == 0.1:
+                #     character_panning = 0.9
+                # elif character_panning == 0.9:
+                #     character_panning = 0.1
+
+        if self.fxive is not None:
+            self.fxive.close()
+
+        self.script = script
+        self.sound_dir = sound_dir
+        self.speech_dir = speech_dir
+
+        self.musicmanager = MusicManager(sound_dir)
+        self.pausemanager = PauseManager()
+        self.speechmanager = SpeechManager(speech_dir, self.voice_params)
+        self.sfxmanager = SoundManager(sound_dir)
+
+    def get_voice_params(self, name):
+        return self.voice_params[name]
+
+    def generate_multitrack(self):
+        # Shift by 4 seconds
+        D = 0
+        P = []
+        track_names = []
+
+        # print(self.script['script'])
+
+        current_scene = 1
+        current_reverb_tags = ""
+
+        scene_changes = []
+
+        # Create a program of scripts
+        for s in self.script['script']:
+            if s['type'] == 'music':
+                name = 'music'
+                audio = self.musicmanager.retrieve_music(s)
+            elif s['type'] == 'sfx':
+                name = s['name'].lower()
+                audio = self.sfxmanager.retrieve_sfx(s)
+            elif s['type'] == 'scene_change':
+                current_scene = int(s['number'])
+                # print(current_scene)
+                # print(self.scene_params)
+                current_reverb_tags = self.scene_params[current_scene]
+
+                print("Changed to scene {} with reverb tags: {}".format(current_scene, current_reverb_tags))
+                scene_changes.append((D, current_scene))
+                continue
+            elif s['type'] == 'pause':
+                name = 'pause'
+                audio = self.pausemanager.retrieve_pause(s)
+            elif s['type'] == 'cast_line':
+                print(s)
+                name = s['name'].lower()
+                audio = self.speechmanager.retrieve_speech(s)
+                if len(current_reverb_tags) > 0:
+                    print("Applying reverberation with tags: {}".format(current_reverb_tags))
+                    print(audio.shape)
+                    if s['name'] != 'Narrator':
+                        audio = get_reverb_from_tags(audio[0, :], audio[1, :], current_reverb_tags)
+
+            if name not in track_names:
+                track_names.append(name)
+            D += audio.shape[1]
+            P.append((name, audio))
+
+        multitrack = {t: np.zeros((2, D)) for t in track_names}
+
+        print("Composing bg scene")
+        multitrack['background'] = compose_bg_scene(self.bg_params, scene_changes, D)
+
+        idx = 0
+        for p in P:
+            multitrack[p[0]][:, idx:idx + p[1].shape[1]] = p[1]
+            idx += p[1].shape[1]
+
+        return multitrack
+
+
+class Generator():
+    def __init__(self):
+        pass
+
+    def generate(self):
+        with open('../data/scripts/The Mystery Of Spooky Hill.txt') as f:
+            return f.read()
+
+
+class PauseManager():
+    def __init__(self):
+        """
+        Manages pauses
+        """
+
+    def retrieve_pause(self, input_):
+        duration_str = input_['duration']
+        if duration_str == 'long':
+            duration = 3.0
+        elif duration_str == 'short':
+            duration = 1.0
+
+        audio = np.zeros((2, int(duration * 44100)))
+        return audio
+
+
+class SpeechManager():
+    def __init__(self, speech_folder, voice_params):
+        """
+
+        :param speech_folder: the folder the speech .mp3s are in
+        """
+
+        self.voice_params = voice_params
+        self.speech_folder = speech_folder
+        try:
+            self.transcriptions = pd.read_excel(os.path.join(speech_folder, 'transcript.xls'))
+        except:
+            # If the file does not exist
+            self.transcriptions = None
+
+        print('Transcription file:' + str(os.path.join(speech_folder, 'transcript.xls')))
+        print('Transcriptions:' + str(self.transcriptions))
+
+    def retrieve_speech(self, input_):
+        # print(input_)
+        cast_member = input_['name']
+        # print(self.voice_params)
+        cast_voice = self.voice_params[cast_member][0]  # 0th element is voice
+        cast_panning = self.voice_params[cast_member][1]  # 1st element is panning
+
+        cast_line = input_['line']
+
+        can_find_entry = False
+
+        # If the file does not exist
+        cast_lines_df = self.transcriptions[self.transcriptions['cast'].map(lambda x: x.lower()) == cast_member.lower()]
+        similarities = {}
+        for n in cast_lines_df.index:
+            similarities[n] = difflib.SequenceMatcher(None, cast_line, cast_lines_df['line'].loc[n]).ratio()
+
+        # Most similar entry location
+        chosen_entry = max(similarities, key=lambda x: similarities[x])
+        chosen_file = cast_lines_df['filename'].loc[chosen_entry]
+        chosen_line = cast_lines_df['line'].loc[chosen_entry]
+
+        if similar(cast_line, chosen_line):
+            can_find_entry = True
+
+        chosen_file_path = os.path.join(self.speech_folder, chosen_file)
+        print("Retrieving: " + chosen_file_path)
+
+        if os.path.exists(chosen_file_path):
+            audio, sr = librosa.core.load(chosen_file_path, sr=44100, mono=False)
+            # print("panning: {}".format(cast_panning))
+            audio[0, :] *= cast_panning
+            audio[1, :] *= (1 - cast_panning)
+        else:
+            can_find_entry = False
+
+        if not can_find_entry:
+            # 1. Generate line
+            audio, sr = generate_speech_with_festival(cast_voice, cast_panning, cast_line, sr=44100)
+            # print("panning: {}".format(cast_panning))
+            # audio[0,:] *= cast_panning
+            # audio[1,:] *= (1-cast_panning)
+
+        # If the line is too dissimilar, synthesize it; else use the chosen line
+        return audio
+
+
+class SoundManager():
+    def __init__(self, sound_folder):
+        """
+
+        :param sound_folder: the folder the sound effect .mp3s are in
+        """
+
+        self.sound_folder = sound_folder
+        self.sound_file_names = [f.split('/')[-1] for f in glob.glob(sound_folder + '*.mp3')]
+
+        # If the directory is empty, return.
+        if len(self.sound_file_names) == 0:
+            return
+        # Lookup strings
+        strings = []
+        for f in self.sound_file_names:
+            strings.append(" ".join(re.findall('[A-Za-z]+', f)).lower())
+
+        # Sanitize strings, remove the most common substring
+
+        # Find most common substring
+        string1 = strings[0]
+        for n in range(1, len(strings)):
+            string2 = strings[n]
+            match = difflib.SequenceMatcher(None, string1, string2).find_longest_match(0, len(string1), 0, len(string2))
+            string1 = string2[match.b:match.b + match.size]
+
+        # Remove most common substring
+        ## TODO: Check here please, should we remove it?
+
+        # strings = [s.replace(string1, '') for s in strings]
+        self.lookup = strings
+
+    def retrieve_sfx(self, input_):
+        """
+
+        :param input_: dictionary object from parser
+        :return: audio matrix containing audio file
+        """
+
+        query = input_['name'].lower()
+        # Stem words before checking for similarity
+        stemmer = porter.PorterStemmer()
+
+        qwords = [stemmer.stem(q).lower() for q in query.split()]
+        similarities = []
+
+        # If the words in the query are available in the words in the filename, then increase by 1. Finally,
+        # divide by the total number of words (Jaccard similarity?)
+
+        for s in self.lookup:
+
+            words = [stemmer.stem(w).lower() for w in s.split()]
+            similarities.append(0.)
+
+            for qw in qwords:
+                for w in words:
+                    similarities[-1] += difflib.SequenceMatcher(None, qw, w).ratio()
+
+            similarities[-1] /= float(len(words))
+
+        # This is argmax
+        chosen = [n for n in range(len(similarities)) if similarities[n] == max(similarities)][0]
+        chosen_fname = self.sound_folder + self.sound_file_names[chosen]
+        audio = librosa.core.load(chosen_fname, sr=44100, mono=False)
+        return audio[0]
+
+
+class MusicManager():
+    def __init__(self, sound_folder):
+        """
+
+        :param sound_folder: the folder the music .mp3s are in
+        """
+
+        self.sound_folder = sound_folder
+        self.sound_file_names = [f.split('/')[-1] for f in glob.glob(sound_folder + '/*.mp3')]
+
+        # If the directory is empty, return.
+        if len(self.sound_file_names) == 0:
+            return
+
+        # Lookup strings
+        strings = []
+        for f in self.sound_file_names:
+            strings.append(" ".join(re.findall('[A-Za-z]+', f)).lower())
+
+        # Sanitize strings, remove the most common substring
+
+        # Find most common substring
+        string1 = strings[0]
+        for n in range(1, len(strings)):
+            string2 = strings[n]
+            match = difflib.SequenceMatcher(None, string1, string2).find_longest_match(0, len(string1), 0, len(string2))
+            string1 = string2[match.b:match.b + match.size]
+
+        # Remove most common substring
+        strings = [s.replace(string1, '') for s in strings]
+        self.lookup = strings
+
+    def retrieve_music(self, input_):
+        """
+
+        :param input_: dictionary object from parser
+        :return: audio matrix containing audio file
+        """
+
+        query = input_['name'].lower() + ' music'
+
+        similarities = []
+
+        # If the words in the query are available in the words in the filename, then increase by 1. Finally,
+        # divide by the total number of words (Jaccard similarity?)
+
+        for s in self.lookup:
+            qwords = query.split()
+            words = s.split()
+            similarities.append(0.)
+
+            for qw in qwords:
+                if qw in words:
+                    similarities[-1] += 1.
+
+            similarities[-1] /= float(len(words))
+
+        # This is argmax
+        chosen = [n for n in range(len(similarities)) if similarities[n] == max(similarities)][0]
+        chosen_fname = self.sound_folder + self.sound_file_names[chosen]
+        audio = librosa.core.load(chosen_fname, sr=44100, mono=False)
+        return audio[0]
+
+
+# Classes for aiding parsing
+class Environment:
+    def __init__(self, varname, name):
+        self.name = name
+        self.varname = varname
+
+    def to_json(self):
+        return {"type": "environment_definition", "name": self.name}
+
+
+class Sound_Effect:
+    def __init__(self, varname, name, pos):
+        self.name = name
+        self.varname = varname
+        self.keywords = [kw for kw in name.split()]
+
+        # Set the end to pos-1 so the first
+        # character of the next line won't be omitted
+
+        self.pos = (pos, pos - 1)
+
+    def to_json(self):
+        return {
+            'definition': {
+                'type': 'sfx_definition',
+                'name': ' '.join(self.keywords),
+                'optional': False
+            },
+            'playback': {
+                'type': 'sfx',
+                'name': ' '.join(self.keywords)
+            }
+        }
+
+    def add_keywords(self, keywords):
+        for kw in keywords:
+            self.keywords.insert(0, kw)
+
+    def __str__(self):
+        return "({} FX)".format(' '.join(self.keywords))
+
+    def definition(self):
+        return ' '.join(self.keywords)
+
+    def script(self):
+        return str(self)
+
+
+class Character_Line:
+    def __init__(self, varname, txt, pos_start, pos_end):
+        self.varname = varname
+        self.txt = '. '.join([sent.capitalize() for sent in txt.split('\n')])
+        if self.txt[-1] != '.':
+            self.txt += '.'
+
+        self.character = None
+        self.pos = (pos_start, pos_end)
+
+    def set_character(self, character):
+        self.character = character
+
+    def __str__(self):
+        return "{}: {}".format(self.character.name, self.txt)
+
+    def script(self):
+        return "[{}] {}".format(self.character.name, self.txt)
+
+    def set_pos(self, start, end):
+        self.pos = (start, end)
+
+    def to_json(self):
+        return {'playback': {"type": "cast_line", "name": self.character.name, "line": self.txt}}
+
+
+class Character:
+    def __init__(self, varname, name):
+        self.name = ' '.join([n.capitalize() for n in name.split()])
+        self.varname = varname
+        self.gender = ''
+        self.age = ''
+
+    def set_gender(self, gender):
+        self.gender = gender
+
+    def set_age(self, age):
+        self.age = age
+
+    def definition(self):
+        str_ = self.name + ' - '
+        if self.gender == '':
+            str_ += 'male or female'
+        else:
+            str_ += self.gender
+
+        return str_
+
+    def __str__(self):
+        return self.__repr__()
+
+    def __repr__(self):
+        return "[{}:{}/{}/{}]".format(self.varname, self.name, self.gender, self.age)
+
+    def to_json(self):
+        json_dict = {"type": "cast_definition", "name": self.name}
+        if self.gender != '':
+            json_dict['gender'] = self.gender
+        if self.age != '':
+            json_dict['age'] = self.age
+
+        return json_dict
+
+
+class KDuration(pg.Keyword):
+    grammar = pg.Enum(pg.K('long'), pg.K('short'))
+
+
+class Pause(pg.Plain):
+    grammar = '(', pg.optional(pg.attr('duration', KDuration)), 'pause', ')'
+
+
+class CastHeader(pg.Plain):
+    grammar = 'Cast', pg.optional('List'), ':', pg.endl
+
+
+class KGender(pg.Keyword):
+    grammar = pg.Enum(pg.K('male'), pg.K('female'))
+
+
+class EGender(pg.List):
+    grammar = KGender, pg.optional('or', KGender)
+
+
+class KPan(pg.Keyword):
+    grammar = pg.Enum(pg.K('left'), pg.K('right'), pg.K('center'), pg.K('centre'))
+
+
+class EPan(pg.List):
+    grammar = KPan, pg.optional('or', KPan)
+
+
+class CastDefinition(pg.Plain):
+    grammar = pg.attr('cast_name', re.compile('[A-Za-z0-9 ]+')), \
+              re.compile(r'\-+'), \
+              pg.attr('gender', EGender), \
+              re.compile(r'\-+'), \
+              'panned', \
+              pg.attr('panning', EPan), pg.endl
+
+
+class Tag(pg.Plain):
+    grammar = pg.attr('tag', re.compile(r'[A-Za-z0-9_\-]+'))
+
+
+class LTag(pg.List):
+    grammar = pg.csl(Tag)
+
+
+class ScenesHeader(pg.Plain):
+    grammar = re.compile('Scenes?'), pg.optional('List'), ':', pg.endl
+
+
+class ScenesDefinition(pg.Plain):
+    grammar = pg.attr('number', re.compile('[A-Za-z0-9]+')), \
+              re.compile(r'\-+'), pg.attr('name', re.compile('[A-Za-z0-9]+')), \
+              re.compile(r'\-+'), pg.attr('filename', re.compile(r'[A-Za-z0-9_\:]+(\.(mp3|wav))?')), \
+              re.compile(r'\-+'), pg.attr('tags', LTag), pg.endl
+
+
+class ScriptHeader(pg.Plain):
+    grammar = 'Script', ':', pg.endl
+
+
+class SceneCommence(pg.Plain):
+    grammar = re.compile(r'\-+'), 'Scene', pg.attr('scene', re.compile('[A-Za-z0-9]+')), pg.optional(
+        re.compile(r'\-+')), pg.endl
+
+
+class CastLine(pg.Plain):
+    grammar = '[', pg.attr('cast_name', re.compile('[A-Za-z0-9 ]+')), ']', pg.attr('line',
+        re.compile(r'[A-Za-z0-9\-_.\ \" \'\,\?\:\!]+'))
+
+
+class Headers(pg.Plain):
+    grammar = CastHeader, \
+              pg.attr('cast_list', pg.maybe_some(CastDefinition)), \
+              pg.optional(ScenesHeader, pg.attr('scene_list', pg.maybe_some(ScenesDefinition))), pg.optional(
+                  ScriptHeader)
+
+
+class Script(pg.List):
+    grammar = pg.some([Pause, SceneCommence, CastLine])
+
+
+class ScriptDocument(pg.Plain):
+    grammar = pg.attr('headers', Headers), pg.attr('script', Script)
+
+
+class Parser:
+    def __init__(self):
+        pass
+
+    def parse_str(self, str_):
+
+        obj = pg.parse(str_, ScriptDocument)
+        definitions = []
+        script = []
+
+        for cast_def in obj.headers.cast_list:
+            name = cast_def.cast_name.strip()
+            gender = [str(t) for t in cast_def.gender]
+
+            panning = [str(t) for t in cast_def.panning]
+
+            cast_dict = {'type': 'cast_definition', 'name': name, 'gender': gender, 'voice': [], 'panning': panning}
+            definitions.append(cast_dict)
+
+        for scene_def in obj.headers.scene_list:
+            name = scene_def.name.strip()
+            number = scene_def.number
+            filename = scene_def.filename
+            tags = [str(t.tag) for t in scene_def.tags]
+            scene_dict = {'type': 'scene_definition',
+                          'scene': name,
+                          'number': number,
+                          'filename': filename,
+                          'tags': tags}
+            definitions.append(scene_dict)
+
+        for action in obj.script:
+            if isinstance(action, Pause):
+                duration = str(action.duration)
+                pause_dict = {'type': 'pause', 'duration': duration}
+                script.append(pause_dict)
+            if isinstance(action, SceneCommence):
+                number = str(action.scene).strip()
+                scene_dict = {'type': 'scene_change', 'number': number}
+                script.append(scene_dict)
+            if isinstance(action, CastLine):
+                name = str(action.cast_name).strip()
+                line = str(action.line)
+                cast_line = {'type': 'cast_line', 'name': name, 'line': line}
+                script.append(cast_line)
+
+        return {'definitions': definitions, 'script': script}
+
+
+class Preprocessor():
+    def __init__(self):
+        self.parsed_dict = None
+        self.regexp_entity = re.compile(r'(?P<variable>[A-Z][0-9]+)\s+(?P<type>[A-Z][A-Za-z_]*)\s+([0-9]+)\s+([0-9]+)(?P<name>(\s+[A-Za-z\'\"]+)+$)')
+        self.regexp_attribute = re.compile(r'(?P<variable>[A-Z][0-9]+)\s+(?P<type>[A-Z][A-Za-z_]*)\s+(?P<target_variable>[A-Z][0-9]+)\s+(?P<value>[A-Za-z]+)')
+        self.regexp_relation = re.compile(r'(?P<variable>[A-Z][0-9]+)\s+(([A-Za-z0-9_]+:[A-Z0-9]+\s*)+)')
+        self.regexp_transitive = re.compile(r'(?P<arity>\*)\s+(?P<type>[A-Z][A-Za-z_]*)\s+(?P<refersTo1>[A-Z][0-9]+)\s+(?P<refersTo2>[A-Z][0-9]+)\s*')
+
+    def parse_str(self, text, annot):
+        """ Takes as input a text and an annotation string. """
+
+        tups = [tup for tup in annot.split('\n') if tup.strip() != '']
+
+        # Add the narrator
+        narrator = Character('_', 'Narrator')
+        characters = [narrator]
+
+        objects = []
+        character_lines = []
+        sound_effects = []
+
+        def find_character(var):
+            for c in characters:
+                if c.varname == var:
+                    return c
+
+        def find_character_or_object(var):
+            c = find_character(var)
+            if c is not None:
+                return c
+
+        def find_character_lines(var):
+            for c in character_lines:
+                if c.varname == var:
+                    return c
+
+        def find_sound_effect(var):
+            for c in sound_effects:
+                if c.varname == var:
+                    return c
+
+        def character_speaks(c):
+            for cl in character_lines:
+                if isinstance(cl, Character_Line) and cl.character == c:
+                    return True
+            return False
+
+        for tup in tups:
+            # print(tup)
+
+            groups = self.regexp_entity.findall(tup)
+            if len(groups) > 0:
+                if groups[0][1] == 'Character':
+                    # If the entity is a character
+                    characters.append(
+                        Character(groups[0][0].strip(), groups[0][4].strip())
+                    )
+                elif groups[0][1] == 'Character_Line':
+                    character_lines.append(
+                        Character_Line(groups[0][0].strip(), groups[0][4].strip(), int(groups[0][2]), int(groups[0][3]))
+                    )
+                elif groups[0][1] == 'Sound_Effect':
+                    sound_effects.append(
+                        Sound_Effect(groups[0][0].strip(), groups[0][4].strip(), int(groups[0][2]))
+                    )
+                continue
+
+        for tup in tups:
+            # Attributes and relations
+            groups = self.regexp_attribute.findall(tup)
+            if len(groups) > 0:
+                if groups[0][1] == 'Gender':
+                    c = find_character(groups[0][2].strip())
+                    c.set_gender(groups[0][3].strip().lower())
+                elif groups[0][1] == 'Age':
+                    c = find_character(groups[0][2].strip())
+                    c.set_age(groups[0][3].strip().lower())
+
+        for tup in tups:
+            # Attributes and relations
+            groups = self.regexp_relation.findall(tup)
+            if len(groups) > 0 and groups[0][1][:4] == 'Says':
+                # print(groups)
+                refs = groups[0][1].split()[1:]
+
+                # Store who and whats
+                whats = []
+                who = None
+
+                for ref in refs:
+                    type_, var = ref.split(':')
+                    if type_[:4] == 'WHAT':
+                        whats.append(var)
+                    elif type_[:3] == 'WHO':
+                        who = find_character(var)
+
+                # Find character lines:
+                clines = [find_character_lines(w) for w in whats]
+
+                # Assign characters
+                for cl in clines:
+                    cl.set_character(who)
+            elif len(groups) > 0 and groups[0][1][:12] == 'Sound_Effect':
+                sfx = find_sound_effect(groups[0][1][13:].split()[0])
+                # print(groups)
+
+                # Store extra keywords
+                keywords = []
+
+                refs = groups[0][1].split()[1:]
+                for ref in refs:
+                    # print(ref)
+                    type_, var = ref.split(':')
+                    if type_[:8] == 'CausedBy':
+                        cause = find_character_or_object(var)
+                        if cause is not None:
+                            keywords.append(cause.name)
+
+                sfx.add_keywords(keywords)
+
+        # %% Calculate line segments for character lines and narration
+
+        # Add sound effects to the timeline
+        clpos = [cl.pos for cl in character_lines]
+        clpos += [sfx.pos for sfx in sound_effects]
+        clpos = sorted(clpos, key=lambda x: x[0])
+
+        # Add first narrator line
+        cl = Character_Line('_', text[0:clpos[0][0]], 0, clpos[0][0] - 1)
+        cl.set_character(narrator)
+        character_lines.append(cl)
+
+        for n in range(len(clpos) - 1):
+            if clpos[n][1] != clpos[n + 1][0] - 1:
+                cl = Character_Line('_', text[clpos[n][1] + 1:clpos[n + 1][0]].rstrip(), clpos[n][1] + 1,
+                                    clpos[n + 1][0] - 1)
+                cl.set_character(narrator)
+                character_lines.append(cl)
+
+        character_lines += sound_effects
+        character_lines = sorted(character_lines, key=lambda x: x.pos[1])
+
+        # parsed_dict = {'definitions': [c.to_json() for c in characters],
+        #                'script': [cl.to_json() for cl in character_lines]}
+
+        # parsed_dict = {'definitions': [], 'script': []}
+        #
+        # print("Cast List:")
+        # for c in characters:
+        #     if character_speaks(c):
+        #         print(c.definition())
+        #
+        # print("")
+        # print("Sound Effects:")
+        #
+        # for c in sound_effects:
+        #     print(c.definition())
+        #
+        # for cl in character_lines:
+        #     print(cl.script())
+
+        # Add definitions for characters
+        # for c in characters:
+        #     if character_speaks(c):
+        #         parsed_dict['definitions'].append(c.to_json())
+        #
+        # # Add definitions for sound effects
+        # for c in sound_effects:
+        #     parsed_dict['definitions'].append(c.to_json()['definition'])
+        #
+        #
+        # # Add timeline information
+        # for cl in character_lines:
+        #     parsed_dict['script'].append(cl.to_json()['playback'])
+
+        str_ = "Cast List:\n"
+        for c in characters:
+            if character_speaks(c):
+                str_ += "{}\n".format(c.definition())
+
+        str_ += "Sound Effects:\n"
+        for c in sound_effects:
+            str_ += "{}\n".format(c.definition())
+
+        for cl in character_lines:
+            str_ += "{}\n".format(cl.script())
+
+        return str_
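
Taken together, the file defines a pipeline: Parser (or Preprocessor, for annotated prose) turns a script into definitions plus a playback list, Director renders that list to a multitrack, Mixer normalizes and sums it to a stereo downmix, and Master applies EQ and loudness control via sox. The sketch below shows one plausible way to drive these classes end to end; it is not part of this commit, and script.txt, sounds/, speech/ and drama.wav are illustrative placeholder paths (the module is assumed importable as approach1).

    # Hypothetical driver for the classes above; paths are placeholders.
    import librosa
    from approach1 import Parser, Director, Mixer, Master

    # Parse a script written in the Headers/Script grammar defined above.
    with open('script.txt') as f:
        parsed = Parser().parse_str(f.read())

    # Render the parsed script, then downmix and master it.
    director = Director(parsed, 'sounds/', 'speech/')  # assumed asset folders
    multitrack = director.generate_multitrack()        # dict of stereo tracks
    downmix = Mixer(multitrack).get_downmix()          # normalize and sum
    mastered = Master(downmix).get_mastered()          # highpass, notch, loudness

    librosa.output.write_wav('drama.wav', mastered, sr=44100)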