13,6 → 13,12 |
import wx.lib.plot |
import wx.lib.newevent |
|
import mkProto |
|
|
|
# Display names of the measurement channels; a channel number is used as the
# index into this list (see the CHANNEL_NAMES[channel] lookups in MeasureThread).
CHANNEL_NAMES = [
    "GyroYaw",   # 0
    "GyroRoll",  # 1
    "GyroNick",  # 2
    "Pressure",  # 3
    "Batt",      # 4
    "AccTop",    # 5
    "AccRoll",   # 6
    "AccNick",   # 7
]
|
# Needs Numeric or numarray or NumPy |
try: |
import numpy.oldnumeric as _Numeric |
36,7 → 42,8 |
|
|
# Create the custom event classes and their EVT binder functions.
# Each NewEvent() call returns a fresh (event class, binder) pair, so every
# event type must be created exactly once: a second call would produce a
# different, unrelated class/binder pair.
#   MeasStatusUpdateEvent / EVT_MEAS_STATUS_UPDATE: status text from the measurement thread
#   MeasDataEvent / EVT_MEAS_DATA: a finished VibTest from the measurement thread
(MeasStatusUpdateEvent, EVT_MEAS_STATUS_UPDATE) = wx.lib.newevent.NewEvent()
(MeasDataEvent, EVT_MEAS_DATA) = wx.lib.newevent.NewEvent()
|
class MeasureDialog(wx.Dialog): |
def __init__(self, *args, **kwds): |
54,6 → 61,10 |
|
self.running = True |
self.Bind(EVT_MEAS_STATUS_UPDATE, self.OnUpdate) |
self.Bind(EVT_MEAS_DATA, self.OnData) |
# The first argument that is passed to the constructor is the parent |
self.app = args[0].app |
self.error = False |
|
def __set_properties(self): |
# begin wxGlade: MeasureDialog.__set_properties |
77,17 → 88,31 |
sizer_1.Fit(self) |
self.Layout() |
# end wxGlade |
|
def OnData(self, evt):
    """Handle a measurement-data event.

    Forwards the VibTest carried in ``evt.vibTest`` to the application
    object via ``self.app.AddTest2``.
    """
    print("Received Data")
    vibTest = evt.vibTest
    self.app.AddTest2(vibTest)
|
def OnUpdate(self, evt):
    """Handle a status-update event and reflect it in the dialog.

    Reads ``evt.running``, ``evt.error`` and ``evt.msg``:
      * stores the running flag in ``self.running``;
      * on an error, latches ``self.error`` and turns the text control red;
      * appends the message text as a new line;
      * once the measurement is no longer running, turns the text control
        green (only if no error was ever reported) and relabels the button
        to "Close".
    """
    print("Status update")
    self.running = evt.running
    if evt.error:
        # Latched: a later non-error event must not clear the error state.
        self.error = True
        self.text_ctrl_1.WriteText("ERROR: ")
        self.text_ctrl_1.SetBackgroundColour("Red")
    self.text_ctrl_1.WriteText("%s\n" % evt.msg)
    if not self.running:
        if not self.error:
            self.text_ctrl_1.SetBackgroundColour("Green")
            self.text_ctrl_1.write(" ") # so that the background is redrawn
        self.button.SetLabel("Close")
|
|
def onButton(self, event): # wxGlade: MeasureDialog.<event_handler>
    """Handle a click on the dialog button.

    While a measurement is running the click asks the application to cancel
    it; once the measurement has finished the click destroys the dialog.
    """
    if self.running:
        self.app.cancelMeasurement()
        return
    self.Destroy()
|
# end of class MeasureDialog |
|
190,15 → 215,15 |
self.SetMenuBar(self.frame_1_menubar) |
# Menu Bar end |
self.Description = wx.StaticText(self, -1, "Description") |
self.text_ctrl_6 = wx.TextCtrl(self, -1, "Test") |
self.tcDescr = wx.TextCtrl(self, -1, "Test") |
self.label_37 = wx.StaticText(self, -1, "Speed(s)") |
self.text_ctrl_9 = wx.TextCtrl(self, -1, "100-200:10") |
self.tcSpeeds = wx.TextCtrl(self, -1, "100-200:10") |
self.label_35 = wx.StaticText(self, -1, "Motor(s)") |
self.text_ctrl_7 = wx.TextCtrl(self, -1, "1") |
self.tcMotors = wx.TextCtrl(self, -1, "1") |
self.label_38 = wx.StaticText(self, -1, "") |
self.text_ctrl_10 = wx.TextCtrl(self, -1, "") |
self.label_36 = wx.StaticText(self, -1, "Channel") |
self.text_ctrl_8 = wx.TextCtrl(self, -1, "6") |
self.label_36 = wx.StaticText(self, -1, "Channel(s)") |
self.tcChannels = wx.TextCtrl(self, -1, "6") |
self.label_39 = wx.StaticText(self, -1, "") |
self.text_ctrl_11 = wx.TextCtrl(self, -1, "") |
self.button_4 = wx.Button(self, -1, "Start") |
247,15 → 272,15 |
sizer_3.Add((20, 20), 0, 0, 0) |
sizer_8.Add((20, 20), 0, 0, 0) |
grid_sizer_1.Add(self.Description, 0, wx.ALIGN_RIGHT|wx.ALIGN_CENTER_VERTICAL, 0) |
grid_sizer_1.Add(self.text_ctrl_6, 0, 0, 0) |
grid_sizer_1.Add(self.tcDescr, 0, 0, 0) |
grid_sizer_1.Add(self.label_37, 0, wx.ALIGN_RIGHT|wx.ALIGN_CENTER_VERTICAL, 0) |
grid_sizer_1.Add(self.text_ctrl_9, 0, 0, 0) |
grid_sizer_1.Add(self.tcSpeeds, 0, 0, 0) |
grid_sizer_1.Add(self.label_35, 0, wx.ALIGN_RIGHT|wx.ALIGN_CENTER_VERTICAL, 0) |
grid_sizer_1.Add(self.text_ctrl_7, 0, 0, 0) |
grid_sizer_1.Add(self.tcMotors, 0, 0, 0) |
grid_sizer_1.Add(self.label_38, 0, wx.ALIGN_RIGHT|wx.ALIGN_CENTER_VERTICAL, 0) |
grid_sizer_1.Add(self.text_ctrl_10, 0, 0, 0) |
grid_sizer_1.Add(self.label_36, 0, wx.ALIGN_RIGHT|wx.ALIGN_CENTER_VERTICAL, 0) |
grid_sizer_1.Add(self.text_ctrl_8, 0, 0, 0) |
grid_sizer_1.Add(self.tcChannels, 0, 0, 0) |
grid_sizer_1.Add(self.label_39, 0, wx.ALIGN_RIGHT|wx.ALIGN_CENTER_VERTICAL, 0) |
grid_sizer_1.Add(self.text_ctrl_11, 0, 0, 0) |
sizer_9.Add(grid_sizer_1, 0, 0, 0) |
336,8 → 361,7 |
vv = int(test.getVibValue()) |
vvs = "|%s| (%d)" % ("----------------------------------------------------------------------------------------------------"[0:min(vv,100)], vv) |
self.TestListCtrl.SetStringItem(index, 3, vvs) |
if (index == 0): |
self.TestListCtrl.Select(index) |
self.TestListCtrl.Select(index) |
|
|
def OnTestSelected(self, event): |
440,12 → 464,29 |
self.app.onSettingsChanged() |
|
def onStartMeasure(self, event): # wxGlade: MainFrame.<event_handler>
    """Collect the measurement parameters from the form and start a measurement.

    Motors and channels are comma-separated integer lists.  Speeds are either
    a comma-separated integer list or a single ``from-to:step`` range whose
    upper bound ``to`` is inclusive and never exceeded.

    Raises:
        Exception: a range is given without a ``:step`` part.
        ValueError: a field contains a non-integer value (or a zero step).
    """
    # Collect measure parameters
    mp = MeasureParameters()
    mp.descr = self.tcDescr.GetValue()
    mp.motors = [int(m) for m in self.tcMotors.GetValue().split(',')]
    mp.channels = [int(c) for c in self.tcChannels.GetValue().split(',')]

    s = self.tcSpeeds.GetValue()
    if s.count("-") == 1:
        # assume from-to:step format
        first, rest = s.split("-")
        stepSpec = rest.split(":")
        if len(stepSpec) != 2: raise Exception("Invalid format")
        last = int(stepSpec[0])
        step = int(stepSpec[1])
        # "last + 1" makes the upper bound inclusive without ever producing
        # a speed above the one the user asked for.
        mp.speeds = list(range(int(first), last + 1, step))
    else:
        # Convert to int: the measurement code formats speeds with "%d".
        mp.speeds = [int(v) for v in s.split(',')]

    # create the dialog that will show the status
    dlg = MeasureDialog(self)
    dlg.CenterOnScreen()

    # Signal the application
    self.app.startMeasure(mp, dlg)

    # Show the dialog
    val = dlg.ShowModal() # this does not return until the dialog is closed.
    dlg.Destroy()
463,24 → 504,90 |
else: |
self.value = str(newValue) |
|
class MeasureParameters:
    """Plain attribute container describing one measurement run.

    The class defines nothing itself; callers attach attributes to an
    instance (descr, motors, channels, speeds, serialPort,
    motorStartupSpeed, motorStartupSettlingTime).
    """
|
class MeasureThread:
    """Background worker that runs a vibration measurement sequence.

    Progress and results are reported to ``evtConsumer`` through
    ``wx.PostEvent``: ``MeasStatusUpdateEvent`` for status text and
    ``MeasDataEvent`` for each finished ``VibTest``.
    """

    def __init__(self, measureParameters, evtConsumer):
        """Store the parameters and the event target.

        measureParameters -- object providing descr, motors, channels,
                             speeds, serialPort, motorStartupSpeed and
                             motorStartupSettlingTime
        evtConsumer       -- object that receives the posted events
        """
        self.mk = mkProto.MkComm()
        self.param = measureParameters
        self.evtConsumer = evtConsumer
        self.cancel = False
        self.running = False

    def start(self):
        """Run the measurement sequence in a new thread."""
        thread.start_new_thread(self._run, ())

    def stop(self):
        """Request cancellation; honoured at the next _testCancel() call."""
        self.cancel = True

    def _testCancel(self):
        """Raise an Exception if cancellation has been requested."""
        if self.cancel:
            raise Exception("Operation cancelled")

    def _sendEvent(self, msg, error=False):
        """Post a status event carrying the current running flag and msg."""
        evt = MeasStatusUpdateEvent(running=self.running, msg=msg, error=error)
        wx.PostEvent(self.evtConsumer, evt)

    def _setMotorSpeed(self, speed, settlingTime):
        """Hold the selected motors at ``speed`` for ``settlingTime`` seconds.

        The speed command is re-sent every 0.1 s and cancellation is checked
        before each send.  Motor numbers are 1-based.

        Raises:
            Exception: a motor number is outside 1..4, or the operation
                       was cancelled.
        """
        speeds = [0,0,0,0]
        for motor in self.param.motors:
            # Reject out-of-range numbers explicitly: 0 or a negative number
            # would otherwise wrap around and drive a different motor.
            if not 1 <= motor <= len(speeds):
                raise Exception("Invalid motor number %s" % motor)
            speeds[motor-1] = speed
        for i in range(int(settlingTime*10)):
            self._testCancel()
            self.mk.setMotorTest(speeds)
            time.sleep(.1)

    def _run(self):
        """Thread body: execute the whole measurement sequence.

        Any exception (including cancellation) is reported as an error
        status event.  The final event always carries running=False.
        """
        self.running = True
        self._sendEvent("Starting test \"%s\"" % self.param.descr)

        try:
            self._sendEvent("Opening SerialPort \"%s\"" % self.param.serialPort)
            self.mk.open(comPort=self.param.serialPort)

            msg = self.mk.getVersionMsg()
            version = msg.getVersion()
            self._sendEvent("Version: %d.%d" % version)

            msg = self.mk.getDebugMsg()
            voltage = msg.getVoltage()
            # NOTE(review): the thresholds look like 3-cell vs. 4-cell battery
            # detection -- confirm.  minVoltage is only reported, never enforced.
            if voltage == 0:
                minVoltage = 0
            elif voltage > 4.2*3:
                minVoltage = 4*3.5
            else:
                minVoltage = 3*3.5

            self._sendEvent("Voltage: %2.1fV" % voltage)
            self._sendEvent("Minimum Voltage: %2.1fV" % minVoltage)

            self._sendEvent("Starting motor(s) (speed=%d)... " % self.param.motorStartupSpeed)
            self._setMotorSpeed(self.param.motorStartupSpeed, self.param.motorStartupSettlingTime)

            for speed in self.param.speeds:
                self._sendEvent("Changing motor speed to %d... " % speed)
                self._setMotorSpeed(speed, 1)

                for channel in self.param.channels:
                    # Re-send the speed command once before each acquisition.
                    self._setMotorSpeed(speed, .1)
                    self._sendEvent("Getting data from channel %s" % CHANNEL_NAMES[channel])
                    data = self.mk.doVibrationTest(1000, channel)

                    vt = VibTest(self.param.descr, self.param.motors, speed, CHANNEL_NAMES[channel], data)
                    evt = MeasDataEvent(vibTest = vt)
                    wx.PostEvent(self.evtConsumer, evt)

            self._sendEvent("Done !")

        except Exception as e:
            self._sendEvent("Exception \"%s\"" % e, error=True)

        # Final event: tells the consumer that the measurement has ended.
        self.running = False
        self._sendEvent("")
|
|
class VibTest: |
def __init__(self, descr, motor, speed, channel, rawData): |
545,6 → 652,9 |
# Init settings |
self.settings={} |
self.settings["serialport"] = Setting("Serial Port", "COM1") |
self.settings["startupspeed"] = Setting("Motor Startup Speed", 25) |
self.settings["startupsettling"] = Setting("Motor Startup Setting time (s)", 3) |
self.settings["serialport"] = Setting("Serial Port", "COM1") |
self.settings["hpf"] = Setting("HP Filter cutoff (Hz)", 50) |
self.settings["lpf"] = Setting("LP Filter cutoff (Hz)", 400) |
|
563,7 → 673,7 |
for setting in cp.items("DEFAULT"): |
print " ",setting |
try: |
self.settings[setting[0]].value = setting[1] |
self.settings[setting[0]].set(setting[1]) |
except: |
print "WARNING, unknown setting" |
except: |
584,12 → 694,14 |
|
def onSettingsChanged(self):
    """React to a settings change by storing the settings via storeSettings()."""
    store = self.storeSettings
    store()
|
|
def AddTest2(self, vibTest):
    """Register an already-built VibTest.

    Appends it to ``self.VibTests`` and then notifies the main frame through
    ``self.frame_1.onNewTest``.
    """
    tests = self.VibTests
    tests.append(vibTest)
    frame = self.frame_1
    frame.onNewTest(vibTest)
|
def AddTest(self, descr, motor, speed, channel, rawData):
    """Build a VibTest from its raw fields and register it.

    Registration (storing the test and notifying the frame) is delegated to
    AddTest2 so that each test is stored and announced exactly once.
    """
    test = VibTest(descr, motor, speed, channel, rawData)
    self.AddTest2(test)
|
def getTest(self, testId):
    """Return the entry of ``self.VibTests`` at index ``testId``."""
    tests = self.VibTests
    return tests[testId]
637,12 → 749,21 |
data[c].append(data[c][-1]) |
self.AddTest(descr[c], 0, int(speed[c]), channel[c], data[c]) |
|
def startMeasure(self, measureParams, dialog):
    """Start a measurement in a background MeasureThread.

    measureParams -- parameter object prepared by the caller; the serial
                     port and the motor startup settings are filled in here
                     from ``self.settings``
    dialog        -- event target handed to the MeasureThread
    """
    print("Start measuring")

    # Complete the parameters with the application-wide settings.
    measureParams.serialPort = self.settings["serialport"].value
    measureParams.motorStartupSpeed = self.settings["startupspeed"].value
    measureParams.motorStartupSettlingTime = self.settings["startupsettling"].value

    # Keep a reference so that cancelMeasurement() can stop the thread.
    self.measureThread = MeasureThread(measureParams, dialog)
    self.measureThread.start()
|
def cancelMeasurement(self):
    """Ask the current measurement thread to stop by calling its stop()."""
    print("Measuring CANCEL")

    worker = self.measureThread
    worker.stop()
|
|
|
|