Python3
本文主要介绍如何在 Python 项目中使用 paho-mqtt 客户端库 ,实现 SX-IOT设备与 MQTT 服务器的连接、订阅、收发消息等功能。
paho-mqtt 是目前 Python 中使用较多的 MQTT 客户端库, 它在 Python 2.7.9+ 或 3.6+ 上为客户端类提供了对 MQTT v5.0,v3.1 和 v3.1.1 的支持。它还提供了一些帮助程序功能,使将消息发布到 MQTT 服务器变得非常简单。
前提条件
安装依赖包
1sudo apt install python3 python3-pip -y
2python3 -m pip install paho-mqtt
连接使用
连接设置
本文将使用自定义的接入认证方式,服务器接入信息如下:
Broker: mqtt.geek-smart.cn
TCP Port: 1883
WebSocket Port: 8083
导入依赖包
1from paho.mqtt import client as mqtt_client
定义连接地址、认证信息以及消息发布主题
1BROKER = 'mqtt.geek-smart.cn'
2PORT = 1883
3PUB_TOPIC = "/HYUGHV/lVtAcHuor***/4cebd60bf***/publish"
4SUB_TOPIC = "/HYUGHV/lVtAcHuor***/4cebd60bf***/subscribe"
5CLIENT_ID = f'python-mqtt-tcp-client'
6USERNAME = '************'
7PASSWORD = '************'
定义消息发布函数
1 def publish(client):
2 msg_count = 0
3 while not FLAG_EXIT:
4 msg_dict = {
5 'type': 'info'
6 }
7 msg = json.dumps(msg_dict)
8 if not client.is_connected():
9 logging.error("publish: MQTT client is not connected!")
10 time.sleep(1)
11 continue
12 result = client.publish(PUB_TOPIC, msg)
13 # result: [0, 1]
14 status = result[0]
15 if status == 0:
16 print(f'Send `{msg}` to topic `{PUB_TOPIC}`')
17 else:
18 print(f'Failed to send message to topic {PUB_TOPIC}')
19 msg_count += 1
20 time.sleep(1)
定义 on_message 回调函数,用于打印订阅主题接收的消息内容
1 def on_message(client, userdata, msg):
2 print(f'Received `{msg.payload.decode()}` from `{msg.topic}` topic')
初始化 MQTT 客户端并订阅主题
1 def on_connect(client, userdata, flags, rc):
2 if rc == 0 and client.is_connected():
3 print("Connected to MQTT Broker!")
4 client.subscribe(SUB_TOPIC)
5 else:
6 print(f'Failed to connect, return code {rc}')
7
8 def connect_mqtt():
9 client = mqtt_client.Client(CLIENT_ID)
10 client.username_pw_set(USERNAME, PASSWORD)
11 client.on_connect = on_connect
12 client.on_message = on_message
13 client.connect(BROKER, PORT, keepalive=120)
14 client.on_disconnect = on_disconnect
15 return client
完整代码
1 import json
2 import logging
3 import time
4
5 from paho.mqtt import client as mqtt_client
6
7 BROKER = 'mqtt.geek-smart.cn'
8 PORT = 1883
9 PUB_TOPIC = "/HYUGHV/lVtAcHuor***/4cebd60bf***/publish"
10 SUB_TOPIC = "/HYUGHV/lVtAcHuor***/4cebd60bf***/subscribe"
11 CLIENT_ID = f'python-mqtt-tcp-client'
12 USERNAME = '************'
13 PASSWORD = '************'
14
15
16 FIRST_RECONNECT_DELAY = 1
17 RECONNECT_RATE = 2
18 MAX_RECONNECT_COUNT = 12
19 MAX_RECONNECT_DELAY = 60
20
21 FLAG_EXIT = False
22
23
24 def on_connect(client, userdata, flags, rc):
25 if rc == 0 and client.is_connected():
26 print("Connected to MQTT Broker!")
27 client.subscribe(SUB_TOPIC)
28 else:
29 print(f'Failed to connect, return code {rc}')
30
31
32 def on_disconnect(client, userdata, rc):
33 logging.info("Disconnected with result code: %s", rc)
34 reconnect_count, reconnect_delay = 0, FIRST_RECONNECT_DELAY
35 while reconnect_count < MAX_RECONNECT_COUNT:
36 logging.info("Reconnecting in %d seconds...", reconnect_delay)
37 time.sleep(reconnect_delay)
38
39 try:
40 client.reconnect()
41 logging.info("Reconnected successfully!")
42 return
43 except Exception as err:
44 logging.error("%s. Reconnect failed. Retrying...", err)
45
46 reconnect_delay *= RECONNECT_RATE
47 reconnect_delay = min(reconnect_delay, MAX_RECONNECT_DELAY)
48 reconnect_count += 1
49 logging.info("Reconnect failed after %s attempts. Exiting...", reconnect_count)
50 global FLAG_EXIT
51 FLAG_EXIT = True
52
53
54 def on_message(client, userdata, msg):
55 print(f'Received `{msg.payload.decode()}` from `{msg.topic}` topic')
56
57
58 def connect_mqtt():
59 client = mqtt_client.Client(CLIENT_ID)
60 client.username_pw_set(USERNAME, PASSWORD)
61 client.on_connect = on_connect
62 client.on_message = on_message
63 client.connect(BROKER, PORT, keepalive=120)
64 client.on_disconnect = on_disconnect
65 return client
66
67
68 def publish(client):
69 msg_count = 0
70 while not FLAG_EXIT:
71 msg_dict = {
72 'type': 'info'
73 }
74 msg = json.dumps(msg_dict)
75 if not client.is_connected():
76 logging.error("publish: MQTT client is not connected!")
77 time.sleep(1)
78 continue
79 result = client.publish(PUB_TOPIC, msg)
80 # result: [0, 1]
81 status = result[0]
82 if status == 0:
83 print(f'Send `{msg}` to topic `{PUB_TOPIC}`')
84 else:
85 print(f'Failed to send message to topic {PUB_TOPIC}')
86 msg_count += 1
87 time.sleep(1)
88
89
90 def run():
91 logging.basicConfig(format='%(asctime)s - %(levelname)s: %(message)s',
92 level=logging.DEBUG)
93 client = connect_mqtt()
94 client.loop_start()
95 time.sleep(1)
96 if client.is_connected():
97 publish(client)
98 else:
99 client.loop_stop()
100
101
102 if __name__ == '__main__':
103 run()
测试验证
运行
1python3 pub_sub_tcp.py