lark1fq/ate/script/ate.py
2025-10-06 18:02:51 +08:00

576 lines
23 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import tkinter as tk
from tkinter import ttk, messagebox
import serial
import serial.tools.list_ports
import threading
import time
from datetime import datetime
import crcmod
class ATEApplication:
def __init__(self, root):
self.root = root
self.root.title("LARK-1FQ ATE设备监控")
self.root.geometry("680x480")
# 串口相关变量
self.serial_port = None
self.is_connected = False
self.read_thread = None
self.stop_thread = False
self.receive_buffer = bytearray() # 接收缓冲区
# 创建CRC计算函数
self.crc16_func = crcmod.mkCrcFun(0x18005, rev=True, initCrc=0xFFFF, xorOut=0x0000)
# 数据存储
self.frequency_values = [0, 0, 0, 0] # ch0-ch3 频率值
self.peak_values = [0, 0, 0, 0] # ch0-ch3 峰峰值
self.key_status = 0 # 按键状态
self.sd_status = 0 # SD卡状态
# 上下限设置
self.freq_limits = [
{"min": 7, "max": 9}, # ch0
{"min": 7, "max": 9}, # ch1
{"min": 7, "max": 9}, # ch2
{"min": 7, "max": 9} # ch3
]
self.peak_limits = [
{"min": 2000, "max": 4000}, # ch0
{"min": 2000, "max": 4000}, # ch1
{"min": 2000, "max": 4000}, # ch2
{"min": 2000, "max": 4000} # ch3
]
self.create_widgets()
self.refresh_ports()
# 设置关闭事件
self.root.protocol("WM_DELETE_WINDOW", self.on_closing)
def create_widgets(self):
# 1. 串口相关界面
serial_frame = ttk.LabelFrame(self.root, text="串口设置", padding=10)
serial_frame.pack(fill="x", padx=10, pady=5)
# 串口号选择
ttk.Label(serial_frame, text="串口号:").grid(row=0, column=0, padx=5)
self.port_var = tk.StringVar()
self.port_combobox = ttk.Combobox(serial_frame, textvariable=self.port_var, width=15)
self.port_combobox.grid(row=0, column=1, padx=5)
self.refresh_btn = ttk.Button(serial_frame, text="刷新", command=self.refresh_ports)
self.refresh_btn.grid(row=0, column=2, padx=5)
# 波特率选择
ttk.Label(serial_frame, text="波特率:").grid(row=0, column=3, padx=5)
self.baud_var = tk.StringVar(value="19200")
baud_combobox = ttk.Combobox(serial_frame, textvariable=self.baud_var,
values=["9600", "19200", "38400", "57600", "115200"], width=10)
baud_combobox.grid(row=0, column=4, padx=5)
# 打开关闭按钮
self.open_btn = ttk.Button(serial_frame, text="打开串口", command=self.open_serial)
self.open_btn.grid(row=0, column=5, padx=5)
self.close_btn = ttk.Button(serial_frame, text="关闭串口", command=self.close_serial, state="disabled")
self.close_btn.grid(row=0, column=6, padx=5)
# 调试信息显示
self.debug_var = tk.StringVar(value="等待连接...")
debug_label = ttk.Label(serial_frame, textvariable=self.debug_var, foreground="blue")
debug_label.grid(row=1, column=0, columnspan=7, pady=5)
# 2. 数值比较界面
value_frame = ttk.LabelFrame(self.root, text="数值比较", padding=10)
value_frame.pack(fill="both", expand=True, padx=10, pady=5)
# 频率部分(左边)
freq_frame = ttk.LabelFrame(value_frame, text="频率 (Hz)")
freq_frame.pack(side="left", fill="both", expand=True, padx=5)
self.freq_widgets = []
for i in range(4):
ch_frame = ttk.Frame(freq_frame)
ch_frame.pack(fill="x", pady=2)
ttk.Label(ch_frame, text=f"CH{i}:").pack(side="left")
# 最小值输入
min_frame = ttk.Frame(ch_frame)
min_frame.pack(side="left", padx=2)
ttk.Label(min_frame, text="min:").pack()
min_var = tk.StringVar(value="7")
min_entry = tk.Entry(min_frame, textvariable=min_var, width=5, bg="white")
min_entry.pack()
min_entry.bind('<Return>', lambda e, ch=i: self.update_freq_limit(ch))
# 当前值显示
value_label = tk.Label(ch_frame, text="0.0", width=8, relief="sunken", bg="white")
value_label.pack(side="left", padx=5)
# 最大值输入
max_frame = ttk.Frame(ch_frame)
max_frame.pack(side="left", padx=2)
ttk.Label(max_frame, text="max:").pack()
max_var = tk.StringVar(value="9")
max_entry = tk.Entry(max_frame, textvariable=max_var, width=5, bg="white")
max_entry.pack()
max_entry.bind('<Return>', lambda e, ch=i: self.update_freq_limit(ch))
self.freq_widgets.append({
"min_var": min_var,
"max_var": max_var,
"min_entry": min_entry,
"max_entry": max_entry,
"value_label": value_label
})
# 峰峰值部分(右边)
peak_frame = ttk.LabelFrame(value_frame, text="峰峰值 (mV)")
peak_frame.pack(side="right", fill="both", expand=True, padx=5)
self.peak_widgets = []
for i in range(4):
ch_frame = ttk.Frame(peak_frame)
ch_frame.pack(fill="x", pady=2)
ttk.Label(ch_frame, text=f"CH{i}:").pack(side="left")
# 最小值输入
min_frame = ttk.Frame(ch_frame)
min_frame.pack(side="left", padx=2)
ttk.Label(min_frame, text="min:").pack()
min_var = tk.StringVar(value="2000")
min_entry = tk.Entry(min_frame, textvariable=min_var, width=5, bg="white")
min_entry.pack()
min_entry.bind('<Return>', lambda e, ch=i: self.update_peak_limit(ch))
# 当前值显示
value_label = tk.Label(ch_frame, text="0", width=8, relief="sunken", bg="white")
value_label.pack(side="left", padx=5)
# 最大值输入
max_frame = ttk.Frame(ch_frame)
max_frame.pack(side="left", padx=2)
ttk.Label(max_frame, text="max:").pack()
max_var = tk.StringVar(value="4000")
max_entry = tk.Entry(max_frame, textvariable=max_var, width=5, bg="white")
max_entry.pack()
max_entry.bind('<Return>', lambda e, ch=i: self.update_peak_limit(ch))
self.peak_widgets.append({
"min_var": min_var,
"max_var": max_var,
"min_entry": min_entry,
"max_entry": max_entry,
"value_label": value_label
})
# 3. 状态显示界面
status_frame = ttk.LabelFrame(self.root, text="状态显示", padding=10)
status_frame.pack(fill="x", padx=10, pady=5)
# 通讯状态
comm_frame = ttk.Frame(status_frame)
comm_frame.pack(side="left", expand=True)
ttk.Label(comm_frame, text="通讯状态:").pack()
self.comm_status = tk.Label(comm_frame, text="", font=("Arial", 20), fg="red")
self.comm_status.pack()
# SD卡状态
sd_frame = ttk.Frame(status_frame)
sd_frame.pack(side="left", expand=True)
ttk.Label(sd_frame, text="SD卡状态:").pack()
self.sd_status_label = tk.Label(sd_frame, text="", font=("Arial", 20), fg="red")
self.sd_status_label.pack()
# 按键状态
key_frame = ttk.Frame(status_frame)
key_frame.pack(side="left", expand=True)
ttk.Label(key_frame, text="按键状态:").pack()
self.key_status_label = tk.Label(key_frame, text="", font=("Arial", 20), fg="red")
self.key_status_label.pack()
def refresh_ports(self):
"""刷新可用串口列表"""
ports = serial.tools.list_ports.comports()
port_list = [port.device for port in ports]
self.port_combobox['values'] = port_list
if port_list and not self.port_var.get():
self.port_var.set(port_list[0])
def open_serial(self):
"""打开串口"""
port = self.port_var.get()
baudrate = self.baud_var.get()
if not port:
messagebox.showerror("错误", "请选择串口号")
return
try:
self.serial_port = serial.Serial(
port=port,
baudrate=int(baudrate),
bytesize=serial.EIGHTBITS,
parity=serial.PARITY_NONE,
stopbits=serial.STOPBITS_ONE,
timeout=0.1
)
# 清空缓冲区
self.clear_serial_buffer()
self.receive_buffer = bytearray()
self.is_connected = True
self.open_btn.config(state="disabled")
self.close_btn.config(state="normal")
self.refresh_btn.config(state="disabled")
# 启动读取线程
self.stop_thread = False
self.read_thread = threading.Thread(target=self.serial_read_loop)
self.read_thread.daemon = True
self.read_thread.start()
self.update_debug_info("串口打开成功,开始发送查询命令...")
messagebox.showinfo("成功", "串口打开成功")
except Exception as e:
messagebox.showerror("错误", f"打开串口失败: {str(e)}")
def close_serial(self):
"""关闭串口"""
self.stop_thread = True
self.is_connected = False
if self.serial_port and self.serial_port.is_open:
self.serial_port.close()
self.open_btn.config(state="normal")
self.close_btn.config(state="disabled")
self.refresh_btn.config(state="normal")
self.comm_status.config(fg="red")
self.update_debug_info("串口已关闭")
def clear_serial_buffer(self):
"""清空串口缓冲区"""
if self.serial_port and self.serial_port.is_open:
try:
self.serial_port.reset_input_buffer()
self.serial_port.reset_output_buffer()
self.receive_buffer.clear()
self.update_debug_info("串口缓冲区已清空")
except Exception as e:
self.update_debug_info(f"清空缓冲区错误: {e}")
def update_debug_info(self, message):
"""更新调试信息"""
timestamp = datetime.now().strftime("%H:%M:%S.%f")[:-3]
debug_msg = f"[{timestamp}] {message}"
self.debug_var.set(debug_msg)
print(debug_msg)
def calculate_crc(self, data):
"""计算CRC并返回对调后的两个字节"""
if not data:
return bytes.fromhex('0000')
# 计算CRC
crc_value = self.crc16_func(data)
# 转换为两个字节并对调
crc_bytes = crc_value.to_bytes(2, byteorder='big')
crc_swapped = bytes([crc_bytes[1], crc_bytes[0]])
return crc_swapped
def send_and_receive_command(self):
"""发送命令并接收响应"""
if not (self.serial_port and self.serial_port.is_open):
return
try:
# 清空接收缓冲区
self.clear_serial_buffer()
# 发送查询命令
full_command = bytes.fromhex('5A04123400097991')
self.serial_port.write(full_command)
self.update_debug_info(f"发送命令: {full_command.hex().upper()}")
# 等待100ms让设备准备响应
time.sleep(0.1)
# 读取响应数据
if self.serial_port.in_waiting:
data = self.serial_port.read(self.serial_port.in_waiting)
if data:
self.update_debug_info(f"收到 {len(data)} 字节响应数据")
self.process_received_data(data)
else:
self.update_debug_info("未收到响应数据")
self.root.after(0, lambda: self.comm_status.config(fg="red"))
else:
self.update_debug_info("无响应数据可读")
self.root.after(0, lambda: self.comm_status.config(fg="red"))
except Exception as e:
self.update_debug_info(f"发送接收命令错误: {e}")
self.root.after(0, lambda: self.comm_status.config(fg="red"))
def serial_read_loop(self):
"""串口读取循环"""
last_send_time = 0
send_interval = 0.2 # 200ms发送一次
while self.is_connected and not self.stop_thread:
current_time = time.time()
# 发送并接收命令
if current_time - last_send_time >= send_interval:
self.send_and_receive_command()
last_send_time = current_time
time.sleep(0.01)
def process_received_data(self, data):
"""处理接收到的数据"""
hex_data = data.hex().upper()
self.update_debug_info(f"原始响应数据: {hex_data}")
# 检查数据长度
if len(data) < 23:
self.update_debug_info(f"数据长度不足: {len(data)}字节期望至少23字节")
return
# 查找帧头
frame_start = -1
for i in range(len(data) - 2):
if data[i] == 0x5A and data[i+1] == 0x04 and data[i+2] == 0x12:
frame_start = i
break
if frame_start == -1:
self.update_debug_info("未找到帧头 5A0412")
return
self.update_debug_info(f"找到帧头,位置: {frame_start}")
# 提取完整帧从帧头开始的23字节
if len(data) - frame_start >= 23:
frame_data = data[frame_start:frame_start+23]
self.process_received_frame(frame_data)
else:
self.update_debug_info("帧数据不完整")
def process_received_frame(self, data):
"""处理接收到的完整数据帧23字节"""
hex_data = data.hex().upper()
self.update_debug_info(f"处理数据帧: {hex_data}")
# CRC校验 - 前21字节计算CRC最后2字节是CRC
if not self.check_crc(data):
self.update_debug_info("CRC校验失败")
self.root.after(0, lambda: self.comm_status.config(fg="red"))
return
# 解析数据
try:
# 解析峰峰值 (4个通道每个2字节)
peak_values = []
for i in range(4):
start_idx = 3 + i * 2 # 跳过帧头5A0412 (3字节)
peak_value = (data[start_idx] << 8) | data[start_idx + 1]
peak_values.append(peak_value)
# 解析频率 (4个通道每个2字节需要除以10)
freq_values = []
for i in range(4):
start_idx = 11 + i * 2 # 峰峰值后是频率
freq_value = (data[start_idx] << 8) | data[start_idx + 1]
freq_values.append(freq_value / 10.0)
# 解析状态
key_status = data[19] # 按键状态
sd_status = data[20] # SD卡状态
self.update_debug_info(f"解析成功: 峰峰值{peak_values}, 频率{freq_values}, 按键{key_status}, SD卡{sd_status}")
# 更新界面
self.root.after(0, self.update_display, peak_values, freq_values, key_status, sd_status)
# 通讯成功,更新状态
self.root.after(0, lambda: self.comm_status.config(fg="green"))
except Exception as e:
self.update_debug_info(f"解析数据错误: {e}")
self.root.after(0, lambda: self.comm_status.config(fg="red"))
def check_crc(self, data):
"""CRC校验 - 前21字节计算CRC最后2字节是CRC需要对调"""
try:
if len(data) < 23:
return False
# 分离数据前21字节和CRC最后2字节
data_part = data[:21] # 前21字节是数据
crc_received = data[21:23] # 最后2字节是CRC
# 计算期望的CRC值对调后的
expected_crc = self.calculate_crc(data_part)
# 比较CRC
result = crc_received == expected_crc
if not result:
self.update_debug_info(f"CRC不匹配: 收到{crc_received.hex().upper()}, 期望{expected_crc.hex().upper()}")
return result
except Exception as e:
self.update_debug_info(f"CRC校验错误: {e}")
return False
def update_display(self, peak_values, freq_values, key_status, sd_status):
"""更新显示"""
# 更新峰峰值
for i in range(4):
value = peak_values[i]
self.peak_values[i] = value
widget = self.peak_widgets[i]
widget["value_label"].config(text=str(value))
# 检查范围并设置颜色
in_range = self.peak_limits[i]["min"] <= value <= self.peak_limits[i]["max"]
color = "green" if in_range else "red"
widget["value_label"].config(bg=color)
# 如果越界,设置上下限输入框颜色
min_color = "red" if value < self.peak_limits[i]["min"] else "white"
max_color = "red" if value > self.peak_limits[i]["max"] else "white"
widget["min_entry"].config(bg=min_color)
widget["max_entry"].config(bg=max_color)
# 更新频率
for i in range(4):
value = freq_values[i]
self.frequency_values[i] = value
widget = self.freq_widgets[i]
widget["value_label"].config(text=f"{value:.1f}")
# 检查范围并设置颜色
in_range = self.freq_limits[i]["min"] <= value <= self.freq_limits[i]["max"]
color = "green" if in_range else "red"
widget["value_label"].config(bg=color)
# 如果越界,设置上下限输入框颜色
min_color = "red" if value < self.freq_limits[i]["min"] else "white"
max_color = "red" if value > self.freq_limits[i]["max"] else "white"
widget["min_entry"].config(bg=min_color)
widget["max_entry"].config(bg=max_color)
# 更新状态
self.key_status = key_status
self.sd_status = sd_status
# 按键状态0=未按下1=按下
key_color = "green" if key_status == 1 else "red"
self.key_status_label.config(fg=key_color)
# SD卡状态5=成功,其他=失败
sd_color = "green" if sd_status == 5 else "red"
self.sd_status_label.config(fg=sd_color)
self.update_debug_info("界面更新完成")
def update_freq_limit(self, channel):
"""更新频率上下限"""
try:
min_val = float(self.freq_widgets[channel]["min_var"].get())
max_val = float(self.freq_widgets[channel]["max_var"].get())
# 验证最小值不大于最大值
if min_val > max_val:
messagebox.showerror("错误", "最小值不能大于最大值")
return
self.freq_limits[channel]["min"] = min_val
self.freq_limits[channel]["max"] = max_val
# 立即检查当前值并更新显示
self.check_freq_value(channel)
except ValueError:
messagebox.showerror("错误", "请输入有效的数字")
def update_peak_limit(self, channel):
"""更新峰峰值上下限"""
try:
min_val = float(self.peak_widgets[channel]["min_var"].get())
max_val = float(self.peak_widgets[channel]["max_var"].get())
# 验证最小值不大于最大值
if min_val > max_val:
messagebox.showerror("错误", "最小值不能大于最大值")
return
self.peak_limits[channel]["min"] = min_val
self.peak_limits[channel]["max"] = max_val
# 立即检查当前值并更新显示
self.check_peak_value(channel)
except ValueError:
messagebox.showerror("错误", "请输入有效的数字")
def check_freq_value(self, channel):
"""检查指定通道的频率值是否在范围内"""
value = self.frequency_values[channel]
widget = self.freq_widgets[channel]
# 检查范围并设置颜色
in_range = self.freq_limits[channel]["min"] <= value <= self.freq_limits[channel]["max"]
color = "green" if in_range else "red"
widget["value_label"].config(bg=color)
# 如果越界,设置上下限输入框颜色
min_color = "red" if value < self.freq_limits[channel]["min"] else "white"
max_color = "red" if value > self.freq_limits[channel]["max"] else "white"
widget["min_entry"].config(bg=min_color)
widget["max_entry"].config(bg=max_color)
def check_peak_value(self, channel):
"""检查指定通道的峰峰值是否在范围内"""
value = self.peak_values[channel]
widget = self.peak_widgets[channel]
# 检查范围并设置颜色
in_range = self.peak_limits[channel]["min"] <= value <= self.peak_limits[channel]["max"]
color = "green" if in_range else "red"
widget["value_label"].config(bg=color)
# 如果越界,设置上下限输入框颜色
min_color = "red" if value < self.peak_limits[channel]["min"] else "white"
max_color = "red" if value > self.peak_limits[channel]["max"] else "white"
widget["min_entry"].config(bg=min_color)
widget["max_entry"].config(bg=max_color)
def check_current_values(self):
"""检查当前值是否在范围内"""
for i in range(4):
self.check_freq_value(i)
self.check_peak_value(i)
def on_closing(self):
"""程序关闭时的清理工作"""
self.stop_thread = True
if self.serial_port and self.serial_port.is_open:
self.serial_port.close()
self.root.destroy()
if __name__ == "__main__":
root = tk.Tk()
app = ATEApplication(root)
root.mainloop()