Python3

本文主要介绍如何在 Python 项目中使用 paho-mqtt 客户端库 ,实现 SX-IOT设备与 MQTT 服务器的连接、订阅、收发消息等功能。

paho-mqtt 是目前 Python 中使用较多的 MQTT 客户端库, 它在 Python 2.7.9+ 或 3.6+ 上为客户端类提供了对 MQTT v5.0,v3.1 和 v3.1.1 的支持。它还提供了一些帮助程序功能,使将消息发布到 MQTT 服务器变得非常简单。

前提条件

安装依赖包

1sudo apt install python3 python3-pip -y
2python3 -m pip install paho-mqtt

连接使用

连接设置

本文将使用自定义的接入认证方式,服务器接入信息如下:

  • Broker: mqtt.geek-smart.cn

  • TCP Port: 1883

  • WebSocket Port: 8083

导入依赖包

1from paho.mqtt import client as mqtt_client

定义连接地址、认证信息以及消息发布主题

1BROKER = 'mqtt.geek-smart.cn'
2PORT = 1883
3PUB_TOPIC = "/HYUGHV/lVtAcHuor***/4cebd60bf***/publish"
4SUB_TOPIC = "/HYUGHV/lVtAcHuor***/4cebd60bf***/subscribe"
5CLIENT_ID = f'python-mqtt-tcp-client'
6USERNAME = '************'
7PASSWORD = '************'

定义消息发布函数

 1 def publish(client):
 2     msg_count = 0
 3     while not FLAG_EXIT:
 4         msg_dict = {
 5             'type': 'info'
 6         }
 7         msg = json.dumps(msg_dict)
 8         if not client.is_connected():
 9             logging.error("publish: MQTT client is not connected!")
10             time.sleep(1)
11             continue
12         result = client.publish(PUB_TOPIC, msg)
13         # result: [0, 1]
14         status = result[0]
15         if status == 0:
16             print(f'Send `{msg}` to topic `{PUB_TOPIC}`')
17         else:
18             print(f'Failed to send message to topic {PUB_TOPIC}')
19         msg_count += 1
20         time.sleep(1)

定义 on_message 回调函数,用于打印订阅主题接收的消息内容

1 def on_message(client, userdata, msg):
2     print(f'Received `{msg.payload.decode()}` from `{msg.topic}` topic')

初始化 MQTT 客户端并订阅主题

 1 def on_connect(client, userdata, flags, rc):
 2     if rc == 0 and client.is_connected():
 3         print("Connected to MQTT Broker!")
 4         client.subscribe(SUB_TOPIC)
 5     else:
 6         print(f'Failed to connect, return code {rc}')
 7
 8 def connect_mqtt():
 9     client = mqtt_client.Client(CLIENT_ID)
10     client.username_pw_set(USERNAME, PASSWORD)
11     client.on_connect = on_connect
12     client.on_message = on_message
13     client.connect(BROKER, PORT, keepalive=120)
14     client.on_disconnect = on_disconnect
15     return client

完整代码

  1 import json
  2 import logging
  3 import time
  4
  5 from paho.mqtt import client as mqtt_client
  6
  7 BROKER = 'mqtt.geek-smart.cn'
  8 PORT = 1883
  9 PUB_TOPIC = "/HYUGHV/lVtAcHuor***/4cebd60bf***/publish"
 10 SUB_TOPIC = "/HYUGHV/lVtAcHuor***/4cebd60bf***/subscribe"
 11 CLIENT_ID = f'python-mqtt-tcp-client'
 12 USERNAME = '************'
 13 PASSWORD = '************'
 14
 15
 16 FIRST_RECONNECT_DELAY = 1
 17 RECONNECT_RATE = 2
 18 MAX_RECONNECT_COUNT = 12
 19 MAX_RECONNECT_DELAY = 60
 20
 21 FLAG_EXIT = False
 22
 23
 24 def on_connect(client, userdata, flags, rc):
 25     if rc == 0 and client.is_connected():
 26         print("Connected to MQTT Broker!")
 27         client.subscribe(SUB_TOPIC)
 28     else:
 29         print(f'Failed to connect, return code {rc}')
 30
 31
 32 def on_disconnect(client, userdata, rc):
 33     logging.info("Disconnected with result code: %s", rc)
 34     reconnect_count, reconnect_delay = 0, FIRST_RECONNECT_DELAY
 35     while reconnect_count < MAX_RECONNECT_COUNT:
 36         logging.info("Reconnecting in %d seconds...", reconnect_delay)
 37         time.sleep(reconnect_delay)
 38
 39         try:
 40             client.reconnect()
 41             logging.info("Reconnected successfully!")
 42             return
 43         except Exception as err:
 44             logging.error("%s. Reconnect failed. Retrying...", err)
 45
 46         reconnect_delay *= RECONNECT_RATE
 47         reconnect_delay = min(reconnect_delay, MAX_RECONNECT_DELAY)
 48         reconnect_count += 1
 49     logging.info("Reconnect failed after %s attempts. Exiting...", reconnect_count)
 50     global FLAG_EXIT
 51     FLAG_EXIT = True
 52
 53
 54 def on_message(client, userdata, msg):
 55     print(f'Received `{msg.payload.decode()}` from `{msg.topic}` topic')
 56
 57
 58 def connect_mqtt():
 59     client = mqtt_client.Client(CLIENT_ID)
 60     client.username_pw_set(USERNAME, PASSWORD)
 61     client.on_connect = on_connect
 62     client.on_message = on_message
 63     client.connect(BROKER, PORT, keepalive=120)
 64     client.on_disconnect = on_disconnect
 65     return client
 66
 67
 68 def publish(client):
 69     msg_count = 0
 70     while not FLAG_EXIT:
 71         msg_dict = {
 72             'type': 'info'
 73         }
 74         msg = json.dumps(msg_dict)
 75         if not client.is_connected():
 76             logging.error("publish: MQTT client is not connected!")
 77             time.sleep(1)
 78             continue
 79         result = client.publish(PUB_TOPIC, msg)
 80         # result: [0, 1]
 81         status = result[0]
 82         if status == 0:
 83             print(f'Send `{msg}` to topic `{PUB_TOPIC}`')
 84         else:
 85             print(f'Failed to send message to topic {PUB_TOPIC}')
 86         msg_count += 1
 87         time.sleep(1)
 88
 89
 90 def run():
 91     logging.basicConfig(format='%(asctime)s - %(levelname)s: %(message)s',
 92                         level=logging.DEBUG)
 93     client = connect_mqtt()
 94     client.loop_start()
 95     time.sleep(1)
 96     if client.is_connected():
 97         publish(client)
 98     else:
 99         client.loop_stop()
100
101
102 if __name__ == '__main__':
103     run()

测试验证

运行

1python3 pub_sub_tcp.py