.. vim: syntax=rst .. highlight:: sh Java ============================ 本文主要介绍如何在 Java 项目中,使用 `Eclipse Paho Java Client <https://github.com/eclipse/paho.mqtt.java>`__ 库实现一个 MQTT 客户端与 SX-IOT 物联网设备的连接、订阅、收发消息等功能。 前提条件 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ 安装依赖包 ------------------ .. code-block:: bash :linenos: sudo apt install openjdk-11-jdk maven -v cd mqtt-client-Java-paho mvn install 连接使用 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ 连接设置 ------------------ 本文将使用自定义的接入认证方式,服务器接入信息如下: - Broker: **mqtt.geek-smart.cn** - TCP Port: **1883** - WebSocket Port: **8083** 导入依赖包 ------------------ .. code-block:: java :linenos: package io.geekopen.mqtt; import org.eclipse.paho.client.mqttv3.*; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; 定义连接地址、认证信息以及消息发布主题 ---------------------------------------- .. code-block:: java :linenos: String sub_topic = "/HYUGHV/lVtAcHuorxxx/4cebd60bfxxx/subscribe"; String pub_topic = "/HYUGHV/lVtAcHuorxxx/4cebd60bfxxx/publish"; String content = "{\"type\":\"info\"}"; int qos = 0; String broker = "tcp://mqtt.geek-smart.cn:1883"; String username = "xxxxxxxxxxxxx"; String password = "xxxxxxxxxxxxx"; String clientId = MqttClient.generateClientId(); 消息发布 ------------------ .. code-block:: java :linenos: MqttMessage message = new MqttMessage(content.getBytes()); message.setQos(qos); client.publish(pub_topic, message); Subscribe 回调函数,用于打印订阅主题接收的消息内容 ------------------------------------------------------------------------ .. 
code-block:: java :linenos: MqttClient client = new MqttClient(broker, clientId, persistence); client.setCallback(new MqttCallback() { public void connectionLost(Throwable cause) { System.out.println("connectionLost: " + cause.getMessage()); } public void messageArrived(String sub_topic, MqttMessage message) { System.out.println("topic: " + sub_topic); System.out.println("Qos: " + message.getQos()); System.out.println("message content: " + new String(message.getPayload())); } public void deliveryComplete(IMqttDeliveryToken token) { System.out.println("deliveryComplete---------" + token.isComplete()); } }); 初始化 MQTT 客户端并订阅主题 ------------------------------------------------------------------------ .. code-block:: java :linenos: String clientId = MqttClient.generateClientId(); MemoryPersistence persistence = new MemoryPersistence(); MqttConnectOptions connOpts = new MqttConnectOptions(); connOpts.setCleanSession(true); connOpts.setUserName(username); connOpts.setPassword(password.toCharArray()); connOpts.setConnectionTimeout(60); connOpts.setKeepAliveInterval(60); 完整代码 ------------------------------------------------------------------------ .. 
code-block:: java :linenos: package io.geekopen.mqtt; import org.eclipse.paho.client.mqttv3.*; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; public class MqttSample { public static void main(String[] args) { String sub_topic = "/HYUGHV/lVtAcHuorxxx/4cebd60bfxxx/subscribe"; String pub_topic = "/HYUGHV/lVtAcHuorxxx/4cebd60bfxxx/publish"; String content = "{\"type\":\"info\"}"; int qos = 0; String broker = "tcp://mqtt.geek-smart.cn:1883"; String username = "xxxxxxxxxxxxx"; String password = "xxxxxxxxxxxxx"; String clientId = MqttClient.generateClientId(); MemoryPersistence persistence = new MemoryPersistence(); MqttConnectOptions connOpts = new MqttConnectOptions(); connOpts.setCleanSession(true); connOpts.setUserName(username); connOpts.setPassword(password.toCharArray()); connOpts.setConnectionTimeout(60); connOpts.setKeepAliveInterval(60); try { MqttClient client = new MqttClient(broker, clientId, persistence); client.setCallback(new MqttCallback() { public void connectionLost(Throwable cause) { System.out.println("connectionLost: " + cause.getMessage()); } public void messageArrived(String sub_topic, MqttMessage message) { System.out.println("topic: " + sub_topic); System.out.println("Qos: " + message.getQos()); System.out.println("message content: " + new String(message.getPayload())); } public void deliveryComplete(IMqttDeliveryToken token) { System.out.println("deliveryComplete---------" + token.isComplete()); } }); System.out.println("Connecting to broker: " + broker); client.connect(connOpts); System.out.println("Connected to broker: " + broker); MqttMessage message = new MqttMessage(content.getBytes()); message.setQos(qos); client.publish(pub_topic, message); System.out.println("Message published"); client.subscribe(sub_topic, qos); System.out.println("Subscribed to topic: " + sub_topic); System.out.println("Subscribed to message: " + message); // client.disconnect(); // System.out.println("Disconnected"); // client.close(); // 
System.exit(0); } catch (MqttException me) { System.out.println("reason " + me.getReasonCode()); System.out.println("msg " + me.getMessage()); System.out.println("loc " + me.getLocalizedMessage()); System.out.println("cause " + me.getCause()); System.out.println("excep " + me); me.printStackTrace(); } } } 测试验证 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ 运行 ------------------------------------------------------------------------ .. code-block:: bash :linenos: mvn compile mvn exec:java -Dexec.mainClass="io.geekopen.mqtt.MqttSample" # Only Publish mvn exec:java -Dexec.mainClass="io.geekopen.mqtt.PublishSample" # Only Subscribe mvn exec:java -Dexec.mainClass="io.geekopen.mqtt.SubscribeSample"