Go
本文主要介绍如何在 Go 项目中,使用 paho.mqtt.golang 库实现一个 MQTT 客戶端与 SX-IOT物联网设备的连接、订阅、收发消息等功能。
Go 是 Google 开发的一种静态强类型、编译型、并发型,并具有垃圾回收功能的编程语言。而 paho.mqtt.golang 是一个 MQTT 库,它提供了一个简单的 API,用于在 Go 项目中连接到 MQTT 服务器,并发送和接收消息。
前提条件
安装依赖包
1 sudo apt install golang-go -y
2 go env -w GOPROXY=https://goproxy.cn
3 cd mqtt-client-Go-paho
4 go mod init mqtt-client-Go-paho
5 go get github.com/eclipse/paho.mqtt.golang
连接使用
连接设置
本文将使用自定义的接入认证方式,服务器接入信息如下:
Broker: mqtt.geek-smart.cn
TCP Port: 1883
WebSocket Port: 8083
导入依赖包
1 import (
2 "fmt"
3 "log"
4 "time"
5
6 mqtt "github.com/eclipse/paho.mqtt.golang"
7 )
定义连接地址、认证信息以及消息发布主题
1const protocol = "tcp"
2const broker = "mqtt.geek-smart.cn"
3const port = 1883
4const sub_topic = "/HYUGHV/lVtAcHuor***/4cebd60bf***/subscribe"
5const pub_topic = "/HYUGHV/lVtAcHuor***/4cebd60bf***/publish"
6const username = "************"
7const password = "************"
定义消息发布函数
1 func publish(client mqtt.Client) {
2 qos := 0
3 msgCount := 0
4 for {
5 payload := `{"type":"info"}`
6 if token := client.Publish(pub_topic, byte(qos), false, payload); token.Wait() && token.Error() != nil {
7 fmt.Printf("publish failed, topic: %s, payload: %s\n", pub_topic, payload)
8 } else {
9 fmt.Printf("publish success, topic: %s, payload: %s\n", pub_topic, payload)
10 }
11 msgCount++
12 time.Sleep(time.Second * 1)
13 }
14 }
定义 on_message 回调函数,用于打印订阅主题接收的消息内容
1 func subscribe(client mqtt.Client) {
2 qos := 0
3 client.Subscribe(sub_topic, byte(qos), func(client mqtt.Client, msg mqtt.Message) {
4 fmt.Printf("Received `%s` from `%s` topic\n", msg.Payload(), msg.Topic())
5 })
6 }
初始化 MQTT 客户端并订阅主题
1 func createMqttClient() mqtt.Client {
2 connectAddress := fmt.Sprintf("%s://%s:%d", protocol, broker, port)
3 clientID := fmt.Sprintf("go-client")
4
5 fmt.Println("connect address: ", connectAddress)
6 opts := mqtt.NewClientOptions()
7 opts.AddBroker(connectAddress)
8 opts.SetUsername(username)
9 opts.SetPassword(password)
10 opts.SetClientID(clientID)
11 opts.SetKeepAlive(time.Second * 60)
12
13 client := mqtt.NewClient(opts)
14 token := client.Connect()
15 if token.WaitTimeout(3*time.Second) && token.Error() != nil {
16 log.Fatal(token.Error())
17 }
18 return client
19 }
完整代码
1 package main
2
3 import (
4 "fmt"
5 "log"
6 "time"
7
8 mqtt "github.com/eclipse/paho.mqtt.golang"
9 )
10
11 const protocol = "tcp"
12 const broker = "mqtt.geek-smart.cn"
13 const port = 1883
14 const sub_topic = "/HYUGHV/lVtAcHuor***/4cebd60bf***/subscribe"
15 const pub_topic = "/HYUGHV/lVtAcHuor***/4cebd60bf***/publish"
16 const username = "************"
17 const password = "************"
18
19 func main() {
20 client := createMqttClient()
21 go subscribe(client) // 在主函数里, 我们用另起一个 go 协程来订阅消息
22 time.Sleep(time.Second * 1) // 暂停一秒等待 subscribe 完成
23 publish(client)
24 }
25
26 func createMqttClient() mqtt.Client {
27 connectAddress := fmt.Sprintf("%s://%s:%d", protocol, broker, port)
28 clientID := fmt.Sprintf("go-client")
29
30 fmt.Println("connect address: ", connectAddress)
31 opts := mqtt.NewClientOptions()
32 opts.AddBroker(connectAddress)
33 opts.SetUsername(username)
34 opts.SetPassword(password)
35 opts.SetClientID(clientID)
36 opts.SetKeepAlive(time.Second * 60)
37
38 client := mqtt.NewClient(opts)
39 token := client.Connect()
40 if token.WaitTimeout(3*time.Second) && token.Error() != nil {
41 log.Fatal(token.Error())
42 }
43 return client
44 }
45
46 func publish(client mqtt.Client) {
47 qos := 0
48 msgCount := 0
49 for {
50 payload := `{"type":"info"}`
51 if token := client.Publish(pub_topic, byte(qos), false, payload); token.Wait() && token.Error() != nil {
52 fmt.Printf("publish failed, topic: %s, payload: %s\n", pub_topic, payload)
53 } else {
54 fmt.Printf("publish success, topic: %s, payload: %s\n", pub_topic, payload)
55 }
56 msgCount++
57 time.Sleep(time.Second * 1)
58 }
59 }
60
61 func subscribe(client mqtt.Client) {
62 qos := 0
63 client.Subscribe(sub_topic, byte(qos), func(client mqtt.Client, msg mqtt.Message) {
64 fmt.Printf("Received `%s` from `%s` topic\n", msg.Payload(), msg.Topic())
65 })
66 }
测试验证
运行
1go run main.go