From 76c0532b5638eda76e71a748832290adb985f21a Mon Sep 17 00:00:00 2001 From: Christian Franke Date: Sun, 17 Mar 2013 12:16:07 +0100 Subject: Add some queue management to osc2light gateway --- clock.py | 23 +++++++++++++ osc2light.py | 110 +++++++++++++++++++++++++++++++++++++++++++++++------------ 2 files changed, 111 insertions(+), 22 deletions(-) create mode 100644 clock.py diff --git a/clock.py b/clock.py new file mode 100644 index 0000000..3e97dbf --- /dev/null +++ b/clock.py @@ -0,0 +1,23 @@ +# Courtesy of Armin Ronacher, found at StackOverflow + +import ctypes +import os + +CLOCK_MONOTONIC = 1 # see + +class timespec(ctypes.Structure): + _fields_ = [ + ('tv_sec', ctypes.c_long), + ('tv_nsec', ctypes.c_long) + ] + +librt = ctypes.CDLL('librt.so.1', use_errno=True) +clock_gettime = librt.clock_gettime +clock_gettime.argtypes = [ctypes.c_int, ctypes.POINTER(timespec)] + +def now(): + t = timespec() + if clock_gettime(CLOCK_MONOTONIC, ctypes.pointer(t)) != 0: + errno_ = ctypes.get_errno() + raise OSError(errno_, os.strerror(errno_)) + return t.tv_sec + t.tv_nsec * 1e-9 diff --git a/osc2light.py b/osc2light.py index 4a6d20e..e046600 100644 --- a/osc2light.py +++ b/osc2light.py @@ -1,13 +1,64 @@ #!/usr/bin/env python -# This is a very crude script which receives osc commands and passes those -# on to the ethernet/can gateway so it controls the light - import hashlib import liblo import socket import struct import sys +import clock +import time +import threading + +class TXOp(object): + def __init__(self, cb, *args, **kwargs): + self.cb = cb + self.args = args + self.kwargs = kwargs + + def perform(self): + self.cb(*self.args, **self.kwargs) + +class TXQueue(threading.Thread): + def __init__(self, *args, **kwargs): + # Queue params + self.wait_iv = 0.10 # 0.07 seems to work, but just play it save + self.sleep_iv = 0.025 + self.max_wait = 2.0 # This caps the queue len + + self.last_sent = 0 + self.queue = [] + self.queue_sync = threading.Condition() + threading.Thread.__init__(self, *args, **kwargs) + + def run(self): + while True: + try: + self.fetch_and_send() + except Exception: + print >>sys.stderr, "TXQueue Exception:" + sys.excepthook(*sys.exc_info()) + + def fetch_and_send(self): + with self.queue_sync: + while not len(self.queue): + self.queue_sync.wait() + item = self.queue.pop(0) + + # We can't send too often, this will cause messages to be lost + while self.last_sent + self.wait_iv > clock.now(): + time.sleep(self.sleep_iv) + item.perform() + self.last_sent = clock.now() + + def append(self, txop): + assert isinstance(txop, TXOp) + with self.queue_sync: + if len(self.queue) > (self.max_wait / self.wait_iv): + print >>sys.stderr, "Queue len == %d. Drop." % len(self.queue) + return # This limits queue size + self.queue.append(txop) + self.queue_sync.notify() + class LightServer(object): class Unknown(Exception): @@ -19,8 +70,10 @@ class LightServer(object): self.raw_socket = socket.socket(socket.AF_PACKET, socket.SOCK_RAW, socket.htons(0x88b7)) self.osc_server = liblo.Server(port) self.osc_server.add_method(None, None, self.handle_message) + self.txqueue = TXQueue() def run(self): + self.txqueue.start() while True: self.osc_server.recv() @@ -29,11 +82,23 @@ class LightServer(object): prefix = '/dali' if path.startswith(prefix): self.handle_dali(path[len(prefix):], args, types, src) - else: - raise self.Unknown() + return + prefix = '/1' + if path.startswith(prefix): + self.handle_touch(path[len(prefix):], args, types, src) + return + raise self.Unknown() except Exception, e: + sys.excepthook(*sys.exc_info()) print >>sys.stderr, "Error processing message '%s' from '%s': %s" % (path, src.get_url(), e) + def handle_touch(self, path, args, types, src): + prefix = '/fader' + if not path.startswith(prefix): + raise self.Unknown() + lamp_number = int(path[len(prefix):]) + self.handle_lamp_bright(lamp_number, args, types, src) + def handle_dali(self, path, args, types, src): prefix = '/lamps' if path.startswith(prefix): @@ -74,10 +139,10 @@ class LightServer(object): if brightness >= 255: brightness = 254 - buf = chr(brightness) + buf = chr(brightness) addr = 0xcc080440 + lamp - buf = struct.pack('>IB', addr, len(buf)) + buf + '\x00' * (3 + 8 - len(buf)) + buf = struct.pack('>IB', addr, len(buf)) + buf + '\x00' * (3 + 8 - len(buf)) src = '\x00\x04\x23\xb6\xde\xe4' dst = '\xff\x3a\xf6CAN' @@ -86,21 +151,22 @@ class LightServer(object): subp = 0xaaaa typ = 3 - buf = struct.pack('6s6sH3sH', dst, src, proto, oui, subp) + buf - self.raw_socket.sendto(buf, (self.interface, 0)) + buf = struct.pack('6s6sH3sH', dst, src, proto, oui, subp) + buf + + self.txqueue.append(TXOp(self.raw_socket.sendto, buf, (self.interface, 0))) if __name__ == '__main__': server = LightServer(4243, 'bond0.4') -- cgit v1.2.1