view dml-cla/python/tuning_stats.py @ 0:718306e29690 tip

committing public release
author Daniel Wolff
date Tue, 09 Feb 2016 21:05:06 +0100
parents
children
line wrap: on
line source
# Part of DML (Digital Music Laboratory)
# Copyright 2014-2015 Daniel Wolff, City University; Steven Hargreaves; Samer Abdallah, University of London
 
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
# 
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
# 
# You should have received a copy of the GNU General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA

# -*- coding: utf-8 -*-
__author__='wolffd, hargreavess, abdallahs'

# this script derives standard statistics for tuning frequency, 
# in particular:
# average
# standard deviation
# histogram

from rdflib import RDF, RDFS
from csvutils import *
from aggregate import *
from n3Parser import get_rdf_graph_from_n3 
import numpy

def transcription_from_csv(filename):
    """Read a note transcription from a CSV file.

    Each row is assumed to hold: time, duration, pitch, velocity, note_name.
    The rows are processed through csv_map_rows() (second argument 5,
    presumably the expected number of columns) and turned into
    (time, duration, pitch, note_name) tuples; the velocity column is dropped.
    """
    def to_note(row):
        # numeric columns are parsed as floats, the note name stays as read
        onset = float(row[0])
        length = float(row[1])
        pitch = float(row[2])
        return (onset, length, pitch, row[4])

    return csv_map_rows(filename, 5, to_note)

def transcription_from_n3(filename):
    """Read a note transcription from an N3 (RDF) file.

    Returns a list of (time, duration, pitch, note_name) tuples, i.e. the
    same layout that transcription_from_csv() produces, so that both parsers
    can be used interchangeably through parser_table.
    """
    graph = get_rdf_graph_from_n3(filename)
    # NOTE(review): `subject` is not defined in this file; presumably it is
    # provided by one of the star imports -- confirm. If it is not,
    # graph.subjects(RDF.type, af_ns.Note) may have been intended.
    notes = [(event_time_prop(graph, ev, tl_ns.beginsAt),
              event_time_prop(graph, ev, tl_ns.duration),
              # the first space-separated token of the feature value is used
              # as the pitch; it is converted to float (as the CSV parser
              # does) because fold_pitch() multiplies it by a float
              float(graph.value(ev, af_ns.feature).split(' ')[0]),
              graph.value(ev, RDFS.label))
             for ev in subject((RDF.type, af_ns.Note))]
    # hand the parsed notes back to the caller (without this the function
    # would implicitly return None)
    return notes

# Input parsers keyed by format name; handed to decode_tagged() by
# per_file() and aggregate() to read each input item.
parser_table = dict(
    csv=transcription_from_csv,
    n3=transcription_from_n3,
)

# Number of semitones by which a note with the given letter is raised in
# fold_pitch() (D + 7 semitones and E + 5 semitones both land on A).
offset = {'A': 0, 'D': 7, 'E': 5}

def fold_pitch(freq, name):
    """Scale freq as if the note called name were moved to octave 4 and
    raised by offset[letter] semitones.

    name[0] is the note letter (must be a key of offset) and name[1] the
    octave digit. For 'A4' the frequency is returned unchanged.
    """
    semitones = offset[name[0]]
    octave = int(name[1])
    # one octave is a factor of 2, one semitone a factor of 2**(1/12)
    return freq * 2 ** (4 + semitones / 12.0 - octave)
def tuning_note(n):
    """Return True if the note name n denotes a tuning note.

    A tuning note has the letter A, E or D as its first character and the
    octave digit 3, 4 or 5 as its second character. Names that are missing
    (None) or shorter than two characters are not tuning notes; they yield
    False instead of raising.
    """
    if n is None or len(n) < 2:
        return False
    return n[1] in ('3', '4', '5') and n[0] in ('A', 'E', 'D')


def per_file(inputs):
    """Compute tuning statistics per input first, then across inputs.

    For every input the duration-weighted mean of the folded tuning-note
    frequencies and a duration-weighted histogram (100 bins, 390..490) are
    computed. The returned dict holds, under 'result', the mean and standard
    deviation of the per-input means and the average of the per-input
    histograms, and under 'stats' whatever for_each() returned.

    NOTE(review): the key here is 'std-dev' whereas aggregate() uses
    'std_dev' -- confirm which spelling consumers expect.
    """
    clip_means = []
    clip_hists = []
    hist_edges = []

    def accum(item):
        # collect (duration, folded frequency) for every tuning note
        # (as selected by tuning_note) of this input
        tuning = []
        for note in decode_tagged(parser_table, item):
            if tuning_note(note[3]):
                tuning.append((note[1], fold_pitch(note[2], note[3])))

        if not tuning:
            print_status("No notes for "+str(item))
            return

        # split into frequency and duration columns
        freq = numpy_column(tuning, 1)
        dur = numpy_column(tuning, 0)

        # statistics for this input only; statistics over all inputs
        # are derived after the loop
        clip_mean, _clip_std = weighted_stats(freq, weights=dur)
        (counts, edges) = histogram(freq, 100, 390, 490, weights=dur)

        clip_means.append(clip_mean)
        clip_hists.append(counts)
        # the bin edges are the same for every input, keep the first set
        if not hist_edges:
            hist_edges.extend(edges)

    st = for_each(inputs, accum)

    avg, std = stats(numpy.array(clip_means, dtype=float))

    # !!! open question carried over from the original author: does
    # averaging the per-input histograms make any sense?
    hist_mean, hist_std = stats(numpy.array(clip_hists, dtype=float))

    result = {'mean': avg,
              'std-dev': std,
              'hist': continuous_hist(hist_edges, hist_mean)}
    return {'result': result, 'stats': st}


def aggregate(inputs):
    """Compute tuning statistics over the pooled notes of all inputs.

    The tuning notes of every input are gathered into one list; the
    duration-weighted mean, standard deviation and histogram (100 bins,
    390..490) of their folded frequencies are returned under 'result',
    and whatever for_each() returned under 'stats'.
    """
    pooled = []  # (duration, folded frequency) of all tuning notes seen

    def accum(item):
        # add this input's tuning notes (as selected by tuning_note)
        for note in decode_tagged(parser_table, item):
            if tuning_note(note[3]):
                pooled.append((note[1], fold_pitch(note[2], note[3])))

    # run the accumulation over every input
    run_stats = for_each(inputs, accum)

    # split into duration and frequency columns
    dur = numpy_column(pooled, 0)
    freq = numpy_column(pooled, 1)

    # duration-weighted mean / standard deviation and histogram
    avg, std = weighted_stats(freq, weights=dur)
    counts, edges = histogram(freq, 100, 390, 490, weights=dur)

    result = {'mean': avg,
              'std_dev': std,
              'hist': continuous_hist(edges, counts)}
    return {'result': result, 'stats': run_stats}

def numpy_column(data, datapos):
    """Return column datapos of the row sequence data as a float numpy array."""
    column = []
    for row in data:
        column.append(row[datapos])
    return numpy.array(column, dtype=float)

def histogram(colu, nbins, lb, ub, weights=None):
    """Calculate a histogram of colu, normalised to sum to 1.

    colu:    values to bin
    nbins:   number of bins
    lb:      lower bound of the binned range
    ub:      upper bound of the binned range
    weights: optional per-value weights (same length as colu); when omitted
             every value counts once

    Returns (counts, edges) as plain lists; edges has nbins + 1 entries.
    If no weight falls inside [lb, ub] the counts are all 0.0 rather than
    NaN, so that such a histogram does not poison later averaging.
    """
    counts, edges = numpy.histogram(colu, bins=nbins, range=[lb, ub],
                                    weights=weights)
    # unweighted counts are integers; force float so the normalisation
    # below is a true division on every Python version
    counts = counts.astype(float)
    total = counts.sum()
    if total != 0:
        counts = counts / total

    return (counts.tolist(), edges.tolist())

def stats(counts):
    """Calculate unweighted mean and standard deviation along axis 0.

    Used for the per-input means and the per-input histograms. Both results
    are converted to plain Python values (a float for 1-D input, a list for
    2-D input).
    """
    avg = numpy.average(counts, axis=0).tolist()
    # converted like avg so that both results have the same plain type
    std = numpy.std(counts, axis=0).tolist()
    return (avg, std)
    
def weighted_stats(colu, weights=None):
    """Calculate the weighted mean and standard deviation of numerical input.

    colu:    values (converted to a float numpy array)
    weights: optional per-value weights (same length as colu); when omitted
             all values are weighted equally

    Returns (avg, std) where std is the square root of the weighted mean
    squared deviation from avg.
    """
    colu = numpy.asarray(colu, dtype=float)
    avg = numpy.average(colu, axis=0, weights=weights)
    # weighted standard deviation
    std = numpy.sqrt(numpy.average((colu - avg) ** 2, axis=0, weights=weights))
    # TODO: a weighted median could be added, e.g. via
    # https://pypi.python.org/pypi/wquantiles
    return (avg, std)