偷的网站怎么做seo,wordpress网站怎么进去,sem是指什么,拓者设计吧网站消息队列的设计 一、消息队列的背景知识二、需求分析核心概念⼀个⽣产者, ⼀个消费者N 个⽣产者, N 个消费者Broker Server 中的相关概念核⼼ API交换机类型 (Exchange Type)持久化⽹络通信消息应答 三、 模块划分四、 项⽬创建五、创建核心类创建 Exchange创建 MSGQUeue创建 B… 消息队列的设计 一、消息队列的背景知识二、需求分析核心概念⼀个⽣产者, ⼀个消费者N 个⽣产者, N 个消费者Broker Server 中的相关概念核⼼ API交换机类型 (Exchange Type)持久化⽹络通信消息应答 三、 模块划分四、 项⽬创建五、创建核心类创建 Exchange创建 MSGQUeue创建 Binding创建Message 六、 数据库设计配置 SQLite实现创建表实现数据库基本操作实现 DataBaseManager测试 DataBaseManager 七、消息存储设计设计思路创建 MessageFileManager 类实现统计⽂件读写实现创建队列⽬录实现删除队列⽬录检查队列⽂件是否存在实现消息对象序列化/反序列化实现写⼊消息⽂件实现删除消息实现消息加载实现垃圾回收(GC)测试 MessageFileManager ⼋、 整合数据库和⽂件创建 DiskDataCenter封装 Exchange ⽅法封装 Queue ⽅法封装 Binding 方法封装 Message ⽅法 九、 内存数据结构设计创建 MemoryDataCenter封装 Exchange ⽅法封装 Queue ⽅法封装 Binding ⽅法封装 Message ⽅法针对未确认的消息的处理实现重启后恢复内存测试 MemoryDataCenter ⼗、 虚拟主机设计创建 VirtualHost实现构造⽅法和 getter创建交换机删除交换机创建队列删除队列创建绑定删除绑定发布消息路由规则订阅消息消息确认测试 VirtualHost ⼗⼀、 ⽹络通信协议设计明确需求设计应⽤层协议定义 Request / Response定义参数⽗类定义返回值⽗类定义其他参数类 ⼗⼆、 实现 BrokerServer创建 BrokerServer 类启动/停⽌服务器实现处理连接实现 readRequest实现 writeResponse实现处理请求实现 clearClosedSession ⼗三、 实现客⼾端创建 ConnectionFactoryConnection 和 Channel 的定义封装请求响应读写操作创建 channel发送请求关闭 channel创建交换机删除交换机创建队列删除队列创建绑定删除绑定发送消息订阅消息确认消息处理响应关闭 Connection测试代码 项目结果 一、消息队列的背景知识
Java标准库中有提供阻塞队列的数据结构, 阻塞队列最重要的用途是实现生产者消费者模型;生产者消费者模型存在诸多好处, 是后端开发的常用编程方式 解耦合削峰填谷 在实际的后端开发中, 尤其是分布式系统里, 跨主机之间使用生产者消费者模型, 也是非常普遍的需求, 因此我们通常会把阻塞队列, 封装成一个独立的服务器程序, 并且赋予其更丰富的功能, 这样的程序就被称为 消息队列 (Message Queue, MQ)市场上有许多消息队列, 如 RabbitMQKafka… 这里仿照 RabbitMQ 来模拟实现一下消息队列
二、需求分析
核心概念
⽣产者 (Producer)消费者 (Consumer)中间⼈ (Broker)发布 (Publish) : 生产者向中间人这里投递消息的过程订阅 (Subscribe) : 哪些消费者要从这个中间人这里取数据, 这个注册的过程, 称为 “订阅”消费 (Consume): 消费者从中间人这里取走消息后处理数据的动作 通过取快递来理解上述概念 商家就是生产者我就是消费者菜鸟驿站就是中间人首先可以是商家向菜鸟驿站发快递 (发布)接着 我 关注哪个商家发的快递 (订阅)最后我从菜鸟驿站中取走快递后, 并使用快递里的商品 (消费) ⼀个⽣产者, ⼀个消费者 N 个⽣产者, N 个消费者 其中, Broker 是最核⼼的部分. 负责消息的存储和转发.
Broker Server 中的相关概念
虚拟主机 (VirtualHost): 类似于 MySQL 的 “database”, 是一个逻辑上的集合, 一个 BrokerServer 上可以存在多个 VirtualHost交换机(Exchange): 生产者把消息先发到Broker的Exchange上, 在根据不同的规则, 把消息转发给不同的Queue队列 (Queue): 真正用来存储消息的部分, 每个消费者决定自己从哪个Queue上读取消息绑定(Binding): Exchange 和 Queue 之间的关联关系, Exchange 和 Queue可以理解成 “多对多” 关系, 使用一个关联表就可以把这两个概念联系起来消息 (Message): 传递的内容 所谓的Exchange 和 Queue 可以理解成 “多对多” 关系, 和数据库中的 “多对多” 一样, 意思是: 一个Exchange 可以绑定多个 Queue (可以向多个 Queue 中转发消息) 一个 Queue 也可被多个Exchange 绑定(一个 Queue 中的消息可以来自于多个 Exchange) 关系结构图 这些概念, 既需要在内存中存储, 也需要在硬盘上存储.
内存存储: 方便使用磁盘存储: 重启服务器后数据不丢失
核⼼ API
对于 Broker 来说, 要实现以下核心 API, 通过这些 API 来实现消息队列的基本功能
创建队列 (queueDeclare)销毁队列 (queueDelete)创建交换机 (exchangeDeclare)销毁交换机 (exchangeDelete)创建绑定 (queueBind)解除绑定 (queueUnbind)发布消息 (basicPublish)订阅消息 (basicConsume)确认消息 (basicAck)
另⼀⽅⾯, Producer 和 Consumer 则通过⽹络的⽅式, 远程调⽤这些 API, 实现 ⽣产者消费者模型.
交换机类型 (Exchange Type)
对于 RabbitMQ 来说, 主要支持四种交换机类型
DirectFanoutTopicHeader
其中 Header 这种⽅式⽐较复杂, ⽐较少⻅. 常⽤的是前三种交换机类型. 此处也主要实现这三种.
Direct: 生产者发送消息时, 直接指定被该交换机绑定的队列名Fanout: 生产者发送的消息会被复制到交换机的所有队列中Topic: 绑定队列到交换机上时, 指定一个字符串为 bindingKey, 发送消息指定一个字符串为 routingKey, 当 routingKey 和 bindingKey 满足一定匹配条件的时候, 则把消息投递到指定队列中 这三个操作就像有发奖品一样 Direct是发一个专属的奖品给特定的人, 只有指定的人才能领取Fanout 就是给每一个人都发一个安慰奖Topic是有奖竞猜, 出了一道题, 只有作答并正确的人才能领取到奖品 持久化
Exchange, Queue, Binding, Message 都有持久化需求.
当程序重启 / 主机重启, 保证上述内容不丢失.
⽹络通信
⽣产者和消费者都是客⼾端程序, broker 则是作为服务器. 通过⽹络进⾏通信.
在⽹络通信的过程中, 客⼾端部分要提供对应的 api, 来实现对服务器的操作
创建 Connection关闭 Connection创建 Channel关闭 Channel创建队列 (queueDeclare)销毁队列 (queueDelete)创建交换机 (exchangeDeclare)销毁交换机 (exchangeDelete)创建绑定 (queueBind)解除绑定 (queueUnbind)发布消息 (basicPublish)订阅消息 (basicConsume)确认消息 (basicAck)
可以看到, 在 broker 的基础上, 客⼾端还要增加 Connection 操作和 Channel 操作.
Connection 对应⼀个 TCP 连接
Channel 则是 Connection 中的逻辑通道
⼀个 Connection 中可以包含多个 Channel.
Channel 和 Channel 之间的数据是独⽴的. 不会相互⼲扰.
这样的设定主要是为了能够更好的复⽤ TCP 连接, 达到⻓连接的效果, 避免频繁的创建关闭 TCP 连接. Connection 可以理解成⼀根⽹线. Channel 则是⽹线⾥具体的线缆. 消息应答
被消费的消息, 需要进行应答
应答模式分成两种
自动应答: 消费者只要消费了消息, 就算应答完毕了, Broker 直接删除这个消息手动应答: 消费者手动调用应答接口, Broker 收到应答请求之后, 才真正删除这个消息 手动应答的目的, 是为了保证消息确实被消费者处理成功了, 在一些对于数据可靠性要求高的场景, 比较常见 三、 模块划分 可以看到, 像 交换机, 队列, 绑定, 消息, 这⼏个核⼼概念在内存和硬盘中都是存储了的.
其中内存为主, 是⽤来实现消息转发的关键; 硬盘为辅, 主要是保证服务器重启之后, 之前的信息都可以正常保持.
四、 项⽬创建
创建 SpringBoot 项⽬. 使⽤ SpringBoot 2 系列版本, Java 8. 依赖引⼊ Spring Web 、 MyBatis 和 lombok.
五、创建核心类
创建包 mqserver.mq
创建 Exchange
/*** Created with IntelliJ IDEA.* Description这个类表示一个交换机** author: zxj* date: 2024-02-25* time: 20:05:48*/
Data
public class Exchange {// 此处使用 name 来作为交换机的身份标识 (唯一的)private String name;// 交换机类型: Direct, Fanout, Topicprivate ExchangeType type ExchangeType.DIRECT;// 该交换机是否需要持久化存储, true 表示需要持久化存储, false 表示不必持久化.private Boolean durable false;// RabbitMQ 有的字段, 相关功能待开发// 该属性表示 如果当前交换机没有人用了, 就会自动删除private Boolean autoDelete false;// arguments 表示的是创建交换机时指定的一些额外的参数选项, 待开发private MapString,Object arguments new HashMap();
}package en.edu.zxj.mq.mqserver.core;/*** Created with IntelliJ IDEA.* Description交换机类型* • Direct: ⽣产者发送消息时, 直接指定被该交换机绑定的队列名.* • Fanout: ⽣产者发送的消息会被复制到该交换机的所有队列中.* • Topic: 绑定队列到交换机上时, 指定⼀个字符串为 bindingKey. 发送消息指定⼀个字符串为 routingKey.* 当 routingKey 和 bindingKey 满⾜⼀定的匹配条件的时候, 则把消息投递到指定队列.** author: zxj* date: 2024-02-25* time: 20:10:02*/
public enum ExchangeType {DIRECT(0),FANOUT(1),TOPIC(2);private Integer type;ExchangeType(Integer type) {this.type type;}public Integer getType() {return type;}
}name: 交换机的名字, 相当于交换机的身份标识type: 交换机的类型, 三种取值, DIRECT, FANOUT, TOPICdurable: 交换机是否要持久化存储, true 为持久化, false 不持久化autoDelete: 使用完毕后是否自动删除arguments: 交换机的其他参数属性
创建 MSGQUeue
/*** Created with IntelliJ IDEA.* Description消息队列,* 类名叫做 MSGQueue, ⽽不是 Queue, 是为了防⽌和标准库中的 Queue 混淆** author: zxj* date: 2024-02-25* time: 20:19:52*/
Data
public class MSGQueue {// 表示队列的身份标识.private String name;// 该消息队列是否需要持久化存储, true 表示需要持久化存储, false 表示不必持久化.private Boolean durable false;// 以下为保留字段// exclusive 为 true, 表示这个队列只能被一个消费者使用(别人用不了), 如果为 false 则是大家都能使用private Boolean exclusive false;// 该属性表示 如果当前交换机没有人用了, 就会自动删除private Boolean autoDelete false;// arguments 表示的是创建交换机时指定的一些额外的参数选项, 待开发private MapString, Object arguments new HashMap();
} 类名叫做MSGQUeue, 而不是 Queue, 是为了防止和标准库中的Queue混淆 name: 队列名字, 相当于队列的身份标识durable: 交换机是否要持久化存储, true 为持久化, false 不持久化exclusive: 独占(排他), 队列只能被一个消费者使用autoDelete: 使用完毕后是否自动删除arguments: 消息队列的其他参数属性
创建 Binding
/*** Created with IntelliJ IDEA.* Description表示队列和交换机之间的关联关系** author: zxj* date: 2024-02-25* time: 20:24:23*/
Data
public class Binding {// exchangeName 交换机名字private String exchangeName;// queueName 队列名字private String queueName;// bindingKey 只在交换机类型为 TOPIC 时才有效. ⽤于和消息中的 routingKey 进⾏匹配.private String bindingKey;
}exchangeName: 交换机名字queueName: 队列名字bindingKey: 只在交换机类型为 TOPIC 时才有效, 用于和消息中的 routingKey 进行匹配
创建Message
/*** Created with IntelliJ IDEA.* Description表示一个要传递的消息* 此处的 Message 对象, 是需要能够在网络上传输, 并且也需要能写入到文件中.* 此时就需要针对 Message 进行序列化和反序列化.* 此处使用 标准库 自带的 序列化/反序列化 操作.** author: zxj* date: 2024-02-25* time: 20:55:17*/
Data
Component
public class Message implements Serializable {// Message 核心属性private BasicProperties basicProperties new BasicProperties();// 存储需要传输的消息, 使用字节的方式存储private byte[] body;// 辅助属性/*** 一个文件中会存储很多信息, 如何找到某个消息, 在文件中的具体位置呢?* 使用下列的两个偏移量进行表示, [offset, offsetEnd)* 这两个属性并不需要被反序列化保存到文件中, 此时信息一旦被写入文件之后, 所在的位置就固定了, 并不需要单独存储* 这两个属性存在的目的, 主要是为了让内存中的 Message 对象, 能够快速找到对应硬盘上的 Message 位置.**/private long offsetBeg 0; // 消息数据开头距离文件开头的偏移位置 (单位是字节)private long offsetEnd 0; // 消息数据结尾距离文件开头的偏移位置 (单位是字节)/*** 使用这个属性表示该消息在文件中是否是有效信息. (针对文件中的信息, 如果删除, 使用逻辑删除的方式)* 0x1 表示有效, 0x0 表示无效**/private byte isValid 0x1;/*** description: 工厂模式创建 Message 实例* 创建一个工厂方法, 让工厂方法帮我们封装一下创建 Message 对象的过程* 这个方法中创建的 Message 对象, 会自动生成唯一的 MessageId* 万一 routingKey 和 basicProperties 里的 routingKey 冲突, 以外面的为主**/public static Message createMessageWithId(String routingKey, BasicProperties basicProperties, byte[] body) {Message message new Message();if (basicProperties ! null) {message.setBasicProperties(basicProperties);}// 此处生成的 MessageId 以 M- 作为前缀.message.setMessageId(M- UUID.randomUUID());message.setRoutingKey(routingKey);message.setBody(body);/* 此处是把 body 和 basicProperties 先设置出来. 他俩是 Message 的核心内容.而 offsetBeg, offsetEnd, isValid, 则是消息持久化的时候才会用到. 在把消息写入文件之前再进行设定.此处只是在内存中创建一个 Message 对象.*/return message;}
}/*** Created with IntelliJ IDEA.* Description** author: zxj* date: 2024-02-25* time: 21:18:11*/
Component
Data
public class BasicProperties implements Serializable {// 消息的唯一身份标识, 此处为了保证 id 的唯一性, 使用 UUID 来作为 messageIdprivate String messageId;/*** 是一个消息上带有的内容, 和 bindingKey 做匹配* 如果当前交换机类型是 DIRECT, 此时 routingKey 就表示要转发的队列名* 如果当前交换机类型是 FANOUT, 此时 routingKey 没有意义* 如果当前交换机类型是 TOPIC, 此时 routingKey 就要和 bindingKey做匹配, 符合要求的才能转发给对应的队列**/private String routingKey;// 这个属性表示消息是否要持久化: 1 表示不持久化, 2 表示持久化. (RabbitMQ 就是这样的)private Integer deliverMode 1;
}
Message 需要实现 Serializable 接口, 后续需要把Message写入文件以及进行网络传输basicProperties: 是消息的属性信息, body 是消息体offsetBeg 和 offsetEnd 表示消息在消息文件中所在的起始位置和结束位置, 这一块具体的设计后续再说; 使用 transient 关键字避免属性被序列化isValid 用来标识消息在文件中是否有效, 这一块具体设计后续再说createMessageWithId 相当于一个工厂方法, 用来创建一个 Message 实例, messageId 通过 UUID 的方式来生成
六、 数据库设计
对于 ExchangeMSGQUeueBinding我们使用数据库进行持久化保存
此处我们使用的数据库是 SQLite是一个更轻量的数据库
SQLite 只是一个动态库我们在 Java 中直接注入 SQLite 依赖即可直接使用不必安装其他的软件
配置 SQLite
引入 pom.xml 依赖 !--导入 sqlite 数据库--dependencygroupIdorg.xerial/groupIdartifactIdsqlite-jdbc/artifactIdversion3.45.1.0/version/dependency配置数据源 application.yml
spring:datasource:url: jdbc:sqlite:./data/meta.dbusername:password:driver-class-name: org.sqlite.JDBC
mybatis:configuration:map-underscore-to-camel-case: true #配置驼峰自动转换log-impl: org.apache.ibatis.logging.stdout.StdOutImpl #打印sql语句mapper-locations: classpath:mapper/**Mapper.xmlusername 和 password 空着即可 此处我们约定, 把数据库文件放到 ./data/meta.db 中
SQLite 只是把数据单存的存储到一个文件中, 非常简单方便
实现创建表
Mapper
public interface MetaMapper {// 建表操作void createExchangeTable();void createMSGQueueTable();void createBindingTable();
}本身 MyBatis 针对 MySQL 和 Oracle 是可以执行多个 sql 语句的, 但是 SQLite 不行 MetaMapper.xml 中 具体实现 sql 语句
update idcreateExchangeTablecreate table if not exists exchange_table(name varchar(64) primary key,type int comment 0 表示 Direct, 1 表示 Fanout, 2 表示 Topic,durable boolean,auto_delete boolean,arguments varchar(1024),delete_flag tinyint(4) DEFAULT 0,create_time datetime DEFAULT CURRENT_TIMESTAMP,update_time datetime DEFAULT CURRENT_TIMESTAMP);/updateupdate idcreateMSGQueueTablecreate table if not exists msg_queue_table(name varchar(64) primary key,durable boolean,exclusive boolean,auto_delete boolean,arguments varchar(1024),delete_flag tinyint(4) DEFAULT 0,create_time datetime DEFAULT CURRENT_TIMESTAMP,update_time datetime DEFAULT CURRENT_TIMESTAMP);/updateupdate idcreateBindingTablecreate table if not exists binding_table(exchange_name varchar(64),queue_name varchar(64),binding_key varchar(64),delete_flag tinyint(4) DEFAULT 0,create_time datetime DEFAULT CURRENT_TIMESTAMP,update_time datetime DEFAULT CURRENT_TIMESTAMP);/update实现数据库基本操作
给 mapper.MetaMapper 中添加
// 相关的增删改查操作Integer insertExchange(Exchange exchange);Integer deleteExchangeByName(String name);ListExchange selectAllExchanges();Integer insertMSGQueue(MSGQueue msgQueue);Integer deleteMSGQueueByName(String name);ListMSGQueue selectAllMSGQueues();Integer insertBinding(Binding binding);Integer deleteBinding(String exchangeName, String queueName);ListBinding selectAllBindings();相关sql语句实现 insert idinsertExchangeinsert into exchange_table (name, type, durable, auto_delete, arguments)values (#{name}, #{type}, #{durable}, #{autoDelete}, #{arguments});/insertinsert idinsertMSGQueueinsert into msg_queue_table (name, durable, exclusive, auto_delete, arguments)values (#{name}, #{durable}, #{exclusive}, #{autoDelete}, #{arguments});/insertinsert idinsertBindinginsert into binding_table (exchange_name, queue_name, binding_key)values (#{exchangeName}, #{queueName}, #{bindingKey});/insertupdate iddeleteExchangeByNameupdate exchange_tableset delete_flag 1where name #{nanme};/updateupdate iddeleteMSGQueueByNameupdate msg_queue_tableset delete_flag 1where name #{nanme};/updateupdate iddeleteBindingupdate binding_tableset delete_flag 1where exchange_name #{exchangeName}and queue_name #{queueName};/updateselect idselectAllExchanges resultTypeen.edu.zxj.mq.mqserver.core.Exchangeselect name,type,durable,auto_delete,arguments,delete_flag,create_time,update_timefrom exchange_tablewhere delete_flag 0;/selectselect idselectAllMSGQueues resultTypeen.edu.zxj.mq.mqserver.core.MSGQueueselect name,durable,exclusive,auto_delete,arguments,delete_flag,create_time,update_timefrom msg_queue_tablewhere delete_flag 0;/selectselect idselectAllBindings resultTypeen.edu.zxj.mq.mqserver.core.Bindingselect exchange_name,queue_name,binding_key,delete_flag,create_time,update_timefrom binding_tablewhere delete_flag 0;/select实现 DataBaseManager
mqserver.datacenter.DatabaseManager
创建 DatabaseManager 类 – 通过这个类来封装对数据库的操作 /*** Created with IntelliJ IDEA.* Description通过这个类来封装针对数据库的操作.** author: zxj* date: 2024-02-26* time: 21:21:21*/
Slf4j
public class DatabaseManager {// 由于 DataBaseManager 不是⼀个 Bean// 需要⼿动来获取实例private MetaMapper metaMapper;public void init() {metaMapper MqApplication.context.getBean(MetaMapper.class);if (!checkDBExits()) {// 数据库不存在// 1. 先创建目录File file new File(./data/);if (!file.exists()) {file.mkdirs();}// 2. 建表createTables();// 3. 插入默认的数据createDefaultData();log.info(创建数据库成功~);} else {log.info(数据库已经存在!);}}}如果数据库已经存在了, 就不必建库建表了 针对 MqApplication, 需要新增⼀个 context 属性. 并初始化
SpringBootApplication
public class MqApplication {public static ConfigurableApplicationContext context null;public static void main(String[] args) {context SpringApplication.run(MqApplication.class, args);}
}实现 checkDBExists private boolean checkDBExits() {File file new File(./data/meta.db);return file.exists();}实现 createTable /*** 这个方法用来建表.* 建库操作并不需要手动执行. (不需要手动创建 meta.db 文件)* 首次执行这里的数据库操作的时候, 就会自动的创建出 meta.db 文件来 (MyBatis 帮我们完成的)**/private void createTables() {metaMapper.createMSGQueueTable();metaMapper.createBindingTable();metaMapper.createExchangeTable();log.info(建表成功);}实现 createDefaultData
/*** description: 给数据库表中, 添加默认的数据* 此处主要是添加一个默认的交换机* RabbitMQ 里有一个这样的设定: 带有一个 匿名 的交换机, 类型是 DIRECT**/
private void createDefaultData() {// 构造一个默认的交换机Exchange exchange new Exchange();exchange.setName();exchange.setType(ExchangeType.DIRECT);exchange.setDurable(false);exchange.setAutoDelete(false);metaMapper.insertExchange(exchange);log.info(创建默认的数据成功~);
}默认数据主要是创建⼀个默认的交换机. 这个默认交换机没有名字, 并且是直接交换机. 封装其他的数据库操作
// 封装其他的数据库操作Integer insertExchange(Exchange exchange) {return metaMapper.insertExchange(exchange);}Integer deleteExchangeByName(String name) {return metaMapper.deleteExchangeByName(name);}ListExchange selectAllExchanges() {return metaMapper.selectAllExchanges();}Integer insertMSGQueue(MSGQueue msgQueue) {return metaMapper.insertMSGQueue(msgQueue);}Integer deleteMSGQueueByName(String name) {return metaMapper.deleteMSGQueueByName(name);}ListMSGQueue selectAllMSGQueues() {return metaMapper.selectAllMSGQueues();}Integer insertBinding(Binding binding) {return metaMapper.insertBinding(binding);}Integer deleteBinding(String exchangeName, String queueName) {return metaMapper.deleteBinding(exchangeName, queueName);}ListBinding selectAllBindings() {return metaMapper.selectAllBindings();}// 清理资源public void deleteDB() {File file new File(./data/meta.db);if (file.delete()) {log.info(删除数据库文件成功~);} else {log.info(删除数据库文件失败~);}File dataDir new File(./data/);// 使用 delete 删除目录的时候, 需要保证目录是空的.if (dataDir.delete()) {log.info(删除数据库目录成功~);} else {log.info(删除数据库目录失败~);}}测试 DataBaseManager
使⽤ Spring ⾃带的单元测试, 针对上述代码进⾏测试验证.
在 test ⽬录中, 创建 DataBaseManagerTests
准备⼯作 SpringBootTest
class DatabaseManagerTest {private final DatabaseManager databaseManager new DatabaseManager();// 接下来需要编写多个方法, 每个方法都是一个/一组单元测试用例// 还需要做一个准备工作, 需要写两个方法, 分别用于今 准备工作 和 收尾工作// 使用这个方法, 来执行准备工作, 每个用例执行前, 都要调用这个方法BeforeEachpublic void setUp() {// 由于在 init 中, 需要通过 context 对象拿到 metaMapper 实例的// 所以就需要先把 context 对象给搞出来, 给搞出来MqApplication.context SpringApplication.run(MqApplication.class);databaseManager.init();}// 使用这个方法, 来执行收尾工作, 每个用例执行后, 都要调用这个方法AfterEachpublic void tearDown() {/*这里需要进行操作, 就是把数据库给清空~ (把数据库文件, meta.db 直接删了就行了)注意, 此处不能直接就删除, 而需要先关闭上述 context 对象!此处的 context 对象, 持有了 MetaMapper 的实例, MetaMapper 实例又打开了 meta.db 数据库文件如果 meta.db 被别人打开了, 次数的删除文件操作是不会成功的 (Windows 系统的限制, Linux 没有这个问题)另一方面, 获取 context 操作, 会占用 8080 端口, 此处的 close 也是释放 8080*/MqApplication.context.close();databaseManager.deleteDB();}
}SpringBootTest 注解表示该类是一个测试类BeforeEach 每个测试用例之前执行, 一般用来做准备工作, 此处进行数据库初始化, 以及针对 Spring 服务的初始化AfterEach 每个测试用例之后执行, 一般用来做收尾工作, 此处需要先关闭 Spring 项目, 再删除数据库
编写测试用例
Test 注解表示一个测试用例Assertions 是断言, 用来判定结果的每个用例执行之前, 都会先调用 setUp, 每次用例执行后, 都会调用 tearDown确保每个用例执行的都是 “clean” 的, 也就是每个测试用例不会被上一个测试用例干扰
具体代码
Testvoid init() {// 由于 init 方法, 已经在上面 setUp 方法中调用了, 直接在测试用例代码中, 检查当前的数据库状态即可// 直接从数据库中查询, 看数据是否符合预期.// 查交换机表, 里面应该有一个数据 (匿名的 exchange); 查消息队列表, 没有数据; 查绑定表, 没有数据ListExchange exchanges databaseManager.selectAllExchanges();ListMSGQueue msgQueues databaseManager.selectAllMSGQueues();ListBinding bindings databaseManager.selectAllBindings();/*直接打印结果, 通过肉眼来检查结果, 可以但是不优雅, 不方便更好的方法是使用断言System.out.println(exchanges.size());assertEquals 判定结果是不是相等注意 assertEquals 两个参数的顺序, 虽然比较相等, 谁在前, 谁在后, 无所谓但是 assertEquals 的形参, 第一个形参叫做 expected (预期), 第二个形参叫做 actual (实际的)*/Assertions.assertEquals(1, exchanges.size());Assertions.assertEquals(, exchanges.get(0).getName());Assertions.assertEquals(ExchangeType.DIRECT, exchanges.get(0).getType());Assertions.assertEquals(0, msgQueues.size());Assertions.assertEquals(0, bindings.size());}org.jetbrains.annotations.NotNullprivate Exchange createTestExchange(String exchangeName) {Exchange exchange new Exchange();exchange.setName(exchangeName);exchange.setType(ExchangeType.FANOUT);exchange.setDurable(true);exchange.setAutoDelete(false);exchange.setArguments(11, aa);exchange.setArguments(22, bb);return exchange;}Testvoid insertExchange() {// 构造一个 Exchange 对象, 插入到数据库中. 再查询出来, 看结果是否符合预期.String exchangeName exchangeTest;Exchange exchange createTestExchange(exchangeName);databaseManager.insertExchange(exchange);// 插入完毕后, 查询结果ListExchange exchanges databaseManager.selectAllExchanges();Exchange newExchange exchanges.get(1);Assertions.assertEquals(2, exchanges.size());Assertions.assertEquals(ExchangeType.FANOUT, newExchange.getType());Assertions.assertEquals(true, newExchange.getDurable());Assertions.assertEquals(false, newExchange.getAutoDelete());Assertions.assertEquals(aa, newExchange.getArguments(11));Assertions.assertEquals(bb, newExchange.getArguments(22));}Testvoid deleteExchangeByName() {// 先构造一个交换机, 插入数据库; 然后再按照名字删除即可!String exchangeName exchangeTest;Exchange exchange createTestExchange(exchangeName);databaseManager.insertExchange(exchange);// 插入完毕后, 查询结果ListExchange exchanges databaseManager.selectAllExchanges();Exchange newExchange exchanges.get(1);Assertions.assertEquals(2, exchanges.size());Assertions.assertEquals(ExchangeType.FANOUT, newExchange.getType());// 删除交换机databaseManager.deleteExchangeByName(exchangeName);exchanges databaseManager.selectAllExchanges();Assertions.assertEquals(1, exchanges.size());Assertions.assertEquals(, exchanges.get(0).getName());}Testvoid selectAllExchange() {// 构造一个 Exchange 对象, 插入到数据库中. 再查询出来, 看结果是否符合预期.String exchangeName exchangeTest;Exchange exchange createTestExchange(exchangeName);databaseManager.insertExchange(exchange);// 插入完毕后, 查询结果ListExchange exchanges databaseManager.selectAllExchanges();Exchange newExchange exchanges.get(1);Assertions.assertEquals(2, exchanges.size());Assertions.assertEquals(ExchangeType.FANOUT, newExchange.getType());Assertions.assertEquals(true, newExchange.getDurable());Assertions.assertEquals(false, newExchange.getAutoDelete());Assertions.assertEquals(aa, newExchange.getArguments(11));Assertions.assertEquals(bb, newExchange.getArguments(22));}private MSGQueue createTestMSGQueue(String msgQueueName) {MSGQueue msgQueue new MSGQueue();msgQueue.setName(msgQueueName);msgQueue.setDurable(true);msgQueue.setAutoDelete(false);msgQueue.setExclusive(false);msgQueue.setArguments(a, 1);msgQueue.setArguments(b, 2);return msgQueue;}Testvoid insertMSGQueue() {// 插入数据String msgQueueName testMSGQueueName;MSGQueue msgQueue createTestMSGQueue(msgQueueName);databaseManager.insertMSGQueue(msgQueue);// 查询数据, 判断插入的数据是否正确ListMSGQueue msgQueues databaseManager.selectAllMSGQueues();MSGQueue msgQueueNew msgQueues.get(0);Assertions.assertEquals(1, msgQueues.size());Assertions.assertEquals(msgQueueName, msgQueueNew.getName());Assertions.assertEquals(true, msgQueueNew.getDurable());Assertions.assertEquals(false, msgQueueNew.getAutoDelete());Assertions.assertEquals(false, msgQueueNew.getExclusive());Assertions.assertEquals(1, msgQueueNew.getArguments(a));Assertions.assertEquals(2, msgQueueNew.getArguments(b));}Testvoid deleteMSGQueueByName() {// 插入数据String msgQueueName testMSGQueueName;MSGQueue msgQueue createTestMSGQueue(msgQueueName);databaseManager.insertMSGQueue(msgQueue);// 查询数据, 判断插入的数据是否正确ListMSGQueue msgQueues databaseManager.selectAllMSGQueues();MSGQueue msgQueueNew msgQueues.get(0);Assertions.assertEquals(1, msgQueues.size());Assertions.assertEquals(msgQueueName, msgQueueNew.getName());// 依据名字删除databaseManager.deleteMSGQueueByName(msgQueueName);msgQueues databaseManager.selectAllMSGQueues();Assertions.assertEquals(0, msgQueues.size());}Testvoid selectAllMSGQueue() {// 插入数据String msgQueueName testMSGQueueName;MSGQueue msgQueue createTestMSGQueue(msgQueueName);databaseManager.insertMSGQueue(msgQueue);// 查询数据, 判断插入的数据是否正确ListMSGQueue msgQueues databaseManager.selectAllMSGQueues();MSGQueue msgQueueNew msgQueues.get(0);Assertions.assertEquals(1, msgQueues.size());Assertions.assertEquals(msgQueueName, msgQueueNew.getName());}private NotNull Binding createTestBinding(String exchangeName, String msgQueueName) {Binding binding new Binding();binding.setExchangeName(exchangeName);binding.setQueueName(msgQueueName);binding.setBindingKey(Hello word);return binding;}Testvoid insertBinding() {// 插入 binding 数据String exchangeName testBindingExchangeName;String msgQueueName testBindingMSGQueueName;Binding binding createTestBinding(exchangeName, msgQueueName);databaseManager.insertBinding(binding);// 查询ListBinding bindings databaseManager.selectAllBindings();Binding bindingNew bindings.get(0);Assertions.assertEquals(1, bindings.size());Assertions.assertEquals(exchangeName, bindingNew.getExchangeName());Assertions.assertEquals(msgQueueName, bindingNew.getQueueName());Assertions.assertEquals(Hello word, bindingNew.getBindingKey());}Testvoid deleteBinding() {// 插入 binding 数据String exchangeName testBindingExchangeName;String msgQueueName testBindingMSGQueueName;Binding binding createTestBinding(exchangeName, msgQueueName);databaseManager.insertBinding(binding);// 查询ListBinding bindings databaseManager.selectAllBindings();Binding bindingNew bindings.get(0);Assertions.assertEquals(1, bindings.size());// 删除databaseManager.deleteBinding(exchangeName, msgQueueName);bindings databaseManager.selectAllBindings();Assertions.assertEquals(0, bindings.size());}Testvoid selectAllBinding() {// 插入 binding 数据String exchangeName testBindingExchangeName;String msgQueueName testBindingMSGQueueName;Binding binding createTestBinding(exchangeName, msgQueueName);databaseManager.insertBinding(binding);// 查询ListBinding bindings databaseManager.selectAllBindings();Binding bindingNew bindings.get(0);Assertions.assertEquals(1, bindings.size());}七、消息存储设计
设计思路
消息需要再硬盘上存储 但是并不是直接放到数据库中而是直接使用文件存储。
原因如下
对于消息的操作不需要复杂的 增删改查对于文件的操作效率比数据库会高很多 主流 mq 的实现 (包括 RabbitMQ), 都是把消息存储在文件中, 而不是数据库中 我们给每个队列分配一个目录, 目录的名字为 data 队列名, 形如 ./data/testQueue, 该目录中包含两个固定名字的文件
queue_data.txt 消息数据文件, 用来保存消息内容queue_stat.txt 消息统计文件, 用来保存消息统计信息
queue_data.txt 文件格式:
使用二进制方式存储.
每个消息分成两个部分:
前四个字节, 表示 Message 对象的长度(字节为单位)后面若干个字节, 表示 Message 内容消息和消息之间收尾相连
每个 Message 基于 Java 标准库的 ObjectInputStream / ObjectOutputStream 序列化 Message 对象中的 offsetBeg 和 offsetEnd 正是用来描述每个消息体所在的位置 queue_static.txt 文件格式: 使用文本方式存储
文件中只包含一行, 里面包含两列(都是整数), 使用 \t 分割.
第一列表示当前总的消息数目. 第二列表示有效消息数目.
形如:
2000\t1500创建 MessageFileManager 类
创建 mqserver.datacenter.MessageFileManager /*** Created with IntelliJ IDEA.* Description消息持久化存储* 存储单位是以队列名字为单位存储的** author: zxj* date: 2024-02-28* time: 13:43:23*/
Slf4j
public class MessageFileManger {private final static String BASIC_DIR ./data/;/*** description: 内部类, 用于管理 queue_stat.txt 中的数据* 存储格式: totalCount \t validCount* 作用: 为了后面垃圾回收功能做准备的* 约定: 当 有效信息的占比低于 50% 时, 并且 总的消息数目大于 2000 时, 触发 GC 功能**/static public class Stat {// 总信息的存储数目public Integer totalCount;// 有效的信息数目public Integer validCount;// 最少消息数目private static final Integer atLeastCount 2000;// 最低有效信息占比private static final Double minProportion 0.5;}public void init() {// 暂时不需要任何初始化操作, 方便后续扩展}// 设定信息存储的目录和文件/*** description: 用来获取指定队列信息存储的目录**/Contract(pure true)private NotNull String getQueueDir(String queueName) {return BASIC_DIR queueName;}/*** description: 用来获取指定队列信息数据存储路径**/Contract(pure true)private NotNull String getQueueDataPath(String queueName) {return getQueueDir(queueName) /queue_data.txt;}/*** description: 用来获取指定队列信息记录存储路径**/Contract(pure true)private NotNull String getQueueStatPath(String queueName) {return getQueueDir(queueName) /queue_stat.txt;}/*** description: 用来获取指定队列新数据存储路径**/Contract(pure true)private NotNull String getQueueDataNewPath(String queueName) {return getQueueDir(queueName) /queue_data_new.txt;}}内部包含一个 Stat 类, 用来标识消息统计文件的内容getQueueDir, getQueueDataPath, getQueueStatPath, getQueueDataNewPath 用来表示这几个文件的位置
实现统计⽂件读写 这是后续操作的一些准备工作 /*** description: 读取 queue_stat.txt 文件里面的内容**/private NotNull Stat readStat(String queueName) throws IOException {Stat stat new Stat();try (InputStream inputStream Files.newInputStream(Paths.get(getQueueStatPath(queueName)))) {// 因为 queue_stat.txt 里面存储的内容是文本的, 所以可以使用 Scanner 来读取Scanner scanner new Scanner(inputStream);stat.totalCount scanner.nextInt();stat.validCount scanner.nextInt();}return stat;}/*** description: 向 queue_stat.txt 文件里面写入内容**/private void writeStat(String queueName, NotNull Stat stat) throws IOException {// 使用 PrintWrite 来写文件.// OutputStream 打开文件, 默认情况下, 会直接把原文件清空. 此时相当于新的数据覆盖了旧的.try (OutputStream outputStream Files.newOutputStream(Paths.get(getQueueStatPath(queueName)))) {PrintWriter printWriter new PrintWriter(outputStream);printWriter.write(stat.totalCount \t stat.validCount);printWriter.flush();}}直接使用 Scanner 和 Printer 写即可
实现创建队列⽬录
每个队列都是自己的目录和配置的文件, 通过下列方法把目录和文件先准备好
/*** description: 创建相关目录信息, 并进行相关的初始化操作**/public void createQueueFiles(String queueName) throws IOException {// 1. 创建对应的目录File fileDir new File(getQueueDir(queueName));if (!fileDir.exists()) {// 不存在对应目录if (!fileDir.mkdirs()) {throw new IOException(创建目录失败, fileDir: fileDir.getAbsoluteFile());}}// 2. 创建 queue_data.txt 文件File fileData new File(getQueueDataPath(queueName));if (!fileData.exists()) {// 不存在对应文件if (!fileData.createNewFile()) {throw new IOException(创建目录失败, fileData: fileData.getAbsoluteFile());}}// 3. 创建 queue_stat.txt 文件File fileStat new File(getQueueDataPath(queueName));if (!fileStat.exists()) {// 不存在对应文件if (!fileStat.createNewFile()) {throw new IOException(创建目录失败, fileStat: fileStat.getAbsoluteFile());}}// 4. 初始化 queue_stat.txt 文件Stat stat new Stat();stat.totalCount 0;stat.validCount 0;writeStat(queueName, stat);}
把上述约定的文件都创建出来, 并对消息统计文件进行初始化 初始化 0\t0 这样的初始值 实现删除队列⽬录
如果队列需要删除, 则队列对应的⽬录/⽂件也需要删除 /*** description: 销毁消息的目录文件**/public void destroyQueueFiles(String queueName) throws IOException {// 先删除文件, 在删除目录File fileData new File(getQueueDataPath(queueName));boolean ok1 fileData.delete();File fileStat new File(getQueueStatPath(queueName));boolean ok2 fileStat.delete();File fileDir new File(getQueueDir(queueName));boolean ok3 fileDir.delete();if (!ok1 || !ok2 || !ok3) {// 但凡有一个失败, 就算整体是失败的throw new IOException(删除指定文件和目录失败, dir: fileDir.getAbsoluteFile());}}注意: File 类的 delete ⽅法只能删除空⽬录. 因此需要先把内部的⽂件先删除掉
检查队列⽂件是否存在 /*** description: 判断 queueName 对应的文件是否存在* 比如后续生产者给 broker server 生产消息了, 这消息可能需要被记录到文件上(取决于该信息是否需要持久化)* return:**/public boolean checkFilesExists(String queueName) {// 判断队里的数据文件和状态文件是否存在即可File fileData new File(getQueueDataPath(queueName));File fileStat new File(getQueueStatPath(queueName));return fileStat.exists() fileData.exists();}实现消息对象序列化/反序列化
Message 对象需要转成⼆进制写⼊⽂件. 并且也需要把⽂件中的⼆进制读出来解析成 Message 对象. 此处针对这⾥的逻辑进⾏封装.
创建 common.BinaryUtils /*** Created with IntelliJ IDEA.* Description操作二级制数据相关的工具类 -- 提供将 java 对象 反序列化和序列化** author: zxj* date: 2024-02-28* time: 14:33:24*/
public class BinaryUtils {/*** description: 反序列化, 将字节数组转化为 java 对象**/public static Object fromBytes(byte[] data) throws IOException, ClassNotFoundException {Object object null;try (ByteArrayInputStream byteArrayInputStream new ByteArrayInputStream(data)) {try (ObjectInputStream objectInputStream new ObjectInputStream(byteArrayInputStream)) {// 这里的 readObject 就是将字节数组 反序列为 java 对象object objectInputStream.readObject();}}return object;}public static byte[] toBytes(Object object) throws IOException {// 这个流对象相当于一个变长的字节数据// 可以把 Object 序列化的数据逐渐的写入到 byteArrayOutputStream 中, 再统一转成 byte[]try (ByteArrayOutputStream byteArrayOutputStream new ByteArrayOutputStream()){try (ObjectOutputStream objectOutputStream new ObjectOutputStream(byteArrayOutputStream)){// 此处的 writeObject 就会把该对象进行序列化, 生成的二进制字节数据, 就会写入到 ObjectOutputStream中// 由于 ObjectOutputStream 又是关联到了 ByteArrayOutputStream 中, 最终结果就写入到 ByteArrayOutputStream 中了objectOutputStream.writeObject(object);}// 这个操作就是把 byteArrayOutputStream 中持有的二进制数据提取出现, 转化为 byte[]return byteArrayOutputStream.toByteArray();}}}使用 ByteArrayInputStream / ByteArrayOutputStream 针对 byte[ ]进行封装, 方便后续操作 (这两个流对象是纯内存的, 不需要进行 close)使用 ObjectInputStream / ObjectOutputStream 进行序列化和反序列化操作, 通过内部的 readObject / writeObject 即可完成对应操作此处涉及到的序列化对象, 需要实现 Serializable 接口
实现写⼊消息⽂件
/*** description: 增* 将 message 放到 msgQueue 对应的队列文件中* param: [msgQueue 消息队列, message 需要存储的信息 - 内存中也会管理该对象]**/public void sendMessage(NotNull MSGQueue msgQueue, Message message) throws MqException, IOException {// 1. 检查当前队列要写入的文件是否存在if (!checkFilesExists(msgQueue.getName())) {throw new MqException([MessageFileManager] 队列所对应的文件不存在! queueName msgQueue.getName());}// 2. 把 message 转化为 字节数组byte[] messageBinary BinaryUtils.toBytes(message);// 将 messageBinary 写入到 msgQueue 所对应的队列文件中// 文件属于一个公共资源, 此时进行写操作, 存在线程安全的问题// 需要对对应的队列进行加锁, 确保同时向同一个队列中写入信息是线程安全的synchronized (msgQueue) {// 3. 设置 Message 对象中 offsetBeg 和 offsetEnd 字段// 3.1 获取此时对应文件的总长度, fileQueueData.length() 就可以获取File fileQueueData new File(getQueueDataPath(msgQueue.getName()));// 3.2 计算// 把新的 message 写入到文件中, offsetBeg 旧的总长度 4, offsetEnd 旧的总长度 messageBinary.length 4message.setOffsetBeg(fileQueueData.length() 4);message.setOffsetEnd(fileQueueData.length() messageBinary.length 4);// 4. 将 messageBinary 写入到文件的默认, 注意: 这里是追加写try (OutputStream outputStream new FileOutputStream(getQueueDataPath(msgQueue.getName()), true)) {try (DataOutputStream dataOutputStream new DataOutputStream(outputStream)) {// 4.1 先写入新 messageBinary 的长度 -- 固定占四个字节// 知识点: outputStream.write() 参数看似是 int 类型, 但是实际上只是写入一个字节的数据, dataOutputStream.writeInt() 就是写四个字节的数据dataOutputStream.writeInt(messageBinary.length);// 4.2 写入主体信息dataOutputStream.write(messageBinary);}}// 5. 更新信息统计文件的信息Stat stat readStat(msgQueue.getName());stat.validCount 1;stat.totalCount 1;writeStat(msgQueue.getName(), stat);}}考虑线程安全, 按照队列维度进行加锁使用 DataOutputStream 进行二进制写操作, 比原生 OutputStream 要方便需要记录 Message 对象在文件中的偏移量, 后续的删除操作依赖这个偏移量定位到信息, offsetBeg是原文件大小的基础上 4, 4个字节是存放消息大小的空间写完消息, 要同时更新统计消息
创建 common.MqException , 作为⾃定义异常类. 后续业务上出现问题, 都统⼀抛出这个异常.
/*** Created with IntelliJ IDEA.* Description自定义异常信息** author: zxj* date: 2024-02-28* time: 14:30:32*/
public class MqException extends Exception {public MqException() {}public MqException(String message) {super(message);}
}实现删除消息
此处的删除只是 “逻辑删除”, 即把 Message 类中的 isValid 字段设置为 0. 这样删除速度⽐较快. 实际的彻底删除, 则通过我们⾃⼰实现的 GC 来解决. /*** description: 删除 message* 这里的删除是逻辑删除, 也就是把硬盘上存储的这个数据里面的那个 isValid 属性, 设置为 0x0* 1. 先把这个文件中的这一段数据, 读出现来, 还原回 Message 对象* 2. 把 isValid 该成 0;* 3. 把上述数据重新写回文件* 此处这个参数中的 message 对象, 必须要包含有效的 offsetBeg 和 offsetEnd**/public void deleteMessage(NotNull MSGQueue msgQueue, NotNull Message message) throws IOException, ClassNotFoundException {// 修改文件, 存在线程安全问题synchronized (msgQueue) {try (RandomAccessFile randomAccessFile new RandomAccessFile(getQueueDataPath(msgQueue.getName()), rw)) {// 1. 先从文件中读取对应的 message 数据byte[] bufferSrc new byte[(int) (message.getOffsetEnd() - message.getOffsetBeg())];randomAccessFile.seek(message.getOffsetBeg());randomAccessFile.read(bufferSrc);// 2. 将当前读出来的二进制数据, 转换成 Message 对象Message diskMessage (Message) BinaryUtils.fromBytes(bufferSrc);// 3. 把 isValid 设置为无效diskMessage.setIsValid((byte) 0x0);// 此处不需要宰割参数这个 Message 的 isValid 设为 0, 因为这个参数代表的内容中管理的 Message 对象, 而这个对象也马上要被从内存中销毁了// 4. 重新写入文件byte[] buffDest BinaryUtils.toBytes(diskMessage);// 虽然上面已经 seek 过了, 但是上面 seek 完了之后, 进行了读操作, 这一读, 就导致, 文件光标往后移动, 移动到下一个信息的位置了,// 因此想要接下来的写入, 能能够刚好写回到之前的位置, 就需要重新调整文件光标randomAccessFile.seek(message.getOffsetBeg());randomAccessFile.write(buffDest);// 通过上述这些操作, 对于文件来说, 只是有一个字节发生了改变了而已~}// 更新统计文件, 把一个消息设置为无效了, 此时有效信息个数就需要 -1Stat stat readStat(msgQueue.getName());if (stat.validCount 0) {stat.validCount - 1;}writeStat(msgQueue.getName(), stat);}}使用 RandomAccessFile 来随机访问到文件的内容根据 Message 中的 offsetBeg 和 offsetEnd 定位到消息在文件中的位置, 通过 randomAccessFile.seek 操作文件指针偏移过去, 在读取读出的结果解析成 Message 对象, 修改 isValid 字段, 再重新写回文件, 注意写的时候要重新设定文件指针的位置, 文件指针会随着上述的读操作产生改变最好, 要记得更新统计文件, 把合法消息 -1
实现消息加载
把消息内容从⽂件加载到内存中. 这个功能在服务器重启, 和垃圾回收的时候都很关键. /*** description: 查* 使用这个方法, 从文件中, 读取所有的消息内容, 加载到内存中 (具体来说是放到一个链表里面)* 这个方法,使用一个 LinkedList, 主要目的是为了后续进行头删操作* 这个方法的参数, 只是一个 queueName 而不是 MSGQueue 对象, 因为这个方法不需要加锁,只使用 queueName 就够了* 由于该方法是在程序启动调用, 此时服务还不能处理请求, 不涉及多线程操作文件**/public LinkedListMessage loadAllMessageFromQueue(String queueName) throws IOException, MqException, ClassNotFoundException {LinkedListMessage messages new LinkedList();try (InputStream inputStream Files.newInputStream(Paths.get(getQueueDataPath(queueName)))) {try (DataInputStream dataInputStream new DataInputStream(inputStream)) {// 这个变量记录当前文件的光标long currentOffset 0;while (true) {// 1. 先读取当前消息长度字段int messageSize dataInputStream.readInt();// 2. 按照这个长度, 读取后续的数据长度byte[] buffSrc new byte[messageSize];int actualSize dataInputStream.read(buffSrc);if (messageSize ! actualSize) {// 如果不匹配, 说明文件有问题, 格式错了throw new MqException([MessageFileManager] 文件格式错误, queueName queueName);}// 3. 把读到的二进制数据, 反序列化为 Message 对象Message message (Message) BinaryUtils.fromBytes(buffSrc);// 4. 判定一下, 看看这个消息对象, 是不是无效对象if (message.getIsValid() ! 0x1) {// 无效数据, 直接跳过// 虽然消息是无效数据, 但是 offset 仍要更新currentOffset (4 messageSize);continue;}// 5. 有效数据, 则需要将这个 Message 对象加入到链表中, 加入之前还需要添加 OffsetBeg 和 OffsetEnd// 进行计算 Offset 的时候, 需要当前文件光标的位置, 由于当下使用的 DataInputStream 并不方便直接获取文件光标位置// 因此需要手动计算下文件光标message.setOffsetBeg(currentOffset 4);message.setOffsetEnd(currentOffset 4 messageSize);currentOffset (4 messageSize);messages.add(message);}} catch (EOFException e) {// 这个异常是表示读取到文件的末尾了, 这是 DataInputStream 中规定的// 不需要做什么特殊处理log.info(恢复磁盘文件数据完成);}}return messages;}
使用 DataInputStream 读取数据, 先读 4 个字节为消息懂得长度, 然后在按照这个长度来读取实际消息内容读取完毕之后, 转化成 Message 对象同时计算出该对象的 offsetBeg 和 offsetEnd最终把结果整理成链表, 返回出去注意, 对于DataInputStream 来说, 如果读到 EOF, 就会抛出一个 EOFException, 而不是返回特定值, 因此需要注意上述循环的结束条件
实现垃圾回收(GC)
上述删除操作, 只是把消息在文件上标记成了无效, 并没有腾出磁盘空间, 因此需要定期的进行批量删除.
此处使用类似于复制算法, 当总消息数超过 2000, 并且有效消息数目少于 50%的时候, 就触发 GC.
GC 的时候会把所有有效消息加载出来, 写入一个新的消息文件中, 使用新文件, 替代旧文件即可 /*** description: 检查当前是否需要针对队列的消息数据文件进行 GC**/public boolean checkGC(String queueName) throws IOException {// 判断是否要 GC, 是根据总消息数和有效消息数, 这两个值都是在 消息统计文件中的Stat stat readStat(queueName);return stat.totalCount Stat.atLeastCount (double) stat.validCount / (double) stat.totalCount Stat.minProportion;}/*** description: 垃圾回收, 防止存储过多垃圾信息* 通过这个方法, 真正执行消息数据文件的垃圾回收操作* 使用复制算法来完成* 创建一个新的文件, 名字就是 queue_data_new.txt* 把之前消息数据文件中的有效消息都读出来, 写到新的文件中* 删除旧的文件,在把新的文件改名回 queue_data.txt* 同时要更新消息统计文件**/public void gc(MSGQueue msgQueue) throws MqException, IOException, ClassNotFoundException {// 进行 gc 的过程, 是针对消息数据文件进行大洗牌, 这个过程中, 其他线程不能针对该队列的消息文件做任何修改synchronized (msgQueue) {// 由于 gc 操作可能比较耗时, 此处统计一下消耗时间long gcBeg System.currentTimeMillis();// 1. 创建一个新的文件File queueDataNewFile new File(getQueueDataNewPath(msgQueue.getName()));if (queueDataNewFile.exists()) {// 正常情况下, 这个文件不应该存在, 如果存在, 就是意外, 说明上次 gc 了一半, 程序意味崩溃了throw new MqException([MessageFileManager] gc 时发现该队列的 queue_data_new 已经存在, queueName msgQueue.getName());}boolean ok queueDataNewFile.createNewFile();if (!ok) {throw new MqException([MessageFileManager] 创建文件失败 ,queueDataNewFile queueDataNewFile.getAbsoluteFile());}// 2. 从旧文件中, 读取出所有的有效消息对象LinkedListMessage messages loadAllMessageFromQueue(msgQueue.getName());// 3. 把有效信息写入到新的文件中try (OutputStream outputStream Files.newOutputStream(Paths.get(getQueueDataNewPath(msgQueue.getName())))) {try (DataOutputStream dataOutputStream new DataOutputStream(outputStream)) {for (Message message : messages) {byte[] buffer BinaryUtils.toBytes(message);// 先写长度dataOutputStream.writeInt(buffer.length);// 在写整体数据dataOutputStream.write(buffer);}}}// 4. 删除旧的数据文件, 并把新的文件进行重命名File queueDataOldFile new File(getQueueDataPath(msgQueue.getName()));ok queueDataOldFile.delete();if (!ok) {throw new MqException([MessageFileManager] 删除旧的数据文件失败, queueDataOldFile: queueDataOldFile.getAbsoluteFile());}// 把 queue_data_new.txt queue_data.txtok queueDataNewFile.renameTo(queueDataOldFile);if (!ok) {throw new MqException([MessageFileManager] 文件重命名失败, queueDataOldFile: queueDataOldFile.getAbsoluteFile() , queueDataNewFile: queueDataNewFile.getAbsoluteFile());}// 5. 更新统计文件Stat stat readStat(msgQueue.getName());stat.totalCount messages.size();stat.validCount messages.size();long gcEnd System.currentTimeMillis();log.info(gc 执行消耗时间: {} ms, (gcEnd - gcBeg));}}如果文件很大, 消息非常多, 可能比较低效, 这种就需要把文件做拆分和合并了 Rabbitmq 是这样实现了, 此处实现简答, 就不做了 测试 MessageFileManager
创建两个队列, 用来辅助测试使用 ReflectionTestUtils.invokeMethod 来调用私有方法
package en.edu.zxj.mq.mqserver.datacenter;import en.edu.zxj.mq.common.MqException;
import en.edu.zxj.mq.mqserver.core.MSGQueue;
import en.edu.zxj.mq.mqserver.core.Message;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.util.ReflectionTestUtils;import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;import static org.junit.jupiter.api.Assertions.*;/*** Created with IntelliJ IDEA.* Description** author: zxj* date: 2024-02-28* time: 18:30:12*/
SpringBootTest
class MessageFileMangerTest {private MessageFileManger messageFileManger new MessageFileManger();private static final String queueName1 testQueue1;private static final String queueName2 testQueue2;/*** description: 每个方法执行前的 准备工作**/BeforeEachvoid setUp() throws IOException {// 创建两个队列messageFileManger.createQueueFiles(queueName1);messageFileManger.createQueueFiles(queueName2);}/*** description: 每个方法执行前的 收尾工作工作**/AfterEachvoid tearDown() throws IOException {// 删除两个队列文件messageFileManger.destroyQueueFiles(queueName1);messageFileManger.destroyQueueFiles(queueName2);}// Test// void init() {// // 功能待开发// }Testvoid createQueueFiles() {// 创建文件已经在上面 setUp 阶段执行过了, 此处主要是验证看看文件是否存在File queueDataFile1 new File(./data/ queueName1 /queue_data.txt);Assertions.assertEquals(true, queueDataFile1.isFile());File queueStatFile1 new File(./data/ queueName1 /queue_stat.txt);Assertions.assertEquals(true, queueStatFile1.isFile());File queueDataFile2 new File(./data/ queueName2 /queue_data.txt);Assertions.assertEquals(true, queueDataFile2.isFile());File queueStatFile2 new File(./data/ queueName2 /queue_stat.txt);Assertions.assertEquals(true, queueStatFile2.isFile());}Testvoid destroyQueueFiles() throws IOException {// 删除文件, 看看是否存在, 不存在才对File queueDataFile1 new File(./data/ queueName1 /queue_data.txt);Assertions.assertEquals(true, queueDataFile1.isFile());File queueStatFile1 new File(./data/ queueName1 /queue_stat.txt);Assertions.assertEquals(true, queueStatFile1.isFile());// messageFileManger.destroyQueueFiles(queueName1);// Assertions.assertEquals(false,queueDataFile1.isFile());// Assertions.assertEquals(false,queueStatFile1.isFile());File queueDataFile2 new File(./data/ queueName2 /queue_data.txt);Assertions.assertEquals(true, queueDataFile2.isFile());File queueStatFile2 new File(./data/ queueName2 /queue_stat.txt);Assertions.assertEquals(true, queueStatFile2.isFile());}Testvoid checkFilesExists() {// 在 setUp 阶段, 创建了两个队列, 此处只要判断接口返回的是否是 true 即可Assertions.assertEquals(true, messageFileManger.checkFilesExists(queueName1));Assertions.assertEquals(true, messageFileManger.checkFilesExists(queueName2));}private Message createTestMessage(String content) {return Message.createMessageWithId(testRoutingKey, null, content.getBytes());}private MSGQueue createTestMSGQueue(String queueName) {MSGQueue msgQueue new MSGQueue();msgQueue.setName(queueName);msgQueue.setDurable(true);msgQueue.setAutoDelete(false);msgQueue.setExclusive(false);return msgQueue;}Testvoid sendMessage() throws IOException, MqException, ClassNotFoundException {// 构造出消息, 并构造出队列Message message createTestMessage(testSendMessage);// 此处创建的 queue 对象的 name, 不能随便填写, 只能使用 queueName1 和 queueName2,// 需要保证这个队列对象对应的目录和文件都存在才行MSGQueue queue createTestMSGQueue(queueName1);// 调用发送信息方法messageFileManger.sendMessage(queue, message);// 检查 stat 文件MessageFileManger.Stat stat ReflectionTestUtils.invokeMethod(messageFileManger, readStat, queueName1);Assertions.assertEquals(1, stat.validCount);Assertions.assertEquals(1, stat.totalCount);// 检查 data 文件LinkedListMessage messages messageFileManger.loadAllMessageFromQueue(queueName1);Message newMessage messages.get(0);Assertions.assertEquals(true, newMessage.equals(message));System.out.println(message: newMessage);}Testvoid deleteMessage() throws IOException, MqException, ClassNotFoundException {// 此处创建的 queue 对象的 name, 不能随便填写, 只能使用 queueName1 和 queueName2,// 需要保证这个队列对象对应的目录和文件都存在才行MSGQueue queue createTestMSGQueue(queueName1);// 构造 10 条数据, 在进行删除ListMessage exceptedMessages new ArrayList();for (int i 0; i 10; i) {Message message createTestMessage(testMessage i);messageFileManger.sendMessage(queue,message);exceptedMessages.add(message);}// 删除数据messageFileManger.deleteMessage(queue,exceptedMessages.get(9));messageFileManger.deleteMessage(queue,exceptedMessages.get(8));// 判断数据是否正确LinkedListMessage actualMessages messageFileManger.loadAllMessageFromQueue(queueName1);Assertions.assertEquals(8,actualMessages.size());for (int i 0; i 8; i) {Message exceptedMessage exceptedMessages.get(i);Message actualMessage actualMessages.get(i);System.out.println(i : actualMessage);Assertions.assertEquals(exceptedMessage.getMessageId(),actualMessage.getMessageId());Assertions.assertArrayEquals(exceptedMessage.getBody(),actualMessage.getBody());Assertions.assertEquals(0x1,actualMessage.getIsValid());Assertions.assertEquals(exceptedMessage.getRoutingKey(),actualMessage.getRoutingKey());Assertions.assertEquals(exceptedMessage.getDeliverMode(),actualMessage.getDeliverMode());}}Testvoid loadAllMessageFromQueue() throws IOException, MqException, ClassNotFoundException {// 此处创建的 queue 对象的 name, 不能随便填写, 只能使用 queueName1 和 queueName2,// 需要保证这个队列对象对应的目录和文件都存在才行MSGQueue queue createTestMSGQueue(queueName1);// 构造 100 条数据ListMessage exceptedMessages new ArrayList();for (int i 0; i 100; i) {Message message createTestMessage(testMessage i);messageFileManger.sendMessage(queue,message);exceptedMessages.add(message);}// 读取所有数据, 看释放和原来的数据相同// 判断数据是否正确LinkedListMessage actualMessages messageFileManger.loadAllMessageFromQueue(queueName1);Assertions.assertEquals(100,actualMessages.size());for (int i 0; i 100; i) {Message exceptedMessage exceptedMessages.get(i);Message actualMessage actualMessages.get(i);System.out.println(i : actualMessage);Assertions.assertEquals(exceptedMessage.getMessageId(),actualMessage.getMessageId());Assertions.assertArrayEquals(exceptedMessage.getBody(),actualMessage.getBody());Assertions.assertEquals(0x1,actualMessage.getIsValid());Assertions.assertEquals(exceptedMessage.getRoutingKey(),actualMessage.getRoutingKey());Assertions.assertEquals(exceptedMessage.getDeliverMode(),actualMessage.getDeliverMode());}}Testvoid gc() throws IOException, MqException, ClassNotFoundException {// 此处创建的 queue 对象的 name, 不能随便填写, 只能使用 queueName1 和 queueName2,// 需要保证这个队列对象对应的目录和文件都存在才行MSGQueue queue createTestMSGQueue(queueName1);// 构造 100 条数据ListMessage exceptedMessages new ArrayList();for (int i 0; i 100; i) {Message message createTestMessage(testMessage i);messageFileManger.sendMessage(queue,message);exceptedMessages.add(message);}// 保存 gc 前文件的大小File file new File(./data/ queueName1 /queue_data.txt);long beforeGC file.length();// 删除偶数下标的数据for (int i 0; i 100; i2) {messageFileManger.deleteMessage(queue,exceptedMessages.get(i));}// 手动调用 gcmessageFileManger.gc(queue);// 读取所有数据LinkedListMessage actualMessages messageFileManger.loadAllMessageFromQueue(queueName1);Assertions.assertEquals(50,actualMessages.size());for (int i 0; i 50; i) {Message exceptedMessage exceptedMessages.get(i * 2 1);Message actualMessage actualMessages.get(i);System.out.println(i : actualMessage);Assertions.assertEquals(exceptedMessage.getMessageId(),actualMessage.getMessageId());Assertions.assertArrayEquals(exceptedMessage.getBody(),actualMessage.getBody());Assertions.assertEquals(0x1,actualMessage.getIsValid());Assertions.assertEquals(exceptedMessage.getRoutingKey(),actualMessage.getRoutingKey());Assertions.assertEquals(exceptedMessage.getDeliverMode(),actualMessage.getDeliverMode());}File file1 new File(./data/ queueName1 /queue_data.txt);long afterGC file1.length();Assertions.assertEquals(true,afterGC beforeGC);}
}⼋、 整合数据库和⽂件
上述代码中, 使⽤数据库存储了 Exchange, Queue, Binding, 使⽤⽂本⽂件存储了 Message.
接下来我们把两个部分整合起来, 统⼀进⾏磁盘管理.
创建 DiskDataCenter
使⽤ DiskDataCenter 来综合管理数据库和⽂本⽂件的内容.
DiskDataCenter 会持有 DataBaseManager 和 MessageFileManager 对象.
/*** Created with IntelliJ IDEA.* Description封装访问磁盘数据操作: 数据库 文件* 1. 数据库: 交换机, 消息队列, 绑定* 2. 文件: 消息* 上层逻辑如果需要访问磁盘, 就直接调用这个类, 不需要知道下层访问的是数据库还是文件** author: zxj* date: 2024-02-28* time: 21:28:00*/
public class DiskDataCenter {private DatabaseManager databaseManager new DatabaseManager();private MessageFileManger messageFileManger new MessageFileManger();public void init() {databaseManager.init();messageFileManger.init();}
}封装 Exchange ⽅法 /** 封装交换机操作*/public Integer insertExchange(Exchange exchange) {return databaseManager.insertExchange(exchange);}public Integer deleteExchangeByName(String name) {return databaseManager.deleteExchangeByName(name);}public ListExchange selectAllExchanges() {return databaseManager.selectAllExchanges();}封装 Queue ⽅法 /** 封装消息队列操作*/public Integer insertMSGQueue(MSGQueue msgQueue) throws IOException {Integer ret databaseManager.insertMSGQueue(msgQueue);messageFileManger.createQueueFiles(msgQueue.getName());return ret;}public Integer deleteMSGQueueByName(String name) throws IOException {Integer ret databaseManager.deleteMSGQueueByName(name);messageFileManger.destroyQueueFiles(name);return ret;}public ListMSGQueue selectAllMSGQueues() {return databaseManager.selectAllMSGQueues();}封装 Binding 方法
/** 封装绑定机操作*/public Integer insertBinding(Binding binding) {return databaseManager.insertBinding(binding);}public Integer deleteBinding(String exchangeName, String queueName) {return databaseManager.deleteBinding(exchangeName, queueName);}public ListBinding selectAllBindings() {return databaseManager.selectAllBindings();}封装 Message ⽅法
/** 封装消息操作*/public void createQueueFiles(String queueName) throws IOException {messageFileManger.createQueueFiles(queueName);}public void destroyQueueFiles(String queueName) throws IOException {messageFileManger.destroyQueueFiles(queueName);}public boolean checkFilesExists(String queueName) {return messageFileManger.checkFilesExists(queueName);}public void sendMessage(NotNull MSGQueue msgQueue, Message message) throws MqException, IOException {messageFileManger.sendMessage(msgQueue, message);}public void deleteMessage(NotNull MSGQueue msgQueue, NotNull Message message) throws IOException, ClassNotFoundException, MqException {messageFileManger.deleteMessage(msgQueue, message);// 删除一条信息之后, 判断是否需要 gcif (messageFileManger.checkGC(msgQueue.getName())) {messageFileManger.gc(msgQueue);}}public LinkedListMessage loadAllMessageFromQueue(String queueName) throws IOException, MqException, ClassNotFoundException {return messageFileManger.loadAllMessageFromQueue(queueName);}在 deleteMessage 的时候判定是否进⾏ GC.
⼩结
通过上述封装, 把数据库和硬盘⽂件两部分合并成⼀个整体. 上层代码在调⽤的时候则不再关⼼该数据是存储在哪个部分的.
九、 内存数据结构设计
硬盘上存储数据, 只是为了实现 “持久化” 这样的效果. 但是实际的消息存储/转发, 还是主要靠内存的结构.
对于 MQ 来说, 内存部分是更关键的, 内存速度更快, 可以达成更⾼的并发.
创建 MemoryDataCenter
创建 mqserver.datacenter.MemoryDataCenter
/*** Created with IntelliJ IDEA.* Description内存数据管理类 -- 实际消息转发/存储的类* 该类后续提供的一些方法, 可能会在多线程环境下使用, 因此需要注意线程安全的问题** author: zxj* date: 2024-02-29* time: 20:58:38*/
Slf4j
public class MemoryDataCenter {// key: 为 exchangeName, value: Exchange 对象private final ConcurrentHashMapString, Exchange exchangesMap new ConcurrentHashMap();// key: 为 messageId, value: Message 对象private final ConcurrentHashMapString, Message messagesMap new ConcurrentHashMap();// key: 为 exchangeName, value: Exchange 对象private final ConcurrentHashMapString, MSGQueue msgQueuesMap new ConcurrentHashMap();// key1: exchangeName, key2: msgQueueName, value: Binding 对象private final ConcurrentHashMapString, ConcurrentHashMapString, Binding bindingsMap new ConcurrentHashMap();}使用四个哈希表, 管理 Exchange, Queue, Binding, Message使用一个哈希表 链表管理队列 - 消息之间的关系使用一个哈希表 哈希表管理所有的未被确认的消息 为了保证消息被正确消费了, 会使用两种方式进行确认, 自动 Ack, 和 手动 Ack 其中自动 Ack 是指当消息被消费之后, 就会立即被销毁释放 其中手动 Ack 是指当消息被消费之后, 由消费者主动调用一个 basicAck方法, 进行主动确认, 服务器收到这个确认之后, 才能真正被销毁消息 此处的 “未确认消息” 就是指在手动Ack模式下, 该消息还没有被调用 basicAck, 此时消息不能被删除, 但是要和其他未消费的消息区分开, 于是另搞个结构 当后续basicAck到了, 就可以删除消息了 封装 Exchange ⽅法
/*** 封装 Exchange 操作**/public void insertExchange(Exchange exchange) {exchangesMap.put(exchange.getName(), exchange);log.info(新交换机添加成功! exchangeName: {}, exchange.getName());}public Exchange getExchange(String exchangeName) {return exchangesMap.get(exchangeName);}public void deleteExchange(String exchangeName) {exchangesMap.remove(exchangeName);log.info(删除交换机成功! exchangeName: {}, exchangeName);}封装 Queue ⽅法
/*** 封装 MSGQueue 操作**/public void insertMSGQueue(MSGQueue msgQueue) {msgQueuesMap.put(msgQueue.getName(), msgQueue);log.info(新交换机添加成功! msgQueueName: {}, msgQueue.getName());}public MSGQueue getMSGQueue(String msgQueueName) {return msgQueuesMap.get(msgQueueName);}public void deleteMSGQueue(String msgQueueName) {msgQueuesMap.remove(msgQueueName);log.info(删除交换机成功! msgQueueName: {}, msgQueueName);}
封装 Binding ⽅法
/*** 封装 Binding 操作**/public void insetBinding(Binding binding) throws MqException {// 传统的创建 Map 的步骤, 因为不是原子性操作, 存在线程安全的问题, 为了保证线程安全, 可以加上 synchronized 加锁// ConcurrentHashMapString, Binding stringBindingConcurrentHashMap bindingsMap.get(binding.getExchangeName()) ;// if (stringBindingConcurrentHashMap null) {// stringBindingConcurrentHashMap new ConcurrentHashMap();// bindingsMap.put(binding.getExchangeName(),stringBindingConcurrentHashMap);// }// ConcurrentHashMap 中有提供了 computeIfAbsent 方法, 简化了上述步骤, 并且是线程安全的 --// 先使用 exchangeName 查询一下, 如果存在就直接返回, 如果不存在就创建ConcurrentHashMapString, Binding stringBindingConcurrentHashMap bindingsMap.computeIfAbsent(binding.getExchangeName(), (k) - {return new ConcurrentHashMap();});synchronized (stringBindingConcurrentHashMap) {// 这里先查询在插入, 具有强的顺序关系, 数据存在二次覆盖, 存在线程安全的问题// 在根据 msgQueueName 查找, 如果存在, 就直接抛异常, 不存在才能插入if (stringBindingConcurrentHashMap.get(binding.getQueueName()) ! null) {throw new MqException([MemoryDataCenter] 绑定已经存在, exchangeName: binding.getExchangeName() ; msgQueueName: binding.getQueueName());}stringBindingConcurrentHashMap.put(binding.getQueueName(), binding);}log.info(绑定添加成功成功, binding.exchangeName: {}, binding.queueName: {},, binding.getExchangeName(), binding.getQueueName());}// 获取绑定// 1. 依据 exchangeName 和 queueName 获取唯一的 bindingpublic Binding getBinding(String exchangeName, String queueName) {ConcurrentHashMapString, Binding bindingMap bindingsMap.get(exchangeName);if (bindingMap null) {return null;}return bindingMap.get(queueName);}// 2. 依据 exchangeName 获取所有的 bindingpublic ConcurrentHashMapString, Binding getBindings(String exchangeName) {return bindingsMap.get(exchangeName);}public void deleteBinding(Binding binding) throws MqException {ConcurrentHashMapString, Binding bindingMap bindingsMap.get(binding.getExchangeName());if (bindingMap null) {// 该交换机没有绑定任何队列, 报错throw new MqException([MemoryDataCenter] 绑定不存在! binding: binding);}bindingsMap.remove(binding.getExchangeName());log.info(删除绑定成功! binding: {}, binding);}
封装 Message ⽅法
/*** 封装信息操作**/// 添加信息public void addMessage(Message message) {messagesMap.put(message.getMessageId(), message);log.info(添加信息成功! messageId: {}, message.getMessageId());}// 依据 Id 查询信息public Message getMessage(String messageId) {return messagesMap.get(messageId);}// 依据 Id 删除信息public void deleteMessage(String messageId) {messagesMap.remove(messageId);log.info(消息被删除! messageId: {}, messageId);}// 发送消息到指定队列public void sendMessage(MSGQueue queue, Message message) {// 把消息放到对应的队列数据结构中// 先根据队列的名字, 找到该队列对应的消息链表LinkedListMessage messages queueMessageMap.computeIfAbsent(queue.getName(), (k) - {return new LinkedList();});// 把数据假如到 messages 里面synchronized (messages) {messages.add(message);}// 在这里把该消息也往消息中心中插入一下, 假设如果 message 已经在消息中心存在, 重复插入也没有关系// 主要就是相同 messageId, 对应的 message 的内容一定是一样的. (服务器不会对 Message 内容修改 basicProperties 和 body)addMessage(message);log.info(消息被投递到队列中! messageId message.getMessageId());}// 从队列中取消息public Message pollMessage(String queueName) {// 根据队列名, 查找一下, 对应的队列的消息链表LinkedListMessage messages queueMessageMap.get(queueName);// 为空if (messages null) {return null;}synchronized (messages) {// 队列中没有任何消息if (messages.isEmpty()) {return null;}// 链表中有元素, 就进行头删Message currentMessage messages.remove(0);log.info(从消息从队列中取出! messageId: {}, currentMessage.getMessageId());return currentMessage;}}// 获取指定队列中消息的个数public int getMessageCount(String queueName) {// 根据队列名, 查找一下, 对应的队列的消息链表LinkedListMessage messages queueMessageMap.get(queueName);if (messages null) {return 0;}synchronized (messages) {return messages.size();}}针对未确认的消息的处理
// 添加未确认的消息public void addMessageWaitAck(String queueName, Message message) {ConcurrentHashMapString, Message messageHashMap queueMessageWaitAck.computeIfAbsent(queueName, (k) - {return new ConcurrentHashMap();});messageHashMap.put(message.getMessageId(), message);log.info(消息进入待确认队列! messageId: {}, message.getMessageId());}// 删除未确认的消息(消息已经确认了)public void removeMessageWaitAck(String queueName, String messageId) {ConcurrentHashMapString, Message messageHashMap queueMessageWaitAck.computeIfAbsent(queueName, (k) - {return new ConcurrentHashMap();});messageHashMap.remove(messageId);log.info(消息从待确认队列中删除! messageId: {}, messageId);}// 获取指定的未确认的信息public Message getMessageWaitAck(String queueName, String messageId) {ConcurrentHashMapString, Message messageConcurrentHashMap queueMessageWaitAck.get(queueName);if (messageConcurrentHashMap null) {return null;}return messageConcurrentHashMap.get(messageId);}
实现重启后恢复内存
// 这个方法就是从硬盘上读取数据, 把硬盘中之前持久化存储的各个维度的数据都恢复到内存中 -- 交换机, 消息队列, 绑定, 消息public void recovery(DiskDataCenter diskDataCenter) throws MqException, IOException, ClassNotFoundException {// 0. 清空之前的所有数据exchangesMap.clear();msgQueuesMap.clear();bindingsMap.clear();messagesMap.clear();queueMessageMap.clear();// 1. 恢复所有的交换机数据ListExchange exchanges diskDataCenter.selectAllExchanges();for (Exchange exchange : exchanges) {exchangesMap.put(exchange.getName(), exchange);}log.info(恢复所有的 交换机 数据成功!);// 2. 恢复所有的队列数据ListMSGQueue msgQueues diskDataCenter.selectAllMSGQueues();for (MSGQueue msgQueue : msgQueues) {msgQueuesMap.put(msgQueue.getName(), msgQueue);}log.info(恢复所有的 队列 数据成功!);// 3. 恢复所有的绑定数据ListBinding bindings diskDataCenter.selectAllBindings();for (Binding binding : bindings) {ConcurrentHashMapString, Binding bindingMap bindingsMap.computeIfAbsent(binding.getExchangeName(), (k) - {return new ConcurrentHashMap();});bindingMap.put(binding.getQueueName(), binding);}log.info(恢复所有的 绑定 数据成功!);// 4. 恢复所有的消息队列// 遍历所有的队列, 根据每个队列的名字, 获取到所有的消息for (MSGQueue msgQueue : msgQueues) {LinkedListMessage messages diskDataCenter.loadAllMessageFromQueue(msgQueue.getName());queueMessageMap.put(msgQueue.getName(), messages);for (Message message : messages) {messagesMap.put(message.getMessageId(), message);}}log.info(恢复所有的 消息队列 成功!);log.info(从磁盘中恢复所有数据到内存成功);// 规定:// 针对 未确认的消息 这部分内存中的数据, 不需要从硬盘恢复, 之前考虑硬盘存储的时候, 也没有设定这一块// 这个消息在硬盘上存储的时候, 就是当做 为被取走}
测试 MemoryDataCenter
package en.edu.zxj.mq.mqserver.datacenter;import en.edu.zxj.mq.MqApplication;
import en.edu.zxj.mq.common.MqException;
import en.edu.zxj.mq.mqserver.core.*;
import org.apache.tomcat.util.http.fileupload.FileUtils;
import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.test.context.SpringBootTest;import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;/*** Created with IntelliJ IDEA.* Description** author: zxj* date: 2024-02-29* time: 23:07:51*/
SpringBootTest
class MemoryDataCenterTest {private MemoryDataCenter memoryDataCenter;BeforeEachpublic void setUp() {memoryDataCenter new MemoryDataCenter();}AfterEachpublic void tearDown() {memoryDataCenter null;}/*** 创建测试 消息 对象**/private NotNull Message createTestMessage(NotNull String content) {return Message.createMessageWithId(testRoutingKey, null, content.getBytes());}/*** 创建 消息队列 对象**/private NotNull MSGQueue createTestMSGQueue(String queueName) {MSGQueue msgQueue new MSGQueue();msgQueue.setName(queueName);msgQueue.setDurable(true);msgQueue.setAutoDelete(false);msgQueue.setExclusive(false);return msgQueue;}/*** 创建 绑定 对象**/private NotNull Binding createTestBinding(String exchangeName, String msgQueueName) {Binding binding new Binding();binding.setExchangeName(exchangeName);binding.setQueueName(msgQueueName);binding.setBindingKey(Hello word);return binding;}/*** 创建交换机对象**/org.jetbrains.annotations.NotNullprivate Exchange createTestExchange(String exchangeName) {Exchange exchange new Exchange();exchange.setName(exchangeName);exchange.setType(ExchangeType.FANOUT);exchange.setDurable(true);exchange.setAutoDelete(false);exchange.setArguments(11, aa);exchange.setArguments(22, bb);return exchange;}Testvoid testExchange() {// 对 交换机 相关操作统一测试Exchange exceptedExchange createTestExchange(exchangeTest);// 1. 插入操作memoryDataCenter.insertExchange(exceptedExchange);// 2. 查找操作Exchange actualExchange memoryDataCenter.getExchange(exceptedExchange.getName());// 此时 exceptedExchange 和 actualExchange 应该指向同一个空间才对Assertions.assertEquals(exceptedExchange, actualExchange);// 3. 删除操作memoryDataCenter.deleteExchange(exceptedExchange.getName());actualExchange memoryDataCenter.getExchange(exceptedExchange.getName());// 判断是否为空Assertions.assertNull(actualExchange);}Testvoid testMSGQueue() {// 对 消息队列 相关操作统一测试MSGQueue exceptedMSGQueue createTestMSGQueue(testMSGQueue);// 1. 插入操作memoryDataCenter.insertMSGQueue(exceptedMSGQueue);// 2. 查找操作MSGQueue actualMSGQueue memoryDataCenter.getMSGQueue(exceptedMSGQueue.getName());Assertions.assertEquals(exceptedMSGQueue, actualMSGQueue);// 3. 删除操作memoryDataCenter.deleteMSGQueue(exceptedMSGQueue.getName());actualMSGQueue memoryDataCenter.getMSGQueue(exceptedMSGQueue.getName());// 判断是否为空Assertions.assertNull(actualMSGQueue);}Testvoid testBinding() throws MqException {// 对 绑定 相关操作的统一测试Binding exceptedBinding createTestBinding(testExchangeName, testMSGQueueName);// 1. 插入操作memoryDataCenter.insetBinding(exceptedBinding);// 2. 查找操作Binding actrualBinding memoryDataCenter.getBinding(exceptedBinding.getExchangeName(), exceptedBinding.getQueueName());Assertions.assertEquals(exceptedBinding, actrualBinding);ConcurrentHashMapString, Binding bindings memoryDataCenter.getBindings(exceptedBinding.getExchangeName());Assertions.assertEquals(1, bindings.size());Assertions.assertEquals(exceptedBinding, bindings.get(exceptedBinding.getQueueName()));// 3. 删除操作memoryDataCenter.deleteBinding(exceptedBinding);actrualBinding memoryDataCenter.getBinding(exceptedBinding.getExchangeName(), exceptedBinding.getQueueName());bindings memoryDataCenter.getBindings(exceptedBinding.getExchangeName());// 判断是否为空Assertions.assertNull(actrualBinding);Assertions.assertNull(bindings);}Testvoid testMessage() {// 测试 消息相关的增删查 操作Message exceptedMessage createTestMessage(testMessage);// 1. 插入操作memoryDataCenter.addMessage(exceptedMessage);// 2. 查找操作Message actualMessage memoryDataCenter.getMessage(exceptedMessage.getMessageId());Assertions.assertEquals(exceptedMessage, actualMessage);// 3. 删除操作memoryDataCenter.deleteMessage(exceptedMessage.getMessageId());actualMessage memoryDataCenter.getMessage(exceptedMessage.getMessageId());Assertions.assertNull(actualMessage);}Testvoid sendMessage() {// 1. 创建一个队列, 创建十条消息, 把这些消息都插入到队列中MSGQueue queue createTestMSGQueue(testQueue);ListMessage exceptedMessages new ArrayList();for (int i 0; i 10; i) {Message message createTestMessage(testMessage i);memoryDataCenter.sendMessage(queue, message);exceptedMessages.add(message);}// 2. 从这个队列中取出这些消息ListMessage actualMessages new ArrayList();while (true) {Message message memoryDataCenter.pollMessage(queue.getName());if (message null) {break;}actualMessages.add(message);}// 3. 比较取出的消息和之前的消息是否是一致的Assertions.assertEquals(exceptedMessages.size(), actualMessages.size());for (int i 0; i actualMessages.size(); i) {Assertions.assertEquals(exceptedMessages.get(i), actualMessages.get(i));}}Testvoid testMessageWaitAck() {// 测试 消息相关的增删查 操作Message exceptedMessage createTestMessage(testMessage);// 1. 插入操作memoryDataCenter.addMessageWaitAck(testQueueName, exceptedMessage);// 2. 查找操作Message actualMessage memoryDataCenter.getMessageWaitAck(testQueueName, exceptedMessage.getMessageId());Assertions.assertEquals(exceptedMessage, actualMessage);// 3. 删除操作memoryDataCenter.removeMessageWaitAck(testQueueName, exceptedMessage.getMessageId());actualMessage memoryDataCenter.getMessageWaitAck(testQueueName, exceptedMessage.getMessageId());Assertions.assertNull(actualMessage);}Testvoid recovery() throws IOException, MqException, ClassNotFoundException {MqApplication.context SpringApplication.run(MqApplication.class);// 1. 在硬盘上构造好数据DiskDataCenter diskDataCenter new DiskDataCenter();diskDataCenter.init();// 构造交换机Exchange exceptedExchange createTestExchange(testExchangeName);diskDataCenter.insertExchange(exceptedExchange);// 构造消息队列MSGQueue exceptedMSGQueue createTestMSGQueue(testQueueName);diskDataCenter.insertMSGQueue(exceptedMSGQueue);// 构造绑定Binding exceptedBinding createTestBinding(testExchangeName, testMSGQueueName);diskDataCenter.insertBinding(exceptedBinding);// 构造消息Message exceptedMessage createTestMessage(testContent);diskDataCenter.sendMessage(exceptedMSGQueue, exceptedMessage);// 2. 执行恢复操作memoryDataCenter.recovery(diskDataCenter);// 3. 对比结果Exchange actualExchangeName memoryDataCenter.getExchange(testExchangeName);Assertions.assertEquals(exceptedExchange.getName(), actualExchangeName.getName());Assertions.assertEquals(exceptedExchange.getType(), actualExchangeName.getType());Assertions.assertEquals(exceptedExchange.getDurable(), actualExchangeName.getDurable());Assertions.assertEquals(exceptedExchange.getAutoDelete(), actualExchangeName.getAutoDelete());MSGQueue actualMSGQueue memoryDataCenter.getMSGQueue(testQueueName);Assertions.assertEquals(exceptedMSGQueue.getName(), actualMSGQueue.getName());Assertions.assertEquals(exceptedMSGQueue.getExclusive(), actualMSGQueue.getExclusive());Assertions.assertEquals(exceptedMSGQueue.getAutoDelete(), actualMSGQueue.getAutoDelete());Assertions.assertEquals(exceptedMSGQueue.getDurable(), actualMSGQueue.getDurable());Binding actualBinding memoryDataCenter.getBinding(testExchangeName, testMSGQueueName);Assertions.assertEquals(exceptedBinding.getExchangeName(), actualBinding.getExchangeName());Assertions.assertEquals(exceptedBinding.getBindingKey(), actualBinding.getBindingKey());Assertions.assertEquals(exceptedBinding.getQueueName(), actualBinding.getQueueName());Message actualMessage memoryDataCenter.pollMessage(testQueueName);Assertions.assertEquals(exceptedMessage.getMessageId(), actualMessage.getMessageId());Assertions.assertEquals(exceptedMessage.getRoutingKey(), actualMessage.getRoutingKey());Assertions.assertEquals(exceptedMessage.getIsValid(), actualMessage.getIsValid());Assertions.assertEquals(exceptedMessage.getDeliverMode(), actualMessage.getDeliverMode());Assertions.assertArrayEquals(exceptedMessage.getBody(), actualMessage.getBody());// 4. 清理磁盘MqApplication.context.close();File dataDir new File(./data);FileUtils.deleteDirectory(dataDir);}
}⼗、 虚拟主机设计
⾄此, 内存和硬盘的数据都已经组织完成. 接下来使⽤ “虚拟主机” 这个概念, 把这两部分的数据也串起来
并且实现⼀些 MQ 的关键 API.
创建 VirtualHost
创建 mqserver.VirtualHost
/*** Created with IntelliJ IDEA.* Description通过这个类, 来标识虚拟主机* 每个虚拟主机下面都管理着自己的 交换机, 队列, 绑定, 消息, 数据* 同时提供 API 供上层调用* 针对 VirtualHost 这个类, 作为业务逻辑的整合者, 就需要对于代码中抛出的异常进行处理了** author: zxj* date: 2024-03-01* time: 19:23:34*/
Getter
Slf4j
public class VirtualHost {private final String virtualHostName;private final DiskDataCenter diskDataCenter new DiskDataCenter();private final MemoryDataCenter memoryDataCenter new MemoryDataCenter();// Router 定义转发规则private final Router router new Router();// ConsumerManager 实现消费逻辑private final ConsumerManager consumerManager new ConsumerManager(this);
}其中 Router ⽤来定义转发规则, ConsumerManager ⽤来实现消息消费. 后续介绍
实现构造⽅法和 getter
构造⽅法中会针对 DiskDataCenter 和 MemoryDataCenter 进⾏初始化.
同时会把硬盘的数据恢复到内存中. public VirtualHost(String virtualHostName) {this.virtualHostName virtualHostName;// 先初始化硬盘操作diskDataCenter.init();// 后初始化内存操作memoryDataCenter.init();// 从磁盘中恢复数据到内存中try {memoryDataCenter.recovery(diskDataCenter);} catch (Exception e) {log.error(从磁盘中恢复数据到内存失败!);e.printStackTrace();return;}log.info(初始化 VirtualHost 成功, VirtualHostName: {}, virtualHostName);}创建交换机
约定, 交换机 / 队列的名字, 都加上 VirtualHostName 作为前置, 这样不同 VirtualHost 中就可以存在同名的交换机或者队列了exchangeDeclare 的语义是, 不存在就创建, 存在则直接返回, 因此不叫做exchangeCreate先写硬盘, 后写内存, 因为硬盘失败概率更大, 如果硬盘写失败了,也就不必写内存了
/*** description: 创建交换机, declare 表示存在就不创建, 因此不叫做 exchangeCreate* 此处的 autoDelete, arguments 其实并没有使用, 只是先预留出来. (RabbitMQ 是支持的)* 约定, 交换机/队列的名字, 都加上 VirtualHostName 作为前缀, 这样不同 VirtualHost 中就可以存在同名的交换机或者队列了* 先写磁盘, 后写内存, 因为写磁盘失败概率更大, 如果磁盘写失败了, 也就不必要写内存了* param: [exchangeName, exchangeType, durable, autoDelete, arguments]* return: 抛异常就返回 false, 正常执行逻辑就返回 true**/public boolean exchangeDeclare(String exchangeName, ExchangeType exchangeType, boolean durable, boolean autoDelete, MapString, Object arguments) {// 真实的 exchangeName 要加上 virtualHostName 前缀exchangeName virtualHostName exchangeName;synchronized (lockerExchange) {try {// 1. 判断交换机是否存在Exchange exsitsExchange memoryDataCenter.getExchange(exchangeName);if (exsitsExchange ! null) {log.info(交换机已经存在, exchangeName: {}, exchangeType: {}, exchangeName, exchangeType);return true;}// 2. 构造 Exchange 对象Exchange exchange new Exchange();exchange.setName(exchangeName);exchange.setType(exchangeType);exchange.setDurable(durable);exchange.setAutoDelete(autoDelete);exchange.setArguments(arguments);// 3. 先写入磁盘if (exchange.getDurable()) {diskDataCenter.insertExchange(exchange);}// 4. 后写入内存memoryDataCenter.insertExchange(exchange);log.info(交换机创建成功, exchangeName: {}, exchangeType: {}, exchangeName, exchangeType);return true;} catch (Exception e) {log.warn(创建交换机失败, exchangeName: {}, exchangeType: {}, exchangeName, exchangeType);e.printStackTrace();return false;}}}删除交换机
/*** description: 删除交换机**/public boolean exchangeDelete(String exchangeName) {// 真正存储的 exchangeNameexchangeName virtualHostName exchangeName;synchronized (lockerExchange) {try {// 1. 判断交换机是否存在Exchange exsitsExchange memoryDataCenter.getExchange(exchangeName);if (exsitsExchange null) {throw new MqException(交换机不存在, 无法删除! exchangeName; exchangeName);}// 2. 删除磁盘中的交换机diskDataCenter.deleteExchangeByName(exchangeName);// 3. 删除内存中的交换机memoryDataCenter.deleteExchange(exchangeName);log.info(删除交换机成功, exchangeName: {},, exchangeName);return true;} catch (Exception e) {log.warn(删除交换机失败, exchangeName: {},, exchangeName);e.printStackTrace();return false;}}}创建队列
/*** description: 创建队列**/public boolean queueDeclare(String queueName, boolean durable, boolean exclusive, boolean autoDelete, MapString, Object arguments) {// 真实的 queueNamequeueName virtualHostName queueName;synchronized (lockerQueue) {try {// 1. 判断队列是否存在MSGQueue existsQueue memoryDataCenter.getMSGQueue(queueName);if (existsQueue ! null) {log.info(队列已经存在, queueName: {}, queueName);return true;}// 2. 创建队列MSGQueue queue new MSGQueue();queue.setName(queueName);queue.setDurable(durable);queue.setExclusive(exclusive);queue.setAutoDelete(autoDelete);queue.setArguments(arguments);// 3. 先存入磁盘if (queue.getDurable()) {diskDataCenter.insertMSGQueue(queue);}// 4. 后存入到内存memoryDataCenter.insertMSGQueue(queue);log.info(队列创建成功, queueName: {}, queueName);return true;} catch (Exception e) {log.warn(创建队列失败, queueName: {}, queueName);e.printStackTrace();return false;}}}
删除队列
/*** description: 删除队列**/public boolean queueDelete(String queueName) {queueName virtualHostName queueName;synchronized (lockerQueue) {try {// 1. 判断队列是否存在MSGQueue existsQueue memoryDataCenter.getMSGQueue(queueName);if (existsQueue null) {throw new MqException(要删除的队列不存在, 无法删除! queueName queueName);}// 2. 删除磁盘中的队列if (existsQueue.getDurable()) {diskDataCenter.deleteMSGQueueByName(queueName);}// 3. 删除内存中的队列memoryDataCenter.deleteMSGQueue(queueName);log.info(删除队列成功, queueName: {}, queueName);return true;} catch (Exception e) {log.warn(删除队列失败, queueName: {}, queueName);e.printStackTrace();return false;}}}
创建绑定
bindingKey 是进⾏ topic 转发时的⼀个关键概念. 使⽤ router 类来检测是否是合法的 bindingKey.后续再介绍 router.checkBindingKeyValid 的实现. 此处先留空
/*** description: 添加绑定**/public boolean queueBind(String queueName, String exchangeName, String bindingKey) {// 加上 virtualHostName 前缀queueName virtualHostName queueName;exchangeName virtualHostName exchangeName;synchronized (lockerQueue) {synchronized (lockerExchange) {try {// 1. 判断绑定是否存在Binding existedBinding memoryDataCenter.getBinding(exchangeName, queueName);if (existedBinding ! null) {log.info(绑定存在, queueName: {}, exchangeName: {}, bindingKey: {}, queueName, exchangeName, bindingKey);return true;}// 2. 判断 bindingKey 是否合法if (!router.checkBindingKeyValid(bindingKey)) {throw new MqException(bindingKey 不合法! bindingKey: bindingKey);}// 3. 创建绑定Binding binding new Binding();binding.setExchangeName(exchangeName);binding.setQueueName(queueName);binding.setBindingKey(bindingKey);// 4. 获取绑定对应的队列和交换机, 判断这两个是否存在, 都存在才能创建MSGQueue msgQueue memoryDataCenter.getMSGQueue(queueName);if (msgQueue null) {throw new MqException(队列不存在, queueName: queueName);}Exchange exchange memoryDataCenter.getExchange(exchangeName);if (exchange null) {throw new MqException(交换机不存在, exchangeName: exchangeName);}// 5. 写入磁盘if (msgQueue.getDurable() exchange.getDurable()) {diskDataCenter.insertBinding(binding);}// 6. 写入内存memoryDataCenter.insetBinding(binding);log.info(添加绑定成功, queueName: {}, exchangeName: {}, bindingKey: {}, queueName, exchangeName, bindingKey);return true;} catch (Exception e) {log.warn(添加绑定失败, queueName: {}, exchangeName: {}, bindingKey: {}, queueName, exchangeName, bindingKey);e.printStackTrace();return false;}}}}删除绑定
/*** description: 删除绑定**/public boolean queueUnBind(String queueName, String exchangeName) {// 加上 virtualHostName 前缀queueName virtualHostName queueName;exchangeName virtualHostName exchangeName;synchronized (lockerExchange) {synchronized (lockerQueue) {try {// 1. 判断绑定是否存在Binding existedBinding memoryDataCenter.getBinding(exchangeName, queueName);if (existedBinding null) {throw new MqException(要删除的绑定不存在, 无法删除, exchangeName: exchangeName , queueName: queueName);}// 2. 无论绑定是否持久化了, 都试着删除一下磁盘中的数据, 影响不大diskDataCenter.deleteBinding(exchangeName, queueName);// 3. 删除内存memoryDataCenter.deleteBinding(existedBinding);log.info(删除绑定成功, queueName: {}, exchangeName: {}, queueName, exchangeName);return true;} catch (Exception e) {log.warn(删除绑定失败, queueName: {}, exchangeName: {}, queueName, exchangeName);e.printStackTrace();return false;}}}}
发布消息
发布消息其实是吧消息发送给指定的Exchange, 在根据 Exchange 和 Queue 的binding关系, 转发到对应队列中发送消息需要指定 routingKey, 这个值的作用和 ExchangeType是相关的 Direct: routingKey 就是对应的队列名字, 此时不需要binding关系, 也不需要bindingKey,就可以直接转发消息Fanout: routingKey 不起作用, bindingKey 也不起作用, 此时消息会转发给绑定到该交换机上的所有队列中Topic: routingKey 是一个特定的字符串, 会和bindingKey进行匹配, 如果匹配成功, 则发到对应的队列中, 具体规则后续介绍 BasicProperties 是消息的元消息, body是消息本体
/*** description: 发布消息* 发布消息其实就是把消息发送给指定的exchange, 再根据 Exchange 和 Queue 的 Binding 关系, 转发到对应队列中* 发送消息需要指定 routingKey, 这个值的作用和 ExchangeType 相关的* Direct: routingKey 就是对应的队列名字, 此时不需要 binding 关系, 也不需要 bindingKey, 就可以直接转发消息* Fanout: routingKey 不起作用, bindingKey 也不起作用, 此时消息会转发给绑定该交换机上的所有队列中* Topic: routingKey 是一个特定的字符串, 会和 bindingKey 按照一定规则进行匹配, 如果匹配成功, 则发送到对应的队列中, 具体规则在 Router 类中介绍* param: [exchangeName, routingKey, basicProperties 消息的元消息, body 消息本体]* return:**/public boolean basicPublish(String exchangeName, String routingKey, BasicProperties basicProperties, byte[] body) {try {// 1. 转换交换机的名字, 如果为 null, 就使用默认的交换机名字if (exchangeName null) {exchangeName ;}exchangeName virtualHostName exchangeName;// 2. 检验 routingKey 的合法性if (!router.checkRoutingKeyValid(routingKey)) {throw new MqException(routingKey 非法! routingKey: routingKey);}// 3. 查找交换机对象Exchange exchange memoryDataCenter.getExchange(exchangeName);if (exchange null) {throw new MqException(交换机不存在! exchangeName: exchangeName);}// 4. 依据交换机的类型来进行消息转发if (exchange.getType() ExchangeType.DIRECT) {// 按照直接交换机的方式来转发消息// 此时 routingKey 作为队列的名字, 直接把消息写入指定的队里中, 此时可以无视绑定关系String queueName virtualHostName routingKey;// 5. 构造消息对象Message message Message.createMessageWithId(routingKey, basicProperties, body);// 6. 找找队列名字对应的对象MSGQueue msgQueue memoryDataCenter.getMSGQueue(queueName);if (msgQueue null) {throw new MqException(队列不存在, queueName queueName);}// 7. 队列存在, 直接给队列中写入消息 -- 执行一次方法就消费一次消息sendMessage(msgQueue, message);} else {// 按照 fanout 和 topic 的方式来转发// 5. 找到该交换机关联的所有绑定, 并遍历这些绑定对象ConcurrentHashMapString, Binding bindingsMap memoryDataCenter.getBindings(exchangeName);for (Map.EntryString, Binding entry : bindingsMap.entrySet()) {// ① 获取到绑定对象, 判断对应的队列是否存在Binding binding entry.getValue();MSGQueue msgQueue memoryDataCenter.getMSGQueue(binding.getQueueName());if (msgQueue null) {// 此处就不抛异常, 可能此处有多个这样的队列// 希望不要因为一个队列的失败, 影响到其他队列的消息的传输log.warn(basicPublish 发送消息是, 发现队列不存在! queueName: {}, binding.getQueueName());continue;}// ② 构造消息对象, 发送给每一个队列的对象都是一个新的复制体Message message Message.createMessageWithId(routingKey, basicProperties, body);// ③ 判定这个消息是否能转发给改队列// 如果是 fanout, 所有绑定的队列都是要转发的// 如果是 topic, 还需要判定下, bindingKey 和 routingKey 是不是匹配if (!router.route(exchange.getType(), binding, message)) {continue;}// ④ 真正的转发消息给队列sendMessage(msgQueue, message);}}log.info(发送信息成功, exchangeName: {}, routingKey: {}, exchangeName, routingKey);return true;} catch (Exception e) {log.warn(发送信息失败, exchangeName: {}, routingKey: {}, exchangeName, routingKey);e.printStackTrace();return false;}}/*** description: 发送一次消息**/private void sendMessage(MSGQueue msgQueue, Message message) throws IOException, MqException, InterruptedException {// 此处发送消息, 就是把消息写入到 硬盘 和 内存 上int deliverMode message.getDeliverMode();// deliverMode 为 1, 不持久化, 为 2 表示持久化if (deliverMode 2) {diskDataCenter.sendMessage(msgQueue, message);}// 写入内存memoryDataCenter.sendMessage(msgQueue, message);// 通知消费者可以消费消息, 就是让消费者从对应的内存中取出消息consumerManager.notifyConsume(msgQueue.getName());}
路由规则
实现 mqserver.core.Router
实现 route ⽅法 /*** description: 路由选择* param: [type 交换机类型, binding 绑定对象 -- 里面提取 routingKey, message 消息对象]* return: 返回该交换机是否可以将该消息转发给绑定的队列中, true 表示可以, false 表示不可以**/public boolean route(ExchangeType type, Binding binding, Message message) throws MqException {// 根据不同的转发类型来进行不同的转发逻辑if (type ExchangeType.FANOUT) {// 如果是 FANOUT 类型, 该交换机上所有绑定的队列都需要进行转发return true;} else if (type ExchangeType.TOPIC) {// 如果是 TOPIC 类型, 规则复杂return routeTopic(binding,message);} else {// 其他情况是不应该存在的throw new MqException([Router] 交换机类型违法! type: type);}}实现 checkRoutingKeyValid
/*** description: 判断 routingKey 是否合法* routingKey 组成规则如下:* 1. 组成: 数字, 字母, 下划线* 2. 使用符号 . 将 routingKey 划分成多个部分* 形如:* aaa.bbb.ccc* a.1.b* a**/public boolean checkRoutingKeyValid(String routingKey) {if (!StringUtils.hasLength(routingKey)) {// null or 空字符串, 合法的情况, 比如在使用 fanout 交换机的时候, routingKey 用不上, 就可以设为 return true;}for (int i 0; i routingKey.length(); i) {char ch routingKey.charAt(i);// 判断该字符是否是大写字母if (ch A ch Z) {continue;}// 判断字符是否是小写字母if (ch a ch z) {continue;}// 判断该字符阿拉伯数字if (ch 0 ch 9) {continue;}// 判断字符是否 _ 或 .if (ch . || ch _) {continue;}// 走到这里, 都不是上述任何一种合法的情况, 就直接返回 falsereturn false;}return true;}实现 checkBindingKeyValid /*** description: 判断 bindingKey 是否合法* bindingKey 组成规则如下:* 1. 组成: 数字, 字母, 下划线* 2. 使用符号 . 将 routingKey 划分成多个部分* 3. 支持两种特殊符号作为通配符 (* 和 # 必须是作为被 . 分割出来的独立的部分)* 1) *: * 可以匹配任何一个独立的部分* 2) #: # 监听匹配任何 0 个或者多个独立的部分* 形如:* aaa.bbb.ccc* a.1.b* a* #* a.*.b**/public boolean checkBindingKeyValid(String bindingKey) {if (!StringUtils.hasLength(bindingKey)) {// null or 空字符串, 合法的情况, 比如在使用 direct / fanout 交换机的时候, bindingKey 用不上, 就可以设为 return true;}for (int i 0; i bindingKey.length(); i) {char ch bindingKey.charAt(i);// 判断该字符是否是大写字母if (ch A ch Z) {continue;}// 判断字符是否是小写字母if (ch a ch z) {continue;}// 判断该字符阿拉伯数字if (ch 0 ch 9) {continue;}// 判断字符是否 _ 或 .if (ch . || ch _) {continue;}// 判断字符是否为通配符 * 或 #if (ch * || ch #) {continue;}// 走到这里, 都不是上述任何一种合法的情况, 就直接返回 falsereturn false;}// 检查 * 或者 # 是否是独立的部分// aaa.*.bbb 合法情况, aaa.a*.bbb 非法情况String[] words bindingKey.split(\\.);for (String word : words) {// 检查 word 长度 1 并且包含了 * 或者 #, 就是非法的格式了if (word.length() 1 (word.contains(*) || word.contains(#))) {return false;}}return true;}实现 routeTopic
使用动态规划的方式来进行规则的匹配
private boolean routeTopic(NotNull Binding binding, NotNull Message message) {String bindingKey binding.getBindingKey();String routingKey message.getRoutingKey();String[] bindingStr bindingKey.split(\\.);String[] routingStr routingKey.split(\\.);return mate(bindingStr,routingStr);}private boolean mate(String NotNull [] bindingStr, String NotNull [] routingStr) {int m bindingStr.length;int n routingStr.length;boolean[][] dp new boolean[m 1][n 1];dp[0][0] true;for (int i 0; i m; i) {if (#.equals(bindingStr[i])) {dp[i1][0] true;} else {break;}}for (int i 1; i m; i) {String wordBinding bindingStr[i - 1];for (int j 1; j n; j) {String wordRouting routingStr[j - 1];if (!#.equals(wordBinding) !*.equals(wordBinding)) {if (wordBinding.equals(wordRouting)) {dp[i][j] dp[i - 1][j - 1];} else {dp[i][j] false;}} else if (*.equals(wordBinding)) {dp[i][j] dp[i - 1][j - 1];} else {dp[i][j] dp[i - 1][j] || dp[i][j - 1];}}}return dp[m][n];}测试 Router 的匹配规则 测试代码如下:
package en.edu.zxj.mq.common;import en.edu.zxj.mq.mqserver.core.Binding;
import en.edu.zxj.mq.mqserver.core.ExchangeType;
import en.edu.zxj.mq.mqserver.core.Message;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;import static org.junit.jupiter.api.Assertions.*;/*** Created with IntelliJ IDEA.* Description** author: zxj* date: 2024-03-02* time: 19:32:53*/
SpringBootTest
class RouterTest {private Router router new Router();private Binding binding null;private Message message null;BeforeEachpublic void setUp() {binding new Binding();message new Message();}AfterEachpublic void tearDown() {binding null;message null;}Testvoid checkBindingKeyValid1() {boolean ok router.checkBindingKeyValid(aaa.bbb.ccc);Assertions.assertTrue(ok);}Testvoid checkBindingKeyValid2() {boolean ok router.checkBindingKeyValid(1.a.c);Assertions.assertTrue(ok);}Testvoid checkBindingKeyValid3() {boolean ok router.checkBindingKeyValid(a);Assertions.assertTrue(ok);}Testvoid checkBindingKeyValid4() {boolean ok router.checkBindingKeyValid();Assertions.assertTrue(ok);}Testvoid checkBindingKeyValid5() {boolean ok router.checkBindingKeyValid(a.*.ccc);Assertions.assertTrue(ok);}Testvoid checkBindingKeyValid6() {boolean ok router.checkBindingKeyValid(#);Assertions.assertTrue(ok);}Testvoid checkBindingKeyValid7() {boolean ok router.checkBindingKeyValid(aaa.bb*b.ccc);Assertions.assertFalse(ok);}Testvoid checkBindingKeyValid8() {boolean ok router.checkBindingKeyValid(123.bbb.ccc);Assertions.assertTrue(ok);}Testvoid checkRoutingKeyValid1() {boolean ok router.checkRoutingKeyValid(a.b.c);Assertions.assertTrue(ok);}Testvoid checkRoutingKeyValid2() {boolean ok router.checkRoutingKeyValid(a.b._c);Assertions.assertTrue(ok);}Testvoid checkRoutingKeyValid3() {boolean ok router.checkRoutingKeyValid(a.b.c);Assertions.assertFalse(ok);}Testvoid checkRoutingKeyValid4() {boolean ok router.checkRoutingKeyValid(a);Assertions.assertTrue(ok);}Testvoid checkRoutingKeyValid5() {boolean ok router.checkRoutingKeyValid(a.1.b);Assertions.assertTrue(ok);}Testvoid checkRoutingKeyValid6() {boolean ok router.checkRoutingKeyValid(12222222223123123);Assertions.assertTrue(ok);}Testvoid checkRoutingKeyValid7() {boolean ok router.checkRoutingKeyValid(aaaa);Assertions.assertTrue(ok);}Testvoid checkRoutingKeyValid8() {boolean ok router.checkRoutingKeyValid(_____________._______________);Assertions.assertTrue(ok);}Testvoid checkRoutingKeyValid9() {boolean ok router.checkRoutingKeyValid(!!!!!!!!!!!!!!);Assertions.assertFalse(ok);}Testvoid checkRoutingKeyValid10() {boolean ok router.checkRoutingKeyValid(a.2._.!);Assertions.assertFalse(ok);}Testvoid checkRoutingKeyValid11() {boolean ok router.checkRoutingKeyValid(_a_.1_a.b);Assertions.assertTrue(ok);}Testvoid checkRoutingKeyValid12() {boolean ok router.checkRoutingKeyValid(a.b.c.12.7.234.4234.adf.___);Assertions.assertTrue(ok);}Testvoid checkRoutingKeyValid13() {boolean ok router.checkRoutingKeyValid(123.468a.sdfa.w);Assertions.assertTrue(ok);}Testvoid route1() throws MqException {binding.setBindingKey(aaa);message.setRoutingKey(aaa);boolean ok router.route(ExchangeType.FANOUT, binding, message);Assertions.assertTrue(ok);}Testvoid route2() throws MqException {binding.setBindingKey(aaa.bbb);message.setRoutingKey(aaa.bbb);boolean ok router.route(ExchangeType.TOPIC, binding, message);Assertions.assertTrue(ok);}Testvoid route3() throws MqException {binding.setBindingKey(aaa);message.setRoutingKey(aaa);boolean ok router.route(ExchangeType.TOPIC, binding, message);Assertions.assertTrue(ok);}Testvoid route4() throws MqException {binding.setBindingKey(aaa.bbb);message.setRoutingKey(aaa.bbb.ccc);boolean ok router.route(ExchangeType.TOPIC, binding, message);Assertions.assertFalse(ok);}Testvoid route5() throws MqException {binding.setBindingKey(aaa.bbb);message.setRoutingKey(aaa.ccc);boolean ok router.route(ExchangeType.TOPIC, binding, message);Assertions.assertFalse(ok);}Testvoid route6() throws MqException {binding.setBindingKey(aaa.bbb.ccc);message.setRoutingKey(aaa.bbb.ccc);boolean ok router.route(ExchangeType.TOPIC, binding, message);Assertions.assertTrue(ok);}Testvoid route7() throws MqException {binding.setBindingKey(aaa.*);message.setRoutingKey(aaa.bbb);boolean ok router.route(ExchangeType.TOPIC, binding, message);Assertions.assertTrue(ok);}Testvoid route8() throws MqException {binding.setBindingKey(aaa.*.bbb);message.setRoutingKey(aaa.bbb.ccc);boolean ok router.route(ExchangeType.TOPIC, binding, message);Assertions.assertFalse(ok);}Testvoid route9() throws MqException {binding.setBindingKey(*.aaa.bbb);message.setRoutingKey(aaa.bbb);boolean ok router.route(ExchangeType.TOPIC, binding, message);Assertions.assertFalse(ok);}Testvoid route10() throws MqException {binding.setBindingKey(#);message.setRoutingKey(aaa.bbb.ccc);boolean ok router.route(ExchangeType.TOPIC, binding, message);Assertions.assertTrue(ok);}Testvoid route11() throws MqException {binding.setBindingKey(aaa.#);message.setRoutingKey(aaa.bbb);boolean ok router.route(ExchangeType.TOPIC, binding, message);Assertions.assertTrue(ok);}Testvoid route12() throws MqException {binding.setBindingKey(aaa.#);message.setRoutingKey(aaa.bbb.ccc);boolean ok router.route(ExchangeType.TOPIC, binding, message);Assertions.assertTrue(ok);}Testvoid route13() throws MqException {binding.setBindingKey(aaa.#.ccc);message.setRoutingKey(aaa.ccc);boolean ok router.route(ExchangeType.TOPIC, binding, message);Assertions.assertTrue(ok);}Testvoid route14() throws MqException {binding.setBindingKey(aaa.#.ccc);message.setRoutingKey(aaa.bbb.ccc);boolean ok router.route(ExchangeType.TOPIC, binding, message);Assertions.assertTrue(ok);}Testvoid route15() throws MqException {binding.setBindingKey(aaa.#.ccc);message.setRoutingKey(aaa.aaa.bbb.ccc);boolean ok router.route(ExchangeType.TOPIC, binding, message);Assertions.assertTrue(ok);}Testvoid route16() throws MqException {binding.setBindingKey(#.ccc);message.setRoutingKey(ccc);boolean ok router.route(ExchangeType.TOPIC, binding, message);Assertions.assertTrue(ok);}Testvoid route17() throws MqException {binding.setBindingKey(#.ccc);message.setRoutingKey(aaa.bbb.ccc);boolean ok router.route(ExchangeType.TOPIC, binding, message);Assertions.assertTrue(ok);}Testvoid route18() throws MqException {binding.setBindingKey(aaa.#.#.#);message.setRoutingKey(aaa);boolean ok router.route(ExchangeType.TOPIC, binding, message);Assertions.assertTrue(ok);}Testvoid route19() throws MqException {binding.setBindingKey(aaa.#.#.#.*);message.setRoutingKey(aaa);boolean ok router.route(ExchangeType.TOPIC, binding, message);Assertions.assertFalse(ok);}Testvoid route20() throws MqException {binding.setBindingKey(aaa.#.#.#.ccc);message.setRoutingKey(aaa.aaa.aaa.bbb.ccc);boolean ok router.route(ExchangeType.TOPIC, binding, message);Assertions.assertTrue(ok);}}订阅消息
添加一个订阅者
/*** description: 订阅消息* 添加一个队列的订阅者, 当队列收到消息之后, 就要把消息推送给对应的订阅者* param: [consumerTag: 消费者的身份标识,* queueName: 订阅的队列名字,* autoAck: 消息被消费后的应当方式, true 为自动应当, false 为手动应答* consumer: 是一个回调函数, 此处类型设定成函数是接口, 这样后续调用 basicConsume 并且传递实参的时候, 就可以写成 lambda 样子了]* return: true 表示订阅成功, false 表示订阅失败**/public boolean basicConsume(String consumerTag, String queueName, boolean autoAck, Consumer consumer) {// 构造一个 ConsumerEnv 对象, 把这个对应的队列找到, 再把这个 Consumer 对象添加到该队列中.queueName virtualHostName queueName;try {consumerManager.addConsumer(consumerTag, queueName, autoAck, consumer);log.info(basicConsume 成功! consumerTag: {}, queueName: {}, consumerTag, queueName);return true;} catch (Exception e) {log.warn(basicConsume 失败! consumerTag: {}, queueName: {}, consumerTag, queueName);e.printStackTrace();return false;}}Consumer 相当于⼀个回调函数. 放到 common.Consumer 中.
创建订阅者管理管理类
创建 mqserver.core.ConsumerManager
Slf4j
public class ConsumerManager {// parent 用来记录虚拟主机private final VirtualHost parent;// 存放令牌的队列, 通过令牌来触发消费线程的消费操作// 使用一个阻塞队列来触发消息消费, 称为令牌队列, 每次有消息过来了, 都为队列中放一个令牌(也就是队列名), 让后消费者再去消费对应的队列信息// 作用: 令牌队列的设定, 避免搞出来太多线程, 否则就需要给每个队列都安排一个单独的线程了, 如果队列很多则开销就比较的了private final BlockingQueueString tokenQueue new LinkedBlockingQueue();// 使用一个线程池用来执行消息的回调private final ExecutorService workerPool Executors.newFixedThreadPool(4);// 扫描线程private Thread scannerThread null;
}parent 用来记录虚拟主机使用一个阻塞队列来触发信息消费,称为令牌队列, 每次有消息过来了, 都往队列中放一个令牌(也就是队列名), 然后消费者再去消费对应队列的消息.使用一个线程池用来执行消息回调 这样令牌队列的设定避免搞出来太多线程,否则需要给每个队列到安排一个单独的线程了,如果队列很多开销就比较大了. 添加令牌接⼝ /*** description: 通知消费者去消费消息**/public void notifyConsume(String name) throws InterruptedException {tokenQueue.put(name);}实现添加订阅者
/*** description: 添加订阅者* 新来的消费者需要消费掉之前的消息**/public void addConsumer(String consumerTag, String queueName, boolean autoAck, Consumer consumer) throws MqException {// 找到对应的队列MSGQueue queue parent.getMemoryDataCenter().getMSGQueue(queueName);if (queue null) {throw new MqException([ConsumerManager] 队列不存在, queueName: queueName);}ConsumerEnv consumerEnv new ConsumerEnv(consumerTag, queueName, autoAck, consumer);synchronized (queue) {queue.addConsumerEnv(consumerEnv);// 如果当前队列中已经有了一些消息, 需要立即消费掉int n parent.getMemoryDataCenter().getMessageCount(queueName);for (int i 0; i n; i) {// 这个方法调用一次就消费一次消息consumeMessage(queue);}}}创建 ConsumerEnv , 这个类表⽰⼀个订阅者的执⾏环境.
Data
Slf4j
public class ConsumerEnv {private String consumerTag;private String queueName;private boolean autoAck;private Consumer consumer;public ConsumerEnv(String consumerTag, String queueName, boolean autoAck, Consumer consumer) {this.consumerTag consumerTag;this.queueName queueName;this.autoAck autoAck;this.consumer consumer;}
}给 MsgQueue 添加⼀个订阅者列表. 此处的 chooseConsumer 是实现⼀个轮询效果. 如果⼀个队列有多个订阅者, 将会按照轮询的⽅式轮流拿到消息. 实现扫描线程
在 ConsumerManager 中创建⼀个线程, 不停的尝试扫描令牌队列. 如果拿到了令牌, 就真正触发消费消息操作. public ConsumerManager(VirtualHost virtualHost) {this.parent virtualHost;scannerThread new Thread(() - {while (true) {try {// 1. 拿到令牌String queueName tokenQueue.take();// 2. 根据令牌找到对应的队列MSGQueue msgQueue parent.getMemoryDataCenter().getMSGQueue(queueName);if (msgQueue null) {throw new MqException(获取令牌后, 发现队列为空! queueName: queueName);}// 3. 从队列中消费一次消息consumeMessage(msgQueue);} catch (Exception e) {e.printStackTrace();}}});scannerThread.start();}实现消费消息 所谓的消费消息, 其实就是调⽤消息的回调. 并把消息删除掉.
/*** description: 消费一次消息**/private void consumeMessage(MSGQueue queue) {// 1. 按照轮询的方式, 取出消费者ConsumerEnv luckyDog queue.chooseConsumer();if (luckyDog null) {// 暂时还没有消费者, 就暂时不消费return;}// 2. 从指定队列中取出一个元素Message message parent.getMemoryDataCenter().pollMessage(queue.getName());if (message null) {return;}// 3. 把消息带入到消费者的回调方法中, 丢给线程池执行workerPool.submit(() - {try {// 1. 把消息放到待确认的集合中, 这个操作势必在执行回调之前parent.getMemoryDataCenter().addMessageWaitAck(queue.getName(), message);// 2. 真正执行回调操作luckyDog.getConsumer().handleDelivery(luckyDog.getConsumerTag(), message.getBasicProperties(), message.getBody());// 3. 如果当前是 自动应答, 就可以直接报消息给删除了// 如果当前是 手动应答, 则先不处理, 交给后续消费调用 basicAck 方法来处理if (luckyDog.isAutoAck()) {// ① 先删除磁盘上的消息if (message.getDeliverMode() 2) {parent.getDiskDataCenter().deleteMessage(queue, message);}// ② 删除上面的待确认集合中的消息parent.getMemoryDataCenter().removeMessageWaitAck(queue.getName(), message.getMessageId());// ③ 删除内存中消息parent.getMemoryDataCenter().deleteMessage(message.getMessageId());log.info(消息被消费成功, queueName: {}, queue.getName());}} catch (Exception e) {e.printStackTrace();}});}注意: ⼀个队列可能有 N 个消费者, 此处应该按照轮询的⽅式挑⼀个消费者进⾏消费.
⼩结
一、订阅者已经存在了 才发送消息
这种直接获取队列的订阅者从中按照轮询的方式挑一个消费者来调用回调即可消息先发送到队列了订阅者还没到此时当订阅者到达就快速把指定队列中的消息全部消费掉。
⼆. 关于消息不丢失的论证
每个消息在从内存队列中出队列时, 都会先进⼊ 待确认 中.
如果 autoAck 为 true 消息被消费完毕后(执⾏完消息回调之后), 再执⾏清除⼯作. 分别清除硬盘数据, 待确认队列, 消息中⼼如果 autoAck 为 false 在回调内部, 进⾏清除⼯作. 分别清除硬盘数据, 待确认队列, 消息中⼼.
执⾏消息回调的时候抛出异常
此时消息仍然处在待确认队列中 此时可以⽤⼀个线程扫描待确认队列, 如果发现队列中的消息超时未确认, 则放⼊死信队列.
执⾏消息回调的时候服务器宕机 内存所有数据都没了, 但是消息在硬盘上仍然存在. 会在服务下次启动的时候, 加载回内存. 重新被消费到.
消息确认
下列⽅法只是⼿动应答的时候才会使⽤.
应答成功, 则把消息删除掉.
/*** description: 消息确认**/public boolean basicAck(String queueName, String messageId) {queueName virtualHostName queueName;try {// 1. 获取到消息和队列Message message memoryDataCenter.getMessage(messageId);if (message null) {throw new MqException(要确认的消息不存在, messageId: messageId);}MSGQueue msgQueue memoryDataCenter.getMSGQueue(queueName);if (msgQueue null) {throw new MqException(要确认的队列不存在, queueName: queueName);}// 2. 删除硬盘上的数据if (message.getDeliverMode() 2) {diskDataCenter.deleteMessage(msgQueue,message);}// 3. 删除消息中心中心的数据memoryDataCenter.deleteMessage(messageId);// 4. 删除待确认的集合中的数据memoryDataCenter.removeMessageWaitAck(queueName,messageId);log.info(basicAck 成功, 消息确认成功! queueName: {}, messageId: {},queueName,messageId);return true;} catch (Exception e) {log.warn(basicAck 失败, 消息确认失败! queueName: {}, messageId: {},queueName,messageId);e.printStackTrace();return false;}}
测试 VirtualHost
package en.edu.zxj.mq.mqserver;import en.edu.zxj.mq.MqApplication;
import en.edu.zxj.mq.common.Consumer;
import en.edu.zxj.mq.mqserver.core.BasicProperties;
import en.edu.zxj.mq.mqserver.core.ExchangeType;
import org.apache.tomcat.util.http.fileupload.FileUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.test.context.SpringBootTest;import java.io.File;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;import static org.junit.jupiter.api.Assertions.*;/*** Created with IntelliJ IDEA.* Description** author: zxj* date: 2024-03-02* time: 21:45:01*/
SpringBootTest
class VirtualHostTest {private VirtualHost virtualHost null;BeforeEachpublic void setUp() {MqApplication.context SpringApplication.run(MqApplication.class);virtualHost new VirtualHost(default);}AfterEachpublic void tearDown() throws IOException {MqApplication.context.close();virtualHost null;// 把硬盘的目录删除掉File dataDir new File(./data);FileUtils.deleteDirectory(dataDir);}Testvoid exchangeDeclare() {boolean ok virtualHost.exchangeDeclare(testExchangeName, ExchangeType.DIRECT, true, false, null);Assertions.assertTrue(ok);}Testvoid exchangeDelete() {boolean ok virtualHost.exchangeDeclare(testExchangeName, ExchangeType.DIRECT, true, false, null);Assertions.assertTrue(ok);ok virtualHost.exchangeDelete(testExchangeName);Assertions.assertTrue(ok);ok virtualHost.exchangeDelete(testExchangeName);Assertions.assertFalse(ok);}Testvoid queueDeclare() {MapString, Object arguments new HashMap();arguments.put(a, 1);arguments.put(b, 2);boolean ok virtualHost.queueDeclare(testQueueName, true, false, false, arguments);Assertions.assertTrue(ok);}Testvoid queueDelete() {MapString, Object arguments new HashMap();arguments.put(a, 1);arguments.put(b, 2);boolean ok virtualHost.queueDeclare(testQueueName, true, false, false, arguments);Assertions.assertTrue(ok);ok virtualHost.queueDelete(testQueueName);Assertions.assertTrue(ok);}Testvoid queueBind() {boolean ok virtualHost.exchangeDeclare(testExchangeName, ExchangeType.DIRECT, true, false, null);Assertions.assertTrue(ok);MapString, Object arguments new HashMap();arguments.put(a, 1);arguments.put(b, 2);ok virtualHost.queueDeclare(testQueueName, true, false, false, arguments);Assertions.assertTrue(ok);ok virtualHost.queueBind(testQueueName, testExchangeName, testBindingKey);Assertions.assertTrue(ok);}Testvoid queueUnBind() {boolean ok virtualHost.exchangeDeclare(testExchangeName, ExchangeType.DIRECT, true, false, null);Assertions.assertTrue(ok);MapString, Object arguments new HashMap();arguments.put(a, 1);arguments.put(b, 2);ok virtualHost.queueDeclare(testQueueName, true, false, false, arguments);Assertions.assertTrue(ok);ok virtualHost.queueBind(testQueueName, testExchangeName, testBindingKey);Assertions.assertTrue(ok);ok virtualHost.queueUnBind(testQueueName,testExchangeName);Assertions.assertTrue(ok);}Testvoid basicPublish() {boolean ok virtualHost.basicPublish(testExchangeName, testRoutingKey, new BasicProperties(), Hello word.getBytes());Assertions.assertFalse(ok);ok virtualHost.exchangeDeclare(testExchangeName, ExchangeType.DIRECT, true, false, null);Assertions.assertTrue(ok);MapString, Object arguments new HashMap();arguments.put(a, 1);arguments.put(b, 2);ok virtualHost.queueDeclare(testQueueName, true, false, false, arguments);Assertions.assertTrue(ok);ok virtualHost.queueBind(testQueueName, testExchangeName, testBindingKey);Assertions.assertTrue(ok);ok virtualHost.basicPublish(testExchangeName, testQueueName, new BasicProperties(), Hello word.getBytes());Assertions.assertTrue(ok);}// 先订阅队列, 后发送消息Testvoid basicConsume1() throws InterruptedException {boolean ok virtualHost.queueDeclare(testQueueName, true, false, false, null);Assertions.assertTrue(ok);ok virtualHost.exchangeDeclare(testExchangeName,ExchangeType.DIRECT,true,false,null);Assertions.assertTrue(ok);// 先订阅队列ok virtualHost.basicConsume(testConsumerTag, testQueueName, true, new Consumer() {Overridepublic void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) {try {// 消费者自身设定的回调方法System.out.println(messageId: basicProperties.getMessageId());System.out.println(body new String(body,0,body.length));Assertions.assertEquals(testQueueName,basicProperties.getRoutingKey());Assertions.assertEquals(1,basicProperties.getDeliverMode());Assertions.assertArrayEquals(hello.getBytes(),body);} catch (Error e) {//断言如果失败, 抛出的是 Error, 而不是 Exceptione.printStackTrace();}}});Assertions.assertTrue(ok);Thread.sleep(500);// 在发送消息ok virtualHost.basicPublish(testExchangeName,testQueueName,null,hello.getBytes());Assertions.assertTrue(ok);Thread.sleep(500);}// 先发送消息, 后订阅队列Testvoid basicConsume2() throws InterruptedException {boolean ok virtualHost.queueDeclare(testQueueName, true, false, false, null);Assertions.assertTrue(ok);ok virtualHost.exchangeDeclare(testExchangeName,ExchangeType.DIRECT,true,false,null);Assertions.assertTrue(ok);// 先发送消息ok virtualHost.basicPublish(testExchangeName,testQueueName,null,hello.getBytes());Assertions.assertTrue(ok);Thread.sleep(500);// 后订阅队列ok virtualHost.basicConsume(testConsumerTag, testQueueName, true, new Consumer() {Overridepublic void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) {try {// 消费者自身设定的回调方法System.out.println(messageId: basicProperties.getMessageId());System.out.println(body new String(body,0,body.length));Assertions.assertEquals(testQueueName,basicProperties.getRoutingKey());Assertions.assertEquals(1,basicProperties.getDeliverMode());Assertions.assertArrayEquals(hello.getBytes(),body);} catch (Error e) {//断言如果失败, 抛出的是 Error, 而不是 Exceptione.printStackTrace();}}});Assertions.assertTrue(ok);Thread.sleep(500);}Testvoid basicAck() throws InterruptedException {boolean ok virtualHost.queueDeclare(testQueue, true,false, false, null);Assertions.assertTrue(ok);ok virtualHost.exchangeDeclare(testExchange, ExchangeType.DIRECT,true, false, null);Assertions.assertTrue(ok);// 先发送消息ok virtualHost.basicPublish(testExchange, testQueue, null,hello.getBytes());Assertions.assertTrue(ok);// 再订阅队列 [要改的地方, 把 autoAck 改成 false]ok virtualHost.basicConsume(testConsumerTag, testQueue, false, new Consumer() {Overridepublic void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) {// 消费者自身设定的回调方法.System.out.println(messageId basicProperties.getMessageId());System.out.println(body new String(body, 0, body.length));Assertions.assertEquals(testQueue, basicProperties.getRoutingKey());Assertions.assertEquals(1, basicProperties.getDeliverMode());Assertions.assertArrayEquals(hello.getBytes(), body);// [要改的地方, 新增手动调用 basicAck]boolean ok virtualHost.basicAck(testQueue, basicProperties.getMessageId());Assertions.assertTrue(ok);}});Assertions.assertTrue(ok);Thread.sleep(500);}Testpublic void testBasicConsumeFanout() throws InterruptedException {boolean ok virtualHost.exchangeDeclare(testExchange, ExchangeType.FANOUT, false, false, null);Assertions.assertTrue(ok);ok virtualHost.queueDeclare(testQueue1, false, false, false, null);Assertions.assertTrue(ok);ok virtualHost.queueBind(testQueue1, testExchange, );Assertions.assertTrue(ok);ok virtualHost.queueDeclare(testQueue2, false, false, false, null);Assertions.assertTrue(ok);ok virtualHost.queueBind(testQueue2, testExchange, );Assertions.assertTrue(ok);// 往交换机中发布一个消息ok virtualHost.basicPublish(testExchange, , null, hello.getBytes());Assertions.assertTrue(ok);Thread.sleep(500);// 两个消费者订阅上述的两个队列.ok virtualHost.basicConsume(testConsumer1, testQueue1, true, new Consumer() {Overridepublic void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) {System.out.println(consumerTag consumerTag);System.out.println(messageId basicProperties.getMessageId());Assertions.assertArrayEquals(hello.getBytes(), body);}});Assertions.assertTrue(ok);ok virtualHost.basicConsume(testConsumer2, testQueue2, true, new Consumer() {Overridepublic void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) {System.out.println(consumerTag consumerTag);System.out.println(messageId basicProperties.getMessageId());Assertions.assertArrayEquals(hello.getBytes(), body);}});Assertions.assertTrue(ok);Thread.sleep(500);}Testpublic void testBasicConsumeTopic() throws InterruptedException {boolean ok virtualHost.exchangeDeclare(testExchange, ExchangeType.TOPIC, false, false, null);Assertions.assertTrue(ok);ok virtualHost.queueDeclare(testQueue, false, false, false, null);Assertions.assertTrue(ok);ok virtualHost.queueBind(testQueue, testExchange, aaa.*.bbb);Assertions.assertTrue(ok);ok virtualHost.basicPublish(testExchange, aaa.ccc.bbb, null, hello.getBytes());Assertions.assertTrue(ok);ok virtualHost.basicConsume(testConsumer, testQueue, true, new Consumer() {Overridepublic void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) {System.out.println(consumerTag consumerTag);System.out.println(messageId basicProperties.getMessageId());Assertions.assertArrayEquals(hello.getBytes(), body);}});Assertions.assertTrue(ok);Thread.sleep(500);}
}⼗⼀、 ⽹络通信协议设计
明确需求
接下来需要考虑客⼾端和服务器之间的通信. 回顾交互模型.
⽣产者和消费者都是客⼾端, 都需要通过⽹络和 Broker Server 进⾏通信.
此处我们使⽤ TCP 协议, 来作为通信的底层协议. 同时在这个基础上⾃定义应⽤层协议, 完成客⼾端对服务器这边功能的远程调⽤.
要调⽤的功能有:
创建 channel关闭 channel创建 exchange删除 exchange创建 queue删除 queue创建 binding删除 binding发送 message订阅 message发送 ack返回 message (服务器 - 客⼾端)
设计应⽤层协议
使⽤⼆进制的⽅式设定协议. 因为 Message 的消息体本⾝就是⼆进制的. 因此不太⽅便使⽤ json 等⽂本格式的协议. 请求: 响应: 其中 type 表⽰请求响应不同的功能. 取值如下:
0x1 创建 channel0x2 关闭 channel0x3 创建 exchange0x4 销毁 exchange0x5 创建 queue0x6 销毁 queue0x7 创建 binding0x8 销毁 binding0x9 发送 message0xa 订阅 message0xb 返回 ack0xc 服务器给客⼾端推送的消息. (被订阅的消息) 响应独有的
其中 payload 部分, 会根据不同的 type, 存在不同的格式.
对于请求来说, payload 表⽰这次⽅法调⽤的各种参数信息.
对于响应来说, payload 表⽰这次⽅法调⽤的返回值.
定义 Request / Response
创建 common.Request
/*** Created with IntelliJ IDEA.* Description定义了请求的格式* 一个完整的请求, 分成了三个部分* 1. type: 表示请求不同的功能, 调用不同的函数 -- 4 个字节* 2. length: 表示 payload 的长度 -- 4 个字节* 3. payload: 要传输的二进制数据 -- length 个字节** author: zxj* date: 2024-03-05* time: 21:16:58*/
Data
public class Request implements Serializable {private Integer type;private Integer length;private byte[] payload;Overridepublic String toString() {return Request{ type type , length length };}
}
创建 common.Response
/*** Created with IntelliJ IDEA.* Description 定义一个完整的响应格式* 一个完整的响应, 分成了三个部分* 1. type: 表示响应不同的功能, 调用不同的函数 -- 4 个字节* 2. length: 表示 payload 的长度 -- 4 个字节* 3. payload: 要传输的二进制数据 -- length 个字节** author: zxj* date: 2024-03-05* time: 21:16:46*/
Data
public class Response implements Serializable {private Integer type;private Integer length;private byte[] payload;Overridepublic String toString() {return Response{ type type , length length };}
}
定义参数⽗类
构造⼀个类表⽰⽅法的参数, 作为 Request 的 payload.
不同的⽅法中, 参数形态各异, 但是有些信息是通⽤的, 使⽤⼀个⽗类表⽰出来. 具体每个⽅法的参数再通过继承的⽅式体现.
common.BasicArguments
/*** Created with IntelliJ IDEA.* Description定义请求父类** author: zxj* date: 2024-03-05* time: 21:31:01*/
Data
public class BasicArguments implements Serializable {// 表示一次请求/响应的唯一 Id, 用来把响应和请求对应上// 此处的 rid 和 channelId 都是基于 UUID 来生成的, rid 用来标识一个请求-响应, 这一点在请求响应非常多的时候游泳protected String rid;protected String channelId;
}
此处的 rid 和 channelId 都是基于 UUID 来⽣成的. rid ⽤来标识⼀个请求-响应. 这⼀点在请求响应⽐较多的时候⾮常重要.
定义返回值⽗类
和参数同理, 也需要构造⼀个类表⽰返回值, 作为 Response 的 payload
common.BasicReturns
/*** Created with IntelliJ IDEA.* Description定义返回的父类** author: zxj* date: 2024-03-05* time: 21:43:23*/
Data
public class BasicReturns implements Serializable {// 表示一次请求/响应的唯一 Id, 用来把响应和请求对应上protected String rid;// 用来标识一个 channelprotected String channelId;protected Boolean ok;
}定义其他参数类
针对每个 VirtualHost 提供的⽅法, 都需要有⼀个类表⽰对应的参数.
ExchangeDeclareArguments
/*** Created with IntelliJ IDEA.* DescriptionExchangeDeclare 方法请求参数类** author: zxj* date: 2024-03-05* time: 21:46:53*/
Data
public class ExchangeDeclareArguments extends BasicArguments implements Serializable {private String exchangeName;private ExchangeType exchangeType;private Boolean durable;private Boolean autoDelete;private MapString,Object arguments;
}⼀个创建交换机的请求, 形如:
可以把 ExchangeDeclareArguments 转成 byte[], 就得到了下列图⽚的结构.按照 length ⻓度读取出 payload, 就可以把读到的⼆进制数据转换成ExchangeDeclareArguments 对象. 2) ExchangeDeleteArguments
Data
public class ExchangeDeleteArguments extends BasicArguments implements Serializable {private String exchangeName;
}QueueDeclareArguments
Data
public class QueueDeclareArguments extends BasicArguments implements Serializable {private String queueName;private boolean durable;private boolean exclusive;private boolean autoDelete;private MapString, Object arguments;
}QueueDeleteArguments
Data
public class QueueDeleteArguments extends BasicArguments implements Serializable {private String queueName;
}QueueBindArguments
Data
public class QueueBindArguments extends BasicArguments implements Serializable {private String queueName;private String exchangeName;private String bindingKey;
}QueueUnbindArguments
Data
public class QueueUnBindArguments extends BasicArguments implements Serializable {private String queueName;private String exchangeName;
}
BasicPublishArguments
Data
public class BasicPublishArguments extends BasicArguments implements Serializable {private String exchangeName;private String routingKey;private BasicProperties basicProperties;private byte[] body;
}BasicConsumeArguments
Data
public class BasicConsumeArguments extends BasicArguments implements Serializable {private String consumerTag;private String queueName;private boolean autoAck;
}SubScribeReturns
这个不是参数, 是返回值. 是服务器给消费者推送的订阅消息.consumerTag 其实是 channelId.basicProperties 和 body 共同构成了 Message.
/*** Created with IntelliJ IDEA.* Description返回值, 是服务器给消费者推送的订阅消息.** author: zxj* date: 2024-03-05* time: 22:54:08*/
Data
public class SubScribeReturns extends BasicReturns implements Serializable {private String consumerTag;private BasicProperties basicProperties;private byte[] body;
}⼗⼆、 实现 BrokerServer
创建 BrokerServer 类
Data
Slf4j
public class BrokerServer {// 当前程序只考虑一个虚拟主机的情况private VirtualHost virtualHost new VirtualHost(default-virtualHost);// 使用这个 哈希表, 表示当前的所有会话(也就是说有哪些客户端正在和服务器进行通信)// key 为 channelId, value 为 channel 对应的 socket 对象private ConcurrentHashMapString, Socket sessions new ConcurrentHashMap();private ServerSocket serverSocket;// 引入一个线程池, 来处理多个客户端的需求private ExecutorService executorService;// 引入一个 boolean 变量控制服务器是否继续运行private volatile boolean runnable true;
}virtualHost 表⽰服务器持有的虚拟主机. 队列, 交换机, 绑定, 消息都是通过虚拟主机管理.sessions ⽤来管理所有的客⼾端的连接. 记录每个客⼾端的 socketserverSocket 是服务器⾃⾝的 socketexecutorService 这个线程池⽤来处理响应runnable 这个标志位⽤来控制服务器的运⾏停⽌.
启动/停⽌服务器
这⾥就是⼀个单纯的 TCP 服务器, 没啥特别的实现停⽌操作, 主要是为了⽅便后续开展单元测试.
public BrokerServer(int port) throws IOException {serverSocket new ServerSocket(port);}// begin: 单纯的 TCP 服务器模板public void start() {log.info([BrokerServer] 服务器开始启动);executorService Executors.newCachedThreadPool();try {while (runnable) {Socket clientSocket serverSocket.accept();// 把这个处理连接的逻辑对给线程池executorService.submit(() - {// 处理连接的统一方法processConnection(clientSocket);});}} catch (SocketException e) {log.info([BrokerServer] 服务器停止运行!);} catch (IOException e) {log.error([BrokerServer] 服务器出现异常!);e.printStackTrace();}}/*** description: 一般来说, 停止服务器, 都是 kill 对应的进程就可以了* 此处还是搞一个单独的停止方法, 主要是用于后续的单元测试**/public void stop() throws IOException {runnable false;// 把线程池中的人物都放弃了, 让线程都销毁executorService.shutdownNow();serverSocket.close();}// end: 单纯的 TCP 服务器模板实现处理连接
对于 EOFException 和 SocketException , 我们视为客⼾端正常断开连接. 如果是客⼾端先 close, 后调⽤ DataInputStream 的 read, 则抛出 EOFException如果是先调⽤ DataInputStream 的 read, 后客⼾端调⽤ close, 则抛出 SocketException /*** description: 服务方法* 通过这个方法, 来处里一个客户端的连接* 在这个连接中, 可能会涉及到多个请求和响应**/private void processConnection(NotNull Socket clientSocket) {// 获取服务对象的 输入输出 流try (InputStream inputStream clientSocket.getInputStream();OutputStream outputStream clientSocket.getOutputStream()) {// 这里需要按照特定的格式来读取并解析, 此时就需要用到 DataInputStream 和 DataOutputStreamtry (DataInputStream dataInputStream new DataInputStream(inputStream); DataOutputStream dataOutputStream new DataOutputStream(outputStream)) {// 循环进行服务, 保持连接, 以便处理多个请求while (true) {// 1. 读取请求并解析Request request readRequest(dataInputStream);log.info(接收到[client: {} : {}] 请求: {},clientSocket.getInetAddress(),clientSocket.getPort(),request);// 2. 根据请求计算响应Response response process(request, clientSocket);log.info(响应给[client: {} : {}] 数据: {},clientSocket.getInetAddress(),clientSocket.getPort(),response);// 3. 把响应写回给客户端writeResponse(dataOutputStream, response);}}} catch (EOFException | SocketException e) {log.info(connection 关闭! 客户端地址: {} : {}, clientSocket.getInetAddress(), clientSocket.getPort());} catch (IOException | MqException | ClassNotFoundException e) {log.error(connection 出现异常 e: {}, e.toString());e.printStackTrace();} finally {try {// 当连接处理完了, 一定要关闭 socketclientSocket.close();// 一个 TCP 连接中, 可能包含多个 channel, 需要把当前这个 socket 对应的所有 channel 也顺便清理掉clearClosedSession(clientSocket);} catch (IOException e) {e.printStackTrace();}}}实现 readRequest
/*** description: 反序列化请求消息**/private NotNull Request readRequest(NotNull DataInputStream dataInputStream) throws IOException, MqException {Request request new Request();request.setType(dataInputStream.readInt());request.setLength(dataInputStream.readInt());byte[] payload new byte[request.getLength()];int n dataInputStream.read(payload);if (n ! request.getLength()) {throw new MqException(读取请求格式出错);}request.setPayload(payload);return request;}
实现 writeResponse
注意这⾥的 flush 操作很关键, 否则响应不⼀定能及时返回给客⼾端 /*** description: 将 Response 对象中的内容先后写入 dataOutputStream 中**/private void writeResponse(NotNull DataOutputStream dataOutputStream, Response response) throws IOException {log.info({writeResponse}: 即将发送响应为: {},response);dataOutputStream.writeInt(response.getType());dataOutputStream.writeInt(response.getLength());dataOutputStream.write(response.getPayload());// 刷新缓冲区十分重要dataOutputStream.flush();}实现处理请求
先把请求转换成 BaseArguments , 获取到其中的 channelId 和 rid再根据不同的 type, 分别处理不同的逻辑. (主要是调⽤ virtualHost 中不同的⽅法).针对消息订阅操作, 则需要在存在消息的时候通过回调, 把响应结果写回给对应的客⼾端.最后构造成统⼀的响应.
/*** description: 依据 request 中的信息, 执行相关方法, 并构造 Response 对象返回**/private NotNull Response process(NotNull Request request, Socket clientSocket) throws IOException, ClassNotFoundException, MqException {// 1. 把 request 中的 payload 做一个初步的解析, 让父类来接受BasicArguments basicArguments (BasicArguments) BinaryUtils.fromBytes(request.getPayload());log.info(request 中 payload 解析结果: rid {}, channelId {}, basicArguments.getRid(),basicArguments.getChannelId());// 2. 根据 type 的值, 来近一步来区分这一次请求时要干啥的boolean ok true; // 各个方法的返回结果基本都是 booleanif (request.getType() 0x1) {// 创建 channelsessions.put(basicArguments.getChannelId(), clientSocket);log.info(创建 channel 完成! channelId: {}, basicArguments.getChannelId());} else if (request.getType() 0x2) {// 销毁 channelsessions.remove(basicArguments.getChannelId());log.info(销毁 channel 完成! channelId: {}, basicArguments.getChannelId());} else if (request.getType() 0x3) {// 创建交换机, 此时的 payload 就是 ExchangeDeclareArguments 对象了ExchangeDeclareArguments exchangeDeclareArguments (ExchangeDeclareArguments) basicArguments;ok virtualHost.exchangeDeclare(exchangeDeclareArguments.getExchangeName(), exchangeDeclareArguments.getExchangeType(), exchangeDeclareArguments.getDurable(), exchangeDeclareArguments.getAutoDelete(), exchangeDeclareArguments.getArguments());} else if (request.getType() 0x4) {// 销毁交换机ExchangeDeleteArguments exchangeDeleteArguments (ExchangeDeleteArguments) basicArguments;ok virtualHost.exchangeDelete(exchangeDeleteArguments.getExchangeName());} else if (request.getType() 0x5) {// 创建队列QueueDeclareArguments queueDeclareArguments (QueueDeclareArguments) basicArguments;ok virtualHost.queueDeclare(queueDeclareArguments.getQueueName(), queueDeclareArguments.isDurable(), queueDeclareArguments.isExclusive(), queueDeclareArguments.isAutoDelete(), queueDeclareArguments.getArguments());} else if (request.getType() 0x6) {// 销毁队列QueueDeleteArguments queueDeleteArguments (QueueDeleteArguments) basicArguments;ok virtualHost.queueDelete(queueDeleteArguments.getQueueName());} else if (request.getType() 0x7) {// 创建绑定QueueBindArguments queueBindArguments (QueueBindArguments) basicArguments;ok virtualHost.queueBind(queueBindArguments.getQueueName(), queueBindArguments.getExchangeName(), queueBindArguments.getBindingKey());} else if (request.getType() 0x8) {// 删除绑定QueueUnBindArguments queueUnBindArguments (QueueUnBindArguments) basicArguments;ok virtualHost.queueUnBind(queueUnBindArguments.getQueueName(), queueUnBindArguments.getExchangeName());} else if (request.getType() 0x9) {// 发布消息BasicPublishArguments basicPublishArguments (BasicPublishArguments) basicArguments;ok virtualHost.basicPublish(basicPublishArguments.getExchangeName(), basicPublishArguments.getRoutingKey(), basicPublishArguments.getBasicProperties(), basicPublishArguments.getBody());} else if (request.getType() 0xa) {// 订阅消息BasicConsumeArguments basicConsumeArguments (BasicConsumeArguments) basicArguments;ok virtualHost.basicConsume(basicConsumeArguments.getConsumerTag(), basicConsumeArguments.getQueueName(), basicConsumeArguments.isAutoAck(), new Consumer() {/*** 这个回调函数要做的工作, 就是把服务收到消息直接推送会给对应的消费者客户端即可, 在客户端进行对消息的消费**/Overridepublic void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) throws MqException, IOException {// 先知道当前这个收到的消息, 要发给哪个客户端// 此处 consumerTag, 其实就是 channelId (这里是规定的, 客户端填写该字段的时候, 就是以 channelId 来填写的),// 根据 channelId 去 sessions 中查询, 就可以得到对应的 socket 对象, 就可以往里面发送数据了// 1. 根据 channelId 找到 socket 对象Socket clientSocket sessions.get(consumerTag);if (clientSocket null || clientSocket.isClosed()) {throw new MqException([BrokerServer] 订阅消息的客户端已经关闭!);}// 2. 构造响应数据SubScribeReturns subScribeReturns new SubScribeReturns();subScribeReturns.setChannelId(consumerTag);subScribeReturns.setRid(); // 由于这里只有响应, 没有请求, 不需要去对应, rid 暂时不需要subScribeReturns.setOk(true);subScribeReturns.setConsumerTag(consumerTag);subScribeReturns.setBasicProperties(basicProperties);subScribeReturns.setBody(body);byte[] payload BinaryUtils.toBytes(subScribeReturns);Response response new Response();// 0xc 表示服务器给消费者客户端推送消息数据response.setType(0xc);// response 的 payload 就是一个 SubScribeReturnsresponse.setLength(payload.length);response.setPayload(payload);// 3. 把数据写回给客户端// 注意! 此处的 dataOutputStream 不能close!// 如果把 dataOutputStream 关闭, 就会直接把 clientSocket 里的 OutputStream 也关闭了// 此时就无法继续往 socket 中写入后续数据了DataOutputStream dataOutputStream new DataOutputStream(clientSocket.getOutputStream());writeResponse(dataOutputStream, response);}});} else if (request.getType() 0xb) {// 确认应答, 消费者确认收到消息BasicAckArguments basicAckArguments (BasicAckArguments) basicArguments;ok virtualHost.basicAck(basicAckArguments.getQueueName(), basicAckArguments.getMessageId());} else {// 当前的 type 是非法的throw new MqException(未知的 type: request.getType());}// 3. 构造响应BasicReturns basicReturns new BasicReturns();basicReturns.setChannelId(basicArguments.getChannelId());basicReturns.setRid(basicArguments.getRid());basicReturns.setOk(ok);byte[] payload BinaryUtils.toBytes(basicReturns);Response response new Response();response.setType(request.getType());response.setLength(payload.length);response.setPayload(payload);log.info(构造响应完成: {}, response);return response;}实现 clearClosedSession
如果客⼾端只关闭了 Connection, 没关闭 Connection 中包含的 Channel, 也没关系, 在这⾥统⼀进⾏清理.注意迭代器失效问题.
/*** description: 用户关闭连接后, 清理对应的 channel 资源* 需要注意的是 迭代器失效的问题**/private void clearClosedSession(Socket clientSocket) {// 这里要做的事情, 主要就是遍历上述 sessions hash 表, 把该关闭的 socket 对应的键值对, 统统删除ListString toDeleteChannelId new ArrayList();for (Map.EntryString, Socket entry : sessions.entrySet()) {if (entry.getValue() clientSocket) {// 不能在这里直接删除!!// 这属于使用集合类的一个大忌 -- 一边遍历, 一边删除!// sessions.remove(entry.getKey());toDeleteChannelId.add(entry.getKey());}}for (String channelId : toDeleteChannelId) {sessions.remove(channelId);}log.info(清理 session 完成! 被清理的 channelId: {}, toDeleteChannelId);}
⼗三、 实现客⼾端
创建包 mqclient
创建 ConnectionFactory
⽤来创建连接的⼯⼚类
/*** Created with IntelliJ IDEA.* Description工厂类 -- 以工厂模式来创建 Connection 类** author: zxj* date: 2024-03-06* time: 21:55:32*/
Data
public class ConnectionFactory {// BrokerServer 的 IP 和 portprivate String host;private Integer port;// more ...// 建立一个 TCP 连接public Connection newConnection() throws IOException {Connection connection new Connection(host,port);return connection;}
}Connection 和 Channel 的定义
⼀个客⼾端可以创建多个 Connection.
⼀个 Connection 对应⼀个 socket, ⼀个 TCP 连接.
⼀个 Connection 可以包含多个 Channel
Connection 的定义
Data
Slf4j
public class Connection {private Socket socket;private InputStream inputStream;private OutputStream outputStream;private DataInputStream dataInputStream;private DataOutputStream dataOutputStream;// 记录当前 Connection 包含的 Channelprivate ConcurrentHashMapString, Channel channelMap new ConcurrentHashMap();// 执行消息回调的线程池private ExecutorService callbackPool null;
} Socket 是客⼾端持有的套接字. InputStream OutputStream DataInputStream DataOutputStream 均为 socket 通信的接⼝.channelMap ⽤来管理该连接中所有的 Channel.callbackPool 是⽤来在客⼾端这边执⾏⽤⼾回调的线程池.
Channel 的定义
Data
public class Channel {// channelId 为 channel 的身份标识, 使用 UUID 标识private String channelId;// connection 为 channel 对应的连接private Connection connection;// key 为 rid, 即 requestId / responseId// basicReturnsMap 用来保存响应的返回值, 放到这个哈希表中方便和请求匹配private ConcurrentHashMapString, BasicReturns basicReturnsMap new ConcurrentHashMap();// 订阅消息的回调 -- 为消费者的回调(用户注册的), 对应消息响应, 应该调用这个回调处理消息private Consumer consumer null;public Channel(String channelId, Connection connection) {this.channelId channelId;this.connection connection;}
} channelId 为 channel 的⾝份标识, 使⽤ UUID 标识.Connection 为 channel 对应的连接.baseReturnsMap ⽤来保存响应的返回值. 放到这个哈希表中⽅便和请求匹配.consumer 为消费者的回调(⽤⼾注册的). 对于消息响应, 应该调⽤这个回调处理消息.
封装请求响应读写操作
在 Connection 中, 实现下列⽅法
/*** description: 读取响应**/public Response readResponse() throws IOException, MqException {log.info(客户端: 开始等待读取消息);Response response new Response();response.setType(dataInputStream.readInt());response.setLength(dataInputStream.readInt());byte[] payload new byte[response.getLength()];int n dataInputStream.read(payload);if (n ! response.getLength()) {throw new MqException(读取的响应数据不完整);}response.setPayload(payload);log.info(收到响应: type: {}, length: {}, response.getType(), response.getLength());return response;}/*** description: 写请求**/public void writeRequest(Request request) throws IOException {dataOutputStream.writeInt(request.getType());dataOutputStream.writeInt(request.getLength());dataOutputStream.write(request.getPayload());dataOutputStream.flush();log.info(发送请求:type: {}, length: {}, request.getType(), request.getLength());}创建 channel
在 Connection 中, 定义下列⽅法来创建⼀个 channel
public Channel createChannel() throws IOException {// 使用 UUID 生产 channelId, 以 C- 开头String channelId C- UUID.randomUUID().toString();Channel channel new Channel(channelId, this);// 这里需要先把 channel 键值放到 Map 中 进行管理channelMap.put(channelId, channel);// 同时也需要把 创建 channel 的这个消息也告诉服务器boolean ok channel.createChannel();if (!ok) {// 服务器这里创建失败了, 整个这次创建 channel 操作不顺利// 把刚才已经加入 hash 表的键值对, 再删了channelMap.remove(channelId);return null;}return channel;}发送请求
通过 Channel 提供请求的发送操作.
创建 channel
/*** description: 在这个方法中, 和服务器进行交互, 告知服务器, 此处客户端创建了新的 channel 了**/public boolean createChannel() throws IOException {// 对于创建 Channel 操作来说, payload 就是一个 basicArguments 对象BasicArguments basicArguments new BasicAckArguments();basicArguments.setChannelId(channelId);basicArguments.setRid(generateRid());byte[] payload BinaryUtils.toBytes(basicArguments);Request request new Request();request.setType(0x1);request.setLength(payload.length);request.setPayload(payload);// 构造出完整请求之后, 就可以发送这个请求connection.writeRequest(request);// 等待服务器的响应BasicReturns basicReturns waitResult(basicArguments.getRid());return basicReturns.getOk();}
generateRid 的实现 private NotNull String generateRid() {return R- UUID.randomUUID().toString();}waitResult 的实现
由于服务器的响应是异步的. 此处通过 waitResult 实现同步等待的效果
/** description: 期望使用这个方法来阻塞等待服务器的响应**/private BasicReturns waitResult(String rid) {BasicReturns basicReturns null;while ((basicReturns basicReturnsMap.get(rid)) null) {// 如果查询结果为 null, 说明包裹还没有回来// 此时就需要阻塞等待synchronized (this) {try {wait();} catch (InterruptedException e) {e.printStackTrace();}}}return basicReturns;}关闭 channel
/*** description: 关闭 channel, 给服务器发送一个 0x2 类型的请求**/public boolean close() throws IOException {BasicArguments basicArguments new BasicAckArguments();basicArguments.setRid(generateRid());basicArguments.setChannelId(channelId);byte[] payload BinaryUtils.toBytes(basicArguments);Request request new Request();request.setType(0x2);request.setLength(payload.length);request.setPayload(payload);connection.writeRequest(request);BasicReturns basicReturns waitResult(basicArguments.getRid());return basicReturns.getOk();}
创建交换机
/*** description: 创建交换机**/public boolean exchangeDeclare(String exchangeName, ExchangeType exchangeType, boolean durable, boolean autoDelete,MapString, Object arguments) throws IOException {ExchangeDeclareArguments exchangeDeclareArguments new ExchangeDeclareArguments();exchangeDeclareArguments.setRid(generateRid());exchangeDeclareArguments.setChannelId(channelId);exchangeDeclareArguments.setExchangeName(exchangeName);exchangeDeclareArguments.setExchangeType(exchangeType);exchangeDeclareArguments.setDurable(durable);exchangeDeclareArguments.setAutoDelete(autoDelete);exchangeDeclareArguments.setArguments(arguments);byte[] payload BinaryUtils.toBytes(exchangeDeclareArguments);Request request new Request();request.setType(0x3);request.setLength(payload.length);request.setPayload(payload);connection.writeRequest(request);BasicReturns basicReturns waitResult(exchangeDeclareArguments.getRid());return basicReturns.getOk();}删除交换机
/*** description: 删除交换机**/public boolean exchangeDelete(String exchangeName) throws IOException {ExchangeDeleteArguments exchangeDeleteArguments new ExchangeDeleteArguments();exchangeDeleteArguments.setExchangeName(exchangeName);exchangeDeleteArguments.setChannelId(channelId);exchangeDeleteArguments.setRid(generateRid());byte[] payload BinaryUtils.toBytes(exchangeDeleteArguments);Request request new Request();request.setType(0x4);request.setLength(payload.length);request.setPayload(payload);connection.writeRequest(request);BasicReturns basicReturns waitResult(exchangeDeleteArguments.getRid());return basicReturns.getOk();}
创建队列
/*** description: 创建队列**/public boolean queueDeclare(String queueName, boolean durable, boolean exclusive, boolean autoDelete,MapString, Object arguments) throws IOException {QueueDeclareArguments queueDeclareArguments new QueueDeclareArguments();queueDeclareArguments.setQueueName(queueName);queueDeclareArguments.setExclusive(exclusive);queueDeclareArguments.setDurable(durable);queueDeclareArguments.setAutoDelete(autoDelete);queueDeclareArguments.setArguments(arguments);queueDeclareArguments.setChannelId(channelId);queueDeclareArguments.setRid(generateRid());byte[] payload BinaryUtils.toBytes(queueDeclareArguments);Request request new Request();request.setType(0x5);request.setLength(payload.length);request.setPayload(payload);connection.writeRequest(request);BasicReturns basicReturns waitResult(queueDeclareArguments.getRid());return basicReturns.getOk();}删除队列
/*** description: 删除队列**/public boolean queueDelete(String queueName) throws IOException {QueueDeleteArguments queueDeleteArguments new QueueDeleteArguments();queueDeleteArguments.setRid(generateRid());queueDeleteArguments.setChannelId(channelId);queueDeleteArguments.setQueueName(queueName);byte[] payload BinaryUtils.toBytes(queueDeleteArguments);Request request new Request();request.setType(0x6);request.setLength(payload.length);request.setPayload(payload);connection.writeRequest(request);BasicReturns basicReturns waitResult(queueDeleteArguments.getRid());return basicReturns.getOk();}
创建绑定
/*** description: 创建绑定**/public boolean queueBind(String queueName, String exchangeName, String bindingKey) throws IOException {QueueBindArguments queueBindArguments new QueueBindArguments();queueBindArguments.setBindingKey(bindingKey);queueBindArguments.setQueueName(queueName);queueBindArguments.setExchangeName(exchangeName);queueBindArguments.setRid(generateRid());queueBindArguments.setChannelId(channelId);byte[] payload BinaryUtils.toBytes(queueBindArguments);Request request new Request();request.setType(0x7);request.setLength(payload.length);request.setPayload(payload);connection.writeRequest(request);BasicReturns basicReturns waitResult(queueBindArguments.getRid());return basicReturns.getOk();}删除绑定
/*** description: 删除绑定**/public boolean queueUnbind(String queueName, String exchangeName) throws IOException {QueueUnBindArguments queueUnBindArguments new QueueUnBindArguments();queueUnBindArguments.setExchangeName(exchangeName);queueUnBindArguments.setQueueName(queueName);queueUnBindArguments.setRid(generateRid());queueUnBindArguments.setChannelId(channelId);byte[] payload BinaryUtils.toBytes(queueUnBindArguments);Request request new Request();request.setType(0x8);request.setLength(payload.length);request.setPayload(payload);connection.writeRequest(request);BasicReturns basicReturns waitResult(queueUnBindArguments.getRid());return basicReturns.getOk();}发送消息
/*** description: 发送消息**/public boolean basicPublish(String exchangeName, String routingKey, BasicProperties basicProperties,byte[] body) throws IOException {BasicPublishArguments basicPublishArguments new BasicPublishArguments();basicPublishArguments.setBasicProperties(basicProperties);basicPublishArguments.setBody(body);basicPublishArguments.setExchangeName(exchangeName);basicPublishArguments.setRoutingKey(routingKey);basicPublishArguments.setRid(generateRid());basicPublishArguments.setChannelId(channelId);byte[] payload BinaryUtils.toBytes(basicPublishArguments);Request request new Request();request.setType(0x9);request.setLength(payload.length);request.setPayload(payload);connection.writeRequest(request);BasicReturns basicReturns waitResult(basicPublishArguments.getRid());return basicReturns.getOk();}
订阅消息
/*** description: 订阅消息**/public boolean basicConsume(String queueName,boolean autoAck,Consumer consumer) throws MqException, IOException {// 先设置回调, 一个channel 只能设置一个回调方法if (this.consumer ! null) {throw new MqException(该 channel 已经设置过消费消息的回调了, 不能重复设置!);}this.consumer consumer;BasicConsumeArguments basicConsumeArguments new BasicConsumeArguments();basicConsumeArguments.setRid(generateRid());basicConsumeArguments.setChannelId(channelId);basicConsumeArguments.setConsumerTag(channelId); // 此处 consumerTag 也使用 channelId 来标识basicConsumeArguments.setAutoAck(autoAck);basicConsumeArguments.setQueueName(queueName);byte[] payload BinaryUtils.toBytes(basicConsumeArguments);Request request new Request();request.setType(0xa);request.setLength(payload.length);request.setPayload(payload);connection.writeRequest(request);BasicReturns basicReturns waitResult(basicConsumeArguments.getRid());return basicReturns.getOk();}确认消息
/*** description: 确认消息**/public boolean basicAck(String queueName,String messageId) throws IOException {BasicAckArguments basicAckArguments new BasicAckArguments();basicAckArguments.setMessageId(messageId);basicAckArguments.setQueueName(queueName);basicAckArguments.setRid(generateRid());basicAckArguments.setChannelId(channelId);byte[] payload BinaryUtils.toBytes(basicAckArguments);Request request new Request();request.setType(0xb);request.setLength(payload.length);request.setPayload(payload);connection.writeRequest(request);BasicReturns basicReturns waitResult(basicAckArguments.getRid());return basicReturns.getOk();}⼩结
上述发送请求的操作, 逻辑基本⼀致. 构造参数 构造请求 发送 等待结果
处理响应
创建扫描线程
创建⼀个扫描线程, ⽤来不停的读取 socket 中的响应数据
注意: ⼀个 Connection 中可能包含多个 channel, 需要把响应分别放到对应的 channel 中.
public Connection(String host, Integer port) throws IOException {socket new Socket(host, port);inputStream socket.getInputStream();outputStream socket.getOutputStream();dataInputStream new DataInputStream(inputStream);dataOutputStream new DataOutputStream(outputStream);callbackPool Executors.newFixedThreadPool(4);// 创建一个扫描线程, 由这个线程负责不停的从 socket 中读取响应数据, 把这个响应数据再交给对应的 Channel 负责处理Thread t new Thread(() - {try {while (!socket.isClosed()) {Response response readResponse();dispatchResponse(response);}} catch (SocketException e) {// 连接正常断开的. 此时这个异常直接忽略.System.out.println([Connection] 连接正常断开!);} catch (IOException | ClassNotFoundException | MqException e) {// System.out.println([Connection] 连接异常断开!);log.error(连接异常断开! e: {}, e);e.printStackTrace();}});t.start();}实现响应的分发
给 Connection 创建 dispatchResponse ⽅法
针对服务器返回的控制响应和消息响应, 分别处理. 如果是订阅数据, 则调⽤ channel 中的回调.如果是控制消息, 直接放到结果集合中.
/*** description: 使用这个方法来分别处理, 当前的响应是针对控制请求的响应, 还是服务器推送的消息**/private void dispatchResponse(Response response) throws IOException, ClassNotFoundException, MqException {if (response.getType() 0xc) {// 服务器推送来的消息数据SubScribeReturns subScribeReturns (SubScribeReturns) BinaryUtils.fromBytes(response.getPayload());// 根据 ChannelId 找到对应的 Channel 对象Channel channel channelMap.get(subScribeReturns.getChannelId());if (channel null) {throw new MqException(该消息对应的 Channel 在客户端中不存在, channelId: subScribeReturns.getChannelId());}// 执行该 channel 对象内部的回调callbackPool.submit(() - {try {channel.getConsumer().handleDelivery(subScribeReturns.getConsumerTag(), subScribeReturns.getBasicProperties(),subScribeReturns.getBody());} catch (MqException | IOException e) {e.printStackTrace();}});} else {// 当前相应是针对刚才的控制请求的响应BasicReturns basicReturns (BasicReturns) BinaryUtils.fromBytes(response.getPayload());// 根据 ChannelId 找到对应的 Channel 对象Channel channel channelMap.get(basicReturns.getChannelId());if (channel null) {throw new MqException(该消息对应的 Channel 在客户端中不存在, channelId: basicReturns.getChannelId());}channel.putReturns(basicReturns);}}实现 channel.putReturns /*** description: 存入 basicReturns**/public void putReturns(BasicReturns basicReturns) {basicReturnsMap.put(basicReturns.getRid(), basicReturns);synchronized (this) {// 当前也不知道有多少个线程在等待上述的这个响应// 把所有的等待的线程都唤醒notifyAll();}}关闭 Connection public void close() {// 关闭 Connection, 释放相关资源try {callbackPool.shutdownNow();channelMap.clear();inputStream.close();outputStream.close();socket.close();} catch (Exception e) {log.error(关闭资源出现异常);e.printStackTrace();}}
测试代码
package en.edu.zxj.mq.mqclient;import en.edu.zxj.mq.MqApplication;
import en.edu.zxj.mq.common.Consumer;
import en.edu.zxj.mq.common.MqException;
import en.edu.zxj.mq.mqserver.BrokerServer;
import en.edu.zxj.mq.mqserver.core.BasicProperties;
import en.edu.zxj.mq.mqserver.core.ExchangeType;
import org.apache.tomcat.util.http.fileupload.FileUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.test.context.SpringBootTest;import java.io.File;
import java.io.IOException;/*** Created with IntelliJ IDEA.* Description** author: zxj* date: 2024-03-07* time: 10:55:32*/
SpringBootTest
class MqClientTest {private ConnectionFactory factory null;private Thread t null;private BrokerServer brokerServer null;BeforeEachpublic void setUp() throws IOException {// 1. 先启动服务器MqApplication.context SpringApplication.run(MqApplication.class);brokerServer new BrokerServer(9090);t new Thread(() - {brokerServer.start();});t.start();// 2. 配置 ConnectionFactoryfactory new ConnectionFactory();factory.setHost(127.0.0.1);factory.setPort(9090);}AfterEachpublic void tearDown() throws IOException, InterruptedException {// 停止服务器brokerServer.stop();MqApplication.context.close();t.join();// 删除必要的文件File file new File(./data);FileUtils.deleteDirectory(file);factory null;}Testpublic void testConnection() throws IOException {Connection connection factory.newConnection();Assertions.assertNotNull(connection);}Testpublic void testChannel() throws IOException {Connection connection factory.newConnection();Assertions.assertNotNull(connection);Channel channel connection.createChannel();Assertions.assertNotNull(channel);}Testpublic void testExchange() throws IOException {Connection connection factory.newConnection();Assertions.assertNotNull(connection);Channel channel connection.createChannel();Assertions.assertNotNull(channel);boolean ok channel.exchangeDeclare(testExchangeName, ExchangeType.DIRECT,true,false,null);Assertions.assertTrue(ok);ok channel.exchangeDelete(testExchangeName);Assertions.assertTrue(ok);// 该关闭的关闭channel.close();connection.close();}Testpublic void testQueue() throws IOException {Connection connection factory.newConnection();Assertions.assertNotNull(connection);Channel channel connection.createChannel();Assertions.assertNotNull(channel);boolean ok channel.queueDeclare(testQueue,true,false,false,null);Assertions.assertTrue(ok);ok channel.queueDelete(testQueue);Assertions.assertTrue(ok);// 该关闭的关闭channel.close();connection.close();}Testpublic void testBinding() throws IOException {Connection connection factory.newConnection();Assertions.assertNotNull(connection);Channel channel connection.createChannel();Assertions.assertNotNull(channel);boolean ok channel.queueDeclare(testQueue,true,false,false,null);Assertions.assertTrue(ok);ok channel.exchangeDeclare(testExchangeName, ExchangeType.DIRECT,true,false,null);Assertions.assertTrue(ok);ok channel.queueBind(testQueue,testExchangeName,testBindingKey);Assertions.assertTrue(ok);ok channel.queueUnbind(testQueue,testExchangeName);Assertions.assertTrue(ok);ok channel.exchangeDelete(testExchangeName);Assertions.assertTrue(ok);ok channel.queueDelete(testQueue);Assertions.assertTrue(ok);// 该关闭的关闭channel.close();connection.close();}Testpublic void testMessage() throws IOException, MqException, InterruptedException {Connection connection factory.newConnection();Assertions.assertNotNull(connection);Channel channel connection.createChannel();Assertions.assertNotNull(channel);boolean ok channel.queueDeclare(testQueue,true,false,false,null);Assertions.assertTrue(ok);ok channel.exchangeDeclare(testExchangeName, ExchangeType.DIRECT,true,false,null);Assertions.assertTrue(ok);byte[] requestBody hello.getBytes();ok channel.basicPublish(testExchangeName,testQueue,null,requestBody);Assertions.assertTrue(ok);ok channel.basicConsume(testQueue, true, new Consumer() {Overridepublic void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) throws MqException, IOException {System.out.println([消费数据] 开始!);System.out.println(consumerTag consumerTag);System.out.println(basicProperties basicProperties);Assertions.assertArrayEquals(requestBody,body);System.out.println([消费数据] 结束!);}});Assertions.assertTrue(ok);Thread.sleep(500);ok channel.exchangeDelete(testExchangeName);Assertions.assertTrue(ok);ok channel.queueDelete(testQueue);Assertions.assertTrue(ok);// 该关闭的关闭channel.close();connection.close();}}项目结果
演示 首先启动 BrokerServer 类
SpringBootApplication
public class MqApplication {public static ConfigurableApplicationContext context null;public static void main(String[] args) throws IOException {context SpringApplication.run(MqApplication.class, args);BrokerServer brokerServer new BrokerServer(9090);brokerServer.start();}}接着分别启动消费者和生产者客户端, 不分先后启动顺序 此时消费者就会收到消息并进行处理