Mercurial > hg > chourdakisreiss2018smc
view demo/workspace/approach1.py @ 13:16066f0a7127 tip
fixed the problem with brat
author | Emmanouil Theofanis Chourdakis <e.t.chourdakis@qmul.ac.uk> |
---|---|
date | Sat, 08 Dec 2018 11:02:40 +0000 |
parents | 90155bdd5dd6 |
children |
line wrap: on
line source
import difflib from sklearn.externals import joblib from collections import defaultdict import nltk import numpy as np import re import librosa import glob import pandas as pd from nltk.stem import porter import sox from scipy.io.wavfile import read as wavread from scipy.io.wavfile import write as wavwrite from numpy.core._internal import _gcd as gcd from rtsfx import * import subprocess import os import pypeg2 as pg import random VOICE_PROPERTIES = ['slow', 'deep', 'fast', 'stuttering'] PAUSE_PROPERTIES = ['short', 'long'] SEPARATORS = [ "[", "]", "(", ")", ":", "-"] SFX_MOD_PROPERTIES = ['quiet', 'loud', 'silent'] FIXED_VOICES = False FMV = 0 FFV = 0 male_voices = r""" cmu_us_ahw_cg cmu_us_awb_cg cmu_us_bdl_cg cmu_us_fem_cg cmu_us_jmk_cg cmu_us_ksp_cg cmu_us_rms_cg """.split() female_voices = r""" cmu_us_aup_cg cmu_us_axb_cg cmu_us_clb_cg cmu_us_gka_cg cmu_us_rxr_cg cmu_us_slt_cg """.split() # male_voices = r""" # cmu_us_ahw_cg # cmu_us_fem_cg # cmu_us_rms_cg # """.split() # # # female_voices = r""" # cmu_us_aup_cg # cmu_us_axb_cg # cmu_us_rxr_cg # cmu_us_slt_cg # """.split() fixed_male_voice = male_voices fixed_female_voice = female_voices # male_voices = r""" # cmu_us_ahw_cg # cmu_us_awb_cg # cmu_us_bdl_cg # cmu_us_fem_cg # cmu_us_jmk_cg # cmu_us_ksp_cg # cmu_us_rms_cg # """.split() # male_voices = r""" # cmu_us_ahw_cg # """.split() # # # female_voices = r""" # cmu_us_ahw_cg # """.split() import matplotlib.pyplot as plt def generate_speech_with_festival(voice, panning, line, sr=None ): """ Used for speech generation Constructs a festival .sable file and runs it through festival. """ header = r""" <?xml version="1.0"?> <!DOCTYPE SABLE PUBLIC "-//SABLE//DTD SABLE speech mark up//EN" "Sable.v0_2.dtd" []> <SABLE> <SPEAKER NAME="{}"> """.format(voice) footer = r""" </SPEAKER> </SABLE> """ # 0. Construct sable file sable = header + line + footer # 1. Save sable file to a temporary .sable file in tmp with open('/tmp/character_line.sable', 'w') as f: f.write(sable) # 2. Call process to festival cmd = 'text2wave /tmp/character_line.sable -o /tmp/character_line.wav' print("Generating speech for line: '{}' with voice '{}' and panning '{}' ".format(line, voice, panning)) value = subprocess.call(cmd, shell=True) if value != 0: raise RuntimeError("Festival failed to execute.") # 3. Load back wave file if sr is None: wav, sr = librosa.load('/tmp/character_line.wav', mono=True) else: wav, sr = librosa.load('/tmp/character_line.wav', sr=sr, mono=True) audio = np.vstack([panning*wav,(1.-panning)*wav]) # # plt.figure() # plt.plot(audio[0,:]) # plt.figure() # plt.plot(audio[1,:]) # plt.show() return audio, sr def substr_features(sent, lower=True, substr=[]): if lower: sent = sent.lower() freqs = defaultdict(int) for ss in substr: if ss in sent: freqs[ss] = 1 return dict(freqs) def features_dict_to_matrix(features, feature_labels): N = len(features) M = len(feature_labels) arr = np.zeros((N, M)) idx_to_feat = list(feature_labels) feat_to_idx = dict((idx_to_feat[k], k) for k in range(len(idx_to_feat))) for n in range(arr.shape[0]): for m in range(arr.shape[1]): if idx_to_feat[m] in features[n]: arr[n, m] = features[n][idx_to_feat[m]] return arr, list(feat_to_idx.keys()) def similar(text1, text2, threshold=0.7 # threshold for similarity ): """ Tests whether two strings are similar """ ratio = difflib.SequenceMatcher(None, text1.lower(), text2.lower()).ratio() return ratio >= threshold class Master(): def __init__(self, downmix): self.downmix = downmix def get_mastered(self): # Creating transformer tfm = sox.Transformer() # Removing everything below 80hz tfm.highpass(80) # Adding a notch filter at 200hz to improve clarity tfm.bandreject(200) # Loudness control for under -9dB tfm.loudness(gain_db=-9) # Store downmix temporarily librosa.output.write_wav('/tmp/downmix_unnormalized.wav', self.downmix, sr=44100, norm=False) tfm.build('/tmp/downmix_unnormalized.wav', '/tmp/downmix_normalized.wav') # Load downmix mastered = librosa.core.load('/tmp/downmix_normalized.wav', sr=44100,mono=False )[0] return mastered class Mixer(): def __init__(self, multitrack): self.multitrack = multitrack def get_downmix(self): # Just a trick to get the length of the first track if 'background' in self.multitrack: D = self.multitrack['background'].shape[1] else: for track in self.multitrack: D = self.multitrack[track].shape[1] break downmix = np.zeros((2, D)) for ttrack in self.multitrack: #1. Normalize track = self.multitrack[ttrack] max_val = np.max(np.abs(track)) if max_val > 0: track /= max_val if ttrack == 'background': track *= 0.05 downmix += track return downmix def zafar(lx, rx, d1, g1, m, fc, G, da=0.007, fs=44100.): """ Rafii & Pardo Reverberator (2009) controlled by High Level parameters Inputs: lx : left channel input rx : right channel input d1 : delay of first comb filter in seconds g1 : gain of first comb filters da : delay of allpass filter in seconds G : dry/wet mix gain fc : lowpass filter cuttoff Hz m : difference between left and right channel phases fs : sampling rate Outputs: ly: left channel output ry: right channel output """ d1 = int(d1 * fs) m = int(m * fs) da = int(da * fs) def calculate_parameters(d1, g1): d2 = int(round((1.5) ** (-1) * d1)) while gcd(d2, d1) != 1: d2 += 1 d3 = int(round((1.5) ** (-2) * d1)) while gcd(d3, d2) != 1 or gcd(d3, d1) != 1: d3 += 1 d4 = int(round((1.5) ** (-3) * d1)) while gcd(d4, d3) != 1 or gcd(d4, d2) != 1 or gcd(d4, d1) != 1: d4 += 1 d5 = int(round((1.5) ** (-4) * d1)) while gcd(d5, d4) != 1 or gcd(d5, d3) != 1 or gcd(d5, d2) != 1 or gcd(d5, d1) != 1: d5 += 1 d6 = int(round((1.5) ** (-5) * d1)) while gcd(d6, d5) != 1 or gcd(d6, d4) != 1 or gcd(d6, d3) != 1 or gcd(d6, d2) != 1 or gcd(d6, d1) != 1: d6 += 1 g2 = g1 ** (1.5) ** (-1) * g1 g3 = g1 ** (1.5) ** (-2) * g1 g4 = g1 ** (1.5) ** (-3) * g1 g5 = g1 ** (1.5) ** (-4) * g1 g6 = g1 ** (1.5) ** (-5) * g1 return (d1, d2, d3, d4, d5, d6, g1, g2, g3, g4, g5, g6) def comb_array(x, g1, d1): (d1, d2, d3, d4, d5, d6, g1, g2, g3, g4, g5, g6) = calculate_parameters(d1, g1) c1out = comb(x, g1, d1) c2out = comb(x, g2, d2) c3out = comb(x, g3, d3) c4out = comb(x, g4, d4) c5out = comb(x, g5, d5) c6out = comb(x, g6, d6) Lc1 = len(c1out) Lc2 = len(c2out) Lc3 = len(c3out) Lc4 = len(c4out) Lc5 = len(c5out) Lc6 = len(c6out) Lc = max(Lc1, Lc2, Lc3, Lc4, Lc5, Lc6) y = np.zeros((Lc,)) y[0:Lc1] = c1out y[0:Lc2] += c2out y[0:Lc3] += c3out y[0:Lc4] += c4out y[0:Lc5] += c5out y[0:Lc6] += c6out return y def comb(x, g, d): LEN = len(x) + d # print d y = np.zeros((LEN,)) for n in range(0, LEN): if n - d < 0: y[n] = 0 else: y[n] = x[n - d] + g * y[n - d] return y def allpass(x, g, d): LENx = len(x) LENy = LENx + d y = np.zeros((LENy,)) for n in range(0, LENy): if n - d < 0: y[n] = -g * x[n] elif n >= LENx: y[n] = x[n - d] + g * y[n - d] else: y[n] = x[n - d] - g * x[n] + g * y[n - d] return y def lowpass(x, g): LEN = len(x) y = np.zeros((LEN,)) for n in range(0, LEN): if n - 1 < 0: y[n] = (1 - g) * x[n] else: y[n] = (1 - g) * x[n] + g * y[n - 1] return y ga = 1. / np.sqrt(2.) cin = 0.5 * lx + 0.5 * rx cout = comb_array(cin, g1, d1) ra = allpass(cout, ga, da + m // 2) la = allpass(cout, ga, da - m // 2) gc = 2 - np.cos(2 * np.pi * fc / fs) - np.sqrt((np.cos(2 * np.pi * fc / fs) - 2) ** 2 - 1) ral = lowpass(ra, gc) lal = lowpass(la, gc) ralg = G * ral lalg = G * lal ry = ralg[0:len(rx)] + (1 - G) * rx ly = lalg[0:len(lx)] + (1 - G) * lx return np.vstack([ry, ly]) def get_reverb_from_tags(xl, xr, tags, fs=44100): reverb_csv = 'contributions.csv' df = pd.read_csv(reverb_csv) df = df.fillna("") params = [] for n in range(len(df)): if all([t in df['agreed'].iloc[n].split(',') for t in tags]): params.append(df['param'].iloc[n]) d1, g1, m, fc, G = [float(f) for f in params[0].split(',')] y = zafar(xl, xr, d1, g1, m, fc, G, fs=fs) return y def fade(x, fade_in, fade_out, sr=44100): """ Creates a fade-in-fade-out envelope for audio array x. """ if len(x) == 0: return x fade_in_samples = int(fade_in * sr) fade_out_samples = int(fade_out * sr) outp = np.ones_like(x) for n in range(fade_in_samples): outp[n] = n * 1. / fade_in_samples for n in range(fade_out_samples): outp[len(outp) - fade_out_samples + n] = 1 - 1. / fade_out_samples * n return outp * x def slope(x, slope_in, slope_out, delay=1.0, v=0.1, sr=44100): """ Creates a slope in slope out envelope """ if len(x) == 0: return x delay_samples = int(delay * sr) slope_in_samples = int(slope_in * sr) slope_out_samples = int(slope_out * sr) outp = np.zeros_like(x) for n in range(len(outp)): if n >= 0 and n < delay_samples: outp[n] = 1.0 - v elif n >= delay_samples and n < delay_samples + slope_in_samples: outp[n] = (1. - v) - (1. - v) / slope_in_samples * (n - delay_samples) elif n >= delay_samples + slope_in_samples and n < len(outp) - delay_samples - slope_out_samples: outp[n] = 0 elif n >= len(outp) - delay_samples - slope_out_samples and n < len(outp) - delay_samples: outp[n] = (1. - v) / slope_out_samples * (n - len(outp) + delay_samples + slope_out_samples) if outp[n] < 0: print(n) break elif n >= len(outp) - delay_samples: outp[n] = 1.0 - v outp += v return outp * x def get_background( fname, duration, ft=0.5, ): print(fname) bg, sr = librosa.load(fname) f_s = int(ft * sr) y = bg z = np.zeros((duration,)) if len(y) < len(z): y = fade(y, ft, ft, sr) for n in range(0, len(z) - len(y), len(y) - f_s): z[n:n + len(y)] += y n += len(y) - f_s if len(y) > len(z[n:]): z[n:] += y[:len(z[n:])] else: z[n:n + len(y)] += y z = fade(z, ft, ft, sr=sr) elif len(y) > len(z): z += fade(y[0:len(z)], ft, ft, sr=sr) return z def compose_bg_scene(bgs, background_changes, D, delay=3*44100): z = np.zeros((2,D)) for n in range(len(background_changes)): bg_choice = background_changes[n][1] start = background_changes[n][0] fname = bgs[bg_choice] if n < len(background_changes) - 1: duration = background_changes[n + 1][0] - background_changes[n][0] else: duration = D - background_changes[n][0] y = get_background(fname, duration) z[0,start:start + len(y)] = y z[1, start:start + len(y)] = y #z = fade(z, 1., 1.) return z class Director(): def __init__(self, script, sound_dir, speech_dir): """ Gets a list of script :param sound_dir: directory of sound files :param speech_dir: directory of speech files :param script: the script """ # Gets character definitions ## TODO: Change this to also have accents self.voice_params = {} self.scene_params = {} self.bg_params = {} # This holds the fxive sound engine if available self.fxive = None global FFV, FMV for d in script['definitions']: if d['type'] == 'scene_definition': number = int(d['number']) tags = d['tags'] filename = d['filename'] # If it starts with fxive: then get the preset from fxive if 'fxive:' == filename[:6]: print("Fetching sample from fxive...") if self.fxive is not None: self.bg_params[number] = self.fxive.get_sfx(filename[6:]) else: self.fxive = FXive(sfx_path=os.path.join(sound_dir, 'sfx.xls')) self.bg_params[number] = self.fxive.get_sfx(filename[6:]) else: self.bg_params[number] = filename if 'none' in tags: self.scene_params[number] = [] else: self.scene_params[number] = tags if d['type'] == 'cast_definition': # print("-----------------") name = d['name'] gender = random.sample(d['gender'], 1)[0] panning = random.sample(d['panning'], 1)[0] if panning == 'left': panning = 0.01 elif panning == 'right': panning = 0.99 elif panning in ['center', 'centre']: panning = 0.5 #print(gender, panning) if gender == 'female': # Choose a random female voice voice = random.sample(female_voices, 1)[0] if FIXED_VOICES: voice = fixed_female_voice[FFV] FFV += 1 else: # Choose a random male voice voice = random.sample(male_voices, 1)[0] if FIXED_VOICES: voice = fixed_male_voice[FMV] FMV += 1 self.voice_params[name] = (voice, panning) # if character_panning == 0.5: # character_panning = 0.1 # elif character_panning == 0.1: # character_panning = 0.9 # elif character_panning == 0.9: # character_panning = 0.1 if self.fxive is not None: self.fxive.close() self.script = script self.sound_dir = sound_dir self.speech_dir = speech_dir self.musicmanager = MusicManager(sound_dir) self.pausemanager = PauseManager() self.speechmanager = SpeechManager(speech_dir, self.voice_params) self.sfxmanager = SoundManager(sound_dir) def get_voice_params(self, name): return self.voice_params[name] def generate_multitrack(self): # Shift by 4 seconds D = 0 P = [] track_names = [] # print(self.script['script']) current_scene = 1 current_reverb_tags = "" scene_changes = [] # Create a program of scripts for s in self.script['script']: if s['type'] == 'music': name = 'music' audio = self.musicmanager.retrieve_music(s) elif s['type'] == 'sfx': name = s['name'].lower() audio = self.sfxmanager.retrieve_sfx(s) elif s['type'] == 'scene_change': current_scene = int(s['number']) #print(current_scene) #print(self.scene_params) current_reverb_tags = self.scene_params[current_scene] print("Changed to scene {} with reverb tags: {}".format(current_scene, current_reverb_tags)) scene_changes.append((D, current_scene)) continue elif s['type'] == 'pause': name = 'pause' audio = self.pausemanager.retrieve_pause(s) elif s['type'] == 'cast_line': print(s) name = s['name'].lower() audio = self.speechmanager.retrieve_speech(s) if len(current_reverb_tags) > 0: print("Applying reverberation with tags: {}".format(current_reverb_tags)) print(audio.shape) if s['name'] != 'Narrator': audio = get_reverb_from_tags(audio[0,:], audio[1,:], current_reverb_tags) if name not in track_names: track_names.append(name) D += audio.shape[1] P.append((name,audio)) multitrack = {t: np.zeros((2, D)) for t in track_names} print("Composing bg scene") multitrack['background'] = compose_bg_scene(self.bg_params, scene_changes, D) idx = 0 for p in P: multitrack[p[0]][:, idx:idx+p[1].shape[1]] = p[1] idx += p[1].shape[1] return multitrack class Generator(): def __init__(self): pass def generate(self): with open('../data/scripts/The Mystery Of Spooky Hill.txt') as f: return f.read() class PauseManager(): def __init__(self): """ Manages pauses """ def retrieve_pause(self, input_): duration_str = input_['duration'] if duration_str == 'long': duration = 3.0 elif duration_str == 'short': duration = 1.0 audio = np.zeros((2, int(duration*44100))) return audio class SpeechManager(): def __init__(self, speech_folder, voice_params): """ :param speech_folder: the folder the speech .mp3s are """ self.voice_params = voice_params self.speech_folder = speech_folder try: self.transcriptions = pd.read_excel(os.path.join(speech_folder ,'transcript.xls')) except: # If the file does not exist self.transcriptions = None print('Transcription file:' + str(os.path.join(speech_folder ,'transcript.xls'))) print('Transcriptions:' + str(self.transcriptions)) def retrieve_speech(self, input_): # print(input_) cast_member = input_['name'] # print(self.voice_params) cast_voice = self.voice_params[cast_member][0] # 0th element is voice cast_panning = self.voice_params[cast_member][1] #1th element is panning cast_line = input_['line'] can_find_entry = False # If the file does not exist cast_lines_df = self.transcriptions[self.transcriptions['cast'].map(lambda x: x.lower()) == cast_member.lower()] similarities = {} for n in cast_lines_df.index: similarities[n] = difflib.SequenceMatcher(None, cast_line, cast_lines_df['line'].loc[n]).ratio() # Most similar entry location chosen_entry = max(similarities, key=lambda x: similarities[x]) chosen_file = cast_lines_df['filename'].loc[chosen_entry] chosen_line = cast_lines_df['line'].loc[chosen_entry] if similar(cast_line, chosen_line): can_find_entry = True chosen_file_path = os.path.join(self.speech_folder, chosen_file) print("Retrieving: " + chosen_file_path) if os.path.exists(chosen_file_path): audio, sr = librosa.core.load(chosen_file_path, sr=44100, mono=False) #print("panning: {}".format(cast_panning)) audio[0,:] *= cast_panning audio[1,:] *= (1-cast_panning) else: can_find_entry = False if not can_find_entry: # 1. Generate line audio, sr = generate_speech_with_festival(cast_voice, cast_panning, cast_line, sr=44100) # print("panning: {}".format(cast_panning)) # audio[0,:] *= cast_panning # audio[1,:] *= (1-cast_panning) # If the line is too disimilar, synthesize it, else use the chosen line return audio class SoundManager(): def __init__(self, sound_folder): """ :param sound_folder: the folder the music .mp3s are """ self.sound_folder = sound_folder self.sound_file_names = [f.split('/')[-1] for f in glob.glob(sound_folder + '*.mp3')] # If the directory is empty, return. if len(self.sound_file_names) == 0: return # Lookup strings strings = [] for f in self.sound_file_names: strings.append(" ".join(re.findall('[A-Za-z]+', f)).lower()) # Sanitize strings, remove the most common substring # Find most common substring string1 = strings[0] for n in range(1, len(strings)): string2 = strings[n] match = difflib.SequenceMatcher(None, string1, string2).find_longest_match(0, len(string1), 0, len(string2)) string1 = string2[match.b:match.b + match.size] # Remove most common substring ## TODO: Check here please, should we remove it? # strings = [s.replace(string1, '') for s in strings] self.lookup = strings def retrieve_sfx(self, input_): """ :param query: dictionary object from parser :return: audio matrix containing audio file """ query = input_['name'].lower() # Lematize words before checking for similarity stemmer = porter.PorterStemmer() qwords = [stemmer.stem(q).lower() for q in query.split()] similarities = [] # If the words in the query are available in the words in the filename, then increase by 1. Finally, # divide by the total number of words (Jaccard similarity?) for s in self.lookup: words = [stemmer.stem(w).lower() for w in s.split()] similarities.append(0.) for qw in qwords: for w in words: similarities[-1] += difflib.SequenceMatcher(None, qw, w).ratio() similarities[-1]/=float(len(words)) # This is argmax chosen = [n for n in range(len(similarities)) if similarities[n] == max(similarities)][0] chosen_fname = self.sound_folder + self.sound_file_names[chosen] audio = librosa.core.load(chosen_fname, sr=44100, mono=False) return audio[0] class MusicManager(): def __init__(self, sound_folder): """ :param sound_folder: the folder the music .mp3s are """ self.sound_folder = sound_folder self.sound_file_names = [f.split('/')[-1] for f in glob.glob(sound_folder + '/*.mp3')] # If the directory is empty, return. if len(self.sound_file_names) == 0: return # Lookup strings strings = [] for f in self.sound_file_names: strings.append(" ".join(re.findall('[A-Za-z]+', f)).lower()) # Sanitize strings, remove the most common substring # Find most common substring string1 = strings[0] for n in range(1, len(strings)): string2 = strings[n] match = difflib.SequenceMatcher(None, string1, string2).find_longest_match(0, len(string1), 0, len(string2)) string1 = string2[match.b:match.b + match.size] # Remove most common substring strings = [s.replace(string1, '') for s in strings] self.lookup = strings def retrieve_music(self, input_): """ :param query: dictionary object from parser :return: audio matrix containing audio file """ query = input_['name'].lower() + ' music' similarities = [] # If the words in the query are available in the words in the filename, then increase by 1. Finally, # divide by the total number of words (Jaccard similarity?) for s in self.lookup: qwords = query.split() words = s.split() similarities.append(0.) for qw in qwords: if qw in words: similarities[-1] += 1. similarities[-1]/=float(len(words)) # This is argmax chosen = [n for n in range(len(similarities)) if similarities[n] == max(similarities)][0] chosen_fname = self.sound_folder + self.sound_file_names[chosen] audio = librosa.core.load(chosen_fname, sr=44100, mono=False) return audio[0] # Classes for aiding parsing class Environment: def __init__(self, varname, name): self.name = name self.varname = varname def to_json(self): return {"type": "environment_definition", "name": self.name} class Sound_Effect: def __init__(self, varname, name, pos): self.name = name self.varname = varname self.keywords = [kw for kw in name.split()] # Set the end to pos-1 so the first # character of the next line won't be ommited self.pos = (pos, pos - 1) def to_json(self): return { 'definition': { 'type': 'sfx_definition', 'name': ' '.join(self.keywords), 'optional': False }, 'playback': { 'type': 'sfx', 'name': ' '.join(self.keywords) } } def add_keywords(self, keywords): for kw in keywords: self.keywords.insert(0, kw) def __str__(self): return "({} FX)".format(' '.join(self.keywords)) def definition(self): return ' '.join(self.keywords) def script(self): return str(self) class Character_Line: def __init__(self, varname, txt, pos_start, pos_end): self.varname = varname self.txt = '. '.join([sent.capitalize() for sent in txt.split('\n')]) if self.txt[-1] != '.': self.txt += '.' self.character = None self.pos = (pos_start, pos_end) def set_character(self, character): self.character = character def __str__(self): return "{}: {}".format(self.character.name, self.txt) def script(self): return "[{}] {}".format(self.character.name, self.txt) def set_pos(self, start, end): self.pos = (start, end) def to_json(self): return {'playback': {"type": "cast_line", "name": self.character.name, "line": self.txt}} class Character: def __init__(self, varname, name): self.name = ' '.join([n.capitalize() for n in name.split()]) self.varname = varname self.gender = '' self.age = '' def set_gender(self, gender): self.gender = gender def set_age(self, age): self.age = age def definition(self): str_ = self.name + ' - ' if self.gender == '': str_ += 'male or female' else: str_ += self.gender return str_ def __str__(self): return __repr__(self) def __repr__(self): return "[{}:{}/{}/{}]".format(self.varname, self.name, self.gender, self.age) def to_json(self): json_dict = {"type": "cast_definition", "name": self.name} if self.gender != '': json_dict['gender'] = self.gender if self.age != '': json_dict['age'] = self.age return json_dict class KDuration(pg.Keyword): grammar = pg.Enum(pg.K('long'), pg.K('short')) class Pause(pg.Plain): grammar = '(', pg.optional(pg.attr('duration', KDuration)), 'pause', ')' class CastHeader(pg.Plain): grammar = 'Cast', pg.optional('List'), ':', pg.endl class KGender(pg.Keyword): grammar = pg.Enum(pg.K('male'), pg.K('female')) class EGender(pg.List): grammar = KGender, pg.optional('or', KGender) class KPan(pg.Keyword): grammar = pg.Enum(pg.K('left'), pg.K('right'), pg.K('center'), pg.K('centre')) class EPan(pg.List): grammar = KPan, pg.optional('or', KPan) class CastDefinition(pg.Plain): grammar = pg.attr('cast_name', re.compile('[A-Za-z0-9 ]+')), \ re.compile('\-+'), \ pg.attr('gender', EGender), \ re.compile('\-+'), \ 'panned', \ pg.attr('panning', EPan), pg.endl class Tag(pg.Plain): grammar = pg.attr('tag', re.compile(r'[A-Za-z0-9_\-]+')) class LTag(pg.List): grammar = pg.csl(Tag) class ScenesHeader(pg.Plain): grammar = re.compile('Scenes?'), pg.optional('List'), ':', pg.endl class ScenesDefinition(pg.Plain): grammar = pg.attr('number', re.compile('[A-Za-z0-9]+')), \ re.compile('\-+'), pg.attr('name', re.compile('[A-Za-z0-9]+')), \ re.compile('\-+'), pg.attr('filename', re.compile('[A-Za-z0-9_\:]+(\.(mp3|wav))?')), \ re.compile('\-+'), pg.attr('tags', LTag), pg.endl class ScriptHeader(pg.Plain): grammar = 'Script', ':', pg.endl class SceneCommence(pg.Plain): grammar = re.compile('\-+'), 'Scene', pg.attr('scene', re.compile('[A-Za-z0-9]+')), pg.optional( re.compile('\-+')), pg.endl; class CastLine(pg.Plain): grammar = '[', pg.attr('cast_name', re.compile('[A-Za-z0-9 ]+')), ']', pg.attr('line', re.compile(r'[A-Za-z0-9\-_.\ \" \'\,\?\:\!]+')), class Headers(pg.Plain): grammar = CastHeader, \ pg.attr('cast_list', pg.maybe_some(CastDefinition)), \ pg.optional(ScenesHeader, pg.attr('scene_list', pg.maybe_some(ScenesDefinition))), pg.optional( ScriptHeader) class Script(pg.List): grammar = pg.some([Pause, SceneCommence, CastLine]) class ScriptDocument(pg.Plain): grammar = pg.attr('headers', Headers), pg.attr('script', Script) class Parser: def __init__(self): pass def parse_str(self, str_): obj = pg.parse(str_, ScriptDocument) definitions = [] script = [] for cast_def in obj.headers.cast_list: name = cast_def.cast_name.strip() gender = [str(t) for t in cast_def.gender] panning = [str(t) for t in cast_def.panning] cast_dict = {'type': 'cast_definition', 'name': name, 'gender': gender, 'voice': [], 'panning': panning} definitions.append(cast_dict) for scene_def in obj.headers.scene_list: name = scene_def.name.strip() number = scene_def.number filename = scene_def.filename tags = [str(t.tag) for t in scene_def.tags] scene_dict = {'type': 'scene_definition', 'scene': name, 'number': number, 'filename': scene_def.filename, 'tags': tags} definitions.append(scene_dict) for action in obj.script: if isinstance(action, Pause): duration = str(action.duration) pause_dict = {'type': 'pause', 'duration': duration} script.append(pause_dict) if isinstance(action, SceneCommence): number = str(action.scene).strip() scene_dict = {'type': 'scene_change', 'number': number} script.append(scene_dict) if isinstance(action, CastLine): name = str(action.cast_name).strip() line = str(action.line) cast_line = {'type': 'cast_line', 'name': name, 'line': line} script.append(cast_line) return {'definitions': definitions, 'script': script} class Preprocessor(): def __init__(self): self.parsed_dict = None self.regexp_entity = re.compile('(?P<variable>[A-Z][0-9]+)\s+(?P<type>[A-Z][A-Za-z_]*)\s+([0-9]+)\s+([0-9]+)(?P<name>(\s+[A-Za-z\'\"]+)+$)') self.regexp_attribute = re.compile('(?P<variable>[A-Z][0-9]+)\s+(?P<type>[A-Z][A-Za-z_]*)\s+(?P<target_variable>[A-Z][0-9]+)\s+(?P<value>[A-Za-z]+)') self.regexp_relation = re.compile('(?P<variable>[A-Z][0-9]+)\s+(([A-Za-z0-9_]+:[A-Z0-9]+\s*)+)') self.regexp_transitive = re.compile('(?P<arity>\*)\s+(?P<type>[A-Z][A-Za-z_]*)\s+(?P<refersTo1>[A-Z][0-9]+)\s+(?P<refersTo2>[A-Z][0-9]+)\s*') def parse_str(self, text, annot): """ takes as input a text and an annotation string """ tups = [tup for tup in annot.split('\n') if tup.strip() != ''] # Add the narrator narrator = Character('_', 'Narrator') characters = [narrator] objects = [] character_lines = [] sound_effects = [] def find_character(var): for c in characters: if c.varname == var: return c def find_character_or_object(var): c = find_character(var) if c is not None: return c def find_character_lines(var): for c in character_lines: if c.varname == var: return c def find_sound_effect(var): for c in sound_effects: if c.varname == var: return c def character_speaks(c): for cl in character_lines: if isinstance(cl, Character_Line) and cl.character == c: return True return False for tup in tups: # print(tup) groups = self.regexp_entity.findall(tup) if len(groups) > 0: if groups[0][1] == 'Character': # If the entity is a character characters.append( Character(groups[0][0].strip(), groups[0][4].strip()) ) elif groups[0][1] == 'Character_Line': character_lines.append( Character_Line(groups[0][0].strip(), groups[0][4].strip(), int(groups[0][2]), int(groups[0][3])) ) elif groups[0][1] == 'Sound_Effect': sound_effects.append( Sound_Effect(groups[0][0].strip(), groups[0][4].strip(), int(groups[0][2])) ) continue for tup in tups: # Attributes and relations groups = self.regexp_attribute.findall(tup) if len(groups) > 0: if groups[0][1] == 'Gender': # if c = find_character(groups[0][2].strip()) c.set_gender(groups[0][3].strip().lower()) elif groups[0][1] == 'Age': c = find_character(groups[0][2].strip()) c.set_age(groups[0][3].strip().lower()) for tup in tups: # Attributes and relations groups = self.regexp_relation.findall(tup) if len(groups) > 0 and groups[0][1][:4] == 'Says': # print(groups) refs = groups[0][1].split()[1:] # Store who and whats whats = [] who = None for ref in refs: type_, var = ref.split(':') if type_[:4] == 'WHAT': whats.append(var) elif type_[:3] == 'WHO': who = find_character(var) # find character lines: clines = [find_character_lines(w) for w in whats] # Assign characters for cl in clines: cl.set_character(who) elif len(groups) > 0 and groups[0][1][:12] == 'Sound_Effect': sfx = find_sound_effect(groups[0][1][13:].split()[0]) #print(groups) # Store extra keywords keywords = [] refs = groups[0][1].split()[1:] for ref in refs: #print(ref) type_, var = ref.split(':') if type_[:8] == 'CausedBy': cause = find_character_or_object(var) if cause != None: keywords.append(cause.name) sfx.add_keywords(keywords) # %% Calculate line segments for character lines and narration # Add sound effects to the timeline clpos = [cl.pos for cl in character_lines] clpos += [sfx.pos for sfx in sound_effects] clpos = sorted(clpos, key=lambda x: x[0]) # Add first narrator line cl = Character_Line('_', text[0:clpos[0][0]], 0, clpos[0][0] - 1) cl.set_character(narrator) character_lines.append(cl) for n in range(len(clpos) - 1): if clpos[n][1] != clpos[n + 1][0] - 1: cl = Character_Line('_', text[clpos[n][1] + 1:clpos[n + 1][0]].rstrip(), clpos[n][1] + 1, clpos[n + 1][0] - 1) cl.set_character(narrator) character_lines.append(cl) character_lines += sound_effects character_lines = sorted(character_lines, key=lambda x: x.pos[1]) # parsed_dict = {'definitions': [c.to_json() for c in characters], # 'script': [cl.to_json() for cl in character_lines]} # parsed_dict = {'definitions': [], 'script':[]} # # print("Cast List:") # for c in characters: # if character_speaks(c): # print(c.definition()) # # print("") # print("Sound Effects:") # # for c in sound_effects: # print(c.definition()) # # for cl in character_lines: # print(cl.script()) # Add definitions for characters # for c in characters: # if character_speaks(c): # parsed_dict['definitions'].append(c.to_json()) # # # Add definitions for sound effects # for c in sound_effects: # parsed_dict['definitions'].append(c.to_json()['definition']) # # # # Add timeline information # for cl in character_lines: # parsed_dict['script'].append(cl.to_json()['playback']) str_ = "Cast List:\n" for c in characters: if character_speaks(c): str_ += "{}\n".format(c.definition()) str_ += "Sound Effects:\n" for c in sound_effects: str_ += "{}\n".format(c.definition()) for cl in character_lines: str_ += "{}\n".format(cl.script()) return str_