一、导入serial模块

  1. 参考https://www.jb51.net/article/164455.htm
  2. 模块下载地址https://files.pythonhosted.org/packages/1e/7d/ae3f0a63f41e4d2f6cb66a5b57197850f919f59e558159a4dd3a818f5082/pyserial-3.5.tar.gz
    在这里插入图片描述
  3. 找到serial文件夹,并找到python的安装位置 放入到 site-packages文件夹C:\Users\Administrator\AppData\Local\Programs\Python\Python39\Lib\site-packages
    在这里插入图片描述

二、编写程序

  1. 程序参考https://www.cnblogs.com/dongxiaodong/p/9992083.html
  2. 收发参考https://blog.csdn.net/bluebloodye/article/details/102870827

三、验证

  1. 插入USB转TTL
  2. 执行下面程序
    import serial 
     
    import serial.tools.list_ports
     
    port_list = list(serial.tools.list_ports.comports())
    print(port_list)
     
    if len(port_list) == 0:
        print("无可用串口!")
    else:
        for i in range(0, len(port_list)):
            print(port_list[i])
    
    在这里插入图片描述

四、收发测试代码

这里采用读取数据超时的方法来做处理,能读取一定时间内的全部数据
注意一下这几个点

  1. ser.read了之后 in_waiting马上变成0
  2. bytearray重新配置长度时,会清楚所有数据
  3. ser.read的数据,是byte数组
import serial 
import time
import serial.tools.list_ports
import threading
 
port_list = list(serial.tools.list_ports.comports())
print(port_list)
 
if len(port_list) == 0:
    print("无可用串口!")
else:
    for i in range(0, len(port_list)):
        print(port_list[i])
        
print(time.time())
##发送 
#d=bytes.fromhex('10 11 12 34 3f') 
#s.write(d)
#s.close()
        

DATA = "" # 读取的数据
DATA_ALL = bytearray(40960)
DATA_LEN = 0
NOEND = True # 是否读取结束
DATA_IN_FLAG = 0
START_TIME = 0
 
# 读数据的本体
def read_data(ser):
    global DATA, NOEND , DATA_IN_FLAG , START_TIME, DATA_ALL , DATA_LEN
    START_TIME = time.time()
    
    # 循环接收数据(此为死循环,可用线程实现)
    while NOEND:
        if ser.in_waiting:
            START_TIME = time.time()
            DATA_IN_FLAG = 1
            length = ser.in_waiting
            
            DATA = ser.read(length)# 注意 ser.read了之后 in_waiting马上变成0#DATA_ALL = bytearray(DATA_LEN+length)
            for i in  range(0, length):
                DATA_ALL[DATA_LEN+i] = DATA[i]
            
            #print("DATA_LEN=%d"%(DATA_LEN))
            DATA_LEN+=length
            #DATA = ser.read(ser.in_waiting).decode("gbk")
            #print("\n>> receive: ", DATA, "\n>>", end="")
            #print(">>", end="")
            
            if(DATA == "quit"):
                print("oppo seri has closen.\n>>", end="")
                    
                
        else:
            if DATA_IN_FLAG:
                #print("time.time() - START_TIME = %d \r\n"%(time.time()))
                if time.time() - START_TIME > 0.010:#串口超时10ms
                    DATA_PRINT = bytearray(DATA_LEN)
                    for i in  range(0, DATA_LEN):
                        DATA_PRINT[i] = DATA_ALL[i]
                    
                    print("\n>> receive: ", DATA_PRINT, "\n>>", end="")
                    START_TIME = time.time()
                    DATA_IN_FLAG = 0
                    DATA_LEN = 0

            
            
 
 
# 打开串口
def open_seri(portx, bps, timeout):
    ret = False
    try:
        # 打开串口,并得到串口对象
        ser = serial.Serial(portx, bps, timeout=timeout)
 
        # 判断是否成功打开
        if(ser.is_open):
            ret = True
            th = threading.Thread(target=read_data, args=(ser,)) # 创建一个子线程去等待读数据
            th.start()
    except Exception as e:
        print("error!", e)
 
    return ser, ret
 
 
 
# 关闭串口
def close_seri(ser):
    global NOEND
    NOEND = False
    ser.close()
 
# 写数据
def write_to_seri(ser, text):
    res = ser.write(text.encode("gbk")) # 写
    return res
 
# 读数据
def read_from_seri():
    global DATA
    data = DATA
    DATA = "" #清空当次读取
    return data
 
if __name__ == "__main__":
    
    # ser, ret = open_seri("COM4", 115200, None) # 串口com3、bps为115200,等待时间为永久
    # if ret == True: # 判断串口是否成功打开
    #     count = write_to_seri(ser, "exit")
    #     print("写入总字节数:", count)
 
    # 打开一个串口
    port = input("输入串口名:")
    ser, ret = open_seri(port, 921600, None) # 串口com3、bps为115200,等待时间为永久
 
    # oprate_lst = {"quit":close_seri}
    # print("操作数字所对应的行为,1:read_from_seri 2:write_to_seri 3:close_seri: ")
    while True:
        text = input(">>")
        write_to_seri(ser, text)
        if text == "quit":
            close_seri(ser)
            print("bye!")
            break

Logo

腾讯云面向开发者汇聚海量精品云计算使用和开发经验,营造开放的云计算技术生态圈。

更多推荐