Python 串口通信与绘图工具:基于 pyserial、tkinter 和 matplotlib 的 UART 调试助手

1.代码

import numpy as np
import matplotlib.pyplot as plt
import tkinter as tk
from tkinter import ttk
import serial
import time
import threading
import platform
import struct


# Lock that serializes access to the shared serial port between the UI thread
# and the background receiver thread.
serial_lock = threading.Lock()

# Module-level run flag. It is not read by any code visible in this file;
# SerialApp keeps its own ``running`` attribute.
running = False


class SerialApp:
    """Tkinter front end for talking to a device over a serial port.

    The window shows everything received from the port, lets the user
    configure and open/close the port, and sends fixed-format hex command
    frames. Received bytes are additionally interpreted as a stream of
    big-endian signed 24-bit values that are appended, one per line, to
    ``DATA_FILE`` so they can be plotted later.

    Threading model: a daemon thread (``receive_data``) polls the port.
    All access to ``self.ser`` happens under the module-level
    ``serial_lock``; all widget updates are handed to the Tk main thread
    through ``root.after`` and never performed by the worker directly.
    """

    DATA_FILE = "received_data.txt"   # decoded samples, one integer per line
    LOG_FILE = "serial_log.txt"       # snapshot of the text widget
    SAMPLE_BYTES = 3                  # width of one sample in the byte stream
    _HEX_DIGITS = frozenset("0123456789ABCDEFabcdef")

    def __init__(self, root):
        """Create the (still closed) serial port object and build the UI.

        Args:
            root: the ``tk.Tk`` window that hosts the application.
        """
        self.root = root
        self.ser = serial.Serial()
        self.running = False
        self.receive_thread = None
        # Bytes received so far that do not yet form a complete sample.
        self._rx_pending = b""
        self.setup_ui()

    # ------------------------------------------------------------------ UI

    def setup_ui(self):
        """Build the main window: log/display area plus the control panels."""
        self.root.title('Uart Tool by Qifan')

        self.txt0 = tk.Text(self.root, width=80, height=14, border=5, wrap=tk.WORD)
        self.txt0.pack(side='top', fill='both', expand=True, padx=5, pady=5)

        # The scrollbar is a child of the text widget and docks on its right edge.
        scrollbar = tk.Scrollbar(self.txt0)
        scrollbar.pack(side=tk.RIGHT, fill=tk.Y)
        self.txt0.config(yscrollcommand=scrollbar.set)
        scrollbar.config(command=self.txt0.yview)

        self.setup_serial_config()
        self.setup_command_areas()

    def setup_serial_config(self):
        """Build the port-settings panel (port, baud, framing) and its buttons."""
        config_frame = ttk.LabelFrame(self.root, text="串口配置", padding=5)
        config_frame.pack(side='top', fill='x', padx=5, pady=2)

        # Row 1: port name, baud rate, data bits.
        row1 = ttk.Frame(config_frame)
        row1.pack(fill='x', pady=2)

        ttk.Label(row1, text='端口:').pack(side='left')
        self.comx = tk.StringVar()
        if platform.system() == 'Windows':
            sp_list = ['COM1', 'COM2', 'COM3', 'COM4', 'COM5']
        else:
            sp_list = ['/dev/ttyUSB0', '/dev/ttyUSB1']
        ttk.Combobox(row1, textvariable=self.comx, values=sp_list, width=12).pack(
            side='left', padx=2)
        self.comx.set(sp_list[0])

        ttk.Label(row1, text='波特率:').pack(side='left', padx=(10, 0))
        self.baud = tk.StringVar(value="115200")
        ttk.Combobox(
            row1, textvariable=self.baud,
            values=['9600', '19200', '38400', '57600', '115200'], width=10,
        ).pack(side='left', padx=2)

        ttk.Label(row1, text='数据位:').pack(side='left', padx=(10, 0))
        self.dlen = tk.StringVar(value="8")
        ttk.Combobox(row1, textvariable=self.dlen, values=['7', '8'], width=8).pack(
            side='left', padx=2)

        # Row 2: stop bits, parity, and the action buttons.
        row2 = ttk.Frame(config_frame)
        row2.pack(fill='x', pady=2)

        ttk.Label(row2, text='停止位:').pack(side='left')
        self.slen = tk.StringVar(value="1")
        ttk.Combobox(row2, textvariable=self.slen, values=['1', '1.5', '2'], width=8).pack(
            side='left', padx=2)

        ttk.Label(row2, text='校验位:').pack(side='left', padx=(10, 0))
        self.chck = tk.StringVar(value="None")
        ttk.Combobox(row2, textvariable=self.chck, values=['None', 'Odd', 'Even'], width=8).pack(
            side='left', padx=2)

        ttk.Button(row2, text="打开串口", command=self.com_open_close, width=12).pack(
            side='left', padx=(20, 0))
        ttk.Button(row2, text="清空显示", command=self.clear_display, width=10).pack(
            side='left', padx=5)
        ttk.Button(row2, text="保存数据", command=self.save_data, width=10).pack(
            side='left', padx=5)

    def setup_command_areas(self):
        """Build the command panel: read, write, PGA and one-shot buttons."""
        cmd_frame = ttk.LabelFrame(self.root, text="命令发送", padding=5)
        cmd_frame.pack(side='top', fill='x', padx=5, pady=2)

        # Read-register row.
        read_frame = ttk.Frame(cmd_frame)
        read_frame.pack(fill='x', pady=2)
        ttk.Label(read_frame, text='读取地址:').pack(side='left')
        self.inpt1 = tk.StringVar()
        ttk.Entry(read_frame, textvariable=self.inpt1, width=20).pack(side='left', padx=2)
        ttk.Button(read_frame, text="发送", command=self.submit_read, width=8).pack(
            side='left', padx=2)
        ttk.Button(read_frame, text="清空", command=lambda: self.inpt1.set(""), width=8).pack(
            side='left', padx=2)

        # Write-register row.
        write_frame = ttk.Frame(cmd_frame)
        write_frame.pack(fill='x', pady=2)
        ttk.Label(write_frame, text='写入地址:').pack(side='left')
        self.inpt21 = tk.StringVar()
        ttk.Entry(write_frame, textvariable=self.inpt21, width=10).pack(side='left', padx=2)
        ttk.Label(write_frame, text='写入数据:').pack(side='left')
        self.inpt22 = tk.StringVar()
        ttk.Entry(write_frame, textvariable=self.inpt22, width=10).pack(side='left', padx=2)
        ttk.Button(write_frame, text="发送", command=self.submit_write, width=8).pack(
            side='left', padx=2)
        ttk.Button(write_frame, text="清空", command=self._clear_write_inputs, width=8).pack(
            side='left', padx=2)

        # PGA gain row (also hosts the plot button).
        pga_frame = ttk.Frame(cmd_frame)
        pga_frame.pack(fill='x', pady=2)
        ttk.Label(pga_frame, text='PGA增益:').pack(side='left')
        self.inpt3 = tk.StringVar()
        ttk.Entry(pga_frame, textvariable=self.inpt3, width=15).pack(side='left', padx=2)
        ttk.Button(pga_frame, text="设置", command=self.submit_pga, width=8).pack(
            side='left', padx=2)
        ttk.Button(pga_frame, text="绘图", command=self.plot_data, width=8).pack(
            side='left', padx=2)

        # One-shot function buttons.
        func_frame = ttk.Frame(cmd_frame)
        func_frame.pack(fill='x', pady=2)
        ttk.Button(func_frame, text="读取ADC", command=self.submit_adc, width=10).pack(
            side='left', padx=2)
        ttk.Button(func_frame, text="设备复位", command=self.submit_reset, width=10).pack(
            side='left', padx=2)
        ttk.Button(func_frame, text="清空数据", command=self.clear_data_file, width=10).pack(
            side='left', padx=2)

    def _clear_write_inputs(self):
        """Empty both entry fields of the write-register row."""
        self.inpt21.set("")
        self.inpt22.set("")

    def _post_to_ui(self, callback, *args):
        """Schedule ``callback(*args)`` on the Tk main loop.

        Widgets must only be touched from the main thread, so the receiver
        thread uses this instead of calling widget methods directly.
        """
        self.root.after(0, callback, *args)

    # ------------------------------------------------------- port handling

    def com_open_close(self):
        """Toggle the port: open it when idle, close it when running."""
        if not self.running:
            self.open_serial()
        else:
            self.close_serial()

    def open_serial(self):
        """Apply the UI settings to the port, open it and start the receiver.

        Any failure (bad numeric field, port busy, ...) is reported in the
        display and leaves the application in the closed state.
        """
        try:
            with serial_lock:
                self.ser.port = self.comx.get()
                self.ser.baudrate = int(self.baud.get())
                self.ser.bytesize = int(self.dlen.get())
                self.ser.stopbits = float(self.slen.get())
                parity = self.chck.get()
                # pyserial expects the first letter ('O'/'E') or 'N' for none.
                self.ser.parity = 'N' if parity == 'None' else parity[0]
                self.ser.timeout = 1
                self.ser.xonxoff = 0

                self.ser.open()
                # A new session starts with clean sample framing.
                self._rx_pending = b""
                self.running = True
        except Exception as e:  # UI boundary: report instead of crashing the callback
            self.log_message(f"打开串口错误: {e}")
            return

        self.receive_thread = threading.Thread(target=self.receive_data, daemon=True)
        self.receive_thread.start()
        self.log_message("串口已打开")

    def _shutdown_port(self):
        """Stop the receiver loop and close the port under the lock."""
        self.running = False
        with serial_lock:
            if self.ser.is_open:
                self.ser.close()

        worker = getattr(self, "receive_thread", None)
        if worker is not None and worker is not threading.current_thread():
            # Bounded wait: the worker may itself be waiting for the Tk main
            # loop (root.after), so an unbounded join could hang the UI.
            worker.join(timeout=0.2)

    def close_serial(self):
        """Stop receiving, close the port and report it in the display."""
        self._shutdown_port()
        self.log_message("串口已关闭")

    def receive_data(self):
        """Receiver thread body: poll the port and forward incoming data.

        The lock is held only for the actual port access. Scheduling the
        display update and writing the data file happen after the lock is
        released, so the main thread can never wait on the lock while this
        thread waits on the main thread.
        """
        while self.running:
            try:
                chunk = b""
                with serial_lock:
                    if self.ser.is_open:
                        waiting = self.ser.in_waiting
                        if waiting:
                            chunk = self.ser.read(waiting)

                if chunk:
                    # latin-1 maps every byte to exactly one character.
                    text = chunk.decode('latin-1', errors='ignore')
                    self._post_to_ui(self.update_display, text)
                    self.process_received_data(text)

                time.sleep(0.01)

            except Exception as e:  # thread boundary: report, then stop polling
                if self.running:
                    try:
                        self._post_to_ui(self._handle_receive_failure, f"接收错误: {e}")
                    except (RuntimeError, tk.TclError):
                        # The window is already gone; there is nowhere to report to.
                        pass
                break

    def _handle_receive_failure(self, message):
        """Main-thread handler for a receiver crash: log it and close the port.

        Closing resets ``running`` so the next click on the open/close button
        opens the port again instead of trying to close a dead session.
        """
        self.log_message(message)
        if self.running:
            self.close_serial()

    def update_display(self, data):
        """Append received text to the display and cap its length.

        Once the widget holds more than 10000 lines, the text before line
        5000 is discarded.
        """
        self.txt0.insert(tk.END, data)
        self.txt0.see(tk.END)

        line_count = int(self.txt0.index('end-1c').split('.')[0])
        if line_count > 10000:
            self.txt0.delete('1.0', '5000.0')

    # ------------------------------------------------------ sample decoding

    @staticmethod
    def _split_samples(raw, width=3):
        """Decode complete big-endian signed samples from ``raw``.

        Args:
            raw: bytes to decode.
            width: number of bytes per sample.

        Returns:
            ``(samples, remainder)`` where ``samples`` is a list of ints and
            ``remainder`` holds the trailing bytes that do not yet form a
            complete sample.
        """
        usable = len(raw) - len(raw) % width
        samples = [
            int.from_bytes(raw[start:start + width], "big", signed=True)
            for start in range(0, usable, width)
        ]
        return samples, raw[usable:]

    def process_received_data(self, data):
        """Decode received data into samples and append them to ``DATA_FILE``.

        Bytes that do not complete a sample are kept and prepended to the
        next call, so sample boundaries survive arbitrary read sizes.

        Args:
            data: received data, either ``bytes`` or a ``str`` in which each
                character stands for one byte (latin-1), as produced by
                ``receive_data``.
        """
        try:
            raw = data.encode('latin-1') if isinstance(data, str) else bytes(data)
            pending = getattr(self, "_rx_pending", b"") + raw
            samples, self._rx_pending = self._split_samples(pending, self.SAMPLE_BYTES)

            if samples:
                with open(self.DATA_FILE, "a", encoding='utf-8') as f:
                    f.writelines(f"{value}\n" for value in samples)

        except (UnicodeError, OSError) as e:
            # May run on the receiver thread, so log through the main loop.
            self._post_to_ui(self.log_message, f"数据处理错误: {e}")

    # ------------------------------------------------------------- sending

    def send_hex_data(self, hex_str):
        """Send a hex-encoded command over the port.

        Whitespace is ignored and an odd number of digits is left-padded
        with a single ``0``.

        Args:
            hex_str: command as a string of hexadecimal digits.

        Returns:
            True when the bytes were written, False when the port is closed,
            the string is empty or not valid hex, or the write failed.
        """
        if not self.ser.is_open:
            self.log_message("错误: 串口未打开")
            return False

        hex_str = "".join(hex_str.split())
        if not hex_str or not self._HEX_DIGITS.issuperset(hex_str):
            self.log_message("错误: 无效的16进制数据")
            return False

        if len(hex_str) % 2 != 0:
            hex_str = '0' + hex_str

        payload = bytes.fromhex(hex_str)
        try:
            with serial_lock:
                self.ser.write(payload)
        except Exception as e:  # UI boundary: report instead of crashing the callback
            self.log_message(f"发送错误: {e}")
            return False

        self.log_message(f"发送: {hex_str.upper()}")
        return True

    def submit_read(self):
        """Send the read command for the address typed in the read row.

        The address is left-padded with zeros to four hex digits.
        """
        addr = self.inpt1.get().strip()
        if not addr:
            self.log_message("错误: 请输入读取地址")
            return

        self.send_hex_data(f"5A22800100{addr:0>4}FFFFFFFF")

    def submit_write(self):
        """Send the write command for the typed address and data.

        Address and data are each left-padded with zeros to four hex digits.
        """
        addr = self.inpt21.get().strip()
        data = self.inpt22.get().strip()
        if not addr or not data:
            self.log_message("错误: 请输入地址和数据")
            return

        self.send_hex_data(f"5A21000100{addr:0>4}{data:0>4}FFFFFF")

    def submit_pga(self):
        """Send the PGA gain command; the gain is padded to two hex digits."""
        gain = self.inpt3.get().strip()
        if not gain:
            self.log_message("错误: 请输入PGA增益值")
            return

        self.send_hex_data(f"5A2100010006{gain:0>2}FFFFFF")

    def submit_adc(self):
        """Send the fixed "read ADC" command frame."""
        self.send_hex_data("5A24C00100C2FFFFFFFF")

    def submit_reset(self):
        """Send the fixed "device reset" command frame."""
        self.send_hex_data("5A23100100F0FFFFFFFF")

    # ------------------------------------------------------- files and plot

    @staticmethod
    def _load_samples(path):
        """Load one-number-per-line samples from ``path`` as a 1-D array.

        ``ndmin=1`` keeps a single-sample file one-dimensional instead of
        collapsing it to a 0-d scalar array.
        """
        return np.loadtxt(path, ndmin=1)

    def plot_data(self):
        """Plot the samples stored in ``DATA_FILE`` in a matplotlib window."""
        try:
            data = self._load_samples(self.DATA_FILE)
            if data.size == 0:
                self.log_message("错误: 没有可绘制的数据")
                return

            plt.figure(figsize=(10, 6))
            plt.plot(data)
            plt.xlabel('采样点')
            plt.ylabel('ADC值')
            plt.title('ADC数据曲线')
            plt.grid(True)
            plt.show()

        except Exception as e:  # UI boundary: missing/corrupt file, backend errors
            self.log_message(f"绘图错误: {e}")

    def clear_display(self):
        """Remove all text from the display widget."""
        self.txt0.delete('1.0', tk.END)

    def clear_data_file(self):
        """Truncate ``DATA_FILE`` (creating it if it does not exist)."""
        try:
            with open(self.DATA_FILE, 'w'):
                pass
            self.log_message("数据文件已清空")
        except OSError as e:
            self.log_message(f"清空文件错误: {e}")

    def save_data(self):
        """Write the current contents of the display widget to ``LOG_FILE``."""
        try:
            content = self.txt0.get('1.0', tk.END)
            with open(self.LOG_FILE, 'w', encoding='utf-8') as f:
                f.write(content)
            self.log_message("数据已保存到 serial_log.txt")
        except OSError as e:
            self.log_message(f"保存错误: {e}")

    def log_message(self, message):
        """Append a timestamped line to the display.

        Must run on the Tk main thread; other threads go through
        ``_post_to_ui``.
        """
        timestamp = time.strftime("%H:%M:%S")
        self.txt0.insert(tk.END, f"[{timestamp}] {message}\n")
        self.txt0.see(tk.END)

    def on_closing(self):
        """Window-close handler: stop the receiver, close the port, quit."""
        try:
            self._shutdown_port()
        finally:
            # Always tear the window down, even if closing the port failed.
            self.root.destroy()


def main():
    """Create the Tk window, attach a SerialApp to it and run the event loop."""
    window = tk.Tk()
    ui = SerialApp(window)

    # 800x600 window placed 100 px from the top-left corner of the screen.
    window.geometry("800x600+100+100")

    # Route the window-manager close button through the app's own handler.
    window.protocol("WM_DELETE_WINDOW", ui.on_closing)

    window.mainloop()


# Start the GUI only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()

2.效果

3.测试(待定)

Logo

腾讯云面向开发者汇聚海量精品云计算使用和开发经验,营造开放的云计算技术生态圈。

更多推荐