Mercurial > hg > mood-conductor
view visualclient/midiclient.py @ 30:4a7fde8ff0fd
added MIDI client made during the Barbican hack day
author | gyorgyf |
---|---|
date | Mon, 29 Apr 2013 15:35:28 +0100 |
parents | |
children |
line wrap: on
line source
#!/usr/bin/env python # encoding: utf-8 """ visclient.py Created by George Fazekas on 2012-06-17. Copyright (c) 2012 . All rights reserved. """ import sys,os,math,time,copy import pygame as pg from pygame.locals import * import httplib as ht from pygame import midi import gradients from gradients import genericFxyGradient from threading import Thread from random import random import numpy as np import colorsys as cs NOTE_OFF_CH0 = [[[0xb0,0x7b,0],0]] # 1 HONKY TONK 0x1 / 0x4 (5) # 2 SITAR 0x9 / 0x0 (1) # 3 Melancholia # 4 Rock GITAR 0x3 / 0x2C (45) IMAP = {1 : (0x1,0x4), 2: (0x9,0x0), 3:(0x8,0xb), 4:(0x3,0x2c) } # from pytagcloud import create_tag_image, make_tags # from pytagcloud.lang.counter import get_tag_counts # YOUR_TEXT = "A tag cloud is a visual representation for text data, typically\ # used to depict keyword metadata on websites, or to visualize free form text." # # tags = make_tags(get_tag_counts(YOUR_TEXT), maxsize=120) # # create_tag_image(tags, 'cloud_large.png', size=(900, 600), fontname='Lobster') scol = (0,255,0,255) ecol = (0,0,0,255) # X,Y=1140,900 # X,Y = 600,400 X,Y = 800,600 # Fullscreen resolution: # XF,YF = 1280,900 # XF,YF = 1440,900 XF,YF = 1344,900 # display calibrated # detect display resolution import subprocess screenres = subprocess.Popen('xrandr | grep "\*" | cut -d" " -f4',shell=True, stdout=subprocess.PIPE).communicate()[0] screenres = map(lambda x: int(x.strip()), screenres.split('x')) XF,YF = screenres print "Screen resolution: ",XF,YF NBLOBS = 18 BLOBSIZE = 25 G=110 FADE = 15 DIST = 0.15 # blob equivalence tolerance FRAMERATE = 60 # Connection: # IP = "127.0.0.1:8030" # IP = "192.168.2.158:8030" IP = "138.37.95.215" HTTP_TIMEOUT = 3 SERVER_UPDATE_INTERVAL = 0.8 class Indicator(object): off_color = pg.Color(110,0,0) on_color = pg.Color(0,120,0) def __init__(self,bg,pos): self.visible = True self.ison = True self.x,self.y = pos self.xs = int(self.x * X) self.ys = int(Y - (self.y * Y)) self.c = self.off_color self.size = 6 self.bg = bg def reinit(self,bg): self.bg = bg self.xs = int(self.x * X) self.ys = int(Y - (self.y * Y)) def draw(self): if self.visible : pg.draw.circle(self.bg, self.c, (self.xs,self.ys),self.size,0) def toggle(self): if self.ison == True : self.off() else : self.on() return self def on(self): self.c = self.on_color self.ison = True return self def off(self): self.c = self.off_color self.ison = False return self class Blob(object): def __init__(self,bg,x,y,color=(255,255,255),mood=None,fade=FADE): # print x,y self.x = x self.y = y self.quadrant = self.get_quadrant_number(x,y) self.xs = x * X self.ys = Y - (y * Y) self.bg = bg self.size = BLOBSIZE self.time = time.time() self.alpha = 255 self.c = color self.count = 1 self.visible = True self.FADE = fade if mood and mood.color : self.c = mood.color def get_quadrant_number(self,x,y): if x > 0.5 and y > 0.5 : return 1 if x > 0.5 and y < 0.5 : return 2 if x < 0.5 and y < 0.5 : return 3 if x < 0.5 and y > 0.5 : return 4 def __cmp__(self,other): d = math.sqrt( math.pow((self.x-other.x),2) + math.pow((self.y-other.y),2) ) if d < DIST : return 0 else : return -1 def draw(self): if not self.visible : return d=int(self.size) self.bg.blit(gradients.radial(self.size, (self.c[0],self.c[1],self.c[2],self.alpha), (0,0,0,self.alpha)), (self.xs-d,self.ys-d)) self.alpha = 255 - int(self.age()*self.FADE) if self.alpha < 5 : self.alpha = 1 self.visible = False def age(self): return time.time() - self.time def increment(self,count): self.time = time.time() self.count = count # self.size = int(BLOBSIZE * int(self.count/1.5)) self.to = int(BLOBSIZE * int(self.count/1.5)) self.start_animate() def get_distance(self,x,y): return math.sqrt( math.pow((self.x-x),2) + math.pow((self.y-y),2) ) def fade(self,fade): if not fade : self.alpha = 255 self.FADE = fade def reset_time(self): self.time = time.time() def start_animate(self): self.thread = Thread(target = self.animate) self.thread.daemon = True self.thread.start() def animate(self): '''Animate the way bubbles are grown.''' while self.size < self.to : self.size += 1 time_inc = 20.0 / (pow(1.2, self.to-self.size) * 200.0) time.sleep(0.02+time_inc) class Mood(): def __init__(self,word,x,y): self.word = word self.x = float(x) self.y = float(y) self.color = [] def get_distance(self,x,y): return math.sqrt( math.pow((self.x-x),2) + math.pow((self.y-y),2) ) class VisualClient(object): '''Main visualisation client.''' def __init__(self): # self.conn = ht.HTTPConnection("192.168.2.184:8030") # self.conn = ht.HTTPConnection("138.37.95.215") self.s_age = 10 self.s_dist = DIST self.s_ninp = 18 pg.init() # fontObj = pg.font.Font("freesansbold.ttf",18) white = ( 255, 255, 255) black = ( 0,0,0) self.fpsClock = pg.time.Clock() self.screen = pg.display.set_mode((X, Y)) pg.display.set_caption('Mood Conductor') self.bg = pg.Surface(self.screen.get_size()) self.bg = self.bg.convert() self.bg.fill((0,0,0)) pg.display.set_gamma(100.0) self.scol = (0,255,0,255) self.ecol = (0,0,0,255) coordstxt = "test" self.blobs = [] self.moods = [] self.read_mood_data() self.FADE = FADE self.indicators = { "conn":Indicator(self.bg,(0.98,0.02)), "update":Indicator(self.bg,(0.96,0.02)), "data":Indicator(self.bg,(0.94,0.02)), "receive":Indicator(self.bg,(0.92,0.02))} self.thread = None self.running = False self.fullscreen = False self.init_reconnect = False pg.midi.init() print pg.midi.get_device_info(4) self.midi_out = pg.midi.Output(4,0) self.midi_out.set_instrument(6) # self.midi_out.note_on(23,128,1) # self.midi_out.write([[[0xc0,0,0],20000],[[0x90,60,100],20500]]) self.active_quadrant = 1 self.prev_quadrant = 1 pass def read_mood_data(self): '''Read the mood position and color information form csv file.''' with open('moods.csv') as mf: data = mf.readlines()[1:] for line in data : l = line.split(',') mood = Mood(l[0],l[1],l[2]) self.moods.append(mood) with open('colors.txt') as ff: data = ff.readlines()[1:] data = map(lambda x: x.split(','),data) for mood in self.moods : d = cd = sys.float_info.max for colors in data : d = mood.get_distance(float(colors[0]),float(colors[1])) if d < cd : cd = d # mood.color = tuple(map(lambda x: int(pow(math.atan((float(x)/7.0)),12.5)),(colors[2],colors[3],colors[4]))) mood.color = self.set_color(tuple(map(lambda x: int(x),(colors[2],colors[3],colors[4])))) return True def set_color(self,color): '''Move to HLS colour space and manipulate saturation there.''' # TODO: ideally, we need a non-linear compressor of the lightness and saturation values r,g,b = map(lambda x: (1.0*x/255.0), color) h,l,s = cs.rgb_to_hls(r,g,b) s = 1.0 #1.0 - (1.0 / pow(50.0,s)) l = 1.0 - (1.0 / pow(20.0,l)) #0.6 r,g,b = map(lambda x: int(x*255), cs.hls_to_rgb(h,l,s)) return r,g,b def start_update_thread(self): '''Start the thread that reads data from the server.''' self.running = True self.thread = Thread(target = self.update_thread) self.thread.daemon = True self.thread.start() def stop_update_thread(self): '''Stop the thread and allow some time fot the connections to close.''' self.running = False try : self.thread.join(2) except : print "No update thread to join." def update_thread(self): '''The server update thread''' while self.running : try : self.update() # self.indicators["update"].visible = True except Exception, e: if str(e).strip() : print "Exception: ", str(e), type(e), len(str(e).strip()) self.indicators["conn"].off() # self.indicators["update"].visible = False time.sleep(SERVER_UPDATE_INTERVAL) def update(self): '''Update the blob list from the server. This should be in a thread.''' # indicate connection health by toggling an indictor self.indicators["update"].toggle() # delete invisibles for blob in self.blobs : if not blob.visible : self.blobs.remove(blob) # get new coordinates from the server self.conn.putrequest("GET","/moodconductor/result", skip_host=True) self.conn.putheader("Host", "www.isophonics.net") self.conn.endheaders() res = self.conn.getresponse() data = res.read() data = eval(data) if not data : self.conn.close() self.indicators["data"].toggle() return False for d in data : # coordstxt = "x:%s y:%s c:%s" %d x,y,count = d self.add_blob(x,y,count) self.indicators["receive"].toggle() self.conn.close() self.blobs = self.blobs[:NBLOBS] self.compute_quadrant_weighting() self.self_change_instrument() return True def compute_quadrant_weighting(self): quadrant_dict = {1:[],2:[],3:[],4:[]} # sort blobs into quadrants for blob in self.blobs : quadrant_dict[blob.quadrant].append(blob) # get weight for each quadrant_weights = [] for q,blob_list in quadrant_dict.iteritems() : quadrant_weights.append(sum(map(lambda x: x.alpha * x.size,blob_list))) self.active_quadrant = np.argmax(quadrant_weights) + 1 print self.active_quadrant return self.active_quadrant def self_change_instrument(self): if self.active_quadrant != self.prev_quadrant : self.prev_quadrant = self.active_quadrant args = IMAP[self.active_quadrant] self.send_midi_patch_change_GR20(*args) print args def add_blob(self,x,y,count=1): '''Insert a blob to the list of blobs''' # find mood correxponding to x,y cmood = None d = cd = sys.float_info.max for mood in self.moods : d = mood.get_distance(x,y) if d < cd : cd = d cmood = mood # create new blob or increase click count on existing one new = Blob(self.bg,x,y,mood=cmood,fade=self.FADE) if not new in self.blobs : self.blobs.insert(0,new) # self.send_midi() elif count > self.blobs[self.blobs.index(new)].count: self.blobs[self.blobs.index(new)].increment(count) pass def send_midi(self): # self.midi_out.write([[[0xc0,0,0],20000],[[0x90,60,100],20500]]) self.midi_out.write([[[0x90,60,100],0],[[0x90,60,100],500]]) def send_midi_patch_change_GR20(self,bank,instrument): '''PIANO = BANK 1, Patch 5.. bank starts from 1, patch starts from 0 so I have to substract one...''' self.midi_out.write([[[0xB0,0x0,bank],0],[[0xC0,instrument],100]]) # Control change (B) followed by patch change (C): # midi_out.write([[[0xB0,0x0,0x9],0],[[0xC0,0x0],100]]) # midi_out.write([[[0xB0,0x0,int(bank)],0],[[0xC0,int(instrument)-1],100]]) # 1 HONKY TONK 0x1 / 0x4 (5) # 2 SITAR 0x9 / 0x0 (1) # 3 Melancholia # 4 Rock GITAR 0x3 / 0x2C (45) def draw(self): self.bg.fill((0,0,0)) # self.bg.blit(gradients.radial(19, self.scol, self.ecol), (rect_x,rect_y)) l = copy.copy(self.blobs) l.reverse() for blob in l : blob.draw() # axis pg.draw.line(self.bg, (G,G,G), (int(X/2.0),0),(int(X/2.0),Y), 1) pg.draw.line(self.bg, (G,G,G), (0,int(Y/2.0)),(X,int(Y/2.0)), 1) # indicators for i in self.indicators.itervalues() : i.draw() def read_keys(self): '''Read keys''' for event in pg.event.get() : # Quit (event) if event.type == QUIT: self.midi_out.write(NOTE_OFF_CH0) self.quit() elif event.type == KEYDOWN : # Post Quit: Esc, q if event.key == K_ESCAPE or event.key == K_q : pg.event.post(pg.event.Event(QUIT)) # Reset: Space elif event.key == K_SPACE : self.blobs = [] # Random : r elif event.key == K_r : self.add_blob(random(),random(),count=5) # Connection : c elif event.key == K_c : self.init_reconnect = True self.indicators["conn"].off() # Fullscreen: f elif event.key == K_f : # pg.display.toggle_fullscreen() self.toggle_screen_mode() # Toggle fade: s elif event.key == K_s : if self.FADE : print "fade off" self.indicators["conn"].off() self.FADE = 0 for blob in self.blobs : blob.fade(0) else: print "fade on" self.indicators["conn"].on() self.FADE = 15 for blob in self.blobs : blob.fade(15) blob.reset_time() # inc age elif event.key == K_1 : self.s_age += 1 self.update_server_config(self.s_age,self.s_dist,self.s_ninp) # dec age elif event.key == K_2 : self.s_age -= 1 self.update_server_config(self.s_age,self.s_dist,self.s_ninp) # inc dist elif event.key == K_3 : self.s_dist += 0.025 self.update_server_config(self.s_age,self.s_dist,self.s_ninp) # dec dist elif event.key == K_4 : self.s_dist -= 0.025 if self.s_dist < 0.025 : self.s_dist = 0.025 self.update_server_config(self.s_age,self.s_dist,self.s_ninp) # inc ninp elif event.key == K_5 : self.s_ninp += 1 self.update_server_config(self.s_age,self.s_dist,self.s_ninp) # dec ninp elif event.key == K_6 : self.s_ninp -= 1 if self.s_ninp < 2 : self.s_ninp = 2 self.update_server_config(self.s_age,self.s_dist,self.s_ninp) pass pass def toggle_screen_mode(self): '''Go back and forth between full screen mode.''' if self.fullscreen == False: globals()['_X'] = globals()['X'] globals()['_Y'] = globals()['Y'] globals()['X'] = XF globals()['Y'] = YF self.screen = pg.display.set_mode((X, Y)) # self.screen = pg.display.set_mode((0,0),pg.FULLSCREEN) self.fullscreen = True self.bg = pg.Surface(self.screen.get_size()) self.bg = self.bg.convert() self.bg.fill((0,0,0)) for i in self.indicators.itervalues() : i.reinit(self.bg) i.draw() else : globals()['X'] = globals()['_X'] globals()['Y'] = globals()['_Y'] self.screen = pg.display.set_mode((X, Y)) self.fullscreen = False self.bg = pg.Surface(self.screen.get_size()) self.bg = self.bg.convert() self.bg.fill((0,0,0)) for i in self.indicators.itervalues() : i.reinit(self.bg) i.draw() pg.display.toggle_fullscreen() def run(self): # setup connection self.connect() # main loop while True : # pg.draw.circle(screen, pg.Color(255,0,0), (300,50),20,0) # screen.blit(gradients.radial(99, scol, ecol), (401, 1)) self.read_keys() # Draw self.draw() # update display self.screen.blit(self.bg, (0, 0)) pg.display.flip() self.fpsClock.tick(FRAMERATE) if self.init_reconnect: self.reconnect() return True def configure_server(self): '''Send the server some configuration data.''' # age = 10.0 # dist = DIST # ninp = 18 self.update_server_config(self.s_age,self.s_dist,self.s_ninp) def update_server_config(self,age,dist,ninp,retry = 3): '''Send the server some configuration data.''' try : self.conn.putrequest("GET","/moodconductor/config?age=%(age)s&dist=%(dist)s&ninp=%(ninp)s" %locals(), skip_host=True) self.conn.putheader("Host", "www.isophonics.net") self.conn.endheaders() res = self.conn.getresponse() if not res.status == 200 : print "Server response:", res.status, res.reason self.indicators["conn"].off() time.sleep(0.5) self.conn.putrequest("GET","/moodconductor/getconf", skip_host=True) self.conn.putheader("Host", "www.isophonics.net") self.conn.endheaders() res = self.conn.getresponse() if not res.status == 200 : print "Server response:", res.status, res.reason self.indicators["conn"].off() print "Server configuration:", res.read() except: time.sleep(2) retry -= 1 self.update_server_config(age,dist,ninp,retry) def connect(self,retry = 5): '''Connect to the server and test connection.''' if not retry : print "Server unreachable" pg.quit() raise SystemExit if retry < 5 : time.sleep(3) try : self.conn = ht.HTTPConnection(IP,timeout=HTTP_TIMEOUT) # self.start_update_thread() self.indicators["conn"].on() except : self.indicators["conn"].off() self.connect(retry = retry-1) try: self.conn.putrequest("GET","/moodconductor/index.html", skip_host=True) self.conn.putheader("Host", "www.isophonics.net") self.conn.endheaders() res = self.conn.getresponse() if res.status == 200 : self.indicators["conn"].on() else : print "Server response:", res.status, res.reason self.indicators["conn"].off() if not hasattr(self,"noretry") and raw_input("Go offline? ") in ['y',''] : return False else : self.noretry = None self.connect(retry = retry-1) except : print "Exception while testing connection." self.indicators["conn"].off() # comment out in offline mode if not hasattr(self,"noretry") and raw_input("Go offline? ") in ['y',''] : return False else : self.noretry = None self.connect(retry = retry-1) self.configure_server() self.start_update_thread() return True def reconnect(self): '''Called when c is pressed.''' self.init_reconnect = False # self.indicators["conn"].off().draw() # self.screen.blit(self.bg, (0, 0)) # pg.display.flip() self.stop_update_thread() time.sleep(1) try : self.conn.close() except : self.indicators["conn"].off() try : self.conn = ht.HTTPConnection(IP,timeout=HTTP_TIMEOUT) self.conn.connect() self.indicators["conn"].on() print "Reconnected." except : self.indicators["conn"].off() print "Error while reconnecting." self.start_update_thread() def quit(self): print "Quitting.." self.indicators["conn"].off() self.stop_update_thread() self.conn.close() pg.quit() sys.exit() def main(): v = VisualClient() v.run() if __name__ == '__main__': pass main()