Mercurial > hg > syncopation-dataset
changeset 1:b2da092dc2e0
The consolidated syncopation software. Have finished individual model and basic functions. Need to revise the coding in main.py, and add rhythm-input interface.
author | Chunyang Song <csong@eecs.qmul.ac.uk> |
---|---|
date | Sun, 05 Oct 2014 21:52:41 +0100 |
parents | 76ce27beba95 |
children | 031e2ccb1fb6 |
files | Syncopation models/KTH.py Syncopation models/LHL.py Syncopation models/PRS.py Syncopation models/SG.py Syncopation models/TMC.py Syncopation models/TOB.py Syncopation models/WNBD.py Syncopation models/basic_functions.py Syncopation models/main.py |
diffstat | 9 files changed, 599 insertions(+), 749 deletions(-) [+] |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/Syncopation models/KTH.py Sun Oct 05 21:52:41 2014 +0100 @@ -0,0 +1,80 @@ +''' +Author: Chunyang Song +Institution: Centre for Digital Music, Queen Mary University of London + +''' +## Problems! Note indices, post bar + +from basic_functions import get_min_timeSpan, get_note_indices, repeat + +# To find the nearest power of 2 equal to or less than the given number +def roundDownPower2(number): + i = 0 + if number > 0: + while pow(2,i) > number or number >= pow(2,i+1): + i = i+1 + power2 = pow(2,i) + else: + print 'Error: numbers that are less than 1 cannot be rounded down to its nearest power of two.' + power2 = None + return power2 + +# To find the nearest power of 2 equal to or more than the given number +def roundUpPower2(number): + i = 0 + while pow(2,i) < number: + i = i + 1 + return pow(2,i) + +# To examine whether start_time is 'off-beat' +def start(start_time, c_n): + s = 0 + if start_time % c_n != 0: + s = 2 + return s + +# To examine whether end_time is 'off-beat' +def end(end_time, c_n): + s = 0 + if end_time % c_n != 0: + s = 1 + return s + +# To calculate syncopation value of the sequence in the given time-signature. +def get_syncopation(seq, timesig, postbar_seq): + syncopation = 0 + + numerator = int(timesig.split("/")[0]) + if numerator == roundDownPower2(numerator): # if is a binary time-signature + # converting to minimum time-span format + seq = get_min_timeSpan(seq) + if postbar_seq != None: + postbar_seq = get_min_timeSpan(postbar_seq) + + # sf is a stretching factor matching rhythm sequence and meter, as Keith defines the note duration as a multiple of 1/(2^d) beats where d is number of metrical level + sf = roundUpPower2(len(seq)) + + # retrieve all the indices of all the notes in this sequence + note_indices = get_note_indices(seq) + + for i in range(len(note_indices)): + # Assuming start_time is the index of this note, end_time is the index of the following note + start_time = note_indices[i]*sf/float(len(seq)) + + if i == len(note_indices)-1: # if this is the last note, end_time is the index of the following note in the next bar + if postbar_seq != None and postbar_seq != repeat([0],len(postbar_seq)): + next_index = get_note_indices(postbar_seq)[0]+len(seq) + end_time = next_index*sf/float(len(seq)) + else: # or if the next bar is none or full rest, end_time is the end of this sequence. + end_time = sf + else: + end_time = note_indices[i+1]*sf/float(len(seq)) + + duration = end_time - start_time + c_n = roundDownPower2(duration) + syncopation = syncopation + start(start_time,c_n) + end(end_time,c_n) + else: + print 'Error: KTH model can only deal with binary time-signature, e.g. 2/4 and 4/4. ' + syncopation = None + + return syncopation
--- a/Syncopation models/LHL.py Fri Mar 21 15:49:46 2014 +0000 +++ b/Syncopation models/LHL.py Sun Oct 05 21:52:41 2014 +0100 @@ -1,218 +1,67 @@ ''' Author: Chunyang Song Institution: Centre for Digital Music, Queen Mary University of London +''' -** Longuet-Higgins and Lee's Model (LHL) ** +from basic_functions import concatenate, repeat, subdivide, ceiling -Algorithm: +terminal_nodes = [] # Global variable, storing all the terminal nodes from recursive tree structure in time order -Only applicable to simple rhtyhms. +# Each terminnal node contains two properties: its node type (note or rest) and its metrical weight. +class Node: + def __init__(self,node_type,metrical_weight): + self.node_type = node_type + self.metrical_weight = metrical_weight -Generate tree structure: -1) Time_signature decides sequence of subdivisions: 4/4 - [2,2,2,...2], 3/4 - [3,2,2,...2], 6/8 - [2,3,2,2...2]; -2) If after subdivision, a node has at least one branch that is full rest or one note, then it cannot be subdivided; -3) Weight of node, initialized as 0, decrease by 1 with increasing depth of the tree. +# This function will recurse the tree for a binary sequence and return a sequence containing the terminal nodes in time order. +def recursive_tree(seq, subdivision_seq, weight_seq, metrical_weight, level): + if seq == concatenate([1],repeat([0],len(seq)-1)): # If matching to a Note type, add to terminal nodes + terminal_nodes.append(Node('N',metrical_weight)) -Assign each node in the tree three attributes - weight, pos, event; -Search for Note-Rest pairs that Note's weight is no more than Rest's weight; -Syncopation for such a pair is the difference in weights; -Total Syncopation value for non-syncopation rhythms is -1; for syncopated rhythms is the sum of syncopation of all notes. + elif seq == repeat([0],len(seq)): # If matching to a Rest type, add to terminal nodes + terminal_nodes.append(Node('R',metrical_weight)) -In order to evaluate LHL models with the others, re-scale LHL's measures to non-negative. -Apart from the immeasuable rhythms, add 1 to each syncopation value. + else: # Keep subdividing by the subdivisor of the next level + sub_seq = subdivide(seq, subdivision_seq[level+1]) + sub_weight_seq = concatenate([metrical_weight],repeat([weight_seq[level+1]],subdivision_seq[level+1]-1)) + for a in range(len(sub_seq)): + recursive_tree(sub_seq[a], subdivision_seq, weight_seq, sub_weight_seq[a], level+1) -''' -from MeterStructure import MeterStructure +# This function calculate syncoaption score for LHL model. +def get_syncopation(seq, subdivision_seq, weight_seq, prebar_seq, rhythm_category): + syncopation = None + if rhythm_category == 'poly': + print 'Error: LHL model cannot deal with polyrhythms.' + else: + # If there is rhythm in previous bar, process its tree structure + if prebar_seq != None: + recursive_tree(ceiling(prebar_seq), subdivision_seq, weight_seq, weight_seq[0],0) + + # Only keep the last note-type node + while terminal_nodes[-1].node_type != 'N': + del terminal_nodes[-1] + del terminal_nodes[0:-1] -class Node: - def __init__(self, pos, event): - self.pos = pos - self.event = event + # For the rhythm in the current bar, process its tree structure and store the terminal nodes + recursive_tree(ceiling(seq), subdivision_seq, weight_seq, weight_seq[0],0) + + # for t in terminal_nodes: + # print '<', t.node_type, t.metrical_weight, '>' - def assignWeight(self, time_sig): - ms = MeterStructure(time_sig) - # Retrieve the LHL meter hierarchy in one bar; - #(e.g. weight = [0, -4, -3, -4, -2, -4, -3, -4, -1, -4, -3, -4, -2, -4, -3, -4] in 4/4 meter) - weights = ms.getLHLWeights(1) - - # find the metrical weight of the note that locates at certain position(self.pos) - # index is the corresponding position of "self.pos" in the "weights" list - index = int(((abs(self.pos)%48)/48.0)*len(weights)) - self.weight = weights[index] + # Search for the NR pairs that contribute to syncopation, add the weight-difference to the NR_pair_syncopation list + NR_pair_syncopation = [] + for i in range(len(terminal_nodes)-1,0,-1): + if terminal_nodes[i].node_type == 'R': + for j in range(i-1, -1, -1): + if (terminal_nodes[j].node_type == 'N') & (terminal_nodes[i].metrical_weight >= terminal_nodes[j].metrical_weight): + NR_pair_syncopation.append(terminal_nodes[i].metrical_weight - terminal_nodes[j].metrical_weight) + break + #print NR_pair_syncopation -class Tree: - def __init__(self): - self.nodes = [] + # If no syncopation, the value is -1; otherwise, sum all the local syncopation values stored in NR_pair_syncopation list + if len(NR_pair_syncopation) != 0: + syncopation = sum(NR_pair_syncopation) + elif len(terminal_nodes) != 0: + syncopation = -1 - def add(self, Node): - self.nodes.append (Node) - - def getWeight(self, time_sig): - for n in self.nodes: - n.assignWeight(time_sig) - - def display(self): - print 'Pos, Event, Weight' - for n in self.nodes: - print '%02d %s %02d' % (n.pos, n.event, n.weight) - - -def subdivide(sequence, segments_num): - subSeq = [] - if len(sequence) % segments_num != 0: - print 'Error: rhythm segment cannot be equally subdivided.' - else: - n = len(sequence) / segments_num - start , end = 0, n - for i in range(segments_num): - subSeq.append(sequence[start : end]) - start = end - end = end + n - - return subSeq - - -def eventType (sequence): - if not(1 in sequence): - event = 'R' # Full rest - elif sequence[0] == 1 and not(1 in sequence[1:]): - event = 'N' # Only one on-beat note - else: - event = 'D' # Divisable - - return event - -def splitInto2 (sequence, tree, pos, isRecursive): - - subs = subdivide(sequence, 2) - - for s in subs: - if eventType(s) == 'R' or eventType(s) == 'N': - #print 'test', s, eventType(s), pos - tree.add( Node(pos, eventType(s)) ) - else: - if isRecursive: - #print 'test', s, eventType(s), pos - splitInto2 (s, tree, pos, True) - else: - splitInto3 (s, tree, pos) - - pos = pos + len(sequence) / 2 - - return tree - -def splitInto3 (sequence, tree, pos): - - subs = subdivide(sequence, 3) - - for s in subs: - if eventType(s) == 'R' or eventType(s) == 'N': - #print 'test', s, eventType(s), pos - tree.add( Node(pos, eventType(s)) ) - - else: - splitInto2 (s, tree, pos, True) - - pos = pos + len(sequence) / 3 - - return tree - - -def createTree(rhythm, time_sig, bar): - t = Tree() - # The root is the rhtyhm in a entire bar, has weight 0, at position 0 - weight, pos, event = 0 , 0, eventType(rhythm) - root = Node (pos, event) - if '2/4' in time_sig: - t.add ( Node(-24, 'N') ) # Treat the last metronome beat in the previous bar as a sounded note - - if event == 'D': - t = splitInto2(rhythm, t, pos, True) - else: - t.add (root) - - elif '4/4' in time_sig: - t.add ( Node(-12, 'N') ) # Treat the last metronome beat in the previous bar as a sounded note - - if event == 'D': - t = splitInto2(rhythm, t, pos, True) - else: - t.add (root) - - elif '3/4' in time_sig: - t.add ( Node(-8, 'N') ) # Treat the last metronome beat in the previous bar as a sounded note - - segments = subdivide (rhythm, bar) - - for s in segments: - if eventType(s) == 'D': - t = splitInto3(s, t, pos) - pos = pos + len(rhythm)/bar - - - elif '6/8' in time_sig: - t.add ( Node(-8, 'N') ) # Treat the last metronome beat in the previous bar as a sounded note - - segments = subdivide (rhythm, bar) - - for s in segments: - if eventType(s) == 'D': - t = splitInto2(s, t, pos, False) - pos = pos + len(rhythm)/bar - - else: - print 'This time signature is not defined. Choose between 4/4, 3/4 or 6/8' - - return t - - -def lhl(rhythm, time_sig, category, bar): - measures = [] - - if 'poly' in category: - return -1 - else: - tree = createTree(rhythm, time_sig, bar) - tree.getWeight(time_sig) - #tree.display() - - # find NR-pair that N's weight <= R's weight, add difference in weight to measures[] - N_weight = tree.nodes[0].weight - size = len(tree.nodes) - - for i in range(1, size): - - if tree.nodes[i].event == 'N': - N_weight = tree.nodes[i].weight - - if tree.nodes[i].event == 'R' and tree.nodes[i].weight >= N_weight: - measures.append(tree.nodes[i].weight - N_weight ) - - # Calculate syncopation - if len(measures) == 0: # syncopation of non-syncopated rhythm is -1 - syncopation = -1 - else: - syncopation = sum(measures) # syncopation of syncopated rhythm is the sum of measures[] - - return syncopation + 1 # For evaluation purpose, scale up by 1 to make scores above 0 - - -# Retrieve the stimuli -#f = file('stimuli.txt') -f = file('stimuli_34only.txt') - -#Calculate syncopation for each rhythm pattern -while True: - line = f.readline().split(';') - if len(line) == 1: - break - else: - sti_name = line[0] - rhythmString = line[1].split() - time_sig = line[2] - category = line[3] - bar = int(line[4]) - - rhythm = map(int,rhythmString[0].split(',')) - - print sti_name, lhl(rhythm, time_sig, category, bar) \ No newline at end of file + return syncopation
--- a/Syncopation models/PRS.py Fri Mar 21 15:49:46 2014 +0000 +++ b/Syncopation models/PRS.py Sun Oct 05 21:52:41 2014 +0100 @@ -1,316 +1,57 @@ ''' Author: Chunyang Song Institution: Centre for Digital Music, Queen Mary University of London - -** Pressing's model ** - -Algorithm: - -Only applicable to simple rhtyhms. - -Generate hierarchical metrical structure for rhythms, they way as same as LHL model; - - ''' -def subdivide(sequence, segments_num): - subSeq = [] - if len(sequence) % segments_num != 0: - print 'Error: rhythm segment cannot be equally subdivided ' +from basic_functions import repeat, subdivide, ceiling, get_min_timeSpan + +def get_cost(seq,next_seq): + seq = get_min_timeSpan(seq) # converting to the minimum time-span format + + if seq[1:] == repeat([0],len(seq)-1): # null prototype + cost = 0 + elif seq == repeat([1],len(seq)): # filled prototype + cost = 1 + elif seq[0] == 1 and seq[-1] == 0: # run1 prototype + cost = 2 + elif seq[0] == 1 and (next_seq == None or next_seq[0] == 0): # run2 prototype + cost = 2 + elif seq[0] == 1 and seq[-1] == 1 and next_seq != None and next_seq[0] == 1: # upbeat prototype + cost = 3 + elif seq[0] == 0: # syncopated prototype + cost = 5 + + return cost + +# This function calculates the syncopation value (cost) for the sequence with the postbar_seq for a certain level. +def syncopation_perlevel(sub_seqs, num_subs): + total = 0 + for l in range(num_subs): + total = total + get_cost(sub_seqs[l], sub_seqs[l+1]) + normalised = float(total)/num_subs + + return normalised + +# This function calculates the overall syncopation value for a bar of sequence.. +def get_syncopation(seq,subdivision_seq, postbar_seq, rhythm_category): + syncopation = None + if rhythm_category == 'poly': + print 'Error: PRS model cannot deal with polyrhythms.' else: - n = len(sequence) / segments_num - start , end = 0, n - for i in range(segments_num): - subSeq.append(sequence[start : end]) - start = end - end = end + n - - return subSeq + syncopation = 0 -# To check whether there is a need to continue subdividing and measuring -def checkContinue(sequence, division): - isContinue = False - if len(sequence) % division == 0: - subs = subdivide (sequence, division) - - for s in subs: - if 1 in s[1:]: # If there are still onsets in-between the divisions - isContinue = True - else: - print 'Error: the sequence cannot be equally subdivided!' - return isContinue + current_num_subs = 1 # the initial number of sub-sequences at a certain metrical level + for subdivisor in subdivision_seq: + # the number of sub-sequence at the current level is product of all the subdivisors up to the current level + current_num_subs = current_num_subs * subdivisor + # recursion stops when the length of sub-sequence is less than 2 + if len(seq)/current_num_subs >= 2: + # generate sub-sequences and append the next bar sequence + sub_seqs = subdivide(ceiling(seq), current_num_subs) + sub_seqs.append(postbar_seq) + # adding syncopation at each metrical level to the total syncopation + syncopation += syncopation_perlevel(sub_seqs, current_num_subs) + else: + break -def timeSpanTranscribe(sequence): - l = len(sequence) - transcribe = [] - - if not (1 in sequence): # Full rest - transcribe = [0] - else: - divisor = 1 - while True: - if l%divisor != 0: - divisor = divisor + 1 - else: - sampleStep = l/divisor # how many digits in each segment, divisor also represents the number of segments - template = (([1] + [0]*(sampleStep-1) ) * divisor ) - - sampled = [] - for i in range(l): - sampled.append(sequence[i] and template[i]) - - if sequence == sampled: - break - else: - divisor = divisor + 1 - - subs = subdivide(sequence, divisor) - for s in subs: - transcribe.append(s[0]) - - return transcribe - -# Identify the type of rhythm sequence : 0- null; 1-filled; 2-run; 3-upbeat; 5-syncopated -def syncType (sequence, followed_event): - - # Null is full rest or only one single on-beat note, therefore no 0 in sequence[1:] - if not(1 in sequence[1:]) : - syncType = 0 - - else: - ts = timeSpanTranscribe(sequence) - - # Filled is equally spaced onsets, therefore all 1s in the time span transcribe of the sequence - if not(0 in ts ): - syncType = 1 - # Run is either starting with 1 and end with 0 in time span, or starting with 1 if next bar starts with 0 - elif ts[0] == 1 and ts[-1] == 0: - syncType = 2 - elif followed_event ==0 and ts[0] == 1: - syncType = 2 - # Upbeat requires next bars starting with 1 and at least the last event in time span is 1 - elif followed_event == 1 and ts[-1] == 1: - syncType = 3 - # Syncopated start and end off-beat - elif sequence[0] == 0: - syncType = 5 - else: - print 'Error: un-recognizable syncopation type ', sequence - syncType = None - - return syncType - -def createHierarchy(rhythm, time_sig, bar): - h = [] # A list of lists to record syncopation type(s) in each hierarchy level across bars - s_bar = subdivide (rhythm, bar) - - if '4/4' in time_sig: - - for i in range(bar): - bar_level = s_bar[i] - - if i == bar -1: - followed_event = s_bar[0][0] - else: - followed_event = s_bar[i+1][0] - - h. append ( [syncType(bar_level, followed_event)] ) # Indentify syncopation type at bar level - - if checkContinue(rhythm, 2*bar): - - for i in range(bar): - s_halfBar = subdivide (s_bar[i], 2) - halfBar_h = [] - - for j in range(2): - halfBar_level = s_halfBar[j] - - if j == 1: - followed_event = s_halfBar[0][0] - else: - followed_event = s_halfBar[j+1][0] - - halfBar_h.append (syncType (halfBar_level , followed_event)) - - h.append(halfBar_h) - - if checkContinue(rhythm, 4*bar): - - for i in range(bar): - s_quarter = subdivide (s_bar[i], 4) - quarter_h = [] - - for j in range(4): - quarter_level = s_quarter [j] - - if j == 3: - followed_event = s_quarter[0][0] - else: - followed_event = s_quarter[j+1][0] - - quarter_h. append ( syncType (quarter_level , followed_event) ) # quarter note level - - h.append(quarter_h) - - if checkContinue( rhythm, 8*bar): - - for i in range(bar): - s_eighth = subdivide (s_bar[i], 8) - eighth_h = [] - - for j in range(8): - eighth_level = s_eighth [j] - - if j == 7: - followed_event = eighth_level[0][0] - else: - followed_event = eighth_level[j+1][0] - - eighth_h.append (syncType (eighth_level, followed_event) ) - - h.append(eighth_h) - - - elif '3/4' in time_sig: - size_bar = len(s_bar) - for i in range(size_bar): - bar_level = s_bar[i] - - quarter_h = [] - eighth_h = [] - - if i == size_bar -1: - followed_event = s_bar[0][0] - else: - followed_event = s_bar[i+1][0] - - h. append ( [syncType(bar_level, followed_event)] ) # Indentify syncopation type at bar level - - if checkContinue(bar_level, 3): - s_quarter = subdivide (bar_level, 3) - size_quarter = len(s_quarter) - - for j in range(size_quarter): - quarter_level = s_quarter [j] - - if j == size_quarter -1: - followed_event = s_quarter[0][0] - else: - followed_event = s_quarter[j+1][0] - - quarter_h. append ( syncType (quarter_level , followed_event) ) # quarter note level - - if checkContinue( quarter_level, 2): - s_eighth = subdivide (quarter_level, 2) # eighth note level - size_eighth = len(s_eighth) - - for k in range(size_eighth): - eighth_level = s_eighth [k] - - if k == size_eighth - 1: - followed_event = eighth_level[0][0] - else: - followed_event = eighth_level[k+1][0] - - eighth_h.append (syncType (eighth_level, followed_event) ) - h.append(eighth_h) - - h.append(quarter_h) - - - elif '6/8' in time_sig: - for i in range(bar): - bar_level = s_bar[i] - - if i == bar -1: - followed_event = s_bar[0][0] - else: - followed_event = s_bar[i+1][0] - - h. append ( [syncType(bar_level, followed_event)] ) # Indentify syncopation type at bar level - - if checkContinue(rhythm, 2*bar): - - for i in range(bar): - s_halfBar = subdivide (s_bar[i], 2) - halfBar_h = [] - - for j in range(2): - halfBar_level = s_halfBar [j] - - if j == 1: - followed_event = s_halfBar[0][0] - else: - followed_event = s_halfBar[j+1][0] - - halfBar_h. append ( syncType (halfBar_level , followed_event) ) - - h.append(halfBar_h) - - if checkContinue( rhythm, 6*bar): - - for i in range(bar): - s_eighth = subdivide (s_bar[i], 6) # eighth note level - eighth_h = [] - - for j in range(6): - eighth_level = s_eighth [j] - - if j == 5: - followed_event = eighth_level[0][0] - else: - followed_event = eighth_level[j+1][0] - - eighth_h.append (syncType (eighth_level, followed_event) ) - - h.append(eighth_h) - - else: - print 'This time signature is not defined. Choose between 4/4, 3/4 or 6/8' - - return h - - -def pressing(rhythm, time_sig, category, bar): - sync_oneLevel = [] - - if 'poly' in category: - return -1 - - else: - hierarchy = createHierarchy(rhythm, time_sig, bar) - # print 'h', hierarchy - - if len(hierarchy) != 0: - for h in hierarchy: - sync_oneLevel.append (sum(h) / float(len(h)) ) - - # Syncopation is the sum of averaged syncopation values of each level - syncopation = sum (sync_oneLevel) - - else: - syncopation = 0 - - return syncopation - - -# Retrieve the stimuli -# f = file('stimuli.txt') -f = file('stimuli_34only.txt') - -#Calculate syncopation for each rhythm pattern -while True: - line = f.readline().split(';') - if len(line) == 1: - break - else: - sti_name = line[0] - rhythmString = line[1].split() - time_sig = line[2] - category = line[3] - bar = int([4]) - - rhythm = map(int,rhythmString[0].split(',')) - - print sti_name, pressing(rhythm, time_sig, category, bar) - + return syncopation
--- a/Syncopation models/SG.py Fri Mar 21 15:49:46 2014 +0000 +++ b/Syncopation models/SG.py Sun Oct 05 21:52:41 2014 +0100 @@ -2,125 +2,75 @@ Author: Chunyang Song Institution: Centre for Digital Music, Queen Mary University of London -** Sioros and Guedes's Model ** - -Algorithm: - -Only applicable to monorhythms. - -This version of implementation follows the description in authors' 2011 ISMIR paper and assumes each bar of rhythm is looped. -Therefore each bar is calculated seperatedly in a loop-mode and summed up in the end as the total syncopation. - ''' -from MeterStructure import MeterStructure -from math import pow +from basic_functions import get_H, get_min_timeSpan -def subdivide(sequence, segments_num): - subSeq = [] - if len(sequence) % segments_num != 0: - print 'Error: rhythm segment cannot be equally subdivided ' +def get_syncopation(seq, subdivision_seq, weight_seq, L_max, rhythm_category): + syncopation = None + if rhythm_category == 'poly': + print 'Error: SG model cannot deal with polyrhythms.' else: - n = len(sequence) / segments_num - start , end = 0, n - for i in range(segments_num): - subSeq.append(sequence[start : end]) - start = end - end = end + n - - return subSeq + + seq = get_min_timeSpan(seq) # converting to the minimum time-span format + + # checking whether the given L_max is enough to analyse the given sequence, if not, request a bigger L_max + new_L_max = True + matching_level = L_max + while matching_level >= 0: + if len(get_H(weight_seq,subdivision_seq, matching_level)) == len(seq): + new_L_max = False + break + else: + matching_level = matching_level - 1 + if new_L_max == True: + print 'Error: needs a bigger L_max (i.e. the lowest metrical level) to match the given rhythm sequence.' -def sgModel(rhythm, time_sig, category, bar): - ms = MeterStructure(time_sig) - mWeights = ms.getLHLWeights(1) - #print "hierarchy", mWeights - num_onsets = 0 - h_min = min(mWeights) + else: + syncopation = 0 + # generate the metrical weights of the lowest level + H = get_H(weight_seq,subdivision_seq, matching_level) - syncAbsolute = 0 - syncNormM = 0 - syncNormE = 0 - - if 'poly' in category: - syncAbsolute = -1 - else: - # segment rhythm into bars - rhythm_byBar = subdivide (rhythm, bar) - - def measurePerBar(rhythm): - syncopation = 0 - - def aveNeighbours(index, h): + # The aveDif_neighbours function calculates the (weighted) average of the difference between the note at a certain index and its neighbours in a certain metrical level + def aveDif_neighbours(index, level): averages = [] - parameter_k = 0.8 + parameter_garma = 0.8 - def findPre(h_hat): - pre_index = index - 1 - while(mWeights[pre_index] < h_hat): - pre_index = (pre_index - 1)%len(mWeights) - #print "h_hat and pre", h_hat, pre_index + # The findPre function is to calculate the index of the previous neighbour at a certain metrical level. + def findPre(index, level): + pre_index = (index - 1)%len(H) + while(H[pre_index] > level): + pre_index = (pre_index - 1)%len(H) return pre_index - def findPost(h_hat): - post_index = (index + 1)%len(mWeights) - while(mWeights[post_index] < h_hat): - post_index = (post_index + 1)%len(mWeights) - #print "h_hat and post", h_hat, post_index + # The findPost function is to calculate the index of the next neighbour at a certain metrical level. + def findPost(index, level): + post_index = (index + 1)%len(H) + while(H[post_index] > level): + post_index = (post_index + 1)%len(H) return post_index + # The dif function is to calculate a difference level factor between two notes (at note position index1 and index 2) in velocity sequence def dif(index1,index2): parameter_beta = 0.5 - pos1 = int(float(index1)/len(mWeights)*48) - pos2 = int(float(index2)/len(mWeights)*48) - dif_v = rhythm[pos1]-rhythm[pos2] - dif_h = abs(mWeights[index1]-mWeights[index2]) + dif_v = seq[index1]-seq[index2] + dif_h = abs(H[index1]-H[index2]) dif = dif_v*(parameter_beta*dif_h/4+1-parameter_beta) - #print 'dif', dif return dif - for h_hat in range(h_min,h+1): - ave = ( parameter_k*dif(index,findPre(h_hat))+dif(index,findPost(h_hat)) )/(1+parameter_k) - #print 'ave', ave + # From the highest to the lowest metrical levels where the current note resides, calculate the difference between the note and its neighbours at that level + for l in range(level, max(H)+1): + ave = ( parameter_garma*dif(index,findPre(index,l))+dif(index,findPost(index,l)) )/(1+parameter_garma) averages.append(ave) - return averages - for pos in range(len(rhythm)): - if rhythm[pos] != 0: # Onset detected - #num_onsets += 1 - i = int((pos/48.0*len(mWeights))) - h = mWeights[i] - potential = 1 - pow(0.5,(-h)) - #print "intermediate", min(aveNeighbours(i,h)) - syncopation += min(aveNeighbours(i, h))*potential - - return syncopation - - for r in rhythm_byBar: - syncAbsolute = syncAbsolute + measurePerBar(r) - #syncAbsolute /= 2.0 - - return syncAbsolute - -# Retrieve the stimuli -#f = file('stimuli.txt') -#f = file('stimuli_34only.txt') -f = file('clave.txt') - - -#Calculate syncopation for each rhythm pattern -#while True: -for i in range(1): - line = f.readline().split(';') - if len(line) == 1: - break - else: - sti_name = line[0] - rhythmString = line[1].split() - time_sig = line[2] - category = line[3] - bar = int(line[4]) - - rhythm = map(int,rhythmString[0].split(',')) - print sti_name, sgModel(rhythm, time_sig, category, bar) \ No newline at end of file + # Calculate the syncopation value for each note + for index in range(len(seq)): + if seq[index] != 0: # Onset detected + h = H[index] + potential = 1 - pow(0.5,h) # Syncopation potential according to its metrical level, which is equal to the metrical weight + level = h # Metrical weight happens to be equal to its metrical level + syncopation += min(aveDif_neighbours(index, h))*potential + + return syncopation
--- a/Syncopation models/TMC.py Fri Mar 21 15:49:46 2014 +0000 +++ b/Syncopation models/TMC.py Sun Oct 05 21:52:41 2014 +0100 @@ -2,71 +2,54 @@ Author: Chunyang Song Institution: Centre for Digital Music, Queen Mary University of London -** Toussaint's Metric Complexity Model ** - -Algorithm: - -Only applicable to monorhythms. - -Define metrical hierarchy by given time signature; -Calculate how many onsets and determine Maximum Metricality Max_Metric; -Calculate the Metrical Simplicity - the weights of all onsets; -Syncopation = Max_Metric - Metric_simplicity. -Output the predicted syncopation score; -1 indicates non-applicable - ''' -from MeterStructure import MeterStructure +from basic_functions import get_H, ceiling, get_min_timeSpan -def metricalModel(rhythm, time_sig, category, bar): - ms = MeterStructure(time_sig) - meter = ms.getLJWeights(bar) - if len(meter) !=0: +# The get_metricity function calculates the metricity for a binary sequence with given sequence of metrical weights in a certain metrical level. +def get_metricity(seq, H): + metricity = 0 + for m in range(len(seq)): + metricity = metricity + seq[m]*H[m] + return metricity + +# The get_max_metricity function calculates the maximum metricity for the same number of notes in a binary sequence. +def get_max_metricity(seq, H): + max_metricity = 0 + H.sort(reverse=True) # Sort the metrical weight sequence from large to small + for i in range(sum(seq)): + max_metricity = max_metricity+H[i] + return max_metricity + +# The get_syncopation function calculates the syncopation value of the given sequence for TMC model. +def get_syncopation(seq, subdivision_seq, weight_seq, L_max, rhythm_category): + syncopation = None + if rhythm_category == 'poly': + print 'Error: TMC model cannot deal with polyrhythms.' + else: + seq = get_min_timeSpan(seq) # converting to the minimum time-span format + + # checking whether the given L_max is enough to analyse the given sequence, if not, request a bigger L_max + new_L_max = True + matching_level = L_max + while matching_level >= 0: + if len(get_H(weight_seq,subdivision_seq, matching_level)) == len(seq): + new_L_max = False + break + else: + matching_level = matching_level - 1 + + if new_L_max == True: + print 'Error: needs a bigger L_max (i.e. the lowest metrical level) to match the given rhythm sequence.' - metricalSimplicity = 0 # sum of weights of onsets per bar - maxMetrical = 0 # maximum metricity per bar - onsetCount = 0 # The number of onsets per bar - - if 'poly' in category: # not applicable to polyrhythms - return -1 - - # Calculate metricalSimplicity else: - l = len(rhythm) - for i in range(l): - if rhythm[i] == 1: # onset detected - pos = int((float(i)/l) *len(meter)) # looking for the metrical position where this note locates - metricalSimplicity = metricalSimplicity+meter[pos] - onsetCount = onsetCount+1 - - # Calculate max metricity - meter.sort(reverse=True) - for i in range(0,onsetCount): - maxMetrical = maxMetrical+meter[i] + # generate the metrical weights of the lowest level, + # using the last matching_level number of elements in the weight_seq list, to make sure the last element is 1 + H = get_H (weight_seq[-(matching_level+1):], subdivision_seq, matching_level) - #print 'test', onsetCount, maxMetrical, metricalSimplicity - syncopation = (maxMetrical - metricalSimplicity) - - return syncopation - else: - return -1 + metricity = get_metricity(ceiling(seq), H) # converting to binary sequence then calculate metricity + max_metricity = get_max_metricity(ceiling(seq), H) -# Retrieve the stimuli -f = file('stimuli.txt') -#f = file('stimuli_34only.txt') + syncopation = max_metricity - metricity + return syncopation -#Calculate syncopation for each rhythm pattern -while True: - line = f.readline().split(';') - if len(line) == 1: - break - else: - sti_name = line[0] - rhythmString = line[1].split() - time_sig = line[2] - category = line[3] - bar = int(line[4]) - - rhythm = map(int,rhythmString[0].split(',')) - - print sti_name, metricalModel(rhythm, time_sig, category, bar)
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/Syncopation models/TOB.py Sun Oct 05 21:52:41 2014 +0100 @@ -0,0 +1,24 @@ +''' +Author: Chunyang Song +Institution: Centre for Digital Music, Queen Mary University of London + +''' + +from basic_functions import ceiling, find_divisor, get_min_timeSpan + +# This function calculates the syncopation value for TOB model. +def get_syncopation(seq): + syncopation = 0 + bseq_ts = get_min_timeSpan(ceiling(seq)) # converting to binary and mininum time-span sequence + divisors = find_divisor(len(bseq_ts)) # find all the divisors other than 1 and the length of this sequence + del divisors[0] + del divisors[-1] + + offbeatness = [1]*len(bseq_ts) + for index in range(len(bseq_ts)): + for d in divisors: + if index % d == 0: + offbeatness[index] = 0 + break + syncopation += bseq_ts[index]*offbeatness[index] + return syncopation
--- a/Syncopation models/WNBD.py Fri Mar 21 15:49:46 2014 +0000 +++ b/Syncopation models/WNBD.py Sun Oct 05 21:52:41 2014 +0100 @@ -2,89 +2,64 @@ Author: Chunyang Song Institution: Centre for Digital Music, Queen Mary University of London -** Weighted Note-to-Beat Distance Model (WNBD) ** +''' +from basic_functions import repeat, get_note_indices -Algorithm: +def cumu_multiply(numbers): + product = 1 + for n in numbers: + product = product*n + return product -Calculate the distance (d) of onset to the nearest strong beat; -Calculate the WNBD measure (w) of each onset; -Syncopation is the sum of WNBD measures divided by the number of onsets. +def get_syncopation(seq, subdivision_seq, strong_beat_level, postbar_seq): + syncopation = None + + num_beats = cumu_multiply(subdivision_seq[0:strong_beat_level+1]) # num_beats is the number of strong beats + if len(seq)%num_beats != 0: + print 'Error: the length of sequence is not subdivable by the subdivision factor in subdivision_seq.' + else: + # Find the indices of all the strong-beats + beat_indices = [] + beat_interval = len(seq)/num_beats + for i in range(num_beats+1): + beat_indices.append(i*beat_interval) + if postbar_seq != None: # if there is a postbar_seq, add another two beats index for later calculation + beat_indices += [len(seq)+beat_interval, len(seq)+ 2* beat_interval] -''' + note_indices = get_note_indices(seq) # all the notes -from MeterStructure import MeterStructure + # Calculate the WNBD measure for each note + def measure_pernote(note_index, nextnote_index): + # Find the nearest beats where this note locates - in [beat_indices[j], beat_indices[j+1]) + j = 0 + while note_index < beat_indices[j] or note_index >= beat_indices[j+1]: + j = j + 1 + + # The distance of note to nearest beat normalised by the beat interval + distance_nearest_beat = min(abs(note_index - beat_indices[j]), abs(note_index - beat_indices[j+1]))/float(beat_interval) -def WNBD(rhythm, time_sig, category, bar): + # if this note is on-beat + if distance_nearest_beat == 0: + measure = 0 + # or if this note is held on past the following beat, but ends on or before the later beat + elif beat_indices[j+1] < nextnote_index <= beat_indices[j+2]: + measure = float(2)/distance_nearest_beat + else: + measure = float(1)/distance_nearest_beat - ms = MeterStructure(time_sig) - beat = ms.getBeats(bar) + [len(rhythm)] # the "beat" array include all "strong-beat" positions across all bars, and the downbeat position of the following bar - if len(beat)!=0: - unit = beat[1]-beat[0] # how many digits to represent the length of one beat + return measure - onsetPos = [] # The onset positions - d = [] # The distances of each onset to its nearest strong beat - beatToLeft = [] # The beat indexes of the beats that are to the left of or coincide with each onset - wnbdMeasures = [] # the un-normalized measures of wnbd - - # Calculate the distance of each onset to the nearest strong beat - l = len(rhythm) - for i in range(l): - if rhythm[i] == 1: # onset detected - onsetPos.append(i) - - # find its distance to the nearest strong beat - for j in range(len(beat)-1): - if beat[j]<= i < beat[j+1]: - d1 = abs(i-beat[j]) - d2 = abs(i-beat[j+1]) - d.append( min(d1,d2)/float(unit) ) # Normalize the distance to beat-level - beatToLeft.append(j) - else: - continue - - #Calculate the WNBD measure of each onset - n = len(onsetPos) - for i in range(n): - if d[i] ==0: # If on-beat, measure is 0 - wnbdMeasures.append(0) + total = 0 + for i in range(len(note_indices)): + if i == len(note_indices)-1:# if this is the last note, end_time is the index of the following note in the next bar + if postbar_seq != None and postbar_seq != repeat([0],len(postbar_seq)): + nextnote_index = get_note_indices(postbar_seq)[0]+len(seq) + else: # or if the next bar is none or full rest, end_time is the end of this sequence. + nextnote_index = len(seq) else: - if i == n-1: # The last note, then - if beatToLeft[i] == beat[-3]/unit: # if the last note has more than 1- but less than 2-beat distance to the next bar - wnbdMeasures.append(2.0/d[i]) # measure is 2/d, else 1/d - else: - wnbdMeasures.append(1.0/d[i]) + nextnote_index = note_indices[i+1] + total += measure_pernote(note_indices[i],nextnote_index) - else: # if its not the last note - if beatToLeft[i+1] == beatToLeft[i] or (beatToLeft[i+1]-beatToLeft[i]==1 and d[i+1]==0): # if this note is no more than 1-beat distance away from the next note - wnbdMeasures.append(1.0/d[i]) - elif beatToLeft[i+1] - beatToLeft[i] == 1 or (beatToLeft[i+1]-beatToLeft[i]==2 and d[i+1]==0): # if this note is no more than 2-beat distance away from the next - wnbdMeasures.append(2.0/d[i]) - else: # if the note is more than 2-beat distance away from the next note - wnbdMeasures.append(1.0/d[i]) + syncopation = float(total) / len(note_indices) - syncopation = sum(wnbdMeasures)/float(n) - return syncopation - - else: - return -1 - - -# Retrieve the stimuli -f = file('stimuli.txt') -#f = file('stimuli_34only.txt') - -#Calculate syncopation for each rhythm pattern -while True: - line = f.readline().split(';') - if len(line) == 1: - break - else: - sti_name = line[0] - rhythmString = line[1].split() - time_sig = line[2] - category = line[3] - bar = int(line[4]) - - rhythm = map(int,rhythmString[0].split(',')) - - print sti_name, WNBD(rhythm, time_sig, category, bar) + return syncopation
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/Syncopation models/basic_functions.py Sun Oct 05 21:52:41 2014 +0100 @@ -0,0 +1,163 @@ +# This python file is a collection of basic functions that are used in the syncopation models. + +import math + +# The concatenation function is used to concatenate two sequences. +def concatenate(seq1,seq2): + return seq1+seq2 + +# The repetition function is to concatenate a sequence to itself for 'times' number of times. +def repeat(seq,times): + new_seq = list(seq) + if times >= 1: + for i in range(times-1): + new_seq = concatenate(new_seq,seq) + else: + #print 'Error: repetition times needs to be no less than 1.' + new_seq = [] + return new_seq + +# The subdivision function is to equally subdivide a sequence into 'divisor' number of segments. +def subdivide(seq,divisor): + subSeq = [] + if len(seq) % divisor != 0: + print 'Error: rhythmic sequence cannot be equally subdivided.' + else: + n = len(seq) / divisor + start , end = 0, n + for i in range(divisor): + subSeq.append(seq[start : end]) + start = end + end = end + n + return subSeq + + +# The ceiling function is to round each number inside a sequence up to its nearest integer. +def ceiling(seq): + seq_ceil = [] + for s in seq: + seq_ceil.append(int(math.ceil(s))) + return seq_ceil + +# The find_divisor function returns a list of all possible divisors for a length of sequence. +def find_divisor(number): + divisors = [1] + for i in range(2,number+1): + if number%i ==0: + divisors.append(i) + return divisors + +# The find_divisor function returns a list of all possible divisors for a length of sequence. +def find_prime_factors(number): + prime_factors = find_divisor(number) + + def is_prime(num): + if num < 2: + return False + if num == 2: + return True + else: + for div in range(2,num): + if num % div == 0: + return False + return True + + for i in range(len(prime_factors)-1,0,-1): + if is_prime(prime_factors[i]) == False: + del prime_factors[i] + + return prime_factors + +# The min_timeSpan function searches for the shortest possible time-span representation for a sequence. +def get_min_timeSpan(seq): + min_ts = [1] + for d in find_divisor(len(seq)): + segments = subdivide(seq,d) + if len(segments)!=0: + del min_ts[:] + for s in segments: + min_ts.append(s[0]) + if sum(min_ts) == sum(seq): + break + return min_ts + +# get_note_indices returns all the indices of all the notes in this sequence +def get_note_indices(seq): + note_indices = [] + + for index in range(len(seq)): + if seq[index] != 0: + note_indices.append(index) + + return note_indices + +# The get_H returns a sequence of metrical weight for a certain metrical level (horizontal), +# given the sequence of metrical weights in a hierarchy (vertical) and a sequence of subdivisions. +def get_H(weight_seq,subdivision_seq, level): + H = [] + #print len(weight_seq), len(subdivision_seq), level + if (level <= len(subdivision_seq)-1) & (level <= len(weight_seq)-1): + if level == 0: + H = repeat([weight_seq[0]],subdivision_seq[0]) + else: + H_pre = get_H(weight_seq,subdivision_seq,level-1) + for h in H_pre: + H = concatenate(H, concatenate([h], repeat([weight_seq[level]],subdivision_seq[level]-1))) + else: + print 'Error: a subdivision factor or metrical weight is not defined for the request metrical level.' + return H + +# The get_subdivision_seq function returns the subdivision sequence of several common time-signatures defined by GTTM, +# or ask for the top three level of subdivision_seq manually set by the user. +def get_subdivision_seq(timesig, L_max): + subdivision_seq = [] + + if timesig == '2/4' or timesig == '4/4': + subdivision_seq = [1,2,2] + elif timesig == '3/4': + subdivision_seq = [1,3,2] + elif timesig == '6/8': + subdivision_seq = [1,2,3] + elif timesig == '9/8': + subdivision_seq = [1,3,3] + elif timesig == '12/8': + subdivision_seq = [1,4,3] + elif timesig == '5/4': + subdivision_seq = [1,5,2] + elif timesig == '7/4': + subdivision_seq = [1,7,2] + elif timesig == '11/4': + subdivision_seq = [1,11,2] + else: + print 'Undefined time-signature. Please indicate subdivision sequence for this requested time-signature, e.g. [1,2,2] for 4/4 meter.' + for i in range(3): + s = int(input('Enter the subdivision factor at metrical level '+str(i)+':')) + subdivision_seq.append(s) + + if L_max > 2: + subdivision_seq = subdivision_seq + [2]*(L_max-2) + else: + subdivision_seq = subdivision_seq[0:L_max+1] + + return subdivision_seq + + # The split_by_bar function seperates the score representation of rhythm by bar lines, + # resulting in a list representingbar-by-bar rhythm sequence, + # e.g. rhythm = ['|',[ts1,td1,v1], [ts2,td2,v2], '|',[ts3,td3,v3],'|'...] + # rhythm_bybar = [ [ [ts1,td1,v1], [ts2,td2,v2] ], [ [ts3,td3,v3] ], [...]] +# def split_by_bar(rhythm): +# rhythm_bybar = [] +# bar_index = [] +# for index in range(len(rhythm)): +# if rhythm[index] == '|': + +# return rhythm_bybar + +# def yseq_to_vseq(yseq): +# vseq = [] + +# return vseq + + +# # testing +# print find_prime_factors(10) \ No newline at end of file
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/Syncopation models/main.py Sun Oct 05 21:52:41 2014 +0100 @@ -0,0 +1,85 @@ + +def syncopation_perbar(seq, model, timesig = None, subdivision_seq = None, weight_seq = None, L_max = 5, prebar_seq = None, postbar_seq = None, strong_beat_level = None): + syncopation = None + + if seq == None or model == None: + print 'Error: please indicate rhythm sequence and syncopation model.' + + elif timesig == None and subdivision_seq == None: + print 'Error: please indicate either time signature or subdivision sequence.' + + else: + while subdivision_seq == None: + from basic_functions import get_subdivision_seq + subdivision_seq = get_subdivision_seq(timesig, L_max) + + # polyrhythm detection: + def get_rhythm_category(): + rhythm_category = 'mono' + from basic_functions import get_min_timeSpan, find_prime_factors + for f in find_prime_factors(len(get_min_timeSpan(seq))): + if not (f in subdivision_seq): # if one of the prime factors of this sequence is not in subdivision_seq + rhythm_category = 'poly' + break + return rhythm_category + + rhythm_category = get_rhythm_category() + + if model == 'LHL': + import LHL + if weight_seq == None: + weight_seq = range(0,-L_max,-1) + syncopation = LHL.get_syncopation(seq, subdivision_seq, weight_seq, prebar_seq, rhythm_category) + elif model == 'PRS': + import PRS + syncopation = PRS.get_syncopation(seq, subdivision_seq, postbar_seq, rhythm_category) + elif model == 'TMC': + import TMC + if weight_seq == None: + weight_seq = range(L_max+1,0,-1) + syncopation = TMC.get_syncopation(seq, subdivision_seq, weight_seq, L_max, rhythm_category) + elif model == 'SG': + import SG + if weight_seq == None: + weight_seq = range(L_max+1) + syncopation = SG.get_syncopation(seq, subdivision_seq, weight_seq, L_max, rhythm_category) + elif model == 'KTH': + import KTH + syncopation = KTH.get_syncopation(seq, timesig, postbar_seq) + elif model == 'TOB': + import TOB + syncopation = TOB.get_syncopation(seq) + elif model == 'WNBD': + import WNBD # based on normalising per bar so far, should be normalising whole rhythm... + if strong_beat_level == None: + if timesig == '4/4': + strong_beat_level = 2 + else: + strong_beat_level = 1 + syncopation = WNBD.get_syncopation(seq, subdivision_seq, strong_beat_level, postbar_seq) + else: + print 'Error: undefined syncopation model.' + + return syncopation + + +#def syncopation(rhythm, model, timesig, subdivision_seq = None, weight_seq = None, L_max = 5, strong_beat_level = None): + + + +### TESTING +clave = [1,0,0,1,0,0,1,0,0,0,1,0,1,0,0,0] +bf = [0,0,0,1,0,0,0,0,0,0,1,0] +rhythm = [0,1,0,1,0,1,0,1] +classic1 = [1,0,1,1]*3 + [1,0,0,0] +classic2 = [1,0,0,1]*3 + [1,0,0,0] +shiko = [1,0,1,1,0,1,1,0] +rumba = [1,0,0,1,0,0,0,1,0,0,1,0,1,0,0,0] +soukous = [1,0,0,1,0,0,1,0,0,0,1,1,0,0,0,0] +gahu = [1,0,0,1,0,0,1,0,0,0,1,0,0,0,1,0] +bossanova = [1,0,0,1,0,0,1,0,0,0,1,0,0,1,0,0] + +classic12 = [1,0,0,1,1,1,1,0,0,1,1,1] +soli = [1,0,1,0,1,0,1,0,1,1,0,1] + +print syncopation_perbar(seq = clave, model = 'TMC', timesig = '4/4')