Mercurial > hg > movesynth
view tim_grid_mapper/grid_mapper.py @ 8:6df5e29a65f9
Updated OSC spec for communication between Ableton and Joe's synth.
author | Tim MB <tim.murraybrowne@eecs.qmul.ac.uk> |
---|---|
date | Thu, 17 Feb 2011 13:57:51 +0000 |
parents | 5f9ad838d417 |
children | d68f98883e63 |
line wrap: on
line source
'''
grid_mapper.py - maintained by Tim.

This module implements a mapping from person positions (id, x, y, z) to
generate pitch, velocities and channels for Joe's synthesiser and controller
data for Ableton.
'''

import traceback
from threading import Thread
from time import sleep

from OSC import ThreadingOSCServer, OSCClient, OSCMessage, OSCClientError


#### PUBLIC OPTIONS ####
num_channels = 3  # number of instruments (and max number of people who can
                  # make noise).

#### OSC OPTIONS - THESE NEED TO BE SET MANUALLY ####
my_port = 12344  # to receive OSC messages
joe = ('localhost', "THIS_MUST_BE_SET")
ableton = ('localhost', "THIS_MUST_BE_SET")

### Constants for grid mapping:

# The range of values that the input coordinates and output values may take:
# (ranges are inclusive)
MIN = {
    'x': 0.,
    'y': 0.,
    'z': 0.,
    'pitch': 0,
    'velocity': 0,  # person_handler() clamps velocity, so it needs a range too
    'cc1': 0,
    'cc2': 0,
}
MAX = {
    'x': 1.,
    'y': 1.,
    'z': 1.,
    'pitch': 15,
    'velocity': 127,
    'cc1': 127,
    'cc2': 127,
}

#### PRIVATE VARIABLES ####

# mapping from channel to the currently playing note (pitch values) or None
# initialize each channel to None:
currently_playing = [None] * num_channels

# mapping from personId to time of last update
last_update_times = {}

# OSC OBJECTS
server = None  # Initialised when start() is run
client = OSCClient()


### MAPPING

def send_to_joe(data, address='/test'):
    '''Sends `data` to Joe directly as an OSC message.

    `data` is a list of values appended to the message in order; `address`
    is the OSC address the message is sent to. The message goes to the
    module-level `joe` (host, port) pair.
    '''
    message = OSCMessage(address)
    message.extend(data)
    client.sendto(message, joe)
    print_d('==OSC Output to Joe %s:==\n    %s' % (joe, data))


def flush(channel):
    '''Sends note off messages for whatever note is currently playing on
    `channel`, then marks the channel as silent.

    Does nothing if no note is recorded as playing on `channel`.
    '''
    pitch = currently_playing[channel]
    # Compare against None explicitly: pitch 0 is a valid note (MIN['pitch']).
    if pitch is None:
        return
    send_to_joe([
        'Turn off note %i on channel %i' % (pitch, channel),
                # first string is ignored
        pitch,    # pitch to turn off
        0,        # 0 to turn note off
        127,      # doesn't matter for note-off (but never send 0)
        channel,
    ])
    # Only forget the note once the note-off has actually been sent.
    currently_playing[channel] = None


def _clamp_to_int(value, key):
    '''Rounds `value` and constrains it to [MIN[key], MAX[key]] as an int.'''
    return int(max(min(round(value), MAX[key]), MIN[key]))


def person_handler(address, tags, data, client_address):
    ''' Handles OSC input matching the 'person' tag.

    `data` should be in form [person_id, x, y, z]. Messages of any other
    length, or whose person_id does not correspond to a channel in
    `currently_playing`, are reported via print_d() and ignored.

    The cc1/cc2 values produced by grid_map() are not forwarded anywhere
    by this handler.
    '''
    if len(data) != 4:
        print_d('Ignoring malformed person message: %r' % (data,))
        return

    pitch, velocity, channel, cc1, cc2 = grid_map(*data)

    if not 0 <= channel < num_channels:
        print_d('Ignoring person %r: no such channel.' % (channel,))
        return

    ## Format data for Joe - done using Specification.txt on 2011-02-15
    # constrain and round off pitch and velocity
    pitch = _clamp_to_int(pitch, 'pitch')
    velocity = _clamp_to_int(velocity, 'velocity')

    if velocity and pitch == currently_playing[channel]:
        return  # keep playing current note

    # otherwise turn note off:
    flush(channel)

    if velocity:  # if there is a new note to play
        send_to_joe([
            'Turn on note %i on channel %i' % (pitch, channel),
                    # first value is string which is ignored
            pitch,
            1,      # 1 to turn note on
            velocity,
            channel,
        ])
        # Remember the note so it can be held or turned off later.
        currently_playing[channel] = pitch


def grid_map(person_id, x, y, z):
    '''This function maps from a person's location to MIDI data
    returning a tuple (pitch, velocity, channel, cc1, cc2).

    z is mapped linearly onto the pitch range, x onto cc1 and y onto cc2
    (cc1 and cc2 are intended to be sent straight to Ableton and determined
    by a particular synth).

    NOTE(review): earlier documentation said pitch gets higher as the person
    moves closer to the Kinect (i.e. z decreases), but the mapping below
    makes pitch increase with z - confirm which is intended.

    NB. channel == person_id and velocity==0 when note is off.

    Midi-Velocity is currently unimplemented (always 127) but will use
    Person-velocity data when that becomes available.

    This function does not guarantee that the output will be in range if the
    input goes out of range.
    '''
    pitch = round(interpolate(z,
                              MIN['z'], MAX['z'],
                              MIN['pitch'], MAX['pitch']))
    cc1 = round(interpolate(x,
                            MIN['x'], MAX['x'],
                            MIN['cc1'], MAX['cc1']))
    cc2 = round(interpolate(y,
                            MIN['y'], MAX['y'],
                            MIN['cc2'], MAX['cc2']))
    velocity = 127
    return (pitch, velocity, person_id, cc1, cc2)


def interpolate(x, a, b, A, B):
    ''' Interpolates x from the range [a, b] to the range [A, B].

    Interpolation is linear. From [a,b] to [A,B] this would be:
    (B-A)*(x-a)/(b-a) + A

    Raises ZeroDivisionError if a == b.
    '''
    # float() keeps the division exact even when every argument is an int.
    return (B - A) * (x - a) / float(b - a) + A


def print_d(string):
    '''Function to print out debug messages. Disable if processing power is
    in demand.
    '''
    print(string)


#### CONTROL

def start():
    '''Set up OSC servers and start the program running.

    Prompts on the console for any port that is still set to the
    "THIS_MUST_BE_SET" placeholder, then starts the OSC server on `my_port`
    in a separate thread.
    '''
    global joe, ableton, server
    if joe[1] == "THIS_MUST_BE_SET":
        # int() makes the port numeric whether input() returns a string or
        # an already-evaluated number.
        joe_port = int(input("Enter port number on %s for Joe's synth software: " % joe[0]))
        joe = (joe[0], joe_port)
    if ableton[1] == "THIS_MUST_BE_SET":
        ableton_port = int(input("Enter port number on %s for Ableton: " % ableton[0]))
        ableton = (ableton[0], ableton_port)

    server = ThreadingOSCServer(('localhost', my_port))
    # Register OSC callbacks:
    server.addMsgHandler('/person', person_handler)
    t = Thread(target=server.serve_forever)
    t.start()
    if server.running and t.is_alive():
        print('OSC Server running on port %i.' % my_port)
        print("Use 'stop()' to close it.")
    else:
        print('Error: Either server thread died or server is not reported as running.')


def stop():
    '''Close the OSC server.

    Does nothing if the server was never started.
    '''
    if not server:
        return
    server.close()
    sleep(0.3)  # give the server a moment to shut down before checking it
    if not server.running:
        print("\n\nSuccessfully closed the OSC server. Ignore a 'Bad file descriptor' Exception - there's nothing I can do about that.")
        print("Type 'quit()' to exit.")
    else:
        print('Error: server has been told to close but is still running.')


if __name__ == '__main__':
    start()
    # Minimal interactive console.
    # NOTE(review): under Python 2 input() evaluates what is typed, which is
    # how `stop()` / `quit()` take effect; under Python 3 it only returns the
    # raw string - confirm the target interpreter.
    while True:
        try:
            print(repr(input()))
        except EOFError:
            # stdin is closed: nothing more can be typed, so shut down
            # instead of looping on the same error forever.
            stop()
            break
        except Exception as e:
            print('Caught %s:' % repr(e))
            exception = e
            trace = traceback.format_exc()
            print('Exception saved as `exception`. Stack trace saved as `trace`.')