487 lines
16 KiB
Python
487 lines
16 KiB
Python
#coding=utf-8
|
|
|
|
# Name: debugger
|
|
# Date: 12:26 2024/1/8
|
|
# Note:
|
|
|
|
import os
|
|
import sys
|
|
import glob
|
|
import time
|
|
import msvcrt
|
|
import queue
|
|
import threading
|
|
|
|
import winreg
|
|
import win32api
|
|
import win32gui
|
|
from win32con import WM_INPUTLANGCHANGEREQUEST
|
|
|
|
import mysys
|
|
import myserial
|
|
import myconvert
|
|
|
|
## size of shell cmd buffer
|
|
|
|
SHELL_CMD_SIZE = 128
|
|
|
|
## size of download package
|
|
|
|
DOWNLOAD_PKG_SIZE = 64
|
|
|
|
## number of serial ports while scan
|
|
|
|
SERIAL_PORT_NUM = 30
|
|
|
|
## baudrate of serial port
|
|
|
|
SERIAL_BAUDRATE = 115200
|
|
|
|
## valid value of 'shell_stat'
|
|
|
|
STAT_WAIT_SPEC_KEY = 0
|
|
STAT_WAIT_FUNC_KEY = 1
|
|
|
|
## valid value of 'shell_mode'
|
|
|
|
MODE_MAX = 2
|
|
|
|
MODE_NORMAL = 0
|
|
MODE_CONTROL = 1
|
|
|
|
'''
|
|
debugger.py
|
|
'''
|
|
|
|
welcome_info = """This is a PuSH Terminal.
|
|
Type 'help' for more information.
|
|
"""
|
|
|
|
help_info = """Command:
|
|
----------------------------------------
|
|
~ - Switch mode.
|
|
quit - (q) Quit terminal.
|
|
repeat - (r) Repeat last command.
|
|
help - (h) Show help menu.
|
|
cd - Switch to work path.
|
|
load - (ld) Load file.
|
|
download - (d) Download to target device.
|
|
go - (g) Control target device jump and run.
|
|
com - Scan valid serial port.
|
|
com* - Open target serial port.
|
|
"""
|
|
|
|
example_info = """Example:
|
|
----------------------------------------
|
|
1. download & run
|
|
1) switch to control mode
|
|
2) ld > com > com* > [hardware reset] > [ESC] > d
|
|
2. go without download
|
|
1) switch to control mode
|
|
2) [hardware reset] > g
|
|
"""
|
|
|
|
|
|
class GlobalParam:
|
|
exitFlag = False
|
|
|
|
workPath = None
|
|
|
|
binFile = None
|
|
binData = None
|
|
binSize = 0
|
|
|
|
workQueue = None
|
|
queueLock = None
|
|
|
|
currentSerial = None
|
|
lastSerial = None
|
|
|
|
threadInput = None
|
|
threadOutput = None
|
|
|
|
def __init__(self):
|
|
self.workQueue = queue.Queue(SHELL_CMD_SIZE)
|
|
self.queueLock = threading.Lock()
|
|
|
|
def serial_check(self):
|
|
if self.currentSerial != None or self.lastSerial != None:
|
|
return True
|
|
else:
|
|
return False
|
|
|
|
def get_desktop_path(self):
|
|
key = winreg.OpenKey(winreg.HKEY_CURRENT_USER, r'Software\Microsoft\Windows\CurrentVersion\Explorer\Shell Folders')
|
|
return winreg.QueryValueEx(key, 'Desktop')[0]
|
|
|
|
def get_work_path(self):
|
|
return r'..\..'
|
|
|
|
def cleanup(self):
|
|
while self.serial_check():
|
|
pass
|
|
|
|
|
|
class NewThread (threading.Thread):
|
|
def __init__(self, *args, **keywords):
|
|
threading.Thread.__init__(self, *args, **keywords)
|
|
self.killed = False
|
|
|
|
def start(self):
|
|
self.__run_backup = self.run
|
|
self.run = self.__run
|
|
threading.Thread.start(self)
|
|
|
|
def __run(self):
|
|
sys.settrace(self.globaltrace)
|
|
self.__run_backup()
|
|
self.run = self.__run_backup
|
|
|
|
def globaltrace(self, frame, event, arg):
|
|
if event == 'call':
|
|
return self.localtrace
|
|
else:
|
|
return None
|
|
|
|
def localtrace(self, frame, event, arg):
|
|
if self.killed:
|
|
if event == 'line':
|
|
raise SystemExit()
|
|
return self.localtrace
|
|
|
|
def kill(self):
|
|
self.killed = True
|
|
|
|
class Shell:
|
|
shell_trig = False
|
|
shell_mode = MODE_CONTROL
|
|
shell_stat = STAT_WAIT_SPEC_KEY
|
|
shell_line = None
|
|
shell_line_curpos = 0
|
|
|
|
def __init__(self, cmdsize):
|
|
self.shell_line = [0]*cmdsize
|
|
|
|
def output(self, ser, ch, redirect=True):
|
|
## handle control key
|
|
if self.shell_stat == STAT_WAIT_SPEC_KEY and ch == 0xe0:
|
|
self.shell_stat = STAT_WAIT_FUNC_KEY
|
|
return
|
|
elif self.shell_stat == STAT_WAIT_FUNC_KEY:
|
|
self.shell_stat = STAT_WAIT_SPEC_KEY
|
|
return
|
|
|
|
## handle tab key
|
|
if ch == ord('\t'):
|
|
return
|
|
## handle backspace key
|
|
elif ch == 0x08 or ch == 0x7f:
|
|
if self.shell_line_curpos == 0:
|
|
pass
|
|
else:
|
|
self.shell_line_curpos = self.shell_line_curpos - 1
|
|
self.shell_line[self.shell_line_curpos] = 0
|
|
_xputs(ser, "\b \b")
|
|
return
|
|
## handle enter key
|
|
elif ch == ord('\r'):
|
|
if redirect:
|
|
_xputs(ser, "\n")
|
|
return
|
|
|
|
## handle normal character
|
|
self.shell_line[self.shell_line_curpos] = ch
|
|
_xputs(ser, chr(ch))
|
|
|
|
## handle overwrite
|
|
self.shell_line_curpos = self.shell_line_curpos + 1
|
|
if self.shell_line_curpos >= SHELL_CMD_SIZE:
|
|
self.shell_line_curpos = 0
|
|
|
|
|
|
def _is_equal(v1, v2):
|
|
rslt = False
|
|
if type(v2) != type(list()):
|
|
rslt = mysys._is_equal(v1, v2)
|
|
else:
|
|
for v in v2:
|
|
if mysys._is_equal(v1, v):
|
|
rslt = True
|
|
break
|
|
return rslt
|
|
|
|
def _xputs(ser, info):
|
|
if ser == None:
|
|
mysys._xputs(info)
|
|
else:
|
|
for ch in info:
|
|
myserial.WriteData(ser, myconvert.ToHexStr([ord(ch),]))
|
|
|
|
def _tostr(data):
|
|
return "".join(chr(i) for i in data).lower()
|
|
|
|
|
|
def serial_thread(port):
|
|
global globalParam
|
|
|
|
ser = myserial.OpenPort(port, baud_rate=SERIAL_BAUDRATE)
|
|
globalParam.currentSerial = ser
|
|
globalParam.lastSerial = ser
|
|
|
|
shell = Shell(SHELL_CMD_SIZE)
|
|
|
|
try:
|
|
while not globalParam.exitFlag:
|
|
(state, length, data) = myserial.ReadData(ser)
|
|
if not state:
|
|
break
|
|
if length > 0:
|
|
for ch in data:
|
|
shell.output(None, ch, False)
|
|
else:
|
|
time.sleep(0.02)
|
|
finally:
|
|
myserial.ClosePort(ser)
|
|
globalParam.currentSerial = None
|
|
globalParam.lastSerial = None
|
|
|
|
def input_thread():
|
|
global globalParam
|
|
|
|
while not globalParam.exitFlag:
|
|
if msvcrt.kbhit():
|
|
globalParam.queueLock.acquire()
|
|
if not globalParam.workQueue.full():
|
|
globalParam.workQueue.put( msvcrt.getch() )
|
|
globalParam.queueLock.release()
|
|
time.sleep(0.02)
|
|
|
|
def output_thread():
|
|
global globalParam
|
|
|
|
## searh serial ports but not show them
|
|
serial_ports = myserial.SearchPort(SERIAL_PORT_NUM)
|
|
|
|
## only one serial thread is allowed to be created
|
|
tserial = None
|
|
|
|
## save the last command
|
|
last_cmd_line = "help"
|
|
|
|
## create shell and work on control mode
|
|
shell = Shell(SHELL_CMD_SIZE)
|
|
_xputs(None, ":")
|
|
|
|
while not globalParam.exitFlag:
|
|
## try to get one character
|
|
if not shell.shell_trig:
|
|
globalParam.queueLock.acquire()
|
|
ch = -1
|
|
if not globalParam.workQueue.empty():
|
|
ch = int.from_bytes(globalParam.workQueue.get(), byteorder='big')
|
|
globalParam.queueLock.release()
|
|
|
|
if ch < 0:
|
|
time.sleep(0.02)
|
|
continue
|
|
|
|
## handle esc key
|
|
if shell.shell_trig or ch == 0x1b:
|
|
shell.shell_trig = False
|
|
shell.shell_line_curpos = 0
|
|
shell.shell_mode = (shell.shell_mode + 1) % MODE_MAX
|
|
if shell.shell_mode == MODE_NORMAL:
|
|
_xputs(None, "enter normal mode ...\n")
|
|
if globalParam.serial_check():
|
|
_xputs(None, "serial port is alive.\n")
|
|
globalParam.currentSerial = globalParam.lastSerial
|
|
if shell.shell_mode == MODE_CONTROL:
|
|
_xputs(None, ":")
|
|
globalParam.currentSerial = None
|
|
continue
|
|
|
|
if shell.shell_mode == MODE_CONTROL:
|
|
if ch == ord('\r') or ch == ord('\n'):
|
|
cmd_line = _tostr(shell.shell_line[:shell.shell_line_curpos])
|
|
shell.shell_line_curpos = 0
|
|
## USER COMMAND PARSE AND EXECUTE BEGIN
|
|
if _is_equal(cmd_line, ['quit', 'q']):
|
|
_xputs(None, "\n")
|
|
if globalParam.serial_check():
|
|
tserial.kill()
|
|
tserial.join()
|
|
globalParam.exitFlag = True
|
|
continue
|
|
|
|
if _is_equal(cmd_line, ['repeat', 'r']):
|
|
_xputs(None, ":" + last_cmd_line)
|
|
cmd_line = last_cmd_line
|
|
|
|
last_cmd_line = cmd_line
|
|
_xputs(None, "\n")
|
|
|
|
if _is_equal(cmd_line, ['help', 'h']):
|
|
_xputs(None, help_info + "\n")
|
|
_xputs(None, example_info + "\n")
|
|
pass
|
|
|
|
if _is_equal(cmd_line.split(' ')[0], 'cd'):
|
|
globalParam.workPath = globalParam.get_desktop_path()
|
|
flist = glob.glob(globalParam.workPath + '\\*.bin')
|
|
_xputs(None, "work path: " + globalParam.workPath + "\n")
|
|
_xputs(None, "find {0} valid file: ".format(len(flist)) + "\n")
|
|
for file in flist:
|
|
_xputs(None, file.split('\\')[-1] + "\n")
|
|
_xputs(None, "\n")
|
|
pass
|
|
|
|
if _is_equal(cmd_line.split(' ')[0], ['load', 'ld']): ## 'load [xxx.bin]'
|
|
params = cmd_line.split(' ')
|
|
file = None
|
|
if len(params) > 1:
|
|
if len(params[1].split('.')) > 1:
|
|
file = params[1]
|
|
else:
|
|
file = params[1].split('.')[0] + '.bin'
|
|
else:
|
|
flist = glob.glob(globalParam.workPath + '\\*.bin')
|
|
if len(flist) > 0:
|
|
file = flist[0].split('\\')[-1]
|
|
if file != None:
|
|
_xputs(None, "open file: " + file + "\n")
|
|
try:
|
|
globalParam.binFile = globalParam.workPath + '\\' + file
|
|
with open(globalParam.binFile, 'rb') as fd:
|
|
globalParam.binData = fd.read()
|
|
globalParam.binSize = len(globalParam.binData)
|
|
_xputs(None, "read {0} bytes.".format(globalParam.binSize) + "\n")
|
|
except:
|
|
_xputs(None, "open failed." + "\n")
|
|
else:
|
|
_xputs(None, "invalid param." + "\n")
|
|
_xputs(None, "\n")
|
|
pass
|
|
|
|
if _is_equal(cmd_line.split(' ')[0], ['download', 'd']): ## 'download [com*]'
|
|
params = cmd_line.split(' ')
|
|
ser = None
|
|
if len(params) > 1:
|
|
ser = myserial.OpenPort(params[1], baud_rate=SERIAL_BAUDRATE)
|
|
needClose = True
|
|
else:
|
|
if globalParam.serial_check():
|
|
ser = globalParam.lastSerial
|
|
needClose = False
|
|
if globalParam.binSize > 0:
|
|
pkgs = 0
|
|
if ser != None:
|
|
bindata = globalParam.binData
|
|
while True:
|
|
pkgs = pkgs + 1
|
|
if len(bindata) >= DOWNLOAD_PKG_SIZE:
|
|
data = bindata[:DOWNLOAD_PKG_SIZE]
|
|
bindata = bindata[DOWNLOAD_PKG_SIZE:]
|
|
data = b'\xa5' + data + b'\x5a'
|
|
myserial.WriteData(ser, data)
|
|
_xputs(None, ".")
|
|
continue
|
|
else:
|
|
data = b'\xa5' + bindata + b'\x00'*(DOWNLOAD_PKG_SIZE-len(bindata)) + b'\x5a'
|
|
myserial.WriteData(ser, data)
|
|
_xputs(None, ".")
|
|
break
|
|
data = b'\xa6' + b'\x01' + b'\x00'*(DOWNLOAD_PKG_SIZE-1) + b'\x6a'
|
|
myserial.WriteData(ser, data)
|
|
_xputs(None, "." + "\n")
|
|
_xputs(None, "send {0} pkgs, pkg size {1}.".format(pkgs, DOWNLOAD_PKG_SIZE) + "\n")
|
|
if needClose:
|
|
myserial.ClosePort(ser)
|
|
shell.shell_trig = True
|
|
else:
|
|
_xputs(None, "invalid param." + "\n")
|
|
else:
|
|
_xputs(None, "please load bin file first." + "\n")
|
|
_xputs(None, "\n")
|
|
pass
|
|
|
|
if _is_equal(cmd_line.split(' ')[0], ['go', 'g']): ## 'go [com*]'
|
|
params = cmd_line.split(' ')
|
|
ser = None
|
|
if len(params) > 1:
|
|
ser = myserial.OpenPort(params[1], baud_rate=SERIAL_BAUDRATE)
|
|
needClose = True
|
|
else:
|
|
if globalParam.serial_check():
|
|
ser = globalParam.lastSerial
|
|
needClose = False
|
|
if ser != None:
|
|
data = b'\xa6' + b'\x02' + b'\x00'*(DOWNLOAD_PKG_SIZE-1) + b'\x6a'
|
|
myserial.WriteData(ser, data)
|
|
_xputs(None, "." + "\n")
|
|
_xputs(None, "send {0} pkgs, pkg size {1}.".format(1, DOWNLOAD_PKG_SIZE) + "\n")
|
|
if needClose:
|
|
myserial.ClosePort(ser)
|
|
shell.shell_trig = True
|
|
else:
|
|
_xputs(None, "invalid param." + "\n")
|
|
_xputs(None, "\n")
|
|
pass
|
|
|
|
if _is_equal(cmd_line, 'com'):
|
|
serial_ports = myserial.SearchPort(SERIAL_PORT_NUM)
|
|
_xputs(None, "valid port: " + str(serial_ports).replace('[','').replace(']','').replace('\'','') + "\n")
|
|
_xputs(None, "\n")
|
|
pass
|
|
|
|
if serial_ports != None: ## 'com*'
|
|
for i in range(len(serial_ports)):
|
|
if _is_equal(cmd_line, serial_ports[i]): # e.g. _is_equal(cmd_line, 'com20')
|
|
if globalParam.serial_check():
|
|
tserial.kill()
|
|
tserial.join()
|
|
tserial = NewThread(target=serial_thread, args=(serial_ports[i],))
|
|
tserial.start()
|
|
while not globalParam.serial_check():
|
|
pass
|
|
shell.shell_trig = True
|
|
pass
|
|
|
|
if shell.shell_trig:
|
|
continue
|
|
|
|
_xputs(None, ":")
|
|
## USER COMMAND PARSE AND EXECUTE END
|
|
continue
|
|
|
|
if shell.shell_mode == MODE_CONTROL or shell.shell_mode == MODE_NORMAL:
|
|
shell.output(globalParam.currentSerial, ch, True)
|
|
|
|
if __name__ == "__main__":
|
|
global globalParam
|
|
globalParam = GlobalParam()
|
|
|
|
mysys.env_init(lang='EN', cls=True, curs=False)
|
|
|
|
print(welcome_info, end="\n")
|
|
|
|
globalParam.workPath = globalParam.get_work_path()
|
|
|
|
globalParam.threadInput = NewThread(target=input_thread)
|
|
globalParam.threadOutput = NewThread(target=output_thread)
|
|
|
|
globalParam.threadInput.start()
|
|
globalParam.threadOutput.start()
|
|
|
|
globalParam.threadInput.join()
|
|
globalParam.threadOutput.join()
|
|
|
|
globalParam.cleanup()
|
|
|
|
timeout = 0
|
|
print("Exit After %d Second ..." % timeout, end="\n")
|
|
time.sleep(timeout)
|
|
|
|
mysys.env_init(lang='ZH', cls=False, curs=True)
|
|
|
|
print("Exit.", end="\n")
|