云南俊发建设集团网站,华为手机软文范文300,node.js做企业网站,东莞人才市场招聘会时间项目需求分析
核心概念
现在需要将这个项目梳理清楚了#xff0c;便于之后的代码实现。项目中具有一个生产消费模型#xff1a; 其中生产者和消费者的个数是可以灵活改变的#xff0c;让系统资源更加合理的分配。消息队列的主逻辑和上面的逻辑基本一样#xff0c;只不过我…项目需求分析
核心概念
现在需要将这个项目梳理清楚了便于之后的代码实现。项目中具有一个生产消费模型 其中生产者和消费者的个数是可以灵活改变的让系统资源更加合理的分配。消息队列的主逻辑和上面的逻辑基本一样只不过我现在要做的这个生产者不再是本地的一个线程了而是一个客户端消费者也是一个客户端生产者就是消息发布客户端而消费者就是消息订阅客户端而中间的就不是线程安全的阻塞队列了而是一个消息队列服务器 消息队列服务器能够存储消息消息发布客户端能够将消息发送到客户端上而消息订阅客户端订阅了消息消息服务器能够将消息推送给消息订阅客户端。这三个也是我要实现的三个部分。了解了框架下面来了解细节的信息
AMQP(Advanced Message Queuing Protocol-⾼级消息队列协议⼀个提供统⼀消息服务的应⽤层标准⾼级消息队列协议为⾯向消息的中间件设计使得遵从该规范的客⼾端应⽤和消息中间件服务器的全功能互操作成为可能)模型中也就是消息中间件服务器中⼜存在以下概念
• 虚拟机 (VirtualHost): 类似于 MySQL 的 database, 是⼀个逻辑上的集合。⼀个 消息队列服务器上可以存在多个 VirtualHost
•
交换机 (Exchange): ⽣产者把消息先发送到 消息服务器的 Exchange 上再根据不同的规则, 把消息转发给不同的 Queue
•
队列 (Queue): 真正⽤来存储消息的部分 每个消费者决定⾃⼰从哪个 Queue 上读取消息
•
绑定 (Binding): Exchange 和 Queue 之间的关联关系Exchange 和 Queue 可以理解成 多对多 关系使⽤⼀个关联表就可以把这两个概念联系起来
•
消息 (Message): 传递的内容 所谓的 Exchange 和 Queue 可以理解成 多对多 关系, 和数据库中的 多对多 ⼀样. 意思是:
⼀个 Exchange 可以绑定多个 Queue (可以向多个 Queue 中转发消息)
⼀个 Queue 也可以被多个 Exchange 绑定 (⼀个 Queue 中的消息可以来⾃于多个 Exchange)
对于虚拟机的说明后面设计的时候会有更加详细的说明这里能理解就理解。
下面说明一下队列假设这是一个新闻发布系统的服务器新闻具有很多的类别音乐新闻体育新闻音乐新闻歌手新闻歌曲新闻。现在不同的消息发布客户端就可以选择不同的类别去对应的模块上发布新闻而订阅客户端则选择自己感兴趣的模块然后服务器就会将发布客户端写的新闻存储到该新闻对应的队列中然后将对应模块中的新闻推送给消息订阅客户端但是如果一个新闻具有多个属性要怎么办呢如果某一个新闻具有了多个属性但是因为技术原因导致这个新闻只能推送到一个队列中此时想要看到这个新闻的人就只能订阅更多的模块由此才能看到该新闻。这就很难受由此就出现了交换机的概念。现在一个发布客户端发送的消息就会到达交换机一个交换机会绑定几个队列交换机内部会执行自己的匹配方法然后发现这个新闻具有多个属性就会将这个新闻直接放入到多个队列中。图 并且交换机也是存在不同类型的假设某一个新闻具有全部的属性那么这个新闻就需要广播到所有的队列中此时就可以交给广播交换机。交换机的作用对消息的灵活转发起到一个促进作用。对于交换机的绑定需要注意一个交换机没有必要将所有的队列都绑定了毕竟队列肯定是很多的。交换机和队列之间的关系到底是一对一还是一对多就是由这个绑定关系决定的。由此就能够知道一个消息队列服务器上是具有多个虚拟机的而一个虚拟机内部是具有多个交换机的 一个交换机内部的详细结构 bingding就是绑定Exchange就是交换机所以一个客户端首先要决定的就是自己要访问的是哪一个虚拟机。上述的这些数据结构和数据既要在内存中储存也要在硬盘中储存需要持久化。总结一下 核心API
对于消息队列服务器Broker来说需要通过下面的这些核心API来实现消息队列的基本功能
创建交换机 (exchangeDeclare) 销毁交换机 (exchangeDelete) 创建队列 (queueDeclare) 销毁队列 (queueDelete) 创建绑定 (queueBind) 解除绑定 (queueUnbind) 发布消息 (basicPublish) 订阅消息 (basicConsume) 确认消息 (basicAck) 取消订阅 (basicCancel)
对于这些API后面会有更加细致的说明。
交换机类型
对于 RabbitMQ 来说, 主要⽀持四种交换机类型
Direct
Fanout
Topic
Header
交换机决定了一个消息能够发布到哪一个队列中去这里面有一个很关键的信息就是交换机的类型因为Header交换机比较少见就不说明了主要说明前三种这三种也是这个项目需要实现的交换机。
直接交换Direct: ⽣产者发送消息时, 直接指定被该交换机绑定的队列名 明确说明这个消息要给哪一个消息比如在多人群中的专属红包
广播交换Fanout: ⽣产者发送的消息会被复制到该交换机的所有队列中 比如群发红包
主题交换Topic: 绑定队列到交换机上时, 指定⼀个字符串为 bindingKey。发送消息指定⼀个字符串为routingKey。当 routingKey 和 bindingKey 满⾜⼀定的匹配条件的时候, 则把消息投递到指定队列类似于发⼀个画图红包, 发 10 块钱红包, 同时出个题, 得画的像的⼈,才能领。 也是每个领到的⼈都能领10块钱
持久化
Exchange, Queue, Binding, Message 等数据都有持久化需求 当程序重启 / 主机重启, 保证上述内容不丢失
网络通信
因为这个项目并不是一个本机上的生产消费模型所以无论是发布客户端要发送消息到服务器还是服务器推送数据到客户端都会涉及到网络通信的功能需求。
⽣产者和消费者都是客⼾端程序, Broker 则是作为服务器通过⽹络进⾏通信。
创建 Connection 关闭 Connection 创建 Channel信道---连接的细化为了能够充分的利用资源一个连接上可以具有多个信道每个信道都都自己的任务互不影响但是底层使用的其实是一个接口关闭 Channel 创建队列 (queueDeclare) 销毁队列 (queueDelete) 创建交换机 (exchangeDeclare) 销毁交换机 (exchangeDelete) 创建绑定 (queueBind) 解除绑定 (queueUnbind) 发布消息 (basicPublish) 订阅消息 (basicConsume) 确认消息 (basicAck) 取消订阅basicCancel
Connection 对应⼀个 TCP 连接
Channel 则是 Connection 中的逻辑通道
⼀个 Connection 中可以包含多个 Channel。Channel 和 Channel 之间的数据是独⽴的不会相互⼲扰。这样做主要是为了能够更好的复⽤ TCP 连接, 达到⻓连接的效果, 避免频繁的创建关闭 TCP 连接。
Connection 可以理解成⼀根⽹线. Channel 则是⽹线⾥具体的线缆.
消息应答
被消费的消息需要进行应答主要有两种应答模式
自动应答消费者在消费完消息后系统会自动认为该消息已被应答Broker 会直接删除该消息。手动应答消费者需要手动调用应答接口Broker 只有在收到应答请求后才会真正删除该消息。
手动应答的目的是确保消息确实被消费者成功处理。这种模式在对数据可靠性要求较高的场景中比较常见。这个应答机制就出现在现在一个消息已经被推送到了客户端之后我怎么确定客户端收到了这个信息呢由此就有了消息应答消息确认机制。
有了上面的知识再来看一下这幅图 就会有更加深入的理解了各个部件之间的逻辑关系。
模块划分
下面是关于整个服务端模块的简单说明划分
服务端模块
数据管理模块
交换机数据管理模块队列数据管理模块绑定数据管理模块消息数据管理模块以上四个模块分别实现数据的管理增、删、查以及持久化的存储 虚拟机数据管理模块虚拟机其实就是交换机 队列 绑定 消息的整体逻辑单元。因此虚拟的数据管理其实就是将以上四个模块的合并管理。 交换路由模块
消息的发布将一条新消息发布到交换机上由交换机决定放入哪些队列。决定交给哪个队列其中交换机类型起了很大作用直接交换、广播交换、主题交换。交换机类型
直接交换思想简单。 上面已经进行了说明广播交换思想简单。 同上主题交换涉及到一个规则匹配的流程。
交换路由模块专门做匹配过程的模块。
消费者管理模块消费者指的是订阅了某个队列消息的客户端。一旦这个队列有了消息就会推送给这个客户端。在核心API中有一个订阅消息的服务。请注意这里的订阅不是订阅某条特定的消息而是订阅了某个队列的所有消息。当前该模块主要实现了消息推送功能。因此一旦有了消息系统就需要能够找到与这条消息相关的消费者信息即消费者对应的信道。 信道通信通道管理模块一个连接可能会对应有多个通信通道。一旦某个客户端需要关闭通信关闭的不是连接本身而是该客户端对应的通信通道。关闭信道时我们需要取消客户端的订阅。 连接管理模块该模块负责管理网络通信对应的连接。因为当一个连接需要关闭时应该把与该连接关联的所有信道都关闭。因此数据管理部分至少要管理这些关联的信道。 服务端BrokerServer模块这个模块是对以上所有模块的整合将它们整合成一个完整的服务器。
然后是两个客户端的简单说明
客户端模块
消费者管理模块
一个订阅客户端在订阅一个队列消息时就相当于创建了一个消费者。 信道管理模块
客户端的信道与服务端的信道是一一对应的。服务端信道提供的服务客户端都有。相当于服务端为客户端提供服务而客户端则为用户提供服务。 连接管理模块
对于用户来说所有的服务都是通过信道完成的。信道在用户的角度就是一个通信通道而不是连接。因此所有的请求都是通过信道来完成的。连接的管理就包含了客户端资源的整合。 基于以上的三个模块封装实现
订阅客户端订阅一个队列的消息并处理收到的推送消息。发布客户端向一个交换机发布消息。
理解了这个项目的框架再去理解各个功能模块为什么这些功能模块最后能够整合出整体的服务框架下面再来一个一个的比较详细的介绍这些
服务端模块
持久化数据管理模块
首先来看一下交换机要管理的数据有哪些
交换机数据管理
要管理的数据描述了一个交换机应该具备哪些数据
交换机名称唯一标识交换机类型决定了消息的转发方式
在每个队列绑定中有个binding_key每条消息中有个routing_key直接交换若binding_key与routing_key相同则将消息放入队列广播交换将消息放入交换机绑定的所有队列中主题交换若routing_key与多个绑定队列的binding_key存在匹配规则且匹配成功则放入队列
持久化标志决定了当前交换机信息是否需要持久化存储自动删除标志当关联了当前交换机的所有客户端都退出时是否要自动删除交换机 这个标志在这个项目中并没有实现但是是可以进行扩展的功能交换机的其他参数当前未使用
然后就是对交换机本身进行的管理操作了
对交换机的管理操作
创建交换机
本质上需要的是声明体现强断言的思想。若有则保留若无则创建。
删除交换机
注意事项每个交换机都会绑定一个或多个队列意味着会有一个或多个绑定信息。因此删除交换机时需要先删除相关的绑定信息。
获取指定名称交换机。获取当前交换机数量。 以上就是对交换机要管理的数据以及对交换机本身能够进行的操作了。
下面是队列要管理的数据,以及对队列本身能够进行的操作。
队列数据管理
一、要管理的数据
队列名称唯一的标识持久化存储标志
决定了是否将队列信息持久化存储起来决定了重启后这个队列还是否存在
是否独占标志
独占指的是只有当前客户端自己能够订阅队列消息
自动删除标志
当订阅了当前队列的所有客户端退出后是否删除队列暂未实现
其他参数暂未使用
二、提供的管理操作
创建队列删除队列获取指定队列信息获取队列数量获取所有队列名称这个接口很重要原因如下
当系统重启后需要重新加载数据加载历史消息消息以队列为单元存储在文件中加载消息需要知道队列名称因为后续消息存储时存储文件以队列名称命名
所以持久化存储标志的是否设定也就是一个队列的消息是否需要持久化最重要的指标就是这个队列的信息是否需要持久化如果队列管理的数据进行了持久化但是这个队列本身的信息没有持久化那么重新加载后这个队列都没有了还有必要进行重新加载吗没有必要因为此时根本没有人会去订阅这个队列那么这个队列中的信息也就没有必要持久化了。 有了队列的信息后面在实现功能的时候我才能知道这个队列和哪一个交换机绑定起来了。
下一个模块
绑定数据管理模块
绑定数据管理模块
描述该模块用于管理哪个队列与哪个交换机之间的绑定关系。
管理的数据
交换机名称队列名称binding_key绑定密钥用于描述在交换机的主题交换或直接交换中的消息发布匹配规则。它由数字、字符、下划线_、井号#和逗号,注意此处原文中逗号可能是误打通常应为点号或省略组成。
示例binding_key: news.music.#消息中的routing_key示例news.sport.football 此时在匹配的时候就无法匹配上但是如果这里的key位news.music.pop流行音乐那么就会成功完成匹配
管理的操作
添加绑定解除绑定获取交换机相关的所有绑定信息
在删除交换机时需要删除与其相关的绑定信息。当消息发布到交换机时交换机通过这些绑定信息将消息发布到指定的队列。
获取队列相关的所有绑定信息
在删除队列时需要删除与其相关的绑定信息。
获取绑定信息数量
下一个是消息数据管理
消息数据管理
描述消息对其进行什么样的管理操作以及这个消息里面的属性都是些什么和上面的那些一样的。
首先来看消息的属性是什么
消息属性
ID消息的唯一标识持久化标志表示是否对消息进行持久化还取决于队列的持久化标志routing_key决定了当前消息要发布的队列消息发布到交换机后根据绑定队列的binding_key决定是否发布到指定队列消息主体消息内容
附加信息服务端为了管理所添加的信息
存储偏移量消息以队列为单元存储在文件中这个偏移量是当前消息相对于文件起始位置的偏移量。消息长度从偏移量位置取出指定长度的消息解决粘包问题。是否有效标志标识当前消息是否已经被删除删除一条消息并不会每次直接将后边的数据拷贝到前边而只是重置了标志。当一个文件中有效消息占据总消息比例不到50%且数据量超过2000时则进行垃圾回收重新整理文件数据存储。当系统重启时也只需要重新加载有效消息即可相当于进行了一次垃圾回收
然后是消息的管理信息这些信息都是为了让队列能够更好的管理消息
管理方式以队列为单元进行管理因为消息的所有操作都是以队列为单元的。
管理数据详情
消息链表
功能保存所有的待推送消息。
待确认消息Hash
功能消息推送给客户端后会等待客户端进行消息确认。流程收到确认后才会真正删除消息。
持久化消息Hash
假设消息都会进行持久化存储。注意事项
操作过程中会存在垃圾回收操作。垃圾回收会改变消息的存储位置但内存中的消息会存储消息的实际存储位置因此垃圾回收后可能导致位置不一致。
更新机制每次垃圾回收后都需要用新的位置去更新持久化消息的信息。
垃圾回收
步骤
将有效消息读取出来。重新截断文件将消息连续写入文件中确保文件中都是有效消息。
持久化统计信息
持久化的有效消息数量记录当前持久化存储的有效消息总数。持久化的总的消息数量记录自系统启动以来持久化存储的总消息数该数值决定了何时进行垃圾回收。 然后是对消息本身能够进行的管理操作
消息管理操作
向队列新增消息
功能将新消息添加到队列中。实现新消息会被加入到待推送消息链表中。
获取队首消息
功能从队列中获取最前面的消息。实现获取消息后该消息会从待推送消息链表中删除并加入到待确认消息中。注意此时消息不再是待发送状态而是待确认状态。
对消息进行确认
功能确认消费者已经成功接收并处理了消息。实现从待确认消息中移除该消息并进行持久化数据的删除操作。
恢复队列历史消息
功能在系统重启时恢复队列中的历史消息。实现主要在构造函数中进行确保系统重启后能够恢复之前的消息状态。
垃圾回收由消息持久化子模块完成
触发条件持久化文件中有效消息比例小于50%且总消息数量超过200条。实现进行垃圾回收操作清理无效消息优化存储。
删除队列相关消息文件
功能当队列被删除时删除其相关的消息文件。实现确保队列删除后其消息文件也被相应删除避免占用存储空间。 最后就是消息以队列为单位的管理操作 队列消息管理初始化队列消息结构
在队列创建时初始化其消息结构准备存储消息。
移除队列消息结构
在一个队列被删除时调用清理并移除该队列的消息结构。
向队列新增消息
功能允许向队列中添加新的消息。实现新消息会被添加到队列的消息结构中等待后续处理。
对队列消息进行确认
功能确认消费者已经成功接收并处理了队列中的消息。实现从队列的消息结构中移除已确认的消息确保消息不会被重复处理。
恢复队列历史消息
功能在系统重启或恢复时重新加载并恢复队列中的历史消息。实现从持久化存储中读取历史消息并重新构建队列的消息结构。 以上是根据数据和数据之间的关系摸索出的几个关系要不然管理起来很难以上的消息管理简化成一个图如下 队列中的就是消息。 下一个就是虚拟机数据管理了。经过之前的学习我们已经知道了虚拟机就是队列交换机绑定和消息的集合体所以要管理好虚拟机就要管理好这些数据。 要管理的数据交换机数据管理句柄队列数据管理句柄绑定信息数据管理句柄消息数据管理句柄 要管理的操作声明/删除交换机
在删除交换机时需同时删除与其相关的绑定信息。
声明/删除队列
在删除队列时需同时删除与其相关的绑定信息及消息数据。
队列的绑定/解除绑定
绑定时需确保交换机和队列均存在。
获取指定队列的消息
提供从指定队列中检索消息的功能。
对指定队列的指定消息进行确认
允许对指定队列中的特定消息进行确认处理。
获取交换机相关的所有绑定信息
当一条消息需要发布到指定交换机时交换机通过获取所有绑定信息来确定消息应发布到哪个队列
下一个模块是路由匹配模块
这个模块就是用来判断一个消息是否能够发布到指定的队列中的。所以这个模块理论上是没有需要管理的数据的。
路由匹配模块
概述
路由匹配模块负责决定一条消息是否能够发布到指定的队列。它通过比较消息的发布规则routing_key与队列的发布匹配规则binding_key来实现这一功能。
关键概念
binding_key队列的发布匹配规则用于确定消息是否能够发布到该队列。每个队列和交换机的绑定信息中都有一个binding_key这是队列发布的匹配规则routing_key消息的发布规则与binding_key进行匹配以决定消息的去向在每条即将发布的信息中都有一个routing_key这个是消息的发布规则。交换机类型
广播直接将消息发布给交换机的所有绑定队列。直接routing_key与binding_key完全一致时匹配成功。一般来说如果遇到了这种发布规则routing_key会被设置为队列名而binding_key也是一样主题binding_key中包含匹配规则如news.music.#routing_key符合这些规则时如news.music.pop匹配成功。
功能
路由匹配模块本质上不直接管理数据而是提供路由匹配操作接口
判断routing_key与binding_key是否匹配
提供一个接口用于判断给定的routing_key与binding_key是否能够匹配成功。
判断routing_key是否符合规定
格式约定routing_key只能由数字、字母、逗号,、点.构成。提供验证功能确保routing_key符合规定的格式。
判断binding_key是否符合规定
格式约定binding_key只能由数字、字母、点.、井号#、星号*构成。提供验证功能确保binding_key符合规定的格式。这些特殊字符在主题交换机类型中具有特定的匹配含义。
下一个消费者管理模块
消费者管理模块
概述
消费者管理模块涉及两种类型的客户端发布消息和订阅消息。只有订阅了指定队列消息的客户端才被视为消费者。
消费者数据存在的意义
当指定队列有消息时需要将消息推送给这个消费者客户端推送时需要找到与该客户端相关的信息如连接。
消费者信息
消费者标识tag用于唯一标识消费者。订阅队列名称当当前队列有消息时会推送给这个客户端同时当客户端收到消息后需要对指定队列的消息进行确认。自动确认标志
自动确认推送消息后直接删除消息无需额外确认。手动确认推送消息后需要等待收到确认回复后再删除消息。
消费处理回调函数指针当队列有一条消息时通过这个函数进行处理函数内部逻辑固定即向指定客户端推送消息。
消费者管理
管理思想
以队列为单元进行管理。每个消费者订阅的都是指定队列的消息消费者对消息进行确认也是以队列为单位进行确认。最关键的是当指定队列中有消息时会获取订阅了这个队列的消费者信息进行消息推送。
队列消费者管理结构
数据信息消费者链表用于保存当前队列的所有消费者信息。采用RR轮转策略每次取出下一个消费者进行消息推送一条消息只需要被一个客户端处理即可。
管理操作
初始化队列消费者结构创建并初始化队列消费者管理结构。删除队列消费者结构释放队列消费者管理结构所占用的资源。向指定队列添加消费者将新的消费者添加到指定队列的消费者链表中。获取指定队列消费者获取指定队列的消费者链表或其中的某个消费者信息。删除指定队列消费者从指定队列的消费者链表中移除指定的消费者。
此外还包括一些其他的管理操作如获取队列消费者数量、判断队列消费者链表是否为空等。
下一个就是信道的管理了之前就说明过信道其实就是一个连接的进行细分为了更加充分的利用资源而下面就是对信道管理的更加细节的说明
信道管理: Channel
信道是网络通信中的一个概念叫做通信通道。
网络通信的时候必然都是通过网络通信连接来完成的为了能够更加充分的利用资源因此对通信连接又进行了进一步的细化细化出了通信通道。对于用户来说一个通信通道就是进行网络通信的载体而一个真正的通信连接可以创建出多个通信通道。每一个信道之间在用户的眼中是相互独立的而在本质的底层它们使用同一个通信连接进行网络通信。因此因为信道是用户眼中的一个通信通道所以所有的网络通信服务都是由信道提供的。
信道提供的服务操作:
声明/删除交换机声明/删除队列绑定/解绑队列与交换机发布消息/订阅队列消息/取消队列订阅/队列消息确认
信道要管理的数据:
信道ID信道的ID这样以便于区分不同的信道哪一个信道来了一个新的消息便于我们快速的找到这个信道找到对应信道的信息以进行操作信道关联的虚拟机句柄信道关联的消费者句柄当信道关闭的时候所有关联的消费者订阅都要取消相当于删除所有的相关消费者。工作线程池句柄信道进行了消息发布到指定队列操作之后从指定队列获取一个消费者对这条消息进行消费。也就是将这条消息推送给一个客户端的操作交给线程池执行。
说明
并非每个信道都有一个线程池而是整个服务器有一个线程池大家所有的信道都是通过同一个线程池进行异步操作而已。
对信道的管理
1.创建信道
2.关闭一个信道
3.获取指定信道的信息(对信道的操作其实就是增删查)
连接的管理
到这一块的时候细节方面已经差不多了后面更多的是在进行整合。
概念: 网络通信连接
在网络通信模块中我们使用muduo库来实现底层通信muduo库中本身就有Connection连接的概念和对象类。
但是我们的连接中还有一个上层通信信道的概念这个概念在muduo库中是没有的。
因此我们需要在用户的层面对这个muduo库中的Connection连接进行二次封装。形成我们自己所需的连接管理。
管理数据:
muduo库的通信连接当前连接关联的信道管理句柄
连接提供的操作:
创建信道关闭信道
管理的操作:
新增连接关闭连接获取指定连接信息
Broker服务器模块
这个模块其实就是对上面所有功能模块的整合将所有的功能模块整合到一起来形成一个消息队列服务器所以在这个模块中更多提供的是管理的信息而不是管理的操作
整合以上所有模块并搭建网络通信服务器实现与客户端网络通信能够识别客户端请求并提供客户端请求的处理服务。
管理信息:
a. 虚拟机管理模块句柄虚拟机管理模块总的句柄b. 消费者管理模块句柄每一个信道中都有一个自己关联的消费者这里的是一个总的消费者的管理c. 连接管理模块句柄将所有的连接管理起来d. 工作线程池句柄整个服务器有一个工作线程池而不是每一个信道有一个只不过是一个信道有了消费者之后会将这个线程池给这个信道让其能够操作线程池而已e. muduo库通信所需元素这些元素其实就是muduo库网络通信的时候有多少个connection必须要有一个协议处理的操作
再对上面的信息进行一个总结
Broker服务器模块:
这个模块是一个功能整合模块本质上这个模块并不提供实质的功能性操作。
这个模块最重要的是资源的整合是一个资源的载体
工作线程池一个服务器有一个工作线程池其他所有的信道操作都是这同一个线程池。虚拟机一个服务器有一个虚拟机其他所有交换机队列绑定消息的操作都是针对这个虚拟机进行的。消费者管理一个服务器有一个消费者管理。通信相关连接管理协议处理模块句柄也是一整个服务器有一套。
客户端模块
客户端拥有的模块和服务端有些是差不多的因为某些服务服务端提供给了客户端而客户端也要提供给用户。当然也是存在不同的比如连接管理模块。
1.消费者管理模块
消费者管理模块
消费者标识订阅的队列名称自动确认标志消息回调处理函数指针
当当前消费者订阅了某一个队列的消息这个队列有了消息后就会将消息推送给这个客户端这时候收到了消息则使用回调函数进行处理处理完毕后根据确认标志决定是否进行消息确认。
管理操作:
增删查
2.信道管理模块
所有提供的操作与服务端雷同因为客户端给用户要提供什么服务服务器就要给客户端提供什么服务管理信息
信道ID消费者管理句柄每个信道都有自己相关的消费者线程池句柄对推送过来的消息进行回调处理处理过程通过工作线程来进行信道关联的连接
信道提供的服务:
声明/删除交换机声明/删除队列绑定/解绑队列与交换机发布消息/确认消息订阅队列消息/取消订阅队列消息创建/关闭信道
信道的管理: 信道的增删查
连接管理模块
客户端连接的管理本质上是对客户端TcpClient的二次封装和管理。
面对用户 不需要有客户端的概念连接对于用户来说就是客户端通过连接创建信道通过信道完成自己所需服务。因此当前客户端这边的连接对于用户来说就是一个资源的载体。
管理操作:
连接服务器创建信道关闭信道关闭连接
管理的资源:
工作线程池连接关联的信道管理句柄
异步工作池模块TcpClient模块需要一个EventLoopThread模块进行IO事件监控收到推送消息后需要对推送过来的消息进行处理因此需要一个线程池来帮助我们完成消息处理的过程
将异步工作线程模块单独拎出来的原因
多个连接用一个EventLoopThread进行IO事件监控就够了所有的推送消息处理也只需要有一个线程池就够了
并不需要
每个连接都有一个EventLoop每个信道的消息处理都有自己的线程池
有了这几个模块之后首先要做的就是搭建一个异步工作池然后创建连接将异步工作池传递进去然后通过连接建立信道之后用户的所有操作通过信道完成。
到这里这个项目的所有模块就介绍完毕了下面再使用图将这些模块整合起来即可
项目模块关系图 需要说明的是当一个消息来到的时候确定给了某一个虚拟机上的交换机之后这个交换机会将这个消息进行匹配对照知道这个消息要放到哪一个队列中然后将这个信息放到队列中。在这个服务中消息是以队列为单位进行保存的并且是在内存和硬盘和数据库中都存有一份当内存中某一个队列上的信息符合了垃圾回收进行了这个操作之后会将垃圾回收后的这个队列新的位置持久化到硬盘和数据库中。然后就是整合这些模块的BrokerServer模块了并且也是资源的具象化。这个模块会向客户端提供各种功能。而当客户端和服务端建立了信道之后信道来自于连接。客户端就会向上为用户提供不同的操作当用户选择了某一个操作之后客户端会通过信道向服务端进行通信然后服务端就会执行对应的操作。而针对于不同的客户端向用户提供的操作也是不同的对于订阅客户端来说如果自己订阅了某个队列之后客户端会通过订阅客户端接口通过信道向服务端进行通信发送的正是一个messege服务端再去将这个信息找到虚拟机在通过路由匹配选择合适的队列。
下面正式开始项目的实现。
项目的文件结构 demo文件夹中放的就是一些需要经过测试的代码文件再测试完成之后再引入到主项目中mqclient就是客户端模块了mqcommon存放的是多个模块需要共用的模块比如线程池protobuf日志打印等等mqtest下进行项目的单元测试third就是第三方库的存放目录。
实用工具的完成
这些工具完成了之后便于我们之后项目的书写不需要之后再项目中使用的时候再回来写。第一个日志打印工具便于我们通过打印出来的信息判断是否出错进行调试。
然后就是使用Helper工具的完成这个工具下具有几个不同的类第一个文件基础的操作类删除创建等等sqlite基础操作类字符串操作类UUID生成器类。 日志工具的完成
这里我将代码的实现放到mqdemo下生成一个log文件夹下创建log.cpp,封装一个日志宏通过日志宏进行日志的打印再打印的信息前带有系统事件已经文件名和行号
比如 [14:54:47] [log.cpp:12] 打开文件失败。
为了得到文件名字和行号需要知道下面的两个宏
_FILE__和__LINE__到时候编译器会将这两个宏替换成文件的名字和行号。
为了得到具体的事件需要使用到下面的函数 第二个参数就是需要转化成的格式是什么样的可以使用的格式如下 这个函数的作用就是将时间按照一定的格式转化为一个字符串那么上图中标记的这个结构体如何来呢需要使用下面的这个函数 这个函数会根据现在的系统时间返回一个struct tm结构体指针。 上图就是这个结构图中的内容。使用上面的结构体就能够完成时间的格式化了。将上面的接口使用起来写一个测试代码 成功的打印出了我需要的信息。
但是不可能每一次我都使用这样的操作去进行日志打印这样就会造成项目代码冗余过高下面就来将这个代码封装为一个宏并且将日志等级也添加上某些日志等级并不需要进行信息的打印而有的需要进行信息的打印。 上面的这个代码就完成了对一个宏函数并且这个宏函数会打印日志的等级。在等级大于ERR的时候才会进行打印。##__VA_ARGS__这个宏需要带上##是为了防止外部在进行参数传递的时候只传入了一个字符串(hello world)并没有后面的%d导致LOG(lev_str,level, format, ...)中format被匹配之后后面的没有被匹配导致的错误。
运行一下 运行成功然后将这个代码拷贝到项目的公共模块下 为了防止这个模块被重复包含再去定义一个条件编译 当然上面的日志打印只是一个简单的权宜之计因为实现一个真正的日志系统很复杂并且是存在第三方库可以使用的但是如果再增加的话就让项目变得更加复杂了为了简单化项目所以就使用了这个日志系统。
sqit工具类的完成
这个工具类其实在之前教学使用sqit3数据库的时候已经实现了直接将之前使用的代码拿过来然后将日志添加到之前写的代码中即可。 这样就完成了。
字符串分割类的完成
这里需要知道的是字符串分割的完成思想就是遍历字符串找到分割符然后获取答案。 这样就完成了这个代码然后再将这个代码封装为一个类函数然后放到工具文件中 uuid生成工具类的完成
因为发送的每一条信息都需要有一个独一无二的id所以此时就需要使用一个uuid的生成器来保证每一个消息都具有自己的uuid了。在这⾥uuid⽣成我们采⽤⽣成8个随机数字加上8字节序号共16字节数组⽣成32位16进制字符 的组合形式来确保全局唯⼀的同时能够根据序号来分辨数据加上8字节序号是为了更加容易分辨因为肉眼分辨随机数很难。
这里我们先来解决随机数的问题
c中具有一些类能够生成随机数。 这个类生成的随机数是一个机器随机数机器随机数就是通过硬件生成的随机数生成这个随机数的效率比较低因为速度比较慢这里我们需要生成8个随机数如果都使用这个方法那么对于每一个消息都需要一个随机数的情况来说会拖慢运行速度所以这里并不能完全使用这个机器随机数。而在官方库中也提供了一个生成随机数的函数但是生成的随机数是一个伪随机 这个函数就是使用了梅森旋转算法Mersenne Twister的函数19937是数据的范围说明生成的随机数范围在2^19937-1这个范围内范围越大越不容易重复。这个函数和之前使用过的random函数一样需要使用一个随机数种子否则生成的随机数就可能出现重复的这里我们就使用上面的机器随机数作为这个函数的种子来生成随机数。
解决方法通过机器随机数作为种子来生成一个伪随机数。 这样就通过梅森旋转算法生成了伪随机数相比于生成机器随机数这个函数的效率就高很多了。但是现在还有一个问题那就是我只需要8个0到255之间的随机数啊进行mod操作吗不需要因为标准库中也给我们提供了一个在数据区间取数据的函数 看这个例子 首先通过限定范围构造了一个对象出来然后再根据生成的随机数进行区间取值 之后生成的随机数都是在这个范围内的这样就能够解决获得8个随机数0到255范围。下面就是要将这个生成的8个随机数变成16进制的数字字符了 但是也可能出现下面这样的字符 因为生成的这个随机数大小只有这么大所以之后再生成的时候0就被忽略了。此时就需要设置位宽和填充的数字了。 这样即使随机数生成的这个数字只是一位的也会具有两个位宽。 这里因为我们需要的是8个随机数所以需要进行8次循环。stringstream这个类会将生成的数据存放在自己的缓冲区中所以在前面直接不断的进行放置即可。 这样就得到了8个随机数生成的16位的字符串这样生成的数据的唯一性已经很强了但是我想要修改一下格式按照844的格式也就是让这个字符串生成的第81216个字符的后面增加-。所以也就是在生成第468个字符串的时候往后追加即可。 这样生成的随机数更加便于观察。然后还需要一个编号这里使用一个原子的静态变量即可静态变量保证了编号是随程序在进行累加的而原子保证了每一个信息拥有的编号是唯一的。 这样就生成了一个我需要的uuid。这里的思想就是生成8个1字节随机数再取8字节的序号组成16字节的数据然后转化为16进制字符共32个再使用这个字符组成一个特定格式的id就是这个方法的思想。
然后需要将这个方法放到工具类中。然后进行测试 可以看到右边的序号是不断的递增的过程
文件基础操作类
这个类中需要完成的功能为以下几个
1.判断文件是否存在 2.文件大小获取 3.文件读写 4.文件创建/删除 5.目录创建/删除
这个类我就不在demo中做了而是直接写在helper文件中了。首先完成这个类的大体框架搭建。 上图代码中read函数body类型从文件特定的位置进行读取的那个read接口错了应该是char*类型而不是string类型因为在写read函数的时候发现需要使用的接口需要使用char*同时write的那个函数的string类型也需要修改为const char*写入文件的特定位置的那个write函数
然后就是去一个一个完成这些函数了。
判断文件是否存在可以使用下面这个函数 这个函数第一个参数就是你要判断的文件的文件路径第二个是mode在下面的描述中如果mode被设置为了F_OK并且你要判断的这个文件存在那么这个函数的返回值就是0。但是这个函数的头文件是unistd.h也就意味着这个函数是一个系统调用并且是在Linux下使用的也就不具备跨平台性。所以不使用这个函数转而使用下面这个函数stat 这个函数的头文件是sys/stat.h上图中的unsitd.h并不是这个函数的返回值这个函数会获取一个文件结构体这个结构体中的信息 文件的inode节点号文件的设备号文件的大小访问时间等等都在结构体中。大多数情况下都能够获取到一个文件的属性如果没有获取到这个文件的属性说明这个文件不存在获取文件属性成功返回0否则不返回0。并且这个接口具有一定的跨平台性所以就是用这个函数来完成判断文件是否存在的函数。由此就能够完成exists函数 如果获取文件的大小呢之前不是说了吗在stat函数的这个结构体中就具有这个文件大小的属性可以通过这一点来获得文件的大小。 对于读文件首先来完成读一个文件的特定部分要完成这一步需要按照下面的步骤来进行
第一步打开文件第二步跳转到文件读写位置第三步读取文件数据第四步关闭文件。
打开文件使用文件流即可完成
std::ifstream
使用这个文件流需要包含头文件fstream同时这个文件流还具有以下的选项
std::ios::in:
以读取模式打开文件默认模式。如果文件不存在将会打开失败。
std::ios::out:
以写入模式打开文件。如果文件已经存在则会清空文件内容。如果文件不存在则会创建新文件。
std::ios::app:
以追加模式打开文件。写入操作将会添加到文件末尾而不会清空文件内容。
std::ios::trunc:
如果文件已经存在打开文件时会清空文件内容。通常与 std::ios::out
std::ios::binary:
以二进制模式打开文件。默认情况下文件以文本模式打开。使用二进制模式可以处理非文本文件如图像、音频等。
std::ios::ate:
打开文件后将文件指针移动到文件末尾。这允许你从文件末尾开始读取。
std::ios::nocreate:
仅在文件存在时打开不创建新文件。
std::ios::no_trunc:
如果文件存在则不清空文件内容。
然后就来完成这个接口 这个接口中并没有太多的校验需要用户在使用的时候注意比如判断len值等等所以需要用户自己进行注意否则会崩溃。
下一个read接口复用这一个read接口即可 这个代码中的body[0]就是将body中的首位地址取出来。这里如果不调整空间大小那么body中的空间就是0然后后面的函数直接使用body中的空间就会出现访问未定义空间的问题
下一个write函数
第一步打开文件第二步跳转到文件指定位置 第三步写入文件 第四步关闭文件这里有一个小知识点要进行跳转必须要具有文件的读权限所以下面的代码中会将读权限也构建好。
并且因为是写入文件并且要进行跳转所以使用的类为ostream,整体的思路和read函数是一样的 另外一个write函数直接复用这个write函数即可。 但是在下面这种情况下是会存在一个大问题的如果一个文件中是存在数据的100字节。那么如果调用上面的函数将50字节的数据从这个文件中进行写入那么这个文件的大小任然是100只不过前50个字节被覆盖成为了我写入的文件但是后50字节的数据还是存在的此时我们再处理数据的时候就会出现问题了。但是我这里就不解决这个问题了如果遇到这种情况了将这个文件删除然后重新再去写入这个文件即可。
这里我再给这个类增加一个接口修改文件的名字 这个接口后面会有用处。
现在去完成下一个接口获取一个文件的父级路径什么是父级路径呢假设现在一个文件的路径为/ww/dd/ff/des.txt这个des文件的父级路径就是/ww/dd/ff要想获得这个路径只需要从这个路径字符串从后往前走找到最后一个/然后从开头开始截取字符串即可。而在string中恰好有一个就是从后往前找一个字符的接口。 但是如果外部传入的就是这个文件名字呢此时我们去寻找/是找不到的此时直接返回./当前目录不正是这个文件的父级目录吗 下一个函数修改文件名字的函数这个函数要如何完成呢难道是获取储存有这个文件的结构体然后修改结构体中的内容吗或者是删除原始文件然后创建一个新的文件其实都不用存在一个接口能够完成修改文件名字的操作 专门用来修改文件名字的函数成功返回0失败返回-1。所以修改文件名字的函数直接调用这个函数就可以了。 下一个创建文件的接口这个接口的实现思路也很简单打开一个文件如果这个文件不存在就创建存在什么都不做直接关闭文件。 下一个删除文件接口这个接口在标准库中依旧是提供了接口的。 这个函数内部封装的系统调用是unlink这里为了移植性没有使用系统调用。
下一个接口创建目录如果存在一个正常的路径让我去创建目录那么也是存在一个函数可以让我们使用的 但是如果在创建目录的时候某一个目录的父级目录不存在呢所以在创建多级目录的时候需要确保父级目录一定存在也就是创建多级目录的时候需要从第一个父级目录开始创建。所以我们从字符串中一个一个的去截取。最主要的是我们需要递归的去创建目录所以每一次都要从path的0位置开始往后截取。 最后一个函数要移除目录这里依旧存在一个函数 但是这个函数的要求就是这个目录必须是空的所以要删除这个目录首先就要删除这个目录中的文件然后再去删除这个目录。虽然代码不难写但是代码很多。但是不要忘了在Linux中有一个命令rm -rf能够完成这个功能而在Linux中命令也就是一个文件可以调用下面这个接口来执行这个文件来执行这个命令。 这个函数就能够执行一个指令还有一个方法就是进行程序替换也是可行的但是这里我就不使用这种方法了因为执行程序替换要创建线程。代码如下 这样这个文件基础操作类也就完成了。
下面就是测试这个文件基础操作类的功能了。
首先来测试文件是否存在以及文件大小的测试这里就测试之前写的logger.hpp文件 执行结果 执行成功文件大小也是正确的 证明这两个功能是成功的下一个功能文件的读写和文件目录的创建
这里我首先在当前目录下创建一个aaa/bbb/ccc/tmp.txt文件为了创建这个文件首先就要判断这个文件是否存在如果这个文件不存在先去创建父级目录最后再去创建文件。
代码 执行后的结果 成功创建了文件以及目录。
下面再去测试文件的读取和写入功能。 运行结果 最后再来看tmp中是否存在代码 可以看到tmp中也是成功写入了代码。读写功能也就测试完毕了。
下面我要去读取tmp中的局部内容然后进行修改这里我就读取__M_LOG_H这个宏这个宏是从第8个字节开始读取9个字符。 运行结果 局部读取成功。写一个局部的写入 运行结果 tmp中的内容 现在局部的写入和读取也测试成功了。
还有几个功能修改文件名字删除功能的测试。
名字修改 删除文件 删除路径 到这里所有的功能就测试完成了。
消息类型proto编写
因为这个项目涉及到了网络通信也就会涉及到序列化和反序列化所以这里需要先将消息类型proto文件写好之后生成相关代码。既然要将消息类型的proto文件写好就需要定义消息的类型。
消息本身要素
i. 消息属性
{
消息ID唯一标识每条消息的标识符。
消息投递模式选择消息的投递方式可以是 非持久化模式 持久化模式
消息的routing_key用于消息队列中路由消息的关键字。
ii. 消息有效载荷内容
}消息内部本身所包含的四个内容以上就是。
这部分包含消息的具体数据即消息需要传递的信息内容。
下面的额外要素是因为某些信息需要进行持久化所以需要的元素
消息额外存储所需要素
i. 消息的存储位置
指定消息存储的具体路径或位置。
ii. 消息的长度
消息内容的字节长度。
iii. 消息是否有效
使用字符的0或1来表示消息的有效性而不是使用布尔类型bool因为在持久化时布尔类型所占的长度不同可能会导致修改文件中消息有效位后消息长度发生变化
再加上客户端和服务端也会使用到一些交换机的信息所以需要将交换机的信息也一起也到proto文件中。
1. 交换机类型
a. DIRECT
b. FANOUT
c. TOPIC
2. 消息投递模式
a. UNDURABLE在RabbitMQ中此模式的值为1咱们也效仿
b. DURABLE 值为2. 如上就完成了消息的proto文件的编写然后就是进行生成了。 成功生成了我们需要的文件。
项目功能服务端模块编写
交换机数据管理
要完成这个管理工作首先就要定义出一个交换机的数据类交换机中绑定了多个队列当发布客户端发送了消息之后会交给交换机然后由交换机来决定这个消息要进入哪一个队列中。
定义交换机数据类
a. 交换机名称 唯一标识
b. 交换机类型 决定分发方法广播交给所有队列直接交换交给指定队列主题交换发送给符合匹配条件的队列
c. 是否持久化标志
d. 是否⾃动删除标志 当前项目最后没有实现
e.其它参数
以上就是所有交换机都具有的数据。
又因为交换机中的是有可能需要进行持久化的所以这里还需要定义一个交换机数据持久化类。
定义交换机数据持久化类数据持久化的sqlite3数据库中
a. 创建/删除交换机数据表
b. 新增交换机数据
c. 移除交换机数据
d. 查询所有交换机数据
e. 查询指定交换机数据根据名称
这个类提供了交换机的增删查功能以及创建表的功能。
只有先定义好了交换机的数据类然后才能进行交换机的管理类。
定义交换机数据管理类
a. 声明交换机并添加管理存在则OK不存在则创建
b. 删除交换机
c. 获取指定交换机
d. 销毁所有交换机数据
以上就是我们需要完成的功能。
下面我们就根据上面的功能将代码的框架搭建好然后再去完成各个函数。 这里我后面又添加了一个无参的构造因为后面需要使用到这个无参的构造。
上图中的setArgs和getArgs函数可以看作是数据库和map之间的序列和反序列函数。
下一个类框架交换机数据持久化管理类数据保存在sqlite数据库中。 下一个交换机数据管理类 这个数据管理类之后就会提供接口给外层外层使用这些接口完成对交换机的管理。以上只是将框架完成了为了完成这些函数还需要写一些私有函数来帮助我们完成这些函数。
那么下面我们就来实现这些函数第一个Exchange类中的setArgs函数这个函数可以认为是Exchange和数据库之间的反序列函数用于将从数据库中读取的数据转化为可以放到map中的数据另外一个函数就是getargs函数这个函数和setArgs函数相反是将交换机中的数据变成合适的字符串之后再放入到数据库中。
setArgs函数 getArgs函数 至于最后一个keyvalue的字符串需不需要带则带也可以不带也可以。没有影响因为字符串分割的时候最后一个后面的\0是不会被截取进来的。
下一个交换机持久化数据管理类的构造函数既然要进行持久化那么要做的事情就是打开数据库这里因为使用的是sqlite数据库所以首先要做的是确保这个数据库文件已经存在了所以需要打开这个文件成功没有会进行创建之后再去进行数据库的操作。 这样这个类的构造函数就完成了。其中使用了几个后面使用到的函数。
下一个函数创建/删除数据库表以及向表中插入/删除数据这些函数的实现逻辑很简单就是写好一个sql语句然后调用进行执行就可以了。 现在消息队列重启了需要重新将硬盘中的数据加载到内存中由此就引入了最后一个函数
getall()函数这个函数会返回一个哈希表这个表中的数据就是需要加载到内存中的数据。
这个函数的思路也很简单从表中进行数据的查询然后将查询的数据放到一个map中这里就需要一个私用函数了这个函数用于将从数据库中查询到的信息拿取上来(从数据中储存的信息被保存再一个结构体的缓冲区中)现在需要使用这个函数将缓冲区中的数据拿上来放到map中这个私有函数做的事情就是这个。 这样这个recover函数就完成了。
到这里交换机磁盘管理类就完成了。
最后就是交换机的数据管理类了这个类因为上面两个类的完成就很简单了
构造函数 声明交换机 删除一个交换机 判断交换机是否存在 删除所有交换机 选择了直接删除表和内存中的数据
获取某个交换机的信息 到这里整个交换机的数据管理模块就完成了要做的就只有进行测试了。
使用gtest进行测试。 其中第一个插入测试会插入几个交换机其中map中的内容为k1v1k2v2
然后是运行结果 成功执行了如果出现了段错误那么使用gdb进行调试。
下一个删除测试 在进行这一次测试之前我将gtest测试组件中的析构函数中的会删除表和数据的函数进行了注释让我在程序运行之后去数据库中检查exchange3是否被删除。 可以看到成功执行了并且获得的也是空指针。去数据库中进行查询也是成功完成了删除。 还有一个exists检测一个交换机的功能需要进行测试 运行结果 所有结果都成功运行了。还剩下最后一个重要的功能也就是recover功能。
刚好现在我的数据库中还存在数据没有清理这里我先将insert测试取消。然后再恢复了之后去查询exchange2这个交换机是否存在。 因为这一次不会进行插入所以会直接从数据库的表中进行数据的恢复ExchangeManper的构造函数中进行的不需要我进行调用。然后进行测试 成功运行了唯一一个没有通过的用例是因为哈希表中的值是无序的导致出现了那样的字符串但是还是符合前和后分别是key和val。第一个也不会影响后续的程序因为对表中的数据没有造成影响 到这里这个类的测试也就完成了并且在大的逻辑上也没有问题。最后在进行一个测试那就是如果map中的值为空是否会出现问题呢
进行下面的测试然后直接去查询表 成功了没有出现问题。到这里这个类就大体完成了。
队列数据管理
这个队列数据管理主要是服务端需要一个描述当前客户端创建了哪些队列的类。这样当客户端想要将信息发送到哪一个队列的时候才能通过这个类中的标识知道当前信息能够推送到哪些队列中所以队列数据管理本质上队列描述信息的管理用于描述当前服务器上都存在哪些队列。
那么这个队列描述信息类中究竟存在哪些信息呢
定义队列描述数据类
a. 队列名称
b. 是否持久化标志队列中的信息是否需要放入到数据库中
c.是否独占标志没有实现但是这个标志的作用是只有创建了这个队列的消费者才能去消费这个队列中的数据其它人不允许消费
d.是否自动删除标志没有实现如果对当前队列操作的所有消费者都已经退出了那么这个队列是否需要自动进行删除
e.其它参数
对当前项目来说真正用到的只有a和b其它的标志都是从RabitMQ源码中得来的只不过
RabitMQ原本项目过于庞大这里没有继续实现了。既然有了描述信息类并且这个类也需要进行持久化所以和交换机一样
定义队列数据持久化类数据持久化的sqlite3数据库中 a. 创建/删除队列数据表
b. 新增队列数据
c. 移除队列数据 d. 查询所有队列数据通过这个接口将磁盘中的信息读取出来恢复到内存中
定义队列数据管理类 提供接口给外层使用 a. 创建队列并添加管理存在则OK不存在则创建 b. 删除队列 c. 获取指定队列 d. 获取所有队列 e. 判断指定队列是否存在 用于功能测试的接口 f. 获取队列数量用于功能测试的接口 g. 销毁所有队列数据用于功能测试的接口
下面就是队列信息描述类的书写 还有一个无参的构造上图忘了添加了但是后面我添加了。
这个类需要从数据库中进行数据的读取所以这个类和交换机数据类一样需要两个格式函数并且格式化方式是一样的这里我直接将交换机数据类中的那两个类拿出来用了。 到这里第一个数据类就完成了。
持久化数据管理类的完成
这个类需要对数据库进行操作所以依旧会具有一个数据库的操作句柄并且构造函数也需要传入数据库文件的路径。 首先来完善构造函数 然后就是创建表函数了逻辑就是写sql语句然后使用句柄去进行执行 然后是删除表的函数
依旧是写sql语句然后执行 然后是插入数据的函数 然后是删除某一个队列的函数 最后一个函数是将数据库中的信息读取出来然后放入到内存中:
首先执行select语句然后通过回调函数将读取到的信息放入到一个map中最后返回map对象。 这样这个类也就完成了这个代码和之前的交换机磁盘数据管理类可以说是一样的。到这里队列磁盘数据管理就完成了。
对外提供服务的队列数据管理类 然后就是完成各种管理函数了 构造函数 声明一个queue的函数 删除一个queue的函数 返回一个指定队列的信息 返回所有队列的信息就是将内存中的哈希表进行返回 判断一个队列是否存在 然后是放回当前队列的数量已经删除所有队列信息的函数 到这里整个队列数据管理类就完成了下面就是进行测试了。
首先依旧是进行插入和查询的测试 测试运行结果 全部通过这也就证明了查询和插入功能没有问题下一个删除判断是否存在的测试 运行结果 这里我没有清理表中的数据此时去看数据库中的数据 也是符合的。
然后就是测试恢复功能了。 这里直接去看能否读取到queue2然后删除queue2再去进行存在判断。 从运行结果可以看到其它的测试都成功执行了除了将哈希表中的数据变成字符串导致的无序让我这里测试没有通过以外。但是这不重要上面的测试已经说明了恢复功能是没有问题的。 从没有删除的数据库表中查询是否还存在queueu2 果然也是不存在了.到这里测试也就完成通过了。
绑定信息数据管理
首先绑定信息描述了交换机和哪些队列之间具有关联关系的一个描述在这个描述中会给双方建立一个映射关系。
当发布客户端发送消息到交换机之后交换机通过这个绑定数据就能够知道这个交换机和哪些队列之间具有关联关系然后根据匹配规则将消息发送到对应的队列中。
下面就是对于这个绑定信息类的定义了
定义绑定信息类
a. 交换机名称
b. 队列名称
c. binding_key分发匹配规则-决定了哪些数据能被交换机放⼊队列
因为这个服务和交换机和队列有关所以这个队列中的信息也要在数据库中进行持久化所以这个类也是具有数据库持久化管理类的 定义绑定信息数据持久化类数据持久化的sqlite3数据库中句柄依旧是数据库的句柄
a. 创建/删除绑定信息数据表
b. 新增绑定信息数据
c. 移除指定绑定信息数据
d. 移除指定交换机相关绑定信息数据移除交换机的时候会被调用交换机都没有了那么绑定信息自然也需要进行删除
e. 移除指定队列相关绑定信息数据移除队列的时候会被调用同上只不过是队列没有了
f. 查询所有绑定信息数据用于重启服务器时的数据恢复。
然后就是最后的管理内存和硬盘并对外提供服务的绑定信息数据管理类了 定义绑定信息数据管理类
a. 创建绑定信息并添加管理存在则OK不存在则创建
b. 解除指定的绑定信息
c. 删除指定队列的所有绑定信息
d. 删除交换机相关的所有绑定信息
e. 获取交换机相关的所有绑定信息交换机收到消息后需要分发给⾃⼰关联的队列
f. 判断指定绑定信息是否存在 用于测试
g. 获取当前绑定信息数量 用于测试
h. 销毁所有绑定信息数据 用于测试
这里需要知道的真正的RabitMQ是具有图形化界面的这个界面会将具有多少虚拟机队列以及绑定信息都显示出来所以需要底层提供一个获取全部信息的接口。
下面首先是对这个绑定信息的描述类 这个类因为描述的东西不多所以写这个类也很简单。
然后就是绑定数据持久化管理类了。 上面是这个类最基本的东西然后我们需要知道的是队列的绑定信息以及交换机的绑定信息要怎么去定义。
因为一个队列只能绑定一个交换机所以对于一个队列来说一个交换机只会有一个绑定信息。所以队列和绑定类之间的绑定关系就是 但是一个交换机是可以绑定多个队列的 使用上面两种方法来表示是因为我们使用的最多的功能就是获取一个交换机相关的绑定队列因为当交换机收到一个消息之后这个消息需要放到哪一个队列中是通过这些绑定信息来决定的。
所以当重启服务的函数的返回值返回的应该是ExchangeBindMap通过这个表就能够找到交换机和哪些队列存在关系然后通过队列中的绑定信息知道匹配规则。
所以这个类的基本构造就是下图 构造函数 通过上面的两个map通过交换机的名字就能够知道和这个交换机相关的所有队列的信息。在这些队列的信息中就具有一个绑定信息绑定信息就能够知道这个队列的匹配规则。这里并没有再定义一个交换机和绑定信息之间的关系因为这样再去定义的话再去删除就需要再额外删除一些东西。 而使用上面的方法如果要删除交换机相关的映射只需要删除一次即可。
下面就是实现上面的函数了其实主要就是完成sql语句了这里需要创建的表名字为binding_table其中具有三个属性一个交换机名称一个队列名称一个匹配规则都是varchar 删除表也是直接执行一个sql语句即可。 插入函数 删除绑定信息函数这里的删除就是特别指定删除某一个绑定信息。 删除交换机绑定信息同时也会删除队列和绑定的信息 删除队列绑定信息这个函数和上面那个函数的实现方法是一样的只不过是将exchange_name变成了queue_name 最后就是recover函数了这个函数和之前的哪些了类的实现方法都是一样的 只不过因为绑定信息中具有了两个map所以需要先通过交换机的名字获取一个map这个map就是和交换机相关联的队列的map这个map中储存的是队列和绑定信息类的绑定。 到这里这个类也就基本完成了。这个类最难理解的就是两个map首先是队列的名字和绑定类的map另外一个map就是交换机的名字和上面这个map的map这个map代表的是在这个map中所有的队列都是和这个交换机相关联的。
然后就是最后一个管理连接以及向外提供接口的类了 上面是这个类的框架下面要做的就是实现这个类。
构造函数 然后就是函数的实现了第一个Bind函数这个函数的实现逻辑就是狗仔一个队列的绑定信息对象然后添加到队列和绑定的map中因为一定会访问map所以需要加锁当然如果这个映射已经存在了那么也是没有必要进行添加的除此之外对于绑定是否需要进行持久化也是一个问题因为如果绑定持久化了但是这个绑定对应的队列和交换机都没有进行持久化那么这个绑定持久化是没有必要的。所以如果队列和交换机都进行了持久化那么绑定自然要进行持久化反之任何一个不进行持久化那么这个绑定也没有进行持久化的必要但是这样就会存在一个问题要获取交换机和队列的信息就需要将之前写的两个类包含进来就会导致几个文件之间的耦合。所以这里我选择了添加一个参数让上层使用者决定这个连接是否进行持久化。这样就不需要让各个类之间出现耦合了。 下一个就是解除绑定
这个函数的实现原理就是首先通过交换机的名字得到和这个交换机相关的连接的map然后在这个map中删除qname的连接信息。 然后是移除交换机相关连接的函数这个函数就很容易实现了直接调用持久化类中的删除函数进行删除再去删除map中这个连接相关的信息。 然后是删除队列相关连接的函数了这个函数函数的持久化删除很简单但是难点在于如何从_bindings中删除这个队列的连接信息因为可能很多的交换机都会绑定这个队列。这里我就选择遍历这个方法了。 下一个 返回某个特定交换机的所有连接 检测特定的连接是否存在
逻辑也很简单直接从两个map中进行查找就可以了 下一个获取所有连接数量的函数
逻辑遍历即可 获取指定连接信息的函数 最后的清理函数 在测试之前需要知道两个哈希表之间的逻辑关系是怎么样的 下面就是测试函数 下面的测试测试了删除交换机/队列的测试 下面时查询的测试 测试结果 全部通过。再去数据库中检查一下是否存在数据 没有表的信息删除成功了。下面就是测试recover从硬盘中恢复的功能了 现在这个表中具有这些信息然后我直接进行查询exchange2的测试 只进行上面这一个测试如果全部通过则recover功能也是没有问题的。
执行结果 再去数据库中看一下数据是否存在 exchange1和queue1这个连接还是存在的。到这里这个类的所有功能都测试完成且通过了。
队列消息数据管理
这个模块相比较于上面的模块来说比较复杂所以需要先理解这个模块的逻辑思想首先就是消息的要素
消息的要素: 决定了消息的数据结构
网络传输的消息要素:
消息属性: 消息id 消息routing_key 消息的投递模式
消息的实际内容: 数据服务器上的消息管理所需的额外要素: 最主要就是持久化管理消息有效标志:
这个字段是需要随着消息的持久化内容一起进行持久化每一条消息都有可能要进行持久化存储等到推送给客户端就会删除掉然而每次删除一条数据就重写一次文件效率太低下如果设置了有效标志位每次只需要将这个有效标志位对应的数据给修改为无效即可
消息的实际存储位置 (相对于文件起始位置的偏移量):
当要删除某条消息时需要重写覆盖这条消息在文件的对应位置将有效标志位置为无效这时候就需要能够找到这条消息
消息的长度:
当恢复历史消息以及读取消息内容的时候需要解决粘包问题
根据以上的内容就构造出了protobuf文件这里直接定义出了消息因为之前已经基本做完了消息的定义这里直接将这些信息提取出来即可。并且这个类已经由proto文件完成并且生成了 所以这里类是不需要我自己来写代码的
然后就是消息的持久化管理类了
消息的持久化管理:
不使用数据库有些消息较大不适合数据库其次消息的持久化主要是为了备份而不是为了查询因此直接使用普通文件进行存储。
以队列为单元进行消息的持久化管理: 每个队列都有一个自己的数据文件
当消息文件垃圾回收时需要重新加载所有有效消息重新生成新的数据文件。但是生成新的数据文件后消息的存储位置就发生了变化这时候需要更新内存中的数据。这时候就需要将所有的队列数据进行加锁然后进行更新——锁冲突频繁效率低。因此如果每个队列都有自己独立的数据文件则每次只需要对操作的队列数据进行加锁即可。
既然是数据存储在文件中, 那必然就会有数据格式的要求
4字节长度|数据|4字节长度|数据......通过4字节长度描述消息实际存储长度就可以解决粘包问题。每次先读取4字节的长度就能够知道这一个包的长度读取完成之后再去读取下一个包。
然后就是这个类要向外提供的操作
向外提供的操作:
消息文件的创建与删除消息的新增持久化/删除消息的持久化 (并不是真正的删除只是将标志位置为无效)历史数据恢复/垃圾回收
什么情况下需要垃圾回收:
因为每次删除数据都不是真正删除因此文件中的数据会越来越多但是也不是每次删除都需要回收。当文件中有效消息超过2000条且其中有效消息比例低于50%。
回收思想:
加载文件中所有有效消息删除源文件生成新的数据文件将数据写入存在风险比如我删除了源文件但是写入新文件失败了怎么办。加载文件中所有有效消息先写入到一个临时文件中然后再去删除源文件将临时文件名称改为源文件名称。比起上一个思想更加安全并且修改文件名字是没有多少性能消耗的写入临时文件如果失败了但是源文件没有删除有效数据还是在的返回所有的有效消息每条消息中都记录当前的新的存储位置——用于更新内存中数据的内容必须返回否则就找不到这个消息在硬盘中的具体位置了。
需要管理的数据:
队列名根据队列名生成数据文件名称: 队列名.mqd根据队列名生成的临时文件名称: 队列名.mqd.tmp
以上是关于消息持久化管理和消息本省的定义。
这里先完成这个消息持久化管理类再继续往下定义新的类。
首先是这个消息数据持久化类的最基本框架 然后首先是构造函数 然后是创建消息数据文件的接口 删除消息数据文件的接口 然后就是往队列中进行消息数据的插入了也就是添加一个新的数据在消息数据文件的后面。
需要进行的步骤
首先就是对消息中的有效数据进行序列化因为之后要 进行发送然后是获取这个消息队列文件的长度然后是将序列化后的信息写入到这个消息数据文件的最后通过文件的大小移动偏移量来实现当然这个写入是可能失败的最后再更新这个新插入mess中的具体信息 下一个从消息队列文件中删除一个消息的函数这里需要知道的就是删除并不是删除而是表示这个消息可以被覆盖了也就是将这个消息的是否有效位设置为0当初写proto文件的时候这个标志位设置为string所以可以设置为0 从上图就可以看到是否有效字段是在payload这个类的内部的 但是上面这样写是不可行的因为msg- plaload()返回的是一个const对象无法进行修改所以这里需要使用msg-mutable_plaload()这样返回的plaload对象就不是一个const对象了做出了修改之后需要对这个消息的有效载荷重新进行序列化然后再次写入到原来这个消息所在队列数据文件的位置完成硬盘中数据的更新但是如果这个序列化后的字符串不等于这个文件之前在数据文件中的大小那么无法完成写入因为此时如果继续写入会导致其它文件的有效数据被覆盖。 然后就是最后一个最重要的函数了垃圾回收函数没有在框架中声明函数名因为当时忘记了当这个函数触发后会删除数据文件中的无效数据只保留有效数据。有效数据会暂时被保存在一个临时的文件中最后删除源文件修改临时文件名字为源文件名字最后以链表的形式返回新的有效数据。
下面是第一步加载历史数据中的所有有效数据
这里的逻辑就是按照消息的储存结构 4字节消息的长度 消息的数据 4字节消息的长度.....将每一个消息都拿到然后根据标识位判断这个消息是否还有效。有效就放到链表中否则就不放。 然后就是将有效数据写入到临时文件中了这里我写一个辅助函数来帮助我进行书写这个函数就是将一个message对象中的数据写入到指定文件中,需要注意写入的逻辑是首先写入这个文件数据的长度然后再写入文件数据方便读取的时候首先读取到长度知道接下来要读取的数据长度为多少 完成了这个函数之后先回到之前写的insert函数中那个函数就可以复用这个接口了。 回到垃圾回收函数中使用这个辅助函数也能够完成将链表中的内容写入到临时文件中
然后再删除源文件再将临时文件的名字修改为源文件的名字 这样这个函数就完成了但是现在还有一个问题就是这个函数显的有点冗余了这里可以将加载有效数据封装为一个新的函数。 后面我修改了一下格式变成了size_t长度的数据长度大小然后是数据 这里还有一个点需要注意对读取到的信息进行反序列化读取到的信息是mes中的plaload中的body中的数据而不是直接让mes进行反序列化这个错误我犯了导致后面寻找bug很长时间还有就是写入的时候首先写入的应该是body的数据长度然后才是正式的数据。这之前我忘记了这个协议导致写入的时候直接就写入了数据而没有写入数据有效长度的大小。导致出现了很多问题
由load函数来完成读取有效数据的功能这样这个垃圾回收函数只需要调用这个函数即可。 完成了这个load函数之后我想到了这个load函数是存在错误的如果load读取的是一个空的文件呢那么就不会进入到遍历的接口那么临时文件也就不会完成创建临时文件不会创建之后又删除了源文件自然会影响之后的读取导致出现错误。所以这里需要修改需要创建一个临时文件这样也就不会出现问题了。也就是在第二步的前面加上一行新代码 瞬间就让这个函数不显得冗余了gc垃圾回收函数也就完成了。到这里消息持久化管理类就完成了。
下面就是消息数据的内存管理了对于内存消息数据管理有一个特点如果内存中所有的消息整体进行管理则在进行垃圾回收以及恢复历史消息上就会变得麻烦。因此每个队列都有一个消息数据的管理结构然后最终向外提供一个总体的消息管理类。
队列消息管理:
构造对象时创建/打开队列数据文件恢复队列历史消息数据新增消息/确认消息删除垃圾回收当持久化数据总量超过2000且有效比例低于50%则进行垃圾回收获取队首消息删除队列所有消息获取待推送消息数量获取待确认消息数量获取持久化消息数量
需要管理的数据:因为这个类中的待推送数据结构频繁使用到了头插头删等操作所以综合来看使用链表比使用vector效率更高一点
持久化的管理句柄待推送消息链表以头插尾删的思想实现队列功能持久化消息的hashmap垃圾回收后需要更新消息数据实际存储位置待确认消息的hashmap一条消息被推送给客户端并不会立即真正删除而是等到被确认后才会删除。一条消息被推送给客户端后取出待推送链表加入到待确认结构中等到确认后再删除。持久化文件中有效消息数量持久化文件中总体消息数量可以计算出文件中有效消息比例根据这个比例和总体数量决定是否进行垃圾回收
有了上面的信息之后下面就可以根据上面的信息来写代码了。
下面就是消息数据类内存管理类的基本框架可能后面还会添加新函数 后面我修改了这个类的名字 上面这个类更加符合一点
然后就是去完成内部的功能了。首先就是完善构造函数 下一个函数就是insert这个函数需要以下几个步骤第一步构造消息对象第二步判断这个消息是否需要持久化第三步进行持久化存储第四步进行内存管理。但是还有一种情况那就是客户端只是发送了一条消息但是并没有给这个消息设置任何的属性此时就需要知道这个队列是否进行了持久化来决定这个消息是否进行持久化了但是此时又会导致耦合度问题所以这里依旧是添加一个参数这个参数就是是否对这个消息进行持久化如果客户端发送的消息中具有属性那么就使用客户端传入的属性没有就使用这个参数来决定是否进行持久化同时也需要这个函数自己构造消息id匹配规则。 到这里这个insert函数就完成了下一个函数取出待推送列表中首部的消息然后再将这个消息放入到待确认的哈希表中这个函数的思想就是这个 写到这里我发现了我上面代码的问题那就是这些代码都是可以被多个执行流访问的那么这个类中共有成员哈希表和链表就成为了公共资源为了防止出现问题需要加上锁。
insert函数加上锁 front函数加上锁 下一个函数删除消息函数这个函数的思路为在待确认消息哈希中寻找这个消息如果找到了这个消息根据这个消息的持久化模式判断是否需要进行持久化的删除。下一步删除持久化信息同时此时需要通过计算有效信息的比例来判断是否需要执行垃圾回收下一步删除内存信息。这里我先完成一个检查是否需要进行垃圾回收的函数这个函数是私有的因为并不需要提供给外层 在进行了gc之后会返回一个链表这个链表中的信息都是有效的消息此时的成员变量总消息数量和有效消息数量都是一样的还需要更新消息的实际储存位置的信息这里我就再将这些功能整合成为一个新的函数。 之后在remove中直接调用跟这个gc函数就能够完成垃圾回收等一系列的工作了。游戏就能够来完成remove函数了 最后还剩下几个数量函数 最后还有一个清理所有数据的函数 到这里消息队列管理函数就写完了下面要做的就是整体的消息队列管理了也就是将各个消息队列统筹起来进行管理了这个类会提供接口给上层。统筹管理所有的消息队列。
实现一个对外的总体消息管理类
管理的是每一个队列的消息
管理的成员
互斥锁每个队列的消息管理句柄队列名称 队列消息管理句柄的哈希表
提供的操作
初始化队列的消息管理句柄创建队列的时候调用销毁队列的消息管理句柄删除队列的时候调用队列的各项消息操作
向队列新增消息获取队列队首消息对队列进行消息确认获取队列消息数量可获取消息数量持久化消息数量待确认消息数量总的持久化消息数量回复队列历史消息 下面就来写代码
统筹管理所有消息队列的类了首先就是大体框架的书写 首先就是构造函数了这里需要知道的是什么时候会构造这个管理对象呢自然是服务器启动的时候了那么服务器启动的时候还需要完成一件事情那就是恢复历史消息。既然要恢复历史消息就需要知道需要恢复消息队列的队列名字这也是为什么在另外一个文件中写的队列管理文件中会返回一个所有的队列: 通过返回整个队列的名字来进行队列中消息的恢复但是我要使用这个map就必须包含另外一个文件的头文件让模块之间产生了耦合关系。这里在构造函数中就先不这么做而是交给之后的虚拟机整合的时候由虚拟机来完成。这里先不做这个工作。
不做这个工作之后初始化函数就很简单了直接给basedir赋值即可然后就是
初始化某一个消息队列的函数这个函数的逻辑就是在整体队列的映射管理表中检测是否存在这个队列不存在就插入存在就什么都不做 但是这里其实是存在一个问题的那就是在构造queue的时候是会进行历史数据的恢复的而这个操作内部又使用了一个锁去进行保护。这就导致了锁的嵌套问题锁的嵌套容易发生死锁比如现在一个线程2正在进行数据恢复获取了锁2而线程1获取了锁1正在等待线程2释放锁2但是线程2需要去获取锁1就导致了死锁问题当然我这里发生死锁的概率较低但是不是没有。所以这里为了能够解决我就将QueueMessage中的历史数据恢复功能单独组成一个函数去进行调用。 然后修改上面的代码 这样就避免了锁嵌套的问题。
下一个函数销毁消息队列这个函数的实现逻辑也很简单在映射表中寻找这个队列找到了就销毁没找到就什么都不做。
为了解决锁的嵌套问题使用了作用域。 现在初始化队列句柄和销毁队列句柄都完成了下一个insert函数
这个函数的实现逻辑就是首先在map中寻找队列句柄然后使用这个句柄完成新增即可也要注意锁嵌套的问题 front函数和上面这个函数实现逻辑基本一样 确认消息ack函数的实现逻辑也是这样的 然后就是几个获取数量的函数了这些函数的实现逻辑很简单就是获取句柄然后通过句柄调用返回数量即可 逻辑都是和上面这个图一样的只不过最后句柄调用的函数不同而已这里就不展示了。
这里还剩下最后一个函数清空所有队列中所有信息的函数这个函数的实现逻辑就是遍历map然后让map中所有的句柄都执行一次内部的clear函数即可 这里因为清理的时候即使嵌套了锁但是clear函数并不会去访问外部被锁保护的资源所以不会导致死锁问题。所以就这样写了
到这里消息队列和消息数据的管理就完成了还剩下的就是单元测试了。
首先来进行插入测试 这里的清理函数我先不执行看执行了上面的代码之后是否会存在文件。 测试全部通过了再看数据文件中的内容 也是4条消息并且还包含了一些其它的持久化内容.下面是插入和查询测试首先运行插入测试然后直接运行下面的查询测试看是否能够通过。 下面测试运行一下 测试全部通过说明查询也是没有问题的现在文件中也是存在内容注释上面两次的测试进行恢复历史数据的测试: 测试全部通过也就说明恢复功能也就测试通过了再去看一下临时文件的大小 也是存在数据的并且和写入时候的数据大小也是一样的。
下一个测试删除测试删除测试也就是从wait哈希表中确认一条消息 这里我在进行了删除测试之后又接着进行查询的测试
测试通过 证明删删除也是没有问题的。
最后就是销毁测试了也就是clear函数这个测试我之前已经测试过了可以通过这里就不做专门的测试了对于这种复杂的模块遇到问题需要使用gdb打印来寻找错误并进行修改
到这里这次测试就完成了这个模块我遇到的问题首先就是垃圾回收gc中写入数据时没有按照格式进行写入没有写入数据的长度而是直接写入了数据读取的时候也是先读长度再读取数据然后就是在反序列化的时候应该是Message中的playod中的数据进行反序列化但是我直接对message中的数据进行了反序列化导致读取数据失败了。排查很难
到这里这个模块就没有大的问题了测试也就通过了。
虚拟机管理模块
这个模块是对上面所有模块的一个整合虚拟机是一个虚幻的概念是数据管理单元的一个载体这个模块就是一个数据的整合单元整合的就是消息队列绑定信息等的一个类。所以定义的虚拟机的类就有以下的成员。
定义虚拟机类包含以下成员
a. 交换机数据管理模块句柄
b. 队列数据管理模块句柄
c. 绑定数据管理模块句柄
d. 消息数据管理模块句柄
有了这个类就可以不断的去实例化虚拟机对象然后这个对象需要对外提供数据管理功能
提供声明交换机的功能存在则OK不存在则创建提供删除交换机的功能删除交换机的同时删除关联绑定信息提供声明队列的功能存在则OK不存在则创建创建的同时创建队列关联消息管理对象提供删除队列的功能删除队列的同时删除关联绑定信息删除关联消息管理对象及队列所有消息提供交换机-队列绑定的功能提供交换机-队列解绑的功能提供获取交换机相关的所有绑定信息功能提供新增消息的功能提供获取指定队列队首消息的功能提供消息确认删除的功能
这10个操作其实就是对上面模块的功能进行整合然后通过虚拟机向外提供出去。
然后就是虚拟机的管理了但是因为我的这个项目简化了虚拟机只提供了一个虚拟机的操作所以虚拟机的管理操作对虚拟机进行增删查并没有使用到所以先不做这个功能之后可能会实现。想要实现也很简单只需要再创建一个数据库表然后将各个虚拟机的信息放入到表中然后通过查询表来实现恢复虚拟机的功能这样就可以让项目对外提供多个虚拟机。
在写架子之前我先解决一下之前模块的头文件重复包含的问题
也就是在这些头文件的头部和尾部增加一个 其它的文件我就不展示了定义的宏名则自己起。
下面就来写虚拟机的架子 然后就是去实现这个框架了。
首先就是构造函数了
构造函数的实现逻辑就是实例化这几个对象然后需要完成一个工作那就是历史数据的恢复这里的恢复就是要将上面所有模块的历史数据都要进行一次恢复。一个一个来首先来进行队列的数据恢复要进行队列数据的恢复就需要先获取到队列的名字这个接口在队列管理类中已经实现了 这个构造函数中虽然有一个虚拟机的名字但是在我的这个简化项目中其实只有一个虚拟机所以这里这个字段其实没有太大的作用但是为了以后的拓展我还是加了。
然后就是交换机相关的操作
首先是交换机的声明操作这个代码很简单直接调用底层的方法即可。 删除交换机的方法就相比来说复杂一点因为删除一个交换机的时候和这个交换机相关的数据绑定数据需要一起删除不能删除绑定的队列因为一个队列是可以被多个交换机进行绑定的虽然这两个操作也是调用一下函数即可 然后是队列的相关操作了队列比起交换机就比较复杂了因为和队列相关的操作就比较多了队列不止是创建的时候具有相关性删除也具有相关性。和队列相关的信息首先就是绑定信息然后就是消息每一个队列创建的时候都需要初始化对应的消息句柄同理删除的时候这些消息也要一起被删除。
所以对于创建队列的函数首先就要完成初始化的工作然后才是添加的操作。 这样才是正式完成了一个队列的添加打个比方就是我要开一个加盟店那么首先我要去管理加盟店的机构中完成对我的这个加盟店的注册然后才是去开加盟店。
删除的时候和对象相关的数据存在两个一个是队列的消息一个队列的绑定信息 然后就是和绑定相关的操作了这里需要知道的是要将一个队列绑定到一个交换机上是存在相关要求的首先就是队列和交换机必须是存在的另外一个绑定信息是否要进行持久化的标准是队列和交换机都要进行持久化那么这个绑定信息也要进行持久化。 解除绑定则没有这么麻烦的操作直接调用解除绑定即可。 下一个接口是获取交换机的所有绑定信息的接口直接调用函数即可。以上的函数其实都只是对之前完成的接口的一个整合。
下面是对消息队列中的消息进行的操作首先就是对消息进行发布的一个接口要实现这个功能直接调用底层的方法即可但是这里需要思考一个点就是如果我要求这个消息进行持久化但是这个队列是不进行持久化的那么这个消息也是没有必要进行持久化的所以这里需要进行一点操作。
这里首先需要修改一下之间在消息队列中对于一个消息的插入接口的方法 主要创建了一个mode变量如果队列要进行持久化那么mode就等于这个消息的持久化策略否则这个消息一定不会进行持久化即使消息要求进行持久化 下一个消费一个消息的函数也就是从消息队列中取出一个消息的方法。直接调用函数即可。 下一个函数确认一个消息依旧是调用函数即可 这样这个类也就完成了并且这个类其实不太需要测试因为这个类主要就是调用了前面写的方法但是因为我修改了一点代码所以这里我还是进行测试。最后不要忘了这个类依旧要防止头文件的重复包含。
这里为了测试我还在类中额外添加了一个clear和判断某一个交换机/队列/绑定信息是否存在的操作。 这两个操作都是为了测试而增加的
然后就是测试模块的编写了首先是数据的准备和清理 这样就建立了三个交换机三个队列。然后三个队列中分别存在3条信息.
首先进行初始化的测试
直接判断这些交换机队列和绑定信息都是应该存在的然后就是获取一条消息 直接测试看是否通过 测试全部通过并且数据库文件和消息数据文件都是存在的 这里没有被清理是因为我没有调用删除函数。
下一个就是删除交换机的测试要验证的就是在删除交换机的时候绑定信息是否会被删除 测试结果 全部通过。然后是对于队列的删除测试删除了之后还需要进行消息的获取测试此时获取的消息应该是nullptr 依旧是全部通过 这就说明了队列删除和交换机删除是没有问题的。下一个ack功能的测试为了让消息删除显示的更加明显我在ack的底层调用函数中增加了一个日志打印 然后再去测试ack功能 直接进行测试 测试全部通过并且打印出来我想要的信息
交换机路由管理
现在当一个用户想要发布消息的时候这个用户可能不知想往一个队列中发布消息而是想往多个队列中进行信息的发布。比如说这个用户发布的这个新闻可能既是个体育新闻也是个花边新闻那么这个信息就要放到两个队列中去。如果只能一个新闻专门放入到一个模块中的话这个新闻就要发布两次信息分别放入到两个不同的队列中才能将这个信息发布完成这样对用户的使用来说很不方便由此就在AMQP协议中提出了一个交换机的概念用户在发布信息的时候并不会将消息直接发布到某个队列中而是先发布到交换机上交换机在进行路由匹配决定这个信息应该发布到哪些队列中。当然创建交换机的时候交换机和队列之间的关系也是由用户自己来进行设定的。设定完成之后用户发布信息到交换机中交换机就会将这个信息发布到对应的队列当中交换机要将信息发布到哪一个队列中就由匹配路由来进行决定。在设计这个模块的时候大佬们就提出了不同类型的交换机的概念。以及当数据发布到某一个队列的时候就有了两个决定性的因素第一个就是交换机类型存在三种类型的交换机
广播交换直接将消息交给所有绑定的队列无需匹配。直接交换队列绑定信息中的 binding_key 与消息中的 routing_key主题交换只有匹配队列主题的消息才会被放入队列中。
第二个要素就是匹配规则。有了这两个要素之后再结合交换机和队列的关系画一张图表示一个用户将信息发布给交换机交换机再经过匹配规则决定这个信息要发布到哪一个队列中。 如果交换机的类型是一个广播交换那么这个news就会被直接发布到后面的三个模块中如果这个交换机是一个直接交换那么这个消息就只会被发布到队列1中因为只有这个队列的binding_key和routing_key是完全一样的。如果交换机的类型是一个主题交换则是如果binding_key和routing_key符合一个匹配规则这个匹配规则一般不可能是让binding_key和routing_key是一样的因为如果是这个机制为什么不使用直接交换呢则会进行发布否则不进行发布。简单理解这个匹配规则就是在binding_key或者routing_key中存在一些通配符这个通配符可以匹配一些特定/任意的单词。比如果上图中第一个队列的binding_key为news.#然后用户发布新闻的routing_key为news.tidbits此时这个消息可以匹配到第一个队列因为#可以匹配任何符号而第二个队列则完全一样自然也是可以匹配的第三个队列不匹配。这个匹配规则详细的说明在后面。并且为了更加方便进行匹配binding_key和routing_key也是具有自己的规则的。
由此就能够整合出这个模块的功能了
路由交换模块
功能判断一个消息中的routing_key与队列的binding_key是否匹配成功。
取决要素两个交换机类型routing_key与binding_key的匹配
因此基于功能需求分析路由交换模块只需要对传入的数据进行处理即可因此这个模块要实现的实际上是一个功能接口类没有成员变量。
提供的功能
判断routing_key是否合法必须是a~z, A~Z, 0~9, . _ 组成。不需要通配符因为routing_key只是描述一个新闻类型的字符串判断binding_key是否合法
必须是a~z, A~Z, 0~9, * # . _ 组成。注意* # 是通配符必须独立存在* 和 # 不能连续出现。需要通配符因为需要进行规则匹配
进行routing_key与binding_key的路由匹配
广播不管如何都是成功的。直接相等则成功。主题按匹配模式完成了则成功。
以上就是这个模块需要做的事情以及需要提供的操作总的来说这个模块其实是一个功能模块只需要判断传入的字符串是否符合特定的规则即可所以这个模块是没有需要进行管理的数据的。当然想要实现主题匹配也是比较难的这个功能最后去完成先去完成其它的功能。主题这个功能需要单独留出时间去进行考虑。
下面首先来完成这个功能类的架子搭建 然后就是去完成这个三个函数了首先来完成比较简单的前面两个函数。首先是判断routingkey是否符合规则的函数这个函数最是简单只需要判断字符串中是否存在不符合规则的字符即可。 然后就是bindingkey的合法性判断了。
这个合法性判断其实也很简单只不过是添加了几个东西而已首先是合法字符增加了通配符然后是通配符必须独立存在不能和其它字符一起还有一个routingkey只能存在一个通配符#通配符前面或者后面不能连续出现其它的通配符。 最后就是匹配规则函数的编写了采用的方法为动态规划。
主题匹配涉及到一个概念就是队列和交换机之间的一个匹配如何匹配呢首先bindingkey和routingkey都是以.作为间隔的一个单词这个单词就是对用户发送消息的一种描述也是对这个队列能够接收什么消息的一种描述首先是直接匹配下面两个字符串 因为music和#是不一样的所以两者匹配就失败了但是如果是主题匹配因为#是一个统配符就不能凭借但是是否一致来进行决定了还需要额外的算法来完成这个过程。这个算法是是什么呢请看下面的过程说明
首先是两个字符串然后对字符串进行分隔之后在二维数组中进行各个分割字符串的比较 可以看到routingkey的ddd和bindingkey的ddd是匹配成功了但是这并不意味着这就是匹配成功了因为ddd前面的父级没有匹配成功所以如果两个单词匹配成功了应该从数组的左上方继承结果如果没有匹配成功那么这里的结果就是0。由此能够得到一个动态转移方程dp[i][j]代表的是bindingkey的以i分割字符为结尾的字符串和routingkey的以第j个分割字符串为结尾的字符串是否匹配成功。
由此就能够得到状态转移方程
dp[i][j] 如果i位置的字符串和j位置的字符串能够匹配成功那么dp[i][j] dp[i-1][j-1]可以理解为这一个字符串匹配成功了但是之前的字符串是否匹配成功的结果就在dp[i-1][j-1]中如果没有匹配成功dp[i][j] false。但是还有通配符的存在比如下面这个例子 因为存在通配符导致aaa匹配#从左上方得到结果是1但是bbb和#进行匹配得到的结果虽然成功了但是是0导致匹配失败但是#其实可以匹配0个或者多个任意的单词这个匹配应该是成功的此时就需要进行专门的考虑当遇到#匹配成功的时候不仅能够从左上角获得结果也可以从前一列获取结果也就是dp[i][j]中如果i等于#那么dp[i][j] dp[i-1][j-1]/dp[i][j-1]。但是还是无法解决所有的问题比如下面这个 所以这个#通配符还可以从上方进行结果的得到。
下一个例子 aaa和aaa匹配之间进行匹配成功但是因为左上方为0导致匹配失败但是单词之间的匹配无法从上方进行结果获取要解决这个问题也很要解决当bindingkey是以#作为开头字符就将左上方的这个值从0修改为1即可 。*号不需要进行考虑*匹配的是任意一个单词并不会匹配多个单词所以遇到*号可以认为遇到了和本单词一样的单词即可。到这里这个算法就结束了可以进行代码的书写了 到这里路由匹配模块就完成了下面就是测试了。
因为这个类没有什么数据需要进行初始化所以初始化也不需要进行数据的初始。
首先来测试routingkey和bindingkey的合法性接口 最后的测试结果 全部通过证明routingkey和bindingkey的合法性不需要进行验证了。
还剩下最后一个route测试了首先需要准备好一堆的routingkey和bindingkey数据准备的数据如下 测试的逻辑就是准备三个数组分别储存routingkey bindingkey和这一次route的结果然后执行一次for循环遍历数组每一次都拿取一个routing_key和binding_key进行比较然后和答案进行比较即可。这里主要就是测试了主题交换另外两个交换没有测试的必要 测试全部都通过了这就说明这个路由匹配模块也是没有问题了。