view cnmf.py @ 11:915c849b17ea

added arg in cnmf script to choose to run standard nmf
author mitian
date Mon, 18 May 2015 17:43:48 +0100
parents 294f66d285af
children c01fcb752221
line wrap: on
line source
"""
C-NMF method for segmentation, modified from here:

Nieto, O., Jehan, T., Convex Non-negative Matrix Factorization For Automatic
Music Structure Identification. Proc. of the 38th IEEE International Conference
on Acoustics, Speech, and Signal Processing (ICASSP). Vancouver, Canada, 2013.
"""

__author__ = "Oriol Nieto"
__copyright__ = "Copyright 2014, Music and Audio Research Lab (MARL)"
__license__ = "GPL"
__version__ = "1.0"
__email__ = "oriol@nyu.edu"

import numpy as np
import pymf

# Local stuff
from utils import SegUtil

# Algorithm params
h = 8               # Size of median filter for features in C-NMF
R = 15              # Size of the median filter for the activation matrix C-NMF
rank = 4            # Rank of decomposition for the boundaries
rank_labels = 6     # Rank of decomposition for the labels
R_labels = 6        # Size of the median filter for the labels

def cnmf(S, rank, niter=500):
    """(Convex) Non-Negative Matrix Factorization.

    Parameters
    ----------
    S: np.array(p, N)
        Features matrix. p row features and N column observations.
    rank: int
        Rank of decomposition
    niter: int
        Number of iterations to be used

    Returns
    -------
    F: np.array
        Cluster matrix (decomposed matrix)
    G: np.array
        Activation matrix (decomposed matrix)
        (s.t. S ~= F * G)
    """
    nmf_mdl = pymf.CNMF(S, num_bases=rank)
    nmf_mdl.factorize(niter=niter)
    F = np.asarray(nmf_mdl.W)
    G = np.asarray(nmf_mdl.H)
    return F, G

def nmf(S, rank, nither=500):
	nmf_mdl = pymf.NMF(S, num_bases=rank, niter=nither)
	nmf_mdl.factorize()
    F = np.asarray(nmf_mdl.W)
    G = np.asarray(nmf_mdl.H)
    return F, G
	
	
def most_frequent(x):
    """Returns the most frequent value in x."""
    return np.argmax(np.bincount(x))


def compute_labels(X, rank, R, bound_idxs, niter=300):
    """Computes the labels using the bounds."""

    X = X.T
    try:
        F, G = cnmf(X, rank, niter=niter)
    except:
        return [1]

    label_frames = filter_activation_matrix(G.T, R)
    label_frames = np.asarray(label_frames, dtype=int)

    # Get labels from the label frames
    labels = []
    bound_inters = zip(bound_idxs[:-1], bound_idxs[1:])
    for bound_inter in bound_inters:
        if bound_inter[1] - bound_inter[0] <= 0:
            labels.append(np.max(label_frames) + 1)
        else:
            labels.append(most_frequent(
                label_frames[bound_inter[0]:bound_inter[1]]))

    return labels


def filter_activation_matrix(G, R):
    """Filters the activation matrix G, and returns a flattened copy."""
    idx = np.argmax(G, axis=1)
    max_idx = np.arange(G.shape[0])
    max_idx = (max_idx, idx.flatten())
    G[:, :] = 0
    G[max_idx] = idx + 1
    G = np.sum(G, axis=1)
    G = SegUtil.median_filter(G[:, np.newaxis], R)
    return G.flatten()


def segmentation(X, rank=4, R=15, h=8, niter=300, CNMF=True):
    """
    Gets the segmentation (boundaries and labels) from the factorization
    matrices.

    Parameters
    ----------
    X: np.array()
        Features matrix (e.g. chromagram)
    rank: int
        Rank of decomposition
    R: int
        Size of the median filter for activation matrix
    niter: int
        Number of iterations for k-means
    bound_idxs : list
        Use previously found boundaries (None to detect them)
	CNMF : bool
		If True, use CNMF; otherwise use NMF
		
    Returns
    -------
    bounds_idx: np.array
        Bound indeces found
    labels: np.array
        Indeces of the labels representing the similarity between segments.
    """

    # Filter
    X = SegUtil.median_filter(X, M=h)
    X = X.T

    # Find non filtered boundaries
    bound_idxs = None
    while True:
        if bound_idxs is None:
            try:
                if CNMF: F, G = cnmf(X, rank, niter=niter)
				else: F, G = nmf(X, rank, niter=niter)
            except:
                return np.empty(0), [1]

            # Filter G
            G = filter_activation_matrix(G.T, R)
            if bound_idxs is None:
                bound_idxs = np.where(np.diff(G) != 0)[0] + 1

        if len(np.unique(bound_idxs)) <= 2:
            rank += 1
            bound_idxs = None
        else:
            break

    return G, bound_idxs