python 操作 串口
一、导入serial模块参考https://www.jb51.net/article/164455.htm模块下载地址https://files.pythonhosted.org/packages/1e/7d/ae3f0a63f41e4d2f6cb66a5b57197850f919f59e558159a4dd3a818f5082/pyserial-3.5.tar.gz找到serial文件夹,并找到
·
一、导入serial模块
- 参考https://www.jb51.net/article/164455.htm
- 模块下载地址https://files.pythonhosted.org/packages/1e/7d/ae3f0a63f41e4d2f6cb66a5b57197850f919f59e558159a4dd3a818f5082/pyserial-3.5.tar.gz
- 找到serial文件夹,并找到python的安装位置 放入到 site-packages文件夹
C:\Users\Administrator\AppData\Local\Programs\Python\Python39\Lib\site-packages
二、编写程序
- 程序参考https://www.cnblogs.com/dongxiaodong/p/9992083.html
- 收发参考https://blog.csdn.net/bluebloodye/article/details/102870827
三、验证
- 插入USB转TTL
- 执行下面程序
import serial import serial.tools.list_ports port_list = list(serial.tools.list_ports.comports()) print(port_list) if len(port_list) == 0: print("无可用串口!") else: for i in range(0, len(port_list)): print(port_list[i])
四、收发测试代码
这里采用读取数据超时的方法来做处理,能读取一定时间内的全部数据
注意一下这几个点
- ser.read了之后 in_waiting马上变成0
- bytearray重新配置长度时,会清楚所有数据
- ser.read的数据,是byte数组
import serial
import time
import serial.tools.list_ports
import threading
port_list = list(serial.tools.list_ports.comports())
print(port_list)
if len(port_list) == 0:
print("无可用串口!")
else:
for i in range(0, len(port_list)):
print(port_list[i])
print(time.time())
##发送
#d=bytes.fromhex('10 11 12 34 3f')
#s.write(d)
#s.close()
DATA = "" # 读取的数据
DATA_ALL = bytearray(40960)
DATA_LEN = 0
NOEND = True # 是否读取结束
DATA_IN_FLAG = 0
START_TIME = 0
# 读数据的本体
def read_data(ser):
global DATA, NOEND , DATA_IN_FLAG , START_TIME, DATA_ALL , DATA_LEN
START_TIME = time.time()
# 循环接收数据(此为死循环,可用线程实现)
while NOEND:
if ser.in_waiting:
START_TIME = time.time()
DATA_IN_FLAG = 1
length = ser.in_waiting
DATA = ser.read(length)# 注意 ser.read了之后 in_waiting马上变成0了
#DATA_ALL = bytearray(DATA_LEN+length)
for i in range(0, length):
DATA_ALL[DATA_LEN+i] = DATA[i]
#print("DATA_LEN=%d"%(DATA_LEN))
DATA_LEN+=length
#DATA = ser.read(ser.in_waiting).decode("gbk")
#print("\n>> receive: ", DATA, "\n>>", end="")
#print(">>", end="")
if(DATA == "quit"):
print("oppo seri has closen.\n>>", end="")
else:
if DATA_IN_FLAG:
#print("time.time() - START_TIME = %d \r\n"%(time.time()))
if time.time() - START_TIME > 0.010:#串口超时10ms
DATA_PRINT = bytearray(DATA_LEN)
for i in range(0, DATA_LEN):
DATA_PRINT[i] = DATA_ALL[i]
print("\n>> receive: ", DATA_PRINT, "\n>>", end="")
START_TIME = time.time()
DATA_IN_FLAG = 0
DATA_LEN = 0
# 打开串口
def open_seri(portx, bps, timeout):
ret = False
try:
# 打开串口,并得到串口对象
ser = serial.Serial(portx, bps, timeout=timeout)
# 判断是否成功打开
if(ser.is_open):
ret = True
th = threading.Thread(target=read_data, args=(ser,)) # 创建一个子线程去等待读数据
th.start()
except Exception as e:
print("error!", e)
return ser, ret
# 关闭串口
def close_seri(ser):
global NOEND
NOEND = False
ser.close()
# 写数据
def write_to_seri(ser, text):
res = ser.write(text.encode("gbk")) # 写
return res
# 读数据
def read_from_seri():
global DATA
data = DATA
DATA = "" #清空当次读取
return data
if __name__ == "__main__":
# ser, ret = open_seri("COM4", 115200, None) # 串口com3、bps为115200,等待时间为永久
# if ret == True: # 判断串口是否成功打开
# count = write_to_seri(ser, "exit")
# print("写入总字节数:", count)
# 打开一个串口
port = input("输入串口名:")
ser, ret = open_seri(port, 921600, None) # 串口com3、bps为115200,等待时间为永久
# oprate_lst = {"quit":close_seri}
# print("操作数字所对应的行为,1:read_from_seri 2:write_to_seri 3:close_seri: ")
while True:
text = input(">>")
write_to_seri(ser, text)
if text == "quit":
close_seri(ser)
print("bye!")
break
更多推荐
已为社区贡献2条内容
所有评论(0)