Mercurial > hg > dml-open-backendtools
view collection_analysis/chord_sequence_mining/spmf.py @ 0:e34cf1b6fe09 tip
commit
author | Daniel Wolff |
---|---|
date | Sat, 20 Feb 2016 18:14:24 +0100 |
parents | |
children |
line wrap: on
line source
# Part of DML (Digital Music Laboratory) # Copyright 2014-2015 Daniel Wolff, City University # This program is free software; you can redistribute it and/or # modify it under the terms of the GNU General Public License # as published by the Free Software Foundation; either version 2 # of the License, or (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public # License along with this library; if not, write to the Free Software # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA #!/usr/bin/python # -*- coding: utf-8 -*- # # This is a data conversion wrapper for the spmf toolkit. # The toolkit has been released under GPL3 at www.philippe-fournier-viger.com/spmf __author__="Daniel Wolff" import chord2function as c2f import csv import re # takes a dictionary of chords for one or multiple files # in the form of dict[clipid] = [ (time,key,mode,fun,typ,bfun) ] # and converts it into spmf def folder2spmf(folderin = 'D:/mirg/Chord_Analysis20141216/', fileout = 'D:/mirg/Chord_Analysis20141216/Beethoven.spmf'): # get chords for all files output = c2f.folder2functions(folderin) # open log logfile = fileout + '.dic' csvfile = open(logfile, "w+b") #opens the file for updating w = csv.writer(csvfile) w.writerow(["track","key","mode","sequence length"]) # open spmf file fspmf = open(fileout,'w') # --- # this is writing the spmf format for track,trackdata in output.iteritems(): # write chord sequence as one line in spmf file for (time,key,mode,fun,typ,bfun) in trackdata: chord = c2f.fun2num(fun,typ,bfun,mode) # -1 is the spearator of items or itemsets fspmf.write(str(chord) + ' -1 ') # the sequence is closed with -2 fspmf.write('-2\n') w.writerow([track, str(key), str(mode),str(len(trackdata))]) fspmf.close() csvfile.close() # read an spmf file # def parsespmf(filein = 'D:/mirg/Chord_Analysis20141216/Beethoven.txt'): # string sourcefile path to the source spmf file with chords from records # string patternfile path to the pattern spmf file # matches each of the patterns in patternfile # to the chord sequences in sourcefile def match(sourcefile = 'D:/mirg/Chord_Analysis20141216/Beethoven.spmf',sourcedict = 'D:/mirg/Chord_Analysis20141216/Beethoven.spmf.dic', patternfile = 'D:/mirg/Chord_Analysis20141216/Beethoven_70.txt'): # define regular expressions for matching # closed sequence # --- # we here assume that there are more files than patterns, # as display of patterns is somehow limited # therefore parallelisation will be 1 pattern/multiple files # per instance # --- patterns = [] patterns_raw = [] # read all patterns f = open(patternfile, 'r') for line in f: # a line looks like this: # 1120401 -1 1120101 -1 #SUP: 916 # save pattern #patterns.append(pattern) #numeric? or just regex? # we'll use string, so any representation works pattern,support = readPattern(line) patterns.append(pattern) # here's the regex # first the spacer #spacer = '((\s-1\s)|((\s-1\s)*[0-9]+\s-1\s)+)' #repattern = r'(' + spacer + '*' + spacer.join(pattern) + spacer + '*' + '.*)' #print repattern #patterns.append(re.compile(repattern)) # --- # now for the input sequences # --- # first: read track dictionary and get the input sequence names tracks = getClipDict(sourcedict) # read the input sequences source = open(sourcefile, 'r') patterns_tracks = dict() tracks_patterns = dict() # iterate over all tracks - to be parallelised for track,count in tracks.iteritems(): sequence = readSequence(next(source)) print track for p in range(0,len(patterns)): # match open or closed pattern if openPatternInSequence(sequence,patterns[p]): if patterns_tracks.has_key(p): patterns_tracks[p].append(track) else: patterns_tracks[p] = [track] if tracks_patterns.has_key(track): tracks_patterns[track].append(p) else: tracks_patterns[track] = [p] # write clip index to files writeAllPatternsForClips('D:/mirg/Chord_Analysis20141216/',tracks_patterns) #print patterns_tracks[p] # writes results to disk per key def writeAllPatternsForClips(path = 'D:/mirg/Chord_Analysis20141216/',tracks_patterns = dict()): for name, contents in tracks_patterns.iteritems(): # create new file csvfile = open(path + '/' + name + '_patterns.csv', "w+b") #opens the file for updating w = csv.writer(csvfile) # compress pattern data ? # e.g. 2 columns from-to for the long series of atomic increments w.writerow(contents) csvfile.close() # @param line: reads a line in the spmf output file with frequent patterns # returns list of strings "pattern" and int "support" def readPattern(line): # locate support suploc = line.find('#SUP:') support = int(line[suploc+5:-1]) # extract pattern pattern = line[:suploc].split(' -1 ')[:-1] return (pattern,support) # @param line: reads a line in the spmf input file with chord sequence # returns list of strings "pattern" and int "support" def readSequence(line): # locate support suploc = line.find('-2') # extract pattern sequence = line[:suploc].split(' -1 ')[:-1] return sequence # finds open pattern in sequences # @param [string] sequence input sequence # @param [string] pattern pattern to be found def openPatternInSequence(sequence,pattern): patidx = 0 for item in sequence: if item == pattern[patidx]: patidx +=1 # did we complet the pattern? if patidx >= (len(pattern)-1): # could also return the start index return 1 # finished the sequence before finishing pattern return 0 # finds closed pattern in sequences # @param [string] sequence input sequence # @param [string] pattern pattern to be found def closedPatternInSequence(sequence,pattern): # alternatively use KnuthMorrisPratt with unsplit string return ''.join(map(str, pattern)) in ''.join(map(str, sequence)) # reads all track names from the dictionary created by folder2spmf # @param sourcedict path to dictionary def getClipDict(sourcedict): f = open(sourcedict, 'rt') reader = csv.reader(f) # skip first roow that contains legend next(reader) # get following rows tracks = dict() for (track,key,mode,seqlen) in reader: tracks[track]= (key,mode,seqlen) #tracks.append((track,count)) f.close() return tracks # run spmf afterwards with java -jar spmf.jar run CM-SPADE Beethoven.spmf output.txt 50% 3 if __name__ == "__main__": #folder2spmf() match()