view demo/text2annotation.py~ @ 0:4dad87badb0c

initial commit
author Emmanouil Theofanis Chourdakis <e.t.chourdakis@qmul.ac.uk>
date Wed, 16 May 2018 17:56:10 +0100
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Sat Apr 28 14:17:15 2018

@author: Emmanouil Theofanis Chourdakis

Takes a .txt story and annotates it based on:
    
    characters, 
    places,
    saywords,
    character_lines,
    spatial_indicators,
    
@output:
    .ann file with the same name
    
"""

# Change path to current directory
import os
os.chdir(os.path.dirname(os.path.realpath(__file__)))

import argparse
from sklearn.externals import joblib  # removed in scikit-learn >= 0.23; use `import joblib` there
import ner
import spacy
import re
import logging
import json

logging.basicConfig(level=logging.INFO)


def quotes2dict(text):
    """Replace each double-quoted span in text with a <clineN>. tag and
    return the rewritten text together with a dict mapping each tag to
    the quoted string it replaced."""
    new_text = text
    is_open = False
    
    quote_no = 0
    quote = []
    quote_dict = {}
    
    for n, c in enumerate(text):
        if c == '"' and not is_open:
            is_open = True
            continue
        elif c == '"' and is_open:
            is_open = False
            quote_dict["<cline{}>.".format(quote_no)] = ''.join(quote)
            # Replace only this occurrence so repeated identical quotes
            # still get distinct tags
            new_text = new_text.replace('"'+''.join(quote)+'"', "<cline{}>.".format(quote_no), 1)
            quote = []
            quote_no += 1
            continue
        
        if is_open:
            quote.append(c)
            
    return new_text, quote_dict
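
# Illustrative example (hypothetical input):
#   quotes2dict('He said "hello there" and left.')
#   -> ('He said <cline0>. and left.', {'<cline0>.': 'hello there'})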




def annotate_entities(text,
                      model,
                      character_lut,
                      saywords_lut,
                      spind_lut,
                      places_lut):
    """
        Function which annotates entities in text
        using the model in "model",
        
        returns: A ner.Document object with tokens labelled via
                 the LUTS provided and also the NER model in "model"
    """

    # Find and store character lines in a dictionary
    logging.info('Swapping character lines for character line tags')
    processed_text, quotes = quotes2dict(text)

    # Create the nlp engine
    logging.info("Loading 'en' spacy model")
    nlp = spacy.load('en')  # spaCy 1.x/2.x shortcut; newer versions use e.g. spacy.load('en_core_web_sm')
    
    # Parse to spacy document
    logging.info("Parsing document to spacy")
    doc = nlp(processed_text)    
    
    # Parse to our custom Document object
    logging.info("Parsing document to our object format for Named Entity Recognition")
    mDoc = ner.Document(doc)
    
    # Label each <cline[0-9]+>. tag as a character line
    logging.info("Labeling character lines")
    spans = [r.span() for r in re.finditer(r'<cline[0-9]+>\.', mDoc.text)]
    for span in spans:
        mDoc.assign_label_to_tokens(span[0], span[1], 'Character_Line')

    # Parse using LUTs
    
    # *- Characters
    
    # Sort by number of words so that entries with more words override
    # entries with fewer words in labelling. For example, if you have
    # 'man' and 'an old man' as characters, the character labelled is going
    # to be 'an old man' and not the included 'man'.
    logging.info("Labeling characters from LUT")
    cLUT = [c.lower() for c in sorted(character_lut, key=lambda x: len(x.split()))]

    # Find literals in the document that match a character in cLUT;
    # escape each entry and ignore case, since the LUT entries were lowercased
    for c in cLUT:
        spans = [r.span() for r in re.finditer(re.escape(c), mDoc.text, re.IGNORECASE)]
        for span in spans:
            mDoc.assign_label_to_tokens(span[0], span[1], 'Character')
             
    # *- Saywords
   
    # Assign labels to saywords. Saywords here contain only one token, and we
    # match against each sayword's lemma rather than its surface form.
    logging.info("Labeling saywords from LUT")
    swLUT = [nlp(sw)[0].lemma_ for sw in saywords_lut]
    for sw in swLUT:
        mDoc.assign_label_to_tokens_by_matching_lemma(sw, 'Says')
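    # For example, nlp('said')[0].lemma_ is 'say', so 'says', 'said', and
    # 'saying' in the text all match the LUT entry 'said'.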
        
    # *- Places
    logging.info("Labeling places from LUT")
    plLUT = [pl.lower() for pl in sorted(places_lut, key=lambda x: len(x.split()))]

    # Find literals in the document that match a place in plLUT
    for pl in plLUT:
        spans = [r.span() for r in re.finditer(re.escape(pl), mDoc.text, re.IGNORECASE)]
        for span in spans:
            mDoc.assign_label_to_tokens(span[0], span[1], 'Place')
            
    # *- Spatial indicators
    logging.info("Labeling spatial indicators from LUT")
    spLUT = [sp.lower() for sp in sorted(spind_lut, key=lambda x: len(x.split()))]
    for sp in spLUT:
        spans = [r.span() for r in re.finditer(re.escape(sp), mDoc.text, re.IGNORECASE)]
        for span in spans:
            mDoc.assign_label_to_tokens(span[0], span[1], 'Spatial_Signal')
        

    logging.info("Extracting token features")
    features, labels = mDoc.get_token_features_labels()
    
    logging.info("Predicting labels")
    new_labels = model.predict(features)
    
    
    logging.info("Assning labels based on the NER model")
    # If a label is not already assigned by a LUT, assign it using the model
    
    #logging.info("{} {}".format(len(mDoc.tokens), len(new_labels)))
    for m, sent in enumerate(mDoc.token_sentences):
        for n, token in enumerate(sent):
            if token.label == 'O':
                token.label = new_labels[m][n]
    
    return mDoc, quotes
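
# The returned mDoc carries token labels such as 'Character', 'Says',
# 'Place', 'Spatial_Signal', 'Character_Line', or 'O'; quotes maps each
# <clineN>. tag back to its original quoted string.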


def doc2brat(mDoc):
    """ Returns a brat .ann file str based on mDoc """
    
    # Variable generator for entities (T in brat format)
    tvar = ner.var_generator('T')
    
    ann_str = ""
    # Extract entities in the brat stand-off format, one per line:
    # T1\tCharacter START END\tcovered text
    
    labels = ['Character', 'Says', 'Place', 'Spatial_Signal', 'Character_Line']
    
    for label in labels:
        token_sentences = mDoc.get_tokens_with_label(label)
        for tlist in token_sentences:
            if len(tlist) == 0:
                continue
            
            for tokens in tlist:
                start = tokens[0].start
                end = tokens[-1].end
                txt = mDoc.text[start:end]
                ann_str += "{}\t{} {} {}\t{}\n".format(next(tvar), label, start, end, txt)
            
            
    return ann_str
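
# Illustrative doc2brat output (hypothetical offsets; entity ids assumed
# to be T1, T2, ...):
#   T1\tCharacter 10 21\tan old man
#   T2\tSays 22 26\tsaid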

if __name__=="__main__":
    argparser = argparse.ArgumentParser()
    argparser.add_argument('input_path', help='.txt file to parse')
    argparser.add_argument('ner_model_path', help='.pkl file containing NER model')
    argparser.add_argument('rel_model_path', help='.pkl file containing relational model')
    argparser.add_argument('--say-lut', help='.txt file with list of saywords')
    argparser.add_argument('--char-lut', help='.txt file with known characters')
    argparser.add_argument('--place-lut', help='.txt file with known places')
    argparser.add_argument('--spatial-indicator-lut', help='.txt file with known spatial indicators')
    argparser.add_argument('--force', action='store_true',
                           help='force overwrite of existing output files')
    
    args = argparser.parse_args()
    
    # Load text file
    with open(args.input_path) as f:
        text = f.read()
        
    base_path = os.path.splitext(args.input_path)[0]
    output_text_path = base_path + '_processed.txt'
    output_quotes_path = base_path + '_quotes.json'
    output_annotation_path = base_path + '_processed.ann'
        
    # Load NER model file
    ner_model = joblib.load(args.ner_model_path)
    
    # Load REL model file
    rel_model = joblib.load(args.rel_model_path)
    
    # Load saywords
    if args.say_lut:
        saylut_path = args.say_lut
    else:
        saylut_path = 'saywords.txt'
        
    with open(saylut_path) as f:
        saylut = [s for s in f.read().split('\n') if s.strip() != '']
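
    # Each LUT file is assumed to hold one entry per line; saywords.txt,
    # for example, might contain "said", "whispered", "shouted", one per line.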
        
    # Load places LUT
    if args.place_lut:
        placelut_path = args.place_lut
    else:
        placelut_path = 'places.txt'
        
    with open(placelut_path) as f:
        placelut = [s for s in f.read().split('\n') if s.strip() != '']
        
    # Load spatial indicators LUT
    if args.spatial_indicator_lut:
        spatial_indicator_lut_path = args.spatial_indicator_lut
    else:
        spatial_indicator_lut_path = 'spatial_indicators.txt'
        
    with open(spatial_indicator_lut_path) as f:
        spatial_indicator_lut = [s for s in f.read().split('\n') if s.strip() != '']        
        
    # Load character LUT
    if args.char_lut:
        charlut_path = args.char_lut
    else:
        charlut_path = 'characters.txt'
        
    with open(charlut_path) as f:
        
        charlist = [s for s in f.read().split('\n') if s.strip() != ''] # One character per line
        
        character_lut = {}  # Stores character attributes indexed by name
        for l in charlist:
            name, attributes = l.split(':', 1)
            name = name.strip()

            gender = None
            age = None

            # Strip whitespace around each attribute before testing it
            for a in (attr.strip() for attr in attributes.split(',')):
                if 'male' in a:  # matches both 'male' and 'female'
                    gender = a
                elif a.lower() in ['young', 'old']:
                    age = a

            character_lut[name] = {}
            if gender:
                character_lut[name]['gender'] = gender
            if age:
                character_lut[name]['age'] = age
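
    # Illustrative characters.txt line (assumed format):
    #   John: male, old
    # would yield character_lut['John'] == {'gender': 'male', 'age': 'old'}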
        
    mDoc, quotes = annotate_entities(text, ner_model, character_lut, saylut, spatial_indicator_lut, placelut)
    
    annotation_text = doc2brat(mDoc)
    
    to_save = {
        output_text_path: mDoc.text,
        output_quotes_path: json.dumps(quotes),
        output_annotation_path: annotation_text,
    }

    for path in to_save:
        if not os.path.exists(path) or args.force:
            with open(path, 'w') as f:
                f.write(to_save[path])
        else:
            overwrite = input('Path {} exists, overwrite? (y/N) '.format(path))
            # startswith on the stripped input is safe for empty responses
            if overwrite.strip().lower().startswith('y'):
                with open(path, 'w') as f:
                    f.write(to_save[path])
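
# Example invocation (hypothetical file names):
#   python3 text2annotation.py story.txt ner_model.pkl rel_model.pkl \
#       --char-lut characters.txt --force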