diff Syncopation models/PRS.py @ 1:b2da092dc2e0

The consolidated syncopation software. Have finished individual model and basic functions. Need to revise the coding in main.py, and add rhythm-input interface.
author Chunyang Song <csong@eecs.qmul.ac.uk>
date Sun, 05 Oct 2014 21:52:41 +0100
parents 76ce27beba95
children 031e2ccb1fb6
line wrap: on
line diff
--- a/Syncopation models/PRS.py	Fri Mar 21 15:49:46 2014 +0000
+++ b/Syncopation models/PRS.py	Sun Oct 05 21:52:41 2014 +0100
@@ -1,316 +1,57 @@
 '''
 Author: Chunyang Song
 Institution: Centre for Digital Music, Queen Mary University of London
-
-** Pressing's model **
-
-Algorithm:
-
-Only applicable to simple rhtyhms.
-
-Generate hierarchical metrical structure for rhythms, they way as same as LHL model;
-
-
 '''
 
-def subdivide(sequence, segments_num):
-	subSeq = []
-	if len(sequence) % segments_num != 0:
-		print 'Error: rhythm segment cannot be equally subdivided '
+from basic_functions import repeat, subdivide, ceiling, get_min_timeSpan
+
+def get_cost(seq,next_seq):
+	seq = get_min_timeSpan(seq)					# converting to the minimum time-span format
+	
+	if seq[1:] == repeat([0],len(seq)-1):		# null prototype
+		cost = 0
+	elif seq == repeat([1],len(seq)):			# filled prototype
+		cost = 1
+	elif seq[0] == 1 and seq[-1] == 0:			# run1 prototype
+		cost = 2
+	elif seq[0] == 1 and (next_seq == None or next_seq[0] == 0):	# run2 prototype
+		cost = 2
+	elif seq[0] == 1 and seq[-1] == 1 and next_seq != None and next_seq[0] == 1:		# upbeat prototype
+		cost = 3
+	elif seq[0] == 0:							# syncopated prototype
+		cost = 5
+
+	return cost
+
+# This function calculates the syncopation value (cost) for the sequence with the postbar_seq for a certain level. 
+def syncopation_perlevel(sub_seqs, num_subs):
+	total = 0
+	for l in range(num_subs):
+		total = total + get_cost(sub_seqs[l], sub_seqs[l+1])
+	normalised = float(total)/num_subs
+	
+	return normalised
+
+# This function calculates the overall syncopation value for a bar of sequence..
+def get_syncopation(seq,subdivision_seq, postbar_seq, rhythm_category):
+	syncopation = None
+	if rhythm_category == 'poly':
+		print 'Error: PRS model cannot deal with polyrhythms.'
 	else:
-		n = len(sequence) / segments_num
-		start , end = 0, n
-		for i in range(segments_num):
-			subSeq.append(sequence[start : end])
-			start = end
-			end = end + n
-	
-	return subSeq
+		syncopation = 0
 
-# To check whether there is a need to continue subdividing and measuring
-def checkContinue(sequence, division):
-	isContinue = False
-	if len(sequence) % division == 0:
-		subs = subdivide (sequence, division)
-		
-		for s in subs:
-			if 1 in s[1:]:	# If there are still onsets in-between the divisions
-				isContinue = True
-	else:
-		print 'Error: the sequence cannot be equally subdivided!'
-	return isContinue
+		current_num_subs = 1	# the initial number of sub-sequences at a certain metrical level
+		for subdivisor in subdivision_seq:
+			# the number of sub-sequence at the current level is product of all the subdivisors up to the current level
+			current_num_subs = current_num_subs * subdivisor
+			# recursion stops when the length of sub-sequence is less than 2
+			if len(seq)/current_num_subs >= 2:		
+				# generate sub-sequences and append the next bar sequence
+				sub_seqs = subdivide(ceiling(seq), current_num_subs)	
+				sub_seqs.append(postbar_seq)
+				# adding syncopation at each metrical level to the total syncopation
+				syncopation += syncopation_perlevel(sub_seqs, current_num_subs)	
+			else:
+				break
 
-def timeSpanTranscribe(sequence):
-	l = len(sequence)
-	transcribe = []
-
-	if not (1 in sequence):  # Full rest
-		transcribe = [0]
-	else:
-		divisor = 1
-		while True:
-			if l%divisor != 0:
-				divisor = divisor + 1
-			else:
-				sampleStep = l/divisor # how many digits in each segment, divisor also represents the number of segments
-				template = (([1] + [0]*(sampleStep-1) ) * divisor )
-
-				sampled = []
-				for i in range(l):
-					sampled.append(sequence[i] and template[i])
-
-				if sequence == sampled:
-					break
-				else:
-					divisor = divisor + 1
-
-		subs = subdivide(sequence, divisor)
-		for s in subs:
-			transcribe.append(s[0])
-
-	return transcribe
-
-# Identify the type of rhythm sequence : 0- null; 1-filled; 2-run; 3-upbeat; 5-syncopated
-def syncType (sequence, followed_event):
-	
-	# Null is full rest or only one single on-beat note, therefore no 0 in sequence[1:]
-	if not(1 in sequence[1:]) : 
-		syncType = 0 
-
-	else:
-		ts = timeSpanTranscribe(sequence)
-
-		# Filled is equally spaced onsets, therefore all 1s in the time span transcribe of the sequence
-		if not(0 in ts ):
-			syncType = 1  
-		# Run is either starting with 1 and end with 0 in time span, or starting with 1 if next bar starts with 0
-		elif ts[0] == 1 and ts[-1] == 0:
-			syncType = 2		
-		elif followed_event ==0 and ts[0] == 1:
-			syncType = 2
-		# Upbeat requires next bars starting with 1 and at least the last event in time span is 1
-		elif followed_event == 1 and ts[-1] == 1: 
-			syncType = 3
-		# Syncopated start and end off-beat		
-		elif sequence[0] == 0:
-			syncType = 5
-		else:
-			print 'Error: un-recognizable syncopation type ', sequence
-			syncType = None
-
-	return syncType
-
-def createHierarchy(rhythm, time_sig, bar):
-	h = [] # A list of lists to record syncopation type(s) in each hierarchy level across bars
-	s_bar = subdivide (rhythm, bar)
-	
-	if '4/4' in time_sig:
-		 
-		for i in range(bar):
-			bar_level = s_bar[i]
-
-			if i == bar -1:
-				followed_event = s_bar[0][0]
-			else:
-				followed_event = s_bar[i+1][0]
-			
-			h. append ( [syncType(bar_level, followed_event)] )  # Indentify syncopation type at bar level 
-
-		if checkContinue(rhythm, 2*bar):
-
-			for i in range(bar):
-				s_halfBar = subdivide (s_bar[i], 2)
-				halfBar_h = []
-
-				for j in range(2):
-					halfBar_level = s_halfBar[j]
-
-					if j == 1:
-						followed_event = s_halfBar[0][0]
-					else:
-						followed_event = s_halfBar[j+1][0]
-
-					halfBar_h.append (syncType (halfBar_level , followed_event))
-
-				h.append(halfBar_h)
-
-			if checkContinue(rhythm, 4*bar):
-			
-				for i in range(bar):
-					s_quarter = subdivide (s_bar[i], 4)
-					quarter_h = []
-
-					for j in range(4):			
-						quarter_level = s_quarter [j]
-
-						if j == 3:
-							followed_event = s_quarter[0][0]
-						else:
-							followed_event = s_quarter[j+1][0]
-
-						quarter_h. append ( syncType (quarter_level , followed_event) )	# quarter note level
-
-					h.append(quarter_h)
-
-				if checkContinue( rhythm, 8*bar):
-					
-					for i in range(bar):
-						s_eighth = subdivide (s_bar[i], 8)
-						eighth_h = []
-
-						for j in range(8):
-							eighth_level = s_eighth [j]
-
-							if j == 7:
-								followed_event = eighth_level[0][0]
-							else:
-								followed_event = eighth_level[j+1][0]
-
-							eighth_h.append (syncType (eighth_level, followed_event) )
-					
-						h.append(eighth_h)
-
-
-	elif '3/4' in time_sig:	
-		size_bar = len(s_bar)
-		for i in range(size_bar):
-			bar_level = s_bar[i]
-
-			quarter_h = []
-			eighth_h = []
-			
-			if i == size_bar -1:
-				followed_event = s_bar[0][0]
-			else:
-				followed_event = s_bar[i+1][0]
-			
-			h. append ( [syncType(bar_level, followed_event)] )  # Indentify syncopation type at bar level 
-
-			if checkContinue(bar_level, 3):
-				s_quarter = subdivide (bar_level, 3)
-				size_quarter = len(s_quarter)
-				
-				for j in range(size_quarter):
-					quarter_level = s_quarter [j]
-
-					if j == size_quarter -1:
-						followed_event = s_quarter[0][0]
-					else:
-						followed_event = s_quarter[j+1][0]
-
-					quarter_h. append ( syncType (quarter_level , followed_event) )	# quarter note level
-
-					if checkContinue( quarter_level, 2):
-						s_eighth = subdivide (quarter_level, 2)	# eighth note level
-						size_eighth = len(s_eighth)
-
-						for k in range(size_eighth):
-							eighth_level = s_eighth [k]
-
-							if k == size_eighth - 1:
-								followed_event = eighth_level[0][0]
-							else:
-								followed_event = eighth_level[k+1][0]
-
-							eighth_h.append (syncType (eighth_level, followed_event) )
-						h.append(eighth_h)
-
-				h.append(quarter_h)
-		
-
-	elif '6/8' in time_sig:		
-		for i in range(bar):
-			bar_level = s_bar[i]
-			
-			if i == bar -1:
-				followed_event = s_bar[0][0]
-			else:
-				followed_event = s_bar[i+1][0]
-			
-			h. append ( [syncType(bar_level, followed_event)] )  # Indentify syncopation type at bar level 
-			
-		if checkContinue(rhythm, 2*bar):
-			
-			for i in range(bar):
-				s_halfBar = subdivide (s_bar[i], 2)
-				halfBar_h = []
-
-				for j in range(2):
-					halfBar_level = s_halfBar [j]
-					
-					if j == 1:
-						followed_event = s_halfBar[0][0]
-					else:
-						followed_event = s_halfBar[j+1][0]
-
-					halfBar_h. append ( syncType (halfBar_level , followed_event) )	
-
-				h.append(halfBar_h)
-
-			if checkContinue( rhythm, 6*bar):
-				
-				for i in range(bar):	
-					s_eighth = subdivide (s_bar[i], 6)	# eighth note level
-					eighth_h = []
-					
-					for j in range(6):
-						eighth_level = s_eighth [j]
-
-						if j == 5:
-							followed_event = eighth_level[0][0]
-						else:
-							followed_event = eighth_level[j+1][0]
-
-						eighth_h.append (syncType (eighth_level, followed_event) )
-					
-					h.append(eighth_h)
-
-	else:
-		print 'This time signature is not defined. Choose between 4/4, 3/4 or 6/8'
-
-	return h
-
-
-def pressing(rhythm, time_sig, category, bar):
-	sync_oneLevel = []
-
-	if 'poly' in category:
-		return -1
-
-	else:
-		hierarchy = createHierarchy(rhythm, time_sig, bar)
-		# print 'h', hierarchy
-
-		if len(hierarchy) != 0:
-			for h in hierarchy:
-				sync_oneLevel.append (sum(h) / float(len(h)) )
-			
-			# Syncopation is the sum of averaged syncopation values of each level
-			syncopation = sum (sync_oneLevel)
-			
-		else:
-			syncopation = 0
-
-		return syncopation 
-
-
-# Retrieve the stimuli
-# f = file('stimuli.txt')
-f = file('stimuli_34only.txt')
-
-#Calculate syncopation for each rhythm pattern
-while True:
-	line = f.readline().split(';')
-	if len(line) == 1:
-		 break
-	else:
-		sti_name = line[0]
-		rhythmString = line[1].split()
-		time_sig = line[2]
-		category = line[3]
-		bar = int([4])
-		
-		rhythm = map(int,rhythmString[0].split(','))
-
-		print sti_name, pressing(rhythm, time_sig, category, bar)
-
+	return syncopation