Subversion Repositories Projects

Rev

Rev 716 | Blame | Compare with Previous | Last modification | View Log | RSS feed

#! /usr/bin/env python

#
# Mikrokopter Serial protocol
#
# Author: FredericG
#

import os
import glob
import serial
import time
import traceback

class MkException(Exception):
    pass

class InvalidMsg(MkException):  
    def __str__(self):
      return "Invalid message"

class CrcError(MkException):
    def __str__(self):
      return "CRC error"

class InvalidArguments(MkException):
    def __str__(self):
      return "Invalid Arguments"

class InvalidMsgType(MkException):
    def __str__(self):
      return "Invalid Message type"
 
class InvalidMsgLen(MkException):
    def __str__(self):
      return "Invalid Message Length"

class NoResponse(MkException):
    def __init__(self, cmd):
      self.cmd = cmd
     
    def __str__(self):
      return "No Reponse. Waiting for \"%s\" message" % self.cmd
     
    pass

def calcCrcBytes(str):
    crc = 0
    for c in str:
        crc += ord(c)
    crc &= 0xfff
    return (chr(crc/64+ord('=')), chr(crc%64+ord('=')))


class MkMsg:
    def __init__(self, msg=None, address=None, cmd=None, data=None):
        if (msg != None):
            # Create instance based on received message
            self.parseUartLine(msg)
        elif (address != None and cmd != None and data != None):
            # Create instance based on address, command and data
            self.address = address
            self.cmd = cmd
            self.data = data
        else:
            # Cannot create instance
            raise InvalidArguments

    def generateUartLine(self):
        msg = ""

        # make header
        msg += '#'
        msg += chr(self.address+ord('a'))
        msg += self.cmd

        # add data
        done = False
        i = 0
        while (i<len(self.data)) and not done:
            a = 0
            b = 0
            c = 0
            try:
                a = self.data[i]
                b = self.data[i+1]
                c = self.data[i+2]
                i = i + 3
            except IndexError:
                done = True
            msg += chr(ord('=') + (a >> 2))
            msg += chr(ord('=') + (((a & 0x03) << 4) | ((b & 0xf0) >> 4)))
            msg += chr(ord('=') + (((b & 0x0f) << 2) | ((c & 0xc0) >> 6)))
            msg += chr(ord('=') + ( c & 0x3f))

        # add crc and  NL
        crc1,crc2 = calcCrcBytes(msg)
        msg += crc1 + crc2
        msg += '\r'
        return msg


    def parseUartLine(self, msg):
        if len(msg)<6:
            raise InvalidMsg()
        if (msg[0] != '#'):
            raise InvalidMsg()
        if (msg[-1] != '\r'):
            raise InvalidMsg()

        self.address = ord(msg[1])
        self.cmd = msg[2]

        data64 = map(ord, msg[3:-3])    # last 3 bytes are CRC and \n

        done = False
        i = 0
        self.data = []
        while (i<len(data64)) and not done:
            a = 0
            b = 0
            c = 0
            d = 0
            try:
                a = data64[i] - ord('=')
                b = data64[i+1] - ord('=')
                c = data64[i+2] - ord('=')
                d = data64[i+3] - ord('=')
                i = i + 4
            except IndexError:
                done = True

            self.data.append((a << 2)&0xFF | (b >> 4))
            self.data.append(((b & 0x0f) << 4)&0xFF | (c >> 2));
            self.data.append(((c & 0x03) << 6)&0xFF | d);

        crc1,crc2 = calcCrcBytes(msg[:-3])
        if (crc1 != msg[-3] or crc2 != msg[-2]):
            #print msg
            raise CrcError
       
        #print "crc= %x %x %x %x" % ( crc1, crc2, (ord(msg[-3])-ord('=')), (ord(msg[-2])-ord('=')))


    def data2SignedInt(self, index):
        int = self.data[index]+self.data[index+1]*256
        if (int > 0xFFFF/2):
            int -= 0xFFFF
        return int

class SettingsMsg:
    DATENREVISION   = 80
    IDX_INDEX       = 0
    IDX_STICK_P     = 1 + 19
    IDX_STICK_D     = 1 + 20
    IDX_GYRO_P      = 1 + 26
    IDX_GYRO_I      = 1 + 27
    IDX_GYRO_D      = 1 + 28
    IDX_I_FACTOR    = 1 + 35
    IDX_NAME        = 1 + 90
   
    def __init__(self, msg):
        if (msg.cmd != 'Q'):
            raise InvalidMsgType
        if len(msg.data) != 105:
            raise InvalidMsgLen
        self.msg = msg
       
    def getSettings(self):
        return self.msg.data
       
    def getIndex(self):
        return self.getSetting(SettingsMsg.IDX_INDEX)
   
    def getSetting(self, settingIndex):
        return self.msg.data[settingIndex]
   
    def getName(self):
        name = ""
        for i in self.msg.data[SettingsMsg.IDX_NAME:]:
            if i==0:
                break
            name += chr(i)
        return name
   
    def setSetting(self, settingIndex, value):
        self.msg.data[settingIndex] = value


class DebugDataMsg:
    IDX_ANALOG_ANGLENICK=   2+2*0
    IDX_ANALOG_ANGLEROLL=   2+2*1
    IDX_ANALOG_ACCNICK  =   2+2*2
    IDX_ANALOG_ACCROLL  =   2+2*3
    IDX_ANALOG_COMPASS  =   2+2*3
    IDX_ANALOG_VOLTAGE  =   2+2*9

    def __init__(self, msg):
        if (msg.cmd != 'D'):
            raise InvalidMsgType
        self.msg = msg

    def getAngleNick(self):
        return self.msg.data2SignedInt(DebugDataMsg.IDX_ANALOG_ANGLENICK)

    def getAngleRoll(self):
        return self.msg.data2SignedInt(DebugDataMsg.IDX_ANALOG_ANGLEROLL)

    def getAccNick(self):
        return self.msg.data2SignedInt(DebugDataMsg.IDX_ANALOG_ACCNICK)

    def getAccRoll(self):
        return self.msg.data2SignedInt(DebugDataMsg.IDX_ANALOG_ACCROLL)

    def getCompassHeading(self):
        return self.msg.data2SignedInt(DebugDataMsg.IDX_ANALOG_COMPASS)

    def getVoltage(self):
        return float(self.msg.data2SignedInt(DebugDataMsg.IDX_ANALOG_VOLTAGE))/10

class VibrationDataMsg:
     def __init__(self, msg):
        if (msg.cmd != 'F'):
            raise InvalidMsgType
        self.msg = msg
       
     def getData(self):
        data = []
        for i in range(0,50):
          data.append(self.msg.data2SignedInt(2*i))
        return data

class VersionMsg:
    def __init__(self, msg):
        if (msg.cmd != 'V'):
            raise InvalidMsgType
        self.msg = msg

    def getVersion(self):
        return (self.msg.data[0], self.msg.data[1])

class MkComm:
    ADDRESS_ALL    = 0
    ADDRESS_FC     = 1
    ADDRESS_NC     = 2
    ADDRESS_MK3MAG = 3
   
    def __init__(self, printDebugMsg=False):
        #self.logfile = open('mklog.txt', "rbU")

        self.serPort = None
        self.printDebugMsg = printDebugMsg

        msg = MkMsg(address=0, cmd='v', data=[])
        self.getVersionMsgLn = msg.generateUartLine()
        msg = MkMsg(address=0, cmd='d', data=[500])
        self.getDebugMsgLn = msg.generateUartLine()
       

    def open(self, comPort):
        self.serPort = serial.Serial(comPort, 57600, timeout=0.5)
        if not self.serPort.isOpen():
            raise IOError("Failed to open serial port")

    def close(self):
        self.serPort.close()
       
    def isOpen(self):
        return self.serPort != None

    def sendLn(self, ln):
        self.serPort.write(ln)

    def waitForLn(self):
        return self.serPort.readline(eol='\r')

    def waitForMsg(self, cmd2wait4, timeout=0.5):
        msg = None
        done = False
        if self.printDebugMsg: print "[Debug] =>Wait4 %s TO=%.1fs" % (cmd2wait4, timeout)
        startTime = time.clock()
        while (not done):
            line = self.waitForLn()
            try:
                msg = MkMsg(msg=line)
                if self.printDebugMsg: print "[Debug]    msg %s" % msg.cmd
                if (msg.cmd == cmd2wait4):
                    done = True
            except InvalidMsg:
                if self.printDebugMsg: print "[Debug]    no valid msg"
               
            if ((time.clock()-startTime) > timeout):
                raise NoResponse(cmd2wait4)
        if self.printDebugMsg: print "[Debug]    Done: %.1fs" % (time.clock()-startTime)
        return msg
   
    def sendMsg(self, msg):
        self.sendLn(msg.generateUartLine())

    def sendNCRedirectUartFromFC(self):
        self.serPort.flushInput()
        msg = MkMsg(address=MkComm.ADDRESS_NC, cmd='u', data=[0])
        self.sendMsg(msg)
        time.sleep(.5)
        # No reply expected...  
       
    def sendSettings(self, settings):
        msg = MkMsg(address=MkComm.ADDRESS_FC, cmd='s', data=settings)
        self.sendMsg(msg)
        #time.sleep(1)
        msg = self.waitForMsg('S', timeout=2)
       
    def getDebugMsg(self):
        self.serPort.flushInput()
        self.sendLn(self.getDebugMsgLn)
        msg = self.waitForMsg('D')
        msg = DebugDataMsg(msg)
        return msg
   
    def getSettingsMsg(self, index=0xFF):
        self.serPort.flushInput()
        msg = MkMsg(address=MkComm.ADDRESS_FC, cmd='q', data=[index])
        self.sendMsg(msg)
        msg = self.waitForMsg('Q')
        msg = SettingsMsg(msg)
        return msg

    def getVersionMsg(self):
        self.sendLn(self.getVersionMsgLn)
        msg = self.waitForMsg('V')
        msg = VersionMsg(msg)
        return msg

    def setMotorTest(self, motorSpeeds):
        msg = MkMsg(address=MkComm.ADDRESS_FC, cmd='t', data=motorSpeeds)
        self.sendMsg(msg)
       
    def doVibrationTest(self, nbSamples, channel):
        data = []
        for i in range(0,(min(nbSamples,1000)/50)):
          msg = MkMsg(address=MkComm.ADDRESS_FC, cmd='f', data=[channel, i])
          self.sendMsg(msg)
          msg = self.waitForMsg('F')
          msg = VibrationDataMsg(msg)
          data += msg.getData()
           
        # FIXE: should be fixed in the FC code
        data[0]=data[1]
        return data
   
    def recordDbgMsg(self, samplePeriod, nbSamples):
        result = []
        self.serPort.flushInput()
        msg = MkMsg(address=0, cmd='d', data=[int(samplePeriod*100)])
        self.sendMsg(msg)
        start = time.clock()
        for i in range(nbSamples):
            msg = self.waitForMsg('D', timeout=samplePeriod+1)
            msg = DebugDataMsg(msg)
            result.append(msg)
        print "Time = ", (time.clock()-start)
        self.getDebugMsg();
        return result


import copy

if __name__ == '__main__':
    try:
        comm = MkComm()
        comm.printDebugMsg = True
        comm.open(comPort="COM4")
       
        msg = comm.getVersionMsg()
        print "Version: %d.%d" % msg.getVersion()
       
        msg = MkMsg(address=2, cmd='f', data=[7, 50])
        comm.sendMsg(msg)
       
       
#        while True:
#            msg = comm.getSettingsMsg()
#            print "Index=",msg.getIndex()
#            print "Name=",msg.getName()
#            print
#            print "StickP =",msg.getSetting(SettingsMsg.IDX_STICK_P)
#            print "StickD =",msg.getSetting(SettingsMsg.IDX_STICK_D)
#            print
#            print "GyroP =",msg.getSetting(SettingsMsg.IDX_GYRO_P)
#            print "GyroI =",msg.getSetting(SettingsMsg.IDX_GYRO_I)
#            print "GyroD =",msg.getSetting(SettingsMsg.IDX_GYRO_D)
#            print "Gyro I Factor =",msg.getSetting(SettingsMsg.IDX_I_FACTOR)
#            
#            testSettings = copy.deepcopy(msg)
#            testSettings.setSetting(SettingsMsg.IDX_STICK_D, 0)
#            
#            testSettings.setSetting(SettingsMsg.IDX_GYRO_P, 0)
#            testSettings.setSetting(SettingsMsg.IDX_GYRO_I, 50)
#    
#            print "Test Settings..."
#            comm.sendSettings(testSettings.getSettings())
#            raw_input("Press ENTER to end test")
#            
#            print "Normal Settings..."
#            comm.sendSettings(msg.getSettings())
       
       
#        msg.setSetting(SettingsMsg.IDX_STICK_P, msg.getSetting(SettingsMsg.IDX_STICK_P)+1)
#        comm.sendSettings(msg.getSettings())
       
#        messages = comm.recordDbgMsg(0.05, 20)
#        for msg in messages:
#            print msg.getAngleRoll()
       
       
       
        comm.close()

    except Exception,e:
        print
        print "An error occured: ", e
        print
        traceback.print_exc()
#        raw_input("Press ENTER, the application will close")