网站建设自主开发的三种方式,wordpress首页文章缩略图插件,广州市增城建设局网站,用WordPress管理app目录
一、前言
二、MQTT协议概述
概念
基本原理
MQTT协议的结构
MQTT的QoS机制
QoS 0#xff1a;最多一次传输
QoS 1#xff1a;至少一次传输
QoS 2#xff1a;恰好一次传输
三、MQTT的应用场景
四、MQTT的优点和缺点
五、MQTT协议的实现
六、实战体验MQTT
…目录
一、前言
二、MQTT协议概述
概念
基本原理
MQTT协议的结构
MQTT的QoS机制
QoS 0最多一次传输
QoS 1至少一次传输
QoS 2恰好一次传输
三、MQTT的应用场景
四、MQTT的优点和缺点
五、MQTT协议的实现
六、实战体验MQTT
七、MQTT协议的未来发展 一、前言
随着物联网和智能化应用的快速发展,对于通信协议的需求越来越多样化和复杂化对于物联网应用来说基于TCP/IP的协议MQTTMessage Queuing Telemetry Transport正逐渐成为主流的协议之一。本文将对MQTT协议的相关概念、基本原理、应用场景等进行介绍和分析。 二、MQTT协议概述
概念
MQTTMessage Queuing Telemetry Transport是一种轻量级的发布/订阅消息传输协议它被设计用于低带宽和不稳定的网络环境中比如远程传感器和移动设备等。它支持一对多的消息传输可以被广泛应用于物联网、移动应用程序、工业自动化、金融服务等领域。
基本原理
MQTT使用基于TCP/IP协议的可靠传输机制来实现消息传输并采用发布/订阅模式来处理消息。在MQTT中发布者将消息发布到一个主题Topic上订阅者则可以订阅该主题以接收发布者发布的消息。MQTT Broker充当一个中间件来传递这些消息。MQTT支持三种QoS服务质量等级从而实现不同的消息传输可靠性和实时性。此外MQTT还支持各种各样的客户端包括C语言、Java、Python等等因此它被广泛用于物联网、移动应用程序、工业自动化等领域。
MQTT协议的结构
MQTT协议主要由三部分组成分别是固定头部、可变头部和消息体其中固定头部和可变头部的长度是固定的消息体的长度是可变的。 固定头部固定头部由两个部分组成第一个字节的前四位表示消息类型MessageType剩下的四位是标识符标志Flag。前四位二进制可表示16种消息类型如下图所示 第一个字节后四位中DUP表示发布消息的副本用来保证消息的可靠传输。消息重传时DUP标志被设置为1。而两位的Qos则表示消息的服务质量以二进制表示
00最多一次传输01至少一次传输10恰好一次传输11预留标识
RETAIN作为发布保留标识发布保留标识表示服务器要保留这次推送的信息如果有新的订阅者出现就把这消息推送给它如果设有那么推送至当前订阅者后释放。
第二个字节为剩余长度用来保存变长头部和消息体的总大小的但不是直接保存的。这一字节是可以扩展其保存机制前7位用于保存长度后一部用做标识。当最后一位为 1时表示长度不足需要使用二个字节继续保存。
可变头部可变头部长度根据消息类型不同而不同一般包括报文标识符Packet Identifier和主题Topic等信息。 消息体消息体的长度也根据消息类型不同而不同主要包括消息正文和可选的附加属性等信息。 对于MQTT协议而言有以下四种类型的消息体
CONNECT消息体内容主要是客户端的ClientID、订阅的Topic、Message以及用户名和密码。SUBSCRIBE消息体内容是一系列的要订阅的主题以及QoS。SUBACK消息体内容是服务器对于SUBSCRIBE所申请的主题及QoS进行确认和回复。UNSUBSCRIBE消息体内容是要订阅的主题。
MQTT的QoS机制
MQTT协议支持三种不同的QoS服务质量等级分别为0、1和2它们分别代表不同的消息传输可靠性和实时性。下面是三种QoS等级的详细说明
QoS 0最多一次传输
消息发布者只发送一次消息无论它是否被接收。此时消息传输的效率最高但消息的可靠性最低。 消息丢失情况当我们使用 QoS 0 传递消息时消息的可靠性完全依赖于底层的 TCP 协议。而 TCP 只能保证在连接稳定不关闭的情况下消息的可靠到达一旦出现连接关闭、重置仍有可能丢失当前处于网络链路或操作系统底层缓冲区中的消息。这也是 QoS 0 消息最主要的丢失场景。
QoS 1至少一次传输
为了保证消息到达QoS 1加入了应答与重传机制发送方只有在收到接收方的 PUBACK 报文以后才能认为消息投递成功在此之前发送方需要存储该 PUBLISH 报文以便下次重传。此时消息传输的效率较高且消息的可靠性较高。 消息重复情况对于发送方而言没有收到回传的PUBACK报文有一下两种情况。
第一种情况接收方未收到消息。第二种情况接收方收到了消息回传的PUBACK报文未到达发送方。
如果是第一种情况发送方将会重传报文接收方相当于只收到了一次消息。如果是第二种情况发送方重传报文但此时接收方已经收到了这个消息这就导致接收方收到了重复的消息。虽然在重传时PUBLUSH报文的DUP标志会被设置为1.表示这是一个重传的报文、但是接收方不能假定自己曾经接受过这个消息仍然需要将其视作一个全新的消息。
QoS 2恰好一次传输
发送方和接收方之间进行一个消息确认PUBREC、PUBREL、PUBCOMP的三步握手以确保消息的可靠传输。每一次的QoS 2消息投递都要求发送方与接收方进行至少两次请求/响应流程。此时消息传输的效率较低但消息的可靠性最高。 下面是设置QoS为2时的消息发送流程
首先发送方存储并发送QoS 为 2的 PUBLISH 报文以启动一次QoS 2消息的传输然后等待接收方回复 PUBREC 报文。这一部分与QoS 1基本一致只是响应报文从 PUBACK 变成了 PUBREC。当发送方收到 PUBREC 报文即可确认对端已经收到了 PUBLISH 报文发送方将不再需要重传这个报文并且也不能再重传这个报文。所以此时发送方可以删除本地存储的 PUBLISH 报文然后发送一个 PUBREL 报文通知对端自己准备将本次使用的 Packet ID 标记为可用了。与 PUBLISH 报文一样我们需要确保 PUBREL 报文到达对端所以也需要一个响应报文并且这个 PUBREL 报文需要被存储下来以便后续重传。当接收方收到 PUBREL 报文也可以确认在这一次的传输流程中不会再有重传的 PUBLISH 报文到达因此回复 PUBCOMP 报文表示自己也准备好将当前的 Packet ID 用于新的消息了。当发送方收到 PUBCOMP 报文这一次的 QoS 2 消息传输就算正式完成了。在这之后发送方可以再次使用当前的 Packet ID 发送新的消息而接收方再次收到使用这个 Packet ID 的 PUBLISH 报文时也会将它视为一个全新的消息。
消息去重原理QoS 2 规定发送方只有在收到 PUBREC 报文之前可以重传 PUBLISH 报文。一旦收到 PUBREC 报文并发出 PUBREL 报文发送方就进入了 Packet ID 释放流程不可以再使用当前 Packet ID 重传 PUBLISH 报文。同时在收到对端回复的 PUBCOMP 报文确认双方都完成 Packet ID 释放之前也不可以使用当前 Packet ID 发送新的消息。因此对于接收方来说能够以 PUBREL 报文为界限凡是在 PUBREL 报文之前到达的 PUBLISH 报文都必然是重复的消息而凡是在 PUBREL 报文之后到达的 PUBLISH 报文都必然是全新的消息。一旦有了这个前提我们就能够在协议层面完成 QoS 2 消息的去重。 三、MQTT的应用场景
MQTT协议被广泛应用于各种领域如下所示
物联网应用MQTT是物联网应用中最常用的协议之一它支持低带宽、低功耗设备并且可以轻松地扩展到海量设备。
移动应用程序MQTT是移动应用程序中使用最广泛的协议之一它可以为移动应用程序提供可靠的消息传输机制而且消耗的带宽和电量很低。
工业自动化MQTT可以被广泛应用于工业自动化领域用于实时监控和控制生产过程中的各种设备。
金融服务MQTT可以为金融服务提供可靠的消息传输机制以保证消息的安全性和可靠性。
四、MQTT的优点和缺点
MQTT协议有以下优点
灵活性高。MQTT协议是基于发布/订阅模式来处理消息的可以很方便地支持一对多的通信模式。可扩展性强。MQTT协议支持可靠的消息传输机制并且可以轻松地扩展到海量设备。轻量级。MQTT协议是一种轻量级的协议适用于低带宽和不稳定的网络环境中消耗的带宽和电量很低。易于实现。MQTT协议具有简单的结构和易于实现的特点可以在各种硬件平台上运行。
MQTT协议的缺点包括以下几点
安全性较低。MQTT协议没有内置的安全机制需要使用TLS/SSL等加密协议来保证通信的安全性。可靠性较低。QoS等级为0的消息只能保证最多一次传输可能会出现消息丢失的情况。消息头较大。MQTT协议的消息头比较大占用了较大的带宽资源。无序传输。MQTT协议的消息传输是无序的不能保证消息的顺序性。
五、MQTT协议的实现
MQTT协议的实现需要以下几个组件
MQTT客户端MQTT客户端可以是任何设备包括PC、手机、传感器等它用于连接到MQTT Broker并发布或订阅消息。
MQTT BrokerMQTT Broker是消息代理服务器负责接收、存储和转发MQTT消息。
MQTT订阅者MQTT订阅者用于订阅消息并接收发布者发布的消息。
MQTT发布者MQTT发布者用于发布消息并将消息发送给MQTT Broker。
MQTT协议的实现可以使用多种编程语言如Java、Python、C等还可以使用各种MQTT客户端库和MQTT Broker实现如Eclipse Mosquitto、EMQ X等。
六、实战体验MQTT
实战体验MQTT协议我们首先需要一个MQTT Broker建议使用EMQX作为MQTT Broker具体的安装方式请自行搜索建议通过docker安装。安装完成后浏览器访问localhost:18083登录之后出现如下界面表示安装成功。 可在右上角设置中设置中文。 Maven依赖
dependencygroupIdorg.springframework.integration/groupIdartifactIdspring-integration-mqtt/artifactId/dependencydependencygroupIdorg.projectlombok/groupIdartifactIdlombok/artifactIdoptionaltrue/optional/dependency
想要发送消息我们需要用到一个消息发送者一个消息接受者一个消息代理服务器。消息发送者和接受者代码如下
ClientMQTT代码
Data
public class ClientMQTT {private String topic;private String clientId;private MqttClient mqttClient;private MqttConnectOptions mqttConnectOptions;private static String host tcp://127.0.0.1:1883;// 协议地址private static String userName admin;// 账号private static String passWord 123456;// 密码public static void main(String[] args) throws MqttException {ClientMQTT oneClientMQTT ClientMQTT.getOneClientMQTT(topic123, clientId123);}public ClientMQTT() {this.topic test;//设置默认的topic 和 clientId;this.clientId testClientId-003;}public ClientMQTT(String topic, String clientId) throws MqttException {this.topic topic;this.clientId clientId;}/*** 连接到服务器*/private void start() {try {// host为主机名clientid即连接MQTT的客户端ID一般以唯一标识符表示MemoryPersistence设置clientid的保存形式默认为以内存保存mqttClient new MqttClient(host, clientId, new MemoryPersistence());// MQTT的连接设置mqttConnectOptions new MqttConnectOptions();// 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录这里设置为true表示每次连接到服务器都以新的身份连接mqttConnectOptions.setCleanSession(true);// 设置连接的用户名mqttConnectOptions.setUserName(userName);// 设置连接的密码mqttConnectOptions.setPassword(passWord.toCharArray());// 设置超时时间 单位为秒mqttConnectOptions.setConnectionTimeout(10);// 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线但这个方法并没有重连的机制mqttConnectOptions.setKeepAliveInterval(20);mqttConnectOptions.setAutomaticReconnect(true);// 设置回调mqttClient.setCallback(new PushCallback());MqttTopic mqttTopic mqttClient.getTopic(topic);//setWill方法如果项目中需要知道客户端是否掉线可以调用该方法。设置最终端口的通知消息mqttConnectOptions.setWill(mqttTopic, close.getBytes(), 2, true);mqttClient.connect(mqttConnectOptions);//订阅消息int[] Qos {2};String[] topic1 {topic};mqttClient.subscribe(topic1, Qos);} catch (Exception e) {e.printStackTrace();}}/*** 返回一个Client*/public static ClientMQTT getOneClientMQTT(String topic, String clientId) throws MqttException {ClientMQTT client new ClientMQTT(topic, clientId);client.start();return client;}
}ServerMQTT代码
Data
public class ServerMQTT {private static Scanner input new Scanner(new BufferedInputStream(System.in));private static PrintWriter pw new PrintWriter(System.out);private String topic;private String clientId;private MqttClient mqttClient;private MqttTopic mqttTopic;private MqttMessage mqttMessage;private static String host tcp://127.0.0.1:1883;// 协议地址private static String userName admin;// 账号private static String passWord 123456;// 密码public static void main(String[] args) throws MqttException {ServerMQTT oneServerMQTT ServerMQTT.getOneServerMQTT(topic123, client321);while (true) {String str input.next();int Qos input.nextInt();ServerMQTT.sendMsg(oneServerMQTT, str, Qos);}}public ServerMQTT() throws MqttException {this.topic test;//设置默认的topic 和 clientId;this.clientId testClientId-001;mqttClient new MqttClient(host, clientId, new MemoryPersistence());connect();}public ServerMQTT(String topic, String clientId) throws MqttException {this.topic topic;this.clientId clientId;mqttClient new MqttClient(host, clientId, new MemoryPersistence());connect();}/*** 用来连接服务器*/private void connect() {MqttConnectOptions options new MqttConnectOptions();// 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录这里设置为true表示每次连接到服务器都以新的身份连接options.setCleanSession(false);options.setUserName(userName);// 设置用户密码options.setPassword(passWord.toCharArray());options.setConnectionTimeout(10);// 设置超时时间options.setKeepAliveInterval(20);// 设置会话心跳时间options.setAutomaticReconnect(true);try {mqttClient.setCallback(new PushCallback());mqttClient.connect(options);mqttTopic mqttClient.getTopic(topic);} catch (Exception e) {e.printStackTrace();}}public void publish(MqttTopic topic, MqttMessage message) throws MqttPersistenceException, MqttException {MqttDeliveryToken token topic.publish(message);token.waitForCompletion();}/*** 返回一个Server*/public static ServerMQTT getOneServerMQTT(String topic, String clientId) throws MqttException {ServerMQTT server new ServerMQTT(topic, clientId);return server;}/*** 发送消息*/public static boolean sendMsg(ServerMQTT server, String msg, int qos) throws MqttException {server.mqttMessage new MqttMessage();server.mqttMessage.setQos(qos);server.mqttMessage.setRetained(true);server.mqttMessage.setPayload(msg.getBytes());server.publish(server.mqttTopic, server.mqttMessage);return server.mqttMessage.isRetained();}
}PushBack代码
public class PushCallback implements MqttCallback {public void connectionLost(Throwable cause) {System.out.println(连接断开可以做重连);}public void deliveryComplete(IMqttDeliveryToken token) {}public void messageArrived(String topic, MqttMessage message) throws Exception {System.out.println(接收到topic为 topic Qos message.getQos() 内容为 new String(message.getPayload()));}
}首先启动ClientMQTT然后启动ServerMQTT可直接通过控制台输入消息和Qos等级发送消息。 七、MQTT协议的未来发展
随着物联网技术的不断发展和应用MQTT协议将继续发挥重要作用。未来MQTT协议可能会有以下几个发展趋势
安全性和可靠性的提升。MQTT协议的未来发展方向之一是提高安全性和可靠性例如加强数据加密、身份认证和消息确认等功能。实时性的提高。MQTT协议将会逐步优化提高消息传输的实时性适用于更多实时应用场景。扩展性的增强。MQTT协议将会逐步增强其可扩展性以满足海量设备的连接和通信需求。集成性的提高。MQTT协议将会逐步与其他物联网技术集成例如传感器、无线网络、云计算等以支持更多的应用场景。
参考资料
MQTT QoS 0, 1, 2 介绍 | EMQ (emqx.com)
MQTT 协议快速体验 | EMQ (emqx.com)