0,0 → 1,362 |
#! /usr/bin/env python |
|
# |
# Mikrokopter Serial protocol |
# |
# Author: FredericG |
# |
|
import os |
import glob |
import serial |
import time |
import traceback |
|
class MkException(Exception): |
pass |
|
class InvalidMsg(MkException): |
def __str__(self): |
return "Invalid message" |
|
class CrcError(MkException): |
def __str__(self): |
return "CRC error" |
|
class InvalidArguments(MkException): |
def __str__(self): |
return "Invalid Arguments" |
|
class InvalidMsgType(MkException): |
def __str__(self): |
return "Invalid Message type" |
|
class InvalidMsgLen(MkException): |
def __str__(self): |
return "Invalid Message Length" |
|
class NoResponse(MkException): |
def __init__(self, cmd): |
self.cmd = cmd |
|
def __str__(self): |
return "No Reponse. Waiting for \"%s\" message" % self.cmd |
|
pass |
|
def calcCrcBytes(str): |
crc = 0 |
for c in str: |
crc += ord(c) |
crc &= 0xfff |
return (chr(crc/64+ord('=')), chr(crc%64+ord('='))) |
|
|
class MkMsg: |
def __init__(self, msg=None, address=None, cmd=None, data=None): |
if (msg != None): |
# Create instance based on received message |
self.parseUartLine(msg) |
elif (address != None and cmd != None and data != None): |
# Create instance based on address, command and data |
self.address = address |
self.cmd = cmd |
self.data = data |
else: |
# Cannot create instance |
raise InvalidArguments |
|
def generateUartLine(self): |
msg = "" |
|
# make header |
msg += '#' |
msg += chr(self.address+ord('a')) |
msg += self.cmd |
|
# add data |
done = False |
i = 0 |
while (i<len(self.data)) and not done: |
a = 0 |
b = 0 |
c = 0 |
try: |
a = self.data[i] |
b = self.data[i+1] |
c = self.data[i+2] |
i = i + 3 |
except IndexError: |
done = True |
msg += chr(ord('=') + (a >> 2)) |
msg += chr(ord('=') + (((a & 0x03) << 4) | ((b & 0xf0) >> 4))) |
msg += chr(ord('=') + (((b & 0x0f) << 2) | ((c & 0xc0) >> 6))) |
msg += chr(ord('=') + ( c & 0x3f)) |
|
# add crc and NL |
crc1,crc2 = calcCrcBytes(msg) |
msg += crc1 + crc2 |
msg += '\r' |
return msg |
|
|
def parseUartLine(self, msg): |
if len(msg)<6: |
raise InvalidMsg() |
if (msg[0] != '#'): |
raise InvalidMsg() |
if (msg[-1] != '\r'): |
raise InvalidMsg() |
|
self.address = ord(msg[1]) |
self.cmd = msg[2] |
|
data64 = map(ord, msg[3:-3]) # last 3 bytes are CRC and \n |
|
done = False |
i = 0 |
self.data = [] |
while (i<len(data64)) and not done: |
a = 0 |
b = 0 |
c = 0 |
d = 0 |
try: |
a = data64[i] - ord('=') |
b = data64[i+1] - ord('=') |
c = data64[i+2] - ord('=') |
d = data64[i+3] - ord('=') |
i = i + 4 |
except IndexError: |
done = True |
|
self.data.append((a << 2)&0xFF | (b >> 4)) |
self.data.append(((b & 0x0f) << 4)&0xFF | (c >> 2)); |
self.data.append(((c & 0x03) << 6)&0xFF | d); |
|
crc1,crc2 = calcCrcBytes(msg[:-3]) |
if (crc1 != msg[-3] or crc2 != msg[-2]): |
#print msg |
raise CrcError |
|
#print "crc= %x %x %x %x" % ( crc1, crc2, (ord(msg[-3])-ord('=')), (ord(msg[-2])-ord('='))) |
|
|
def data2SignedInt(self, index): |
int = self.data[index]+self.data[index+1]*256 |
if (int > 0xFFFF/2): |
int -= 0xFFFF |
return int |
|
class SettingsMsg: |
DATENREVISION = 80 |
IDX_INDEX = 0 |
IDX_STICK_P = 1 + 19 |
IDX_STICK_D = 1 + 20 |
IDX_NAME = 1 + 90 |
|
def __init__(self, msg): |
if (msg.cmd != 'Q'): |
raise InvalidMsgType |
if len(msg.data) != 105: |
raise InvalidMsgLen |
self.msg = msg |
|
def getSettings(self): |
return self.msg.data |
|
def getIndex(self): |
return self.getSetting(SettingsMsg.IDX_INDEX) |
|
def getSetting(self, settingIndex): |
return self.msg.data[settingIndex] |
|
def getName(self): |
name = "" |
for i in self.msg.data[SettingsMsg.IDX_NAME:]: |
if i==0: |
break |
name += chr(i) |
return name |
|
def setSetting(self, settingIndex, value): |
self.msg.data[settingIndex] = value |
|
|
class DebugDataMsg: |
IDX_ANALOG_ACCNICK = 2+2*2 |
IDX_ANALOG_ACCROLL = 2+2*3 |
IDX_ANALOG_COMPASS = 2+2*3 |
IDX_ANALOG_VOLTAGE = 2+2*9 |
|
def __init__(self, msg): |
if (msg.cmd != 'D'): |
raise InvalidMsgType |
self.msg = msg |
|
def getAccNick(self): |
return self.msg.data2SignedInt(DebugDataMsg.IDX_ANALOG_ACCNICK) |
|
def getAccRoll(self): |
return self.msg.data2SignedInt(DebugDataMsg.IDX_ANALOG_ACCROLL) |
|
def getCompassHeading(self): |
return self.msg.data2SignedInt(DebugDataMsg.IDX_ANALOG_COMPASS) |
|
def getVoltage(self): |
return float(self.msg.data2SignedInt(DebugDataMsg.IDX_ANALOG_VOLTAGE))/10 |
|
class VibrationDataMsg: |
def __init__(self, msg): |
if (msg.cmd != 'F'): |
raise InvalidMsgType |
self.msg = msg |
|
def getData(self): |
data = [] |
for i in range(0,50): |
data.append(self.msg.data2SignedInt(2*i)) |
return data |
|
class VersionMsg: |
def __init__(self, msg): |
if (msg.cmd != 'V'): |
raise InvalidMsgType |
self.msg = msg |
|
def getVersion(self): |
return (self.msg.data[0], self.msg.data[1]) |
|
class MkComm: |
ADDRESS_ALL = 0 |
ADDRESS_FC = 1 |
ADDRESS_NC = 2 |
ADDRESS_MK3MAG = 3 |
|
def __init__(self, printDebugMsg=False): |
#self.logfile = open('mklog.txt', "rbU") |
|
self.serPort = None |
self.printDebugMsg = printDebugMsg |
|
msg = MkMsg(address=0, cmd='v', data=[]) |
self.getVersionMsgLn = msg.generateUartLine() |
msg = MkMsg(address=0, cmd='d', data=[500]) |
self.getDebugMsgLn = msg.generateUartLine() |
|
|
def open(self, comPort): |
self.serPort = serial.Serial(comPort, 57600, timeout=0.5) |
if not self.serPort.isOpen(): |
raise IOError("Failed to open serial port") |
|
def close(self): |
self.serPort.close() |
|
def isOpen(self): |
return self.serPort != None |
|
def sendLn(self, ln): |
self.serPort.write(ln) |
|
def waitForLn(self): |
return self.serPort.readline(eol='\r') |
|
def waitForMsg(self, cmd2wait4): |
msg = None |
done = False |
while (not done): |
line = self.waitForLn() |
if len(line) == 0: |
raise NoResponse(cmd2wait4) |
try: |
msg = MkMsg(msg=line) |
if (msg.cmd == cmd2wait4): |
done = True |
except InvalidMsg: |
if self.printDebugMsg: |
print "DebugMsg: \"%s\"" % line[:-1] |
pass |
return msg |
|
def sendMsg(self, msg): |
self.sendLn(msg.generateUartLine()) |
|
def sendNCRedirectUartFromFC(self): |
self.serPort.flushInput() |
msg = MkMsg(address=MkComm.ADDRESS_NC, cmd='u', data=[0]) |
self.sendMsg(msg) |
time.sleep(.5) |
# No reply expected... |
|
def sendSettings(self, settings): |
msg = MkMsg(address=MkComm.ADDRESS_FC, cmd='s', data=settings) |
self.sendMsg(msg) |
time.sleep(1) |
msg = self.waitForMsg('S') |
|
def getDebugMsg(self): |
self.serPort.flushInput() |
self.sendLn(self.getDebugMsgLn) |
msg = self.waitForMsg('D') |
msg = DebugDataMsg(msg) |
return msg |
|
def getSettingsMsg(self, index=0xFF): |
self.serPort.flushInput() |
msg = MkMsg(address=MkComm.ADDRESS_FC, cmd='q', data=[index]) |
self.sendMsg(msg) |
msg = self.waitForMsg('Q') |
msg = SettingsMsg(msg) |
return msg |
|
def getVersionMsg(self): |
self.sendLn(self.getVersionMsgLn) |
msg = self.waitForMsg('V') |
msg = VersionMsg(msg) |
return msg |
|
def setMotorTest(self, motorSpeeds): |
msg = MkMsg(address=MkComm.ADDRESS_FC, cmd='t', data=motorSpeeds) |
self.sendMsg(msg) |
|
def doVibrationTest(self, nbSamples, channel): |
data = [] |
for i in range(0,(min(nbSamples,1000)/50)): |
msg = MkMsg(address=MkComm.ADDRESS_FC, cmd='f', data=[channel, i]) |
self.sendMsg(msg) |
msg = self.waitForMsg('F') |
msg = VibrationDataMsg(msg) |
data += msg.getData() |
|
# FIXE: should be fixed in the FC code |
data[0]=data[1] |
return data |
|
|
|
|
if __name__ == '__main__': |
try: |
comm = MkComm() |
comm.open(comPort="COM5") |
|
msg = comm.getVersionMsg() |
print "Version: %d.%d" % msg.getVersion() |
|
msg = comm.getSettingsMsg() |
print "Index=",msg.getIndex() |
print "Name=",msg.getName() |
print "StickP=",msg.getSetting(SettingsMsg.IDX_STICK_P) |
print "StickD=",msg.getSetting(SettingsMsg.IDX_STICK_D) |
|
msg.setSetting(SettingsMsg.IDX_STICK_P, msg.getSetting(SettingsMsg.IDX_STICK_P)+1) |
comm.sendSettings(msg.getSettings()) |
|
|
except Exception,e: |
print |
print "An error occured: ", e |
print |
traceback.print_exc() |
raw_input("Press ENTER, the application will close") |