#!/usr/bin/python

'''
Serialwatch.py: receive events from an X10 "MP3 Remote Control" serial device
Written by Andrew Plotkin
This Python script is released to the public domain.

The X10 company sells an MP3 player package which includes a
Windows-based MP3-playing program, a universal remote control, and a
serial port dongle that receives commands from the remote.

Conveniently, the remote and the dongle communicate via RF, not
infrared. And since the dongle just pumps binary data into the port
(9600 baud, 8N1), it's easy to make use of the thing under Linux.

(I'm told the X10 motion-detector sends RF commands in the same way.
So this script should work for that as well.)

This script squats on the serial port, listening for remote commands.
It translates them into commands on a TCP/IP connection.

    serialwatch.py [ --device /dev/ttyXXX ] [ --hostname host ]
        [ --port portnum ]

The default device is /dev/ttyS0; the default host is localhost; the
default port, for no very good reason, is 31863. (You can also specify
an absolute pathname for the port, in which case the script writes to
a Unix domain socket instead of a network socket.)

Each remote button-press is translated into one line of text sent
across the TCP connection: "remote BUTTON", followed by a Unix
newline. (See the mapping dictionary for a list of possible values for
BUTTON.) If you hold down a button, events will repeat.

(The four switchable on/off buttons at the bottom of the remote behave
a little differently. If you hold one of them down, the second and
following events will be "devonrep" or "devoffrep", instead of
repeating the initial event. Furthermore, if you hold one of these
buttons in the "on" position, and then change to "off", some of the
repeated events will be of the wrong type. I'm not sure whether this
is deliberate or a hardware bug.)

A couple of the remote buttons seem to send the same codes, so this
script cannot distinguish them. "Enter" and "ok" are conflated; as are
"skip back" and "channel up", "skip ahead" and "channel down",
"subtitle" and "pc".

Also note that if you push one of the mode buttons ("stereo", etc),
the remote will switch to IR mode, and you'll have to push "pc" to
switch back to the serial-port dongle. (Since this sends a "subtitle"
event, it is unwise to use this event for anything important.)

This script attempts to be clever about dropped connections. If the
connection breaks, or the initial connection could not be made, the
script will continue to run. It will try to re-open the connection
every time a new button-press is received. If it fails, the button
will be printed to stdout. (So you can use this script in a debugging
mode by giving an unused port number.)

If you want to do something else with button presses, you can either
write a server to receive TCP events, or else hack this script. Either
is easy.
'''

import sys
import getopt
import traceback
import os
import socket
import string
import termios

# Very old interpreters keep the termios constants in a separate module.
if (sys.version[0] == '1' or sys.version[0:3] == '2.0'):
    import TERMIOS
    term = TERMIOS
else:
    term = termios

# True when running on Python 3 or later, where os.read() returns bytes
# and sockets only accept bytes. Written as a string test to match the
# sys.version check above.
_PY3 = sys.version[0] not in ('1', '2')

# Two-byte key code (bytes 2 and 3 of a packet) -> event name.
mapping = {
    '\356\002' : '0',
    '\356\202' : '1',
    '\356\102' : '2',
    '\356\302' : '3',
    '\356\042' : '4',
    '\356\242' : '5',
    '\356\142' : '6',
    '\356\342' : '7',
    '\356\022' : '8',
    '\356\222' : '9',
    '\356\272' : 'ab',
    '\356\122' : 'enter',  # also ok
    '\356\100' : 'chanup',  # also skipback
    '\356\300' : 'chandown',  # also skipahead
    '\356\140' : 'volup',
    '\356\340' : 'voldown',
    '\356\240' : 'mute',
    '\356\260' : 'play',
    '\356\160' : 'stop',
    '\356\270' : 'ff',
    '\356\070' : 'rewind',
    '\356\162' : 'pause',
    '\356\377' : 'record',
    '\356\321' : 'left',
    '\356\323' : 'down',
    '\356\322' : 'right',
    '\356\325' : 'up',
    '\356\330' : 'return',
    '\356\072' : 'display',
    '\356\326' : 'title',
    '\356\324' : 'subtitle',  # also pc
    '\356\266' : 'menu',
    '\356\311' : 'exit',
    '\356\360' : 'power',
    '\356\362' : 'recall',

    '\140\000' : 'dev1on',
    '\140\040' : 'dev1off',
    '\140\020' : 'dev2on',
    '\140\060' : 'dev2off',
    '\140\010' : 'dev3on',
    '\140\050' : 'dev3off',
    '\140\030' : 'dev4on',
    '\140\070' : 'dev4off',
    '\140\100' : 'dev5on',
    '\140\140' : 'dev5off',
    '\140\120' : 'dev6on',
    '\140\160' : 'dev6off',
    '\140\110' : 'dev7on',
    '\140\150' : 'dev7off',
    '\140\130' : 'dev8on',
    '\140\170' : 'dev8off',
    '\144\000' : 'dev9on',
    '\144\040' : 'dev9off',
    '\144\020' : 'dev10on',
    '\144\060' : 'dev10off',
    '\144\010' : 'dev11on',
    '\144\050' : 'dev11off',
    '\144\030' : 'dev12on',
    '\144\070' : 'dev12off',
    '\144\100' : 'dev13on',
    '\144\140' : 'dev13off',
    '\144\120' : 'dev14on',
    '\144\160' : 'dev14off',
    '\144\110' : 'dev15on',
    '\144\150' : 'dev15off',
    '\144\130' : 'dev16on',
    '\144\170' : 'dev16off',
    '\140\210' : 'devonrep',
    '\140\230' : 'devoffrep',
    '\140\220' : 'lightson',
    '\140\200' : 'alloff',
}

# Event names that use the longer duplicate-suppression window in
# process(). Only membership matters; the values are unused.
deviceevents = {
    'dev1on' : 1, 'dev1off' : 1,
    'dev2on' : 1, 'dev2off' : 1,
    'dev3on' : 1, 'dev3off' : 1,
    'dev4on' : 1, 'dev4off' : 1,
    'dev5on' : 1, 'dev5off' : 1,
    'dev6on' : 1, 'dev6off' : 1,
    'dev7on' : 1, 'dev7off' : 1,
    'dev8on' : 1, 'dev8off' : 1,
    'dev9on' : 1, 'dev9off' : 1,
    'dev10on' : 1, 'dev10off' : 1,
    'dev11on' : 1, 'dev11off' : 1,
    'dev12on' : 1, 'dev12off' : 1,
    'dev13on' : 1, 'dev13off' : 1,
    'dev14on' : 1, 'dev14off' : 1,
    'dev15on' : 1, 'dev15off' : 1,
    'dev16on' : 1, 'dev16off' : 1,
}

# Parser state shared between calls to process().
buffer = ''       # received data not yet consumed as a complete packet
lastev = None     # name of the most recently forwarded event, if any
lastcount = 0     # how many repeats of lastev have been suppressed so far


def process(dat, handler):
    '''Feed newly received serial data into the packet parser.

    dat is appended to the module-level buffer. Every complete packet
    found there (from a '\\325' byte through the next '\\255' byte) is
    consumed; bytes 2 and 3 of the packet are looked up in mapping, and
    handler(name) is called for each recognised event that is not being
    suppressed as a duplicate.

    Duplicate suppression: after an event is forwarded, the next
    limit-1 identical events are dropped, then the counter resets so
    that the following one is forwarded again. limit is 5 for the names
    in deviceevents and 2 for everything else (presumably matching how
    many times the remote transmits a single press).

    dat may be a str, or (on Python 3) the bytes object returned by
    os.read(); bytes are decoded one byte per character.
    '''
    global buffer
    global lastev
    global lastcount

    if (type(dat) != type('')):
        # latin-1 maps byte N to character N, so the octal escapes in
        # mapping keep matching.
        dat = dat.decode('latin-1')
    buffer = buffer + dat

    while (1):
        pos = buffer.find('\325')
        if (pos < 0):
            # No start marker: nothing here can ever become a packet,
            # so drop it instead of letting line noise pile up.
            buffer = ''
            return
        end = buffer.find('\255', pos)
        if (end < 0):
            # Incomplete packet; keep it (minus any leading junk) and
            # wait for more data.
            buffer = buffer[pos:]
            return
        ev = buffer[pos:end+1]
        buffer = buffer[end+1:]
        # Debugging aid:
        # print('debug: ' + ' '.join([oct(ord(ch)) for ch in ev]))

        res = mapping.get(ev[2:4])
        if (res is None):
            # Unknown code: skip this packet but keep parsing whatever
            # else is already buffered.
            continue

        if (deviceevents.get(res) is None):
            limit = 2
        else:
            limit = 5

        if (res == lastev):
            lastcount = lastcount + 1
            if (limit-1 == lastcount):
                # Suppression window is over; treat the next identical
                # event as a fresh one.
                lastcount = 0
                lastev = None
            continue

        lastev = res
        lastcount = 0
        handler(res)


def watch(device, handler):
    '''Read the serial port at device forever, feeding process().

    The port is switched to raw 9600 baud, 8 data bits, with hardware
    flow control and blocking one-byte reads. handler is passed through
    to process() and so receives one event name per button press.

    Returns after Control-C, after an exception in the read loop (its
    traceback is printed), or when the device reports end of file. The
    original terminal settings are restored and the descriptor is
    closed on every exit path. Errors from opening or configuring the
    port propagate to the caller.
    '''
    fd = os.open(device, (os.O_RDWR | os.O_NOCTTY))
    try:
        oldset = termios.tcgetattr(fd)

        # Attribute list layout: [iflag, oflag, cflag, lflag, ispeed,
        # ospeed, cc]. The cc list must be as long as the one the
        # system hands back.
        newcc = [0] * len(oldset[6])
        newcc[term.VTIME] = 0
        newcc[term.VMIN] = 1
        newset = [0, 0, 0, 0, 0, 0, newcc]
        newset[0] = term.IGNPAR
        newset[2] = (term.B9600 | term.CRTSCTS | term.CS8
            | term.CLOCAL | term.CREAD)
        newset[4] = term.B9600
        newset[5] = term.B9600

        termios.tcflush(fd, term.TCIFLUSH)
        termios.tcsetattr(fd, term.TCSANOW, newset)

        try:
            try:
                print('watching serial port ' + device + '...')
                while 1:
                    val = os.read(fd, 1)
                    if (not val):
                        # Empty read means end of file; looping on it
                        # would spin at full speed.
                        print('serial port closed, closing down')
                        break
                    process(val, handler)
            except KeyboardInterrupt:
                print('interrupted, closing down')
            except Exception:
                print('exception, closing down')
                traceback.print_exc()
        finally:
            termios.tcsetattr(fd, term.TCSANOW, oldset)
    finally:
        os.close(fd)


class StableConnection:
    '''An outgoing stream socket that reconnects itself on demand.

    port is either an integer TCP port (connected to host, default
    'localhost') or a string, which is used as the address of a Unix
    domain socket. No connection is made until the first send().
    '''

    def __init__(self, port, host=None):
        if (host is None):
            host = 'localhost'
        self.sock = None
        self.host = host
        self.port = port

    def connect(self):
        '''Open the connection if it is not already open.

        Connection failures are swallowed; self.sock simply stays None.
        '''
        if (self.sock is not None):
            return
        if (type(self.port) == type(1)):
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            sockaddr = (self.host, self.port)
        else:
            sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
            sockaddr = self.port
        try:
            sock.connect(sockaddr)
            self.sock = sock
        except socket.error:
            sock.close()

    def send(self, line):
        '''Send "remote <line>" plus a newline over the connection.

        If the current connection fails, it is closed and the send is
        retried exactly once on a fresh connection. If the line still
        cannot be delivered it is reported on stdout instead; no
        exception is raised for socket errors.
        '''
        payload = 'remote ' + line + '\n'
        if (_PY3):
            payload = payload.encode('latin-1')

        # At most two tries: the existing connection, then a new one.
        # (Retrying without a bound would loop forever against a peer
        # that accepts connections but always fails the write.)
        for attempt in (1, 2):
            if (self.sock is None):
                self.connect()
            if (self.sock is None):
                break
            try:
                self.sock.sendall(payload)
                return
            except socket.error:
                self.close()

        print('unable to send "' + line + '" to '
            + self.host + ':' + str(self.port))

    def close(self):
        '''Close the connection, if open.'''
        if (self.sock is not None):
            self.sock.close()
            self.sock = None


# Defaults for the command-line options.
devname = '/dev/ttyS0'
hostname = None
port = 31863


def main(argv=None):
    '''Parse command-line options and run the watcher.

    argv is the option list without the program name; it defaults to
    sys.argv[1:]. Exits with status 1 on a malformed command line.
    '''
    if (argv is None):
        argv = sys.argv[1:]

    device = devname
    host = hostname
    target = port

    try:
        (opts, args) = getopt.getopt(argv, 'd:h:p:',
            ['device=', 'hostname=', 'port='])
    except getopt.error:
        print(sys.argv[0] + ': ' + str(sys.exc_info()[1]))
        sys.exit(1)

    for (opname, opval) in opts:
        if (opname == '--device' or opname == '-d'):
            device = opval
        if (opname == '--hostname' or opname == '-h'):
            host = opval
        if (opname == '--port' or opname == '-p'):
            # Slice rather than index so an empty value cannot raise.
            if (opval[:1] == '/'):
                target = opval
            else:
                try:
                    target = int(opval)
                except ValueError:
                    print(sys.argv[0] + ': invalid port: ' + opval)
                    sys.exit(1)

    connection = StableConnection(target, host)
    watch(device, connection.send)


if (__name__ == '__main__'):
    main()