paho 连接mqtt比较方便和稳定.

import orjson
import paho.mqtt.client as mqtt
import threading

from config.config_app import AppConfig
from config.config_machine import MachineConfig
import logging

BH =  MachineConfig.SiteCode
 
def register(client):
  '''
   zhu ce she bei xin xi 
  '''
  info = {
      "version": AppConfig.version, 
      "device_type": AppConfig.device_type,
      "firmware_version": AppConfig.firmware_version,
      "SiteCode": MachineConfig.SiteCode ,  #="SiteCodeAAAAAAAAAAA" 
      "SiteName": MachineConfig.SiteName ,  #="XX小区3号站"
      "City": MachineConfig.City ,  # = "3301" #浙江省
      "Area": MachineConfig.Area ,  # = "330127"#杭州市
      "Street": MachineConfig.Street ,  # = "330127111" #淳安县
      "District": MachineConfig.District ,  # = "330127111000" #枫树岭镇
      # "Quarter": MachineConfig.Quarter ,  # = 158 #  来自quarters 表, 
      "QuarterName": MachineConfig.QuarterName ,  #="XX小区"
      "gps_lng":0,
      "gps_lat":0
  }

  mqtt_client.publish(f"/hhh/to_server/{BH}/register",orjson.dumps(info).decode("utf-8"),qos=0)



def on_connect_fail(client, userdata, flags, rc):
  print("MQTT Connect fail with result code " + str(rc))
  

def on_publish(client, userdata, mid):
  print("MQTT Publish message with mid " + str(mid))

def on_connect(client, userdata, flags_dict, reason, properties):
  print("MQTT Connected with result code " + str(reason))
  register(client)

def on_message(client, userdata, message):
  print("MQTT Received message: " + message.payload.decode())
  # 解析服务器反馈的消息,获取已接收的ID列表
#   received_ids = orjson.loads(message.payload.decode())
  # 从records中移除已接收的记录
#   records = [record for record in records if record["id"] not in received_ids]
def on_log(client, userdata,level,message):
    print("mqtt.py log",level,message)
    # logging.error(message)

def send_mqtt_message(topic, message):
    if(mqtt_client.is_connected()== False):
      mqtt_client.connect(AppConfig.mqtt_host, AppConfig.mqtt_port, 60)
    
    mqttmsg = mqtt_client.publish(topic,message,qos=1)
    mqttmsg.wait_for_publish()#must wait else not publish complete


# 创建MQTT客户端
mqtt_client_id = f"ZNY_{BH}"
mqtt_client = mqtt.Client(mqtt_client_id, userdata=None, protocol=mqtt.MQTTv5)
mqtt_client.enable_logger()
mqtt_client.on_connect = on_connect
mqtt_client.on_message = on_message
mqtt_client.on_connect_fail= on_connect_fail
mqtt_client.on_log= on_log
mqtt_client.on_publish = on_publish

 
def StartConnect():
	
  BH =  MachineConfig.SiteCode

  mqtt_client.username_pw_set(AppConfig.mqtt_username, AppConfig.mqtt_password)
  #
  mqtt_client.will_set(f"/hhh/to_server/{BH}/disconnected", f"{BH} disconnected ", 0, False)
  #
  mqtt_client.connect(AppConfig.mqtt_host, AppConfig.mqtt_port, 60)
  # mqtt_client.loop_forever()
  mqtt_client.loop_start()
  
  
# 创建一个独立的线程来运行MQTT客户端
mqtt_thread = threading.Thread(target=StartConnect)
mqtt_thread.start()

# 等待MQTT客户端完成
mqtt_thread.join()

  # 在其他线程中发送消息
#   send_message("my/topic", "Hello, MQTT!")
  
   

其中需要注意的是下面 这段代码必须要, 否则发送大图片的时候会因为时间太长导致发送不完导致超时失败.

    mqttmsg.wait_for_publish()#must wait else not publish complete

使用的时候只需要 即可

 send_mqtt_message("/ys/fewafw/tosave", message):
Logo

腾讯云面向开发者汇聚海量精品云计算使用和开发经验,营造开放的云计算技术生态圈。

更多推荐