#!/usr/bin/env python # vim: set expandtab ts=4: import sys import array, os, os.path import colorsys import traceback from time import time, sleep from math import sin, cos, pi from ola.ClientWrapper import ClientWrapper wrapper = None loop_count = 0 TICK_INTERVAL = 10 # in ms universenum = 0 enable = '--enable' in sys.argv[1:] phase = 0.0 bpm = 120.0 tl = time() beatsep = 0 def ctlpath(path): return os.path.join('/tmp', path) # seamless restart try: phase = float(open(ctlpath('.dmxeffects'), 'r').read()) except Exception, e: traceback.print_exc() def DmxSent(state): if not state.Succeeded(): wrapper.Stop() # # minimal abstraction... not really used much # class DMXTarget(object): pass class DMXLEDPAR(DMXTarget): # NB: relative color adjustments hardcoded here! def __init__(self, baseaddr, mult = 1.0, max_r = 255, max_g = 144, max_b = 180): self.baseaddr = baseaddr self.r = 0. self.g = 0. self.b = 0. self.max_r = max_r self.max_g = max_g self.max_b = max_b self.mult = mult def getdata(self): return (self.baseaddr, [0, max(min(int(self.r * self.mult * self.max_r), 255), 0), max(min(int(self.g * self.mult * self.max_g), 255), 0), max(min(int(self.b * self.mult * self.max_b), 255), 0), 0]) class DMXUniverse(list): def __init__(self, uniid): list.__init__(self) self.uniid = uniid def mergedata(self): data = array.array('B') for dev in self: devdata = dev.getdata() lastpos = devdata[0] + len(devdata[1]) if len(data) < lastpos: data.extend([0] * (lastpos - len(data))) for i in range(devdata[0], lastpos): data[i] = devdata[1][i - devdata[0]] return data[1:] def send(self, wrapper): md = self.mergedata() if enable: wrapper.Client().SendDmx(self.uniid, md, DmxSent) else: print ' '.join(['%02x' % i for i in md]) # # actual effect code # class EffectRGBFadeJump(object): def __init__(self, target, fadespeed = 1./120., phasejump = 0.0): self.target = target self.phasejump = phasejump self.fadespeed = fadespeed self.control = 1.0 self.camount = 0.0 self.cphase = 0.0 self.jumpv = 0.0 self.jumpdecayset = 0.0 self.jumpdecayfall = 0.9 self.r_lastphase = 0 self.r_phasejump = 0.0 self.r_jumpdecay = 0.0 def update(self, timestamp): curphase = beatsep # int(timestamp) if self.r_lastphase != curphase: self.r_phasejump += self.phasejump self.r_lastphase = curphase self.r_jumpdecay = self.jumpdecayset phase = (self.r_phasejump + timestamp * self.fadespeed) % 1 mixcorr = (cos((phase * 3) * 2*pi) - 1. / 2.) * 0.33 (self.target.r, self.target.g, self.target.b) = \ colorsys.hsv_to_rgb(phase, 1.0 - self.r_jumpdecay, (0.8 + mixcorr) * self.control + self.r_jumpdecay * self.jumpv) self.r_jumpdecay *= self.jumpdecayfall class EffectEffectCross(object): def __init__(self, group, fadespeed = 1./2., phasejump = 0.0): self.group = group self.phasejump = phasejump self.fadespeed = fadespeed self.control = 1.0 self.r_lastphase = 0 self.r_phasejump = 0.0 def update(self, timestamp): curphase = beatsep # int(timestamp) if self.r_lastphase != curphase: self.r_phasejump += self.phasejump self.r_lastphase = curphase phase = (self.r_phasejump + timestamp * self.fadespeed) % 1 for e in self.group: e.control = sin((phase + e.cphase) * 2 * pi) * e.camount + 1.0 # # deployed setup # universe = DMXUniverse(universenum) universe.append(DMXLEDPAR( 1, 0.55)) universe.append(DMXLEDPAR( 6, 0.55)) universe.append(DMXLEDPAR(11, 0.55)) universe.append(DMXLEDPAR(16, 0.55)) universe.append(DMXLEDPAR(21, 0.55)) effects = [ EffectRGBFadeJump(universe[0]), EffectRGBFadeJump(universe[1]), EffectRGBFadeJump(universe[2]), # , fadespeed = 1./12., phasejump = 0.1), EffectRGBFadeJump(universe[3]), EffectRGBFadeJump(universe[4]), ] effects.append( EffectEffectCross([effects[0], effects[1], effects[2], effects[3], effects[4]]) ) # # control loop # lastbeat = 0.0 freerun = False nobeat = True lastphase = 0.0 bpmhist = [] prevrms = 0.0 def SendDMXFrame(): global phase, tl, beatsep, lastbeat, bpm, bpmhist, freerun, nobeat, lastphase, prevrms wrapper.AddEvent(TICK_INTERVAL, SendDMXFrame) t = time() dt = t - tl tl = t phase += dt * bpm / 60. beat = False try: if os.access(ctlpath('.dmxbeat'), os.R_OK): os.unlink(ctlpath('.dmxbeat')) freerun = False nobeat = False beatsep += 1 dbeat = t - lastbeat if 0.20 < dbeat < 0.90: nowbpm = 60. / dbeat bpmhist.insert(0, nowbpm) bpmhist = bpmhist[:10] bpmavg = sum(bpmhist) / len(bpmhist) print 'bpm: %6.1f ' % (bpmavg), ', '.join(['%6.1f' % i for i in bpmhist]) bpm = bpmavg lastbeat = t lastphase = phase beat = True phasephase = phase - int(phase) if phasephase > 0.5: phasephase -= 1.0 print 'phph', phasephase phase -= phasephase * 0.05 except Exception, e: traceback.print_exc() try: if os.access(ctlpath('.dmxfreebeat'), os.R_OK): os.unlink(ctlpath('.dmxfreebeat')) freerun = True except Exception, e: traceback.print_exc() try: if os.access(ctlpath('.dmxnobeat'), os.R_OK): os.unlink(ctlpath('.dmxnobeat')) nobeat = True freerun = False except Exception, e: traceback.print_exc() if t - lastbeat > (60. / bpm) * 1.8 and not nobeat: pass #if not freerun: # print 'freerun' #freerun = True if freerun: if phase - lastphase >= 1.0: lastphase = phase beatsep += 1 beat = True print 'freebeat', bpm try: execfile('dmxcontrol.py') except Exception, e: traceback.print_exc() for e in effects: e.update(phase) universe.send(wrapper) class DummyVerse(object): def __init__(self): pass def Client(self): return self def SendDmx(self, uniid, data, cb): print uniid, data def Run(self): while True: sleep(1./30.) self.func() def AddEvent(self, ignore, func): self.func = func #wrapper = DummyVerse() wrapper = ClientWrapper() wrapper.AddEvent(TICK_INTERVAL, SendDMXFrame) try: wrapper.Run() except KeyboardInterrupt: file(ctlpath('.dmxeffects'), 'w').write(str(phase))