#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Sat Apr 28 14:17:15 2018

@author: Emmanouil Theofanis Chourdakis

Takes a .txt story and annotates it based on:

    characters,
    places,
    saywords,
    character_lines,
    spatial_indicators

@output:
    .ann file with the same name
    .json file with the extracted character lines
"""

import os
import argparse
import re
import logging
import json
from difflib import SequenceMatcher

import spacy
from sklearn.externals import joblib
from neuralcoref import Coref

import ner
from rel import *

logging.basicConfig(level=logging.INFO)

PRONOUN2GENDER = {
    'he': 'Male',
    'him': 'Male',
    'his': 'Male',
    'himself': 'Male',
    'she': 'Female',
    'her': 'Female',
    'hers': 'Female',
    'herself': 'Female',
}


def pronoun2gender(word):
    """ Returns the gender implied by a pronoun, or 'neutral' for anything
        that is not a gendered pronoun. """
    return PRONOUN2GENDER.get(word, 'neutral')


def argmax(pairs):
    """ Given an iterable of (key, value) pairs, returns the key
        corresponding to the greatest value. """
    # https://stackoverflow.com/questions/5098580/implementing-argmax-in-python
    return max(pairs, key=lambda x: x[1])[0]


def argmax_index(values):
    """ Given an iterable of values, returns the index of the greatest value. """
    return argmax(enumerate(values))


def argmax_f(keys, f):
    """ Given an iterable of keys and a function f, returns the key with
        the largest f(key). """
    return max(keys, key=f)


def similar(a, b):
    """ Returns the string similarity between a and b. """
    # https://stackoverflow.com/questions/17388213/find-the-similarity-metric-between-two-strings
    return SequenceMatcher(None, a, b).ratio()


def get_resolved_clusters(coref):
    """ Takes a Coref object (from neuralcoref) and returns its
        coreference clusters as lists of mentions. """
    mentions = coref.get_mentions()
    clusters = coref.get_clusters()[0]
    result = []
    for c in clusters:
        result.append([mentions[r] for r in clusters[c]])
    return result


def cluster_word(word, clusters):
    """ Takes a word and a list of clusters of mentions and returns the
        index of the cluster the word matches best, based on string
        similarity, or -1 if no cluster matches well enough. """
    if not clusters:
        return -1
    similarities = []
    for rc in clusters:
        similarity = [similar(word.lower(), c.text.lower()) for c in rc]
        similarities.append(similarity)
    max_similarities = [max(s) for s in similarities]
    if max(max_similarities) > 0.75:
        return argmax_index(max_similarities)
    else:
        return -1


def quotes2dict(text):
    """ Replaces every double-quoted passage in `text` with a placeholder
        tag of the form {Q<n>} and returns the new text together with a
        dictionary mapping each tag to the corresponding narration or
        quote. """
    new_text = text
    is_open = False

    quote_no = 0
    quote = []
    narrator = []
    quote_dict = {}

    for c in text:
        if c == '"' and not is_open:
            # Opening quote: store the narration seen so far.
            is_open = True
            quote_dict['{{Q{}}}'.format(quote_no)] = ''.join(narrator)
            narrator = []
            quote_no += 1
        elif c == '"' and is_open:
            # Closing quote: store the quote and swap it for its tag.
            is_open = False
            quote_dict['{{Q{}}}'.format(quote_no)] = ''.join(quote)
            new_text = new_text.replace('"' + ''.join(quote) + '"',
                                        '{{Q{}}}'.format(quote_no))
            quote = []
            quote_no += 1
        elif is_open:
            quote.append(c)
        else:
            narrator.append(c)

    # Store any narration trailing the last quote.
    if narrator:
        quote_dict['{{Q{}}}'.format(quote_no)] = ''.join(narrator)

    return new_text, quote_dict
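# A minimal sketch of what quotes2dict returns, assuming the {Q<n>}
# placeholder scheme above (illustrative input):
#
#   >>> new_text, quotes = quotes2dict('Tom said, "hello there." He left.')
#   >>> new_text
#   'Tom said, {Q1} He left.'
#   >>> quotes
#   {'{Q0}': 'Tom said, ', '{Q1}': 'hello there.', '{Q2}': ' He left.'}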
def figure_gender(word, clusters, character_lut):
    """ Figures out the gender of a character mention: first from the
        character LUT, then from the pronouns in the mention's
        coreference cluster. """
    for c in character_lut:
        if c.lower() in word.lower().split() and \
                character_lut[c].get('gender') in ['Male', 'Female']:
            return character_lut[c]['gender']

    cluster_idx = cluster_word(word, clusters)
    if cluster_idx == -1:
        return 'neutral'
    genders = [pronoun2gender(c.text) for c in clusters[cluster_idx]]
    if 'Male' in genders and 'Female' not in genders:
        return 'Male'
    if 'Female' in genders and 'Male' not in genders:
        return 'Female'
    return 'neutral'


def annotate(text,
             ner_model,
             rel_model,
             character_lut,
             saywords_lut,
             spind_lut,
             places_lut,
             do_coreference_resolution=True):
    """
    Annotates entities in `text` using the models provided.

    returns: a ner.Document object with tokens labelled via the LUTs
    provided as well as the NER model in `ner_model`, together with the
    dictionary of extracted quotes.
    """

    # Find and store character lines in a dictionary
    logging.info('Swapping character lines for character line tags')
    processed_text, quotes = quotes2dict(text)

    # Create the nlp engine
    logging.info("Loading 'en' spacy model")
    nlp = spacy.load('en')

    # Load the coreference model
    coref = Coref()

    # Do coreference resolution
    resolved_clusters = None
    if do_coreference_resolution:
        logging.info("Doing one-shot coreference resolution (this might take some time)")
        coref.one_shot_coref(processed_text)
        resolved_clusters = get_resolved_clusters(coref)
        processed_text = coref.get_resolved_utterances()[0]

    # Parse to spacy document
    logging.info("Parsing document to spacy")
    doc = nlp(processed_text)

    # Parse to our custom Document object
    logging.info("Parsing document to our object format for Named Entity Recognition")
    mDoc = ner.Document(doc)

    # Label the {Q<n>} placeholders as character lines
    logging.info("Labeling character lines")
    spans = [r.span() for r in re.finditer(r'\{Q\d+\}', mDoc.text)]
    for span in spans:
        mDoc.assign_label_to_tokens(span[0], span[1], 'Character_Line')

    # Parse using LUTs

    # *- Characters

    # Sort by number of words so that entries with more words override
    # entries with fewer words during labelling. For example, if you have
    # `man' and `an old man' as characters, the character labelled is
    # going to be `an old man' and not the included `man'.
    logging.info("Labeling characters from LUT")
    cLUT = [c.lower() for c in sorted(character_lut, key=lambda x: len(x.split()))]

    # Find literals in the document that match a character in cLUT.
    # re.escape guards against regex metacharacters in the names, and
    # re.IGNORECASE lets the lowercased LUT entries match capitalised
    # occurrences in the text.
    for c in cLUT:
        spans = [r.span() for r in re.finditer(re.escape(c), mDoc.text, re.IGNORECASE)]
        for span in spans:
            mDoc.assign_label_to_tokens(span[0], span[1], 'Character')
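    # Toy illustration of the ordering above (longer entries are matched
    # last, so their labels win on overlap):
    #
    #   >>> sorted(['an old man', 'man', 'old man'], key=lambda x: len(x.split()))
    #   ['man', 'old man', 'an old man']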
    # *- Saywords

    # Assign labels to saywords. Here saywords contain only one token.
    # In addition we check against each sayword's lemma and not the
    # sayword itself.
    logging.info("Labeling saywords from LUT")
    swLUT = [nlp(sw)[0].lemma_ for sw in saywords_lut]
    for sw in swLUT:
        mDoc.assign_label_to_tokens_by_matching_lemma(sw, 'Says')

    # *- Places
    logging.info("Labeling places from LUT")
    plLUT = [pl.lower() for pl in sorted(places_lut, key=lambda x: len(x.split()))]

    # Find literals in the document that match a place in plLUT
    for pl in plLUT:
        spans = [r.span() for r in re.finditer(re.escape(pl), mDoc.text, re.IGNORECASE)]
        for span in spans:
            mDoc.assign_label_to_tokens(span[0], span[1], 'Place')

    # *- Spatial indicators
    logging.info("Labeling spatial indicators from LUT")
    spLUT = [sp.lower() for sp in sorted(spind_lut, key=lambda x: len(x.split()))]
    for sp in spLUT:
        spans = [r.span() for r in re.finditer(re.escape(sp), mDoc.text, re.IGNORECASE)]
        for span in spans:
            mDoc.assign_label_to_tokens(span[0], span[1], 'Spatial_Signal')

    logging.info("Extracting token features")
    features, labels = mDoc.get_token_features_labels()

    logging.info("Predicting labels")
    new_labels = ner_model.predict(features)

    logging.info("Assigning labels based on the NER model")
    # If a label has not already been assigned by a LUT, assign it using
    # the model
    for m, sent in enumerate(mDoc.token_sentences):
        for n, token in enumerate(sent):
            if token.label == 'O':
                token.label = new_labels[m][n]

    # Assign gender attributes to characters
    if do_coreference_resolution:
        logging.info('Figuring out character genders')
        character_tok_sent = mDoc.get_tokens_with_label('Character')
        for sent in character_tok_sent:
            for character in sent:
                raw_string = " ".join([c.text for c in character])
                gender = figure_gender(raw_string, resolved_clusters, character_lut)
                if gender in ['Male', 'Female']:
                    for tok in character:
                        tok.set_attribute('gender', gender)

    logging.info('Predicting the correct label for all possible relations in Document')
    mDoc.predict_relations(rel_model)

    return mDoc, quotes
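# The brat standoff output produced by doc2brat below looks like this
# (a sketch with made-up offsets; fields are tab-separated):
#
#   T1    Character 0 3    Tom
#   A1    Gender T1 Male
#   T2    Says 4 8    said
#   T3    Character_Line 10 14    {Q1}
#   E1    Says:T2 WHO:T1 WHAT:T3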
def doc2brat(mDoc):
    """ Returns a brat .ann file str based on mDoc """

    # Dictionary that maps text span -> variable (to be used when
    # adding relations)
    span2var = {}

    # Variable generator for entities (T prefix in brat format)
    tvar = ner.var_generator('T')

    # Variable generator for events (E prefix in brat format)
    rvar = ner.var_generator('E')

    # Variable generator for attributes (A prefix in brat format)
    avar = ner.var_generator('A')

    ann_str = ""

    # Extract entities in the format:
    # T1	Character START END	character string
    labels = ['Character', 'Says', 'Place', 'Spatial_Signal', 'Character_Line']

    for label in labels:
        token_sentences = mDoc.get_tokens_with_label(label)
        for tlist in token_sentences:
            if len(tlist) == 0:
                continue

            for tokens in tlist:
                start = tokens[0].start
                end = tokens[-1].end
                txt = mDoc.text[start:end]
                var = next(tvar)
                ann_str += "{}\t{} {} {}\t{}\n".format(var, label, start, end, txt)
                if 'gender' in tokens[0].attributes:
                    ann_str += "{}\t{} {} {}\n".format(next(avar), 'Gender', var,
                                                       tokens[0].attributes['gender'])

                span2var[(start, end)] = var

    # Map relations
    for r in mDoc.relations:
        var = next(rvar)
        trigger = r.trigger
        trigger_label = trigger[0].label[2:]  # strip the BIO 'B-'/'I-' prefix
        trigger_start = trigger[0].start
        trigger_end = trigger[-1].end
        trigger_var = span2var[(trigger_start, trigger_end)]

        # If the trigger is a Spatial_Signal then the arguments are a
        # Trajector and a Landmark
        if trigger_label == 'Spatial_Signal':
            arg1_label = 'Trajector'
            arg2_label = 'Landmark'

        # If the trigger is Says then the arguments are WHO and WHAT
        elif trigger_label == 'Says':
            arg1_label = 'WHO'
            arg2_label = 'WHAT'

        # Skip triggers of any other type
        else:
            continue

        # Span and variable for the first argument
        arg1_start = r.arg1[0].start
        arg1_end = r.arg1[-1].end
        arg1_var = span2var[(arg1_start, arg1_end)]

        # Span and variable for the second argument
        arg2_start = r.arg2[0].start
        arg2_end = r.arg2[-1].end
        arg2_var = span2var[(arg2_start, arg2_end)]

        ann_str += "{}\t{}:{} {}:{} {}:{}\n".format(var,
                                                    trigger_label,
                                                    trigger_var,
                                                    arg1_label,
                                                    arg1_var,
                                                    arg2_label,
                                                    arg2_var)

    return ann_str


if __name__ == "__main__":
    argparser = argparse.ArgumentParser()
    argparser.add_argument('input_path', help='.txt file to parse')
    argparser.add_argument('ner_model_path', help='.pkl file containing the NER model')
    argparser.add_argument('rel_model_path', help='.pkl file containing the relational model')
    argparser.add_argument('--say-lut', help='.txt file with a list of saywords')
    argparser.add_argument('--char-lut', help='.txt file with known characters')
    argparser.add_argument('--place-lut', help='.txt file with known places')
    argparser.add_argument('--spatial-indicator-lut', help='.txt file with known spatial indicators')
    argparser.add_argument('--force', action='store_true', help='force overwrite when a file would be overwritten')
    argparser.add_argument('--no-coreference-resolution', action='store_true', help='omit the coreference resolution step')

    args = argparser.parse_args()

    # Load the text file, collapsing all whitespace runs to single spaces
    with open(args.input_path) as f:
        text = " ".join(f.read().split())

    stem = os.path.splitext(args.input_path)[0]
    output_text_path = stem + '_processed.txt'
    output_quotes_path = stem + '_quotes.json'
    output_annotation_path = stem + '_processed.ann'

    # Load the NER model file
    ner_model = joblib.load(args.ner_model_path)

    # Load the REL model file
    rel_model = joblib.load(args.rel_model_path)

    # Load the saywords LUT
    saylut_path = args.say_lut or 'saywords.txt'
    with open(saylut_path) as f:
        saylut = [s for s in f.read().split('\n') if s.strip() != '']

    # Load the places LUT
    placelut_path = args.place_lut or 'places.txt'
    with open(placelut_path) as f:
        placelut = [s for s in f.read().split('\n') if s.strip() != '']

    # Load the spatial indicators LUT
    spatial_indicator_lut_path = args.spatial_indicator_lut or 'spatial_indicators.txt'
    with open(spatial_indicator_lut_path) as f:
        spatial_indicator_lut = [s for s in f.read().split('\n') if s.strip() != '']
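    # The character LUT is a plain-text file with one "Name: attribute, ..."
    # entry per line, where the attributes are a gender and/or an age
    # (an assumed illustration, matching the parsing below):
    #
    #   Tom: male, young
    #   Mrs. Norris: female, old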
    # Load the character LUT
    charlut_path = args.char_lut or 'characters.txt'

    with open(charlut_path) as f:
        charlist = [s for s in f.read().split('\n') if s.strip() != '']  # One character per line

    character_lut = {}  # Stores character attributes indexed by name
    for l in charlist:
        name, attributes = l.split(':', 1)
        name = name.strip()

        gender = None
        age = None

        for a in attributes.split(','):
            a = a.strip().lower()
            if a in ['male', 'female']:
                gender = a.capitalize()
            elif a in ['young', 'old']:
                age = a

        character_lut[name] = {}
        if gender:
            character_lut[name]['gender'] = gender
        if age:
            character_lut[name]['age'] = age

    corefres = not args.no_coreference_resolution

    mDoc, quotes = annotate(text, ner_model, rel_model, character_lut,
                            saylut, spatial_indicator_lut, placelut, corefres)

    annotation_text = doc2brat(mDoc)

    to_save = {
        output_text_path: mDoc.text,
        output_quotes_path: json.dumps(quotes),
        output_annotation_path: annotation_text,
    }

    for path in to_save:
        if not os.path.exists(path) or args.force:
            with open(path, 'w') as f:
                f.write(to_save[path])
        else:
            overwrite = input('Path {} exists, overwrite? (y/N) '.format(path))
            if overwrite.strip().lower().startswith('y'):
                with open(path, 'w') as f:
                    f.write(to_save[path])
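# Example invocation (substitute this script's filename; the .pkl models are
# assumed to have been trained separately):
#
#   python annotate_story.py story.txt ner_model.pkl rel_model.pkl \
#       --char-lut characters.txt --force
#
# This writes story_processed.txt, story_quotes.json and story_processed.ann
# alongside story.txt.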