Java

本文主要介绍如何在 Java 项目中,使用 [Eclipse Paho Java Client](https://github.com/eclipse/paho.mqtt.java) 库实现一个 MQTT 客戶端与 SX-IOT物联网设备的连接、订阅、收发消息等功能。

前提条件

安装依赖包

1sudo apt install openjdk-11-jdk maven -v
2cd mqtt-client-Java-paho
3mvn install

连接使用

连接设置

本文将使用自定义的接入认证方式,服务器接入信息如下:

  • Broker: mqtt.geek-smart.cn

  • TCP Port: 1883

  • WebSocket Port: 8083

导入依赖包

1 package io.geekopen.mqtt;
2
3 import org.eclipse.paho.client.mqttv3.*;
4 import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

定义连接地址、认证信息以及消息发布主题

1String sub_topic = "/HYUGHV/lVtAcHuorxxx/4cebd60bfxxx/subscribe";
2String pub_topic = "/HYUGHV/lVtAcHuorxxx/4cebd60bfxxx/publish";
3String content = "{\"type\":\"info\"}";
4int qos = 0;
5String broker = "tcp://mqtt.geek-smart.cn:1883";
6String username = "xxxxxxxxxxxxx";
7String password = "xxxxxxxxxxxxx";
8String clientId = MqttClient.generateClientId();

消息发布

1 MqttMessage message = new MqttMessage(content.getBytes());
2 message.setQos(qos);
3 client.publish(pub_topic, message);

Subscribe 回调函数,用于打印订阅主题接收的消息内容

 1 MqttClient client = new MqttClient(broker, clientId, persistence);
 2
 3 client.setCallback(new MqttCallback() {
 4     public void connectionLost(Throwable cause) {
 5         System.out.println("connectionLost: " + cause.getMessage());
 6     }
 7     public void messageArrived(String sub_topic, MqttMessage message) {
 8         System.out.println("topic: " + sub_topic);
 9         System.out.println("Qos: " + message.getQos());
10         System.out.println("message content: " + new String(message.getPayload()));
11     }
12     public void deliveryComplete(IMqttDeliveryToken token) {
13         System.out.println("deliveryComplete---------" + token.isComplete());
14     }
15 });

初始化 MQTT 客户端并订阅主题

1 String clientId = MqttClient.generateClientId();
2 MemoryPersistence persistence = new MemoryPersistence();
3 MqttConnectOptions connOpts = new MqttConnectOptions();
4 connOpts.setCleanSession(true);
5 connOpts.setUserName(username);
6 connOpts.setPassword(password.toCharArray());
7 connOpts.setConnectionTimeout(60);
8 connOpts.setKeepAliveInterval(60);

完整代码

 1 package io.geekopen.mqtt;
 2
 3 import org.eclipse.paho.client.mqttv3.*;
 4 import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
 5
 6
 7 public class MqttSample {
 8     public static void main(String[] args) {
 9         String sub_topic = "/HYUGHV/lVtAcHuorxxx/4cebd60bfxxx/subscribe";
10         String pub_topic = "/HYUGHV/lVtAcHuorxxx/4cebd60bfxxx/publish";
11         String content = "{\"type\":\"info\"}";
12         int qos = 0;
13         String broker = "tcp://mqtt.geek-smart.cn:1883";
14         String username = "xxxxxxxxxxxxx";
15         String password = "xxxxxxxxxxxxx";
16         String clientId = MqttClient.generateClientId();
17         MemoryPersistence persistence = new MemoryPersistence();
18         MqttConnectOptions connOpts = new MqttConnectOptions();
19         connOpts.setCleanSession(true);
20         connOpts.setUserName(username);
21         connOpts.setPassword(password.toCharArray());
22         connOpts.setConnectionTimeout(60);
23         connOpts.setKeepAliveInterval(60);
24
25         try {
26             MqttClient client = new MqttClient(broker, clientId, persistence);
27
28             client.setCallback(new MqttCallback() {
29
30                 public void connectionLost(Throwable cause) {
31                     System.out.println("connectionLost: " + cause.getMessage());
32                 }
33
34                 public void messageArrived(String sub_topic, MqttMessage message) {
35                     System.out.println("topic: " + sub_topic);
36                     System.out.println("Qos: " + message.getQos());
37                     System.out.println("message content: " + new String(message.getPayload()));
38
39                 }
40
41                 public void deliveryComplete(IMqttDeliveryToken token) {
42                     System.out.println("deliveryComplete---------" + token.isComplete());
43                 }
44
45             });
46
47             System.out.println("Connecting to broker: " + broker);
48             client.connect(connOpts);
49             System.out.println("Connected to broker: " + broker);
50
51             MqttMessage message = new MqttMessage(content.getBytes());
52             message.setQos(qos);
53             client.publish(pub_topic, message);
54             System.out.println("Message published");
55             client.subscribe(sub_topic, qos);
56             System.out.println("Subscribed to topic: " + sub_topic);
57             System.out.println("Subscribed to message: " + message);
58             // client.disconnect();
59             // System.out.println("Disconnected");
60             // client.close();
61             // System.exit(0);
62         } catch (MqttException me) {
63             System.out.println("reason " + me.getReasonCode());
64             System.out.println("msg " + me.getMessage());
65             System.out.println("loc " + me.getLocalizedMessage());
66             System.out.println("cause " + me.getCause());
67             System.out.println("excep " + me);
68             me.printStackTrace();
69         }
70     }
71
72 }

测试验证

运行

1 mvn compile
2 mvn exec:java -Dexec.mainClass="io.geekopen.mqtt.MqttExample"
3 # Only Publish
4 mvn exec:java -Dexec.mainClass="io.geekopen.mqtt.PublishSample"
5 # Only Subscribe
6 mvn exec:java -Dexec.mainClass="io.geekopen.mqtt.SubscribeSample"