#!/usr/bin/python
# Part of DML (Digital Music Laboratory)
# Copyright 2014-2015 Daniel Wolff, City University

# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA

# -*- coding: utf-8 -*-
"""Key-relative chord sequence mining.

Chord labels of each track are converted into chord functions relative to
the track's most frequent key, and the resulting sequences are handed to
the SPMF sequential pattern mining helper.
"""
__author__="wolffd"

# json testfile
#
#{ "module":"chord_seq_key_relative",
#  "function":"aggregate",
#  "arguments": [[
#  {"keys": { "tag": "csv", "value":"D:\\mirg\\Chord_Analysis20141216\\Beethoven\\qm_vamp_key_standard.n3_50ac9\\1CD0000653_BD01_vamp_qm-vamp-plugins_qm-keydetector_key.csv"},
#  "chords": { "tag": "csv", "value":"D:\\mirg\\Chord_Analysis20141216\\Beethoven\\chordino_simple.n3_1a812\\1CD0000653_BD01_vamp_nnls-chroma_chordino_simplechord.csv"},
#  "trackuri": "Eins"},
#  {"keys": { "tag": "csv", "value":"D:\\mirg\\Chord_Analysis20141216\\Beethoven\\qm_vamp_key_standard.n3_50ac9\\1CD0000653_BD01_vamp_qm-vamp-plugins_qm-keydetector_key.csv"},
#  "chords": { "tag": "csv", "value":"D:\\mirg\\Chord_Analysis20141216\\Beethoven\\chordino_simple.n3_1a812\\1CD0000653_BD01_vamp_nnls-chroma_chordino_simplechord.csv"}}
#  ]]
#}

# these for file reading etc
import re
import os
import csv
import numpy

# spmf functions
import chord_seq_spmf_helper as spmf

from aggregate import *
from csvutils import *

# ---
# roots
# ---
chord_roots = ["C","D","E","F","G","A","B"]

# create a dictionary for efficiency
roots_dic = dict(zip(chord_roots, [0,2,4,5,7,9,11]))

mode_lbls = ['major','minor']
mode_dic = dict(zip(mode_lbls, range(0,2)))
# ---
# types
# ---
type_labels = ["", "6", "7", "m","m6", "m7", "maj7", "m7b5", "dim", "dim7", "aug"]
type_dic = dict(zip(type_labels, range(0,len(type_labels))))

base_labels = ["1","","2","b3","3","4","","5","","6","b7","7"]
#base_dic = dict(zip(base_labels, range(0,len(base_labels))))

# functions
root_funs_maj = ['I','#I','II','#II','III','IV','#IV','V','#V','VI','#VI','VII']
root_funs_min = ['I','#I','II','III','#III','IV','#IV','V','VI','#VI','VII','#VII']
# dan's suggestion
#root_funs_maj = ['I','#I','II','#II','(M)III','IV','#IV','V','#V','VI','#VI','(M)VII']
#root_funs_min = ['I','#I','II','(m)III','#III','IV','#IV','V','VI','#VI','(m)VII','#VII']

fun_dic_maj = dict(zip(range(0,len(root_funs_maj)),root_funs_maj))
fun_dic_min = dict(zip(range(0,len(root_funs_min)),root_funs_min))

# regex that separates roots and types, and gets chord base
# (root with optional accidentals, chord type, optional "/base" note)
p = re.compile(r'(?P<root>[A-G,N](#|b)*)(?P<type>[a-z,0-9]*)(/(?P<base>[A-G](#|b)*))*')

# regex for key strings: tonic, optional " / enharmonic" alternatives, then
# the mode. The mode group is an alternation: a character class such as
# [major|minor]+ would also accept arbitrary mixes of those letters.
p2 = re.compile(r'(?P<key>[A-G](#|b)*)(\s/\s[A-G](#|b)*)*\s(?P<mode>major|minor)')

# regex for vamp output file names.
# NOTE(review): the group names of this pattern could not be recovered from
# the source and nothing in this module reads them; 'clipid', 'type' and
# 'ext' are placeholders -- confirm against any external user of pclip.
pclip = re.compile(r'(?P<clipid>[A-Z,0-9]+(\-|_)[A-Z,0-9]+((\-|_)[A-Z,0-9]+)*((\-|_)[A-Z,0-9]+)*)_(?P<type>vamp.*).(?P<ext>(csv|xml|txt|n3)+)')



def chords_from_csv(filename):
    """Read chord annotations from a CSV file as (time, chord_string) rows.

    The CSV is assumed to hold: time, chord_string. Reading is delegated to
    csv_map_rows (from csvutils); the 2 is presumably the expected number of
    columns -- TODO confirm against csvutils.
    """
    return csv_map_rows(filename,2, lambda row:(float(row[0]),row[1]))


def keys_from_csv(filename):
    """Read key annotations from a CSV file as (time, key_code, key_string) rows.

    The CSV is assumed to hold: time, key_code, key_string. Reading is
    delegated to csv_map_rows (from csvutils); the 3 is presumably the
    expected number of columns -- TODO confirm against csvutils.
    """
    return csv_map_rows(filename,3, lambda row:(float(row[0]),row[1],row[2]))


# parsers for n3 / csv
key_parser_table = { 'csv':keys_from_csv }
chord_parser_table = { 'csv':chords_from_csv }

# extracts relative chord sequences from inputs of chord / key data
# input list of pairs with instances of features:
# (['chords'] chordino_simple.n3_1a812 , ['keys'] qm_vamp_key_standard.n3_50ac9,
#  optional: ['trackuri'] trackidentifier )
# @note: in future we could add support for qm_key_tonic input
#
# opts : dictionary with opts["spm_algorithm"] = SPADE, TKS or ClaSP algorithm?
#        and opts["spm_options"] = "70%"
# output:
# 'sequences': seq, 'support': sup

# running counter used as a virtual identifier for tracks without 'trackuri'
trackctr = 0


def aggregate(inputs,opts=None):
    """Mine frequent key-relative chord sequences over a set of tracks.

    Parameters:
        inputs: items passed one by one to the accumulator via for_each
            (from the aggregate module). Each item is indexed with 'keys'
            and 'chords' (decoded through decode_tagged with the parser
            tables above) and may carry an optional 'trackuri'.
        opts: optional dictionary of options:
            "spm_algorithm"  : "TKS", "ClaSP", "SPADE"; anything else
                               selects "CM-SPADE" (the default)
            "spm_maxseqs"    : halved and used as the maximum number of
                               returned sequences (default 500)
            "spm_minlen"     : minimum sequence length (default 2)
            "spm_maxtime"    : halved and passed as timeout to the miner
                               (default 60)
            "spm_ignore_n"   : non-zero drops 'N' (no chord) entries
                               (default 1)
            "spm_minsupport" : minimum support in percent, used when
                               "spm_options" is not given (default 50)
            "spm_options"    : passed verbatim to the mining algorithm

    Returns:
        {'result': {'sequences': [...], 'support': [...]}, 'stats': st}
        where st is whatever for_each returned. Sequences of major and
        minor pieces are mined separately and then merged, sorted by
        descending support.

    Raises:
        Exception: if fewer than two tracks were processed in this call.
    """
    # never share a mutable default between calls
    if opts is None:
        opts = {}

    print_status('In chord_seq_key_relative')

    # SPADE, TKS or ClaSP algorithm?
    algo = opts.get("spm_algorithm","CM-SPADE")

    # number of sequences
    # NOTE(review): the halving presumably accounts for the two modes being
    # mined separately -- confirm.
    maxseqs = int(opts.get("spm_maxseqs",500)/2)

    # min. length of sequences
    minlen = int(opts.get("spm_minlen",2))

    # timeout handed to the mining run
    maxtime = int(opts.get("spm_maxtime",1*60)/2)

    ignoreN = int(opts.get("spm_ignore_n",1))

    # min. support of sequences in percent
    minsup = int(opts.get("spm_minsupport",50))

    # we now save the mode of each piece
    # to treat them separately: index 0 = major, index 1 = minor
    out_chords = [dict(), dict()]

    # generate dict[trackuri] = [ (time,key,mode,fun,typ,bfun) ]
    def accum(item):
        global trackctr
        # increase virtual identifier
        trackctr += 1

        # decode the key annotations of this track
        keys = decode_tagged(key_parser_table,item['keys'])

        # get most frequent key
        key,mode = most_frequent_key(keys)

        relchords = []
        for (time,chord) in decode_tagged(chord_parser_table,item['chords']):

            # get chord function
            (root,fun,typ, bfun) = chord2function(chord, key,mode)

            # ignore chords that are 'N':
            # a. the open pattern matching allows for arbitrary chords
            #    to appear inbetween those in a sequence
            # b. the N chord potentially maps to any contents, so the
            #    inclusion of N chord has limited (or no) use
            # (logical test: a bitwise '&' would fail for even flag values)
            if ignoreN and root == -1:
                continue

            # add to chords of this clip
            relchords.append((time,key,mode,fun,typ,bfun))

        # save results into dict for this track
        trackuri = item.get('trackuri',trackctr)
        out_chords[mode][trackuri] = relchords

    # the global counter is never reset, so compare against its value
    # before this call to count the tracks of this call only
    tracks_before = trackctr

    # collate relative chord information per file
    st = for_each(inputs,accum)
    # print_status('Finished accumulating')

    if trackctr - tracks_before < 2:
        raise Exception("Need more than 1 track")

    seq = [[],[]]
    sup = [[],[]]

    for mode in [0,1]:
        # write to spmf file
        spmffile = spmf.relchords2spmf(out_chords[mode])
        #print_status('Wrote SPMF data ' + spmffile.name)

        # run sequential pattern matching
        if algo == "TKS":
            algoopts = opts.get("spm_options","")
            seqfile = spmf.spmf(spmffile.name,'TKS',[str(maxseqs), algoopts])
        else:
            if algo in ("ClaSP", "SPADE"):
                algoname = algo
            else:
                print_status('Running CM-SPADE algo')
                algoname = 'CM-SPADE'
            algoopts = opts.get("spm_options",str(minsup) + "%")
            seqfile = spmf.spmf(spmffile.name,algoname,[algoopts, str(minlen)], timeout = maxtime)

        #seqfile = spmf.spmf(spmffile.name,'BIDE+',['70%'])
        #seqfile = "D:\mirg\Chord_Analysis20141216\Beethoven_60.txt"

        #print_status('SPADE finished in ' + seqfile)
        # parse spmf output
        seq[mode],sup[mode] = spmf.spmf2table(seqfile)

        #clean up
        os.remove(spmffile.name)
        os.remove(seqfile)

    # fold back sequences and support
    # note that this results in the sequences being truncated together below
    seq = [item for sublist in seq for item in sublist]
    sup = [item for sublist in sup for item in sublist]

    # filter according to min. sequencelength and number of sequences
    seq_out = []
    sup_out = []
    seq_count = 0

    # sort in descending support and pick up sequences of sufficient length
    for i in numpy.argsort(sup)[::-1]:
        if len(seq[i]) >= minlen:
            seq_out.append(seq[i])
            sup_out.append(sup[i])
            seq_count += 1

        if seq_count >= maxseqs:
            break

    return { 'result': { 'sequences': seq_out, 'support': sup_out},
             'stats' : st }


# most simple note2num
def note2num(notein = 'Cb'):
    """Convert a note name such as 'C', 'F#' or 'Bb' to a pitch class 0..11.

    The first character is looked up in roots_dic (KeyError for an unknown
    letter); every following character must be '#' (+1) or 'b' (-1).

    Raises:
        ValueError: if a character after the root is not an accidental.
    """
    num = roots_dic[notein[0]]
    for accidental in notein[1:]:
        if accidental == '#':
            num += 1
        elif accidental == 'b':
            num -= 1
        else:
            raise ValueError("Error parsing chord " + notein)
    return num % 12


# convert key to number
def key2num(keyin = 'C major'):
    """Convert a key string such as 'C major' into (key, mode).

    key is the pitch class of the tonic (see note2num); mode is the index
    from mode_dic (0 = major, 1 = minor), or -1 if the mode is not listed.

    Raises:
        ValueError: if keyin does not match the key pattern p2.
    """
    # ---
    # parse key string: separate root from rest
    # ---
    sepstring = p2.match(keyin)
    if not sepstring:
        raise ValueError("Error parsing key " + keyin)

    # get relative position of chord and adapt for flats
    key = note2num(sepstring.group('key'))

    # ---
    # parse mode. care for (unknown) string
    # ---
    mode = mode_dic.get(sepstring.group('mode'), -1)

    return (key, mode)



# convert chord to relative function
def chord2function(cin = 'B',key=3, mode=0):
    """Convert a chord label into a function relative to a key.

    Parameters:
        cin: chord label, e.g. 'B', 'Am7', 'C/E' or 'N' (no chord).
        key: pitch class of the key's tonic.
        mode: accepted for interface compatibility; not used here.

    Returns:
        (root, fun, typ, bfun): absolute root pitch class, root relative to
        the key, index of the chord type in type_labels, and the bass note
        relative to the chord root (0 if no bass note is given).
        (-1, -1, -1, -1) is returned for the 'N' label.

    Raises:
        ValueError: if cin does not match the chord pattern p.
        KeyError: if the chord type is not listed in type_labels.
    """
    # ---
    # parse chord string: separate root from rest
    # ---
    sepstring = p.match(cin)
    if sepstring is None:
        raise ValueError("Error parsing chord " + cin)

    # test for N code -> no chord detected
    if sepstring.group('root') == 'N':
        return (-1,-1,-1,-1)

    # get root and type otherwise
    root = note2num(sepstring.group('root'))
    typ = type_dic[sepstring.group('type')]

    # get relative position
    fun = (root - key) % 12

    # --- do we have a base key?
    # if yes return it relative to chord root
    # ---
    base = sepstring.group('base')
    if base:
        bfun = (note2num(base) - root) % 12
    else:
        # this standard gives 1 as a base key if not specified otherwise
        bfun = 0

    # ---
    # todo: integrate bfun in final type list
    # ---

    return (root,fun,typ,bfun)


# reads in any csv and returns a list of structure
# time(float), data1, data2 ....data2
def read_vamp_csv(filein = ''):
    """Read a CSV file into a list of rows [float(time), data1, data2, ...].

    Blank rows are skipped. The file is opened in the mode the csv module
    expects on the running Python version.
    """
    import sys

    if sys.version_info[0] >= 3:
        csvfile = open(filein, 'r', newline='')
    else:
        csvfile = open(filein, 'rb')

    output = []
    with csvfile:
        contents = csv.reader(csvfile, delimiter=',', quotechar='"')
        for row in contents:
            if row:
                output.append([float(row[0])] + row[1:])
    return output



# histogram of the last entry in a list
# returns the most frequently used key
def histogram(keysin = None):
    """Count the last entry of each row and find the most frequent one.

    Returns:
        (histo, most_frequent): histo maps each last entry to its count,
        most_frequent is the entry with the highest count.

    Raises:
        ValueError: if keysin is empty.
    """
    # build histogram
    histo = dict()
    for row in (keysin or []):
        histo[row[-1]] = histo.get(row[-1], 0) + 1

    if not histo:
        raise ValueError("Cannot build a histogram from an empty list")

    # return most frequent key
    return (histo, max(histo, key=(lambda key: histo[key])))


def most_frequent_key(keys):
    """Return (key, mode) of the most frequent key string in keys.

    keys is an iterable of (time, key_code, key_string) tuples; entries
    whose key_string is '(unknown)' are ignored.
    """
    # delete 'unknown' keys
    keys = [(time,knum,key) for (time,knum,key) in keys if not key == '(unknown)']

    # aggregate to one key
    (histo, skey) = histogram(keys)

    # get key number
    (key,mode) = key2num(skey)
    return key,mode



def fun2txt(fun,typ, bfun,mode):
    """Render a chord function as text, e.g. '(M)V7/1'.

    Parameters:
        fun: root relative to the key (index into the root function
            tables); a negative value yields 'N'.
        typ: index into type_labels.
        bfun: bass note relative to the chord root (index into base_labels).
        mode: 0 for major, 1 for minor.

    Raises:
        ValueError: if fun is non-negative and mode is neither 0 nor 1.
    """
    # now we can interpret this function
    # when given the mode of major or minor.
    if fun < 0:
        return 'N'

    if mode == 1:
        pfun = fun_dic_min[fun]
        md = '(m)'
    elif mode == 0:
        pfun = fun_dic_maj[fun]
        md = '(M)'
    else:
        raise ValueError("Unknown mode " + str(mode))

    typ_lbl = type_labels[typ] if typ > 0 else ''

    # the bass label is only appended when base_labels has a name for it
    blb = '/' + base_labels[bfun] if (bfun >= 0 and base_labels[bfun]) else ''
    return md + pfun + typ_lbl + blb


def fun2num(fun,typ, bfun,mode):
    """Encode a chord function as one integer; returns 0 if fun is -1.

    Each component is shifted by one and placed in its own decimal field:
    (mode+1)*1000000 + (fun+1)*10000 + (typ+1)*100 + (bfun+1).
    """
    if fun == -1:
        return 0
    return (mode+1)* 1000000 + (fun+1) * 10000 + (typ+1) * 100 + (bfun+1)


if __name__ == "__main__":
    #chords2functions()
    print("Creates a key-independent chord histogram. Usage: chord2function path_vamp_chords path_vamp_keys")
    # sys.argv[1]
    # NOTE(review): folder2histogram and c2j are neither defined nor
    # explicitly imported in this module; unless one of the star imports
    # provides them, running this file as a script raises NameError here.
    result = folder2histogram()
    print("Please input a description for the chord function histogram")
    c2j.data2json(result)