dml-open-cliopatria: dml-cla/python/similarity.py @ 0:718306e29690 (tip)
commit:  commiting public release
author:  Daniel Wolff
date:    Tue, 09 Feb 2016 21:05:06 +0100
# Part of DML (Digital Music Laboratory)
# Copyright 2014-2015 Daniel Wolff, City University
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA

# -*- coding: utf-8 -*-
__author__ = 'wolffd'

# This script derives all pairwise similarity measures for the chroma vectors provided.
# As a first experiment, only the mean chroma vectors per piece are compared,
# using euclidean distance.

# parameters to be forwarded to the API
# similarity type:
#   euclidean, compression
# simtype = 'euclidean'

# parallelisation
num_cores = 10

# min_clusters = 40   ## unused
# max_clusters = 256  ## unused
# set_clusters = 40
# max_clips = 50
encoding = 'binary'
# compressor = 'zxd'
mds_init_tries = 4
mds_max_iter = 100
mfccbins = 12

# Resample chroma / timbre values at this fraction for the compression distance
# (0 switches resampling off): we want roughly one vector per second.
# The standard sample rate and window size are 44100 / 1024 for chroma / timbre,
# so keeping every resample_factor-th frame leaves about one frame per second.
# Whether this is applied depends on the sim_downsample parameter.
resample_factor = 44100 // 1024

from rdflib import RDF, RDFS
from csvutils import *
from aggregate import *
from n3Parser import get_rdf_graph_from_n3

# numpy, scipy
import numpy as np
from scipy.spatial import distance
from sklearn.metrics.pairwise import pairwise_distances
from scipy.signal import resample

# scikit-learn
from sklearn.datasets import make_blobs
from sklearn.cluster import KMeans
# from sklearn.metrics import silhouette_samples, silhouette_score
from sklearn import manifold

# chord processing
from chord_seq_key_relative import chords_from_csv, keys_from_csv, chord2function, fun2txt, fun2num, most_frequent_key, chord_roots, type_labels

# subprocess, command line and threading
import os, tempfile
import subprocess, threading

# for system / compression calls
import zlib


def chroma_from_csv(filename):
    # we assume CSV: time, chroma 1, ..., chroma 12
    # returns a list of (time, [chroma 1-12]) tuples
    # note: the slice end is exclusive, so 1:13 selects all 12 chroma bins
    return csv_map_rows(filename, 13, lambda row: (float(row[0]), np.array(row[1:13], dtype=float)))


def mfcc_from_csv(filename):
    # we assume CSV: time, mfcc 1, ..., mfcc 20
    # returns a list of (time, [mfcc 1-20]) tuples
    return csv_map_rows(filename, 21, lambda row: (float(row[0]), np.array(row[1:21], dtype=float)))


chroma_parser_table = {'csv': chroma_from_csv}
mfcc_parser_table = {'csv': mfcc_from_csv}
# parsers from chord_seq_key_relative
key_parser_table = {'csv': keys_from_csv}
chord_parser_table = {'csv': chords_from_csv}
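# Illustrative sketch (not part of the original pipeline, file name is
# hypothetical): each parser above maps a raw feature CSV onto a list of
# (time, vector) tuples, keyed by file extension in the *_parser_table dicts:
#
#   rows = chroma_from_csv('/tmp/clip_chromagram.csv')
#   t0, v0 = rows[0]     # v0 is a 12-dimensional numpy array of chroma bins
#
# Supporting a new input format would mean registering another callable under
# its extension, e.g. chroma_parser_table['tsv'] = some_tsv_parser.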
## generate global list of chord_keys
chord_keys = []
for chordnum in range(1, 12+1):
    for typenum in range(1, 11+1):
        chord_keys.append("%02d%02d" % (chordnum, typenum))


def per_file(inputs, opts={}):
    chromas = []
    chromas_idx = []
    mfccs = []
    mfccs_idx = []
    chords = []
    chords_idx = []
    uris = []

    # get options from API
    # print_status(str(opts))
    simtype = opts['sim_type']
    set_clusters = opts['sim_clusters']   # def 40
    downsample = opts['sim_downsample']   # def 1
    limit = opts['sim_reclimit']          # def 50
    compressor = opts['sim_compressor']   # def 'zlib'

    # parse feature list (def: chroma)
    features = opts['sim_features'].split(',')
    # check which of the supported features were requested
    use_chromagram = any('chromagram' in ext for ext in features)
    use_mfcc = any('mfcc' in ext for ext in features)
    use_chords = any('chords' in ext for ext in features)

    # check number of inputs
    if len(inputs) > limit:
        # return { 'error': ''}
        print_status('Similarity: Too many inputs, truncating collection')
        inputs = inputs[0:limit]

    # accumulation for euclidean just gets the mean values over the whole clips
    # todo: add std and other statistics?
    def accum_euclidean(item):
        # accumulate chroma vectors for this piece
        if use_chromagram:
            chroma = [res[1] for res in decode_tagged(chroma_parser_table, item['chromagram'])]
            # print_status('Chroma Raw Data' + str(chroma))
            # get mean chroma vector
            chroma_mean = np.mean(np.array(chroma), axis=0)
            # print_status('Chroma Means' + str(chroma_mean))
            # add vector to chromas table
            chromas.append(chroma_mean)
        if use_mfcc:
            mfcc = [res[1] for res in decode_tagged(mfcc_parser_table, item['mfcc'])]
            mfcc_mean = np.mean(np.array(mfcc), axis=0)
            mfccs.append(mfcc_mean)
        if use_chords:
            # get duration and normalised frequency for all tuning pitches (A3, A4, A5)
            keys = decode_tagged(key_parser_table, item['keys'])
            # get most frequent key
            key, mode = most_frequent_key(keys)
            relchords = []
            for (time, chord) in decode_tagged(chord_parser_table, item['chords']):
                # get chord function
                (root, fun, typ, bfun) = chord2function(chord, key, mode)
                # translate into text
                # txt = fun2txt(fun, typ, bfun, mode)
                # print_status('Chord: ' + chord + ', function: ' + txt)
                num = fun2num(fun, typ, bfun, mode)
                if num > 0:
                    # add to chords of this clip
                    # relchords.append((time, key, mode, fun, typ, bfun))
                    # keep only the root function of the chord and the chord type,
                    # ignore mode and base note
                    # format of num: [1x mode, 2x function, 2x type, 2x base note]
                    relchords.append(str(num)[1:5])
            # append histogram of all chords for this recording
            hist = chord_histogram(relchords)
            chords.append(hist)
        # add uri if everything went well
        uris.append(item['list'])
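    # Illustrative sketch: according to the format comment above, fun2num packs
    # the relative chord as digits [1x mode][2x function][2x type][2x base note].
    # Slicing str(num)[1:5] therefore keeps only the function and type digits,
    # e.g. for a hypothetical packed value
    #
    #   num = 1050302          # mode 1, function 05, type 03, base note 02
    #   str(num)[1:5]          # -> '0503', matching the chord_keys format
    #
    # so every clip is reduced to a histogram over the 12 x 11 chord_keys.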
    # accumulation for compression:
    # save all (downsampled) feature vectors;
    # a codebook is built later, otherwise compare for quantisation
    def accum_compression(item):
        # get chromas
        if use_chromagram:
            # accumulate chroma vectors for this piece
            chroma = [res[1] for res in decode_tagged(chroma_parser_table, item['chromagram'])]
            # print_status('Chroma Raw Data' + str(chroma))
            # downsample if necessary
            if downsample == 1:
                # chroma = resample(chroma, len(chroma)//resample_factor, axis=0, window=None)
                # chroma = [chroma[i] for i in np.random.randint(0, len(chroma), len(chroma)//resample_factor)]
                chroma = [chroma[i*resample_factor] for i in range(0, len(chroma)//resample_factor)]
            chromas.extend(chroma)
            chromas_idx.append(len(chromas))
        if use_mfcc:
            mfcc = [res[1] for res in decode_tagged(mfcc_parser_table, item['mfcc'])]
            if downsample == 1:
                # mfcc = [mfcc[i] for i in np.random.randint(0, len(mfcc), len(mfcc)//resample_factor)]
                mfcc = [mfcc[i*resample_factor] for i in range(0, len(mfcc)//resample_factor)]
            mfccs.extend(mfcc)
            mfccs_idx.append(len(mfccs))
        if use_chords:
            # get duration and normalised frequency for all tuning pitches (A3, A4, A5)
            keys = decode_tagged(key_parser_table, item['keys'])
            # get most frequent key
            key, mode = most_frequent_key(keys)
            relchords = []
            for (time, chord) in decode_tagged(chord_parser_table, item['chords']):
                # get chord function
                (root, fun, typ, bfun) = chord2function(chord, key, mode)
                # translate into text
                # txt = fun2txt(fun, typ, bfun, mode)
                # print_status('Chord: ' + chord + ', function: ' + txt)
                num = fun2num(fun, typ, bfun, mode)
                if num > 0:
                    # add to chords of this clip
                    # relchords.append((time, key, mode, fun, typ, bfun))
                    # keep only the root function of the chord and the chord type,
                    # ignore mode and base note
                    # format of num: [1x mode, 2x function, 2x type, 2x base note]
                    relchords.append(int(str(num)[1:5]))
            # append the raw chord sequence for this recording
            # hist = chord_histogram(relchords)
            chords.extend(relchords)
            chords_idx.append(len(chords))
        # add uri if everything went well
        uris.append(item['list'])

    # ---
    # euclidean distance
    # ---
    if (simtype == 'euclidean'):
        # accumulate over all inputs
        st = for_each(inputs, accum_euclidean)

        # concatenate feature input for all features
        arr = np.empty((len(uris), 0), float)
        # concatenate data to nparray for euclidean distance
        if use_chromagram:
            arr = np.append(arr, np.array(chromas), axis=1)
        if use_mfcc:
            arr = np.append(arr, np.array(mfccs), axis=1)
        if use_chords:
            # get chord dictionaries
            # print(str(np.array(chords).shape))
            arr = np.append(arr, np.array(chords), axis=1)

        # dist = distance.pdist(chromas, 'euclidean')
        dist = pairwise_distances(arr, metric='euclidean', n_jobs=num_cores)
        # pairwise_distances already returns the full (non-condensed) matrix;
        # for data efficiency it could be condensed again with
        # dist = distance.squareform(dist)

    # ---
    # normalised compression distance
    # ---
    elif (simtype == 'compression'):
        # accumulate over all inputs
        print_status('Similarity Module: Accumulating')
        st = for_each(inputs, accum_compression)

        dist = np.zeros((len(uris), len(uris)))
        count = 0
        if use_chromagram:
            print_status('Similarity Module: Chroma Quantisation')
            chromas_coded = vector_quantisation(np.array(chromas), set_clusters, num_cores)
            print_status('Similarity Module: Chroma Compression Results')
            dist += similarity_by_mask(chromas_coded, chromas_idx, compressor, encoding)
            count += 1
        if use_mfcc:
            print_status('Similarity Module: MFCC Quantisation')
            mfccs_coded = vector_quantisation(np.array(mfccs), set_clusters, num_cores)
            print_status('Similarity Module: MFCC Compression Results')
            dist += similarity_by_mask(mfccs_coded, mfccs_idx, compressor, encoding)
            count += 1
        if use_chords:
            print_status('Similarity Module: Chord Compression Results')
            dist += similarity_by_mask(np.array(chords), chords_idx, compressor, encoding)
            count += 1
        # average the distances over the features used
        dist = dist / count

        # get rid of zeros in between
        # for idx1 in range(0, len(chromas_idx)):
        #     dist[idx1][idx1] = 1
        print_status('dist' + str(dist))

    # do MDS scaling with the precomputed distance matrix
    mds = manifold.MDS(n_components=2, max_iter=mds_max_iter, n_init=mds_init_tries, dissimilarity='precomputed')
    coordinates = mds.fit_transform(dist)

    return {'result': {'list': uris, 'mds': coordinates.tolist()},
            'stats': st}
    # return {'result': {'list': uris, 'distance': dist.tolist(), 'mds': coordinates.tolist()},
    #         'stats': st}
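# Illustrative sketch (hypothetical values): per_file() above is the module's
# entry point. It expects the API layer to pass one dict per recording, using
# the keys accessed in accum_euclidean / accum_compression ('list',
# 'chromagram', 'mfcc', 'keys', 'chords'), plus an options dict with the
# sim_* keys read at the top of per_file:
#
#   opts = {'sim_type': 'euclidean',        # or 'compression'
#           'sim_clusters': 40,             # codebook size for quantisation
#           'sim_downsample': 1,            # 1 = keep ~one frame per second
#           'sim_reclimit': 50,             # maximum number of recordings
#           'sim_compressor': 'zlib',       # or 'zxd', 'zbs', 'zvcd', 'zvcz', 'zdiff'
#           'sim_features': 'chromagram,mfcc'}
#   out = per_file(inputs, opts)
#   out['result']['mds']                    # 2-D MDS coordinates per clip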
def vector_quantisation(data, set_clusters, num_cores):
    # ---
    # build codebook
    # ---
    # --- 1  quantise the feature data
    # --- 1a use scikit-learn k-means
    #        http://scikit-learn.org/stable/modules/clustering.html
    clusterer = KMeans(n_clusters=set_clusters, n_jobs=num_cores)
    # --- 2  assign each frame to its nearest cluster centre,
    #        which yields one integer codeword per frame
    data_coded = clusterer.fit_predict(data)
    # print_status('Data Coded' + str(data_coded))
    # print_status('Coding Histogram' + str(np.histogram(data_coded)))
    return data_coded
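# Illustrative sketch (toy data): vector_quantisation() turns a
# (n_frames x n_dims) feature matrix into a 1-D sequence of integer codewords,
# one per frame, so that clips become strings over a small alphabet which the
# compressors below can operate on:
#
#   frames = np.random.rand(1000, 12)             # e.g. downsampled chroma
#   codes = vector_quantisation(frames, set_clusters=8, num_cores=1)
#   codes.shape                                   # -> (1000,), values in 0..7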
def similarity_by_mask(data_coded, data_idx, compressor, encoding):
    # data_idx holds the cumulative frame counts per clip, i.e. it is expected
    # to start with the first frame index of the second piece
    # TODO: check indexing (starts at 0 or 1?)
    lengths = []
    start_idx = [0] + data_idx[:-1]
    dist = np.zeros((len(data_idx), len(data_idx)))
    for idx1 in range(0, len(data_idx)):
        for idx2 in range(0, len(data_idx)):
            if (idx2 < idx1):
                # select the encoded frames for the two clips
                data1_mask = np.zeros(len(data_coded), dtype=bool)
                data1_mask[start_idx[idx1]:data_idx[idx1]-1] = True
                data2_mask = np.zeros(len(data_coded), dtype=bool)
                data2_mask[start_idx[idx2]:data_idx[idx2]-1] = True
                a_coded = encode(data_coded[data1_mask], format=encoding)
                b_coded = encode(data_coded[data2_mask], format=encoding)
                # get compression lengths
                if compressor == 'zlib':
                    (a, b, ab) = compressed_length(a_coded, b_coded, compressor)
                else:
                    # get complement set as reference for the delta compressors
                    ref_mask = ~data1_mask & ~data2_mask
                    ref_coded = encode(data_coded[ref_mask], format=encoding)
                    (a, b, ab) = delta_compressed_length(a_coded, b_coded, ref_coded, compressor)
                # normalised compression distance:
                # NCD = (C(xy) - min(C(x), C(y))) / max(C(x), C(y))
                dist[idx1][idx2] = (ab - min(a, b)) / float(max(a, b))
                # the above normalised compression distance is symmetric,
                # as required by the mds routine in per_file
                dist[idx2][idx1] = dist[idx1][idx2]
    return dist


def encode(data, format='string'):
    # encode the codeword sequence for the compressor
    if format == 'binary':
        data_coded = data.tostring()
    elif format == 'string':
        data_coded = str(data)
    return data_coded


def compressed_length(a_coded, b_coded, type='zlib'):
    # compression via zlib
    # https://docs.python.org/2/library/zlib.html#module-zlib
    if type == 'zlib':
        a = len(zlib.compress(a_coded, 9))
        b = len(zlib.compress(b_coded, 9))  # fixed: compress b_coded, not a_coded
        ab = len(zlib.compress(a_coded + b_coded, 9))
    return (a, b, ab)


def delta_compressed_length(a_coded, b_coded, ref_coded, type='zxd'):
    # delta compression against a reference:
    # zbs   - uses bsdiff
    # zxd   - uses xdelta3
    # zvcd  - uses open-vcdiff
    # zvcz  - uses vczip
    # zdiff - converts binary to text and uses diff to produce an ed script
    if type == 'zxd' or type == 'zbs' or type == 'zvcz' or type == 'zdiff' or type == 'zvcd':
        freference = tempfile.NamedTemporaryFile(delete=False)
        freference.write(ref_coded)
        freference.close()
        # print_status('Ref File: ' + freference.name)

        # to be optimised with bufs later
        # get length of a regarding the reference
        command = '/home/dml/src/hg/dml-cliopatria/cpack/dml/scripts/compression/%s encode %s | /home/dml/src/hg/dml-cliopatria/cpack/dml/scripts/compression/length' % (type, freference.name)
        # print_status(command)
        p1 = subprocess.Popen(command, stdin=subprocess.PIPE, stdout=subprocess.PIPE, shell=True)
        output, err = p1.communicate(input=a_coded)
        a = int(output)

        # get length of b regarding the reference
        p1 = subprocess.Popen(command, stdin=subprocess.PIPE, stdout=subprocess.PIPE, shell=True)
        output, err = p1.communicate(input=b_coded)
        b = int(output)

        # get length of a,b regarding the reference
        p1 = subprocess.Popen(command, stdin=subprocess.PIPE, stdout=subprocess.PIPE, shell=True)
        output, err = p1.communicate(input=a_coded + b_coded)
        ab = int(output)

        # print_status('Compressed Output' + compressed)
        # print_status('Compressed Size' + str(len(compressed)))
        os.remove(freference.name)
    return (a, b, ab)


# histogram of the chord keys appearing in the given list,
# normalised by the count of the most frequent key
def chord_histogram(chordstr=[]):
    global chord_keys
    # build histogram
    histo = dict.fromkeys(chord_keys, 0)
    for chord in chordstr:
        histo[chord] = histo.get(chord, 0) + 1
    # print_status(str(histo.keys()))
    counts = np.array(histo.values(), float)
    if max(counts) > 0:
        counts = counts / max(counts)
    return (counts)
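# Illustrative, self-contained sketch (not invoked by the API): the normalised
# compression distance used in similarity_by_mask() can be exercised directly
# with zlib on toy byte strings; similar sequences should give a smaller NCD.
if __name__ == '__main__':
    x = b'aabbccddeeff' * 20
    y = b'aabbccddeegg' * 20          # similar to x
    z = b'zyxwvutsrqpo' * 20          # dissimilar to x
    for (u, v, label) in [(x, y, 'x vs y'), (x, z, 'x vs z')]:
        cu = len(zlib.compress(u, 9))
        cv = len(zlib.compress(v, 9))
        cuv = len(zlib.compress(u + v, 9))
        # NCD = (C(uv) - min(C(u), C(v))) / max(C(u), C(v))
        print('%s: NCD = %.3f' % (label, (cuv - min(cu, cv)) / float(max(cu, cv))))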