Mercurial > hg > simscene-py
view simscene.py @ 8:0f8d8ff0ece7
fixed a bug in calculating end times
author | Emmanouil Thoefanis Chourdakis <e.t.chourdakis@qmul.ac.uk> |
---|---|
date | Tue, 03 Oct 2017 13:56:15 +0100 |
parents | f90eba90a78f |
children | 53ee437b5ba3 |
line wrap: on
line source
#!/bin/python # -*- coding: utf-8 -*- # For licensing please see: LICENSE # Copyright (c) Emmanouil Theofanis Chourdakis <e.t.chourdakis@qmul.ac.uk> # Argparse import argparse # Logging import logging # Pandas import pandas as pd # Numpy import numpy as np import sys # Glob import glob import random # Librosa import librosa import librosa.display import librosa.output # Matplotlib import matplotlib.pyplot as plt import matplotlib.patches as patches from cycler import cycler # Tabulate from tabulate import tabulate def _N(t, sr=44100): """ Helper function: Converts time to samples """ return int(t*sr) def read_events_file(fname): if fname[-3:].lower() == 'xls': df = pd.read_excel(fname) elif fname[-4:].lower() == 'json': df = pd.read_json(fname) elif fname[-3:].lower() in ['txt', 'csv']: with open(fname) as f: header = f.readline() s = f.readline() f.seek(0,0) if ',' in s: sep = ',' elif '\t' in s: sep = '\t' else: sep = ' ' if sep in header: logging.warning('Probably no header or malformed .csv. Will try to parse it raw.') df = pd.read_csv(f, header=None, sep=sep) else: df = pd.read_csv(f, sep=sep) df = None df.columns = ['label','sampleid','ebr','ebr_stddev','mean_time_between_instances','time_between_instances_stddev','start_time','end_time','fade_in_time','fade_out_time'] logging.info('Using input:\n'+tabulate(df, headers='keys', tablefmt='psql')) return df def read_backgrounds_file(fname): if fname[-3:].lower() == 'xls': df = pd.read_excel(fname) elif fname[-4:].lower() == 'json': df = pd.read_json(fname) elif fname[-3:].lower() in ['txt', 'csv']: with open(fname) as f: header = f.readline() s = f.readline() f.seek(0,0) if ',' in s: sep = ',' elif '\t' in s: sep = '\t' else: sep = ' ' if sep in header: logging.warning('Probably no header or malformed .csv. Will try to parse it raw.') df = pd.read_csv(f, header=None, sep=sep) else: df = pd.read_csv(f, sep=sep) df = None df.columns = ['label','sampleid','snr'] logging.info('Using input:\n'+tabulate(df, headers='keys', tablefmt='psql')) return df def read_annotations_file(fname): if fname[-3:].lower() == 'xls': df = pd.read_excel(fname) elif fname[-4:].lower() == 'json': df = pd.read_json(fname) elif fname[-3:].lower() in ['txt', 'csv']: with open(fname) as f: header = f.readline() s = f.readline() f.seek(0,0) if ',' in s: sep = ',' elif '\t' in s: sep = '\t' else: sep = ' ' if sep in header: logging.warning('Probably no header or malformed .csv. Will try to parse it raw.') df = pd.read_csv(f, header=None, sep=sep) df.columns = ['start', 'stop', 'class'] else: df.columns = ['start', 'stop', 'class'] df = pd.read_csv(f, sep=sep) df = None logging.info('Using input:\n'+tabulate(df, headers='keys', tablefmt='psql')) return df def run_demo(): print("TODO: Implement run_demo()") def fade(x, fade_in, fade_out, sr=44100): """ Creates a fade-in-fade-out envelope for audio array x. """ fade_in_samples = int(fade_in*sr) fade_out_samples = int(fade_out*sr) outp = np.ones_like(x) for n in range(fade_in_samples): outp[n] = n*1./fade_in_samples for n in range(fade_out_samples): outp[len(outp)-fade_out_samples+n] = 1-1./fade_out_samples*n return outp*x def simscene(input_path, output_path, scene_duration, score_events, score_backgrounds, **kwargs): logging.info('simscene() is not yet implemented') SR = 44100 # Samplerate. Should probably not be hardcoded events_df = score_events # Create empty numpy array scene_arr = np.zeros(int(scene_duration*SR)) if 'end_cut' in kwargs: end_cut = kwargs['end_cut'] else: end_cut = False if 'figure_verbosity' in kwargs: figure_verbosity = kwargs['figure_verbosity'] else: figure_verbosity = 0 if 'image_format' in kwargs: image_format = kwargs['image_format'] else: image_format = 'png' # Stores the starting and ending times of every track for visualization # purposes scene_starting_times = [] scene_ending_times = [] for n in range(len(events_df)): # Get label of track label = str(events_df['label'].loc[n]) candidates = glob.glob('{}/event/{}*.wav'.format(input_path, events_df['sampleid'].loc[n])) chosen_fname = random.sample(candidates,1)[0] wav, sr = librosa.load(chosen_fname, sr=SR) assert sr == SR, "Sample rate of individual tracks must be 44100Hz (Failed: `{}' with sample rate: {} )".format(chosen_fname, sr) # Apply a fader envelope fade_in_time = float(events_df['fade_in_time'].loc[n]) fade_out_time = float(events_df['fade_out_time'].loc[n]) wav = fade(wav, fade_in_time, fade_out_time) # Mean time between instances \mu. mean_time_between_instances = events_df['mean_time_between_instances'].loc[n] track_end_time = events_df['end_time'].loc[n] # Track array track_arr = np.zeros(int(scene_duration*SR)) #If \mu is -1, then play the event only once. if mean_time_between_instances == -1: track_arr[_N(events_df['start_time'].loc[n]):_N(events_df['start_time'].loc[n])+len(wav)] += wav else: # If 0, then start next sample after this one (set it to the duration of the sample) if mean_time_between_instances == 0: mean_time_between_instances = len(wav)/float(SR) # Store the successive starting and ending times of the events (given e.g. the model) # in the following lists. start_times = [events_df['start_time'].loc[n]] end_times = [start_times[-1]+len(wav)/float(SR)] # Start with the first time in the list new_start_time = start_times[-1] new_end_time = end_times[-1] # Until the scene is full while new_start_time < track_end_time: offset = float(mean_time_between_instances) +\ float(events_df['time_between_instances_stddev'].loc[n]*np.random.randn()) logging.debug(offset) new_start_time += offset new_end_time += offset # Only exception is if we have set the 'end_cut' flag # and the end time of the event surpasses the end time # of the track if end_cut and new_end_time > track_end_time: break else: start_times.append(new_start_time) end_times.append(new_end_time) for t in start_times: # We need to be careful with the limits here # since numpy will just ignore indexing that # exceeds the size of the array begin = min(_N(t), len(track_arr)) end = min(len(track_arr), _N(t)+len(wav)) # Part of the wav to store part = wav[:end-begin] # If wav file was concatenated, fade out # quickly to avoid clicks if len(part) < len(wav) and len(part) > fade_out_time*SR: part = fade(part, 0, fade_out_time) track_arr[begin:end] += part scene_arr[:len(track_arr)] += track_arr if figure_verbosity > 0: plt.figure() plt.subplot(2,1,1) plt.title('`{}\' waveform and spectrogram'.format(label)) visible_track = track_arr[int(start_times[0]*SR):int(end_times[-1]*SR)] librosa.display.waveplot(visible_track,sr=SR) F = librosa.stft(visible_track) Fdb = librosa.amplitude_to_db(F) plt.subplot(2,1,2) librosa.display.specshow(Fdb, sr=SR, x_axis='time', y_axis='hz') plt.savefig('{}/{}.{}'.format(output_path, label, image_format)) scene_starting_times.append((label, start_times)) scene_ending_times.append((label, end_times)) if figure_verbosity > 0: plt.figure() plt.subplot(3,1,1) plt.title('Waveform and spectrogram for the full track') librosa.display.waveplot(scene_arr) F = librosa.stft(scene_arr) Fdb = librosa.amplitude_to_db(F) plt.subplot(3,1,2) librosa.display.specshow(Fdb, sr=SR, x_axis='time', y_axis='hz') ax = plt.subplot(3,1,3) ax.set_xlim([0,scene_duration]) ax.set_ylim([-0.5, len(scene_starting_times)-0.5]) # Get labels labels = [s[0] for s in scene_starting_times] plt.yticks(range(len(scene_starting_times)), labels) plt.rc('lines', linewidth=4) plt.rc('axes', prop_cycle=(cycler('color', ['r', 'g', 'b', 'y']) + cycler('linestyle', ['-', '--', ':', '-.']))) for n in range(len(scene_starting_times)): label = scene_starting_times[n][0] start_times = scene_starting_times[n][1] end_times = scene_ending_times[n][1] for m in range(len(start_times)): plt.axhline(float(n), start_times[m], start_times[m]+1.0) # for m in range(len(start_times)): # plt.text( # start_times[m], # n, # label, # size=9,ha='center',va='center', # bbox=dict(boxstyle='square', ec=(1., 0.5, 0.5), fc=(1., 1-n/float(len(scene_starting_times)), n/float(len(scene_starting_times)))), # ) plt.savefig('{}/full-scene.{}'.format(output_path, image_format)) if figure_verbosity > 1: plt.show() if channel_mode == 'mono': librosa.output.write_wav('{}/full-scene.wav'.format(output_path), scene_arr, SR) return scene_arr def not_implemented(): print("TODO: not implemented") if __name__=="__main__": """ Main function, parses options and calls the simscene generation function or a demo. The options given are almost identical to Lagrange et al's simscene. """ argparser = argparse.ArgumentParser( description="SimScene.py acoustic scene generator", ) argparser.add_argument( 'input_path', type=str, help="Path of a directory containing wave files for sound backgrounds (in the `background' sub-directory) or events (in `event')" ) argparser.add_argument( 'output_path', type=str, help="The directory the generated scenes and annotations will reside." ) argparser.add_argument( 'scene_duration', type=float, help="Duration of scene in seconds", ) scene_duration = None argparser.add_argument( '-e', '--score-events', type=str, help="Score events file as a comma-separated text file (.csv, .txt), JSON (.json), or Excel (.xls) file" ) score_events = None argparser.add_argument( '-b', '--score-backgrounds', type=str, help="Score backgrounds file as a comma-separated text file (.csv, .txt), JSON (.json), or Excel (.xls) file" ) score_backgrounds = None argparser.add_argument( '-t', '--time-mode', type=str, help="Mode of spacing between events. `generate': values must be set for each track in the score files. `abstract': values are computed from an abstract representation of an existing acoustic scene. `replicate': values are replicated from an existing acousting scene.", choices=['generate', 'abstract', 'replicate'] ) time_mode = 'generate' argparser.add_argument( '-R', '--ebr-mode', type=str, help="Mode for Event to Background power level ratio. `generate': values must be set for each track in the score files. `abstract': values are computed from an abstract representation of an existing acoustic scene. `replicate': values are replicated from an existing acousting scene.", choices=['generate', 'abstract', 'replicate'] ) ebr_mode = 'generate' argparser.add_argument( '-A', '--annotation-file', type=float, help="If -R or -m are selected, this provides the source for sourcing the times or EBRs from ANNOTATION_FILE. ANNOTATION_FILE must be comma-separated text file (.csv, .txt), JSON (.json), or Excel (.xls)." ) annotation_file = None argparser.add_argument( '-a', '--audio-file', type=float, help="If -R or -m are selected, this provides the source for sourcing the times or EBRs from AUDIO_FILE. AUDIO_FILE must be a 44100Hz .wav file." ) audio_file = None argparser.add_argument( '-v', '--figure-verbosity', action='count', help="Increase figure verbosity. (Default) 0 - Don't save or display figures, 1 - Save pictures but do not display them, 2 - Save and display figures" ) figure_verbosity = None argparser.add_argument( '-x', '--image-format', help="Image format for the figures", choices=['png', 'jpg', 'pdf'] ) image_format = 'png' argparser.add_argument( '-C', '--channel-mode', type=str, help="number of audio channels contained in file. (Default) 'mono' - 1 channel (mono), 'classes' - As many channels as sound classes (events+textures), 'separate' - Same as 'classes', each channel is saved in a separate .wav file.", choices=['mono', 'classes', 'separate'] ) channel_mode = 'mono' argparser.add_argument( '-m', '--min-space', type=float, help="Minimum space allowed between successive events (seconds). If -1, then allow overlapping between events." ) min_space = None argparser.add_argument( '-c', '--end-cut', action='store_true', help="If the last sample ends after the scene ends then: if enabled, cut the sample to duration, else remove the sample." ) end_cut = None logging.basicConfig(level=logging.DEBUG) args = argparser.parse_args() if args.input_path: input_path = args.input_path logging.debug("Using `{}' as input path".format(input_path)) if args.output_path: output_path = args.output_path logging.debug("Saving to `{}'".format(output_path)) if args.scene_duration: if not (args.score_backgrounds or args.score_events): print("You must provide one of -e or -b") else: if args.image_format: image_format = args.image_format if args.channel_mode: channel_mode = args.channel_mode if args.ebr_mode: ebr_mode = args.ebr_mode if ebr_mode not in ['generate']: logging.warning("`{}' not yet implemented for EBR_MODE, using default.".format(ebr_mode)) ebr_mode = 'generate' if args.time_mode: time_mode = args.time_mode if time_mode not in ['generate']: logging.warning("`{}' not yet implemented for TIME_MODE, using default.".format(time_mode)) time_mode = 'generate' if args.annotation_file: annotations = read_annotations_file(args.annotation_file) scene_duration = float(args.scene_duration) if args.score_backgrounds: score_backgrounds = read_backgrounds_file(args.score_backgrounds) if args.score_events: score_events = read_events_file(args.score_events) if args.figure_verbosity: figure_verbosity = args.figure_verbosity simscene(input_path, output_path, scene_duration, score_events, score_backgrounds, time_mode=time_mode, ebr_mode=ebr_mode, channel_mode=channel_mode, annotation_file=annotation_file, audio_file=audio_file, figure_verbosity=figure_verbosity, min_space=min_space, end_cut=end_cut, image_format=image_format)