Node
本文主要介绍如何在 Node.js 项目中使用 MQTT.js 客户端库 ,实现 SX-IOT设备与 MQTT 服务器的连接、订阅、收发消息等功能。
前提条件
安装依赖包
1sudo apt install nodejs npm -y
2npm install mqtt
连接使用
连接设置
本文将使用自定义的接入认证方式,服务器接入信息如下:
Broker: mqtt.geek-smart.cn
TCP Port: 1883
WebSocket Port: 8083
导入依赖包
1 const mqtt = require('mqtt')
2 /**
3 choose which protocol to use for connection here
4 * TCP Port: 1883
5 * WebSocket port: 8083
6 */
7 // tcp
8 const { connectOptions } = require('./use_mqtt.js')
9 // ws
10 // const { connectOptions } = require('./use_ws.js')
定义连接地址、认证信息以及消息发布主题
1 // tcp
2 const connectOptions = {
3 protocol: 'mqtt',
4 port: 1883,
5 host: 'mqtt.geek-smart.cn',
6 }
7
8 // ws
9 // const connectOptions = {
10 // protocol: 'ws',
11 // port: 8083,
12 // host: 'mqtt.geek-smart.cn',
13 // }
14
15 module.exports = {
16 connectOptions,
17 }
18 const pub_topic = '/HYUGHV/lVtAcHuorFDJ/4cebd60bfada/publish'
19 const sub_topic = '/HYUGHV/lVtAcHuorFDJ/4cebd60bfada/subscribe'
20 // tcp
21 const clientId = 'nodejs-mqtt-tcp-client'
22 // ws
23 // const clientId = 'nodejs-mqtt-ws-client'
24 const options = {
25 clientId,
26 clean: true,
27 connectTimeout: 4000,
28 username: 'OGVkSlBrskPY',
29 password: 'XaIkqRXwcNuuQjXGPi',
30 reconnectPeriod: 1000,
31 }
定义消息发布函数
1 client.publish(pub_topic, payload, { qos }, (error) => {
2 if (error) {
3 console.error(error)
4 }
5 })
定义 on_message 回调函数,用于打印订阅主题接收的消息内容
1 client.on('message', (sub_topic, payload) => {
2 console.log('Received Message:', sub_topic, payload.toString())
3 })
连接 MQTT
1 client.on('connect', () => {
2 console.log(`${protocol}: Connected`)
3 })
完整代码
1 const mqtt = require('mqtt')
2 /**
3 choose which protocol to use for connection here
4 * TCP Port: 1883
5 * WebSocket port: 8083
6 */
7 // tcp
8 const { connectOptions } = require('./use_mqtt.js')
9 // ws
10 // const { connectOptions } = require('./use_ws.js')
11
12 const clientId = 'nodejs-mqtt-tcp-client'
13 // const clientId = 'nodejs-mqtt-ws-client'
14 const options = {
15 clientId,
16 clean: true,
17 connectTimeout: 4000,
18 username: 'OGVkSlBrskPY',
19 password: 'XaIkqRXwcNuuQjXGPi',
20 reconnectPeriod: 1000,
21 }
22
23 const { protocol, host, port } = connectOptions
24 /**
25 * if protocol is "mqtt", connectUrl = "mqtt://mqtt.geek-smart.cn:1883"
26 * if protocol is "ws", connectUrl = "ws://mqtt.geek-smart.cn:8083/mqtt"
27 *
28 * for more details about "mqtt.connect" method & options,
29 * please refer to https://github.com/mqttjs/MQTT.js#mqttconnecturl-options
30 */
31 let connectUrl = `${protocol}://${host}:${port}`
32 if (['ws', 'wss'].includes(protocol)) {
33 // mqtt: MQTT-WebSocket uniformly uses /path as the connection path,
34 // which should be specified when connecting, and the path used on EMQX is /mqtt.
35 connectUrl += '/mqtt'
36 }
37
38 const client = mqtt.connect(connectUrl, options)
39
40 const pub_topic = '/HYUGHV/lVtAcHuorFDJ/4cebd60bfada/publish'
41 const sub_topic = '/HYUGHV/lVtAcHuorFDJ/4cebd60bfada/subscribe'
42 const payload = '{"type":"info"}'
43 // https://github.com/mqttjs/MQTT.js#qos
44 const qos = 0
45
46 // https://github.com/mqttjs/MQTT.js#event-connect
47 client.on('connect', () => {
48 console.log(`${protocol}: Connected`)
49
50 // subscribe topic
51 // https://github.com/mqttjs/MQTT.js#mqttclientsubscribetopictopic-arraytopic-object-options-callback
52 client.subscribe(sub_topic, { qos }, (error) => {
53 if (error) {
54 console.log('subscribe error:', error)
55 return
56 }
57 console.log(`${protocol}: Subscribe to topic '${sub_topic}'`)
58 // publish message
59 // https://github.com/mqttjs/MQTT.js#mqttclientpublishtopic-message-options-callback
60 client.publish(pub_topic, payload, { qos }, (error) => {
61 if (error) {
62 console.error(error)
63 }
64 })
65 })
66 })
67
68 // https://github.com/mqttjs/MQTT.js#event-reconnect
69 client.on('reconnect', (error) => {
70 console.log(`Reconnecting(${protocol}):`, error)
71 })
72
73 // https://github.com/mqttjs/MQTT.js#event-error
74 client.on('error', (error) => {
75 console.log(`Cannot connect(${protocol}):`, error)
76 })
77
78 // https://github.com/mqttjs/MQTT.js#event-message
79 client.on('message', (sub_topic, payload) => {
80 console.log('Received Message:', sub_topic, payload.toString())
81 })
82
83 /**
84 * If you need to unsubscribe from a topic, you can use the following code.
85 */
86 // // unsubscribe topic
87 // // https://github.com/mqttjs/MQTT.js#mqttclientunsubscribetopictopic-array-options-callback
88 // client.unsubscribe(sub_topic, { qos }, (error) => {
89 // if (error) {
90 // console.log('unsubscribe error:', error)
91 // return
92 // }
93 // console.log(`unsubscribed topic: ${sub_topic}`)
94 // })
95
96 /**
97 * If you need to disconnect, you can use the following code.
98 */
99 // if (client.connected) {
100 // try {
101 // // disconnect
102 // // https://github.com/mqttjs/MQTT.js#mqttclientendforce-options-callback
103 // client.end(false, () => {
104 // console.log('disconnected successfully')
105 // })
106 // } catch (error) {
107 // console.log('disconnect error:', error)
108 // }
109 // }
测试验证
运行
1npm start