# Source path: lark1fq/ate/script/ate.py
# Repository web-view metadata captured with this file: 576 lines, 23 KiB, Python.
# Timestamp shown in the web view: 2025-10-06 17:55:08 +08:00
import tkinter as tk
from tkinter import ttk, messagebox
import serial
import serial.tools.list_ports
import threading
import time
from datetime import datetime
import crcmod
class ATEApplication:
    """Tkinter front end that polls a LARK-1FQ device over a serial port.

    A background thread periodically writes a fixed query command, reads the
    reply, validates its CRC and decodes four peak-to-peak values, four
    frequency values, a key status byte and an SD-card status byte.  Decoded
    values are pushed to the Tk main loop via ``root.after`` and compared
    against user-editable min/max limits; out-of-range values are shown in red.

    Response frame layout as decoded by this class (23 bytes):
        bytes 0-2    header ``5A 04 12``
        bytes 3-10   4 x 16-bit big-endian peak-to-peak values
        bytes 11-18  4 x 16-bit big-endian frequency values (divided by 10)
        byte  19     key status (1 is shown as green)
        byte  20     SD-card status (5 is shown as green)
        bytes 21-22  CRC over bytes 0-20, low byte first
    """

    CHANNEL_COUNT = 4
    # First three bytes of every response frame.
    FRAME_HEADER = bytes.fromhex('5A0412')
    # Total length of a response frame, header and CRC included.
    FRAME_LENGTH = 23
    # Number of leading frame bytes covered by the CRC.
    FRAME_PAYLOAD_LENGTH = 21
    # Query sent to the device on every poll cycle.
    QUERY_COMMAND = bytes.fromhex('5A04123400097991')
    # Seconds between the start of two poll cycles.
    POLL_INTERVAL = 0.2
    # Seconds to wait after sending the query before reading the reply.
    RESPONSE_DELAY = 0.1
    # Glyph drawn in the status indicators; its foreground colour carries the
    # state, so the label text must not be empty.
    STATUS_GLYPH = "●"

    def __init__(self, root):
        """Build the window, initialise state and populate the port list.

        Args:
            root: the Tk root window that hosts the application.
        """
        self.root = root
        self.root.title("LARK-1FQ ATE设备监控")
        self.root.geometry("680x480")

        # Serial-port state.
        self.serial_port = None
        self.is_connected = False
        self.read_thread = None
        self.stop_thread = False
        self.receive_buffer = bytearray()  # receive buffer

        # CRC-16 function: polynomial 0x18005, bit-reversed, initial value
        # 0xFFFF, no final XOR (the parameter set commonly called
        # CRC-16/MODBUS).
        self.crc16_func = crcmod.mkCrcFun(0x18005, rev=True, initCrc=0xFFFF, xorOut=0x0000)

        # Latest decoded values, one slot per channel (ch0-ch3).
        self.frequency_values = [0, 0, 0, 0]
        self.peak_values = [0, 0, 0, 0]
        self.key_status = 0  # key status
        self.sd_status = 0  # SD-card status

        # Acceptance limits per channel; each channel gets its own dict so
        # that limits can be edited independently.
        self.freq_limits = [{"min": 7, "max": 9} for _ in range(self.CHANNEL_COUNT)]
        self.peak_limits = [{"min": 2000, "max": 4000} for _ in range(self.CHANNEL_COUNT)]

        self.create_widgets()
        self.refresh_ports()

        # Make sure the port is released when the window is closed.
        self.root.protocol("WM_DELETE_WINDOW", self.on_closing)

    # ------------------------------------------------------------------
    # Widget construction
    # ------------------------------------------------------------------
    def create_widgets(self):
        """Create the serial settings, value comparison and status panels."""
        # 1. Serial-port settings.
        serial_frame = ttk.LabelFrame(self.root, text="串口设置", padding=10)
        serial_frame.pack(fill="x", padx=10, pady=5)

        # Port selection.
        ttk.Label(serial_frame, text="串口号:").grid(row=0, column=0, padx=5)
        self.port_var = tk.StringVar()
        self.port_combobox = ttk.Combobox(serial_frame, textvariable=self.port_var, width=15)
        self.port_combobox.grid(row=0, column=1, padx=5)
        self.refresh_btn = ttk.Button(serial_frame, text="刷新", command=self.refresh_ports)
        self.refresh_btn.grid(row=0, column=2, padx=5)

        # Baud-rate selection.
        ttk.Label(serial_frame, text="波特率:").grid(row=0, column=3, padx=5)
        self.baud_var = tk.StringVar(value="19200")
        baud_combobox = ttk.Combobox(
            serial_frame,
            textvariable=self.baud_var,
            values=["9600", "19200", "38400", "57600", "115200"],
            width=10,
        )
        baud_combobox.grid(row=0, column=4, padx=5)

        # Open / close buttons.
        self.open_btn = ttk.Button(serial_frame, text="打开串口", command=self.open_serial)
        self.open_btn.grid(row=0, column=5, padx=5)
        self.close_btn = ttk.Button(serial_frame, text="关闭串口", command=self.close_serial, state="disabled")
        self.close_btn.grid(row=0, column=6, padx=5)

        # Single-line debug message.
        self.debug_var = tk.StringVar(value="等待连接...")
        debug_label = ttk.Label(serial_frame, textvariable=self.debug_var, foreground="blue")
        debug_label.grid(row=1, column=0, columnspan=7, pady=5)

        # 2. Value comparison.
        value_frame = ttk.LabelFrame(self.root, text="数值比较", padding=10)
        value_frame.pack(fill="both", expand=True, padx=10, pady=5)

        # Frequency column (left).
        freq_frame = ttk.LabelFrame(value_frame, text="频率 (Hz)")
        freq_frame.pack(side="left", fill="both", expand=True, padx=5)
        self.freq_widgets = self._build_channel_rows(
            freq_frame, "7", "9", "0.0", self.update_freq_limit
        )

        # Peak-to-peak column (right).
        peak_frame = ttk.LabelFrame(value_frame, text="峰峰值 (mV)")
        peak_frame.pack(side="right", fill="both", expand=True, padx=5)
        self.peak_widgets = self._build_channel_rows(
            peak_frame, "2000", "4000", "0", self.update_peak_limit
        )

        # 3. Status indicators.
        status_frame = ttk.LabelFrame(self.root, text="状态显示", padding=10)
        status_frame.pack(fill="x", padx=10, pady=5)
        self.comm_status = self._build_status_indicator(status_frame, "通讯状态:")
        self.sd_status_label = self._build_status_indicator(status_frame, "SD卡状态:")
        self.key_status_label = self._build_status_indicator(status_frame, "按键状态:")

    def _build_channel_rows(self, parent, min_default, max_default, initial_text, on_commit):
        """Create one "min / value / max" row per channel inside *parent*.

        Args:
            parent: container widget for the rows.
            min_default: initial text of every min entry.
            max_default: initial text of every max entry.
            initial_text: initial text of every value label.
            on_commit: callable taking the channel index, invoked when
                Return is pressed in either entry of that channel.

        Returns:
            A list with one dict per channel holding ``min_var``, ``max_var``,
            ``min_entry``, ``max_entry`` and ``value_label``.
        """
        rows = []
        for channel in range(self.CHANNEL_COUNT):
            ch_frame = ttk.Frame(parent)
            ch_frame.pack(fill="x", pady=2)
            ttk.Label(ch_frame, text=f"CH{channel}:").pack(side="left")

            # Pack order fixes the left-to-right layout: min, value, max.
            min_var, min_entry = self._build_limit_entry(
                ch_frame, "min:", min_default, channel, on_commit
            )
            value_label = tk.Label(ch_frame, text=initial_text, width=8, relief="sunken", bg="white")
            value_label.pack(side="left", padx=5)
            max_var, max_entry = self._build_limit_entry(
                ch_frame, "max:", max_default, channel, on_commit
            )

            rows.append({
                "min_var": min_var,
                "max_var": max_var,
                "min_entry": min_entry,
                "max_entry": max_entry,
                "value_label": value_label
            })
        return rows

    def _build_limit_entry(self, parent, caption, default, channel, on_commit):
        """Create a captioned limit entry and return ``(StringVar, Entry)``."""
        holder = ttk.Frame(parent)
        holder.pack(side="left", padx=2)
        ttk.Label(holder, text=caption).pack()
        limit_var = tk.StringVar(value=default)
        limit_entry = tk.Entry(holder, textvariable=limit_var, width=5, bg="white")
        limit_entry.pack()
        # Bind the channel as a default argument so every row keeps its own index.
        limit_entry.bind('<Return>', lambda _event, ch=channel: on_commit(ch))
        return limit_var, limit_entry

    def _build_status_indicator(self, parent, caption):
        """Create a captioned indicator and return its coloured label."""
        holder = ttk.Frame(parent)
        holder.pack(side="left", expand=True)
        ttk.Label(holder, text=caption).pack()
        # The indicator conveys state through its foreground colour, which is
        # only visible when the label actually contains a character.
        indicator = tk.Label(holder, text=self.STATUS_GLYPH, font=("Arial", 20), fg="red")
        indicator.pack()
        return indicator

    # ------------------------------------------------------------------
    # Serial-port handling
    # ------------------------------------------------------------------
    def refresh_ports(self):
        """Refresh the list of available serial ports."""
        port_list = [port.device for port in serial.tools.list_ports.comports()]
        self.port_combobox['values'] = port_list
        # Pre-select the first port only when nothing is selected yet.
        if port_list and not self.port_var.get():
            self.port_var.set(port_list[0])

    def open_serial(self):
        """Open the selected port (8N1) and start the polling thread.

        Shows an error dialog when no port is selected or when opening fails.
        """
        port = self.port_var.get()
        baudrate = self.baud_var.get()
        if not port:
            messagebox.showerror("错误", "请选择串口号")
            return

        try:
            self.serial_port = serial.Serial(
                port=port,
                baudrate=int(baudrate),
                bytesize=serial.EIGHTBITS,
                parity=serial.PARITY_NONE,
                stopbits=serial.STOPBITS_ONE,
                timeout=0.1
            )
            # Start from empty buffers.
            self.clear_serial_buffer()
            self.receive_buffer = bytearray()

            self.is_connected = True
            self.open_btn.config(state="disabled")
            self.close_btn.config(state="normal")
            self.refresh_btn.config(state="disabled")

            # Start the polling thread; daemon so it never blocks exit.
            self.stop_thread = False
            self.read_thread = threading.Thread(target=self.serial_read_loop)
            self.read_thread.daemon = True
            self.read_thread.start()

            self.update_debug_info("串口打开成功,开始发送查询命令...")
            messagebox.showinfo("成功", "串口打开成功")
        except Exception as e:
            messagebox.showerror("错误", f"打开串口失败: {str(e)}")

    def close_serial(self):
        """Stop polling, close the port and restore the button states."""
        self.stop_thread = True
        self.is_connected = False
        if self.serial_port and self.serial_port.is_open:
            self.serial_port.close()

        self.open_btn.config(state="normal")
        self.close_btn.config(state="disabled")
        self.refresh_btn.config(state="normal")
        self.comm_status.config(fg="red")
        self.update_debug_info("串口已关闭")

    def clear_serial_buffer(self):
        """Discard pending serial input/output and the local receive buffer."""
        if not (self.serial_port and self.serial_port.is_open):
            return
        try:
            self.serial_port.reset_input_buffer()
            self.serial_port.reset_output_buffer()
            self.receive_buffer.clear()
            self.update_debug_info("串口缓冲区已清空")
        except Exception as e:
            self.update_debug_info(f"清空缓冲区错误: {e}")

    def update_debug_info(self, message):
        """Show *message* with a millisecond timestamp and echo it to stdout."""
        # "%f" yields microseconds; dropping three digits leaves milliseconds.
        timestamp = datetime.now().strftime("%H:%M:%S.%f")[:-3]
        debug_msg = f"[{timestamp}] {message}"
        self.debug_var.set(debug_msg)
        print(debug_msg)

    def calculate_crc(self, data):
        """Return the CRC of *data* as two bytes, low byte first.

        Args:
            data: bytes-like payload; empty input yields ``b"\\x00\\x00"``.
        """
        if not data:
            return bytes.fromhex('0000')
        # Little-endian serialisation is the big-endian value with its two
        # bytes swapped, which is the order used on the wire.
        return self.crc16_func(data).to_bytes(2, byteorder='little')

    def _set_comm_status(self, color):
        """Schedule a colour change of the communication indicator on the Tk loop."""
        self.root.after(0, lambda: self.comm_status.config(fg=color))

    def send_and_receive_command(self):
        """Send the query command once and process whatever reply arrived.

        Any failure (no data, exception) turns the communication indicator red.
        """
        if not (self.serial_port and self.serial_port.is_open):
            return

        try:
            # Drop stale bytes so the reply belongs to this query.
            self.clear_serial_buffer()

            full_command = self.QUERY_COMMAND
            self.serial_port.write(full_command)
            self.update_debug_info(f"发送命令: {full_command.hex().upper()}")

            # Give the device time to answer.
            time.sleep(self.RESPONSE_DELAY)

            pending = self.serial_port.in_waiting
            if not pending:
                self.update_debug_info("无响应数据可读")
                self._set_comm_status("red")
                return

            data = self.serial_port.read(pending)
            if data:
                self.update_debug_info(f"收到 {len(data)} 字节响应数据")
                self.process_received_data(data)
            else:
                self.update_debug_info("未收到响应数据")
                self._set_comm_status("red")
        except Exception as e:
            self.update_debug_info(f"发送接收命令错误: {e}")
            self._set_comm_status("red")

    def serial_read_loop(self):
        """Polling loop run by the background thread.

        Starts a query cycle every ``POLL_INTERVAL`` seconds until the
        connection is closed or a stop is requested.
        """
        # A monotonic clock keeps the schedule immune to wall-clock changes.
        next_send_time = time.monotonic()
        while self.is_connected and not self.stop_thread:
            current_time = time.monotonic()
            if current_time >= next_send_time:
                self.send_and_receive_command()
                # The interval is measured from the start of the cycle.
                next_send_time = current_time + self.POLL_INTERVAL
            time.sleep(0.01)

    # ------------------------------------------------------------------
    # Frame decoding
    # ------------------------------------------------------------------
    def _crc_matches(self, frame):
        """Return True when *frame* is long enough and its CRC is valid (no logging)."""
        if len(frame) < self.FRAME_LENGTH:
            return False
        payload = frame[:self.FRAME_PAYLOAD_LENGTH]
        received = frame[self.FRAME_PAYLOAD_LENGTH:self.FRAME_LENGTH]
        return bytes(received) == self.calculate_crc(payload)

    def process_received_data(self, data):
        """Locate a response frame inside *data* and hand it on for decoding.

        Every occurrence of the frame header is tried in order and the first
        complete frame with a valid CRC is used.  This matters because the
        query command starts with the same three bytes as the response, so an
        echoed command or leading noise must not hide a valid frame that
        follows it.  When no candidate validates, the first one is passed on
        so the CRC failure is reported.

        Args:
            data: raw bytes read from the serial port.
        """
        hex_data = data.hex().upper()
        self.update_debug_info(f"原始响应数据: {hex_data}")

        # A reply shorter than one frame cannot contain a frame.
        if len(data) < self.FRAME_LENGTH:
            self.update_debug_info(f"数据长度不足: {len(data)}字节期望至少23字节")
            return

        first_start = data.find(self.FRAME_HEADER)
        if first_start == -1:
            self.update_debug_info("未找到帧头 5A0412")
            return
        self.update_debug_info(f"找到帧头,位置: {first_start}")

        # If the first header has no room for a full frame, no later one has.
        last_start = len(data) - self.FRAME_LENGTH
        if first_start > last_start:
            self.update_debug_info("帧数据不完整")
            return

        frame_start = first_start
        while frame_start != -1 and frame_start <= last_start:
            frame_data = data[frame_start:frame_start + self.FRAME_LENGTH]
            if self._crc_matches(frame_data):
                if frame_start != first_start:
                    self.update_debug_info(f"找到帧头,位置: {frame_start}")
                self.process_received_frame(frame_data)
                return
            frame_start = data.find(self.FRAME_HEADER, frame_start + 1)

        # No candidate validated: let the regular path report the CRC failure.
        self.process_received_frame(data[first_start:first_start + self.FRAME_LENGTH])

    def process_received_frame(self, data):
        """Validate and decode one complete 23-byte frame, then update the UI.

        Args:
            data: the frame, starting at the header.
        """
        hex_data = data.hex().upper()
        self.update_debug_info(f"处理数据帧: {hex_data}")

        # The CRC covers the first 21 bytes; the last 2 bytes carry it.
        if not self.check_crc(data):
            self.update_debug_info("CRC校验失败")
            self._set_comm_status("red")
            return

        try:
            # Peak-to-peak values: 4 channels x 2 bytes, right after the
            # 3-byte header.
            peak_values = [
                (data[offset] << 8) | data[offset + 1]
                for offset in range(3, 3 + 2 * self.CHANNEL_COUNT, 2)
            ]
            # Frequencies: 4 channels x 2 bytes after the peak values,
            # transmitted in tenths.
            freq_values = [
                ((data[offset] << 8) | data[offset + 1]) / 10.0
                for offset in range(11, 11 + 2 * self.CHANNEL_COUNT, 2)
            ]
            key_status = data[19]  # key status
            sd_status = data[20]  # SD-card status

            self.update_debug_info(f"解析成功: 峰峰值{peak_values}, 频率{freq_values}, 按键{key_status}, SD卡{sd_status}")

            # Widget updates are scheduled on the Tk loop.
            self.root.after(0, self.update_display, peak_values, freq_values, key_status, sd_status)
            # A decoded frame means communication works.
            self._set_comm_status("green")
        except Exception as e:
            self.update_debug_info(f"解析数据错误: {e}")
            self._set_comm_status("red")

    def check_crc(self, data):
        """Check the CRC of a frame and log a mismatch.

        Args:
            data: frame of at least 23 bytes; the CRC of the first 21 bytes
                is compared with bytes 21-22 (low byte first).

        Returns:
            True when the CRC matches, False when it does not, the frame is
            too short, or the check itself raised.
        """
        try:
            if len(data) < self.FRAME_LENGTH:
                return False
            data_part = data[:self.FRAME_PAYLOAD_LENGTH]
            crc_received = data[self.FRAME_PAYLOAD_LENGTH:self.FRAME_LENGTH]
            expected_crc = self.calculate_crc(data_part)

            result = crc_received == expected_crc
            if not result:
                self.update_debug_info(f"CRC不匹配: 收到{crc_received.hex().upper()}, 期望{expected_crc.hex().upper()}")
            return result
        except Exception as e:
            self.update_debug_info(f"CRC校验错误: {e}")
            return False

    # ------------------------------------------------------------------
    # Display and limit handling
    # ------------------------------------------------------------------
    @staticmethod
    def _apply_range_colors(widget, value, limits):
        """Colour one channel row according to *value* and its *limits*.

        The value label turns green inside ``[min, max]`` and red outside;
        the violated limit entry turns red, the other one white.
        """
        lower, upper = limits["min"], limits["max"]
        widget["value_label"].config(bg="green" if lower <= value <= upper else "red")
        widget["min_entry"].config(bg="red" if value < lower else "white")
        widget["max_entry"].config(bg="red" if value > upper else "white")

    def update_display(self, peak_values, freq_values, key_status, sd_status):
        """Store freshly decoded values and refresh every widget.

        Args:
            peak_values: four peak-to-peak values.
            freq_values: four frequency values.
            key_status: key status byte; 1 is shown green, anything else red.
            sd_status: SD-card status byte; 5 is shown green, anything else red.
        """
        for channel in range(self.CHANNEL_COUNT):
            self.peak_values[channel] = peak_values[channel]
            self.peak_widgets[channel]["value_label"].config(text=str(peak_values[channel]))
            self.check_peak_value(channel)

        for channel in range(self.CHANNEL_COUNT):
            self.frequency_values[channel] = freq_values[channel]
            self.freq_widgets[channel]["value_label"].config(text=f"{freq_values[channel]:.1f}")
            self.check_freq_value(channel)

        self.key_status = key_status
        self.sd_status = sd_status
        self.key_status_label.config(fg="green" if key_status == 1 else "red")
        self.sd_status_label.config(fg="green" if sd_status == 5 else "red")

        self.update_debug_info("界面更新完成")

    def _commit_limits(self, widgets, limits, channel):
        """Parse and store the min/max entries of one channel.

        Returns:
            True when the limits were stored, False when the user was shown
            an error dialog instead.
        """
        try:
            min_val = float(widgets[channel]["min_var"].get())
            max_val = float(widgets[channel]["max_var"].get())
        except ValueError:
            messagebox.showerror("错误", "请输入有效的数字")
            return False

        # The minimum must not exceed the maximum.
        if min_val > max_val:
            messagebox.showerror("错误", "最小值不能大于最大值")
            return False

        limits[channel]["min"] = min_val
        limits[channel]["max"] = max_val
        return True

    def update_freq_limit(self, channel):
        """Apply the frequency limits typed for *channel* and re-check its value."""
        if self._commit_limits(self.freq_widgets, self.freq_limits, channel):
            self.check_freq_value(channel)

    def update_peak_limit(self, channel):
        """Apply the peak-to-peak limits typed for *channel* and re-check its value."""
        if self._commit_limits(self.peak_widgets, self.peak_limits, channel):
            self.check_peak_value(channel)

    def check_freq_value(self, channel):
        """Colour the frequency row of *channel* according to its limits."""
        self._apply_range_colors(
            self.freq_widgets[channel],
            self.frequency_values[channel],
            self.freq_limits[channel],
        )

    def check_peak_value(self, channel):
        """Colour the peak-to-peak row of *channel* according to its limits."""
        self._apply_range_colors(
            self.peak_widgets[channel],
            self.peak_values[channel],
            self.peak_limits[channel],
        )

    def check_current_values(self):
        """Re-check the stored frequency and peak-to-peak value of every channel."""
        for channel in range(self.CHANNEL_COUNT):
            self.check_freq_value(channel)
            self.check_peak_value(channel)

    def on_closing(self):
        """Stop polling, release the serial port and destroy the window."""
        self.stop_thread = True
        self.is_connected = False
        if self.serial_port and self.serial_port.is_open:
            self.serial_port.close()
        self.root.destroy()
if __name__ == "__main__":
    # Script entry point: create the Tk root window, attach the application
    # to it and run the event loop until the window is closed.
    main_window = tk.Tk()
    # Keep a reference to the application for the lifetime of the event loop.
    application = ATEApplication(main_window)
    main_window.mainloop()