Subversion Repositories Projects

Compare Revisions

Ignore whitespace Rev 551 → Rev 552

/VibrationTest/trunk/VibrationTest/VibrationTest.py
0,0 → 1,202
#! /usr/bin/env python
 
#
# Mikrokopter Vibration Test
#
# Author: FredericG
#
 
 
import mkProto
import time
import traceback
import getopt
import sys
 
 
 
CHANNEL_NAMES = ["GyroYaw", "GyroRoll", "GyroNick", "Pressure", "Batt", "AccTop", "AccRoll", "AccNick"]
 
def usage():
print
print "VibrationTest.py COMPORT MOTORS SPEEDS CHANNELS [-m MINSPEED] [-s NBSAMPLES] [-n NAME] [-d FILENAME] [-v]"
print " COMPORT Serial port to use. e.g. COM4"
print " MOTORS Motors to activate during test. Multiple motors can be used at the same time. e.g. 1,2,3,4"
print " SPEEDS Indicates at what speeds the motors need to be tested. e.g. 50,80,110,140,170"
print " CHANNELS Channels to monitor. e.g. 5,6,7"
i = 0
for channelName in CHANNEL_NAMES:
print " Channel %d = %s" % (i, channelName)
i += 1
print " -m MINSPEED Minimum speed of the motor(s)"
print " -s NBSAMPLES Number of samples"
print " -n NAME Name of the test"
print " -d FILENAME File to which the measured values will be logged in"
print " -v Verbose"
print
 
 
if __name__ == '__main__':
 
#print sys.argv
if len(sys.argv)<5:
usage()
sys.exit(2)
 
parComPort = sys.argv[1]
parMotors = sys.argv[2].split(',')
parSpeeds = sys.argv[3].split(',')
parChannels = sys.argv[4].split(',')
try:
opts, args = getopt.getopt(sys.argv[5:], "s:n:d:m:v")
except Exception, err:
print str(err) # will print something like "option -a not recognized"
usage()
sys.exit(2)
 
parVerbose = False
parFileName = None
parNbSamples = 400
parMinSpeed = 25
parTestName = ""
 
for o, a in opts:
if o == "-v":
parVerbose = True
elif o == "-d":
parFileName = a
elif o == "-s":
parNbSamples = int(a)
elif o == "-m":
parMinSpeed = int(a)
elif o == "-n":
parTestName = a
else:
assert False, "unhandled option %s" % (o)
 
if parVerbose:
print "comPort =", parComPort
print "motors =", parMotors
print "minSpeed =", parMinSpeed
print "speeds =", parSpeeds
print "channels =", parChannels
print "nbSamples=", parNbSamples
print "fileName =", parFileName
print "testName =", parTestName
 
 
try:
parMotors = map(int, parMotors)
parSpeeds = map(int, parSpeeds)
parChannels = map(int, parChannels)
if parVerbose:
print "Opening comPort... "
mk = mkProto.MkComm()
mk.open(comPort=parComPort)
msg = mk.getVersionMsg()
voltage = 0
if parVerbose:
print "Version: %d.%d" % msg.getVersion()
msg = mk.getDebugMsg()
if (voltage > 4.2*3):
minVoltage = 4*3.5
else:
minVoltage = 3*3.5
 
if parVerbose:
print "Voltage: %2.1fV" % msg.getVoltage()
print "Minimum Voltage: %2.1fV" % minVoltage
motorStartSpeeds = [0,0,0,0]
if parVerbose:
print "Starting motor(s) (speed=%d)... " % (parMinSpeed),
for i in range(0,parMinSpeed):
time.sleep(.2)
for motor in parMotors:
motorStartSpeeds[motor-1] = i
mk.setMotorTest(motorStartSpeeds)
if parVerbose:
print "OK"
for speed in parSpeeds:
if len(parChannels)>1:
print
if parVerbose:
print "Setting speed to %3d ..." % speed
motorSpeeds = [0,0,0,0]
for motor in parMotors:
motorSpeeds[motor-1] = speed
for i in range(0,10):
time.sleep(.1)
mk.setMotorTest(motorSpeeds)
for channel in parChannels:
channelName = CHANNEL_NAMES[channel]
if parVerbose:
print "Getting data... " ,
data = mk.doVibrationTest(motorSpeeds, parNbSamples, channel)
minval = min(data)
maxval = max(data)
if (maxval-minval)>100: print data
msg = mk.getDebugMsg()
voltage = msg.getVoltage()
if voltage<minVoltage:
print "VOLTAGE TOO LOW, TEST ABORTED"
sys.exit(2)
print "%s Speed=%3d U=%2.1fV Channel=%-10s Min=%3d Max=%3d pp=%3d" % (parTestName, speed, voltage, channelName, minval, maxval, maxval-minval)
if parFileName != None:
try:
logfile = open(parFileName, "r")
newFile = False
except:
newFile = True
if newFile:
logfile = open(parFileName, "w")
if parVerbose:
print "Writing result to %s ..." % parFileName,
logfile.write("%s %d %s\n" % (parTestName, speed, channelName))
for value in data[1:]:
logfile.write("%d\n" % value)
logfile.close()
if parVerbose:
print "OK"
else:
if parVerbose:
print "Appending result to %s ..." % parFileName,
prevData = []
for line in logfile:
prevData.append(line[:-1])
logfile.close()
logfile = open(parFileName, "w")
logfile.write("%s,%s %d %s\n" % (prevData[0], parTestName, speed, channelName))
i = 1
for value in data[1:]:
logfile.write("%s,%d\n" % (prevData[i], value))
i += 1
logfile.close()
if parVerbose:
print "OK"
except Exception,e:
print
print "== ERROR ==: \"%s\"" % e
if parVerbose:
print
print "Traceback:"
traceback.print_exc()
print
raw_input("Press ENTER, the application will close")
print
/VibrationTest/trunk/VibrationTest/mkProto.py
0,0 → 1,352
#! /usr/bin/env python
 
#
# Mikrokopter Serial protocol
#
# Author: FredericG
#
 
import os
import glob
import serial
import time
import traceback
 
class MkException(Exception):
pass
 
class InvalidMsg(MkException):
def __str__(self):
return "Invalid message"
 
class CrcError(MkException):
def __str__(self):
return "CRC error"
 
class InvalidArguments(MkException):
def __str__(self):
return "Invalid Arguments"
 
class InvalidMsgType(MkException):
def __str__(self):
return "Invalid Message type"
 
class NoResponse(MkException):
def __init__(self, cmd):
self.cmd = cmd
def __str__(self):
return "No Reponse. Waiting for \"%s\" message" % self.cmd
pass
 
def calcCrcBytes(str):
crc = 0
for c in str:
crc += ord(c)
crc &= 0xfff
return (chr(crc/64+ord('=')), chr(crc%64+ord('=')))
 
 
class MkMsg:
def __init__(self, msg=None, address=None, cmd=None, data=None):
if (msg != None):
# Create instance based on received message
self.parse(msg)
elif (address != None and cmd != None and data != None):
# Create instance based on address, command and data
self.address = address
self.cmd = cmd
self.data = data
else:
# Cannot create instance
raise InvalidArguments
 
def generateMsg(self):
msg = ""
 
# make header
msg += '#'
msg += chr(self.address+ord('a'))
msg += self.cmd
 
# add data
done = False
i = 0
while (i<len(self.data)) and not done:
a = 0
b = 0
c = 0
try:
a = self.data[i]
b = self.data[i+1]
c = self.data[i+2]
i = i + 3
except IndexError:
done = True
msg += chr(ord('=') + (a >> 2))
msg += chr(ord('=') + (((a & 0x03) << 4) | ((b & 0xf0) >> 4)))
msg += chr(ord('=') + (((b & 0x0f) << 2) | ((c & 0xc0) >> 6)))
msg += chr(ord('=') + ( c & 0x3f))
 
# add crc and NL
crc1,crc2 = calcCrcBytes(msg)
msg += crc1 + crc2
msg += '\r'
return msg
 
 
def parse(self, msg):
if len(msg)<6:
raise InvalidMsg()
if (msg[0] != '#'):
raise InvalidMsg()
if (msg[-1] != '\r'):
raise InvalidMsg()
 
self.address = ord(msg[1])
self.cmd = msg[2]
 
data64 = map(ord, msg[3:-3]) # last 3 bytes are CRC and \n
 
done = False
i = 0
self.data = []
while (i<len(data64)) and not done:
a = 0
b = 0
c = 0
d = 0
try:
a = data64[i] - ord('=')
b = data64[i+1] - ord('=')
c = data64[i+2] - ord('=')
d = data64[i+3] - ord('=')
i = i + 4
except IndexError:
done = True
 
self.data.append((a << 2)&0xFF | (b >> 4))
self.data.append(((b & 0x0f) << 4)&0xFF | (c >> 2));
self.data.append(((c & 0x03) << 6)&0xFF | d);
 
crc1,crc2 = calcCrcBytes(msg[:-3])
if (crc1 != msg[-3] or crc2 != msg[-2]):
#print msg
raise CrcError
#print "crc= %x %x %x %x" % ( crc1, crc2, (ord(msg[-3])-ord('=')), (ord(msg[-2])-ord('=')))
 
 
def data2SignedInt(self, index):
int = self.data[index]+self.data[index+1]*256
if (int > 0xFFFF/2):
int -= 0xFFFF
return int
 
 
class DebugDataMsg:
IDX_ANALOG_ACCNICK = 2+2*2
IDX_ANALOG_ACCROLL = 2+2*3
IDX_ANALOG_COMPASS = 2+2*3
IDX_ANALOG_VOLTAGE = 2+2*9
 
def __init__(self, msg):
if (msg.cmd != 'D'):
raise InvalidMsgType
self.msg = msg
 
def getAccNick(self):
return self.msg.data2SignedInt(DebugDataMsg.IDX_ANALOG_ACCNICK);
 
def getAccRoll(self):
return self.msg.data2SignedInt(DebugDataMsg.IDX_ANALOG_ACCROLL);
 
def getCompassHeading(self):
return self.msg.data2SignedInt(DebugDataMsg.IDX_ANALOG_COMPASS);
 
def getVoltage(self):
return float(self.msg.data2SignedInt(DebugDataMsg.IDX_ANALOG_VOLTAGE))/10;
 
class VibrationDataMsg:
def __init__(self, msg):
if (msg.cmd != 'F'):
raise InvalidMsgType
self.msg = msg
def getData(self):
data = []
for i in range(0,25):
data.append(self.msg.data2SignedInt(2*i))
return data
 
class VersionMsg:
def __init__(self, msg):
if (msg.cmd != 'V'):
raise InvalidMsgType
self.msg = msg
 
def getVersion(self):
return (self.msg.data[0], self.msg.data[1])
 
class MkComm:
FCADDRESS = 1
 
def __init__(self, printDebugMsg=False):
#self.logfile = open('mklog.txt', "rbU")
 
self.serPort = None
self.printDebugMsg = printDebugMsg
 
msg = MkMsg(address=0, cmd='v', data=[])
self.getVersionMsgLn = msg.generateMsg()
msg = MkMsg(address=0, cmd='d', data=[500])
self.getDebugMsgLn = msg.generateMsg()
 
 
def open(self, comPort):
self.serPort = serial.Serial(comPort, 57600, timeout=0.5)
if not self.serPort.isOpen():
raise IOError("Failed to open serial port")
 
def isOpen(self):
return self.serPort != None
 
def sendLn(self, ln):
self.serPort.write(ln)
 
def waitForLn(self):
return self.serPort.readline(eol='\r')
 
def waitForMsg(self, cmd2wait4):
msg = None
done = False
while (not done):
line = self.waitForLn()
if len(line) == 0:
raise NoResponse(cmd2wait4)
try:
msg = MkMsg(msg=line)
if (msg.cmd == cmd2wait4):
done = True
except InvalidMsg:
if self.printDebugMsg:
print "DebugMsg: \"%s\"" % line[:-1]
pass
return msg
 
def getDebugMsg(self):
self.serPort.flushInput()
self.sendLn(self.getDebugMsgLn)
msg = self.waitForMsg('D')
msg = DebugDataMsg(msg)
return msg
 
def getVersionMsg(self):
self.sendLn(self.getVersionMsgLn)
msg = self.waitForMsg('V')
msg = VersionMsg(msg)
return msg
 
def setMotorTest(self, motorSpeeds):
msg = MkMsg(address=MkComm.FCADDRESS, cmd='t', data=motorSpeeds)
self.sendLn(msg.generateMsg())
def doVibrationTest(self, motorSpeeds, nbSamples, channel):
data = []
for i in range(0,min(nbSamples,1000)/50):
self.setMotorTest(motorSpeeds)
time.sleep(0.05)
#print i
msg = MkMsg(address=MkComm.FCADDRESS, cmd='f', data=[channel, i])
self.sendLn(msg.generateMsg())
msg = self.waitForMsg('F')
msg = VibrationDataMsg(msg)
data += msg.getData()
#print data
return data
 
 
 
 
 
if __name__ == '__main__':
try:
 
## file = open('mklog.txt', "rbU")
##
## for line in file:
## try:
## msg = MkMsg(msg=line)
## #print "Rec Addr:%02d cmd:%c data:" % (msg.address, msg.cmd), msg.data
## if (msg.cmd == 'D'):
## #print len(msg.data)
## msg = DebugDataMsg(msg)
##
## #print "%d %d" % (msg.getAccNick(), msg.getAccRoll())
##
## #val = abs(msg.getAccNick())
## #print "*************************************************************"[:val/10]
##
## print msg.getCompassHeading()
##
## except InvalidMsg:
## pass
##
## file.close()
## print "OK"
 
## msg = MkMsg(address=1, cmd='a', data=[255, 254, 10, 11, 12])
## print msg.generateMsg()
## msg2 = MkMsg(msg.generateMsg())
## print msg2.address, msg2.cmd, msg2.data
 
comm = MkComm()
comm.open(comPort="COM5")
msg = comm.getVersionMsg()
print "Version: %d.%d" % msg.getVersion()
 
comm.doVibrationTest()
 
# msg = comm.getVersionMsg()
# print "Version: %d.%d" % msg.getVersion()
#
# comm.setMotorTest([0,0,80,0])
# time.sleep(10)
#
# minN = 0
# maxN = 0
# minR = 0
# maxR = 0
#
# time.clock()
# for i in range(0,1000):
# try:
# msg = comm.getDebugMsg()
# n = msg.getAccNick()
# r = msg.getAccRoll()
# print "%d,%d" % (n, r)
# minN = min(minN, n)
# maxN = max(maxN, n)
# minR = min(minR, r)
# maxR = max(maxR, r)
# except Exception:
# pass
#
# comm.setMotorTest([0,0,0,0])
# print time.clock()
# print maxN-minN,maxR-minR
 
 
 
except Exception,e:
print
print "An error occured: ", e
print
traceback.print_exc()
raw_input("Press ENTER, the application will close")