温州网站制作推广,商城型企业网站的功能,学敏网站建设,做网站推广我们是专业的【Mongoose笔记】MQTT 服务器
简介
Mongoose 笔记系列用于记录学习 Mongoose 的一些内容。
Mongoose 是一个 C/C 的网络库。它为 TCP、UDP、HTTP、WebSocket、MQTT 实现了事件驱动的、非阻塞的 API。
项目地址#xff1a;
https://github.com/cesanta/mongoose学习
下面…【Mongoose笔记】MQTT 服务器
简介
Mongoose 笔记系列用于记录学习 Mongoose 的一些内容。
Mongoose 是一个 C/C 的网络库。它为 TCP、UDP、HTTP、WebSocket、MQTT 实现了事件驱动的、非阻塞的 API。
项目地址
https://github.com/cesanta/mongoose学习
下面通过学习 Mongoose 项目代码中的 mqtt-server 示例程序 来学习如何使用 Mongoose 实现一个简单的 MQTT 服务器。使用树莓派平台进行开发验证。
mqtt-server 的示例程序不长代码如下
// Copyright (c) 2020 Cesanta Software Limited
// All rights reserved
//
// Example MQTT server. Usage:
// 1. Start this server, type make
// 2. Install mosquitto MQTT client
// 3. In one terminal, run: mosquitto_sub -h localhost -t foo -t bar
// 4. In another, run: mosquitto_pub -h localhost -t foo -m hi#include mongoose.hstatic const char *s_listen_on mqtt://0.0.0.0:1883;// A list of subscription, held in memory
struct sub {struct sub *next;struct mg_connection *c;struct mg_str topic;uint8_t qos;
};
static struct sub *s_subs NULL;// Handle interrupts, like Ctrl-C
static int s_signo;
static void signal_handler(int signo) {s_signo signo;
}static size_t mg_mqtt_next_topic(struct mg_mqtt_message *msg,struct mg_str *topic, uint8_t *qos,size_t pos) {unsigned char *buf (unsigned char *) msg-dgram.ptr pos;size_t new_pos;if (pos msg-dgram.len) return 0;topic-len (size_t) (((unsigned) buf[0]) 8 | buf[1]);topic-ptr (char *) buf 2;new_pos pos 2 topic-len (qos NULL ? 0 : 1);if ((size_t) new_pos msg-dgram.len) return 0;if (qos ! NULL) *qos buf[2 topic-len];return new_pos;
}size_t mg_mqtt_next_sub(struct mg_mqtt_message *msg, struct mg_str *topic,uint8_t *qos, size_t pos) {uint8_t tmp;return mg_mqtt_next_topic(msg, topic, qos NULL ? tmp : qos, pos);
}size_t mg_mqtt_next_unsub(struct mg_mqtt_message *msg, struct mg_str *topic,size_t pos) {return mg_mqtt_next_topic(msg, topic, NULL, pos);
}// Event handler function
static void fn(struct mg_connection *c, int ev, void *ev_data, void *fn_data) {if (ev MG_EV_MQTT_CMD) {struct mg_mqtt_message *mm (struct mg_mqtt_message *) ev_data;MG_DEBUG((cmd %d qos %d, mm-cmd, mm-qos));switch (mm-cmd) {case MQTT_CMD_CONNECT: {// Client connectsif (mm-dgram.len 9) {mg_error(c, Malformed MQTT frame);} else if (mm-dgram.ptr[8] ! 4) {mg_error(c, Unsupported MQTT version %d, mm-dgram.ptr[8]);} else {uint8_t response[] {0, 0};mg_mqtt_send_header(c, MQTT_CMD_CONNACK, 0, sizeof(response));mg_send(c, response, sizeof(response));}break;}case MQTT_CMD_SUBSCRIBE: {// Client subscribessize_t pos 4; // Initial topic offset, where ID endsuint8_t qos, resp[256];struct mg_str topic;int num_topics 0;while ((pos mg_mqtt_next_sub(mm, topic, qos, pos)) 0) {struct sub *sub calloc(1, sizeof(*sub));sub-c c;sub-topic mg_strdup(topic);sub-qos qos;LIST_ADD_HEAD(struct sub, s_subs, sub);MG_INFO((SUB %p [%.*s], c-fd, (int) sub-topic.len, sub-topic.ptr));// Change to * for topic matching using mg_matchfor (size_t i 0; i sub-topic.len; i) {if (sub-topic.ptr[i] ) ((char *) sub-topic.ptr)[i] *;}resp[num_topics] qos;}mg_mqtt_send_header(c, MQTT_CMD_SUBACK, 0, num_topics 2);uint16_t id mg_htons(mm-id);mg_send(c, id, 2);mg_send(c, resp, num_topics);break;}case MQTT_CMD_PUBLISH: {// Client published message. Push to all subscribed channelsMG_INFO((PUB %p [%.*s] - [%.*s], c-fd, (int) mm-data.len,mm-data.ptr, (int) mm-topic.len, mm-topic.ptr));for (struct sub *sub s_subs; sub ! NULL; sub sub-next) {if (mg_match(mm-topic, sub-topic, NULL)) {mg_mqtt_pub(sub-c, mm-topic, mm-data, 1, false);}}break;}case MQTT_CMD_PINGREQ: {// The server must send a PINGRESP packet in response to a PINGREQ packet [MQTT-3.12.4-1]MG_INFO((PINGREQ %p - PINGRESP, c-fd));mg_mqtt_send_header(c, MQTT_CMD_PINGRESP, 0, 0);break;}}} else if (ev MG_EV_ACCEPT) {// c-is_hexdumping 1;} else if (ev MG_EV_CLOSE) {// Client disconnects. Remove from the subscription listfor (struct sub *next, *sub s_subs; sub ! NULL; sub next) {next sub-next;if (c ! sub-c) continue;MG_INFO((UNSUB %p [%.*s], c-fd, (int) sub-topic.len, sub-topic.ptr));LIST_DELETE(struct sub, s_subs, sub);}}(void) fn_data;
}int main(void) {struct mg_mgr mgr; // Event managersignal(SIGINT, signal_handler); // Setup signal handlers - exist eventsignal(SIGTERM, signal_handler); // manager loop on SIGINT and SIGTERMmg_mgr_init(mgr); // Initialise event managerMG_INFO((Starting on %s, s_listen_on)); // Inform that were startingmg_mqtt_listen(mgr, s_listen_on, fn, NULL); // Create MQTT listenerwhile (s_signo 0) mg_mgr_poll(mgr, 1000); // Event loop, 1s timeoutmg_mgr_free(mgr); // Cleanupreturn 0;
}
下面从main函数开始分析代码。
首先是变量定义。struct mg_mgr是用于保存所有活动连接的事件管理器。 struct mg_mgr mgr; // Event manager设置 signal 函数捕获 SIGINT 信号和 SIGTERM 信号。 signal(SIGINT, signal_handler); // Setup signal handlers - exist eventsignal(SIGTERM, signal_handler); // manager loop on SIGINT and SIGTERM下面是对应的信号处理函数当 SIGINT 信号和 SIGTERM 信号到达时修改 s_signo 的值使其值不为 0然后会让主事件循环退出。当用户通过 Ctrl-C 结束进程是会发送 SIGINT 信号通过 kill 命令不带参数时会发送 SIGTERM 信号。当通过以上两种操作时都能让主事件循环正常退出。
// Handle interrupts, like Ctrl-C
static int s_signo;
static void signal_handler(int signo) {s_signo signo;
}初始化一个事件管理器也就是将最开始定义的struct mg_mgr变量 mgr 中的数据进行初始化。 mg_mgr_init(mgr); // Initialise event manager打印出接下来要监听的本地IP地址和端口s_listen_on。 MG_INFO((Starting on %s, s_listen_on)); // Inform that were startings_listen_on是一个全局变量默认值为mqtt://0.0.0.0:1883。
static const char *s_listen_on mqtt://0.0.0.0:1883;使用mg_mqtt_listen创建一个 MQTT 监听器。s_listen_on是指定要侦听的本地IP地址和端口fn是事件处理函数。 mg_mqtt_listen(mgr, s_listen_on, fn, NULL); // Create MQTT listener进行事件循环mg_mgr_poll 遍历所有连接接受新连接发送和接收数据关闭连接并为各个事件调用事件处理函数。 while (s_signo 0) mg_mgr_poll(mgr, 1000); // Event loop, 1s timeout当 s_signo 不为 0 时也就是接收到了退出信号则结束无限循环调用 mg_mgr_free 关闭所有连接释放所有资源。 mg_mgr_free(mgr); // Cleanup分析完main函数后我们看下事件处理函数fn的代码。
判断是否接收到 MG_EV_MQTT_CMD 事件表示收到 MQTT 命令。 if (ev MG_EV_MQTT_CMD) {将函数参数ev_data转换为 struct mg_mqtt_message这个结构体用于表示 MQTT 消息。 struct mg_mqtt_message *mm (struct mg_mqtt_message *) ev_data;打印收到的命令cmd和服务质量 qos。 MG_DEBUG((cmd %d qos %d, mm-cmd, mm-qos));使用switch判断收到的命令cmd是什么。 switch (mm-cmd) {如果收到的是MQTT_CMD_CONNECT命令表示 MQTT 客户端连接到服务器。MQTT 客户端到服务端的网络连接建立后客户端发送给服务端的第一个报文必须是 CONNECT 报文。 case MQTT_CMD_CONNECT: {// Client connects判断 MQTT 帧长度是否正确如果长度小于 9表示是 MQTT 帧格式不正确。 if (mm-dgram.len 9) {mg_error(c, Malformed MQTT frame);判断 MQTT 帧头第 8 Byte的数据是否等于 4这是一个协议级别字节(Protocol Level byte)。对于 3.1.1 版协议协议级别字段的值是 4(0x04)。如果不等于 4表示这是一个不支持的 MQTT 版本。 } else if (mm-dgram.ptr[8] ! 4) {mg_error(c, Unsupported MQTT version %d, mm-dgram.ptr[8]);}如果 MQTT 的帧正常则回复 MQTT 客户端。MQTT_CMD_CONNACK确认连接请求服务端发送 CONNACK 报文响应从客户端收到的 CONNECT 报文。服务端发送给客户端的第一个报文必须是 CONNACK。调用mg_mqtt_send_header发送 MQTT 命令头固定报头(Fixed header)部分剩余长度字段为 2。调用mg_send发送可变报头(Variable header)部分共 2 个 Byte分别为连接确认标志和连接返回码。连接返回码的值为 0x00 表示连接已接受 。 } else {uint8_t response[] {0, 0};mg_mqtt_send_header(c, MQTT_CMD_CONNACK, 0, sizeof(response));mg_send(c, response, sizeof(response));}break;}如果收到的是MQTT_CMD_SUBSCRIBE命令表示客户端订阅主题。MQTT 客户端向服务端发送 SUBSCRIBE 报文用于创建一个或多个订阅。 case MQTT_CMD_SUBSCRIBE: {// Client subscribes首先定义了一些变量。pos用于指向下一个主题过滤器(Topic Filter)在数据报文中的偏移初始化为 4 是因为 SUBSCRIBE 报文的固定报头(Fixed header)和可变报头(Variable header)一共占 4 个字节所以第一个主题过滤器在报文的偏移为 4。qos和topic用于从下面的函数mg_mqtt_next_sub中获取服务质量(quality of service)和主题。resp用于后续记录每个主题的服务质量num_topics用于后续记录主题数量。 size_t pos 4; // Initial topic offset, where ID endsuint8_t qos, resp[256];struct mg_str topic;int num_topics 0;通过函数mg_mqtt_next_sub遍历所有的主题。这个函数是在示例程序中实现的。 while ((pos mg_mqtt_next_sub(mm, topic, qos, pos)) 0) {接下来为每个请求的主题创建一个struct sub订阅描述符。 struct sub *sub calloc(1, sizeof(*sub));sub-c c;sub-topic mg_strdup(topic);sub-qos qos;然后将创建的订阅描述符sub添加到订阅列表s_subs中。LIST_ADD_HEAD是一个链表管理宏用于将sub加入到s_subs中。 LIST_ADD_HEAD(struct sub, s_subs, sub);将所添加的主题打印出来。 MG_INFO((SUB %p [%.*s], c-fd, (int) sub-topic.len, sub-topic.ptr));将主题中的改为*这是为了后续可以使用mg_match进行主题匹配。 // Change to * for topic matching using mg_matchfor (size_t i 0; i sub-topic.len; i) {if (sub-topic.ptr[i] ) ((char *) sub-topic.ptr)[i] *;}记录当前主题的服务质量(quality of service)。num_topics记录了主题数量resp记录了每个主题的服务质量用于下面回复消息。 resp[num_topics] qos;}在遍历完了所有主题后开始回复消息给客户端。服务端发送 SUBACK 报文给客户端用于确认它已收到并且正在处理 SUBSCRIBE 报文。
使用mg_mqtt_send_header发送 MQTT 命令头也就是固定报头(Fixed header)部分报文类型为 SUBACK。然后可变报头为 2 Byte的报文标识符有效载荷(Payload)部分包含一个返回码(Return Code)列表每个返回码对应等待确认的 SUBSCRIBE 报文中的一个主题过滤器(Topic Filter)所以命令头后续的数据长度为num_topics 2报文标识符使用idmg_htons用于将uint16_t类型的值转换为网络字节序返回码(Return Code)部分为resp长度为num_topics。 mg_mqtt_send_header(c, MQTT_CMD_SUBACK, 0, num_topics 2);uint16_t id mg_htons(mm-id);mg_send(c, id, 2);mg_send(c, resp, num_topics);break;}接下来看下上面使用的mg_mqtt_next_sub函数是如何实现的。
在函数mg_mqtt_next_sub里面又调用了mg_mqtt_next_topic函数。
size_t mg_mqtt_next_sub(struct mg_mqtt_message *msg, struct mg_str *topic,uint8_t *qos, size_t pos) {uint8_t tmp;return mg_mqtt_next_topic(msg, topic, qos NULL ? tmp : qos, pos);
}接下来看下mg_mqtt_next_topic函数是如何实现的。
buf是指向下一个主题过滤器(Topic Filter)的位置其中dgram.ptr表示数据报文pos是指向下一个主题的偏移。如果pos大于等于数据报文的长度表示已没有下一个主题了返回 0。主题过滤器部分前两个字节表示主题名的长度然后是主题名主题名后的一个字节是服务质量要求(Requested QoS)。最后返回下一个题过滤器的偏移。
static size_t mg_mqtt_next_topic(struct mg_mqtt_message *msg,struct mg_str *topic, uint8_t *qos,size_t pos) {unsigned char *buf (unsigned char *) msg-dgram.ptr pos;size_t new_pos;if (pos msg-dgram.len) return 0;topic-len (size_t) (((unsigned) buf[0]) 8 | buf[1]);topic-ptr (char *) buf 2;new_pos pos 2 topic-len (qos NULL ? 0 : 1);if ((size_t) new_pos msg-dgram.len) return 0;if (qos ! NULL) *qos buf[2 topic-len];return new_pos;
}接下来回到事件处理函数中来看下一个判断的 MQTT 命令。
如果收到的是MQTT_CMD_PUBLISH命令表示有客户端发布消息。PUBLISH 控制报文是指从 MQTT 客户端向服务端或者服务端向客户端传输一个应用消息。下面需要将消息推送到所有订阅频道。 case MQTT_CMD_PUBLISH: {// Client published message. Push to all subscribed channels将发布的消息和主题打印出来。 MG_INFO((PUB %p [%.*s] - [%.*s], c-fd, (int) mm-data.len,mm-data.ptr, (int) mm-topic.len, mm-topic.ptr));遍历整个订阅列表s_subs通过mg_match比较主题名称。如果主题匹配则通过函数mg_mqtt_pub发布消息将消息发送到订阅主题的连接。 for (struct sub *sub s_subs; sub ! NULL; sub sub-next) {if (mg_match(mm-topic, sub-topic, NULL)) {mg_mqtt_pub(sub-c, mm-topic, mm-data, 1, false);}}break;}收到MQTT_CMD_PINGREQ表示有客户端发送心跳请求。客户端发送 PINGREQ 报文给服务端的。用于 1. 在没有任何其它控制报文从客户端发给服务的时告知服务端客户端还活着。 2. 请求服务端发送 响应确认它还活着。 3. 使用网络以确认网络连接没有断开。 case MQTT_CMD_PINGREQ: {服务端必须发送 PINGRESP 报文响应客户端的 PINGREQ 报文。使用mg_mqtt_send_header发送 MQTT 命令头也就是固定报头部分报文类型为 PINGRESP。 // The server must send a PINGRESP packet in response to a PINGREQ packet [MQTT-3.12.4-1]MG_INFO((PINGREQ %p - PINGRESP, c-fd));mg_mqtt_send_header(c, MQTT_CMD_PINGRESP, 0, 0);break;}到这里结束MG_EV_MQTT_CMD事件处理的部分接下来看其他的事件处理。
判断是否接收到 MG_EV_ACCEPT 事件这表示已接受连接。 } else if (ev MG_EV_ACCEPT) {// c-is_hexdumping 1;}判断是否接收到 MG_EV_CLOSE 事件表示客户端连接已关闭。
当客户端断开连接时遍历整个订阅列表s_subs将该客户端的所有订阅删除。其中LIST_DELETE是一个链表管理宏用于将sub从s_subs中删除。 } else if (ev MG_EV_CLOSE) {// Client disconnects. Remove from the subscription listfor (struct sub *next, *sub s_subs; sub ! NULL; sub next) {next sub-next;if (c ! sub-c) continue;MG_INFO((UNSUB %p [%.*s], c-fd, (int) sub-topic.len, sub-topic.ptr));LIST_DELETE(struct sub, s_subs, sub);}}mqtt-server 的示例程序代码就都解析完了下面实际运行一下 mqtt-server 程序。
打开示例程序编译并运行
piraspberrypi:~ $ cd Desktop/study/mongoose/examples/mqtt-server/
piraspberrypi:~/Desktop/study/mongoose/examples/mqtt-server $ make
cc ../../mongoose.c -I../.. -W -Wall -DMG_ENABLE_LINES1 -o example main.c
./example
10e0a1 2 main.c:131:main Starting on mqtt://0.0.0.0:1883
这个时候我们的 MQTT 服务器就运行起来了这个时候还需要一个 MQTT 客户端我们使用 Mongoose 的 mqtt-client 示例程序并将代码中的 URL 变量s_url修改
static const char *s_url mqtt://localhost:1883;保存后编译运行程序
piraspberrypi:~/Desktop/study/mongoose/examples/mqtt-client $ make clean all
rm -rf example *.o *.dSYM *.gcov *.gcno *.gcda *.obj *.exe *.ilk *.pdb
cc ../../mongoose.c -I../.. -W -Wall -o example main.c
./example
12305b 2 main.c:29:fn CREATED
12305d 2 main.c:44:fn CONNECTED to mqtt://localhost:1883
12305d 2 main.c:46:fn SUBSCRIBED to mg//test
12305d 2 main.c:50:fn PUBLISHED hello - mg/clnt/test
12305d 2 main.c:55:fn RECEIVED hello - mg/clnt/test
12305e 2 main.c:58:fn CLOSED
可以看到 mqtt-client 示例程序完成了 MQTT 客户端创建连接订阅主题mg//test向主题mg/clnt/test发布数据hello收到所订阅主题mg/clnt/test的数据hello最后关闭连接。
然后我们来看下 MQTT 服务器这边的日志信息
piraspberrypi:~/Desktop/study/mongoose/examples/mqtt-server $ make
cc ../../mongoose.c -I../.. -W -Wall -DMG_ENABLE_LINES1 -o example main.c
./example
10e0a1 2 main.c:131:main Starting on mqtt://0.0.0.0:1883
12305d 2 main.c:87:fn SUB 0x5 [mg//test]
12305d 2 main.c:103:fn PUB 0x5 [hello] - [mg/clnt/test]
12305e 2 main.c:119:fn UNSUB 0x5 [mg/*/test]
可以看到 MQTT 客户端订阅主题mg//test然后向所有订阅mg/clnt/test主题的客户端发布数据hello最后断开连接的时候取消订阅。
【参考资料】
examples/mqtt-server
Documentation
examples/mqtt-client
MQTT协议中文版
MQTT Version 3.1.1 本文链接https://blog.csdn.net/u012028275/article/details/129116209