.. vim: syntax=rst

.. highlight:: sh

Go
============================

This article describes how to use the paho.mqtt.golang library in a Go project to build an MQTT client that connects to SX-IOT IoT devices, subscribes to topics, and sends and receives messages.

Go is a statically and strongly typed, compiled, concurrent programming language with garbage collection, developed by Google. paho.mqtt.golang is an MQTT library that provides a simple API for connecting to an MQTT server and for sending and receiving messages from a Go project.

Prerequisites
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

Install dependencies
------------------------

.. code-block:: bash
    :linenos:

    # Install the Go toolchain and use the goproxy.cn module proxy
    sudo apt install golang-go -y
    go env -w GOPROXY=https://goproxy.cn
    # Create the project directory if it does not exist yet, then initialize the module
    mkdir -p mqtt-client-Go-paho && cd mqtt-client-Go-paho
    go mod init mqtt-client-Go-paho
    go get github.com/eclipse/paho.mqtt.golang

Connecting
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

Connection settings
------------------------

This article uses a custom access authentication method. The server access details are as follows:

- Broker: **mqtt.geek-smart.cn**
- TCP Port: **1883**
- WebSocket Port: **8083**

Import dependencies
------------------------

.. code-block:: golang
    :linenos:

    import (
        "fmt"
        "log"
        "time"

        mqtt "github.com/eclipse/paho.mqtt.golang"
    )

Define the broker address, credentials, and topics
------------------------------------------------------------------------

.. code-block:: golang
    :linenos:

    const protocol = "tcp"
    const broker = "mqtt.geek-smart.cn"
    const port = 1883
    const sub_topic = "/HYUGHV/lVtAcHuor***/4cebd60bf***/subscribe"
    const pub_topic = "/HYUGHV/lVtAcHuor***/4cebd60bf***/publish"
    const username = "************"
    const password = "************"

Define the publish function
------------------------------------------------------------------------

.. code-block:: golang
    :linenos:

    // publish sends one message per second to pub_topic and never returns.
    func publish(client mqtt.Client) {
        qos := 0
        msgCount := 0
        for {
            payload := `{"type":"info"}`
            if token := client.Publish(pub_topic, byte(qos), false, payload); token.Wait() && token.Error() != nil {
                fmt.Printf("publish failed, topic: %s, payload: %s\n", pub_topic, payload)
            } else {
                fmt.Printf("publish success, topic: %s, payload: %s\n", pub_topic, payload)
            }
            msgCount++
            time.Sleep(time.Second * 1)
        }
    }

Define the subscribe function, whose callback prints received messages
------------------------------------------------------------------------

.. code-block:: golang
    :linenos:

    // subscribe registers a callback that prints every message received on sub_topic.
    func subscribe(client mqtt.Client) {
        qos := 0
        token := client.Subscribe(sub_topic, byte(qos), func(client mqtt.Client, msg mqtt.Message) {
            fmt.Printf("Received `%s` from `%s` topic\n", msg.Payload(), msg.Topic())
        })
        // Wait for the broker's acknowledgement and report a failed subscription.
        if token.Wait() && token.Error() != nil {
            fmt.Printf("subscribe failed, topic: %s, error: %v\n", sub_topic, token.Error())
        }
    }

Initialize the MQTT client and connect to the broker
------------------------------------------------------------------------

.. code-block:: golang
    :linenos:

    // createMqttClient builds the client options, connects, and exits on failure.
    func createMqttClient() mqtt.Client {
        connectAddress := fmt.Sprintf("%s://%s:%d", protocol, broker, port)
        clientID := "go-client"

        fmt.Println("connect address: ", connectAddress)
        opts := mqtt.NewClientOptions()
        opts.AddBroker(connectAddress)
        opts.SetUsername(username)
        opts.SetPassword(password)
        opts.SetClientID(clientID)
        opts.SetKeepAlive(time.Second * 60)
        client := mqtt.NewClient(opts)
        token := client.Connect()
        // WaitTimeout returns false if the connection attempt has not finished in time.
        if !token.WaitTimeout(3 * time.Second) {
            log.Fatal("connect timed out")
        }
        if err := token.Error(); err != nil {
            log.Fatal(err)
        }
        return client
    }

Complete code
------------------------------------------------------------------------

.. code-block:: golang
    :linenos:

    package main

    import (
        "fmt"
        "log"
        "time"

        mqtt "github.com/eclipse/paho.mqtt.golang"
    )

    const protocol = "tcp"
    const broker = "mqtt.geek-smart.cn"
    const port = 1883
    const sub_topic = "/HYUGHV/lVtAcHuor***/4cebd60bf***/subscribe"
    const pub_topic = "/HYUGHV/lVtAcHuor***/4cebd60bf***/publish"
    const username = "************"
    const password = "************"

    func main() {
        client := createMqttClient()
        go subscribe(client)        // subscribe in a separate goroutine
        time.Sleep(time.Second * 1) // pause one second so the subscription can complete
        publish(client)
    }

    // createMqttClient builds the client options, connects, and exits on failure.
    func createMqttClient() mqtt.Client {
        connectAddress := fmt.Sprintf("%s://%s:%d", protocol, broker, port)
        clientID := "go-client"

        fmt.Println("connect address: ", connectAddress)
        opts := mqtt.NewClientOptions()
        opts.AddBroker(connectAddress)
        opts.SetUsername(username)
        opts.SetPassword(password)
        opts.SetClientID(clientID)
        opts.SetKeepAlive(time.Second * 60)
        client := mqtt.NewClient(opts)
        token := client.Connect()
        // WaitTimeout returns false if the connection attempt has not finished in time.
        if !token.WaitTimeout(3 * time.Second) {
            log.Fatal("connect timed out")
        }
        if err := token.Error(); err != nil {
            log.Fatal(err)
        }
        return client
    }

    // publish sends one message per second to pub_topic and never returns.
    func publish(client mqtt.Client) {
        qos := 0
        msgCount := 0
        for {
            payload := `{"type":"info"}`
            if token := client.Publish(pub_topic, byte(qos), false, payload); token.Wait() && token.Error() != nil {
                fmt.Printf("publish failed, topic: %s, payload: %s\n", pub_topic, payload)
            } else {
                fmt.Printf("publish success, topic: %s, payload: %s\n", pub_topic, payload)
            }
            msgCount++
            time.Sleep(time.Second * 1)
        }
    }

    // subscribe registers a callback that prints every message received on sub_topic.
    func subscribe(client mqtt.Client) {
        qos := 0
        token := client.Subscribe(sub_topic, byte(qos), func(client mqtt.Client, msg mqtt.Message) {
            fmt.Printf("Received `%s` from `%s` topic\n", msg.Payload(), msg.Topic())
        })
        // Wait for the broker's acknowledgement and report a failed subscription.
        if token.Wait() && token.Error() != nil {
            fmt.Printf("subscribe failed, topic: %s, error: %v\n", sub_topic, token.Error())
        }
    }

Test and verify
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

Run
------------------------------------------------------------------------

Save the complete code above as ``main.go`` in the project directory, then run it:

.. code-block:: bash
    :linenos:

    go run main.go
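
The ``publish`` function above sends a hard-coded JSON string. As the payload grows, it may be easier to build it with the standard ``encoding/json`` package. The following is a minimal sketch using only the standard library; the ``infoMessage`` type and the ``buildPayload`` helper are hypothetical names introduced here for illustration and are not part of paho.mqtt.golang or of any SX-IOT API.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// infoMessage mirrors the {"type":"info"} payload used in publish.
// The type name and its single field are illustrative assumptions.
type infoMessage struct {
	Type string `json:"type"`
}

// buildPayload encodes a message of the given type as a JSON string.
func buildPayload(msgType string) (string, error) {
	b, err := json.Marshal(infoMessage{Type: msgType})
	if err != nil {
		return "", err
	}
	return string(b), nil
}

func main() {
	payload, err := buildPayload("info")
	if err != nil {
		fmt.Println("encode failed:", err)
		return
	}
	// The resulting string could be passed to client.Publish in place of the literal.
	fmt.Println(payload)
}
```

The returned string has the same content as the literal in ``publish``, so it can be passed to ``client.Publish`` unchanged.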